<a href="https://colab.research.google.com/github/alemartini5/mean-reversion-project/blob/main/mean_rev.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import pandas as pd
import numpy as np
import yfinance as yf
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import zscore
import warnings

# Global notebook settings
warnings.filterwarnings("ignore")   # silence every warning category for the rest of the session
sns.set(style="whitegrid")          # seaborn theme with a light grid behind each plot

In [None]:
# ============================================================
# Step 1: Data download and preparation
# ============================================================

tickers = ["SPY", "QQQ", "IWM"]

# Download two years of daily bars; group_by='ticker' makes data[ticker]
# a per-ticker frame of price fields.
data = yf.download(tickers, period="2y", interval="1d", group_by='ticker')

# One price column per ticker: use 'Adj Close' when the download provides it,
# otherwise fall back to 'Close'.
prices = pd.DataFrame({
    t: data[t]['Adj Close'] if 'Adj Close' in data[t].columns else data[t]['Close']
    for t in tickers
})

# Fill missing values: carry the last known price forward, then linearly
# interpolate whatever is left. `.ffill()` replaces the deprecated
# `fillna(method='ffill')`, and the chained form avoids in-place mutation so
# the cell gives the same result when re-run.
prices = prices.ffill().interpolate(method='linear')


# Price chart, each series rebased to 100 on the first row
plt.figure(figsize=(12, 5))
normalized_prices = prices / prices.iloc[0] * 100
for col in normalized_prices.columns:
    plt.plot(normalized_prices.index, normalized_prices[col], label=col, linewidth=1.8)

plt.title('Normalized Price Performance (Base = 100)')
plt.ylabel('Index Level')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()


In [4]:
# ============================================================
# Step 2: Mean-reversion strategy
# ============================================================

# Parameters
lookback = 20        # rolling window (rows) used for the z-score
z_thresh = 1.5       # absolute z-score that triggers an entry
slippage = 0.0005    # 0.05% per trade
commission = 0.0005  # 0.05% per trade

# Z-score of price relative to its rolling mean
mean = prices.rolling(lookback).mean()
std = prices.rolling(lookback).std()
zscore_signal = (prices - mean) / std

# Raw signals: +1 when price is stretched below its mean, -1 when stretched above.
# Rows where the z-score is still NaN (warm-up) compare False and stay at 0.
signals = pd.DataFrame(0, index=prices.index, columns=prices.columns)
signals[zscore_signal < -z_thresh] = 1    # long
signals[zscore_signal > z_thresh] = -1    # short

# Trend filter: flatten the signal whenever price is below its 50-row moving average.
# The variable keeps its historical name, but this is a *simple* rolling mean, not an EMA.
# NOTE(review): the filter zeroes shorts as well as longs, and rows where the
# average is still NaN are left unfiltered - confirm both are intended.
ema50 = prices.rolling(50).mean()
for col in prices.columns:
    signals.loc[prices[col] < ema50[col], col] = 0

# Daily returns on the full trading calendar.
# Rows without a signal are deliberately NOT dropped: filtering them out before
# shifting would make shift(1) lag by "one surviving row" instead of one trading
# day, pairing signals with returns from unrelated dates.
returns = prices.pct_change()

# The position held on a given day is the previous day's signal.
positions = signals.shift(1)

# Strategy returns net of costs; the combined cost is charged on every day a
# position is held (|position| == 1).
strategy_returns = positions * returns
strategy_returns -= positions.abs() * (slippage + commission)


In [None]:
# ============================================================
# Step 3: Backtest and cumulative returns
# ============================================================
from matplotlib.ticker import PercentFormatter

strategy_returns_clean = strategy_returns.dropna()
returns_clean = returns.loc[strategy_returns_clean.index]

# Growth of 1 unit of capital for the strategy and for buy-and-hold
cumulative_strategy = (1 + strategy_returns_clean).cumprod()
cumulative_buy_hold = (1 + returns_clean).cumprod()

# Drawdown: distance of the equity curve from its running peak (<= 0)
drawdown = cumulative_strategy / cumulative_strategy.cummax() - 1

plt.figure(figsize=(12, 5))
for col in drawdown.columns:
    plt.plot(drawdown.index, drawdown[col], label=f'{col} Drawdown', linewidth=1.5)
    plt.fill_between(drawdown.index, drawdown[col], 0, alpha=0.2)

# Reference line at -5%
plt.axhline(-0.05, color='red', linestyle='--', linewidth=1.5, label='-5% Threshold')

# Mark the worst drawdown across all assets
max_dd = drawdown.min().min()
plt.axhline(max_dd, color='black', linestyle=':', linewidth=1.2, label=f'Max DD: {max_dd:.1%}')

plt.title('Strategy Drawdowns')
plt.ylabel('Drawdown (%)')
plt.gca().yaxis.set_major_formatter(PercentFormatter(1.0))
plt.legend()
plt.show()



# Cumulative returns vs Buy&Hold
plt.figure(figsize=(12, 6))
for col in cumulative_strategy.columns:
    # Strategy line. Plot growth - 1 so the axis reads as a return (0% at the
    # start), matching the axis label.
    strategy_line, = plt.plot(cumulative_strategy.index, cumulative_strategy[col] - 1,
                              label=f'{col} Strategy', linewidth=2)

    # Buy&Hold line: dashed, explicitly reusing the strategy line's color so the
    # two curves of one asset are visually paired.
    plt.plot(cumulative_buy_hold.index, cumulative_buy_hold[col] - 1,
             linestyle='--', linewidth=1.8, color=strategy_line.get_color(),
             label=f'{col} Buy&Hold')

# Title and labels
plt.title('Cumulative Returns: Mean Reversion Strategy vs Buy&Hold')
plt.ylabel('Cumulative Return (%)')
plt.gca().yaxis.set_major_formatter(PercentFormatter(1.0))

plt.legend()
plt.grid(True, alpha=0.3)
plt.show()


In [None]:
# ============================================================
# Step 4: Performance metrics and rolling Sharpe ratio
# ============================================================

def performance_metrics(returns, freq=252):
    """Summarise a series of periodic returns.

    Parameters
    ----------
    returns : pd.Series
        Simple per-period returns. NaN entries are dropped before any
        calculation so they affect neither the observation count nor the
        win rate.
    freq : int, default 252
        Number of periods per year, used for annualisation.

    Returns
    -------
    pd.Series
        Indexed by "Annualized Return", "Volatility", "Sharpe Ratio",
        "Sortino Ratio", "CAGR", "Max Drawdown" (reported as a positive
        magnitude), "Win Rate" and "Payoff Ratio". Every value is NaN when
        no non-NaN observations are supplied.
    """
    metric_names = [
        "Annualized Return",
        "Volatility",
        "Sharpe Ratio",
        "Sortino Ratio",
        "CAGR",
        "Max Drawdown",
        "Win Rate",
        "Payoff Ratio",
    ]

    returns = returns.dropna()
    if returns.empty:
        # Nothing to measure; also avoids dividing by len(returns) == 0 below.
        return pd.Series(np.nan, index=metric_names)

    # Compounded return rescaled to one year. "Annualized Return" and "CAGR"
    # are the same quantity here, so it is computed once.
    ann_return = (1 + returns).prod() ** (freq / len(returns)) - 1
    cagr = ann_return

    vol = returns.std() * np.sqrt(freq)
    sharpe = ann_return / vol if vol != 0 else np.nan

    # Downside risk: annualised standard deviation of the negative returns only.
    downside = returns[returns < 0].std() * np.sqrt(freq)
    sortino = ann_return / downside if downside != 0 else np.nan

    # Worst peak-to-trough decline of the compounded equity curve.
    cum_ret = (1 + returns).cumprod()
    max_dd = (cum_ret / cum_ret.cummax() - 1).min()

    win_rate = (returns > 0).mean()
    avg_win = returns[returns > 0].mean()
    avg_loss = returns[returns < 0].mean()
    payoff = abs(avg_win / avg_loss) if avg_loss != 0 else np.nan

    return pd.Series({
        "Annualized Return": ann_return,
        "Volatility": vol,
        "Sharpe Ratio": sharpe,
        "Sortino Ratio": sortino,
        "CAGR": cagr,
        "Max Drawdown": abs(max_dd),
        "Win Rate": win_rate,
        "Payoff Ratio": payoff
    })

# Per-asset metrics: one row per asset, one column per metric
per_asset = {}
for col in strategy_returns.columns:
    per_asset[col] = performance_metrics(strategy_returns[col].dropna())
metrics = pd.DataFrame(per_asset).T
print(metrics)

# Rolling Sharpe ratio: window is half the sample, clamped to [20, 252] rows
rolling_window = min(252, max(20, len(strategy_returns)//2))
roller = strategy_returns.rolling(rolling_window)
rolling_sharpe = (roller.mean() / roller.std()) * np.sqrt(252)
rolling_sharpe = rolling_sharpe.dropna(how='all')



if not rolling_sharpe.empty:
    plt.figure(figsize=(12, 5))
    ax = sns.boxplot(data=rolling_sharpe.astype(float), palette="Set2")

    # Reference line at Sharpe = 0
    ax.axhline(0, color="black", linestyle="--", linewidth=1)

    # Annotate each box with its median value
    medians = rolling_sharpe.median()
    for i, median in enumerate(medians):
        ax.text(
            i, median, f"{median:.2f}",
            ha='center', va='bottom', fontsize=9, color="black",
        )

    # Title and axis labels
    ax.set_title(f"Rolling Sharpe Ratio Distribution (Window = {rolling_window} days)", fontsize=14)
    ax.set_ylabel("Sharpe Ratio", fontsize=12)
    ax.set_xlabel("Asset", fontsize=12)
    ax.grid(alpha=0.3, linestyle="--")
    plt.tight_layout()
    plt.show()


In [None]:
# ============================================================
# Step 5: Walk-Forward Validation
# ============================================================

lookbacks = [10, 15, 20, 25, 30]
z_thresholds = np.arange(0.5, 3.0, 0.5)
train_window = 252   # ~1 year
test_window = 63     # ~3 months

walkforward_columns = ['train_start', 'train_end', 'test_start', 'test_end',
                       'lookback', 'z_threshold', 'test_metrics']
walkforward_results = []

start_idx = 0
while start_idx + train_window + test_window <= len(prices):
    train_idx = slice(start_idx, start_idx + train_window)
    test_idx = slice(start_idx + train_window, start_idx + train_window + test_window)

    train_returns = prices.iloc[train_idx].pct_change().dropna()
    test_returns = prices.iloc[test_idx].pct_change().dropna()

    # Parameter search on the training window.
    # The grid is created as float (not the default object dtype) so that
    # idxmax below works on every pandas version.
    results_train = pd.DataFrame(np.nan, index=lookbacks, columns=z_thresholds, dtype=float)
    for lb in lookbacks:
        # The rolling z-score depends only on the lookback, so compute it once
        # per lookback rather than once per (lookback, threshold) pair.
        mean = train_returns.rolling(lb).mean()
        std = train_returns.rolling(lb).std()
        z = (train_returns - mean) / std
        for zt in z_thresholds:
            r = pd.DataFrame(0, index=train_returns.index, columns=train_returns.columns)
            r[z < -zt] = 1
            r[z > zt] = -1
            strat = r.shift(1) * train_returns
            avg_std = strat.std().mean()
            if avg_std != 0:
                # Score: annualised cross-asset mean return over cross-asset mean volatility
                results_train.loc[lb, zt] = strat.mean().mean() * 252 / (avg_std * np.sqrt(252))

    # Best parameters; skip the fold if no combination produced a valid score
    scores = results_train.stack().dropna()
    if scores.empty:
        start_idx += test_window
        continue
    best_lb, best_zt = scores.idxmax()
    best_lb, best_zt = int(best_lb), float(best_zt)

    # Apply the selected parameters to the test window
    r_test = pd.DataFrame(0, index=test_returns.index, columns=test_returns.columns)
    mean_test = test_returns.rolling(best_lb).mean()
    std_test = test_returns.rolling(best_lb).std()
    z_test = (test_returns - mean_test) / std_test
    r_test[z_test < -best_zt] = 1
    r_test[z_test > best_zt] = -1
    strat_test = r_test.shift(1) * test_returns

    walkforward_results.append({
        "train_start": prices.index[train_idx.start],
        "train_end": prices.index[train_idx.stop-1],
        "test_start": prices.index[test_idx.start],
        "test_end": prices.index[test_idx.stop-1],
        "lookback": best_lb,
        "z_threshold": best_zt,
        # Metrics of the equal-weighted (row-mean) strategy return across assets
        "test_metrics": performance_metrics(strat_test.mean(axis=1))
    })

    start_idx += test_window

# Passing the column list keeps the frame well-formed even when no fold was produced
walkforward_df = pd.DataFrame(walkforward_results, columns=walkforward_columns)
print("Walk-Forward Validation Results:")
print(walkforward_df[walkforward_columns])






results = []  # one metrics record per walk-forward test window

plt.figure(figsize=(14, 8))
palette = sns.color_palette("tab10", n_colors=len(walkforward_df))

for i, row in walkforward_df.iterrows():
    window = row['lookback']
    threshold = row['z_threshold']

    # Test-window prices and returns
    test_prices = prices.loc[row['test_start']:row['test_end']]
    test_returns = test_prices.pct_change().dropna()

    # Strategy signals from the rolling z-score of returns
    test_roller = test_returns.rolling(window)
    mean_test = test_roller.mean()
    std_test = test_roller.std()
    z_test = (test_returns - mean_test) / std_test
    r_test = pd.DataFrame(0, index=test_returns.index, columns=test_returns.columns)
    r_test[z_test < -threshold] = 1
    r_test[z_test > threshold] = -1

    # Yesterday's signal applied to today's return
    strat_test = r_test.shift(1) * test_returns

    # Equal-weighted daily series and their compounded curves
    strat_daily = strat_test.mean(axis=1)
    equity = (1 + strat_daily).cumprod()
    cumulative_test = equity - 1
    cumulative_bh = (1 + test_returns.mean(axis=1)).cumprod() - 1

    # Strategy metrics
    daily_std = strat_daily.std()
    if daily_std > 0:
        sharpe = (strat_daily.mean() / daily_std) * np.sqrt(252)
    else:
        sharpe = 0
    cagr = (1 + strat_daily).prod() ** (252/len(strat_daily)) - 1
    roll_max = equity.cummax()
    dd = (equity / roll_max - 1).min()

    # Store the record
    results.append({
        "Test": i+1,
        "Sharpe": round(sharpe, 2),
        "CAGR %": round(cagr*100, 2),
        "MaxDD %": round(dd*100, 2)
    })

    color = palette[i]

    # Strategy curve, emphasised
    plt.plot(
        cumulative_test.index, cumulative_test * 100,
        label=f"Strategy Test {i+1}", color=color, linewidth=2.0,
    )

    # Benchmark curve, faded and dashed in the same color
    plt.plot(
        cumulative_bh.index, cumulative_bh * 100,
        linestyle="--", alpha=0.5, color=color,
    )

plt.title("Walk-Forward Cumulative Returns (%)", fontsize=15, fontweight="bold")
plt.axhline(0, color="black", linestyle="--", linewidth=1)
plt.xlabel("Date", fontsize=12)
plt.ylabel("Cumulative Return (%)", fontsize=12)
plt.grid(alpha=0.3)
plt.legend()
plt.tight_layout()
plt.show()

# Metrics table
results_df = pd.DataFrame(results)
print(results_df)



In [None]:
# ============================================================
# Step 6: Correlations and parameter optimisation
# ============================================================

# Parameter heatmap: score for every (lookback, z-threshold) pair.
# Created as float so sorting and plotting need no dtype conversion.
results_opt = pd.DataFrame(np.nan, index=lookbacks, columns=z_thresholds, dtype=float)
for lb in lookbacks:
    # The rolling z-score depends only on the lookback; compute it once per lookback.
    mean = returns.rolling(lb).mean()
    std = returns.rolling(lb).std()
    z = (returns - mean) / std
    for zt in z_thresholds:
        # Start from a flat book so each cell reflects only this parameter pair.
        # Seeding with the Step 2 `signals` would leak those positions into
        # every cell of the grid.
        r = pd.DataFrame(0, index=returns.index, columns=returns.columns)
        r[z < -zt] = 1
        r[z > zt] = -1
        strat = r.shift(1) * returns
        avg_std = strat.std().mean()
        if avg_std != 0:
            results_opt.loc[lb, zt] = strat.mean().mean() * 252 / (avg_std * np.sqrt(252))

sns.heatmap(results_opt, annot=True, cmap='coolwarm')
plt.title('Sharpe Ratio Heatmap (Lookback vs Z-Score)')
plt.xlabel('Z-Score Threshold')
plt.ylabel('Lookback')
plt.show()

# Top 3 combinations (cells without a valid score are excluded)
top_combinations = results_opt.stack().dropna().sort_values(ascending=False).head(3)
print("Top 3 parameter combinations (Lookback, Z-Score):\n", top_combinations)

# Correlation between the per-asset strategy returns
strategy_corr = strategy_returns.corr()
plt.figure(figsize=(6, 5))
sns.heatmap(strategy_corr, annot=True, cmap='coolwarm')
plt.title('Strategy Returns Correlation Across Assets')
plt.show()
