In [1]:
import warnings; warnings.filterwarnings('ignore')
import pandas as pd, numpy as np, yfinance as yf
from datetime import datetime

In [21]:
# ---------------- USER PARAMETERS ----------------
START_DATE = "2010-01-01"
END_DATE = datetime.today().strftime("%Y-%m-%d")
FORMATION = 12                                      # look-back window (months)
SKIP = 1                                            # skip window (months)
HOLD = 12                                            # holding period (months)
METRIC = "raw"                                      # "risk" → mean/vol | "raw" → cumulative return
TOP_DECILE = 0.10                                   # winners
BOT_DECILE = 0.10                                   # losers
INIT_CAPITAL = 1_000_000                            # rupees – used for trade P&L
FILENAME = "momentum_backtest_results.xlsx"
MEMBERSHIP_FILE = "nifty50_membership.xlsx"

In [None]:
def load_membership_data(filename):
    """Load and process Nifty 50 membership data with gap handling."""
    df = pd.read_excel(filename)
    df['Start'] = pd.to_datetime(df['Start'])
    df['End'] = pd.to_datetime(df['End'])  
    
    membership_periods = {}
    for _, row in df.iterrows():
        ticker = row['Ticker']
        start = row['Start']
        end = row['End'] if pd.notna(row['End']) else pd.Timestamp('2030-12-31')
        
        if ticker not in membership_periods:
            membership_periods[ticker] = []
        membership_periods[ticker].append((start, end))
    
    return membership_periods

def get_active_members(membership_periods, date):
    """Get list of stocks that are active members at month-end date."""
    active_stocks = []
    month_end = pd.Timestamp(date.year, date.month, 1) + pd.offsets.MonthEnd(0)
    
    for ticker, periods in membership_periods.items():
        for start, end in periods:
            if start <= month_end <= end:
                clean_ticker = ticker.replace(".NS", "")
                active_stocks.append(clean_ticker)
                break  
    
    return active_stocks

print("Loading membership data...")
membership_periods = load_membership_data(MEMBERSHIP_FILE)
all_tickers = list(membership_periods.keys())

Loading membership data...


In [None]:
def fetch_prices(tickers, start, end):
    out = {}
    for tk in (t.strip() for t in tickers):
        if not tk:
            continue
        df = yf.download(tk, start=start, end=end,
                        auto_adjust=True, progress=False,
                        multi_level_index=False)
        if not df.empty:
            out[tk.replace(".NS", "")] = df["Close"]
    return pd.DataFrame(out)

daily_prices = fetch_prices(all_tickers, START_DATE, END_DATE)
if daily_prices.empty:
    raise RuntimeError("Price download failed – check tickers / internet")

monthly_prices = daily_prices.resample("M").last()
monthly_returns = monthly_prices.pct_change()


In [None]:
equity_curve = [1.0]
portfolio_records = []
trades = []
per_stock_trades = {}
open_positions = []
all_months = monthly_returns.index
start_idx = FORMATION + SKIP

In [None]:
for idx in range(start_idx, len(all_months)):
    date = all_months[idx]
    
    active_stocks = get_active_members(membership_periods, date)
    
    if len(active_stocks) < 10:  # Need minimum stocks for decile formation
        print(f"Warning: Only {len(active_stocks)} active stocks at {date.strftime('%Y-%m')}, skipping...")
        continue
    
    available_stocks = [s for s in active_stocks if s in monthly_prices.columns]
    
    if len(available_stocks) < 10:
        print(f"Warning: Only {len(available_stocks)} stocks with price data at {date.strftime('%Y-%m')}, skipping...")
        continue

    port_ret = 0.0
    to_close = []
    for pos in open_positions:
        s, w = pos['stock'], pos['weight']
        r = monthly_returns.get(s, pd.Series()).reindex(all_months).loc[date]
        if np.isnan(r):
            r = 0.0 
        port_ret += w * r
        pos['months_left'] -= 1 
        if pos['months_left'] == 0: 
            exit_px = monthly_prices.loc[date, s]
            if not np.isnan(exit_px):
                raw_ret = (exit_px / pos['entry_price']) - 1
                trade_ret = raw_ret if w > 0 else -raw_ret
                profit_rs = INIT_CAPITAL * abs(w) * trade_ret
                trade = {
                    "Entry_Date" : pos['entry_date'].strftime("%Y-%m-%d"),
                    "Exit_Date" : date.strftime("%Y-%m-%d"),
                    "Stock" : s,
                    "Side" : "Long" if w > 0 else "Short",
                    "Entry_Price": round(pos['entry_price'], 2),
                    "Exit_Price" : round(exit_px, 2),
                    "Return_% " : round(trade_ret * 100, 2),
                    "Profit_RS" : round(profit_rs, 2),
                    "Weight" : round(w, 4),
                    "Mean_MonRet": round(pos['mean_ret'], 6),
                    "Volatility" : round(pos['vol_ret'], 6),
                    "Score" : round(pos['score'], 6)
                }
                trades.append(trade)
                per_stock_trades.setdefault(s, []).append(trade)
            to_close.append(pos)
    
    for pos in to_close: 
        open_positions.remove(pos)

    equity_curve.append(equity_curve[-1] * (1 + port_ret))

    formation_end = date - pd.DateOffset(months=SKIP)
    formation_start = formation_end - pd.DateOffset(months=FORMATION-1)
    
    slice_ = monthly_returns.loc[formation_start:formation_end, available_stocks]
    
    mean_ret = slice_.mean()
    vol_ret = slice_.std().replace(0, np.nan) # avoid /0
    
    if METRIC == "risk":
        score = mean_ret / vol_ret
    elif METRIC == "raw":
        score = (1 + slice_).prod() - 1
    else:
        raise ValueError("METRIC must be 'risk' or 'raw'")
    
    score = score.dropna()
    n_stocks = len(score)
    
    if n_stocks == 0:
        portfolio_records.append(
            {"Month": date.strftime("%Y-%m"),
             "Year" : date.year,
             "Portfolio_Return": port_ret,
             "Active_Stocks": len(available_stocks)})
        continue 
    
    n_long = max(1, round(n_stocks * TOP_DECILE))
    n_short = max(1, round(n_stocks * BOT_DECILE))
    
    winners = score.nlargest(n_long).index.tolist()
    losers = score.nsmallest(n_short).index.tolist()
    
    scale = 1 / HOLD
    new_weights = {s: +scale/len(winners) for s in winners}
    new_weights.update({s: -scale/len(losers) for s in losers})
    
    for s, w in new_weights.items():
        entry_px = monthly_prices.loc[date, s]
        if np.isnan(entry_px):
            continue
        open_positions.append({
            'stock' : s,
            'weight' : w,
            'months_left' : HOLD,
            'entry_price' : entry_px,
            'entry_date' : date,
            'mean_ret' : mean_ret.get(s, np.nan),
            'vol_ret' : vol_ret.get(s, np.nan),
            'score' : score[s]
        })
    
    current_weights = {}
    for pos in open_positions:
        current_weights[pos['stock']] = current_weights.get(pos['stock'], 0) + pos['weight']
    
    rec = {"Month": date.strftime("%Y-%m"),
           "Year" : date.year,
           "Portfolio_Return": port_ret,
           "Active_Stocks": len(available_stocks)}
    
    for tk in daily_prices.columns:
        rec[tk] = current_weights.get(tk, 0)
    
    portfolio_records.append(rec)

In [None]:
port_df = pd.DataFrame(portfolio_records)
port_ret = port_df["Portfolio_Return"]
ann_ret = port_ret.mean() * 12
ann_vol = port_ret.std(ddof=0) * np.sqrt(12)
sharpe = ann_ret / ann_vol if ann_vol else np.nan
cum = np.cumprod(1 + port_ret)
max_dd = (cum / cum.cummax() - 1).min()
win_rate = (port_ret > 0).mean()

print("-"*60)
print("SURVIVORSHIP BIAS CORRECTED MOMENTUM BACKTEST")
print(f"BACKTEST SUMMARY (Start → {port_df['Month'].iloc[-1]})")
print(f"Total Months : {len(port_ret)}")
print(f"Total Trades (roundtrip): {len(trades)}")
print(f"Avg Active Stocks/Month : {port_df['Active_Stocks'].mean():.1f}")
print(f"Annualised Return : {ann_ret*100:6.2f}%")
print(f"Annualised Volatility : {ann_vol*100:6.2f}%")
print(f"Sharpe Ratio : {sharpe:6.3f}")
print(f"Maximum Drawdown : {max_dd*100:6.2f}%")
print(f"Winning Months : {win_rate*100:6.1f}%")
print("-"*60)

------------------------------------------------------------
SURVIVORSHIP BIAS CORRECTED MOMENTUM BACKTEST
BACKTEST SUMMARY (Start → 2025-06)
Total Months : 173
Total Trades (roundtrip): 1330
Avg Active Stocks/Month : 41.8
Annualised Return :   5.99%
Annualised Volatility :  28.10%
Sharpe Ratio :  0.213
Maximum Drawdown : -64.15%
Winning Months :   56.1%
------------------------------------------------------------


In [None]:
with pd.ExcelWriter(FILENAME, engine="openpyxl") as xl:
    pd.DataFrame(trades).to_excel(xl, sheet_name="Trades", index=False)
    for stock, tlist in per_stock_trades.items():
        pd.DataFrame(tlist).to_excel(xl, sheet_name=stock[:31], index=False)
    port_df.to_excel(xl, sheet_name="Portfolio", index=False)

In [None]:
param_dict = {
    "Start_Date" : START_DATE,
    "End_Date" : END_DATE,
    "Formation_M" : FORMATION,
    "Skip_M" : SKIP,
    "Hold_M" : HOLD,
    "Metric" : METRIC,
    "Top_Decile" : TOP_DECILE,
    "Bottom_Decile": BOT_DECILE,
    "Init_Capital" : INIT_CAPITAL,
    "Universe_Size": len(all_tickers),
    "Survivorship_Bias": "CORRECTED"
}

summary_dict = {
    "Total_Months" : len(port_ret),
    "Total_Trades" : len(trades),
    "Ann_Return" : round(ann_ret, 6),
    "Ann_Vol" : round(ann_vol, 6),
    "Sharpe" : round(sharpe, 3),
    "Max_Drawdown" : round(max_dd, 6),
    "Win_Rate" : round(win_rate, 4),
    "Avg_Active_Stocks": round(port_df['Active_Stocks'].mean(), 1)
}

with pd.ExcelWriter("backtest_overview.xlsx", engine="openpyxl") as xl:
    pd.DataFrame([param_dict]).to_excel(xl, sheet_name="Parameters", index=False)
    pd.DataFrame([summary_dict]).to_excel(xl, sheet_name="Summary", index=False)

print("Overview workbook created: backtest_overview.xlsx")


Overview workbook created: backtest_overview.xlsx


In [None]:
n_ret = yf.download('^NSEI', start=START_DATE,
                    auto_adjust=True, progress=False,
                    multi_level_index=False)['Close'] \
            .resample('M').last().pct_change().dropna()
ann_ret_n, ann_vol_n = n_ret.mean()*12, n_ret.std()*np.sqrt(12)
print(f'NIFTY50  |  AnnRet {ann_ret_n:.2%}  Vol {ann_vol_n:.2%}  Sharpe {(ann_ret_n/ann_vol_n):.2f}')

print(f"\nExcel file created: {FILENAME}")

NIFTY50  |  AnnRet 12.07%  Vol 16.45%  Sharpe 0.73

Excel file created: momentum_backtest_results.xlsx
