In [1]:
import yfinance as yf
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import keras
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
# import Input layer
from keras.layers import Input, Dense, LSTM, Dropout, BatchNormalization, Masking, LayerNormalization, Bidirectional, Conv1D, GlobalAvgPool1D, Attention, Activation, MaxPool1D, GlobalAveragePooling1D
from keras.models import Model
from keras.losses import Huber
from keras.optimizers import Adam
#from tensorflow_addons.losses import SigmoidFocalCrossEntropy
from keras import regularizers
from keras.callbacks import EarlyStopping
from sklearn.metrics import mean_squared_error
# traint test
from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight

2025-05-15 11:09:52.438051: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1747303792.456073  526879 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1747303792.461251  526879 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-05-15 11:09:52.478997: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
#check for gpus
tf.config.list_physical_devices('GPU')

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

In [3]:
ticker_str = "SPY"
yf_ticker = yf.Ticker(ticker_str)
yf_data = yf_ticker.history(period="30y")

yf_data["Log Return"] = np.log(yf_data["Close"] / yf_data["Close"].shift(1))
yf_data = yf_data[1:]

yf_data.head()

$SPY: possibly delisted; no price data found  (period=30y)


Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume,Log Return
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1


In [4]:
# features
yf_data["Log Return Sq"] = yf_data["Log Return"] ** 2
lookback = 60

num_rolling = [15, 30, 60, 180]
for i in num_rolling:
    yf_data[f"Rolling Mean {i}"] = yf_data["Log Return"].rolling(i).mean()
    yf_data[f"Rolling Std {i}"] = yf_data["Log Return"].rolling(i).std()
    yf_data[f"Rolling Skew {i}"] = yf_data["Log Return"].rolling(i).skew()
    yf_data[f"Rolling Kurt {i}"] = yf_data["Log Return"].rolling(i).kurt()

    yf_data[f"Rolling Vol Mean {i}"] = yf_data["Log Return Sq"].rolling(i).mean()
    yf_data[f"Rolling Vol Std {i}"] = yf_data["Log Return Sq"].rolling(i).std()
    yf_data[f"Rolling Vol Skew {i}"] = yf_data["Log Return Sq"].rolling(i).skew()
    yf_data[f"Rolling Vol Kurt {i}"] = yf_data["Log Return Sq"].rolling(i).kurt()



high_low = yf_data["High"] - yf_data["Low"]
high_close = (yf_data["High"] - yf_data["Close"].shift()).abs()
low_close = (yf_data["Low"]  - yf_data["Close"].shift()).abs()
tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
yf_data["ATR 14"] = tr.rolling(14).mean()

mid = yf_data["Close"].rolling(20).mean()
std = yf_data["Close"].rolling(20).std()
yf_data["BB Width"] = (mid + 2*std - (mid - 2*std)) / mid

delta = yf_data["Close"].diff()
up = delta.clip(lower=0)
down = -delta.clip(upper=0)
ema_up = up.ewm(span=14).mean()
ema_dn = down.ewm(span=14).mean()
yf_data["RSI 14"] = 100 - (100 / (1 + ema_up/ema_dn))

# close z score in recent window
yf_data["Z Score"] = (yf_data["Close"] - yf_data["Close"].rolling(lookback).mean()) / yf_data["Close"].rolling(lookback).std()

yf_data = yf_data[max(num_rolling):]             

In [5]:
yf_data.columns

Index(['Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume', 'Log Return',
       'Log Return Sq', 'Rolling Mean 15', 'Rolling Std 15', 'Rolling Skew 15',
       'Rolling Kurt 15', 'Rolling Vol Mean 15', 'Rolling Vol Std 15',
       'Rolling Vol Skew 15', 'Rolling Vol Kurt 15', 'Rolling Mean 30',
       'Rolling Std 30', 'Rolling Skew 30', 'Rolling Kurt 30',
       'Rolling Vol Mean 30', 'Rolling Vol Std 30', 'Rolling Vol Skew 30',
       'Rolling Vol Kurt 30', 'Rolling Mean 60', 'Rolling Std 60',
       'Rolling Skew 60', 'Rolling Kurt 60', 'Rolling Vol Mean 60',
       'Rolling Vol Std 60', 'Rolling Vol Skew 60', 'Rolling Vol Kurt 60',
       'Rolling Mean 180', 'Rolling Std 180', 'Rolling Skew 180',
       'Rolling Kurt 180', 'Rolling Vol Mean 180', 'Rolling Vol Std 180',
       'Rolling Vol Skew 180', 'Rolling Vol Kurt 180', 'ATR 14', 'BB Width',
       'RSI 14', 'Z Score'],
      dtype='object')

In [None]:
features = [
    "Log Return",
    "Log Return Sq",
    # "ATR 14",
    # "BB Width",
    # "RSI 14",
    # "Z Score"
]


features += [f"Rolling Mean {i}" for i in num_rolling]
features += [f"Rolling Std {i}" for i in num_rolling]
features += [f"Rolling Skew {i}" for i in num_rolling]
features += [f"Rolling Kurt {i}" for i in num_rolling]

features += [f"Rolling Vol Mean {i}" for i in num_rolling]
features += [f"Rolling Vol Std {i}" for i in num_rolling]
features += [f"Rolling Vol Skew {i}" for i in num_rolling]
features += [f"Rolling Vol Kurt {i}" for i in num_rolling]

# features += [f"VolVol {i}" for i in num_rolling]
# features += [f"Vol Mean {i}" for i in num_rolling]
#features += ["Vol Mean", "Vol Vol"]


# features += [f"Log Return Lag {i}" for i in range(1, num_lags + 1)]
# features += [f"Log Return Sq Lag {i}" for i in range(1, num_lags + 1)]

In [7]:
df = yf_data[features].copy()

#target_scaler = MinMaxScaler()
df["reg_y1"] = yf_data["Log Return"].copy().shift(-1)
df["reg_y2"] = np.sqrt("Log Return Sq").shift(-1)
df = df[:-1]

TypeError: ufunc 'sqrt' not supported for the input types, and the inputs could not be safely coerced to any supported types according to the casting rule ''safe''

In [None]:
# # qcut instad
#num_classes = 3
q = [0, 0.6, 1]
num_classes = len(q) - 1

#class_target_scaler = StandardScaler()
#class_target_scaler.fit_transform(yf_data["Log Return Sq"].values.reshape(-1, 1))

yf_data["Vol"] = yf_data["Log Return"].rolling(5).std()

df["target reg"] = yf_data["Vol"].copy().shift(-1)

df["target class"] = pd.qcut(yf_data["Vol"].copy(), q, labels=[i for i in range(num_classes)]).shift(-1)


df = df[:-1]

target_type = "class"

if target_type == "reg":
    #targets = ["reg_y1", "reg_y2"]
    targets = ["target reg"] # vol

else:
    targets = ["target class"]

df[targets].describe()

Unnamed: 0,target class
count,7365
unique,2
top,0
freq,4420


In [None]:
df.groupby("target class")["target reg"].describe()

  df.groupby("target class")["target reg"].describe()


Unnamed: 0_level_0,count,mean,std,min,25%,50%,75%,max
target class,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
0,4420.0,0.00581,0.002128,0.000425,0.004149,0.005812,0.007554,0.009631
1,2945.0,0.016162,0.008534,0.009633,0.011318,0.01365,0.01776,0.092075


In [None]:
yf_data["Log Return Sq"].values

array([1.07331132e-04, 1.35009754e-05, 1.73489851e-05, ...,
       1.05709923e-03, 4.33258734e-05, 1.63127929e-06])

In [None]:
df[targets].value_counts()

target class
0               4420
1               2945
Name: count, dtype: int64

In [None]:
len(features)

34

In [None]:
#plt.hist(df["Volatility"], bins=100)
# colour x axis by class
# plt.hist(df[df["target class"] == 0]["reg_y1"], bins=50, alpha=0.5, density=True)
# plt.hist(df[df["target class"] == 1]["reg_y1"], bins=50, alpha=0.5, density=True)

if target_type == "reg":
    for i in np.unique(df["target class"]):
        plt.hist(df[df["target class"] == i]["reg_y1"], bins=100, alpha=0.5, density=False)
    plt.title("Log Return Distribution Classificaiton")
    plt.xlabel("y1")
    plt.ylabel("Density")
    plt.legend([f"Class {i}" for i in np.unique(df["target class"])])
    plt.show()

    for i in np.unique(df["target class"]):
        plt.hist(df[df["target class"] == i]["reg_y2"], bins=100, alpha=0.5, density=True)
    plt.title("Volatility Distribution Classificaiton")
    plt.xlabel("y2")
    plt.ylabel("Density")
    plt.legend([f"Class {i}" for i in np.unique(df["target class"])])
    plt.show()

In [None]:
df[features].isna().sum().sum()

np.int64(0)

In [None]:
df[targets].isna().sum().sum()

np.int64(3)

In [None]:
batch_size = 32
lookback = 60
n_features = len(features)

def create_dataset(data, target, lookback=1):
    X, y = [], []
    for i in range(len(data) - lookback):
        X.append(data[i:(i + lookback)])
        y.append(target[i + lookback])
    return np.array(X), np.array(y)

model = keras.Sequential()
model.add(Input(shape=(lookback, n_features)))
model.add(Conv1D(32, 3, activation="relu", padding="same"))

model.add(LayerNormalization())
model.add(Dropout(0.2))

model.add(LSTM(24, return_sequences=True))
model.add(LSTM(12, return_sequences=False))

model.add(Dense(16, activation="relu"))  
model.add(Dropout(0.2))                    

if target_type == "reg":
    model.add(Dense(len(targets), activation="linear")) # log return, vol
else:
    model.add(Dense(len(np.unique(df["target class"])), activation="softmax")) # class output
    
if target_type == "reg":
    metric = "mape"
    model.compile(optimizer=Adam(3e-4), loss="mean_squared_error", metrics=[metric]) # reg compile
else:
    metric = "accuracy"
    model.compile(optimizer=Adam(1e-4), loss="sparse_categorical_crossentropy", metrics=[metric]) # class compile

model.summary()

I0000 00:00:1747303209.581149  437505 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 5529 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 4060 Ti, pci bus id: 0000:07:00.0, compute capability: 8.9


In [None]:
from tensorflow.keras.utils import plot_model

plot_model(
    model,
    to_file=f"model_images/{target_type}.png",
    show_shapes=True,
    show_layer_names=True,
    show_dtype=False,
    rankdir="TB",
    dpi=96
)

You must install graphviz (see instructions at https://graphviz.gitlab.io/download/) for `plot_model` to work.


In [None]:
target_type

'class'

In [None]:
train, test = train_test_split(df, test_size=0.2, shuffle=False)
val, test = train_test_split(test, test_size=0.5, shuffle=False)

feature_scaler = StandardScaler()
train[features] = feature_scaler.fit_transform(train[features])
test[features] = feature_scaler.transform(test[features])
val[features] = feature_scaler.transform(val[features])

# if target_type == "reg":
#     target_scaler = StandardScaler()
#     train[targets] = target_scaler.fit_transform(train[targets])
#     test[targets] = target_scaler.transform(test[targets])
#     val[targets] = target_scaler.transform(val[targets])

# make sure the data length is divisible by the batch size
# train = train.iloc[:len(train) - len(train) % batch_size]
# test = test.iloc[:len(test) - len(test) % batch_size]
# val = val.iloc[:len(val) - len(val) % batch_size]

train.shape[0] / batch_size, test.shape[0] / batch_size, val.shape[0] / batch_size

(184.1875, 23.03125, 23.03125)

In [None]:
train.index.max() < val.index.min(), val.index.max() < test.index.min()

(True, True)

In [None]:
df.isna().sum().sum()

np.int64(6)

In [None]:
X_train, y_train = create_dataset(train[features].values, train[targets].values, lookback)
X_test, y_test = create_dataset(test[features].values, test[targets].values, lookback)
X_val, y_val = create_dataset(val[features].values, val[targets].values, lookback)

In [None]:
np.unique(y_test[:, 0])

array([0, 1])

In [None]:
# rename all class 0 to class 1 in 3 class
# y_train[y_train == 0] = 1
# y_test[y_test == 0] = 1
# y_val[y_val == 0] = 1

In [None]:
np.unique(y_test[:, 0])

array([0, 1])

In [None]:
if target_type != "reg":
    # Assuming y_train is your 1D array of class labels
    class_weights = class_weight.compute_class_weight(
        class_weight='balanced',
        classes=np.unique(y_train),
        y=y_train[:,0]
    )
    # Convert to dictionary format required by Keras
    class_weight_dict = dict(enumerate(class_weights))

In [None]:
#np.unique(y_train[:, 0]), class_weight_dict

In [None]:
# Train the model
early_stopping = EarlyStopping(monitor=f"val_{metric}", patience=3, restore_best_weights=True)
if target_type == "reg":
    history = model.fit(X_train, y_train, epochs=100, batch_size=batch_size, validation_data=(X_val, y_val), callbacks=[early_stopping], verbose=1)
else:
    history = model.fit(X_train, y_train, epochs=100, batch_size=batch_size, validation_data=(X_val, y_val), callbacks=[early_stopping], verbose=1, class_weight=class_weight_dict)
    
# Evaluate the mode;
loss, metric = model.evaluate(X_test, y_test, batch_size=batch_size)
print(f"Test Loss: {loss:.4f}")
print(f"Test Metric: {metric:.4f}")

Epoch 1/100


I0000 00:00:1747303212.465879  437748 cuda_dnn.cc:529] Loaded cuDNN version 90701


[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 36ms/step - accuracy: 0.5510 - loss: 1.0457 - val_accuracy: 0.7548 - val_loss: 0.8763
Epoch 2/100
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 34ms/step - accuracy: 0.7532 - loss: 0.8668 - val_accuracy: 0.7637 - val_loss: 0.7346
Epoch 3/100
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 36ms/step - accuracy: 0.7659 - loss: 0.7230 - val_accuracy: 0.7651 - val_loss: 0.6453
Epoch 4/100
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 35ms/step - accuracy: 0.7807 - loss: 0.6202 - val_accuracy: 0.7563 - val_loss: 0.5882
Epoch 5/100
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 33ms/step - accuracy: 0.7762 - loss: 0.5538 - val_accuracy: 0.7563 - val_loss: 0.5516
Epoch 6/100
[1m183/183[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 35ms/step - accuracy: 0.7766 - loss: 0.5157 - val_accuracy: 0.7622 - val_loss: 0.5262
[1m22/22[0m [32m━━━━━━━━

In [None]:
from sklearn.metrics import classification_report

if target_type != "reg":
    y_pred = np.argmax(model.predict(X_test), axis=1)
    print(classification_report(y_test, y_pred, digits=3))
else:
    from sklearn.metrics import (mean_absolute_error, mean_squared_error,
                             mean_absolute_percentage_error, r2_score,
                             explained_variance_score)

    def regression_report(y_true, y_pred, digits: int = 3):
        """Return a DataFrame with the main regression metrics."""
        mae  = mean_absolute_error(y_true, y_pred)
        mse  = mean_squared_error(y_true, y_pred)
        rmse = np.sqrt(mse)
        mape = mean_absolute_percentage_error(y_true, y_pred)
        r2   = r2_score(y_true, y_pred)
        evs  = explained_variance_score(y_true, y_pred)

        data = {
            "MAE":  mae,
            "RMSE": rmse,
            "R²":   r2,
            "MAPE": mape,
            "Expl. Var.": evs
        }
        return pd.DataFrame(data, index=["score"]).round(digits)

    # usage
    y_pred = model.predict(X_test).ravel()
    report = regression_report(y_test, y_pred)
    print(report)

    print(f"Baseline R2: {r2_score(y_test[:-1], y_test[1:])}")


[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 22ms/step
              precision    recall  f1-score   support

           0      0.828     0.846     0.837       455
           1      0.670     0.640     0.654       222

    accuracy                          0.778       677
   macro avg      0.749     0.743     0.746       677
weighted avg      0.776     0.778     0.777       677



In [None]:
# save model
model.save(f"keras_models/{target_type}.keras")

In [None]:
# save scaler
import pickle
with open(f"scalers/{target_type}_features.pkl", "wb") as f:
    pickle.dump(feature_scaler, f)

# if target_type == "reg":
#     with open(f"scalers/{target_type}_target.pkl", "wb") as f:
#         pickle.dump(target_scaler, f)