In [None]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from datetime import datetime
from datetime import timedelta

In [None]:
# The dataset has been converted into batches via "split_csv.ipynb". 1 batch contains 10 stocks ipynb files. 
# Change the batch folder to do training model and predictions for certain stocks. 
BATCH_FOLDER = 'batch_26'

TRAINING_MODEL_PATH = '../trainingModel'
DATASET_PATH = f'../trainingDataset/{BATCH_FOLDER}'
LOG_DIR = '../trainingLogs'
ACCURACY_THRESHOLD = 90
MAX_RETRIES = 3
WINDOW_SIZE = 32
BATCH_SIZE = 64
SHUFFLE_BUFFER_SIZE = 1000
SPLIT_TIME = 3000

In [None]:
def normalize_feature(data):
    mean = np.mean(data)
    std = np.std(data)
    normalized_data = (data - mean) / std
    return normalized_data, (mean, std)

In [None]:
def denormalize_data(data, stats):
    stats = np.array(stats)
    means = stats[:, 0]
    stds = stats[:, 1]
    return data * stds + means

In [None]:
def model_forecast(model, data, window_size):
    forecast = []
    for time in range(len(data) - window_size):
        forecast.append(model.predict(data[time:time + window_size][np.newaxis]))
    return np.array(forecast).squeeze()

In [None]:
def parse_data_from_file(filename):
    data = np.loadtxt(filename, delimiter=',', skiprows=1, usecols=(1, 2, 3))
    low, high, close = data[:, 0], data[:, 1], data[:, 2]

    low_normalized, stats_low = normalize_feature(low)
    high_normalized, stats_high = normalize_feature(high)
    close_normalized, stats_close = normalize_feature(close)

    features = np.stack([low_normalized, high_normalized, close_normalized], axis=1)
    times = np.arange(len(data))

    return times, features, stats_low, stats_high, stats_close

In [None]:
def windowed_dataset(series, window_size):
    dataset = tf.data.Dataset.from_tensor_slices(series)
    dataset = dataset.window(window_size + 1, shift=1, drop_remainder=True)
    dataset = dataset.flat_map(lambda window: window.batch(window_size + 1))
    dataset = dataset.shuffle(SHUFFLE_BUFFER_SIZE)
    dataset = dataset.map(lambda window: (window[:-1], window[-1]))
    dataset = dataset.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
    return dataset

In [None]:
def train_val_split(time, data, split_time=SPLIT_TIME):
    return time[:split_time], data[:split_time], time[split_time:], data[split_time:]

In [None]:
def create_model():
    model = tf.keras.models.Sequential([
        tf.keras.Input(shape=(WINDOW_SIZE, 3)),
        tf.keras.layers.Conv1D(filters=32, kernel_size=3, strides=1, padding="causal", activation="relu"),
        tf.keras.layers.LSTM(64, return_sequences=True, dropout=0.3, recurrent_dropout=0.2),
        tf.keras.layers.LSTM(32, dropout=0.2, recurrent_dropout=0.1),
        tf.keras.layers.Dense(16, activation="relu", kernel_regularizer=tf.keras.regularizers.l2(0.01)),
        tf.keras.layers.Dense(3)
    ])

    optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)

    model.compile(
        loss=tf.keras.losses.Huber(delta=1.0),
        optimizer=optimizer,
        metrics=["mae"]
    )

    return model

In [None]:
def calculate_accuracy(true_values, predicted_values, tolerance=0.05):
    differences = np.abs(true_values - predicted_values)
    within_tolerance = differences <= (tolerance * true_values)
    accuracy = np.mean(within_tolerance) * 100
    return accuracy

In [None]:
def compute_metrics(true_series, forecast):
    """Computes MSE and MAE metrics for the forecast"""
    mse = tf.keras.losses.MSE(true_series, forecast)
    mae = tf.keras.losses.MAE(true_series, forecast)
    return mse, mae