# In [None]:
## Preprocess & Train Audio Model with Clean + Noisy Examples
import os
import pickle
import random
from pathlib import Path

import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import balanced_accuracy_score, f1_score
from sklearn.preprocessing import LabelBinarizer
from sklearn.utils import compute_class_weight
from tensorflow.keras import Model, optimizers
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.layers import Dense
from tqdm import tqdm

# Process environment consumed by TensorFlow / CUDA.
# NOTE(review): `tensorflow` is already imported above this point. These
# variables are normally expected to be set before that import — confirm that
# the log-level setting still takes effect when applied here.
os.environ.update({
    "TF_CPP_MIN_LOG_LEVEL": "3",
    "CUDA_VISIBLE_DEVICES": "0",
})

# Seed the Python, NumPy and TensorFlow RNGs with the same fixed value.
for seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
    seed_fn(4)

# --- Experiment configuration ------------------------------------------------
MODEL_VERSION = 36  # interpolated into every output directory name below

# Participant IDs for each split.
TRAIN_IDS = [10, 100, 101, 102, 103]
VALID_IDS = [104]
TEST_IDS = [3, 4]

# HAND and SR are used as sub-directory names when locating example files and
# when saving the model (SR is presumably the audio sample rate in Hz — TODO
# confirm).
HAND = 'Right'
SR = 16_000

BATCH_SIZE = 32
EPOCHS = 30

# --- Input locations ---------------------------------------------------------
EXAMPLE_MODEL_PATH = Path("../../Models/Reference_Model/example_model.hdf5")
CLEAN_DATA_ROOT = Path("../../Data/Train_Data/4. AudioExamples/01_Original")
NOISE_DATA_ROOT = Path("../../Data/Train_Data/6. AudioExamples_noise")

# --- Output locations (one directory per model version) ----------------------
OUTPUT_MODEL_ROOT = Path(f"../../Models/tensorflow_model/Audio/Audio_noise_ver{MODEL_VERSION}")
METRICS_ROOT = Path(f"../../Result/Train_Result/Model_Accuracy/Audio/Audio_noise_ver{MODEL_VERSION}")
PREDICTIONS_ROOT = Path(f"../../Result/Model_Preds/Audio/Audio_noise_ver{MODEL_VERSION}")

# Make sure every output directory exists before anything is written to it.
for out_dir in (OUTPUT_MODEL_ROOT, METRICS_ROOT, PREDICTIONS_ROOT):
    out_dir.mkdir(parents=True, exist_ok=True)


def load_examples(ids, root: Path):
    """Load examples and labels for the given participant IDs from `root`.

    Every ``*.pkl`` file found in ``root/<pid>/<HAND>/<SR>`` is unpickled and
    the results are concatenated along axis 0. File stems must have the form
    ``<pid>---<activity>---<trial>``; those three fields are repeated once per
    row of the file's data so that labels stay aligned with examples.

    Args:
        ids: Iterable of participant IDs. Each one is converted with ``str``
            to build the directory name; IDs whose directory does not exist
            are skipped.
        root: Root directory of the example set.

    Returns:
        A tuple ``(X, y)``. ``X`` is the concatenation (axis 0) of every
        loaded object; ``y`` is a 2-D string array with one
        ``[pid, activity, trial]`` row per row of ``X``.

    Raises:
        ValueError: If no example file is found for any of `ids`, or if a
            file stem does not split into exactly three ``---`` fields.
    """
    X_list, y_list = [], []
    for pid in ids:
        dir_path = root / str(pid) / HAND / str(SR)
        if not dir_path.is_dir():
            continue
        # glob() yields entries in an unspecified, filesystem-dependent order;
        # sorting keeps the row order (and any seeded shuffle applied to it
        # later) reproducible across machines.
        for pkl in sorted(dir_path.glob("*.pkl")):
            pid_str, activity, trial = pkl.stem.split("---")
            # NOTE: unpickling can execute arbitrary code — only point this at
            # trusted, locally generated files.
            with open(pkl, "rb") as fh:
                data = pickle.load(fh)
            X_list.append(data)
            y_list += [[pid_str, activity, trial]] * data.shape[0]
    if not X_list:
        # Fail with an actionable message instead of NumPy's generic
        # "need at least one array to concatenate".
        raise ValueError(
            f"No example files (*.pkl) found under {root} for ids {list(ids)}"
        )
    X = np.concatenate(X_list, axis=0)
    y = np.array(y_list)
    return X, y


def load_combined(ids):
    """Load the clean and the noisy example sets for `ids` and stack them.

    Returns:
        A tuple ``(X, y)`` in which the rows loaded from ``CLEAN_DATA_ROOT``
        come first, followed by the rows loaded from ``NOISE_DATA_ROOT``.
    """
    feature_parts, label_parts = [], []
    for source_root in (CLEAN_DATA_ROOT, NOISE_DATA_ROOT):
        X_part, y_part = load_examples(ids, source_root)
        feature_parts.append(X_part)
        label_parts.append(y_part)
    return np.vstack(feature_parts), np.vstack(label_parts)


def build_finetuned_model(num_classes=5):
    """Build a compiled classifier on top of the pretrained reference model.

    The model stored at ``EXAMPLE_MODEL_PATH`` is loaded, the output of its
    second-to-last layer is fed into a new softmax ``Dense`` layer with
    `num_classes` units, and the resulting model is compiled with Adam
    (learning rate 1e-3), categorical cross-entropy and an accuracy metric.

    Args:
        num_classes: Number of units in the new output layer.

    Returns:
        The compiled Keras ``Model``.
    """
    pretrained = tf.keras.models.load_model(EXAMPLE_MODEL_PATH)

    # Branch off just before the reference model's last layer; that layer is
    # not part of the new graph.
    penultimate = pretrained.layers[-2].output
    head = Dense(num_classes, activation='softmax')
    finetuned = Model(inputs=pretrained.input, outputs=head(penultimate))

    finetuned.compile(
        loss='categorical_crossentropy',
        optimizer=optimizers.Adam(1e-3),
        metrics=['accuracy'],
    )
    return finetuned


def data_generator(X, y, batch_size, shuffle=True):
    """Yield batches of `[X], y` for Keras `.fit`, cycling over the data forever.

    Each pass walks the data in steps of `batch_size`; the final batch of a
    pass is shorter when the number of examples is not a multiple of
    `batch_size`. With ``shuffle=True`` the index order is reshuffled in place
    (via ``np.random.shuffle``) at the start of every pass.

    Args:
        X: Array of examples, indexed along axis 0.
        y: Array of targets, indexed with the same indices as `X`.
        batch_size: Maximum number of examples per batch; must be >= 1.
        shuffle: Whether to reshuffle the example order before each pass.

    Yields:
        Tuples ``([X_batch], y_batch)``.

    Raises:
        ValueError: On first iteration, if `X` has no examples or
            `batch_size` is smaller than 1. Without these checks the inner
            loop would never yield and the generator would spin forever.
    """
    n = X.shape[0]
    if n == 0:
        raise ValueError("data_generator received an empty dataset")
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    indices = np.arange(n)
    while True:
        if shuffle:
            np.random.shuffle(indices)
        for start in range(0, n, batch_size):
            batch = indices[start:start + batch_size]
            yield [X[batch]], y[batch]


# ---------------------------------------------------------------------------
# Data: clean + noisy examples for the training and validation participants
# ---------------------------------------------------------------------------
X_train, y_train = load_combined(TRAIN_IDS)
X_val, y_val = load_combined(VALID_IDS)

# Shuffle the training set once up front (the batch generator reshuffles the
# order again at the start of every pass).
perm = np.random.permutation(X_train.shape[0])
X_train, y_train = X_train[perm], y_train[perm]

# Label rows are [participant, activity, trial]; column 1 is the class target.
act_train = y_train[:, 1]
act_val = y_val[:, 1]

# Binarize the activity labels. The encoder is fitted on the training labels
# only, so `lb.classes_` defines the class order used everywhere below.
lb = LabelBinarizer()
Y_train = lb.fit_transform(act_train)
Y_val = lb.transform(act_val)
print("Label Mapping:", dict(zip(lb.classes_, lb.transform(lb.classes_))))

# "balanced" class weights, keyed by class index in `lb.classes_` order.
cw = compute_class_weight("balanced", classes=lb.classes_, y=act_train)
class_weight_dict = dict(enumerate(cw))

# ---------------------------------------------------------------------------
# Training
# ---------------------------------------------------------------------------
model = build_finetuned_model(num_classes=len(lb.classes_))

# The generators never terminate, so Keras must be told how many batches make
# up one pass; ceil() accounts for the final, shorter batch.
steps_per_epoch = int(np.ceil(len(Y_train) / BATCH_SIZE))
validation_steps = int(np.ceil(len(Y_val) / BATCH_SIZE))

callbacks = [
    ReduceLROnPlateau('val_loss', factor=0.1, patience=3, min_lr=1e-6, verbose=1),
    EarlyStopping('val_loss', patience=5, verbose=1, restore_best_weights=True)
]

history = model.fit(
    data_generator(X_train, Y_train, BATCH_SIZE, shuffle=True),
    steps_per_epoch=steps_per_epoch,
    epochs=EPOCHS,
    validation_data=data_generator(X_val, Y_val, BATCH_SIZE, shuffle=False),
    validation_steps=validation_steps,
    class_weight=class_weight_dict,
    callbacks=callbacks,
    verbose=1
)

model_save_path = OUTPUT_MODEL_ROOT / HAND / str(SR)
model_save_path.mkdir(parents=True, exist_ok=True)
model.save(model_save_path / "Audio_Scratch.h5")

# ---------------------------------------------------------------------------
# Evaluation on the held-out test participants (clean + noisy examples)
# ---------------------------------------------------------------------------
X_test, y_test = load_combined(TEST_IDS)
act_test = y_test[:, 1]
# Binarized test labels; the metrics below compare label strings directly and
# do not read this array.
Y_test = lb.transform(act_test)

preds = model.predict(X_test, batch_size=BATCH_SIZE)
# Map each arg-max column index back to its activity name.
y_pred = lb.classes_[np.argmax(preds, axis=1)]

# `balanced_accuracy_score` and `f1_score` come from `sklearn.metrics`
# (imported at the top of the file).
ba = balanced_accuracy_score(act_test, y_pred) * 100
f1 = f1_score(act_test, y_pred, average='weighted') * 100

# Explicit encoding so the report is written identically on every platform.
with open(METRICS_ROOT / "test_metrics.txt", "w", encoding="utf-8") as f:
    f.write(f"Balanced Accuracy: {ba:.2f}%\nF1 Score: {f1:.2f}%\n")

print(f"Test BA: {ba:.2f}%  F1: {f1:.2f}%")
