In [None]:
!pip install arch

In [1]:
import pandas as pd
import tqdm 
import os
import glob
from scipy import stats
import numpy as np
from typing import Union

In [2]:
# 讀取 excel
reserch_table_2515 = pd.read_excel("reserch_table_2515.xlsx")
reserch_table_2542 = pd.read_excel("reserch_table_2542.xlsx")

In [3]:
reserch_table = reserch_table_2515.copy()

In [4]:
reserch_table.columns

Index(['code', 'date', 'spot_return', 'futures_return', 'SVI', 'SVI_D1',
       'SVI_D2', 'SVI_D3', 'd1_sr', 'd2_sr', 'd3_sr', 'd1_fr', 'd2_fr',
       'd3_fr'],
      dtype='object')

# 敘述性統計

In [5]:
def describeData(data: Union[pd.Series, pd.DataFrame], nan_policy:str = 'omit'):
    if isinstance(data, pd.DataFrame):
        sk = stats.skew(data, nan_policy=nan_policy)
        kurt = stats.kurtosis(data, nan_policy=nan_policy)
        sk_values = sk if isinstance(sk, np.ndarray) else sk.data
        kurt_values = kurt if isinstance(kurt, np.ndarray) else kurt.data
        describe_result = pd.concat([data.describe(),
            pd.DataFrame([sk_values], index=['skew'], columns=data.columns),
            pd.DataFrame([kurt_values], index=['kurtosis'], columns=data.columns)
        ])
        jarque_bera_test_list = []
        for col in data.columns:
            series = data[col]
            try:
                jarque_bera_test = stats.jarque_bera(series)
                jarque_bera_test_list.append({
                    "jarque_bera_statistic": jarque_bera_test.statistic,
                    "jarque_bera_pvalue": jarque_bera_test.pvalue
                })
            except TypeError:
                jarque_bera_test_list.append({
                    "jarque_bera_statistic": None,
                    "jarque_bera_pvalue": None,
                })
        jarque_bera_test_table = pd.DataFrame(jarque_bera_test_list, index=data.columns).T
        describe_result = pd.concat((describe_result, jarque_bera_test_table))
        return describe_result
    elif isinstance(data, pd.Series):
        
        sk_values = stats.skew(data, nan_policy=nan_policy)
        kurt_values = stats.kurtosis(data, nan_policy=nan_policy)

        describe_result = data.describe()
        describe_result['skew'] = sk_values
        describe_result['kurtosis'] = kurt_values
        try:
            jarque_bera_test = stats.jarque_bera(data)
            describe_result['jarque_bera_statistic'] = jarque_bera_test.statistic
            describe_result['jarque_bera_pvalue'] =  jarque_bera_test.pvalue
        except TypeError:
            describe_result['jarque_bera_statistic'] = None
            describe_result['jarque_bera_pvalue'] = None
        return describe_result      

In [6]:
describeData(reserch_table[
    ['spot_return', 'futures_return', 'SVI', 'd1_sr', 'd2_sr', 'd3_sr', 'd1_fr', 'd2_fr', 'd3_fr']
]).round(4)

Unnamed: 0,spot_return,futures_return,SVI,d1_sr,d2_sr,d3_sr,d1_fr,d2_fr,d3_fr
count,169.0,169.0,169.0,169.0,169.0,169.0,169.0,169.0,169.0
mean,0.1352,0.1594,56.1716,0.0298,0.1909,0.1983,0.0137,0.1999,0.2203
std,1.8453,1.9409,17.6166,0.5358,1.5655,1.1584,0.509,1.6494,1.2564
min,-6.1453,-6.0927,0.0,-2.4822,-4.4288,-2.7435,-1.933,-4.8694,-2.9563
25%,-0.722,-0.7762,45.0,0.0,0.0,0.0,0.0,0.0,0.0
50%,0.0,0.0,56.0,0.0,0.0,0.0,0.0,0.0,0.0
75%,0.7491,0.6859,67.0,0.0,0.0,0.0,0.0,0.0,0.0
max,9.4521,9.9256,100.0,3.877,9.4521,9.4521,2.9412,9.9256,9.9256
skew,1.1124,1.2015,-0.0772,2.4381,2.0244,4.3605,1.2413,2.087,4.3579
kurtosis,4.9645,4.9545,0.9913,21.4512,9.8182,28.4346,11.7825,10.0123,26.4204


# 相關係數

In [7]:
reserch_table[[
    'spot_return', 'futures_return',
    "d1_sr",  "d1_fr",  
    "d2_sr",  "d2_fr", 
    "d3_sr",  "d3_fr"
]].corr()

Unnamed: 0,spot_return,futures_return,d1_sr,d1_fr,d2_sr,d2_fr,d3_sr,d3_fr
spot_return,1.0,0.956597,0.287173,0.256675,0.852115,0.828879,0.633653,0.622222
futures_return,0.956597,1.0,0.229003,0.260205,0.829117,0.852352,0.640789,0.652855
d1_sr,0.287173,0.229003,1.0,0.889287,-0.006824,-0.006782,-0.009578,-0.009811
d1_fr,0.256675,0.260205,0.889287,1.0,-0.003301,-0.003281,-0.004633,-0.004746
d2_sr,0.852115,0.829117,-0.006824,-0.003301,1.0,0.972639,0.740742,0.727119
d2_fr,0.828879,0.852352,-0.006782,-0.003281,0.972639,1.0,0.749813,0.763909
d3_sr,0.633653,0.640789,-0.009578,-0.004633,0.740742,0.749813,1.0,0.981563
d3_fr,0.622222,0.652855,-0.009811,-0.004746,0.727119,0.763909,0.981563,1.0


# 單根檢定

In [8]:
from arch import unitroot

series_data = reserch_table_2515['spot_return']

adf = unitroot.ADF(series_data, max_lags=20, method="bic")
print(adf.summary().as_text())
print("\n\n", "********"*10, "\n\n")
pp = unitroot.PhillipsPerron(series_data)
print(pp.summary().as_text())
print("\n\n", "********"*10, "\n\n")
kpss = unitroot.KPSS(series_data)
print(kpss.summary().as_text())

   Augmented Dickey-Fuller Results   
Test Statistic                -12.646
P-value                         0.000
Lags                                0
-------------------------------------

Trend: Constant
Critical Values: -3.47 (1%), -2.88 (5%), -2.58 (10%)
Null Hypothesis: The process contains a unit root.
Alternative Hypothesis: The process is weakly stationary.


 ******************************************************************************** 


     Phillips-Perron Test (Z-tau)    
Test Statistic                -12.848
P-value                         0.000
Lags                               14
-------------------------------------

Trend: Constant
Critical Values: -3.47 (1%), -2.88 (5%), -2.58 (10%)
Null Hypothesis: The process contains a unit root.
Alternative Hypothesis: The process is weakly stationary.


 ******************************************************************************** 


    KPSS Stationarity Test Results   
Test Statistic                  0.078
P-value     

# VAR

In [9]:
from statsmodels.tsa.api import VAR

In [10]:
# 建立VAR模型
model = VAR(reserch_table[[
    'spot_return', 'futures_return',
    # "d1_sr",  "d1_fr",  
    # "d2_sr",  "d2_fr", 
    # "d3_sr",  "d3_fr"
]])
lag_order = model.select_order()
print("最是落後期：")
print(lag_order.summary())

最是落後期：
 VAR Order Selection (* highlights the minimums)  
       AIC         BIC         FPE         HQIC   
--------------------------------------------------
0     -0.02192     0.01735      0.9783   -0.005973
1      -0.2678     -0.1500      0.7650     -0.2200
2      -0.3523    -0.1560*      0.7031    -0.2726*
3      -0.3830     -0.1081      0.6819     -0.2713
4      -0.4069    -0.05343      0.6659     -0.2633
5      -0.4061     0.02590      0.6666     -0.2306
6      -0.3799      0.1306      0.6845     -0.1725
7     -0.4170*      0.1720     0.6598*     -0.1778
8      -0.3909      0.2767      0.6777     -0.1197
9      -0.3566      0.3895      0.7018    -0.05357
10     -0.3459      0.4788      0.7100    -0.01089
11     -0.3015      0.6017      0.7430     0.06535
12     -0.2662      0.7155      0.7706      0.1325
13     -0.2632      0.7970      0.7741      0.1674
14     -0.2210      0.9178      0.8089      0.2415
--------------------------------------------------


In [11]:
# 使用選定的滯後期數進行模型估計
model_fitted = model.fit(2)
print(model_fitted.summary())

  Summary of Regression Results   
Model:                         VAR
Method:                        OLS
Date:           Thu, 08, May, 2025
Time:                     20:00:50
--------------------------------------------------------------------
No. of Equations:         2.00000    BIC:                  -0.110369
Nobs:                     167.000    HQIC:                 -0.221295
Log likelihood:          -439.120    FPE:                   0.743015
AIC:                    -0.297075    Det(Omega_mle):        0.700444
--------------------------------------------------------------------
Results for equation spot_return
                       coefficient       std. error           t-stat            prob
------------------------------------------------------------------------------------
const                     0.112571         0.139823            0.805           0.421
L1.spot_return            0.088504         0.289500            0.306           0.760
L1.futures_return        -0.085655    

# 加入 SVI 交乘項

In [12]:
# 建立VAR模型
model = VAR(reserch_table[[
    'spot_return', 'futures_return',
    "d1_sr",  "d1_fr",  
    # "d2_sr",  "d2_fr", 
    # "d3_sr",  "d3_fr"
]])
lag_order = model.select_order()
# print(lag_order.summary())
# 使用選定的滯後期數進行模型估計
model_fitted = model.fit(2)
print(model_fitted.summary())

  Summary of Regression Results   
Model:                         VAR
Method:                        OLS
Date:           Thu, 08, May, 2025
Time:                     20:00:50
--------------------------------------------------------------------
No. of Equations:         4.00000    BIC:                   -4.12106
Nobs:                     167.000    HQIC:                  -4.52040
Log likelihood:          -511.618    FPE:                 0.00828934
AIC:                     -4.79320    Det(Omega_mle):      0.00671947
--------------------------------------------------------------------
Results for equation spot_return
                       coefficient       std. error           t-stat            prob
------------------------------------------------------------------------------------
const                     0.112143         0.141978            0.790           0.430
L1.spot_return           -0.058021         0.329240           -0.176           0.860
L1.futures_return         0.062905    

# Granger causality test

In [13]:
from statsmodels.tsa.stattools import grangercausalitytests

In [14]:
MAX_LAG = 4

In [15]:
# 創建不同注意力水平下的子數據集
# 論文中 D1, D2, D3 的定義有些複雜，以下是根據常見理解簡化的版本
# 且表五中是 D1 (特低), D2 (高), D3 (特高) vs. D1=0, D2=0, D3=0 (其餘情況)

# 注意力特低 (SVI < 25th percentile) - 對應 D1=1 的情況
df_svi_very_low = reserch_table[reserch_table["SVI_D1"] == 1]
# 注意力非特低 (SVI >= 25th percentile) - 對應 D1=0 的情況
df_svi_not_very_low = reserch_table[reserch_table["SVI_D1"] == 0]

# 注意力高 (SVI > Median) - 對應 D2=1 的情況
df_svi_high = reserch_table[reserch_table["SVI_D2"] == 1]
# 注意力低 (SVI <= Median) - 對應 D2=0 的情況
df_svi_low =  reserch_table[reserch_table["SVI_D2"] == 0]

# 注意力特高 (SVI > 75th percentile) - 對應 D3=1 的情況
df_svi_very_high = reserch_table[reserch_table["SVI_D3"] == 1]
# 注意力非特高 (SVI <= 75th percentile) - 對應 D3=0 的情況
df_svi_not_very_high = reserch_table[reserch_table["SVI_D3"] == 0]

# 基礎模型 (全樣本)
regimes = {
    "Full Sample (Baseline)": reserch_table,
    f"SVI < Q25 (D1=1, Very Low Attention)": df_svi_very_low,
    f"SVI >= Q25 (D1=0, Not Very Low Attention)": df_svi_not_very_low,
    f"SVI > Median (D2=1, High Attention)": df_svi_high,
    f"SVI <= Median (D2=0, Low Attention)": df_svi_low,
    f"SVI > Q75 (D3=1, Very High Attention)": df_svi_very_high,
    f"SVI <= Q75 (D3=0, Not Very High Attention)": df_svi_not_very_high,
}

# --- 4. 執行 Granger 因果檢定 ---
def run_granger_tests_for_regime(data, regime_name, max_lag):
    print(f"\n--- Granger Causality Tests for: {regime_name} (N={len(data)}) ---")
    if len(data) < max_lag + 5: # 樣本數過少則不執行
        print("Sample size too small to perform test.")
        return

    # 檢定 FR 是否 Granger-causes SR (FR -> SR)
    # H0: FR does not Granger-cause SR
    print(f"\nTesting if FR Granger-causes SR (H0: FR does not Granger-cause SR) for {regime_name}:")
    try:
        # grangercausalitytests 的輸入是一個 (nobs, 2) 的 array/DataFrame
        # 第一列是被解釋變數 (y)，第二列是解釋變數 (x)
        gc_results_fr_sr = grangercausalitytests(data[['spot_return', 'futures_return']], maxlag=max_lag, verbose=False) 	
        # 提取特定 lag 的 p-value (例如 AIC 選擇的 lag 或論文中的 lag)
        # gc_results_fr_sr is a dict where keys are lags
        # Each value is a dict with test statistics, p-values
        # We are interested in the 'ssr_ftest' p-value typically
        for lag in range(1, max_lag + 1):
            p_value = gc_results_fr_sr[lag][0]['ssr_ftest'][1]
            f_stat = gc_results_fr_sr[lag][0]['ssr_ftest'][0]
            print(f"  Lag {lag}: F-statistic={f_stat:.4f}, p-value={p_value:.4f}{' *' if p_value < 0.1 else ''}{'**' if p_value < 0.05 else ''}{'***' if p_value < 0.01 else ''}")
    except Exception as e:
        print(f"  Error testing FR->SR: {e}")


    # 檢定 SR 是否 Granger-causes FR (SR -> FR)
    # H0: SR does not Granger-cause FR
    print(f"\nTesting if SR Granger-causes FR (H0: SR does not Granger-cause FR) for {regime_name}:")
    try:
        gc_results_sr_fr = grangercausalitytests(data[['spot_return', 'futures_return']], maxlag=max_lag, verbose=False)
        for lag in range(1, max_lag + 1):
            p_value = gc_results_sr_fr[lag][0]['ssr_ftest'][1]
            f_stat = gc_results_sr_fr[lag][0]['ssr_ftest'][0]
            print(f"  Lag {lag}: F-statistic={f_stat:.4f}, p-value={p_value:.4f}{' *' if p_value < 0.1 else ''}{'**' if p_value < 0.05 else ''}{'***' if p_value < 0.01 else ''}")
    except Exception as e:
        print(f"  Error testing SR->FR: {e}")

for name, data_subset in regimes.items():
    run_granger_tests_for_regime(data_subset.copy(), name, MAX_LAG)

# --- 5. 解釋結果 ---
#   - p-value < 顯著性水平 (e.g., 0.05 or 0.10): 拒絕原假設 (H0)。
#     - 若檢定 FR -> SR 時拒絕 H0，則表示 FR 的過去值有助於預測 SR (FR Granger-causes SR)。
#     - 若檢定 SR -> FR 時拒絕 H0，則表示 SR 的過去值有助於預測 FR (SR Granger-causes FR)。
#   - p-value >= 顯著性水平: 無法拒絕原假設。

#   比較不同 SVI 區間下的結果，看投資人注意力是否改變了市場間的資訊傳遞方向或強度。
#   例如，若在高注意力時 SR -> FR 的 p-value 變得很小 (顯著)，
#   而在低注意力時不顯著，則支持了「高注意力會增強現貨市場價格發現功能」的結論。


--- Granger Causality Tests for: Full Sample (Baseline) (N=169) ---

Testing if FR Granger-causes SR (H0: FR does not Granger-cause SR) for Full Sample (Baseline):
  Lag 1: F-statistic=0.0825, p-value=0.7743
  Lag 2: F-statistic=0.0754, p-value=0.9274
  Lag 3: F-statistic=0.8091, p-value=0.4906
  Lag 4: F-statistic=0.8372, p-value=0.5035

Testing if SR Granger-causes FR (H0: SR does not Granger-cause FR) for Full Sample (Baseline):
  Lag 1: F-statistic=0.0825, p-value=0.7743
  Lag 2: F-statistic=0.0754, p-value=0.9274
  Lag 3: F-statistic=0.8091, p-value=0.4906
  Lag 4: F-statistic=0.8372, p-value=0.5035

--- Granger Causality Tests for: SVI < Q25 (D1=1, Very Low Attention) (N=41) ---

Testing if FR Granger-causes SR (H0: FR does not Granger-cause SR) for SVI < Q25 (D1=1, Very Low Attention):
  Lag 1: F-statistic=1.0266, p-value=0.3175
  Lag 2: F-statistic=0.5816, p-value=0.5645
  Lag 3: F-statistic=0.2915, p-value=0.8312
  Lag 4: F-statistic=0.4309, p-value=0.7851

Testing if SR Gran

