In [None]:

import os
from datetime import date
from datetime import datetime

import pandas as pd

In [None]:
from finlab import data

In [None]:
# 引用自建公用模組
from proj_util_pkg.settings import ProjEnvSettings
from proj_util_pkg.finlab_api import finlab_manager as flm
from proj_util_pkg.google_api import gspread_manager as gsm
from proj_util_pkg.common import tw_stock_topic as tst

## 公用參數設定

In [None]:
# finlab api 服務初始化
finlab = flm.FinlabManager()
data.force_cloud_download = False

In [None]:
# 資訊輸出Google SpreadSheet 表單參數設定
GSPERAD_SHEET_KEY = os.environ.get('gspread_wb_key')  # Google SpreadSheet 表單ID
OUTPUT_GSHEET_NAME = '選股清單04'

In [None]:
# 本地報表輸出路徑
REPORT_PATH = os.environ.get('report_path')

## 外部資料讀取

In [None]:
# 讀取台股收盤價資訊
stock_info = data.get('company_basic_info', save_to_storage=True)
close = data.get("price:收盤價", save_to_storage=True)
low = data.get('price:最低價', save_to_storage=True)
high = data.get('price:最高價', save_to_storage=True)

## 數據分析

In [None]:
# 布林通道上軌、中軌、下軌計算
upperband, middleband, lowerband = data.indicator(
    "BBANDS",
    market='TW_STOCK',
    adjust_price=False,
    resample="D", 
    timeperiod=20, 
    nbdevup=float(2.0),
    nbdevdn=float(2.0),
    matype=0
)

In [None]:
# 布林通道帶寬計算
band_width = ((upperband / lowerband) - 1) * 100

In [None]:
band_width

In [None]:
# # 確認資料計算結果
# sid = '2888'
# print(upperband[sid].tail(1))
# print(lowerband[sid].tail(1))
# print(middleband[sid].tail(1))
# print(upperband[sid].tail(1) / lowerband[sid].tail(1))
# print(band_width[sid].tail(1))

In [None]:
# print(close[sid].shift(2).tail(1))
# print(close[sid].shift(1).tail(1))
# print(close[sid].tail(1))

In [None]:
# # 取出close symbol 2330的資料,但要保留symbol的資訊
# test = close.iloc[:, close.columns.get_level_values(0) == '2330'].tail(3)

# test

In [None]:
def _is_turning_point(tick):
    """ 檢查報價是否為轉折點 """

    # print(close[-2], close[-1], close)
    if tick[-3] < tick[-2] and tick[-2] > tick[-1]:
        return -1
    elif tick[-3] > tick[-2] and tick[-2] < tick[-1]:
        return 1
    else:
        return 0

# # CELL INDEX: 10
# turning_point = test.apply(_is_turning_point, axis=0)
# turning_point


In [None]:
upperband.tail(3)

In [None]:
# 篩選出帶寬大於10%的股票
filter_band_width = band_width >= 10
band_width_above_10_percent = filter_band_width.tail(1)
band_width_above_10_percent_symbols = band_width_above_10_percent.columns[band_width_above_10_percent.iloc[0]].tolist()

In [None]:
len(band_width_above_10_percent_symbols)

In [None]:
# 找出最近一個上通道轉折點
upband_turning_point = upperband[band_width_above_10_percent_symbols].apply(_is_turning_point, axis=0)


In [None]:
upband_turning_point

In [None]:
# upband_turning_point 轉換為DataFrame，restset index，並設定欄位名稱，欄位名稱依序為'symbol', 'is_turning_point'
upband_turning_point_df = pd.DataFrame(upband_turning_point).reset_index()
upband_turning_point_df.columns = ['symbol', 'is_turning_point']
filter_stock = upband_turning_point_df.query('is_turning_point == 1')  # 篩選出上通道出現轉折點股票

In [None]:
filter_stock

In [None]:
filtered_symbols = filter_stock.symbol.to_list()

In [None]:
len(filtered_symbols)

In [None]:
df_filtered_symbols = pd.DataFrame(filtered_symbols, columns=['symbol'])
df_filtered_symbols

In [None]:
stock_name = stock_info[['stock_id', '公司簡稱']]
stock_name = stock_name.rename(columns={'stock_id': 'symbol'})

In [None]:
last_band_width = band_width.tail(1).T.reset_index()
# last_band_width.columns = ['symbol', 'band_width']
last_band_width = last_band_width.rename(columns={last_band_width.columns[0]: 'symbol', last_band_width.columns[1]: 'band_width'})
last_band_width.fillna(0, inplace=True)
last_band_width

In [None]:
# Merge df_filtered_symbols with stock_name on stock_id
merged_df = df_filtered_symbols.merge(stock_name, on='symbol')
merged_df = merged_df.merge(last_band_width, on='symbol')
merged_df["web_link"] = merged_df["symbol"].apply(lambda x: f"https://www.wantgoo.com/stock/{x}/technical-chart")
merged_df["題材概念股"] = merged_df["symbol"].apply(lambda x: tst.read_topic_stocks(x))

# Print the merged DataFrame
print(merged_df)

In [None]:
# 輸出報表留存
today = datetime.now().strftime("%Y%m%d")
merged_df.to_excel(f'{REPORT_PATH}/{OUTPUT_GSHEET_NAME}_{today}.xlsx', index=False)

## 輸出結果至Google sheet

In [None]:
# Google SpreadSheet 公用程式初始化
gspread_mgr = gsm.GspreadManager()
gspread_wb = gspread_mgr.get_spreadsheet(GSPERAD_SHEET_KEY)

print(f"更新Google 表單：{gspread_wb.title}，工作表：{OUTPUT_GSHEET_NAME}")

In [None]:
# 刪除再重建工作表
gspread_mgr.recreate_worksheet(GSPERAD_SHEET_KEY, OUTPUT_GSHEET_NAME)

In [None]:
# 更新工作表資料
gspread_mgr.update_worksheet_values(
    GSPERAD_SHEET_KEY, 
    OUTPUT_GSHEET_NAME, 
    [merged_df.columns.values.tolist()] + merged_df.values.tolist()
)