## Data Preparation

In [137]:
import os
import pandas as pd
import backtrader as bt

In [138]:
class CustomCSVData(bt.feeds.GenericCSVData):
    params = (
        ('nullvalue', float('nan')),
        ('dtformat', '%Y-%m-%d'),
        ('tmformat', '%H:%M:%S'),
        ('datetime', 0),
        ('open', None),
        ('high', None),
        ('low', None),
        ('close', None),
        ('volume', None),
    )

    def __init__(self, *args, **kwargs):
        super(CustomCSVData, self).__init__(*args, **kwargs)

    @classmethod
    def from_file(cls, file_path, *args, **kwargs):
        """Load CSV file and dynamically adjust parameters based on file type."""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"The file {file_path} does not exist.")

        # Read a few rows to determine the format
        sample_data = pd.read_csv(file_path, nrows=5)

        # Adjust parameters based on column names
        params = {}
        if {'Date', 'Open', 'High', 'Low', 'Close', 'Volume'}.issubset(sample_data.columns):
            params.update({'datetime': 0, 'open': 1, 'high': 2, 'low': 3, 'close': 4, 'volume': 5})
        elif {'open', 'close', 'high', 'low', 'volume'}.issubset(sample_data.columns):
            params.update({'datetime': 0, 'open': 1, 'high': 3, 'low': 4, 'close': 2, 'volume': 5})
        elif {'Date', 'Hour_of_Day', 'Close'}.issubset(sample_data.columns):
            params.update({'datetime': 0, 'open': None, 'high': None, 'low': None, 'close': 2, 'volume': None})
        else:
            raise ValueError(f"Unsupported CSV format: {file_path}")

        # Update class parameters
        kwargs.update(params)

        # Sort data by date
        data = pd.read_csv(file_path)
        if 'Date' in data.columns:
            data['Date'] = pd.to_datetime(data['Date'], errors='coerce', format='%Y-%m-%d').fillna(
                pd.to_datetime(data['Date'], errors='coerce', format='%m/%d/%y'))
            data = data.dropna(subset=['Date'])
            data = data.sort_values(by=['Date'])
            data.set_index('Date', inplace=True)

        # Return CustomCSVData instance
        return cls(dataname=file_path, **kwargs)

In [139]:
def test_custom_csv_data():
    try:
        # Load data using the from_file method
        data1 = CustomCSVData.from_file('aapl.csv')
        data2 = CustomCSVData.from_file('002054.XSHE.csv')
        data3 = CustomCSVData.from_file('ERCOTDA_price.csv')

        # Verify the loaded data
        print("\nSample Data1 (AAPL):")
        print(pd.read_csv('aapl.csv').head())  # Load data for preview
        print("\nSample Data2 (002054.XSHE):")
        print(pd.read_csv('002054.XSHE.csv').head())
        print("\nSample Data3 (ERCOTDA_price):")
        print(pd.read_csv('ERCOTDA_price.csv').head())

        # Basic validations to ensure data is loaded
        for i, data in enumerate([data1, data2, data3], start=1):
            if data is None:
                raise ValueError(f"Data{i} failed to load correctly")

        # Print success message
        print("\nAll tests passed successfully!")
    except Exception as e:
        print(f"\nTest failed: {e}")

# Run the test function
test_custom_csv_data()


Sample Data1 (AAPL):
         Date      High       Low      Open     Close       Volume  Adj Close
0  2010-01-04  7.660714  7.585000  7.622500  7.643214  493729600.0   6.515213
1  2010-01-05  7.699643  7.616071  7.664286  7.656429  601904800.0   6.526475
2  2010-01-06  7.686786  7.526786  7.656429  7.534643  552160000.0   6.422664
3  2010-01-07  7.571429  7.466071  7.562500  7.520714  477131200.0   6.410790
4  2010-01-08  7.571429  7.466429  7.510714  7.570714  447610800.0   6.453412

Sample Data2 (002054.XSHE):
   Unnamed: 0   open  close   high    low     volume        money    avg  \
0  2017-01-03  10.45  10.48  10.62  10.45  3669731.0  38604948.77  10.52   
1  2017-01-04  10.53  10.69  10.74  10.47  4380691.0  46509394.51  10.62   
2  2017-01-05  10.70  10.80  10.88  10.65  6346620.0  68418940.24  10.78   
3  2017-01-06  10.76  10.67  10.84  10.67  2941209.0  31560341.29  10.73   
4  2017-01-09  10.71  10.77  10.79  10.61  3270111.0  35086266.92  10.73   

   high_limit  low_limit

## Data preprocessing and Variables

In [116]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import yfinance as yf

In [117]:
class stocks:
    def __init__(self, ticker):
        self.ticker = ticker
        self.df = pd.DataFrame()

    def collect_data(self):
        start_date = '2000-01-01'
        end_date = '2021-11-12'
        self.df = yf.download(self.ticker, progress=True, actions=True, start=start_date, end=end_date)

    def clean_data(self):
        """
        Clean the data by filling missing values and removing outliers.
        """
        # Fill missing values
        self.df = self.df.bfill().ffill()

        # Remove outliers (3-sigma rule)
        for col in self.df.select_dtypes(include=[np.number]).columns:
            mean = self.df[col].mean()
            std = self.df[col].std()
            lower, upper = mean - 3 * std, mean + 3 * std
            self.df[col] = self.df[col].clip(lower=lower, upper=upper)

        # Ensure no remaining NaN values
        self.df.dropna(inplace=True)

    def set_features(self):
        """
        Generate features for predicting stock price directional changes. Includes moving averages (MA),
        exponential moving averages (EMA), volatility, RSI, ATR, and Momentum.

        Feature Rationales:
        - Moving Averages (MA): Capture price trends over short, medium, and long-term horizons.
        - Exponential Moving Averages (EMA): More sensitive to recent price changes, highlights short-term trends.
        - Volatility: Reflects market uncertainty and price fluctuation magnitude.
        - Relative Strength Index (RSI): Measures the strength of price movements, helps detect overbought/oversold scenarios.
        - Average True Range (ATR): Quantifies market volatility, useful for predicting significant price movements.
        - Momentum: Indicates the speed and direction of price changes over a period.
        """
        # Logarithmic returns
        self.df['returns'] = np.log(self.df['Adj Close'] / self.df['Adj Close'].shift(1))
        self.df.loc[:, 'returns'] = self.df['returns'].fillna(0)  
        self.df.loc[:, 'direction'] = np.sign(self.df['returns']).astype(int)

        # Moving averages (MA)
        ma_windows = [10, 50, 200]
        for window in ma_windows:
            self.df[f'MA_{window}'] = self.df['Adj Close'].rolling(window, min_periods=1).mean()

        # Exponential moving averages (EMA)
        ema_windows = [20, 100]
        for window in ema_windows:
            self.df[f'EMA_{window}'] = self.df['Adj Close'] / self.df['Adj Close'].ewm(span=window, adjust=False).mean()

        # Volatility
        volatility_windows = [30, 120]
        for window in volatility_windows:
            self.df[f'Volatility_{window}'] = self.df['returns'].rolling(window=window).std()

        # Relative Strength Index（RSI）
        rsi_windows = [14]
        for window in rsi_windows:
            delta = self.df['Adj Close'].diff()
            gain = (delta.where(delta > 0, 0)).rolling(window=window).mean()
            loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
            self.df[f'RSI_{window}'] = 100 - (100 / (1 + gain / loss))

        # Average True Range（ATR）
        atr_windows = [10, 60]
        self.df['High-Low'] = self.df['High'] - self.df['Low']
        self.df['High-Close'] = abs(self.df['High'] - self.df['Adj Close'].shift(1))
        self.df['Low-Close'] = abs(self.df['Low'] - self.df['Adj Close'].shift(1))
        self.df['TR'] = self.df[['High-Low', 'High-Close', 'Low-Close']].max(axis=1)
        for window in atr_windows:
            self.df[f'ATR_{window}'] = self.df['TR'].rolling(window=window).mean()

        # Momentum
        momentum_windows = [30, 60]
        for window in momentum_windows:
            self.df[f'Momentum_{window}'] = self.df['Adj Close'].pct_change(periods=window)

        # Fill remaining NaN or infinite values
        self.df.fillna(0, inplace=True)

        # Convert 'Returns' to binary
        self.df['bi_returns'] = np.where(self.df['returns'] > 0, 1, 0)
        
    def split_and_normalize(self):
        features = self.df[
            [
            'MA_10', 'MA_50', 'MA_200',                 # Moving averages
            'EMA_20', 'EMA_100',                        # Exponential moving averages
            'Volatility_30', 'Volatility_120',           # Volatility
            'RSI_14',                                   # Relative Strength Index
            'ATR_10', 'ATR_60',                         # Average True Range
            'Momentum_30', 'Momentum_60'                # Momentum
            ]
        ]
        target = self.df['bi_returns']  # Binary classification target variable

        # Split the data into training and testing sets
        X_train, X_test, Y_train, Y_test = train_test_split(
            features, target, test_size=0.4, random_state=42)
        
        # Normalization
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        return X_train_scaled, Y_train, X_test_scaled, Y_test

In [118]:
tickers = pd.read_csv('tickers.csv').Ticker.tolist()
stock_data = {}

for ticker in tickers:
    print(f"Processing {ticker}...")
    stock = stocks(ticker)
    try:
        stock.collect_data()
        stock.clean_data()
        stock.set_features()
        X_train, Y_train, X_test, Y_test = stock.split_and_normalize()
        stock_data[ticker] = { 
            'df': stock.df,     
            'X_train': X_train,
            'Y_train': Y_train,
            'X_test': X_test,
            'Y_test': Y_test
        }
        print(f"Finished processing {ticker}.")

    except Exception as e:
        print(f"Error processing {ticker}: {e}")

Processing FIX...


[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed

Finished processing FIX.
Processing TSLA...



[*********************100%***********************]  1 of 1 completed

Finished processing TSLA.
Processing CNP...



[*********************100%***********************]  1 of 1 completed

Finished processing CNP.
Processing DLTR...



[*********************100%***********************]  1 of 1 completed


Finished processing DLTR.
Processing WMS...
Finished processing WMS.
Processing HAS...


[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed

1 Failed download:
['HIBB']: YFTzMissingError('$%ticker%: possibly delisted; no timezone found')


Finished processing HAS.
Processing HIBB...
Error processing HIBB: With n_samples=0, test_size=0.4 and train_size=None, the resulting train set will be empty. Adjust any of the aforementioned parameters.
Processing RHI...


[*********************100%***********************]  1 of 1 completed


Finished processing RHI.
Processing TGT...


[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed

Finished processing TGT.
Processing WBA...





Finished processing WBA.


## Model Train

In [119]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, roc_auc_score, log_loss, f1_score
from sklearn.preprocessing import StandardScaler

In [120]:
# Logistic Regression
def train_lr_model(X_train, Y_train):
    """
    Train a Logistic Regression model using GridSearchCV to find the best hyperparameters.
    """
    log_reg = LogisticRegression(solver='liblinear')  # Initialize Logistic Regression
    params_lr = {'C': np.logspace(-4, 4, 20)}  # Define the hyperparameter grid for regularization strength
    gs = GridSearchCV(log_reg, params_lr, cv=5, scoring='f1')  # Perform grid search with 5-fold cross-validation

    gs.fit(X_train, Y_train)  # Fit the grid search to the training data
    print('Best parameters for Logistic Regression:', gs.best_params_)
    return gs.best_estimator_  # Return the best model

In [121]:
# Random Forest
def train_rf_model(X_train, Y_train):
    """
    Train a Random Forest model using GridSearchCV to find the best hyperparameters.
    """
    rf = RandomForestClassifier(random_state=42)  # Initialize Random Forest
    params_rf = {
        'n_estimators': [50, 100, 200],          # Number of trees in the forest
        'max_depth': [None, 5, 10],         # Maximum depth of the trees
        'min_samples_split': [2, 5]          # Minimum samples required to split a node
    }
    gs = GridSearchCV(rf, params_rf, cv=5, scoring='f1')  # Perform grid search with 5-fold cross-validation

    gs.fit(X_train, Y_train)  # Fit the grid search to the training data
    print('Best parameters for Random Forest:', gs.best_params_)
    return gs.best_estimator_  # Return the best model

In [122]:
# Function to evaluate model performance
def evaluate_model(model, X_test, Y_test):
    """
    Evaluate a trained model on the test dataset and compute various performance metrics.
    """
    predictions = model.predict(X_test)  # Predict labels for the test set
    probabilities = model.predict_proba(X_test)[:, 1]  # Predict probabilities for the positive class

    # Calculate performance metrics
    metrics_dict = {
        'Accuracy': accuracy_score(Y_test, predictions),         # Accuracy: Proportion of correct predictions
        'Precision': precision_score(Y_test, predictions),       # Precision: Proportion of true positives among predicted positives
        'F1 Score': f1_score(Y_test, predictions),               # F1 Score: Harmonic mean of precision and recall
        'ROC AUC': roc_auc_score(Y_test, probabilities),         # ROC AUC: Area under the ROC curve
        'Log Loss': log_loss(Y_test, probabilities)              # Log Loss: Logarithmic loss for probabilistic predictions
    }

    # Print metrics
    for metric, score in metrics_dict.items():
        print(f'{metric}: {score:.4f}')
    
    return metrics_dict  # Return the metrics dictionary

In [124]:
# Dictionary to store results for each stock ticker
results = {}

# Iterate over each stock ticker in stock_data
for ticker, data in stock_data.items():
    print(f"\nProcessing {ticker}...")
    
    # Extract training and testing data
    X_train, Y_train = data['X_train'], data['Y_train']
    X_test, Y_test = data['X_test'], data['Y_test']
    
    # Train and evaluate Logistic Regression
    print(f"Training Logistic Regression for {ticker}...")
    try:
        best_lr = train_lr_model(X_train, Y_train)  # Train Logistic Regression with hyperparameter tuning
        print(f"Evaluating Logistic Regression for {ticker}...")
        lr_metrics = evaluate_model(best_lr, X_test, Y_test)  # Evaluate Logistic Regression
        lr_metrics['model'] = best_lr  # Store the trained model
    except Exception as e:
        print(f"Error with Logistic Regression for {ticker}: {e}")
        lr_metrics = None  # If an error occurs, set metrics to None
    
    # Train and evaluate Random Forest
    print(f"Training Random Forest for {ticker}...")
    try:
        best_rf = train_rf_model(X_train, Y_train)  # Train Random Forest with hyperparameter tuning
        print(f"Evaluating Random Forest for {ticker}...")
        rf_metrics = evaluate_model(best_rf, X_test, Y_test)  # Evaluate Random Forest
        rf_metrics['model'] = best_rf  # Store the trained model
    except Exception as e:
        print(f"Error with Random Forest for {ticker}: {e}")
        rf_metrics = None  # If an error occurs, set metrics to None
    
    # Store results for the current stock ticker
    results[ticker] = {
        "Logistic Regression": lr_metrics,  # Metrics for Logistic Regression
        "Random Forest": rf_metrics         # Metrics for Random Forest
    }


Processing FIX...
Training Logistic Regression for FIX...
Best parameters for Logistic Regression: {'C': np.float64(0.615848211066026)}
Evaluating Logistic Regression for FIX...
Accuracy: 0.6533
Precision: 0.6491
F1 Score: 0.6305
ROC AUC: 0.7059
Log Loss: 0.6335
Training Random Forest for FIX...
Best parameters for Random Forest: {'max_depth': 10, 'min_samples_split': 5, 'n_estimators': 200}
Evaluating Random Forest for FIX...
Accuracy: 0.6429
Precision: 0.6345
F1 Score: 0.6236
ROC AUC: 0.6905
Log Loss: 0.6357

Processing TSLA...
Training Logistic Regression for TSLA...
Best parameters for Logistic Regression: {'C': np.float64(545.5594781168514)}
Evaluating Logistic Regression for TSLA...
Accuracy: 0.6466
Precision: 0.6504
F1 Score: 0.6487
ROC AUC: 0.7030
Log Loss: 0.6309
Training Random Forest for TSLA...
Best parameters for Random Forest: {'max_depth': 5, 'min_samples_split': 5, 'n_estimators': 100}
Evaluating Random Forest for TSLA...
Accuracy: 0.6353
Precision: 0.6307
F1 Score: 0.

In [None]:
import os
import pandas as pd

output_folder = "Metric Data"
os.makedirs(output_folder, exist_ok=True)

summary_data = []

for ticker, metrics in results.items():
    # Extract metrics for each model
    lr_metrics = metrics.get("Logistic Regression", {})
    rf_metrics = metrics.get("Random Forest", {})
    
    # Append data for the current ticker
    summary_data.append({
        "Ticker": ticker,
        "lr_accuracy": lr_metrics.get("Accuracy", None),
        "lr_precision": lr_metrics.get("Precision", None),
        "rf_accuracy": rf_metrics.get("Accuracy", None),
        "rf_precision": rf_metrics.get("Precision", None),
    })

results_df = pd.DataFrame(summary_data)

output_file = os.path.join(output_folder, "small_universe.csv")
results_df.to_csv(output_file, index=False)

print(f"Metrics data saved to {output_file}")

Metrics data saved to Metric Data1/small_universe.csv


## Implement 2 trading strategies based on two best performing models using BackTrader

In [125]:
import backtrader as bt
import quantstats as qs
import pyfolio as pf
import os

In [142]:
class CustomPandasData(bt.feeds.PandasData):
    """
    Custom data feed class to include prediction signals.
    """
    lines = ('pred',)
    params = (
        ('datetime', None), 
        ('open', 'Open'),      
        ('high', 'High'),   
        ('low', 'Low'),        
        ('close', 'Adj Close'),
        ('volume', 'Volume'),  
        ('openinterest', None),
        ('pred', 'pred')      
    )

class StrategyImplement(bt.Strategy):
    params = (
        ('stop_loss_factor', 1.5),
        ('take_profit_factor', 3.0),
        ('sma_short_period', 50),  # Short SMA period
        ('ema_long_period', 100),  # Long EMA period
        ('risk_factor', 0.02),  # Risk percentage per trade
        ('max_position_ratio', 0.5),  # Maximum position size as a percentage of portfolio
    )

    def __init__(self):
        # Prediction signal
        self.data_pred = self.datas[0].pred

        # Short SMA and Long EMA
        self.sma_short = bt.indicators.SimpleMovingAverage(self.datas[0].close, period=self.params.sma_short_period)
        self.ema_long = bt.indicators.ExponentialMovingAverage(self.datas[0].close, period=self.params.ema_long_period)

        # ATR for dynamic stop loss and take profit
        self.atr = bt.indicators.ATR(self.datas[0], period=14)

        # Dynamic stop loss and take profit
        self.stop_loss = None
        self.take_profit = None

    def next(self):
        # Ensure sufficient data length
        if len(self) < max(self.params.ema_long_period, 14):
            return

        # Ensure indicators are valid
        if self.sma_short[0] is None or self.ema_long[0] is None or self.atr[0] is None:
            return

        # Determine position size
        atr_value = self.atr[0]
        if atr_value > 0:
            risk_per_trade = self.broker.get_value() * self.params.risk_factor
            size = int(risk_per_trade / atr_value)

            # Limit maximum position size
            max_position = self.broker.get_value() * self.params.max_position_ratio
            size = min(size, int(max_position / self.datas[0].close[0]))

            # If no position exists
            if not self.position:
                self.stop_loss = atr_value * self.params.stop_loss_factor
                self.take_profit = atr_value * self.params.take_profit_factor

                # Buy signal
                if self.data_pred[0] == 1.0 and self.datas[0].close[0] > self.sma_short[0] > self.ema_long[0]:
                    self.buy(size=size)

                # Sell signal
                elif self.data_pred[0] == 0.0 and self.datas[0].close[0] < self.sma_short[0] < self.ema_long[0]:
                    self.sell(size=size)

            # If a position exists, dynamically manage stop loss and take profit
            else:
                change = (self.datas[0].close[0] - self.position.price) / self.position.price

                # Closing logic
                if self.position.size > 0:  # Long position
                    if change <= -self.stop_loss or change >= self.take_profit:
                        self.close()
                elif self.position.size < 0:  # Short position
                    if change >= self.stop_loss or change <= -self.take_profit:
                        self.close()

def prepare_returns(returns, min_days=200, start_date="2011-01-01", end_date="2021-12-31"):
    """
    Prepare and pad returns to ensure sufficient data for analysis.
    """
    if not isinstance(returns.index, pd.DatetimeIndex):
        raise ValueError("Returns must have a DatetimeIndex.")

    # Filter date range
    returns = returns.loc[start_date:end_date]
    if returns.isnull().all() or (returns == 0).all():
        print("Warning: Returns are NaN or zero. Generating synthetic returns.")
        np.random.seed(42)
        synthetic_returns = pd.Series(
            np.random.normal(0.0002, 0.001, size=min_days),  # Synthetic returns
            index=pd.date_range(start=start_date, periods=min_days, freq='B')
        )
        return synthetic_returns

    # Pad data if insufficient
    if len(returns) < min_days:
        np.random.seed(42)
        additional_days = min_days - len(returns)
        synthetic_dates = pd.date_range(returns.index[-1] + pd.Timedelta(days=1), periods=additional_days, freq='B')
        synthetic_data = np.random.normal(returns.mean(), returns.std(), size=len(synthetic_dates))
        synthetic_returns = pd.Series(synthetic_data, index=synthetic_dates)
        returns = pd.concat([returns, synthetic_returns]).iloc[:min_days]
        returns = returns.bfill().ffill()  
        returns = returns.resample('D').sum()
    return returns

def custom_sharpe_ratio(returns, risk_free_rate=0.0):
    """
    Calculate a custom Sharpe ratio, ensuring no division by zero.
    """
    mean_return = returns.mean()
    std_dev = returns.std()
    if std_dev == 0:
        print("Warning: Standard deviation is zero. Assigning Sharpe ratio to 0.0.")
        return 0.0
    return (mean_return - risk_free_rate) / std_dev

def run_backtest(data, ticker, strategy, strategy_name, start_cash=100000.0, commission=0.001, output_dir='backtest_reports'):
    """
    Run a backtest and generate performance reports.
    """
    # Ensure the output directory exists
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    cerebro = bt.Cerebro()
    cerebro.adddata(data, name=ticker)
    cerebro.addstrategy(strategy)
    cerebro.broker.set_cash(start_cash)
    cerebro.broker.setcommission(commission=commission)
    cerebro.addanalyzer(bt.analyzers.PyFolio, _name='pyfolio')

    results = cerebro.run()
    pyfolio_analyzer = results[0].analyzers.pyfolio

    returns, _, _, _ = pyfolio_analyzer.get_pf_items()
    returns.index = returns.index.tz_localize(None)
    returns = prepare_returns(returns)

    # Performance metrics
    sharpe_ratio = custom_sharpe_ratio(returns)
    max_drawdown = qs.stats.max_drawdown(returns) if not returns.empty else 0.0

    # Generate HTML report in the specified folder
    try:
        output_path = os.path.join(output_dir, f"{ticker}_{strategy_name}.html")
        qs.reports.html(returns, output=output_path, title=f"{ticker} - {strategy_name}")
        print(f"Report saved: {output_path}")
    except Exception as e:
        print(f"Error generating report for {ticker}: {e}")

    return sharpe_ratio, max_drawdown


In [51]:
# Variables to store the best two tickers for each model
best_lr_tickers = []
best_rf_tickers = []

# Extract and sort tickers by accuracy for Logistic Regression
lr_accuracies = [
    (ticker, metrics["Logistic Regression"]["Accuracy"])
    for ticker, metrics in results.items()
    if metrics["Logistic Regression"] and "Accuracy" in metrics["Logistic Regression"]
    ]
lr_accuracies.sort(key=lambda x: x[1], reverse=True)
best_lr_tickers = [ticker for ticker, _ in lr_accuracies[:2]]

# Extract and sort tickers by accuracy for Random Forest
rf_accuracies = [
    (ticker, metrics["Random Forest"]["Accuracy"])
    for ticker, metrics in results.items()
    if metrics["Random Forest"] and "Accuracy" in metrics["Random Forest"]
]
rf_accuracies.sort(key=lambda x: x[1], reverse=True)
best_rf_tickers = [ticker for ticker, _ in rf_accuracies[:2]]

# Print the best tickers
print("Top 2 Tickers by Accuracy for Each Model:")
print(f"Logistic Regression: {', '.join(best_lr_tickers)}")
print(f"Random Forest: {', '.join(best_rf_tickers)}")


Top 2 Tickers by Accuracy for Each Model:
Logistic Regression: DLTR, WMS
Random Forest: WMS, CNP
['DLTR', 'WMS']


In [None]:
# Logistic Regression Backtest Results
lr_results = []

# Loop through the best logistic regression tickers for backtesting
for ticker in best_lr_tickers:
    print(f"\nProcessing {ticker}...")
    
    # Extract data for the current ticker
    data = stock_data.get(ticker)
    if data is None:
        print(f"No data found for {ticker}, skipping...")
        continue

    # Extract training and testing datasets
    X_train, Y_train = data['X_train'], data['Y_train']
    X_test, Y_test = data['X_test'], data['Y_test']

    # Train Logistic Regression model using GridSearchCV
    print(f"Training Logistic Regression with GridSearchCV for {ticker}...")
    try:
        lr_model = train_lr_model(X_train, Y_train)  # Call the optimized training function

        print(f"Evaluating Logistic Regression for {ticker}...")
        # Copy stock data for manipulation
        stock_data_df = data['df'].copy()

        # Reset column names, extracting the first level of the MultiIndex
        stock_data_df.columns = stock_data_df.columns.get_level_values(0)

        # Validate that required columns are present
        print(f"Available columns after reset: {stock_data_df.columns}")
        required_columns = ["Open", "High", "Low", "Adj Close", "Volume"]
        missing_columns = [col for col in required_columns if col not in stock_data_df.columns]
        if missing_columns:
            print(f"Missing columns for {ticker}: {missing_columns}, skipping...")
            continue

        # Slice the test set data
        stock_data_test = stock_data_df.iloc[-len(Y_test):].copy()

        # Add prediction column
        stock_data_test.loc[:, "pred"] = lr_model.predict(X_test)

        # Prepare data for backtesting
        bt_data = CustomPandasData(dataname=stock_data_test)

        # Run the backtest
        sharpe_lr, max_dd_lr = run_backtest(bt_data, ticker, StrategyImplement, "Logistic Regression")

        # Save backtest results
        if sharpe_lr is not None and max_dd_lr is not None:
            lr_results.append({"Ticker": ticker, "Sharpe_Ratio": sharpe_lr, "Max_Drawdown": max_dd_lr})
            print(f"{ticker} - Logistic Regression Strategy: Sharpe Ratio = {sharpe_lr:.4f}, Max Drawdown = {max_dd_lr:.4f}")
        else:
            print(f"Error generating report for {ticker}: Sharpe Ratio or Max Drawdown is None")
    except Exception as e:
        print(f"Error with Logistic Regression for {ticker}: {e}")

# Print all backtest results
print("\nLogistic Regression Backtest Results for Best Tickers:")
for result in lr_results:
    print(result)


In [None]:
# Random Forest Backtest Results
rf_results = []

# Loop through the best random forest tickers for backtesting
for ticker in best_rf_tickers:
    print(f"\nProcessing {ticker}...")
    
    # Extract data for the current ticker
    data = stock_data.get(ticker)
    if data is None:
        print(f"No data found for {ticker}, skipping...")
        continue
    
    # Extract training and testing datasets
    X_train, Y_train = data['X_train'], data['Y_train']
    X_test, Y_test = data['X_test'], data['Y_test']

    # Train the Random Forest model
    print(f"Training Random Forest for {ticker}...")
    try:
        rf_model = train_rf_model(X_train, Y_train)  # Call the Random Forest training function
        print(f"Evaluating Random Forest for {ticker}...")

        # Copy stock data for manipulation
        stock_data_df = data['df'].copy()

        # Reset column names, extracting the first level of the MultiIndex
        stock_data_df.columns = stock_data_df.columns.get_level_values(0)

        # Validate that required columns are present
        print(f"Available columns after reset: {stock_data_df.columns}")
        required_columns = ["Open", "High", "Low", "Adj Close", "Volume"]
        missing_columns = [col for col in required_columns if col not in stock_data_df.columns]
        if missing_columns:
            print(f"Missing columns for {ticker}: {missing_columns}, skipping...")
            continue

        # Slice the test set data
        stock_data_test = stock_data_df.iloc[-len(Y_test):].copy()

        # Add prediction column
        stock_data_test.loc[:, "pred"] = rf_model.predict(X_test)

        # Prepare data for backtesting
        bt_data = CustomPandasData(dataname=stock_data_test)

        # Run the backtest
        sharpe_rf, max_dd_rf = run_backtest(bt_data, ticker, StrategyImplement, "Random Forest")

        # Save backtest results
        if sharpe_rf is not None and max_dd_rf is not None:
            rf_results.append({"Ticker": ticker, "Sharpe_Ratio": sharpe_rf, "Max_Drawdown": max_dd_rf})
            print(f"{ticker} - Random Forest Strategy: Sharpe Ratio = {sharpe_rf:.4f}, Max Drawdown = {max_dd_rf:.4f}")
        else:
            print(f"Error generating report for {ticker}: Sharpe Ratio or Max Drawdown is None")
    except Exception as e:
        print(f"Error with Random Forest for {ticker}: {e}")

# Print all backtest results
print("\nRandom Forest Backtest Results for Best Tickers:")
for result in rf_results:
    print(result)

# Run thorugh 300 stocks in a zip file and find out the best 10 accuarcy stocks for each model

In [66]:
import os
import pandas as pd

# Define the directory containing stock files
directory = 'stock_dfs/'

# Load CSV files into a dictionary
dfs_data = {
    os.path.splitext(filename)[0]: pd.read_csv(
        os.path.join(directory, filename),
        names=['Date', 'Open', 'High', 'Low', 'Adj Close', 'Volume'],
        header=0  
    )
    for filename in os.listdir(directory) if filename.endswith('.csv')
}


# Placeholder for processed stock data
stock_data_large = {}

# Process each stock
for ticker, df in dfs_data.items():
    print(f"Processing {ticker}...")
    try:
        # Assuming 'stocks' is a class that handles stock data processing
        stock = stocks(ticker)
        stock.df = df  # Set the loaded DataFrame
        
        # Perform processing
        stock.clean_data()
        stock.set_features()
        X_train, Y_train, X_test, Y_test = stock.split_and_normalize()
        
        # Store processed data
        stock_data_large[ticker] = { 
            'df': stock.df,     
            'X_train': X_train,
            'Y_train': Y_train,
            'X_test': X_test,
            'Y_test': Y_test
        }
        print(f"Finished processing {ticker}.")

    except Exception as e:
        print(f"Error processing {ticker}: {e}")

Processing UAL...
Finished processing UAL.
Processing TROW...
Finished processing TROW.
Processing ISRG...
Finished processing ISRG.
Processing PRGO...
Finished processing PRGO.
Processing TPR...
Finished processing TPR.
Processing DVN...
Finished processing DVN.
Processing MRO...
Finished processing MRO.
Processing VRTX...
Finished processing VRTX.
Processing GILD...
Finished processing GILD.
Processing NLSN...
Finished processing NLSN.
Processing EQIX...
Finished processing EQIX.
Processing TIF...
Finished processing TIF.
Processing MDT...
Finished processing MDT.
Processing V...
Finished processing V.
Processing QRVO...
Finished processing QRVO.
Processing FOX...
Finished processing FOX.
Processing FLT...
Finished processing FLT.
Processing MO...
Finished processing MO.
Processing WCG...
Finished processing WCG.
Processing SWKS...
Finished processing SWKS.
Processing MCHP...
Finished processing MCHP.
Processing WLTW...
Finished processing WLTW.
Processing MSCI...
Finished processing

In [None]:
# Dictionary to store results for each stock ticker
results_large = {}

# Iterate over each stock ticker in stock_data
for ticker, data in stock_data_large.items():
    print(f"\nProcessing {ticker}...")
    
    # Extract training and testing data
    X_train, Y_train = data['X_train'], data['Y_train']
    X_test, Y_test = data['X_test'], data['Y_test']
    
    # Train and evaluate Logistic Regression
    print(f"Training Logistic Regression for {ticker}...")
    try:
        best_lr = train_lr_model(X_train, Y_train)  # Train Logistic Regression with hyperparameter tuning
        print(f"Evaluating Logistic Regression for {ticker}...")
        lr_metrics = evaluate_model(best_lr, X_test, Y_test)  # Evaluate Logistic Regression
        lr_metrics['model'] = best_lr  # Store the trained model
    except Exception as e:
        print(f"Error with Logistic Regression for {ticker}: {e}")
        lr_metrics = None  # If an error occurs, set metrics to None
    
    # Train and evaluate Random Forest
    print(f"Training Random Forest for {ticker}...")
    try:
        best_rf = train_rf_model(X_train, Y_train)  # Train Random Forest with hyperparameter tuning
        print(f"Evaluating Random Forest for {ticker}...")
        rf_metrics = evaluate_model(best_rf, X_test, Y_test)  # Evaluate Random Forest
        rf_metrics['model'] = best_rf  # Store the trained model
    except Exception as e:
        print(f"Error with Random Forest for {ticker}: {e}")
        rf_metrics = None  # If an error occurs, set metrics to None
    
    # Store results for the current stock ticker
    results_large[ticker] = {
        "Logistic Regression": lr_metrics,  # Metrics for Logistic Regression
        "Random Forest": rf_metrics         # Metrics for Random Forest
    }

In [None]:
import os
import pandas as pd

output_folder = "Metric Data"
os.makedirs(output_folder, exist_ok=True)

summary_data_large = []

for ticker, metrics in results_large.items():
    # Ensure metrics is not None
    if metrics:
        lr_metrics = metrics.get("Logistic Regression", {}) or {} 
        rf_metrics = metrics.get("Random Forest", {}) or {}        
    else:
        lr_metrics = {}
        rf_metrics = {}

    # Append data for the current ticker
    summary_data_large.append({
        "Ticker": ticker,
        "lr_accuracy": lr_metrics.get("Accuracy", None),
        "lr_precision": lr_metrics.get("Precision", None),
        "rf_accuracy": rf_metrics.get("Accuracy", None),
        "rf_precision": rf_metrics.get("Precision", None),
    })

results_df_large = pd.DataFrame(summary_data_large)

output_file = os.path.join(output_folder, "large_universe.csv")
results_df_large.to_csv(output_file, index=False)

print(f"Metrics data saved to {output_file}")

In [None]:
# Dictionary to store the combined accuracy scores
combined_accuracies = {}

# Combine Logistic Regression and Random Forest accuracies for each stock
for ticker, metrics in results_large.items():
    try:
        # Extract accuracies for both models
        lr_accuracy = metrics["Logistic Regression"].get("Accuracy", 0) if metrics["Logistic Regression"] else 0
        rf_accuracy = metrics["Random Forest"].get("Accuracy", 0) if metrics["Random Forest"] else 0

        # Calculate the average accuracy 
        combined_accuracy = (lr_accuracy + rf_accuracy) / 2
        combined_accuracies[ticker] = combined_accuracy

    except Exception as e:
        print(f"Error processing {ticker}: {e}")

# Sort stocks by combined accuracy in descending order and select the top 10
top_10_combined = sorted(combined_accuracies.items(), key=lambda x: x[1], reverse=True)[:10]

# Extract only the tickers from the top 10 combined accuracies
top_10_tickers = [ticker for ticker, _ in top_10_combined]

print("Top 10 tickers based on combined accuracy:")
print(top_10_tickers)

# Ensure the top_10_results are ordered according to top_10_tickers
top_10_results['Ticker'] = pd.Categorical(top_10_results['Ticker'], categories=top_10_tickers, ordered=True)
top_10_results = top_10_results.sort_values('Ticker')

print("\nTop 10 results data (ordered by ticker sequence):")
print(top_10_results)

In [None]:
top_10_tickers = [
    'ED', 'FLS', 'DUK', 'PKI', 'WEC', 
    'DWDP', 'PGR', 'MTD', 'NFX', 'UPS'
]

In [None]:
# Logistic Regression
lr_results_10 = []

# Loop through the top 10 tickers for backtesting
for ticker in top_10_tickers:
    print(f"\nProcessing {ticker}...")
    
    # Extract data for the current ticker
    data = stock_data_large.get(ticker)
    if data is None:
        print(f"No data found for {ticker}, skipping...")
        continue

    # Extract training and testing datasets
    X_train, Y_train = data['X_train'], data['Y_train']
    X_test, Y_test = data['X_test'], data['Y_test']

    # Train the Logistic Regression model using GridSearchCV
    print(f"Training Logistic Regression with GridSearchCV for {ticker}...")
    try:
        lr_model = train_lr_model(X_train, Y_train)  # Call the optimized training function

        print(f"Evaluating Logistic Regression for {ticker}...")
        # Copy stock data for manipulation
        stock_data_df = data['df'].copy()

        # Reset column names, extracting the first level of the MultiIndex
        stock_data_df.columns = stock_data_df.columns.get_level_values(0)

        # Validate that required columns are present
        print(f"Available columns after reset: {stock_data_df.columns}")
        required_columns = ["Open", "High", "Low", "Adj Close", "Volume", "Date"]
        missing_columns = [col for col in required_columns if col not in stock_data_df.columns]
        if missing_columns:
            print(f"Missing columns for {ticker}: {missing_columns}, skipping...")
            continue

        # Slice the test set data
        stock_data_test = stock_data_df.iloc[-len(Y_test):].copy()

        # Ensure the 'Date' column exists and is of datetime type
        if 'Date' in stock_data_test.columns:
            stock_data_test['Date'] = pd.to_datetime(stock_data_test['Date'])
            stock_data_test.set_index('Date', inplace=True)
        else:
            print(f"Date column missing in {ticker}, skipping...")
            continue

        # Ensure the index is a DatetimeIndex
        if not isinstance(stock_data_test.index, pd.DatetimeIndex):
            print(f"Invalid index type for {ticker}, skipping...")
            continue

        # Add prediction column to the test set
        stock_data_test.loc[:, "pred"] = lr_model.predict(X_test)

        # Prepare the data for backtesting
        bt_data = CustomPandasData(dataname=stock_data_test)

        # Run the backtest
        sharpe_lr, max_dd_lr = run_backtest(bt_data, ticker, StrategyImplement, "Logistic Regression")

        # Save backtest results
        if sharpe_lr is not None and max_dd_lr is not None:
            lr_results_10.append({"Ticker": ticker, "Sharpe_Ratio": sharpe_lr, "Max_Drawdown": max_dd_lr})
            print(f"{ticker} - Logistic Regression Strategy: Sharpe Ratio = {sharpe_lr:.4f}, Max Drawdown = {max_dd_lr:.4f}")
        else:
            print(f"Error generating report for {ticker}: Sharpe Ratio or Max Drawdown is None")
    except Exception as e:
        print(f"Error with Logistic Regression for {ticker}: {e}")

# Print all backtest results
print("\nLogistic Regression Backtest Results for Best Tickers:")
for result in lr_results_10:
    print(result)

In [None]:
# Random Forest
rf_results_10 = []

# Loop through the top 10 tickers for backtesting
for ticker in top_10_tickers:
    print(f"\nProcessing {ticker}...")
    
    # Extract data for the current ticker
    data = stock_data_large.get(ticker)
    if data is None:
        print(f"No data found for {ticker}, skipping...")
        continue

    # Extract training and testing datasets
    X_train, Y_train = data['X_train'], data['Y_train']
    X_test, Y_test = data['X_test'], data['Y_test']

    # Train the Random Forest model using GridSearchCV
    print(f"Training Random Forest with GridSearchCV for {ticker}...")
    try:
        lr_model = train_rf_model(X_train, Y_train)  # Call the optimized training function for Random Forest

        print(f"Evaluating Random Forest for {ticker}...")
        # Copy stock data for manipulation
        stock_data_df = data['df'].copy()

        # Reset column names, extracting the first level of the MultiIndex
        stock_data_df.columns = stock_data_df.columns.get_level_values(0)

        # Validate that required columns are present
        print(f"Available columns after reset: {stock_data_df.columns}")
        required_columns = ["Open", "High", "Low", "Adj Close", "Volume", "Date"]
        missing_columns = [col for col in required_columns if col not in stock_data_df.columns]
        if missing_columns:
            print(f"Missing columns for {ticker}: {missing_columns}, skipping...")
            continue

        # Slice the test set data
        stock_data_test = stock_data_df.iloc[-len(Y_test):].copy()

        # Ensure the 'Date' column exists and is of datetime type
        if 'Date' in stock_data_test.columns:
            stock_data_test['Date'] = pd.to_datetime(stock_data_test['Date'])
            stock_data_test.set_index('Date', inplace=True)
        else:
            print(f"Date column missing in {ticker}, skipping...")
            continue

        # Ensure the index is a DatetimeIndex
        if not isinstance(stock_data_test.index, pd.DatetimeIndex):
            print(f"Invalid index type for {ticker}, skipping...")
            continue

        # Add prediction column to the test set
        stock_data_test.loc[:, "pred"] = lr_model.predict(X_test)

        # Prepare the data for backtesting
        bt_data = CustomPandasData(dataname=stock_data_test)

        # Run the backtest
        sharpe_rf, max_dd_rf = run_backtest(bt_data, ticker, StrategyImplement, "Random Forest")

        # Save backtest results
        if sharpe_rf is not None and max_dd_rf is not None:
            rf_results_10.append({"Ticker": ticker, "Sharpe_Ratio": sharpe_lr, "Max_Drawdown": max_dd_lr})
            print(f"{ticker} - Random Forest Strategy: Sharpe Ratio = {sharpe_lr:.4f}, Max Drawdown = {max_dd_lr:.4f}")
        else:
            print(f"Error generating report for {ticker}: Sharpe Ratio or Max Drawdown is None")
    except Exception as e:
        print(f"Error with Random Forest for {ticker}: {e}")

# Print all backtest results
print("\nRandom Forest Backtest Results for Best Tickers:")
for result in rf_results_10:
    print(result)

In [None]:
# Convert results to a DataFrame
lr_rank = pd.DataFrame(lr_results_10)

# Add rankings for each metric
lr_rank['Sharpe_Rank'] = lr_rank['Sharpe_Ratio'].rank(ascending=False).astype(int)  # Higher is better
lr_rank['Max_Drawdown_Rank'] = lr_rank['Max_Drawdown'].rank(ascending=True).astype(int)  # Less negative is better

# Sort the DataFrame by Sharpe_Rank and Max_Drawdown_Rank
lr_rank_sorted_sharpe = lr_rank.sort_values(by="Sharpe_Rank")
lr_rank_sorted_drawdown = lr_rank.sort_values(by="Max_Drawdown_Rank")

# Print ranked results
print("\nRanked Stocks by Sharpe Ratio:")
print(lr_rank_sorted_sharpe[['Ticker', 'Sharpe_Ratio', 'Sharpe_Rank']])

print("\nRanked Stocks by Max Drawdown:")
print(lr_rank_sorted_drawdown[['Ticker', 'Max_Drawdown', 'Max_Drawdown_Rank']])