First, we need to run some basic initial commands and import necessary packages.

In [None]:
!rm *.csv.gz
!pip install boto3
!pip install botocore
!pip install pandas
!pip install gzip
!pip install scipy
import pandas as pd
import gzip
import boto3
from botocore.config import Config
from scipy.stats import linregress
from random import randint
import matplotlib.pyplot as plt
from google.colab import userdata

This is the backtesting function; we will need it later.

In [None]:
def run_backtest(merged_df: pd.DataFrame, m: float, b: float,
                 spread_bps: float) -> pd.DataFrame:
    merged_df = merged_df.sort_values("window_start").reset_index(drop=True)
    spr = spread_bps / 10000

    prev_px_x = prev_px_y = None
    prev_pos1 = prev_pos2 = 0
    cum_pnl = 0.0
    recs = []

    for _, row in merged_df.iterrows():
        ts = row["window_start"]
        px_x = row["open_x"]
        px_y = row["open_y"]

        if prev_px_x is None:
            pos1 = pos2 = 0
            trade_cost = 0.0
            step_pnl = 0.0
        else:
            if px_y < px_x * m + b:
                pos1, pos2 = -1 / px_x, 1 / px_y
            else:
                pos1, pos2 = 1 / px_x, -1 / px_y

            step_pnl = (
                prev_pos1 * (px_x - prev_px_x) +
                prev_pos2 * (px_y - prev_px_y)
            )

            trade1 = abs(pos1 - prev_pos1)
            trade2 = abs(pos2 - prev_pos2)

            trade_cost = spr * (trade1 * px_x + trade2 * px_y)
            step_pnl -= trade_cost

            cum_pnl += step_pnl

        recs.append({"time": ts, "pnl": cum_pnl})

        prev_px_x, prev_px_y = px_x, px_y
        prev_pos1, prev_pos2 = pos1, pos2

    return pd.DataFrame(recs)

This sets up a connection to the S3 bucket containing the data. Note that this code currently uses Google Colab's secrets functionality to store the S3 credentials for Polygon's file-based API, but other approaches for secret storage can be easily substituted in.

In [None]:
session = boto3.Session(
  aws_access_key_id=userdata.get('POLYGON_AWS_ID'),
  aws_secret_access_key=userdata.get('POLYGON_AWS_SECRET'),
)

s3 = session.client(
  's3',
  endpoint_url='https://files.polygon.io',
  config=Config(signature_version='s3v4'),
)

paginator = s3.get_paginator('list_objects_v2')

Now, we can download the data and extract it from the gzip file.

Note that, currently, it just takes in one day of data. That is due to compute constraints; it already takes about an hour to run this notebook on Google Colab, which is already pushing up against the bounds of practical usability. However, there is no reason why, in principle, more data couldn't be used given more compute, and it is set up to be extensible if you want to try adding more files.

In [None]:
all_dataframes = []

for prefix in ["us_stocks_sip/minute_aggs_v1/2025/05/2025-05-30"]:
  for page in paginator.paginate(Bucket='flatfiles', Prefix=prefix):
    for obj in page['Contents']:
      print("Processing data input file: " + obj['Key'])

      bucket_name = 'flatfiles'

      local_file_name = obj['Key'].split('/')[-1]

      local_file_path = './' + local_file_name

      s3.download_file(bucket_name, obj['Key'], local_file_path)
      with gzip.open(local_file_path) as downloaded_file:
        all_dataframes.append(pd.read_csv(local_file_path))

If multiple days of data are used, this combines all of the dataframes together.

In [None]:
combined_df = pd.concat(all_dataframes, ignore_index=True)

This goes through the data and separates it by ticker.

In [None]:
ticker_dataframes = {}
for ticker in combined_df['ticker'].unique():
  ticker_dataframes[ticker] = combined_df[combined_df['ticker'] == ticker].copy()
  print("Completed initial processing of ticker: " + str(ticker))

We won't be using combined_df or all_dataframes anymore, so we can get rid of them to free up memory.

In [None]:
del combined_df
all_dataframes = []

This extracts the features that will be needed for the subsequent regression step.

In [None]:
reg_dataframes = {}
for ticker, df in ticker_dataframes.items():

  reg_dataframes[ticker] = pd.DataFrame({
      'open_price': df['open'],
      'window_start': df['window_start']
  })
  print("Completed secondary processing of ticker: " + str(ticker))

This randomly generates a bunch of pairs for a pairs trading stat-arb strategy. Then, linear regressions are performed on each one.

In [None]:
regression_results = {}

all_tickers = list(reg_dataframes.keys())

for randct in range(16384):
    i = randint(0, len(all_tickers) - 1)
    j = randint(0, len(all_tickers) - 1)
    while j == i:
      j = randint(0, len(all_tickers) - 1)
    ticker1 = all_tickers[i]
    ticker2 = all_tickers[j]

    df1 = reg_dataframes[ticker1]
    df2 = reg_dataframes[ticker2]

    merged_df = pd.merge(df1, df2, on='window_start', how='inner')

    if len(merged_df) > 32:
      try:
        slope, intercept, r_value, p_value, std_err = linregress(merged_df['open_price_x'], merged_df['open_price_y'])

        regression_results[(ticker1, ticker2)] = (abs(r_value), slope, intercept)
      except:
        pass

Next, with all of the regressions complete, we can sort by correlation.

In [None]:
regression_list = list(regression_results.items())
sorted_regression_list = sorted(regression_list, key=lambda item: item[1][0], reverse=True)

Before the strategy can be tested, a test data file must be obtained.

In [None]:
for prefix in ["us_stocks_sip/minute_aggs_v1/2025/06/2025-06-02"]:
  for page in paginator.paginate(Bucket='flatfiles', Prefix=prefix):
    for obj in page['Contents']:
      print("Processing data input file: " + obj['Key'])

      bucket_name = 'flatfiles'

      local_file_name = obj['Key'].split('/')[-1]

      local_file_path = './' + local_file_name

      s3.download_file(bucket_name, obj['Key'], local_file_path)
      with gzip.open(local_file_path) as downloaded_file:
        all_dataframes.append(pd.read_csv(local_file_path))

Just like before, if there are multiple days, they must be combined.

In [None]:
combined_df = pd.concat(all_dataframes, ignore_index=True)

The test data then gets separated by ticker.

In [None]:
ticker_dataframes = {}
for ticker in combined_df['ticker'].unique():
  ticker_dataframes[ticker] = combined_df[combined_df['ticker'] == ticker].copy()
  print("Completed test data processing of ticker: " + str(ticker))

Again, combined_df and all_dataframes can now be safely deleted.

In [None]:
del combined_df
all_dataframes = []

For each of the selected pairs, the strategy is backtested using the data from the test day. The range to select from in sorted_regression_list were determined empirically; I wanted to ensure that the pairs had a genuine, important correlation, so I excluded the final 5/8 of pairs, but I also noticed some weird behavior (and strongly negative PnL) with the pairs at the very front of the list, so I excluded those too.

Also, note that, for financial reasons, the historical market data I have purchased access to only includes open and close prices and not bid-ask spread. Therefore, I am assuming a constant bid-ask spread for the purposes of the backtest; according to nasdaq.com, the average spread for S&P 500 stocks is 3.7 bp, so I am adding 0.5 onto that to get 4.2 bp as a pessimistic estimate to ensure a margin of safety.

In [None]:
backtest_results = {}
for regression in sorted_regression_list[1024:6144]:
  ticker1 = regression[0][0]
  ticker2 = regression[0][1]
  if (ticker1 not in ticker_dataframes) or (ticker2 not in ticker_dataframes):
    continue

  df1 = ticker_dataframes[ticker1]
  df2 = ticker_dataframes[ticker2]

  m = regression[1][1]
  b = regression[1][2]

  merged_df = pd.merge(
    df1[["window_start", "open"]],
    df2[["window_start", "open"]],
    on="window_start",
    how="inner",
    suffixes=("_x", "_y")
  )

  if merged_df.empty:
    continue

  backtest_results[(ticker1, ticker2)] = run_backtest(merged_df, m, b, 4.2)

The backtest results on a per-pair basis must be merged to get overall PnL data.

In [None]:
all_pair_pnls = []

for pair, df in backtest_results.items():
    s = df.set_index('time')['pnl'].astype(float)
    all_pair_pnls.append(s)

overall_pnl = (
  pd.concat(all_pair_pnls, axis=1)
    .sort_index()
    .fillna(method='ffill')
    .fillna(0.0)
    .sum(axis=1)
)

The Sharpe ratio can be calculated from the overall PnL data.

In [None]:
returns = overall_pnl.diff().dropna()

ann_factor = (252 * 390) ** 0.5
sharpe_ratio = returns.mean() / returns.std() * ann_factor

print(f"Sharpe ratio: {sharpe_ratio:.4f}")

Finally, the PnL graph can be displayed. Note that the timestamps are relative to Unix epoch, so they are not shown here to avoid visual clutter. However, if you want to see them anyway, feel free to comment out the `plt.xticks([])` line.

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(overall_pnl.index, overall_pnl.values)
plt.title("Cumulative P&L")
plt.xlabel("Time of Day")
plt.xticks([])
plt.ylabel("Cumulative P&L")
plt.tight_layout()
plt.show()