In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm
import warnings

from pandas import DataFrame
from IPython.display import display
from statsmodels.tsa.stattools import adfuller, grangercausalitytests

In [None]:
def load_timeseries_data(file_path: str, start_year: int = None) -> DataFrame:
    """
    Loads and preprocesses time-series data from a CSV file.

    :param file_path:  Path to the CSV file
    :param start_year: Optional starting year to filter data from
    :return:           Cleaned pandas DataFrame indexed by time
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    df = pd.read_csv(file_path, sep=';', parse_dates=["time"])
    df.dropna(axis=1, how='all', inplace=True)
    df.set_index("time", inplace=True)

    if start_year:
        start_date = f"{start_year}-01-01"
        df = df.loc[start_date:].copy()

    return df

In [None]:
def create_market_stats(futures_df: DataFrame, spot_df: DataFrame, risk_free_df: DataFrame, back_adjusted: bool = False, illiq_factor: float = 300) -> DataFrame:
    """
    Creates a standardized market statistics DataFrame for futures and spot prices.

    :param futures_df:    DataFrame with futures data ('close', 'Volume', 'Futures Open Interest')
    :param spot_df:       DataFrame with spot market data ('close')
    :param risk_free_df:  DataFrame with risk-free rates ('close')
    :param back_adjusted: Boolean, if True compute Amihud illiquidity instead of abs_basis
    :param illiq_factor:  Scaling factor used in Amihud illiquidity calculation (default 300)
    :return:              DataFrame containing all computed statistics
    """
    stats = pd.DataFrame()
    
    stats['fut_close'] = futures_df['close']
    stats['spot_close'] = spot_df['close']
    stats['basis'] = stats['fut_close'] - stats['spot_close']
    stats['norm_basis'] = stats['basis'] / stats['spot_close']

    stats['fut_ret'] = np.log(stats['fut_close']).diff()
    stats['spot_ret'] = np.log(stats['spot_close']).diff()

    stats['volume'] = futures_df['Volume']
    stats = stats[stats['volume'] > 0]
    
    stats['open_interest'] = futures_df['Futures Open Interest']
    stats['risk_free'] = risk_free_df['close']
    
    if back_adjusted:
        stats['amihud_illiq'] = 1e6 * abs(stats['fut_ret']) / (stats['volume'] * stats['fut_close'] * illiq_factor)
    else:
        stats['abs_basis'] = abs(stats['basis'])

    stats.dropna(inplace=True)
    
    return stats

In [None]:
csi_futures: DataFrame       = load_timeseries_data("./data/csi300_futures_csv.csv", 2017)
snp_futures: DataFrame       = load_timeseries_data("./data/snp500_futures_csv.csv", 2017)

csi_futures_badj: DataFrame  = load_timeseries_data("./data/csi300_futures_badj_csv.csv", 2017)
snp_futures_badj: DataFrame  = load_timeseries_data("./data/snp500_futures_badj_csv.csv", 2017)

csi_spot: DataFrame          = load_timeseries_data("./data/csi300_spot_csv.csv", 2017)
snp_spot: DataFrame          = load_timeseries_data("./data/snp500_spot_csv.csv", 2017)

rf_cn: DataFrame             = load_timeseries_data("./data/rf_cn.csv", 2017)
rf_us: DataFrame             = load_timeseries_data("./data/rf_us.csv", 2017)

cn_stats: DataFrame          = create_market_stats(csi_futures, csi_spot, rf_cn)
us_stats: DataFrame          = create_market_stats(snp_futures, snp_spot, rf_us)

cn_badj_stats: DataFrame     = create_market_stats(csi_futures_badj, csi_spot, rf_cn, back_adjusted=True, illiq_factor=300)
us_badj_stats: DataFrame     = create_market_stats(snp_futures_badj, snp_spot, rf_us, back_adjusted=True, illiq_factor=50)

cn_stats["amihud_illiq"]     = cn_badj_stats["amihud_illiq"]
us_stats["amihud_illiq"]     = us_badj_stats["amihud_illiq"]

In [None]:
plot_configs = [
    (['basis', 'basis'], ['csi300 Basis', 'snp500 Basis'], 'snp500 vs csi300 Basis (snp500-USD, csi300-CNY)', True),
    (['norm_basis', 'norm_basis'], ['csi300 Norm Basis', 'snp500 Norm Basis'], 'snp500 vs csi300 Norm Basis (%)', True),
    (['volume', 'volume'], ['csi300 Volume', 'snp500 Volume'], 'snp500 vs csi300 Daily Volume', False),
    (['amihud_illiq', 'amihud_illiq'], ['csi300 Amihud Illiq', 'snp500 Amihud Illiq'], 'snp500 vs csi300 Amihud Illiquidity / million currency traded', False)
]

fig, ax = plt.subplots(len(plot_configs), 1, figsize=(12, 18), sharex=True)

for i, (cols, labels, title, add_hline) in enumerate(plot_configs):
    ax[i].plot(cn_stats.index, cn_stats[cols[0]], label=labels[0])
    ax[i].plot(us_stats.index, us_stats[cols[1]], label=labels[1])
    ax[i].set_title(title)
    if add_hline:
        ax[i].axhline(0, color='k', lw=0.5)
    ax[i].legend()

plt.tight_layout()
plt.show()

In [None]:
def calculate_rolling_ohr_he(stats: DataFrame, window_size: int=60) -> DataFrame:
    """
    Calculates rolling OHR (Optimal Hedge Ratio) and HE (Hedging Effectiveness)
    using a rolling window regression of spot returns on futures returns.

    :param stats:       DataFrame containing at least 'spot_ret' and 'fut_ret' columns
    :param window_size: Number of observations in each rolling window (default 60)
    :return:            DataFrame with original stats plus 'ohr' and 'he' columns
    """
    ohr_list = []
    he_list = []
    dates_list = []

    for i in range(window_size, len(stats)):
        window = stats.iloc[i - window_size : i]

        Y = window['spot_ret']
        X = sm.add_constant(window['fut_ret'])

        model = sm.OLS(Y, X, missing='drop').fit()

        ohr_list.append(model.params['fut_ret'])
        he_list.append(model.rsquared)
        dates_list.append(window.index[-1])

    return pd.DataFrame({
        'ohr': ohr_list,
        'he': he_list
    }, index=dates_list)

In [None]:
cn_ohr = calculate_rolling_ohr_he(cn_badj_stats)
us_ohr = calculate_rolling_ohr_he(us_badj_stats)

cn_badj_stats = cn_badj_stats.join(cn_ohr)
us_badj_stats = us_badj_stats.join(us_ohr)

print("optimal hedging ratio (CN)")
display(cn_ohr.head(3))
print("optimal hedging ratio (US)")
display(us_ohr.head(3))


In [None]:
def plot_basis_stats(stats: DataFrame) -> None:
    """
    Plots a series of market statistics for a futures market.

    :param stats: DataFrame containing columns like 
                  'spot_close', 'fut_close', 'basis', 'norm_basis', 
                  'fut_ret', 'spot_ret', 'volume', 'open_interest', 
                  'risk_free', 'amihud_illiq', 'ohr', 'he'
    """
    index_name = 'Csi300' if stats is cn_stats or stats is cn_badj_stats else 'Snp500'
    risk_free_name = 'Chinese' if stats is cn_stats or stats is cn_badj_stats else 'US'

    plots = [
        (['spot_close', 'fut_close'], f'{index_name} Index Spot and Futures (Currency)', True),
        (['basis'], f'{index_name} Futures - Spot (Basis) (Currency)', True),
        (['norm_basis'], f'{index_name} Normalized Basis (%)', True),
        (['fut_ret'], f'{index_name} % Change DoD in Futures Prices', False),
        (['spot_ret'], f'{index_name} % Change DoD in Spot Prices', False),
        (['open_interest'], f'{index_name} Daily Open Interest on Futures', False),
        (['volume'], f'{index_name} Daily Volume on Futures', False),
        (['risk_free'], f'{risk_free_name} Risk Free Rate (%)', False),
        (['amihud_illiq'], f'{index_name} Futures Amihud Illiquidity', False),
        (['ohr'], f'{index_name} Optimal Hedge Ratio (60 day window)', False),
        (['he'], f'{index_name} Hedging Effectiveness (60 day window)', False)
    ]

    fig, ax = plt.subplots(len(plots), 1, figsize=(12, 22), sharex=True)

    for i, (cols, title, add_hline) in enumerate(plots):
        for col in cols:
            ax[i].plot(stats.index, stats[col], label=col)
        ax[i].set_title(title)
        if len(cols) > 1:
            ax[i].legend()
        if add_hline:
            ax[i].axhline(0, color='k', lw=0.5)

    plt.tight_layout()
    plt.show()


plot_basis_stats(cn_badj_stats)
plot_basis_stats(us_badj_stats)

In [None]:
plot_configs = [
    (['ohr', 'ohr'], ['csi300 OHR', 'snp500 OHR'], 'snp500 vs csi300 Optimal Hedging Ratio'),
    (['he', 'he'], ['csi300 HE', 'snp500 HE'], 'snp500 vs csi300 Hedging Effectiveness')
]

fig, ax = plt.subplots(len(plot_configs), 1, figsize=(12, 10), sharex=True)

for i, (cols, labels, title) in enumerate(plot_configs):
    ax[i].plot(cn_badj_stats.index, cn_badj_stats[cols[0]], label=labels[0])
    ax[i].plot(us_badj_stats.index, us_badj_stats[cols[1]], label=labels[1])
    ax[i].set_title(title)
    ax[i].legend()

plt.tight_layout()
plt.show()

In [None]:
def summary_df(stats: pd.DataFrame, name: str) -> pd.DataFrame:
    """
    Returns a summary DataFrame with mean, std, skew, kurtosis
    for key columns in the stats DataFrame.

    :param stats: DataFrame containing market stats
    :param name: Name of the market/index (e.g., 'Csi300', 'Snp500')
    :return: DataFrame with summary statistics
    """
    cols_to_summarize = ['fut_ret', 'basis', 'volume', 'open_interest', 
                         'risk_free', 'amihud_illiq', 'ohr', 'he']

    summary_list = []
    for col in cols_to_summarize:
        s = stats[col].dropna()
        summary_list.append(pd.Series({
            'mean': s.mean(),
            'std': s.std(),
            'skew': s.skew(),
            'kurt': s.kurtosis()
        }, name=f"{name} {col}"))

    return pd.DataFrame(summary_list)

cn_summary = summary_df(cn_badj_stats, 'Csi300')
us_summary = summary_df(us_badj_stats, 'Snp500')

display(cn_summary)
display(us_summary)

In [None]:
warnings.filterwarnings("ignore")
def check_stationarity(df: pd.DataFrame, columns: list[str], name: str = None) -> pd.DataFrame:
    """
    Performs the Augmented Dickey-Fuller test for stationarity on selected columns.

    :param df: DataFrame containing time series data
    :param columns: List of column names to test
    :param name: Optional name for labeling (e.g., 'Csi300', 'Snp500')
    :return: DataFrame with ADF statistic, p-value, and stationarity conclusion
    """
    if name is None:
        name = 'Csi300' if df is cn_stats else 'Snp500'

    results = []

    for col in columns:
        series = df[col].dropna()
        adf_result = adfuller(series)
        stat, pvalue = adf_result[0], adf_result[1]
        stationary = 'Yes' if pvalue < 0.05 else 'No'

        results.append({
            'column': f'{name} {col}',
            'adf_stat': stat,
            'p_value': pvalue,
            'stationary': stationary
        })

    return pd.DataFrame(results)

In [None]:
def run_granger_causality(df: pd.DataFrame, variables: list[str], max_lag: int = 5, name: str = None) -> pd.DataFrame:
    """
    Runs Granger causality tests between two variables in a DataFrame.

    :param df: DataFrame containing the time series data
    :param variables: List of two column names [cause, effect]
    :param max_lag: Maximum number of lags to test (default 5)
    :param name: Optional name for labeling (e.g., 'Csi300', 'Snp500')
    :return: DataFrame with lag and p-value of F-test
    """
    if name is None:
        name = 'Csi300' if df is cn_stats else 'Snp500'

    print(f"Testing Granger causality between {variables[0]} → {variables[1]} ({name})")
    
    data = df[variables].dropna()
    results = grangercausalitytests(data, maxlag=max_lag, verbose=False)

    pvals = {lag: results[lag][0]['ssr_ftest'][1] for lag in range(1, max_lag + 1)}
    summary_df = pd.DataFrame.from_dict(pvals, orient='index', columns=['p_value'])
    summary_df.index.name = 'lag'

    return summary_df

In [None]:
display(check_stationarity(cn_stats, ['abs_basis']))
display(check_stationarity(cn_stats, ['amihud_illiq']))

display(run_granger_causality(cn_stats, ['abs_basis', 'amihud_illiq']))
display(run_granger_causality(cn_stats, ['amihud_illiq', 'abs_basis']))

display(check_stationarity(us_stats, ['abs_basis']))
display(check_stationarity(us_stats, ['amihud_illiq']))

display(run_granger_causality(us_stats, ['abs_basis', 'amihud_illiq']))
display(run_granger_causality(us_stats, ['amihud_illiq', 'abs_basis']))