In [None]:
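# %pip install yfinance pandas numpy pyarrow   # uncomment to install into this kernel's environment (pyarrow is only needed for parquet caching)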

Run all cells once.  
The last two cells compute the correlation matrices and show an example plot.  
To change tickers or parameters, edit and re-run only the second-to-last cell (then re-run the plot cell to refresh the figure).

In [None]:
"""
market_corr_pipeline.py

Implements (paper-aligned) data loading + rolling correlation matrices:
- log returns r_i(t) = log P_i(t) - log P_i(t-1)
- epoch/window M = 40 days, shifted by step = 20 days (default)
- "stocks present for entire duration": filter tickers by completeness
- "added zero return entries for missing days": align to a common trading calendar
  and fill missing RETURNS with 0.0

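Dependencies:
    pip install yfinance pandas numpy
    pip install pyarrow   # optional: only needed to cache outputs as parquet (run_pipeline with cache_dir set)

Typical use:
    Build a PipelineConfig and call run_pipeline(tickers, cfg), e.g. with tickers = read_tickers("tickers.csv").
    (A command-line form such as
        python market_corr_pipeline.py --tickers tickers.csv --start 2000-01-01 --end 2024-12-31
     is not defined in this notebook; there is no argparse entry point yet.)

Or import functions in your project and call run_pipeline(...).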
"""

from __future__ import annotations

import argparse
import os
from dataclasses import dataclass
from typing import Iterable

import numpy as np
import pandas as pd

try:
    import yfinance as yf
except ImportError as e:
    raise ImportError("yfinance is required. Install with: pip install yfinance") from e


# ---------------------------
# Config
# ---------------------------

@dataclass(frozen=True)
class PipelineConfig:
    """Immutable parameters consumed by run_pipeline (one field per tunable knob)."""

    # First date requested from yfinance.download (YYYY-MM-DD).
    start: str = "2000-01-01"
    # Last date requested from yfinance; None = today.
    # NOTE(review): yfinance documents `end` as exclusive — confirm for your version.
    end: str | None = None
    # Upper bound on the number of tickers kept after the completeness filter;
    # the first N surviving columns are kept, fewer only triggers a warning.
    n_stocks_target: int = 200

    # Epoch length M: number of rows (calendar trading days) per correlation window.
    window: int = 40
    # Shift, in rows, between the ends of consecutive windows.
    step: int = 20
    # A ticker is kept only if at most this fraction of calendar days has a NaN price.
    max_missing_price_frac: float = 0.01

    # Ticker whose trading days define the common calendar all prices are aligned to.
    calendar_ticker: str = "SPY"

    # Floor on the per-window standard deviation used by zscore_within_window.
    eps: float = 1e-12

In [None]:

# Tickers 
def read_tickers(path: str) -> list[str]:
    """
    Reads tickers from:
    - CSV with a column named 'ticker' or a single column
    - TXT with one ticker per line
    """
    if not os.path.exists(path):
        raise FileNotFoundError(path)

    if path.lower().endswith(".txt"):
        with open(path, "r", encoding="utf-8") as f:
            tickers = [line.strip() for line in f if line.strip()]
        return _dedupe_clean_tickers(tickers)

    if path.lower().endswith(".csv"):
        df = pd.read_csv(path)
        if "ticker" in df.columns:
            tickers = df["ticker"].astype(str).tolist()
        else:
            # assume first column is tickers
            tickers = df.iloc[:, 0].astype(str).tolist()
        return _dedupe_clean_tickers(tickers)

    raise ValueError("Ticker file must be .csv or .txt")


def _dedupe_clean_tickers(tickers: Iterable[str]) -> list[str]:
    seen = set()
    out = []
    for t in tickers:
        t = t.strip().upper()
        if not t or t in seen:
            continue
        seen.add(t)
        out.append(t)
    return out



# Data
def download_adjusted_close(
    tickers: list[str],
    start: str,
    end: str | None = None,
) -> pd.DataFrame:
    """
    Download adjusted daily closes for ``tickers`` via yfinance (auto_adjust=True).

    Args:
        tickers: ticker symbols to request; must be non-empty.
        start: first date passed to yfinance.download.
        end: end date passed to yfinance.download (None = latest available).
            NOTE(review): yfinance documents ``end`` as exclusive.

    Returns:
        DataFrame with index = dates (sorted ascending), columns = tickers,
        values = adjusted close. Columns follow the order of ``tickers`` (any
        extra columns yfinance returns are appended after them), so downstream
        "keep the first N" truncation respects the caller's priority order.
        If yfinance returns no data at all, an empty frame with those columns
        is returned so callers can detect it (e.g. via ``dropna().index``).

    Raises:
        ValueError: if ``tickers`` is empty.
    """
    if len(tickers) == 0:
        raise ValueError("No stocks provided.")

    requested = list(dict.fromkeys(tickers))  # de-duplicate, keep order

    data = yf.download(
        tickers,
        start=start,
        end=end,
        auto_adjust=True,
        progress=True,
        group_by="column",
        threads=True,
    )

    # No rows at all (bad symbols, empty date range, network failure): there is no
    # "Close" column to select, so return an empty, correctly-labelled frame instead
    # of failing below with a KeyError.
    if data is None or data.empty:
        return pd.DataFrame(columns=requested, index=pd.DatetimeIndex([]), dtype=float)

    # yfinance returns either:
    # - MultiIndex columns (OHLCV x tickers) for multi-ticker (and, in some versions, single-ticker)
    # - Single-index columns for single ticker
    if isinstance(data.columns, pd.MultiIndex):
        prices = data["Close"].copy()
    else:
        prices = data[["Close"]].copy()
        prices.columns = tickers[:1]

    # yfinance may reorder multi-ticker columns (e.g. alphabetically); restore the
    # caller's order, keeping any unexpected extra columns at the end.
    requested_set = set(requested)
    ordered = [t for t in requested if t in prices.columns]
    extras = [c for c in prices.columns if c not in requested_set]
    prices = prices[ordered + extras]

    return prices.sort_index()


def get_trading_calendar_index(
    start: str,
    end: str | None,
    calendar_ticker: str = "SPY",
) -> pd.DatetimeIndex:
    """
    Build the common trading-day calendar from a reference ticker.

    The calendar is every date on which ``calendar_ticker`` has a non-NaN
    adjusted close (as returned by download_adjusted_close) for the requested
    ``start``/``end`` range.

    Raises:
        RuntimeError: if the reference ticker yields no usable rows.
    """
    reference_prices = download_adjusted_close([calendar_ticker], start=start, end=end)
    trading_days = reference_prices.dropna().index
    if trading_days.empty:
        raise RuntimeError("Could not build trading calendar index (calendar_ticker has no data).")
    return pd.DatetimeIndex(trading_days)



# preprocessing


def filter_tickers_by_completeness(
    prices: pd.DataFrame,
    calendar_index: pd.DatetimeIndex,
    max_missing_frac: float,
    n_target: int,
) -> pd.DataFrame:
    """
    Align prices to the common calendar and keep only near-complete tickers.

    Steps:
    - reindex ``prices`` onto ``calendar_index`` (days a ticker did not report become NaN);
    - keep tickers whose fraction of NaN prices is <= ``max_missing_frac``;
    - keep at most the first ``n_target`` of those, in column order
      (if fewer survive, a warning is printed and all survivors are returned).

    Returns:
        DataFrame indexed by ``calendar_index`` with the surviving ticker columns.

    Raises:
        RuntimeError: if no ticker passes the completeness threshold.
    """
    on_calendar = prices.reindex(calendar_index)
    frac_missing = on_calendar.isna().mean(axis=0)
    keep = [ticker for ticker, frac in frac_missing.items() if frac <= max_missing_frac]

    if not keep:
        raise RuntimeError(
            "No tickers passed completeness filtering. "
            "Try increasing max_missing_price_frac or choosing a later start date."
        )

    survivors = on_calendar[keep]
    n_kept = survivors.shape[1]

    if n_kept >= n_target:
        return survivors.iloc[:, :n_target]

    print(f"[warn] Only {n_kept} tickers passed filtering; target was {n_target}.")
    return survivors


def compute_log_returns_from_prices(
    prices_aligned: pd.DataFrame,
) -> pd.DataFrame:
    """
    Daily log returns r(t) = log(P(t)) - log(P(t-1)), column by column.

    Missing prices are carried forward in log space before differencing. A
    missing day therefore gets a return of exactly 0, and the next observed day
    gets the full move since the last observed price. Differencing straight
    through a gap would instead make BOTH days NaN, and run_pipeline zero-fills
    NaN returns, so that move would silently vanish.

    Non-positive prices are treated as missing (their log would be -inf/NaN).

    Args:
        prices_aligned: (T, N) prices that already share a common calendar index.

    Returns:
        (T, N) DataFrame of log returns with the same index and columns. Row 0
        is NaN (no previous price), as is every row up to and including a
        ticker's first valid price.
    """
    positive = prices_aligned.where(prices_aligned > 0)
    logp = np.log(positive).ffill()
    return logp.diff()


def fill_missing_returns_with_zero(
    returns: pd.DataFrame,
) -> pd.DataFrame:
    """
    Replace every missing (NaN) return with 0.0.

    Implements the "zero return entries for missing days" convention. A new
    DataFrame is returned; ``returns`` itself is left untouched.
    """
    filled = returns.fillna(value=0.0)
    return filled


def zscore_within_window(X: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    """
    Column-wise z-score of one window of returns.

    X: (L, N) array; each column (stock) is centred on its own mean and divided
    by its sample standard deviation (ddof=1). The deviation is floored at
    ``eps``, so a constant column maps to all zeros instead of dividing by zero.
    """
    column_means = X.mean(axis=0, keepdims=True)
    column_sds = np.maximum(X.std(axis=0, ddof=1, keepdims=True), eps)
    return (X - column_means) / column_sds


# correlation matrices

def rolling_correlation_matrices(
    returns: pd.DataFrame,
    window: int,
    step: int,
    standardize_each_window: bool = True,
    eps: float = 1e-12,
) -> tuple[np.ndarray, pd.DatetimeIndex]:
    """
    Compute rolling Pearson correlation matrices C(t) over returns.

    Args:
        returns: (T, N) DataFrame, index = dates.
        window: M, rows per window (e.g. 40); must be >= 2.
        step: shift between consecutive window ends (e.g. 20); must be >= 1.
        standardize_each_window: z-score each column inside the window first.
            Pearson correlation is unchanged by per-column standardization, so
            this only matters for constant columns (which become zeros).
        eps: standard-deviation floor passed to zscore_within_window.

    Degenerate columns: a stock whose returns are constant inside a window
    (e.g. a run of zero-filled missing days) has undefined correlation, and
    np.corrcoef would put NaN in its whole row/column. Those entries are set to
    0 with a unit diagonal, so the matrices stay finite (and remain valid
    correlation matrices). NaN *inputs* are not masked and still propagate.

    Returns:
        C_all: (K, N, N) float array (also for N == 1).
        end_dates: length-K DatetimeIndex, date of the last row of each window.

    Raises:
        ValueError: if window < 2, step < 1, or returns has fewer than ``window`` rows.
    """
    if window < 2:
        raise ValueError("window must be >= 2")
    if step < 1:
        raise ValueError("step must be >= 1")

    R = returns.to_numpy(dtype=float)
    dates = returns.index
    T, N = R.shape
    if T < window:
        raise ValueError(
            f"Need at least window={window} rows of returns to form one matrix; got {T}."
        )

    mats: list[np.ndarray] = []
    ends: list[pd.Timestamp] = []

    # windows are the half-open row ranges [end - window, end)
    for end in range(window, T + 1, step):
        X = R[end - window:end, :]  # (window, N)

        if standardize_each_window:
            X = zscore_within_window(X, eps=eps)

        # Zero-variance columns in this window: their correlation is undefined.
        degenerate = np.flatnonzero(X.max(axis=0) == X.min(axis=0))

        with np.errstate(divide="ignore", invalid="ignore"):
            C = np.corrcoef(X, rowvar=False)
        # np.corrcoef returns a 0-d array when N == 1; always keep (N, N).
        C = np.array(C, dtype=float).reshape(N, N)

        # Define undefined correlations as 0 (self-correlation 1) instead of NaN.
        C[degenerate, :] = 0.0
        C[:, degenerate] = 0.0
        C[degenerate, degenerate] = 1.0

        mats.append(C)
        ends.append(dates[end - 1])

    C_all = np.stack(mats, axis=0)
    return C_all, pd.DatetimeIndex(ends)


# pipeline
def _save_frame(frame: pd.DataFrame, cache_dir: str, stem: str) -> None:
    """Write ``frame`` to cache_dir as <stem>.parquet, or as <stem>.csv if no parquet engine is installed."""
    try:
        frame.to_parquet(os.path.join(cache_dir, f"{stem}.parquet"))
    except ImportError:
        # pandas needs pyarrow or fastparquet for parquet, and neither is among this
        # file's listed dependencies; fall back to CSV rather than crash after the
        # (slow) download has already finished.
        print(f"[warn] No parquet engine installed; writing {stem}.csv instead of {stem}.parquet.")
        frame.to_csv(os.path.join(cache_dir, f"{stem}.csv"))


def run_pipeline(
    tickers: list[str],
    cfg: PipelineConfig,
    cache_dir: str | None = "data_cache",
) -> dict:
    """
    End-to-end:
      - build the trading calendar from cfg.calendar_ticker
      - download adjusted closes for ``tickers``
      - keep up to cfg.n_stocks_target tickers with at most
        cfg.max_missing_price_frac missing prices
      - compute log returns (the undefined first row is dropped)
      - fill missing returns with zeros (paper)
      - compute rolling correlation matrices with cfg.window / cfg.step (paper)
      - if ``cache_dir`` is truthy, save outputs there (parquet, CSV fallback)

    Args:
        tickers: candidate ticker symbols, in priority order.
        cfg: pipeline parameters.
        cache_dir: output directory (created if needed); None/"" disables saving.

    Returns a dict with:
      prices        (T, N) adjusted closes aligned to the calendar
      returns       (T - 1, N) zero-filled log returns
      corrs         (K, N, N) correlation matrices
      end_dates     length-K DatetimeIndex, last date of each window
      tickers_used  list of the N tickers kept
    """
    if cache_dir:
        os.makedirs(cache_dir, exist_ok=True)

    # 1) calendar index
    cal_idx = get_trading_calendar_index(cfg.start, cfg.end, calendar_ticker=cfg.calendar_ticker)

    # 2) download prices for all candidate tickers
    prices_raw = download_adjusted_close(tickers, start=cfg.start, end=cfg.end)

    # 3) filter to those "present" (practical)
    prices = filter_tickers_by_completeness(
        prices_raw, cal_idx, max_missing_frac=cfg.max_missing_price_frac, n_target=cfg.n_stocks_target
    )

    tickers_used = list(prices.columns)

    # 4) returns. Row 0 has no previous price, so its return is undefined; drop it
    #    instead of letting the zero-fill fabricate a 0.0 row in the first window.
    rets = compute_log_returns_from_prices(prices).iloc[1:]
    rets = fill_missing_returns_with_zero(rets)

    # 5) rolling correlations
    corrs, end_dates = rolling_correlation_matrices(
        rets, window=cfg.window, step=cfg.step, standardize_each_window=True, eps=cfg.eps
    )

    # 6) cache
    if cache_dir:
        _save_frame(prices, cache_dir, "prices")
        _save_frame(rets, cache_dir, "returns")
        np.save(os.path.join(cache_dir, "corrs.npy"), corrs)
        end_dates.to_series().to_csv(os.path.join(cache_dir, "corr_end_dates.csv"), header=["end_date"])
        pd.Series(tickers_used, name="ticker").to_csv(os.path.join(cache_dir, "tickers_used.csv"), index=False)

    return {
        "prices": prices,
        "returns": rets,
        "corrs": corrs,
        "end_dates": end_dates,
        "tickers_used": tickers_used,
    }




In [None]:
# Starter universe (to be extended later). Tickers that listed after `start`
# (presumably META and GOOGL here) will likely fail the completeness filter.
tickers = [
    "AAPL", "MSFT", "AMZN", "GOOGL", "META",
    "JPM", "JNJ", "XOM", "PG", "NVDA",
]

cfg = PipelineConfig(
    start="2000-01-01",
    end="2025-12-31",             # NOTE(review): yfinance treats `end` as exclusive
    n_stocks_target=10,           # raise together with the ticker list
    window=40,                    # M: trading days per epoch
    step=20,                      # shift between epochs (~1 trading month)
    max_missing_price_frac=0.01,  # 0.0 keeps only gap-free tickers (stricter: can only reduce the count)
)

out = run_pipeline(tickers, cfg, cache_dir=None)  # cache_dir=None: nothing is written to disk

n_used = len(out["tickers_used"])
print("Tickers used:", n_used)
print("Corr shape:", out["corrs"].shape)
first_end, last_end = out["end_dates"][0], out["end_dates"][-1]
print("First / last date:", first_end, last_end)


In [None]:
import matplotlib.pyplot as plt

# Example: plot one correlation matrix (demo only — don't keep in the full pipeline code).
#
# C_all[k] is the N x N correlation matrix of the k-th window; consecutive k are
# cfg.step trading days apart (step=20 is roughly one month), so d + 1 moves ~1 month forward.

C_all = out["corrs"]
dates = out["end_dates"]
labels = out["tickers_used"]

# Requested window index, clamped so shorter date ranges (fewer windows) don't raise IndexError.
d = min(20, len(C_all) - 1)
C = C_all[d]

fig, ax = plt.subplots(figsize=(6, 5))
im = ax.imshow(C, vmin=-1, vmax=1, cmap="coolwarm")
fig.colorbar(im, ax=ax)
ax.set_title(f"Correlation matrix at {dates[d].date()}")
if len(labels) <= 30:  # ticker labels become unreadable for large N
    ax.set_xticks(range(len(labels)))
    ax.set_xticklabels(labels, rotation=90)
    ax.set_yticks(range(len(labels)))
    ax.set_yticklabels(labels)
plt.show()

