In [1]:
import numpy as np
import pandas as pd
import statsmodels.api as st
import itertools as it

In [2]:
def stationarity_prices_test(data: pd.DataFrame, alpha: float = 0.05) -> tuple[list, list]:
    """
    Run an Augmented Dickey-Fuller (ADF) test on every ticker's series and keep the non-stationary ones.

    The ticker universe is hard-coded below: five sectors with ten tickers each.

    Parameters:
        data (pd.DataFrame): Table with one column per ticker. Every ticker of the universe must be present.
        alpha (float): Significance level. A series is treated as stationary only when its
            ADF p-value is strictly below alpha (default is 0.05).

    Returns:
        tuple[list, list]: The tickers whose series are non-stationary (in universe order),
            and the list of the five per-sector ticker lists.

    Raises:
        KeyError: If one or more tickers of the universe are not columns of `data`.
    """
    # 1. Information Technology
    tech = ["AAPL", "MSFT", "NVDA", "AVGO", "ADBE",
            "INTC", "CSCO", "CRM", "TXN", "AMD"]

    # 2. Health Care
    health = ["UNH", "JNJ", "LLY", "MRK", "MCK",
              "PFE", "TMO", "BMY", "DHR", "GILD"]

    # 3. Financials
    financial = ["JPM", "BAC", "V", "MS", "GS",
                 "SCHW", "BLK", "SPGI", "AXP", "CB"]

    # 4. Consumer Discretionary
    consumer = ["AMZN", "TSLA", "HD", "MCD", "NKE",
                "SBUX", "LOW", "BKNG", "TJX", "CMG"]

    # 5. Communication Services
    communication = ["GOOGL", "GOOG", "LYV", "NFLX", "DIS",
                     "CMCSA", "VZ", "T", "TMUS", "EA"]

    # Per-sector ticker lists, returned so callers can restrict pairs to one sector
    industries = [tech, health, financial, consumer, communication]

    # Flatten the sectors into a single list, preserving sector order
    tickers = [ticker for industry in industries for ticker in industry]

    # Fail up front with the complete list of absent columns instead of
    # stopping at the first one halfway through the loop.
    missing = [ticker for ticker in tickers if ticker not in data.columns]
    if missing:
        raise KeyError(f"Tickers missing from data: {missing}")

    # H0: The series is non-stationary
    # H1: The series is stationary
    non_stationary_tickers = []
    for ticker in tickers:
        # adfuller does not handle missing values, so drop them per column
        # (the residual test in this file does the same before regressing).
        series = data[ticker].dropna()
        p_value = st.tsa.stattools.adfuller(series)[1]
        if p_value < alpha:
            continue  # H0 rejected: series is stationary
        non_stationary_tickers.append(ticker)  # H0 not rejected

    return non_stationary_tickers, industries


def create_combinations(tickers_list: list[str]) -> pd.DataFrame:
    """
    Build every unordered pair of tickers.

    Parameters:
        tickers_list (list[str]): Tickers to pair up.

    Returns:
        pd.DataFrame: One row per 2-element combination, in the order produced by
            itertools.combinations, with columns "Ticker1" and "Ticker2".
    """
    unordered_pairs = list(it.combinations(tickers_list, 2))
    return pd.DataFrame(unordered_pairs, columns=["Ticker1", "Ticker2"])


def get_combinations(combinations: pd.DataFrame, combination_nbr: int) -> tuple[str, str]:
    """
    Fetch the pair stored at a given row position.

    Parameters:
        combinations (pd.DataFrame): Table of pairs; the first two columns are read by position.
        combination_nbr (int): Positional row index of the pair to return.

    Returns:
        tuple[str, str]: The values of the first and second column of that row.
    """
    selected_row = combinations.iloc[combination_nbr].values
    first_ticker, second_ticker = selected_row[0], selected_row[1]
    return first_ticker, second_ticker


def stationarity_residuals_test(industries: list, combinations: pd.DataFrame, data: pd.DataFrame, alpha: float = 0.05,
                                window: int = 126, min_corr: float = 0.6) -> pd.DataFrame:
    """
    Screen same-industry ticker pairs for stationary regression residuals.

    A pair is kept when both conditions hold:
      1. the mean rolling correlation of the two tickers' percentage returns is above `min_corr`;
      2. the ADF p-value of the residuals of an OLS regression of Ticker2 on Ticker1
         (with an intercept) is below `alpha`.

    Parameters:
        industries (list): List of ticker lists; a pair is only tested when both tickers
            appear together in one of these lists.
        combinations (pd.DataFrame): Candidate pairs; the first two columns are read by position.
        data (pd.DataFrame): Table with one column per ticker. It is not modified.
        alpha (float): Significance level for the ADF test on the residuals (default is 0.05).
        window (int): Rolling window length, in rows, for the return correlation (default is 126).
        min_corr (float): Minimum mean rolling correlation a pair must exceed (default is 0.6).

    Returns:
        pd.DataFrame: Columns "Ticker1", "Ticker2", "p-value", "Mean Rolling Corr",
            sorted by ascending p-value with a fresh 0..n-1 index. Empty when no pair qualifies.
    """
    result_columns = ["Ticker1", "Ticker2", "p-value", "Mean Rolling Corr"]
    pairs = []

    # Iterate the pairs directly instead of one positional row lookup per pair.
    for ticker1, ticker2 in combinations.iloc[:, :2].itertuples(index=False, name=None):
        # Only pairs that share an industry are candidates
        if not any(ticker1 in industry and ticker2 in industry for industry in industries):
            continue

        # Keep only rows where both tickers have a value
        prices = data[[ticker1, ticker2]].dropna()

        # Rolling correlation of returns, computed for this pair only. This is the
        # same off-diagonal value the pairwise correlation matrix would hold, without
        # depending on the row layout of that matrix.
        returns = prices.pct_change().dropna()
        mean_corr = returns[ticker1].rolling(window=window).corr(returns[ticker2]).mean()

        # Written as "not >" so a NaN mean (too few rows for one window) is rejected
        # too; this also skips the regression and ADF test for pairs that cannot pass.
        if not mean_corr > min_corr:
            continue

        # Regress Ticker2 on Ticker1 (with intercept) and take the residuals
        X = st.add_constant(prices[ticker1])
        residuals = st.OLS(prices[ticker2], X).fit().resid

        # NOTE(review): a plain ADF p-value on *estimated* residuals is not based on
        # Engle-Granger critical values; statsmodels' `tsa.stattools.coint` appears to
        # be the purpose-built test. Left unchanged so existing results do not move.
        p_value = st.tsa.stattools.adfuller(residuals)[1]

        # Residuals are treated as stationary when H0 (unit root) is rejected
        if p_value < alpha:
            pairs.append([ticker1, ticker2, p_value, mean_corr])

    return pd.DataFrame(pairs, columns=result_columns).sort_values(by="p-value", ascending=True).reset_index(drop=True)

def split_data(data: pd.DataFrame, train_size: float = 0.6, test_size: float = 0.2) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Split the DataFrame into consecutive training, testing and validation sets, in row order.

    Parameters:
        data (pd.DataFrame): The DataFrame to be split. It is not modified.
        train_size (float): The proportion of the rows used for training (default is 0.6).
        test_size (float): The proportion of the rows used for testing (default is 0.2).

    Returns:
        tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: A tuple containing the training, testing,
            and validation DataFrames. Validation receives every row left after the first two splits.
    """

    # Row counts are truncated toward zero; any remainder falls into validation
    n_train = int(len(data) * train_size)
    n_test = int(len(data) * test_size)

    # Positional slices, copied so the splits are independent of `data` and of each other
    train = data.iloc[:n_train].copy()
    test = data.iloc[n_train:n_train + n_test].copy()
    validation = data.iloc[n_train + n_test:].copy()

    return train, test, validation

In [3]:
# Load the table from data.csv (one column per ticker, indexed by the parsed
# 'Date' column) and keep only the training split for pair selection; the `_`
# placeholders discard the test and validation splits.
data_coint = pd.read_csv('data.csv', parse_dates=['Date']).set_index('Date')
train_coint, _, _ = split_data(data_coint)

# ---- Cointegration test ---- #

# Get the tickers whose training series are non-stationary, plus the per-industry ticker lists
tickers, industries = stationarity_prices_test(train_coint)

# Create all possible two-ticker combinations of the non-stationary tickers
combinations = create_combinations(tickers)

# Test for stationary residuals and get cointegrated pairs of the same industry
pairs = stationarity_residuals_test(industries, combinations, train_coint)
# Bare last expression: display the resulting table as the cell output
pairs

Unnamed: 0,Ticker1,Ticker2,p-value,Mean Rolling Corr
0,SCHW,BLK,0.010269,0.675923
1,MS,GS,0.011329,0.835884
2,MS,SCHW,0.017634,0.724186
3,INTC,TXN,0.026796,0.612224
4,MS,BLK,0.03989,0.682749
5,GS,SCHW,0.04345,0.687411
