In [1]:
from pathlib import Path

import numpy as np
import pandas as pd

In [2]:
# Factor data is read from the "data" folder that sits next to the current
# working directory (i.e. <cwd>/../data/factors.csv).
data_dir = Path.cwd().parent.joinpath("data")
df_factors = pd.read_csv(data_dir.joinpath("factors.csv"))

In [3]:
# Map the raw factor column names to the single-letter codes used from here
# on. Columns not listed in the mapping keep their original names.
factor_rename = {
    "ret_geo": "W",
    "vol_36m": "L",
    "value": "V",
    "investment": "C",
    "profitability": "R",
}
df_factors = df_factors.rename(factor_rename, axis="columns")

### Helpers

In [4]:
def yearly_z_score(df, cols, date_col="date"):
    """Append per-group z-scores of the given columns to ``df``.

    Rows are grouped by the value of ``date_col``. For every column ``c`` in
    ``cols`` a new column ``"z_" + c`` is written holding
    ``(value - group mean) / group std``, where the standard deviation is the
    population one (``ddof=0``). A group whose standard deviation is zero or
    NaN (e.g. a single row, constant values, or no valid values) receives a
    score of 0.0 on every row.

    Args:
        df: DataFrame to score. It is modified in place.
        cols: Names of the columns to standardize.
        date_col: Column whose values define the groups.

    Returns:
        The same ``df`` object, with the ``z_*`` columns added.

    Raises:
        KeyError: If any name in ``cols`` is not a column of ``df``. Every
            name is checked before anything is written, so ``df`` is left
            untouched when this is raised.
    """
    years = df[date_col]

    # Materialize so a one-shot iterable survives both passes below.
    cols = list(cols)

    # Validate everything up front: failing half-way through the write loop
    # would leave the caller's DataFrame with only some of the new columns.
    for c in cols:
        if c not in df.columns:
            raise KeyError(f"{c} not in DataFrame")

    def _z(s):
        std = s.std(ddof=0)
        # Degenerate group: avoid dividing by zero / NaN and score it as 0.
        if std == 0 or np.isnan(std):
            return pd.Series(0.0, index=s.index)
        return (s - s.mean()) / std

    for c in cols:
        df["z_" + c] = df.groupby(years)[c].transform(_z)
    return df


def yearly_rank_score(df, cols, date_col="date"):
    """Append per-group rank scores scaled to [0, 1] to ``df``.

    Rows are grouped by the value of ``date_col``. For every column ``c`` in
    ``cols`` a new column ``"rank_" + c`` is written holding
    ``(rank - 1) / (n - 1)``, where ``rank`` is the ascending rank within the
    group (ties share their average rank) and ``n`` is the number of non-null
    values in the group. A group with at most one non-null value receives a
    score of 0.0 on every row.

    Args:
        df: DataFrame to score. It is modified in place.
        cols: Names of the columns to rank.
        date_col: Column whose values define the groups.

    Returns:
        The same ``df`` object, with the ``rank_*`` columns added.

    Raises:
        KeyError: If any name in ``cols`` is not a column of ``df``. Every
            name is checked before anything is written, so ``df`` is left
            untouched when this is raised.
    """
    years = df[date_col]

    # Materialize so a one-shot iterable survives both passes below.
    cols = list(cols)

    # Validate everything up front: failing half-way through the write loop
    # would leave the caller's DataFrame with only some of the new columns.
    for c in cols:
        if c not in df.columns:
            raise KeyError(f"{c} not in DataFrame")

    def _rank01(s):
        n = s.notna().sum()
        # With fewer than two valid values the scaling denominator is zero.
        if n <= 1:
            return pd.Series(0.0, index=s.index)
        r = s.rank(method="average")
        return (r - 1) / (n - 1)

    for c in cols:
        df["rank_" + c] = df.groupby(years)[c].transform(_rank01)
    return df


def yearly_score(df, cols, method, date_col="date"):
    """Score ``cols`` within each ``date_col`` group using ``method``.

    Args:
        df: DataFrame handed to the selected scoring function.
        cols: Names of the columns to score.
        method: ``"z"`` to use ``yearly_z_score`` or ``"rank"`` to use
            ``yearly_rank_score``.
        date_col: Column whose values define the groups.

    Returns:
        Whatever the selected scoring function returns.

    Raises:
        ValueError: If ``method`` is neither ``"z"`` nor ``"rank"``.
    """
    if method not in ("z", "rank"):
        raise ValueError("method must be 'z' or 'rank'")

    scorer = yearly_z_score if method == "z" else yearly_rank_score
    return scorer(df, cols, date_col=date_col)


def append_avg_score(df, cols, method):
    """Add the row-wise mean of the per-column scores to ``df``.

    The score columns are looked up as ``"z_" + c`` (``method="z"``) or
    ``"rank_" + c`` (``method="rank"``) for each ``c`` in ``cols``; their mean
    across columns is stored in ``avg_<method>_score``.

    Args:
        df: DataFrame that already holds the score columns. It is modified
            in place.
        cols: Base column names (without the score prefix).
        method: ``"z"`` or ``"rank"``.

    Returns:
        The same ``df`` object, with the average-score column added.

    Raises:
        ValueError: If ``method`` is neither ``"z"`` nor ``"rank"``.
        KeyError: If an expected score column is not in ``df``.
    """
    if method not in ("z", "rank"):
        raise ValueError("method must be 'z' or 'rank'")
    prefix = "z_" if method == "z" else "rank_"

    wanted = [prefix + name for name in cols]

    # Report the first expected score column that is absent, if any.
    absent = next((name for name in wanted if name not in df.columns), None)
    if absent is not None:
        raise KeyError(f"{absent} not in DataFrame")

    row_means = df[wanted].mean(axis=1)
    df[f"avg_{method}_score"] = row_means
    return df


def append_portfolio_weights(
    df, cols, method, p, mkt_cap_col="market_cap", date_col="date"
):
    """Append market-cap weights of the top-``p`` scorers per group to ``df``.

    Rows are grouped by the value of ``date_col``. For each ``c`` in ``cols``
    the score column ``"z_" + c`` or ``"rank_" + c`` (depending on
    ``method``) is read, the rows whose score is at or above the group's
    ``1 - p`` quantile are selected, and each selected row is weighted by its
    share of the selected rows' total ``mkt_cap_col`` within the group.
    Unselected rows, rows with a missing score, and rows whose weight cannot
    be computed get a weight of 0.0. The result is stored in
    ``"weight_" + <score column>``.

    Args:
        df: DataFrame holding the score, market-cap and date columns. It is
            modified in place.
        cols: Base column names (without the score prefix).
        method: ``"z"`` or ``"rank"``.
        p: Fraction of each group to select from the top; ``1 - p`` is passed
            to ``Series.quantile``.
        mkt_cap_col: Column holding the market capitalization.
        date_col: Column whose values define the groups.

    Returns:
        The same ``df`` object, with the ``weight_*`` columns added.

    Raises:
        ValueError: If ``method`` is neither ``"z"`` nor ``"rank"``.
        KeyError: If an expected score column is not in ``df``. Every score
            column is checked before anything is written, so ``df`` is left
            untouched when this is raised.
    """
    if method == "z":
        prefix = "z_"
    elif method == "rank":
        prefix = "rank_"
    else:
        raise ValueError("method must be 'z' or 'rank'")

    years = df[date_col]

    # Validate everything up front: failing half-way through the write loop
    # would leave the caller's DataFrame with only some of the new columns.
    score_cols = [prefix + c for c in cols]
    for score_col in score_cols:
        if score_col not in df.columns:
            raise KeyError(f"{score_col} not in DataFrame")

    for score_col in score_cols:
        # Per-group score cut-off, broadcast back onto every row.
        thresholds = df.groupby(years)[score_col].transform(
            lambda x: x.quantile(1 - p)
        )

        # Rows at or above the cut-off; missing scores are never selected.
        mask = (df[score_col] >= thresholds) & (df[score_col].notna())

        # Market cap of selected rows only, and its per-group total.
        masked_mkt_cap = df[mkt_cap_col].where(mask, 0.0)
        yearly_total_cap = masked_mkt_cap.groupby(years).transform("sum")

        # A 0/0 division (no market cap selected in the group) or a missing
        # market cap yields NaN, which is reported as a zero weight.
        weights = masked_mkt_cap / yearly_total_cap
        df[f"weight_{score_col}"] = weights.fillna(0.0)

    return df