In [7]:
import os

# Point the Google client libraries at the service-account key file.
# setdefault keeps a GOOGLE_APPLICATION_CREDENTIALS value that was already
# configured outside the notebook instead of overwriting it with this
# hard-coded, machine-specific file name.
os.environ.setdefault(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "hazel-chiller-454317-n2-d8a21af382bb.json",
)


In [8]:
from google.cloud import bigquery

# Connectivity check: list the datasets visible to the configured credentials.
client = bigquery.Client()
datasets = list(client.list_datasets())

if not datasets:
    print("No datasets found in BigQuery.")
else:
    print("BigQuery datasets found:")
    # One dataset ID per line.
    for dataset in datasets:
        print(dataset.dataset_id)


BigQuery datasets found:
proj3todo


In [None]:
from flask import Flask, request, jsonify
from google.cloud import bigquery

# Flask application object; the route decorators below register on it.
app = Flask(__name__)

# Connect to Google BigQuery
client = bigquery.Client()
# Dataset identifier used as the prefix of the table reference below.
dataset_id = "hazel-chiller-454317-n2.proj3todo"
# Table reference interpolated into every SQL statement in the routes.
table_id = f"{dataset_id}.todos"

### Get all TODOs ###
@app.route("/todos", methods=["GET"])
def get_todos():
    """Return every row of the todos table as a JSON array.

    Each element carries the ``id``, ``task`` and ``status`` fields of a row.
    """
    rows = client.query(f"SELECT * FROM `{table_id}`").result()

    payload = []
    for record in rows:
        payload.append(
            {"id": record.id, "task": record.task, "status": record.status}
        )
    return jsonify(payload)


### Add a TODO ###
@app.route("/todos", methods=["POST"])
def add_todo():
    """Insert a new TODO row built from the JSON request body.

    The body must be a JSON object with a required ``task`` and an optional
    ``status`` (defaults to ``"Pending"``).

    Returns:
        ``201`` with a success message once the INSERT job has finished, or
        ``400`` with an ``error`` message when the body is not a JSON object
        or ``task`` is missing/empty.
    """
    data = request.get_json()
    # A syntactically valid body such as `null`, `[]` or `"text"` is not a
    # dict; reject it here instead of crashing on `.get` with AttributeError.
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    task = data.get("task")
    status = data.get("status", "Pending")  # default status is Pending

    if not task:
        return jsonify({"error": "Task is required"}), 400

    # NOTE(review): no `id` is written here, while the PUT/DELETE routes
    # filter on `id` — confirm how the table populates that column.
    query = f"""
    INSERT INTO `{table_id}` (task, status)
    VALUES (@task, @status)
    """

    # Values travel as query parameters, never interpolated into the SQL text.
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("task", "STRING", task),
            bigquery.ScalarQueryParameter("status", "STRING", status)
        ]
    )

    # .result() blocks until the job completes.
    client.query(query, job_config=job_config).result()

    return jsonify({"message": "TODO added successfully"}), 201


### Update a TODO ###
@app.route("/todos/<int:todo_id>", methods=["PUT"])
def update_todo(todo_id):
    """Update the ``task`` and/or ``status`` of the row whose id is ``todo_id``.

    The body must be a JSON object containing at least one non-empty field
    among ``task`` and ``status``; only the supplied fields are written.

    Args:
        todo_id: Integer id taken from the URL path.

    Returns:
        A success message once the UPDATE job has finished, or ``400`` with an
        ``error`` message when the body is not a JSON object or carries no
        field to update. No check is made that a row actually matched.
    """
    data = request.get_json()
    # A syntactically valid body such as `null`, `[]` or `"text"` is not a
    # dict; reject it here instead of crashing on `.get` with AttributeError.
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    task = data.get("task")
    status = data.get("status")

    if not task and not status:
        return jsonify({"error": "At least one field (task or status) must be updated"}), 400

    # Build the SET list and its parameters in step so they always agree.
    set_clauses = []
    query_params = []

    if task:
        set_clauses.append("task = @task")
        query_params.append(bigquery.ScalarQueryParameter("task", "STRING", task))

    if status:
        set_clauses.append("status = @status")
        query_params.append(bigquery.ScalarQueryParameter("status", "STRING", status))

    # Only fixed column fragments are interpolated; values stay parameterized.
    query = f"""
    UPDATE `{table_id}`
    SET {", ".join(set_clauses)}
    WHERE id = @todo_id
    """

    query_params.append(bigquery.ScalarQueryParameter("todo_id", "INT64", todo_id))
    job_config = bigquery.QueryJobConfig(query_parameters=query_params)

    # .result() blocks until the job completes.
    client.query(query, job_config=job_config).result()

    return jsonify({"message": "TODO updated successfully"})


### Delete a TODO ###
@app.route("/todos/<int:todo_id>", methods=["DELETE"])
def delete_todo(todo_id):
    """Delete the row whose id is ``todo_id`` and report success.

    Args:
        todo_id: Integer id taken from the URL path.

    Returns:
        A JSON success message once the DELETE job has finished; no check is
        made that a row actually matched.
    """
    id_param = bigquery.ScalarQueryParameter("todo_id", "INT64", todo_id)
    delete_job = client.query(
        f"DELETE FROM `{table_id}` WHERE id = @todo_id",
        job_config=bigquery.QueryJobConfig(query_parameters=[id_param]),
    )
    delete_job.result()  # wait for the job to finish

    return jsonify({"message": "TODO deleted successfully"})


if __name__ == "__main__":
    # debug=True also enables the auto-reloader, which re-launches the process
    # and does not work from inside a notebook kernel; keep debug mode but
    # switch the reloader off.
    # NOTE(review): app.run() blocks this cell while serving, so the client
    # cell has to run from a separate kernel/process — confirm the workflow.
    app.run(debug=True, use_reloader=False)


[]


In [11]:
import requests
import json

BASE_URL = "http://127.0.0.1:5000/todos"  # make sure the Flask server is running

def test_get_todos():
    """Smoke-test the GET endpoint: fetch all TODOs and print the reply."""
    reply = requests.get(BASE_URL)
    print("\n[GET] 获取所有 TODO:")
    code, body = reply.status_code, reply.json()
    print(code, body)

def test_add_todo(task, status="Pending"):
    """Smoke-test the POST endpoint: create a TODO and print the reply.

    Args:
        task: Task text sent as the ``task`` field.
        status: Value sent as the ``status`` field.
    """
    reply = requests.post(BASE_URL, json={"task": task, "status": status})
    print("\n[POST] 添加 TODO:")
    code, body = reply.status_code, reply.json()
    print(code, body)

def test_update_todo(todo_id, new_task=None, new_status=None):
    """测试更新 TODO"""
    payload = {}
    if new_task:
        payload["task"] = new_task
    if new_status:
        payload["status"] = new_status
    
    if not payload:
        print("\n[PUT] 没有提供需要更新的字段")
        return
    
    response = requests.put(f"{BASE_URL}/{todo_id}", json=payload)
    print("\n[PUT] 更新 TODO:")
    print(response.status_code, response.json())

def test_delete_todo(todo_id):
    """Smoke-test the DELETE endpoint: remove a TODO and print the reply.

    Args:
        todo_id: Id appended to the base URL.
    """
    target = f"{BASE_URL}/{todo_id}"
    reply = requests.delete(target)
    print("\n[DELETE] 删除 TODO:")
    code, body = reply.status_code, reply.json()
    print(code, body)


test_get_todos()  # 1. Fetch the initial TODO list
test_add_todo("Finish HCI project")  # 2. Add a TODO
# test_get_todos()  # 3. Fetch the updated TODO list
# test_update_todo(1, new_status="Completed")  # 4. Update the TODO with ID=1
# test_delete_todo(1)  # 5. Delete the TODO with ID=1
# test_get_todos()  # 6. Confirm the TODO with ID=1 was deleted


ConnectionError: HTTPConnectionPool(host='127.0.0.1', port=5000): Max retries exceeded with url: /todos (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x000001AFF29BD480>: Failed to establish a new connection: [WinError 10061] 由于目标计算机积极拒绝，无法连接。'))