In [15]:
import yfinance as yf
import pandas as pd
import numpy as np

from datetime import datetime, timedelta

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

import itertools

from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import backtrader as bt

import CustomCSVData
from Strategies import Momentum3, RollingVote
from Report import generate_reports

import random

In [16]:
# Reproducibility: one seed shared by NumPy's global RNG, Python's `random`
# module, and (via `random_state=SEED`) the scikit-learn models built below.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

## Download Data

In [3]:
def download_data(ticker, start_date, end_date, interval, filename):
    """
    Download price data from Yahoo Finance and return it as a flat DataFrame.

    The frame is sorted chronologically, gaps are forward-filled, and the
    index is moved into a regular column by ``reset_index``.

    Parameters
    ----------
        ticker (str): Ticker symbol
        start_date (str): Start date in "YYYY-MM-DD" format
        end_date (str): End date in "YYYY-MM-DD" format
        interval (str): Data interval (e.g., '1d', '1h', '1m')
        filename (str): Currently unused; kept so existing callers keep working
            (the CSV export that used it is disabled)

    Returns
    -------
        pd.DataFrame: DataFrame containing the downloaded data, or
        ``False`` when Yahoo Finance returned nothing for the request

    Raises
    ------
        ValueError: If a date string does not match "YYYY-MM-DD"
    """
    start_dt = datetime.strptime(start_date, "%Y-%m-%d")
    end_dt = datetime.strptime(end_date, "%Y-%m-%d")

    intraday_intervals = {'1m', '2m', '5m', '15m', '30m', '60m', '90m', '1h'}

    if interval in intraday_intervals:
        # Intraday requests are clamped to the last 60 days before end_date.
        max_span = timedelta(days=60)
        if end_dt - start_dt > max_span:
            print("Warning: Intraday data cannot exceed 60 days. Adjusting start_date.")
            start_dt = end_dt - max_span

    df = yf.download(
        ticker,
        start=start_dt.strftime("%Y-%m-%d"),
        end=end_dt.strftime("%Y-%m-%d"),
        interval=interval,
        auto_adjust=True,
    )
    if df is None or df.empty:
        print(f"No data found for {ticker} between {start_date} and {end_date}.")
        return False

    # Flatten multi-level columns when present; a frame whose columns are
    # already flat is left as-is instead of crashing on droplevel().
    if isinstance(df.columns, pd.MultiIndex):
        if 'Ticker' in df.columns.names:
            df.columns = df.columns.droplevel('Ticker')
        else:
            # NOTE(review): assumes the first level holds the price field names.
            df.columns = df.columns.get_level_values(0)

    df.index = pd.to_datetime(df.index)

    # Sort BEFORE forward-filling so a gap is always filled from the previous
    # observation in time, never from a later one.
    df = df.sort_index().ffill()
    df = df.reset_index()
    df.columns.name = None

    return df

## Data Splitting and Feature Engineering

In [4]:
def feature_engineering(df, ema_length, rsi_length, macd_short, macd_long, bb_window,
                        macd_signal=9, bb_num_std=2):
    """
    Add technical-indicator features and a next-bar direction target.

    Features created:
    - Exponential Moving Average (EMA) -> ``EMA_<ema_length>``
    - Relative Strength Index (RSI) -> ``RSI_<rsi_length>``
    - Moving Average Convergence Divergence (MACD) -> ``MACD``, ``MACD_Signal``
    - Bollinger Bands (BB) -> ``BB_Upper``, ``BB_Lower``

    ``Target`` is 1.0 when the NEXT row's Close is strictly greater than the
    current row's Close, otherwise 0.0. Rows containing any NaN (indicator
    warm-up rows and the final row, whose target is unknown) are dropped.
    The input frame is not modified.

    Parameters
    ----------
        df (pd.DataFrame): DataFrame containing stock data with 'Close' prices
        ema_length (int): Length for EMA calculation
        rsi_length (int): Length for RSI calculation
        macd_short (int): Short length for MACD calculation
        macd_long (int): Long length for MACD calculation
        bb_window (int): Window length for Bollinger Bands calculation
        macd_signal (int): Span of the EMA applied to MACD to get the signal
            line (default 9, the previously hard-coded value)
        bb_num_std (float): Band width in rolling standard deviations
            (default 2, the previously hard-coded value)

    Returns
    -------
        pd.DataFrame: DataFrame with added technical indicators and target variable
    """
    df = df.copy()
    close = df['Close']

    df[f'EMA_{ema_length}'] = close.ewm(span=ema_length, adjust=False).mean()

    # RSI from simple rolling means of gains and losses. The first delta is
    # NaN and is counted as 0 by both `where` calls.
    delta = close.diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=rsi_length).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=rsi_length).mean()
    rs = gain / loss
    df[f'RSI_{rsi_length}'] = 100 - (100 / (1 + rs))

    ema_short = close.ewm(span=macd_short, adjust=False).mean()
    ema_long = close.ewm(span=macd_long, adjust=False).mean()
    df['MACD'] = ema_short - ema_long
    df['MACD_Signal'] = df['MACD'].ewm(span=macd_signal, adjust=False).mean()

    rolling_close = close.rolling(window=bb_window)
    sma = rolling_close.mean()
    band = rolling_close.std() * bb_num_std
    df['BB_Upper'] = sma + band
    df['BB_Lower'] = sma - band

    # 1 when this row closed above the previous one; shifting by -1 turns it
    # into "does the next row close higher?" and leaves NaN on the last row.
    price_rise = pd.Series(np.where(close > close.shift(1), 1, 0), index=df.index)
    df['Target'] = price_rise.shift(-1)

    return df.dropna()

In [5]:
def split_transform(df):
    """
    Split the DataFrame chronologically into training and testing sets
    (**80%** train, **20%** test) and standardize every column except 'Date'.

    The scaler is fitted on the training rows only and then applied to the
    test rows. The input frame is left untouched: both returned frames are
    independent copies.

    Parameters
    ----------
        df (pd.DataFrame): DataFrame containing the features and target variable

    Returns
    -------
        pd.DataFrame: Training set
        pd.DataFrame: Testing set
        StandardScaler: Fitted scaler object

    Raises
    ------
        ValueError: If `df` is not a non-empty DataFrame (for example the
            ``False`` returned by a failed download)
    """
    if not isinstance(df, pd.DataFrame) or df.empty:
        raise ValueError("split_transform expects a non-empty DataFrame")

    train_df, test_df = train_test_split(df, test_size=0.2, shuffle=False)

    # Work on explicit copies: assigning into the slices returned by the
    # split can trigger SettingWithCopyWarning.
    train_df = train_df.copy()
    test_df = test_df.copy()

    scaler = StandardScaler()
    cols = [col for col in train_df.columns if col != 'Date']
    train_df[cols] = scaler.fit_transform(train_df[cols])
    test_df[cols] = scaler.transform(test_df[cols])

    return train_df, test_df, scaler

## Tune Model

In [6]:
def tune_model(raw_df):
    """
    Grid-search feature-engineering settings and model hyperparameters.

    For every feature-engineering combination the data is passed through
    `feature_engineering`, split chronologically (last 25% as validation,
    no shuffling), and every hyperparameter combination of every model is
    fitted on the training part and scored on the validation part.

    Parameters
    ----------
        raw_df (pd.DataFrame): Price data accepted by `feature_engineering`;
            must contain a 'Date' column

    Returns
    -------
        list[dict]: One entry per run with keys 'model', 'model_params',
        'fe_params', 'accuracy', 'precision', 'recall', 'f1', 'num_features'
    """
    # FE Grid Search Setup
    ema_lengths = [20]
    rsi_lengths = [14]
    macd_shorts = [12]
    macd_longs = [26]
    bb_windows = [20]

    # Model Hyperparameters
    model_params = {
        'SVM': {'C': [2, 5, 7, 10, 20]},
        'RandomForest': {'n_estimators': [50, 100, 150, 200], 'max_depth': [5, 10, 20, 30]},
        'LASSO': {'C': [0.01, 0.1, 1.0, 10.0]},
        'KNN': {'n_neighbors': [3, 5, 7, 9, 11, 13]},
    }

    fe_combinations = list(itertools.product(ema_lengths, rsi_lengths, macd_shorts, macd_longs, bb_windows))
    total_fe_sets = len(fe_combinations)

    # Derive the progress denominator from the grids themselves so it cannot
    # drift out of sync (the grids above give 31 runs per feature set).
    runs_per_fe_set = sum(
        sum(1 for _ in _expand_param_grid(grid)) for grid in model_params.values()
    )
    total_runs = total_fe_sets * runs_per_fe_set

    results = []
    global_run_count = 0

    for count, fe_set in enumerate(fe_combinations, 1):
        ema_len, rsi_len, macd_short, macd_long, bb_window = fe_set

        print(f"\nEvaluating Feature Set {count:3d} of {total_fe_sets}. Params: EMA={ema_len}, RSI={rsi_len}, MACD=({macd_short}, {macd_long}), BB={bb_window}")

        processed_df = feature_engineering(
            raw_df,
            ema_length=ema_len,
            rsi_length=rsi_len,
            macd_short=macd_short,
            macd_long=macd_long,
            bb_window=bb_window,
        )

        # Chronological hold-out: the most recent 25% of rows validate.
        train_df, val_df = train_test_split(processed_df, test_size=0.25, shuffle=False)

        X_train = train_df.drop(columns=['Target', 'Date'])
        y_train = train_df['Target']
        X_val = val_df.drop(columns=['Target', 'Date'])
        y_val = val_df['Target']

        fe_params = {
            'ema_length': ema_len, 'rsi_length': rsi_len,
            'macd_short': macd_short, 'macd_long': macd_long, 'bb_window': bb_window,
        }

        for model_name, param_grid in model_params.items():
            for params in _expand_param_grid(param_grid):
                global_run_count += 1

                model = _build_tuning_model(model_name, params)
                model.fit(X_train, y_train)
                predictions = model.predict(X_val)
                score = accuracy_score(y_val, predictions)

                print(f"| Run {global_run_count:4d}/{total_runs} | {model_name} ({_format_tuning_params(model_name, params)}) -> Accuracy: {score:.4f}")

                results.append({
                    'model': model_name,
                    'model_params': params,
                    # Per-run copy so editing one result cannot alter the others.
                    'fe_params': dict(fe_params),
                    'accuracy': score,
                    'precision': precision_score(y_val, predictions, zero_division=0),
                    'recall': recall_score(y_val, predictions, zero_division=0),
                    'f1': f1_score(y_val, predictions, zero_division=0),
                    'num_features': X_train.shape[1],
                })

    return results


def _expand_param_grid(param_grid):
    """Yield one {name: value} dict per combination; the first key varies slowest."""
    names = list(param_grid)
    for combo in itertools.product(*(param_grid[name] for name in names)):
        yield dict(zip(names, combo))


def _build_tuning_model(model_name, params):
    """Instantiate the unfitted classifier for `model_name` with `params`."""
    if model_name == 'SVM':
        return SVC(C=params['C'], kernel='linear', probability=False, max_iter=-1, random_state=SEED)
    if model_name == 'LASSO':
        return LogisticRegression(penalty='l1', C=params['C'], solver='liblinear', random_state=SEED)
    if model_name == 'KNN':
        return KNeighborsClassifier(n_neighbors=params['n_neighbors'], n_jobs=-1)
    if model_name == 'RandomForest':
        return RandomForestClassifier(n_estimators=params['n_estimators'], max_depth=params['max_depth'], random_state=SEED, n_jobs=-1)
    raise ValueError(f"Unknown model name: {model_name!r}")


def _format_tuning_params(model_name, params):
    """Render hyperparameters for the progress line, one format per model."""
    if model_name == 'SVM':
        return f"C={params['C']:.2f}"
    if model_name == 'LASSO':
        return f"C={params['C']:.4f}"
    if model_name == 'KNN':
        return f"Neighbors={params['n_neighbors']}"
    if model_name == 'RandomForest':
        return f"Est={params['n_estimators']}, Depth={params['max_depth']}"
    return ", ".join(f"{key}={value}" for key, value in params.items())

In [7]:
def find_best_params_per_model(results, evaluation_metric='f1', drop_worst=2):
    """
    Keep each model's best run, then discard the lowest-scoring models.

    Parameters
    ----------
        results (list[dict]): Run records; each needs a 'model' key and a
            numeric value under `evaluation_metric`
        evaluation_metric (str): Key of the score used for ranking
        drop_worst (int): How many of the lowest-ranked models to discard
            (default 2). Models with no run at all rank lowest.

    Returns
    -------
        dict: Model name -> best run record, for the models that were kept
    """
    best_by_model = {}

    # The four standard models are always ranked, even without any run, in
    # which case they keep the -1.0 sentinel and sort to the bottom.
    model_best_scores = {
        'SVM': -1.0,
        'RandomForest': -1.0,
        'LASSO': -1.0,
        'KNN': -1.0,
    }

    for run in results:
        model_name = run['model']
        score = run[evaluation_metric]

        # Register model names outside the standard four instead of failing.
        model_best_scores.setdefault(model_name, -1.0)

        # Strict '>' keeps the earliest run when scores tie.
        if score > model_best_scores[model_name]:
            model_best_scores[model_name] = score
            best_by_model[model_name] = run

    # Stable ascending sort: ties keep registration order.
    ranked = sorted(model_best_scores, key=model_best_scores.get)

    for model_name in ranked[:max(drop_worst, 0)]:
        best_by_model.pop(model_name, None)

    return best_by_model

## Generate Test Predictions

In [8]:
def test_predictions(ticker, best_results, train_df, test_df, scaler):
    """
    Refit each selected model on the training set, predict on the test set,
    print test accuracy/precision and write the predictions to CSV.

    One file per model is written to
    ``./data/predictions/<ticker>_<model_name>.csv`` with the de-scaled
    Open/High/Low/Close/Volume columns plus 'Actual' and 'Predicted'.

    Parameters
    ----------
        ticker (str): Ticker symbol, used in the output filename
        best_results (dict): Model name -> run record with 'fe_params' and
            'model_params' (as returned by `find_best_params_per_model`)
        train_df (pd.DataFrame): Scaled training data
        test_df (pd.DataFrame): Scaled testing data
        scaler (StandardScaler): Scaler fitted on the training columns; used
            to map prices and volume back to their original units

    Raises
    ------
        ValueError: If `best_results` contains an unsupported model name
    """
    ohlcv_cols = ['Open', 'High', 'Low', 'Close', 'Volume']

    for model_name, best_run in best_results.items():
        fe_params = best_run['fe_params']
        fe_kwargs = {
            'ema_length': fe_params['ema_length'],
            'rsi_length': fe_params['rsi_length'],
            'macd_short': fe_params['macd_short'],
            'macd_long': fe_params['macd_long'],
            'bb_window': fe_params['bb_window'],
        }

        fe_train_df = feature_engineering(train_df, **fe_kwargs)
        fe_test_df = feature_engineering(test_df, **fe_kwargs)

        X_train = fe_train_df.drop(columns=['Target', 'Date'])
        y_train = fe_train_df['Target']
        X_test = fe_test_df.drop(columns=['Target', 'Date'])
        y_test = fe_test_df['Target']

        model_params = best_run['model_params']

        if model_name == 'SVM':
            model = SVC(C=model_params['C'], kernel='linear', probability=False, random_state=SEED)
        elif model_name == 'RandomForest':
            model = RandomForestClassifier(n_estimators=model_params['n_estimators'], max_depth=model_params['max_depth'], random_state=SEED, n_jobs=-1)
        elif model_name == 'LASSO':
            model = LogisticRegression(penalty='l1', C=model_params['C'], solver='liblinear', random_state=SEED)
        elif model_name == 'KNN':
            model = KNeighborsClassifier(n_neighbors=model_params['n_neighbors'], n_jobs=-1)
        else:
            # Without this branch `model` would be unbound (or silently reuse
            # the previous iteration's model).
            raise ValueError(f"Unsupported model name: {model_name!r}")

        model.fit(X_train, y_train)
        predictions = model.predict(X_test)

        accuracy = accuracy_score(y_test, predictions)
        precision = precision_score(y_test, predictions, zero_division=0)
        print(f"Test Accuracy for Best {model_name}: {accuracy:.4f}")
        print(f"Test Precision for Best {model_name}: {precision:.4f}\n")

        # inverse_transform is positional: columns must be passed in the order
        # the scaler was fitted with, which need not be Open/High/Low/Close/
        # Volume. Use the recorded feature order when the scaler has one.
        scaler_cols = list(getattr(scaler, 'feature_names_in_', ohlcv_cols))
        unscaled = pd.DataFrame(
            scaler.inverse_transform(fe_test_df[scaler_cols]),
            columns=scaler_cols,
            index=fe_test_df.index,
        )

        result_df = pd.DataFrame({
            'Date': fe_test_df['Date'],
            'Open': unscaled['Open'],
            'High': unscaled['High'],
            'Low': unscaled['Low'],
            'Close': unscaled['Close'],
            'Volume': unscaled['Volume'],
            'Actual': y_test,
            'Predicted': predictions
        })

        result_df.to_csv(f'./data/predictions/{ticker}_{model_name}.csv', index=False)

## Backtest

In [9]:
def backtest(ticker, data_filename, strategy_class):
    """
    Run a single backtrader simulation over a predictions CSV.

    The broker starts with 100000.0 in cash, charges a 0.001 commission and
    positions are sized with `PercentSizer` at 50 percent. A PyFolio analyzer
    is attached under the name 'pyfolio'. Starting and final portfolio values
    are printed.

    Parameters
    ----------
        ticker (str): Ticker symbol (not used inside this function)
        data_filename (str): Path of the CSV handed to `CustomCSVData`
        strategy_class: backtrader strategy class to run

    Returns
    -------
        The value returned by `Cerebro.run()`
    """
    engine = bt.Cerebro()

    # Broker and sizing configuration.
    engine.broker.set_cash(100000.0)
    engine.broker.setcommission(commission=0.001)
    engine.addsizer(bt.sizers.PercentSizer, percents=50)

    # Data feed built from the predictions file.
    feed = CustomCSVData.CustomCSVData(dataname=data_filename, preset='predicted')
    engine.adddata(feed)

    engine.addstrategy(strategy_class)
    engine.addanalyzer(bt.analyzers.PyFolio, _name='pyfolio')

    starting_value = engine.broker.getvalue()
    print(f"Starting Portfolio Value: {starting_value:.2f}")

    run_output = engine.run()

    final_value = engine.broker.getvalue()
    print(f"Final Portfolio Value: {final_value:.2f}")

    return run_output

## Single Stock Workflow

In [10]:
def single_stock_backtest(ticker):
    """
    Run the full pipeline for one ticker: download, split/scale, tune,
    select the best models, write test predictions, then backtest every
    selected model with the Momentum3 and RollingVote strategies and
    generate a report for each combination.

    Parameters
    ----------
        ticker (str): Ticker symbol

    Raises
    ------
        ValueError: If no price data could be downloaded for `ticker`
    """
    df = download_data(ticker, '2000-01-01', '2021-11-12', '1d', f'{ticker}_data.csv')

    # download_data signals "no data" with False; fail here with a clear
    # message instead of an obscure error further down the pipeline.
    if not isinstance(df, pd.DataFrame) or df.empty:
        raise ValueError(f"No usable price data for {ticker}; aborting backtest.")

    train_df, test_df, scaler = split_transform(df)

    tuned_results = tune_model(train_df)

    best_results = find_best_params_per_model(tuned_results, evaluation_metric='f1')

    test_predictions(ticker, best_results, train_df, test_df, scaler)

    strategies = (("Momentum3", Momentum3), ("RollingVote", RollingVote))

    for model in best_results:
        predictions_path = f'./data/predictions/{ticker}_{model}.csv'
        for strategy_name, strategy_class in strategies:
            generate_reports(
                backtest(ticker, predictions_path, strategy_class),
                ticker_name=ticker,
                model_name=model,
                strategy_name=strategy_name,
            )

In [None]:
# Run the complete download -> tune -> predict -> backtest pipeline for a single ticker.
single_stock_backtest("TSCO")

## 10 Stock Workflow

In [11]:
# Load the two ticker listing files (paths are relative to the notebook's working directory).
nyse_df = pd.read_csv('./data/tickers_nyse.csv')
nasd_df = pd.read_csv('./data/tickers_nasd.csv')

# Stack both listings into one table with a fresh 0..n-1 index.
tickers_df = pd.concat([nyse_df, nasd_df], ignore_index=True)

In [None]:
# Coerce IPOyear to numeric; values that cannot be parsed become NaN.
tickers_df['IPOyear'] = pd.to_numeric(tickers_df['IPOyear'], errors='coerce')
# Keep only rows with a known IPO year of 2000 or earlier.
# NOTE(review): presumably so each ticker has history from the 2000-01-01 download start — confirm.
# This rebinds `tickers_df` to the filtered frame; the unfiltered concat is no longer available under this name.
tickers_df = tickers_df[(tickers_df['IPOyear'].notna()) & (tickers_df['IPOyear'] <= 2000)]
filtered_tickers = tickers_df['Symbol'].tolist()

In [None]:
# Shuffle the candidates in place (order depends on the seeded `random`
# state), then walk them until 10 tickers complete the full pipeline.
random.shuffle(filtered_tickers)

valid, tried = [], set()

for ticker in filtered_tickers:
    # Stop as soon as enough tickers have succeeded.
    if len(valid) == 10:
        break

    # Each symbol is attempted at most once, even if it is listed twice.
    if ticker not in tried:
        tried.add(ticker)

        try:
            single_stock_backtest(ticker)
            valid.append(ticker)
            print(f"Valid: {ticker}")
        except Exception as e:
            # Any failure in the pipeline marks the ticker as unusable and
            # the loop moves on to the next candidate.
            print(f"Invalid: {ticker} ({e})")

Sampled Tickers for Backtesting: ['ULTI', 'AUDC', 'CTIB', 'LMNX', 'RGEN', 'CIEN', 'TSCO', 'WTS', 'VRSN', 'KOPN']
