# Trading strategy

1. Distinguish industry category.
2. Cointegration test, the p-value of the following three tests must below 0.05
    - Augmented Dickey Fuller
    - Phillips-Perron
    - Kwiatkowski-Phillips-Schmidt-Shin
3. Calculate the hedge ratio through linear regression
    - It should be rolling window.
    - Using simple split right now.
4. Backtest for five years for 2021.

# Import packages

In [1]:
import finlab
# Read the API key from the text file
with open('credential.txt', 'r') as file:
    api_key = file.readline().strip()

# Use the API key to log in
finlab.login(api_key)

輸入成功!


In [2]:
from finlab import data
from finlab.backtest import sim

In [3]:
import pandas as pd
import numpy as np

import warnings
import itertools

import statsmodels.api as sm
from statsmodels.regression.linear_model import OLS

# Function

### 分產業類別

In [4]:
def get_same_industry_pairs(price_data):
    pairs = []
    unique_industries = price_data['產業類別'].unique()
    
    for industry in unique_industries:
        industry_data = price_data[price_data['產業類別'] == industry]
        stocks = industry_data['證券代碼'].unique()
        
        # Create all possible pairs within the industry
        for i in range(len(stocks)):
            for j in range(i + 1, len(stocks)):
                pairs.append((stocks[i], stocks[j]))
    
    return pairs

### Slice the stocks price for the specific period.

In [5]:
def get_price_for_period(dataframe, start_year, end_year):
    df_copy = dataframe.copy()

    # 創建時間段
    start_date = f"{start_year}-01-01"
    end_date = f"{end_year}-12-31"

    # 確保日期列是日期的時間格式
    df_copy['Date'] = pd.to_datetime(df_copy['Date'])

    # 篩選出指定時間段內的數據
    filtered_data = df_copy[(df_copy['Date'] >= start_date) & (df_copy['Date'] <= end_date)]
    filtered_data = filtered_data['收盤價(元)']
    filtered_data = filtered_data.reset_index(drop=True)

    return filtered_data

### Cointegration test

In [6]:
def adfuller_test(stock1, stock2):
    # Align two srtocks
    stock1, stock2 = stock1.align(stock2, join='inner')

    # linear regression
    model = sm.OLS(stock1, sm.add_constant(stock2)).fit()
    residuals = model.resid

    # ad-fuller test
    adf_test = sm.tsa.adfuller(residuals)
    p_value = adf_test[1]

    return p_value

In [7]:
def pp_test(stock1, stock2):
    # Align two srtocks
    stock1, stock2 = stock1.align(stock2, join='inner')

    # linear regression
    model = sm.OLS(stock1, sm.add_constant(stock2)).fit()
    residuals = model.resid

    # Phillips-Perron test
    pp_test = sm.tsa.adfuller(residuals, regression='ct')
    p_value = pp_test[1]

    return p_value

In [8]:
def kpss_test(stock1, stock2):
    # Align two srtocks
    stock1, stock2 = stock1.align(stock2, join='inner')

    # linear regression
    model = sm.OLS(stock1, sm.add_constant(stock2)).fit()
    residuals = model.resid

    # KPSS test
    kpss_test = sm.tsa.kpss(residuals)
    p_value = kpss_test[1]

    return p_value

### Calculate hedge ratio.

In [9]:
def calculate_hedge_ratio(stock1_prices, stock2_prices):
    # Align the data to ensure both series have the same length
    stock1_prices, stock2_prices = stock1_prices.align(stock2_prices, join='inner')
    
    # Remove NaN and infinite values
    valid_data = pd.DataFrame({'stock1': stock1_prices, 'stock2': stock2_prices})
    valid_data = valid_data.replace([np.inf, -np.inf], np.nan).dropna()

    # Extract cleaned prices
    clean_stock1_prices = valid_data['stock1']
    clean_stock2_prices = valid_data['stock2']

    # Perform linear regression: stock1_prices ~ stock2_prices
    model = OLS(clean_stock1_prices, sm.add_constant(clean_stock2_prices)).fit()

    # The hedge ratio is the slope of the regression line
    hedge_ratio = model.params[1]

    return hedge_ratio

### Calculate spread and std.

In [10]:
def compute_spread_and_std(stock1_prices, stock2_prices, hedge_ratio, window= 60):
    stock1_prices, stock2_prices = stock1_prices.align(stock2_prices, join='inner')
    # Remove NaN and infinite values
    valid_data = pd.DataFrame({'stock1': stock1_prices, 'stock2': stock2_prices})
    valid_data = valid_data.replace([np.inf, -np.inf], np.nan).dropna()

    valid_data['spread'] = valid_data['stock1'] - hedge_ratio * valid_data['stock2']
    
    # Calculate the mean and the standard deviation over the years
    valid_data['mean'] = valid_data['spread'].rolling(window).mean()
    valid_data['rolling_std'] = valid_data['spread'].rolling(window).std()
    
    # Return a DataFrame with spread and rolling standard deviation
    return valid_data

### Generate Entry and Exit Signals.

In [11]:
def generate_signals(valid_data, threshold=1.5):
    # Set a column called 'signal'
    valid_data['signal'] = np.where(valid_data['spread'] > valid_data['mean'] + threshold * valid_data['rolling_std'], True, 
                                    np.where(valid_data['spread'] < valid_data['mean'] - threshold * valid_data['rolling_std'], True, False))
    valid_data['signal'] = valid_data['signal'].shift(1)
    valid_data['signal'] = valid_data['signal'].fillna(False)
    
    return valid_data

### Position

In [12]:
def generate_position(valid_data, stock1, stock2):
    # If stock1 is overvalued, we short stock1 and long stock2
    valid_data['position_stock1'] = np.where(valid_data['signal'] == True,
                                             np.where(valid_data['stock1'] > valid_data['stock2'], 1, -1), # 做空高估的股票
                                             False)
    valid_data['position_stock2'] = np.where(valid_data['signal'] == True,
                                             np.where(valid_data['stock1'] > valid_data['stock2'], -1, 1), # 做多低估的股票
                                             False)
    
    position = valid_data[['position_stock1', 'position_stock2']]
    position = position.rename(columns={'position_stock1': stock1, 'position_stock2': stock2})
    
    return position

# 主程式碼

## 定義年份

In [13]:
CONST_START_YEAR = 2008 # Set the global variable for the year.

## 抓資料

In [14]:
close=data.get("price:收盤價")

# 取得 OHLC 直式格式

price = close.T.stack().reset_index()
price.columns = ["證券代碼", "Date", "收盤價(元)"]

Your version is 1.2.14, please install a newer version.
Use "pip install finlab==1.2.15" to update the latest version.


In [15]:
price

Unnamed: 0,證券代碼,Date,收盤價(元)
0,0015,2007-04-23,9.54
1,0015,2007-04-24,9.54
2,0015,2007-04-25,9.52
3,0015,2007-04-26,9.59
4,0015,2007-04-27,9.55
...,...,...,...
7098352,9962,2024-09-24,15.60
7098353,9962,2024-09-25,15.50
7098354,9962,2024-09-26,15.50
7098355,9962,2024-09-27,16.35


In [16]:
industry = data.get('company_basic_info')[['stock_id', '產業類別']]
industry.rename(columns= {'stock_id': '證券代碼', '產業類別': 'category'}, inplace= True)

Daily usage: 23.9 / 5000 MB - company_basic_info


In [17]:
# industry = data.get("security_industry_themes")
# industry.rename(columns = {"stock_id": "證券代碼"}, inplace = True)
# # industry

In [18]:
# # 假設 industry 是你的 DataFrame
# result = industry.groupby('category').agg(
#     stock_id_list=('stock_id', lambda x: list(x)),
#     name_list=('name', lambda x: list(x)),
#     count=('stock_id', 'count')
# ).reset_index()

# # 顯示結果
# result

In [19]:
# 篩選出 4 位數字
price = price[price["證券代碼"].apply(lambda x: len(str(x)) == 4 and not str(x).startswith("00"))]
price = price.merge(industry[['證券代碼', 'category']], on="證券代碼", how="left")
df = pd.DataFrame(price)
df

Unnamed: 0,證券代碼,Date,收盤價(元),category
0,1101,2007-04-23,29.60,水泥工業
1,1101,2007-04-24,30.25,水泥工業
2,1101,2007-04-25,29.65,水泥工業
3,1101,2007-04-26,29.65,水泥工業
4,1101,2007-04-27,30.35,水泥工業
...,...,...,...,...
6549005,9962,2024-09-24,15.60,鋼鐵工業
6549006,9962,2024-09-25,15.50,鋼鐵工業
6549007,9962,2024-09-26,15.50,鋼鐵工業
6549008,9962,2024-09-27,16.35,鋼鐵工業


### Check the stock list

In [20]:
stock_code_list = []

stock_codes_set = set()

for code in df['證券代碼']:
    stock_codes_set.add(code)

stock_code_list = list(sorted(stock_codes_set))

print("不同的股票代碼总数:", len(stock_code_list))

不同的股票代碼总数: 2087


## Cointegration test

In [21]:
df.dropna(subset=['category'], inplace=True)
unique_industries = list(df['category'].unique())

In [22]:
unique_industries

['水泥工業',
 '食品工業',
 '農業科技',
 '觀光餐旅',
 '塑膠工業',
 '建材營造',
 '汽車工業',
 '電子零組件業',
 '其他',
 '紡織纖維',
 '運動休閒',
 '電機機械',
 '生技醫療業',
 '電腦及週邊設備業',
 '電器電纜',
 '化學工業',
 '其他電子業',
 '玻璃陶瓷',
 '造紙工業',
 '鋼鐵工業',
 '居家生活',
 '橡膠工業',
 '航運業',
 '半導體業',
 '通信網路業',
 '光電業',
 '電子通路業',
 '資訊服務業',
 '貿易百貨',
 '油電燃氣業',
 '數位雲端',
 '金融保險業',
 '文化創意業',
 '綠能環保',
 '金融業',
 '存託憑證']

In [23]:
warnings.filterwarnings("ignore")

for YEAR in range(CONST_START_YEAR + 8, CONST_START_YEAR + 13):  # Example: 2008 ~ 2020
    print(f"===========================正在處理 {YEAR + 3} 年的 stock pairs===========================")
    # Initialize the DataFrame to store chosen pairs and their p-values
    final_chosen_pairs = pd.DataFrame(columns=[
        'stock1', 'stock2', 
        'adfuller_p_value_1', 'adfuller_p_value_2', 
        'pp_p_value_1', 'pp_p_value_2', 
        'kpss_p_value_1', 'kpss_p_value_2'
    ])

    for industry in unique_industries:
        print(f"正在處理 {industry} 這個產業")
        # Slice the dataframe to get the specific industry
        df_price_industry = df[df['category'] == industry]
        # Get the unique stocks in the chosen industry
        stocks_in_industry = df_price_industry['證券代碼'].unique()
        # Get all possible pairs of stocks in the chosen industry
        stock_pairs = list(itertools.combinations(stocks_in_industry, 2))
        
        for pair in stock_pairs:
            stock1, stock2 = pair
            df_stock1 = df_price_industry[df_price_industry['證券代碼'] == stock1]
            df_stock2 = df_price_industry[df_price_industry['證券代碼'] == stock2]

            # Get the stock prices for the specified period
            stock1_prices = get_price_for_period(df_stock1, YEAR, YEAR + 2)
            stock2_prices = get_price_for_period(df_stock2, YEAR, YEAR + 2)

            # Skip if the price data is empty or insufficient
            if stock1_prices.empty or stock2_prices.empty:
                continue
            elif len(stock1_prices) < 600 or len(stock2_prices) < 600:
                continue

            # Perform cointegration tests
            adfuller_p_value_1 = adfuller_test(stock1_prices, stock2_prices)
            adfuller_p_value_2 = adfuller_test(stock2_prices, stock1_prices)

            pp_p_value_1 = pp_test(stock1_prices, stock2_prices)
            pp_p_value_2 = pp_test(stock2_prices, stock1_prices)

            kpss_p_value_1 = kpss_test(stock1_prices, stock2_prices)
            kpss_p_value_2 = kpss_test(stock2_prices, stock1_prices)

            # Condition to check if the p-values meet the criteria
            condition = (adfuller_p_value_1 < 0.05 or adfuller_p_value_2 < 0.05) \
                        and (pp_p_value_1 < 0.05 or pp_p_value_2 < 0.05) \
                        and (kpss_p_value_1 < 0.05 or kpss_p_value_2 < 0.05)

            # If the condition is met, store this stock pair and p-values
            if condition:
                temp = pd.DataFrame({
                    'stock1': [stock1],
                    'stock2': [stock2],
                    'adfuller_p_value_1': [adfuller_p_value_1],
                    'adfuller_p_value_2': [adfuller_p_value_2],
                    'pp_p_value_1': [pp_p_value_1],
                    'pp_p_value_2': [pp_p_value_2],
                    'kpss_p_value_1': [kpss_p_value_1],
                    'kpss_p_value_2': [kpss_p_value_2]
                })
                final_chosen_pairs = pd.concat([final_chosen_pairs, temp], ignore_index=True)

    # Save the results to an Excel file
    final_chosen_pairs.to_excel(f'final_chosen_pairs_{YEAR + 3}.xlsx', index=False)

正在處理 水泥工業 這個產業
正在處理 食品工業 這個產業
正在處理 農業科技 這個產業
正在處理 觀光餐旅 這個產業
正在處理 塑膠工業 這個產業
正在處理 建材營造 這個產業
正在處理 汽車工業 這個產業
正在處理 電子零組件業 這個產業
正在處理 其他 這個產業
正在處理 紡織纖維 這個產業
正在處理 運動休閒 這個產業
正在處理 電機機械 這個產業
正在處理 生技醫療業 這個產業
正在處理 電腦及週邊設備業 這個產業
正在處理 電器電纜 這個產業
正在處理 化學工業 這個產業
正在處理 其他電子業 這個產業
正在處理 玻璃陶瓷 這個產業
正在處理 造紙工業 這個產業
正在處理 鋼鐵工業 這個產業
正在處理 居家生活 這個產業
正在處理 橡膠工業 這個產業
正在處理 航運業 這個產業
正在處理 半導體業 這個產業
正在處理 通信網路業 這個產業
正在處理 光電業 這個產業
正在處理 電子通路業 這個產業
正在處理 資訊服務業 這個產業
正在處理 貿易百貨 這個產業
正在處理 油電燃氣業 這個產業
正在處理 數位雲端 這個產業
正在處理 金融保險業 這個產業
正在處理 文化創意業 這個產業
正在處理 綠能環保 這個產業
正在處理 金融業 這個產業
正在處理 存託憑證 這個產業
正在處理 水泥工業 這個產業
正在處理 食品工業 這個產業
正在處理 農業科技 這個產業
正在處理 觀光餐旅 這個產業
正在處理 塑膠工業 這個產業
正在處理 建材營造 這個產業
正在處理 汽車工業 這個產業
正在處理 電子零組件業 這個產業
正在處理 其他 這個產業
正在處理 紡織纖維 這個產業
正在處理 運動休閒 這個產業
正在處理 電機機械 這個產業
正在處理 生技醫療業 這個產業
正在處理 電腦及週邊設備業 這個產業
正在處理 電器電纜 這個產業
正在處理 化學工業 這個產業
正在處理 其他電子業 這個產業
正在處理 玻璃陶瓷 這個產業
正在處理 造紙工業 這個產業
正在處理 鋼鐵工業 這個產業
正在處理 居家生活 這個產業
正在處理 橡膠工業 這個產業
正在處理 航運業 這個產業
正在處理 半導體業 這個產業
正在處理 通信網路業 這個產業
正在處理 光電業 這個產業
正在處理 電子通路業 這個產業
正在處理 資訊服務業 這個產業
正在處理 貿易百貨 這個產業
正在處理 油電燃氣

In [24]:
# warnings.filterwarnings("ignore")

# for YEAR in range(CONST_START_YEAR + 8, CONST_START_YEAR + 13): # 2008 ~ 2020
#     print(f"===========================正在處理 {YEAR + 3} 年的 stock pairs===========================")
#     final_chosen_pairs = pd.DataFrame()
#     final_chosen_pairs = pd.DataFrame(columns= ['stock1', 'stock2'])

#     for industry in unique_industries:
#         print(f"正在處理 {industry} 這個產業")
#         # Slice the dataframe to get the specific industry
#         df_price_industry = df[df['category'] == industry]
#         # Get the unique stocks in the chosen industry
#         stocks_in_industry = df_price_industry['證券代碼'].unique()
#         # Get all possible pairs of stocks in the chosen industry
#         stock_pairs = list(itertools.combinations(stocks_in_industry, 2))
        
#         p_values_dict = {}

#         for pair in stock_pairs:
#             stock1, stock2 = pair
#             df_stock1 = df_price_industry[df_price_industry['證券代碼'] == stock1]
#             df_stock2 = df_price_industry[df_price_industry['證券代碼'] == stock2]

#             # 取得股票價格數據
#             # It should be a rolling window
#             stock1_prices = get_price_for_period(df_stock1, YEAR, YEAR + 2)
#             stock2_prices = get_price_for_period(df_stock2, YEAR, YEAR + 2)

#             if stock1_prices.empty or stock2_prices.empty: # 如果股票價格數據為空，則跳過
#                 continue
#             elif len(stock1_prices) < 600 or len(stock2_prices) < 600: # 依照年份決定，現在是三年的股票。如果股票價格數據不足 600 個，則跳過。
#                 continue

#             # cointegration test
#             adfuller_p_value_1 = adfuller_test(stock1_prices, stock2_prices)
#             adfuller_p_value_2 = adfuller_test(stock2_prices, stock1_prices)

#             pp_p_value_1 = pp_test(stock1_prices, stock2_prices)
#             pp_p_value_2 = pp_test(stock2_prices, stock1_prices)

#             kpss_p_value_1 = kpss_test(stock1_prices, stock2_prices)
#             kpss_p_value_2 = kpss_test(stock2_prices, stock1_prices)

#             condition = (adfuller_p_value_1 < 0.05 or adfuller_p_value_2 < 0.05) \
#                         and (pp_p_value_1 < 0.05 or pp_p_value_2 < 0.05) \
#                         and (kpss_p_value_1 < 0.05 or kpss_p_value_2 < 0.05)
            
#             # condition = adfuller_p_value_1 < 0.05 and pp_p_value_1 < 0.05 and kpss_p_value_1 < 0.05

#             # 如果兩個 P 值都小於 0.05，則記錄這對股票和它們的 P-value
#             if condition:
#                 p_values_dict[(stock1, stock2)] = ((adfuller_p_value_1, adfuller_p_value_2), (pp_p_value_1, pp_p_value_2), (kpss_p_value_1, kpss_p_value_2))

#         # Find the minimum p-value pair
#         if len(p_values_dict) == 0:
#             print(f"在 {industry} 這個產業中，沒有股票對是具有共整合性的")
#         else:
#             min_p_value_pair = min(p_values_dict, key= lambda x: max(p_values_dict[x][0] + p_values_dict[x][1] + p_values_dict[x][2]))
#             min_p_value = p_values_dict[min_p_value_pair]
#             print(f"在 {industry} 這個產業中，具有最小最大 P 值的股票對是 {min_p_value_pair}，對應的 adfuller p-value 是 {min_p_value[0]} ; pp p-value 是 {min_p_value[1]} ; kpss p-value 是 {min_p_value[2]}")

#             temp = pd.DataFrame()
#             temp = pd.DataFrame([min_p_value_pair], columns = ['stock1', 'stock2'])
#             final_chosen_pairs = pd.concat([final_chosen_pairs, temp], ignore_index = True)
            
#     final_chosen_pairs.to_excel(f'final_chosen_pairs_{YEAR + 3}.xlsx', index = False)