In [1]:
from google.colab import drive
drive.mount('/content/drive')

import os
from pathlib import Path

# CHANGE THIS to your actual Drive folder location
PROJECT_ROOT = Path("/content/drive/MyDrive/pairs_trading_project")

RAW_DIR = PROJECT_ROOT / "data" / "raw"
PROCESSED_DIR = PROJECT_ROOT / "data" / "processed"

RESULTS_DIR = PROJECT_ROOT / "results"
FIG_DIR = RESULTS_DIR / "figures"
STATS_DIR = RESULTS_DIR / "statistics"

MODELS_DIR = PROJECT_ROOT / "models" / "saved_parameters"

for d in [RAW_DIR, PROCESSED_DIR, FIG_DIR, STATS_DIR, MODELS_DIR]:
    d.mkdir(parents=True, exist_ok=True)

print("Project root:", PROJECT_ROOT)

Mounted at /content/drive
Project root: /content/drive/MyDrive/pairs_trading_project


In [2]:
!pip -q install yfinance pandas numpy pyarrow pandas_market_calendars statsmodels

import numpy as np
import pandas as pd
import yfinance as yf
import pandas_market_calendars as mcal
from datetime import datetime, timezone

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/131.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m131.4/131.4 kB[0m [31m6.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m211.5/211.5 kB[0m [31m14.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m58.1/58.1 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
ibis-framework 9.5.0 requires toolz<1,>=0.11, but you have toolz 1.1.0 which is incompatible.[0m[31m
[0m

In [9]:
universe = [
    # Financials - banks
    ("JPM","financials","banks"), ("BAC","financials","banks"), ("WFC","financials","banks"),
    ("C","financials","banks"), ("GS","financials","banks"), ("MS","financials","banks"),
    ("USB","financials","banks"), ("PNC","financials","banks"), ("TFC","financials","banks"),
    ("SCHW","financials","brokers"),

    # Financials - payments
    ("V","financials","payments"), ("MA","financials","payments"), ("AXP","financials","payments"),
    ("PYPL","financials","payments"),

    # Energy - integrated / E&P / services / midstream
    ("XOM","energy","integrated"), ("CVX","energy","integrated"),
    ("COP","energy","e&p"), ("EOG","energy","e&p"), ("OXY","energy","e&p"),
    ("DVN","energy","e&p"), ("FANG","energy","e&p"), ("APA","energy","e&p"),
    ("SLB","energy","services"), ("HAL","energy","services"),
    ("KMI","energy","midstream"), ("WMB","energy","midstream"),

    # Tech - mega-cap
    ("AAPL","tech","mega"), ("MSFT","tech","mega"), ("GOOGL","tech","mega"),
    ("AMZN","tech","mega"), ("META","tech","mega"),

    # Tech - semis
    ("NVDA","tech","semis"), ("AMD","tech","semis"), ("INTC","tech","semis"),
    ("QCOM","tech","semis"), ("AVGO","tech","semis"), ("TXN","tech","semis"), ("MU","tech","semis"),

    # Tech - software
    ("ORCL","tech","software"), ("CRM","tech","software"), ("NOW","tech","software"),
    ("ADBE","tech","software"),

    # Commodities (stocks that proxy commodity exposure) - miners/metals
    ("FCX","commodities","miners"), ("NEM","commodities","miners"), ("GOLD","commodities","miners"),
    ("RIO","commodities","miners"), ("BHP","commodities","miners"), ("VALE","commodities","miners"),

    # Commodities - steel/aluminum
    ("NUE","commodities","steel"), ("STLD","commodities","steel"), ("AA","commodities","aluminum"),

    # Commodities - chemicals/fertilizers
    ("LIN","commodities","chemicals"), ("APD","commodities","chemicals"), ("ECL","commodities","chemicals"),
    ("CF","commodities","fertilizers"), ("MOS","commodities","fertilizers"),
]

ticker_meta = pd.DataFrame(universe, columns=["ticker","bucket","subcluster"])
ticker_meta["exchange"] = "US"
ticker_meta["notes"] = ""
ticker_meta.head(), len(ticker_meta)

(  ticker      bucket subcluster exchange notes
 0    JPM  financials      banks       US      
 1    BAC  financials      banks       US      
 2    WFC  financials      banks       US      
 3      C  financials      banks       US      
 4     GS  financials      banks       US      ,
 56)

In [10]:
START = "2016-01-01"
END = None              # None = today

ADV_WINDOW = 60         # trading days
MIN_ADV_USD = 5_000_000 # $5M/day threshold

print("Start:", START, "| End:", END or "today")
print("ADV window:", ADV_WINDOW, "| Min ADV:", MIN_ADV_USD)

Start: 2016-01-01 | End: today
ADV window: 60 | Min ADV: 5000000


In [11]:
tickers = ticker_meta["ticker"].tolist()

raw = yf.download(
    tickers=" ".join(tickers),
    start=START,
    end=END,
    auto_adjust=False,
    group_by="ticker",
    threads=True,
    progress=True
)

# Convert yfinance wide/multiindex format into a long table
records = []
for t in tickers:
    if t not in raw.columns.get_level_values(0):
        continue
    df_t = raw[t].copy()
    df_t["ticker"] = t
    df_t = df_t.reset_index().rename(columns={"Date":"date"})
    # keep columns we need
    keep = ["date","ticker","Close","Adj Close","Volume"]
    df_t = df_t[keep]
    df_t.columns = ["date","ticker","close","adj_close","volume"]
    records.append(df_t)

prices_long = pd.concat(records, ignore_index=True)
prices_long["date"] = pd.to_datetime(prices_long["date"])
prices_long = prices_long.sort_values(["ticker","date"]).reset_index(drop=True)

prices_long.head(), prices_long["ticker"].nunique()

[*********************100%***********************]  56 of 56 completed


(        date ticker      close  adj_close    volume
 0 2016-01-04     AA  23.333130  21.976374  15779983
 1 2016-01-05     AA  22.275810  20.980534  16364003
 2 2016-01-06     AA  20.689831  19.486774  22970995
 3 2016-01-07     AA  19.872810  18.717262  14182605
 4 2016-01-08     AA  19.392210  18.264606  17366583,
 56)

In [13]:
# 1) Compute dollar volume
prices_long["dollar_vol"] = prices_long["close"] * prices_long["volume"]

# 2) Compute ADV_60d per ticker (no groupby.apply warnings)
adv = (
    prices_long.dropna(subset=["dollar_vol"])
    .sort_values(["ticker", "date"])
    .groupby("ticker", as_index=False)
    .tail(ADV_WINDOW)                     # last 60 rows per ticker
    .groupby("ticker", as_index=False)["dollar_vol"]
    .mean()
    .rename(columns={"dollar_vol": "adv_usd_60d"})
)

# 3) Merge into ticker_meta (this creates ticker_meta["adv_usd_60d"])
ticker_meta = ticker_meta.drop(columns=["adv_usd_60d"], errors="ignore")
ticker_meta = ticker_meta.merge(adv, on="ticker", how="left")

# 4) Filter
before = len(ticker_meta)
ticker_meta_filtered = ticker_meta[ticker_meta["adv_usd_60d"].fillna(0) >= MIN_ADV_USD].copy()
after = len(ticker_meta_filtered)

dropped = ticker_meta[~ticker_meta["ticker"].isin(ticker_meta_filtered["ticker"])].copy()

print(f"Universe before: {before}, after ADV filter: {after}, dropped: {before-after}")
print("Dropped (lowest ADV / missing):")
display(dropped.sort_values("adv_usd_60d").head(20))

Universe before: 56, after ADV filter: 56, dropped: 0
Dropped (lowest ADV / missing):


Unnamed: 0,ticker,bucket,subcluster,exchange,notes,adv_usd_60d


In [14]:
valid_tickers = ticker_meta_filtered["ticker"].tolist()
prices_f = prices_long[prices_long["ticker"].isin(valid_tickers)].copy()

# Missingness summary (count missing close/adj/volume)
missing = (
    prices_f.assign(
        miss_close=prices_f["close"].isna().astype(int),
        miss_adj=prices_f["adj_close"].isna().astype(int),
        miss_vol=prices_f["volume"].isna().astype(int),
    )
    .groupby("ticker")[["miss_close","miss_adj","miss_vol"]]
    .sum()
    .reset_index()
)

# Date coverage per ticker
coverage = (
    prices_f.groupby("ticker")
    .agg(start_date=("date","min"), end_date=("date","max"), n_obs=("date","count"))
    .reset_index()
)

# Return spikes on adj_close (top absolute daily returns)
spikes = []
for t, g in prices_f.groupby("ticker"):
    gg = g.sort_values("date").dropna(subset=["adj_close"]).copy()
    gg["ret"] = gg["adj_close"].pct_change()
    if gg["ret"].notna().any():
        top = gg.loc[gg["ret"].abs().nlargest(5).index, ["ticker","date","ret","adj_close"]]
        spikes.append(top)
spikes = pd.concat(spikes, ignore_index=True) if spikes else pd.DataFrame(columns=["ticker","date","ret","adj_close"])

# Overlap range across tickers
global_start = coverage["start_date"].max()
global_end = coverage["end_date"].min()

summary = coverage.merge(missing, on="ticker", how="left").merge(
    ticker_meta_filtered[["ticker","bucket","subcluster","adv_usd_60d"]],
    on="ticker",
    how="left"
)

summary.sort_values(["bucket","subcluster","ticker"]).head()

Unnamed: 0,ticker,start_date,end_date,n_obs,miss_close,miss_adj,miss_vol,bucket,subcluster,adv_usd_60d
0,AA,2016-01-04,2026-02-20,2548,0,0,0,commodities,aluminum,412960800.0
6,APD,2016-01-04,2026-02-20,2548,0,0,0,commodities,chemicals,425235900.0
17,ECL,2016-01-04,2026-02-20,2548,0,0,0,commodities,chemicals,369123400.0
28,LIN,2016-01-04,2026-02-20,2548,0,0,0,commodities,chemicals,1230139000.0
12,CF,2016-01-04,2026-02-20,2548,0,0,0,commodities,fertilizers,202838000.0


In [15]:
# NYSE trading calendar
nyse = mcal.get_calendar("NYSE")
schedule = nyse.schedule(start_date=prices_f["date"].min(), end_date=prices_f["date"].max())
nyse_days = pd.to_datetime(schedule.index)

# Pivot to wide using adj_close
wide = (
    prices_f.pivot(index="date", columns="ticker", values="adj_close")
    .reindex(nyse_days)
    .sort_index()
)

# Keep only the overlap range
wide = wide.loc[(wide.index >= global_start) & (wide.index <= global_end)].copy()

# Coverage threshold: keep dates where at least 98% tickers have data
coverage_ratio = wide.notna().mean(axis=1)
wide = wide.loc[coverage_ratio >= 0.98]

print("Wide shape:", wide.shape)
print("Date range:", wide.index.min().date(), "to", wide.index.max().date())

Wide shape: (2548, 56)
Date range: 2016-01-04 to 2026-02-20


In [16]:
run_stamp = datetime.now(timezone.utc).strftime("%Y_%m_%d")

# Save metadata (filtered universe)
meta_path = RAW_DIR / "ticker_metadata.csv"
ticker_meta_filtered.to_csv(meta_path, index=False)

# Save raw prices (FULL) - this is big; you usually DON'T commit to GitHub
raw_prices_path = RAW_DIR / f"stock_prices_{run_stamp}.csv"
prices_f.to_csv(raw_prices_path, index=False)

# Save processed aligned prices (parquet is smaller + faster)
aligned_path = PROCESSED_DIR / "prices_aligned.parquet"
wide.to_parquet(aligned_path)

# Save validation stats
summary_path = STATS_DIR / "data_quality_summary.csv"
summary.to_csv(summary_path, index=False)

spikes_path = STATS_DIR / "price_spikes_top5_each.csv"
spikes.to_csv(spikes_path, index=False)

print("Saved:")
print(" -", meta_path)
print(" -", raw_prices_path)
print(" -", aligned_path)
print(" -", summary_path)
print(" -", spikes_path)

Saved:
 - /content/drive/MyDrive/pairs_trading_project/data/raw/ticker_metadata.csv
 - /content/drive/MyDrive/pairs_trading_project/data/raw/stock_prices_2026_02_22.csv
 - /content/drive/MyDrive/pairs_trading_project/data/processed/prices_aligned.parquet
 - /content/drive/MyDrive/pairs_trading_project/results/statistics/data_quality_summary.csv
 - /content/drive/MyDrive/pairs_trading_project/results/statistics/price_spikes_top5_each.csv


In [17]:
# Sample: last 250 trading days for 10 tickers
sample_tickers = valid_tickers[:10]
sample = wide[sample_tickers].tail(250)

sample_path = PROCESSED_DIR / "prices_aligned_SAMPLE.parquet"
sample.to_parquet(sample_path)

print("Sample saved for GitHub demo:")
print(" -", sample_path)
print("Sample shape:", sample.shape)

Sample saved for GitHub demo:
 - /content/drive/MyDrive/pairs_trading_project/data/processed/prices_aligned_SAMPLE.parquet
Sample shape: (250, 10)


In [18]:
report_path = RAW_DIR / "data_quality_report.txt"

lines = []
lines.append(f"Run timestamp (UTC): {datetime.now(timezone.utc).isoformat()}")
lines.append(f"Start requested: {START}")
lines.append(f"Tickers requested: {len(tickers)}")
lines.append(f"Tickers after ADV filter (>= ${MIN_ADV_USD:,.0f}): {len(valid_tickers)}")
lines.append(f"Overlap date range (max start to min end): {global_start.date()} -> {global_end.date()}")
lines.append(f"Aligned dataset shape (rows, tickers): {wide.shape}")
lines.append("")
lines.append("Top 10 lowest ADV among kept tickers:")
tmp = ticker_meta_filtered.sort_values("adv_usd_60d").head(10)
for _, r in tmp.iterrows():
    lines.append(f"  {r['ticker']}: ADV_60d = ${r['adv_usd_60d']:,.0f} | {r['bucket']}/{r['subcluster']}")

lines.append("")
lines.append("Dropped tickers (ADV too low or missing):")
if len(dropped) == 0:
    lines.append("  None")
else:
    for _, r in dropped.sort_values("adv_usd_60d").iterrows():
        advv = r["adv_usd_60d"]
        advs = "NaN" if pd.isna(advv) else f"${advv:,.0f}"
        lines.append(f"  {r['ticker']}: ADV_60d = {advs} | {r['bucket']}/{r['subcluster']}")

report_path.write_text("\n".join(lines))
print("Wrote:", report_path)

Wrote: /content/drive/MyDrive/pairs_trading_project/data/raw/data_quality_report.txt


In [19]:
report_path = PROJECT_ROOT / "data/raw/data_quality_report.txt"
print(report_path.read_text().splitlines()[:25])

['Run timestamp (UTC): 2026-02-22T06:23:14.392329+00:00', 'Start requested: 2016-01-01', 'Tickers requested: 56', 'Tickers after ADV filter (>= $5,000,000): 56', 'Overlap date range (max start to min end): 2016-01-04 -> 2026-02-20', 'Aligned dataset shape (rows, tickers): (2548, 56)', '', 'Top 10 lowest ADV among kept tickers:', '  GOLD: ADV_60d = $34,909,215 | commodities/miners', '  APA: ADV_60d = $154,329,691 | energy/e&p', '  MOS: ADV_60d = $182,657,239 | commodities/fertilizers', '  CF: ADV_60d = $202,838,049 | commodities/fertilizers', '  STLD: ADV_60d = $237,226,912 | commodities/steel', '  BHP: ADV_60d = $251,379,834 | commodities/miners', '  NUE: ADV_60d = $294,172,074 | commodities/steel', '  FANG: ADV_60d = $301,819,313 | energy/e&p', '  RIO: ADV_60d = $327,208,768 | commodities/miners', '  ECL: ADV_60d = $369,123,401 | commodities/chemicals', '', 'Dropped tickers (ADV too low or missing):', '  None']


In [20]:
import pandas as pd

aligned_path = PROJECT_ROOT / "data/processed/prices_aligned.parquet"
wide = pd.read_parquet(aligned_path)

print("Shape:", wide.shape)
print("Date range:", wide.index.min(), "->", wide.index.max())
print("Tickers (first 10):", list(wide.columns[:10]))
print("Missing % overall:", wide.isna().mean().mean())

Shape: (2548, 56)
Date range: 2016-01-04 00:00:00 -> 2026-02-20 00:00:00
Tickers (first 10): ['AA', 'AAPL', 'ADBE', 'AMD', 'AMZN', 'APA', 'APD', 'AVGO', 'AXP', 'BAC']
Missing % overall: 0.0
