In [29]:
import pandas as pd
import random
import yfinance as yf
from datetime import datetime
import numpy as np
from statsmodels.tsa.stattools import coint, adfuller

In [30]:
# Get S&P 500 tickers
def get_sp500_tickers(sample_size=200):
    url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
    table = pd.read_html(url)[0]
    tickers = table['Symbol'].tolist()
    sample_tickers = random.sample(tickers, sample_size)
    return sample_tickers

In [31]:
# Fetch historical data
def get_historical_data(tickers):
    data = pd.DataFrame()
    for i in tickers:
        stock_data = yf.download(i, start="2022-01-01", end="2024-09-10")
        data[i] = stock_data['Adj Close']
    return data

In [32]:
# Perform the ADF test for stationarity
def adf_test(series):
    result = adfuller(series)
    return result[1] 

In [33]:
# Cointegration test between pairs of stocks
def cointegration_test(data):
    pairs = []
    results = []
    
    tickers = data.columns
    for i in range(len(tickers)):
        for j in range(i + 1, len(tickers)):
            x = data[tickers[i]].dropna()
            y = data[tickers[j]].dropna()
            
            x, y = x.align(y, join='inner')
            if x.empty or y.empty:
                continue
            
            try:
                score, p_value, _ = coint(x, y)
            except ValueError:
                continue
            
            spread = x - y
            adf_p_value = adf_test(spread)
            
            pairs.append((tickers[i], tickers[j]))
            results.append((tickers[i], tickers[j], score, p_value, adf_p_value))
    
    results_df = pd.DataFrame(results, columns=['Stock 1', 'Stock 2', 'Cointegration Score', 'Cointegration p-value', 'ADF p-value'])
    return results_df

In [34]:
# Calculate z-score for the spread
def calculate_zscore(spread):
    mean = spread.rolling(window=30).mean()
    std = spread.rolling(window=30).std()
    zscore = (spread - mean) / std
    return zscore

In [35]:
# Get S&P 500 tickers and historical data
ticks = get_sp500_tickers()
historical_data = get_historical_data(ticks)

# Cointegration test
results_df = cointegration_test(historical_data)

# Filter pairs with the lowest ADF p-values
filtered_results = results_df[results_df['ADF p-value'] < 0.05]

# Sort pairs by cointegration score to find the most cointegrated pair
sorted_results = filtered_results.sort_values(by='Cointegration Score', ascending=True)

# Get the top cointegrated pair
top_pair = sorted_results.iloc[0]
stock1, stock2 = top_pair['Stock 1'], top_pair['Stock 2']

# Calculate the spread for the top pair
spread = historical_data[stock1] - historical_data[stock2]

# Calculate the z-score of the spread
zscore = calculate_zscore(spread)

# Define entry and exit thresholds
entry_threshold = 1
exit_threshold = 0

# Generate trading signals based on z-score
long_signal = (zscore < -entry_threshold)  # Long Stock 1, Short Stock 2
short_signal = (zscore > entry_threshold)  # Short Stock 1, Long Stock 2
exit_signal = (abs(zscore) < exit_threshold)  # Exit when z-score is close to 0

def backtest_strategy(spread, long_signal, short_signal, exit_signal):
    positions = []
    returns = []
    
    position = None  # Track the current position: "long", "short", or None
    
    for i in range(1, len(spread)):
        if long_signal[i] and position is None:
            # Enter long position
            entry_spread = spread.iloc[i]
            position = 'long'
            positions.append(('long', i))
        
        elif short_signal[i] and position is None:
            # Enter short position
            entry_spread = spread.iloc[i]
            position = 'short'
            positions.append(('short', i))
        
        elif exit_signal[i] and position is not None:
            # Exit position
            exit_spread = spread[i]
            
            if position == 'long':
                returns.append(exit_spread - entry_spread)  # Profit for long position
            elif position == 'short':
                returns.append(entry_spread - exit_spread)  # Profit for short position
            
            position = None  # Reset position after exit
    
    return positions, returns

# Run backtest
positions, returns = backtest_strategy(spread, long_signal, short_signal, exit_signal)

# Print positions and returns
print("Positions:", positions)
print("Returns:", returns)
print("Total return:", sum(returns))

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%********

Positions: [('short', 29)]
Returns: []
Total return: 0


  if long_signal[i] and position is None:
  elif short_signal[i] and position is None:
  elif exit_signal[i] and position is not None:
  entry_spread = spread[i]
