In [1]:
import pandas as pd
import numpy as np
import yfinance as yf
import pandas_datareader.data as pdr
from datetime import datetime
import pandas_ta as ta
import itertools
import matplotlib.pyplot as plt
import matplotlib as mpl

import warnings
warnings.filterwarnings("ignore")

In [2]:
def optimize_strategy(data, strategy_func, param_grid, initial_capital=100000):
    """
    Optimizes a given strategy by testing various parameter combinations.

    Parameters:
    - data (pd.DataFrame): DataFrame containing price data.
    - strategy_func (callable): The strategy function to optimize. Must return (metrics, data).
    - param_grid (dict): Dictionary where keys are parameter names and values are lists of parameter values to test.
    - initial_capital (float): Initial investment capital.

    Returns:
    - results_df (pd.DataFrame): DataFrame containing performance metrics for each parameter combination.
    """
    import itertools

    # Generate all parameter combinations
    param_names = list(param_grid.keys())
    param_combinations = list(itertools.product(*param_grid.values()))

    results = []

    for params in param_combinations:
        # Create a dictionary of parameters to pass to the strategy function
        param_dict = dict(zip(param_names, params))

        # Run the strategy with the current parameters
        metrics, _ = strategy_func(data.copy(), **param_dict, initial_capital=initial_capital)

        # Append results
        results.append({**param_dict, **metrics})

    # Create DataFrame from results
    results_df = pd.DataFrame(results)
    return results_df


In [3]:
# Set date range
start_date = "2005-11-01"
end_date = "2024-11-01"

In [4]:
spy_data = yf.download("SPY", start=start_date, end=end_date)

validation_size = 0.20

train_size =\
    int(len(spy_data) 
        * 
        (1 - validation_size)
       )

spy_train, spy_test =\
    (spy_data[0         :train_size], 
     spy_data[train_size:len(spy_data)    ]
    )

[*********************100%%**********************]  1 of 1 completed


In [5]:
def calculate_benchmark(data, initial_capital=100000):

    if "Adj Close" not in data.columns:
        raise ValueError("Input data must contain 'Adj Close' column for benchmark calculations.")

    # Step 1: Calculate log returns for SPY
    data["log_return"] = np.log(data["Adj Close"] / data["Adj Close"].shift(1))

    # Step 2: Calculate cumulative log returns and exponential for cumulative returns
    data["cumulative_return"] = np.exp(data["log_return"].cumsum())

    # Step 3: Calculate final capital
    final_capital = data["cumulative_return"].iloc[-1] * initial_capital

    # Step 4: Prepare benchmark metrics
    benchmark = {
        "Cumulative Return": data["cumulative_return"].iloc[-1],
        "Final Capital": final_capital,
    }

    return benchmark

# Calculate benchmark for train dataset
train_benchmark = calculate_benchmark(spy_train)

# Calculate benchmark for test dataset
test_benchmark = calculate_benchmark(spy_test)

# Display results
train_benchmark, test_benchmark

({'Cumulative Return': 4.273412537215389, 'Final Capital': 427341.25372153893},
 {'Cumulative Return': 1.5800143389477963,
  'Final Capital': 158001.43389477962})

### RSI

In [None]:
def visualize_strategy_vs_benchmark(strategy_data, benchmark_data, title="Strategy vs. Benchmark Performance"):

    # Ensure data alignment
    strategy_cumulative = strategy_data['cumulative_strategy_return']
    benchmark_cumulative = benchmark_data['cumulative_benchmark_return']

    # Plot
    plt.figure(figsize=(12, 6))
    plt.plot(strategy_cumulative, label="Strategy Return", linewidth=2)
    plt.plot(benchmark_cumulative, label="Benchmark Return", linestyle='--', linewidth=2)

    # Add titles and labels
    plt.title(title, fontsize=16)
    plt.xlabel("Time", fontsize=12)
    plt.ylabel("Cumulative Return", fontsize=12)

    # Add legend
    plt.legend(loc="upper left", fontsize=12)

    # Grid and show plot
    plt.grid(True)
    plt.tight_layout()
    plt.show()


In [6]:
def rsi_strategy(data, rsi_window=14, buy_threshold=30, sell_threshold=70, unwind_threshold=50, initial_capital=100000):
    
    # Ensure "Adj Close" column exists
    if "Adj Close" not in data.columns:
        raise ValueError("Input data must contain 'Adj Close' column.")

    # Step 1: Calculate price changes
    price_change = data["Adj Close"].diff().iloc[1:]
    gain = price_change.clip(lower=0)  # Keep only gains
    loss = -price_change.clip(upper=0)  # Keep only losses

    # Step 2: Calculate initial average gain and loss
    gain_loss = pd.concat([gain, loss], axis=1)
    gain_loss.columns = ["gain", "loss"]
    avg_gain = gain_loss["gain"].iloc[:rsi_window].mean()
    avg_loss = gain_loss["loss"].iloc[:rsi_window].mean()

    # Step 3: Calculate RSI using rolling averages
    avg_gains, avg_losses = [avg_gain], [avg_loss]
    for i in range(rsi_window, len(gain_loss) - 1):
        current_gain = gain_loss["gain"].iloc[i]
        current_loss = gain_loss["loss"].iloc[i]
        avg_gain = (avg_gain * (rsi_window - 1) + current_gain) / rsi_window
        avg_loss = (avg_loss * (rsi_window - 1) + current_loss) / rsi_window
        avg_gains.append(avg_gain)
        avg_losses.append(avg_loss)

    avg_gains = pd.Series(avg_gains, index=gain_loss.index[rsi_window:])
    avg_losses = pd.Series(avg_losses, index=gain_loss.index[rsi_window:])
    rs = avg_gains / avg_losses
    rsi = 100 - 100 / (1 + rs)

    # Step 4: Generate RSI-based signals
    rsi_up = rsi.diff() > 0
    rsi_down = rsi.diff() < 0

    crossed_buy = (np.sign((rsi - buy_threshold) * (rsi - buy_threshold).shift(1)).fillna(1) == -1)
    crossed_unwind = (np.sign((rsi - unwind_threshold) * (rsi - unwind_threshold).shift(1)).fillna(1) == -1)
    crossed_sell = (np.sign((rsi - sell_threshold) * (rsi - sell_threshold).shift(1)).fillna(1) == -1)

    buy = rsi_up & crossed_buy  # Buy when RSI crosses the buy threshold upwards
    unwind = crossed_unwind     # Exit when RSI crosses the unwind threshold
    sell = rsi_down & crossed_sell  # Sell when RSI crosses the sell threshold downwards

    # Step 5: Generate position series
    position_series = pd.Series(np.nan, index=rsi.index)
    position_series[buy] = 1
    position_series[unwind] = 0
    position_series[sell] = -1
    position_series.iloc[0] = 0  # Initialize position as flat
    position_series = position_series.ffill()  # Forward fill positions

    # Step 6: Merge signals and price data
    buy_unwind = position_series.clip(lower=0)
    buy_once = (buy_unwind.diff() == 1)
    unwind_buy = (buy_unwind.diff() == -1)
    sell_unwind = position_series.clip(upper=0)
    sell_once = (sell_unwind.diff() == -1)
    unwind_sell = (sell_unwind.diff() == 1)
    unwind_once = unwind_buy | unwind_sell

    signals = pd.concat(
        [buy_once, unwind_once, sell_once, data["Adj Close"]],
        axis=1
    ).dropna()
    signals.columns = ["Buy Signal", "Unwind Signal", "Sell Signal", "Adj Close"]

    # Step 7: Calculate strategy returns
    data = data.loc[position_series.index]
    data["positions"] = position_series
    # Step 3: Calculate daily returns
    data["log_return"] = np.log(data["Adj Close"] / data["Adj Close"].shift(1))

    # Step 4: Calculate strategy returns
    data["strategy_return"] = data["positions"].shift(1) * data["log_return"]

    # Step 5: Calculate cumulative returns
    data["cumulative_return"] = np.exp(data["strategy_return"].cumsum())

    # Step 6: Performance metrics
    days = (data.index[-1] - data.index[0]).days

    sharpe_ratio = \
    (
        np.sqrt(252) * (data["strategy_return"].dropna().apply(np.exp).mean() - 1) /
        data["strategy_return"].dropna().apply(np.exp).std()
    )
    cagr = (data["strategy_return"].sum()) ** (365.0 / days) - 1
    final_capital = data["cumulative_return"].iloc[-1] * initial_capital
    max_drawdown = (data["cumulative_return"].cummax() - data["cumulative_return"]).max()

    metrics = {
        "Sharpe Ratio": sharpe_ratio,
        "CAGR": cagr,
        "Final Capital": final_capital,
        "Max Drawdown": max_drawdown,
        "Cumulative Return": data["cumulative_return"].iloc[-1],
    }

    return metrics, data

In [7]:
RSI_param_grid = {
    "rsi_window": range(5, 31),  # RSI windows from 5 to 30
    "buy_threshold": [25, 30, 35],
    "sell_threshold": [65, 70, 75],
    "unwind_threshold": [50],  # Keep unwind threshold fixed for simplicity
}

In [9]:
results_df_RSI = optimize_strategy(spy_train, rsi_strategy, RSI_param_grid, initial_capital=100000)

In [35]:
results_df_RSI.sort_values("Sharpe Ratio", ascending=False).head()

Unnamed: 0,rsi_window,buy_threshold,sell_threshold,unwind_threshold,Sharpe Ratio,CAGR,Final Capital,Max Drawdown,Cumulative Return
6,5,35,65,50,0.330578,-0.039539,171930.339464,0.464479,1.719303
8,5,35,75,50,0.302848,-0.051145,156918.661759,0.348469,1.569187
62,11,35,75,50,0.271211,-0.058019,149777.791835,0.460565,1.497778
7,5,35,70,50,0.258343,-0.061401,146522.902666,0.397934,1.465229
203,27,30,75,50,0.23602,-0.080314,132651.569591,0.260777,1.326516


In [36]:
rsi_strategy(spy_train, rsi_window=5, buy_threshold=35, sell_threshold=65, unwind_threshold=50, initial_capital=100000)

({'Sharpe Ratio': 0.33057791669753034,
  'CAGR': -0.039538625741325495,
  'Final Capital': 171930.33946429187,
  'Max Drawdown': 0.46447854154862744,
  'Cumulative Return': 1.7193033946429188},
                   Open        High         Low       Close   Adj Close  \
 Date                                                                     
 2005-11-09  122.080002  122.949997  121.860001  122.389999   85.315605   
 2005-11-10  122.339996  123.519997  121.750000  123.339996   85.977821   
 2005-11-11  123.349998  123.839996  122.430000  123.760002   86.270622   
 2005-11-14  123.790001  124.019997  123.379997  123.690002   86.221794   
 2005-11-15  123.550003  124.089996  122.860001  123.239998   85.908104   
 ...                ...         ...         ...         ...         ...   
 2021-01-06  369.709991  376.980011  369.119995  373.549988  353.982300   
 2021-01-07  376.100006  379.899994  375.910004  379.100006  359.241547   
 2021-01-08  380.589996  381.489990  377.100006  381.260

### Mean-Reversion

In [11]:
def mean_reversion_strategy(
    data, window=42, num_std=2, initial_capital=100000
):
    """
    Implements a mean-reversion trading strategy based on Bollinger Bands.

    Parameters:
    - data (pd.DataFrame): DataFrame with "Adj Close" prices.
    - window (int): Lookback window for SMA and standard deviation.
    - num_std (float): Number of standard deviations for the Bollinger Bands.
    - initial_capital (float): Initial investment capital.

    Returns:
    - metrics (dict): Performance metrics including Sharpe Ratio, CAGR, etc.
    """
    # Ensure "Adj Close" column exists
    if "Adj Close" not in data.columns:
        raise ValueError("Input data must contain 'Adj Close' column.")

    # Calculate Bollinger Bands
    data["SMA"] = data["Adj Close"].rolling(window=window).mean()
    data["STD"] = data["Adj Close"].rolling(window=window).std()
    data["Upper Band"] = data["SMA"] + num_std * data["STD"]
    data["Lower Band"] = data["SMA"] - num_std * data["STD"]

    # Generate signals
    buy_signal = data["Adj Close"] < data["Lower Band"]
    sell_signal = data["Adj Close"] > data["Upper Band"]
    data["positions"] = 0
    data.loc[buy_signal, "positions"] = 1  # Buy when below lower band
    data.loc[sell_signal, "positions"] = -1  # Sell when above upper band
    data["positions"] = data["positions"].ffill()  # Carry forward positions

    # Step 3: Calculate daily returns
    data["log_return"] = np.log(data["Adj Close"] / data["Adj Close"].shift(1))

    # Step 4: Calculate strategy returns
    data["strategy_return"] = data["positions"].shift(1) * data["log_return"]

    # Step 5: Calculate cumulative returns
    data["cumulative_return"] = np.exp(data["strategy_return"].cumsum())

    # Step 6: Performance metrics
    days = (data.index[-1] - data.index[0]).days

    sharpe_ratio = \
    (
        np.sqrt(252) * (data["strategy_return"].dropna().apply(np.exp).mean() - 1) /
        data["strategy_return"].dropna().apply(np.exp).std()
    )
    cagr = (data["strategy_return"].sum()) ** (365.0 / days) - 1
    final_capital = data["cumulative_return"].iloc[-1] * initial_capital
    max_drawdown = (data["cumulative_return"].cummax() - data["cumulative_return"]).max()

    metrics = {
        "Sharpe Ratio": sharpe_ratio,
        "CAGR": cagr,
        "Final Capital": final_capital,
        "Max Drawdown": max_drawdown,
        "Cumulative Return": data["cumulative_return"].iloc[-1],
    }

    return metrics, data


In [12]:
param_grid_MR =\
{
    "window": [20, 42, 60],  # SMA windows to test
    "num_std": [1, 2, 3],   # Standard deviation multipliers
}


In [13]:
results_df_MR = optimize_strategy(spy_train, mean_reversion_strategy, param_grid_MR, initial_capital=100000)

In [14]:
results_df_MR.sort_values("Final Capital", ascending=False).head()

Unnamed: 0,window,num_std,Sharpe Ratio,CAGR,Final Capital,Max Drawdown,Cumulative Return
0,20,1,0.4166,-0.015183,220870.310688,1.559405,2.208703
1,20,2,0.569367,-0.026565,194255.699359,0.18161,1.942557
5,42,3,0.47354,-0.055335,152309.949959,0.172717,1.523099
8,60,3,0.276264,-0.087051,128441.272944,0.171511,1.284413
3,42,1,0.172182,-0.093556,125170.837526,0.629404,1.251708


### MACD

In [15]:
def macd_strategy(data, short_window=12, long_window=26, signal_window=9, initial_capital=100000):
    """
    Implements a MACD-based trading strategy.

    Parameters:
    - data (pd.DataFrame): DataFrame with "Adj Close" prices.
    - short_window (int): Window size for the short-term EMA (default: 12).
    - long_window (int): Window size for the long-term EMA (default: 26).
    - signal_window (int): Window size for the signal line EMA (default: 9).
    - initial_capital (float): Initial investment capital.

    Returns:
    - metrics (dict): Performance metrics including Sharpe Ratio, CAGR, etc.
    - data (pd.DataFrame): DataFrame with MACD, signal line, positions, and returns.
    """
    # Ensure "Adj Close" column exists
    if "Adj Close" not in data.columns:
        raise ValueError("Input data must contain 'Adj Close' column.")

    # Step 1: Calculate MACD and Signal Line
    data["EMA_Short"] = data["Adj Close"].ewm(span=short_window, adjust=False).mean()
    data["EMA_Long"] = data["Adj Close"].ewm(span=long_window, adjust=False).mean()
    data["MACD"] = data["EMA_Short"] - data["EMA_Long"]
    data["Signal_Line"] = data["MACD"].ewm(span=signal_window, adjust=False).mean()

    # Step 2: Generate trading signals
    data["positions"] = 0
    data ["positions"] =\
(
    np
    .where(data["MACD"] > data["Signal_Line"],
           1, -1)
)
    data["positions"] = data["positions"].ffill()  # Carry forward positions

    # Step 3: Calculate daily returns
    data["log_return"] = np.log(data["Adj Close"] / data["Adj Close"].shift(1))

    # Step 4: Calculate strategy returns
    data["strategy_return"] = data["positions"].shift(1) * data["log_return"]

    # Step 5: Calculate cumulative returns
    data["cumulative_return"] = np.exp(data["strategy_return"].cumsum())

    # Step 6: Performance metrics
    days = (data.index[-1] - data.index[0]).days

    sharpe_ratio = \
    (
        np.sqrt(252) * (data["strategy_return"].dropna().apply(np.exp).mean() - 1) /
        data["strategy_return"].dropna().apply(np.exp).std()
    )
    cagr = (data["strategy_return"].sum()) ** (365.0 / days) - 1
    final_capital = data["cumulative_return"].iloc[-1] * initial_capital
    max_drawdown = (data["cumulative_return"].cummax() - data["cumulative_return"]).max()

    metrics = {
        "Sharpe Ratio": sharpe_ratio,
        "CAGR": cagr,
        "Final Capital": final_capital,
        "Max Drawdown": max_drawdown,
        "Cumulative Return": data["cumulative_return"].iloc[-1],
    }

    return metrics, data

In [22]:
param_grid_MACD = {
    "short_window": [8, 12, 16],  # Test different short-term EMAs
    "long_window": [20, 26, 32],  # Test different long-term EMAs
    "signal_window": [7, 9, 11],  # Test different signal line EMAs
}

results_df_MACD = optimize_strategy(spy_train, macd_strategy, param_grid_MACD, initial_capital=100000)

results_df_MACD.sort_values("Final Capital", ascending=False).head()

Unnamed: 0,short_window,long_window,signal_window,Sharpe Ratio,CAGR,Final Capital,Max Drawdown,Cumulative Return
8,8,32,11,0.21957,-0.064325,143878.787398,0.552596,1.438788
21,16,26,7,0.21588,-0.066253,142271.540833,0.5963,1.422715
24,16,32,7,0.208634,-0.070243,139143.465508,0.580904,1.391435
13,12,26,9,0.190795,-0.081113,131816.198806,0.565514,1.318162
15,12,32,7,0.171596,-0.095372,124329.445278,0.525242,1.243294


In [23]:
def macd_sma_strategy(
    data, 
    short_window=12, 
    long_window=26, 
    signal_window=9, 
    sma_window=50, 
    initial_capital=100000
):
    # Ensure "Adj Close" column exists
    if "Adj Close" not in data.columns:
        raise ValueError("Input data must contain 'Adj Close' column.")

    # Step 1: Calculate MACD and Signal Line
    data["EMA_Short"] = data["Adj Close"].ewm(span=short_window, adjust=False).mean()
    data["EMA_Long"] = data["Adj Close"].ewm(span=long_window, adjust=False).mean()
    data["MACD"] = data["EMA_Short"] - data["EMA_Long"]
    data["Signal_Line"] = data["MACD"].ewm(span=signal_window, adjust=False).mean()

    # Step 2: Calculate SMA
    data["SMA"] = data["Adj Close"].rolling(window=sma_window).mean()

    # Step 3: Generate trading signals
    data["positions"] = 0
    buy_signal = (data["MACD"] > data["Signal_Line"]) & (data["Adj Close"] > data["SMA"])
    sell_signal = (data["MACD"] < data["Signal_Line"]) & (data["Adj Close"] < data["SMA"])

    data.loc[buy_signal, "positions"] = 1  # Buy signal
    data.loc[sell_signal, "positions"] = -1  # Sell signal
    data["positions"] = data["positions"].ffill()  # Carry forward positions

    # Step 3: Calculate daily returns
    data["log_return"] = np.log(data["Adj Close"] / data["Adj Close"].shift(1))

    # Step 4: Calculate strategy returns
    data["strategy_return"] = data["positions"].shift(1) * data["log_return"]

    # Step 5: Calculate cumulative returns
    data["cumulative_return"] = np.exp(data["strategy_return"].cumsum())

    # Step 6: Performance metrics
    days = (data.index[-1] - data.index[0]).days

    sharpe_ratio = \
    (
        np.sqrt(252) * (data["strategy_return"].dropna().apply(np.exp).mean() - 1) /
        data["strategy_return"].dropna().apply(np.exp).std()
    )
    cagr = (data["strategy_return"].sum()) ** (365.0 / days) - 1
    final_capital = data["cumulative_return"].iloc[-1] * initial_capital
    max_drawdown = (data["cumulative_return"].cummax() - data["cumulative_return"]).max()

    metrics = {
        "Sharpe Ratio": sharpe_ratio,
        "CAGR": cagr,
        "Final Capital": final_capital,
        "Max Drawdown": max_drawdown,
        "Cumulative Return": data["cumulative_return"].iloc[-1],
    }

    return metrics, data

In [24]:
param_grid_MACD_SMA = {
    "short_window": [8, 12, 16],  # Short-term EMA
    "long_window": [20, 26, 32],  # Long-term EMA
    "signal_window": [7, 9, 11],  # Signal line EMA
    "sma_window": [30, 50, 100],  # SMA filter
}

results_df_MACD_SMA = optimize_strategy(spy_train, macd_sma_strategy, param_grid_MACD_SMA, initial_capital=100000)

results_df_MACD_SMA.sort_values("Final Capital", ascending=False).head()


Unnamed: 0,short_window,long_window,signal_window,sma_window,Sharpe Ratio,CAGR,Final Capital,Max Drawdown,Cumulative Return
26,8,32,11,100,0.312849,-0.03765,174692.459216,0.387612,1.746925
65,16,26,7,100,0.310179,-0.038291,173713.988413,0.440358,1.73714
74,16,32,7,100,0.308718,-0.03871,173080.064832,0.420531,1.730801
41,12,26,9,100,0.294146,-0.042808,167209.198826,0.352475,1.672092
47,12,32,7,100,0.284614,-0.045994,163013.814297,0.390489,1.630138


### KDJ strategy with MA

In [25]:
def kdj_ma_strategy(data, k_window=14, d_window=3, ma_window=50, initial_capital=100000):
    """
    Implements a KDJ-based trading strategy with MA trend confirmation.
    
    Parameters:
    - data (pd.DataFrame): DataFrame with "Adj Close", "High", and "Low" prices.
    - k_window (int): Lookback period for calculating RSV and %K.
    - d_window (int): Smoothing period for %K and %D.
    - ma_window (int): Moving average window for trend confirmation.
    - initial_capital (float): Starting capital for the strategy.

    Returns:
    - metrics (dict): Performance metrics (Sharpe Ratio, CAGR, etc.).
    - data (pd.DataFrame): Updated DataFrame with KDJ values, MA, positions, and returns.
    """
    # Ensure required columns exist
    for col in ["Adj Close", "High", "Low"]:
        if col not in data.columns:
            raise ValueError(f"Input data must contain '{col}' column.")

    # Step 1: Calculate RSV (Raw Stochastic Value)
    data["Lowest_Low"] = data["Low"].rolling(window=k_window).min()
    data["Highest_High"] = data["High"].rolling(window=k_window).max()
    data["RSV"] = 100 * ((data["Adj Close"] - data["Lowest_Low"]) / (data["Highest_High"] - data["Lowest_Low"]))

    # Step 2: Calculate %K (N-day moving average of RSV)
    data["%K"] = data["RSV"].rolling(window=k_window).mean()

    # Step 3: Calculate %D (M-day moving average of %K)
    data["%D"] = data["%K"].rolling(window=d_window).mean()

    # Step 4: Calculate %J (divergence of %K and %D using fixed J factor = 3)
    data["%J"] = 3 * data["%K"] - 2 * data["%D"]

    # Step 5: Calculate Moving Average (MA) for trend confirmation
    data["MA"] = data["Adj Close"].rolling(window=ma_window).mean()

    # Step 6: Generate trading signals
    data["positions"] = 0
    long_signal = (data["%J"] < 20) & (data["Adj Close"] > data["MA"]) & (data["Adj Close"].shift(1) <= data["MA"].shift(1))
    short_signal = (data["%J"] > 80) & (data["Adj Close"] < data["MA"]) & (data["Adj Close"].shift(1) >= data["MA"].shift(1))

    data.loc[long_signal, "positions"] = 1  # Long signal
    data.loc[short_signal, "positions"] = -1  # Short signal
    data["positions"] = data["positions"].ffill()  # Forward-fill positions

    # Step 3: Calculate daily returns
    data["log_return"] = np.log(data["Adj Close"] / data["Adj Close"].shift(1))

    # Step 4: Calculate strategy returns
    data["strategy_return"] = data["positions"].shift(1) * data["log_return"]

    # Step 5: Calculate cumulative returns
    data["cumulative_return"] = np.exp(data["strategy_return"].cumsum())

    # Step 6: Performance metrics
    days = (data.index[-1] - data.index[0]).days

    sharpe_ratio = \
    (
        np.sqrt(252) * (data["strategy_return"].dropna().apply(np.exp).mean() - 1) /
        data["strategy_return"].dropna().apply(np.exp).std()
    )
    cagr = (data["strategy_return"].sum()) ** (365.0 / days) - 1
    final_capital = data["cumulative_return"].iloc[-1] * initial_capital
    max_drawdown = (data["cumulative_return"].cummax() - data["cumulative_return"]).max()

    metrics = {
        "Sharpe Ratio": sharpe_ratio,
        "CAGR": cagr,
        "Final Capital": final_capital,
        "Max Drawdown": max_drawdown,
        "Cumulative Return": data["cumulative_return"].iloc[-1],
    }

    return metrics, data

In [26]:
param_grid_KDJ_MA = {
    "k_window": [10, 14, 20],  # Range of K windows
    "d_window": [3, 5, 7],     # Range of D windows
    "ma_window": [20, 50, 100] # Range of MA windows
}

In [27]:
results_df_KDJ = optimize_strategy(spy_train, kdj_ma_strategy, param_grid_KDJ_MA, initial_capital=100000)

results_df_KDJ.sort_values("Sharpe Ratio", ascending=False).head()

Unnamed: 0,k_window,d_window,ma_window,Sharpe Ratio,CAGR,Final Capital,Max Drawdown,Cumulative Return
8,10,7,100,0.646731,-0.088971,127432.174474,0.040524,1.274322
26,20,7,100,0.546391,-0.096707,123729.408396,0.077524,1.237294
11,14,3,100,0.546391,-0.096707,123729.408396,0.077524,1.237294
2,10,3,100,0.546391,-0.096707,123729.408396,0.077524,1.237294
23,20,5,100,0.546391,-0.096707,123729.408396,0.077524,1.237294


### KDJ

In [28]:
def kdj_strategy_test(data, n=14, smoothing=3, initial_capital=100000):

    # Calculate the lowest low and highest high over the lookback period
    data['Low_n'] = data['Low'].rolling(window=n).min()
    data['High_n'] = data['High'].rolling(window=n).max()
    
    # Raw %K calculation
    data['K'] = (data['Close'] - data['Low_n']) / (data['High_n'] - data['Low_n']) * 100

    data['D'] = data['K'].rolling(window=smoothing).mean()

    # Calculate J line
    data['J'] = 3 * data['K'] - 2 * data['D']
    
    # Initialize signals column
    data['Signal'] = 0

    # Overbought (D > 70) and Oversold (D < 30) signals
    data.loc[data['D'] > 70, 'Signal'] = -1  # Short
    data.loc[data['D'] < 30, 'Signal'] = 1   # Long

    # Short-Term Signals
    # Short-term Buy (Golden Cross below 20)
    short_term_buy = (data['K'] > data['D']) & (data['K'].shift(1) <= data['D'].shift(1)) & (data['D'] < 20)
    data.loc[short_term_buy, 'Signal'] = 1

    # Short-term Sell (Death Cross above 80)
    short_term_sell = (data['K'] < data['D']) & (data['K'].shift(1) >= data['D'].shift(1)) & (data['D'] > 80)
    data.loc[short_term_sell, 'Signal'] = -1

    # W Pattern (Higher-High Golden Cross)
    data['K_prev1'] = data['K'].shift(1)
    data['D_prev1'] = data['D'].shift(1)
    data['K_prev2'] = data['K'].shift(2)
    data['D_prev2'] = data['D'].shift(2)

    w_pattern = (
        (data['K'] > data['D']) &  # Current Golden Cross
        (data['K_prev1'] < data['D_prev1']) &  # Previous Death Cross
        (data['K_prev2'] > data['D_prev2']) &  # Earlier Golden Cross
        (data['K'] > data['K_prev1']) &  # Higher High
        (data['K'] < 50)  # Below 50
    )
    data.loc[w_pattern, 'Signal'] = 2  # Strong Buy

    # M Pattern (Lower-Low Death Cross)
    m_pattern = (
        (data['K'] < data['D']) &  # Current Death Cross
        (data['K_prev1'] > data['D_prev1']) &  # Previous Golden Cross
        (data['K_prev2'] < data['D_prev2']) &  # Earlier Death Cross
        (data['K'] < data['K_prev1']) &  # Lower Low
        (data['K'] > 50)  # Above 50
    )
    data.loc[m_pattern, 'Signal'] = -2  # Strong Sell

    # Bottom and Peak Detection
    data['Bottom'] = (data['J'] < 10).rolling(window=3).sum() >= 3
    data['Peak'] = (data['J'] > 90).rolling(window=3).sum() >= 3
    data.loc[data['Bottom'], 'Signal'] = 1   # Long at bottom
    data.loc[data['Peak'], 'Signal'] = -1    # Short at peak
    
    # Convert signals to positions
    data['positions'] = data['Signal'].replace({2: 1, -2: -1})  # Strong signals treated as regular buy/sell
    data['positions'] = data['positions'].fillna(0).ffill()  # Carry forward positions

    # Step 3: Calculate daily returns
    data["log_return"] = np.log(data["Adj Close"] / data["Adj Close"].shift(1))

    # Step 4: Calculate strategy returns
    data["strategy_return"] = data["positions"].shift(1) * data["log_return"]

    # Step 5: Calculate cumulative returns
    data["cumulative_return"] = np.exp(data["strategy_return"].cumsum())

    # Step 6: Performance metrics
    days = (data.index[-1] - data.index[0]).days

    sharpe_ratio = \
    (
        np.sqrt(252) * (data["strategy_return"].dropna().apply(np.exp).mean() - 1) /
        data["strategy_return"].dropna().apply(np.exp).std()
    )
    cagr = (data["strategy_return"].sum()) ** (365.0 / days) - 1
    final_capital = data["cumulative_return"].iloc[-1] * initial_capital
    max_drawdown = (data["cumulative_return"].cummax() - data["cumulative_return"]).max()

    metrics = {
        "Sharpe Ratio": sharpe_ratio,
        "CAGR": cagr,
        "Final Capital": final_capital,
        "Max Drawdown": max_drawdown,
        "Cumulative Return": data["cumulative_return"].iloc[-1],
    }

    return metrics, data

In [29]:
param_grid_KDJ_test =\
{
    "n": [5,10,14,20,25,30],  
    "smoothing": [1,2,3,5,7,10],   
}

In [30]:
results_df_KDJ_test = optimize_strategy(spy_train, kdj_strategy_test, param_grid_KDJ_test, initial_capital=100000)


In [31]:
results_df_KDJ_test.sort_values("Sharpe Ratio", ascending=False).head()

Unnamed: 0,n,smoothing,Sharpe Ratio,CAGR,Final Capital,Max Drawdown,Cumulative Return
0,5,1,0.515502,0.005532,296689.95184,1.082913,2.9669
5,5,10,0.510465,-0.002736,260959.297595,0.404434,2.609593
6,10,1,0.504007,0.002507,282584.424325,1.190486,2.825844
9,10,5,0.43837,-0.006096,248731.28796,1.116236,2.487313
2,5,3,0.406079,-0.014711,222155.968527,1.298127,2.22156


In [32]:
kdj_strategy_test(spy_test, n=5, smoothing=1, initial_capital=100000)

({'Sharpe Ratio': 0.47267092716936987,
  'CAGR': -0.33904855912018517,
  'Final Capital': 123037.80522276083,
  'Max Drawdown': 0.2047585053706693,
  'Cumulative Return': 1.2303780522276082},
                   Open        High         Low       Close   Adj Close  \
 Date                                                                     
 2021-01-13  378.690002  380.859985  377.850006  379.790009  359.895477   
 2021-01-14  380.589996  381.130005  378.100006  378.459991  358.635071   
 2021-01-15  376.720001  377.579987  373.700012  375.700012  356.019714   
 2021-01-19  378.339996  379.230011  376.750000  378.649994  358.815186   
 2021-01-20  381.109985  384.790009  380.690002  383.890015  363.780731   
 ...                ...         ...         ...         ...         ...   
 2024-10-25  581.510010  584.460022  578.080017  579.039978  579.039978   
 2024-10-28  582.580017  582.710022  580.520020  580.830017  580.830017   
 2024-10-29  579.849976  582.909973  578.429993  581.77002

In [33]:
def kdj_strategy(data, n=14, m=3):    
    """
    Parameters:
        data (pd.DataFrame): A DataFrame containing 'High', 'Low', and 'Close' prices.
        n (int): Lookback period for the Stochastic Oscillator (default is 14).
        m (int): Smoothing period for %D (default is 3).
    # Ensure required columns exist
    for col in ["Adj Close", "High", "Low"]:
        if col not in data.columns:
            raise ValueError(f"Input data must contain '{col}' column.")
    """
    # Step 1: Calculate %K, %D, and %J
    data["Lowest_Low"] = data["Low"].rolling(window=n).min()
    data["Highest_High"] = data["High"].rolling(window=n).max()
    data["%K"] = 100 * ((data["Adj Close"] - data["Lowest_Low"]) / (data["Highest_High"] - data["Lowest_Low"]))
    data["%D"] = data["%K"].rolling(window=m).mean()
    data["%J"] = 3 * data["%K"] - 2 * data["%D"]

    # Step 2: Generate trading signals
    data["positions"] = 0
    buy_signal = (data["%K"] < 20) & (data["%J"] > data["%D"])
    sell_signal = (data["%K"] > 80) & (data["%J"] < data["%D"])

    data.loc[buy_signal, "positions"] = 1  # Buy signal
    data.loc[sell_signal, "positions"] = -1  # Sell signal
    data["positions"] = data["positions"].ffill()  # Carry forward positions
    
    # Step 3: Calculate daily returns
    data["log_return"] = np.log(data["Adj Close"] / data["Adj Close"].shift(1))

    # Step 4: Calculate strategy returns
    data["strategy_return"] = data["positions"].shift(1) * data["log_return"]

    # Step 5: Calculate cumulative returns
    data["cumulative_return"] = np.exp(data["strategy_return"].cumsum())

    # Step 6: Performance metrics
    days = (data.index[-1] - data.index[0]).days

    sharpe_ratio = \
    (
        np.sqrt(252) * (data["strategy_return"].dropna().apply(np.exp).mean() - 1) /
        data["strategy_return"].dropna().apply(np.exp).std()
    )
    cagr = (data["strategy_return"].sum()) ** (365.0 / days) - 1
    final_capital = data["cumulative_return"].iloc[-1] * initial_capital
    max_drawdown = (data["cumulative_return"].cummax() - data["cumulative_return"]).max()

    metrics = {
        "Sharpe Ratio": sharpe_ratio,
        "CAGR": cagr,
        "Final Capital": final_capital,
        "Max Drawdown": max_drawdown,
        "Cumulative Return": data["cumulative_return"].iloc[-1],
    }

    return metrics, data

In [34]:
# Call the KDJ strategy function
metrics, kdj_results = kdj_strategy(spy_data_train, k_window=10, d_window=5, j_factor=2, k_threshold=30, d_threshold=90, initial_capital=100000)

# Extract buy and sell signals
buy_signals = kdj_results[kdj_results["positions"].diff() == 1]
sell_signals = kdj_results[kdj_results["positions"].diff() == -1]

# Plot the price data and signals
plt.figure(figsize=(14, 7))
plt.plot(kdj_results.index, kdj_results["Adj Close"], label="Adjusted Close", color="blue", alpha=0.7)
plt.scatter(buy_signals.index, buy_signals["Adj Close"], marker="^", color="green", label="Buy Signal", alpha=1)
plt.scatter(sell_signals.index, sell_signals["Adj Close"], marker="v", color="red", label="Sell Signal", alpha=1)
plt.title("KDJ Strategy: Buy and Sell Signals")
plt.xlabel("Date")
plt.ylabel("Price")
plt.legend()
plt.grid()

# Plot %K, %D, and %J for visualization
plt.figure(figsize=(14, 7))
plt.plot(kdj_results.index, kdj_results["%K"], label="%K", color="blue")
plt.plot(kdj_results.index, kdj_results["%D"], label="%D", color="red")
plt.plot(kdj_results.index, kdj_results["%J"], label="%J", color="green")
plt.axhline(20, linestyle="--", color="grey", label="K Threshold (20)")
plt.axhline(80, linestyle="--", color="grey", label="D Threshold (80)")
plt.title("KDJ Indicators")
plt.xlabel("Date")
plt.ylabel("Value")
plt.legend()
plt.grid()

plt.show()

NameError: name 'spy_data_train' is not defined