In [None]:
# Cell: imports
import tensorflow as tf
import numpy as np
from artifactremoval.models import ComplexSpectralModel
from pathlib import Path
import pickle
import keras_tuner as kt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import numpy as np

from artifactremoval.utils import *

# For reproducibility
tf.random.set_seed(42)
np.random.seed(42)


In [None]:
def load_most_recent_pickle(output_dir, prefix="spectral_train_"):
    # List all matching files
    pickle_files = sorted(
        output_dir.glob(f"{prefix}*.pkl"),
        key=lambda x: x.stat().st_mtime,  # Sort by modification time
        reverse=True  # Most recent first
    )
    
    if not pickle_files:
        raise FileNotFoundError(f"No pickle files found with prefix '{prefix}' in {output_dir}")
    
    most_recent_file = pickle_files[0]
    print(f"Loading most recent file: {most_recent_file.name}")
    
    with open(most_recent_file, "rb") as f:
        data = pickle.load(f)
    
    return data

def build_model(hp):
    # Tunable FC size
    dense_units = hp.Choice("dense_units", [256, 512, 1024, 2048, 4096])

    # Tunable dropout
    dr1 = hp.Float("dropout_rate1", 0.2, 0.6, step=0.1)
    dr2 = hp.Float("dropout_rate2", 0.0, 0.5, step=0.1)

    # Tunable learning‑rate
    lr  = hp.Float("learning_rate", 1e-5, 1e-2, sampling="log")

    model = ComplexSpectralModel().build_main_model(
        dropout_rate1=dr1,
        dropout_rate2=dr2,
        dense_units=dense_units  # add this arg in your class
    )

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model

class MyRandomTuner(kt.RandomSearch):
    def run_trial(self, trial, *fit_args, **fit_kwargs):
        """Override to tune batch_size while keeping Keras‑Tuner happy."""
        hp = trial.hyperparameters
        fit_kwargs["batch_size"] = hp.Choice("batch_size", [32, 64, 128, 256])

        # Add default callbacks / epochs if the user didn't pass them
        fit_kwargs.setdefault(
            "callbacks",
            [tf.keras.callbacks.EarlyStopping("val_loss", patience=3)]
        )
        fit_kwargs.setdefault("epochs", 15)

        # ALWAYS return the result so Tuner can read metrics
        return super().run_trial(trial, *fit_args, **fit_kwargs)

In [None]:
base_dir = Path.cwd().parent
input_dir = base_dir / "data" / "ratings" / "aggregate_data"
train_data = load_most_recent_pickle(input_dir, prefix="spectral_train_")

# --- Filter out entries with no consensus_rating ---
filtered = [e for e in train_data if e.get("consensus_rating") is not None]
print(f"Kept {len(filtered)}/{len(train_data)} spectra with valid labels")

# --- Rebuild X and y from the filtered list ---
X = np.stack([e['raw_spectrum'] for e in filtered])
y = np.array([e['consensus_rating'] for e in filtered])

print("Filtered X shape:", X.shape)   # should be (n_valid, 512)
print("Filtered y shape:", y.shape)   # should be (n_valid,)

# (Optional) If your labels are strings, you can encode them numerically:

le = LabelEncoder()
y_encoded = le.fit_transform(y)
print("Classes:", le.classes_)
print("Encoded y shape:", y_encoded.shape)

In [None]:
# Split into train / val / test
X_train, X_temp, y_train, y_temp = train_test_split(X, y_encoded, test_size=0.3, stratify=y, random_state=42)
X_val,   X_test, y_val,   y_test  = train_test_split(X_temp, y_temp, test_size=0.5, stratify=y_temp, random_state=42)

In [None]:
experiment_name = "randomsearch"
hyperparam_dir = base_dir / "data" / "hyperparam_tuning"
model_dir = base_dir / "data" / "models"

hyperparam_dir.mkdir(parents=True, exist_ok=True)
model_dir.mkdir(parents=True, exist_ok = True)

# ── 1. search ──────────────────────────────────────
tuner = MyRandomTuner(
    build_model,
    objective="val_accuracy",
    max_trials=60,
    directory=str(hyperparam_dir),
    project_name=experiment_name,
)

tuner.reload()                # brings the first 30 trials into memory

tuner.search(
    X_train, y_train,
    validation_data=(X_val, y_val),
)

In [None]:
experiment_name = "randomsearch"
hyperparam_dir = base_dir / "data" / "hyperparam_tuning"
model_dir = base_dir / "data" / "models"

tuner = MyRandomTuner(
    build_model,
    objective="val_accuracy",
    max_trials=60,
    directory=str(hyperparam_dir),
    project_name=experiment_name,
)

tuner.reload() 

# ── 2. pick best HPs ───────────────────────────────
best_hps = tuner.get_best_hyperparameters(1)[0]

# ── 3. build + train final model ───────────────────
best_model = tuner.hypermodel.build(best_hps)
# Option A – try/except
try:
    best_batch = best_hps.get("batch_size")
except ValueError:          # key not tuned
    best_batch = 32

# Option B – read the .values dict (allows a default in one line)
best_batch = best_hps.values.get("batch_size", 32)


history = best_model.fit(
    X_train, y_train,
    validation_data=(X_val, y_val),
    epochs=50,
    batch_size=best_batch,
    callbacks=[
        tf.keras.callbacks.EarlyStopping("val_loss", patience=5, restore_best_weights=True)
    ],
)

test_loss, test_acc = best_model.evaluate(X_test, y_test)


In [None]:
experiment_name = "randomsearch"
hyperparam_dir = base_dir / "data" / "hyperparam_tuning"
model_dir = base_dir / "data" / "models"

# save model + HP log
model_path = save_model(best_model, str(model_dir), best_hps)
print("Model saved to:", model_path)


In [None]:

# generate summary figures for the tuner run
df_trials = plot_tuner_results(
    build_model_fn=build_model,
    proj_dir=str(hyperparam_dir),
    proj_name="randomsearch",
    experiment="randomsearch",
)
print("Figures in:", df_trials.shape[0], "trials summarised")
