In [None]:
import pandas as pd
from sqlalchemy import create_engine, inspect
import pymysql
import logging
from dotenv import load_dotenv
import os

# 載入 .env
load_dotenv()

# 設定 logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='csv_to_sql.log',
    filemode='a'
)

# 從 .env 讀取資料庫資訊
DB_USERNAME = os.getenv('DB_USERNAME')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')

def csv_to_sql(name: str, table_name: str):
    try:
        logging.info(f"開始處理 CSV: {name} -> {table_name}")

        # 讀取 CSV
        df = pd.read_csv(name)
        logging.info(f"CSV 讀取完成, shape: {df.shape}")

        # 清理列名
        df.columns = df.columns.str.strip().str.replace(' ', '_').str.replace('(', '').str.replace(')', '')

        # 建立資料庫連線
        engine = create_engine(f'mysql+pymysql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}', pool_size=10)

        # 檢查資料表是否存在
        inspector = inspect(engine)
        if table_name in inspector.get_table_names():
            if_exists_option = 'append'
            logging.info(f"資料表 {table_name} 已存在，將以 append 模式寫入")
        else:
            if_exists_option = 'replace'
            logging.info(f"資料表 {table_name} 不存在，將建立新表")

        # 寫入資料庫
        df.to_sql(table_name, engine, if_exists=if_exists_option, index=False)
        logging.info(f"CSV 已成功上傳到 MySQL 的 {table_name} 表中")

    except Exception as e:
        logging.error(f"處理 CSV {name} 時發生錯誤: {e}")
        print(f"Error: {e}")

# 使用範例
csv_to_sql('檔案.csv', '表名')


