### CB數據自動化更新

1. 重新設定成自動偵測今天日期，每當下午5點將當天以及當天以前未更新的數據更新上去
2. 新增數據，從後面補上 (前面數據不重跑)，如果遇到沒出現過的標的名稱，自動新增欄位標記上去

#### 將新的日期之數據下載至cb_reports

In [None]:
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import Select, WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import requests, os, time

def download_cb_reports_month(year="2024", month="7"):
    # 民國年資料夾：e.g., 2024 ➜ 113年
    roc_year = str(int(year) - 1911)
    folder = os.path.join("U:/CB數據庫/cb_reports", f"{roc_year}年")
    os.makedirs(folder, exist_ok=True)

    options = webdriver.ChromeOptions()
    # 如需除錯可註解 headless
    options.add_argument("--headless")
    driver = webdriver.Chrome(service=Service(), options=options)
    wait = WebDriverWait(driver, 10)

    # 開啟目標網頁
    driver.get("https://www.tpex.org.tw/zh-tw/bond/info/statistics-cb/day.html")

    try:
        # 等待所有選單元素
        wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "select.select-year.selectobj")))
        wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "select.select-month.selectobj")))
        wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "select[name='fileCode']")))

        # 每次都重新定位選單元素（避免舊元素失效）
        year_select = Select(driver.find_element(By.CSS_SELECTOR, "select.select-year.selectobj"))
        month_select = Select(driver.find_element(By.CSS_SELECTOR, "select.select-month.selectobj"))
        report_select = Select(driver.find_element(By.CSS_SELECTOR, "select[name='fileCode']"))

        year_select.select_by_value(year)
        month_select.select_by_value(str(month))
        report_select.select_by_value("rsta0113")

        # 等待頁面更新
        time.sleep(2)
        wait.until(EC.presence_of_element_located((By.XPATH, "//a[contains(@href, '.csv')]")))

        # 擷取表格
        rows = driver.find_elements(By.XPATH, '//table//tr')[1:]  # 跳過表頭
        for row in rows:
            try:
                tds = row.find_elements(By.TAG_NAME, "td")
                if len(tds) >= 2:
                    date_str = tds[0].text.strip().replace("/", "")
                    link = tds[1].find_element(By.TAG_NAME, "a").get_attribute("href")
                    filepath = os.path.join(folder, f"{date_str}.csv")

                    if os.path.exists(filepath):
                        print(f"{date_str}.csv 已存在，略過")
                        continue

                    res = requests.get(link)
                    if res.status_code == 200:
                        with open(filepath, "wb") as f:
                            f.write(res.content)
                        print(f"{date_str}.csv ✅ 成功下載")
                    else:
                        print(f"{date_str}.csv ❌ 下載失敗")
            except Exception as e:
                print(f"⚠️ 跳過一列：{e}")

    except Exception as e:
        print(f"❌ 無法處理 {year} 年 {month} 月：{e}")

    driver.quit()
    print(f"📦 完成：{year} 年 {month} 月資料已儲存至 {folder}")

from datetime import datetime, timedelta

def get_target_months():
    today = datetime.today()
    year_month_list = []

    # 永遠抓當月
    year_month_list.append((today.year, today.month))

    # 如果今天是月初（前 3 天），也抓上個月
    if today.day <= 5:
        last_month = today.replace(day=1) - timedelta(days=1)
        year_month_list.append((last_month.year, last_month.month))

    return year_month_list

if __name__ == "__main__":
    for y, m in get_target_months():
        download_cb_reports_month(str(y), str(m))


  folder = os.path.join("U:\CB數據庫\cb_reports", f"{roc_year}年")


1140828.csv ✅ 成功下載
1140827.csv 已存在，略過
1140826.csv 已存在，略過
1140825.csv 已存在，略過
1140822.csv 已存在，略過
1140821.csv 已存在，略過
1140820.csv 已存在，略過
1140819.csv 已存在，略過
1140818.csv 已存在，略過
1140815.csv 已存在，略過
1140814.csv 已存在，略過
1140813.csv 已存在，略過
1140812.csv 已存在，略過
1140811.csv 已存在，略過
1140808.csv 已存在，略過
1140807.csv 已存在，略過
1140806.csv 已存在，略過
1140805.csv 已存在，略過
1140804.csv 已存在，略過
1140801.csv 已存在，略過
📦 完成：2025 年 8 月資料已儲存至 U:\CB數據庫\cb_reports\114年


#### 讀取與整合原始數據

In [None]:
import pandas as pd
import os

def safe_read_cp950(file_path):
    """使用 cp950 編碼讀取 CSV，跳過前3行與後3行之表頭與備註"""
    try:
        df = pd.read_csv(file_path, encoding="cp950", skiprows=3)
        df = df[:-3] if len(df) > 3 else df
        df = df[df["交易"] == "等價"]
        return df
    
    # 如果讀取失敗，回傳空的 DataFrame
    except Exception as e:
        print(f"❌ {os.path.basename(file_path)} 讀取失敗：{e}")
        return pd.DataFrame()

def normalize_roc_date(raw_str):
    """將100年以內數據前面補0處理，並轉為 datetime"""
    if len(raw_str) == 6 and raw_str.isdigit():
        raw_str = "0" + raw_str
    elif len(raw_str) != 7 or not raw_str.isdigit():
        return pd.NaT
    try:
        roc_year = int(raw_str[:3])
        year = roc_year + 1911
        month = int(raw_str[3:5])
        day = int(raw_str[5:7])
        return pd.Timestamp(year=year, month=month, day=day)
    except:
        return pd.NaT
    
def load_existing_matrix(csv_path):
    if os.path.exists(csv_path):
        df = pd.read_csv(csv_path, encoding="utf-8-sig", header=[0,1], index_col=0, parse_dates=True)
        df.columns.names = ["代號", "標的"]
        return df
    return pd.DataFrame()

def collect_new_data(base_folder, start_year, end_year, base, existing_dates):
    dfs = []
    for year in range(int(start_year), int(end_year) + 1):
        folder_path = os.path.join(base_folder, f"{year}年")
        if not os.path.exists(folder_path):
            continue
        for file in sorted(os.listdir(folder_path)):
            if not file.endswith(".csv"):
                continue
            date = normalize_roc_date(file.replace(".csv", ""))
            if pd.isna(date) or date in existing_dates:
                continue
            df = safe_read_cp950(os.path.join(folder_path, file))
            if df.empty:
                continue
            code_col = next((c for c in df.columns if "代號" in c), None)
            name_col = next((c for c in df.columns if "名稱" in c or "債券名稱" in c), None)
            close_col = next((c for c in df.columns if base in c), None)
            if not all([code_col, name_col, close_col]):
                continue
            sub_df = df[[code_col, name_col, close_col]].dropna()
            sub_df.columns = ["code", "name", "close"]
            sub_df["date"] = date
            dfs.append(sub_df)
    return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()

def merge_and_sort(existing_matrix, new_df):
    if new_df.empty:
        return existing_matrix

    # 更新名稱對照
    latest_name_map = (
        new_df.sort_values("date")
        .drop_duplicates(subset=["code"], keep="last")
        .set_index("code")["name"]
    )
    new_df["name"] = new_df["code"].map(latest_name_map)

    # Pivot 新資料
    new_pivot = new_df.pivot_table(
        index="date", columns=["code", "name"], values="close", aggfunc="first"
    )
    new_pivot.columns.names = ["代號", "標的"]

    # 合併舊資料
    combined = pd.concat([existing_matrix, new_pivot])

    # 記憶體友好：直接對欄位逐一計算首次出現日期
    first_dates = {
        col: combined.index[combined[col].notna()].min()
        for col in combined.columns
    }

    sorted_columns = sorted(
        combined.columns,
        key=lambda c: (first_dates[c], c[0])  # 先比首次出現日期，再比代號
    )

    return combined[sorted_columns]


def update_cb_matrix(base_folder, start_year, end_year, base, base_name):
    csv_path = os.path.join(base_folder, base_name + ".csv")
    existing_matrix = load_existing_matrix(csv_path)
    existing_dates = set(existing_matrix.index) if not existing_matrix.empty else set()
    new_df = collect_new_data(base_folder, start_year, end_year, base, existing_dates)
    updated_matrix = merge_and_sort(existing_matrix, new_df)
    updated_matrix.to_csv(csv_path, encoding="utf-8-sig")
    print(f"✅ {base_name} 更新完成，總筆數：{len(updated_matrix)}")


if __name__ == "__main__":
    base_folder = r"U:/CB數據庫/cb_reports"
    start_year = "96"
    end_year = "114"
    items = {
        "收市": "收盤價",
        "開市": "開盤價",
        "最高": "最高價",
        "最低": "最低價",
        "單位": "單位",
        "漲跌": "漲跌",
        "金額": "成交金額"
    }
    for base, base_name in items.items():
        update_cb_matrix(base_folder, start_year, end_year, base, base_name)


  return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()


✅ 收盤價 更新完成，總筆數：4577


  return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()


✅ 開盤價 更新完成，總筆數：4577


  return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()


✅ 最高價 更新完成，總筆數：4577


  return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()


✅ 最低價 更新完成，總筆數：4577


  df = pd.read_csv(csv_path, encoding="utf-8-sig", header=[0,1], index_col=0, parse_dates=True)


✅ 單位 更新完成，總筆數：4577


  return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()


✅ 漲跌 更新完成，總筆數：4577


  df = pd.read_csv(csv_path, encoding="utf-8-sig", header=[0,1], index_col=0, parse_dates=True)


✅ 成交金額 更新完成，總筆數：4577


#### 數據優化與產出收盤、開盤、最低、最高、漲跌、金額等最終數據

In [None]:
import pandas as pd
import numpy as np
import os

def insert_copy_column_and_save(filepath):
    """讀取 CSV，插入複製欄並儲存為不含欄位名稱與 index 的修正 CSV"""
    
    # 讀取原始 CSV（無欄位名稱）
    try:
        df = pd.read_csv(filepath, header=None)
    except Exception as e:
        print(f"❌ 無法讀取檔案：{filepath}，錯誤：{e}")
        return

    # 取得檔名與資料夾
    file_name = os.path.splitext(os.path.basename(filepath))[0]
    output_folder = os.path.dirname(filepath)

    # 插入複製欄在第 0 欄右側
    df = df.copy()
    df.insert(1, '複製欄', df.iloc[:, 0])

    # 第 0 欄：將「代號」「標的」改為 NaN
    df.iloc[0:2, 0] = np.nan

    # 複製欄：將「date」與所有日期列改為 NaN
    df.iloc[2:, 1] = np.nan

    # 複製欄：第 3 格填入檔名（如「收盤價」「開盤價」「單位」等）
    df.iloc[2, 1] = file_name

    # 儲存為 CSV（無 index、無欄位名稱）
    output_path = os.path.join(output_folder, f"{file_name}_修正.csv")
    df.to_csv(output_path, index=False, header=False, encoding="utf-8-sig")


def clean_and_save_cb_report(report_type, input_dir, output_dir):
    """
    清理 CB 報表並儲存為好讀取的 CSV 檔案。
    
    參數：
    - report_type: 報表類型（如 "開盤價", "最高價"）
    - input_dir: 原始 CSV 檔案所在資料夾
    - output_dir: 清理後 CSV 檔案儲存資料夾
    """
    filename = f"{report_type}_修正.csv"
    input_path = os.path.join(input_dir, filename)

    # 讀取原始 CSV（不使用欄位名稱）
    df_raw = pd.read_csv(input_path, encoding="utf-8-sig", header=None, low_memory=False)

    # 第 0 列是代號，第 1 列是標的名稱
    codes = df_raw.iloc[0]
    names = df_raw.iloc[1]

    # 合併代號與標的名稱作為欄位名稱
    combined_columns = [
        f"{str(code).strip()} {str(name).strip()}"
        if pd.notna(code) and pd.notna(name) else ""
        for code, name in zip(codes, names)
    ]

    # 移除前 3 列（代號、標的、收盤價文字列） 
    df_cleaned = df_raw.iloc[3:].copy() 
    df_cleaned.columns = combined_columns 
    
    # 將第一欄（日期）設為 index 
    df_cleaned = df_cleaned.set_index(df_cleaned.columns[0]) 
    df_cleaned.index.name = "日期"
    df_cleaned = df_cleaned.drop(columns=["代號 標的"], errors="ignore") 
    
    # 將 index 轉為 datetime，並移除無法解析的日期 
    df_cleaned.index = pd.to_datetime(df_cleaned.index, format="%Y-%m-%d", errors="coerce") 
    df_cleaned = df_cleaned[~df_cleaned.index.isna()]

    # 儲存清理後的 CSV
    output_path = os.path.join(output_dir, f"CB_{report_type}.csv")
    df_cleaned.to_csv(output_path, encoding="utf-8-sig")

    return df_cleaned

insert_copy_column_and_save(r"U:/CB數據庫/cb_reports/收盤價.csv")
insert_copy_column_and_save(r"U:/CB數據庫/cb_reports/開盤價.csv")
insert_copy_column_and_save(r"U:/CB數據庫/cb_reports/最高價.csv")
insert_copy_column_and_save(r"U:/CB數據庫/cb_reports/最低價.csv")
insert_copy_column_and_save(r"U:/CB數據庫/cb_reports/單位.csv")
insert_copy_column_and_save(r"U:/CB數據庫/cb_reports/成交金額.csv")

# 使用clean_and_save_cb_report函數
input_folder = r"U:/CB數據庫/cb_reports"
output_folder = r"U:/CB數據庫"
clean_and_save_cb_report("開盤價", input_folder, output_folder)
clean_and_save_cb_report("最高價", input_folder, output_folder)
clean_and_save_cb_report("最低價", input_folder, output_folder)
clean_and_save_cb_report("收盤價", input_folder, output_folder)
clean_and_save_cb_report("單位", input_folder, output_folder)
clean_and_save_cb_report("成交金額", input_folder, output_folder)

  df = pd.read_csv(filepath, header=None)
  df = pd.read_csv(filepath, header=None)
  df = pd.read_csv(filepath, header=None)
  df = pd.read_csv(filepath, header=None)
  df = pd.read_csv(filepath, header=None)
  df = pd.read_csv(filepath, header=None)


Unnamed: 0_level_0,14772 聚陽二,15221 堤維一,15291 樂士一,15362 和大二,15691 濱川一,16091 大亞1A,16171 榮星一,17851 光洋一,19022 台紙二,20062 東鋼二,...,31672 大量二,75561 意德士一永,45811 光隆精密一KY,68211 聯寶一,75562 意德士二,37075 漢磊五,62075 雷科五,65914 動力四KY,27431 山富一,45581 寶緯一
日期,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2007-01-02,1245000,952000,579900,4199700,672400,910000,494000,16215500,262000,3653100,...,,,,,,,,,,
2007-01-03,,3331200,96500,20745200,112500,,629800,234650,,3988950,...,,,,,,,,,,
2007-01-04,,1422750,,2671750,219500,1720000,1107150,26790850,,721400,...,,,,,,,,,,
2007-01-05,,2622050,96500,367000,,,365200,13043400,,135650,...,,,,,,,,,,
2007-01-08,,121400,281750,3702550,777000,700000,123800,2286050,,989900,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2025-08-22,,,,,,,,,,,...,42996750,1314800,146922700,136484650,97994350,232061500,,,,
2025-08-25,,,,,,,,,,,...,29609650,120000,32196350,74082000,15809950,131327650,,,,
2025-08-26,,,,,,,,,,,...,13978300,2101000,27292350,2662100,9371550,168514650,1100000,,,
2025-08-27,,,,,,,,,,,...,13284250,5575650,13822750,11330150,10895600,125087800,27951000,125165650,,
