In [2]:
import numpy as np
import warnings
import pandas as pd
import yfinance as yf
from datetime import datetime, timedelta
from sklearn.svm import SVC
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import StackingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import GridSearchCV, KFold, cross_val_score
from sklearn.naive_bayes import GaussianNB, MultinomialNB, BernoulliNB
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, make_scorer
import xgboost as xgb
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import Model

def _download_daily_bars(ticker, n_hist_wk):
    """Download the last `n_hist_wk` weeks of daily bars for `ticker` via yfinance.

    Returns a DataFrame whose columns are plain field names ('Open', 'High',
    'Low', 'Close', 'Volume', ...). Raises ValueError if nothing is returned.
    """
    end_date = datetime.today()
    start_date = end_date - timedelta(weeks=n_hist_wk)

    data = yf.download(ticker, start=start_date, end=end_date, interval='1d')
    if data is None or data.empty:
        raise ValueError(f'No price data returned for {ticker!r} '
                         f'between {start_date:%Y-%m-%d} and {end_date:%Y-%m-%d}')

    if isinstance(data.columns, pd.MultiIndex):
        # Recent yfinance releases return (field, ticker) column pairs even for a
        # single ticker. Keep only the level that carries the field names so that
        # data['Close'] is a Series again.
        for level in range(data.columns.nlevels):
            level_values = data.columns.get_level_values(level)
            if 'Close' in level_values:
                data.columns = level_values
                break
    return data


def _build_dataset(data, n_days, prediction_target):
    """Turn daily bars into a labelled training frame plus one unlabelled query row.

    Each row holds the previous `n_days` sessions of five per-day series
    (lags 1..n_days), calendar fields of the row's own date, and - for training
    rows - the 5-class `direction` label.

    Returns:
        (train_frame, query_features):
        * train_frame: feature columns plus an integer `direction` column (0..4).
        * query_features: single-row DataFrame (same feature columns, no label)
          for the first period whose outcome is not yet known.
    """
    close = data['Close']

    if prediction_target == 0:  # Predict change of close
        target = close.pct_change()
    else:  # Predict next day open relative to last day's close
        target = (data['Open'].shift(-1) - close) / close

    daily = pd.DataFrame({
        'price_change': close.pct_change(),
        'volume_change': data['Volume'].pct_change(),
        'relative_high': (data['High'] - close) / close,
        'relative_low': (data['Low'] - close) / close,
        'in_day_movement': close - data['Open'],
    })
    # A zero-volume session makes the next pct_change infinite; StandardScaler
    # rejects inf, so treat it as missing and let dropna() discard those rows.
    daily = daily.replace([np.inf, -np.inf], np.nan)

    if prediction_target == 0:
        # The row to predict is the session AFTER the last downloaded bar, so a
        # placeholder row is appended; its lag features come from the latest bars.
        # BDay skips weekends only - exchange holidays are not accounted for.
        next_session = data.index[-1] + pd.offsets.BDay(1)
        full_index = data.index.append(pd.DatetimeIndex([next_session]))
        daily = daily.reindex(full_index)
        target = target.reindex(full_index)
    # For prediction_target == 1 the last downloaded bar is already the query
    # row: its label needs the next open, which does not exist yet.
    # NOTE(review): that row's features are lags 1..n_days, so the latest bar's
    # own data is not used when forecasting the next open - confirm intended.

    # Build every lag column first and assemble the frame once (avoids growing
    # a DataFrame column by column). Order: all fields for lag n_days, ..., lag 1.
    lagged = {}
    for lag in range(n_days, 0, -1):
        for field in daily.columns:
            lagged[f'{field}_{lag}'] = daily[field].shift(lag)
    features = pd.DataFrame(lagged, index=daily.index)

    # Add other factor
    features['day_in_week'] = features.index.dayofweek
    features['day_in_month'] = features.index.day
    features['month_in_year'] = features.index.month
    features['quarter'] = features.index.quarter

    query_features = features.iloc[[-1]]
    if query_features.isna().values.any():
        raise ValueError('Cannot build the prediction row: the most recent '
                         f'{n_days} sessions contain missing or infinite values')

    # Five movement classes: huge down / mild down / flat / mild up / huge up.
    bins = [-np.inf, -0.02, -0.005, 0.005, 0.02, np.inf]
    labels = [0, 1, 2, 3, 4]
    train_frame = features.iloc[:-1].copy()
    # Go through float so rows with an unknown target become NaN and are dropped.
    train_frame['direction'] = pd.cut(target.iloc[:-1], bins=bins, labels=labels).astype(float)
    train_frame = train_frame.dropna()
    if train_frame.empty:
        raise ValueError('Not enough history to build any training rows; '
                         'increase n_hist_wk or reduce n_days')
    train_frame['direction'] = train_frame['direction'].astype(int)
    return train_frame, query_features


def _tune_and_report(label, estimator, param_grid, X_train, y_train, X_test, y_test, scoring, n_jobs):
    """Grid-search `estimator` with 5-fold CV, print its test accuracy, return the best model.

    The model refit on the full training set is the one with the best mean CV accuracy.
    """
    search = GridSearchCV(
        estimator, param_grid, scoring=scoring, refit='accuracy', cv=5, n_jobs=n_jobs)
    search.fit(X_train, y_train)
    best_model = search.best_estimator_
    test_accuracy = accuracy_score(y_test, best_model.predict(X_test))
    print(f'... Best {label} accuracy on test set: {test_accuracy:.4f}')
    return best_model


def predict_stock_direction(ticker, n_days, n_hist_wk, prediction_target=0, fe=None, n_components=10, n_jobs=24):
    """Train several classifiers on lagged daily features and print a 5-class forecast.

    Parameters:
        ticker: symbol passed to yfinance.
        n_days: number of past sessions used as lag features (>= 1).
        n_hist_wk: weeks of history to download, counted back from today.
        prediction_target: 0 = close-to-close change; 1 = next open relative
            to the last close.
        fe: 'pca' or 'ae' (single-hidden-layer autoencoder) to reduce the scaled
            features to `n_components` dimensions; any other value keeps the
            scaled features unchanged.
        n_components: size of the reduced feature space for 'pca' / 'ae'.
        n_jobs: parallel workers for the grid searches and the stacking model.

    Returns None; results are printed. Side effect: silences all Python
    warnings for the rest of the session.

    Raises:
        ValueError: invalid `prediction_target` / `n_days`, empty download, or
            not enough clean data to build the training set or the query row.
    """
    warnings.filterwarnings('ignore')

    # Validate cheap arguments before hitting the network.
    if prediction_target not in (0, 1):
        raise ValueError(f'prediction_target must be 0 or 1, got {prediction_target!r}')
    if n_days < 1:
        raise ValueError(f'n_days must be at least 1, got {n_days!r}')

    # ================== Data Preparation ====================
    data = _download_daily_bars(ticker, n_hist_wk)
    feature_df, query_features = _build_dataset(data, n_days, prediction_target)

    print('... Data prepared! Data size:', feature_df.shape)

    # =================== Train-test split =========================
    X = feature_df.drop(columns=['direction'])
    y = feature_df['direction']

    # NOTE(review): a shuffled split on overlapping lag windows mixes past and
    # future rows between train and test, so the reported test accuracy is
    # likely optimistic. Consider a chronological split.
    X_train_ori, X_test_ori, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y)

    print('... Train/Tests splitted! Train size:', X_train_ori.shape)

    # Normalize the input data (statistics come from the training rows only)
    scaler = StandardScaler()
    X_train_normalized = scaler.fit_transform(X_train_ori)
    X_test_normalized = scaler.transform(X_test_ori)

    # `project` maps scaled features into the space the models are trained in.
    if fe == 'pca':
        pca = PCA(n_components=n_components)
        X_train = pca.fit_transform(X_train_normalized)
        X_test = pca.transform(X_test_normalized)
        project = pca.transform
        print('... PCA done! Train size:', X_train.shape)
    elif fe == 'ae':
        # Build the autoencoder only when it is actually requested.
        input_dim = X_train_normalized.shape[1]
        input_layer = Input(shape=(input_dim,))
        encoder_layer = Dense(n_components, activation='relu')(input_layer)
        decoder_layer = Dense(input_dim, activation='linear')(encoder_layer)
        autoencoder = Model(inputs=input_layer, outputs=decoder_layer)
        autoencoder.compile(optimizer='adam', loss='mean_squared_error')
        autoencoder.fit(X_train_normalized, X_train_normalized,
                        epochs=100,
                        batch_size=32,
                        shuffle=True,
                        validation_data=(X_test_normalized, X_test_normalized))
        encoder = Model(inputs=input_layer, outputs=encoder_layer)
        X_train = encoder.predict(X_train_normalized)
        X_test = encoder.predict(X_test_normalized)
        project = encoder.predict
        print('... Autoencoder trained! Train size:', X_train.shape)
    else:
        # No reduction: train directly on the scaled features.
        X_train = X_train_normalized
        X_test = X_test_normalized

        def project(scaled):
            return scaled

        print('... No dimensionality reduction will be performed. Train size:', X_train.shape)

    # The target has five classes, so precision/recall/F1 need an explicit
    # multiclass average; the default ('binary') raises and scores as NaN.
    scoring_metrics = {
        'accuracy': make_scorer(accuracy_score),
        'precision': make_scorer(precision_score, average='macro', zero_division=0),
        'recall': make_scorer(recall_score, average='macro', zero_division=0),
        'f1_score': make_scorer(f1_score, average='macro', zero_division=0)
    }

    def tune(label, estimator, param_grid):
        # Thin closure so each model section only states what is specific to it.
        return _tune_and_report(label, estimator, param_grid,
                                X_train, y_train, X_test, y_test,
                                scoring_metrics, n_jobs)

    print('... Model training started!')

    # ==================== Logistic Regression =======================
    # Some penalty/solver pairs in this grid are invalid (and newer scikit-learn
    # spells 'none' as None); GridSearchCV scores failed fits as NaN and skips them.
    best_log_reg = tune('Logistic Regression', LogisticRegression(random_state=42), {
        'penalty': ['l2', 'none'],
        'C': np.logspace(-4, 4, 9),
        'fit_intercept': [True, False],
        'solver': ['newton-cg', 'lbfgs', 'liblinear', 'sag', 'saga'],
        'max_iter': [100, 500, 1000, 2000]
    })

    # ==================== Random forest =======================
    best_rf = tune('Random forest', RandomForestClassifier(random_state=42), {
        'n_estimators': [50, 100, 200],
        'max_depth': [None, 10, 20, 30],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4],
        'max_features': ['sqrt'],
        'bootstrap': [True, False]
    })

    # ==================== Gradient boost =======================
    best_gbm = tune('Gradient boost', GradientBoostingClassifier(random_state=42), {
        'n_estimators': [50, 100, 200],
        'learning_rate': [0.01, 0.1, 0.2],
        'max_depth': [3, 5, 8],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4],
        'max_features': ['sqrt']
    })

    # ==================== XGBoost Classifier =======================
    # No explicit objective: the target is multiclass, and XGBClassifier selects
    # a matching objective from the number of classes seen in fit().
    best_xgb = tune('XGBoost', xgb.XGBClassifier(random_state=42), {
        'learning_rate': [0.01, 0.1],
        'n_estimators': [50, 100],
        'max_depth': [3, 5],
        'min_child_weight': [1, 3],
        'gamma': [0, 0.1],
        'subsample': [0.8],
        'colsample_bytree': [0.8],
    })

    # ==================== Multi-Layer Perceptron =======================
    best_mlp = tune('MLP', MLPClassifier(random_state=42), {
        'hidden_layer_sizes': [(50, 50), (100, 100), (50, 100), (100, 50), (50, 50, 50),
                               (100, 50, 100), (50, 100, 50), (50, 100, 100),
                               (100, 100, 100), (50, 50, 50, 50)],
        'activation': ['relu', 'tanh'],
        'solver': ['adam'],
        'alpha': [0.0001, 0.001],
        'learning_rate': ['constant', 'adaptive'],
    })

    # ==================== k-Nearest Neighbors (k-NN) Classifier =======================
    best_knn = tune('k-NN', KNeighborsClassifier(), {
        'n_neighbors': list(range(1, 31)),
        'weights': ['uniform', 'distance'],
        'metric': ['euclidean', 'manhattan', 'minkowski']
    })

    # ==================== Naive Bayes Classifier =======================
    # Gaussian variant: the inputs are continuous (scaled / projected) features.
    naive_bayes = GaussianNB()
    naive_bayes.fit(X_train, y_train)
    y_pred = naive_bayes.predict(X_test)
    print(f'... Naive Bayes accuracy on test set: {accuracy_score(y_test, y_pred):.4f}')

    # ==================== Decision Trees =======================
    best_dt = tune('Decision tree', DecisionTreeClassifier(random_state=42), {
        'criterion': ['gini', 'entropy'],
        'max_depth': [None, 10, 20, 30],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4],
        'max_features': [None, 'sqrt', 'log2']
    })

    # ==================== Stacking =======================
    stacked_clf = StackingClassifier(
        estimators=[
            ('random_forest', best_rf),
            ('gbm', best_gbm),
            ('mlp', best_mlp),
            ('logreg', best_log_reg),
            ('xgboost', best_xgb),
            ('knn', best_knn),
            ('DT', best_dt),
            ('naive_bayes', naive_bayes)
        ],
        final_estimator=LogisticRegression(random_state=42),
        cv=5,
        n_jobs=n_jobs
    )
    stacked_clf.fit(X_train, y_train)
    y_pred = stacked_clf.predict(X_test)
    print(f'... Stacking accuracy on test set: {accuracy_score(y_test, y_pred):.4f}')

    # ==================== Prediction =======================
    # Score the genuinely unseen query row (not the last training row), pushed
    # through the same scaler and projection as the training data. Models were
    # fitted on plain arrays, so they are queried with a plain array as well.
    query_ready = project(scaler.transform(query_features))

    fitted_models = {
        "Logistic Regression": best_log_reg,
        "Random Forest": best_rf,
        "GBM": best_gbm,
        "MLP": best_mlp,
        "XGBoost": best_xgb,
        "KNN": best_knn,
        "DesTree": best_dt,
        "NaiveBayes": naive_bayes,
        "Stacking": stacked_clf,
    }
    models = {name: model.predict(query_ready)[0] for name, model in fitted_models.items()}

    movement_categories = {
        0: 'Huge Down',
        1: 'Mild Down',
        2: 'Flat',
        3: 'Mild Up',
        4: 'Huge Up'
    }

    prediction_target_text = "Change of Close" if prediction_target == 0 else "Next Day's Open Price Relative to Last Day's Close"

    print(f"Predictions for {ticker} tomorrow (Predicting {prediction_target_text}):")
    for model_name, direction in models.items():
        print(f"{model_name}: {movement_categories[int(direction)]}")

In [5]:
# QQQ on ~500 weeks of history with PCA-reduced features, close-to-close target.
# The next-open variants (prediction_target=1) are kept disabled.
predict_stock_direction('QQQ', n_days=5, n_hist_wk=500, prediction_target=0, fe='pca', n_components=10)
#predict_stock_direction('QQQ', n_days=5, n_hist_wk=500, prediction_target=1, fe='pca', n_components=10)
predict_stock_direction('QQQ', n_days=21, n_hist_wk=500, prediction_target=0, fe='pca', n_components=20)
#predict_stock_direction('QQQ', n_days=21, n_hist_wk=500, prediction_target=1, fe='pca', n_components=20)

[*********************100%***********************]  1 of 1 completed
... Data prepared! Data size: (2407, 30)
... Train/Tests splitted! Train size: (1925, 29)
... PCA done! Train size: (1925, 10)
... Model training started!
... Best Logistic Regression accuracy on test set: 0.4440
... Best Random forest accuracy on test set: 0.4232
... Best Gradient boost accuracy on test set: 0.4315
... Best XGBoost accuracy on test set: 0.4295
... Best MLP accuracy on test set: 0.3610
... Best k-NN accuracy on test set: 0.3963
... Naive Bayes accuracy on test set: 0.4149
... Best Decision tree accuracy on test set: 0.4170
... Stacking accuracy on test set: 0.4274
Predictions for QQQ tomorrow (Predicting Change of Close):
Logistic Regression: Flat
Random Forest: Mild Up
GBM: Mild Up
MLP: Mild Up
XGBoost: Flat
KNN: Mild Up
DesTree: Mild Up
NaiveBayes: Flat
Stacking: Mild Up
[*********************100%***********************]  1 of 1 completed
... Data prepared! Data size: (2406, 30)
... Train/Tests spli

In [3]:
# SPY on ~500 weeks of history with PCA-reduced features, close-to-close target.
# The next-open variants (prediction_target=1) are kept disabled.
predict_stock_direction('SPY', n_days=5, n_hist_wk=500, prediction_target=0, fe='pca', n_components=10)
#predict_stock_direction('SPY', n_days=5, n_hist_wk=500, prediction_target=1, fe='pca', n_components=10)
predict_stock_direction('SPY', n_days=21, n_hist_wk=500, prediction_target=0, fe='pca', n_components=20)
#predict_stock_direction('SPY', n_days=21, n_hist_wk=500, prediction_target=1, fe='pca', n_components=20)

[*********************100%***********************]  1 of 1 completed
... Data prepared! Data size: (3852, 25)
Baseline accuracy ratio: 0.54
... Train/Tests splitted! Train size: (3081, 24)
... PCA done! Train size: (3081, 15)
... Model training started!
... Best Logistic Regression accuracy on test set: 0.5577
... Best Random forest accuracy on test set: 0.5305
... Best Gradient boost accuracy on test set: 0.5396
... Best XGBoost accuracy on test set: 0.5240
... Best MLP accuracy on test set: 0.5110
... Best k-NN accuracy on test set: 0.5175
... Naive Bayes accuracy on test set: 0.5435
... Best Decision tree accuracy on test set: 0.5149
... Stacking accuracy on test set: 0.5422
Predictions for SPY tomorrow:
Logistic Regression: UP
Random Forest: DOWN
GBM: UP
MLP: DOWN
XGBoost: UP
KNN: DOWN
DesTree: UP
NaiveBayes: UP
Stacking: UP
[*********************100%***********************]  1 of 1 completed
... Data prepared! Data size: (3837, 85)
Baseline accuracy ratio: 0.54
... Train/Tests spl