# **第7章 查詢和操作SQL數據庫**

## 7.1 使用SQLAlchemy操作SQLite數據庫

### 7.1.1 安裝和配置SQLAlchemy

In [None]:
pip install sqlalchemy sqlite

In [None]:
from sqlalchemy import create_engine

# 建立到SQLite數據庫的連接
engine = create_engine('sqlite:///example.db')

# 建立與數據庫的連接
connection = engine.connect()

### 7.1.2 建立和操作表

#### 定義數據庫模式（Schema）

In [None]:
from sqlalchemy import Column, Integer, String, MetaData, Table

# 初始化元數據對象，用來包含多個表的結構信息
metadata = MetaData()

# 定義用戶表（users）
# Table是一個SQLAlchemy的函數，用於定義資料庫中的表
# 'users' 是表的名稱
# metadata 是前面定義的MetaData對象，表示這個表屬於哪個MetaData
# Column 用來定義表中的每一列
# 'id' 是列的名稱，Integer表示這列是整數類型，並且是主鍵 primary_key=True
# 'name' 是列的名稱，String 表示這列是字符串（文本）類型
# 'age' 是列的名稱，Integer表示這列是整數類型

users_table = Table('users', metadata,
    Column('id', Integer, primary_key=True),  # 定義ID列，為整數類型，且為主鍵
    Column('name', String),                   # 定義名稱列，為字符串類型
    Column('age', Integer)                    # 定義年齡列，為整數類型
)

# 使用這些定義後，可以將此表結構映射到資料庫中，進行如創建、查詢、插入和更新操作

#### 創建表和插入數據

In [None]:
# 在數據庫中創建表
# 這個步驟使用metadata.create_all(engine) 方法來生成數據庫中的表結構。
# ‘metadata’ 是 SQLAlchemy 中的 MetaData 對象，它包含了我們所有的表定義。
# ‘engine’ 是用於連接數據庫的引擎對象，定義了我們將要使用的數據庫。
# 這個方法會閱覽 'metadata' 中的所有 Table 對象，並在數據庫中創建它們（如果它們還不存在的話）。
metadata.create_all(engine)

# 插入數據
# 此步驟用於向指定的數據表中插入一條新的數據記錄。
# 首先，使用 ‘users_table.insert().values()’ 方法來創建一個 SQL INSERT 語句。
# 'users_table' 是我們之前定義的數據表對象。
# insert().values(name='Alice', age=30) 表示插入一條新記錄，其中 name 字段的值為 'Alice'，age 字段的值為 30。
# 接下來，調用 connection.execute(insert_statement) 來執行這條插入語句。
# 'connection' 是一個連接對象，用於與數據庫進行交互。
insert_statement = users_table.insert().values(name='Alice', age=30)
connection.execute(insert_statement)

#### 查詢表格中的數據

In [None]:
# 查詢所有用戶數據
# 此步驟用於從數據庫中檢索所有用戶的數據記錄。
# 首先，使用 'users_table.select()' 方法來創建一個 SQL SELECT 語句。
# 'users_table' 是我們之前定義的數據表對象。
# select() 方法用來生成 SELECT 語句，這裡沒有給定任何條件，表示選取表中的所有記錄。
select_statement = users_table.select()

# 接下來，使用 'connection.execute(select_statement)' 執行這條 SELECT 語句。
# 'connection' 是用於與數據庫進行交互的連接對象。
# execute() 方法執行傳入的 SQL 語句並返回結果。
result = connection.execute(select_statement)

# 使用迴圈來遍歷結果集中每一行。
# 'result' 包含了所有檢索到的數據記錄。
# 每一行都是一個包含表中每個列值的對象，這裡使用 print(row) 來輸出每一行的內容。
for row in result:
    print(row)

### 7.1.3 高級查詢和操作

#### 過濾查詢：使用`filter()`和`filter_by()`

In [None]:
# 使用 .filter_by 查詢名字為 Alice 的用戶
# 此步驟用於從數據庫中檢索 name 字段值為 'Alice' 的用戶數據記錄。
# 首先，使用 'users_table.select().where(users_table.c.name == 'Alice')' 方法來創建一個 SQL SELECT 語句。
# 'users_table' 是我們之前定義的數據表對象。
# select() 方法用來生成 SELECT 語句，where() 方法用來添加過濾條件。
# 'users_table.c.name' 表示表中的 name 列，'== 'Alice'' 則是我們要匹配的條件。
alice = users_table.select().where(users_table.c.name == 'Alice')

# 接下來，使用 'connection.execute(alice)' 執行這條 SELECT 語句。
# 'connection' 是用於與數據庫進行交互的連接對象。
# execute() 方法執行傳入的 SQL 語句並返回結果。
result = connection.execute(alice)

# 使用迴圈來遍歷結果集中每一行。
# 'result' 包含了所有檢索到的數據記錄。
# 每一行都是一個包含表中每個列值的對象，這裡使用 print(row) 來輸出每一行的內容。
for row in result:
    print(row)

#### 更新和刪除數據

In [None]:
# 更新用戶年齡
# 此步驟用於更新數據庫中指定用戶的年齡。
# 首先，使用 'users_table.update().where(users_table.c.name == 'Alice').values(age=31)' 方法來創建一個 SQL UPDATE 語句。
# 'users_table' 是我們之前定義的數據表對象。
# update() 方法用來生成 UPDATE 語句，where() 方法用來指定更新的條件。
# 'users_table.c.name == 'Alice'' 指定了更新的條件，即匹配 name 字段值為 'Alice' 的記錄。
# values(age=31) 則指定了要更新的內容，把 age 字段的值修改為 31。
update_statement = users_table.update().where(users_table.c.name == 'Alice').values(age=31)

# 接下來，使用 'connection.execute(update_statement)' 執行這條 UPDATE 語句。
# 'connection' 是用於與數據庫進行交互的連接對象。
# execute() 方法執行傳入的 SQL 語句並更新相應的記錄。
connection.execute(update_statement)


# 刪除用戶
# 此步驟用於刪除數據庫中指定用戶的記錄。
# 首先，使用 'users_table.delete().where(users_table.c.name == 'Alice')' 方法來創建一個 SQL DELETE 語句。
# 'users_table' 是我們之前定義的數據表對象。
# delete() 方法用來生成 DELETE 語句，where() 方法用來指定刪除的條件。
# 'users_table.c.name == 'Alice'' 指定了刪除的條件，即匹配 name 字段值為 'Alice' 的記錄。
delete_statement = users_table.delete().where(users_table.c.name == 'Alice')

# 接下來，使用 'connection.execute(delete_statement)' 執行這條 DELETE 語句。
# 'connection' 是用於與數據庫進行交互的連接對象。
# execute() 方法執行傳入的 SQL 語句並刪除相應的記錄。
connection.execute(delete_statement)

In [None]:
connection.commit()

#### 使用SQLAlchemy的ORM（對象關係映射）功能

In [None]:
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 建立到 SQLite 數據庫的連接
# 'create_engine' 函數設置一個用戶端引擎，連接到指定的 'example.db' 數據庫文件
engine = create_engine('sqlite:///example.db')

# 建立與數據庫的連接
# 'engine.connect()' 方法返回一個連接對象，用於直接與數據庫進行交互
connection = engine.connect()

# 創建基類
# 'declarative_base()' 函數創建一個基類，所有的 ORM 模型都將繼承這個基類
Base = declarative_base()

# 定義一個 ORM 模型對應到 'users' 表
class User(Base):
    __tablename__ = 'users'  # 指定表名
    id = Column(Integer, primary_key=True)  # 定義一個主鍵列
    name = Column(String)  # 定義一個字符列
    age = Column(Integer)  # 定義一個整數列

# 創建表
# 根據 Base 類和其子類中的表定義，創建相應的表在數據庫中（如果表不存在）
Base.metadata.create_all(engine)

# 創建會話
# 'sessionmaker' 函數創建一個會話類，用於管理與數據庫的所有交互
# 'bind=engine' 參數表示會話將被綁定到之前創建的引擎
Session = sessionmaker(bind=engine)
# 創建一個會話實例
session = Session()

# 插入數據
# 創建一個新的 User 對象，並設置其屬性
new_user = User(name='Bob', age=25)
# 使用會話的 'add()' 方法將這個對象加入當前會話
session.add(new_user)
# 'commit()' 方法保存所有在當前會話中進行的變化並提交到數據庫
session.commit()

# 查詢數據
# 使用會話的 查詢方法 來查詢 User 表中的數據
# 'filter_by(name='Bob')' 方法添加了一個過濾條件，查詢 name 為 'Bob' 的用戶
for user in session.query(User).filter_by(name='Bob'):
    # 打印查詢結果，每個結果都是一個 User 對象
    print(user.id, user.name, user.age)

### 7.1.4 案例：構建簡單的專案管理數據庫

#### 設計數據庫模式

In [None]:
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker

# 建立到 SQLite 數據庫的連接
# 'create_engine' 函數設置一個用戶端引擎，連接到指定的 'example.db' 數據庫文件
engine = create_engine('sqlite:///example.db')

# 創建基類
# 'declarative_base()' 函數創建一個基類，所有的 ORM 模型都將繼承這個基類
Base = declarative_base()

# 定義一個升級至主表的 ORM 模型對應到 'projects' 表
class Project(Base):
    __tablename__ = 'projects'  # 指定表名
    id = Column(Integer, primary_key=True)  # 定義一個主鍵列
    name = Column(String)  # 定義一個字符列

    # 定義與子表 Task 的雙向關聯
    tasks = relationship("Task", order_by="Task.id", back_populates="project")

# 定義一個子表的 ORM 模型對應到 'tasks' 表
class Task(Base):
    __tablename__ = 'tasks'  # 指定表名
    id = Column(Integer, primary_key=True)  # 定義一個主鍵列
    name = Column(String)  # 定義一個字符列
    project_id = Column(Integer, ForeignKey('projects.id'))  # 定義一個外鍵列，引用 projects 表的主鍵

    # 定義與主表 Project 的雙向關聯
    project = relationship("Project", back_populates="tasks")

# 創建表
# 根據 Base 類和其子類中的表定義，創建相應的表在數據庫中（如果表不存在）
Base.metadata.create_all(engine)

# 創建會話
# 'sessionmaker' 函數創建一個會話類，用於管理與數據庫的所有交互
# 'bind=engine' 參數表示會話將被綁定到之前創建的引擎
Session = sessionmaker(bind=engine)
# 創建一個會話實例
session = Session()

# 插入數據，新增一個 Project 和一個與其關聯的 Task
new_project = Project(name='Project A')
new_task = Task(name='Task 1', project=new_project)

# 使用會話的 'add()' 方法將對象加入當前會話
session.add(new_project)
session.add(new_task)

# 'commit()' 方法保存所有在當前會話中進行的變化並提交到數據庫
session.commit()

# 查詢新插入的數據
for project in session.query(Project).all():
    print(f'Project: {project.name}')
    for task in project.tasks:
        print(f'  Task: {task.name}')

#### 實現數據的增、刪、改、查

In [None]:
# 插入數據
# 創建一個新的 Project 對象和兩個與其關聯的 Task 對象
new_project = Project(name="Project A")
new_task1 = Task(name="Task 1", project=new_project)
new_task2 = Task(name="Task 2", project=new_project)

# 使用會話的 'add()' 方法將對象加入當前會話
session.add(new_project)
session.add(new_task1)
session.add(new_task2)

# 'commit()' 方法保存所有在當前會話中進行的變化並提交到數據庫
session.commit()

# 查詢數據
# 查詢所有 Project 對象
for project in session.query(Project).all():
    # 打印 Project 的 name
    print(project.name)
    # 遍歷並打印與該 Project 關聯的所有 Task 的 name
    for task in project.tasks:
        print("Task:", task.name)

# 更新數據
# 查詢需更新的 Project 對象
project_to_update = session.query(Project).filter_by(name="Project A").first()
# 更新其 name 屬性
project_to_update.name = "Updated Project A"
# 提交修改
session.commit()

# 刪除數據
# 查詢需刪除的 Task 對象
task_to_delete = session.query(Task).filter_by(name="Task 1").first()
# 刪除該 Task 對象
session.delete(task_to_delete)
# 提交修改
session.commit()

#### 簡單的數據分析報告

In [None]:
from sqlalchemy import func

# 聚合查詢計算每個項目下的任務數量
# 使用 SQLAlchemy 的 ORM 查詢 API 進行聚合查詢
# 'session.query(Project.name, func.count(Task.id))' 替代了 SELECT 語句，其中 'Project.name' 和 'func.count(Task.id)' 是要查詢的列
# 'join(Task)' 用於進行表聯接，將 Project 表和 Task 表連接起來
# 'group_by(Project.name)' 用於按照 Project 的 name 字段進行分組
tasks_per_project = session.query(Project.name, func.count(Task.id)).join(Task).group_by(Project.name).all()

# 遍歷查詢結果並打印每個項目及其任務數量
# 查詢結果是一個列表，每個元素都是一個元組 (project_name, task_count)
for project_name, task_count in tasks_per_project:
    # 打印每個項目名和該項目下的任務數量
    print(f"Project: {project_name}, Number of Tasks: {task_count}")

## 7.2 使用pyodbc操作DB2數據庫

### 7.2.1 安裝和配置pyodbc

#### 安裝pyodbc和DB2的驅動程序

In [None]:
pip install pyodbc

#### 配置連接字符串和連接數據庫

In [None]:
import pyodbc
from unittest.mock import MagicMock, patch

# 設置連線字串，用於連接IBM DB2數據庫
connection_string = (
    "DRIVER={IBM DB2 ODBC DRIVER};"
    "DATABASE=sample;"
    "HOSTNAME=hostname;"
    "PORT=50000;"
    "PROTOCOL=TCPIP;"
    "UID=username;"
    "PWD=password;"
)

# 創建Mock對象，模擬數據庫連接
mock_connection = MagicMock()

# 創建Mock光標對象，模擬數據庫查詢
mock_cursor = MagicMock()

# 設置Mock對象的方法返回值
# 模擬 cursor() 方法返回 mock_cursor
mock_connection.cursor.return_value = mock_cursor

# 模擬 fetchone() 方法返回單一的查詢結果
# 使用固定的返回值 '2023-01-01'
mock_cursor.fetchone.return_value = ['2023-01-01']

# 模擬 fetchall() 方法返回多行查詢結果
# 使用固定的返回值 (1, 'John Doe', 45) 和 (2, 'Jane Doe', 40)
mock_cursor.fetchall.return_value = [(1, 'John Doe', 45), (2, 'Jane Doe', 40)]

# 使用 patch 方法替換 pyodbc.connect，使其返回我們的 mock_connection
# 這樣可以在不實際連接數據庫的情況下進行測試
with patch('pyodbc.connect', return_value=mock_connection):
    # 試圖連接數據庫，實際使用的是 mock_connection
    connection = pyodbc.connect(connection_string)

### 7.2.2 基本操作

#### 連接數據庫

In [None]:
# 創建游標對象，用於執行SQL查詢
cursor = connection.cursor()
# 執行SQL查詢，獲取當前日期
cursor.execute("SELECT CURRENT DATE FROM sysibm.sysdummy1")
# 獲取查詢結果中的一行
row = cursor.fetchone()
# 輸出查詢結果
print(row)

#### 查詢和讀取數據

In [None]:
# 執行SQL查詢，獲取employees表中的所有數據
cursor.execute("SELECT * FROM employees")

# 獲取查詢結果中的所有行
rows = cursor.fetchall()

# 輸出每一行的查詢結果
for row in rows:
    print(row)

#### 插入和更新數據

In [None]:
# 插入數據到 employees 表
cursor.execute("INSERT INTO employees (id, name, age) VALUES (1, 'John Doe', 45)")
# 提交插入操作
connection.commit()

# 更新 employees 表中的數據
cursor.execute("UPDATE employees SET age = 46 WHERE id = 1")
# 提交更新操作
connection.commit()

### 7.2.3 高級操作

#### 批量操作數據

In [None]:
# 準備要插入的數據，這裡使用了包含多個元組的列表，每個元組代表一行數據
# 例如，(2, 'Jane Doe', 40) 表示第二位員工，名字是 Jane Doe，年齡是 40 歲
data_to_insert = [(2, 'Jane Doe', 40), (3, 'Max Mustermann', 35)]

# 使用 cursor.executeMany 方法將多行數據插入到 employees 表中
# cursor.executemany 方法的優點是可以一次性插入多行數據，這樣可以提高效率並減少與資料庫的連接次數
# 在 SQL 語句中，VALUES (?, ?, ?) 代表將元組中的數據依次填入相應的欄位中
cursor.executemany("INSERT INTO employees (id, name, age) VALUES (?, ?, ?)", data_to_insert)

# 提交插入操作，這一步是必須的，因為如果不提交，插入操作不會真正生效
# connection.commit() 方法將當前的事務提交到資料庫，確保數據持久化保存
connection.commit()

#### 錯誤處理

In [None]:
try:
    # 嘗試執行 SQL 插入操作
    # 這行程式碼將一行新的員工數據插入到 employees 表中，資料為：(4, 'Error Prone', 50)
    # cursor.execute 方法會將 SQL 語句發送到資料庫中執行
    cursor.execute("INSERT INTO employees (id, name, age) VALUES (4, 'Error Prone', 50)")

    # 提交插入操作，這一步是必須的，因為如果不提交，插入操作不會真正生效
    connection.commit()
except pyodbc.Error as ex:
    # 如果在 try 區塊中發生了任何 pyodbc.Error 類型的錯誤，則會捕捉到這個錯誤並執行 except 區塊中的代碼
    # 這是錯誤處理的一部分，可以避免程式因為未處理的錯誤而崩潰
    # 這行代碼會打印一條錯誤信息，包含具體的錯誤訊息 ex
    print(f"An error occurred: {ex}")

    # 如果發生錯誤，則回滾插入操作
    # connection.rollback() 方法將未提交的變更回滾到事務開始之前的狀態，確保資料的一致性和完整性
    connection.rollback()

### 7.2.4 案例：自動化客戶數據管理

#### 使用SQL語句查詢和更新客戶數據

In [None]:
# 查詢客戶數據
# 這行程式碼會發送一個 SQL 查詢語句到資料庫，查詢所有在 2023-01-01 之後有購買行為的客戶數據
# cursor.execute 方法執行了這個查詢語句
cursor.execute("SELECT * FROM customers WHERE last_purchase_date > '2023-01-01'")

# 使用 cursor.fetchall 方法獲取查詢結果
# 這會抓取所有符合查詢條件的數據行，並將它們存儲在 customers 列表中
customers = cursor.fetchall()

# 迭代查詢結果並打印每個客戶的信息
# 這個 for 迴圈會遍歷 customers 列表，依次打印每一行數據
for customer in customers:
    print(customer)

# 更新客戶數據
# 這行程式碼會發送一個 SQL 更新語句到資料庫，將 id 為 123 的客戶狀態更新為 'active'
# cursor.execute 方法執行了這個更新語句
cursor.execute("UPDATE customers SET status = 'active' WHERE id = 123")

# 提交更新操作
connection.commit()

#### 實現批量導入和導出數據

In [None]:
import csv

# 批量導入客戶數據
# 這裡準備了一個包含多個元組的列表，每個元組代表一行要插入的客戶數據
# 例如，(1, 'Customer A', '2023-02-01', 'inactive') 表示第一位客戶，名字是 Customer A，最後購買日期是 2023-02-01，狀態是 inactive
customers_to_insert = [(1, 'Customer A', '2023-02-01', 'inactive'), (2, 'Customer B', '2022-12-15', 'active')]

# 使用 cursor.executemany 方法將多行客戶數據插入到 customers 表中
# cursor.executemany 方法的優點是可以一次性插入多行數據，這樣可以提高效率並減少與資料庫的連接次數
cursor.executemany("INSERT INTO customers (id, name, last_purchase_date, status) VALUES (?, ?, ?, ?)", customers_to_insert)

# 提交插入操作，這一步是必須的，因為如果不提交，插入操作不會真正生效
# connection.commit() 方法將當前的事務提交到資料庫，確保數據持久化保存
connection.commit()

# 導出客戶數據到文件
# 這行程式碼會發送一個 SQL 查詢語句到資料庫，查詢所有 customers 表中的數據
# cursor.execute 方法執行了這個查詢語句
cursor.execute("SELECT * FROM customers")

# 使用 cursor.fetchall 方法獲取查詢結果
# 這會抓取所有符合查詢條件的數據行，並將它們存儲在 customers 列表中
customers = cursor.fetchall()

# 打開一個叫做 'customers.csv' 的文件，以寫入模式 ('w') 打開，並且設置 newline='' 防止行距問題
with open('customers.csv', 'w', newline='') as file:
    # 使用 csv.writer 創建一個 CSV 寫入器對象
    writer = csv.writer(file)

    # 寫入表頭，即 customers 表中的欄位名稱
    # cursor.description 包含了欄位資訊，所以我們可以利用它提取出每個欄位的名稱
    writer.writerow([desc[0] for desc in cursor.description])

    # 寫入所有客戶數據
    # writer.writerows 方法會將列表中的所有數據行逐行寫入 CSV 文件中
    writer.writerows(customers)