# Cross Validation

In [None]:
import os
import numpy as np
import pandas as pd

from sklearn.model_selection import TimeSeriesSplit
from tensorflow.keras.metrics import MeanSquaredError, RootMeanSquaredError, MeanAbsoluteError
from src.LongShortTermMemory import LSTMModel

from src.StockDataProcessor import StockDataProcessor
from src.LongShortTermMemory import LSTMModel

In [None]:
class CrossValidator:
    """
    Performs time series cross-validation for LSTM models.

    Folds are produced by scikit-learn's ``TimeSeriesSplit``; a fresh model is
    built, trained and scored for every fold.
    """

    def __init__(self, n_splits=5):
        """
        Initializes the cross-validator.

        Args:
            n_splits: The number of splits for cross-validation.
        """
        self.n_splits = n_splits
        self.tscv = TimeSeriesSplit(n_splits=self.n_splits)

    def cross_val_score(self, model_creator, x_data, y_data, epochs=50, batch_size=32):
        """
        Performs cross-validation and returns the evaluation scores.

        Args:
            model_creator: A zero-argument callable that returns a compiled
                Keras model. It is called once per fold so that every fold
                starts from a freshly built model.
            x_data: The preprocessed input data (sequences) for the model.
                Must support indexing with an integer index array.
            y_data: The preprocessed target data for the model, aligned
                row-for-row with ``x_data``.
            epochs: The number of epochs for training.
            batch_size: The batch size for training.

        Returns:
            list: A list of dictionaries containing evaluation metrics for each fold.

        Raises:
            ValueError: If ``x_data`` and ``y_data`` differ in length.
        """
        # Fail early: a length mismatch would otherwise surface as an opaque
        # IndexError mid-run, or silently score misaligned sample/target pairs.
        if len(x_data) != len(y_data):
            raise ValueError(
                f"x_data and y_data must have the same length, "
                f"got {len(x_data)} and {len(y_data)}"
            )

        all_scores = []

        for fold, (train_index, test_index) in enumerate(self.tscv.split(x_data)):
            print(f"Fold: {fold + 1}")

            # --- Data Splitting for This Fold ---
            X_train, X_test = x_data[train_index], x_data[test_index]
            y_train, y_test = y_data[train_index], y_data[test_index]

            # --- Model Training and Evaluation ---
            model = model_creator()
            model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size, verbose=0)
            scores = self.evaluate_model(model, X_test, y_test)
            all_scores.append(scores)

        return all_scores

    def evaluate_model(self, model, X_test, y_test):
        """
        Evaluates the model and returns a dictionary of metrics.

        Args:
            model: A fitted model exposing ``predict``.
            X_test: The inputs to predict on.
            y_test: The true targets matching ``X_test``.

        Returns:
            dict: The keys ``'mse'``, ``'rmse'`` and ``'mae'``, each holding
            the value returned by ``.numpy()`` on the corresponding Keras
            metric result.
        """
        y_pred = model.predict(X_test)

        # A new metric object is created per call so that no state is carried
        # over from a previous fold.
        mse = MeanSquaredError()(y_test, y_pred).numpy()
        rmse = RootMeanSquaredError()(y_test, y_pred).numpy()
        mae = MeanAbsoluteError()(y_test, y_pred).numpy()

        return {'mse': mse, 'rmse': rmse, 'mae': mae}

In [None]:
# Configuration
# Training hyper-parameters and the length of each input window.
EPOCHS = 100
BATCH_SIZE = 10
TIME_STEPS = 60

# Location of the per-token data folder, relative to the current working directory.
FOLDER_PREFIX = "data/min/"
TOKEN = "GOOG"
RUN_FOLDER = f"{FOLDER_PREFIX}{TOKEN}/"
WORK_DIR = os.path.join(os.getcwd(), RUN_FOLDER)
CSV_FILE = os.path.join(WORK_DIR, "data.csv")

# Same directory as WORK_DIR; kept under its own name because later cells use it.
PROJECT_FOLDER = WORK_DIR
# exist_ok avoids the check-then-create race of a separate os.path.exists test.
os.makedirs(PROJECT_FOLDER, exist_ok=True)

In [None]:
# --- Read the CSV and pick the validation cut-off ---
# The 'Datetime' column is parsed into timestamps while loading.
data = pd.read_csv(CSV_FILE, parse_dates=["Datetime"])

# The cut-off is the 'Datetime' value of the row located 65% of the way
# through the file.
total_rows = data.shape[0]
validation_date_index = int(0.65 * total_rows)
validation_date = data["Datetime"].iloc[validation_date_index]

In [None]:
# Load the CSV through the project's processor, which returns three pairs.
train_pair, test_pair, frame_pair = StockDataProcessor.load_csv_transform_to_numpy(
    TIME_STEPS, CSV_FILE, validation_date
)
x_train, y_train = train_pair
x_test, y_test = test_pair
training_data, test_data = frame_pair

# Stack the train and test parts back together along the first axis so the
# cross-validator can do its own splitting.
x_data = np.concatenate([x_train, x_test], axis=0)
y_data = np.concatenate([y_train, y_test], axis=0)

In [None]:
# Initialize the model wrapper with the training-array shape, the training
# settings and the project folder.
# NOTE(review): presumably the shape argument sizes the network input —
# confirm against src.LongShortTermMemory.
lstm = LSTMModel(x_train.shape, EPOCHS, BATCH_SIZE, PROJECT_FOLDER)

In [None]:
# Create a CrossValidator instance that splits the data into 5 time-series folds
cross_validator = CrossValidator(n_splits=5)

In [None]:
# Perform cross-validation on the preprocessed sequences and targets.
# The factory is the bound `create` of the `lstm` instance built earlier: the
# validator calls it with no arguments, so the unbound `LSTMModel.create`
# would be missing `self` if `create` is an instance method.
# NOTE(review): assumes `create` returns a compiled Keras model — confirm
# against src.LongShortTermMemory.
all_scores = cross_validator.cross_val_score(
    lstm.create,
    x_data,
    y_data,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE
)

In [None]:
# Report the three metrics of every fold, one line per fold
for fold, scores in enumerate(all_scores):
    print(f"Fold {fold + 1}: MSE={scores['mse']:.4f}, RMSE={scores['rmse']:.4f}, MAE={scores['mae']:.4f}")

# Average each metric over the folds; the metric names come from the first fold
average_scores = {}
for metric in all_scores[0]:
    per_fold_values = [fold_scores[metric] for fold_scores in all_scores]
    average_scores[metric] = np.mean(per_fold_values)
print(f"Average: MSE={average_scores['mse']:.4f}, RMSE={average_scores['rmse']:.4f}, MAE={average_scores['mae']:.4f}")