# In [None]:  (notebook cell marker left over from the export)

from __future__ import annotations
import os, gc, json, csv, itertools, logging, sys
from datetime import datetime
from pathlib import Path
from typing import Dict, Tuple, List
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, regularizers, backend as K
from sklearn.preprocessing import StandardScaler

# Seed NumPy and TensorFlow so repeated runs start from the same RNG state.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Experiment configuration.
# CSV_PATH     - input dataset handed to run_experiment() by the script entry point
# TARGET       - name of the label column that prepare_dataset() derives from "close"
# TEST_FR      - fraction of the (chronologically last) rows held out as the test set
# N_SPLITS     - number of walk-forward folds used during grid search
# LOOKBACK_SET - window lengths (in rows) that run_experiment() iterates over
# EPOCHS       - maximum number of training epochs per fit
# PATIENCE     - EarlyStopping patience (epochs without improvement)
# BATCH_SIZE   - mini-batch size passed to model.fit
CSV_PATH      = Path(r"C:/Users/matti/OneDrive/Thesis/Preprocessing/dataset1.csv")
TARGET        = "next_close"
TEST_FR       = 0.10
N_SPLITS      = 3
LOOKBACK_SET  = [3, 5, 8]
EPOCHS        = 50
PATIENCE      = 5
BATCH_SIZE    = 32

# Hyperparameter grid: grid_search() tries the full Cartesian product of these
# lists (4 * 2 * 3 * 2 * 3 = 144 combinations), each evaluated on every fold.
HP_SPACE: Dict[str, List] = {
    "LSTM_UNITS":   [64, 96, 128, 192],
    "L2_REG":       [1e-5, 1e-4],
    "DROPOUT":      [0.05, 0.10, 0.20],
    "REC_DROPOUT":  [0.05, 0.10],
    "LR":           [5e-4, 1.1e-3, 2e-3],
}

# Logging: every run writes to its own timestamped file under LOG_DIR and
# mirrors the same records to stdout. `stamp` is reused later to name the
# summary rows, the best-config JSON and the saved model of this run.
LOG_DIR = Path("logs3")
LOG_DIR.mkdir(exist_ok=True)
stamp   = datetime.now().strftime("%Y%m%d_%H%M%S")
log_path = LOG_DIR / f"run_{stamp}.log"
logging.basicConfig(
    level=logging.INFO,
    handlers=[logging.FileHandler(log_path), logging.StreamHandler(sys.stdout)],
    format="%(asctime)s | %(levelname)s | %(message)s",
)
logging.info("Logging to %s", log_path)

#Eval metrics
def rmse(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Return the root-mean-squared error between targets and predictions.

    Both inputs are converted to float arrays before subtracting, so plain
    sequences are accepted and integer inputs (in particular unsigned or
    narrow dtypes) cannot wrap around or overflow while the error is squared.

    Args:
        y_true: Ground-truth values (array-like).
        y_pred: Predicted values, broadcastable against ``y_true``.

    Returns:
        The RMSE as a Python float.
    """
    residual = np.asarray(y_true, dtype=float) - np.asarray(y_pred, dtype=float)
    return float(np.sqrt(np.mean(np.square(residual))))

def mda(y_true: np.ndarray, y_pred: np.ndarray, y_prev: np.ndarray) -> float:
    """Return the mean directional accuracy of the predictions.

    A sample counts as a hit when the sign of the predicted move
    (``y_pred - y_prev``) equals the sign of the actual move
    (``y_true - y_prev``). Signs are compared with ``np.sign``, so a zero
    move only matches another zero move.

    Inputs are converted to float arrays first, so plain sequences are
    accepted and unsigned integer inputs cannot wrap around on subtraction.

    Args:
        y_true: Actual values (array-like).
        y_pred: Predicted values (array-like).
        y_prev: Reference values the moves are measured from (array-like).

    Returns:
        Fraction of samples with matching direction, as a Python float.
    """
    reference = np.asarray(y_prev, dtype=float)
    predicted_dir = np.sign(np.asarray(y_pred, dtype=float) - reference)
    actual_dir = np.sign(np.asarray(y_true, dtype=float) - reference)
    return float(np.mean(predicted_dir == actual_dir))

# Clean data (like in GRU)
def prepare_dataset(path: Path, lookback: int):
    """Load the CSV and build unscaled look-back windows for train/val and test.

    The file is read with a parsed ``date`` column, sorted chronologically and
    indexed by date. The label column ``TARGET`` is the ``close`` of the
    following row. Rows containing any NaN (including the last row, which has
    no following close) are dropped. The final ``TEST_FR`` fraction of rows is
    the test partition; everything before it is the train/validation
    partition. Windows are built separately inside each partition, so no
    window spans the split.

    Args:
        path: CSV file with at least ``date`` and ``close`` columns.
        lookback: Number of consecutive rows per input window.

    Returns:
        ``(X_tv, y_tv, prev_tv, X_te, y_te, prev_te)`` where each ``X`` has
        shape ``(n_windows, lookback, n_features)``, each ``y`` holds the
        label per window and each ``prev`` holds the last ``close`` inside
        the window. Nothing is scaled.
    """
    df = (pd.read_csv(path, parse_dates=["date"])
            .sort_values("date").set_index("date"))

    df[TARGET] = df["close"].shift(-1)
    df.dropna(inplace=True)

    # Every column except the label is a feature (this includes "close").
    feature_cols = df.columns.difference([TARGET])

    # Split on an explicit position. Slicing with ``-n_test`` is wrong when
    # n_test == 0: ``iloc[:-0]`` is empty and ``iloc[-0:]`` is the whole frame,
    # which would put all rows into the test partition.
    n_test = int(len(df) * TEST_FR)
    split_at = len(df) - n_test
    df_tv = df.iloc[:split_at]
    df_te = df.iloc[split_at:]

    def make_seq(df_part: pd.DataFrame, lb: int):
        """Turn one partition into (windows, labels, previous closes)."""
        X_all = df_part[feature_cols].values
        y_all = df_part[TARGET].values
        prev = df_part["close"].values

        # Window for index t covers rows t-lb .. t-1; its label is the target
        # stored on row t and its reference price is the close of row t-1.
        # NOTE(review): because the target on row t is already the close of
        # row t+1, the label lies two rows past the last row of the window.
        # Confirm this horizon is intended for a column named "next_close".
        ends = np.arange(lb, len(df_part))
        if ends.size == 0:
            # Partition shorter than the lookback: keep the 3-D layout so
            # callers can still reshape by feature count.
            X_seq = np.empty((0, lb, X_all.shape[1]), dtype=X_all.dtype)
        else:
            X_seq = np.array([X_all[t - lb:t] for t in ends])
        return X_seq, y_all[ends], prev[ends - 1]

    # Output is intentionally unscaled; scalers are fitted by the callers.
    X_tv_raw, y_tv_raw, prev_tv = make_seq(df_tv, lookback)
    X_te_raw, y_te_raw, prev_te = make_seq(df_te, lookback)

    return X_tv_raw, y_tv_raw, prev_tv, X_te_raw, y_te_raw, prev_te


def walk_forward_splits(n_samples: int, n_splits: int):
    """Yield expanding-window (train_idx, val_idx) index pairs.

    The samples are cut into ``n_splits + 1`` equally sized blocks (any
    remainder at the end is left unused). Split ``k`` trains on the first
    ``k`` blocks and validates on the block that follows them.

    Args:
        n_samples: Total number of samples to split.
        n_splits: Number of (train, validation) pairs to produce.

    Yields:
        Tuples of two integer index arrays: training indices, then
        validation indices.
    """
    block = n_samples // (n_splits + 1)
    train_stop = block
    for _ in range(n_splits):
        valid_stop = train_stop + block
        yield np.arange(train_stop), np.arange(train_stop, valid_stop)
        # The block just validated on joins the training set of the next split.
        train_stop = valid_stop

#architecture
def build_model(shape: Tuple[int, int], hp: Dict) -> tf.keras.Model:
    """Create and compile a single-layer LSTM regressor.

    Args:
        shape: Input shape of one sample, passed to ``layers.Input``.
        hp: Hyperparameters; the keys ``LSTM_UNITS``, ``DROPOUT``,
            ``REC_DROPOUT``, ``L2_REG`` and ``LR`` are read.

    Returns:
        A compiled ``tf.keras.Sequential`` model (Input -> LSTM -> Dense(1))
        trained with MSE loss and a norm-clipped Adam optimizer.
    """
    def l2_penalty():
        # Each weight group gets its own regularizer object with the same strength.
        return regularizers.l2(hp["L2_REG"])

    sequence_input = layers.Input(shape=shape)
    recurrent = layers.LSTM(
        hp["LSTM_UNITS"],
        dropout=hp["DROPOUT"],
        recurrent_dropout=hp["REC_DROPOUT"],
        kernel_regularizer=l2_penalty(),
        recurrent_regularizer=l2_penalty(),
        bias_regularizer=l2_penalty(),
    )
    regression_head = layers.Dense(1)

    net = tf.keras.Sequential([sequence_input, recurrent, regression_head])

    optimizer = tf.keras.optimizers.Adam(learning_rate=hp["LR"], clipnorm=1.0)
    net.compile(optimizer=optimizer, loss="mse")
    return net

def _scale_folds(X_raw, y_raw, lookback: int):
    """Scale every walk-forward fold once, fitting scalers on its training part only."""
    n_feat = X_raw.shape[-1]
    folds = []
    for tr_idx, va_idx in walk_forward_splits(len(X_raw), N_SPLITS):
        X_tr_flat = X_raw[tr_idx].reshape(-1, n_feat)
        y_tr_col = y_raw[tr_idx].reshape(-1, 1)

        # Statistics come from the training fold only, so the validation
        # fold never influences the scaling it is evaluated under.
        scaler_X = StandardScaler().fit(X_tr_flat)
        scaler_y = StandardScaler().fit(y_tr_col)

        X_tr = scaler_X.transform(X_tr_flat).reshape(-1, lookback, n_feat)
        X_va = (scaler_X.transform(X_raw[va_idx].reshape(-1, n_feat))
                .reshape(-1, lookback, n_feat))
        # ravel() keeps the targets 1-D even for a single-sample fold
        # (squeeze() would collapse that case to a 0-d array).
        y_tr = scaler_y.transform(y_tr_col).ravel()
        y_va = scaler_y.transform(y_raw[va_idx].reshape(-1, 1)).ravel()

        folds.append((X_tr, y_tr, X_va, y_va, y_raw[va_idx], scaler_y))
    return folds


def grid_search(X_raw, y_raw, shape, lookback: int):
    """Exhaustively search ``HP_SPACE`` with walk-forward cross-validation.

    For every combination in the Cartesian product of ``HP_SPACE`` a fresh
    model is trained on each walk-forward fold with early stopping on the
    validation loss. A combination's score is the mean validation RMSE in
    the original (unscaled) target units.

    The fold split and scaling do not depend on the hyperparameters, so they
    are computed once up front, not once per combination.

    Args:
        X_raw: Unscaled windows, shape ``(n_samples, lookback, n_features)``.
        y_raw: Unscaled targets, one per window.
        shape: Per-sample input shape handed to ``build_model``.
        lookback: Window length, used to restore the 3-D layout after scaling.

    Returns:
        ``(best_hp, best_rmse)``. ``best_hp`` is ``None`` and ``best_rmse``
        is ``inf`` when no combination produced a finite score.
    """
    folds = _scale_folds(X_raw, y_raw, lookback)

    best_hp, best_rmse = None, np.inf
    for values in itertools.product(*HP_SPACE.values()):
        hp = dict(zip(HP_SPACE, values))
        rmses = []

        for X_tr, y_tr, X_va, y_va, y_va_raw, scaler_y in folds:
            model = build_model(shape, hp)
            es = tf.keras.callbacks.EarlyStopping(
                monitor="val_loss", patience=PATIENCE, restore_best_weights=True, verbose=0)

            model.fit(X_tr, y_tr,
                      validation_data=(X_va, y_va),
                      epochs=EPOCHS, batch_size=BATCH_SIZE, verbose=0, callbacks=[es])

            # Score in original units against the untouched raw targets; only
            # the predictions need to be mapped back through the scaler.
            y_pred = model.predict(X_va, verbose=0).reshape(-1, 1)
            y_pred_inv = scaler_y.inverse_transform(y_pred).ravel()
            rmses.append(rmse(y_va_raw, y_pred_inv))

            # Drop the model and reset Keras state so memory does not grow
            # across the many fits of the grid.
            del model
            K.clear_session(); gc.collect()

        avg = float(np.mean(rmses))
        # A NaN average (diverged fit) compares False here and is skipped.
        if avg < best_rmse:
            best_rmse, best_hp = avg, hp

    if best_hp is None:
        logging.warning("Grid search found no hyperparameter set with a finite CV RMSE")

    return best_hp, best_rmse


#Main Run function
def run_experiment(csv_path: Path):
    """Tune, retrain and evaluate one LSTM per lookback, then save the best.

    For every value in ``LOOKBACK_SET`` the function builds the windows, runs
    the grid search on the train/validation partition, refits scalers and a
    final model on that whole partition with the winning hyperparameters and
    evaluates on the held-out test partition. One row per lookback is
    appended to ``summary2.csv`` in ``LOG_DIR``. After the loop the
    configuration and model with the lowest test RMSE are written to disk.

    Args:
        csv_path: Dataset file passed through to ``prepare_dataset``.

    Raises:
        RuntimeError: If no lookback produced a finite test RMSE, so there is
            no model to save.
    """
    summary_csv = LOG_DIR / "summary2.csv"
    write_header = not summary_csv.exists()
    best_overall = {}; best_rmse = np.inf; best_model = None

    for lb in LOOKBACK_SET:
        logging.info("=== LOOKBACK %d ===", lb)

        # The train/val reference prices are not needed in this function.
        X_tv_raw, y_tv_raw, _, X_te_raw, y_te_raw, prev_te = prepare_dataset(csv_path, lb)

        # Tune
        hp, cv_rmse = grid_search(X_tv_raw, y_tv_raw, X_tv_raw.shape[1:], lb)
        logging.info("CV RMSE %.4f with %s", cv_rmse, hp)

        # Final scalers are fitted on the full train/val partition only; the
        # test partition is transformed with those statistics.
        n_feat = X_tv_raw.shape[-1]
        scaler_X = StandardScaler().fit(X_tv_raw.reshape(-1, n_feat))
        scaler_y = StandardScaler().fit(y_tv_raw.reshape(-1, 1))

        X_tv = scaler_X.transform(X_tv_raw.reshape(-1, n_feat)).reshape(X_tv_raw.shape)
        y_tv = scaler_y.transform(y_tv_raw.reshape(-1, 1)).ravel()
        X_te = scaler_X.transform(X_te_raw.reshape(-1, n_feat)).reshape(X_te_raw.shape)

        # Final model training. There is no validation split here, so early
        # stopping watches the training loss.
        model = build_model(X_tv.shape[1:], hp)
        es = tf.keras.callbacks.EarlyStopping(monitor="loss", patience=PATIENCE,
                                              restore_best_weights=True, verbose=0)
        model.fit(X_tv, y_tv, epochs=EPOCHS, batch_size=BATCH_SIZE,
                  verbose=0, callbacks=[es])

        # Final test evaluation in original units. Metrics use the raw test
        # targets directly: pushing them through transform/inverse_transform
        # adds float error that can turn an exact tie with the previous close
        # into a spurious up/down move in the MDA.
        y_pred = model.predict(X_te, verbose=0).reshape(-1, 1)
        y_pred_inv = scaler_y.inverse_transform(y_pred).ravel()

        test_rmse = rmse(y_te_raw, y_pred_inv)
        test_mda  = mda(y_te_raw, y_pred_inv, prev_te)
        logging.info("TEST - RMSE %.4f | MDA %.3f", test_rmse, test_mda)

        # Append this lookback's result; the header is written only when the
        # summary file did not exist at the start of the run.
        hp_names = sorted(hp)
        with summary_csv.open("a", newline="") as fp:
            w = csv.writer(fp)
            if write_header:
                w.writerow(["timestamp", "lookback", "rmse", "mda", *hp_names])
                write_header = False
            w.writerow([stamp, lb, test_rmse, test_mda, *[hp[k] for k in hp_names]])

        # NOTE(review): the winning lookback is chosen on test RMSE, so the
        # reported "best" test score is optimistically biased; selecting on
        # cv_rmse would keep the test set untouched.
        if test_rmse < best_rmse:
            best_rmse, best_model = test_rmse, model
            best_overall = dict(lookback=lb, hp=hp, rmse=test_rmse, mda=test_mda)

    # A NaN test RMSE never beats the initial inf, so without this guard an
    # empty config would be written and ``None.save`` would raise.
    if best_model is None:
        raise RuntimeError("No lookback produced a finite test RMSE; nothing to save")

    # Save best config and model
    best_json = LOG_DIR / f"best_config_{stamp}.json"
    best_json.write_text(json.dumps(best_overall, indent=2))
    logging.info("Best config saved to %s", best_json)

    MODEL_DIR = Path("saved_models"); MODEL_DIR.mkdir(exist_ok=True)
    mod_path  = MODEL_DIR / f"lstm_best_{stamp}.keras"
    best_model.save(mod_path)
    logging.info("Best model weights saved to %s", mod_path)

# Script entry point: run the full experiment on the configured dataset only
# when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    run_experiment(CSV_PATH)
