In [None]:
pip install pandas sqlalchemy pymysql psycopg2-binary

In [None]:
pip install cryptography

In [None]:
import pandas as pd
from sqlalchemy import create_engine
from decimal import Decimal
import os

def _require_env(var_name):
    """Return the value of environment variable ``var_name``.

    Raises:
        RuntimeError: if the variable is unset or empty. Without this check
            ``os.getenv`` would hand ``None`` to ``create_engine`` and the
            failure would surface as an unrelated-looking SQLAlchemy error.
    """
    value = os.getenv(var_name)
    if not value:
        raise RuntimeError(f"缺少環境變數 {var_name}，無法建立資料庫連線")
    return value


# Connection URLs are read from the environment so that credentials are
# never stored in the notebook itself.
mysql_url = _require_env('MYSQL_URL')
psql_url = _require_env('PSQL_URL')

# Engine for the MySQL database (read from by the cells below).
mysql_engine = create_engine(mysql_url)

# Engine for the PostgreSQL database (written to by the cells below).
pg_engine = create_engine(psql_url)


In [None]:
from sqlalchemy import inspect

# Sanity check: list every table name reported by the MySQL connection.
inspector = inspect(mysql_engine)
tables = inspector.get_table_names()

# Header line followed by the table list, one per line.
print("你資料庫裡的資料表有：", tables, sep="\n")

In [None]:
# Migrate MySQL `market_index` rows into PostgreSQL `price_snapshots`.
#
# NOTE(review): this cell appends unconditionally, so running it twice
# inserts every row twice. Truncate the target (or add a guard) before
# re-running.


def _round_or_none(value, ndigits):
    """Round ``value`` to ``ndigits`` decimals; pass missing values through as None.

    Handles both ``None`` and NaN so a NULL in the source never reaches
    ``float()`` (which raises ``TypeError`` on ``None``).
    """
    if pd.isna(value):
        return None
    return round(float(value), ndigits)


# Extract: read the whole market_index table from MySQL.
df = pd.read_sql("SELECT * FROM market_index", con=mysql_engine)

# Transform: drop the columns that are not carried over (symbol, updated_at,
# and the old id). The old id is dropped so that the target table can assign
# its own ids (per the original author's note the PG column is an identity).
cols_to_drop = ['id', 'symbol', 'updated_at']
df_pg = df.drop(columns=[c for c in cols_to_drop if c in df.columns])

# Normalize the timestamp column to datetime.
df_pg['created_at'] = pd.to_datetime(df_pg['created_at'])

# Fix numeric precision. Both columns go through the same null-safe helper;
# previously only `change` tolerated a missing value.
df_pg['price'] = df_pg['price'].apply(lambda x: _round_or_none(x, 6))
df_pg['change'] = df_pg['change'].apply(lambda x: _round_or_none(x, 4))

# Load: append into PostgreSQL. if_exists='append' is required here;
# any other mode would drop/recreate the table and destroy the schema that
# was created outside this notebook (Drizzle, per the original note).
df_pg.to_sql(
    'price_snapshots',
    con=pg_engine,
    if_exists='append',
    index=False,
    method='multi',
    chunksize=1000
)

print(f"成功搬移 {len(df_pg)} 筆快照資料到 PostgreSQL！")

In [None]:
# Migrate MySQL `company` rows into PostgreSQL `companies`.

# Extract: read the whole company table from MySQL.
df = pd.read_sql("SELECT * FROM company", con=mysql_engine)

# Transform, as one chain:
#   - drop the source id (the target is expected to generate its own),
#   - keep the first row per symbol,
#   - rename camelCase timestamp columns to snake_case,
#   - cut symbol to 10 characters (target column limit per the original note),
#   - replace missing names with 'Unknown' (target column is NOT NULL per the
#     original note).
df_pg = (
    df.drop(columns=['id'])
    .drop_duplicates(subset=['symbol'], keep='first')
    .rename(columns={'createdAt': 'created_at', 'updatedAt': 'updated_at'})
    .assign(
        symbol=lambda frame: frame['symbol'].str.slice(0, 10),
        name=lambda frame: frame['name'].fillna('Unknown'),
    )
)

# Companies that must exist in the target regardless of the source contents
# (hard-coded on purpose).
forced_companies = [
    ('TSM', 'Taiwan Semiconductor Manufacturing'),
    ('ARM', 'Arm Holdings'),
]
manual_data = pd.DataFrame([
    {
        'symbol': ticker,
        'name': company_name,
        'created_at': pd.Timestamp.now(),
        'updated_at': pd.Timestamp.now(),
    }
    for ticker, company_name in forced_companies
])

# Append the forced rows, then de-duplicate again: keep='last' makes the
# hard-coded row win when the source already had the same symbol.
df_pg = pd.concat([df_pg, manual_data], ignore_index=True)
df_pg = df_pg.drop_duplicates(subset=['symbol'], keep='last')

# Normalize both timestamp columns to datetime.
for ts_col in ('created_at', 'updated_at'):
    df_pg[ts_col] = pd.to_datetime(df_pg[ts_col])

# Load: append into the PostgreSQL companies table.
df_pg.to_sql(
    'companies',
    con=pg_engine,
    if_exists='append',
    index=False,
    method='multi'
)

print(f"成功搬移 {len(df_pg)} 家公司資料！")

In [None]:
# Migrate MySQL `company_statements` rows into PostgreSQL `company_metrics`,
# translating each row's symbol into the PostgreSQL company id.

# --- Step 1: build the symbol -> PostgreSQL company id lookup ---
df_pg_companies = pd.read_sql("SELECT id, symbol FROM companies", con=pg_engine)

# Strip surrounding whitespace so lookups are not defeated by padding.
symbol_to_id = dict(zip(df_pg_companies['symbol'].str.strip(), df_pg_companies['id']))

# --- Step 2: read the source rows from MySQL ---
df_mysql = pd.read_sql("SELECT * FROM company_statements", con=mysql_engine)

# --- Step 3: transform ---
# Clean the source symbol the same way as the lookup keys.
df_mysql['symbol'] = df_mysql['symbol'].str.strip()

# Resolve each symbol to its PostgreSQL company id (NaN when unknown).
df_mysql['company_id'] = df_mysql['symbol'].map(symbol_to_id)

# Report and drop rows whose symbol has no company in PostgreSQL; they would
# otherwise fail the int cast below (and, per the original note, the target's
# NOT NULL / foreign-key constraints).
missing = df_mysql[df_mysql['company_id'].isna()]
if not missing.empty:
    failed_symbols = missing['symbol'].unique()
    print(f"⚠️ 警告：有 {len(missing)} 筆資料找不到對應的 Company ID，將被剔除。")
    print(f"缺失的 Symbols (請檢查 companies 表): {failed_symbols}")

    df_mysql = df_mysql.dropna(subset=['company_id'])

# market_cap is stored as text capped at 32 characters. astype(str) alone would
# turn missing values into the literal strings 'nan' / 'None', so blank them
# back out afterwards; to_sql then writes them as NULL.
# NOTE(review): if the target market_cap column is NOT NULL, decide on an
# explicit placeholder instead — confirm against the schema.
market_cap_raw = df_mysql['market_cap']
market_cap_text = (
    market_cap_raw.astype(str).str.slice(0, 32).where(market_cap_raw.notna())
)

# Assemble exactly the columns written to the target table.
df_pg = pd.DataFrame({
    'company_id': df_mysql['company_id'].astype(int),
    'price': df_mysql['price'],
    'pe_trailing': df_mysql['pe_trailing'],
    'pe_forward': df_mysql['pe_forward'],
    'eps_trailing': df_mysql['eps_trailing'],
    'eps_forward': df_mysql['eps_forward'],
    'volume': df_mysql['volume'],
    'market_cap': market_cap_text,
    'created_at': pd.to_datetime(df_mysql['created_at'])
})

# --- Step 4: load into PostgreSQL ---
try:
    df_pg.to_sql(
        'company_metrics',
        con=pg_engine,
        if_exists='append',
        index=False,
        method='multi',
        chunksize=1000
    )
    print(f"✅ 成功搬移 {len(df_pg)} 筆指標資料！")
except Exception as e:
    # Same failure format as the other load cells, so a failed import is
    # recognizable in the output instead of a bare exception string.
    print(f"❌ 匯入失敗：{e}")

In [None]:
# Migrate MySQL `News` rows into PostgreSQL `news`.

# Extract: read the whole News table from MySQL.
df = pd.read_sql("SELECT * FROM News", con=mysql_engine)

# Transform
# A. Drop the source id (the target is expected to generate its own).
df_pg = df.drop(columns=['id'])

# B. Remove repeated content_hash values so a unique constraint on the target
#    does not abort the import. Only non-null hashes are compared: pandas
#    treats all missing values as equal, which would otherwise collapse every
#    row with a NULL hash into a single row.
is_repeated_hash = (
    df_pg['content_hash'].notna()
    & df_pg.duplicated(subset=['content_hash'], keep='first')
)
df_pg = df_pg.loc[~is_repeated_hash].copy()

# C. tinyint(1) -> boolean. Missing values are filled with 0 first, because
#    NaN is truthy and astype(bool) would turn a NULL flag into True.
df_pg['is_top'] = df_pg['is_top'].fillna(0).astype(bool)

# D. Normalize the timestamp columns to datetime.
df_pg['published_at'] = pd.to_datetime(df_pg['published_at'])
df_pg['created_at'] = pd.to_datetime(df_pg['created_at'])
df_pg['updated_at'] = pd.to_datetime(df_pg['updated_at'])

# E. No renaming: per the original note the remaining column names
#    (content_en, content_hash, ...) are the same on both sides.

# F. Fill defaults for missing values.
df_pg['view_count'] = df_pg['view_count'].fillna(0).astype(int)
df_pg['status'] = df_pg['status'].fillna('draft')

# Load: append into PostgreSQL.
try:
    df_pg.to_sql(
        'news',
        con=pg_engine,
        if_exists='append',
        index=False,
        method='multi',
        chunksize=500  # smaller batches because text bodies can be large
    )
    print(f"✅ 成功搬移 {len(df_pg)} 則新聞資料！")
except Exception as e:
    print(f"❌ 匯入失敗：{e}")

In [None]:
# Migrate MySQL `stock_prices` into PostgreSQL `stock_prices`. Company ids
# differ between the two databases, so each row is re-keyed in two hops:
# MySQL company id -> symbol -> PostgreSQL company id.

# --- Step 1: build both lookup tables ---

# MySQL side: company id -> symbol (whitespace stripped).
df_ms_comp = pd.read_sql("SELECT id, symbol FROM company", con=mysql_engine)
ms_symbols = df_ms_comp['symbol'].str.strip()
ms_id_to_symbol = dict(zip(df_ms_comp['id'], ms_symbols))

# PostgreSQL side: symbol (whitespace stripped) -> company id.
df_pg_comp = pd.read_sql("SELECT id, symbol FROM companies", con=pg_engine)
pg_symbols = df_pg_comp['symbol'].str.strip()
symbol_to_pg_id = dict(zip(pg_symbols, df_pg_comp['id']))

# --- Step 2: read the source rows ---
df_mysql = pd.read_sql("SELECT * FROM stock_prices", con=mysql_engine)

# --- Step 3: transform ---

# Hop 1: MySQL company id -> symbol.
df_mysql['symbol_bridge'] = df_mysql['company_id'].map(ms_id_to_symbol)

# Hop 2: symbol -> PostgreSQL company id (NaN when either hop fails).
df_mysql['new_company_id'] = df_mysql['symbol_bridge'].map(symbol_to_pg_id)

# Report rows that could not be re-keyed, then drop them so the load does not
# hit a foreign-key error (and the int cast below does not fail on NaN).
unmatched_rows = df_mysql['new_company_id'].isna()
missing = df_mysql[unmatched_rows]
if not missing.empty:
    print(f"⚠️ 警告：有 {len(missing)} 筆股價資料因 Symbol 對齊失敗被剔除。")
    # All distinct symbols that failed to map are printed (not just a sample).
    print(f"對齊失敗的 Symbols: {missing['symbol_bridge'].unique()}")

    df_mysql = df_mysql.dropna(subset=['new_company_id'])

# Keep only the target columns, in the target's order.
df_pg = df_mysql[['price', 'day_chg', 'weight']].assign(
    company_id=df_mysql['new_company_id'].astype(int),
    created_at=pd.to_datetime(df_mysql['created_at']),
)

# --- Step 4: load into PostgreSQL ---
try:
    df_pg.to_sql(
        'stock_prices',
        con=pg_engine,
        if_exists='append',
        index=False,
        method='multi',
        chunksize=1000
    )
    print(f"✅ 成功透過 Symbol 橋樑搬移 {len(df_pg)} 筆股價資料！")
except Exception as e:
    print(f"❌ 匯入失敗：{e}")

In [None]:
# Migrate MySQL `transactions` into PostgreSQL `stock_trades`. Company ids are
# re-keyed via symbol (MySQL id -> symbol -> PostgreSQL id) and every trade is
# assigned to a single PostgreSQL user.

# --- Step 1: build both lookup tables ---

# MySQL side: company id -> symbol. Whitespace is stripped on both sides, the
# same way the stock_prices migration builds its bridge, so padding cannot
# break the match.
df_ms_comp = pd.read_sql("SELECT id, symbol FROM company", con=mysql_engine)
ms_id_to_symbol = dict(zip(df_ms_comp['id'], df_ms_comp['symbol'].str.strip()))

# PostgreSQL side: symbol -> company id.
df_pg_comp = pd.read_sql("SELECT id, symbol FROM companies", con=pg_engine)
symbol_to_pg_id = dict(zip(df_pg_comp['symbol'].str.strip(), df_pg_comp['id']))

# --- Step 2: pick the owning user ---
# NOTE(review): LIMIT 1 without ORDER BY returns an arbitrary row; fine with a
# single user, ambiguous otherwise.
user_df = pd.read_sql("SELECT id FROM users LIMIT 1", con=pg_engine)
if user_df.empty:
    # Fail with a clear message instead of an IndexError from iloc[0].
    raise Exception("PostgreSQL users 表中找不到任何使用者")
target_user_uuid = user_df.iloc[0]['id']

# --- Step 3: read and re-key the trades ---

# Read the source transactions from MySQL.
df_trans = pd.read_sql("SELECT * FROM transactions", con=mysql_engine)

# Hop 1: MySQL company id -> symbol.
df_trans['symbol_bridge'] = df_trans['company_id'].map(ms_id_to_symbol)

# Hop 2: symbol -> PostgreSQL company id (NaN when either hop fails).
df_trans['new_company_id'] = df_trans['symbol_bridge'].map(symbol_to_pg_id)

# Report and drop trades that could not be re-keyed.
missing = df_trans[df_trans['new_company_id'].isna()]
if not missing.empty:
    print(f"⚠️ 警告：有 {len(missing)} 筆交易因 Symbol 對齊失敗被剔除。")
    print(f"失敗的 MySQL Company IDs: {missing['company_id'].unique()}")
    df_trans = df_trans.dropna(subset=['new_company_id'])

# --- Step 4: shape the rows for the target table ---

df_pg = pd.DataFrame({
    'user_id': [target_user_uuid] * len(df_trans),
    'company_id': df_trans['new_company_id'].astype(int),
    'trade_type': df_trans['transaction_type'],
    'quantity': df_trans['quantity'].astype(float),
    'price': df_trans['price'].astype(float),
    # Only the calendar date is kept for trade_date.
    'trade_date': pd.to_datetime(df_trans['transaction_date']).dt.date,
    'created_at': pd.to_datetime(df_trans['createdAt'])
})

# --- Step 5: load into PostgreSQL ---
try:
    df_pg.to_sql(
        'stock_trades',
        con=pg_engine,
        if_exists='append',
        index=False,
        method='multi',
        chunksize=1000
    )
    print(f"✅ 成功透過 Symbol 橋樑搬移 {len(df_pg)} 筆交易紀錄！")
except Exception as e:
    print(f"❌ 匯入失敗：{e}")

In [None]:
# Migrate MySQL `portfolios` into PostgreSQL `portfolios`, re-keying company
# ids via symbol and assigning every row to a single PostgreSQL user.

# --- Step A: pick the owning user ---
user_df = pd.read_sql("SELECT id FROM users LIMIT 1", con=pg_engine)
if user_df.empty:
    raise Exception("PostgreSQL users 表中找不到任何使用者")
target_user_uuid = user_df.iloc[0]['id']

# --- Step B: build the company id mapping (MySQL id -> PG id) ---
mysql_comp = pd.read_sql("SELECT id, symbol FROM company", con=mysql_engine)

pg_comp = pd.read_sql("SELECT id, symbol FROM companies", con=pg_engine)

# Join the two company lists on symbol; the result has id_mysql and id_pg.
mapping_df = pd.merge(
    mysql_comp,
    pg_comp,
    on='symbol',
    suffixes=('_mysql', '_pg')
)[['id_mysql', 'id_pg']]

# --- Step C: read and re-key the portfolios ---
df_source = pd.read_sql("SELECT * FROM portfolios", con=mysql_engine)

# Left join so rows without a mapped company stay visible (an inner join
# would discard them without any trace).
df_final = pd.merge(
    df_source,
    mapping_df,
    left_on='company_id',
    right_on='id_mysql',
    how='left'
)

# Report and drop rows that could not be re-keyed, as the other migration
# cells do.
unmapped = df_final[df_final['id_pg'].isna()]
if not unmapped.empty:
    print(f"⚠️ 警告：有 {len(unmapped)} 筆持倉資料因找不到對應公司被剔除。")
    print(f"失敗的 MySQL Company IDs: {unmapped['company_id'].unique()}")
    df_final = df_final.dropna(subset=['id_pg'])

# Fresh 0..n-1 index so the per-row user_id list below lines up with the rows.
df_final = df_final.reset_index(drop=True)

# Build a clean frame holding exactly the target columns. camelCase
# createdAt/updatedAt become snake_case and are normalized to datetime.
portfolios_to_pg = pd.DataFrame({
    'user_id': [target_user_uuid] * len(df_final),
    # The left join makes id_pg float when anything was unmapped; cast back.
    'company_id': df_final['id_pg'].astype(int),
    'quantity': df_final['quantity'].astype(float),
    'average_price': df_final['average_price'],
    'created_at': pd.to_datetime(df_final['createdAt']),
    'updated_at': pd.to_datetime(df_final['updatedAt']),
})

# --- Step D: load into PostgreSQL ---
# No 'id' column is sent; per the original note the target generates it
# (generatedAlwaysAsIdentity).
portfolios_to_pg.to_sql(
    'portfolios',
    con=pg_engine,
    if_exists='append',
    index=False,
    method='multi'
)

print(f"成功遷移 {len(portfolios_to_pg)} 筆資料到 PostgreSQL！")

In [None]:
import pandas as pd
import uuid  # 導入 UUID 庫
from sqlalchemy import text

# Promote one existing PostgreSQL user to admin, unless already promoted.

# Fetch a single user row.
user_df = pd.read_sql("SELECT id FROM users LIMIT 1", con=pg_engine)

if not user_df.empty:
    target_user_uuid = user_df.iloc[0]['id']

    # The admin row's own id is generated here in Python as a UUID string.
    admin_data = {
        'id': str(uuid.uuid4()),
        'user_id': target_user_uuid,
    }
    admin_df = pd.DataFrame([admin_data])

    try:
        # Look for an existing admin row for this user (bound parameter).
        check_exist = pd.read_sql(
            text("SELECT 1 FROM admins WHERE user_id = :uid"),
            con=pg_engine,
            params={"uid": target_user_uuid}
        )
        already_admin = not check_exist.empty

        if already_admin:
            print("該使用者已經是管理員，無需重複操作。")
        else:
            admin_df.to_sql(
                'admins',
                con=pg_engine,
                if_exists='append',
                index=False
            )
            print(f"成功將使用者 {target_user_uuid} 設為管理員！")

    except Exception as e:
        print(f"寫入 Admin 表失敗：{e}")
else:
    # Nothing to promote when the users table is empty.
    print("找不任何使用者，無法建立管理員。")