In [None]:
# ===========================
# Imports
# ===========================
from __future__ import annotations

import json
from typing import Dict, List

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import skew, kurtosis
from sklearn.linear_model import ElasticNetCV


# ===========================
# CONFIG — edit these paths
# ===========================
fund_id        = "Zimmer Utilities"  # column name in fund_returns.csv
benchmark_name = "N/A"
description    = (
    "Long/Short Equity Energy & Infrastructure hedge fund focusing on "
    "U.S. utilities, telecoms, oilfield services, renewables."
)

# Replace with your actual locations
fund_path   = r"C:\Users\...\Qlib Data\fund_returns.csv"
factor_path = r"C:\Users\...\Qlib Data\factor_returns.csv"
meta_path   = r"C:\Users\...\Qlib Data\factors_meta.json"


# ============================================================
# 1) LOAD & CLEAN DATA
# ============================================================
# The fund CSV and factor CSV are expected at monthly frequency.
# We coerce their dates, convert % strings to decimal floats, and align them.

fund_df   = pd.read_csv(fund_path)
factor_df = pd.read_csv(factor_path)
factors_meta: Dict[str, Dict] = json.load(open(meta_path))

# --- Fund data: parse dates & convert percent strings to decimals ---
fund_df["datetime"] = pd.to_datetime(fund_df["datetime"], dayfirst=True, errors="coerce")
fund_df[fund_id] = (
    fund_df[fund_id]
    .astype(str)                # ensure string for safe replace
    .str.replace("%", "", regex=False)
    .astype(float) / 100.0      # convert % to decimal (e.g., 0.85% -> 0.0085)
)

# --- Factor data: parse dates & convert all factor columns to decimals ---
factor_df["datetime"] = pd.to_datetime(factor_df["datetime"], dayfirst=True, errors="coerce")
for col in factor_df.columns.drop("datetime"):
    factor_df[col] = (
        factor_df[col].astype(str).str.replace("%", "", regex=False).astype(float) / 100.0
    )

# Use datetime index for clean slicing / alignment
factor_df.set_index("datetime", inplace=True)
factor_wide = factor_df.sort_index()

# The target fund series (monthly decimals)
fund_series = (
    fund_df
    .set_index("datetime")[fund_id]
    .sort_index()
)

# --- Align both datasets on overlapping dates to avoid lookahead/holes ---
common_dates = factor_wide.index.intersection(fund_series.index)
if len(common_dates) == 0:
    raise ValueError(
        "No overlapping dates between fund and factor files.\n"
        f"Fund date range:   {fund_series.index.min()} → {fund_series.index.max()}\n"
        f"Factor date range: {factor_wide.index.min()} → {factor_wide.index.max()}"
    )

fund_series_aligned = fund_series.loc[common_dates]
factor_wide         = factor_wide.loc[common_dates]

# Normalize all indices to month-end for consistent monthly math/plots
fund_series.index         = fund_series.index.to_period("M").to_timestamp("M")
fund_series_aligned.index = fund_series_aligned.index.to_period("M").to_timestamp("M")
factor_wide.index         = factor_wide.index.to_period("M").to_timestamp("M")

print(
    f"Loaded fund data ({len(fund_series)} months) "
    f"{fund_series.index.min():%Y-%m} → {fund_series.index.max():%Y-%m}"
)
print(
    f"Loaded factor data ({len(factor_wide)} months aligned) "
    f"{factor_wide.index.min():%Y-%m} → {factor_wide.index.max():%Y-%m}"
)


# ============================================================
# 2) ROLLING WINDOW GENERATOR
# ============================================================
def rolling_windows(
    dates: pd.Index,
    lookback_months: int,
    test_horizon_months: int,
):
    """
    Yield (train_start, train_end, test_start, test_end) windows.

    • 'dates' is monthly (we coerce to unique month-end stamps).
    • Example with lookback=36, horizon=1:
        [train: 36 months up to t-1], [test: month t]
    """
    months = pd.to_datetime(sorted(set(pd.Period(d, "M").to_timestamp("M") for d in dates)))
    for i in range(lookback_months, len(months) - test_horizon_months + 1):
        train_start = months[i - lookback_months]
        train_end   = months[i - 1]
        test_start  = months[i]
        test_end    = months[i + test_horizon_months - 1]
        yield train_start, train_end, test_start, test_end


# ============================================================
# 3) ROLLING ELASTIC NET REGRESSIONS (EXPOSURES & CONTRIBUTIONS)
# ============================================================
# Why Elastic Net?
# - L1+L2 regularization stabilizes exposures when factors are correlated.
# - Cross-validation prevents overfitting and picks the best penalty balance.

train_set_window = 36   # months used for model training
test_set_window  = 1    # month ahead for out-of-sample validation

roll_dates = list(
    rolling_windows(
        fund_series_aligned.index,
        lookback_months=train_set_window,
        test_horizon_months=test_set_window
    )
)

# If there aren’t enough data points for rolling windows, fall back to full period
if len(roll_dates) == 0:
    print("⚠️  Not enough data for rolling; using full sample period instead.")
    roll_dates = [(
        fund_series_aligned.index.min(),
        fund_series_aligned.index.max(),
        fund_series_aligned.index.min(),
        fund_series_aligned.index.max()
    )]

# We will store exposure estimates and factor contributions for each test window
exposure_rows: List[Dict] = []
contrib_rows:  List[pd.Series] = []

for tr_start, tr_end, te_start, te_end in roll_dates:
    # Slice train and test windows
    y_train = fund_series_aligned.loc[tr_start:tr_end]
    X_train = factor_wide.loc[tr_start:tr_end]
    y_test  = fund_series_aligned.loc[te_start:te_end]
    X_test  = factor_wide.loc[te_start:te_end]

    if len(X_train) < 12 or len(y_test) < 1:
        continue

    # Elastic Net cross-validation: small CV folds due to limited monthly data
    cv_folds = min(5, max(2, len(X_train)//2))
    model = ElasticNetCV(
        cv=cv_folds, max_iter=5000,
        l1_ratio=[0.3, 0.5, 0.8],
        random_state=42
    )
    model.fit(X_train, y_train)

    betas = dict(zip(X_train.columns, model.coef_))
    y_true = (1 + y_test).prod() - 1  # actual compounded return over test horizon

    # Save exposure summary for this test period
    exposure_rows.append({
        "date": pd.to_datetime(te_end),
        "intercept": model.intercept_,
        "y_true": float(y_true),
        **{f: v for f, v in betas.items()}
    })

    # Compute the factor contributions for that period
    contrib = (1 + X_test.mul(pd.Series(betas), axis=1)).prod() - 1
    contrib.name = pd.to_datetime(te_end)
    contrib_rows.append(contrib)


# ============================================================
# 4) GROUP FACTOR CONTRIBUTIONS (BY CATEGORY)
# ============================================================
# This step groups individual factor contributions into categories like:
# "style", "macro", "benchmark", etc., based on factors_meta.

exposure_df = pd.DataFrame(exposure_rows).set_index("date").sort_index()
contrib_df  = pd.DataFrame(contrib_rows).sort_index()

def group_contributions(contrib_df: pd.DataFrame, meta: Dict[str, Dict]) -> pd.DataFrame:
    groups: Dict[str, List[str]] = {}
    for f in contrib_df.columns:
        g = meta.get(f, {}).get("group", "Other")
        groups.setdefault(g, []).append(f)
    return pd.DataFrame({g: contrib_df[cols].sum(axis=1) for g, cols in groups.items()})

grouped = group_contributions(contrib_df, factors_meta)

# Compute residual: fund return minus all explained factor groups
aligned_fund = fund_series_aligned.reindex(grouped.index)
grouped["Residual"] = aligned_fund - grouped.sum(axis=1)

# Convert to PeriodIndex for easy annualization later
grouped.index       = grouped.index.to_period("M")
aligned_fund.index  = aligned_fund.index.to_period("M")


# ============================================================
# 5) PERFORMANCE STATISTICS (CAGR, VOL, SKEW, DRAWDOWN)
# ============================================================
def max_drawdown(series: pd.Series) -> float:
    cum = (1 + series).cumprod()
    peak = cum.cummax()
    dd = cum / peak - 1
    return dd.min()

def stats_block(series: pd.Series) -> Dict[str, float]:
    if len(series) < 3:
        return {}

    # Full-period statistics
    cagr = ((1 + series).prod() ** (12 / len(series))) - 1
    vol  = series.std() * np.sqrt(12)
    rv   = cagr / vol if vol != 0 else np.nan
    skew_ = skew(series)
    kurt_ = kurtosis(series, fisher=False)

    # 3-year rolling stats (if enough data)
    if len(series) >= 36:
        last36 = series[-36:]
        cagr_3y = ((1 + last36).prod() ** (12 / 36)) - 1
        vol_3y  = last36.std() * np.sqrt(12)
        rv_3y   = cagr_3y / vol_3y if vol_3y != 0 else np.nan
    else:
        cagr_3y, vol_3y, rv_3y = cagr, vol, rv

    worst_month = series.min()
    worst_dd    = max_drawdown(series)

    return {
        "Full Ann. Return (%)": cagr * 100,
        "Full Ann. Vol (%)": vol * 100,
        "Full Ret/Vol": rv,
        "Skewness": skew_,
        "Kurtosis": kurt_,
        "3Y Ann. Return (%)": cagr_3y * 100,
        "3Y Ann. Vol (%)": vol_3y * 100,
        "3Y Ret/Vol": rv_3y,
        "Worst Month (%)": worst_month * 100,
        "Max Drawdown (%)": worst_dd * 100
    }

# Evaluate across different components
manager  = fund_series.dropna()
factors  = grouped.get("style", pd.Series(0, index=grouped.index))
bench    = grouped.get("benchmark", pd.Series(0, index=grouped.index))
resid    = grouped["Residual"]

stats_df = pd.DataFrame({
    "Manager":  stats_block(manager),
    "Factors":  stats_block(factors),
    "Benchmark": stats_block(bench),
    "Residual": stats_block(resid),
})


# ============================================================
# 6) R² CALCULATION — HOW WELL DO FACTORS EXPLAIN RETURNS?
# ============================================================
def r2_score_academic(y: pd.Series, yhat: pd.Series) -> float:
    y = y.dropna()
    yhat = yhat.reindex(y.index).fillna(0)
    if len(y) < 3 or y.var() == 0:
        return np.nan
    ssr = ((y - yhat)**2).sum()
    sst = ((y - y.mean())**2).sum()
    return 1 - ssr / sst

explained = grouped.drop(columns="Residual", errors="ignore").sum(axis=1)
aligned_mgr = manager.reindex(explained.index).fillna(0)

r2_full   = r2_score_academic(aligned_mgr, explained)
half      = len(aligned_mgr)//2
r2_first  = r2_score_academic(aligned_mgr.iloc[:half], explained.iloc[:half])
r2_second = r2_score_academic(aligned_mgr.iloc[half:], explained.iloc[half:])

print(f"Full-sample R²: {r2_full:.3f} | First half: {r2_first:.3f} | Second half: {r2_second:.3f}")


# ============================================================
# 7) VISUALIZATION — ATTRIBUTION AND EXPOSURES
# ============================================================

# Convert PeriodIndex → DatetimeIndex for plotting
plot_index = grouped.index.to_timestamp("M")

# --- (1) Cumulative Return Decomposition ---
cum = (1 + grouped).cumprod() - 1
cum.index = plot_index

plt.figure(figsize=(10, 6))
for col in grouped.columns:
    if col != "Residual":
        plt.plot(cum.index, cum[col]*100, label=col, linewidth=2)
if "Residual" in cum.columns:
    plt.plot(cum.index, cum["Residual"]*100, label="Residual", linestyle="--", color="black")
plt.title(f"Cumulative Return Decomposition — {fund_id}")
plt.xlabel("Date"); plt.ylabel("Cumulative Return (%)")
plt.legend(); plt.grid(True, alpha=0.3)
plt.tight_layout(); plt.show()

# --- (2) Average Factor Contribution Bar Chart ---
if not contrib_df.empty:
    avg_contrib = contrib_df.mean().sort_values(ascending=False)
    plt.figure(figsize=(8, 5))
    avg_contrib.plot(kind="bar", color="steelblue")
    plt.axhline(0, color="black")
    plt.title(f"Average Monthly Contribution by Factor — {fund_id}")
    plt.ylabel("Monthly Contribution (decimal)")
    plt.tight_layout(); plt.show()

# --- (3) Explained vs Residual Stacked Bar ---
expl = grouped.drop(columns="Residual", errors="ignore").sum(axis=1)
plt.figure(figsize=(10, 5))
plt.bar(plot_index, expl*100, label="Explained", color="steelblue")
plt.bar(plot_index, grouped["Residual"]*100,
        bottom=expl*100, label="Residual", color="orange")
plt.title(f"Explained vs Residual Monthly Returns — {fund_id}")
plt.ylabel("Monthly Return (%)"); plt.legend(); plt.tight_layout(); plt.show()

# --- (4) Rolling R² (36-month window) ---
rolling_r2 = aligned_mgr.rolling(36).apply(
    lambda x: r2_score_academic(x, explained.loc[x.index]), raw=False
)
plt.figure(figsize=(10, 5))
plt.plot(rolling_r2.index, rolling_r2*100, color="darkgreen", label="Rolling 3Y R²")
plt.axhline(r2_full*100, linestyle="--", color="red", label="Full-sample R²")
plt.title(f"Rolling 3-Year R² — {fund_id}")
plt.ylabel("R² (%)"); plt.legend(); plt.grid(True, alpha=0.3)
plt.tight_layout(); plt.show()

# --- (5) Factor Exposure Heatmap ---
if not exposure_df.empty:
    plt.figure(figsize=(12, 6))
    sns.heatmap(
        exposure_df.drop(columns=["intercept", "y_true"], errors="ignore").T,
        cmap="RdBu_r", center=0, cbar_kws={"label": "Beta"}
    )
    plt.title(f"Rolling Factor Exposures — {fund_id}")
    plt.xlabel("Date"); plt.ylabel("Factor")
    plt.tight_layout(); plt.show()

# --- (6) Average Group Contribution Bar Chart ---
avg_group = grouped.drop(columns="Residual", errors="ignore").mean()
plt.figure(figsize=(8, 5))
avg_group.sort_values().plot(kind="barh", color="steelblue")
plt.axvline(0, color="black")
plt.title(f"Average Contribution by Group — {fund_id}")
plt.xlabel("Average Monthly Contribution")
plt.tight_layout(); plt.show()


# ============================================================
# 8) PRINT SUMMARY TABLES
# ============================================================
print("\n===== PERFORMANCE STATISTICS =====")
print(stats_df.round(2).to_string())

print("\n===== R-SQUARED SUMMARY =====")
print(f"Full: {r2_full:.3f},  1st Half: {r2_first:.3f},  2nd Half: {r2_second:.3f}")
