In [None]:
import warnings
warnings.filterwarnings("ignore", message="No supported index is available.", module="statsmodels")
warnings.filterwarnings("ignore", message="A date index has been provided", module="statsmodels")
warnings.filterwarnings("ignore", message="Only PeriodIndexes, DatetimeIndexes with a frequency set", module="statsmodels")
!pip -q install yfinance pandas numpy matplotlib seaborn statsmodels arch

import warnings, logging
from statsmodels.tools.sm_exceptions import ValueWarning as SMValueWarning

# Silence statsmodels AR indexing warnings
warnings.filterwarnings("ignore", category=SMValueWarning)  # statsmodels' own ValueWarning
warnings.filterwarnings("ignore", category=UserWarning,   module="statsmodels")
warnings.filterwarnings("ignore", category=FutureWarning, module="statsmodels")
warnings.filterwarnings("ignore", message="No supported index is available.", module="statsmodels")
warnings.filterwarnings("ignore", message="A date index has been provided",   module="statsmodels")
warnings.filterwarnings("ignore", message="Only PeriodIndexes, DatetimeIndexes with a frequency set", module="statsmodels")


warnings.filterwarnings("ignore", message="date_parser", module="pandas")

import warnings
from statsmodels.tools.sm_exceptions import ValueWarning as SMValueWarning
warnings.filterwarnings("ignore", category=SMValueWarning, module="statsmodels")
warnings.filterwarnings("ignore", category=UserWarning,   module="statsmodels")
warnings.filterwarnings("ignore", message="No supported index is available.", module="statsmodels")
warnings.filterwarnings("ignore", message="A date index has been provided",   module="statsmodels")
warnings.filterwarnings("ignore", message="Only PeriodIndexes, DatetimeIndexes with a frequency set", module="statsmodels")



import pandas as pd
import numpy as np
import yfinance as yf
import matplotlib.pyplot as plt
import seaborn as sns
import statsmodels.api as sm
from io import BytesIO
from zipfile import ZipFile
import urllib.request

plt.style.use("seaborn-v0_8")
sns.set_context("talk")
tickers = ["AAPL","MSFT","AMZN","META","NVDA","XOM","JPM","KO","JNJ","TSLA"]
start = "2019-01-01"   # covers multiple regimes
end   = None           # up to latest available

data = yf.download(tickers, start=start, end=end)
print(data.columns)
prices = data["Close"]
display(prices.head())

prices.head()                 # first 5 rows
prices.tail()                 # last 5 rows
prices.shape                  # (rows, columns)
prices.isna().sum()
prices.index.min(), prices.index.max()
prices.dtypes.head()

# simple daily returns: (P_t / P_{t-1}) - 1
rets = prices.pct_change().dropna()

# quick peek & sanity stats (returns are small numbers like -0.02 .. +0.02)
display(rets.head())
rets.describe().T.iloc[:, :4]

!pip -q install pandas-datareader

import pandas as pd
from pandas_datareader import data as web

# This fetches the official daily FF3 dataset directly
ff = web.DataReader('F-F_Research_Data_Factors_Daily', 'famafrench')

# ff[0] is the daily table with columns: 'Mkt-RF','SMB','HML','RF' in PERCENT units
ff3 = ff[0].copy()

# Making sure the index is datetime (it already is; this keeps it explicit)
ff3.index = pd.to_datetime(ff3.index)

# Converting percent → decimals to match stock returns
ff3 = ff3 / 100.0

#clean column names just in case
ff3.columns = [c.strip() for c in ff3.columns]

ff3.head()


import pandas as pd
pd.options.display.float_format = '{:.6f}'.format

ff3[['RF']].loc['2019':].head(10)      # inspect early rows
ff3['RF'].describe()                   # min / max
(float((ff3['RF']==0).mean()), 'fraction of zero RF days')  # share of zeros
ff3['RF'].value_counts().head(10)      # common values


data = rets.join(ff3[['Mkt-RF','SMB','HML','RF']], how='inner').dropna()
data.shape, data.head()

import statsmodels.api as sm

t = "AAPL"
y = data[t] - data["RF"]
X = sm.add_constant(data[["Mkt-RF","SMB","HML"]])

m_ols = sm.OLS(y, X, missing="drop").fit()
m_hac = sm.OLS(y, X, missing="drop").fit(cov_type="HAC", cov_kwds={"maxlags":5})  # ~1 trading week

print(m_hac.summary())


def ff3_stats_hac(df, ticker, maxlags=5):
    y = df[ticker] - df["RF"]
    X = sm.add_constant(df[["Mkt-RF","SMB","HML"]])
    m = sm.OLS(y, X, missing="drop").fit(cov_type="HAC", cov_kwds={"maxlags":maxlags})
    return {
        "alpha_daily": m.params["const"],
        "alpha_annual": m.params["const"] * 252,
        "beta_mkt": m.params["Mkt-RF"],
        "beta_smb": m.params["SMB"],
        "beta_hml": m.params["HML"],
        "t_alpha": m.tvalues["const"],
        "t_mkt": m.tvalues["Mkt-RF"],
        "t_smb": m.tvalues["SMB"],
        "t_hml": m.tvalues["HML"],
        "p_alpha": m.pvalues["const"],
        "r2": m.rsquared,
        "n": int(m.nobs),
    }, m

rows, models = [], {}
for tkr in rets.columns:
    r, mdl = ff3_stats_hac(data, tkr)
    r["ticker"] = tkr
    rows.append(r); models[tkr] = mdl

results = pd.DataFrame(rows).set_index("ticker").sort_index()
results.round(4)

import seaborn as sns, matplotlib.pyplot as plt
plt.figure(figsize=(10,6))
sns.heatmap(results[["beta_mkt","beta_smb","beta_hml"]], annot=True, cmap="coolwarm", center=0)
plt.title("Fama–French 3-Factor Loadings"); plt.xlabel("Factor"); plt.ylabel("Ticker")
plt.tight_layout(); plt.show()

results["alpha_annual"].sort_values().plot(kind="barh", figsize=(10,5), color="steelblue")
plt.axvline(0, color="k", lw=1); plt.title("Annualized Alpha (HAC SEs)")
plt.xlabel("Alpha per year"); plt.tight_layout(); plt.show()

sig_alpha = results.index[results["p_alpha"] < 0.05].tolist()
print("Significant alpha @5% (HAC):", sig_alpha if sig_alpha else "None")

t = "NVDA"
y = data[t] - data["RF"]
X = sm.add_constant(data[["Mkt-RF","SMB","HML"]])
pred = models[t].predict(X)

plt.figure(figsize=(10,5))
plt.plot(y.index, y,   label=f"{t} Excess Return", alpha=0.6)
plt.plot(y.index, pred,label="FF3 Predicted", alpha=0.8)
plt.title(f"{t}: Actual vs FF3 Predicted (daily excess returns)")
plt.legend(); plt.tight_layout(); plt.show()

def describe(t, res):
    bM, bS, bH = res.loc[t,["beta_mkt","beta_smb","beta_hml"]]
    aA, pA = res.loc[t,["alpha_annual","p_alpha"]]
    r2, n  = res.loc[t,["r2","n"]]
    tiltM = "high beta" if bM>1.2 else "low beta" if bM<0.8 else "market-like beta"
    tiltS = "small-cap tilt" if bS>0 else "large-cap tilt"
    tiltH = "value tilt" if bH>0 else "growth tilt"
    sigA  = "significant" if pA<0.05 else "not significant"
    return f"{t}: {tiltM}, {tiltS}, {tiltH}; alpha {aA:.2%} ({sigA}); R²={r2:.2f}, N={int(n)}."

for t in results.index: print(describe(t, results))

results.round(6).to_csv("ff3_results_hac.csv")

import numpy as np, pandas as pd, matplotlib.pyplot as plt
import statsmodels.api as sm

tickers_to_show = ["NVDA", "TSLA", "JPM"]


def get_model_for(t):
    y = data[t] - data["RF"]
    X = sm.add_constant(data[["Mkt-RF","SMB","HML"]])
    try:
        m = models[t]
    except NameError:
        m = sm.OLS(y, X, missing="drop").fit()
    except KeyError:
        m = sm.OLS(y, X, missing="drop").fit()
    return m, y, X

fig, axes = plt.subplots(nrows=3, ncols=1, figsize=(12,10), sharex=True)
for ax, t in zip(axes, tickers_to_show):
    m, y, X = get_model_for(t)
    pred = m.predict(X)

    # main plot: daily excess vs fitted
    ax.plot(y.index, y,   label=f"{t} excess return", color="#555", alpha=0.55)
    ax.plot(y.index, pred, label="FF3 fitted", color="#1976d2", alpha=0.85)

    # smoothing overlay
    y_r   = y.rolling(20).mean()
    pr_r  = pd.Series(pred, index=y.index).rolling(20).mean()
    ax.plot(y.index, y_r,  color="#000", lw=1.2, alpha=0.7)
    ax.plot(y.index, pr_r, color="#0d47a1", lw=1.2, alpha=0.9)

    # quick metrics
    rmse = np.sqrt(np.mean((y - pred)**2))
    corr = np.corrcoef(y.fillna(0), pd.Series(pred, index=y.index).fillna(0))[0,1]
    ax.set_title(f"{t}: Actual vs FF3 Fitted • R²={m.rsquared:.2f} • RMSE={rmse:.4f} • Corr={corr:.2f}")
    ax.legend(loc="upper right")
    ax.set_ylabel("Daily excess return")

axes[-1].set_xlabel("Date")
plt.tight_layout()
plt.show()

# (actual vs fitted)
import seaborn as sns
fig, axes = plt.subplots(1, 3, figsize=(14,4))
for ax, t in zip(axes, tickers_to_show):
    m, y, X = get_model_for(t)
    pred = m.predict(X)
    dfp = pd.DataFrame({"actual": y, "fitted": pred}).dropna()
    sns.regplot(data=dfp, x="fitted", y="actual", ax=ax,
                scatter_kws={"alpha":0.3, "s":8}, line_kws={"color":"crimson"})
    ax.set_title(f"{t}: scatter (fitted vs actual)\nCorr={dfp.corr().iloc[0,1]:.2f}")
    ax.set_xlabel("FF3 fitted (daily excess)")
    ax.set_ylabel("Actual (daily excess)")
plt.tight_layout()
plt.show()

import pandas as pd
import numpy as np

def scenario_return(ticker, mkt_rf, smb, hml):
    b = models[ticker].params  # const, Mkt-RF, SMB, HML
    return float(b['const'] + b['Mkt-RF']*mkt_rf + b['SMB']*smb + b['HML']*hml)

scenarios = pd.DataFrame([
    {"Scenario":"Mkt +2%, SMB 0, HML 0",      "Mkt-RF":0.02, "SMB":0.00, "HML":0.00},
    {"Scenario":"Mkt +1%, SMB +0.3%, HML -0.3%","Mkt-RF":0.01, "SMB":0.003,"HML":-0.003},
    {"Scenario":"Mkt -1%, SMB -0.3%, HML +0.3%","Mkt-RF":-0.01,"SMB":-0.003,"HML":0.003},
    {"Scenario":"Flat factors","Mkt-RF":0.0,"SMB":0.0,"HML":0.0},
])

for t in ["NVDA","TSLA","JPM"]:
    scenarios[t] = scenarios.apply(lambda r: scenario_return(t, r["Mkt-RF"], r["SMB"], r["HML"]), axis=1)

scenarios

# factor premia (daily means) from the sample window
mu_factors = data[["Mkt-RF","SMB","HML"]].mean()
rf_daily   = data["RF"].mean()

rows = []
for t in ["NVDA","TSLA","JPM"]:
    b = models[t].params
    exp_excess_daily = float(b['const'] + b['Mkt-RF']*mu_factors["Mkt-RF"] +
                             b['SMB']*mu_factors["SMB"] + b['HML']*mu_factors["HML"])
    exp_excess_annual = exp_excess_daily * 252
    exp_total_annual  = exp_excess_annual + rf_daily*252
    rows.append([t, exp_excess_daily, exp_excess_annual, exp_total_annual])

exp_table = pd.DataFrame(rows, columns=["Ticker","E[Excess]_daily","E[Excess]_annual","E[Total]_annual"])
exp_table

import numpy as np, pandas as pd, statsmodels.api as sm
from statsmodels.tsa.ar_model import AutoReg

# y_excess and factors aligned
def rolling_ff3_forecast(ticker, window=252, use_ar1=True, min_len_ar=30):
    """
    Walk-forward forecast of next-day EXCESS return for `ticker`.
    - window: trailing window (trading days) for beta estimation
    - use_ar1: True -> AR(1) forecast for factors; False -> naive (use last observed)
    - min_len_ar: minimum observations to fit AR(1); otherwise fall back to naive
    """
    y_excess = (data[ticker] - data["RF"]).dropna()
    fac = data[["Mkt-RF","SMB","HML"]].loc[y_excess.index]  # same dates as y

    preds = pd.Series(index=y_excess.index, dtype=float)

    for i in range(window, len(y_excess)-1):
        # 1) Estimating betas on trailing window [i-window, i)
        Yw = y_excess.iloc[i-window:i]
        Xw = sm.add_constant(fac.iloc[i-window:i])
        b  = sm.OLS(Yw, Xw).fit().params  # const, Mkt-RF, SMB, HML

        # 2) Forecasting factors for day i+1
        if use_ar1:
            fhat = {}
            for col in fac.columns:
                series = fac[col].iloc[:i].dropna()
                if len(series) >= min_len_ar:
                    try:
                        ar = AutoReg(series, lags=1, old_names=False).fit()
                        fhat[col] = float(ar.forecast(steps=1)[0])  # <-- key change
                    except Exception:
                        fhat[col] = float(series.iloc[-1])          # fallback
                else:
                    fhat[col] = float(series.iloc[-1])              # fallback
        else:
            # naive persistence: next factor = last observed factor
            fhat = fac.iloc[i].to_dict()

        # 3) next-day prediction
        next_idx = y_excess.index[i+1]
        preds.loc[next_idx] = (
            b['const'] + b['Mkt-RF']*fhat['Mkt-RF'] +
            b['SMB']*fhat['SMB'] + b['HML']*fhat['HML']
        )

    out = pd.DataFrame({"pred": preds, "actual": y_excess}).dropna()
    rmse = float(np.sqrt(np.mean((out['pred']-out['actual'])**2)))
    corr = float(out['pred'].corr(out['actual']))
    hit  = float((np.sign(out['pred']) == np.sign(out['actual'])).mean())
    return out, {"RMSE": rmse, "Corr": corr, "HitRate": hit}
targets = ["NVDA","TSLA","JPM"]
metrics = {}
series  = {}

for t in targets:
    df, m = rolling_ff3_forecast(ticker=t, window=252, use_ar1=True)
    metrics[t] = m
    series[t]  = df
    print(t, m)

# actual vs predicted (cumulative to see drift)
import matplotlib.pyplot as plt

fig, axes = plt.subplots(1, 3, figsize=(14,4), sharey=True)
for ax, t in zip(axes, targets):
    df = series[t]
    cum_pred   = (1 + df['pred']).cumprod() - 1
    cum_actual = (1 + df['actual']).cumprod() - 1
    ax.plot(cum_actual.index, cum_actual, label="Actual (cum excess)", color="#555")
    ax.plot(cum_pred.index,   cum_pred,   label="Predicted (cum excess)", color="#1976d2")
    ax.set_title(f"{t}  •  Corr={metrics[t]['Corr']:.2f} • Hit={metrics[t]['HitRate']:.2f}")
    ax.legend(); ax.grid(alpha=0.2)
plt.tight_layout(); plt.show()


!pip -q install pandas-datareader
from pandas_datareader import data as web
import pandas as pd, statsmodels.api as sm

# 5 factors (daily, % units) and Momentum
ff5 = web.DataReader('F-F_Research_Data_5_Factors_2x3_Daily','famafrench')[0] / 100.0
mom = web.DataReader('F-F_Momentum_Factor_Daily','famafrench')[0] / 100.0
ff5.index = pd.to_datetime(ff5.index); mom.index = pd.to_datetime(mom.index)

# joining toreturns index
ffX = ff5.join(mom, how="inner")
ffX["RF"] = ff3["RF"]
# Align with returns
datX = rets.join(ffX, how="inner")

def run_model(df, ticker, cols):
    X = sm.add_constant(df[cols])
    m = sm.OLS(y, X, missing="drop").fit(cov_type="HAC", cov_kwds={"maxlags":5})
    return m

cols_ff3 = ["Mkt-RF","SMB","HML"]
cols_ff5 = ["Mkt-RF","SMB","HML","RMW","CMA"]
cols_car4= ["Mkt-RF","SMB","HML","Mom"]

for t in ["NVDA","TSLA","JPM"]:
    m3  = run_model(datX, t, cols_ff3)
    m5  = run_model(datX, t, cols_ff5)
    m4  = run_model(datX, t, cols_car4)
    print(f"\n{t}: R2 FF3={m3.rsquared:.3f}  FF5={m5.rsquared:.3f}  Carhart4={m4.rsquared:.3f}")
    print(f" alpha_annual FF3={m3.params['const']*252:.2%}  FF5={m5.params['const']*252:.2%}  Car4={m4.params['const']*252:.2%}")


import numpy as np, pandas as pd, statsmodels.api as sm
from statsmodels.tsa.ar_model import AutoReg

def rolling_ff3_forecast_clean(ticker, window=252, use_ar1=True, min_len_ar=30):
    """
    True 1-day-ahead forecast:
    - betas estimated on trailing `window` days
    - next-day factors via AR(1) (fallback = last value)
    - prediction written at the next trading date
    """
    y_excess = (data[ticker] - data["RF"]).dropna()
    fac = data[["Mkt-RF","SMB","HML"]].loc[y_excess.index]

    preds = pd.Series(index=y_excess.index, dtype=float)

    for i in range(window, len(y_excess)-1):
        # 1) betas from trailing window
        Yw = y_excess.iloc[i-window:i]
        Xw = sm.add_constant(fac.iloc[i-window:i])
        b  = sm.OLS(Yw, Xw).fit().params

        # 2) AR(1) forecast for each factor (robust to datetime index)
        if use_ar1:
            fhat = {}
            for col in fac.columns:
                series = fac[col].iloc[:i].dropna()
                if len(series) >= min_len_ar:
                    sr = pd.Series(series.to_numpy())           # RangeIndex
                    ar = AutoReg(sr, lags=1, old_names=False).fit()
                    fhat[col] = float(ar.forecast(steps=1).iloc[0])
                else:
                    fhat[col] = float(series.iloc[-1])
        else:
            fhat = fac.iloc[i].to_dict()                        # persistence

        # 3) write prediction at next trading timestamp
        next_idx = y_excess.index[i+1]
        preds.loc[next_idx] = (
            b['const'] + b['Mkt-RF']*fhat['Mkt-RF'] +
            b['SMB']*fhat['SMB'] + b['HML']*fhat['HML']
        )

    out = pd.DataFrame({"pred": preds, "actual": y_excess}).dropna()
    rmse = float(np.sqrt(np.mean((out['pred']-out['actual'])**2)))
    corr = float(out['pred'].corr(out['actual']))
    hit  = float((np.sign(out['pred']) == np.sign(out['actual'])).mean())
    return out, {"RMSE": rmse, "Corr": corr, "HitRate": hit}
df_nvda, m_nvda = rolling_ff3_forecast_clean("NVDA")
df_tsla, m_tsla = rolling_ff3_forecast_clean("TSLA")
df_jpm,  m_jpm  = rolling_ff3_forecast_clean("JPM")

def check_oos(df, window=252):
    first_actual = df['actual'].index.min()
    first_pred   = df['pred'].first_valid_index()
    print("First actual:", first_actual)
    print("First pred:  ", first_pred)
    print("Pred starts after training window? ->", first_pred > first_actual)
    print("Rows actual/pred:", df['actual'].shape[0], df['pred'].dropna().shape[0])

check_oos(df_nvda)
check_oos(df_tsla)
check_oos(df_jpm)

def verify_oos(ticker, df, window=252):
    y = (data[ticker] - data['RF']).dropna()
    full_start     = y.index.min()
    expected_start = y.index[window+1]      # first 1-day-ahead pred date
    df_start       = df.index.min()
    print(f"\n{ticker}")
    print("Full series start:        ", full_start)
    print("Expected first forecast:  ", expected_start)
    print("DF (actual & pred) start: ", df_start)
    print("Matches expected?         ", df_start == expected_start)
    print("Orig obs:", len(y), "  Forecast rows:", len(df))

verify_oos("NVDA", df_nvda)
verify_oos("TSLA", df_tsla)
verify_oos("JPM",  df_jpm)

import pandas as pd, numpy as np

def metrics_row(name, df):
    rmse = float(np.sqrt(np.mean((df['pred']-df['actual'])**2)))
    corr = float(df['pred'].corr(df['actual']))
    hit  = float((np.sign(df['pred']) == np.sign(df['actual'])).mean())
    return pd.Series({'RMSE': rmse, 'Corr': corr, 'HitRate': hit})

metrics = pd.DataFrame({
    'NVDA': metrics_row('NVDA', df_nvda),
    'TSLA': metrics_row('TSLA', df_tsla),
    'JPM' : metrics_row('JPM',  df_jpm),
}).T.round(4)

display(metrics)
metrics.to_csv("table_forecast_metrics.csv")

import matplotlib.pyplot as plt, matplotlib.dates as mdates

def plot_three(df_dict, title="", cumulative=True, save=None):
    fig, axes = plt.subplots(1, 3, figsize=(14,4), sharey=True, constrained_layout=True)
    for ax, (name, df) in zip(axes, df_dict.items()):
        d = df.copy()
        if cumulative:
            d['actual'] = (1 + d['actual']).cumprod() - 1
            d['pred']   = (1 + d['pred']).cumprod() - 1
        ax.plot(d.index, d['actual'], label="Actual (cum excess)" if cumulative else "Actual", color="#444", lw=1.5)
        ax.plot(d.index, d['pred'],   label="Predicted (cum excess)" if cumulative else "Predicted", color="#1f77b4", lw=1.5)
        ax.set_title(name, fontsize=11); ax.grid(alpha=.25)
        ax.spines[['top','right']].set_visible(False)
        # readable dates
        ax.xaxis.set_major_locator(mdates.YearLocator())
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y'))
        ax.xaxis.set_minor_locator(mdates.MonthLocator(bymonth=(3,9)))
        ax.tick_params(axis='x', labelsize=9)
    axes[0].legend(frameon=False, loc="upper left")
    fig.suptitle(title, y=1.02, fontsize=13)
    if save: plt.savefig(save, dpi=240, bbox_inches='tight')
    plt.show()

plot_three({'NVDA': df_nvda, 'TSLA': df_tsla, 'JPM': df_jpm},
           title="FF3 1-day-ahead forecasts (cum excess)", cumulative=True,
           save="fig_forecasts.png")
def add_naive(df):
    # naive forecast = today's excess return as prediction for tomorrow
    # shift actual forward by 1 day to align as a prediction
    naive = df['actual'].shift(1)
    out = df.copy()
    out['naive'] = naive
    return out.dropna()

for name, d in {'NVDA': df_nvda, 'TSLA': df_tsla, 'JPM': df_jpm}.items():
    dd = add_naive(d)
    rmse_model = np.sqrt(np.mean((dd['pred']-dd['actual'])**2))
    rmse_naive = np.sqrt(np.mean((dd['naive']-dd['actual'])**2))
    print(f"{name}: RMSE model={rmse_model:.5f} vs naive={rmse_naive:.5f}")
!pip -q install pandas-datareader
import warnings, pandas as pd
from pandas_datareader import data as web
warnings.filterwarnings("ignore", message="date_parser")  # quiet deprecation msg

# FF5 (daily, % units) → decimals
ff5 = web.DataReader('F-F_Research_Data_5_Factors_2x3_Daily','famafrench')[0]
ff5.index = pd.to_datetime(ff5.index)
ff5 = ff5[['Mkt-RF','SMB','HML','RMW','CMA']] / 100.0

# Momentum (daily, % units) → decimals
mom = web.DataReader('F-F_Momentum_Factor_Daily','famafrench')[0]
mom.index = pd.to_datetime(mom.index)
mom = mom / 100.0
# normalize column name to 'Mom'
mom.columns = ['Mom'] if len(mom.columns)==1 else ['Mom']

# Risk-free: reuse from your ff3 if present, else fetch
try:
    rf = ff3[['RF']].copy()
except NameError:
    ff3_daily = web.DataReader('F-F_Research_Data_Factors_Daily','famafrench')[0]
    ff3_daily.index = pd.to_datetime(ff3_daily.index)
    rf = (ff3_daily[['RF']] / 100.0)

# Master factor table
ffX = ff5.join(mom, how='inner').join(rf, how='inner')

# Align to your returns index
ffX = ffX.loc[rets.index.min():rets.index.max()]

import statsmodels.api as sm
import numpy as np

# factor sets
cols_ff3   = ["Mkt-RF","SMB","HML"]
cols_ff5   = ["Mkt-RF","SMB","HML","RMW","CMA"]
cols_car4  = ["Mkt-RF","SMB","HML","Mom"]
cols_ff5m  = ["Mkt-RF","SMB","HML","RMW","CMA","Mom"]   # optional: FF5 + Momentum

def run_model(dat, ticker, cols):
    y = dat[ticker] - dat["RF"]
    X = sm.add_constant(dat[cols])
    m = sm.OLS(y, X, missing="drop").fit(cov_type="HAC", cov_kwds={"maxlags":5})
    return m

# join factors + RF to returns for each spec (inner join on dates)
dat_ff3  = rets.join(ffX[cols_ff3 + ["RF"]],  how="inner")
dat_ff5  = rets.join(ffX[cols_ff5 + ["RF"]],  how="inner")
dat_car4 = rets.join(ffX[cols_car4 + ["RF"]], how="inner")
dat_ff5m = rets.join(ffX[cols_ff5m + ["RF"]], how="inner")

tickers = list(rets.columns)  # or your own list

rows = []
for t in tickers:
    m3  = run_model(dat_ff3,  t, cols_ff3)
    m5  = run_model(dat_ff5,  t, cols_ff5)
    m4  = run_model(dat_car4, t, cols_car4)
    m6  = run_model(dat_ff5m, t, cols_ff5m)

    row = {
        "ticker": t,
        "R2_FF3":  m3.rsquared,
        "R2_FF5":  m5.rsquared,
        "R2_Car4": m4.rsquared,
        "R2_FF5M": m6.rsquared,
        "alpha_ann_FF3":  m3.params["const"] * 252,
        "alpha_ann_FF5":  m5.params["const"] * 252,
        "alpha_ann_Car4": m4.params["const"] * 252,
        "alpha_ann_FF5M": m6.params["const"] * 252,
        "p_alpha_FF3":  m3.pvalues["const"],
        "p_alpha_FF5":  m5.pvalues["const"],
        "p_alpha_Car4": m4.pvalues["const"],
        "p_alpha_FF5M": m6.pvalues["const"],
    }
    rows.append(row)

cmp = pd.DataFrame(rows).set_index("ticker")

# add best R2 and delta vs FF3
cmp["R2_best"]  = cmp[["R2_FF3","R2_FF5","R2_Car4","R2_FF5M"]].max(axis=1)
cmp["dR2_best_minus_FF3"] = cmp["R2_best"] - cmp["R2_FF3"]

# clean view for the report
view = cmp[[
    "R2_FF3","R2_FF5","R2_Car4","R2_FF5M","R2_best","dR2_best_minus_FF3",
    "alpha_ann_FF3","alpha_ann_FF5","alpha_ann_Car4","alpha_ann_FF5M",
    "p_alpha_FF3","p_alpha_FF5","p_alpha_Car4","p_alpha_FF5M"
]].sort_values("dR2_best_minus_FF3", ascending=False)

# pretty print
display(view.round({
    "R2_FF3":3,"R2_FF5":3,"R2_Car4":3,"R2_FF5M":3,"R2_best":3,"dR2_best_minus_FF3":3,
    "alpha_ann_FF3":3,"alpha_ann_FF5":3,"alpha_ann_Car4":3,"alpha_ann_FF5M":3,
    "p_alpha_FF3":3,"p_alpha_FF5":3,"p_alpha_Car4":3,"p_alpha_FF5M":3
}))

# save for LaTeX/Overleaf table
view.to_csv("table_ff3_vs_ff5.csv")

for t in ["NVDA","TSLA","JPM"]:
    r = view.loc[t]
    best = max(r["R2_FF3"], r["R2_FF5"], r["R2_Car4"], r["R2_FF5M"])
    print(
        f"{t}: R² FF3={r['R2_FF3']:.3f}  FF5={r['R2_FF5']:.3f}  Car4={r['R2_Car4']:.3f}  FF5+Mom={r['R2_FF5M']:.3f}  "
        f"ΔR²(best-FF3)={r['dR2_best_minus_FF3']:.3f}\n"
        f"    α_annual FF3={r['alpha_ann_FF3']:.2%} (p={r['p_alpha_FF3']:.3f}), "
        f"FF5={r['alpha_ann_FF5']:.2%} (p={r['p_alpha_FF5']:.3f}), "
        f"Car4={r['alpha_ann_Car4']:.2%} (p={r['p_alpha_Car4']:.3f}), "
        f"FF5+Mom={r['alpha_ann_FF5M']:.2%} (p={r['p_alpha_FF5M']:.3f})"
    )

import numpy as np, pandas as pd, statsmodels.api as sm

def rolling_ff3_betas(ticker, window=252, step=1, cov_type='HAC', hac_lags=5):
    """
    Rolling FF3 regressions for `ticker`.
      - window: trailing window length in trading days (252 ~ 1y)
      - step: move the window by this many days each iteration (e.g., 5 ≈ weekly endpoints)
      - cov_type: None (classic OLS SEs) or 'HAC' (Newey–West)
      - hac_lags: maxlags for HAC SEs (5 ≈ one trading week)
    Returns: DataFrame indexed by window end date with betas, alpha, R², and SEs.
    """
    y = (data[ticker] - data['RF']).dropna()
    Xf = data[['Mkt-RF','SMB','HML']].loc[y.index]

    rows = []
    for end in range(window, len(y)+1, step):
        Yw = y.iloc[end-window:end]
        Xw = sm.add_constant(Xf.iloc[end-window:end])
        if cov_type == 'HAC':
            m = sm.OLS(Yw, Xw).fit(cov_type='HAC', cov_kwds={'maxlags': hac_lags})
        else:
            m = sm.OLS(Yw, Xw).fit()

        date = y.index[end-1]
        rows.append({
            'Date': date,
            'beta_mkt': m.params['Mkt-RF'],
            'beta_smb': m.params['SMB'],
            'beta_hml': m.params['HML'],
            'se_beta_mkt': m.bse['Mkt-RF'],
            'se_beta_smb': m.bse['SMB'],
            'se_beta_hml': m.bse['HML'],
            'alpha_daily': m.params['const'],
            'alpha_annual': m.params['const'] * 252,
            'r2': m.rsquared,
            'n': int(m.nobs),
        })

    rb = pd.DataFrame(rows).set_index('Date')
    return rb


import matplotlib.pyplot as plt, matplotlib.dates as mdates

# run (change step=5 for faster, weekly endpoints)
rb_nvda = rolling_ff3_betas('NVDA', window=252, step=1, cov_type='HAC', hac_lags=5)

# 95% CI bands
ci = 1.96
u_mkt = rb_nvda['beta_mkt'] + ci*rb_nvda['se_beta_mkt']
l_mkt = rb_nvda['beta_mkt'] - ci*rb_nvda['se_beta_mkt']
u_smb = rb_nvda['beta_smb'] + ci*rb_nvda['se_beta_smb']
l_smb = rb_nvda['beta_smb'] - ci*rb_nvda['se_beta_smb']
u_hml = rb_nvda['beta_hml'] + ci*rb_nvda['se_beta_hml']
l_hml = rb_nvda['beta_hml'] - ci*rb_nvda['se_beta_hml']

# --- Betas with CIs ---
fig, axes = plt.subplots(3,1, figsize=(12,8), sharex=True, constrained_layout=True)

axes[0].plot(rb_nvda.index, rb_nvda['beta_mkt'], color='C0', label=r'$\beta_{\mathrm{Mkt}}$')
axes[0].fill_between(rb_nvda.index, l_mkt, u_mkt, color='C0', alpha=0.15, linewidth=0)
axes[0].axhline(0, color='k', lw=1, alpha=.6); axes[0].set_ylabel('Mkt beta')

axes[1].plot(rb_nvda.index, rb_nvda['beta_smb'], color='C1', label=r'$\beta_{\mathrm{SMB}}$')
axes[1].fill_between(rb_nvda.index, l_smb, u_smb, color='C1', alpha=0.15, linewidth=0)
axes[1].axhline(0, color='k', lw=1, alpha=.6); axes[1].set_ylabel('SMB beta')

axes[2].plot(rb_nvda.index, rb_nvda['beta_hml'], color='C2', label=r'$\beta_{\mathrm{HML}}$')
axes[2].fill_between(rb_nvda.index, l_hml, u_hml, color='C2', alpha=0.15, linewidth=0)
axes[2].axhline(0, color='k', lw=1, alpha=.6); axes[2].set_ylabel('HML beta')

for ax in axes:
    ax.grid(alpha=.25); ax.spines[['top','right']].set_visible(False)

axes[-1].xaxis.set_major_locator(mdates.YearLocator())
axes[-1].xaxis.set_major_formatter(mdates.DateFormatter('%Y'))
axes[-1].xaxis.set_minor_locator(mdates.MonthLocator(bymonth=(3,9)))
axes[-1].set_xlabel('Date')

fig.suptitle('NVDA — Rolling 1Y Fama–French Betas (HAC 95% CI)', y=1.02, fontsize=13)
plt.savefig('fig_rolling_beta_NVDA.png', dpi=240, bbox_inches='tight')
plt.show()

# --- Alpha & R² (handy for appendix) ---
fig, ax2 = plt.subplots(2,1, figsize=(12,5), sharex=True, constrained_layout=True)
ax2[0].plot(rb_nvda.index, rb_nvda['alpha_annual']*100, color='#555')
ax2[0].axhline(0, color='k', lw=1, alpha=.6); ax2[0].set_ylabel('Alpha (annual, %)')
ax2[1].plot(rb_nvda.index, rb_nvda['r2'], color='#1f77b4')
ax2[1].set_ylabel(r'$R^2$'); ax2[1].set_xlabel('Date')
for a in ax2: a.grid(alpha=.25); a.spines[['top','right']].set_visible(False)
ax2[-1].xaxis.set_major_locator(mdates.YearLocator())
ax2[-1].xaxis.set_major_formatter(mdates.DateFormatter('%Y'))
plt.savefig('fig_nvda_alpha_r2.png', dpi=240, bbox_inches='tight')
plt.show()

# quick summary for your write-up
print(rb_nvda[['beta_mkt','beta_smb','beta_hml','alpha_annual','r2']].tail(3).round(3))
print("Latest betas:", rb_nvda.iloc[-1][['beta_mkt','beta_smb','beta_hml']].round(3).to_dict())


print("Sample window:", data.index.min(), "→", data.index.max())
import numpy as np, pandas as pd, statsmodels.api as sm

def fit_ff3_segment(df, ticker, hac_lags=5):
    y = df[ticker] - df["RF"]
    X = sm.add_constant(df[["Mkt-RF","SMB","HML"]])
    m = sm.OLS(y, X, missing="drop").fit(cov_type="HAC", cov_kwds={"maxlags":hac_lags})
    return {
        "alpha_daily":  m.params["const"],
        "alpha_annual": m.params["const"]*252,
        "beta_mkt":     m.params["Mkt-RF"],
        "beta_smb":     m.params["SMB"],
        "beta_hml":     m.params["HML"],
        "p_alpha":      m.pvalues["const"],
        "r2":           m.rsquared,
        "n":            int(m.nobs),
    }, m

def choose_valid_split(ticker, preferred_split, min_obs=252):
    df = data[[ticker,"Mkt-RF","SMB","HML","RF"]].dropna()
    idx = df.index; n = len(idx)
    if n < 2*min_obs + 1:
        return None, f"Not enough total obs (n={n}) for min_obs={min_obs} on each side."
    pos_pref = np.searchsorted(idx.values, np.datetime64(preferred_split))
    lo = min_obs
    hi = n - min_obs - 1
    pos = min(max(pos_pref, lo), hi)
    split = idx[pos]
    note = "" if (lo <= pos_pref <= hi) else f"Preferred split {preferred_split.date()} adjusted to {split.date()}."
    return split, note

ALL_COLS = [
    "pre_beta_mkt","post_beta_mkt","Δbeta_mkt","p(Δbeta_mkt)",
    "pre_beta_smb","post_beta_smb","Δbeta_smb","p(Δbeta_smb)",
    "pre_beta_hml","post_beta_hml","Δbeta_hml","p(Δbeta_hml)",
    "pre_alpha_ann","post_alpha_ann","Δalpha_ann","p(Δalpha)",
    "pre_R2","post_R2","pre_n","post_n","note"
]

def pre_post_stability_safe(ticker, preferred_split, min_obs=252, hac_lags=5):
    # initialize row with NaNs so required columns *always* exist
    res = {k: np.nan for k in ALL_COLS}
    res["ticker"] = ticker

    split, note = choose_valid_split(ticker, preferred_split, min_obs=min_obs)
    if split is None:
        res["note"] = note
        return res

    df = data[[ticker,"Mkt-RF","SMB","HML","RF"]].dropna()
    pre  = df.loc[:split]
    post = df.loc[split + pd.Timedelta(days=1):]

    pre_stats,  _ = fit_ff3_segment(pre,  ticker, hac_lags)
    post_stats, _ = fit_ff3_segment(post, ticker, hac_lags)

    # interaction test
    full = df.copy()
    full["POST"] = (full.index > split).astype(int)
    for c in ["Mkt-RF","SMB","HML"]:
        full[f"{c}_POST"] = full[c]*full["POST"]
    y = full[ticker] - full["RF"]
    X = sm.add_constant(full[["Mkt-RF","SMB","HML","POST","Mkt-RF_POST","SMB_POST","HML_POST"]])
    m_int = sm.OLS(y, X, missing="drop").fit(cov_type="HAC", cov_kwds={"maxlags":hac_lags})

    res.update({
        "pre_beta_mkt":  pre_stats["beta_mkt"],  "post_beta_mkt":  post_stats["beta_mkt"],
        "Δbeta_mkt":     m_int.params["Mkt-RF_POST"], "p(Δbeta_mkt)": m_int.pvalues["Mkt-RF_POST"],
        "pre_beta_smb":  pre_stats["beta_smb"],  "post_beta_smb":  post_stats["beta_smb"],
        "Δbeta_smb":     m_int.params["SMB_POST"],    "p(Δbeta_smb)": m_int.pvalues["SMB_POST"],
        "pre_beta_hml":  pre_stats["beta_hml"],  "post_beta_hml":  post_stats["beta_hml"],
        "Δbeta_hml":     m_int.params["HML_POST"],    "p(Δbeta_hml)": m_int.pvalues["HML_POST"],
        "pre_alpha_ann": pre_stats["alpha_annual"], "post_alpha_ann": post_stats["alpha_annual"],
        "Δalpha_ann":    m_int.params["POST"]*252,    "p(Δalpha)":     m_int.pvalues["POST"],
        "pre_R2": pre_stats["r2"], "post_R2": post_stats["r2"],
        "pre_n":  pre_stats["n"],  "post_n":  post_stats["n"],
        "note":   note if note else f"Split at {split.date()}",
    })
    return res
preferred_split = pd.Timestamp("2022-01-01")   # <- good for your window
rows = [pre_post_stability_safe(t, preferred_split, min_obs=252) for t in ["NVDA","TSLA","JPM"]]
prepost = pd.DataFrame(rows).set_index("ticker")[ALL_COLS]   # enforce all columns
display(prepost.round(4))
prepost.round(4).to_csv("table_pre_post_covid.csv")

import numpy as np, matplotlib.pyplot as plt

r = prepost.loc["NVDA"]
if np.isfinite(r["pre_beta_mkt"]) and np.isfinite(r["post_beta_mkt"]):
    labels = ["Mkt","SMB","HML"]
    pre_b  = [r["pre_beta_mkt"], r["pre_beta_smb"], r["pre_beta_hml"]]
    post_b = [r["post_beta_mkt"], r["post_beta_smb"], r["post_beta_hml"]]
    pvals  = [r["p(Δbeta_mkt)"], r["p(Δbeta_smb)"], r["p(Δbeta_hml)"]]

    x = np.arange(3); w=0.35
    plt.figure(figsize=(6.2,4.2))
    plt.bar(x-w/2, pre_b,  width=w, label="Pre")
    plt.bar(x+w/2, post_b, width=w, label="Post")
    for i,p in enumerate(pvals):
        txt = f"p={p:.3f}" if np.isfinite(p) else "p=NA"
        ymax = max(pre_b[i], post_b[i])
        plt.text(x[i], ymax + 0.05, txt, ha='center', fontsize=9)
    plt.axhline(0, color='k', lw=1)
    plt.xticks(x, labels); plt.ylabel("Beta")
    plt.title(f"NVDA betas: pre vs post  •  {prepost.loc['NVDA','note']}")
    plt.legend(frameon=False); plt.tight_layout(); plt.show()
else:
    print("NVDA: not enough obs on one side — try lowering min_obs or moving the split later (e.g., 2022-06-01).")

r = prepost.loc["NVDA"]
print({
  "pre_beta_mkt":  r["pre_beta_mkt"],
  "post_beta_mkt": r["post_beta_mkt"],
  "p_d_beta_mkt":  r["p(Δbeta_mkt)"],
  "pre_beta_smb":  r["pre_beta_smb"],
  "post_beta_smb": r["post_beta_smb"],
  "p_d_beta_smb":  r["p(Δbeta_smb)"],
  "pre_beta_hml":  r["pre_beta_hml"],
  "post_beta_hml": r["post_beta_hml"],
  "p_d_beta_hml":  r["p(Δbeta_hml)"],
  "pre_alpha_ann": r["pre_alpha_ann"],
  "post_alpha_ann":r["post_alpha_ann"],
  "pre_R2":        r["pre_R2"],
  "post_R2":       r["post_R2"],
  "note":          r["note"],
})

import numpy as np, pandas as pd, statsmodels.api as sm

# ----- helpers -----
# infer tickers from `data`
FFCOLS = ["Mkt-RF","SMB","HML","RF"]
tickers = [c for c in data.columns if c not in FFCOLS]

def get_betas(ticker):
    """Return (alpha, beta_mkt, beta_smb, beta_hml) for `ticker` using full-sample OLS,
       unless a `models[ticker]` already exists."""
    try:
        b = models[ticker].params  # const, Mkt-RF, SMB, HML
        return float(b['const']), float(b['Mkt-RF']), float(b['SMB']), float(b['HML'])
    except Exception:
        y = data[ticker] - data["RF"]
        X = sm.add_constant(data[["Mkt-RF","SMB","HML"]])
        m = sm.OLS(y, X, missing="drop").fit()
        b = m.params
        return float(b['const']), float(b['Mkt-RF']), float(b['SMB']), float(b['HML'])

def scenario_excess(alpha, bm, bs, bh, mkt_rf, smb, hml):
    """Expected *excess* return under scenario (daily, decimal)."""
    return alpha + bm*mkt_rf + bs*smb + bh*hml

# ----- define scenarios (daily decimals) -----
scenarios = pd.DataFrame([
    {"Scenario":"Risk-on +2% market",          "Mkt-RF":0.0200, "SMB": 0.000, "HML": 0.000},
    {"Scenario":"Mkt +1%, small +0.3%, value -0.3%","Mkt-RF":0.0100, "SMB": 0.003, "HML":-0.003},
    {"Scenario":"Risk-off -1% market, small -0.3%, value +0.3%","Mkt-RF":-0.0100, "SMB":-0.003, "HML": 0.003},
    {"Scenario":"Flat factors (alpha only)",   "Mkt-RF":0.0000, "SMB": 0.000, "HML": 0.000},
])

# choose an RF for "total" scenario returns; use recent mean RF
rf_daily = float(data["RF"].mean())

# ----- scenario table -----
out = scenarios.copy()
for t in tickers:
    a,bm,bs,bh = get_betas(t)
    out[f"{t}_excess"] = out.apply(lambda r: scenario_excess(a,bm,bs,bh,
                                   r["Mkt-RF"], r["SMB"], r["HML"]), axis=1)
    out[f"{t}_total"]  = out[f"{t}_excess"] + rf_daily

# tidy view (two tables): excess and total
excess_cols = ["Scenario"] + [f"{t}_excess" for t in tickers]
total_cols  = ["Scenario"] + [f"{t}_total"  for t in tickers]

scen_excess = out[excess_cols].set_index("Scenario").sort_index(axis=1)
scen_total  = out[total_cols ].set_index("Scenario").sort_index(axis=1)

print("Scenario (daily *excess* return, decimals):")
display(scen_excess.round(4))
print("Scenario (daily *total* return, decimals):")
display(scen_total.round(4))

# export for report
scen_excess.to_csv("table_scenarios_excess.csv")
scen_total.to_csv("table_scenarios_total.csv")

import pandas as pd, numpy as np, statsmodels.api as sm

# factor premia (daily means) over sample
mu = data[["Mkt-RF","SMB","HML"]].mean()
rf_daily = float(data["RF"].mean())

rows = []
for t in tickers:
    # betas (reused from A)
    alpha, bm, bs, bh = get_betas(t)
    exp_excess_daily = float(alpha + bm*mu["Mkt-RF"] + bs*mu["SMB"] + bh*mu["HML"])
    exp_excess_annual = exp_excess_daily * 252
    exp_total_annual  = exp_excess_annual + rf_daily * 252
    rows.append([t, exp_excess_daily, exp_excess_annual, exp_total_annual])

exp_table = pd.DataFrame(rows, columns=["Ticker","E_excess_daily","E_excess_annual","E_total_annual"])
display(exp_table.set_index("Ticker").round(4))

# export for Overleaf
exp_table.to_csv("table_expected_returns.csv", index=False)

import seaborn as sns, matplotlib.pyplot as plt
sns.heatmap(scen_excess*100, annot=True, fmt=".2f", cmap="RdYlBu_r")
plt.title("Scenario: expected *excess* returns (daily, %)")
plt.ylabel(""); plt.show()

import numpy as np, pandas as pd, statsmodels.api as sm
from statsmodels.tsa.ar_model import AutoReg

def rolling_ff3_forecast_oos(data, ticker, window=252, use_ar1=True, min_len_ar=30):
    """
    True 1-day-ahead OOS forecast of EXCESS return for `ticker`.
      - Betas: OLS on trailing `window` days.
      - Factor forecasts: AR(1) on each factor up to t (fallback = last value).
      - Returns: DataFrame with ['actual','pred'] and metrics dict.
    """
    y_excess = (data[ticker] - data["RF"]).dropna()
    fac = data[["Mkt-RF","SMB","HML"]].loc[y_excess.index]

    preds = pd.Series(index=y_excess.index, dtype=float)

    for i in range(window, len(y_excess)-1):
        # 1) betas on [i-window, i)
        Yw = y_excess.iloc[i-window:i]
        Xw = sm.add_constant(fac.iloc[i-window:i])
        b  = sm.OLS(Yw, Xw).fit().params

        # 2) forecasting factors for t+1
        if use_ar1:
            fhat = {}
            for col in fac.columns:
                series = fac[col].iloc[:i].dropna()
                if len(series) >= min_len_ar:
                    sr = pd.Series(series.to_numpy())               # RangeIndex -> no AR warnings
                    ar = AutoReg(sr, lags=1, old_names=False).fit()
                    fhat[col] = float(ar.forecast(steps=1).iloc[0])
                else:
                    fhat[col] = float(series.iloc[-1])              # persistence fallback
        else:
            fhat = fac.iloc[i].to_dict()                            # pure persistence

        next_idx = y_excess.index[i+1]
        preds.loc[next_idx] = (
            b['const'] + b['Mkt-RF']*fhat['Mkt-RF'] +
            b['SMB']*fhat['SMB'] + b['HML']*fhat['HML']
        )

    df = pd.DataFrame({"pred": preds, "actual": y_excess}).dropna()
    rmse = float(np.sqrt(np.mean((df['pred']-df['actual'])**2)))
    corr = float(df['pred'].corr(df['actual']))
    hit  = float((np.sign(df['pred']) == np.sign(df['actual'])).mean())
    return df, {"RMSE": rmse, "Corr": corr, "HitRate": hit}

tickers = ["NVDA","TSLA","JPM"]

series = {}
metrics = {}
for t in tickers:
    df, m = rolling_ff3_forecast_oos(data, t, window=252, use_ar1=True)
    series[t]  = df
    metrics[t] = m
    print(t, m)

# metrics table
met_tbl = pd.DataFrame(metrics).T
display(met_tbl.round(4))
met_tbl.round(6).to_csv("table_forecast_metrics.csv")

import numpy as np, pandas as pd

def add_naive_and_compare(df):
    # naive(shift by +1)
    d = df.copy()
    d['naive'] = d['actual'].shift(1)
    d = d.dropna()
    rmse_model = float(np.sqrt(np.mean((d['pred'] - d['actual'])**2)))
    rmse_naive = float(np.sqrt(np.mean((d['naive']- d['actual'])**2)))
    impr = (rmse_naive - rmse_model) / rmse_naive  # + = model better
    return rmse_model, rmse_naive, impr

rows = []
for t, df in series.items():
    rm_m, rm_n, impr = add_naive_and_compare(df)
    rows.append([t, rm_m, rm_n, impr])
cmp_tbl = pd.DataFrame(rows, columns=["Ticker","RMSE_model","RMSE_naive","Improvement_vs_naive"])
display(cmp_tbl.round(4))
cmp_tbl.round(6).to_csv("table_forecast_vs_naive.csv", index=False)

def verify_oos(ticker, df, window=252):
    y = (data[ticker]-data['RF']).dropna()
    print(ticker, "| full start:", y.index.min(), "| expected first forecast:", y.index[window+1], "| df start:", df.index.min())
for t in tickers:
    verify_oos(t, series[t])
import matplotlib.pyplot as plt, matplotlib.dates as mdates

# bias-correct (for cumulative visual only)
def bias_correct_df(df):
    d = df.copy()
    bias = (d['pred'] - d['actual']).mean()
    d['pred_bc'] = d['pred'] - bias
    return d

def plot_oos_daily(series, fname=None):
    fig, axes = plt.subplots(1, len(series), figsize=(14,4), sharey=True, constrained_layout=True)
    if len(series)==1: axes=[axes]
    for ax, (name, df) in zip(axes, series.items()):
        ax.plot(df.index, df['actual'], label="Actual", color="#444", lw=1.5)
        ax.plot(df.index, df['pred'],   label="Predicted", color="#1f77b4", lw=1.3)
        ax.set_title(name); ax.grid(alpha=.25); ax.spines[['top','right']].set_visible(False)
        ax.xaxis.set_major_locator(mdates.YearLocator()); ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y'))
    axes[0].legend(frameon=False, loc="upper left")
    fig.suptitle("FF3 1-day-ahead forecasts (daily excess)", y=1.02)
    if fname: plt.savefig(fname, dpi=240, bbox_inches='tight')
    plt.show()

def plot_oos_cum_bc(series, fname=None):
    fig, axes = plt.subplots(1, len(series), figsize=(14,4), sharey=True, constrained_layout=True)
    if len(series)==1: axes=[axes]
    for ax, (name, df) in zip(axes, series.items()):
        d = bias_correct_df(df)
        ax.plot(d.index, (1+d['actual']).cumprod()-1, label="Actual (cum excess)", color="#444", lw=1.5)
        ax.plot(d.index, (1+d['pred_bc']).cumprod()-1, label="Predicted (cum, bias-corr.)", color="#1f77b4", lw=1.3)
        ax.set_title(name); ax.grid(alpha=.25); ax.spines[['top','right']].set_visible(False)
        ax.xaxis.set_major_locator(mdates.YearLocator()); ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y'))
    axes[0].legend(frameon=False, loc="upper left")
    fig.suptitle("FF3 1-day-ahead forecasts (cumulative, bias-corrected)", y=1.02)
    if fname: plt.savefig(fname, dpi=240, bbox_inches='tight')
    plt.show()

plot_oos_daily(series, fname="fig_oos_daily.png")
plot_oos_cum_bc(series, fname="fig_oos_cum_bc.png")

import matplotlib.pyplot as plt, matplotlib.dates as mdates

def plot_oos_daily_zoom(series, ylim=0.05, title="FF3 1-day-ahead forecasts (daily excess, zoomed)"):
    fig, axes = plt.subplots(1, len(series), figsize=(14,4), sharey=True, constrained_layout=True)
    if len(series)==1: axes=[axes]
    for ax, (name, df) in zip(axes, series.items()):
        ax.plot(df.index, df['actual'], label="Actual", color="#444", lw=1)
        ax.plot(df.index, df['pred'],   label="Predicted", color="#1f77b4", lw=1.2)
        ax.set_ylim(-ylim, ylim)
        ax.grid(alpha=.25); ax.spines[['top','right']].set_visible(False)
        ax.set_title(name); ax.xaxis.set_major_locator(mdates.YearLocator()); ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y'))
    axes[0].legend(frameon=False, loc="upper left")
    fig.suptitle(title, y=1.02); plt.show()

plot_oos_daily_zoom(series, ylim=0.04)

import seaborn as sns

def calib_and_residuals(df, name):
    fig, ax = plt.subplots(1,2,figsize=(10,4), constrained_layout=True)
    sns.regplot(x='pred', y='actual', data=df, scatter_kws={'s':8,'alpha':0.25}, line_kws={'color':'crimson'}, ax=ax[0])
    ax[0].axhline(0,color='k',lw=1,alpha=.5); ax[0].axvline(0,color='k',lw=1,alpha=.5)
    ax[0].set_title(f"{name}: calibration (daily)")
    ax[0].set_xlabel("Predicted"); ax[0].set_ylabel("Actual")

    res = (df['actual']-df['pred']).dropna()
    sns.histplot(res, bins=50, kde=True, color="#1f77b4", ax=ax[1])
    ax[1].axvline(0,color='k',lw=1); ax[1].set_title(f"{name}: residuals")
    plt.show()

for name, df in series.items():
    calib_and_residuals(df, name)
for name, df in series.items():
    rc = df['actual'].rolling(63).corr(df['pred'])
    rc.plot(label=name, figsize=(10,3))
plt.axhline(0,color='k',lw=1); plt.title("Rolling 3-month corr(actual, pred)"); plt.legend(); plt.show()

import numpy as np, pandas as pd, statsmodels.api as sm

#VaR ES
tickers = ["NVDA","TSLA","JPM"]

def fit_ff3_all(data, tickers):
    models, fits = {}, {}
    for t in tickers:
        y = (data[t] - data["RF"]).dropna()
        X = sm.add_constant(data[["Mkt-RF","SMB","HML"]].loc[y.index])
        m = sm.OLS(y, X, missing="drop").fit()
        resid = m.resid.dropna()
        fits[t] = {
            "alpha": float(m.params["const"]),
            "betas": m.params[["Mkt-RF","SMB","HML"]].to_numpy(),   # shape (3,)
            "sigma_eps": float(resid.std(ddof=1)),
            "n": int(m.nobs)
        }
        models[t] = m
    return models, fits

models, fits = fit_ff3_all(data, tickers)

# factor mean & covariance (shrinkage for stability)
F = data[["Mkt-RF","SMB","HML"]].dropna()
mu_f = F.mean().to_numpy()
try:
    from sklearn.covariance import LedoitWolf
    Sigma_f = LedoitWolf().fit(F.values).covariance_
except Exception:
    Sigma_f = np.cov(F.values.T)   # fallback
rf_daily = float(data["RF"].mean())
print("Factor means:", mu_f, "\nRF (daily):", rf_daily)

from numpy.random import default_rng

def simulate_excess_matrix(tickers, fits, mu_f, Sigma_f, rf_daily, n_sims=100_000, seed=42):
    """
    Returns:
      - r_excess: (n_sims x T) simulated EXCESS returns for all tickers
      - r_total : (n_sims x T) simulated TOTAL returns for all tickers
    Model: r_i = alpha_i + beta_i' F + eps_i,  F ~ N(mu_f, Sigma_f), eps_i ~ N(0, sigma_eps_i^2)
    """
    T = len(tickers)
    alphas = np.array([fits[t]["alpha"]      for t in tickers])          # (T,)
    betas  = np.vstack([fits[t]["betas"]     for t in tickers])          # (T,3)
    sig    = np.array([fits[t]["sigma_eps"]  for t in tickers])          # (T,)

    rng = default_rng(seed)
    Fsim = rng.multivariate_normal(mean=mu_f, cov=Sigma_f, size=n_sims)  # (n,3)
    eps  = rng.normal(loc=0.0, scale=sig, size=(n_sims, T))              # (n,T)

    r_excess = alphas[None, :] + Fsim @ betas.T + eps                    # (n,T)
    r_total  = r_excess + rf_daily                                       # (n,T)
    return r_excess, r_total

def var_es_cols(r, q=0.95):
    """VaR/ES for each column of r (returns, not losses). Positive VaR/ES = % loss."""
    p = 1 - q
    VaR, ES = np.zeros(r.shape[1]), np.zeros(r.shape[1])
    for j in range(r.shape[1]):
        col = r[:, j]
        cut = np.quantile(col, p)
        VaR[j] = -cut
        ES[j]  = -col[col <= cut].mean()
    return VaR, ES

def var_es_series(x, q=0.95):
    p = 1 - q
    cut = np.quantile(x, p)
    VaR = -cut
    ES  = -x[x <= cut].mean()
    return VaR, ES
# simulate
r_excess, r_total = simulate_excess_matrix(tickers, fits, mu_f, Sigma_f, rf_daily, n_sims=200_000, seed=7)

# per-ticker VaR/ES
VaR_ex, ES_ex = var_es_cols(r_excess, q=0.95)
VaR_to, ES_to = var_es_cols(r_total,  q=0.95)

tbl = pd.DataFrame({
    "VaR95_excess": VaR_ex,
    "ES95_excess":  ES_ex,
    "VaR95_total":  VaR_to,
    "ES95_total":   ES_to,
    "alpha_annual": [fits[t]["alpha"]*252 for t in tickers],
    "resid_sigma":  [fits[t]["sigma_eps"] for t in tickers],
    "n_obs":        [fits[t]["n"]         for t in tickers],
}, index=tickers).sort_index()

display(tbl.round(4))

# equal-weight portfolio
w = np.repeat(1/len(tickers), len(tickers))
r_ex_port = r_excess @ w
r_to_port  = r_total  @ w
VaR_ex_p, ES_ex_p = var_es_series(r_ex_port, q=0.95)
VaR_to_p, ES_to_p = var_es_series(r_to_port,  q=0.95)

port_row = pd.DataFrame({
    "VaR95_excess": [VaR_ex_p],
    "ES95_excess":  [ES_ex_p],
    "VaR95_total":  [VaR_to_p],
    "ES95_total":   [ES_to_p],
}, index=["EQW_PORT"])

tbl_all = pd.concat([tbl, port_row])
tbl_all.round(6).to_csv("table_var_es.csv")
print("\nSaved: table_var_es.csv")
display(tbl_all.round(4))
import matplotlib.pyplot as plt, seaborn as sns

name = "NVDA"
j = tickers.index(name)
sns.kdeplot(r_excess[:, j], fill=True, color="#1f77b4", label="Excess (MC)")
plt.axvline(-tbl.loc[name,"VaR95_excess"], color="crimson", ls="--", label="VaR95")
plt.title(f"{name} — Monte Carlo excess-return distribution (1-day)")
plt.xlabel("Return (daily, decimal)"); plt.legend(); plt.show()

# data = rets.join(ff3[['Mkt-RF','SMB','HML','RF']], how='inner').dropna()

start = data.index.min().date()
end   = data.index.max().date()
n     = data.shape[0]

print(f"Regression sample window: {start} → {end}  |  Trading days: {n}")

# ==== 4-column metrics table ====
import numpy as np, pandas as pd, statsmodels.api as sm
from pandas_datareader import data as web

FFCOLS = ["Mkt-RF","SMB","HML","RF"]

# ---- A. RMSE / Corr / n ----
def rmse_corr_from_series(series_dict):
    rows = []
    for t, df in series_dict.items():
        d = df[['actual','pred']].dropna()
        rmse = float(np.sqrt(np.mean((d['pred'] - d['actual'])**2)))
        corr = float(d['pred'].corr(d['actual']))
        rows.append((t, rmse, corr, int(len(d))))
    return pd.DataFrame(rows, columns=['ticker','RMSE','Corr','n'])

def rmse_corr_in_sample(data, tickers):
    rows = []
    for t in tickers:
        y = (data[t] - data['RF']).dropna()
        X = sm.add_constant(data[["Mkt-RF","SMB","HML"]].loc[y.index])
        m = sm.OLS(y, X, missing="drop").fit(cov_type="HAC", cov_kwds={"maxlags":5})
        fitted = m.fittedvalues.reindex(y.index)
        rmse = float(np.sqrt(np.mean((y - fitted)**2)))
        corr = float(y.corr(fitted))
        rows.append((t, rmse, corr, int(m.nobs)))
    return pd.DataFrame(rows, columns=['ticker','RMSE','Corr','n'])

# OOS if present, else in-sample
try:
    metrics_rcn = rmse_corr_from_series(series)
except NameError:
    tickers = [c for c in data.columns if c not in FFCOLS]
    metrics_rcn = rmse_corr_in_sample(data, tickers)

# ---- B. ΔR² (best minus FF3) ----
def dr2_from_compare_csv_or_compute():

    try:
        cmp = pd.read_csv("ff3_ff5_compare.csv")
        return cmp[["ticker","dR2_best_minus_FF3"]]
    except Exception:
        pass

    ff5 = web.DataReader('F-F_Research_Data_5_Factors_2x3_Daily','famafrench')[0]
    ff5.index = pd.to_datetime(ff5.index); ff5 = ff5[['Mkt-RF','SMB','HML','RMW','CMA']]/100.0
    mom = web.DataReader('F-F_Momentum_Factor_Daily','famafrench')[0]
    mom.index = pd.to_datetime(mom.index); mom = mom/100.0; mom.columns = ['Mom']
    rf  = web.DataReader('F-F_Research_Data_Factors_Daily','famafrench')[0]
    rf.index = pd.to_datetime(rf.index); rf = rf[['RF']]/100.0

    ffX = ff5.join(mom, how='inner').join(rf, how='inner')
    cols_ff3  = ["Mkt-RF","SMB","HML"]
    cols_ff5  = ["Mkt-RF","SMB","HML","RMW","CMA"]
    cols_car4 = ["Mkt-RF","SMB","HML","Mom"]
    cols_ff5m = ["Mkt-RF","SMB","HML","RMW","CMA","Mom"]

    dat_ff3  = rets.join(ffX[cols_ff3 + ["RF"]],  how="inner")
    dat_ff5  = rets.join(ffX[cols_ff5 + ["RF"]],  how="inner")
    dat_car4 = rets.join(ffX[cols_car4 + ["RF"]], how="inner")
    dat_ff5m = rets.join(ffX[cols_ff5m + ["RF"]], how="inner")

    def r2(dat, t, cols):
        y = dat[t] - dat["RF"]; X = sm.add_constant(dat[cols])
        return sm.OLS(y, X, missing="drop").fit(cov_type="HAC", cov_kwds={"maxlags":5}).rsquared

    rows = []
    for t in rets.columns:
        r3  = r2(dat_ff3,  t, cols_ff3)
        r5  = r2(dat_ff5,  t, cols_ff5)
        r4  = r2(dat_car4, t, cols_car4)
        r5m = r2(dat_ff5m, t, cols_ff5m)
        rows.append((t, max(r5, r4, r5m) - r3))
    return pd.DataFrame(rows, columns=["ticker","dR2_best_minus_FF3"])

metrics_dr2 = dr2_from_compare_csv_or_compute()

# ---- C. Merge & save ----
out4 = (metrics_rcn.merge(metrics_dr2, on='ticker', how='left')
                 [['ticker','RMSE','Corr','dR2_best_minus_FF3','n']])
out4.to_csv("metrics_summary_4col.csv", index=False)
display(out4.round(4))
print("Saved -> metrics_summary_4col.csv")

import numpy as np, pandas as pd, statsmodels.api as sm

FFCOLS = ["Mkt-RF","SMB","HML","RF"]

def get_betas(ticker):
    try:
        b = models[ticker].params
        return float(b['const']), float(b['Mkt-RF']), float(b['SMB']), float(b['HML'])
    except Exception:
        y = (data[ticker] - data["RF"]).dropna()
        X = sm.add_constant(data[["Mkt-RF","SMB","HML"]].loc[y.index])
        m = sm.OLS(y, X, missing="drop").fit(cov_type="HAC", cov_kwds={"maxlags":5})
        b = m.params
        return float(b['const']), float(b['Mkt-RF']), float(b['SMB']), float(b['HML'])

def ff3_expected_excess(alpha, bm, bs, bh, mkt_rf, smb, hml):
    return alpha + bm*mkt_rf + bs*smb + bh*hml   # daily, decimal

#  the three tickers
names = ["NVDA","TSLA","JPM"]

# --- scenarios (factor shocks are DAILY decimals)
scenarios = [
    #  Scenario label                        Mkt-RF    SMB     HML      Note
    ("Risk-on +2% market",                   0.0200,   0.000,  0.000,  "Broad rally: high-β names lead; mostly market-driven."),
    ("Growth-led rally",                     0.0100,   0.000, -0.003,  "Growth > Value (HML<0); benefits growth-tilted stocks."),
    ("Value rotation",                       0.0050,   0.000,  0.003,  "Value > Growth (HML>0); helps value/banks/energy."),
    ("Small-cap squeeze",                    0.0050,   0.005,  0.000,  "SMB>0 favors small-cap exposure; hurts large-cap tilts."),
    ("Risk-off −1.5% market",               -0.0150,  -0.003,  0.003,  "Flight to safety; high-β/growth lag, value defensive."),
]

# ---  table
rows = []
for label, mkt, smb, hml, note in scenarios:
    row = {"Scenario": label, "Notes": note}
    for t in names:
        a,bm,bs,bh = get_betas(t)
        r = ff3_expected_excess(a,bm,bs,bh, mkt, smb, hml)
        row[t] = f"{r*100:+.2f}%"   # pretty percentage string
    rows.append(row)

scen_table = pd.DataFrame(rows, columns=["Scenario","NVDA","TSLA","JPM","Notes"])
display(scen_table)
scen_table.to_csv("scenario_table_readable.csv", index=False)
print("Saved -> scenario_table_readable.csv")

import pandas as pd
import re


df = pd.read_csv("scenario_table_readable.csv")

df4 = df[["Scenario","NVDA","TSLA","JPM"]].copy()


def pct_to_number(x):
    s = str(x).strip()
    s = re.sub(r'[%+]', '', s)  # drop % and +
    return float(s)

for c in ["NVDA","TSLA","JPM"]:
    df4[c] = df4[c].apply(pct_to_number)


df4.to_csv("scenario_table_4col.csv", index=False)
print(df4)

import pandas as pd

# Assumes `data` has daily total returns for tickers (as decimals) and RF (decimal):
# columns like: ['NVDA','TSLA','JPM', ..., 'RF'] indexed by Date
tickers = ['NVDA', 'TSLA', 'JPM']

rf_ann = data['RF'].mean() * 252  # annualized risk-free (decimal)
rows = []
for t in tickers:
    excess = (data[t] - data['RF']).dropna()
    e_excess_ann = excess.mean() * 252          # decimal
    e_total_ann  = e_excess_ann + rf_ann        # decimal
    rows.append({'Ticker': t,
                 'E_excess_ann': e_excess_ann * 100,  # percent
                 'E_total_ann':  e_total_ann  * 100}) # percent

tbl = pd.DataFrame(rows)
tbl.to_csv('table_expected_returns.csv', index=False)
tbl.round(2)






































In [None]:
from matplotlib import pyplot as plt
_df_26['beta_mkt'].plot(kind='line', figsize=(8, 4), title='beta_mkt')
plt.gca().spines[['top', 'right']].set_visible(False)

In [None]:
# @title HML vs RF

from matplotlib import pyplot as plt
ff3.plot(kind='scatter', x='HML', y='RF', s=32, alpha=.8)
plt.gca().spines[['top', 'right',]].set_visible(False)

In [None]:
# @title SMB vs HML

from matplotlib import pyplot as plt
ff3.plot(kind='scatter', x='SMB', y='HML', s=32, alpha=.8)
plt.gca().spines[['top', 'right',]].set_visible(False)