In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
import random
from deap import base, creator, tools, algorithms
import matplotlib.pyplot as plt

# Define the fitness function for NSGA-II
if "FitnessMulti" not in creator.__dict__:
    creator.create("FitnessMulti", base.Fitness, weights=(1.0, -1.0))
if "Individual" not in creator.__dict__:
    creator.create("Individual", list, fitness=creator.FitnessMulti)

# Toolbox setup
toolbox = base.Toolbox()
toolbox.register("attr_bool", random.randint, 0, 1)
toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_bool, n=52)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
toolbox.register("select", tools.selNSGA2)

def download_data(ticker, start_date, end_date):
    print(f"Downloading data for {ticker} from {start_date} to {end_date}...")
    data = yf.download(ticker, start=start_date, end=end_date)
    if isinstance(data.columns, pd.MultiIndex):
        # Flatten the columns
        data.columns = [col[0] for col in data.columns]
    print(f"Downloaded {len(data)} rows of data.")
    print("Columns:", data.columns.tolist())
    return data

def decode_chromosome(individual):
    """
    Decode a 52-bit chromosome into the Buy and Sell rule structures.

    Layout (indices in the chromosome):
       Buy_Active        = bits 0..8   (9 bits)
       Buy_TrueFalse     = bits 9..17  (9 bits)
       Buy_Operators     = bits 18..25 (8 bits)
       Sell_Active       = bits 26..34 (9 bits)
       Sell_TrueFalse    = bits 35..43 (9 bits)
       Sell_Operators    = bits 44..51 (8 bits)
    """
    # Buy side
    buy_active_bits     = individual[0:9]
    buy_truefalse_bits  = individual[9:18]
    buy_operator_bits   = individual[18:26]

    # Sell side
    sell_active_bits    = individual[26:35]
    sell_truefalse_bits = individual[35:44]
    sell_operator_bits  = individual[44:52]

    # Create dictionary structure
    return {
        "buy_active":     buy_active_bits,
        "buy_truefalse":  buy_truefalse_bits,
        "buy_ops":        buy_operator_bits,

        "sell_active":    sell_active_bits,
        "sell_truefalse": sell_truefalse_bits,
        "sell_ops":       sell_operator_bits,
    }

def compute_indicator_signals(df):
    """
    Adds indicators as signals in the DataFrame.
    """
    df['SMA_cross'] = (df['SMA9'] > df['SMA40']).astype(int)
    df['MACD_signal'] = (df['MACD'] > df['Signal_Line']).astype(int)
    df['Momentum_signal'] = (df['Close'] > df['Close'].shift(1)).astype(int)
    df['RSI_signal'] = (df['RSI'] < 30).astype(int)  # Oversold condition
    
    # Add signals to ensure 4 indicator columns align with the chromosome
    return df

def apply_paper_rule(df, indicator_cols, active_bits, truefalse_bits):
    """
    Replicate the paper’s grouping logic:
      - The first 4 bits (indices 0..3) are momentum indicators (AND-ed).
      - The last 5 bits (indices 4..8) are reversal indicators (AND-ed).
      - Then momentum_cond OR reversal_cond.
    
    The operator_bits are ignored, because the paper fixes AND inside each group and
    a single OR between the two groups.
    """
    # 1) Momentum: first 4 indicators (indices 0..3)
    momentum_cond = pd.Series([True]*len(df), index=df.index)
    for i in range(4):
        if active_bits[i] == 1:
            # Check if the current indicator matches the desired true/false bit
            cond = (df[indicator_cols[i]] == truefalse_bits[i])
            momentum_cond = momentum_cond & cond
        else:
            # If not active, do nothing special; effectively that indicator doesn't impose any requirement
            pass

    # 2) Reversal: last 5 indicators (indices 4..8)
    reversal_cond = pd.Series([True]*len(df), index=df.index)
    for i in range(4, 9):
        if active_bits[i] == 1:
            cond = (df[indicator_cols[i]] == truefalse_bits[i])
            reversal_cond = reversal_cond & cond
        else:
            pass

    # 3) Combine with OR
    final_cond = momentum_cond | reversal_cond

    # Return int 0/1
    return final_cond.astype(int)

def evaluate_strategy(individual, df):
    # 1) Decode the chromosome
    decoded = decode_chromosome(individual)
    indicator_cols = [
        "SMA_signal",
        "MACD_signal",
        "MO_signal",
        "PO_signal",
        "SO_signal",
        "RSI_signal",
        "CCI_signal",
        "LW_signal",
        "BB_signal"
    ]

    # 2) Create a copy if df might be a slice
    df = df.copy()

    # 3) Build buy_signal and sell_signal
    buy_signal = apply_paper_rule(
        df,
        indicator_cols,
        decoded["buy_active"],
        decoded["buy_truefalse"]
        # ignore decoded["buy_ops"]
    )

    sell_signal = apply_paper_rule(
        df,
        indicator_cols,
        decoded["sell_active"],
        decoded["sell_truefalse"]
        # ignore decoded["sell_ops"]
    )

    # 4) Construct positions in a separate list or Series
    positions = []
    current_pos = 0
    for i in range(len(df)):
        if buy_signal.iloc[i] == 1 and sell_signal.iloc[i] == 0:
            current_pos = 1
        elif buy_signal.iloc[i] == 0 and sell_signal.iloc[i] == 1:
            current_pos = -1
        positions.append(current_pos)

    # 5) Assign columns with .loc
    df.loc[:, "Position"] = positions
    df.loc[:, "Daily_Return"] = df["Close"].pct_change() * df["Position"].shift(1)

    # 6) Drop any NaNs by reassigning
    df = df.dropna()

    # 7) Compute Sharpe, MDD, etc.
    returns = df["Daily_Return"]
    if returns.std() == 0:
        sharpe_ratio = -np.inf
    else:
        sharpe_ratio = np.sqrt(252) * returns.mean() / returns.std()

    cum_returns = (1 + returns).cumprod()
    running_max = cum_returns.cummax()
    drawdowns = (cum_returns - running_max) / running_max
    max_drawdown = drawdowns.min()

    return sharpe_ratio, max_drawdown, positions

def calculate_indicators(df):
    print("Calculating indicators...")
    # Make a clean copy and ensure proper index
    df = df.copy()
    
    # Handle potential Ticker index from yfinance
    if isinstance(df.index, pd.MultiIndex):
        df = df.droplevel(level=0)
    
    print("Data shape after index handling:", df.shape)
    print("Columns:", df.columns.tolist())
    
    # Calculate Simple Moving Averages (SMA)
    df['SMA9'] = df['Close'].rolling(window=9).mean()
    df['SMA40'] = df['Close'].rolling(window=40).mean()
    df['SMA_signal'] = 0
    mask = (df['SMA9'] > df['SMA40']) & (df['SMA9'].shift(1) <= df['SMA40'].shift(1))
    df.loc[mask, 'SMA_signal'] = 1

    # Calculate MACD
    df['EMA12'] = df['Close'].ewm(span=12, adjust=False).mean()
    df['EMA26'] = df['Close'].ewm(span=26, adjust=False).mean()
    df['MACD'] = df['EMA12'] - df['EMA26']
    df['Signal_Line'] = df['MACD'].ewm(span=9, adjust=False).mean()
    df['MACD_signal'] = 0
    mask = (df['MACD'] > df['Signal_Line']) & (df['MACD'].shift(1) <= df['Signal_Line'].shift(1))
    df.loc[mask, 'MACD_signal'] = 1

    # Momentum Oscillator
    df['MO_signal'] = 0
    df.loc[(df['Close'].diff(10) > 0) & (df['Close'].diff(10).shift(1) <= 0), 'MO_signal'] = 1

    # Price Oscillator
    df['PO'] = (df['EMA12'] - df['EMA26']) / df['EMA26']
    df['PO_signal'] = 0
    df.loc[(df['PO'] > 0) & (df['PO'].shift(1) <= 0), 'PO_signal'] = 1

    # Stochastic Oscillator
    low_min = df['Low'].rolling(window=14).min()
    high_max = df['High'].rolling(window=14).max()
    df['K'] = 100 * ((df['Close'] - low_min) / (high_max - low_min))
    df['D'] = df['K'].rolling(window=3).mean()
    df['D_slow'] = df['D'].rolling(window=3).mean()
    df['SO_signal'] = 0
    df.loc[(df['D'] > df['D_slow']) & (df['D'].shift(1) <= df['D_slow'].shift(1)) & 
           (df['D'] < 20) & (df['D_slow'] < 20), 'SO_signal'] = 1

    # RSI
    delta = df['Close'].diff()
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)
    avg_gain = gain.rolling(window=14).mean()
    avg_loss = loss.rolling(window=14).mean()
    rs = avg_gain / avg_loss
    df['RSI'] = 100 - (100 / (1 + rs))
    df['RSI_signal'] = 0
    df.loc[(df['RSI'] > 30) & (df['RSI'].shift(1) <= 30), 'RSI_signal'] = 1

    # CCI
    typical_price = (df['High'] + df['Low'] + df['Close']) / 3
    tp_ma = typical_price.rolling(window=20).mean()
    tp_std = typical_price.rolling(window=20).std()
    df['CCI'] = (typical_price - tp_ma) / (0.015 * tp_std)
    df['CCI_signal'] = 0
    df.loc[(df['CCI'] > -100) & (df['CCI'].shift(1) <= -100), 'CCI_signal'] = 1

    # Larry Williams %R
    df['LW'] = -100 * (high_max - df['Close']) / (high_max - low_min)
    df['LW_signal'] = 0
    df.loc[(df['LW'] > -80) & (df['LW'].shift(1) <= -80), 'LW_signal'] = 1

    # Bollinger Bands
    df['SMA20'] = df['Close'].rolling(window=20).mean()
    df['BBstd'] = df['Close'].rolling(window=20).std()
    df['BB_upper'] = df['SMA20'] + (2 * df['BBstd'])
    df['BB_lower'] = df['SMA20'] - (2 * df['BBstd'])
    df['BB_signal'] = 0
    df.loc[(df['Close'] > df['BB_lower']) & (df['Close'].shift(1) <= df['BB_lower'].shift(1)), 'BB_signal'] = 1

    # Forward fill any remaining NaN values
    df = df.fillna(method='ffill').fillna(0)
    
    print("Indicators calculated.")
    return df

def fitness(individual):
    global training_data
    try:
        sharpe, drawdown, _ = evaluate_strategy(individual, training_data)
        # Ensure valid results before returning
        if isinstance(sharpe, (int, float)) and isinstance(drawdown, (int, float)):
            return sharpe, drawdown
        else:
            raise ValueError("Invalid Sharpe or Drawdown values.")
    except Exception as e:
        print(f"Error in fitness calculation: {e}")
        return -np.inf, -np.inf

def run_nsga2(data, generations=50, pop_size=100):
    global training_data
    training_data = data

    print("Initializing NSGA-II...")
    toolbox.register("evaluate", fitness)
    population = toolbox.population(n=pop_size)

    print(f"Starting optimization for {generations} generations with population size {pop_size}...")
    algorithms.eaMuPlusLambda(
        population, toolbox, mu=pop_size, lambda_=pop_size, cxpb=0.9, mutpb=0.1, ngen=generations, verbose=True
    )
    print("Optimization complete.")

    pareto_front = tools.sortNondominated(population, len(population), first_front_only=True)[0]
    print(f"Pareto front contains {len(pareto_front)} solutions.")
    return pareto_front

def backtest_signals(df, initial_capital=10000, lot_size=1, commission_rate=0.0):
    """
    A simple backtester that:
      - Uses df['Position'] which can be -1, 0, +1
      - Tracks trades from changes in position
      - Returns a dictionary of stats for your final report
    
    Parameters:
    -----------
    df : pd.DataFrame
        Must contain columns: 'Close' and 'Position'.
        'Position' is the final day-end position for that day 
        (e.g., +1 means long 1 unit, -1 means short 1 unit, 0 means flat).
    initial_capital : float
        Starting capital for the backtest
    lot_size : int or float
        Number of shares/contracts per 1 unit of position
    commission_rate : float
        Commission or transaction cost rate if applicable 
        (e.g., 0.001 = 0.1% per trade).
    
    Returns:
    --------
    results : dict
        Dictionary containing stats like # trades, winrate, total PnL, 
        Sharpe ratio, MDD, trade distribution stats, etc.
    trades_df : pd.DataFrame
        DataFrame containing details of each individual trade
    equity_curve : pd.Series
        The daily equity curve (portfolio value over time)
    """
    # Make a copy to avoid modifying original
    df = df.copy()

    # Ensure Position is filled
    if 'Position' not in df.columns:
        raise ValueError("DataFrame must contain 'Position' column with -1, 0, or +1 signals.")

    # Calculate daily price change % or daily returns
    df['price_change'] = df['Close'].pct_change()  # e.g., daily % change
    df['price_change'].fillna(0, inplace=True)

    # We'll track daily PnL from holding the position
    # daily_pnl = position(t-1) * (price_change at t) * lot_size * capital_base
    # BUT for simplicity, let's do absolute returns in $.
    # If you want more exact logic, you can store entry price and do (Close_t - Close_(t-1))*position.
    
    # Track trades: whenever position changes from one day to next, that's an exit from old + entry to new
    # We'll store each trade in a list of dicts
    trades = []
    current_trade = None

    current_position = 0
    equity = initial_capital
    equity_list = []

    for i in range(len(df)):
        today_pos = df['Position'].iloc[i]

        # If position changed from previous day, we either close an old trade or open a new one
        if i > 0:
            prev_pos = df['Position'].iloc[i - 1]
        else:
            prev_pos = 0

        # 1) If we are in a trade and the position changes, close out the old trade
        if current_trade is not None and today_pos != prev_pos:
            # close out the trade
            exit_price = df['Close'].iloc[i]
            entry_price = current_trade['entry_price']
            side = current_trade['side']  # +1 or -1

            # PnL calculation
            raw_pnl = (exit_price - entry_price) * side * lot_size
            # commission
            trade_commission = commission_rate * abs(raw_pnl)
            net_pnl = raw_pnl - trade_commission

            current_trade['exit_time'] = df.index[i]
            current_trade['exit_price'] = exit_price
            current_trade['pnl'] = net_pnl
            trades.append(current_trade)
            current_trade = None

            # update equity
            equity += net_pnl

        # 2) If we just switched into a new non-zero position, open a trade
        if today_pos != prev_pos and today_pos != 0:
            # open new trade
            current_trade = {
                'entry_time': df.index[i],
                'entry_price': df['Close'].iloc[i],
                'side': today_pos,  # +1 (long) or -1 (short)
            }

        # 3) If we hold a position from previous day to today, we accumulate daily PnL
        daily_pnl = 0
        if current_trade is not None:
            # approximate daily PnL from close to close
            side = current_trade['side']
            daily_pnl = df['price_change'].iloc[i] * side * equity  # or * some fraction

        # Update equity from daily open PnL
        equity = equity + daily_pnl

        # Store equity in list for generating equity curve
        equity_list.append(equity)

    # If there's an open position at the end, close it out at the final day's price
    if current_trade is not None:
        exit_price = df['Close'].iloc[-1]
        side = current_trade['side']
        raw_pnl = (exit_price - current_trade['entry_price']) * side * lot_size
        trade_commission = commission_rate * abs(raw_pnl)
        net_pnl = raw_pnl - trade_commission
        current_trade['exit_time'] = df.index[-1]
        current_trade['exit_price'] = exit_price
        current_trade['pnl'] = net_pnl
        trades.append(current_trade)
        equity += net_pnl
        equity_list[-1] = equity  # update last equity

    # Convert trades list to a DataFrame
    trades_df = pd.DataFrame(trades)

    # Create equity curve Series
    equity_curve = pd.Series(data=equity_list, index=df.index, name='Equity')

    # ------------------
    # Compute Stats
    # ------------------
    results = {}
    results['Initial Capital'] = initial_capital

    # 1) Number of Trades
    results['Total Number of Trades'] = len(trades_df)

    # 2) Win Rate
    if len(trades_df) > 0:
        wins = sum(trades_df['pnl'] > 0)
        results['Winrate (%)'] = round(100.0 * wins / len(trades_df), 2)
    else:
        results['Winrate (%)'] = 0.0

    # 3) Total PnL
    if len(trades_df) > 0:
        results['Total Profit & Loss (PnL)'] = trades_df['pnl'].sum()
    else:
        results['Total Profit & Loss (PnL)'] = 0.0

    # 4) Monthly Sharpe & Annual Sharpe
    #    Typically, we do daily returns from equity curve: ret(t) = equity(t)/equity(t-1)-1
    daily_returns = equity_curve.pct_change().dropna()
    if len(daily_returns) > 1:
        # monthly Sharpe ~ sqrt(12)*mean(daily_return)/std(daily_return) if ~21 trade days per month
        monthly_factor = np.sqrt(12)
        annual_factor = np.sqrt(252)
        mean_ret = daily_returns.mean()
        std_ret = daily_returns.std()

        results['Monthly Sharpe Ratio'] = (monthly_factor * mean_ret / std_ret) if std_ret != 0 else 0
        results['Annual Sharpe Ratio'] = (annual_factor * mean_ret / std_ret) if std_ret != 0 else 0
    else:
        results['Monthly Sharpe Ratio'] = 0
        results['Annual Sharpe Ratio'] = 0

    # 5) Trade stats
    if len(trades_df) > 0:
        results['Mean Trade Value'] = trades_df['pnl'].mean()
        results['Maximum Trade Value'] = trades_df['pnl'].max()
        results['Minimum Trade Value'] = trades_df['pnl'].min()
        results['Standard Deviation of Trades'] = trades_df['pnl'].std()

        # Reward/Risk can be average win / average loss or mean pnl / std
        # We'll do a simple: Reward/Risk = (mean PnL) / (std PnL)
        # But you could define it differently
        if trades_df['pnl'].std() != 0:
            results['Reward/Risk Ratio'] = trades_df['pnl'].mean() / trades_df['pnl'].std()
        else:
            results['Reward/Risk Ratio'] = 0

        # Skewness & Kurtosis of trade returns
        results['Skewness of Trade Returns'] = trades_df['pnl'].skew()
        results['Kurtosis of Trade Returns'] = trades_df['pnl'].kurtosis()
    else:
        results['Mean Trade Value'] = 0
        results['Maximum Trade Value'] = 0
        results['Minimum Trade Value'] = 0
        results['Standard Deviation of Trades'] = 0
        results['Reward/Risk Ratio'] = 0
        results['Skewness of Trade Returns'] = 0
        results['Kurtosis of Trade Returns'] = 0

    # 6) Max Drawdown from equity curve
    running_max = equity_curve.cummax()
    drawdown = (equity_curve - running_max) / running_max
    results['Maximum Drawdown'] = drawdown.min()

    # 7) Average Holding Period
    if len(trades_df) > 0:
        # Approx. difference in days between entry_time and exit_time
        trades_df['holding_days'] = (trades_df['exit_time'] - trades_df['entry_time']).dt.days
        results['Average Holding Period'] = trades_df['holding_days'].mean()
    else:
        results['Average Holding Period'] = 0

    return results, trades_df, equity_curve
    
# Main execution
if __name__ == "__main__":
    ticker = "^GSPC"
    start_date = "2000-01-01"
    end_date = "2020-12-31"

    print("Starting main execution...")
    data = download_data(ticker, start_date, end_date)
    data = calculate_indicators(data)

    window_size = 3 * 252
    results = []

    rolling_results = []

    for start in range(0, len(data) - window_size, 252):
        print(f"Processing rolling window starting at index {start}...")
        train_data = data.iloc[start : start + 2*252]
        test_data  = data.iloc[start + 2*252 : start + window_size]
    
        print("Running NSGA-II for training data...")
        pareto_front = run_nsga2(train_data)
    
        print("Evaluating Pareto front on test data...")
        best_individual = max(pareto_front, key=lambda ind: ind.fitness.values[0])
        sharpe, drawdown, positions = evaluate_strategy(best_individual, test_data)
        rolling_results.append((sharpe, drawdown))  
    
        # Put positions in test_data
        test_data["Position"] = positions
    
        print(f"Test Period {start} - {start + window_size}: Sharpe = {sharpe}, Drawdown = {drawdown}")
        df_test = test_data.copy()
    
        # DO NOT overwrite rolling_results with the dictionary
        backtest_report, trades_df, equity_curve = backtest_signals(
            df_test, 
            initial_capital=10000, 
            lot_size=1, 
            commission_rate=0.001
        )
    
        print("Backtest Results:")
        # backtest_report is a dict, so we can do .items()
        for k, v in backtest_report.items():
            print(f"{k}: {v}")
    
        print("\nTrades:\n", trades_df)
        print("\nEquity Curve:\n", equity_curve)
    
    # Now at the end, rolling_results is still a list of tuples
    sharpe_ratios, drawdowns = zip(*rolling_results)
    print(f"Average Sharpe Ratio: {np.mean(sharpe_ratios)}")
    print(f"Average Max Drawdown: {np.mean(drawdowns)}")



[*********************100%***********************]  1 of 1 completed

Starting main execution...
Downloading data for ^GSPC from 2000-01-01 to 2020-12-31...
Downloaded 5283 rows of data.
Columns: ['Close', 'High', 'Low', 'Open', 'Volume']
Calculating indicators...
Data shape after index handling: (5283, 5)
Columns: ['Close', 'High', 'Low', 'Open', 'Volume']
Indicators calculated.
Processing rolling window starting at index 0...
Running NSGA-II for training data...
Initializing NSGA-II...
Starting optimization for 50 generations with population size 100...



  df = df.fillna(method='ffill').fillna(0)


gen	nevals
0  	100   
1  	100   
2  	100   
3  	100   
4  	100   
5  	100   
6  	100   
7  	100   
8  	100   
9  	100   
10 	100   
11 	100   
12 	100   
13 	100   
14 	100   
15 	100   
16 	100   
17 	100   
18 	100   
19 	100   
20 	100   
21 	100   
22 	100   
23 	100   
24 	100   
25 	100   
26 	100   
27 	100   
28 	100   
29 	100   
30 	100   
31 	100   
32 	100   
33 	100   
34 	100   
35 	100   
36 	100   
37 	100   
38 	100   
39 	100   
40 	100   
41 	100   
42 	100   
43 	100   
44 	100   
45 	100   
46 	100   
47 	100   
48 	100   
49 	100   
50 	100   
Optimization complete.
Pareto front contains 100 solutions.
Evaluating Pareto front on test data...
Test Period 0 - 756: Sharpe = -inf, Drawdown = 0.0
Backtest Results:
Initial Capital: 10000
Total Number of Trades: 0
Winrate (%): 0.0
Total Profit & Loss (PnL): 0.0
Monthly Sharpe Ratio: 0
Annual Sharpe Ratio: 0
Mean Trade Value: 0
Maximum Trade Value: 0
Minimum Trade Value: 0
Standard Deviation of Trades: 0
Reward/Risk Ratio

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_data["Position"] = positions
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df['price_change'].fillna(0, inplace=True)


gen	nevals
0  	100   
1  	100   
2  	100   
3  	100   
