In [None]:
import pandas as pd
import numpy as np
import io
import os
import datetime

import yfinance as yf

In [None]:
import warnings

# Suppress every FutureWarning raised for the rest of the session.
warnings.simplefilter('ignore', FutureWarning)

# Show floats with five decimal places whenever pandas renders them.
pd.options.display.float_format = '{:.5f}'.format

In [None]:
"""
Fetch the market index data and its trading dates.
start_date: start date
end_date: end date
"""
def get_trade_date(start_date, end_date):
    """Download the ``^TWII`` index history and extract its trading dates.

    Args:
        start_date: Start of the range, forwarded to ``yf.download`` as
            ``start``.
        end_date: End of the range, forwarded to ``yf.download`` as ``end``.

    Returns:
        tuple: ``(tw_df, trade_dates)`` where ``tw_df`` is the downloaded
        frame with its index copied into a ``Date`` column, and
        ``trade_dates`` is that same index as a NumPy array.
    """
    index_history = yf.download('^TWII', start=start_date, end=end_date)
    session_index = index_history.index
    # Keep the dates available as an ordinary column as well as the index.
    index_history['Date'] = session_index
    return index_history, session_index.to_numpy()

In [None]:
"""
Load the historical prices of individual stocks.
"""
def get_history_data(folder_path=None):
    """Load every per-stock price CSV in a folder into one long DataFrame.

    Each ``*.csv`` file is read, its ``Date`` column is parsed to
    timezone-naive datetimes, and a ``code`` column is added holding the
    part of the file name before the first ``.``.

    Args:
        folder_path: Directory containing the CSV files. Defaults to the
            relative folder ``data/history_data``.

    Returns:
        pandas.DataFrame: All files stacked vertically with a fresh
        0..n-1 index, or an empty DataFrame when the folder holds no CSV.
    """
    if folder_path is None:
        # Built with os.path.join: the former literal relied on the invalid
        # "\h" escape and was only a valid path on Windows.
        folder_path = os.path.join('data', 'history_data')

    frames = []
    # Sorted so the row order does not depend on the OS directory ordering.
    for filename in sorted(os.listdir(folder_path)):
        if not filename.endswith('.csv'):
            continue

        data = pd.read_csv(os.path.join(folder_path, filename))
        # Drop any timezone so dates compare cleanly with naive timestamps.
        data['Date'] = pd.to_datetime(data['Date']).dt.tz_localize(None)
        data['code'] = filename.split('.')[0]
        frames.append(data)

    if not frames:
        return pd.DataFrame()

    # One concat at the end instead of growing a frame inside the loop.
    return pd.concat(frames).reset_index(drop=True)


In [None]:
"""
Load each company's number of shares outstanding.
start_date: start date
end_date: end date
"""
def get_shares(start_date, end_date,
               folder_path='C:\\Users\\KaiJung\\Desktop\\RFF_LSTM\\shares_data'):
    """Build a per-trading-day table of share counts for every stock.

    For each ``*.csv`` in ``folder_path`` the function:

    1. parses ``Date`` to timezone-naive datetimes and keeps the last row
       for any duplicated date;
    2. forward-fills the values onto a daily calendar that starts at the
       file's earliest date when that precedes ``start_date``, otherwise
       one year before ``start_date``, and ends at ``end_date``;
    3. keeps only the dates returned by ``get_trade_date``.

    The per-stock pieces are stacked, remaining gaps are filled forward
    then backward within each ``code``, and rows still containing NaN are
    dropped.

    Args:
        start_date: First date of the requested range.
        end_date: Last date of the requested range.
        folder_path: Directory holding one shares CSV per stock. The
            default is the original author's local folder.

    Returns:
        pandas.DataFrame: Columns ``Date``, ``code`` and the CSV value
        columns; a column literally named ``'0'`` is renamed to
        ``Shares`` (presumably the share-count column -- confirm against
        the CSV files). An empty frame with ``Date`` and ``code`` columns
        is returned when the folder contains no CSV file.
    """
    _, trade_dates = get_trade_date(start_date, end_date)
    trade_dates = pd.DatetimeIndex(trade_dates).tz_localize(None)
    range_start = pd.to_datetime(start_date)

    frames = []
    for filename in sorted(os.listdir(folder_path)):
        if not filename.endswith('.csv'):
            continue

        shares = pd.read_csv(os.path.join(folder_path, filename))
        shares['Date'] = pd.to_datetime(shares['Date']).dt.tz_localize(None)
        shares = shares.set_index('Date')
        shares = shares[~shares.index.duplicated(keep='last')]
        # reindex(method='ffill') requires a monotonic index.
        shares = shares.sort_index()

        earliest = shares.index.min()
        if earliest < range_start:
            calendar_start = earliest
        else:
            # No data before the range: look back one year so the reindex
            # calendar still starts ahead of the requested window.
            calendar_start = range_start - pd.DateOffset(years=1)

        calendar = pd.date_range(start=calendar_start, end=end_date)
        daily = shares.reindex(calendar, method='ffill')
        daily['code'] = filename.split('.')[0]

        frames.append(daily.loc[daily.index.intersection(trade_dates)])

    if not frames:
        # Nothing to stack; avoid the opaque KeyError from groupby('code').
        return pd.DataFrame(columns=['Date', 'code'])

    all_shares_data = pd.concat(frames)
    all_shares_data.reset_index(inplace=True)
    all_shares_data = all_shares_data.groupby('code').apply(lambda group: group.ffill().bfill()).reset_index(drop=True)
    all_shares_data.rename(columns={'index': 'Date', '0': 'Shares'}, inplace=True)

    return all_shares_data.dropna()


In [None]:
'''
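Compute the market capitalisation of the shares outstanding:
closing price * shares outstanding
params price_df: historical price data
params shares_df: shares outstanding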
'''
def cal_valuation(shares_df, price_df):
    """Compute market capitalisation as ``Shares * Close`` per stock and date.

    ``shares_df`` is left-joined with ``price_df`` on ``Date`` and ``code``.
    Missing values are forward-filled within each ``code`` only, so one
    stock's prices never leak into the leading gaps of the next stock.

    Args:
        shares_df: Frame with at least ``Date``, ``code`` and ``Shares``.
        price_df: Frame with at least ``Date``, ``code`` and ``Close``.

    Returns:
        pandas.DataFrame: The merged frame with an added ``market_cap``
        column. Rows with no earlier price for their own ``code`` keep
        NaN in ``Close`` and ``market_cap``.
    """
    valuation_df = pd.merge(shares_df, price_df, on=['Date', 'code'], how='left')

    # Forward-fill per stock; a frame-wide fill would copy the previous
    # stock's last row into this stock's first missing rows.
    fill_columns = [column for column in valuation_df.columns if column != 'code']
    valuation_df[fill_columns] = (
        valuation_df.groupby('code', sort=False)[fill_columns].ffill()
    )

    valuation_df['market_cap'] = valuation_df['Shares'] * valuation_df['Close']
    return valuation_df

# Market

In [None]:
"""
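Compute the rolling beta.
:param stock_data: price data of the target stock.
:param market_data: price data of the market index.
:param window_size: size of the rolling window.
:return: the rolling beta values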
"""
def cal_rolling_beta(stock_data, market_data, window_size):
    """Compute a rolling beta of one stock against the market.

    The two frames are inner-joined on ``Date``; simple returns are taken
    from the ``Close`` columns (the first return is set to 0). For every
    window of ``window_size`` consecutive rows, beta is
    ``cov(stock, market) / var(market)`` using ``np.cov``.

    Args:
        stock_data: Frame with ``Date``, ``Close`` and ``code`` columns.
        market_data: Frame with ``Date`` and ``Close`` columns.
        window_size: Number of rows per window; must be at least 1.

    Returns:
        pandas.DataFrame: Object-dtype frame with columns ``Date``,
        ``code`` and ``RollingBeta``, indexed by the window end date. One
        row per complete window; empty when fewer than ``window_size``
        rows align. Beta is NaN when the market variance in the window
        is zero.

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError('window_size must be a positive integer')

    # The incoming index is dropped before the join so that only the
    # ``Date`` column is used as the merge key.
    aligned = pd.merge(
        stock_data.reset_index(drop=True),
        market_data.reset_index(drop=True),
        on='Date',
        suffixes=('_stock', '_market'),
    )
    # ffill() replaces the deprecated fillna(method='ffill').
    aligned = aligned.ffill()

    aligned['stock_returns'] = aligned['Close_stock'].pct_change().fillna(0)
    aligned['market_returns'] = aligned['Close_market'].pct_change().fillna(0)

    rows = []
    for end in range(window_size - 1, len(aligned)):
        window = aligned.iloc[end - window_size + 1:end + 1]
        cov_matrix = np.cov(window['stock_returns'], window['market_returns'])
        market_variance = cov_matrix[1, 1]
        # Guard the division, matching cal_downside_beta.
        beta = cov_matrix[0, 1] / market_variance if market_variance != 0 else np.nan
        rows.append([aligned['Date'].iloc[end], aligned['code'].iloc[end], beta])

    return pd.DataFrame(
        rows,
        index=aligned['Date'].iloc[window_size - 1:],
        columns=['Date', 'code', 'RollingBeta'],
        dtype=object,
    )

In [None]:
def cal_downside_beta(stock_data, market_data, window_size):
    """Compute a rolling downside beta of one stock against the market.

    The two frames are inner-joined on ``Date``; simple returns are taken
    from the ``Close`` columns (the first return is set to 0). Within each
    window of ``window_size`` rows only the observations whose market
    return is at or below the mean market return are kept, and beta is
    ``cov(stock, market) / var(market)`` over those observations.

    Args:
        stock_data: Frame with ``Date``, ``Close`` and ``code`` columns.
        market_data: Frame with ``Date`` and ``Close`` columns.
        window_size: Number of rows per window; must be at least 1.

    Returns:
        pandas.DataFrame: Object-dtype frame with columns ``Date``,
        ``code`` and ``DownsideBeta``, indexed by the window end date.
        Beta is NaN when a window has fewer than two downside
        observations or their market variance is zero.

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError('window_size must be a positive integer')

    # The incoming index is dropped before the join so that only the
    # ``Date`` column is used as the merge key.
    aligned = pd.merge(
        stock_data.reset_index(drop=True),
        market_data.reset_index(drop=True),
        on='Date',
        suffixes=('_stock', '_market'),
    )
    # ffill() replaces the deprecated fillna(method='ffill').
    aligned = aligned.ffill()

    aligned['stock_returns'] = aligned['Close_stock'].pct_change().fillna(0)
    aligned['market_returns'] = aligned['Close_market'].pct_change().fillna(0)

    # NOTE(review): the threshold is the mean over the whole sample, not
    # over each window, so early windows use later data -- confirm that
    # this is intended.
    market_avg_return = aligned['market_returns'].mean()

    rows = []
    for end in range(window_size - 1, len(aligned)):
        window = aligned.iloc[end - window_size + 1:end + 1]
        downside = window[window['market_returns'] <= market_avg_return]

        beta = np.nan
        # np.cov needs at least two observations to be meaningful.
        if len(downside) > 1:
            cov_matrix = np.cov(downside['stock_returns'], downside['market_returns'])
            if cov_matrix[1, 1] != 0:
                beta = cov_matrix[0, 1] / cov_matrix[1, 1]

        rows.append([aligned['Date'].iloc[end], aligned['code'].iloc[end], beta])

    return pd.DataFrame(
        rows,
        index=aligned['Date'].iloc[window_size - 1:],
        columns=['Date', 'code', 'DownsideBeta'],
        dtype=object,
    )