In [1]:
from pathlib import Path
from typing import Tuple, Dict, Callable

import numpy as np
import pandas as pd
import plotly.graph_objects as go

import plotly.express as pexp
from main import load_prices, log_returns, optimise_cvar, optimize_mean_variance, simulate_top_n

In [2]:
# Notebook-wide configuration. Everything a reader might tune lives here.

# CSV file handed to load_prices() to build the price table.
DATA_PATH = Path("stocks_clean_aligned_v2.csv")
# BETA = 0.8
# (BETA is disabled: each CVaR run below passes its own level to optimise_cvar.)
# Passed unchanged to both optimisers; presumably the minimum required
# per-period return — TODO confirm against main.py.
TARGET_RETURN = 1e-5
# Passed unchanged to both optimisers; presumably the short-selling limit
# (0 = long-only) — TODO confirm against main.py.
SHORT_CAP = 0
# Re-balancing frequency: period frequency in walk_forward() and resample
# rule for the top-N baseline.
FREQ = "M"  # monthly
# Starting portfolio value for every strategy and baseline.
INITIAL_CASH = 1000.0
# Weights with |w| below this threshold are hidden when the last weights are printed.
SHOW_ABS_GT = 1e-10
# Tickers plotted as single-stock baselines; each must be a column of the price table.
BASELINES = ["MSFT", "MRK", "GOOGL", "CVX"]
# Rolling-window length (in re-balance periods) for the volatility and Sharpe charts.
window = 2

In [3]:

def walk_forward(
    prices: pd.DataFrame,
    optimiser: Callable[..., dict],
    init_cash: float = 100.0,
    freq: str = "M",
    label: str = "strategy",
) -> Tuple[pd.Series, pd.DataFrame]:
    """Walk-forward back-test of a single-period optimiser.

    The price history is cut into consecutive calendar periods of frequency
    ``freq``. For each pair of neighbouring periods the optimiser is fitted on
    the log-returns of all prices up to the end of the first period, and the
    resulting weights are held over the second period. Equity is compounded
    with the realised simple return of that holding period.

    Parameters
    ----------
    prices
        Price table with a datetime-like index and one column per asset.
    optimiser
        Callable receiving the return history and returning a mapping whose
        ``"weights"`` entry is a pandas Series of portfolio weights.
    init_cash
        Equity before the first holding period.
    freq
        Period frequency passed to ``pd.period_range``.
    label
        Name given to the returned equity Series.

    Returns
    -------
    (equity, weights)
        ``equity`` is indexed by the end time of each holding period;
        ``weights`` has one row per holding period (same dates) holding the
        weights exactly as returned by the optimiser. Both are empty, instead
        of raising, when no holding period could be evaluated.

    Notes
    -----
    NOTE(review): the holding window starts at the first price *after* the end
    of the fitting period, so the move between the last price of one period
    and the first price of the next is not part of any holding return.
    Confirm this is the intended execution assumption.
    """
    date_index = pd.DatetimeIndex([], name="date")
    if len(prices.index):
        periods = pd.period_range(prices.index.min(), prices.index.max(), freq=freq)
    else:
        periods = []  # nothing to iterate over; fall through to the empty result

    dates, equities, weight_rows = [], [], []
    equity = init_cash

    for fit_period, hold_period in zip(periods[:-1], periods[1:]):
        fit_end, hold_end = fit_period.end_time, hold_period.end_time

        # optimise with all info up to the end of the fitting period
        history = log_returns(prices.loc[:fit_end])
        if history.empty:
            continue

        w = optimiser(history)["weights"]

        # hold during the next period
        hold_px = prices.loc[fit_end:hold_end]
        if len(hold_px) < 2:
            continue
        asset_ret = hold_px.iloc[-1] / hold_px.iloc[0] - 1

        # Match weights to assets by label so a differently ordered (or
        # shorter) weight vector cannot be paired with the wrong columns.
        # Weights that carry no asset labels at all are used positionally.
        if w.index.isin(asset_ret.index).any():
            aligned = w.reindex(asset_ret.index)
        else:
            aligned = w
        period_ret = float(np.dot(asset_ret.to_numpy(), aligned.fillna(0.0).to_numpy()))
        equity *= 1 + period_ret

        dates.append(hold_end)
        equities.append(equity)
        weight_rows.append(w.rename(hold_end))

    if not weight_rows:
        # pd.concat raises on an empty list; hand back well-formed empties.
        return (
            pd.Series(dtype=float, index=date_index, name=label),
            pd.DataFrame(index=date_index, columns=prices.columns, dtype=float),
        )

    equity_ser = pd.Series(
        equities,
        index=pd.DatetimeIndex(dates, name="date"),
        name=label,
    )
    weights_df = pd.concat(weight_rows, axis=1).T.sort_index().rename_axis("date")
    return equity_ser, weights_df


def pct_ret(series: pd.Series) -> pd.Series:
    """Period-over-period simple returns of ``series`` with undefined entries removed."""
    changes = series.pct_change()
    return changes[changes.notna()]


def rolling_cvar(ret: pd.Series, beta: float, window: int) -> pd.Series:
    """Rolling tail-loss estimate of ``ret`` over ``window`` observations.

    Within each window only the negative returns are kept. The reported value
    is the mean of those lying at or below their own ``1 - beta`` quantile,
    or 0.0 when the window contains no negative return.

    NOTE(review): the quantile is taken over the negative returns only, not
    over the whole window, so this is not the textbook CVaR at level ``beta``.
    Confirm that is intended.
    """
    tail_level = 1 - beta

    def _tail_mean(window_ret):
        negatives = window_ret[window_ret < 0]
        if len(negatives) == 0:
            return 0.0
        cutoff = np.quantile(negatives, tail_level)
        return negatives[negatives <= cutoff].mean()

    return ret.rolling(window).apply(_tail_mean, raw=False)


def drawdown_curve(equity: pd.Series) -> pd.Series:
    """Fractional distance of ``equity`` below its running maximum.

    The result is 0 at every new high and negative otherwise (e.g. -0.25 for
    a 25 % drawdown).
    """
    running_peak = equity.cummax()
    return equity.sub(running_peak).div(running_peak)



def line_chart(series_dict: Dict[str, pd.Series], title: str, y_title: str):
    """Draw every series in ``series_dict`` as a line on one Plotly figure and show it.

    Parameters
    ----------
    series_dict
        Mapping of legend name to the series to plot (index on the x axis).
    title
        Figure title.
    y_title
        Label of the y axis.

    Missing values are drawn as 0. The fill is applied to a copy, so the
    series passed in by the caller are left untouched.
    """
    fig = go.Figure()
    for name, s in series_dict.items():
        # fillna() without inplace returns a new Series; the caller's data
        # (reused by later cells) must not be modified by a plotting helper.
        plotted = s.fillna(0)
        fig.add_trace(
            go.Scatter(x=plotted.index, y=plotted.values, mode="lines", name=name)
        )
    fig.update_layout(
        title=title,
        xaxis_title="Date",
        yaxis_title=y_title,
        legend_title="Series",
        template="plotly_white",
    )
    fig.show()


def boxplot(returns: Dict[str, pd.Series], title: str):
    """Show one horizontal box per return series, with mean and standard-deviation markers.

    ``returns`` maps the legend name to the series whose values are summarised;
    ``title`` is used as the figure title.
    """
    fig = go.Figure()
    box_traces = [
        go.Box(x=series.values, name=series_name, boxmean="sd", orientation="h")
        for series_name, series in returns.items()
    ]
    for trace in box_traces:
        fig.add_trace(trace)
    fig.update_layout(xaxis_title="Return", template="plotly_white", title=title)
    fig.show()




In [4]:
# Load the price table from DATA_PATH. The cells below treat it as a DataFrame
# with a datetime-like index and one column per ticker.
prices = load_prices(DATA_PATH)


In [5]:
# Walk-forward back-test of the CVaR optimiser with its level argument set to 0.9.
# The weight history is bound to w_cvar_90 (it was previously mis-named
# w_cvar_50) and the series label carries the level, so the three CVaR runs
# stay distinguishable by name.
equity_cvar_90, w_cvar_90 = walk_forward(
    prices,
    lambda R: optimise_cvar(R, 0.9, TARGET_RETURN, SHORT_CAP),
    INITIAL_CASH,
    FREQ,
    label="CVaR_90",
)



In [6]:
# Walk-forward back-test of the CVaR optimiser with its level argument set to 0.95.
# The label carries the level so this series is not confused with the other CVaR runs.
equity_cvar_95, w_cvar_95 = walk_forward(
    prices,
    lambda R: optimise_cvar(R, 0.95, TARGET_RETURN, SHORT_CAP),
    INITIAL_CASH,
    FREQ,
    label="CVaR_95",
)

In [7]:
# Walk-forward back-test of the CVaR optimiser with its level argument set to 0.8.
# The label carries the level so this series is not confused with the other CVaR runs.
equity_cvar_80, w_cvar_80 = walk_forward(
    prices,
    lambda R: optimise_cvar(R, 0.8, TARGET_RETURN, SHORT_CAP),
    INITIAL_CASH,
    FREQ,
    label="CVaR_80",
)

In [8]:


# Walk-forward back-test of the mean-variance optimiser, using the same
# target return, short cap, starting cash and frequency as the CVaR runs.
equity_mv, w_mv = walk_forward(
    prices,
    optimiser=lambda returns_history: optimize_mean_variance(
        returns_history, TARGET_RETURN, SHORT_CAP
    ),
    init_cash=INITIAL_CASH,
    freq=FREQ,
    label="MVar",
)

In [9]:
# Top-3 baseline from main.simulate_top_n, fed with the last price of every period.
# Resampling uses the offset object behind FREQ rather than the raw alias:
# "M" is still the correct *period* alias, but pandas >= 2.2 deprecates it as
# a resample rule (FutureWarning asking for "ME"). The resulting frame is identical.
period_offset = pd.Period(prices.index[0], freq=FREQ).freq
top3_eq = simulate_top_n(prices.resample(period_offset).last(), 3, INITIAL_CASH)

  top3_eq = simulate_top_n(prices.resample(FREQ).last(), 3, INITIAL_CASH)


In [10]:
# Single-stock baselines: each ticker's price is sampled on the dates of the
# CVaR_80 equity curve (forward-filled) and rebased so that the first sample
# equals INITIAL_CASH.
baselines: Dict[str, pd.Series] = {}
for tkr in BASELINES:
    if tkr not in prices.columns:
        raise KeyError(f"Ticker '{tkr}' not found.")
    aligned_px = prices[tkr].sort_index().reindex(equity_cvar_80.index, method="ffill")
    rebased = INITIAL_CASH * aligned_px / aligned_px.iloc[0]
    rebased.name = tkr
    baselines[tkr] = rebased

In [11]:
# Summary table: final portfolio value and cumulative return (in %) for every
# strategy and baseline, sorted from worst to best.
strategy_curves = {
    "CVaR_80": equity_cvar_80,
    "CVaR_95": equity_cvar_95,
    "CVaR_90": equity_cvar_90,
    "MVar": equity_mv,
    "TOP3": top3_eq,
}
rows = [*strategy_curves, *baselines]
finals = [
    curve.iloc[-1] for curve in (*strategy_curves.values(), *baselines.values())
]
cumret = [(final_value / INITIAL_CASH - 1) * 100 for final_value in finals]

summary = pd.DataFrame({"final_$": finals, "cum_%": cumret}, index=rows)
summary = summary.round(2).sort_values(by="cum_%").reset_index()

In [12]:
# Show the summary table (final value and cumulative return, sorted ascending by cum_%).
summary

Unnamed: 0,index,final_$,cum_%
0,CVX,1129.64,12.96
1,MRK,1553.57,55.36
2,TOP3,1802.77,80.28
3,GOOGL,2380.73,138.07
4,MSFT,3596.35,259.64
5,CVaR_95,7536.64,653.66
6,CVaR_90,7571.41,657.14
7,MVar,7818.84,681.88
8,CVaR_80,9069.32,806.93


In [13]:
# Bar chart of the cumulative return per strategy/baseline, lowest to highest.
# A title and readable axis labels are set so the figure stands on its own.
pexp.bar(
    summary.sort_values(by="cum_%"),
    x="index",
    y="cum_%",
    title="Cumulative return by strategy",
    labels={"index": "Strategy", "cum_%": "Cumulative return (%)"},
)

In [14]:
# ── last-period weights ────────────────────────────────────────────── #
# Only weights with |w| ≥ SHOW_ABS_GT are listed. The threshold is printed in
# general format so a tiny value such as 1e-10 stays visible instead of being
# rendered as 0.0000, and the CVaR header names the run that is shown.
for strategy_name, weight_history in (("CVaR_80", w_cvar_80), ("MVar", w_mv)):
    last_weights = weight_history.iloc[-1]
    print(f"\n===== LAST {strategy_name} WEIGHTS (|w| ≥ {SHOW_ABS_GT:g}) =====")
    print(last_weights[last_weights.abs() >= SHOW_ABS_GT].round(6).to_string())

# ── equity curves ──────────────────────────────────────────────────── #
series_equity = {
    "CVaR_95": equity_cvar_95,
    "CVaR_90": equity_cvar_90,
    "CVaR_80": equity_cvar_80,
    "MVar": equity_mv,
    "TOP3": top3_eq,
} | baselines
line_chart(series_equity, f"Equity curve — re‑balance = {FREQ}", "Portfolio value ($)")

# ── KPI charts (rolling CVaR, σ, Sharpe, drawdowns, boxplot) ───────── #
# Per-period simple returns of every equity curve.
returns = {k: pct_ret(v) for k, v in series_equity.items()}


# 1. Rolling CVaR (disabled: BETA is commented out in the configuration cell)
# rolling_cvar_series = {k: rolling_cvar(r, BETA, window) for k, r in returns.items()}
# line_chart(
#     rolling_cvar_series,
#     f"Rolling {int(BETA*100)}% CVaR (window={window})",
#     "CVaR (mean loss)",
# )

# 2. Rolling volatility σ
rolling_vol = {k: r.rolling(window).std() for k, r in returns.items()}
line_chart(rolling_vol, f"Rolling volatility σ (window={window})", "Std‑dev of returns")

# 3. Rolling Sharpe
# A window with zero volatility would give ±inf; treat it as undefined (NaN).
rolling_sharpe = {
    k: (r.rolling(window).mean() / r.rolling(window).std()).replace(
        [np.inf, -np.inf], np.nan
    )
    for k, r in returns.items()
}
line_chart(rolling_sharpe, f"Rolling Sharpe ratio (window={window})", "Sharpe ≈ μ/σ")

# 4. Drawdowns
# drawdown_curve() returns fractions; scale to percent to match the axis label.
drawdowns = {k: drawdown_curve(eq) for k, eq in series_equity.items()}
line_chart(
    {k: dd * 100 for k, dd in drawdowns.items()},
    "Drawdown curves",
    "Drawdown (%)",
)

# 5. Box‑plot of period returns
boxplot(returns, "Distribution of period returns")


# if __name__ == "__main__":
#     _run_cli()


===== LAST CVaR WEIGHTS (|w| ≥ 0.0000) =====
company
CVX      0.016653
GOOGL    0.165433
JNJ      0.201692
MRK      0.190432
PFE      0.072406
XOM      0.353385

===== LAST MVar WEIGHTS (|w| ≥ 0.0000) =====
company
COP      0.017186
GOOGL    0.198778
JNJ      0.166940
MRK      0.155129
PFE      0.143891
XOM      0.318076


In [15]:

# baselines: Dict[str, pd.Series] = {}
# for tkr in BASELINES:
#     if tkr not in prices.columns:
#         raise KeyError(f"Ticker '{tkr}' not found.")
#     px = prices[tkr].sort_index().reindex(equity_cvar_80.index, method="ffill")
#     baselines[tkr] = INITIAL_CASH * px / px.iloc[0]
#     baselines[tkr].name = tkr

# # ── summary table ──────────────────────────────────────────────────── #
# rows = ["CVaR", "MVar", "TOP3"] + list(baselines.keys())
# finals = [equity_cvar_80.iloc[-1], equity_mv.iloc[-1], top3_eq.iloc[-1]] + [
#     s.iloc[-1] for s in baselines.values()
# ]
# cumret = [(v / INITIAL_CASH - 1) * 100 for v in finals]
# summary = pd.DataFrame({"final_$": finals, "cum_%": cumret}, index=rows).round(2)

# print("\n===== SUMMARY =====")
# print(summary.to_string())

# # print last weights for both strategies
# print("\n===== LAST CVaR WEIGHTS (|w| ≥ {:.4f}) =====".format(SHOW_ABS_GT))
# print(w_cvar_80.iloc[-1][w_cvar_80.iloc[-1].abs() >= SHOW_ABS_GT].round(6).to_string())

# print("\n===== LAST MVar WEIGHTS (|w| ≥ {:.4f}) =====".format(SHOW_ABS_GT))
# print(w_mv.iloc[-1][w_mv.iloc[-1].abs() >= SHOW_ABS_GT].round(6).to_string())

# # ── equity curves ──────────────────────────────────────────────────── #
# series_equity = {
#     "CVaR_80": equity_cvar_80,
#     "CVaR_95": equity_cvar_95,
#     "MVar": equity_mv,
#     "TOP3": top3_eq,
# } | baselines
# line_chart(series_equity, f"Equity curve — re‑balance = {FREQ}", "Portfolio value ($)")

# # ── KPI charts (rolling CVaR, σ, Sharpe, drawdowns, boxplot) ───────── #
# returns = {k: pct_ret(v) for k, v in series_equity.items()}
# window = 12  # 12 monthly re-balance periods ≈ one year

# # 1. Rolling CVaR
# # rolling_cvar_series = {k: rolling_cvar(r, BETA, window) for k, r in returns.items()}
# # line_chart(
# #     rolling_cvar_series,
# #     f"Rolling {int(BETA*100)}% CVaR (window={window})",
# #     "CVaR (mean loss)",
# # )

# # 2. Rolling volatility σ
# rolling_vol = {k: r.rolling(window).std() for k, r in returns.items()}
# line_chart(rolling_vol, f"Rolling volatility σ (window={window})", "Std‑dev of returns")

# # 3. Rolling Sharpe
# rolling_sharpe = {
#     k: r.rolling(window).mean() / r.rolling(window).std() for k, r in returns.items()
# }
# line_chart(rolling_sharpe, f"Rolling Sharpe ratio (window={window})", "Sharpe ≈ μ/σ")

# # 4. Drawdowns
# drawdowns = {k: drawdown_curve(eq) for k, eq in series_equity.items()}
# line_chart(drawdowns, "Drawdown curves", "Drawdown (%)")

# # 5. Box‑plot of period returns
# boxplot(returns, "Distribution of period returns")


# # if __name__ == "__main__":
# #     _run_cli()