In [1]:
import pandas as pd
from arch.unitroot import ADF, KPSS
from arch.unitroot.cointegration import phillips_ouliaris
from statsmodels.stats.stattools import durbin_watson
from statsmodels.tsa.ardl import UECM
import matplotlib.pyplot as plt
import numpy as np

from statsmodels.tsa.stattools import adfuller
import itertools

import pickle

pd.options.plotting.backend = "plotly"

### Price Data Preperation

In [2]:
df_price = pd.read_pickle("df_price_price_only_f_20130101_t_20230919.pkl")

In [3]:
# code_list = df_price.code.unique().tolist()
code_list = (
    df_price.loc[lambda df : df.marcap > 5000_00_000_000]
    .code.unique().tolist()
)

In [4]:
code_name_dict = df_price.set_index('code')['name'].to_dict()

In [5]:
df_close = (
    df_price
    .pivot(
        index = 'date',
        columns='code',
        values='close'
    )
)

In [7]:
df_close_change = df_close.pct_change(1)
df_close_diff = df_close.diff()
df_close_lag1 = df_close.shift(1)

df_close_log_diff = np.log(df_close).diff()
df_close_log_diff_lag1 = df_close_log_diff.shift(1)

  df_close_change = df_close.pct_change(1)


### Calc Cointegration

#### Calculation Scheme

1. Prepare Stationary data
    1) Check Stationarity for raw time series
    2) (if non-stationary) Check Stationarity for 1st diff
    3) Keep the codes that the 1st diff is stationary

2. Check Cointegration
    1) Pick two code from the stationary time series
    2) Run test for cointegration
    3) (if cointegrated) Keep the codes

3. Calculate Coefficients

In [9]:
def apply_unit_root_test(timeseries : pd.Series):
    """
    Apply ADF and KPSS test to the given time seires
    """
    
    adf_test_score = 0 # if this value greater than 0, this time series has unit root
    kpss_test_score = 0 # If this value greater than 0, this time series has unit root

    for type in ['n', 'c', 'ct']:
        result = adfuller(timeseries, regression=type, autolag='BIC')
        p_value = result[1]

        if p_value < 0.05:
            # print("Has no unit root")
            adf_test_score += 0
            
        else :
            # print("Has unit root")
            adf_test_score += 1

    result = KPSS(timeseries, trend="ct", lags=-1)
    p_value_kpss = result.pvalue

    # print(f"KPSS : {p_value_kpss}")

    if p_value_kpss < 0.05:
        # print("Has unit root")
        kpss_test_score += 1
    else :
        # print("weakly stationary")    
        kpss_test_score += 0

    return adf_test_score, kpss_test_score

In [10]:
stationay_raw_time_series = []
stationary_1st_diff_time_series = []

for code in code_list:

    try :
        time_series_raw = df_close[code].dropna()
        adf_test_score, kpss_test_score = apply_unit_root_test(time_series_raw)

        if kpss_test_score == 0 and adf_test_score == 0 :
            stationay_raw_time_series.append(code)
        else :
            time_series_1st_diff = df_close_log_diff[code].dropna()
            adf_test_score, kpss_test_score = apply_unit_root_test(time_series_1st_diff)

            if kpss_test_score == 0 and adf_test_score == 0:
                stationary_1st_diff_time_series.append(code)
    except :
        print(f"something wrong with this code : {code}")

something wrong with this code : 086520


In [9]:
# Check Stationarity Result
print(f"Number of total codes : {code_list.__len__()}")
print(f"Number of stationary at raw : {stationay_raw_time_series.__len__()}")
print(f"Number of stationary at 1st diff : {stationary_1st_diff_time_series.__len__()}")


Number of total codes : 431
Number of stationary at raw : 0
Number of stationary at 1st diff : 409


In [None]:
# Cointegration Test
combinations = list(itertools.combinations(stationary_1st_diff_time_series, 2))

l_of_cointegrated_combi_ct = []
for combi in combinations:

    df_combi = (
        df_close[[combi[0], combi[1]]]
        .dropna()
    )


    po_result = phillips_ouliaris(
            df_combi[combi[0]], df_combi[combi[1]], trend="ct", test_type="Za", kernel="bartlett"
        )
    

    if po_result.pvalue < 0.05 :

        temp_dict = {
            'code0' : combi[0],
            'code1' : combi[1],
            'p_value' : po_result.pvalue
        }

        l_of_cointegrated_combi_ct.append(temp_dict)

        print(f"{code_name_dict[combi[0]]}({combi[0]}), {code_name_dict[combi[1]]}({combi[1]}) : {po_result.pvalue}")

In [8]:
with open("l_of_cointegrated_combi.pkl", "rb") as f:
    l_of_cointegrated_combi = pickle.load(f)

In [52]:

# with open('l_of_cointegrated_combi_ct.pkl', 'wb') as file:
#     # Serialize the list of dictionaries to the file
#     pickle.dump(l_of_cointegrated_combi_ct, file)

In [None]:
# for result in l_of_cointegrated_combi:

#     code0 = result['code0']
#     code1 = result['code1']
#     p_value = result['p_value']

#     print(f"{code_name_dict[code0]}({code0}), {code_name_dict[code1]}({code1}) : {p_value}")

In [None]:
# for result in l_of_cointegrated_combi_ct:

#     code0 = result['code0']
#     code1 = result['code1']
#     p_value = result['p_value']

#     print(f"{code_name_dict[code0]}({code0}), {code_name_dict[code1]}({code1}) : {p_value}")

In [10]:
l_of_cointegrated_combi.__len__()

4273

In [9]:
# Example dictionary
sorted_combi = sorted(l_of_cointegrated_combi, key=lambda x: x['p_value'])

In [None]:
# df_close_change = df_close.pct_change(1)
# df_close_diff = df_close.diff()
# df_close_lag1 = df_close.shift(1)

# df_close_log_diff = np.log(df_close).diff()
# df_close_log_diff_lag1 = df_close_log_diff.shift(1)

In [39]:
selected_combi = sorted_combi[30]
combi_array = [selected_combi['code0'], selected_combi['code1']]

df_combi_arry = df_close[combi_array]

In [40]:
df_combi = df_combi_arry.dropna()

In [41]:
ecm_model = UECM(
    endog=df_combi[
        [combi_array[0]]
        ],
    lags=1,
    exog=df_combi[
        [combi_array[1]]
        ],
    order=1,
    trend="c",
).fit()

print(ecm_model.summary())
print(combi_array[0], combi_array[1])

                              UECM Model Results                              
Dep. Variable:               D.032830   No. Observations:                  772
Model:                     UECM(1, 1)   Log Likelihood               -6579.321
Method:               Conditional MLE   S.D. of innovations          69357.901
Date:                Mon, 22 Apr 2024   AIC                          13168.642
Time:                        15:53:44   BIC                          13191.881
Sample:                             1   HQIC                         13177.585
                                  772                                         
                  coef    std err          z      P>|z|      [0.025      0.975]
-------------------------------------------------------------------------------
const        2077.6109    479.019      4.337      0.000    1137.267    3017.955
032830.L1      -0.0540      0.011     -5.026      0.000      -0.075      -0.033
357780.L1       0.0066      0.002      3.604    


A date index has been provided, but it has no associated frequency information and so will be ignored when e.g. forecasting.



In [42]:
const = ecm_model.params.iloc[0]
y_L1_coeff = ecm_model.params.iloc[1]
x_L1_coeff = ecm_model.params.iloc[2]
delta_X_L0 = ecm_model.params.iloc[3]

In [43]:
delta_y = const + y_L1_coeff * df_close_lag1[combi_array[0]] + x_L1_coeff * df_close_lag1[combi_array[1]] + delta_X_L0 * df_close_diff[combi_array[1]]
delta_y.dropna(inplace=True)

Δ005930 
t
​
 =(const)+(005930.L1×Lagged 005930 
t−1
​
 )+(000660.L1×Lagged 000660 
t−1
​
 )+(D.000660.L0×Δ000660 
t
​
 )

In [44]:
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

x = df_combi.index
# y1 = df_close_diff[combi_array[0]]
# y2 = delta_y #df_combi[combi_array[1]]

y1 = df_combi[combi_array[0]] / df_combi[combi_array[0]].iloc[0]
y2 = df_combi[combi_array[1]] / df_combi[combi_array[1]].iloc[0]

name1 = code_name_dict[combi_array[0]]
name2 = code_name_dict[combi_array[1]]

# Create figure with secondary y-axis
fig = make_subplots(specs=[[{"secondary_y": True}]])

# Add traces
fig.add_trace(
    go.Scatter(x=x, y=y1, name=name1, line=dict(color='blue')),
    secondary_y=False,  # False to use the left y-axis
)

fig.add_trace(
    go.Scatter(x=x, y=y2, name=name2, line=dict(color='red')),
    secondary_y=True,  # True to use the right y-axis
)

# fig.add_trace(
#     go.Scatter(x=x, y=y3, name='calculated', line=dict(color='green')),
#     secondary_y=True,  # True to use the right y-axis
# )

# Add figure title
fig.update_layout(
    title_text="Dual Y-Axis Example",
    width=1200,  # Set the width of the figure (in pixels)
    height=600  # Optional: you can also set the height
)

# Set x-axis title
fig.update_xaxes(title_text="Date")

# Set y-axes titles
fig.update_yaxes(title_text="<b>Primary</b> Y-axis", secondary_y=False)
fig.update_yaxes(title_text="<b>Secondary</b> Y-axis", secondary_y=True)

# Show the plot
fig.show()


Δ005930 
t
​
 =0.001824−0.007162×Lagged 005930 
t−1
​
 +0.003365×Lagged 000660 
t−1
​
 +0.235107
 ×Δ000660 
t
​
