In [None]:
import os
import sys
from datetime import date
from datetime import datetime
from pathlib import Path

import pandas as pd
import duckdb

In [None]:
from finlab import data
import finlab

In [None]:
# 引用自建公用模組
sys.path.insert(0, str(Path.cwd().parent))
from proj_util_pkg.settings import ProjEnvSettings

from proj_util_pkg.finlab_api import finlab_manager as flm
from proj_util_pkg.google_api import gspread_manager as gsm
from proj_util_pkg.common import tw_stock_topic as tst

In [None]:
# 資訊輸出Google SpreadSheet 表單參數設定
GSPERAD_SHEET_KEY = os.environ.get('gspread_wb_key')  # Google SpreadSheet 表單ID
OUTPUT_GSHEET_NAME = '選股清單05'

In [None]:
# finlab api 服務初始化
finlab = flm.FinlabManager()
data.force_cloud_download = False

In [None]:
# 本地報表輸出路徑
REPORT_PATH = os.environ.get('report_path')

## 共用參數設定

In [None]:
# 均線變異係數糾結程度
MA_VC_THRESHOLD = 1  # 1%

## 外部資料讀取

In [None]:
# 讀取台股收盤價資訊
close = data.get("price:收盤價", save_to_storage=True)
vol = data.get("price:成交股數", save_to_storage=True)
stock_info = data.get('company_basic_info', save_to_storage=True)
pe_ratio = data.get('price_earning_ratio:本益比', save_to_storage=True)
pb_ratio = data.get('price_earning_ratio:股價淨值比', save_to_storage=True)

In [None]:
# 從DuckDB讀取股票名稱資料 (使用唯讀模式避免鎖定衝突)
TWSTOCK_DATA_ROOT = os.environ.get("hist_data_path")
twstock_db_path = f"{TWSTOCK_DATA_ROOT}/twstock.duckdb"
conn_duckdb = duckdb.connect(twstock_db_path, read_only=True)
stock_name_df = conn_duckdb.execute("SELECT stock_id as symbol, stock_name FROM tw_stock_list").fetch_df()
conn_duckdb.close()
stock_name_df

## 數據分析

In [None]:
# 計算移動平均線
ma5 = close.average(5)
ma10 = close.average(10)
ma20 = close.average(20)
ma60 = close.average(60)
vol_sma5 = vol.average(5)

In [None]:
# 計算均線變異係數 (Coefficient of Variation)
# CV = (標準差 / 平均值) * 100

# 2線均線變異係數 (MA5, MA10)
ma_2_stack = pd.concat([ma5, ma10], axis=0, keys=['ma5', 'ma10'])
ma_2_grouped = ma_2_stack.groupby(level=1)
cv_2ma = (ma_2_grouped.std() / ma_2_grouped.mean()) * 100

# 3線均線變異係數 (MA5, MA10, MA20)
ma_3_stack = pd.concat([ma5, ma10, ma20], axis=0, keys=['ma5', 'ma10', 'ma20'])
ma_3_grouped = ma_3_stack.groupby(level=1)
cv_3ma = (ma_3_grouped.std() / ma_3_grouped.mean()) * 100

# 4線均線變異係數 (MA5, MA10, MA20, MA60)
ma_4_stack = pd.concat([ma5, ma10, ma20, ma60], axis=0, keys=['ma5', 'ma10', 'ma20', 'ma60'])
ma_4_grouped = ma_4_stack.groupby(level=1)
cv_4ma = (ma_4_grouped.std() / ma_4_grouped.mean()) * 100

In [None]:
# 定義均線多頭排列條件
# 2線多排: MA5 > MA10 且均線上升，變異係數低於閾值
cond_2ma_align = (ma5 > ma10) & ma5.rise() & ma10.rise()
cond_2ma_cv = cv_2ma < MA_VC_THRESHOLD
cond_2ma = cond_2ma_align & cond_2ma_cv

# 3線多排: MA5 > MA10 > MA20 且均線上升，變異係數低於閾值
cond_3ma_align = (ma5 > ma10) & (ma10 > ma20) & ma5.rise() & ma10.rise() & ma20.rise()
cond_3ma_cv = cv_3ma < MA_VC_THRESHOLD
cond_3ma = cond_3ma_align & cond_3ma_cv

# 4線多排: MA5 > MA10 > MA20 > MA60 且均線上升，變異係數低於閾值
cond_4ma_align = (ma5 > ma10) & (ma10 > ma20) & (ma20 > ma60) & ma5.rise() & ma10.rise() & ma20.rise() & ma60.rise()
cond_4ma_cv = cv_4ma < MA_VC_THRESHOLD
cond_4ma = cond_4ma_align & cond_4ma_cv

# 成交量條件: 5日均量 > 1000張 (1張=1000股)
cond_vol = vol_sma5 > 1000000

In [None]:
# 篩選符合條件的股票並分類
# 取最新交易日資料
latest_4ma = (cond_4ma & cond_vol).tail(1)
latest_3ma = (cond_3ma & ~cond_4ma & cond_vol).tail(1)  # 3線多排但不是4線多排
latest_2ma = (cond_2ma & ~cond_3ma & cond_vol).tail(1)  # 2線多排但不是3線多排

# 提取符合條件的股票代號
symbols_4ma = latest_4ma.columns[latest_4ma.iloc[0]].tolist()
symbols_3ma = latest_3ma.columns[latest_3ma.iloc[0]].tolist()
symbols_2ma = latest_2ma.columns[latest_2ma.iloc[0]].tolist()

print(f"4線多排股票數: {len(symbols_4ma)}")
print(f"3線多排股票數: {len(symbols_3ma)}")
print(f"2線多排股票數: {len(symbols_2ma)}")

In [None]:
# 建立分類DataFrame
df_4ma = pd.DataFrame({'symbol': symbols_4ma, '多排分類': '4線多排'})
df_3ma = pd.DataFrame({'symbol': symbols_3ma, '多排分類': '3線多排'})
df_2ma = pd.DataFrame({'symbol': symbols_2ma, '多排分類': '2線多排'})

# 合併所有分類
df_all_symbols = pd.concat([df_4ma, df_3ma, df_2ma], ignore_index=True)
df_all_symbols

In [None]:
# 準備各項指標的最新數據
all_symbols = df_all_symbols['symbol'].tolist()

# 最新收盤價
last_close = close.tail(1).T.reset_index()
last_close.columns = ['symbol', '最新收盤價']

# 各均線最新值
last_ma5 = ma5.tail(1).T.reset_index()
last_ma5.columns = ['symbol', '5日均線']

last_ma10 = ma10.tail(1).T.reset_index()
last_ma10.columns = ['symbol', '10日均線']

last_ma20 = ma20.tail(1).T.reset_index()
last_ma20.columns = ['symbol', '20日均線']

last_ma60 = ma60.tail(1).T.reset_index()
last_ma60.columns = ['symbol', '60日均線']

# 5日均量 (轉換為張數，先填補NaN再轉整數)
last_vol_sma5 = vol_sma5.tail(1).T.reset_index()
last_vol_sma5.columns = ['symbol', 'vol_sma5']
last_vol_sma5['vol_sma5'] = last_vol_sma5['vol_sma5'].fillna(0)
last_vol_sma5['五日均量'] = (last_vol_sma5['vol_sma5'] / 1000).round().astype(int)
del last_vol_sma5['vol_sma5']

# 變異係數 (根據分類選擇對應的CV)
last_cv_2ma = cv_2ma.tail(1).T.reset_index()
last_cv_2ma.columns = ['symbol', 'cv_2ma']

last_cv_3ma = cv_3ma.tail(1).T.reset_index()
last_cv_3ma.columns = ['symbol', 'cv_3ma']

last_cv_4ma = cv_4ma.tail(1).T.reset_index()
last_cv_4ma.columns = ['symbol', 'cv_4ma']

# 本益比
last_pe = pe_ratio.tail(1).T.reset_index()
last_pe.columns = ['symbol', '本益比']

# 股價淨值比
last_pb = pb_ratio.tail(1).T.reset_index()
last_pb.columns = ['symbol', '股價淨值比']

In [None]:
# 合併所有數據
merged_df = df_all_symbols.merge(stock_name_df, on='symbol', how='left')
merged_df = merged_df.merge(last_close, on='symbol', how='left')
merged_df = merged_df.merge(last_ma5, on='symbol', how='left')
merged_df = merged_df.merge(last_ma10, on='symbol', how='left')
merged_df = merged_df.merge(last_ma20, on='symbol', how='left')
merged_df = merged_df.merge(last_ma60, on='symbol', how='left')
merged_df = merged_df.merge(last_vol_sma5, on='symbol', how='left')
merged_df = merged_df.merge(last_cv_2ma, on='symbol', how='left')
merged_df = merged_df.merge(last_cv_3ma, on='symbol', how='left')
merged_df = merged_df.merge(last_cv_4ma, on='symbol', how='left')
merged_df = merged_df.merge(last_pe, on='symbol', how='left')
merged_df = merged_df.merge(last_pb, on='symbol', how='left')

# 根據多排分類選擇對應的變異係數
def get_cv_by_class(row):
    if row['多排分類'] == '4線多排':
        return row['cv_4ma']
    elif row['多排分類'] == '3線多排':
        return row['cv_3ma']
    else:
        return row['cv_2ma']

merged_df['均線變異係數'] = merged_df.apply(get_cv_by_class, axis=1)
merged_df['均線變異係數'] = merged_df['均線變異係數'].round(2)

# 移除臨時變異係數欄位
merged_df = merged_df.drop(columns=['cv_2ma', 'cv_3ma', 'cv_4ma'])

# 加入網頁連結
merged_df["web_link"] = merged_df["symbol"].apply(lambda x: f"https://www.wantgoo.com/stock/{x}/technical-chart")

# 加入題材概念股
merged_df["題材概念股"] = merged_df["symbol"].apply(lambda x: tst.read_topic_stocks(x))

In [None]:
# 重新排列欄位順序
output_columns = [
    'symbol',
    'stock_name',
    '最新收盤價',
    '5日均線',
    '10日均線',
    '20日均線',
    '60日均線',
    '均線變異係數',
    '多排分類',
    '五日均量',
    '本益比',
    '股價淨值比',
    'web_link',
    '題材概念股'
]

# 重命名欄位
merged_df = merged_df.rename(columns={'symbol': '股票代號', 'stock_name': '股票名稱'})
output_columns = [col.replace('symbol', '股票代號').replace('stock_name', '股票名稱') for col in output_columns]

merged_df = merged_df[output_columns]
merged_df

In [None]:
# 輸出報表留存
today = datetime.now().strftime("%Y%m%d")
merged_df.to_excel(f'{REPORT_PATH}/選股05_{today}.xlsx', index=False)
print(f"報表已儲存至: {REPORT_PATH}/選股05_{today}.xlsx")

## 輸出結果至Google Sheet

In [None]:
# Google SpreadSheet 公用程式初始化
gspread_mgr = gsm.GspreadManager()
gspread_wb = gspread_mgr.get_spreadsheet(GSPERAD_SHEET_KEY)

print(f"更新Google 表單：{gspread_wb.title}，工作表：{OUTPUT_GSHEET_NAME}")

In [None]:
# 刪除再重建工作表
gspread_mgr.recreate_worksheet(GSPERAD_SHEET_KEY, OUTPUT_GSHEET_NAME)

In [None]:
# 更新工作表資料
# 將NaN值轉換為空字串，避免上傳時出錯
output_df = merged_df.fillna('')

gspread_mgr.update_worksheet_values(
    GSPERAD_SHEET_KEY, 
    OUTPUT_GSHEET_NAME, 
    [output_df.columns.values.tolist()] + output_df.values.tolist()
)

print(f"已成功更新 {len(output_df)} 筆資料至 Google SpreadSheet")