In [4]:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed

# -------------------------------
# Helper Functions to Compute Indicators
# -------------------------------

def compute_indicators_for_stock(df, ma_periods, rsi_period=14):
    """
    Given a DataFrame with columns 'Date' and 'Price', compute moving averages for each period in ma_periods
    and compute the RSI using the specified rsi_period.
    """
    df = df.sort_values('Date').reset_index(drop=True)
    
    # Compute moving averages
    for period in ma_periods:
        df[f"MA {period}"] = df['Price'].rolling(window=period).mean()
    
    # Compute RSI (using the standard 14-day period by default)
    delta = df['Price'].diff()
    gain = delta.clip(lower=0)
    loss = -delta.clip(upper=0)
    avg_gain = gain.rolling(window=rsi_period).mean()
    avg_loss = loss.rolling(window=rsi_period).mean()
    rs = avg_gain / avg_loss
    df['RSI'] = 100 - (100 / (1 + rs))
    
    return df

# -------------------------------
# Simulation Functions for Strategies
# -------------------------------

def simulate_ma_strategy(df, short_window, long_window):
    """
    Simulate a moving average crossover strategy:
      - Buy when the short MA crosses above the long MA.
      - Sell when the short MA crosses below the long MA.
    """
    trades = []
    in_position = False
    entry_price = 0.0

    ma_short = df[f"MA {short_window}"]
    ma_long  = df[f"MA {long_window}"]
    price    = df['Price']

    for i in range(1, len(df)):
        # Buy signal: short MA crosses above long MA
        if not in_position and (ma_short.iloc[i] > ma_long.iloc[i]) and (ma_short.iloc[i-1] <= ma_long.iloc[i-1]):
            in_position = True
            entry_price = price.iloc[i]
        # Sell signal: short MA crosses below long MA
        elif in_position and (ma_short.iloc[i] < ma_long.iloc[i]) and (ma_short.iloc[i-1] >= ma_long.iloc[i-1]):
            exit_price = price.iloc[i]
            trades.append(exit_price - entry_price)
            in_position = False

    # Close any open position at the end of the series
    if in_position:
        trades.append(price.iloc[-1] - entry_price)
        
    return trades

def simulate_rsi_strategy(df, rsi_buy_threshold, rsi_sell_threshold):
    """
    Simulate an RSI-based strategy:
      - Buy when RSI exceeds the buy threshold.
      - Sell when RSI falls below the sell threshold.
    """
    trades = []
    in_position = False
    entry_price = 0.0
    price = df['Price']
    rsi   = df['RSI']

    for i in range(len(df)):
        if not in_position and (rsi.iloc[i] > rsi_buy_threshold):
            in_position = True
            entry_price = price.iloc[i]
        elif in_position and (rsi.iloc[i] < rsi_sell_threshold):
            exit_price = price.iloc[i]
            trades.append(exit_price - entry_price)
            in_position = False

    if in_position:
        trades.append(price.iloc[-1] - entry_price)
    return trades

def simulate_combined_strategy(df, short_window, long_window, rsi_buy_threshold, rsi_sell_threshold):
    """
    Simulate a combined strategy:
      - Buy when a short MA crosses above a long MA and RSI exceeds the buy threshold.
      - Sell when a short MA crosses below a long MA and RSI falls below the sell threshold.
    """
    trades = []
    in_position = False
    entry_price = 0.0
    price = df['Price']
    ma_short = df[f"MA {short_window}"]
    ma_long  = df[f"MA {long_window}"]
    rsi      = df['RSI']

    for i in range(1, len(df)):
        if not in_position and (ma_short.iloc[i] > ma_long.iloc[i]) and (ma_short.iloc[i-1] <= ma_long.iloc[i-1]) and (rsi.iloc[i] > rsi_buy_threshold):
            in_position = True
            entry_price = price.iloc[i]
        elif in_position and (ma_short.iloc[i] < ma_long.iloc[i]) and (ma_short.iloc[i-1] >= ma_long.iloc[i-1]) and (rsi.iloc[i] < rsi_sell_threshold):
            exit_price = price.iloc[i]
            trades.append(exit_price - entry_price)
            in_position = False

    if in_position:
        trades.append(price.iloc[-1] - entry_price)
    return trades

# -------------------------------
# Wrapper Functions for Parallel Processing
# -------------------------------

def get_trades_for_ticker_ma(df, s, l):
    if f"MA {s}" in df.columns and f"MA {l}" in df.columns:
        return simulate_ma_strategy(df, s, l)
    return []

def get_trades_for_ticker_rsi(df, rsi_buy, rsi_sell):
    if 'RSI' in df.columns:
        return simulate_rsi_strategy(df, rsi_buy, rsi_sell)
    return []

def get_trades_for_ticker_combined(df, s, l, rsi_buy, rsi_sell):
    if (f"MA {s}" in df.columns) and (f"MA {l}" in df.columns) and ('RSI' in df.columns):
        return simulate_combined_strategy(df, s, l, rsi_buy, rsi_sell)
    return []

# -------------------------------
# Main Backtesting Process
# -------------------------------

# Load the CSV file.
# The CSV is assumed to have the first column as "Date" and all subsequent columns are adjusted close prices for S&P 500 stocks.
data = pd.read_csv('fully_cleaned_stock_data.csv', parse_dates=['Date'])
data.sort_values('Date', inplace=True)

# All columns except 'Date' represent different stocks.
tickers = data.columns[1:]

# Define the moving average periods of interest.
short_windows = [5, 7, 10, 15, 20]
long_windows  = [50, 70, 100, 150, 200]
ma_periods = sorted(list(set(short_windows + long_windows)))

# Build a dictionary with processed DataFrames for each ticker.
stock_dfs = {}
for ticker in tickers:
    df_stock = data[['Date', ticker]].rename(columns={ticker: 'Price'})
    df_stock = compute_indicators_for_stock(df_stock, ma_periods, rsi_period=14)
    stock_dfs[ticker] = df_stock

# -------------------------------
# 1. Evaluate the MA Crossover Strategy using Parallel Processing
# -------------------------------
ma_results = []
for s in short_windows:
    for l in long_windows:
        if s < l:
            trades_list = Parallel(n_jobs=-1)(
                delayed(get_trades_for_ticker_ma)(df, s, l) for df in stock_dfs.values()
            )
            # Flatten the list of trade results
            all_trades = [trade for sublist in trades_list for trade in sublist]
            if all_trades:
                avg_pnl = np.mean(all_trades)
                var_pnl = np.var(all_trades)
            else:
                avg_pnl = np.nan
                var_pnl = np.nan
            ma_results.append({
                'Short_MA': s,
                'Long_MA': l,
                'Average_PnL': avg_pnl,
                'Variance_PnL': var_pnl
            })

ma_results_df = pd.DataFrame(ma_results)
print("Moving Average Crossover Strategy Results:")
print(ma_results_df)

# -------------------------------
# 2. Evaluate the RSI-only Strategy using Parallel Processing
# -------------------------------
# Define a list of RSI threshold pairs to test: (RSI_buy_threshold, RSI_sell_threshold)
rsi_thresholds = [
    (70, 30),
    (75, 25),
    (65, 35),
    (80, 20),
    (60, 40)
]

rsi_results = []
for (rsi_buy, rsi_sell) in rsi_thresholds:
    trades_list = Parallel(n_jobs=-1)(
        delayed(get_trades_for_ticker_rsi)(df, rsi_buy, rsi_sell) for df in stock_dfs.values()
    )
    all_trades = [trade for sublist in trades_list for trade in sublist]
    if all_trades:
        avg_pnl = np.mean(all_trades)
        var_pnl = np.var(all_trades)
    else:
        avg_pnl = np.nan
        var_pnl = np.nan
    rsi_results.append({
        'RSI_Buy': rsi_buy,
        'RSI_Sell': rsi_sell,
        'Average_PnL': avg_pnl,
        'Variance_PnL': var_pnl
    })

rsi_results_df = pd.DataFrame(rsi_results)
print("\nRSI Strategy Results:")
print(rsi_results_df)

# -------------------------------
# 3. Evaluate the Combined MA & RSI Strategy using Parallel Processing
# -------------------------------
combined_results = []
for s in short_windows:
    for l in long_windows:
        if s < l:
            for (rsi_buy, rsi_sell) in rsi_thresholds:
                trades_list = Parallel(n_jobs=-1)(
                    delayed(get_trades_for_ticker_combined)(df, s, l, rsi_buy, rsi_sell) for df in stock_dfs.values()
                )
                all_trades = [trade for sublist in trades_list for trade in sublist]
                if all_trades:
                    avg_pnl = np.mean(all_trades)
                    var_pnl = np.var(all_trades)
                else:
                    avg_pnl = np.nan
                    var_pnl = np.nan
                combined_results.append({
                    'Short_MA': s,
                    'Long_MA': l,
                    'RSI_Buy': rsi_buy,
                    'RSI_Sell': rsi_sell,
                    'Average_PnL': avg_pnl,
                    'Variance_PnL': var_pnl
                })

combined_results_df = pd.DataFrame(combined_results)
print("\nCombined MA & RSI Strategy Results:")
print(combined_results_df)


Moving Average Crossover Strategy Results:
    Short_MA  Long_MA  Average_PnL  Variance_PnL
0          5       50     7.871429  6.958341e+05
1          5       70     7.423369  7.307737e+05
2          5      100    15.178919  1.514370e+06
3          5      150    15.532509  1.714463e+06
4          5      200    20.153850  2.455918e+06
5          7       50     7.427176  7.856267e+05
6          7       70    10.152610  8.656064e+05
7          7      100    19.006505  1.689013e+06
8          7      150    21.636478  1.819870e+06
9          7      200    21.248463  2.320816e+06
10        10       50     9.248550  8.046415e+05
11        10       70    12.754032  1.051194e+06
12        10      100    20.331770  1.913846e+06
13        10      150    26.027039  2.050271e+06
14        10      200    24.428110  2.798834e+06
15        15       50     8.970257  8.972154e+05
16        15       70    10.264637  1.046650e+06
17        15      100    25.071525  2.236426e+06
18        15      150    3

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

# Set the default plot style
plt.style.use('default')

In [2]:
# Load the fully cleaned historical market data
data = pd.read_csv('fully_cleaned_stock_data.csv')

# Preview the first few rows
data.head()

Unnamed: 0,Date,AAPL_adjclose,ABC_adjclose,ABMD_adjclose,ABT_adjclose,ADI_adjclose,ADM_adjclose,ADP_adjclose,ADSK_adjclose,AEP_adjclose,...,WEC_adjclose,WHR_adjclose,WM_adjclose,WMB_adjclose,WRB_adjclose,WST_adjclose,WY_adjclose,XEL_adjclose,XOM_adjclose,ZION_adjclose
0,1996-02-01,0.215958,2.986069,7.25,5.315176,5.811177,8.10832,9.436206,7.10058,12.827035,...,5.974281,28.367496,11.960501,4.755478,1.971051,4.115513,6.994101,7.907068,8.867104,12.244139
1,1996-02-02,0.222618,2.889744,6.75,5.330627,5.659843,8.10832,9.52495,6.983215,12.718023,...,5.974281,27.745419,12.033428,4.76786,1.971051,4.136405,6.994101,7.887832,8.826117,12.244139
2,1996-02-05,0.222618,2.841581,6.75,5.361532,5.932241,8.10832,9.406628,7.33531,12.790698,...,5.998176,27.869825,12.106361,4.743094,1.92202,4.136405,6.974671,7.92631,8.826117,12.244139
3,1996-02-06,0.225472,2.8175,6.375,5.268825,5.962509,8.161662,9.465786,7.217945,12.718023,...,5.998176,27.745419,11.814639,4.70594,1.971051,4.136405,7.091239,8.041742,8.894431,12.284556
4,1996-02-07,0.215007,2.913825,6.5,5.361532,6.537573,8.161662,9.52495,8.127523,12.790698,...,5.998176,28.056448,11.814639,4.70594,1.971051,4.07373,7.188381,8.022505,8.908089,12.324958


In [3]:
# Convert the 'Date' column to datetime (if not already) and sort the data
if not np.issubdtype(data['Date'].dtype, np.datetime64):
    data['Date'] = pd.to_datetime(data['Date'])

data.sort_values(by=['Ticker', 'Date'], inplace=True)
data.reset_index(drop=True, inplace=True)

data.head()

KeyError: 'Ticker'

In [None]:
def backtest_strategy(df, short_window, long_window, tc=0.001):
    """
    Backtests a simple moving average crossover strategy for a single stock.
    
    Parameters:
        df (pd.DataFrame): DataFrame for one stock with columns such as ['Date', 'Open', 'High', 'Low', 'Close']
        short_window (int): Lookback period for the short-term SMA
        long_window (int): Lookback period for the long-term SMA
        tc (float): Transaction cost rate per trade side (default 0.1%)
        
    Returns:
        trades (list): List of returns for each completed trade
    """
    df = df.copy().reset_index(drop=True)
    
    # Compute moving averages (using min_periods=1 to get a value from the first day)
    df['SMA_short'] = df['Close'].rolling(window=short_window, min_periods=1).mean()
    df['SMA_long'] = df['Close'].rolling(window=long_window, min_periods=1).mean()
    
    # Generate a simple signal: 1 when short SMA is above long SMA, otherwise 0
    df['signal'] = 0
    df.loc[df['SMA_short'] > df['SMA_long'], 'signal'] = 1
    
    trades = []
    position = 0
    buy_price = 0
    
    # Loop over the DataFrame (from index 1 to len-2 to allow next day open access)
    for i in range(1, len(df) - 1):
        # Buy signal: when signal changes from 0 to 1
        if position == 0 and df.loc[i-1, 'signal'] == 0 and df.loc[i, 'signal'] == 1:
            # Buy at the next day's open price
            buy_price = df.loc[i+1, 'Open']
            position = 1
        # Sell signal: when signal changes from 1 to 0
        elif position == 1 and df.loc[i-1, 'signal'] == 1 and df.loc[i, 'signal'] == 0:
            # Sell at the next day's open price
            sell_price = df.loc[i+1, 'Open']
            # Calculate trade return (percentage change minus transaction costs on both sides)
            ret = (sell_price - buy_price) / buy_price - 2 * tc
            trades.append(ret)
            position = 0
    
    # If a position is still open at the end, exit at the last available close price
    if position == 1:
        sell_price = df.iloc[-1]['Close']
        ret = (sell_price - buy_price) / buy_price - 2 * tc
        trades.append(ret)
        position = 0
    
    return trades

# Test the backtest function on a sample stock
sample_ticker = data['Ticker'].unique()[0]
sample_df = data[data['Ticker'] == sample_ticker].copy()
sample_trades = backtest_strategy(sample_df, short_window=10, long_window=50)
print(f"Ticker: {sample_ticker}, Number of trades: {len(sample_trades)}")
if sample_trades:
    print(f"Average return per trade: {np.mean(sample_trades):.2%}")
    print(f"Return variance: {np.var(sample_trades):.6f}")

In [None]:
# Define ranges for the short-term and long-term moving averages
short_windows = [5, 10, 15, 20]
long_windows = [30, 50, 100, 200]

# Dictionary to store results for each combination
results = {}

tickers = data['Ticker'].unique()

for s in short_windows:
    for l in long_windows:
        if s < l:  # Consider only valid combinations where short_window < long_window
            all_trades = []
            
            # Process each ticker
            for ticker in tqdm(tickers, desc=f"Processing SMA {s}/{l}", leave=False):
                df_stock = data[data['Ticker'] == ticker].copy()
                trades = backtest_strategy(df_stock, short_window=s, long_window=l)
                if trades:  # Only include if there was at least one trade
                    all_trades.extend(trades)
            
            if all_trades:
                avg_return = np.mean(all_trades)
                var_return = np.var(all_trades)
            else:
                avg_return = np.nan
                var_return = np.nan
            
            results[(s, l)] = {
                'avg_return': avg_return, 
                'var_return': var_return, 
                'num_trades': len(all_trades)
            }

# Convert the results dictionary to a DataFrame
results_df = pd.DataFrame.from_dict(results, orient='index')
results_df.index = pd.MultiIndex.from_tuples(results_df.index, names=['Short_SMA', 'Long_SMA'])
results_df.sort_index(inplace=True)
results_df

In [None]:
# Visualize the results using heatmaps for average return and variance
avg_return_df = results_df['avg_return'].unstack()
var_return_df = results_df['var_return'].unstack()

fig, ax = plt.subplots(figsize=(8,6))
cax = ax.matshow(avg_return_df, cmap='viridis')
fig.colorbar(cax)
ax.set_xticks(range(len(avg_return_df.columns)))
ax.set_xticklabels(avg_return_df.columns)
ax.set_yticks(range(len(avg_return_df.index)))
ax.set_yticklabels(avg_return_df.index)
ax.set_xlabel('Long SMA')
ax.set_ylabel('Short SMA')
ax.set_title('Average Return per Trade')
plt.show()

fig, ax = plt.subplots(figsize=(8,6))
cax = ax.matshow(var_return_df, cmap='magma')
fig.colorbar(cax)
ax.set_xticks(range(len(var_return_df.columns)))
ax.set_xticklabels(var_return_df.columns)
ax.set_yticks(range(len(var_return_df.index)))
ax.set_yticklabels(var_return_df.index)
ax.set_xlabel('Long SMA')
ax.set_ylabel('Short SMA')
ax.set_title('Variance of Return per Trade')
plt.show()