In [1]:
import dask
import dask.dataframe as dd
from dask_ml.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import joblib
import os
from imblearn.over_sampling import SMOTE

In [2]:
from sklearn.metrics import accuracy_score, classification_report
import talib

In [3]:
# Please update inputs in distributed_utils.py with the same inputs as here
DATA_DIR = "processed_data"
MODEL_DIR = "models"
os.makedirs(MODEL_DIR, exist_ok=True)

In [4]:
def load_and_prepare_data(ticker):
    """
    Load and prepare data for a given ticker. Includes inspection of NaN values before dropping them.

    Parameters:
        ticker (str): Stock ticker.

    Returns:
        tuple: Features (X) and target (y).
    """
    file_path = f"{DATA_DIR}/{ticker}_historical_prices_.csv"
    df = dd.read_csv(file_path)
    
    # Ensure necessary columns exist
    required_columns = ["Close", "SMA_20", "EMA_20", "RSI", "Volatility", "Close_lag_1", "Close_lag_2", "Momentum"]
    for col in required_columns:
        if col not in df.columns:
            raise ValueError(f"Column {col} missing in {ticker} dataset.")

    # df["MACD"], df["MACD_signal"], _ = talib.MACD(df["Close"].to_numpy())
    
    # Inspect NaN values
    print(f"\nInspecting NaN values for {ticker}:")
    nan_summary = df.isna().sum().compute()  # Get the total number of NaN values per column
    print(nan_summary)
    
    # Check rows with NaN
    print(f"\nRows with NaN values (sample) for {ticker}:")
    rows_with_nan = df[df.isna().any(axis=1)].compute()
    print(rows_with_nan.head())  # Print the first few rows with NaN values
    
    # Features and target
    print(f"\nDropping rows with NaN values for {ticker}...")
    df = df.dropna()  # Drop rows with NaN values
    print(f"Remaining rows after dropping NaN: {len(df)}")
    
    X = df[["SMA_20", "EMA_20", "RSI", "RSI_SMA_Ratio", "Volatility", "Close_lag_1", "Close_lag_2", "Momentum"]]
    # y = (df["Close"].shift(-1) > df["Close"]).astype(int)  # 1 if price goes up, 0 otherwise
    y = (df["Close"].shift(-5) > df["Close"]).astype(int)
    return X, y

def train_random_forest(ticker):
    """
    Train an improved Random Forest model for a specific stock ticker.
    Includes:
    - Data split into train and test sets.
    - Hyperparameter tuning.
    - Class balancing.
    - Improved feature engineering.

    Parameters:
        ticker (str): Stock ticker symbol.

    Returns:
        str: Path to the saved model.
    """
    print(f"\nTraining Random Forest model for {ticker}...")

    # Step 1: Load and Prepare Data
    X, y = load_and_prepare_data(ticker)
    X = X.compute()  # Convert Dask DataFrame to Pandas
    y = y.compute()

    # Inspect class balance
    print(f"Class distribution for {ticker}:")
    print(y.value_counts())
    smote = SMOTE(random_state=42)
    
    # Step 2: Split the Data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
    X_train_balanced, y_train_balanced = smote.fit_resample(X_train, y_train)
    # # Step 2: Train-Test Split
    # X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

    # Step 3: Define the Model
    model = RandomForestClassifier(random_state=42, class_weight="balanced")

    # Step 4: Hyperparameter Grid
    param_grid = {
        "n_estimators": [50, 100, 200],
        "max_depth": [5, 10, 15],
        "min_samples_split": [2, 5, 10]
    }

    # Step 5: Hyperparameter Tuning with GridSearchCV
    print(f"Performing grid search for {ticker}...")
    grid_search = GridSearchCV(
        estimator=model,
        param_grid=param_grid,
        cv=3,
        scoring="accuracy",
        n_jobs=-1
    )
    grid_search.fit(X_train_balanced, y_train_balanced)

    # Step 6: Evaluate the Model
    best_model = grid_search.best_estimator_
    print(f"Best parameters for {ticker}: {grid_search.best_params_}")

    y_pred = best_model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Accuracy for {ticker}: {accuracy:.2f}")

    # Detailed Classification Report
    print(f"Classification Report for {ticker}:\n")
    print(classification_report(y_test, y_pred))

    # Step 7: Save the Model
    model_path = f"{MODEL_DIR}/{ticker}_improved_random_forest.pkl"
    joblib.dump(best_model, model_path)
    print(f"Model saved to {model_path}")

    return model_path

def train_models_for_all_tickers(tickers):
    """
    Train Random Forest models for all tickers in parallel using Dask.

    Parameters:
        tickers (list): List of stock tickers.
    """
    # Each ticker's training task is executed in parallel using dask.delayed.
    futures = [dask.delayed(train_random_forest)(ticker) for ticker in tickers]
    results = dask.compute(*futures)  # Run tasks in parallel
    print("All models trained and saved:")
    for result in results:
        print(result)

In [5]:
tickers = ["AAPL", "GOOGL", "MSFT", "TSLA"]
train_models_for_all_tickers(tickers)


Training Random Forest model for AAPL...

Training Random Forest model for MSFT...

Training Random Forest model for TSLA...

Training Random Forest model for GOOGL...

Inspecting NaN values for AAPL:

Inspecting NaN values for TSLA:

Inspecting NaN values for MSFT:

Inspecting NaN values for GOOGL:
Unnamed: 0          0
Price               0
Close               0
High                0
Low                 0
Open                0
Volume              0
SMA_20             19
EMA_20              0
RSI                13
Bollinger_Upper    19
Bollinger_Lower    19
10_day_MA           9
50_day_MA          49
RSI_SMA_Ratio      19
Volatility         19
Close_lag_1         1
Close_lag_2         2
Momentum            5
dtype: int64

Rows with NaN values (sample) for AAPL:
Unnamed: 0          0
Price               0
Close               0
High                0
Low                 0
Open                0
Volume              0
SMA_20             19
EMA_20              0
RSI                13
Bollin

## Used Logistic Regression, but as expected, model didn't perform well

In [1]:
import dask
import dask.dataframe as dd
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE
from sklearn.metrics import classification_report, accuracy_score
import os
import joblib

# Define the directories
DATA_DIR = "processed_data"
MODEL_DIR = "models"
os.makedirs(MODEL_DIR, exist_ok=True)

def load_and_prepare_data(ticker):
    """
    Load and prepare data for a given ticker.

    Parameters:
        ticker (str): Stock ticker.

    Returns:
        tuple: Features (X) and target (y).
    """
    file_path = f"{DATA_DIR}/{ticker}_historical_prices_.csv"
    df = dd.read_csv(file_path)
    
    # Ensure necessary columns exist
    required_columns = ["Close", "SMA_20", "EMA_20", "RSI"]
    for col in required_columns:
        if col not in df.columns:
            raise ValueError(f"Column {col} missing in {ticker} dataset.")
    
    # Drop NaN values and prepare features and target
    df["RSI_SMA_Ratio"] = df["RSI"] / df["SMA_20"]
    df["Volatility"] = df["Close"].rolling(window=20).std()
    df["Close_lag_1"] = df["Close"].shift(1)
    df["Close_lag_2"] = df["Close"].shift(2)

    df = df.dropna()
    X = df[["SMA_20", "EMA_20", "RSI", "RSI_SMA_Ratio", "Volatility", "Close_lag_1", "Close_lag_2"]]
    # y = (df["Close"].shift(-1) > df["Close"]).astype(int)  # Binary classification: 1 if price goes up, 0 otherwise
    y = (df["Close"].shift(-5) > df["Close"]).astype(int)
    return X, y

def train_logistic_regression(ticker):
    """
    Train a logistic regression model for a specific stock ticker using Dask-ML.

    Parameters:
        ticker (str): Stock ticker symbol.

    Returns:
        str: Path to the saved model.
    """
    print(f"Training Logistic Regression model for {ticker}...")
    
    # Step 1: Load and Prepare Data
    X, y = load_and_prepare_data(ticker)
    X = X.compute()  # Convert Dask DataFrame to Pandas
    y = y.compute()

    smote = SMOTE(random_state=42)
    
    # Step 2: Split the Data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
    X_train_balanced, y_train_balanced = smote.fit_resample(X_train, y_train)

    # Print the new class distribution
    print("Class distribution after SMOTE:")
    print(pd.Series(y_train_balanced).value_counts())    
    # Step 3: Define the Model
    model = LogisticRegression(penalty='l2', max_iter=500, solver='lbfgs', class_weight="balanced", random_state=42)
    
    # Step 4: Hyperparameter Grid
    param_grid = {
        "C": [0.1, 1.0, 10.0],  # Regularization parameter
        "tol": [1e-4, 1e-3, 1e-2]
    }

    # Step 5: Distributed Grid Search with Dask-ML
    grid_search = GridSearchCV(estimator=model, param_grid=param_grid, cv=3, scoring="accuracy", n_jobs=-1)
    grid_search.fit(X_train_balanced, y_train_balanced)
    
    # Step 6: Evaluate the Model
    best_model = grid_search.best_estimator_
    print(f"Best parameters for {ticker}: {grid_search.best_params_}")
    
    y_pred = best_model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Accuracy for {ticker}: {accuracy:.2f}")
    
    # Detailed classification metrics
    print(f"Classification Report for {ticker}:\n")
    print(classification_report(y_test, y_pred))
    
    # Step 7: Save the Model
    model_path = f"{MODEL_DIR}/{ticker}_logistic_regression.pkl"
    joblib.dump(best_model, model_path)
    print(f"Model saved to {model_path}")
    
    return model_path

def train_models_for_all_tickers_logistic(tickers):
    """
    Train Logistic Regression models for all tickers in parallel using Dask.

    Parameters:
        tickers (list): List of stock tickers.
    """
    futures = [dask.delayed(train_logistic_regression)(ticker) for ticker in tickers]
    results = dask.compute(*futures)  # Run tasks in parallel
    print("All models trained and saved:")
    for result in results:
        print(result)



In [2]:

# List of tickers to train on
tickers = ["AAPL", "GOOGL", "MSFT", "TSLA"]

# Train models for all tickers
train_models_for_all_tickers_logistic(tickers)

Training Logistic Regression model for GOOGL...Training Logistic Regression model for AAPL...

Training Logistic Regression model for MSFT...
Training Logistic Regression model for TSLA...
Class distribution after SMOTE:
Close
0    1482
1    1482
Name: count, dtype: int64
Class distribution after SMOTE:
Close
0    1482
1    1482
Name: count, dtype: int64
Class distribution after SMOTE:
Close
0    1482
1    1482
Name: count, dtype: int64
Class distribution after SMOTE:
Close
0    1482
1    1482
Name: count, dtype: int64
Best parameters for GOOGL: {'C': 10.0, 'tol': 0.0001}
Accuracy for GOOGL: 0.56
Classification Report for GOOGL:

              precision    recall  f1-score   support

           0       0.48      0.43      0.45       274
           1       0.61      0.66      0.63       371

    accuracy                           0.56       645
   macro avg       0.54      0.54      0.54       645
weighted avg       0.55      0.56      0.56       645

Best parameters for AAPL: {'C': 10.