# 結構化資料: CSV | Structured Data: CSV

## 📝 詳解範例 | Worked Examples

---

## 💡 本檔案目的

本檔案提供 **8 個循序漸進的詳解範例**，每個範例包含：
1. **問題描述**：實際應用情境
2. **分析思路**：如何拆解問題
3. **逐步實作**：程式碼 + 註解
4. **執行結果**：預期輸出
5. **知識點總結**：學到什麼

---

## 範例 1：學生成績 CSV 讀寫

### 📋 問題描述

建立一個學生成績管理系統：
1. 寫入 5 位學生的成績（姓名、數學、英文、程式設計）
2. 讀取 CSV 檔案並計算每位學生的平均分數
3. 找出平均最高的學生

**難度**：基礎

### 🔍 分析思路

1. **寫入階段**：使用 `csv.DictWriter` 建立結構化 CSV
2. **讀取階段**：使用 `csv.DictReader` 讀取並計算
3. **資料處理**：注意型別轉換（字串→整數）

### 💻 逐步實作

In [None]:
import csv
from pathlib import Path

# 建立測試目錄
test_dir = Path('csv_worked_examples')
test_dir.mkdir(exist_ok=True)

print("=== 範例 1：學生成績 CSV 讀寫 ===")
print()

# 步驟 1：準備學生成績資料
students_data = [
    {'姓名': 'Alice', '數學': 85, '英文': 90, '程式設計': 92},
    {'姓名': 'Bob', '數學': 78, '英文': 85, '程式設計': 88},
    {'姓名': 'Charlie', '數學': 92, '英文': 88, '程式設計': 95},
    {'姓名': 'David', '數學': 88, '英文': 92, '程式設計': 90},
    {'姓名': 'Eve', '數學': 95, '英文': 96, '程式設計': 98}
]

csv_file = test_dir / 'students_scores.csv'

# 步驟 2：寫入 CSV 檔案
print("步驟 1: 寫入學生成績資料")
with open(csv_file, 'w', newline='', encoding='utf-8') as f:
    fieldnames = ['姓名', '數學', '英文', '程式設計']
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    
    writer.writeheader()  # 寫入標題列
    writer.writerows(students_data)  # 寫入所有資料

print(f"✓ 已寫入 {len(students_data)} 位學生的成績")
print(f"✓ 檔案位置: {csv_file}")
print()

# 步驟 3：讀取並計算平均分數
print("步驟 2: 讀取並計算平均分數")
print()

students_with_avg = []

with open(csv_file, encoding='utf-8') as f:
    reader = csv.DictReader(f)
    
    for row in reader:
        name = row['姓名']
        # 注意：CSV 讀取的都是字串，需轉換為整數
        math = int(row['數學'])
        english = int(row['英文'])
        programming = int(row['程式設計'])
        
        # 計算平均
        average = (math + english + programming) / 3
        
        students_with_avg.append({
            '姓名': name,
            '平均': average
        })
        
        print(f"{name:10s} | 數學: {math:2d} | 英文: {english:2d} | 程式: {programming:2d} | 平均: {average:.2f}")

print()

# 步驟 4：找出平均最高的學生
print("步驟 3: 找出平均最高的學生")
best_student = max(students_with_avg, key=lambda s: s['平均'])
print(f"🏆 最高平均: {best_student['姓名']} ({best_student['平均']:.2f} 分)")
print()

# 檢視實際檔案內容
print("步驟 4: 實際 CSV 檔案內容")
print("="*50)
print(csv_file.read_text(encoding='utf-8'))
print("="*50)

### 📚 知識點總結

- ✅ 使用 `csv.DictWriter` 進行結構化寫入
- ✅ `writeheader()` 自動寫入標題列
- ✅ `csv.DictReader` 讀取時自動對應欄位名稱
- ✅ **重要**：CSV 讀取的所有資料都是字串，需手動轉換型別
- ✅ 使用 `max()` + `lambda` 找出最大值

---

## 範例 2：商品資料過濾與匯出

### 📋 問題描述

有一個商品清單 CSV 檔案，需要：
1. 讀取所有商品資料
2. 篩選出庫存低於 50 的商品（需要補貨）
3. 將篩選結果寫入新的 CSV 檔案
4. 顯示需要補貨的商品統計

**難度**：中級

### 🔍 分析思路

1. 建立測試資料（10 個商品）
2. 使用 `DictReader` 讀取並過濾
3. 使用 `DictWriter` 寫入過濾結果
4. 計算統計資訊

### 💻 逐步實作

In [None]:
import csv
from pathlib import Path

print("=== 範例 2：商品資料過濾與匯出 ===")
print()

test_dir = Path('csv_worked_examples')
test_dir.mkdir(exist_ok=True)

# 步驟 1：建立測試商品資料
products = [
    {'產品編號': 'P001', '產品名稱': '蘋果', '價格': 50, '庫存': 120},
    {'產品編號': 'P002', '產品名稱': '香蕉', '價格': 30, '庫存': 25},  # 需補貨
    {'產品編號': 'P003', '產品名稱': '橘子', '價格': 40, '庫存': 80},
    {'產品編號': 'P004', '產品名稱': '芒果', '價格': 60, '庫存': 15},  # 需補貨
    {'產品編號': 'P005', '產品名稱': '葡萄', '價格': 80, '庫存': 60},
    {'產品編號': 'P006', '產品名稱': '西瓜', '價格': 100, '庫存': 10},  # 需補貨
    {'產品編號': 'P007', '產品名稱': '鳳梨', '價格': 55, '庫存': 45},  # 需補貨
    {'產品編號': 'P008', '產品名稱': '草莓', '價格': 120, '庫存': 70},
    {'產品編號': 'P009', '產品名稱': '櫻桃', '價格': 150, '庫存': 5},   # 需補貨
    {'產品編號': 'P010', '產品名稱': '水蜜桃', '價格': 90, '庫存': 55}
]

products_csv = test_dir / 'products.csv'

# 寫入原始商品資料
print("步驟 1: 建立商品資料檔案")
with open(products_csv, 'w', newline='', encoding='utf-8') as f:
    fieldnames = ['產品編號', '產品名稱', '價格', '庫存']
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(products)

print(f"✓ 已建立 {len(products)} 個商品")
print()

# 步驟 2：讀取並過濾低庫存商品
print("步驟 2: 篩選庫存低於 50 的商品")
print()

LOW_STOCK_THRESHOLD = 50
low_stock_products = []

with open(products_csv, encoding='utf-8') as f:
    reader = csv.DictReader(f)
    
    for row in reader:
        stock = int(row['庫存'])
        
        # 過濾條件：庫存 < 50
        if stock < LOW_STOCK_THRESHOLD:
            low_stock_products.append(row)
            print(f"⚠️  {row['產品編號']} | {row['產品名稱']:6s} | 庫存: {stock:3d} | 價格: {row['價格']}")

print()
print(f"找到 {len(low_stock_products)} 個需要補貨的商品")
print()

# 步驟 3：匯出低庫存商品清單
low_stock_csv = test_dir / 'low_stock_products.csv'

print("步驟 3: 匯出低庫存商品清單")
with open(low_stock_csv, 'w', newline='', encoding='utf-8') as f:
    fieldnames = ['產品編號', '產品名稱', '價格', '庫存']
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(low_stock_products)

print(f"✓ 已匯出至: {low_stock_csv}")
print()

# 步驟 4：計算統計資訊
print("步驟 4: 統計資訊")
total_stock = sum(int(p['庫存']) for p in low_stock_products)
total_value = sum(int(p['價格']) * int(p['庫存']) for p in low_stock_products)
avg_stock = total_stock / len(low_stock_products)

print(f"需補貨商品總數: {len(low_stock_products)} 項")
print(f"總庫存量: {total_stock} 件")
print(f"平均庫存: {avg_stock:.1f} 件")
print(f"總價值: ${total_value:,}")
print()

# 檢視匯出的檔案
print("匯出的 CSV 檔案內容:")
print("="*60)
print(low_stock_csv.read_text(encoding='utf-8'))
print("="*60)

### 📚 知識點總結

- ✅ 使用迴圈 + 條件判斷進行資料過濾
- ✅ 將過濾結果收集到列表中
- ✅ 使用 `DictWriter` 匯出過濾後的資料
- ✅ 使用生成器表達式計算統計值（sum + generator）
- ✅ 實際應用：庫存管理、資料篩選

---

## 範例 3：CSV 與 JSON 格式互轉

### 📋 問題描述

實作一個資料格式轉換工具：
1. 讀取 CSV 檔案並轉換為 JSON 格式
2. 讀取 JSON 檔案並轉換為 CSV 格式
3. 比較兩種格式的檔案大小與特性

**難度**：中級

### 🔍 分析思路

1. CSV → JSON：`DictReader` → `json.dump`
2. JSON → CSV：`json.load` → `DictWriter`
3. 注意欄位順序與資料型別

### 💻 逐步實作

In [None]:
import csv
import json
from pathlib import Path

print("=== 範例 3：CSV 與 JSON 格式互轉 ===")
print()

test_dir = Path('csv_worked_examples')
test_dir.mkdir(exist_ok=True)

# 步驟 1：建立測試資料（員工資料）
employees_csv = test_dir / 'employees.csv'
employees_json = test_dir / 'employees.json'
employees_from_json_csv = test_dir / 'employees_from_json.csv'

employees_data = [
    {'員工編號': 'E001', '姓名': '張三', '部門': '工程部', '薪資': 50000, '年資': 3},
    {'員工編號': 'E002', '姓名': '李四', '部門': '業務部', '薪資': 45000, '年資': 2},
    {'員工編號': 'E003', '姓名': '王五', '部門': '人資部', '薪資': 48000, '年資': 5},
    {'員工編號': 'E004', '姓名': '趙六', '部門': '工程部', '薪資': 52000, '年資': 4}
]

# 先建立原始 CSV
print("準備原始資料...")
with open(employees_csv, 'w', newline='', encoding='utf-8') as f:
    fieldnames = ['員工編號', '姓名', '部門', '薪資', '年資']
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(employees_data)

print(f"✓ 原始 CSV 已建立 ({employees_csv.stat().st_size} bytes)")
print()

# 步驟 2：CSV → JSON
print("=== 轉換 1: CSV → JSON ===")
print()

# 讀取 CSV
with open(employees_csv, encoding='utf-8') as f:
    reader = csv.DictReader(f)
    data = list(reader)  # 轉為列表

print(f"從 CSV 讀取了 {len(data)} 筆資料")

# 寫入 JSON
with open(employees_json, 'w', encoding='utf-8') as f:
    json.dump(data, f, ensure_ascii=False, indent=2)

print(f"✓ 已轉換為 JSON ({employees_json.stat().st_size} bytes)")
print()

print("JSON 檔案內容:")
print("="*60)
print(employees_json.read_text(encoding='utf-8'))
print("="*60)
print()

# 步驟 3：JSON → CSV
print("=== 轉換 2: JSON → CSV ===")
print()

# 讀取 JSON
with open(employees_json, encoding='utf-8') as f:
    data = json.load(f)

print(f"從 JSON 讀取了 {len(data)} 筆資料")

# 寫入 CSV
if data:  # 確保有資料
    fieldnames = list(data[0].keys())  # 從第一筆資料取得欄位名稱
    
    with open(employees_from_json_csv, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)
    
    print(f"✓ 已轉換為 CSV ({employees_from_json_csv.stat().st_size} bytes)")
    print()

# 步驟 4：比較分析
print("=== 格式比較分析 ===")
print()

csv_size = employees_csv.stat().st_size
json_size = employees_json.stat().st_size
size_ratio = json_size / csv_size

print(f"CSV  檔案大小: {csv_size:4d} bytes")
print(f"JSON 檔案大小: {json_size:4d} bytes")
print(f"大小比例: JSON 是 CSV 的 {size_ratio:.2f} 倍")
print()

print("格式特性比較:")
print("-" * 60)
print(f"{'特性':<15} | {'CSV':<20} | {'JSON'}")
print("-" * 60)
print(f"{'可讀性':<15} | {'高 (表格)':<20} | 中 (結構化)")
print(f"{'檔案大小':<15} | {'小':<20} | 較大 (約 1.5-2 倍)")
print(f"{'資料型別':<15} | {'全是字串':<20} | 保留原始型別")
print(f"{'支援巢狀':<15} | {'否':<20} | 是")
print(f"{'Excel 相容':<15} | {'是':<20} | 否")
print("-" * 60)
print()

print("📌 建議:")
print("  - 表格資料、大檔案 → 使用 CSV")
print("  - API 資料交換、需保留型別 → 使用 JSON")
print("  - 與 Excel 互動 → 使用 CSV")

### 📚 知識點總結

- ✅ CSV → JSON: `DictReader` + `json.dump`
- ✅ JSON → CSV: `json.load` + `DictWriter`
- ✅ JSON 檔案較大但保留資料型別
- ✅ CSV 更輕量但所有資料都是字串
- ✅ 根據需求選擇適合的格式

---

## 範例 4：處理中文與特殊字元（Excel 相容）

### 📋 問題描述

建立一個包含中文與特殊字元的 CSV 檔案，確保：
1. 在 Excel 中能正確顯示中文
2. 正確處理欄位中包含逗號、換行的資料
3. 比較不同編碼的效果

**難度**：進階

### 🔍 分析思路

1. 使用 **UTF-8-sig** 編碼（含 BOM），讓 Excel 正確辨識
2. csv 模組會自動處理特殊字元（加引號）
3. 比較 UTF-8 vs UTF-8-sig 的差異

### 💻 逐步實作

In [None]:
import csv
from pathlib import Path

print("=== 範例 4：處理中文與特殊字元（Excel 相容）===")
print()

test_dir = Path('csv_worked_examples')
test_dir.mkdir(exist_ok=True)

# 步驟 1：準備包含特殊字元的資料
print("步驟 1: 準備測試資料（包含特殊字元）")
print()

special_data = [
    {
        '姓名': '張三',
        '職稱': '資深工程師, Team Lead',  # 含逗號
        '技能': 'Python\nJavaScript\nGo',  # 含換行
        '座右銘': '"Stay hungry, stay foolish"'  # 含引號
    },
    {
        '姓名': '李四',
        '職稱': 'UI/UX 設計師',
        '技能': 'Figma, Sketch, Adobe XD',  # 含逗號
        '座右銘': '設計改變世界'
    },
    {
        '姓名': '王五',
        '職稱': '專案經理（PM）',
        '技能': 'Agile\nScrum\nKanban',  # 含換行
        '座右銘': '"Done is better than perfect"'  # 含引號
    }
]

for i, person in enumerate(special_data, 1):
    print(f"資料 {i}:")
    print(f"  姓名: {person['姓名']}")
    print(f"  職稱: {person['職稱']}")
    print(f"  技能: {repr(person['技能'])}")
    print(f"  座右銘: {person['座右銘']}")
    print()

# 步驟 2：使用 UTF-8 寫入（可能在 Windows Excel 中顯示亂碼）
print("步驟 2: 使用 UTF-8 編碼寫入")
utf8_csv = test_dir / 'special_utf8.csv'

with open(utf8_csv, 'w', newline='', encoding='utf-8') as f:
    fieldnames = ['姓名', '職稱', '技能', '座右銘']
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(special_data)

print(f"✓ UTF-8 檔案: {utf8_csv}")
print(f"  檔案大小: {utf8_csv.stat().st_size} bytes")
print()

# 步驟 3：使用 UTF-8-sig 寫入（Excel 相容）
print("步驟 3: 使用 UTF-8-sig 編碼寫入（Excel 相容）")
utf8sig_csv = test_dir / 'special_utf8_sig.csv'

with open(utf8sig_csv, 'w', newline='', encoding='utf-8-sig') as f:
    fieldnames = ['姓名', '職稱', '技能', '座右銘']
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(special_data)

print(f"✓ UTF-8-sig 檔案: {utf8sig_csv}")
print(f"  檔案大小: {utf8sig_csv.stat().st_size} bytes")
print()

# 步驟 4：檢視檔案內容
print("步驟 4: 檢視檔案內容（注意特殊字元處理）")
print("="*70)
print(utf8sig_csv.read_text(encoding='utf-8-sig'))
print("="*70)
print()

print("📌 觀察要點:")
print("  1. 含逗號的欄位 → 自動加上雙引號")
print("  2. 含換行的欄位 → 用雙引號包裹")
print("  3. 含引號的欄位 → 引號被自動跳脫")
print()

# 步驟 5：讀取驗證
print("步驟 5: 讀取驗證（確保資料正確還原）")
print()

with open(utf8sig_csv, encoding='utf-8-sig') as f:
    reader = csv.DictReader(f)
    
    for i, row in enumerate(reader, 1):
        print(f"讀取第 {i} 筆:")
        print(f"  姓名: {row['姓名']}")
        print(f"  職稱: {row['職稱']}")
        print(f"  技能: {repr(row['技能'])}")
        print(f"  座右銘: {row['座右銘']}")
        print()

print("✓ 資料完整還原，無任何損失")
print()

# 編碼比較
print("=== 編碼比較 ===")
print()
utf8_size = utf8_csv.stat().st_size
utf8sig_size = utf8sig_csv.stat().st_size
bom_size = utf8sig_size - utf8_size

print(f"UTF-8 檔案:     {utf8_size} bytes")
print(f"UTF-8-sig 檔案: {utf8sig_size} bytes")
print(f"差異（BOM）:    {bom_size} bytes")
print()
print("💡 建議:")
print("  - 需要在 Windows Excel 開啟 → 使用 UTF-8-sig")
print("  - 純程式處理 → 使用 UTF-8")
print("  - 跨平台相容性 → 使用 UTF-8-sig")

### 📚 知識點總結

- ✅ **UTF-8-sig**：含 BOM，Excel 可正確顯示中文
- ✅ csv 模組自動處理特殊字元（加引號、跳脫）
- ✅ 含逗號、換行、引號的欄位會自動加雙引號
- ✅ 讀取時資料會自動還原，無需手動處理
- ✅ UTF-8-sig 僅比 UTF-8 多 3 bytes（BOM）

---

## 範例 5：CSV 資料清理與驗證

### 📋 問題描述

處理真實世界的「髒資料」CSV 檔案：
1. 檢測常見問題：空值、格式錯誤、異常值
2. 清理並修正資料
3. 產生清理報告
4. 匯出乾淨的資料

**難度**：進階

### 🔍 分析思路

1. 定義驗證規則
2. 逐列檢查並記錄問題
3. 實施清理策略（修正或移除）
4. 產生清理前後對比報告

### 💻 逐步實作

In [None]:
import csv
from pathlib import Path
import re

print("=== 範例 5：CSV 資料清理與驗證 ===")
print()

test_dir = Path('csv_worked_examples')
test_dir.mkdir(exist_ok=True)

# 步驟 1：建立「髒資料」檔案
print("步驟 1: 建立測試資料（包含各種問題）")
print()

dirty_csv = test_dir / 'dirty_data.csv'
dirty_csv.write_text(
'''姓名,年齡,Email,電話,薪資
張三,25,zhang@example.com,0912345678,50000
李四,,li@example.com,0923456789,45000
,30,wang@invalid,0934567890,48000
趙六,abc,zhao@example.com,091-234-5678,52000
錢七,28,qian,0945678901,
孫八,150,sun@example.com,0956789012,30000
周九,27,zhou@example.com,123,99999999
吳十,22,wu@example.com,0967890123,35000
''',
    encoding='utf-8'
)

print("原始髒資料內容:")
print("="*70)
print(dirty_csv.read_text(encoding='utf-8'))
print("="*70)
print()

# 步驟 2：資料驗證與問題檢測
print("步驟 2: 資料驗證與問題檢測")
print()

issues = []  # 記錄所有問題
valid_rows = []  # 儲存有效資料

def validate_email(email):
    """簡單的 Email 驗證"""
    pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
    return re.match(pattern, email) is not None

def validate_phone(phone):
    """台灣手機號碼驗證（09開頭10碼）"""
    # 移除分隔符號
    phone = phone.replace('-', '').replace(' ', '')
    return phone.startswith('09') and len(phone) == 10 and phone.isdigit()

with open(dirty_csv, encoding='utf-8') as f:
    reader = csv.DictReader(f)
    
    for line_num, row in enumerate(reader, 2):  # 從第 2 列開始（含標題）
        row_issues = []
        
        # 驗證 1: 姓名不可為空
        if not row['姓名'].strip():
            row_issues.append('姓名為空')
        
        # 驗證 2: 年齡必須是 18-65 的整數
        try:
            age = int(row['年齡'])
            if age < 18 or age > 65:
                row_issues.append(f'年齡異常 ({age})')
        except ValueError:
            row_issues.append(f'年齡格式錯誤 ({row["年齡"]})')
        
        # 驗證 3: Email 格式
        if not validate_email(row['Email']):
            row_issues.append(f'Email 格式錯誤 ({row["Email"]})')
        
        # 驗證 4: 電話格式
        if not validate_phone(row['電話']):
            row_issues.append(f'電話格式錯誤 ({row["電話"]})')
        
        # 驗證 5: 薪資必須是正整數
        try:
            salary = int(row['薪資'])
            if salary <= 0 or salary > 10000000:
                row_issues.append(f'薪資異常 ({salary})')
        except ValueError:
            row_issues.append(f'薪資格式錯誤 ({row["薪資"]})')
        
        # 記錄問題
        if row_issues:
            issues.append({
                'line': line_num,
                'data': row,
                'issues': row_issues
            })

print(f"檢測完成！發現 {len(issues)} 列有問題：")
print()

for issue in issues:
    print(f"第 {issue['line']} 列: {issue['data']['姓名'] or '(空白)'}")
    for problem in issue['issues']:
        print(f"  ❌ {problem}")
    print()

# 步驟 3：資料清理
print("步驟 3: 資料清理")
print()

clean_data = []

with open(dirty_csv, encoding='utf-8') as f:
    reader = csv.DictReader(f)
    
    for row in reader:
        # 策略 1: 跳過姓名為空的資料
        if not row['姓名'].strip():
            continue
        
        # 策略 2: 修正年齡（無效則設為 0）
        try:
            age = int(row['年齡'])
            if age < 18 or age > 65:
                age = 0  # 標記為異常
        except ValueError:
            age = 0
        
        # 策略 3: 修正 Email（無效則設預設值）
        email = row['Email'] if validate_email(row['Email']) else 'invalid@example.com'
        
        # 策略 4: 修正電話（移除分隔符號）
        phone = row['電話'].replace('-', '').replace(' ', '')
        if not validate_phone(phone):
            phone = '0900000000'  # 預設值
        
        # 策略 5: 修正薪資（無效則設為 0）
        try:
            salary = int(row['薪資'])
            if salary <= 0 or salary > 10000000:
                salary = 0
        except ValueError:
            salary = 0
        
        # 建立乾淨的資料
        clean_row = {
            '姓名': row['姓名'].strip(),
            '年齡': age,
            'Email': email,
            '電話': phone,
            '薪資': salary
        }
        clean_data.append(clean_row)

print(f"清理完成！剩餘 {len(clean_data)} 列有效資料")
print()

# 步驟 4：匯出乾淨的資料
clean_csv = test_dir / 'clean_data.csv'

with open(clean_csv, 'w', newline='', encoding='utf-8') as f:
    fieldnames = ['姓名', '年齡', 'Email', '電話', '薪資']
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(clean_data)

print(f"✓ 乾淨資料已匯出: {clean_csv}")
print()

print("清理後的資料:")
print("="*70)
print(clean_csv.read_text(encoding='utf-8'))
print("="*70)
print()

# 步驟 5：清理報告
print("=== 清理報告 ===")
print()
print(f"原始資料: 8 列")
print(f"問題資料: {len(issues)} 列")
print(f"清理後: {len(clean_data)} 列")
print(f"移除: {8 - len(clean_data)} 列")
print()
print("清理策略:")
print("  ✓ 空姓名 → 移除該列")
print("  ✓ 無效年齡 → 設為 0（標記異常）")
print("  ✓ 無效 Email → 設為 invalid@example.com")
print("  ✓ 無效電話 → 設為 0900000000")
print("  ✓ 無效薪資 → 設為 0")

### 📚 知識點總結

- ✅ 使用正規表達式驗證 Email 格式
- ✅ 定義驗證規則並逐列檢查
- ✅ 清理策略：移除、修正、填入預設值
- ✅ 記錄所有問題以便追蹤稽核
- ✅ 產生清理前後對比報告

---

## 範例 6：大型 CSV 檔案串流處理

### 📋 問題描述

處理包含 10,000 筆資料的大型 CSV 檔案：
1. 比較一次載入 vs 串流處理的效能
2. 實作批次處理（每 1000 筆處理一次）
3. 計算統計資訊（不載入全部到記憶體）

**難度**：進階

### 🔍 分析思路

1. 生成大型測試檔案
2. 避免 `list(reader)` 載入全部
3. 使用 for 迴圈逐列處理（串流）
4. 批次累積處理平衡效能

### 💻 逐步實作

In [None]:
import csv
from pathlib import Path
import time

print("=== 範例 6：大型 CSV 檔案串流處理 ===")
print()

test_dir = Path('csv_worked_examples')
test_dir.mkdir(exist_ok=True)

# 步驟 1：生成大型測試檔案
print("步驟 1: 生成大型測試檔案 (10,000 筆資料)...")

large_csv = test_dir / 'large_transactions.csv'
NUM_RECORDS = 10000

start = time.time()

with open(large_csv, 'w', newline='', encoding='utf-8') as f:
    writer = csv.DictWriter(f, fieldnames=['交易ID', '日期', '金額', '分類', '備註'])
    writer.writeheader()
    
    categories = ['食品', '交通', '娛樂', '購物', '醫療']
    
    for i in range(1, NUM_RECORDS + 1):
        writer.writerow({
            '交易ID': f'T{i:06d}',
            '日期': f'2024-{(i % 12) + 1:02d}-{(i % 28) + 1:02d}',
            '金額': (i * 13) % 5000 + 100,  # 100-5100 之間
            '分類': categories[i % len(categories)],
            '備註': f'交易備註 {i}'
        })

elapsed = time.time() - start
file_size = large_csv.stat().st_size

print(f"✓ 生成完成")
print(f"  記錄數: {NUM_RECORDS:,}")
print(f"  檔案大小: {file_size:,} bytes ({file_size/1024/1024:.2f} MB)")
print(f"  耗時: {elapsed:.3f} 秒")
print()

# 步驟 2：錯誤方法 - 一次載入全部
print("步驟 2: 方法 A - 一次載入全部（不推薦）")
print()

start = time.time()

with open(large_csv, encoding='utf-8') as f:
    reader = csv.DictReader(f)
    all_data = list(reader)  # ❌ 載入全部到記憶體

elapsed = time.time() - start

print(f"載入 {len(all_data):,} 筆資料")
print(f"耗時: {elapsed:.3f} 秒")
print(f"⚠️  記憶體使用: 所有資料都在記憶體中")
print()

# 計算統計（使用載入的資料）
total_amount = sum(int(row['金額']) for row in all_data)
avg_amount = total_amount / len(all_data)
print(f"總金額: ${total_amount:,}")
print(f"平均金額: ${avg_amount:.2f}")
print()

del all_data  # 釋放記憶體

# 步驟 3：正確方法 - 串流處理
print("步驟 3: 方法 B - 串流處理（推薦）")
print()

start = time.time()

count = 0
total_amount = 0
category_counts = {}

with open(large_csv, encoding='utf-8') as f:
    reader = csv.DictReader(f)
    
    # ✅ 逐列處理，不載入全部
    for row in reader:
        count += 1
        total_amount += int(row['金額'])
        
        # 統計分類
        category = row['分類']
        category_counts[category] = category_counts.get(category, 0) + 1

elapsed = time.time() - start
avg_amount = total_amount / count

print(f"處理 {count:,} 筆資料")
print(f"耗時: {elapsed:.3f} 秒")
print(f"✓ 記憶體使用: 穩定（僅保留當前列）")
print()
print(f"總金額: ${total_amount:,}")
print(f"平均金額: ${avg_amount:.2f}")
print()
print("分類統計:")
for category, count in sorted(category_counts.items()):
    print(f"  {category}: {count:,} 筆")
print()

# 步驟 4：批次處理
print("步驟 4: 方法 C - 批次處理（平衡效能）")
print()

BATCH_SIZE = 1000
batch_num = 0
current_batch = []
batch_stats = []

start = time.time()

with open(large_csv, encoding='utf-8') as f:
    reader = csv.DictReader(f)
    
    for row in reader:
        current_batch.append(row)
        
        # 每累積到 BATCH_SIZE 筆就處理一次
        if len(current_batch) >= BATCH_SIZE:
            batch_num += 1
            
            # 處理這批資料
            batch_total = sum(int(r['金額']) for r in current_batch)
            batch_avg = batch_total / len(current_batch)
            
            batch_stats.append({
                'batch': batch_num,
                'count': len(current_batch),
                'total': batch_total,
                'avg': batch_avg
            })
            
            print(f"批次 {batch_num:2d}: {len(current_batch):4d} 筆 | 總計: ${batch_total:8,} | 平均: ${batch_avg:7.2f}")
            
            current_batch = []  # 清空批次
    
    # 處理剩餘資料
    if current_batch:
        batch_num += 1
        batch_total = sum(int(r['金額']) for r in current_batch)
        batch_avg = batch_total / len(current_batch)
        
        batch_stats.append({
            'batch': batch_num,
            'count': len(current_batch),
            'total': batch_total,
            'avg': batch_avg
        })
        
        print(f"批次 {batch_num:2d}: {len(current_batch):4d} 筆 | 總計: ${batch_total:8,} | 平均: ${batch_avg:7.2f}")

elapsed = time.time() - start

print()
print(f"共處理 {batch_num} 個批次")
print(f"耗時: {elapsed:.3f} 秒")
print()

# 總結
print("=== 效能建議 ===")
print()
print("✓ 小檔案 (< 10MB): 一次載入 OK")
print("✓ 中檔案 (10-100MB): 串流處理")
print("✓ 大檔案 (> 100MB): 批次處理")
print("✓ 超大檔案 (> 1GB): 考慮使用 pandas + chunking")
print()
print("原則:")
print("  1. 避免 list(reader) 載入全部")
print("  2. 使用 for 迴圈逐列處理")
print("  3. 只保留需要的資料")
print("  4. 批次處理平衡記憶體與效能")

### 📚 知識點總結

- ✅ **串流處理**：逐列讀取，記憶體使用穩定
- ✅ **批次處理**：累積一定數量再處理，平衡效能
- ✅ 避免 `list(reader)` 載入全部到記憶體
- ✅ 使用 for 迴圈是最佳實踐
- ✅ 適用於處理 GB 級別的超大 CSV 檔案

---

## 範例 7：不同分隔符號的 CSV 處理

### 📋 問題描述

處理使用不同分隔符號的資料檔案：
1. Tab 分隔（TSV）
2. 分號分隔（常見於歐洲）
3. 管線分隔（PSV）
4. 自動偵測分隔符號

**難度**：中級

### 🔍 分析思路

1. 使用 `delimiter` 參數指定分隔符號
2. 實作自動偵測邏輯
3. 比較不同分隔符號的優缺點

### 💻 逐步實作

In [None]:
import csv
from pathlib import Path

print("=== 範例 7：不同分隔符號的 CSV 處理 ===")
print()

test_dir = Path('csv_worked_examples')
test_dir.mkdir(exist_ok=True)

# 測試資料
test_data = [
    {'產品': '蘋果', '價格': '50', '描述': '新鮮水果，富含維生素C'},
    {'產品': '香蕉', '價格': '30', '描述': '進口香蕉，營養豐富'},
    {'產品': '橘子', '價格': '40', '描述': '當季水果，酸甜可口'}
]

# 步驟 1：建立不同分隔符號的檔案
print("步驟 1: 建立不同分隔符號的檔案")
print()

delimiters = [
    (',', 'comma', 'CSV (逗號)', '.csv'),
    ('\t', 'tab', 'TSV (Tab)', '.tsv'),
    (';', 'semicolon', 'SSV (分號)', '.csv'),
    ('|', 'pipe', 'PSV (管線)', '.psv')
]

files = []

for delimiter, name, description, ext in delimiters:
    filename = test_dir / f'products_{name}{ext}'
    
    with open(filename, 'w', newline='', encoding='utf-8') as f:
        fieldnames = ['產品', '價格', '描述']
        writer = csv.DictWriter(f, fieldnames=fieldnames, delimiter=delimiter)
        writer.writeheader()
        writer.writerows(test_data)
    
    files.append((filename, delimiter, description))
    print(f"✓ {description}: {filename.name}")

print()

# 步驟 2：讀取不同分隔符號的檔案
print("步驟 2: 讀取不同分隔符號的檔案")
print()

for filename, delimiter, description in files:
    print(f"=== {description} ===")
    print(f"檔案內容（原始）:")
    content = filename.read_text(encoding='utf-8')
    print(repr(content[:100]) + '...')  # 顯示前 100 字元
    print()
    
    print("解析結果:")
    with open(filename, encoding='utf-8') as f:
        reader = csv.DictReader(f, delimiter=delimiter)
        for i, row in enumerate(reader, 1):
            print(f"  {i}. {row['產品']:6s} | ${row['價格']:3s} | {row['描述']}")
    print()

# 步驟 3：自動偵測分隔符號
print("步驟 3: 自動偵測分隔符號")
print()

def detect_delimiter(file_path, possible_delimiters=[',', '\t', ';', '|']):
    """
    自動偵測 CSV 檔案的分隔符號
    
    策略：讀取第一列，計算每個候選分隔符號出現的次數
    """
    with open(file_path, encoding='utf-8') as f:
        first_line = f.readline()
    
    # 計算每個分隔符號的出現次數
    counts = {}
    for delim in possible_delimiters:
        counts[delim] = first_line.count(delim)
    
    # 找出出現最多次的分隔符號
    if max(counts.values()) == 0:
        return None
    
    return max(counts, key=counts.get)

# 測試自動偵測
for filename, actual_delimiter, description in files:
    detected = detect_delimiter(filename)
    delim_name = {
        ',': '逗號 (,)',
        '\t': 'Tab (\\t)',
        ';': '分號 (;)',
        '|': '管線 (|)'
    }.get(detected, '未知')
    
    status = '✓' if detected == actual_delimiter else '✗'
    print(f"{status} {description:15s} → 偵測為: {delim_name}")

print()

# 步驟 4：使用 csv.Sniffer 自動偵測（進階）
print("步驟 4: 使用 csv.Sniffer 自動偵測（內建方法）")
print()

for filename, actual_delimiter, description in files:
    with open(filename, encoding='utf-8') as f:
        sample = f.read(1024)  # 讀取前 1024 字元作為樣本
        f.seek(0)  # 回到開頭
        
        try:
            sniffer = csv.Sniffer()
            dialect = sniffer.sniff(sample)
            detected = dialect.delimiter
            
            delim_name = {
                ',': '逗號 (,)',
                '\t': 'Tab (\\t)',
                ';': '分號 (;)',
                '|': '管線 (|)'
            }.get(detected, f'其他 ({repr(detected)})')
            
            status = '✓' if detected == actual_delimiter else '✗'
            print(f"{status} {description:15s} → Sniffer 偵測為: {delim_name}")
        
        except csv.Error as e:
            print(f"✗ {description:15s} → 無法偵測: {e}")

print()

# 總結
print("=== 分隔符號選擇指南 ===")
print()
print("逗號 (,):  最常見，但資料中可能包含逗號")
print("Tab (\\t):  適合資料中包含逗號的情況")
print("分號 (;):  歐洲地區常用（小數點用逗號）")
print("管線 (|):  資料複雜時的選擇")
print()
print("建議:")
print("  1. 優先使用逗號（標準）")
print("  2. 資料含逗號時使用 Tab")
print("  3. 使用 csv.Sniffer 自動偵測")

### 📚 知識點總結

- ✅ 使用 `delimiter` 參數指定分隔符號
- ✅ 常見分隔符號：`,`、`\t`、`;`、`|`
- ✅ 實作自動偵測邏輯（計算出現次數）
- ✅ `csv.Sniffer` 可自動偵測分隔符號
- ✅ 根據資料特性選擇適合的分隔符號

---

## 範例 8：實戰 - 銷售資料分析系統

### 📋 問題描述

建立一個完整的銷售資料分析系統：
1. 讀取銷售記錄 CSV
2. 計算各種統計指標（總銷售額、平均單價、最暢銷商品）
3. 按月份分組分析
4. 產生分析報告並匯出

**難度**：綜合應用

### 🔍 分析思路

1. 設計資料結構（銷售記錄）
2. 使用 DictReader 讀取
3. 進行多維度分析
4. 匯出分析結果

### 💻 逐步實作

In [None]:
import csv
from pathlib import Path
from collections import defaultdict
from datetime import datetime

print("=== 範例 8：實戰 - 銷售資料分析系統 ===")
print()

test_dir = Path('csv_worked_examples')
test_dir.mkdir(exist_ok=True)

# 步驟 1：建立銷售資料
print("步驟 1: 建立測試銷售資料")
print()

sales_data = [
    {'日期': '2024-01-05', '產品': 'Python 書籍', '數量': 5, '單價': 450},
    {'日期': '2024-01-10', '產品': 'JavaScript 書籍', '數量': 3, '單價': 500},
    {'日期': '2024-01-15', '產品': 'Python 書籍', '數量': 8, '單價': 450},
    {'日期': '2024-02-03', '產品': 'Java 書籍', '數量': 6, '單價': 480},
    {'日期': '2024-02-08', '產品': 'Python 書籍', '數量': 10, '單價': 450},
    {'日期': '2024-02-12', '產品': 'JavaScript 書籍', '數量': 7, '單價': 500},
    {'日期': '2024-03-01', '產品': 'Go 書籍', '數量': 4, '單價': 550},
    {'日期': '2024-03-07', '產品': 'Python 書籍', '數量': 12, '單價': 450},
    {'日期': '2024-03-15', '產品': 'Java 書籍', '數量': 9, '單價': 480},
    {'日期': '2024-03-20', '產品': 'JavaScript 書籍', '數量': 6, '單價': 500}
]

sales_csv = test_dir / 'sales_records.csv'

with open(sales_csv, 'w', newline='', encoding='utf-8-sig') as f:
    fieldnames = ['日期', '產品', '數量', '單價']
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(sales_data)

print(f"✓ 建立 {len(sales_data)} 筆銷售記錄")
print()

# 步驟 2：讀取並計算總體統計
print("步驟 2: 總體銷售統計")
print()

total_revenue = 0
total_quantity = 0
product_stats = defaultdict(lambda: {'quantity': 0, 'revenue': 0})

with open(sales_csv, encoding='utf-8-sig') as f:
    reader = csv.DictReader(f)
    
    for row in reader:
        quantity = int(row['數量'])
        price = int(row['單價'])
        revenue = quantity * price
        product = row['產品']
        
        total_revenue += revenue
        total_quantity += quantity
        
        product_stats[product]['quantity'] += quantity
        product_stats[product]['revenue'] += revenue

print(f"總銷售額: ${total_revenue:,}")
print(f"總銷售量: {total_quantity} 件")
print(f"平均單筆: ${total_revenue / len(sales_data):.2f}")
print()

# 步驟 3：產品銷售排名
print("步驟 3: 產品銷售排名")
print()

print(f"{'產品':<20} | {'銷售量':>6} | {'銷售額':>10} | {'平均單價':>8}")
print("-" * 55)

# 按銷售額排序
sorted_products = sorted(
    product_stats.items(),
    key=lambda x: x[1]['revenue'],
    reverse=True
)

for product, stats in sorted_products:
    quantity = stats['quantity']
    revenue = stats['revenue']
    avg_price = revenue / quantity
    print(f"{product:<20} | {quantity:6d} | ${revenue:9,} | ${avg_price:7.2f}")

print()

# 步驟 4: 按月份分析
print("步驟 4: 按月份分析")
print()

monthly_stats = defaultdict(lambda: {'revenue': 0, 'quantity': 0, 'orders': 0})

with open(sales_csv, encoding='utf-8-sig') as f:
    reader = csv.DictReader(f)
    
    for row in reader:
        date = row['日期']
        month = date[:7]  # 取 YYYY-MM
        
        quantity = int(row['數量'])
        price = int(row['單價'])
        revenue = quantity * price
        
        monthly_stats[month]['revenue'] += revenue
        monthly_stats[month]['quantity'] += quantity
        monthly_stats[month]['orders'] += 1

print(f"{'月份':<10} | {'訂單數':>6} | {'銷售量':>6} | {'銷售額':>10} | {'平均訂單':>10}")
print("-" * 60)

for month in sorted(monthly_stats.keys()):
    stats = monthly_stats[month]
    orders = stats['orders']
    quantity = stats['quantity']
    revenue = stats['revenue']
    avg_order = revenue / orders
    
    print(f"{month:<10} | {orders:6d} | {quantity:6d} | ${revenue:9,} | ${avg_order:9.2f}")

print()

# 步驟 5: 匯出分析報告
print("步驟 5: 匯出分析報告")
print()

report_csv = test_dir / 'sales_report.csv'

with open(report_csv, 'w', newline='', encoding='utf-8-sig') as f:
    fieldnames = ['產品', '銷售量', '銷售額', '平均單價', '排名']
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    
    for rank, (product, stats) in enumerate(sorted_products, 1):
        writer.writerow({
            '產品': product,
            '銷售量': stats['quantity'],
            '銷售額': stats['revenue'],
            '平均單價': stats['revenue'] / stats['quantity'],
            '排名': rank
        })

print(f"✓ 分析報告已匯出: {report_csv}")
print()

print("報告內容:")
print("="*60)
print(report_csv.read_text(encoding='utf-8-sig'))
print("="*60)
print()

# 總結
best_product = sorted_products[0]
best_month = max(monthly_stats.items(), key=lambda x: x[1]['revenue'])

print("=== 分析總結 ===")
print()
print(f"🏆 最暢銷產品: {best_product[0]}")
print(f"   - 銷售量: {best_product[1]['quantity']} 件")
print(f"   - 銷售額: ${best_product[1]['revenue']:,}")
print()
print(f"📅 最佳銷售月: {best_month[0]}")
print(f"   - 訂單數: {best_month[1]['orders']} 筆")
print(f"   - 銷售額: ${best_month[1]['revenue']:,}")

### 📚 知識點總結

- ✅ 使用 `defaultdict` 進行分組統計
- ✅ 多維度資料分析（產品、月份）
- ✅ 使用 `sorted()` + `lambda` 進行排序
- ✅ 計算總體與分組統計指標
- ✅ 匯出分析報告供後續使用
- ✅ 完整的實戰應用流程

---

## 🎯 總結

本檔案完成了 **8 個循序漸進的詳解範例**，涵蓋：

1. ✅ **基礎讀寫**：學生成績管理
2. ✅ **資料過濾**：商品庫存管理
3. ✅ **格式轉換**：CSV ↔ JSON
4. ✅ **編碼處理**：中文與 Excel 相容
5. ✅ **資料清理**：驗證與修正髒資料
6. ✅ **效能優化**：大型檔案串流處理
7. ✅ **分隔符號**：多格式支援與自動偵測
8. ✅ **綜合應用**：銷售資料分析系統

**下一步**：完成 `03-practice.ipynb` 的課堂練習，鞏固所學！