In [15]:
import os
import joblib
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import LSTM, Dense, Dropout
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping
from keras_tuner import RandomSearch
from tensorflow.keras import layers, models
from tensorflow import keras
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import matplotlib.pyplot as plt

# Let TensorFlow allocate GPU memory on demand instead of reserving it all up
# front. The setting is applied to every visible GPU (not only the first one)
# because TensorFlow expects memory growth to be configured identically on all
# devices. With no GPU present the loop simply does nothing.
physical_devices = tf.config.list_physical_devices('GPU')
for gpu_device in physical_devices:
    tf.config.experimental.set_memory_growth(gpu_device, True)

def preprocess_data(file_path):
    """Load one stock CSV, add calendar features and min-max scale the inputs.

    The fitted scaler is also written to disk next to the CSV, as
    ``<csv path without extension>_scaler.pkl``.

    Args:
        file_path: Path to a CSV file with at least the columns ``Date``
            (formatted ``YYYY-MM-DD``), ``Percent Change``, ``Volume`` and
            ``Close``.

    Returns:
        A tuple ``(df_stock, features, target, scaler)``: the processed frame
        (feature columns scaled in place), the list of feature column names,
        the target column name, and the fitted ``MinMaxScaler``.
    """
    df_stock = pd.read_csv(file_path)

    # Convert 'Date' column to datetime
    df_stock['Date'] = pd.to_datetime(df_stock['Date'], format='%Y-%m-%d')

    # Remove '%' and convert 'Percent Change' to float; unparsable values
    # become NaN. Casting to str first keeps this working when pandas has
    # already parsed the column as numeric (same approach as 'Volume' below).
    df_stock['Percent Change'] = pd.to_numeric(
        df_stock['Percent Change'].astype(str).str.replace('%', '', regex=False),
        errors='coerce',
    )

    # Remove thousands separators from 'Volume' and convert to a number
    df_stock['Volume'] = pd.to_numeric(
        df_stock['Volume'].astype(str).str.replace(',', '', regex=False),
        errors='coerce',
    )

    # Create additional features
    df_stock['day_of_week'] = df_stock['Date'].dt.dayofweek
    df_stock['month'] = df_stock['Date'].dt.month

    # Select features and target
    features = ['Close', 'day_of_week', 'month']
    target = 'Close'

    # Normalize data.
    # NOTE(review): the scaler is fit on the whole file, i.e. before the
    # train/test split done in train_and_evaluate, so test rows influence the
    # scaling. Confirm whether that is acceptable.
    scaler = MinMaxScaler()
    df_stock[features] = scaler.fit_transform(df_stock[features])

    # Save the scaler per stock. Only the final extension is swapped:
    # str.replace('.csv', ...) would also rewrite '.csv' inside directory
    # names, and would leave the path unchanged (overwriting the data file)
    # when the extension is not exactly '.csv'.
    scaler_save_path = os.path.splitext(file_path)[0] + '_scaler.pkl'
    joblib.dump(scaler, scaler_save_path)

    return df_stock, features, target, scaler

def df_to_X_y(df, features, target, window_size=5):
    """Turn a frame into sliding-window samples for sequence models.

    Sample ``i`` consists of rows ``i .. i + window_size - 1`` of the feature
    columns; its label is the target value of row ``i + window_size``.

    Args:
        df: DataFrame holding the feature and target columns.
        features: Column names used as model inputs.
        target: Column name of the value to predict.
        window_size: Number of consecutive rows in each input window.

    Returns:
        ``(X, y)`` where ``X`` has shape ``(n_windows, window_size, n_features)``
        and ``y`` has shape ``(n_windows,)``. When ``df`` has too few rows for
        a single window, both arrays are empty but ``X`` keeps its trailing
        dimensions.
    """
    # Extract the columns once; selecting them inside the loop would build a
    # new DataFrame for every window.
    feature_values = df[features].to_numpy()
    target_values = df[target].to_numpy()

    n_windows = len(df) - window_size
    if n_windows <= 0:
        empty_X = np.empty(
            (0, window_size) + feature_values.shape[1:],
            dtype=feature_values.dtype,
        )
        return empty_X, np.empty((0,), dtype=target_values.dtype)

    X = np.array([feature_values[i:i + window_size] for i in range(n_windows)])
    # Copy so the returned labels never alias the frame's underlying buffer.
    y = np.array(target_values[window_size:])
    return X, y

def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0.1):
    """Build one attention + feed-forward block on top of ``inputs``.

    Each half normalizes its input first and adds a residual connection
    afterwards, so the output has the same last dimension as ``inputs``.

    Args:
        inputs: Keras tensor to transform.
        head_size: ``key_dim`` of the multi-head attention layer.
        num_heads: Number of attention heads.
        ff_dim: Number of filters of the first 1x1 convolution.
        dropout: Dropout rate used inside attention and after each half.

    Returns:
        The transformed Keras tensor.
    """
    # Self-attention half: normalize, attend to itself, drop out, add residual.
    normed_inputs = layers.LayerNormalization(epsilon=1e-6)(inputs)
    attended = layers.MultiHeadAttention(
        key_dim=head_size,
        num_heads=num_heads,
        dropout=dropout,
    )(normed_inputs, normed_inputs)
    attended = layers.Dropout(dropout)(attended)
    attention_out = attended + inputs

    # Feed-forward half: two kernel-size-1 convolutions; the second projects
    # back to the input width so the residual sum is well-defined.
    normed_attention = layers.LayerNormalization(epsilon=1e-6)(attention_out)
    hidden = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(normed_attention)
    hidden = layers.Dropout(dropout)(hidden)
    projected = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(hidden)
    return projected + attention_out

def gated_residual_network(inputs, units):
    """Apply a gated two-layer dense transform with a skip connection.

    Args:
        inputs: Keras tensor to transform.
        units: Width of the hidden ReLU layer.

    Returns:
        ``transform(inputs) * gate(inputs) + inputs``, with the same last
        dimension as ``inputs``.
    """
    feature_dim = inputs.shape[-1]
    hidden = layers.Dense(units, activation='relu')(inputs)
    candidate = layers.Dense(feature_dim)(hidden)
    # The sigmoid gate is computed from the raw inputs and decides how much
    # of the candidate is mixed in on top of the skip connection.
    gate = layers.Dense(feature_dim, activation='sigmoid')(inputs)
    return candidate * gate + inputs

# Number of time steps per input window. build_model references it for the
# sequence-length dimension of the model input; df_to_X_y has its own
# window_size default (also 5), so the two must be kept in sync by hand.
WINDOW_SIZE = 5

# Function to calculate the share of predictions within a relative tolerance
def calculate_accuracy(y_true, y_pred, threshold=0.05):
    """Return the fraction of predictions within ``threshold`` relative error.

    Both inputs are flattened before comparison. This matters because a
    ``(n,)`` target array subtracted from ``(n, 1)`` model output would
    otherwise broadcast to an ``(n, n)`` matrix and inflate the count.

    Args:
        y_true: Ground-truth values (any array-like).
        y_pred: Predicted values with the same number of elements.
        threshold: Maximum allowed ``|y_true - y_pred| / |y_true|``.

    Returns:
        A float in ``[0, 1]``. Empty input yields ``0.0``. A target of exactly
        zero has no defined relative error, so it counts as accurate only
        when the prediction matches it exactly.

    Raises:
        ValueError: If the inputs contain a different number of elements.
    """
    actual = np.asarray(y_true, dtype=float).ravel()
    predicted = np.asarray(y_pred, dtype=float).ravel()
    if actual.size != predicted.size:
        raise ValueError(
            f"y_true and y_pred must have the same number of elements "
            f"({actual.size} != {predicted.size})"
        )
    if actual.size == 0:
        return 0.0

    abs_diff = np.abs(actual - predicted)
    nonzero_target = actual != 0

    # Start from the zero-target rule (exact match -> 0, otherwise inf), then
    # fill in the real relative error wherever the division is defined.
    percentage_error = np.where(abs_diff == 0, 0.0, np.inf)
    percentage_error[nonzero_target] = (
        abs_diff[nonzero_target] / np.abs(actual[nonzero_target])
    )

    # Accuracy is the proportion of predictions inside the tolerance.
    return float(np.mean(percentage_error <= threshold))
def build_model(hp, input_shape):
    """Build and compile the tunable GRN + LSTM + transformer regressor.

    Args:
        hp: KerasTuner hyperparameter container used to sample the
            architecture and optimizer settings.
        input_shape: ``(time_steps, n_features)`` of one sample. If ``None``,
            ``(WINDOW_SIZE, 3)`` is used.

    Returns:
        A compiled Keras model with a single-unit output, trained with
        mean-absolute-error loss.
    """
    # Use the shape supplied by the caller. It used to be overwritten
    # unconditionally, which tied the model to exactly three features.
    if input_shape is None:
        input_shape = (WINDOW_SIZE, 3)

    inputs = layers.Input(shape=input_shape)
    x = inputs
    x = gated_residual_network(x, units=hp.Int('grn_units', min_value=32, max_value=128, step=32))
    x = layers.LSTM(units=50, return_sequences=True)(x)
    x = layers.LSTM(units=50, return_sequences=True)(x)

    # head_size / num_heads / ff_dim use the same hyperparameter name on every
    # iteration, so all blocks share them; only dropout is tuned per block.
    for i in range(hp.Int('num_transformer_blocks', 2, 8, 2)):
        x = transformer_encoder(
            x,
            head_size=hp.Int('head_size', 8, 256, 32),
            num_heads=hp.Int('num_heads', 2, 16),
            ff_dim=hp.Int('ff_dim', 4, 64),
            dropout=hp.Float(f'dropout_{i}', 0.1, 0.6)
        )

    # Collapse the time axis, then run the tunable MLP head.
    x = layers.GlobalAveragePooling1D()(x)
    for i in range(hp.Int('num_mlp_layers', 1, 3)):
        x = layers.Dense(hp.Int(f'mlp_units_{i}', 32, 256, 32))(x)
        x = layers.Activation('relu')(x)
        x = layers.Dropout(hp.Float(f'mlp_dropout_{i}', 0.1, 0.6))(x)
    outputs = layers.Dense(1)(x)
    model = models.Model(inputs, outputs)

    optimizer_name = hp.Choice('optimizer', ['adam', 'adamax'])
    learning_rate = hp.Float('learning_rate', 1e-4, 1e-2, sampling='LOG')
    if optimizer_name == 'adam':
        optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
    else:
        optimizer = keras.optimizers.Adamax(learning_rate=learning_rate)

    model.compile(
        optimizer=optimizer,
        loss='mean_absolute_error',
        metrics=['mean_absolute_error', 'mean_squared_error']
    )
    return model


# Evaluate a trained model on held-out data
def evaluate_model(model, X_test, y_test):
    """Predict on the test set, print and return regression metrics.

    Args:
        model: Trained model exposing ``predict``.
        X_test: Test inputs passed to ``model.predict``.
        y_test: Ground-truth targets for ``X_test``.

    Returns:
        ``(mae, mse, r2, accuracy)`` where ``accuracy`` is the fraction of
        predictions within 5% relative error of the target.
    """
    # Flatten both sides to 1-D. The model's single-unit output has a
    # trailing axis of size 1, and an (n,) vs (n, 1) pair would broadcast to
    # (n, n) inside calculate_accuracy.
    y_true = np.asarray(y_test).ravel()
    y_pred = np.asarray(model.predict(X_test)).ravel()

    # Calculate various metrics
    mae = mean_absolute_error(y_true, y_pred)
    mse = mean_squared_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)

    # Calculate accuracy (percentage within 5% threshold).
    # NOTE(review): the metrics are computed on whatever scale y_test is in;
    # the pipeline in this file passes min-max scaled values, so they are not
    # in price units -- confirm that this is what should be reported.
    accuracy = calculate_accuracy(y_true, y_pred, threshold=0.05)

    print(f"Mean Absolute Error: {mae}")
    print(f"Mean Squared Error: {mse}")
    print(f"R² Score: {r2}")
    print(f"Accuracy (within 5%): {accuracy * 100:.2f}%")

    return mae, mse, r2, accuracy



def train_model(X_train, y_train, input_shape):
    """Random-search the hyperparameters, then refit the best model.

    Args:
        X_train: Training inputs.
        y_train: Training targets.
        input_shape: Shape of one sample, forwarded to ``build_model``.

    Returns:
        ``(best_model, history)``: the best model found by the tuner after
        the additional fit, and the Keras ``History`` of that fit.
    """
    # NOTE(review): the objective is the *training* MAE even though a
    # validation split is held out; 'val_mean_absolute_error' may be the
    # intended target. Left unchanged so existing tuner results stay
    # comparable.
    # NOTE(review): directory/project_name are fixed, so repeated calls (one
    # per stock) presumably share the same tuner state on disk -- confirm.
    tuner = RandomSearch(lambda hp: build_model(hp, input_shape),
                         objective='mean_absolute_error',
                         max_trials=5,
                         executions_per_trial=2,
                         directory='hyperparam_tuning',
                         project_name='tft_stock')

    tuner.search(X_train, y_train, epochs=50, validation_split=0.2, callbacks=[EarlyStopping(patience=5)])

    best_model = tuner.get_best_models(num_models=1)[0]

    # Fit the best model again and keep the history for plotting.
    history = best_model.fit(X_train, y_train, epochs=50, validation_split=0.2, callbacks=[EarlyStopping(patience=5)])

    # Return the history too: the caller unpacks `model, history`, and the
    # history was previously computed and then thrown away.
    return best_model, history



def plot_training_history(history):
    """Plot training vs. validation curves for loss, MAE and MSE.

    Args:
        history: Object whose ``history`` mapping holds a per-epoch series
            for each metric name and for its ``val_``-prefixed counterpart.
    """
    plt.figure(figsize=(12, 4))

    tracked = ('loss', 'mean_absolute_error', 'mean_squared_error')
    for panel, metric in enumerate(tracked, start=1):
        pretty_name = metric.replace('_', ' ').title()

        # One panel per metric, laid out side by side.
        plt.subplot(1, 3, panel)
        plt.plot(history.history[metric], label=f'Training {metric}')
        plt.plot(history.history[f'val_{metric}'], label=f'Validation {metric}')
        plt.xlabel('Epochs')
        plt.ylabel(pretty_name)
        plt.legend()
        plt.title(f'{pretty_name} Over Epochs')

    plt.tight_layout()
    plt.show()

# Full pipeline: preprocess, train, evaluate, plot and save one model per CSV
def train_and_evaluate(data_directory):
    """Run the whole training pipeline for every ``.csv`` file in a directory.

    For each file: preprocess it, build sliding windows, split 80/20 in row
    order, tune and train a model, print test metrics, plot the training
    curves and save the model as ``<stock>_tft_model.keras`` in the same
    directory.

    Args:
        data_directory: Directory containing one CSV file per stock.
    """
    for file_name in os.listdir(data_directory):
        # Guard clause: anything that is not a CSV is ignored.
        if not file_name.endswith('.csv'):
            continue

        stock_name, _ = os.path.splitext(file_name)
        file_path = os.path.join(data_directory, file_name)
        print(f"Processing {stock_name}...")

        # Load and scale the data, then cut it into model-ready windows.
        df, features, target, _ = preprocess_data(file_path)
        X, y = df_to_X_y(df, features, target)

        if len(X) == 0:
            print(f"Not enough data to train for {stock_name}. Skipping...")
            continue

        # First 80% of the windows train the model; the rest is held out.
        split_index = int(len(X) * 0.8)
        X_train, y_train = X[:split_index], y[:split_index]
        X_test, y_test = X[split_index:], y[split_index:]

        # Tune and fit
        input_shape = (X.shape[1], X.shape[2])
        model, history = train_model(X_train, y_train, input_shape)

        # Report held-out metrics and show the learning curves.
        evaluate_model(model, X_test, y_test)
        plot_training_history(history)

        # Persist the trained model next to its data.
        model_save_path = os.path.join(data_directory, f"{stock_name}_tft_model.keras")
        model.save(model_save_path)
        print(f"Model for {stock_name} saved at {model_save_path}")

# Script entry point: run the full train/evaluate pipeline over every CSV file
# found in the configured data directory.
if __name__ == "__main__":
    data_directory = './NULB/'  # Replace with the actual path
    train_and_evaluate(data_directory)


Trial 5 Complete [00h 00m 44s]
mean_absolute_error: 0.08044939115643501

Best mean_absolute_error So Far: 0.06830994598567486
Total elapsed time: 00h 03m 28s
Model for NUBL saved at ./NULB/NUBL_tft_model.keras


  saveable.load_own_variables(weights_store.get(inner_path))
