In [1]:
# %pip install fastcluster
# %pip install joblib

# Cointegration Tests & Pairs Trading

## What is cointegration?
We have seen how a time series can have a unit root that creates a stochastic trend and makes the time series highly persistent. When we use such an integrated time series in their original, rather than in differenced, form as a feature in a linear regression model, its relationship with the outcome will often appear statistically significant, even though it is not. This phenomenon is called spurious regression (for details, see Chapter 18 in [Wooldridge, 2008](https://economics.ut.ac.ir/documents/3030266/14100645/Jeffrey_M._Wooldridge_Introductory_Econometrics_A_Modern_Approach__2012.pdf)). Therefore, the recommended solution is to difference the time series so they become stationary before using them in a model.

However, there is an exception when there are cointegration relationships between the outcome and one or more input variables. To understand the concept of cointegration, let's first remember that the residuals of a regression model are a linear combination of the inputs and the output series.

Usually, the residuals of the regression of one integrated time series on one or more such series yields non-stationary residuals that are also integrated, and thus behave like a random walk. However, for some time series, this is not the case: the regression produces coefficients that yield a linear combination of the time series in the form of the residuals that are stationary, even though the individual series are not. Such time series are
cointegrated.

A non-technical example is that of a drunken man on a random walk accompanied by his dog (on a leash). Both trajectories are non-stationary but cointegrated because the dog will occasionally revert to his owner. In the trading context, arbitrage constraints imply cointegration between spot and futures prices.

In other words, a linear combination of two or more cointegrated series has a stable mean to which this linear combination reverts. This also applies when the individual series are integrated of a higher order and the linear combination reduces the overall order of integration.

Cointegration differs from correlation: two series can be highly correlated but need not be cointegrated. For example, if two growing series are constant multiples of each other, their correlation will be high, but any linear combination will also grow rather than revert to a stable mean.

## Cointegration for Pairs Trading

Cointegration is very useful: if two or more asset price series tend to revert to a common mean, we can leverage deviations from the trend because they should imply future price moves in the opposite direction. The mathematics behind cointegration is more involved, so we will only focus on the practical aspects; for an in-depth treatment, see [Lütkepohl (2005)](https://www.springer.com/gp/book/9783540401728).

In this notebook, we will address how we can identify pairs with such a long-term stationary relationship, estimate the expected time for any disequilibrium to correct, and how to utilize these tools to implement and backtest a long-short pairs trading strategy. There are two approaches to testing for cointegration:
- The Engle-Granger two-step method
- The Johansen test

The book chapter discusses each test in turn; in this notebook we show how they help identify cointegrated securities that tend to revert to a common trend, a fact that we can leverage for a statistical arbitrage
strategy.

## Imports & Settings

In [2]:
# import warnings
# warnings.filterwarnings('ignore')

In [3]:
from time import time
from pathlib import Path
from tqdm import tqdm 

import numpy as np
from numpy.linalg import LinAlgError
import pandas as pd

from sklearn.model_selection import StratifiedKFold, GridSearchCV
from sklearn.metrics import confusion_matrix
from sklearn.tree import  DecisionTreeClassifier
from sklearn.linear_model import LogisticRegressionCV

from statsmodels.tsa.stattools import adfuller, coint
from statsmodels.tsa.vector_ar.vecm import coint_johansen
from statsmodels.tsa.api import VAR

import matplotlib.pyplot as plt
import seaborn as sns

from joblib import Parallel, delayed

In [4]:
pd.set_option('display.float_format', lambda x: f'{x:,.2f}')

In [5]:
DATA_PATH = Path('..', 'data')
STORE = DATA_PATH / 'assets.h5'

### Johansen Test Critical Values

These critical values for the Johansen trace statistic are sourced from the tables in Osterwald-Lenum (1992), "A Note with Quantiles of the Asymptotic Distribution of the Maximum Likelihood Cointegration Rank Test Statistics," published in the Oxford Bulletin of Economics and Statistics (Volume 54, Issue 3, pages 461-472). They correspond to the case with a constant term but no linear trend (det_order=0 in statsmodels), for a bivariate system (two variables), at the 90%, 95%, and 99% significance levels:

- For testing 0 cointegration relationships (rank 0): 13.4294 (90%), 15.4943 (95%), 19.9349 (99%).
- For testing 1 cointegration relationship (rank 1): 2.7055 (90%), 3.8415 (95%), 6.6349 (99%).

These values are commonly hardcoded or referenced in implementations like statsmodels' `coint_johansen` function, which draws from this paper for systems with fewer variables.

In [6]:
critical_values = {0: {.9: 13.4294, .95: 15.4943, .99: 19.9349},
                   1: {.9: 2.7055, .95: 3.8415, .99: 6.6349}}

In [7]:
trace0_cv = critical_values[0][.95] # critical value for 0 cointegration relationships
trace1_cv = critical_values[1][.95] # critical value for 1 cointegration relationship

## Load & Clean Stock & ETF Data

### Remove highly correlated assets

A function removes highly correlated assets (correlation > 0.99) to avoid redundancy:

It computes the correlation matrix, identifies pairs above the cutoff, and decides which to keep/drop to minimize duplicates.

In [8]:
def remove_correlated_assets(df, cutoff=.99):
    corr = df.corr().stack()
    corr = corr[corr < 1]
    to_check = corr[corr.abs() > cutoff].index
    keep, drop = set(), set()
    for s1, s2 in to_check:
        if s1 not in keep:
            if s2 not in keep:
                keep.add(s1)
                drop.add(s2)
            else:
                drop.add(s1)
        else:
            keep.discard(s2)
            drop.add(s2)
    return df.drop(drop, axis=1)

### Remove stationary series

It runs ADF on each series (with constant and trend) and collects p-values.

A related function removes stationary series (ADF p-value <= 0.05):

In [9]:
# def check_stationarity(df):
#     results = []
#     for ticker, prices in df.items():
#         results.append([ticker, adfuller(prices, regression='ct')[1]])
#     return pd.DataFrame(results, columns=['ticker', 'adf']).sort_values('adf')

def check_stationarity(df):
    def run_adf(ticker, prices):
        return [ticker, adfuller(prices, regression='ct')[1]]

    results = Parallel(n_jobs=-1)(
        delayed(run_adf)(ticker, prices) for ticker, prices in df.items()
    )
    return pd.DataFrame(results, columns=['ticker', 'adf']).sort_values('adf')

In [10]:
def remove_stationary_assets(df, pval=.05):
    test_result = check_stationarity(df)
    stationary = test_result.loc[test_result.adf <= pval, 'ticker'].tolist()
    return df.drop(stationary, axis=1).sort_index()

### Select Assets

In [11]:
with pd.HDFStore(STORE) as store:
    print(store.keys())

['/engineered_features', '/us_equities/stocks', '/stooq/us/nysemkt/stocks/prices', '/stooq/us/nysemkt/stocks/tickers', '/stooq/us/nyse/stocks/prices', '/stooq/us/nyse/stocks/tickers', '/stooq/us/nyse/etfs/prices', '/stooq/us/nyse/etfs/tickers', '/stooq/us/nasdaq/stocks/prices', '/stooq/us/nasdaq/stocks/tickers', '/stooq/us/nasdaq/etfs/prices', '/stooq/us/nasdaq/etfs/tickers', '/stooq/jp/tse/stocks/prices', '/stooq/jp/tse/stocks/tickers', '/sp500/fred', '/sp500/stocks', '/sp500/stooq', '/quandl/wiki/prices', '/quandl/wiki/stocks']


In [12]:
def select_assets(asset_class='stocks', n=500, start=2010, end=2019):
    idx = pd.IndexSlice
    with pd.HDFStore(STORE) as store:
        df = (pd.concat([store[f'stooq/us/nasdaq/{asset_class}/prices'],
                         store[f'stooq/us/nyse/{asset_class}/prices']])
              # stooq download can have duplicate assets
              .loc[lambda df: ~df.index.duplicated()]
              .sort_index()
              .loc[idx[:, f'{start}':f'{end}'], :]
              .assign(dv=lambda df: df.close.mul(df.volume)))

    # select n assets with the highest average trading volume
    # we are taking a shortcut to simplify; should select
    # based on historical only, e.g. yearly rolling avg
    most_traded = (df.groupby(level='ticker')
                   .dv.mean()
                   .nlargest(n=n).index)

    df = (df.loc[idx[most_traded, :], 'close']
          .unstack('ticker')
          .ffill(limit=5)  # fill up to five values
          .dropna(axis=1))  # remove assets with any missing values

    df = remove_correlated_assets(df)
    return remove_stationary_assets(df).sort_index()

In [13]:
with pd.HDFStore(STORE) as store:
    print(store.keys())

['/engineered_features', '/us_equities/stocks', '/stooq/us/nysemkt/stocks/prices', '/stooq/us/nysemkt/stocks/tickers', '/stooq/us/nyse/stocks/prices', '/stooq/us/nyse/stocks/tickers', '/stooq/us/nyse/etfs/prices', '/stooq/us/nyse/etfs/tickers', '/stooq/us/nasdaq/stocks/prices', '/stooq/us/nasdaq/stocks/tickers', '/stooq/us/nasdaq/etfs/prices', '/stooq/us/nasdaq/etfs/tickers', '/stooq/jp/tse/stocks/prices', '/stooq/jp/tse/stocks/tickers', '/sp500/fred', '/sp500/stocks', '/sp500/stooq', '/quandl/wiki/prices', '/quandl/wiki/stocks']


We store the intermediate result:

In [None]:
for asset_class, n in [('etfs', 500), ('stocks', 250)]:
    df = select_assets(asset_class=asset_class, n=n)
    df.to_hdf('data.h5', f'{asset_class}/close')

  df.to_hdf('data.h5', f'{asset_class}/close')


### Get ticker dictionary

In [None]:
def get_ticker_dict():
    with pd.HDFStore(STORE) as store:
        return (pd.concat([
            store['stooq/us/nyse/stocks/tickers'],
            store['stooq/us/nyse/etfs/tickers'],
            store['stooq/us/nasdaq/etfs/tickers'],
            store['stooq/us/nasdaq/stocks/tickers']
        ]).drop_duplicates().set_index('ticker').squeeze().to_dict())

In [None]:
names = get_ticker_dict()

## Visualize Correlation Clusters 

Reload intermediate results:

In [None]:
stocks = pd.read_hdf('data.h5', 'stocks/close')
stocks.info()

In [None]:
etfs = pd.read_hdf('data.h5', 'etfs/close')
etfs.info()

In [None]:
tickers = {k: v for k, v in names.items() if k in etfs.columns.union(stocks.columns)}
pd.Series(tickers).to_hdf('data.h5', 'tickers')

The correlations in the matrix (`corr`) are computed using the Pearson correlation coefficient (the default for pandas' `.corrwith()` method) between the time series of closing prices for each stock (columns in `stocks`) and each ETF (columns in `etfs`). This measures the linear relationship between the raw price levels over the shared time period (2010-2019 in the dataset), without any transformations like differencing or normalization applied at this stage.

In [None]:
corr_list = []
for etf, data in etfs.items():
    corr_list.append(stocks.corrwith(data).rename(etf))
corr = pd.concat(corr_list, axis=1)
corr.index = stocks.columns  # Ensure index is stock tickers if needed

In [None]:
corr.info()
print(corr.shape)
corr

The clustermap generated by this code visualizes a correlation matrix (`corr`) as a heatmap with added hierarchical clustering, making it easier to spot patterns, groups, and relationships in the data. Here's a step-by-step guide to reading and interpreting it:

### 1. **Understand the Overall Structure**
   - **Central Heatmap**: This is the main grid of colored cells. Each row represents a stock (from the index of `corr`), and each column represents an ETF (from the columns of `corr`). The color of each cell shows the correlation value between that specific stock and ETF.
     - Correlations range from -1 (strong negative) to +1 (strong positive).
     - The map is not in the original order of `corr`; rows and columns are reordered based on clustering to group similar items together.
   - **Dendrograms (Tree-like Structures)**:
     - **Left Side**: Shows the hierarchical clustering of rows (stocks). It illustrates how stocks are grouped based on the similarity of their correlation patterns with all ETFs.
     - **Top Side**: Shows the hierarchical clustering of columns (ETFs). It groups ETFs based on how similarly they correlate with all stocks.
     - These are like upside-down trees: branches represent merges of similar groups, with the height of each merge indicating dissimilarity (taller branches mean less similar items merged later).
   - **No Colorbar by Default**: Seaborn often adds a colorbar legend on the right, showing the mapping from colors to numerical values (e.g., -1 to +1). If it's missing in your output, you can add `fig.colorbar()` in code to include it.

### 2. **Interpret the Colors (Heatmap Values)**
   - The colormap (`cmap`) is a diverging palette created with `sns.diverging_palette(220, 10, as_cmap=True)`:
     - Hue 220 is typically blue/cool tones for negative values.
     - Hue 10 is typically red/warm tones for positive values.
     - `center=0` sets the midpoint to neutral (likely white or light gray), so:
       - **Blue shades**: Negative correlations (e.g., dark blue for ≈ -1, meaning assets move in opposite directions).
       - **Red shades**: Positive correlations (e.g., dark red for ≈ +1, meaning assets move together).
       - **White/Light Gray**: Near-zero correlations (no strong relationship).
     - Intensity/depth of color indicates strength: darker/more saturated = stronger correlation (closer to ±1); lighter = weaker (closer to 0).
   - Scan for patterns: Blocks of similar colors suggest clusters where groups of stocks and ETFs have consistent correlation behaviors (e.g., a red block means a group of positively correlated assets).

### 3. **Read the Clustering (Dendrograms)**
   - **How Clustering Works**: The map uses hierarchical clustering (default: average linkage with Euclidean distance). It measures dissimilarity between rows/columns based on their vectors of correlation values, then groups them step-by-step.
     - Short branches/low merge points: Highly similar items (e.g., stocks with nearly identical correlation profiles to ETFs).
     - Long branches/high merge points: More dissimilar items, merged later.
   - **Identify Clusters**:
     - Trace from the leaves (individual labels) up the dendrogram to find merge points. A "cluster" is a subtree where items merge early (low height).
     - For example, if several stocks merge into a small branch on the left dendrogram, they form a cluster—meaning those stocks have similar correlation patterns across all ETFs (e.g., they might all be tech stocks correlating highly with tech ETFs).
     - Similarly for ETFs on top: Clustered ETFs might represent similar sectors (e.g., energy ETFs grouping together).
     - The reordering aligns similar clusters near each other in the heatmap, creating visible "blocks" of high/low correlations.
   - **Cluster Size and Hierarchy**: Larger clusters (many leaves under one high branch) indicate broad similarities; smaller ones show niche groups. You can mentally "cut" the dendrogram at a certain height to define cluster boundaries (e.g., cut low for many small clusters, high for fewer large ones).

### 4. **Spot Insights and Patterns**
   - **Similar Assets**: Look for dense red blocks—these highlight groups of stocks and ETFs that move together (potential for sector analysis or diversification risks).
   - **Opposites**: Blue blocks indicate inverse relationships (useful for hedging in trading).
   - **Outliers**: Isolated branches or rows/columns with unique color patterns suggest assets that don't fit well with others.
   - **Overall Trends**: If the map shows mostly red, correlations are generally positive (common in markets). Fragmented clusters might indicate diverse behaviors.
   - This visualization helps identify "clusters of similar assets" as noted—e.g., stocks clustering with certain ETFs might share industries or risk factors.

### 5. **Tips for Interaction/Exploration**
   - **Zoom In**: If the plot is crowded (many stocks/ETFs), hover over cells in an interactive environment (e.g., Jupyter) for exact values, or subset `corr` before plotting.
   - **Customize for Clarity**: To make it easier to read, you could modify the code:
     - Add labels: `sns.clustermap(corr, cmap=cmap, center=0, xticklabels=True, yticklabels=True)`
     - Change clustering: `method='single'` or `metric='correlation'` for different groupings.
     - Save/Export: `fig = sns.clustermap(...); fig.savefig('clustermap.png')`
   - **Limitations**: Clustering is sensitive to parameters; it's exploratory, not definitive. Correlations don't imply causation, and results depend on the data period.

If the plot isn't displaying or you need a specific example from running the code, provide more details about your environment!

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt  # Ensure this is imported

cmap = sns.diverging_palette(220, 10, as_cmap=True)
fig = sns.clustermap(corr, cmap=cmap, center=0, xticklabels=True, yticklabels=True, figsize=(20, 20))  # Keep larger size for visibility

# Make x and y tick labels smaller (adjust labelsize as needed, e.g., 6 for very small)
fig.ax_heatmap.tick_params(axis='x', labelsize=8)
fig.ax_heatmap.tick_params(axis='y', labelsize=8)

# Optional: Rotate x labels for better fit if still crowded
fig.ax_heatmap.set_xticklabels(fig.ax_heatmap.get_xticklabels(), rotation=90)

# Save if needed for zooming
fig.savefig('clustermap_small_labels.png', dpi=300, bbox_inches='tight')

## Candidate Selection using Heuristics

### Computational Complexity: Comparing running times

In this section, we compare the running times of various cointegration tests. More specifically, we are running tests for a single asset vs. the remaining set of securities.

#### Prepare Data

In [None]:
stocks.info()
print(stocks.shape)
stocks

In [None]:
etfs.info()
print(etfs.shape)
etfs

In [None]:
# etfs.iloc[:,].plot(figsize=(20, 6))
# plt.show()

2011-02-17  data corrupted

In [None]:
security = etfs['AAXJ.US'].loc['2012': '2015']
candidates = stocks.loc['2012': '2015']
# candidates = stocks.loc['2011-02-17': '2011-02-28']

In [None]:
security.plot(figsize=(20, 6))
plt.show()

#### 1. Normalization of Price Series
```python
security = security.div(security.iloc[0])
candidates = candidates.div(candidates.iloc[0])
```
- **What it does**: Normalizes both the `security` (e.g., ETF prices) and `candidates` (e.g., stock prices) by dividing each series by its first value (`iloc[0]`). This scales all series to start at 1, making them comparable regardless of absolute price levels.
- **Why?**: In pairs trading, we care about relative movements, not absolute prices. Normalization helps compute meaningful spreads and correlations (e.g., avoiding bias from high-priced vs. low-priced assets).
- **Details**: `security` is a Series (one asset's prices over time). `candidates` is a DataFrame (multiple assets' prices, same time index). `ticker` extracts the name (e.g., ETF ticker) for later use in results.
- **Context from markdown**: This aligns with the "distance approach," where normalized prices are used to compute correlations or spreads for identifying comoving pairs.

In [None]:
print(security.iloc[0])
print(security)
security = security.div(security.iloc[0])
print("After scales all series to start at 1, making them comparable regardless of absolute price levels.")
print(security.describe())
security

In [None]:
candidates.iloc[0]
candidates.head()

In [None]:
candidates = candidates.div(candidates.iloc[0])
candidates

#### 2. Compute Spreads

**Purpose:** 
- In pairs trading, the spread represents the relative deviation between two assets. For cointegrated pairs, the spread should be stationary (mean-reverting) rather than trending or random-walking. Low drift and volatility in the spread are heuristics for potential cointegration.

**How to read/understand the result:**

- Print spreads to see the DataFrame: Index is dates, columns are stock tickers, values are differences (e.g., 0.05 means the stock is 5% above the security at that date, assuming normalization).
- If the spread hovers around zero with small fluctuations, it suggests mean-reversion (good for trading). If it trends up/down, it has drift (less desirable).
- Example: If a spread starts near 0 (due to normalization) and ends at 0.2, the pair diverged; if it oscillates and returns to 0, it's potentially cointegrated.

In [None]:
spreads = candidates.sub(security, axis=0)
spreads

In [None]:
def plot_spread(spreads, security, ticker_index):
    # Create the figure
    fig = plt.figure()
    fig.set_figwidth(10)
    fig.set_figheight(3)

    # Plot the specified column of the spreads DataFrame
    plt.plot(spreads.index, spreads.iloc[:, ticker_index], label=f'{spreads.iloc[:, ticker_index].name}')

    # Draw a horizontal line at y=0
    plt.axhline(y=0, color='red', linestyle='--', label='y=0')

    # Add labels and legend for better visualization
    plt.xlabel('Date')
    plt.ylabel('Value')
    plt.title(f'{spreads.iloc[:, ticker_index].name} ({ticker_index}) Spread from Security {security.name}')
    plt.legend()
    plt.show()
    
mean_reverting_list = [14,17,27,42,77,106]
drift_list = [1,3,23,73,98,81]

# for i in range(spreads.shape[1]):
#     plot_spread(spreads, security, i)

for i in mean_reverting_list:
    plot_spread(spreads, security, i)
    
for i in drift_list:
    plot_spread(spreads, security, i)

In [None]:
n, m = spreads.shape
print(n,m)

#### 3. Set Up Design Matrix for Drift Calculation.

- **What it is**: `X` is a numpy array (n rows x 2 columns) acting as the design matrix for ordinary least squares (OLS) regression.
  - Column 0: All 1s (intercept term).
  - Column 1: A time trend from 1 to n (linear sequence over the dates).
  - `n` is the number of time points (rows in `spreads`).
- **Purpose**: This sets up a simple linear regression model: spread_t = β0 + β1 * t + ε_t, where t is time. The slope β1 estimates the "drift" (average daily change in the spread). Low absolute drift indicates a stable, non-trending spread (heuristic for cointegration).
- **Output/Result**: A 2D array like `[[1, 1], [1, 2], ..., [1, n]]`.
- **How to read/understand the result**:
  - This isn't directly outputted, but it's used in the timed OLS below. The full regression (not shown here but in the full code) extracts β1 as drift for each spread column.
  - Interpret drift: If β1 ≈ 0, the spread is stable (good). If β1 = 0.001, the spread widens by 0.1% per period on average (bad for mean-reversion trading).
  - In context: Pairs with low |drift| (e.g., <0.0001) are prioritized for further testing.


In [None]:
X = np.ones(shape=(n, 2))
X[:, 1] = np.arange(1, n+1)

print(len(X))
X

#### Heuristics

#### 4. Compute Drift

Computes the ordinary least squares (OLS) regression coefficients for a simple linear regression model fitted independently to each column of the spreads DataFrame. It uses matrix algebra to solve for the coefficients in a vectorized way (efficient for multiple regressions at once, like across m candidate spreads).

2. **Derivation of the OLS Estimator**:
    - The least squares solution comes from setting the derivative of the error to zero (normal equations):  $X^T X \beta = X^T y$ .
    - Solve for β: Multiply both sides by the inverse of  $X^T X$ :

$\hat{\beta} = (X^T X)^{-1} X^T y$

https://www.youtube.com/watch?v=NN7mBupK-8o

In [None]:
# %%timeit
# np.linalg.inv(X.T @ X) @ X.T @ spreads

In [None]:
# Compute OLS coefficients
drift_result = np.linalg.inv(X.T @ X) @ X.T @ spreads

# Time the operation (mimicking %%timeit output)
import time
start = time.time()
np.linalg.inv(X.T @ X) @ X.T @ spreads
elapsed = time.time() - start
print(f"\nExecution time: {elapsed*1e6:.2f} µs")

# Print result
print("OLS Coefficients (β₀, β₁) for each spread:")
drift_result

#### 5. Compute Volatility

In [None]:
# %%timeit
# spreads.std()

In [None]:
# Compute standard deviation
vol_result = spreads.std()

# Time the operation
start = time.time()
spreads.std()
elapsed = time.time() - start
print(f"\nExecution time: {elapsed*1e6:.2f} µs")

# Print result
print("Standard Deviation (Volatility) of each spread:")
vol_result

In [None]:
# %%timeit
# candidates.corrwith(security)

In [None]:
# Compute correlation
corr_result = candidates.corrwith(security)

# Time the operation
start = time.time()
candidates.corrwith(security)
elapsed = time.time() - start
print(f"\nExecution time: {elapsed*1e6:.2f} µs")

# Print result
print("Correlation between security and each candidate:")
corr_result

In [None]:
# Transpose drift_result to shape (n, 2) and convert to DataFrame
drift_df = pd.DataFrame(drift_result.T.values, columns=['beta0', 'beta1'], index=vol_result.index)
combined_df = drift_df.assign(vol=vol_result, corr=corr_result)
combined_df


#### Cointegration Tests

2m 1.7s

In [None]:
# %%timeit
# for candidate, prices in candidates.items():
#     df = pd.DataFrame({'s1': security,
#                        's2': prices})
#     var = VAR(df.values)
#     lags = var.select_order()
#     k_ar_diff = lags.selected_orders['aic']
#     coint_johansen(df, det_order=0, k_ar_diff=k_ar_diff)
#     coint(security, prices, trend='c')[:2]
#     coint(prices, security, trend='c')[:2]

4.1s

In [None]:
from joblib import Parallel, delayed

def process_candidate(candidate, prices, security):
    # Step 1: Create a DataFrame for the two series.
    # - 's1' is the security (reference ETF), 's2' is the candidate stock.
    # - This formats the data for vector-based models like VAR and Johansen.
    df = pd.DataFrame({'s1': security, 's2': prices})
    
    # Step 2: Fit a Vector Autoregression (VAR) model on the raw values.
    # - VAR models the joint dynamics of the two series to determine dependencies.
    # - Uses `df.values` (NumPy array) for efficiency.
    # - Context: VAR is a precursor to VECM for cointegration; it's used here to select lags empirically.
    var = VAR(df.values)
    
    # Step 3: Select optimal lag order for the VAR model.
    # - `select_order()` computes information criteria (AIC, BIC, etc.) for different lags.
    # - Context: Lag selection is crucial for accurate cointegration testing; too few/many lags can bias results.
    lags = var.select_order()
    
    # Step 4: Extract the AIC-selected lag order (k_ar_diff).
    # - 'aic' is chosen as the criterion (Akaike Information Criterion balances fit and complexity).
    # - This lag is used in the Johansen test to account for autocorrelation in residuals.
    # - Context: In the full code, this avoids arbitrary lag choices; Johansen requires this for reliable trace statistics.
    k_ar_diff = lags.selected_orders['aic']
    
    # Step 5: Run the Johansen cointegration test.
    # - `coint_johansen(df, det_order=0, k_ar_diff=k_ar_diff)`:
    #   - `df`: The two-series DataFrame.
    #   - `det_order=0`: No deterministic trend (constant or linear) in the cointegrating relation (simple model).
    #   - `k_ar_diff`: The VAR-selected lags for differencing.
    # - Returns a `JohansenTestResult` object with eigenvalues, trace statistics, etc.
    # - Context: Johansen is the primary test here; it can detect up to 1 cointegrating vector (for two series). Later, you'd compare trace stats to critical values (e.g., `trace0_cv` > stat rejects no cointegration). This is more robust than Engle-Granger for small samples or multiple relations.
    cj = coint_johansen(df, det_order=0, k_ar_diff=k_ar_diff)
    
    # Step 6: Run Engle-Granger test with security as dependent variable.
    # - `coint(security, prices, trend='c')`: Tests if security ~ prices + constant (trend='c' includes intercept).
    # - Slices `[:2]` to get only the test statistic and p-value (ignores critical values).
    # - Context: Engle-Granger is a residual-based test: Regress one on the other, then check if residuals are stationary (via ADF). Running it shows if the relationship is stronger in one direction.
    eg1 = coint(security, prices, trend='c')[:2]
    
    # Step 7: Run Engle-Granger test with candidate as dependent variable.
    # - Same as above, but reversed: prices ~ security + constant.
    # - Context: Cointegration should hold regardless of order, but asymmetry can indicate lead-lag effects (useful for trading signals).
    eg2 = coint(prices, security, trend='c')[:2]
    
    # Step 8: Return results as a tuple.
    # - Includes the candidate ticker for identification in parallel results.
    # - Context: These are aggregated in `results_df` for post-processing (e.g., filter p-values < 0.05 or trace > critical values).
    return candidate, cj, eg1, eg2

results = Parallel(n_jobs=-1)(
    delayed(process_candidate)(candidate, prices, security) for candidate, prices in candidates.items()
)

results_df = pd.DataFrame(results, columns=['candidate', 'johansen', 'eg1', 'eg2']).set_index('candidate')
results_df

Clearly, cointegration tests are significantly more costly. It would be great if the heuristics worked just as well, or at least 'good enough'.

### Compute Heuristics

The function `compute_pair_metrics()` computes the following distance metrics for over 23,000 pairs of
stocks and Exchange Traded Funds (ETFs) for 2010-14 and 2015-19:

- The **drift of the spread**, defined as a linear regression of a time trend on the spread
- The spread's  **volatility**
- The **correlations** between the normalized price series and between their returns

Low drift and volatility, as well as high correlation, are simple proxies for cointegration. 

To evaluate the predictive power of these heuristics, we also run Engle-Granger and Johansen **cointegration tests** using `statsmodels` for the preceding pairs. This takes place in the loop in the second half of `compute_pair_metrics()`.

We first estimate the optimal number of lags that we need to specify for the Johansen test. For both tests, we assume that the cointegrated series (the spread) may have an intercept different from zero but no trend:

6m 40s

In [None]:
from joblib import Parallel, delayed

def compute_pair_metrics(security, candidates):
    # `security` is a Series (one asset's prices over time).
    security = security.div(security.iloc[0])
    # `ticker` extracts the name (e.g., ETF ticker) for later use in results.
    ticker = security.name 
    # `candidates` is a DataFrame (multiple assets' prices, same time index).
    candidates = candidates.div(candidates.iloc[0])
    spreads = candidates.sub(security, axis=0)
    n, m = spreads.shape
    X = np.ones(shape=(n, 2))
    X[:, 1] = np.arange(1, n + 1)
    
    # Compute drift (vectorized)
    # What it does: Performs vectorized OLS regression of each spread column on the time trend (X). 
    # Computes the coefficient matrix using the normal equation: β = (X'X)^(-1) X'y, where y is spreads. 
    # Takes the second row (iloc[1]) as the slope (drift), converts to a DataFrame column named 'drift'.
    # Why?: Drift measures how much the spread trends over time (e.g., positive drift means widening). 
    # Low absolute drift proxies for a stable, mean-reverting relationship (heuristic for cointegration).
    # Details: @ is matrix multiplication. inv computes inverse. This is efficient for all m candidates 
    # at once (vectorized over columns). Index matches candidate tickers.
    # Context: Markdown notes low drift as a proxy; it's part of screening to reduce candidates before expensive tests.
    drift = ((np.linalg.inv(X.T @ X) @ X.T @ spreads).iloc[1].to_frame('drift'))
    
    # Compute volatility (vectorized)
    # What it does: Computes the standard deviation of each spread column, as a Series converted to DataFrame 'vol'.
    # Why?: Volatility of the spread indicates trading opportunities—higher variance means more frequent/wider divergences for profits, 
    # but too high might signal instability. Low vol (with low drift) heuristics for cointegration.
    # Details: std() defaults to sample std (ddof=1). Index: candidate tickers.
    # Context: Empirical studies (e.g., Huck and Afawubo 2015) show cointegrated pairs have ~2x higher spread volatility
    # than distance-based pairs, making this a key screener.
    vol = spreads.std().to_frame('vol')
    
    # Return correlation (vectorized)
    # What it does: Computes percentage changes (returns) for candidates and security, 
    # then pairwise correlations of each candidate's returns with the security's returns.
    # Why?: High return correlation indicates comovement on a daily basis, a distance heuristic. 
    # But as markdown notes, correlation ≠ cointegration (e.g., correlated trends without mean-reversion).
    # Details: pct_change() is (price_t / price_{t-1} - 1). corrwith uses Pearson correlation. Handles NaNs from first row.
    corr_ret = (candidates.pct_change().corrwith(security.pct_change()).to_frame('corr_ret'))
    
    # Normalized price series correlation (vectorized)
    corr = candidates.corrwith(security).to_frame('corr')
    metrics = drift.join(vol).join(corr).join(corr_ret).assign(n=n)
    
    # Parallelize the slow per-candidate tests
    def process_candidate(candidate, prices):
        df = pd.DataFrame({'s1': security, 's2': prices})
        var = VAR(df.values)
        lags = var.select_order()  # This is still per-pair; consider fixing if too slow
        k_ar_diff = lags.selected_orders['aic']
        cj0 = coint_johansen(df, det_order=0, k_ar_diff=k_ar_diff)
        t1, p1 = coint(security, prices, trend='c')[:2]
        t2, p2 = coint(prices, security, trend='c')[:2]
        return [ticker, candidate, t1, p1, t2, p2, k_ar_diff, *cj0.lr1]
    
    # Run in parallel (use n_jobs=8-12 if all cores overheat; -1 uses all)
    tests = Parallel(n_jobs=-1)(
        delayed(process_candidate)(candidate, prices) 
        for candidate, prices in candidates.items()
    )
    
    columns = ['s1', 's2', 't1', 'p1', 't2', 'p2', 'k_ar_diff', 'trace0', 'trace1']
    tests = pd.DataFrame(tests, columns=columns).set_index('s2')
    return metrics.join(tests)


In [None]:
# Outer loop remains the same, but now inner is parallelized
spreads = []
start = 2010
stop = 2019
etf_candidates = etfs.loc[str(start): str(stop), :]
stock_candidates = stocks.loc[str(start): str(stop), :]
s = time.time()
for i, (etf_ticker, etf_prices) in enumerate(etf_candidates.items(), 1):
    df = compute_pair_metrics(etf_prices, stock_candidates)
    spreads.append(df.set_index('s1', append=True))
    if i % 10 == 0:
        print(f'\n{i:>3} {time.time() - s:.1f}\n')
        s = time.time()

In [None]:
# def compute_pair_metrics(security, candidates):
#     security = security.div(security.iloc[0])
#     ticker = security.name
#     candidates = candidates.div(candidates.iloc[0])
#     spreads = candidates.sub(security, axis=0)
#     n, m = spreads.shape
#     X = np.ones(shape=(n, 2))
#     X[:, 1] = np.arange(1, n + 1)
    
#     # compute drift
#     drift = ((np.linalg.inv(X.T @ X) @ X.T @ spreads).iloc[1]
#              .to_frame('drift'))
    
#     # compute volatility
#     vol = spreads.std().to_frame('vol')
    
#     # return correlation
#     corr_ret = (candidates.pct_change()
#                 .corrwith(security.pct_change())
#                 .to_frame('corr_ret'))
    
#     # normalized price series correlation
#     corr = candidates.corrwith(security).to_frame('corr')
#     metrics = drift.join(vol).join(corr).join(corr_ret).assign(n=n)
    
#     tests = []
#     # run cointegration tests
#     for candidate, prices in tqdm(candidates.items()):
#         df = pd.DataFrame({'s1': security, 's2': prices})
#         var = VAR(df.values)
#         lags = var.select_order() # select VAR order
#         k_ar_diff = lags.selected_orders['aic']
#         # Johansen Test with constant Term and estd. lag order
#         cj0 = coint_johansen(df, det_order=0, k_ar_diff=k_ar_diff)
#         # Engle-Granger Tests
#         t1, p1 = coint(security, prices, trend='c')[:2]
#         t2, p2 = coint(prices, security, trend='c')[:2]
#         tests.append([ticker, candidate, t1, p1, t2, p2, 
#                       k_ar_diff, *cj0.lr1])
#     columns = ['s1', 's2', 't1', 'p1', 't2', 'p2', 'k_ar_diff', 'trace0', 'trace1']
#     tests = pd.DataFrame(tests, columns=columns).set_index('s2')
#     return metrics.join(tests)

In [None]:
# spreads = []
# start = 2010
# stop = 2019
# etf_candidates = etfs.loc[str(start): str(stop), :]
# stock_candidates = stocks.loc[str(start): str(stop), :]
# s = time()
# for i, (etf_ticker, etf_prices) in enumerate(etf_candidates.items(), 1):
#     df = compute_pair_metrics(etf_prices, stock_candidates)
#     spreads.append(df.set_index('s1', append=True))
#     if i % 10 == 0:
#         print(f'\n{i:>3} {time() - s:.1f}\n')
#         s = time()

In [None]:
names = get_ticker_dict()
spreads = pd.concat(spreads)
spreads.index.names = ['s2', 's1']
spreads = spreads.swaplevel()
spreads['name1'] = spreads.index.get_level_values('s1').map(names)
spreads['name2'] = spreads.index.get_level_values('s2').map(names)

In [None]:
spreads['t'] = spreads[['t1', 't2']].min(axis=1)
spreads['p'] = spreads[['p1', 'p2']].min(axis=1)

### Engle-Granger vs Johansen: how do their findings compare?

To check for the significance of the cointegration tests, we compare the Johansen trace statistic for rank 0 and 1 to their respective critical values and obtain the Engle-Granger p-value.

We follow the recommendation by Gonzalo and Lee (1998) to apply both tests and accept pairs where they agree. The authors suggest additional due diligence in case of disagreement, which we are going to skip.

In [None]:
spreads['trace_sig'] = ((spreads.trace0 > trace0_cv) &
                        (spreads.trace1 > trace1_cv)).astype(int)
spreads['eg_sig'] = (spreads.p < .05).astype(int)

For the over 46,000 pairs across both sample periods, the Johansen test considers 3.2 percent of the relationships as significant, while the Engle-Granger considers 6.5 percent. They agree on 366 pairs (results may change with new data downloaded from stooq).

In [None]:
pd.crosstab(spreads.eg_sig, spreads.trace_sig)

In [None]:
spreads['coint'] = (spreads.trace_sig & spreads.eg_sig).astype(int)

In [None]:
spreads.info()

In [None]:
spreads = spreads.reset_index()
spreads

In [None]:
sns.scatterplot(x=np.log1p(spreads.t.abs()), 
                y=np.log1p(spreads.trace1), 
                hue='coint', data=spreads[spreads.trace0>trace0_cv]);

In [None]:
spreads.to_hdf('heuristics.h5', 'spreads')

In [None]:
spreads = pd.read_hdf('heuristics.h5', 'spreads')

### Evaluate Heuristics

In [None]:
spreads.drift = spreads.drift.abs()

In [None]:
pd.crosstab(spreads.eg_sig, spreads.trace_sig)

In [None]:
pd.set_option('display.float_format', lambda x: f'{x:.2%}')
pd.crosstab(spreads.eg_sig, spreads.trace_sig, normalize=True)

In [None]:
fig, axes = plt.subplots(ncols=4, figsize=(20, 5))
for i, heuristic in enumerate(['drift', 'vol', 'corr', 'corr_ret']):
    sns.boxplot(x='coint', y=heuristic, data=spreads, ax=axes[i])
fig.tight_layout();

### How well do the heuristics predict significant cointegration?

When we compare the distributions of the heuristics for series that are cointegrated according to both tests with the remainder that is not, volatility and drift are indeed lower (in absolute terms). Figure 9.14 shows that the picture is less clear for the two correlation measures:

In [None]:
spreads.groupby(spreads.coint)[['drift', 'vol', 'corr']].describe().stack(level=0).swaplevel().sort_index()

In [None]:
spreads.coint.value_counts()

#### Logistic Regression

To evaluate the predictive accuracy of the heuristics, we first run a logistic regression model with these features to predict significant cointegration. It achieves an area-under-the-curve (AUC) cross-validation score of 0.815; excluding the correlation metrics, it still scores 0.804. A decision tree does slightly better at AUC=0.821, with or without the correlation features.

In [None]:
y = spreads.coint
X = spreads[['drift', 'vol', 'corr', 'corr_ret']]
# X = spreads[['drift', 'vol']]

In [None]:
kf = StratifiedKFold(n_splits=5, shuffle=True)

In [None]:
log_reg = LogisticRegressionCV(Cs=np.logspace(-10, 10, 21), 
                               class_weight='balanced',
                               scoring='roc_auc')

In [None]:
log_reg.fit(X=X, y=y)
Cs = log_reg.Cs_
scores = pd.DataFrame(log_reg.scores_[True], columns=Cs).mean()
scores.plot(logx=True);
f'C:{np.log10(scores.idxmax()):.2f}, AUC: {scores.max():.2%}'

In [None]:
log_reg.coef_

In [None]:
y_pred = log_reg.predict_proba(X)[:, 1]
confusion_matrix(y_true=spreads.coint, y_pred=(y_pred>.5))

In [None]:
spreads.assign(y_pred=log_reg.predict_proba(X)[:, 1]).groupby(spreads.coint).y_pred.describe()

Not least due to the strong class imbalance, there are large numbers of false positives:
correctly identifying 80 percent of the 366 cointegrated pairs implies over 16,500 false positives, but eliminates almost 30,000 of the candidates. See the notebook cointegration_
tests for additional detail.

The **key takeaway** is that distance heuristics can help screen a large universe more  efficiently, but this comes at a cost of missing some cointegrated pairs and still requires
substantial testing.

#### Decision Tree Classifier

In [None]:
model = DecisionTreeClassifier(class_weight='balanced')
decision_tree = GridSearchCV(model,
                             param_grid={'max_depth': list(range(1, 10))},
                             cv=5,
                             scoring='roc_auc')

In [None]:
decision_tree.fit(X=X, y=y)

In [None]:
f'{decision_tree.best_score_:.2%}, Depth: {decision_tree.best_params_["max_depth"]}'

In [None]:
pd.Series(data=decision_tree.best_estimator_.feature_importances_, 
          index=X.columns).sort_values().plot.barh(title='Feature Importance')
sns.despine();

In [None]:
spreads.assign(y_pred=decision_tree.predict_proba(X)[:, 1]).groupby(spreads.coint).y_pred.describe()

In [None]:
sns.catplot(x='coint', 
            y='y_pred', 
            data=spreads.assign(y_pred=decision_tree.predict_proba(X)[:, 1]), 
            kind='box');