In [None]:
#LIBRARIES      
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import yfinance as yf

In [None]:
# The strategy summary
""" In short, the strategy is a version of the moving-average crossover, but instead of a strict crossover of the EMA and SMA
 we use a crossover of their second derivatives, conditioned on EMA - SMA being less than some threshold, to enter a long position. To close, we only require the second derivative of the EMA to cross back 
below the second derivative of the SMA. We will also only enter long positions if the volatility is below a certain threshold.

Each derivative is calculated as a weighted derivative over a certain window to smooth out noise.

The same applies, vice versa, to short positions, including requiring the volatility to be above a certain threshold.
"""


In [None]:
# Strategy configuration and price download
Ticker = "QQQ"
Start_Date = "2023-02-07"
End_Date = "2026-02-06"
derivative_window = 3  # span/window used by each derivative smoothing pass
MA_Window = 5          # look-back shared by the SMA and the EMA
interval = "1d"

# Fetch the price history once and discard every row that has a missing field.
df = yf.download(
    Ticker,
    start=Start_Date,
    end=End_Date,
    interval=interval,
)
df = df.dropna()

# Close series for the ticker; strategy_logic later derives returns from it.
close_prices = df["Close"][Ticker]

In [None]:
def calculate_SMA_and_EMA(data, MA_Window, ticker=None):
    """Return the simple and exponential moving averages of the close price.

    Parameters
    ----------
    data : DataFrame whose ``data['Close'][ticker]`` is the close series.
    MA_Window : rolling window of the SMA and span of the EMA.
    ticker : column key under ``'Close'``; when omitted the notebook-level
        ``Ticker`` is used (the behaviour before this parameter existed).

    Returns
    -------
    (SMA, EMA) : two Series aligned with ``data``'s index. The SMA is NaN for
        the first ``MA_Window - 1`` rows.
    """
    if ticker is None:
        ticker = Ticker
    close = data['Close'][ticker]
    SMA = close.rolling(window=MA_Window).mean()
    EMA = close.ewm(span=MA_Window, adjust=False).mean()
    return SMA, EMA


def calculate_scaled_second_derivative(data, derivative_window, ticker=None):
    """Smoothed first/second differences of the EMA and SMA, divided by price.

    The EMA differences are smoothed with an exponentially weighted mean and
    the SMA differences with a rolling mean, both over ``derivative_window``.

    Parameters
    ----------
    data : DataFrame that already holds ``'EMA'`` and ``'SMA'`` columns plus
        ``data['Close'][ticker]``. It is back-filled IN PLACE (see note below).
    derivative_window : span / window of the smoothing passes.
    ticker : column key under ``'Close'``; defaults to the notebook-level
        ``Ticker``.

    Returns
    -------
    (scaled_second_derivative_SMA, scaled_second_derivative_EMA,
     scaled_first_derivative_EMA) : three Series aligned with ``data``.
    """
    if ticker is None:
        ticker = Ticker

    # Taken before the back-fill, exactly as the original statement order did.
    first_derivative_EMA = data["EMA"].diff().ewm(span=derivative_window).mean()
    second_derivative_EMA = first_derivative_EMA.diff().ewm(span=derivative_window).mean()

    # Back-fill the leading NaNs (the SMA warm-up rows) in the caller's frame.
    # DataFrame.bfill replaces fillna(method='bfill'), whose `method` argument
    # is deprecated and removed in newer pandas. One call is enough: nothing
    # below writes new NaNs into `data`.
    # NOTE(review): back-filling copies later values into earlier rows; this is
    # only harmless when the caller discards the warm-up rows afterwards.
    data.bfill(inplace=True)

    close = data["Close"][ticker]

    first_derivative_SMA = data["SMA"].diff().rolling(window=derivative_window).mean()
    second_derivative_SMA = first_derivative_SMA.diff().rolling(window=derivative_window).mean()

    # Every series shares data's index, so plain division aligns row by row.
    scaled_second_derivative_SMA = second_derivative_SMA / close
    scaled_second_derivative_EMA = second_derivative_EMA / close
    Scaled_first_derivative_EMA = first_derivative_EMA / close

    return scaled_second_derivative_SMA, scaled_second_derivative_EMA, Scaled_first_derivative_EMA


def calculate_indicators(df, MA_Window, derivative_window, Ticker,
                         vol_window=14, threshold_window=90,
                         vol_band=.2, ema_sma_band=.4):
    """Calculate all indicators for the strategy.

    Adds the moving averages, their price-scaled derivatives, the volatility
    measure and the rolling thresholds as new columns of ``df``, then drops
    every row that still contains a NaN (the rolling warm-up period).

    Parameters
    ----------
    df : price frame with ``df['Close'][Ticker]``; modified IN PLACE.
    MA_Window : window/span of the SMA and EMA.
    derivative_window : smoothing window of the derivative passes.
    Ticker : column key under ``'Close'``; it is forwarded to the helpers so
        the function no longer depends on a notebook-level global.
    vol_window : window of the rolling std of the close price.
    threshold_window : window of the rolling mean/std behind each threshold.
    vol_band : std multiple added to / subtracted from the mean volatility for
        the short / long volatility thresholds.
    ema_sma_band : std multiple added to the mean EMA-SMA spread.

    Returns
    -------
    The same ``df`` object, with the indicator columns and no NaN rows.
    """
    # Calculate moving averages
    df["SMA"], df["EMA"] = calculate_SMA_and_EMA(df, MA_Window, ticker=Ticker)

    # Calculate derivatives
    second_div_scale_sma, second_div_scale_ema, first_div_scale_ema = (
        calculate_scaled_second_derivative(df, derivative_window, ticker=Ticker)
    )
    df["Second_Derivative_SMA"] = second_div_scale_sma
    df["Second_Derivative_EMA"] = second_div_scale_ema
    df["First_Derivative_EMA"] = first_div_scale_ema

    close = df["Close"][Ticker]

    # Calculate differences and thresholds
    df["Second_derivative_diff"] = df["Second_Derivative_EMA"] - df["Second_Derivative_SMA"]
    df["EMA_SMA_diff"] = (df["EMA"] - df["SMA"]) / close
    # Rolling std of the price level itself (not of returns).
    df["Volitility"] = close.rolling(window=vol_window).std()

    # Build each rolling window once and reuse it for mean and std.
    vol_roll = df["Volitility"].rolling(window=threshold_window)
    vol_mean = vol_roll.mean()
    vol_std = vol_roll.std()
    df["Short-Vol-Threshold"] = vol_mean + vol_std * vol_band
    df["Long-Vol-Threshold"] = vol_mean - vol_std * vol_band

    spread_roll = df["EMA_SMA_diff"].rolling(window=threshold_window)
    df["ema_sma_diff_threshold"] = spread_roll.mean() + spread_roll.std() * ema_sma_band

    # Drop NaN rows
    df.dropna(inplace=True)
    return df

# Enrich the downloaded frame with every indicator column, then display it.
df = calculate_indicators(df, MA_Window, derivative_window, Ticker)
df


In [None]:
def plot_indicators_timeseries(data):
    """Draw both spread indicators against the index, with a dashed zero line.

    Reads the 'Second_derivative_diff' and 'EMA_SMA_diff' columns of ``data``
    and shows the figure; nothing is returned.
    """
    _, ax = plt.subplots(figsize=(14, 10))

    line_specs = (
        ('Second_derivative_diff', 'Second Derivative Difference', 'blue'),
        ('EMA_SMA_diff', 'EMA - SMA Difference', 'orange'),
    )
    for column, legend_text, line_color in line_specs:
        ax.plot(data[column], label=legend_text, color=line_color)

    # Reference line separating positive from negative spreads.
    ax.axhline(0, color='red', linestyle='--')

    ax.set_title('Second Derivative Difference and EMA-SMA Difference Over Time')
    ax.set_xlabel('Date')
    ax.set_ylabel('Value')
    ax.legend()
    plt.show()

def plot_indicators_histograms(data):
    """Overlay 50-bin histograms of the two spread indicators.

    NaN values are removed from each column before binning. The figure is
    shown and nothing is returned.
    """
    _, ax = plt.subplots(figsize=(14, 10))

    histogram_specs = (
        ('Second_derivative_diff', 'Second Derivative Difference'),
        ('EMA_SMA_diff', 'EMA - SMA Difference'),
    )
    for column, legend_text in histogram_specs:
        observed = data[column].dropna()
        ax.hist(observed, bins=50, alpha=0.7, label=legend_text)

    ax.set_title('Histogram of Second Derivative Difference and EMA-SMA Difference')
    ax.set_xlabel('Value')
    ax.set_ylabel('Frequency')
    ax.legend()
    plt.show()

# Render the time-series chart and the histogram chart for the indicator frame.
plot_indicators_timeseries(df)
plot_indicators_histograms(df)
    

In [None]:
def strategy_logic(data, Ticker, close_prices):
    """Derive positions and return series from the indicator columns.

    A row is long (+1) when 'Second_derivative_diff' is positive and
    'Volitility' is below 'Long-Vol-Threshold'; short (-1) when the diff is
    negative and 'Volitility' is above 'Short-Vol-Threshold'; flat (0)
    otherwise. Strategy returns use the previous row's position, so a signal
    only earns the following row's return.

    Parameters
    ----------
    data : indicator frame; the new columns are written to it in place.
    Ticker : accepted for call compatibility; not read by this function.
    close_prices : price series whose pct_change() becomes 'returns'
        (aligned to ``data`` by index on assignment).

    Returns
    -------
    The same ``data`` object with 'position', 'returns', 'strategy_returns',
    'cumulative_strategy_returns' and 'cumulative_market_returns' added.

    NOTE(review): 'ema_sma_diff_threshold' is not consulted here although the
    strategy summary describes an EMA - SMA threshold for long entries --
    confirm whether that filter was dropped on purpose.
    """
    curvature_gap = data['Second_derivative_diff']
    volatility = data['Volitility']

    go_long = (curvature_gap > 0) & (volatility < data["Long-Vol-Threshold"])
    go_short = (curvature_gap < 0) & (volatility > data["Short-Vol-Threshold"])

    # Start flat everywhere, then stamp the long and short rows.
    data["position"] = 0
    data.loc[go_long, "position"] = 1
    data.loc[go_short, "position"] = -1

    data["returns"] = close_prices.pct_change()
    previous_position = data["position"].shift(1)
    data["strategy_returns"] = previous_position * data["returns"]

    # Compound each per-period series into a cumulative return curve.
    for source_col, target_col in (
        ("strategy_returns", "cumulative_strategy_returns"),
        ("returns", "cumulative_market_returns"),
    ):
        data[target_col] = (1 + data[source_col]).cumprod() - 1

    return data

# Attach positions and return curves to the indicator frame, then preview it.
df = strategy_logic(data=df, Ticker=Ticker, close_prices=close_prices)
df.head()

In [None]:
def plot_strategy_performance(data):
    """Plot the cumulative strategy return next to the cumulative market return.

    The columns are looked up with two-level keys, so ``data`` must carry
    two-level column labels. The title uses the notebook-level ``Ticker``.
    The figure is shown and nothing is returned.
    """
    _, ax = plt.subplots(figsize=(14, 10))

    strategy_curve = data[('cumulative_strategy_returns', '')]
    market_curve = data[('cumulative_market_returns', '')]
    ax.plot(strategy_curve, label='Cumulative Strategy Returns', color='green')
    ax.plot(market_curve, label='Cumulative Market Returns', color='red')

    ax.set_title(f'Strategy vs {Ticker} Performance')
    ax.set_xlabel('Date')
    ax.set_ylabel('Cumulative Returns')
    ax.legend()
    plt.show()



# Chart the strategy's cumulative return against the market's.
plot_strategy_performance(df)

In [None]:
# Risk-adjusted performance summary: strategy versus holding the ticker.
daily_risk_free_rate = .0364/252  # 0.0364 yearly rate spread evenly over 252 periods


def _annualized_sharpe(returns, risk_free):
    """Mean return above ``risk_free`` over the std of returns, scaled by sqrt(252)."""
    return ((returns.mean() - risk_free) / returns.std()) * np.sqrt(252)


def _annualized_sortino(returns, risk_free):
    """Like the Sharpe helper, but divides by the std of the negative returns only."""
    return ((returns.mean() - risk_free) / returns[returns < 0].std()) * np.sqrt(252)


market_return = (1+df["returns"]).cumprod() - 1
strategy_return = (1+df["strategy_returns"]).cumprod() - 1

strat_sharpe_ratio = _annualized_sharpe(df["strategy_returns"], daily_risk_free_rate)
market_sharpe_ratio = _annualized_sharpe(df["returns"], daily_risk_free_rate)

# The helper column and `negative_df` are not needed for the ratios below; they
# are still created so anything that reads them keeps working. The former
# `negative_df[...].dropna(inplace=True)` was removed: it acted on a temporary
# Series, could not change `negative_df`, and only produced pandas
# chained-assignment warnings.
losing_days = df["strategy_returns"] < 0
df["negative_strategy_returns"] = df["strategy_returns"][losing_days]
negative_df = df[losing_days].copy()

strat_sortino_ratio = _annualized_sortino(df["strategy_returns"], daily_risk_free_rate)
market_sortino_ratio = _annualized_sortino(df["returns"], daily_risk_free_rate)

# Arithmetic annualization: mean per-period return times 252.
Annualized_Market_Return = df["returns"].mean() * 252

print("Strategy Sharpe Ratio:", strat_sharpe_ratio)
print(f"{Ticker} Sharpe Ratio:", market_sharpe_ratio)
print("Strategy Sortino Ratio:", strat_sortino_ratio)
print(f"{Ticker} Sortino Ratio:", market_sortino_ratio)
print(f'Strategy Return: {strategy_return.iloc[-1]}')
print(f'Market Return: {market_return.iloc[-1]}')
print(f'Annualized Market Return: {Annualized_Market_Return}')

In [None]:
# Notional account size used to turn cumulative returns into a portfolio value.
starting_capital = 100_000

def calculate_portfolio_value(data, starting_capital):
    """Add a 'portfolio_value' column: starting capital grown by the strategy.

    Reads 'cumulative_strategy_returns' from ``data``, writes the new column
    in place and returns the same ``data`` object.
    """
    growth_factor = 1 + data["cumulative_strategy_returns"]
    data["portfolio_value"] = starting_capital * growth_factor
    return data

def calculate_drawdown(data):
    """Compute drawdown columns from 'portfolio_value' and return the worst one.

    Writes 'rolling_max' (running peak), 'drawdown' (value minus peak) and
    'drawdown_pct' (drawdown divided by peak) into ``data`` in place.

    Returns
    -------
    The minimum of 'drawdown_pct', i.e. the deepest fractional decline.
    """
    equity = data["portfolio_value"]
    running_peak = equity.cummax()

    data["rolling_max"] = running_peak
    data["drawdown"] = equity - running_peak
    data["drawdown_pct"] = data["drawdown"] / running_peak

    return data["drawdown_pct"].min()

def plot_portfilio_value(data):
    """Show the 'portfolio_value' column of ``data`` as a line chart."""
    _, ax = plt.subplots(figsize=(14, 10))
    equity_curve = data["portfolio_value"]
    ax.plot(equity_curve, label='Portfolio Value', color='blue')
    ax.set_title('Portfolio Value Over Time')
    ax.set_xlabel('Date')
    ax.set_ylabel('Portfolio Value')
    ax.legend()
    plt.show()

def plot_drawdown(data):
    """Show the 'drawdown_pct' column of ``data`` as a line chart."""
    _, ax = plt.subplots(figsize=(14, 10))
    drawdown_curve = data["drawdown_pct"]
    ax.plot(drawdown_curve, label='Drawdown Percentage', color='red')
    ax.set_title('Drawdown Percentage Over Time')
    ax.set_xlabel('Date')
    ax.set_ylabel('Drawdown Percentage')
    ax.legend()
    plt.show()

# Build the portfolio value column, compute the drawdown columns and the worst
# drawdown, print the headline numbers, then chart both series.
df = calculate_portfolio_value(df, starting_capital)
max_drawdown = calculate_drawdown(df)
print(f'Max Drawdown: {max_drawdown}')
print(f'Final Portfolio Value: {df["portfolio_value"].iloc[-1]}')
plot_portfilio_value(df)
plot_drawdown(df)

In [None]:
#Find optimal Params
# Grid search over MA_Window (1-9) and derivative_window (1-4), recording the
# final cumulative strategy return of each pair.
# NOTE(review): the pairs are ranked on the same history they were fitted on,
# so the best pair is an in-sample result.
param_return_list = []

# Download the price history ONCE. The request is identical for every pair, so
# fetching it inside the loop only repeated the same network call 36 times.
raw_prices = yf.download(Ticker, start=Start_Date, end=End_Date, interval=interval).dropna()

# Full-length close Series for the return calculation. A separate name is used
# so the notebook-level `close_prices` is not overwritten by this cell.
raw_close = raw_prices["Close"][Ticker]

for i in range(1, 10):
    for j in range(1, 5):
        # calculate_indicators modifies its input in place (new columns, dropped
        # rows), so every run starts from a fresh copy of the raw frame.
        df_temp = calculate_indicators(raw_prices.copy(), i, j, Ticker)

        # Apply strategy
        df_temp = strategy_logic(df_temp, Ticker, close_prices=raw_close)

        # Store results
        final_return = df_temp["cumulative_strategy_returns"].iloc[-1] if len(df_temp) > 0 else np.nan
        param_return_list.append({
            "MA_Window": i, 
            "Derivative_Window": j, 
            "Return": final_return
        })
    
    print(f"Completed MA_Window = {i}")  # Progress indicator

# Plotting function with NaN handling
def plot_optimization_results(param_return_list):
    """Draw a heatmap of the grid-search returns and report the best pair.

    Parameters
    ----------
    param_return_list : list of dicts with the keys 'MA_Window',
        'Derivative_Window' and 'Return'.

    Returns
    -------
    The row (a Series) with the highest 'Return', or None when there is
    nothing to plot: the list is empty, carries no 'Return' key, or every
    return is NaN. A message is printed in each of those cases.
    """
    results_df = pd.DataFrame(param_return_list)

    # An empty list yields a frame without a 'Return' column; dropna(subset=...)
    # would then raise KeyError instead of reaching the graceful exit below.
    if 'Return' not in results_df.columns:
        print("ERROR: No optimization results to plot!")
        return None

    # Remove NaN values
    results_df = results_df.dropna(subset=['Return'])

    if len(results_df) == 0:
        print("ERROR: All returns are NaN!")
        return None

    # Pivot for heatmap: one row per derivative window, one column per MA window
    heatmap_data = results_df.pivot(
        index='Derivative_Window',
        columns='MA_Window',
        values='Return'
    )

    # Create heatmap; center=0 puts the midpoint of the diverging palette at a zero return
    plt.figure(figsize=(12, 10))
    sns.heatmap(
        heatmap_data,
        annot=True,
        fmt='.3f',
        cmap='RdYlGn',
        center=0,
        cbar_kws={'label': 'Cumulative Return'}
    )
    plt.title('Strategy Returns: MA Window vs Derivative Window', fontsize=14, pad=20)
    plt.xlabel('MA Window', fontsize=12)
    plt.ylabel('Derivative Window', fontsize=12)
    plt.tight_layout()
    plt.show()

    # Find and print optimal parameters
    max_idx = results_df['Return'].idxmax()
    optimal_params = results_df.loc[max_idx]

    print("\n" + "="*50)
    print("OPTIMAL PARAMETERS")
    print("="*50)
    print(f"MA Window: {int(optimal_params['MA_Window'])}")
    print(f"Derivative Window: {int(optimal_params['Derivative_Window'])}")
    print(f"Maximum Return: {optimal_params['Return']:.4f}")
    print("="*50)

    return optimal_params

# Usage: plot the grid-search heatmap, then print the best row
# (None is printed when there was nothing to plot).
optimal = plot_optimization_results(param_return_list)
print(optimal)

