In [None]:
# Time Series Analysis and Granger Causality - Version 01
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import adfuller, grangercausalitytests
from statsmodels.tsa.ar_model import AutoReg
from statsmodels.tools.eval_measures import aic, bic

In [None]:
def read_data(file_path):
    """Load the CSV at ``file_path`` and split it into price and feature data.

    The ``date`` and ``time`` columns are joined with a single space, parsed
    with ``pd.to_datetime`` and installed as the index (named ``datetime``);
    both source columns are then removed.

    Args:
        file_path: Anything ``pd.read_csv`` accepts. The data must contain
            ``date``, ``time`` and ``price`` columns.

    Returns:
        tuple: ``(price, features)`` - the ``price`` column as a Series and a
        DataFrame holding every other remaining column.
    """
    raw = pd.read_csv(file_path)
    timestamps = pd.to_datetime(raw['date'] + ' ' + raw['time'])
    indexed = (
        raw.assign(datetime=timestamps)
        .set_index('datetime')
        .drop(columns=['date', 'time'])
    )
    return indexed['price'], indexed.drop(columns='price')

In [None]:
def make_stationary(series, max_diff=2, alpha=0.05):
    """Difference ``series`` until it passes the ADF test or a cap is reached.

    The definition used to be split across two notebook cells, which left the
    first half returning ``None`` and the second half unrunnable; it is now a
    single function.

    Args:
        series: pandas Series to test and, if needed, difference.
        max_diff: Maximum number of times the series may be differenced.
        alpha: Significance level; the series counts as stationary when the
            ADF p-value is less than or equal to this value.

    Returns:
        tuple: ``(series, diff_order)`` - the (possibly differenced) series
        with the leading NaNs produced by differencing dropped, and the number
        of differences applied. The result may still fail the test if
        ``max_diff`` was reached.
    """
    def adf_test(timeseries):
        # adfuller returns a tuple whose second element is the p-value.
        result = adfuller(timeseries, autolag='AIC')
        return result[1] <= alpha

    diff_order = 0
    while not adf_test(series) and diff_order < max_diff:
        series = series.diff().dropna()
        diff_order += 1
    return series, diff_order

In [None]:
def optimize_lag(series, max_lag=10):
    """Select the autoregressive lag order with the lowest AIC.

    Fits an ``AutoReg`` model for every lag from 1 to ``max_lag`` and scores
    each with ``aic(llf, nobs, df_model)``.

    Args:
        series: Series to model.
        max_lag: Largest lag order to try (must be >= 1).

    Returns:
        tuple: ``(best_lag, aic_values)`` where ``best_lag`` is a plain ``int``
        in ``1..max_lag`` and ``aic_values[k]`` is the AIC for lag ``k + 1``.

    Raises:
        ValueError: If ``max_lag`` is smaller than 1.
    """
    if max_lag < 1:
        raise ValueError("max_lag must be at least 1")

    aic_values = []
    for lag in range(1, max_lag + 1):
        # hold_back=max_lag makes every candidate drop the same initial
        # observations, so all fits share one estimation sample and their AIC
        # values are comparable. Without it each lag uses a different sample.
        results = AutoReg(series, lags=lag, hold_back=max_lag).fit()
        aic_values.append(aic(results.llf, results.nobs, results.df_model))

    # argmin is zero-based while lags start at 1; cast to a built-in int so the
    # value can be used as a dict key or maxlag argument without numpy types.
    best_lag = int(np.argmin(aic_values)) + 1
    return best_lag, aic_values

In [None]:
def rolling_granger_causality(y, x, window_size, max_lag):
    """Run a Granger causality test of ``x`` on ``y`` over rolling windows.

    ``y`` and ``x`` are first aligned on their shared index and rows with a
    missing value in either are dropped. This matters when the two series were
    differenced a different number of times: slicing each one positionally
    would pair up different timestamps and put NaNs into every window.

    Args:
        y: Series placed in the first column passed to
            ``grangercausalitytests`` (the candidate effect).
        x: Series placed in the second column (the candidate cause).
        window_size: Number of aligned rows in each window.
        max_lag: Lag order passed as ``maxlag``; the p-value reported is the
            one for this lag.

    Returns:
        list: One ``ssr_ftest`` p-value per window, in order. The list has
        ``len(aligned) - window_size + 1`` entries and is empty when there are
        fewer aligned rows than ``window_size``.
    """
    aligned = pd.concat([y, x], axis=1).dropna()

    results = []
    for start in range(len(aligned) - window_size + 1):
        # .iloc keeps the slice positional whatever the index type is.
        window = aligned.iloc[start:start + window_size]
        # verbose=False is kept so the test never prints its report per window.
        gc_res = grangercausalitytests(window, maxlag=max_lag, verbose=False)
        # gc_res[lag] is (test_results, fitted_models); ssr_ftest[1] is the p-value.
        results.append(gc_res[max_lag][0]['ssr_ftest'][1])
    return results

In [None]:
# Location of the input CSV - replace with your actual file path
file_path = 'your_data.csv'
# Load the file; read_data hands back the price and the features separately
price, features = read_data(file_path=file_path)

In [None]:
# Report what was loaded: time span, row count and the feature columns
print("Data loaded successfully.")
first_timestamp, last_timestamp = price.index.min(), price.index.max()
print(f"Time range: {first_timestamp} to {last_timestamp}")
n_points = len(price)
print(f"Number of data points: {n_points}")
feature_names = features.columns
print(f"Number of features: {len(feature_names)}")
print("Features:", ", ".join(feature_names))

In [None]:
# Make the price and every feature column stationary.
# For the features only the transformed series is kept; the second value
# returned by make_stationary is discarded.
price_stationary, price_diff_order = make_stationary(price)
features_stationary = {
    name: make_stationary(features[name])[0] for name in features.columns
}

In [None]:
# Choose a lag for each stationary feature and keep the AIC curve behind the choice
optimal_lags, aic_values = {}, {}
for feature_name, stationary_series in features_stationary.items():
    best_lag, lag_scores = optimize_lag(stationary_series)
    optimal_lags[feature_name] = best_lag
    aic_values[feature_name] = lag_scores

In [None]:
# Visualize lag optimization: one AIC-vs-lag panel per feature, two panels per row
n_rows = (len(features_stationary) + 1) // 2
fig = plt.figure(figsize=(15, 5 * n_rows))
for position, (feature_name, scores) in enumerate(aic_values.items(), 1):
    ax = fig.add_subplot(n_rows, 2, position)
    chosen_lag = optimal_lags[feature_name]
    # Lags are 1-based, so the x values run from 1 to len(scores).
    ax.plot(range(1, len(scores) + 1), scores, marker='o')
    ax.axvline(chosen_lag, color='r', linestyle='--', label=f'Optimal lag: {chosen_lag}')
    ax.set_title(f'AIC vs Lag for {feature_name}')
    ax.set_xlabel('Lag')
    ax.set_ylabel('AIC')
    ax.legend()
fig.tight_layout()
plt.show()

In [None]:
# Set-up for the rolling-window Granger causality tests
window_size = 252  # rows per window, e.g. 1 year of daily data
granger_results = dict()  # filled by the following cells: label -> rolling test output

In [None]:
# Price -> Features: the feature is passed first and the price second
for feature_name, feature_series in features_stationary.items():
    rolling_p_values = rolling_granger_causality(
        feature_series, price_stationary, window_size, optimal_lags[feature_name]
    )
    granger_results[f'Price -> {feature_name}'] = rolling_p_values

In [None]:
# Features -> Price: the price is passed first and the feature second
for feature_name, feature_series in features_stationary.items():
    rolling_p_values = rolling_granger_causality(
        price_stationary, feature_series, window_size, optimal_lags[feature_name]
    )
    granger_results[f'{feature_name} -> Price'] = rolling_p_values

In [None]:
# Features -> Features: every ordered pair of distinct features
for source_name, source_series in features_stationary.items():
    for target_name, target_series in features_stationary.items():
        if source_name == target_name:
            continue  # a feature is never tested against itself
        # The lag used is the one selected for the source feature.
        pair_p_values = rolling_granger_causality(
            target_series, source_series, window_size, optimal_lags[source_name]
        )
        granger_results[f'{source_name} -> {target_name}'] = pair_p_values


In [None]:
# Visualize Granger causality results: price on top, rolling p-values below
fig, (price_ax, pvalue_ax) = plt.subplots(2, 1, figsize=(15, 10))

# .iloc keeps the slice positional; price has a datetime index.
price_ax.plot(price.index[window_size-1:], price.iloc[window_size-1:])
price_ax.set_title('Price')

for key, values in granger_results.items():
    # Keep only pairs where Price itself is the cause or the effect. A plain
    # substring test would also match feature names that contain "Price".
    if not (key.startswith('Price -> ') or key.endswith(' -> Price')):
        continue
    if len(values) == 0:
        continue  # fewer rows than window_size: nothing to draw
    # Differencing shortens the tested series, so a result list can be shorter
    # than price.index[window_size-1:]. Each p-value is plotted at the end of
    # its window by taking the last len(values) timestamps.
    # NOTE(review): assumes the stationary series end on the same final
    # timestamp as price (only leading rows were dropped) - confirm.
    pvalue_ax.plot(price.index[-len(values):], values, label=key)
pvalue_ax.set_title('Granger Causality p-values')
pvalue_ax.legend()
fig.tight_layout()
plt.show()

In [None]:
# List the lag selected for each feature
print("Optimal lag values:")
for feature_name, chosen_lag in optimal_lags.items():
    print(f"{feature_name}: {chosen_lag}")