# Cryptocurrency Market Analysis: Granger Causality Testing

## Overview
This notebook performs comprehensive statistical analysis to identify causal relationships between different cryptocurrency pairs using Granger causality tests. The analysis helps understand which cryptocurrencies lead or lag in price movements, providing insights into market dynamics and information flow.

## Background
**Granger Causality** is a statistical hypothesis test for determining whether one time series is useful in forecasting another. If variable X "Granger-causes" variable Y, past values of X contain information that helps predict Y above and beyond the information contained in past values of Y alone.

## Analysis Objectives
1. **Data Preparation**: Load and align cryptocurrency price data across multiple trading pairs
2. **Stationarity Testing**: Verify that log returns are stationary using ADF and KPSS tests
3. **Lag Selection**: Determine optimal lag structure using information criteria (AIC, BIC)
4. **Causality Testing**: Perform bidirectional Granger causality tests between BTC and other cryptocurrencies
5. **Results Interpretation**: Identify which cryptocurrencies lead or lag in market movements

---

## 1. Setup and Imports

In [None]:
import gc
import warnings
from pathlib import Path

import importlib
import numpy as np
import pandas as pd
import seaborn as sns
import ruptures as rpt
import matplotlib.pyplot as plt
import scipy.stats as stats
from scipy.stats import mannwhitneyu
from statsmodels.tsa.api import VAR
from statsmodels.formula.api import ols
from statsmodels.tsa.vector_ar.vecm import coint_johansen
from statsmodels.tsa.stattools import grangercausalitytests, adfuller, kpss

warnings.filterwarnings("ignore")
raw_folder = Path.cwd().parent / "data" / "raw"

---

## 2. Data Loading Functions

The data is stored in partitioned Parquet files for efficient storage and retrieval. Each cryptocurrency has multiple part files that need to be combined into a single DataFrame for analysis.

In [None]:
def load_all_parts_to_df(folder):
	"""Load all parquet files from a folder and combine them into a single DataFrame.
	
	This function scans a directory for partitioned parquet files (named part*.parquet),
	loads them sequentially with memory management, and combines them into a single
	DataFrame. It's designed to handle large datasets that are split across multiple files.
	
	Args:
		folder: Path object pointing to the folder containing parquet files
		
	Returns:
		Combined DataFrame from all parquet files with all records concatenated
		
	Raises:
		FileNotFoundError: If no parquet files are found in the specified folder
		
	Note:
		The function uses garbage collection after each file load to manage memory
		efficiently, which is important when working with large datasets.
	"""
	parts = sorted(folder.glob("part*.parquet"))
	
	if not parts:
		raise FileNotFoundError(f"No parquet files found in {folder}")
	
	print(f"Loading {len(parts)} parquet files from {folder.name}...")
	
	dfs = []
	for p in parts:
		df = pd.read_parquet(p, engine="pyarrow")
		dfs.append(df)
		gc.collect()
	
	combined_df = pd.concat(dfs, ignore_index=True)
	del dfs
	gc.collect()
	
	print(f"  ✓ Loaded {len(combined_df):,} records")
	return combined_df.iloc[-2_000_000:-250_000]

---

## 3. Statistical Testing Functions

This section defines functions for various statistical tests required for time series analysis.

### 3.1 Stationarity Testing

Before performing Granger causality tests, we must ensure that our time series are stationary. Non-stationary data can lead to spurious regression results. We use two complementary tests:

- **ADF (Augmented Dickey-Fuller)**: Tests the null hypothesis that a unit root is present (non-stationary). We want to reject this null (p < 0.05).
- **KPSS (Kwiatkowski-Phillips-Schmidt-Shin)**: Tests the null hypothesis that the series is stationary. We want to fail to reject this null (p > 0.05).

A series is considered stationary when both tests agree.

In [None]:
def check_stationarity(series, name):
	"""Test for stationarity using ADF and KPSS tests.
	
	This function performs two complementary stationarity tests to ensure the time
	series doesn't have trends or seasonality that could lead to spurious results.
	The ADF test checks for unit roots while KPSS tests for trend stationarity.
	
	Args:
		series: Time series data to test, typically log returns
		name: Name of the series for display purposes in the output
		
	Note:
		- ADF Test: H0 = non-stationary (reject if p < 0.05 means stationary)
		- KPSS Test: H0 = stationary (reject if p < 0.05 means non-stationary)
		- Ideal result: ADF p < 0.05 AND KPSS p > 0.05
	"""
	adf_p = adfuller(series.dropna())[1]
	kpss_p = kpss(series.dropna(), regression='c', nlags='auto')[1]
	
	adf_status = "✓ Stationary" if adf_p < 0.05 else "✗ Non-stationary"
	kpss_status = "✓ Stationary" if kpss_p > 0.05 else "✗ Non-stationary"
	
	print(f"{name:10s} | ADF p={adf_p:.4f} ({adf_status:10s}) | KPSS p={kpss_p:.4f} ({kpss_status})")

    
def select_optimal_lag(df_combined, maxlag=10):
	"""Select optimal lag order for VAR model using information criteria.
	
	This function fits Vector Autoregression (VAR) models with different lag orders
	and compares them using information criteria. Lower values of AIC/BIC indicate
	better model fit while penalizing model complexity.
	
	Args:
		df_combined: Combined DataFrame with multiple time series columns
		maxlag: Maximum number of lags to test (default: 10)
		
	Returns:
		Optimal lag order based on Akaike Information Criterion (AIC)
		
	Note:
		Common information criteria:
		- AIC: Akaike Information Criterion (tends to select more lags)
		- BIC: Bayesian Information Criterion (more conservative, fewer lags)
		- HQIC: Hannan-Quinn Information Criterion (middle ground)
	"""
	model = VAR(df_combined)
	result = model.select_order(maxlags=maxlag)
	print("\n--- VAR Lag Order Selection ---")
	print(result.summary())
	return result.aic


def johansen_test(df):
	"""Perform Johansen cointegration test.
	
	The Johansen test determines whether multiple non-stationary time series have
	a long-run equilibrium relationship (cointegration). If series are cointegrated,
	they move together in the long run despite short-term deviations.
	
	Args:
		df: DataFrame with multiple time series columns to test for cointegration
		
	Note:
		- If trace statistic > critical value, reject null of no cointegration
		- Cointegration suggests using VECM instead of VAR model
		- det_order=0 means no deterministic trend in the data
	"""
	result = coint_johansen(df, det_order=0, k_ar_diff=1)
	print("\n--- Johansen Cointegration Test ---")
	print("Trace Statistics:", result.lr1)
	print("Critical Values:", result.cvt)

### 3.2 Granger Causality Analysis

The core analysis function that performs bidirectional Granger causality tests. This test determines whether past values of one time series help predict another time series better than using only the history of the predicted series itself.

**Interpretation:**
- X Granger-causes Y: Past values of X improve predictions of Y
- Bidirectional causality: Both X→Y and Y→X are significant
- No causality: Neither direction shows significant predictive power

**Important Notes:**
- "Granger causality" is about predictive power, not true causation
- Requires stationary time series (hence we use log returns)
- Results depend on the chosen lag order
- Significance at α=0.05 level is typically used

In [None]:
def compare_symbols_by_granger_causality(main_df, main_symbol, compared_df, compared_symbol, maxlag=5):
	"""Perform bidirectional Granger causality tests between two cryptocurrency pairs.
	
	This comprehensive function handles the complete workflow for testing Granger causality
	between two cryptocurrencies:
	1. Data preparation and timestamp alignment
	2. Calculation of log returns for stationarity
	3. Stationarity testing with ADF and KPSS
	4. Optimal lag selection using VAR model
	5. Bidirectional Granger causality tests
	
	The function tests both directions:
	- Does main_symbol Granger-cause compared_symbol? (e.g., BTC → ETH)
	- Does compared_symbol Granger-cause main_symbol? (e.g., ETH → BTC)
	
	Args:
		main_df: DataFrame for the main symbol containing OHLCV data
		main_symbol: Name of the main symbol (e.g., 'BTCUSDT')
		compared_df: DataFrame for the compared symbol containing OHLCV data
		compared_symbol: Name of the compared symbol (e.g., 'ETHUSDT')
		maxlag: Maximum lag to test (default: 5). Higher lags test longer-term relationships
		
	Returns:
		Dictionary with two keys, each containing a list of (F-statistic, p-value) tuples:
		- '{main_symbol}_affects_{compared_symbol}': Results for main → compared direction
		- '{compared_symbol}_affects_{main_symbol}': Results for compared → main direction
		Each list has `maxlag` elements, one for each lag tested.
		
	Raises:
		KeyError: If 'close' column is missing from either DataFrame
		ValueError: If insufficient observations remain after alignment and cleaning
		AssertionError: If timestamps don't align properly or aren't monotonically increasing
		
	Example:
		>>> result = compare_symbols_by_granger_causality(
		...     btc_df, "BTCUSDT", eth_df, "ETHUSDT", maxlag=3
		... )
		>>> # Check if BTC Granger-causes ETH at lag 1
		>>> f_stat, p_value = result['BTCUSDT_affects_ETHUSDT'][0]
		>>> if p_value < 0.05:
		...     print("BTC significantly Granger-causes ETH at lag 1")
		
	Note:
		The function uses log returns (log(price_t / price_{t-1})) instead of raw prices
		to achieve stationarity. Infinite values from zero prices are removed automatically.
		A smaller p-value (< 0.05) indicates stronger evidence of Granger causality.
	"""
	df1 = main_df.copy()
	df2 = compared_df.copy()
	
	df1["open_time"] = pd.to_datetime(df1["open_time"], utc=True, errors="coerce")
	df2["open_time"] = pd.to_datetime(df2["open_time"], utc=True, errors="coerce")
	
	df1 = df1.dropna(subset=["open_time"]).set_index("open_time").sort_index()
	df2 = df2.dropna(subset=["open_time"]).set_index("open_time").sort_index()
	
	common_idx = df1.index.intersection(df2.index)
	df1 = df1.loc[common_idx]
	df2 = df2.loc[common_idx]
	
	assert df1.index.equals(df2.index), \
		f"Index mismatch: main[{df1.index.min()}..{df1.index.max()}], compared[{df2.index.min()}..{df2.index.max()}]"
	assert df1.index.is_monotonic_increasing, "Index must be monotonically increasing"
	
	if "close" not in df1 or "close" not in df2:
		raise KeyError("Both dataframes must contain 'close' column")
	
	df1 = df1[df1["close"] > 0]
	df2 = df2[df2["close"] > 0]
	common_idx = df1.index.intersection(df2.index)
	df1 = df1.loc[common_idx]
	df2 = df2.loc[common_idx]
	
	main_return = np.log(df1["close"] / df1["close"].shift(1))
	compared_return = np.log(df2["close"] / df2["close"].shift(1))
	
	# print(f"\n{'='*80}")
	# print(f"Stationarity Tests: {main_symbol} vs {compared_symbol}")
	# print(f"{'='*80}")
	# check_stationarity(main_return, f"{main_symbol} return")
	# check_stationarity(compared_return, f"{compared_symbol} return")
	
	df_combined = pd.DataFrame({
		"main_return": main_return,
		"compared_return": compared_return
	}).replace([np.inf, -np.inf], np.nan).dropna()
	
	# print(f"\n{'='*80}")
	# print(f"Lag Selection: {main_symbol} vs {compared_symbol}")
	# print(f"{'='*80}")
	# optimal_lag = select_optimal_lag(df_combined, maxlag=maxlag)
	# print(f"Recommended lag based on AIC: {optimal_lag}")
	
	n = len(df_combined)
	if n < 3:
		raise ValueError("Not enough observations after alignment to run Granger causality tests")
	use_maxlag = int(min(maxlag, max(1, n - 2)))
	
	result = {
		f"{main_symbol}_affects_{compared_symbol}": [],
		f"{compared_symbol}_affects_{main_symbol}": []
	}
	
	def extract_ftest(res_dict):
		"""Extract F-test statistics from Granger test results.
		
		Args:
			res_dict: Result dictionary from grangercausalitytests
			
		Returns:
			Tuple of (F-statistic, p-value) from the F-test
			
		Raises:
			TypeError: If result structure is unexpected
			RuntimeError: If no valid test results found
		"""
		if isinstance(res_dict, dict):
			tests = res_dict.get(0, {}) if 0 in res_dict else next(iter(res_dict.values()))
		elif isinstance(res_dict, (list, tuple)):
			tests = res_dict[0]
		else:
			raise TypeError("Unexpected result structure in grangercausalitytests")
		
		for k in ("ssr_ftest", "params_ftest", "lrtest", "ssr_chi2test"):
			if k in tests:
				vals = tests[k]
				if isinstance(vals, (list, tuple)) and len(vals) >= 2:
					return float(vals[0]), float(vals[1])
		raise RuntimeError("Could not extract test results")
	
	# Perform Granger causality tests: compared → main
	res = grangercausalitytests(
		df_combined[["compared_return", "main_return"]],
		maxlag=use_maxlag,
		verbose=False
	)
	for i, (lag, res_dict) in enumerate(res.items()):
		assert lag == i + 1, f"Lag mismatch: {lag} != {i+1}"
		fval, pval = extract_ftest(res_dict)
		result[f"{compared_symbol}_affects_{main_symbol}"].append((fval, pval))
	
	# Perform Granger causality tests: main → compared
	res = grangercausalitytests(
		df_combined[["main_return", "compared_return"]],
		maxlag=use_maxlag,
		verbose=False
	)
	for i, (lag, res_dict) in enumerate(res.items()):
		assert lag == i + 1, f"Lag mismatch: {lag} != {i+1}"
		fval, pval = extract_ftest(res_dict)
		result[f"{main_symbol}_affects_{compared_symbol}"].append((fval, pval))
	
	return result

In [None]:
LAST_CANDLES = 300_000  # Number of recent candles to analyze
MAX_LAG = 30

MAIN_SYMBOL = "BTCUSDT"
SYMBOLS_TO_COMPARE = ["ETHUSDT", "BNBUSDT", "SOLUSDT"]

print(f"Analysis Configuration:")
print(f"  Main Symbol: {MAIN_SYMBOL}")
print(f"  Comparison Symbols: {', '.join(SYMBOLS_TO_COMPARE)}")
print(f"  Sample Size: {LAST_CANDLES:,} candles")
print(f"  Maximum Lag: {MAX_LAG}\n")

main_df = load_all_parts_to_df(raw_folder / MAIN_SYMBOL).tail(LAST_CANDLES)
granger_results = {}
for symbol_to_compare in SYMBOLS_TO_COMPARE:
	compared_df = load_all_parts_to_df(raw_folder / symbol_to_compare).tail(LAST_CANDLES)
	granger_results[symbol_to_compare] = compare_symbols_by_granger_causality(
		main_df, MAIN_SYMBOL, compared_df, symbol_to_compare, maxlag=MAX_LAG
	)

for i in range(MAX_LAG):
	for symbol, results in granger_results.items():
		main_to_comp = results[f"{MAIN_SYMBOL}_affects_{symbol}"][i]
		comp_to_main = results[f"{symbol}_affects_{MAIN_SYMBOL}"][i]
		print(f"\nLag {i+1}: {MAIN_SYMBOL} → {symbol}: F={main_to_comp[0]:.4f}, p={main_to_comp[1]:.4f}")
		print(f"Lag {i+1}: {symbol} → {MAIN_SYMBOL}: F={comp_to_main[0]:.4f}, p={comp_to_main[1]:.4f}")

### 4. Visualization of Granger Causality Results

The following visualization summarizes the Granger causality test results computed above.

- Left: heatmap of p-values for each tested lag and direction (rows = directional tests, columns = lag).
- Right: line plot of F-statistics across lags for each directional test.

Significant p-values (e.g., < 0.05) indicate evidence that one series helps predict the other at that lag.

In [None]:
rows = []
pvals = {}
fstats = {}
max_tested_lag = 0
for symbol, res in granger_results.items():
    key1 = f"{MAIN_SYMBOL}_affects_{symbol}"
    l1 = len(res[key1])
    max_tested_lag = max(max_tested_lag, l1)

directions = []
for symbol, res in granger_results.items():
    directions.append(f"{MAIN_SYMBOL}->{symbol}")

pvals_df = pd.DataFrame(index=directions, columns=range(1, max_tested_lag+1), dtype=float)
fvals_df = pd.DataFrame(index=directions, columns=range(1, max_tested_lag+1), dtype=float)

for symbol, res in granger_results.items():
    d1 = f"{MAIN_SYMBOL}->{symbol}"
    vals1 = res[f"{MAIN_SYMBOL}_affects_{symbol}"]
    for i in range(max_tested_lag):
        if i < len(vals1):
            f, p = vals1[i]
            fvals_df.loc[d1, i+1] = f
            pvals_df.loc[d1, i+1] = p
        else:
            fvals_df.loc[d1, i+1] = float('nan')
            pvals_df.loc[d1, i+1] = float('nan')

plt.figure(figsize=(16, 8))
plt.subplot(1, 2, 1)
sns.heatmap(pvals_df.astype(float), cmap='viridis_r', cbar_kws={'label': 'p-value'}, annot=False)
plt.title('Granger Causality p-values (rows: direction, cols: lag)')
plt.xlabel('Lag')
plt.ylabel('Direction')

sig_mask = pvals_df.astype(float) < 0.05
for (y, x), val in np.ndenumerate(sig_mask.values):
    if val:
        plt.gca().add_patch(plt.Rectangle((x, y), 1, 1, fill=False, edgecolor='red', lw=1.0))

plt.subplot(1, 2, 2)
for idx in fvals_df.index:
    plt.plot(fvals_df.columns, fvals_df.loc[idx], marker='o', label=idx)
plt.title('F-statistics across lags')
plt.xlabel('Lag')
plt.ylabel('F-statistic')
plt.legend(loc='best')
plt.tight_layout()
plt.show()

### 5. Additional tests: correlation, structural breaks, rank tests, wavelet coherence and FEVD

This section implements the additional checks requested by the user:

1. Correlation tests (Pearson, Spearman, Kendall)
2. Structural break tests:
   - Chow test (single known breakpoint)
   - Bai–Perron (multiple breaks) using the `ruptures` package
3. Mann–Whitney U test (non-parametric comparison of distributions)
4. Wavelet coherence (using `pycwt`, if available)
5. FEVD (Forecast Error Variance Decomposition) based on VAR

The code cell below provides helper functions and executes these tests for `MAIN_SYMBOL` vs each symbol in `SYMBOLS_TO_COMPARE`. If optional libraries are missing, the code will notify and provide install hints.

In [None]:
def compute_returns(df, col='close'):
    r = np.log(df[col] / df[col].shift(1))
    return r.replace([np.inf, -np.inf], np.nan)

def chow_test(df, x_col, y_col, t0_idx):
    df1 = df.iloc[:t0_idx]
    df2 = df.iloc[t0_idx:]
    model_full = ols(f"{y_col} ~ {x_col}", data=df).fit()
    model_1 = ols(f"{y_col} ~ {x_col}", data=df1).fit()
    model_2 = ols(f"{y_col} ~ {x_col}", data=df2).fit()
    ssr_full = float(np.sum(model_full.resid ** 2))
    ssr1 = float(np.sum(model_1.resid ** 2))
    ssr2 = float(np.sum(model_2.resid ** 2))
    k = int(model_full.df_model) + 1
    n = len(df)
    if (n - 2 * k) <= 0:
        raise ValueError(f"Invalid degrees of freedom for Chow test: n={n}, k={k}")
    num = (ssr_full - (ssr1 + ssr2)) / k
    den = (ssr1 + ssr2) / (n - 2 * k)
    Fstat = num / den
    p = 1 - stats.f.cdf(Fstat, k, n - 2 * k)
    return float(Fstat), float(p)

def bai_perron_breaks(series, n_bkps=3):
    algo = rpt.Binseg(model='l2').fit(series.values)
    breaks = algo.predict(n_bkps=n_bkps)
    return breaks

def wavelet_coherence(x, y, dt=1.0):
    import pycwt as wavelet
    WCT, aWCT, coi, freqs, sig = wavelet.wct(x, y, dt)
    phase = np.angle(aWCT)
    return WCT, phase, coi, freqs, sig

In [None]:
for symbol in SYMBOLS_TO_COMPARE:
    print('\n' + '='*60)
    print(f"Testing: {MAIN_SYMBOL} vs {symbol}")
    df_main = load_all_parts_to_df(raw_folder / MAIN_SYMBOL).tail(LAST_CANDLES)
    df_symb = load_all_parts_to_df(raw_folder / symbol).tail(LAST_CANDLES)
    df_main['open_time'] = pd.to_datetime(df_main['open_time'], utc=True, errors='coerce')
    df_symb['open_time'] = pd.to_datetime(df_symb['open_time'], utc=True, errors='coerce')
    df_main = df_main.set_index('open_time').sort_index()
    df_symb = df_symb.set_index('open_time').sort_index()
    common_idx = df_main.index.intersection(df_symb.index)
    df_main = df_main.loc[common_idx]
    df_symb = df_symb.loc[common_idx]

    r_main = compute_returns(df_main).rename(f"{MAIN_SYMBOL}_return")
    r_symb = compute_returns(df_symb).rename(f"{symbol}_return")
    df = pd.concat([r_main, r_symb], axis=1).dropna()
    if df.empty:
        print(f"No overlapping data for {symbol}")
        continue

    pear  = df.iloc[:, 0].corr(df.iloc[:, 1], method='pearson')
    spear = df.iloc[:, 0].corr(df.iloc[:, 1], method='spearman')
    kend  = df.iloc[:, 0].corr(df.iloc[:, 1], method='kendall')
    print(f"Correlations: Pearson={pear:.4f}, Spearman={spear:.4f}, Kendall={kend:.4f}")

    stat, p = mannwhitneyu(df.iloc[:, 0].values, df.iloc[:, 1].values, alternative='two-sided')
    print(f"Mann-Whitney U p={p:.4f}")

    t0 = len(df) // 2
    Fchow, pchow = chow_test(df.reset_index(drop=True), df.columns[0], df.columns[1], t0)
    print(f"Chow test at midpoint: F={Fchow:.4f}, p={pchow:.4f}")

    breaks = bai_perron_breaks(df.iloc[:, 1], n_bkps=3)
    print(f"Detected breakpoints (indices): {breaks}")

    win = min(200, len(df))
    res_wct = wavelet_coherence(df.iloc[-win:, 1].values, df.iloc[-win:, 0].values, dt=1)
    if res_wct is not None:
        WCT, phase, coi, freqs, sig = res_wct
        T = np.arange(WCT.shape[1], dtype=float)
        period = 1.0 / freqs
        
        if WCT.shape != (len(period), len(T)):
            WCT = WCT.T
        plt.figure(figsize=(10, 6))
        cs = plt.contourf(T, np.log2(period), WCT, levels=100)
        
        if sig.shape == WCT.shape:
            plt.contour(T, np.log2(period), sig, levels=[1], colors='white', linewidths=1.2)
        if len(coi) == len(T):
            plt.fill_between(T, np.log2(coi), np.log2(period.max()), color='gray', alpha=0.3)
               
        plt.ylabel("log₂(Period)")
        plt.xlabel("Time")
        plt.title("Wavelet Coherence")
        plt.colorbar(cs, label="Coherence")
        plt.tight_layout()
        plt.show()

    model = VAR(df)
    res = model.fit(maxlags=5)
    if res is None:
        raise RuntimeError("VAR result is not available")

    fevd = res.fevd(10)
    fevd.plot()
    fevd.summary()
    frac_h1 = float(fevd.decomp[0, 1, 0])
    print("FEVD -- fraction at step 1:")
    print(frac_h1)

    irf = res.irf(10)
    irf.plot(orth=False)
    irf.plot_cum_effects(orth=False)
    irf_h1 = float(irf.irfs[0, 1, 0])
    print("IRF -- step 1:")
    print(irf_h1)

    plt.figure(figsize=(12, 4))
    plt.subplot(1, 3, 1)
    sns.scatterplot(x=df.columns[0], y=df.columns[1], data=df, s=10)
    plt.title('Scatter of returns')
    plt.subplot(1, 3, 2)
    sns.lineplot(data=df)
    plt.title('Returns time series')
    if len(breaks) > 0:
        rpt.display(df.iloc[:, 1].values, breaks)
        plt.title('Detected breaks')
    else:
        plt.text(0.5, 0.5, 'No ruptures/plots available', ha='center')
        plt.axis('off')
    plt.tight_layout()
    plt.show()
