# Neural Networks Algorithm

## Necessary Configuration

In [1]:
# Imports
# Basic
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta

# Tuning
import optuna
import plotly.io as pio
pio.renderers.default = 'notebook'  # or 'colab', 'iframe', 'vscode' depending on environment

# Preprocessing and algorithm
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Test algorithm
from sklearn.metrics import mean_squared_error, mean_absolute_error

# Importing a file
import sys
sys.path.insert(0, "../")
from indicator import add_indicators

ImportError: DLL load failed while importing _multiarray_umath: The specified module could not be found.

ImportError: DLL load failed while importing _multiarray_umath: The specified module could not be found.

ImportError: numpy.core._multiarray_umath failed to import

ImportError: numpy.core.umath failed to import

TypeError: Unable to convert function return value to a Python type! The signature was
	() -> handle

In [None]:
# Path imports
import os
from pathlib import Path
from IPython import get_ipython

# Best-effort: make the folder reported by IPython's %pwd the working directory.
# Any failure (e.g. get_ipython() unavailable) is reported and otherwise ignored.
try:
    shell = get_ipython()
    notebook_path = shell.run_line_magic("pwd", "")  # Folder IPython reports as cwd
    os.chdir(notebook_path)
    current_dir = os.getcwd()
    print("CWD set to notebook folder:", current_dir)
except Exception as err:
    print("Could not set CWD:", err)

## Dataset

In [None]:
# Ticker whose data folder is read (lower-cased when building the path)
tick = "NVDA"

# Toggle between the truncated and the full CSV export
truncated = False

# Load the stock price dataset from the file matching the toggle above
total = pd.read_csv(
    f"../single_stock/{tick.lower()}_data/truncated.csv"
    if truncated
    else f"../single_stock/{tick.lower()}_data/full.csv"
)

# Index the frame by its "date" column, parsed strictly as datetimes
total = total.set_index("date")
total.index = pd.to_datetime(total.index, errors="raise")

# Display the loaded frame
total

In [None]:
# Deal with NaNs
# Flag rows where the "lsa" column is NaN (1 = missing, 0 = present)
total["summary missing"] = total["lsa"].isna().astype(int)
# NOTE(review): fillna(inplace=True) is applied to the result of a .loc slice. If pandas
# hands back a copy here, `total` itself is left unchanged — confirm the intended effect.
# The train-test split cell later calls first_valid_index() on "lsa", which only finds
# the sentiment span if NaNs are still present in that column.
total.loc[:, "lsa":"textrank"].fillna(0, inplace=True)

# No NaNs allowed in target column
# (this expression only displays the check result; it does not raise or drop rows)
total["close"].isna().any()

## Train-test Split

In [None]:
# Sort chronologically (ascending: oldest row first)
total.sort_index(ascending=True, inplace=True)

# Train-test split
train, test = None, None

if truncated:
    # Unshuffled 80/20 split so the test set is the most recent data
    train, test = train_test_split(
        total,
        train_size=0.8,
        shuffle=False,
        test_size=0.2
    )
    # Define the boundary date in this mode too; later cells read mid_idx
    # and would otherwise fail with a NameError when truncated is True.
    mid_idx = train.index[-1]
else:
    # Span between the first and last rows where "lsa" holds a value
    first_idx = total.loc[:, 'lsa'].first_valid_index()
    last_idx = total.loc[:, 'lsa'].last_valid_index()
    diff = last_idx - first_idx

    # Place the boundary 85% of the way through that span (whole days only)
    delta = timedelta((diff*.85).days)
    mid_idx = first_idx + delta

    # train/test are left as None in this mode; only mid_idx is produced here.
    # Disabled alternative that anchored the boundary on last_idx instead.
    # Original note: doesn't work too well if the news is towards the past.
    # if total.index.get_loc(first_idx) + 1 > total.shape[0] - total.index.get_loc(last_idx):
    # train = total.loc[:mid_idx]
    # test = total.loc[mid_idx:]
    # else:
    #     mid_idx = last_idx - delta
    #     test = total.loc[:mid_idx]
    #     train = total.loc[mid_idx:]

In [None]:
# Show the train/test boundary date (mid_idx is assigned in the train-test split cell)
print(mid_idx)

## Objective Trial

In [None]:
def rmse(y_true, y_pred):
    """Root-mean-squared error between two tensors, built from TensorFlow ops.

    Args:
        y_true: Target values.
        y_pred: Predicted values, subtracted element-wise from ``y_true``.

    Returns:
        The square root of the mean of the squared differences.
    """
    residual = y_true - y_pred
    mean_squared = tf.reduce_mean(tf.square(residual))
    return tf.sqrt(mean_squared)

In [None]:
def _prepare_split(frame, indicator_kwargs):
    """Add indicators to one split and put it on a gap-free business-day index.

    Args:
        frame: Newest-first slice of the lagged price frame.
        indicator_kwargs: Keyword arguments forwarded unchanged to
            ``add_indicators``.

    Returns:
        Oldest-first DataFrame reindexed to business-day ("B") frequency, with a
        ``"missing all"`` flag column and remaining NaNs forward/back filled.
    """
    split = add_indicators(frame, **indicator_kwargs)
    split.index = pd.to_datetime(split.index, errors="raise")

    # Back to ascending order, then insert a row for every business day
    split = split[::-1].asfreq("B")
    # Flag rows whose target is NaN at this point, before filling hides them
    split["missing all"] = split["close"].isna().astype(int)
    return split.ffill().bfill()


def objective(trial):
    """Build, train and score one Keras regressor for an Optuna trial.

    Reads the module-level ``total`` frame and ``mid_idx`` boundary date,
    samples every hyperparameter from ``trial``, fits a dense network on the
    rows up to ``mid_idx`` and evaluates it on the rows from ``mid_idx`` on.

    Args:
        trial: Optuna trial used to sample all hyperparameters.

    Returns:
        RMSE between the unscaled test target and the inverse-transformed
        predictions (lower is better).
    """
    lagged = total.copy()

    # Lag features: close_lag_1 .. close_lag_n. The upper bound is inclusive so
    # the sampled value equals the number of lag columns (0 means none).
    n_lags = trial.suggest_int("lag", 0, 20)
    for lag in range(1, n_lags + 1):
        lagged[f"close_lag_{lag}"] = total["close"].shift(lag)

    # Future price: the target becomes the close `pred_days` rows ahead
    pred_days = trial.suggest_int("pred_days", 1, 15)
    lagged["close"] = lagged["close"].shift(-pred_days)

    # Indicator Hyperparameters
    # NOTE(review): these are set literals, so equal picks from adjacent ranges
    # (e.g. both 11) collapse into one entry — confirm add_indicators expects a set.
    sma = {
        trial.suggest_int("smap1", 3, 11),
        trial.suggest_int("smap2", 11, 21),
        trial.suggest_int("smap3", 21, 51),
        trial.suggest_int("smap4", 51, 201),
    }
    ema = {
        trial.suggest_int("emap1", 5, 13),
        trial.suggest_int("emap2", 13, 27),
        trial.suggest_int("emap3", 27, 51),
        trial.suggest_int("emap4", 51, 101),
    }
    # Collected once so train and test receive identical indicator settings
    indicator_kwargs = {
        "smap": sma,
        "emap": ema,
        "macd_fast": trial.suggest_int("macd_fast", 10, 15),
        "macd_slow": trial.suggest_int("macd_slow", 20, 30),
        "macd_signal": trial.suggest_int("macd_signal", 7, 12),
        "rsi_window": trial.suggest_int("rsi_window", 10, 20),
        "srsi_window": trial.suggest_int("srsi_window", 10, 20),
        "srsi_k": trial.suggest_int("srsi_k", 2, 5),
        "srsi_d": trial.suggest_int("srsi_d", 2, 5),
        "roc_window": trial.suggest_int("roc_window", 10, 20),
        "mom_window": trial.suggest_int("mom_window", 5, 15),
        "cmf_window": trial.suggest_int("cmf_window", 10, 30),
        "mfi_window": trial.suggest_int("mfi_window", 10, 20),
        "bollinger_window": trial.suggest_int("bollinger_window", 15, 30),
        "bollinger_num_std": trial.suggest_int("bollinger_num_std", 1, 3),
        "inc_obv": trial.suggest_categorical("inc_obv", [True, False]),
        "atr_window": trial.suggest_int("atr_window", 10, 20),
        "donchian_window": trial.suggest_int("donchian_window", 15, 30),
        "trix_window": trial.suggest_int("trix_window", 10, 20),
        "inc_fib": trial.suggest_categorical("inc_fib", [True, False]),
        "pc_window": trial.suggest_int("pc_window", 15, 30),
        "inc_fractals": trial.suggest_categorical("inc_fractals", [True, False]),
    }

    # Rearrange to descending (newest first) before handing slices to add_indicators
    lagged = lagged[::-1]

    # On the newest-first frame, `mid_idx:` is the older part (train) and
    # `:mid_idx` the newer part (test).
    # NOTE(review): .loc label slices are inclusive, so a row dated exactly
    # mid_idx lands in both splits.
    train_loc = _prepare_split(lagged.loc[mid_idx:], indicator_kwargs)
    test_loc = _prepare_split(lagged.loc[:mid_idx], indicator_kwargs)

    # Split x and y
    train_x = train_loc.drop(columns="close")
    train_y = train_loc["close"]
    test_x = test_loc.drop(columns="close")
    test_y = test_loc["close"]

    # Statistical Hyperparameters
    scaler = trial.suggest_categorical("scaler", ["standard", "minmax", "robust"])
    scaler_classes = {
        "standard": StandardScaler,
        "minmax": MinMaxScaler,
        "robust": RobustScaler,
    }
    scaler_x, scaler_y = scaler_classes[scaler](), scaler_classes[scaler]()

    # Scalers are fitted on the training split only and reused on the test split
    train_x = scaler_x.fit_transform(train_x)
    test_x = scaler_x.transform(test_x)

    # The target is scaled for training; test_y stays unscaled for the final metric
    train_y = scaler_y.fit_transform(train_y.values.reshape(-1, 1))

    # Model Hyperparameters
    model = keras.Sequential()    # Start model
    model.add(layers.Input(shape=(train_x.shape[1],)))    # One input per feature column

    # Layer independent hyperparams
    min_neurons = 16
    max_neurons = 96
    min_layers = 1
    max_layers = 6
    dropout = trial.suggest_float("dropout", 0.1, 0.5)
    activation = trial.suggest_categorical("activation", ["relu", "leaky_relu", "tanh", "softmax", "gelu"])

    # Layer creation
    for layer in range(trial.suggest_int("n_layers", min_layers, max_layers)):
        model.add(
            layers.Dense(
                trial.suggest_int(f"n_neurons_l{layer}", min_neurons, max_neurons),
                activation=activation
            )    # Hidden layer with sampled width and the shared activation
        )

        # Dropout after the first 2 hidden layers. Compare the loop index (not
        # the `layers` module) and actually attach the layer to the model.
        if layer < 2:
            model.add(layers.Dropout(dropout))

    # Output layer: single regression value
    model.add(layers.Dense(1))

    # Compile model
    optimizers = ["adam", "sgd", "rmsprop"]

    model.compile(
        optimizer=trial.suggest_categorical("optimizer", optimizers),
        loss=rmse
    )

    # Fit model
    epochs = trial.suggest_int("epochs", 50, 225)
    batch_size = trial.suggest_int("batch", 28, 64)

    model.fit(
        train_x,
        train_y,
        validation_split=0.1,
        epochs=epochs,
        batch_size=batch_size,
        verbose=2
    )

    # Undo the target scaling so the score is in the units of test_y
    pred_y = scaler_y.inverse_transform(model.predict(test_x))
    return np.sqrt(mean_squared_error(test_y, pred_y))

## Execution of Algorithm

In [None]:
# Uncomment to silence warnings during the search:
# import warnings
# warnings.simplefilter("ignore")

# Create a study that minimises the value returned by objective(), then run one trial
study = optuna.create_study(direction="minimize")
study.optimize(
    objective,
    n_trials=1,
    show_progress_bar=True,
)

# Report the hyperparameters of the best trial found
print("Best hyperparameters: ", study.best_params)

## Showing Results

In [None]:
# Summarise the best trial: its objective value, then one line per hyperparameter
print("Best trial:")
trial = study.best_trial
print(f"    RMSE: {trial.value}")
print("    Params:")
for key in trial.params:
    value = trial.params[key]
    print(f"        {key}: {value}")

In [None]:

# Parallel-coordinate plot of the study's trials (displayed as the cell output)
optuna.visualization.plot_parallel_coordinate(study)

In [None]:
# Slice plots for selected hyperparameters. The names must match those sampled in
# objective(): the lag count is registered as "lag", and no "learning_rate" parameter
# exists, so asking for either "lags" or "learning_rate" makes plot_slice raise.
optuna.visualization.plot_slice(study, params=["emap1", "lag"]).update_layout(
    font=dict(size=18),  # Change this to your desired size
    title_font=dict(size=20),
    xaxis_title_font=dict(size=16),
    yaxis_title_font=dict(size=16)
)

In [None]:
# Hyperparameter-importance chart with enlarged fonts for readability
optuna.visualization.plot_param_importances(study).update_layout(
    font={"size": 18},  # Base font size; adjust to taste
    title_font={"size": 20},
    xaxis_title_font={"size": 16},
    yaxis_title_font={"size": 16},
)

In [None]:
# Plot

# from sklearn.linear_model import LogisticRegression


# 1. Initialize something to store results in
# NOTE(review): `results` is not read anywhere in the visible notebook — confirm it is still needed.
results = {}

# Optimization-history plot for the study (displayed as the cell output)
optuna.visualization.plot_optimization_history(study)

# Disabled matplotlib plot; its title suggests it was carried over from an
# XGBoost/Skforecast notebook, and `predictions` is not defined in the visible cells.
# plt.plot(train[-100:], label="Train")
# plt.plot(test.index, test, label="Test")
# plt.plot(test.index, predictions, label="Prediction")
# plt.legend()
# plt.title("XGBoost with Skforecast")
# plt.show()