In [None]:
from pathlib import Path

import numpy as np
import pandas as pd
import yfinance as yf


# NASDAQ + Crypto price pipeline (2000–2025)

This notebook builds clean daily + weekly Open/Close datasets for:
- **NASDAQ equities** (from local CSVs, plus split sanity checks)
- **Crypto** (from local CSVs, plus yfinance enrichment)
- **Optional EURUSD FX** (from yfinance, useful for 2.5 DCA normalization)

## What this pipeline does
1) Load raw files
2) Normalize schema + dates
3) Fix splits for NASDAQ (only when an unadjusted jump is detected)
4) Use **yfinance** as a safety net:
   - fill missing values
   - replace extreme outliers (optional, enabled here)
5) Aggregate daily -> weekly
6) Save standardized CSVs into `data/processed/`

`report_quality(...)` prints a quick data-quality summary of each dataset at key steps along the way.

## Folder layout (expected)

We assume this structure:

- `data/raw/`
  - `nasdaq-daily/`  (CSV per ticker OR CSVs containing a `ticker` column)
  - `crypto-daily/`
  - `nasdaq_screener.csv`
  - `nasdaq_company_addresses.csv`
  - `splits_2000_2025.csv`

- `data/processed/` *(created automatically)*
  - `df_nasdaq_daily.csv`
  - `df_nasdaq_weekly.csv`
  - `df_crypto_daily.csv`
  - `df_crypto_weekly.csv`
  - `df_nasdaq_meta.csv`
  - optional: `df_eurusd_daily.csv`, `df_eurusd_weekly.csv`

# Pipeline: config
Define inputs/outputs, date bounds, and tuning parameters used across the pipeline.

In [None]:
# Root of the data tree, relative to the notebook's working directory.
DATA_DIR = Path("../data")
RAW = DATA_DIR / "raw"
OUT = DATA_DIR / "processed"
OUT.mkdir(parents=True, exist_ok=True)  # create the output folder up front

# Target date window for every output dataset.
START = pd.Timestamp("2000-01-01")
END = pd.Timestamp("2025-12-31")

# Never request dates past today's midnight from yfinance.
_today = pd.Timestamp.today().normalize()
EFFECTIVE_END = END if END <= _today else _today

# Relative tolerance for confirming a split jump:
# confirmed if |ratio - split_factor| / |split_factor| <= TOL
TOL = 0.05

# Tukey fences for outlier detection (classic boxplot rule)
IQR_K = 1.5

## Config notes

- `START` / `END` are the target date bounds for the final outputs.
- `EFFECTIVE_END` prevents asking yfinance for future data.
- `TOL` controls how strict we are when confirming split jumps.
- `IQR_K` controls outlier aggressiveness. Bump to 2.0–3.0 to reduce replacements if needed.

In [None]:
def to_date(s: pd.Series) -> pd.Series:
    """
    Parses a date-like Series and strips any timezone information.

    :param s: A date-like pandas Series.
    :return: A Series of tz-naive timestamps; unparseable entries become NaT.
    """
    parsed = pd.to_datetime(s, errors="coerce")
    return parsed.dt.tz_localize(None)


def clean_cols(df: pd.DataFrame) -> pd.DataFrame:
    """
    Strips whitespace from column names and lowercases them, in place.

    :param df: Input DataFrame (its columns are modified in place).
    :return: The same DataFrame object, for call chaining.
    """
    stripped = df.columns.str.strip()
    df.columns = stripped.str.lower()
    return df


def coverage_by_ticker(df: pd.DataFrame) -> pd.DataFrame:
    """
    Summarizes how well each ticker is covered over time.

    :param df: Price-like DataFrame with columns (ticker, date).
    :return: One row per ticker with row count, first/last date, max and
        median calendar-day gap between consecutive rows, and total span in days.
    """
    pairs = df[["ticker", "date"]].dropna()
    pairs = pairs.sort_values(["ticker", "date"]).copy()

    # Calendar days since the previous row of the same ticker (NaN on the first row).
    day_gap = pairs.groupby("ticker")["date"].diff()
    pairs["gap_days"] = day_gap.dt.days

    summary = pairs.groupby("ticker", as_index=False).agg(
        n=("date", "size"),
        start=("date", "min"),
        end=("date", "max"),
        max_gap_days=("gap_days", "max"),
        p50_gap_days=("gap_days", "median"),
    )
    span = summary["end"] - summary["start"]
    summary["span_days"] = span.dt.days
    return summary

## Quick QA

Always prints:
  - rows, tickers, date range
  - duplicates on (ticker, date)
  - missing rate for open/close (if present)
  - non-positive counts (<=0) for open/close (if present; printed only when at least one is found)
  - open/close distribution (p01/p50/p99) (if present)
  - per-ticker observation count distribution (min/median/max)
  - per-ticker gap stats (median of max gaps; global max gap)

For diagnostics DataFrames from `merge_prices_with_yf_replace_outliers(...)`, also prints:
  - base-missing rate, yfinance-available rate
  - outlier counts/rates
  - action counts: kept_or_filled / filled_from_yf / replaced_outlier_with_yf
  - top tickers by outlier count and by replacement count

In [None]:
def report_quality(
    df: pd.DataFrame,
    name: str,
    diag: pd.DataFrame | None = None,
    top_k: int = 10,
) -> None:
    """
    Prints a quick quality snapshot for a price-like dataset.

    Each section is printed only when the columns it needs are present, so the
    function also works on frames without prices (e.g. metadata keyed by ticker).
    The non-positive counts line is printed only when at least one count is non-zero.

    :param df: DataFrame to inspect with columns (typically includes ticker/date/open/close).
    :param name: Label shown in the printed report.
    :param diag: Optional diagnostics output from merge_prices_with_yf_replace_outliers(...).
    :param top_k: Number of tickers to show in the "top outliers/replacements" tables.
    :return: None.
    """
    print(f"\n=== {name} ===")

    # Basic size; ticker count falls back to 0 when there is no ticker column.
    n = len(df)
    tickers = df["ticker"].nunique() if "ticker" in df else 0
    print("rows:", n, "| tickers:", tickers)

    if "date" in df:
        print("range:", df["date"].min(), "->", df["date"].max())

    # Duplicate keys: rows repeating an earlier (ticker, date) pair.
    if {"ticker", "date"}.issubset(df.columns):
        print("dup(ticker,date):", int(df.duplicated(["ticker", "date"]).sum()))

    if {"open", "close"}.issubset(df.columns):
        # Missing rates are fractions of rows (0.0 - 1.0), not percentages.
        miss_open = float(df["open"].isna().mean())
        miss_close = float(df["close"].isna().mean())
        print("missing open:", miss_open, "| missing close:", miss_close)

        # Non-positive prices are only reported when present, to keep the output short.
        nonpos_open = int((df["open"] <= 0).sum(skipna=True))
        nonpos_close = int((df["close"] <= 0).sum(skipna=True))
        if nonpos_open or nonpos_close:
            print("non-positive (<=0) open:", nonpos_open, "| close:", nonpos_close)

        def q(s: pd.Series):
            # (p01, p50, p99) of the numeric values in s; NaNs when nothing is numeric.
            s = pd.to_numeric(s, errors="coerce").dropna()
            return (np.nan, np.nan, np.nan) if s.empty else (
                s.quantile(0.01),
                s.quantile(0.50),
                s.quantile(0.99),
            )

        o01, o50, o99 = q(df["open"])
        c01, c50, c99 = q(df["close"])
        print("open  (p01/p50/p99):", float(o01), float(o50), float(o99))
        print("close (p01/p50/p99):", float(c01), float(c50), float(c99))

    # Per-ticker coverage: observation counts and calendar-day gaps.
    if {"ticker", "date"}.issubset(df.columns) and n:
        cov = coverage_by_ticker(df)
        if not cov.empty:
            nn = cov["n"]
            print("obs/ticker (min/median/max):", int(nn.min()), int(nn.median()), int(nn.max()))
            # Median and maximum, across tickers, of each ticker's largest gap.
            mg_med = float(cov["max_gap_days"].median(skipna=True))
            mg_max = float(cov["max_gap_days"].max(skipna=True))
            print("max gap days (median/max):", mg_med, "/", mg_max)

    if diag is not None and not diag.empty:
        print("\n--- diagnostics (merge/fill/outliers) ---")

        nd = len(diag)
        # A row counts as "present"/"available" only when BOTH open and close are non-null.
        base_present = diag["base_open"].notna() & diag["base_close"].notna()
        yf_avail = diag["yf_open"].notna() & diag["yf_close"].notna()

        n_base_missing = int((~base_present).sum())
        n_yf_avail = int(yf_avail.sum())

        n_out_any = int(diag["outlier_any"].sum())
        n_out_open = int(diag["outlier_open"].sum())
        n_out_close = int(diag["outlier_close"].sum())

        # Action labels; a label absent from the data counts as 0.
        vc = diag["action"].value_counts()
        n_kept = int(vc.get("kept_or_filled", 0))
        n_fill = int(vc.get("filled_from_yf", 0))
        n_repl = int(vc.get("replaced_outlier_with_yf", 0))

        # max(nd, 1) guards the rate denominators against division by zero.
        print("rows:", nd, "| tickers:", diag["ticker"].nunique())
        print("base missing rows:", n_base_missing, f"({n_base_missing / max(nd,1):.3%})")
        print("yfinance available rows:", n_yf_avail, f"({n_yf_avail / max(nd,1):.3%})")
        print("outliers open/close/any:", n_out_open, n_out_close, n_out_any, f"({n_out_any / max(nd,1):.3%})")
        print("actions kept/fill/replaced:", n_kept, n_fill, n_repl)

        # Per-ticker roll-up: boolean helper columns are summed (counts) or averaged (rates).
        by = diag.assign(
            base_missing=~base_present,
            yf_available=yf_avail,
            is_filled=diag["action"].eq("filled_from_yf"),
            is_replaced=diag["action"].eq("replaced_outlier_with_yf"),
        ).groupby("ticker", as_index=False).agg(
            n=("date", "size"),
            n_outlier=("outlier_any", "sum"),
            n_replaced=("is_replaced", "sum"),
            n_filled=("is_filled", "sum"),
            base_missing_rate=("base_missing", "mean"),
            yf_available_rate=("yf_available", "mean"),
        )
        by["outlier_rate"] = (by["n_outlier"] / by["n"]).fillna(0.0)
        by["replaced_rate"] = (by["n_replaced"] / by["n"]).fillna(0.0)

        # Rank by absolute count first, then by rate to break ties.
        top_out = by.sort_values(["n_outlier", "outlier_rate"], ascending=False).head(top_k)
        top_rep = by.sort_values(["n_replaced", "replaced_rate"], ascending=False).head(top_k)

        print(f"\nTop {top_k} tickers by outlier count:")
        print(top_out[
            ["ticker", "n", "n_outlier", "outlier_rate", "n_replaced", "n_filled", "base_missing_rate"]
        ].to_string(index=False))

        print(f"\nTop {top_k} tickers by replaced-outlier count:")
        print(top_rep[
            ["ticker", "n", "n_replaced", "replaced_rate", "n_outlier", "yf_available_rate"]
        ].to_string(index=False))

# Outliers: log-returns + Tukey fences

Raw prices aren't comparable across tickers (a $5 move means something different for a $2 stock vs $2000 stock); returns normalize that.
Additionally, log-returns play nicely with symmetry: if a price goes 3 -> 4 (simple +33.3%) then back 4 -> 3 (simple -25%), the simple returns don't cancel. 
With log-returns: log(4/3) = -log(3/4) ≈ 0.2877 (≈28.8%), so the two log-returns are equal and opposite and sum to zero.

We do outlier detection per ticker, so each asset gets judged relative to its own behavior.
`IQR_K` controls how aggressive the Tukey fences are (1.5 is classic boxplot; increase to 2.0–3.0 to reduce replacements if needed).

In [None]:
def add_log_returns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Computes log prices and day-over-day log returns within each ticker.

    Non-positive prices have no logarithm, so they are masked to NaN first.

    :param df: DataFrame with columns (ticker, date, open, close).
    :return: A sorted copy of df with added columns log_open/log_close and ret_open/ret_close.
    """
    out = df.sort_values(["ticker", "date"]).copy()

    price_cols = ("open", "close")
    for col in price_cols:
        positive_only = out[col].where(out[col] > 0)
        out[f"log_{col}"] = np.log(positive_only)

    # Differencing log prices within a ticker yields the log return.
    for col in price_cols:
        out[f"ret_{col}"] = out.groupby("ticker")[f"log_{col}"].diff()

    return out


def tukey_outlier_mask(df: pd.DataFrame, value_col: str, k: float = IQR_K) -> pd.Series:
    """
    Marks values outside per-ticker Tukey fences [Q1 - k*IQR, Q3 + k*IQR].

    :param df: DataFrame that includes (ticker) and the column in value_col.
    :param value_col: Name of the numeric column to test (e.g., ret_open).
    :param k: IQR multiplier used to form the lower/upper fences.
    :return: Boolean Series aligned to df; True where the value is an outlier.
    """
    per_ticker = df.groupby("ticker")[value_col]

    # Broadcast each ticker's quartiles back onto its own rows.
    first_quartile = per_ticker.transform(lambda col: col.quantile(0.25))
    third_quartile = per_ticker.transform(lambda col: col.quantile(0.75))
    spread = third_quartile - first_quartile

    values = df[value_col]
    too_low = values < (first_quartile - k * spread)
    too_high = values > (third_quartile + k * spread)
    return too_low | too_high

# IO and transformations
This section contains the functions that:
- load raw CSVs
- normalize to `(ticker, date, open, close)`
- filter to `[START, END]`
- turn daily into weekly

Weekly aggregation:
- weekly open = first open of the week
- weekly close = last close of the week

In [None]:
def load_price_dir(price_file: Path) -> pd.DataFrame:
    """
    Loads all CSVs in a directory into one long price DataFrame.

    Files without a ``date`` column are skipped. When a file has no ``ticker``
    column, the file stem is used as the ticker. Non-numeric and non-positive
    prices become NaN. Rows with a blank ticker or an unparseable/out-of-window
    date are dropped, and (ticker, date) duplicates are removed across ALL files
    (first occurrence wins, files visited in sorted name order).

    :param price_file: Directory containing one CSV per ticker (or CSVs with a ticker column).
    :return: DataFrame with columns (ticker, date, open, close) filtered to [START, END].
    :raises RuntimeError: If no usable CSVs are found.
    """
    frames = []
    for f in sorted(price_file.glob("*.csv")):
        # keep_default_na=False keeps literal tickers such as "NA" as strings.
        d = clean_cols(pd.read_csv(f, keep_default_na=False))
        if "date" not in d.columns:
            continue

        if "ticker" not in d.columns:
            d["ticker"] = f.stem

        d["ticker"] = d["ticker"].astype(str).str.strip()
        d["date"] = to_date(d["date"])

        for c in ("open", "close"):
            if c in d.columns:
                d[c] = pd.to_numeric(d[c], errors="coerce")
                d.loc[d[c] <= 0, c] = np.nan  # treat non-positive as missing
            else:
                d[c] = np.nan

        # After astype(str) a missing ticker is an empty string, never NaN, so it
        # must be filtered explicitly. between() is False for NaT, which also
        # removes rows whose date failed to parse.
        keep = d["ticker"].ne("") & d["date"].between(START, END)
        frames.append(d.loc[keep, ["ticker", "date", "open", "close"]])

    if not frames:
        raise RuntimeError(f"No usable CSVs in {price_file}")

    # De-duplicate after concatenating so that overlaps between files are caught too.
    return (
        pd.concat(frames, ignore_index=True)
        .drop_duplicates(["ticker", "date"])
        .sort_values(["ticker", "date"])
        .reset_index(drop=True)
    )


def daily_to_weekly(df: pd.DataFrame, week_ending: str = "FRI") -> pd.DataFrame:
    """
    Collapses daily rows into one row per ticker and week.

    Weekly open is the first open of the week and weekly close is the last close.
    Weeks missing either value are dropped.

    :param df: Daily DataFrame with columns (ticker, date, open, close).
    :param week_ending: Week anchor (e.g., 'FRI' for equities, 'SUN' for crypto).
    :return: Weekly DataFrame with (ticker, date, open, close) where date is the week timestamp.
    """
    daily = df.sort_values(["ticker", "date"]).copy()

    # Replace each date with the timestamp of the weekly period it falls in.
    week_period = daily["date"].dt.to_period(f"W-{week_ending}")
    daily["date"] = week_period.dt.to_timestamp()

    weekly = daily.groupby(["ticker", "date"], as_index=False).agg(
        {"open": "first", "close": "last"}
    )
    complete = weekly.dropna(subset=["open", "close"])
    return complete.sort_values(["ticker", "date"]).reset_index(drop=True)


def load_splits(split_file: Path) -> pd.DataFrame:
    """
    Reads the split-events CSV and normalizes it to (ticker, date, split_factor).

    :param split_file: CSV containing split events with at least (date, symbol, stock splits).
    :return: DataFrame with columns (ticker, date, split_factor) filtered to [START, END],
        excluding rows whose factor is zero or not numeric.
    """
    raw = pd.read_csv(split_file, keep_default_na=False)
    events = clean_cols(raw).rename(
        columns={"symbol": "ticker", "stock splits": "split_factor"}
    )

    # Midnight-normalized dates so events line up with daily price rows.
    events["date"] = to_date(events["date"]).dt.normalize()
    events["split_factor"] = pd.to_numeric(events["split_factor"], errors="coerce")
    events = events.dropna(subset=["ticker", "date", "split_factor"])

    in_window = events["date"].between(START, END)
    nonzero_factor = events["split_factor"] != 0
    return events.loc[in_window & nonzero_factor, ["ticker", "date", "split_factor"]]

## Split adjustment: only when we see the "split jump"

Some tickers may already have splits adjusted in the base feed while others may not.
We apply splits only if the observed jump around the split date matches the split factor (within `TOL`).
This avoids "double-adjusting" when the base feed is already clean.

In [None]:
def adjust_splits_if_needed(prices: pd.DataFrame, splits: pd.DataFrame, tol: float = TOL) -> tuple[pd.DataFrame, int]:
    """
    Applies split adjustments only when an unadjusted split jump is detected.

    For every split event, the first trading day on or after the split date is
    located and the ratio ``previous close / close`` on that day is compared
    with the split factor. The event is "confirmed" when the relative error is
    within ``tol``; only then are all earlier open/close prices of that ticker
    divided by the factor. Several events confirmed on the same trading day are
    combined by multiplying their factors.

    :param prices: Daily prices with columns (ticker, date, open, close).
    :param splits: Split events with columns (ticker, date, split_factor).
    :param tol: Relative tolerance used to confirm the observed jump matches split_factor.
    :return: (adjusted_prices_df, n_applied_events).
    """
    out = []
    applied = 0

    prices = prices[["ticker", "date", "open", "close"]].copy()
    splits = splits[["ticker", "date", "split_factor"]].copy()

    # Index the split events by ticker once instead of re-scanning the whole
    # table for every ticker inside the loop.
    splits_by_ticker = {
        t: g.sort_values("date") for t, g in splits.groupby("ticker", sort=False)
    }

    for t, dft in prices.groupby("ticker", sort=False):
        dft = dft.sort_values("date").copy()
        spt = splits_by_ticker.get(t)
        if spt is None or len(dft) < 2:
            out.append(dft)
            continue

        dft["prev_close"] = dft["close"].shift(1)

        left = spt.rename(columns={"date": "split_date"}).sort_values("split_date")
        right = dft[["date", "close", "prev_close"]].rename(columns={"date": "trade_date"}).sort_values("trade_date")

        # direction="forward": match each split to the first trading day on or
        # after the split date (splits may fall on non-trading days).
        chk = pd.merge_asof(
            left,
            right,
            left_on="split_date",
            right_on="trade_date",
            direction="forward",
            allow_exact_matches=True,
        )

        chk = chk.dropna(subset=["trade_date", "close", "prev_close", "split_factor"])
        if chk.empty:
            out.append(dft.drop(columns="prev_close"))
            continue

        # An unadjusted N-for-1 split shows up as prev_close / close ~= N.
        ratio = chk["prev_close"] / chk["close"]
        ok = ((ratio - chk["split_factor"]).abs() / chk["split_factor"].abs()) <= tol

        confirmed = (
            chk.loc[ok, ["trade_date", "split_factor"]]
            .groupby("trade_date", as_index=False)["split_factor"]
            .prod()
            .sort_values("trade_date", ascending=False)
        )

        if confirmed.empty:
            out.append(dft.drop(columns="prev_close"))
            continue

        # Every confirmed event rescales all rows strictly before its trading
        # day, so prices older than several splits get the cumulative factor.
        for dt, f in confirmed.itertuples(index=False):
            before = dft["date"] < dt
            dft.loc[before, ["open", "close"]] = dft.loc[before, ["open", "close"]] / float(f)

        applied += len(confirmed)
        out.append(dft.drop(columns="prev_close"))

    if not out:
        # No ticker groups (empty input): pd.concat would raise on an empty list.
        return prices.iloc[0:0].reset_index(drop=True), 0

    prices_adj = pd.concat(out, ignore_index=True).sort_values(["ticker", "date"]).reset_index(drop=True)
    return prices_adj, applied

# yfinance enrichment

We use yfinance as a "backup feed":
- fill missing values
- replace outlier days (optional, but enabled here)

In [None]:
def yf_fetch_open_close(
    tickers: list[str],
    start: pd.Timestamp = START,
    end: pd.Timestamp = EFFECTIVE_END
) -> pd.DataFrame:
    """
    Fetches daily Open/Close from yfinance for the given tickers.

    Handles both column layouts that ``yf.download`` can return: a two-level
    (ticker, field) column index, or flat field columns (seen for single-ticker
    requests on some yfinance versions). Rows missing either price are dropped.

    :param tickers: List of yfinance tickers (e.g., ['AAPL', 'BTC-USD']).
    :param start: Start date (inclusive).
    :param end: End date (inclusive for this pipeline; requested as end+1 day to yfinance).
    :return: DataFrame with columns (ticker, date, yf_open, yf_close).
    """
    out_cols = ["ticker", "date", "yf_open", "yf_close"]

    if not tickers:
        return pd.DataFrame(columns=out_cols)

    data = yf.download(
        tickers=tickers,
        start=start.strftime("%Y-%m-%d"),
        end=(end + pd.Timedelta(days=1)).strftime("%Y-%m-%d"),  # end exclusive
        auto_adjust=True,
        group_by="ticker",
        progress=True,
        threads=False,
    )

    if data is None or data.empty:
        return pd.DataFrame(columns=out_cols)

    if isinstance(data.columns, pd.MultiIndex):
        # group_by="ticker" puts the symbol on the first column level.
        per_symbol = {sym: data[sym] for sym in sorted(set(data.columns.get_level_values(0)))}
    else:
        # Flat columns carry no symbol; this layout only occurs for one ticker.
        per_symbol = {tickers[0]: data}

    frames = [
        block[["Open", "Close"]]
        .rename(columns={"Open": "yf_open", "Close": "yf_close"})
        .rename_axis("date")  # do not depend on the index being named "Date"
        .reset_index()
        .assign(ticker=sym)
        for sym, block in per_symbol.items()
    ]

    y = pd.concat(frames, ignore_index=True)
    y["date"] = to_date(y["date"])
    y["yf_open"] = pd.to_numeric(y["yf_open"], errors="coerce").round(5)
    y["yf_close"] = pd.to_numeric(y["yf_close"], errors="coerce").round(5)

    return y.dropna(subset=out_cols)[out_cols]

## Merge logic + outlier replacement

We do a full outer merge of base + yfinance, then:
1) fill missing base values from yfinance
2) detect outliers on base-only values
3) if `replace_outliers_yf=True`, overwrite those outlier days with yfinance values (when available); if `False`, drop the outlier rows instead

In [None]:
def merge_prices_with_yf_replace_outliers(
    base: pd.DataFrame,
    yf_df: pd.DataFrame,
    replace_outliers_yf: bool = True,
    tukey_k: float = IQR_K,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Merges base prices with yfinance, fills missing, and optionally replaces outliers.

    Steps: outer-merge on (ticker, date); fill missing base open/close from
    yfinance; flag outliers using Tukey fences on per-ticker log-returns computed
    from rows where BOTH base prices are present; then either overwrite flagged
    rows with yfinance values (when both are available) or drop them.

    The ``action`` column in the diagnostics is ``filled_from_yf`` when the base
    open was missing and both yfinance prices exist, ``replaced_outlier_with_yf``
    for overwritten outliers, and ``kept_or_filled`` otherwise.

    :param base: Base DataFrame with columns (ticker, date, open, close).
    :param yf_df: yfinance DataFrame with columns (ticker, date, yf_open, yf_close).
    :param replace_outliers_yf: If True, replace outlier rows with yfinance when available;
        if False, outlier rows are removed from the output.
    :param tukey_k: IQR multiplier for Tukey outlier fences on per-ticker log-returns.
    :return: (final_prices_df, diagnostics_df).
    """
    m = (
        base.merge(yf_df, on=["ticker", "date"], how="outer")
        .sort_values(["ticker", "date"])
        .reset_index(drop=True)
    )

    # An empty yfinance frame has object-dtype columns; force them numeric so
    # the fill below cannot turn open/close into object columns.
    for c in ("yf_open", "yf_close"):
        m[c] = pd.to_numeric(m[c], errors="coerce")
    yf_ok = m["yf_open"].notna() & m["yf_close"].notna()

    # Preserve originals (and detect outliers on BASE only)
    m["base_open"] = m["open"]
    m["base_close"] = m["close"]

    # Fill missing from yfinance
    m["open"] = m["open"].combine_first(m["yf_open"])
    m["close"] = m["close"].combine_first(m["yf_close"])

    # Outliers (BASE-only)
    m["outlier_open"] = False
    m["outlier_close"] = False

    base_ok = m["base_open"].notna() & m["base_close"].notna()
    if base_ok.any():
        b = m.loc[base_ok, ["ticker", "date", "base_open", "base_close"]].rename(
            columns={"base_open": "open", "base_close": "close"}
        )
        # b keeps m's row labels, so the masks can be written back by index.
        b = add_log_returns(b)

        b_open = b.dropna(subset=["ret_open"])
        b_close = b.dropna(subset=["ret_close"])

        if not b_open.empty:
            m.loc[b_open.index, "outlier_open"] = tukey_outlier_mask(b_open, "ret_open", k=tukey_k).values
        if not b_close.empty:
            m.loc[b_close.index, "outlier_close"] = tukey_outlier_mask(b_close, "ret_close", k=tukey_k).values

    m["outlier_any"] = m["outlier_open"] | m["outlier_close"]

    # Actions (for QA)
    m["action"] = "kept_or_filled"
    filled = m["base_open"].isna() & yf_ok
    m.loc[filled, "action"] = "filled_from_yf"

    if replace_outliers_yf:
        repl = m["outlier_any"] & yf_ok
        m.loc[repl, "open"] = m.loc[repl, "yf_open"]
        m.loc[repl, "close"] = m.loc[repl, "yf_close"]
        m.loc[repl, "action"] = "replaced_outlier_with_yf"
    else:
        m = m.loc[~m["outlier_any"]].copy()

    # Final cleaning. The date filter is a callable so it is evaluated on the
    # already-filtered frame instead of relying on index alignment with the old one.
    m = (
        m.dropna(subset=["ticker", "date"])
        .loc[lambda x: x["date"].between(START, END)]
        .dropna(subset=["open", "close"])
        .drop_duplicates(["ticker", "date"])
        .copy()
    )
    m[["open", "close"]] = m[["open", "close"]].round(5)

    final = m[["ticker", "date", "open", "close"]].sort_values(["ticker", "date"]).reset_index(drop=True)

    diag = m[
        [
            "ticker", "date",
            "base_open", "base_close",
            "yf_open", "yf_close",
            "open", "close",
            "outlier_open", "outlier_close", "outlier_any",
            "action",
        ]
    ].sort_values(["ticker", "date"]).reset_index(drop=True)

    return final, diag

# Metadata
Useful for joining sectors/industries, or using addresses later.

In [None]:
def load_meta(meta_file: Path, addr_file: Path) -> pd.DataFrame:
    """
    Builds the per-ticker metadata table from the screener and address files.

    :param meta_file: NASDAQ screener CSV path (expects a symbol column).
    :param addr_file: Company address CSV path (expects ticker; address optional).
    :return: Screener rows (one per ticker) left-joined with the address column.
    :raises RuntimeError: If the address file has no ticker column.
    """
    screener = clean_cols(pd.read_csv(meta_file, keep_default_na=False))
    screener = screener.rename(columns={"symbol": "ticker"}).drop_duplicates("ticker")

    addresses = clean_cols(pd.read_csv(addr_file, keep_default_na=False))
    if "ticker" not in addresses.columns:
        raise RuntimeError("Address file must have column 'ticker'.")
    if "address" not in addresses.columns:
        # Address is optional: supply an empty column so the join still works.
        addresses["address"] = np.nan
    addresses = addresses.drop_duplicates("ticker")[["ticker", "address"]]

    merged = screener.merge(addresses, on="ticker", how="left")
    return merged.reset_index(drop=True)


def yf_symbol_crypto(t: str) -> str:
    """
    Maps a crypto ticker to the yfinance USD pair symbol.

    :param t: Plain crypto ticker (e.g., 'BTC') or already-qualified (e.g., 'BTC-USD').
    :return: yfinance symbol (e.g., 'BTC-USD'); inputs containing '-' are returned as-is.
    """
    if "-" in t:
        return t
    return f"{t}-USD"

# Run the pipeline (extract -> clean -> enrich -> aggregate)

This section produces:
- NASDAQ daily/weekly (split check + yfinance fill/replacement)
- Crypto daily/weekly (yfinance mapping and fill/replacement)
- Optional EURUSD (for FX normalization)

In [None]:
# Metadata: screener rows joined with company addresses, keyed by ticker.
df_nasdaq_meta = load_meta(RAW / "nasdaq_screener.csv", RAW / "nasdaq_company_addresses.csv")

# ------------------ NASDAQ ------------------

# 1) Load raw daily prices and the split events.
nasdaq = load_price_dir(RAW / "nasdaq-daily")
splits = load_splits(RAW / "splits_2000_2025.csv")

# 2) Apply splits only where the raw series shows the matching jump;
#    n_adj is the number of split events actually applied.
nasdaq, n_adj = adjust_splits_if_needed(nasdaq, splits, tol=TOL)
report_quality(nasdaq, "NASDAQ daily (after split-check, before yfinance)")

# 3) Fetch the yfinance reference for every ticker present in the base data.
all_n = sorted(nasdaq["ticker"].unique())
yf_n = yf_fetch_open_close(all_n, START, EFFECTIVE_END)

# 4) Fill gaps / replace outliers from yfinance, then aggregate to weeks ending Friday.
df_nasdaq_daily, diag_n = merge_prices_with_yf_replace_outliers(
    nasdaq, yf_n, replace_outliers_yf=True, tukey_k=IQR_K
)
df_nasdaq_weekly = daily_to_weekly(df_nasdaq_daily, week_ending="FRI")

# ------------------ Crypto ------------------

crypto = load_price_dir(RAW / "crypto-daily")
report_quality(crypto, "Crypto daily (before yfinance)")

# Local tickers -> yfinance symbols (e.g. BTC -> BTC-USD), plus the reverse
# mapping used to translate the downloaded symbols back.
all_c = sorted(crypto["ticker"].unique())
sym_map = {t: yf_symbol_crypto(t) for t in all_c}
inv_map = {v: k for k, v in sym_map.items()}

yf_c = yf_fetch_open_close(list(sym_map.values()), START, EFFECTIVE_END)
if not yf_c.empty:
    # Restore the local ticker; keep the yfinance symbol if it has no mapping.
    yf_c["ticker"] = yf_c["ticker"].map(inv_map).fillna(yf_c["ticker"])

# Same fill/replace step as NASDAQ, aggregated to weeks ending Sunday.
df_crypto_daily, diag_c = merge_prices_with_yf_replace_outliers(
    crypto, yf_c, replace_outliers_yf=True, tukey_k=IQR_K
)
df_crypto_weekly = daily_to_weekly(df_crypto_daily, week_ending="SUN")

# ------------------ EURUSD from yfinance ------------------

# FX comes straight from yfinance (no local base feed), relabelled to the
# standard (ticker, date, open, close) schema under the ticker "EURUSD".
eurusd_raw = yf_fetch_open_close(["EURUSD=X"], START, EFFECTIVE_END)
df_eurusd_daily = (
    eurusd_raw.rename(columns={"yf_open": "open", "yf_close": "close"})
    .assign(ticker="EURUSD")[["ticker", "date", "open", "close"]]
    .sort_values(["ticker", "date"])
    .reset_index(drop=True)
)
df_eurusd_weekly = daily_to_weekly(df_eurusd_daily, week_ending="FRI")

In [None]:
# ------------------ quality + save ------------------

# (frame, report label, optional merge diagnostics), reported in this order.
_reports = [
    (df_nasdaq_daily, "df_nasdaq_daily (final)", diag_n),
    (df_nasdaq_weekly, "df_nasdaq_weekly (final)", None),
    (df_crypto_daily, "df_crypto_daily (final)", diag_c),
    (df_crypto_weekly, "df_crypto_weekly (final)", None),
    (df_eurusd_daily, "df_eurusd_daily (final)", None),
    (df_eurusd_weekly, "df_eurusd_weekly (final)", None),
    (df_nasdaq_meta, "df_nasdaq_meta (final)", None),
]
for _frame, _label, _diag in _reports:
    report_quality(_frame, _label, diag=_diag)

# Output file name -> frame; required files first, optional FX files last.
_outputs = {
    # Required file names
    "df_nasdaq_daily.csv": df_nasdaq_daily,
    "df_nasdaq_weekly.csv": df_nasdaq_weekly,
    "df_crypto_daily.csv": df_crypto_daily,
    "df_crypto_weekly.csv": df_crypto_weekly,
    "df_nasdaq_meta.csv": df_nasdaq_meta,
    # Optional (FX)
    "df_eurusd_daily.csv": df_eurusd_daily,
    "df_eurusd_weekly.csv": df_eurusd_weekly,
}
for _fname, _frame in _outputs.items():
    _frame.to_csv(OUT / _fname, index=False)

print("\nSaved to:", OUT.resolve())
print("Splits adjusted (unadjusted jumps detected):", n_adj)
print("EFFECTIVE_END used for yfinance:", EFFECTIVE_END.date())

## Graphs: yfinance corrections (fills + outlier replacements)

These plots are a quick visual QA of the "merge/fill/outlier-replacement" step.

- **fill** = base `open/close` missing -> filled from yfinance  
- **replace** = base present but flagged as log-return outlier -> overwritten with yfinance (when available)

### Top 50 tickers by corrections (counts)
Stacked bars show, per ticker:
- `repl_n`: number of replaced outlier days  
- `fill_n`: number of filled missing days  

### Per-ticker drilldown (interactive)
Dropdown lets you inspect a single ticker:
- **final_close** (solid): the saved output  
- **base_close** (dotted): original raw series  
- **yf_close** (dashed): yfinance reference  
- markers highlight **filled** and **replaced** days  

### Corrections over time (weekly)
Weekly counts of:
- `fills` (missing -> yfinance)  
- `repls` (outlier -> yfinance)

In [None]:
import plotly.express as px
import plotly.graph_objects as go
import ipywidgets as w
from IPython.display import display

In [None]:
# Per-ticker correction counts, most-replaced tickers first.
_flagged_n = diag_n.assign(
    _is_fill=diag_n["action"].eq("filled_from_yf"),
    _is_repl=diag_n["action"].eq("replaced_outlier_with_yf"),
)
sum_n = (
    _flagged_n.groupby("ticker", as_index=False)
    .agg(
        n=("date", "size"),
        fill_n=("_is_fill", "sum"),
        repl_n=("_is_repl", "sum"),
    )
    .sort_values(["repl_n", "fill_n"], ascending=False)
)

_top50 = sum_n.head(50)
fig = px.bar(
    _top50,
    x="ticker",
    y=["repl_n", "fill_n"],
    title="Top 50 tickers by corrections (counts)",
)
fig.update_layout(barmode="stack", xaxis_tickangle=-45)
fig.show()

In [None]:
tickers = sorted(sum_n["ticker"].tolist())
dd = w.Dropdown(options=tickers, description="Ticker:")
out = w.Output()


def draw_ticker(_=None):
    """
    Redraws the close-price drilldown for the ticker selected in the dropdown.

    :param _: Ignored; present so the function can be used as a widget observer.
    :return: None (the figure is rendered into the output widget).
    """
    t = dd.value
    d_final = df_nasdaq_daily[df_nasdaq_daily["ticker"] == t].sort_values("date")
    d_diag = diag_n[diag_n["ticker"] == t].sort_values("date")

    # Rows corrected from yfinance, split by the kind of correction.
    filled_rows = d_diag[d_diag["action"].eq("filled_from_yf")]
    replaced_rows = d_diag[d_diag["action"].eq("replaced_outlier_with_yf")]

    traces = [
        go.Scattergl(x=d_final["date"], y=d_final["close"], name="final_close", mode="lines"),
        go.Scattergl(
            x=d_diag["date"], y=d_diag["base_close"], name="base_close", mode="lines",
            line=dict(dash="dot"), opacity=0.6,
        ),
        go.Scattergl(
            x=d_diag["date"], y=d_diag["yf_close"], name="yf_close", mode="lines",
            line=dict(dash="dash"), opacity=0.4,
        ),
        go.Scattergl(
            x=filled_rows["date"], y=filled_rows["close"],
            name="filled_from_yf", mode="markers",
            marker=dict(size=5, color="orange"),
        ),
        go.Scattergl(
            x=replaced_rows["date"], y=replaced_rows["close"],
            name="replaced_outlier_with_yf", mode="markers",
            marker=dict(size=6, symbol="x", color="red"),
        ),
    ]

    fig = go.Figure(data=traces)
    fig.update_layout(
        title=f"{t} — close (final/base/yf + corrections)",
        height=550,
        legend=dict(orientation="h"),
    )

    with out:
        out.clear_output(wait=True)
        fig.show()


# Redraw on every selection change, show the widgets, and render the first ticker.
dd.observe(draw_ticker, names="value")
display(w.VBox([dd, out]))
draw_ticker()

In [None]:
# Weekly totals of fills and outlier replacements across all tickers.
_weekly_flags = diag_n.assign(
    bucket=diag_n["date"].dt.to_period("W").dt.to_timestamp(),
    fills=diag_n["action"].eq("filled_from_yf"),
    repls=diag_n["action"].eq("replaced_outlier_with_yf"),
)
t = _weekly_flags.groupby("bucket", as_index=False)[["fills", "repls"]].sum()

_weekly_fig = px.line(t, x="bucket", y=["fills", "repls"], title="Corrections over time (weekly)")
_weekly_fig.show()