In [None]:
import os
os.chdir('/Users/anthony/git-projects/midas')

import pandas as pd
import numpy as np
from core.strategies.research import DataProcessing, TimeseriesTests

In [None]:
class CointegrationStrategy:
    def __init__(self, data:pd.DataFrame) -> None:
        self.data = data # Timestamp | Asset1 | Asset2 structure

    def create_spread(self,data:pd.DataFrame, cointegration_vector:list):
        return data.dot(cointegration_vector)

    def calculate_z_score(self,spread:pd.Series):
        """
        Calculate the z-score of a given time series.

        Args:
        series (Series): A pandas Series representing the spread.

        Returns:
        Series: A pandas Series of the z-scores.
        """
        return (spread - spread.mean()) / spread.std()

    def visualize_spread_formula(self,data:pd.DataFrame, cointegration_vector:np.array):
        """
        Visualize the formula for creating the spread using the cointegration vector.

        Parameters:
            data (pd.DataFrame): DataFrame containing the data with columns as tickers.
            cointegration_vector (list): List representing the cointegration vector (hedge ratios).

        Returns:
            None (prints the formula)
        """
        # Create a DataFrame to display the formula
        formula_df = pd.DataFrame(columns=["Ticker", "Hedge Ratio"])
        
        # Populate the DataFrame with tickers and corresponding hedge ratios
        formula_df["Ticker"] = data.columns
        formula_df["Hedge Ratio"] = cointegration_vector
        
        # Print the formula
        print("Spread = ")
        for index, row in formula_df.iterrows():
            print(f"({row['Hedge Ratio']} * {row['Ticker']}) + ", end="")
        print("Constant Term")


In [None]:
## Step 1 : Retrieve and prepare data
start_date = "2018-05-18"
end_date = "2023-01-19"
# symbols = ['AAPL','MSFT']
symbols = ["HE.n.0", "ZC.n.0"]#, "HE.n.0"]

data_processing = DataProcessing()
data = data_processing.get_data(symbols, start_date,end_date)
Strategy = CointegrationStrategy(data=data)


In [None]:
## Step 2 : Training and Testing Split
data = data.ffill()
# log_data = np.log(data)
train_data,test_data = data_processing.split_data(data)
data_processing.check_missing(train_data)
print(test_data)


In [None]:
## Step 3 : Test for exponential nature (determine if log prices will be used)
time_array = range(1, len(train_data) + 1)
bp_results = {}

for column in train_data.columns:
    value_array = train_data[column].values
    TimeseriesTests.line_plot(x=time_array, y=value_array)

    residuals_linear, residuals_exp = data_processing.fit_and_compare(time_array, value_array)

    bp_results[f"{column}_linear_residual"] = TimeseriesTests.breusch_pagan(np.array(residuals_linear), np.array(time_array))
    constant_to_add = abs(residuals_exp.min()) + 1

    # Adjust residuals and apply log transformation
    adjusted_residuals_exp = np.log(np.abs(residuals_exp) + constant_to_add)

    # Perform Breusch-Pagan test on adjusted residuals
    bp_results[f"{column}_exp_residual"] = TimeseriesTests.breusch_pagan(np.array(time_array), np.array(adjusted_residuals_exp))


TimeseriesTests.display_breusch_pagan_results(bp_results)



In [None]:
## Step 4 : Check stationarity
adf_results = {}
kpss_results = {}
pp_results = {}


for column in train_data.columns:
    series = train_data[column]
    adf_results[column] = TimeseriesTests.adf_test(series, trend='ct')
    pp_results[column] = TimeseriesTests.phillips_perron_test(series, trend='ct')
    # kpss_results[column] = TimeseriesTests.kpss_test(series, trend='ct')

TimeseriesTests.display_adf_results(adf_results)
TimeseriesTests.display_pp_results(pp_results)
# TimeseriesTests.display_kpss_results(kpss_results)


In [None]:
## Step 5 : Check stationarity at first difference
adf_results_diff = {}
kpss_results_diff = {}
pp_results_diff = {}

for column in train_data.columns:
    series = train_data[column].diff(1)
    series.dropna(inplace=True)
    adf_results_diff[f"{column}_diff"] = TimeseriesTests.adf_test(series)
    pp_results_diff[f"{column}_diff"] = TimeseriesTests.phillips_perron_test(series, trend='ct')
    # kpss_results_diff[f"{column}_diff"] = TimeseriesTests.kpss_test(series)

TimeseriesTests.display_adf_results(adf_results_diff)
TimeseriesTests.display_pp_results(pp_results_diff)
# TimeseriesTests.display_kpss_results(kpss_results_diff)

In [None]:
## Step 6 : Check seasonality
seasonal_adf_results = {}

for column in train_data.columns:
    series = train_data[column]
    seasonal_adf_results[f"{column}_seasonality"] = TimeseriesTests.seasonal_adf_test(series)

TimeseriesTests.display_adf_results(seasonal_adf_results)

In [None]:
## Step 7 : Detemine lag length
lag = TimeseriesTests.select_lag_length(data=data)
print(f"\nIdeal Lag : {lag}")

In [None]:
## Step 8 : Check cointegrations
johansen_results, num_cointegrations = TimeseriesTests.johansen_test(data=train_data,k_ar_diff=lag-1)
TimeseriesTests.display_johansen_results(johansen_results, num_cointegrations)

In [None]:
## Step 9 : Check granger causality (Use differenced data as it needs to be stationary)
data_diff = data.diff(1).dropna()
granger_results = TimeseriesTests.granger_causality(data_diff, max_lag=4)
TimeseriesTests.display_granger_results(granger_results)

In [None]:
# Step 10 : Build VECM
model = TimeseriesTests.vecm_model(data=train_data, coint_rank=max(num_cointegrations,1), k_ar_diff=lag-1)
residual_array = model.resid
residuals = pd.DataFrame(residual_array, columns=train_data.columns)

In [None]:
# Step 11 : Validate Model (Residual Tests)

# Test for Serial Correlation in Residuals
ljung_box_results = TimeseriesTests.ljung_box(residuals, lags=lag)
TimeseriesTests.display_ljung_box_results(ljung_box_results)

durbin_watson_result = TimeseriesTests.durbin_watson(residuals)
TimeseriesTests.display_durbin_watson_results(durbin_watson_result)

# Test for Stationarity in Resiuals
adf_residuals = {}
kpss_residuals = {}
pp_residuals = {}

for column in residuals.columns:
    series = residuals[column]
    adf_residuals[f"{column}_residuals"] = TimeseriesTests.adf_test(series)
    pp_residuals[f"{column}_residuals"] = TimeseriesTests.phillips_perron_test(series)
    # kpss_residuals[f"{column}_residuals"] = TimeseriesTests.kpss_test(series)

TimeseriesTests.display_adf_results(adf_residuals)
TimeseriesTests.display_pp_results(pp_residuals)
# TimeseriesTests.display_kpss_results(kpss_residuals)

# Test for Normality in Residuals
shapiro_wilk_residuals = {}

for column in residuals.columns:
    series = residuals[column]
    shapiro_wilk_residuals[column] = TimeseriesTests.shapiro_wilk(series) # safe for less than 2000 observations
    # TimeseriesTests.qq_plot(series, 'Sample Q-Q Plot')
    TimeseriesTests.histogram_ndc(series)
    TimeseriesTests.histogram_kde(series)

TimeseriesTests.display_shapiro_wilk_results(shapiro_wilk_residuals)

# -- TODO: Fix below test and add display function --
# Test for Homoscedascity
lagged_values = train_data.shift(lag).dropna()

bp_results = {}
white_results = {}

# Perform the Breusch-Pagan test for each series in residuals
for column in residuals.columns:
    white_results[column] = TimeseriesTests.white_test(np.array(lagged_values[column]), np.array(residuals[column]))
    bp_results[column] = TimeseriesTests.breusch_pagan(np.array(lagged_values[column]), np.array(residuals[column]))

TimeseriesTests.display_white_test_results(white_results)
TimeseriesTests.display_breusch_pagan_results(bp_results)


In [None]:
# Step 12 : Forecast and Model Evaluation 
forecast, lower, upper = model.predict(steps=len(test_data), alpha=0.05)  # 95% confidence interval
forecast_df = pd.DataFrame(data=forecast, index=test_data.index, columns=test_data.columns)

TimeseriesTests.evaluate_forecast(test_data, forecast_df)
TimeseriesTests.plot_forecast(actual=test_data, forecast=forecast_df)


In [None]:
# Step 13 : Create fitted values(spread) based on long-term equilibrium relationship (V = SUM(aAt + bBt + cCt ...) for i = 0 -> i = n)
first_cointegration_vector = johansen_results['Cointegrating Vector'][0]
spread = Strategy.create_spread(data, first_cointegration_vector) 

# Test Stationarity in Spread
adf_spread_results = {}
pp_spread_results = {}

adf_spread_results['spread'] = TimeseriesTests.adf_test(spread)
pp_spread_results['spread'] = TimeseriesTests.phillips_perron_test(spread)

TimeseriesTests.display_adf_results(adf_spread_results)
TimeseriesTests.display_pp_results(pp_spread_results)

# Test historical nature of spread w/ Hurst Exponent
hurst_exponent_result = TimeseriesTests.hurst_exponent(np.array(spread))
print(f"\nHurst Exponent : {hurst_exponent_result}")

# Test historical half-life (expected time to mean revert)
spread_lagged = DataProcessing.lag_series(spread)
spread_combined = pd.DataFrame({'Original': spread, 'Lagged': spread_lagged}).dropna()
half_life, residuals = TimeseriesTests.half_life(Y = spread_combined['Original'], Y_lagged = spread_combined['Lagged'])
print(f"\nHalf-Life : {half_life}")

# Visualize spread/price relationship
TimeseriesTests.plot_price_and_spread(price_data = data, spread = pd.Series(spread))


In [None]:
# Step 14 :Establish Strategy Rules : utltize johansen for hedge ratios and vecm as an signal indicator
z_score_spread = Strategy.calculate_z_score(spread) 
TimeseriesTests.plot_price_and_spread(price_data = data, spread = pd.Series(z_score_spread))

In [None]:
TimeseriesTests.plot_zscore(pd.Series(z_score_spread))
print(type(first_cointegration_vector))
Strategy.visualize_spread_formula(data, first_cointegration_vector)
