In [None]:
import os
import pandas as pd
import numpy as np
from scipy.stats import spearmanr
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from imblearn.over_sampling import SVMSMOTE
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Torch import guard: records availability in `torch_available`. Note that the
# Dataset, model and training sections further down require torch regardless.
try:
    import torch
    import torch.nn as nn
    from torch.utils.data import Dataset, DataLoader
    import torch.optim as optim
    torch_available = True
except ImportError:
    torch_available = False

# ==== CONFIG ====
# Input CSV; adjust the path if the data lives elsewhere.
CSV_PATH = '/content/drive/MyDrive/thesis/combined_data_Auto_pilot.csv'
SEQUENCE_LENGTH = 10   # rows per sliding window handed to the model
BATCH_SIZE = 64
EPOCHS = 20            # epochs per CV fold and for the final model
N_FOLDS = 10           # number of cross-validation folds
RANDOM_STATE = 42      # seed passed to SVMSMOTE, PCA and KFold
MODEL_SAVE_PATH = 'cnn1dlstm_spoof_detector.pth'
# The name `torch` is only bound when the guarded import succeeded, so it must
# not be evaluated at all otherwise (a bare `torch.device(...)` would raise
# NameError before the availability check could short-circuit anything).
if torch_available:
    DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
else:
    # No torch: there is no device to select. The torch-based sections further
    # down cannot run in this case.
    DEVICE = None

# ==== 1) Load & prepare data ====
df = pd.read_csv(CSV_PATH)
# Every column except the identifier columns and the raw label is a feature.
feature_cols = [c for c in df.columns if c not in ('frame_id', 'image_num', 'label')]
# Binary target: 1 where the label is exactly 'spoofed', 0 for anything else.
df['y'] = (df['label'] == 'spoofed').astype(int)
y = df['y'].values
X = df[feature_cols].values

# ==== 2) Spearman feature filtering ====
# Rank features by |Spearman rho| against the target and keep the K strongest.
rhos, pvals = {}, {}
for i, col in enumerate(feature_cols):
    rho, p = spearmanr(X[:, i], y)
    # spearmanr returns NaN when a column (or the target) is constant. NaN has
    # no defined ordering, so leaving it in would make the ranking below
    # arbitrary; score such features as uninformative instead.
    rhos[col] = 0.0 if np.isnan(rho) else abs(rho)
    pvals[col] = p
K = min(36, len(feature_cols))
selected = sorted(rhos, key=lambda k: rhos[k], reverse=True)[:K]
print(f"Selected features ({len(selected)}): {selected}")
X_sel = df[selected].values

# ==== 3) SMOTE balancing ====
# NOTE(review): the resampled rows are later cut into sliding windows as if
# they were a time series; synthetic rows have no temporal position, so
# confirm that windowing over the oversampled matrix is intended.
# NOTE(review): feature selection, oversampling, scaling and PCA are all fitted
# on the full dataset before the cross-validation split, so held-out folds
# influence the preprocessing. Consider fitting these steps per fold.
smote = SVMSMOTE(random_state=RANDOM_STATE)
X_bal, y_bal = smote.fit_resample(X_sel, y)
print(f"Balanced dataset: {np.bincount(y_bal)}")

# ==== 4) Standardize & PCA ====
scaler = StandardScaler()
X_std = scaler.fit_transform(X_bal)
# Never request more components than there are input features.
n_components = min(15, X_std.shape[1])
pca = PCA(n_components=n_components, random_state=RANDOM_STATE)
X_pca = pca.fit_transform(X_std)
print(f"PCA reduced to {n_components} components, explained variance: {pca.explained_variance_ratio_.sum():.3f}")

# ==== 5) Build sequences ====
def build_sequences(X_features, y_labels, seq_len):
    """Cut a row-wise feature matrix into sliding windows with next-row labels.

    Window ``i`` holds rows ``i`` .. ``i + seq_len - 1`` of ``X_features`` and
    is paired with ``y_labels[i + seq_len]``, i.e. the label of the row that
    immediately follows the window. Consecutive windows are shifted by one row.

    Args:
        X_features: Array-like of rows, indexed along its first axis.
        y_labels: Array-like of labels aligned row by row with ``X_features``.
        seq_len: Number of consecutive rows per window; must be at least 1.

    Returns:
        Tuple ``(windows, labels)`` where ``windows`` stacks the
        ``len(X_features) - seq_len`` windows along a new first axis and
        ``labels`` holds the matching labels.

    Raises:
        ValueError: If ``seq_len`` is smaller than 1, or if ``X_features`` does
            not have more than ``seq_len`` rows (no complete window with a
            following label exists).
        IndexError: If ``y_labels`` is shorter than ``X_features``.
    """
    X_features = np.asarray(X_features)
    y_labels = np.asarray(y_labels)

    # A non-positive length would silently yield empty or misaligned windows.
    if seq_len < 1:
        raise ValueError(f"seq_len must be >= 1, got {seq_len}")

    n_windows = len(X_features) - seq_len
    if n_windows < 1:
        # Fail with an actionable message instead of np.stack's generic
        # "need at least one array to stack".
        raise ValueError(
            f"need more than seq_len={seq_len} rows to build a sequence, "
            f"got {len(X_features)}"
        )

    windows = np.stack([X_features[i:i + seq_len] for i in range(n_windows)])
    labels = np.array([y_labels[i + seq_len] for i in range(n_windows)])
    return windows, labels

# Window the PCA-reduced rows; each window is labelled with the row after it.
X_seq, y_seq = build_sequences(
    X_features=X_pca,
    y_labels=y_bal,
    seq_len=SEQUENCE_LENGTH,
)
print(f"Sequence data shapes: {X_seq.shape}, {y_seq.shape}")

# ==== 6) Dataset & DataLoader ====
class TimeSeriesDataset(Dataset):
    """In-memory dataset that pairs each sample in ``X`` with its label in ``y``.

    Both inputs are converted to tensors once, at construction time: samples
    are stored as ``float32`` and labels as ``long`` (the dtype expected by
    ``nn.CrossEntropyLoss`` targets).
    """

    def __init__(self, X, y):
        """Convert samples ``X`` and labels ``y`` into tensors held on ``self``."""
        samples = torch.tensor(X, dtype=torch.float32)
        targets = torch.tensor(y, dtype=torch.long)
        self.X = samples
        self.y = targets

    def __len__(self) -> int:
        """Return the number of samples (size of the first axis of ``X``)."""
        n_samples = len(self.X)
        return n_samples

    def __getitem__(self, idx):
        """Return the ``(sample, label)`` pair stored at position ``idx``."""
        sample = self.X[idx]
        target = self.y[idx]
        return sample, target

# ==== 7) Model definition (PyTorch) ====
class CNN1DLSTM(nn.Module):
    """Two Conv1d/ReLU/MaxPool stages, then an LSTM and a two-logit MLP head.

    The forward pass takes a batch laid out as (batch, time, features), runs
    the convolutions over the time axis, feeds the pooled feature maps to the
    LSTM as a sequence and classifies from the LSTM output at the last step.
    """

    def __init__(self, feat_dim, seq_len):
        """Create the layers.

        Args:
            feat_dim: Features per time step; used as ``in_channels`` of the
                first convolution.
            seq_len: Accepted as part of the constructor signature but not
                used by any layer.
        """
        super().__init__()
        # Stage 1: feat_dim -> 32 channels, time axis halved by pooling.
        self.conv1 = nn.Conv1d(feat_dim, 32, kernel_size=3, padding=1)
        self.relu = nn.ReLU()
        self.pool1 = nn.MaxPool1d(kernel_size=2)
        # Stage 2: 32 -> 64 channels, time axis halved again.
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool1d(kernel_size=2)
        # Recurrent layer over the pooled steps (64 features each).
        self.lstm = nn.LSTM(64, 128, batch_first=True)
        # Classification head producing two logits.
        head_layers = [nn.Linear(128, 64), nn.ReLU(), nn.Linear(64, 2)]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return logits of shape (batch, 2) for input ``x`` of (batch, time, features)."""
        # Conv1d wants channels before time: (batch, features, time).
        channels_first = x.permute(0, 2, 1)

        stage1 = self.conv1(channels_first)
        stage1 = self.relu(stage1)
        stage1 = self.pool1(stage1)

        stage2 = self.conv2(stage1)
        stage2 = self.relu(stage2)
        stage2 = self.pool2(stage2)

        # Back to (batch, time, channels) for the batch_first LSTM.
        step_major = stage2.permute(0, 2, 1)
        lstm_out, _ = self.lstm(step_major)

        # Only the output at the final time step feeds the classifier.
        final_step = lstm_out[:, -1, :]
        return self.fc(final_step)

# ==== 8) Cross-validation training & evaluation ====
# NOTE(review): the windows in X_seq are shifted by one row each, so
# neighbouring windows share most of their rows. A shuffled KFold therefore
# puts near-duplicate windows on both sides of the split; a contiguous or
# grouped split would give a less optimistic estimate.
kf = KFold(n_splits=N_FOLDS, shuffle=True, random_state=RANDOM_STATE)
metrics = {'acc': [], 'prec': [], 'rec': [], 'f1': []}

for fold, (train_idx, test_idx) in enumerate(kf.split(X_seq)):
    print(f"\n--- Fold {fold+1}/{N_FOLDS} ---")

    # RANDOM_STATE only reaches the sklearn/imblearn steps; seed torch as well
    # so weight initialisation and batch shuffling are repeatable. The fold
    # offset gives every fold its own, but deterministic, stream.
    torch.manual_seed(RANDOM_STATE + fold)

    X_train, X_test = X_seq[train_idx], X_seq[test_idx]
    y_train, y_test = y_seq[train_idx], y_seq[test_idx]

    train_ds = TimeSeriesDataset(X_train, y_train)
    test_ds = TimeSeriesDataset(X_test, y_test)
    train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
    test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE)

    # A fresh model and optimizer per fold so no state carries over.
    model = CNN1DLSTM(feat_dim=n_components, seq_len=SEQUENCE_LENGTH).to(DEVICE)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.CrossEntropyLoss()

    # Training loop
    model.train()
    for epoch in range(EPOCHS):
        losses = []
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(DEVICE), y_batch.to(DEVICE)
            logits = model(X_batch)
            loss = criterion(logits, y_batch)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            losses.append(loss.item())
        # Report the mean batch loss every fifth epoch.
        if (epoch + 1) % 5 == 0:
            print(f"Epoch {epoch+1}/{EPOCHS}, Loss: {np.mean(losses):.4f}")

    # Evaluation
    model.eval()
    preds, trues = [], []
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            logits = model(X_batch.to(DEVICE))
            # Predicted class is the index of the larger logit.
            preds.extend(logits.argmax(dim=1).cpu().numpy())
            trues.extend(y_batch.numpy())

    acc = accuracy_score(trues, preds)
    # zero_division=0 keeps the score at 0 when a fold has no predicted (or no
    # true) positives, without emitting an undefined-metric warning.
    prec = precision_score(trues, preds, zero_division=0)
    rec = recall_score(trues, preds, zero_division=0)
    f1 = f1_score(trues, preds, zero_division=0)
    print(f"Fold {fold+1} -> Acc: {acc:.4f}, Prec: {prec:.4f}, Rec: {rec:.4f}, F1: {f1:.4f}")

    metrics['acc'].append(acc)
    metrics['prec'].append(prec)
    metrics['rec'].append(rec)
    metrics['f1'].append(f1)

# ==== 9) Summary ====
# Report mean and standard deviation of every metric across the folds.
print("\n=== Cross-Validation Results ===")
acc_mean, acc_std = np.mean(metrics['acc']), np.std(metrics['acc'])
print(f"Avg Accuracy: {acc_mean:.4f} ± {acc_std:.4f}")
prec_mean, prec_std = np.mean(metrics['prec']), np.std(metrics['prec'])
print(f"Avg Precision: {prec_mean:.4f} ± {prec_std:.4f}")
rec_mean, rec_std = np.mean(metrics['rec']), np.std(metrics['rec'])
print(f"Avg Recall: {rec_mean:.4f} ± {rec_std:.4f}")
f1_mean, f1_std = np.mean(metrics['f1']), np.std(metrics['f1'])
print(f"Avg F1-score: {f1_mean:.4f} ± {f1_std:.4f}")

# ==== 10) Train final model on full data and save ====
# Seed torch so the final model's initial weights and batch order are
# repeatable; RANDOM_STATE otherwise only reaches the sklearn/imblearn steps.
torch.manual_seed(RANDOM_STATE)

full_ds = TimeSeriesDataset(X_seq, y_seq)
full_loader = DataLoader(full_ds, batch_size=BATCH_SIZE, shuffle=True)

final_model = CNN1DLSTM(feat_dim=n_components, seq_len=SEQUENCE_LENGTH).to(DEVICE)
optimizer = optim.Adam(final_model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

print("\nTraining final model on full dataset...")
for epoch in range(EPOCHS):
    final_model.train()
    losses = []
    for X_batch, y_batch in full_loader:
        X_batch, y_batch = X_batch.to(DEVICE), y_batch.to(DEVICE)
        logits = final_model(X_batch)
        loss = criterion(logits, y_batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        losses.append(loss.item())
    # Mean batch loss for this epoch.
    print(f"Epoch {epoch+1}/{EPOCHS}, Loss: {np.mean(losses):.4f}")

# Save the trained final model
# Only the weights are written. NOTE(review): the model consumes PCA-reduced,
# standardised features, so the fitted `scaler`, `pca` and the `selected`
# column list presumably need to be persisted alongside for inference.
torch.save(final_model.state_dict(), MODEL_SAVE_PATH)
print(f"Model saved to {MODEL_SAVE_PATH}")
