# In [None]:
# ===============================
# GTZAN Seq2Seq (Encoder–Decoder) with Multi-Timestep Outputs
# Works with features_3_sec.csv (57 features) and auto-handles reshape
# ===============================

# Libraries
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Reproducibility: seed both NumPy and TensorFlow with the same value
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

# ---------- Load dataset ----------
data = pd.read_csv("features_3_sec.csv")

# Columns that are never model inputs; only those present in the CSV are dropped.
drop_cols = []
for candidate in ["filename", "length", "label"]:
    if candidate in data.columns:
        drop_cols.append(candidate)

# Everything else is treated as a feature, keeping the CSV column order.
feature_cols = []
for column in data.columns:
    if column not in drop_cols:
        feature_cols.append(column)

X = data[feature_cols].values            # (samples, n_features)
y = data["label"].values                 # raw label values, one per row

print(f"Dataset shape: {data.shape}")
print(f"Features used: {len(feature_cols)} -> {feature_cols[:5]} ...")
genres = sorted(data["label"].unique())
print("Genres:", genres)

# ---------- Encode labels ----------
# Map each distinct label to an integer id; `le.classes_` keeps the mapping.
le = LabelEncoder()
le.fit(y)
y_enc = le.transform(y)
n_classes = len(le.classes_)
print("Number of classes:", n_classes)

# ---------- Scale features (before reshape) ----------
# Standardise each feature column using statistics computed over every row of X.
# NOTE(review): the scaler here sees all rows; if X_scaled feeds a model that is
# later evaluated on a held-out subset, these statistics include the held-out rows.
scaler = StandardScaler()
scaler.fit(X)
X_scaled = scaler.transform(X)
n_samples, n_features = X_scaled.shape[0], X_scaled.shape[1]

# ---------- Choose a valid (timesteps, features_per_step) ----------
def pick_sequence_shape(n_feats: int) -> tuple:
    """Choose a (timesteps, features_per_step) factorisation of *n_feats*.

    The first entry of a fixed preference list that divides ``n_feats``
    exactly is used as the number of timesteps. When none divides it,
    every feature becomes its own timestep.

    Args:
        n_feats: Number of flat features per sample; must be at least 1.

    Returns:
        A ``(timesteps, features_per_step)`` tuple whose product is ``n_feats``.

    Raises:
        ValueError: If ``n_feats`` is smaller than 1. Without this check,
            0 would "divide evenly" and yield the unusable shape (32, 0).
    """
    if n_feats < 1:
        raise ValueError(f"n_feats must be a positive integer, got {n_feats!r}")

    # Prefer a reasonable number of timesteps, but must divide n_feats exactly
    preferred = (32, 30, 29, 28, 24, 20, 19, 16, 15, 12, 10, 8, 6, 4, 3, 2)
    for t in preferred:
        if n_feats % t == 0:
            return t, n_feats // t
    # Fallback: every feature is a timestep
    return n_feats, 1

timesteps, features_per_step = pick_sequence_shape(n_features)
print(f"Chosen sequence shape -> timesteps={timesteps}, features_per_step={features_per_step}")

# ---------- Train/Test split ----------
# Split row indices rather than the data itself so the scaler can be fit on the
# training rows only. Same test_size / random_state / stratify as before, so the
# train/test membership is unchanged.
all_rows = np.arange(n_samples)
train_idx, test_idx, y_train, y_test = train_test_split(
    all_rows, y_enc, test_size=0.2, random_state=42, stratify=y_enc
)

# Re-fit the scaler on training rows only: fitting on all of X lets the test
# rows influence the mean/std that the model is trained on (data leakage).
scaler = StandardScaler().fit(X[train_idx])
X_scaled = scaler.transform(X)           # all rows, scaled with train statistics

# Reshape to sequences
X_seq = X_scaled.reshape(n_samples, timesteps, features_per_step)
print("X_seq shape:", X_seq.shape)   # e.g., (9990, 19, 3) for 57 features

X_train, X_test = X_seq[train_idx], X_seq[test_idx]

# ---------- Build decoder targets (multi-timestep) ----------
def _shift_right(seq):
    """Delay *seq* by one step along axis 1; step 0 becomes all zeros (<START>)."""
    start_token = np.zeros_like(seq[:, :1, :])
    return np.concatenate([start_token, seq[:, :-1, :]], axis=1)


# One-hot encode the integer labels: (N,) -> (N, C)
y_train_1h = to_categorical(y_train, num_classes=n_classes)
y_test_1h  = to_categorical(y_test,  num_classes=n_classes)

# The decoder target is the same one-hot label at every timestep: (N, C) -> (N, T, C)
y_train_seq = np.tile(y_train_1h[:, None, :], (1, timesteps, 1))
y_test_seq  = np.tile(y_test_1h[:,  None, :], (1, timesteps, 1))

# Teacher forcing inputs: the target sequence shifted right by one step.
# NOTE(review): dec_in_test is derived from the true test labels; if it is fed to
# the model at evaluation time, the decoder sees the label from step 1 onward.
dec_in_train = _shift_right(y_train_seq)
dec_in_test  = _shift_right(y_test_seq)

print("Decoder target shape:", y_train_seq.shape)
print("Decoder input shape:", dec_in_train.shape)

# ---------- Encoder–Decoder model ----------
# Encoder: consumes the feature sequence and keeps only its final LSTM states.
enc_inputs = tf.keras.Input(name="encoder_inputs", shape=(timesteps, features_per_step))
enc_lstm = tf.keras.layers.LSTM(units=128, return_state=True, name="encoder_lstm")
state_h, state_c = enc_lstm(enc_inputs)[1:]     # drop the output, keep (h, c)
enc_states = [state_h, state_c]

# Decoder: starts from the encoder states and emits one class distribution per step.
dec_inputs = tf.keras.Input(name="decoder_inputs", shape=(timesteps, n_classes))
dec_lstm = tf.keras.layers.LSTM(
    units=128,
    return_sequences=True,
    return_state=True,
    name="decoder_lstm",
)
dec_outputs = dec_lstm(dec_inputs, initial_state=enc_states)[0]   # sequence output only
dec_outputs = tf.keras.layers.Dropout(rate=0.3)(dec_outputs)
dec_dense = tf.keras.layers.Dense(units=n_classes, activation="softmax", name="decoder_dense")
dec_outputs = dec_dense(dec_outputs)

# Two-input model: encoder features + decoder inputs -> per-step class probabilities
model = tf.keras.Model(inputs=[enc_inputs, dec_inputs], outputs=dec_outputs)
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss="categorical_crossentropy",
    metrics=["accuracy"],
)
model.summary()

# ---------- Callbacks ----------
# Stop once validation accuracy has not improved by at least 1e-3 for 12 epochs.
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor="val_accuracy",
    min_delta=1e-3,
    patience=12,
    restore_best_weights=True,
)

# Multiply the learning rate by 0.7 after 6 epochs without val_loss improvement.
lr_on_plateau = tf.keras.callbacks.ReduceLROnPlateau(
    monitor="val_loss",
    factor=0.7,
    patience=6,
    min_lr=1e-6,
    verbose=1,
)

# Write the model to disk whenever validation accuracy reaches a new best.
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    "best_seq2seq_genre.keras",
    monitor="val_accuracy",
    save_best_only=True,
    verbose=1,
)

callbacks = [early_stop, lr_on_plateau, checkpoint]

# ---------- Train ----------
# Carve a stratified validation set out of the training data. Validating on the
# test set would let early stopping, LR scheduling and checkpoint selection tune
# themselves on the very data used for the final report.
(X_fit, X_val,
 dec_in_fit, dec_in_val,
 y_fit_seq, y_val_seq) = train_test_split(
    X_train, dec_in_train, y_train_seq,
    test_size=0.1, random_state=42, stratify=y_train
)

# NOTE: the validation decoder inputs are teacher-forced (built from the true
# labels), so val_accuracy / val_loss are optimistic for steps after the first;
# they are used here only as a relative signal for the callbacks.
history = model.fit(
    [X_fit, dec_in_fit], y_fit_seq,
    validation_data=([X_val, dec_in_val], y_val_seq),
    epochs=60, batch_size=64, callbacks=callbacks, verbose=1
)

# ---------- Evaluate ----------
def _greedy_decode(seq_model, enc_in, n_steps, n_cls, batch_size=256):
    """Run the seq2seq model free-running, without access to the true labels.

    The decoder input starts as all zeros (<START>). After each pass, the class
    predicted at step t is written as a one-hot vector into decoder input step
    t + 1, mirroring the shift-right layout used for teacher forcing in training.
    The decoder LSTM is unidirectional, so the output at step t only depends on
    decoder inputs up to step t; the final pass therefore holds the complete
    greedy decode.

    Args:
        seq_model: Keras model taking [encoder_inputs, decoder_inputs].
        enc_in: Encoder input array with one row per sample.
        n_steps: Number of decoder timesteps.
        n_cls: Number of classes (width of the decoder input).
        batch_size: Batch size used for each prediction pass.

    Returns:
        Array of per-step class probabilities from the final pass.
    """
    n_rows = len(enc_in)
    row_idx = np.arange(n_rows)
    dec_in = np.zeros((n_rows, n_steps, n_cls), dtype="float32")
    probs = None
    for t in range(n_steps):
        probs = seq_model.predict([enc_in, dec_in], batch_size=batch_size, verbose=0)
        if t + 1 < n_steps:
            # Feed the model's own prediction, never the ground truth.
            dec_in[row_idx, t + 1, probs[:, t, :].argmax(axis=-1)] = 1.0
    return probs


# Feeding the teacher-forced decoder inputs here would hand the true label to
# the decoder from step 1 onward and inflate every metric, so decode greedily.
y_pred_seq = _greedy_decode(model, X_test, timesteps, n_classes)  # (N, T, C)
y_pred_steps = np.argmax(y_pred_seq, axis=-1)                     # (N, T)

# Per-step metrics computed from the free-running predictions.
true_class_prob = y_pred_seq[np.arange(len(y_test)), :, y_test]  # (N, T)
loss = float(-np.log(np.clip(true_class_prob, 1e-7, 1.0)).mean())
acc = float((y_pred_steps == y_test[:, None]).mean())
print(f"\nTest accuracy (per-step): {acc:.4f}  |  loss: {loss:.4f}")

# ---------- Step-wise predictions -> collapse to single class via majority vote ----------
# majority vote across timesteps (ties resolve to the lowest class index)
y_pred_final = np.array(
    [np.bincount(seq, minlength=n_classes).argmax() for seq in y_pred_steps]
)
y_true_final = y_test

print("\nClassification Report (majority vote across timesteps):")
# Pin `labels` so the report rows always line up with `le.classes_`, even if a
# class happens to be absent from both the truth and the predictions.
print(classification_report(y_true_final, y_pred_final,
                            labels=np.arange(n_classes), target_names=le.classes_))

# ---------- Confusion matrix ----------
# Counts of (true class, predicted class) pairs for the majority-vote predictions.
cm = confusion_matrix(y_true_final, y_pred_final)

fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(
    cm,
    annot=True,
    fmt="d",
    cmap="Blues",
    xticklabels=le.classes_,
    yticklabels=le.classes_,
    ax=ax,
)
ax.set_title("Confusion Matrix (Majority Vote)")
ax.set_xlabel("Predicted")
ax.set_ylabel("True")
fig.tight_layout()
plt.show()

# ---------- Save final model ----------
# Write the model as it stands after training to a .keras file, then confirm.
model.save(filepath="seq2seq_gtzan_multi_timestep.keras")
print("\n✅ Saved model: seq2seq_gtzan_multi_timestep.keras")
