# Data Pipeline
Load Compustat fundamentals, download adjusted-close prices from Yahoo Finance, and build the monthly modeling panel.


In [None]:
def load_compustat_factors(compustat_csv_path):
    """Load Compustat annual fundamentals and derive per-ticker factor values.

    Args:
        compustat_csv_path: Path to a Compustat CSV export. Column names are
            normalized by ``normalize_column_names`` and checked against
            ``REQUIRED_COMPUSTAT_COLUMNS`` (both defined elsewhere).

    Returns:
        A tuple ``(factor_dataframe, ticker_universe)``:
        - ``factor_dataframe`` has columns ``ticker``, ``availability_date``,
          ``revenue_growth`` and ``roe``, one row per (ticker, fiscal year).
        - ``ticker_universe`` is the sorted list of unique tickers in it.
    """
    raw_fundamentals = pd.read_csv(compustat_csv_path)
    raw_fundamentals = normalize_column_names(raw_fundamentals)
    validate_required_columns(raw_fundamentals, REQUIRED_COMPUSTAT_COLUMNS)

    fundamentals = (
        raw_fundamentals[REQUIRED_COMPUSTAT_COLUMNS]
        .rename(columns={"tic": "ticker"})
        .copy()
    )

    # Unparseable numeric cells become NaN so the dropna below can discard them.
    fundamentals[NUMERIC_COMPUSTAT_COLUMNS] = fundamentals[NUMERIC_COMPUSTAT_COLUMNS].apply(
        pd.to_numeric,
        errors="coerce",
    )

    fundamentals = fundamentals.dropna(
        subset=["ticker", "fyear", "revenue", "net_income", "stockholders_equity"]
    )

    # Yahoo Finance writes share classes with "-" (e.g. BRK-B) where Compustat uses ".".
    fundamentals["ticker"] = fundamentals["ticker"].astype(str).str.replace(".", "-", regex=False)

    # Keep one record per (ticker, fiscal year). Duplicates would otherwise yield a
    # spurious "growth" between two records of the same year and duplicate
    # availability keys for the as-of merge. keep="last" means last in file order.
    # NOTE(review): confirm this is the right record to keep for this Compustat extract.
    fundamentals = (
        fundamentals.drop_duplicates(subset=["ticker", "fyear"], keep="last")
        .sort_values(["ticker", "fyear"])
        .reset_index(drop=True)
    )

    by_ticker = fundamentals.groupby("ticker")
    previous_revenue = by_ticker["revenue"].shift(1)
    previous_fyear = by_ticker["fyear"].shift(1)

    # Year-over-year growth is only defined between consecutive fiscal years and
    # against a nonzero base. Skipped years or a zero base would otherwise produce
    # multi-year changes or +/-inf, and inf is not removed by later dropna() calls.
    is_consecutive_year = (fundamentals["fyear"] - previous_fyear) == 1
    fundamentals["revenue_growth"] = (fundamentals["revenue"] / previous_revenue - 1).where(
        is_consecutive_year & (previous_revenue != 0)
    )

    # Zero equity would produce +/-inf (or NaN for 0/0); mask it to NaN explicitly.
    equity = fundamentals["stockholders_equity"]
    fundamentals["roe"] = (fundamentals["net_income"] / equity).where(equity != 0)

    # Fundamentals for fiscal year Y are treated as usable from Jan 1 of Y + 1.
    # NOTE(review): this can precede the real release date. Annual reports are filed
    # weeks after year-end, and Compustat commonly assigns fyear = Y to fiscal years
    # ending Jan-May of Y + 1. The result is possible look-ahead bias; consider a
    # datadate-based reporting lag — TODO confirm.
    fundamentals["availability_date"] = pd.to_datetime(
        (fundamentals["fyear"].astype(int) + 1).astype(str) + "-01-01"
    )

    factor_dataframe = fundamentals[["ticker", "availability_date", "revenue_growth", "roe"]].copy()
    ticker_universe = sorted(factor_dataframe["ticker"].dropna().unique())
    return factor_dataframe, ticker_universe


In [None]:
def download_adjusted_close_prices(ticker_universe, start_date, end_date, use_threads=False):
    """Download daily prices from Yahoo Finance in chunks and return the adjusted-close matrix.

    Tickers are requested in batches produced by ``split_into_chunks`` with
    ``PRICE_DOWNLOAD_CHUNK_SIZE``. A batch that raises, or returns nothing,
    is reported with a printed warning and skipped (best effort).

    Args:
        ticker_universe: Sequence of Yahoo-style ticker symbols; must be non-empty.
        start_date: First date passed to ``yf.download``.
        end_date: End date passed to ``yf.download``.
        use_threads: Forwarded to ``yf.download(threads=...)``.

    Returns:
        The frame produced by ``extract_price_field`` on the merged download,
        with all-NaN columns removed. The frame presumably has dates on the
        index and tickers on the columns — confirm against ``extract_price_field``.

    Raises:
        ValueError: if ``ticker_universe`` is empty.
        RuntimeError: if no batch returned data, or the cleaned matrix is empty.
    """
    if not ticker_universe:
        raise ValueError("Ticker universe is empty.")

    collected_frames = []
    for batch in split_into_chunks(ticker_universe, PRICE_DOWNLOAD_CHUNK_SIZE):
        try:
            batch_frame = yf.download(
                batch,
                start=start_date,
                end=end_date,
                auto_adjust=False,
                group_by="column",
                progress=False,
                threads=use_threads,
            )
        except Exception as exc:
            # Best effort: one failing batch should not abort the whole download.
            print(f"Warning: download failed for chunk starting {batch[0]}: {exc}")
            continue

        if batch_frame is not None and not batch_frame.empty:
            collected_frames.append(batch_frame)
        else:
            print(f"Warning: empty data for chunk starting {batch[0]}")

    if not collected_frames:
        raise RuntimeError("No price data returned from Yahoo Finance.")

    combined = pd.concat(collected_frames, axis=1)
    # Keep only the first occurrence of any column label repeated across batches.
    first_occurrence = ~combined.columns.duplicated()
    combined = combined.loc[:, first_occurrence]

    price_matrix = extract_price_field(combined)
    price_matrix = price_matrix.dropna(axis=1, how="all")
    if price_matrix.empty:
        raise RuntimeError("Adjusted close matrix is empty after cleaning.")
    return price_matrix


In [None]:
def build_monthly_feature_panel(adjusted_close_prices):
    daily_returns = adjusted_close_prices.pct_change()
    trailing_volatility_60d = daily_returns.rolling(window=60).std().shift(1)

    monthly_close_prices = adjusted_close_prices.resample("ME").last()
    monthly_close_prices.index.name = "month_end"
    monthly_close_prices.columns.name = "ticker"

    monthly_volatility = trailing_volatility_60d.reindex(monthly_close_prices.index, method="ffill")
    monthly_volatility.index.name = "month_end"
    monthly_volatility.columns.name = "ticker"

    momentum_6_12 = (monthly_close_prices.shift(6) / monthly_close_prices.shift(12)) - 1
    momentum_6_12.index.name = "month_end"
    momentum_6_12.columns.name = "ticker"

    one_month_forward_return = (monthly_close_prices.shift(-1) / monthly_close_prices) - 1
    one_month_forward_return.index.name = "month_end"
    one_month_forward_return.columns.name = "ticker"

    monthly_panel = (
        monthly_close_prices.stack().rename("adj_close").to_frame()
        .join(monthly_volatility.stack().rename("vol_60"))
        .join(momentum_6_12.stack().rename("momentum_6_12"))
        .join(one_month_forward_return.stack().rename("target_return"))
        .reset_index()
        .sort_values(["ticker", "month_end"])
        .reset_index(drop=True)
    )
    return monthly_panel


In [None]:
def merge_features_with_fundamentals(monthly_panel, factor_dataframe):
    """Attach the most recent available fundamentals to every (ticker, month_end) row.

    For each row of ``monthly_panel``, ``pd.merge_asof`` (direction="backward")
    picks the factor row with the same ticker and the latest
    ``availability_date`` on or before ``month_end``. Rows missing any of
    ``MODEL_FEATURE_COLUMNS`` (defined elsewhere) or ``target_return`` are dropped.
    NOTE(review): no staleness limit is applied, so old fundamentals carry forward indefinitely.

    Args:
        monthly_panel: Long panel with at least ``month_end`` and ``ticker`` columns.
        factor_dataframe: Fundamentals with ``ticker`` and ``availability_date`` columns.

    Returns:
        The merged panel sorted by (ticker, month_end) with a fresh RangeIndex.

    Raises:
        ValueError: if nothing survives the merge and filtering.
    """
    # merge_asof requires both frames to be ordered by their respective "on" keys.
    left_side = monthly_panel.sort_values(["month_end", "ticker"]).reset_index(drop=True)
    right_side = factor_dataframe.sort_values(["availability_date", "ticker"]).reset_index(drop=True)

    merged = pd.merge_asof(
        left_side,
        right_side,
        by="ticker",
        left_on="month_end",
        right_on="availability_date",
        direction="backward",
    )

    must_be_present = MODEL_FEATURE_COLUMNS + ["target_return"]
    merged = merged.dropna(subset=must_be_present)

    if merged.empty:
        raise ValueError("Modeling panel is empty after merging and filtering.")

    return merged.sort_values(["ticker", "month_end"]).reset_index(drop=True)


In [None]:
def summarize_modeling_panel(modeling_panel, minimum_observations_per_month=100):
    """Print basic diagnostics for the modeling panel and return its ``describe()``.

    Prints the row count, the number of distinct ``month_end`` values, and
    per-column missing-value counts. It also lists every month with fewer than
    ``minimum_observations_per_month`` rows.

    Args:
        modeling_panel: DataFrame containing a ``month_end`` column.
        minimum_observations_per_month: Threshold below which a month is flagged.

    Returns:
        ``modeling_panel.describe()``.
    """
    row_total = len(modeling_panel)
    distinct_months = modeling_panel["month_end"].nunique()
    print(f"Rows in final panel: {row_total}")
    print(f"Unique months: {distinct_months}")
    print("Missing values by column:")
    print(modeling_panel.isna().sum())

    rows_per_month = modeling_panel["month_end"].value_counts().sort_index()
    thin_months = rows_per_month.loc[rows_per_month < minimum_observations_per_month]

    if len(thin_months) > 0:
        print(f"Warning: months with fewer than {minimum_observations_per_month} observations:")
        print(thin_months)

    return modeling_panel.describe()


In [None]:
def _load_or_download_prices(
    price_cache_path,
    force_refresh,
    ticker_universe,
    download_start_date,
    download_end_date,
    download_threads,
):
    """Return adjusted-close prices from the pickle cache if usable, else download (and cache) them."""
    import os

    if price_cache_path and os.path.exists(price_cache_path) and not force_refresh:
        # NOTE: unpickling can execute arbitrary code; only point this at caches you wrote yourself.
        # NOTE(review): the cache is not keyed on the ticker universe or the date window,
        # so a stale cache is reused silently — use force_refresh=True after changing either.
        adjusted_close_prices = pd.read_pickle(price_cache_path)
        adjusted_close_prices.index = pd.to_datetime(adjusted_close_prices.index)
        adjusted_close_prices.columns.name = "ticker"
        print(f"Loaded cached prices from {price_cache_path}")
        return adjusted_close_prices

    adjusted_close_prices = download_adjusted_close_prices(
        ticker_universe,
        download_start_date,
        download_end_date,
        use_threads=download_threads,
    )

    if price_cache_path:
        cache_directory = os.path.dirname(price_cache_path)
        # A bare filename has an empty dirname, and os.makedirs("") raises.
        if cache_directory:
            os.makedirs(cache_directory, exist_ok=True)
        adjusted_close_prices.to_pickle(price_cache_path)
        print(f"Saved price cache to {price_cache_path}")

    return adjusted_close_prices


def run_data_pipeline(
    compustat_csv_path,
    fixed_end_date=None,
    price_cache_path=None,
    force_refresh=False,
    download_threads=False,
):
    """Run the full pipeline: fundamentals -> prices -> monthly features -> merged panel.

    Args:
        compustat_csv_path: Path to the Compustat CSV passed to ``load_compustat_factors``.
        fixed_end_date: Optional price-window end date. It defaults to today, so
            set it for reproducible runs.
        price_cache_path: Optional pickle path. It is read when it exists
            (unless ``force_refresh`` is set); otherwise it is written after download.
        force_refresh: Ignore an existing cache and re-download.
        download_threads: Forwarded as ``use_threads`` to the downloader.

    Returns:
        dict with keys ``factor_dataframe``, ``ticker_universe``,
        ``adjusted_close_prices``, ``monthly_panel`` and ``modeling_panel``.

    Raises:
        ValueError: if no fundamentals survive cleaning. Errors raised by the
            called stages (download, merge) also propagate.
    """
    factor_dataframe, ticker_universe = load_compustat_factors(compustat_csv_path)
    if factor_dataframe.empty:
        # Otherwise the earliest availability date is NaT and the date
        # arithmetic below fails with an unhelpful error.
        raise ValueError("No usable Compustat fundamentals after cleaning.")

    # Start early enough that trailing price features exist by the first fundamentals date.
    first_fundamental_date = factor_dataframe["availability_date"].min()
    download_start_date = (first_fundamental_date - pd.Timedelta(days=LOOKBACK_BUFFER_DAYS)).date()
    download_end_date = (
        pd.Timestamp.today().date()
        if fixed_end_date is None
        else pd.Timestamp(fixed_end_date).date()
    )

    print(f"Compustat tickers: {len(ticker_universe)}")
    print(f"Price window: {download_start_date} to {download_end_date}")

    adjusted_close_prices = _load_or_download_prices(
        price_cache_path,
        force_refresh,
        ticker_universe,
        download_start_date,
        download_end_date,
        download_threads,
    )

    missing_tickers = sorted(set(ticker_universe) - set(adjusted_close_prices.columns))
    if missing_tickers:
        print(f"Warning: Missing price data for {len(missing_tickers)} tickers: {missing_tickers[:10]}")

    monthly_panel = build_monthly_feature_panel(adjusted_close_prices)
    modeling_panel = merge_features_with_fundamentals(monthly_panel, factor_dataframe)

    print(f"Downloaded/loaded adjusted-close prices for {adjusted_close_prices.shape[1]} tickers")
    print(f"Panel rows (pre-fundamentals): {len(monthly_panel)}")
    print(f"Panel rows (final): {len(modeling_panel)}")
    print(
        "Date range:",
        modeling_panel["month_end"].min().date(),
        "to",
        modeling_panel["month_end"].max().date(),
    )

    return {
        "factor_dataframe": factor_dataframe,
        "ticker_universe": ticker_universe,
        "adjusted_close_prices": adjusted_close_prices,
        "monthly_panel": monthly_panel,
        "modeling_panel": modeling_panel,
    }
