In [None]:
from google.colab import drive
drive.mount('/content/drive')


In [None]:
# !wget -O "https://drive.google.com/drive/folders/163a6BaNwZ_0g6hqYMVECM4dcw8Awyg9X?usp=sharing"

In [None]:
!wget -O Avenue_Dataset.zip "https://www.cse.cuhk.edu.hk/leojia/projects/detectabnormal/Avenue_Dataset.zip"

In [None]:
!unzip -q Avenue_Dataset.zip -d Avenue_Dataset


In [None]:
import os
os.listdir("Avenue_Dataset")


In [None]:
!wget -O ground_truth_demo.zip "https://www.cse.cuhk.edu.hk/leojia/projects/detectabnormal/ground_truth_demo.zip"


In [None]:
!unzip -q ground_truth_demo.zip -d ground_truth_demo


In [None]:
import os
os.listdir("ground_truth_demo")


In [None]:
!pip install scipy opencv-python


In [None]:
import os
import shutil
import cv2
import numpy as np
from scipy.io import loadmat
from tqdm import tqdm

# Paths
video_folder = "/content/Avenue_Dataset/Avenue Dataset/testing_videos"
original_mask_folder = "ground_truth_demo/ground_truth_demo/testing_label_mask"
target_mask_folder = "Avenue_Dataset/testing_vol"

# Step 1: Clean and prepare the mask folder
if os.path.exists(target_mask_folder):
    shutil.rmtree(target_mask_folder)
os.makedirs(target_mask_folder, exist_ok=True)

# Copy only valid files (with format X_label.mat)
for file in os.listdir(original_mask_folder):
    if file.endswith("_label.mat"):
        shutil.copy(
            os.path.join(original_mask_folder, file),
            os.path.join(target_mask_folder, file)
        )
print("✅ Mask folder cleaned and valid files copied.")

# Output paths for frames
normal_out = "frames_normal"
abnormal_out = "frames_abnormal"
os.makedirs(normal_out, exist_ok=True)
os.makedirs(abnormal_out, exist_ok=True)

# Extract video numbers from mask filenames
available_masks = sorted([
    f for f in os.listdir(target_mask_folder) if f.endswith("_label.mat")
])
video_indices = [int(f.split("_")[0]) for f in available_masks]

# Step 2: Separate normal and abnormal frames
for video_idx in video_indices:
    video_path = os.path.join(video_folder, f"{video_idx:02d}.avi")
    mask_path = os.path.join(target_mask_folder, f"{video_idx}_label.mat")

    if not os.path.exists(video_path):
        print(f"⚠️ Video not found: {video_path}")
        continue

    cap = cv2.VideoCapture(video_path)
    mask_data = loadmat(mask_path)
    volLabel = mask_data['volLabel'][0]

    frame_num = 0
    pbar = tqdm(total=len(volLabel), desc=f"Processing video {video_idx:02d}")

    while True:
        ret, frame = cap.read()
        if not ret or frame_num >= len(volLabel):
            break

        mask = volLabel[frame_num]
        frame_resized = cv2.resize(frame, (mask.shape[1], mask.shape[0]))

        if np.sum(mask) == 0:
            save_path = os.path.join(normal_out, f"{video_idx:02d}_{frame_num:04d}.jpg")
        else:
            save_path = os.path.join(abnormal_out, f"{video_idx:02d}_{frame_num:04d}.jpg")

        cv2.imwrite(save_path, frame_resized)
        frame_num += 1
        pbar.update(1)

    cap.release()
    pbar.close()

print("\n✅ Normal and abnormal frames were successfully separated.")


In [None]:
import os

# Output paths for frames
normal_out = "frames_normal"
abnormal_out = "frames_abnormal"

# Count files in each folder
num_normal = len([f for f in os.listdir(normal_out) if f.endswith(".jpg")])
num_abnormal = len([f for f in os.listdir(abnormal_out) if f.endswith(".jpg")])

# Print results
print(f"✅ Number of normal frames: {num_normal}")
print(f"🚨 Number of abnormal frames: {num_abnormal}")
print(f"📊 Total number of frames: {num_normal + num_abnormal}")


In [None]:
import os
import cv2
import random
import shutil
import numpy as np

# Paths
normal_dir = "frames_normal"
abnormal_dir = "frames_abnormal"
balanced_dir = "frames_balanced"

# Output paths
balanced_normal = os.path.join(balanced_dir, "normal")
balanced_abnormal = os.path.join(balanced_dir, "abnormal")
os.makedirs(balanced_normal, exist_ok=True)
os.makedirs(balanced_abnormal, exist_ok=True)

# List of files
normal_files = [f for f in os.listdir(normal_dir) if f.endswith(".jpg")]
abnormal_files = [f for f in os.listdir(abnormal_dir) if f.endswith(".jpg")]

# Counts
num_normal = len(normal_files)
num_abnormal = len(abnormal_files)

# Copy all normal frames
for f in normal_files:
    shutil.copy(os.path.join(normal_dir, f), os.path.join(balanced_normal, f))

# ✅ Augmentation functions (geometry only)
def rotate_image(img, angle):
    h, w = img.shape[:2]
    M = cv2.getRotationMatrix2D((w/2, h/2), angle, 1)
    return cv2.warpAffine(img, M, (w, h))

def flip_image(img):
    return [cv2.flip(img, 0), cv2.flip(img, 1)]  # vertical and horizontal

def scale_image(img, scale=1.2):
    h, w = img.shape[:2]
    resized = cv2.resize(img, (int(w * scale), int(h * scale)))
    center_crop = resized[
        int((resized.shape[0] - h) / 2):int((resized.shape[0] + h) / 2),
        int((resized.shape[1] - w) / 2):int((resized.shape[1] + w) / 2)
    ]
    return center_crop

def translate_image(img, tx=10, ty=10):
    M = np.float32([[1, 0, tx], [0, 1, ty]])
    return cv2.warpAffine(img, M, (img.shape[1], img.shape[0]))

def crop_random(img):
    h, w = img.shape[:2]
    crop_size = int(min(h, w) * 0.85)
    x = random.randint(0, w - crop_size)
    y = random.randint(0, h - crop_size)
    crop = img[y:y+crop_size, x:x+crop_size]
    return cv2.resize(crop, (w, h))

def augment_image(img):
    aug_list = []
    aug_list.append(rotate_image(img, 90))
    aug_list.append(rotate_image(img, 180))
    aug_list.append(rotate_image(img, 270))
    aug_list.extend(flip_image(img))
    aug_list.append(scale_image(img, 1.2))
    aug_list.append(translate_image(img, tx=15, ty=15))
    aug_list.append(crop_random(img))
    return aug_list

# Augment abnormal images until count matches normal
augmented_count = 0
index = 0

while (num_abnormal + augmented_count) < num_normal:
    filename = abnormal_files[index % len(abnormal_files)]
    img_path = os.path.join(abnormal_dir, filename)
    img = cv2.imread(img_path)

    aug_imgs = augment_image(img)
    for aug in aug_imgs:
        if (num_abnormal + augmented_count) >= num_normal:
            break
        aug_filename = f"aug_{augmented_count:05d}_{filename}"
        cv2.imwrite(os.path.join(balanced_abnormal, aug_filename), aug)
        augmented_count += 1

    index += 1

# Copy original abnormal frames
for f in abnormal_files:
    shutil.copy(os.path.join(abnormal_dir, f), os.path.join(balanced_abnormal, f))

# Final counts
final_normal = len(os.listdir(balanced_normal))
final_abnormal = len(os.listdir(balanced_abnormal))

print("✅ Dataset successfully balanced with geometric augmentation only.")
print(f"🔹 Normal frames: {final_normal}")
print(f"🔸 Abnormal frames (original + augmented): {final_abnormal}")


In [None]:
import os
import numpy as np
import cv2
import torch
import timm
from tqdm import tqdm
from torchvision import transforms

# ⚙️ Settings
IMG_SIZE = 224
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 📁 Data paths
normal_dir = "/content/frames_balanced/normal"
anomaly_dir = "/content/frames_balanced/abnormal"
save_dir = "/content"

# ✅ List of models
model_names = [
    'convnext_tiny',
    'repvgg_a0',
    'mobileone_s0',
    'poolformer_s12',
    'maxvit_tiny_tf_224',
    'coatnet_0_rw_224'
]

# ✅ Image preprocessing
preprocess = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# 📥 Get list of image paths
def get_image_paths(directory):
    return [os.path.join(directory, f)
            for f in os.listdir(directory)
            if f.lower().endswith(('.jpg', '.jpeg', '.png'))]

# 🔍 Feature extraction
def extract_features(model, image_paths):
    features = []
    for path in tqdm(image_paths, desc="Extracting features"):
        img = cv2.imread(path)
        if img is None:
            continue
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img_tensor = preprocess(img).unsqueeze(0).to(device)

        with torch.no_grad():
            feat = model(img_tensor)
        features.append(feat.cpu().numpy().squeeze())
    return np.array(features)

# 🔁 Run processing for each model
for model_name in model_names:
    print(f"\n🔧 Processing model: {model_name}")
    model = timm.create_model(model_name, pretrained=True, num_classes=0)
    model.eval()

    # ⛔ Freeze 85% of layers
    total_layers = sum(1 for _ in model.parameters())
    trainable_start = int(total_layers * 0.85)
    for i, param in enumerate(model.parameters()):
        param.requires_grad = i >= trainable_start
    model.to(device)

    # 🔹 Normal and anomaly frames
    normal_paths = get_image_paths(normal_dir)
    anomaly_paths = get_image_paths(anomaly_dir)

    print("🔹 Extracting NORMAL features...")
    normal_features = extract_features(model, normal_paths)

    print("🔸 Extracting ANOMALY features...")
    anomaly_features = extract_features(model, anomaly_paths)

    # 💾 Save features
    model_id = model_name.replace('/', '_')
    np.save(os.path.join(save_dir, f'normal_features_{model_id}.npy'), normal_features)
    np.save(os.path.join(save_dir, f'anomaly_features_{model_id}.npy'), anomaly_features)

    print(f"✅ Saved features for model: {model_id}")


In [None]:
import os
import numpy as np

save_dir = "/content"
model_names = [
    'convnext_tiny',
    'repvgg_a0',
    'mobileone_s0',
    'poolformer_s12',
    'maxvit_tiny_tf_224',
    'coatnet_0_rw_224'
]

for model_name in model_names:
    model_id = model_name.replace('/', '_')
    normal_path = os.path.join(save_dir, f'normal_features_{model_id}.npy')
    anomaly_path = os.path.join(save_dir, f'anomaly_features_{model_id}.npy')

    normal_features = np.load(normal_path)
    anomaly_features = np.load(anomaly_path)

    print(f"Model: {model_id}")
    print(f"  Normal features shape: {normal_features.shape}")
    print(f"  Anomaly features shape: {anomaly_features.shape}\n")


In [None]:
!pip install grad-cam


In [None]:
import os
import numpy as np
import cv2
import torch
import timm
import torch.nn.functional as F
from torchvision import transforms
import matplotlib.pyplot as plt

# ⚙️ Path to abnormal images
anomaly_dir = "/content/frames_balanced/abnormal"
IMG_SIZE = 224

# 🌀 Preprocessing
preprocess = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# 🧠 Load model
model = timm.create_model('repvgg_a0', pretrained=True, num_classes=2)
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# 🔍 Grad-CAM++
class GradCAMPlusPlus:
    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.gradients = None
        self.activations = None
        self._register_hooks()

    def _register_hooks(self):
        def forward_hook(module, input, output):
            self.activations = output
        def backward_hook(module, grad_in, grad_out):
            self.gradients = grad_out[0]
        self.target_layer.register_forward_hook(forward_hook)
        self.target_layer.register_backward_hook(backward_hook)

    def generate(self, input_tensor):
        self.model.zero_grad()
        output = self.model(input_tensor)
        class_idx = torch.argmax(output, dim=1).item()
        score = output[0, class_idx]
        score.backward()

        gradients = self.gradients
        activations = self.activations

        b, k, h, w = gradients.size()
        alpha_num = gradients.pow(2)
        alpha_denom = 2 * gradients.pow(2) + activations * gradients.pow(3).sum(-1).sum(-1).view(b, k, 1, 1)
        alpha_denom = torch.where(alpha_denom != 0.0, alpha_denom, torch.ones_like(alpha_denom))
        alpha = alpha_num / alpha_denom

        weights = torch.relu(F.relu(gradients) * alpha).sum(-1).sum(-1).view(b, k, 1, 1)
        cam = (weights * activations).sum(1, keepdim=True)

        cam = F.relu(cam)
        cam = F.interpolate(cam, size=(IMG_SIZE, IMG_SIZE), mode='bilinear', align_corners=False)
        cam = cam - cam.min()
        cam = cam / (cam.max() + 1e-8)

        return cam.cpu().detach().numpy().squeeze()

# 🎯 Identify target convolutional layer
conv_layers = [m for m in model.modules() if isinstance(m, torch.nn.Conv2d)]
target_layer = conv_layers[-1]
gradcam = GradCAMPlusPlus(model, target_layer)

# 📂 Get list of image paths
image_paths = sorted([
    os.path.join(anomaly_dir, fname)
    for fname in os.listdir(anomaly_dir)
    if fname.lower().endswith(('.jpg', '.jpeg', '.png'))
])[:5]  # Only the first 5 images

# 🔥 Process and display heatmaps
for path in image_paths:
    img = cv2.imread(path)
    if img is None:
        continue
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img_tensor = preprocess(img_rgb).unsqueeze(0).to(device)

    # Generate heatmap
    heatmap = gradcam.generate(img_tensor)

    # Convert to colored heatmap
    heatmap_color = cv2.applyColorMap(np.uint8(255 * heatmap), cv2.COLORMAP_JET)
    heatmap_color = cv2.cvtColor(heatmap_color, cv2.COLOR_BGR2RGB)

    # Resize original image
    img_resized = cv2.resize(img_rgb, (IMG_SIZE, IMG_SIZE))

    # Blend heatmap with original image
    alpha = 0.4
    blended = cv2.addWeighted(img_resized, 1 - alpha, heatmap_color, alpha, 0)

    # Display with matplotlib + colorbar
    plt.figure(figsize=(5, 5))
    plt.imshow(blended)
    plt.title(f"Heatmap - {os.path.basename(path)}")
    plt.axis('off')

    # Add colorbar from raw heatmap
    im = plt.imshow(heatmap, cmap='jet', alpha=0)
    cbar = plt.colorbar(im, fraction=0.046, pad=0.04)
    cbar.set_label("Anomaly Score", rotation=270, labelpad=15)

    plt.tight_layout()
    plt.show()


In [None]:
# import numpy as np
# import tensorflow as tf
# from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, GRU, SimpleRNN, Flatten
# from tensorflow.keras.models import Model
# from tensorflow.keras.optimizers import Adam
# from tensorflow.keras.callbacks import EarlyStopping
# from sklearn.model_selection import StratifiedKFold
# from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
# from sklearn.utils import shuffle
# import matplotlib.pyplot as plt
# import pandas as pd
# import gc
# from tensorflow.keras import backend as K
# import seaborn as sns
# import os

# # Force CPU usage to avoid cuDNN errors
# os.environ["CUDA_VISIBLE_DEVICES"] = ""

# # --- Data loading and preprocessing ---
# def prepare_data(normal_path, anomaly_path):
#     X_normal = np.load(normal_path, allow_pickle=True)
#     print(f"X_normal shape before mean: {X_normal.shape}")
#     if X_normal.ndim >= 4:
#         X_normal = X_normal.mean(axis=(2, 3))
#         print(f"X_normal shape after mean(axis=(2,3)): {X_normal.shape}")
#     else:
#         print("X_normal has less than 4 dims, skipping mean over axis 2 and 3")

#     X_anomaly = np.load(anomaly_path, allow_pickle=True)
#     print(f"X_anomaly shape before mean: {X_anomaly.shape}")
#     if X_anomaly.ndim >= 4:
#         X_anomaly = X_anomaly.mean(axis=(2, 3))
#         print(f"X_anomaly shape after mean(axis=(2,3)): {X_anomaly.shape}")
#     else:
#         print("X_anomaly has less than 4 dims, skipping mean over axis 2 and 3")

#     X = np.concatenate([X_normal, X_anomaly])
#     y = np.concatenate([np.zeros(len(X_normal)), np.ones(len(X_anomaly))])

#     # Convert to float32 for TensorFlow compatibility
#     X = X.astype(np.float32)

#     # Check for NaN or Inf values
#     print("NaN in X:", np.any(np.isnan(X)))
#     print("Inf in X:", np.any(np.isinf(X)))
#     X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)  # Clean data

#     X, y = shuffle(X, y, random_state=42)
#     return X, y

# def create_sequences(X, y, seq_len=5):
#     X_seq, y_seq = [], []
#     for i in range(len(X) - seq_len + 1):
#         X_seq.append(X[i:i + seq_len])
#         y_seq.append(y[i + seq_len - 1])
#     return np.array(X_seq), np.array(y_seq)

# # --- Model builders ---
# def build_lstm_model(input_shape):
#     inputs = Input(shape=input_shape)
#     x = LSTM(64, activation='tanh', recurrent_activation='sigmoid', use_bias=True, unroll=True)(inputs)  # unroll=True to avoid cuDNN
#     x = Dropout(0.3)(x)
#     x = Dense(32, activation='relu')(x)
#     x = Dropout(0.3)(x)
#     outputs = Dense(1, activation='sigmoid')(x)
#     model = Model(inputs, outputs)
#     model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
#     return model

# def build_gru_model(input_shape):
#     inputs = Input(shape=input_shape)
#     x = GRU(64)(inputs)
#     x = Dropout(0.3)(x)
#     x = Dense(32, activation='relu')(x)
#     x = Dropout(0.3)(x)
#     outputs = Dense(1, activation='sigmoid')(x)
#     model = Model(inputs, outputs)
#     model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
#     return model

# def build_lstm_gru_model(input_shape):
#     inputs = Input(shape=input_shape)
#     x = LSTM(64, return_sequences=True, activation='tanh', recurrent_activation='sigmoid', use_bias=True, unroll=True)(inputs)
#     x = Dropout(0.3)(x)
#     x = GRU(64)(x)
#     x = Dropout(0.3)(x)
#     x = Dense(32, activation='relu')(x)
#     x = Dropout(0.3)(x)
#     outputs = Dense(1, activation='sigmoid')(x)
#     model = Model(inputs, outputs)
#     model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
#     return model

# def build_rnn_model(input_shape):
#     inputs = Input(shape=input_shape)
#     x = SimpleRNN(64)(inputs)
#     x = Dropout(0.3)(x)
#     x = Dense(32, activation='relu')(x)
#     x = Dropout(0.3)(x)
#     outputs = Dense(1, activation='sigmoid')(x)
#     model = Model(inputs, outputs)
#     model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
#     return model

# def build_dense_model(input_shape):
#     inputs = Input(shape=input_shape)
#     x = Flatten()(inputs)
#     x = Dense(64, activation='relu')(x)
#     x = Dropout(0.3)(x)
#     x = Dense(32, activation='relu')(x)
#     x = Dropout(0.3)(x)
#     outputs = Dense(1, activation='sigmoid')(x)
#     model = Model(inputs, outputs)
#     model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
#     return model

# # --- Train and evaluate using k-fold ---
# def train_and_evaluate_kfold(model_fn, X, y, model_name, n_splits=5):
#     skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
#     metrics_per_fold = {
#         'accuracy': [],
#         'precision': [],
#         'recall': [],
#         'f1': [],
#         'val_loss': [],
#         'val_accuracy': []
#     }
#     val_loss_histories = []
#     val_acc_histories = []

#     fold_num = 1
#     for train_idx, val_idx in skf.split(X, y):
#         print(f"\n➡️ Fold {fold_num}/{n_splits} - Model: {model_name}")
#         X_train, X_val = X[train_idx], X[val_idx]
#         y_train, y_val = y[train_idx], y[val_idx]

#         model = model_fn(X_train.shape[1:])
#         es = EarlyStopping(patience=5, restore_best_weights=True, verbose=1)

#         history = model.fit(
#             X_train, y_train,
#             validation_data=(X_val, y_val),
#             epochs=50,
#             batch_size=32,
#             callbacks=[es],
#             verbose=1
#         )

#         y_pred = (model.predict(X_val) > 0.5).astype(int)

#         acc = accuracy_score(y_val, y_pred)
#         prec = precision_score(y_val, y_pred, zero_division=0)
#         rec = recall_score(y_val, y_pred, zero_division=0)
#         f1 = f1_score(y_val, y_pred, zero_division=0)

#         print(f"Fold {fold_num} - Acc: {acc:.4f}, Prec: {prec:.4f}, Rec: {rec:.4f}, F1: {f1:.4f}")

#         metrics_per_fold['accuracy'].append(acc)
#         metrics_per_fold['precision'].append(prec)
#         metrics_per_fold['recall'].append(rec)
#         metrics_per_fold['f1'].append(f1)
#         metrics_per_fold['val_loss'].append(history.history['val_loss'][-1])
#         metrics_per_fold['val_accuracy'].append(history.history['val_accuracy'][-1])

#         val_loss_histories.append(history.history['val_loss'])
#         val_acc_histories.append(history.history['val_accuracy'])

#         K.clear_session()  # Clear memory
#         del model
#         gc.collect()

#         fold_num += 1

#     # Average metrics
#     avg_metrics = {k: np.mean(v) for k, v in metrics_per_fold.items()}
#     print(f"\n➡️ {model_name} - Mean Metrics over {n_splits} folds:")
#     print(f"Accuracy: {avg_metrics['accuracy']:.4f}, Precision: {avg_metrics['precision']:.4f}, Recall: {avg_metrics['recall']:.4f}, F1: {avg_metrics['f1']:.4f}")

#     # Average val_loss and val_accuracy curves (pad shorter histories with last value)
#     max_epochs = max(len(h) for h in val_loss_histories)

#     def pad_history(histories, max_len):
#         padded = []
#         for h in histories:
#             if len(h) < max_len:
#                 h = h + [h[-1]] * (max_len - len(h))
#             padded.append(h)
#         return np.array(padded)

#     avg_val_loss = np.mean(pad_history(val_loss_histories, max_epochs), axis=0)
#     avg_val_acc = np.mean(pad_history(val_acc_histories, max_epochs), axis=0)

#     return avg_metrics, avg_val_loss, avg_val_acc

# def main():
#     feature_model_names = [
#         'convnext_tiny',
#         'repvgg_a0',
#         'mobileone_s0',
#         'poolformer_s12',
#         'maxvit_tiny_tf_224',
#         'coatnet_0_rw_224'
#     ]

#     model_builders = {
#         "LSTM": build_lstm_model,
#         "GRU": build_gru_model,
#         "LSTM_GRU": build_lstm_gru_model,
#         "SimpleRNN": build_rnn_model,
#         "Dense": build_dense_model
#     }

#     seq_len = 10
#     save_dir = '/content/drive/MyDrive/AVA'  # Adjust path as needed
#     all_results = []
#     curve_data = {}

#     for feature_model_name in feature_model_names:
#         print(f"\n\n📁 Loading features for model: {feature_model_name}")
#         normal_path = f"{save_dir}/normal_features_{feature_model_name}.npy"
#         anomaly_path = f"{save_dir}/anomaly_features_{feature_model_name}.npy"

#         X, y = prepare_data(normal_path, anomaly_path)
#         X_seq, y_seq = create_sequences(X, y, seq_len=seq_len)

#         print(f"X_seq shape: {X_seq.shape}, y_seq shape: {y_seq.shape}")

#         for model_name, builder in model_builders.items():
#             full_model_name = f"{feature_model_name}_{model_name}"
#             metrics, avg_val_loss, avg_val_acc = train_and_evaluate_kfold(builder, X_seq, y_seq, full_model_name, n_splits=5)
#             metrics.update({"Feature_Model": feature_model_name, "Architecture": model_name})
#             all_results.append(metrics)

#             curve_data[full_model_name] = {
#                 'val_loss': avg_val_loss,
#                 'val_accuracy': avg_val_acc
#             }

#     df = pd.DataFrame(all_results)
#     print("\n\n📊 Overall Model Comparison:\n")
#     print(df)

#     metrics_to_plot = ["accuracy", "precision", "recall", "f1"]
#     plt.figure(figsize=(16, 6))
#     for i, metric in enumerate(metrics_to_plot):
#         plt.subplot(1, len(metrics_to_plot), i + 1)
#         sns.barplot(x="Feature_Model", y=metric, hue="Architecture", data=df)
#         plt.title(metric.capitalize())
#         plt.ylim(0, 1)
#         plt.xticks(rotation=45)
#     plt.tight_layout()
#     plt.show()

#     plt.figure(figsize=(16, 6))
#     plt.subplot(1, 2, 1)
#     for name, curves in curve_data.items():
#         plt.plot(curves['val_accuracy'], label=name)
#     plt.title('Validation Accuracy (averaged over folds)')
#     plt.xlabel('Epochs')
#     plt.ylabel('Accuracy')
#     plt.ylim(0, 1)
#     plt.legend(fontsize=8, loc='lower right')

#     plt.subplot(1, 2, 2)
#     for name, curves in curve_data.items():
#         plt.plot(curves['val_loss'], label=name)
#     plt.title('Validation Loss (averaged over folds)')
#     plt.xlabel('Epochs')
#     plt.ylabel('Loss')
#     plt.legend(fontsize=8, loc='upper right')

#     plt.tight_layout()
#     plt.show()

# if __name__ == "__main__":
#     main()

In [None]:
# import numpy as np
# import tensorflow as tf
# from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, GRU, SimpleRNN, Flatten
# from tensorflow.keras.models import Model
# from tensorflow.keras.optimizers import Adam
# from tensorflow.keras.callbacks import EarlyStopping
# from sklearn.model_selection import train_test_split
# from sklearn.metrics import (
#     accuracy_score, precision_score, recall_score, f1_score,
#     roc_auc_score, confusion_matrix, classification_report,
#     roc_curve, precision_recall_curve, auc
# )
# from sklearn.utils import shuffle
# import matplotlib.pyplot as plt
# import seaborn as sns
# import pandas as pd
# import os

# os.environ["CUDA_VISIBLE_DEVICES"] = ""

# # --- Data loading and preprocessing ---
# def prepare_data(normal_path, anomaly_path):
#     X_normal = np.load(normal_path, allow_pickle=True)
#     if X_normal.ndim >= 4:
#         X_normal = X_normal.mean(axis=(2, 3))
#     X_anomaly = np.load(anomaly_path, allow_pickle=True)
#     if X_anomaly.ndim >= 4:
#         X_anomaly = X_anomaly.mean(axis=(2, 3))
#     X = np.concatenate([X_normal, X_anomaly])
#     y = np.concatenate([np.zeros(len(X_normal)), np.ones(len(X_anomaly))])
#     X = X.astype(np.float32)
#     X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
#     X, y = shuffle(X, y, random_state=42)
#     return X, y

# def create_sequences(X, y, seq_len=5):
#     X_seq, y_seq = [], []
#     for i in range(len(X) - seq_len + 1):
#         X_seq.append(X[i:i + seq_len])
#         y_seq.append(y[i + seq_len - 1])
#     return np.array(X_seq), np.array(y_seq)

# # --- Model builders with 32 and 16 units ---
# def build_lstm_model(input_shape):
#     inputs = Input(shape=input_shape)
#     x = LSTM(32, activation='tanh', recurrent_activation='sigmoid', unroll=True)(inputs)
#     x = Dropout(0.3)(x)
#     x = Dense(16, activation='relu')(x)
#     x = Dropout(0.3)(x)
#     outputs = Dense(1, activation='sigmoid')(x)
#     model = Model(inputs, outputs)
#     model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
#     return model

# def build_gru_model(input_shape):
#     inputs = Input(shape=input_shape)
#     x = GRU(32)(inputs)
#     x = Dropout(0.3)(x)
#     x = Dense(16, activation='relu')(x)
#     x = Dropout(0.3)(x)
#     outputs = Dense(1, activation='sigmoid')(x)
#     model = Model(inputs, outputs)
#     model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
#     return model

# def build_lstm_gru_model(input_shape):
#     inputs = Input(shape=input_shape)
#     x = LSTM(32, return_sequences=True, unroll=True)(inputs)
#     x = Dropout(0.3)(x)
#     x = GRU(32)(x)
#     x = Dropout(0.3)(x)
#     x = Dense(16, activation='relu')(x)
#     x = Dropout(0.3)(x)
#     outputs = Dense(1, activation='sigmoid')(x)
#     model = Model(inputs, outputs)
#     model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
#     return model

# def build_simple_rnn_model(input_shape):
#     inputs = Input(shape=input_shape)
#     x = SimpleRNN(32)(inputs)
#     x = Dropout(0.3)(x)
#     x = Dense(16, activation='relu')(x)
#     x = Dropout(0.3)(x)
#     outputs = Dense(1, activation='sigmoid')(x)
#     model = Model(inputs, outputs)
#     model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
#     return model

# def build_dense_model(input_shape):
#     inputs = Input(shape=input_shape)
#     x = Flatten()(inputs)
#     x = Dense(32, activation='relu')(x)
#     x = Dropout(0.3)(x)
#     x = Dense(16, activation='relu')(x)
#     x = Dropout(0.3)(x)
#     outputs = Dense(1, activation='sigmoid')(x)
#     model = Model(inputs, outputs)
#     model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
#     return model

# # --- Main execution ---
# def main():
#     feature_model_name = 'convnext_tiny'  # adjust if needed
#     seq_len = 10
#     save_dir = '/content/drive/MyDrive/AVA'  # adjust to your paths
#     normal_path = f"{save_dir}/normal_features_{feature_model_name}.npy"
#     anomaly_path = f"{save_dir}/anomaly_features_{feature_model_name}.npy"

#     X, y = prepare_data(normal_path, anomaly_path)
#     X_seq, y_seq = create_sequences(X, y, seq_len=seq_len)

#     model_builders = {
#         "LSTM": build_lstm_model,
#         "GRU": build_gru_model,
#         "LSTM_GRU": build_lstm_gru_model,
#         "SimpleRNN": build_simple_rnn_model,
#         "Dense": build_dense_model
#     }

#     num_runs = 5
#     all_results = []

#     for model_name, builder in model_builders.items():
#         print(f"\n==== Training model: {model_name} ====")

#         metrics_list = {
#             "accuracy": [],
#             "precision": [],
#             "recall": [],
#             "f1": [],
#             "auc": []
#         }

#         for run in range(num_runs):
#             print(f" Run {run+1}/{num_runs}")

#             X_train, X_test, y_train, y_test = train_test_split(
#                 X_seq, y_seq, test_size=0.2, stratify=y_seq, random_state=run
#             )

#             model = builder(X_train.shape[1:])
#             es = EarlyStopping(patience=5, restore_best_weights=True, verbose=0)
#             history = model.fit(
#                 X_train, y_train,
#                 validation_data=(X_test, y_test),
#                 epochs=50,
#                 batch_size=32,
#                 callbacks=[es],
#                 verbose=0
#             )

#             y_prob = model.predict(X_test).ravel()
#             y_pred = (y_prob > 0.5).astype(int)

#             acc = accuracy_score(y_test, y_pred)
#             prec = precision_score(y_test, y_pred, zero_division=0)
#             rec = recall_score(y_test, y_pred, zero_division=0)
#             f1 = f1_score(y_test, y_pred, zero_division=0)
#             auc_score = roc_auc_score(y_test, y_prob)

#             # Store metrics
#             metrics_list["accuracy"].append(acc)
#             metrics_list["precision"].append(prec)
#             metrics_list["recall"].append(rec)
#             metrics_list["f1"].append(f1)
#             metrics_list["auc"].append(auc_score)

#             all_results.append({
#                 "Model": model_name,
#                 "Run": run + 1,
#                 "Accuracy": acc,
#                 "Precision": prec,
#                 "Recall": rec,
#                 "F1": f1,
#                 "AUC": auc_score
#             })

#             # For last run, print detailed info and plots
#             if run == num_runs - 1:
#                 print(f"\n--- Detailed results for {model_name} (run {run+1}) ---")
#                 print(f"Accuracy: {acc:.4f}")
#                 print(f"Precision: {prec:.4f}")
#                 print(f"Recall: {rec:.4f}")
#                 print(f"F1-score: {f1:.4f}")
#                 print(f"AUC: {auc_score:.4f}")

#                 print("\nConfusion Matrix:")
#                 cm = confusion_matrix(y_test, y_pred)
#                 print(cm)

#                 print("\nClassification Report:")
#                 print(classification_report(y_test, y_pred, digits=4))

#                 # ROC Curve
#                 fpr, tpr, _ = roc_curve(y_test, y_prob)
#                 roc_auc = auc(fpr, tpr)
#                 plt.figure(figsize=(6,5))
#                 plt.plot(fpr, tpr, label=f'ROC curve (AUC = {roc_auc:.3f})')
#                 plt.plot([0,1], [0,1], 'k--')
#                 plt.title(f'{model_name} ROC Curve')
#                 plt.xlabel('False Positive Rate')
#                 plt.ylabel('True Positive Rate')
#                 plt.legend(loc='lower right')
#                 plt.grid(True)
#                 plt.show()

#                 # Precision-Recall Curve
#                 precision_vals, recall_vals, _ = precision_recall_curve(y_test, y_prob)
#                 pr_auc = auc(recall_vals, precision_vals)
#                 plt.figure(figsize=(6,5))
#                 plt.plot(recall_vals, precision_vals, label=f'PR curve (AUC = {pr_auc:.3f})')
#                 plt.title(f'{model_name} Precision-Recall Curve')
#                 plt.xlabel('Recall')
#                 plt.ylabel('Precision')
#                 plt.legend(loc='upper right')
#                 plt.grid(True)
#                 plt.show()

#                 # Training plots
#                 plt.figure(figsize=(12,5))
#                 plt.subplot(1,2,1)
#                 plt.plot(history.history['accuracy'], label='Train Accuracy')
#                 plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
#                 plt.title(f'{model_name} Accuracy')
#                 plt.xlabel('Epoch')
#                 plt.ylabel('Accuracy')
#                 plt.legend()
#                 plt.grid(True)

#                 plt.subplot(1,2,2)
#                 plt.plot(history.history['loss'], label='Train Loss')
#                 plt.plot(history.history['val_loss'], label='Validation Loss')
#                 plt.title(f'{model_name} Loss')
#                 plt.xlabel('Epoch')
#                 plt.ylabel('Loss')
#                 plt.legend()
#                 plt.grid(True)

#                 plt.tight_layout()
#                 plt.show()

#         # Print mean and std for all runs
#         print(f"\n==== Summary for {model_name} (over {num_runs} runs) ====")
#         for metric in metrics_list:
#             mean_val = np.mean(metrics_list[metric])
#             std_val = np.std(metrics_list[metric])
#             print(f"{metric.capitalize()}: {mean_val:.4f} ± {std_val:.4f}")

#     # Save all results to CSV
#     df = pd.DataFrame(all_results)
#     csv_save_path = os.path.join(save_dir, f"results_all_models_{feature_model_name}.csv")
#     df.to_csv(csv_save_path, index=False)
#     print(f"\nAll run results saved to: {csv_save_path}")

# if __name__ == "__main__":
#     main()


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, GRU, SimpleRNN, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, confusion_matrix, classification_report,
    roc_curve, precision_recall_curve, auc
)
from sklearn.utils import shuffle
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import os
import gc
from tensorflow.keras import backend as K

os.environ["CUDA_VISIBLE_DEVICES"] = ""

# --- Data loading and preprocessing ---
def prepare_data(normal_path, anomaly_path):
    X_normal = np.load(normal_path, allow_pickle=True)
    if X_normal.ndim >= 4:
        X_normal = X_normal.mean(axis=(2, 3))
    X_anomaly = np.load(anomaly_path, allow_pickle=True)
    if X_anomaly.ndim >= 4:
        X_anomaly = X_anomaly.mean(axis=(2, 3))
    X = np.concatenate([X_normal, X_anomaly])
    y = np.concatenate([np.zeros(len(X_normal)), np.ones(len(X_anomaly))])
    X = X.astype(np.float32)
    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
    X, y = shuffle(X, y, random_state=42)
    return X, y

def create_sequences(X, y, seq_len=5):
    X_seq, y_seq = [], []
    for i in range(len(X) - seq_len + 1):
        X_seq.append(X[i:i + seq_len])
        y_seq.append(y[i + seq_len - 1])
    return np.array(X_seq), np.array(y_seq)

# --- Model builders with 32 and 16 units ---
def build_lstm_model(input_shape):
    inputs = Input(shape=input_shape)
    x = LSTM(32, activation='tanh', recurrent_activation='sigmoid', unroll=True)(inputs)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_gru_model(input_shape):
    inputs = Input(shape=input_shape)
    x = GRU(32)(inputs)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_lstm_gru_model(input_shape):
    inputs = Input(shape=input_shape)
    x = LSTM(32, return_sequences=True, unroll=True)(inputs)
    x = Dropout(0.3)(x)
    x = GRU(32)(x)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_simple_rnn_model(input_shape):
    inputs = Input(shape=input_shape)
    x = SimpleRNN(32)(inputs)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_dense_model(input_shape):
    inputs = Input(shape=input_shape)
    x = Flatten()(inputs)
    x = Dense(32, activation='relu')(x)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

# --- Main execution ---
def main():
    feature_model_name = 'convnext_tiny'  # Change if needed
    seq_len = 10
    save_dir = '/content/drive/MyDrive/AVA'  # Change to your data path
    normal_path = f"{save_dir}/normal_features_{feature_model_name}.npy"
    anomaly_path = f"{save_dir}/anomaly_features_{feature_model_name}.npy"

    X, y = prepare_data(normal_path, anomaly_path)
    X_seq, y_seq = create_sequences(X, y, seq_len=seq_len)

    model_builders = {
        "LSTM": build_lstm_model,
        "GRU": build_gru_model,
        "LSTM_GRU": build_lstm_gru_model,
        "SimpleRNN": build_simple_rnn_model,
        "Dense": build_dense_model
    }

    num_runs = 5
    all_results = []

    for model_name, builder in model_builders.items():
        print(f"\n==== Training model: {model_name} ====")

        metrics_list = {
            "accuracy": [],
            "precision": [],
            "recall": [],
            "f1": [],
            "auc": []
        }

        histories = []

        for run in range(num_runs):
            print(f" Run {run+1}/{num_runs}")

            X_train, X_test, y_train, y_test = train_test_split(
                X_seq, y_seq, test_size=0.2, stratify=y_seq, random_state=run
            )

            model = builder(X_train.shape[1:])
            es = EarlyStopping(patience=5, restore_best_weights=True, verbose=0)
            history = model.fit(
                X_train, y_train,
                validation_data=(X_test, y_test),
                epochs=50,
                batch_size=32,
                callbacks=[es],
                verbose=1
            )
            histories.append(history)

            y_prob = model.predict(X_test).ravel()
            y_pred = (y_prob > 0.5).astype(int)

            acc = accuracy_score(y_test, y_pred)
            prec = precision_score(y_test, y_pred, zero_division=0)
            rec = recall_score(y_test, y_pred, zero_division=0)
            f1 = f1_score(y_test, y_pred, zero_division=0)
            auc_score = roc_auc_score(y_test, y_prob)

            # Store metrics
            metrics_list["accuracy"].append(acc)
            metrics_list["precision"].append(prec)
            metrics_list["recall"].append(rec)
            metrics_list["f1"].append(f1)
            metrics_list["auc"].append(auc_score)

            all_results.append({
                "Model": model_name,
                "Run": run + 1,
                "Accuracy": acc,
                "Precision": prec,
                "Recall": rec,
                "F1": f1,
                "AUC": auc_score
            })

            # Clear to save memory
            K.clear_session()
            del model
            gc.collect()

        # Plot training curves for last run
        last_history = histories[-1]
        plt.figure(figsize=(12,5))
        plt.subplot(1,2,1)
        plt.plot(last_history.history['accuracy'], label='Train Accuracy')
        plt.plot(last_history.history['val_accuracy'], label='Validation Accuracy')
        plt.title(f'{model_name} Accuracy per Epoch')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend()
        plt.grid(True)

        plt.subplot(1,2,2)
        plt.plot(last_history.history['loss'], label='Train Loss')
        plt.plot(last_history.history['val_loss'], label='Validation Loss')
        plt.title(f'{model_name} Loss per Epoch')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.grid(True)

        plt.tight_layout()
        plt.show()

        # Print mean ± std summary for all runs
        print(f"\n==== Summary for {model_name} (over {num_runs} runs) ====")
        for metric in metrics_list:
            mean_val = np.mean(metrics_list[metric])
            std_val = np.std(metrics_list[metric])
            print(f"{metric.capitalize()}: {mean_val:.4f} ± {std_val:.4f}")

        # Plot boxplots of the 5 runs for metrics
        plt.figure(figsize=(10,6))
        sns.boxplot(data=[metrics_list["accuracy"], metrics_list["f1"], metrics_list["auc"]],
                    palette="Set2")
        plt.xticks([0,1,2], ['Accuracy', 'F1-score', 'AUC'])
        plt.title(f'{model_name} Metrics Boxplot over {num_runs} runs')
        plt.ylabel('Score')
        plt.grid(True)
        plt.show()

    # Save all results to CSV
    df = pd.DataFrame(all_results)
    csv_save_path = os.path.join(save_dir, f"results_all_models_{feature_model_name}.csv")
    df.to_csv(csv_save_path, index=False)
    print(f"\nAll run results saved to: {csv_save_path}")

if __name__ == "__main__":
    main()


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, GRU, SimpleRNN, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, confusion_matrix, classification_report,
    roc_curve, precision_recall_curve, auc
)
from sklearn.utils import shuffle
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import os
import gc
from tensorflow.keras import backend as K

os.environ["CUDA_VISIBLE_DEVICES"] = ""

# --- Data loading and preprocessing ---
def prepare_data(normal_path, anomaly_path):
    X_normal = np.load(normal_path, allow_pickle=True)
    if X_normal.ndim >= 4:
        X_normal = X_normal.mean(axis=(2, 3))
    X_anomaly = np.load(anomaly_path, allow_pickle=True)
    if X_anomaly.ndim >= 4:
        X_anomaly = X_anomaly.mean(axis=(2, 3))
    X = np.concatenate([X_normal, X_anomaly])
    y = np.concatenate([np.zeros(len(X_normal)), np.ones(len(X_anomaly))])
    X = X.astype(np.float32)
    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
    X, y = shuffle(X, y, random_state=42)
    return X, y

def create_sequences(X, y, seq_len=5):
    X_seq, y_seq = [], []
    for i in range(len(X) - seq_len + 1):
        X_seq.append(X[i:i + seq_len])
        y_seq.append(y[i + seq_len - 1])
    return np.array(X_seq), np.array(y_seq)

# --- Model builders with 32 and 16 units ---
def build_lstm_model(input_shape):
    inputs = Input(shape=input_shape)
    x = LSTM(32, activation='tanh', recurrent_activation='sigmoid', unroll=True)(inputs)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_gru_model(input_shape):
    inputs = Input(shape=input_shape)
    x = GRU(32)(inputs)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_lstm_gru_model(input_shape):
    inputs = Input(shape=input_shape)
    x = LSTM(32, return_sequences=True, unroll=True)(inputs)
    x = Dropout(0.3)(x)
    x = GRU(32)(x)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_simple_rnn_model(input_shape):
    inputs = Input(shape=input_shape)
    x = SimpleRNN(32)(inputs)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_dense_model(input_shape):
    inputs = Input(shape=input_shape)
    x = Flatten()(inputs)
    x = Dense(32, activation='relu')(x)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

# --- Main execution ---
def main():
    feature_model_name = 'repvgg_a0'  # Change if needed
    seq_len = 10
    save_dir = '/content/drive/MyDrive/AVA'  # Change to your data path
    normal_path = f"{save_dir}/normal_features_{feature_model_name}.npy"
    anomaly_path = f"{save_dir}/anomaly_features_{feature_model_name}.npy"

    X, y = prepare_data(normal_path, anomaly_path)
    X_seq, y_seq = create_sequences(X, y, seq_len=seq_len)

    model_builders = {
        "LSTM": build_lstm_model,
        "GRU": build_gru_model,
        "LSTM_GRU": build_lstm_gru_model,
        "SimpleRNN": build_simple_rnn_model,
        "Dense": build_dense_model
    }

    num_runs = 5
    all_results = []

    for model_name, builder in model_builders.items():
        print(f"\n==== Training model: {model_name} ====")

        metrics_list = {
            "accuracy": [],
            "precision": [],
            "recall": [],
            "f1": [],
            "auc": []
        }

        histories = []

        for run in range(num_runs):
            print(f" Run {run+1}/{num_runs}")

            X_train, X_test, y_train, y_test = train_test_split(
                X_seq, y_seq, test_size=0.2, stratify=y_seq, random_state=run
            )

            model = builder(X_train.shape[1:])
            es = EarlyStopping(patience=5, restore_best_weights=True, verbose=0)
            history = model.fit(
                X_train, y_train,
                validation_data=(X_test, y_test),
                epochs=50,
                batch_size=32,
                callbacks=[es],
                verbose=1
            )
            histories.append(history)

            y_prob = model.predict(X_test).ravel()
            y_pred = (y_prob > 0.5).astype(int)

            acc = accuracy_score(y_test, y_pred)
            prec = precision_score(y_test, y_pred, zero_division=0)
            rec = recall_score(y_test, y_pred, zero_division=0)
            f1 = f1_score(y_test, y_pred, zero_division=0)
            auc_score = roc_auc_score(y_test, y_prob)

            # Store metrics
            metrics_list["accuracy"].append(acc)
            metrics_list["precision"].append(prec)
            metrics_list["recall"].append(rec)
            metrics_list["f1"].append(f1)
            metrics_list["auc"].append(auc_score)

            all_results.append({
                "Model": model_name,
                "Run": run + 1,
                "Accuracy": acc,
                "Precision": prec,
                "Recall": rec,
                "F1": f1,
                "AUC": auc_score
            })

            # Clear to save memory
            K.clear_session()
            del model
            gc.collect()

        # Plot training curves for last run
        last_history = histories[-1]
        plt.figure(figsize=(12,5))
        plt.subplot(1,2,1)
        plt.plot(last_history.history['accuracy'], label='Train Accuracy')
        plt.plot(last_history.history['val_accuracy'], label='Validation Accuracy')
        plt.title(f'{model_name} Accuracy per Epoch')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend()
        plt.grid(True)

        plt.subplot(1,2,2)
        plt.plot(last_history.history['loss'], label='Train Loss')
        plt.plot(last_history.history['val_loss'], label='Validation Loss')
        plt.title(f'{model_name} Loss per Epoch')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.grid(True)

        plt.tight_layout()
        plt.show()

        # Print mean ± std summary for all runs
        print(f"\n==== Summary for {model_name} (over {num_runs} runs) ====")
        for metric in metrics_list:
            mean_val = np.mean(metrics_list[metric])
            std_val = np.std(metrics_list[metric])
            print(f"{metric.capitalize()}: {mean_val:.4f} ± {std_val:.4f}")

        # Plot boxplots of the 5 runs for metrics
        plt.figure(figsize=(10,6))
        sns.boxplot(data=[metrics_list["accuracy"], metrics_list["f1"], metrics_list["auc"]],
                    palette="Set2")
        plt.xticks([0,1,2], ['Accuracy', 'F1-score', 'AUC'])
        plt.title(f'{model_name} Metrics Boxplot over {num_runs} runs')
        plt.ylabel('Score')
        plt.grid(True)
        plt.show()

    # Save all results to CSV
    df = pd.DataFrame(all_results)
    csv_save_path = os.path.join(save_dir, f"results_all_models_{feature_model_name}.csv")
    df.to_csv(csv_save_path, index=False)
    print(f"\nAll run results saved to: {csv_save_path}")

if __name__ == "__main__":
    main()


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, GRU, SimpleRNN, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, confusion_matrix, classification_report,
    roc_curve, precision_recall_curve, auc
)
from sklearn.utils import shuffle
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import os
import gc
from tensorflow.keras import backend as K

os.environ["CUDA_VISIBLE_DEVICES"] = ""

# --- Data loading and preprocessing ---
def prepare_data(normal_path, anomaly_path):
    X_normal = np.load(normal_path, allow_pickle=True)
    if X_normal.ndim >= 4:
        X_normal = X_normal.mean(axis=(2, 3))
    X_anomaly = np.load(anomaly_path, allow_pickle=True)
    if X_anomaly.ndim >= 4:
        X_anomaly = X_anomaly.mean(axis=(2, 3))
    X = np.concatenate([X_normal, X_anomaly])
    y = np.concatenate([np.zeros(len(X_normal)), np.ones(len(X_anomaly))])
    X = X.astype(np.float32)
    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
    X, y = shuffle(X, y, random_state=42)
    return X, y

def create_sequences(X, y, seq_len=5):
    X_seq, y_seq = [], []
    for i in range(len(X) - seq_len + 1):
        X_seq.append(X[i:i + seq_len])
        y_seq.append(y[i + seq_len - 1])
    return np.array(X_seq), np.array(y_seq)

# --- Model builders with 32 and 16 units ---
def build_lstm_model(input_shape):
    inputs = Input(shape=input_shape)
    x = LSTM(32, activation='tanh', recurrent_activation='sigmoid', unroll=True)(inputs)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_gru_model(input_shape):
    inputs = Input(shape=input_shape)
    x = GRU(32)(inputs)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_lstm_gru_model(input_shape):
    inputs = Input(shape=input_shape)
    x = LSTM(32, return_sequences=True, unroll=True)(inputs)
    x = Dropout(0.3)(x)
    x = GRU(32)(x)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_simple_rnn_model(input_shape):
    inputs = Input(shape=input_shape)
    x = SimpleRNN(32)(inputs)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_dense_model(input_shape):
    inputs = Input(shape=input_shape)
    x = Flatten()(inputs)
    x = Dense(32, activation='relu')(x)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

# --- Main execution ---
def main():
    feature_model_name = 'mobileone_s0'  # Change if needed
    seq_len = 10
    save_dir = '/content/drive/MyDrive/AVA'  # Change to your data path
    normal_path = f"{save_dir}/normal_features_{feature_model_name}.npy"
    anomaly_path = f"{save_dir}/anomaly_features_{feature_model_name}.npy"

    X, y = prepare_data(normal_path, anomaly_path)
    X_seq, y_seq = create_sequences(X, y, seq_len=seq_len)

    model_builders = {
        "LSTM": build_lstm_model,
        "GRU": build_gru_model,
        "LSTM_GRU": build_lstm_gru_model,
        "SimpleRNN": build_simple_rnn_model,
        "Dense": build_dense_model
    }

    num_runs = 5
    all_results = []

    for model_name, builder in model_builders.items():
        print(f"\n==== Training model: {model_name} ====")

        metrics_list = {
            "accuracy": [],
            "precision": [],
            "recall": [],
            "f1": [],
            "auc": []
        }

        histories = []

        for run in range(num_runs):
            print(f" Run {run+1}/{num_runs}")

            X_train, X_test, y_train, y_test = train_test_split(
                X_seq, y_seq, test_size=0.2, stratify=y_seq, random_state=run
            )

            model = builder(X_train.shape[1:])
            es = EarlyStopping(patience=5, restore_best_weights=True, verbose=0)
            history = model.fit(
                X_train, y_train,
                validation_data=(X_test, y_test),
                epochs=50,
                batch_size=32,
                callbacks=[es],
                verbose=1
            )
            histories.append(history)

            y_prob = model.predict(X_test).ravel()
            y_pred = (y_prob > 0.5).astype(int)

            acc = accuracy_score(y_test, y_pred)
            prec = precision_score(y_test, y_pred, zero_division=0)
            rec = recall_score(y_test, y_pred, zero_division=0)
            f1 = f1_score(y_test, y_pred, zero_division=0)
            auc_score = roc_auc_score(y_test, y_prob)

            # Store metrics
            metrics_list["accuracy"].append(acc)
            metrics_list["precision"].append(prec)
            metrics_list["recall"].append(rec)
            metrics_list["f1"].append(f1)
            metrics_list["auc"].append(auc_score)

            all_results.append({
                "Model": model_name,
                "Run": run + 1,
                "Accuracy": acc,
                "Precision": prec,
                "Recall": rec,
                "F1": f1,
                "AUC": auc_score
            })

            # Clear to save memory
            K.clear_session()
            del model
            gc.collect()

        # Plot training curves for last run
        last_history = histories[-1]
        plt.figure(figsize=(12,5))
        plt.subplot(1,2,1)
        plt.plot(last_history.history['accuracy'], label='Train Accuracy')
        plt.plot(last_history.history['val_accuracy'], label='Validation Accuracy')
        plt.title(f'{model_name} Accuracy per Epoch')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend()
        plt.grid(True)

        plt.subplot(1,2,2)
        plt.plot(last_history.history['loss'], label='Train Loss')
        plt.plot(last_history.history['val_loss'], label='Validation Loss')
        plt.title(f'{model_name} Loss per Epoch')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.grid(True)

        plt.tight_layout()
        plt.show()

        # Print mean ± std summary for all runs
        print(f"\n==== Summary for {model_name} (over {num_runs} runs) ====")
        for metric in metrics_list:
            mean_val = np.mean(metrics_list[metric])
            std_val = np.std(metrics_list[metric])
            print(f"{metric.capitalize()}: {mean_val:.4f} ± {std_val:.4f}")

        # Plot boxplots of the 5 runs for metrics
        plt.figure(figsize=(10,6))
        sns.boxplot(data=[metrics_list["accuracy"], metrics_list["f1"], metrics_list["auc"]],
                    palette="Set2")
        plt.xticks([0,1,2], ['Accuracy', 'F1-score', 'AUC'])
        plt.title(f'{model_name} Metrics Boxplot over {num_runs} runs')
        plt.ylabel('Score')
        plt.grid(True)
        plt.show()

    # Save all results to CSV
    df = pd.DataFrame(all_results)
    csv_save_path = os.path.join(save_dir, f"results_all_models_{feature_model_name}.csv")
    df.to_csv(csv_save_path, index=False)
    print(f"\nAll run results saved to: {csv_save_path}")

if __name__ == "__main__":
    main()


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, GRU, SimpleRNN, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, confusion_matrix, classification_report,
    roc_curve, precision_recall_curve, auc
)
from sklearn.utils import shuffle
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import os
import gc
from tensorflow.keras import backend as K

os.environ["CUDA_VISIBLE_DEVICES"] = ""

# --- Data loading and preprocessing ---
def prepare_data(normal_path, anomaly_path):
    X_normal = np.load(normal_path, allow_pickle=True)
    if X_normal.ndim >= 4:
        X_normal = X_normal.mean(axis=(2, 3))
    X_anomaly = np.load(anomaly_path, allow_pickle=True)
    if X_anomaly.ndim >= 4:
        X_anomaly = X_anomaly.mean(axis=(2, 3))
    X = np.concatenate([X_normal, X_anomaly])
    y = np.concatenate([np.zeros(len(X_normal)), np.ones(len(X_anomaly))])
    X = X.astype(np.float32)
    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
    X, y = shuffle(X, y, random_state=42)
    return X, y

def create_sequences(X, y, seq_len=5):
    X_seq, y_seq = [], []
    for i in range(len(X) - seq_len + 1):
        X_seq.append(X[i:i + seq_len])
        y_seq.append(y[i + seq_len - 1])
    return np.array(X_seq), np.array(y_seq)

# --- Model builders with 32 and 16 units ---
def build_lstm_model(input_shape):
    inputs = Input(shape=input_shape)
    x = LSTM(32, activation='tanh', recurrent_activation='sigmoid', unroll=True)(inputs)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_gru_model(input_shape):
    inputs = Input(shape=input_shape)
    x = GRU(32)(inputs)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_lstm_gru_model(input_shape):
    inputs = Input(shape=input_shape)
    x = LSTM(32, return_sequences=True, unroll=True)(inputs)
    x = Dropout(0.3)(x)
    x = GRU(32)(x)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_simple_rnn_model(input_shape):
    inputs = Input(shape=input_shape)
    x = SimpleRNN(32)(inputs)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_dense_model(input_shape):
    inputs = Input(shape=input_shape)
    x = Flatten()(inputs)
    x = Dense(32, activation='relu')(x)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

# --- Main execution ---
def main():
    feature_model_name = 'poolformer_s12'  # Change if needed
    seq_len = 10
    save_dir = '/content/drive/MyDrive/AVA'  # Change to your data path
    normal_path = f"{save_dir}/normal_features_{feature_model_name}.npy"
    anomaly_path = f"{save_dir}/anomaly_features_{feature_model_name}.npy"

    X, y = prepare_data(normal_path, anomaly_path)
    X_seq, y_seq = create_sequences(X, y, seq_len=seq_len)

    model_builders = {
        "LSTM": build_lstm_model,
        "GRU": build_gru_model,
        "LSTM_GRU": build_lstm_gru_model,
        "SimpleRNN": build_simple_rnn_model,
        "Dense": build_dense_model
    }

    num_runs = 5
    all_results = []

    for model_name, builder in model_builders.items():
        print(f"\n==== Training model: {model_name} ====")

        metrics_list = {
            "accuracy": [],
            "precision": [],
            "recall": [],
            "f1": [],
            "auc": []
        }

        histories = []

        for run in range(num_runs):
            print(f" Run {run+1}/{num_runs}")

            X_train, X_test, y_train, y_test = train_test_split(
                X_seq, y_seq, test_size=0.2, stratify=y_seq, random_state=run
            )

            model = builder(X_train.shape[1:])
            es = EarlyStopping(patience=5, restore_best_weights=True, verbose=0)
            history = model.fit(
                X_train, y_train,
                validation_data=(X_test, y_test),
                epochs=50,
                batch_size=32,
                callbacks=[es],
                verbose=1
            )
            histories.append(history)

            y_prob = model.predict(X_test).ravel()
            y_pred = (y_prob > 0.5).astype(int)

            acc = accuracy_score(y_test, y_pred)
            prec = precision_score(y_test, y_pred, zero_division=0)
            rec = recall_score(y_test, y_pred, zero_division=0)
            f1 = f1_score(y_test, y_pred, zero_division=0)
            auc_score = roc_auc_score(y_test, y_prob)

            # Store metrics
            metrics_list["accuracy"].append(acc)
            metrics_list["precision"].append(prec)
            metrics_list["recall"].append(rec)
            metrics_list["f1"].append(f1)
            metrics_list["auc"].append(auc_score)

            all_results.append({
                "Model": model_name,
                "Run": run + 1,
                "Accuracy": acc,
                "Precision": prec,
                "Recall": rec,
                "F1": f1,
                "AUC": auc_score
            })

            # Clear to save memory
            K.clear_session()
            del model
            gc.collect()

        # Plot training curves for last run
        last_history = histories[-1]
        plt.figure(figsize=(12,5))
        plt.subplot(1,2,1)
        plt.plot(last_history.history['accuracy'], label='Train Accuracy')
        plt.plot(last_history.history['val_accuracy'], label='Validation Accuracy')
        plt.title(f'{model_name} Accuracy per Epoch')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend()
        plt.grid(True)

        plt.subplot(1,2,2)
        plt.plot(last_history.history['loss'], label='Train Loss')
        plt.plot(last_history.history['val_loss'], label='Validation Loss')
        plt.title(f'{model_name} Loss per Epoch')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.grid(True)

        plt.tight_layout()
        plt.show()

        # Print mean ± std summary for all runs
        print(f"\n==== Summary for {model_name} (over {num_runs} runs) ====")
        for metric in metrics_list:
            mean_val = np.mean(metrics_list[metric])
            std_val = np.std(metrics_list[metric])
            print(f"{metric.capitalize()}: {mean_val:.4f} ± {std_val:.4f}")

        # Plot boxplots of the 5 runs for metrics
        plt.figure(figsize=(10,6))
        sns.boxplot(data=[metrics_list["accuracy"], metrics_list["f1"], metrics_list["auc"]],
                    palette="Set2")
        plt.xticks([0,1,2], ['Accuracy', 'F1-score', 'AUC'])
        plt.title(f'{model_name} Metrics Boxplot over {num_runs} runs')
        plt.ylabel('Score')
        plt.grid(True)
        plt.show()

    # Save all results to CSV
    df = pd.DataFrame(all_results)
    csv_save_path = os.path.join(save_dir, f"results_all_models_{feature_model_name}.csv")
    df.to_csv(csv_save_path, index=False)
    print(f"\nAll run results saved to: {csv_save_path}")

if __name__ == "__main__":
    main()


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, GRU, SimpleRNN, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, confusion_matrix, classification_report,
    roc_curve, precision_recall_curve, auc
)
from sklearn.utils import shuffle
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import os
import gc
from tensorflow.keras import backend as K

os.environ["CUDA_VISIBLE_DEVICES"] = ""

# --- Data loading and preprocessing ---
def prepare_data(normal_path, anomaly_path):
    X_normal = np.load(normal_path, allow_pickle=True)
    if X_normal.ndim >= 4:
        X_normal = X_normal.mean(axis=(2, 3))
    X_anomaly = np.load(anomaly_path, allow_pickle=True)
    if X_anomaly.ndim >= 4:
        X_anomaly = X_anomaly.mean(axis=(2, 3))
    X = np.concatenate([X_normal, X_anomaly])
    y = np.concatenate([np.zeros(len(X_normal)), np.ones(len(X_anomaly))])
    X = X.astype(np.float32)
    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
    X, y = shuffle(X, y, random_state=42)
    return X, y

def create_sequences(X, y, seq_len=5):
    X_seq, y_seq = [], []
    for i in range(len(X) - seq_len + 1):
        X_seq.append(X[i:i + seq_len])
        y_seq.append(y[i + seq_len - 1])
    return np.array(X_seq), np.array(y_seq)

# --- Model builders with 32 and 16 units ---
def build_lstm_model(input_shape):
    inputs = Input(shape=input_shape)
    x = LSTM(32, activation='tanh', recurrent_activation='sigmoid', unroll=True)(inputs)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_gru_model(input_shape):
    inputs = Input(shape=input_shape)
    x = GRU(32)(inputs)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_lstm_gru_model(input_shape):
    inputs = Input(shape=input_shape)
    x = LSTM(32, return_sequences=True, unroll=True)(inputs)
    x = Dropout(0.3)(x)
    x = GRU(32)(x)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_simple_rnn_model(input_shape):
    inputs = Input(shape=input_shape)
    x = SimpleRNN(32)(inputs)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_dense_model(input_shape):
    inputs = Input(shape=input_shape)
    x = Flatten()(inputs)
    x = Dense(32, activation='relu')(x)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

# --- Main execution ---
def main():
    feature_model_name = 'maxvit_tiny_tf_224'  # Change if needed
    seq_len = 10
    save_dir = '/content/drive/MyDrive/AVA'  # Change to your data path
    normal_path = f"{save_dir}/normal_features_{feature_model_name}.npy"
    anomaly_path = f"{save_dir}/anomaly_features_{feature_model_name}.npy"

    X, y = prepare_data(normal_path, anomaly_path)
    X_seq, y_seq = create_sequences(X, y, seq_len=seq_len)

    model_builders = {
        "LSTM": build_lstm_model,
        "GRU": build_gru_model,
        "LSTM_GRU": build_lstm_gru_model,
        "SimpleRNN": build_simple_rnn_model,
        "Dense": build_dense_model
    }

    num_runs = 5
    all_results = []

    for model_name, builder in model_builders.items():
        print(f"\n==== Training model: {model_name} ====")

        metrics_list = {
            "accuracy": [],
            "precision": [],
            "recall": [],
            "f1": [],
            "auc": []
        }

        histories = []

        for run in range(num_runs):
            print(f" Run {run+1}/{num_runs}")

            X_train, X_test, y_train, y_test = train_test_split(
                X_seq, y_seq, test_size=0.2, stratify=y_seq, random_state=run
            )

            model = builder(X_train.shape[1:])
            es = EarlyStopping(patience=5, restore_best_weights=True, verbose=0)
            history = model.fit(
                X_train, y_train,
                validation_data=(X_test, y_test),
                epochs=50,
                batch_size=32,
                callbacks=[es],
                verbose=1
            )
            histories.append(history)

            y_prob = model.predict(X_test).ravel()
            y_pred = (y_prob > 0.5).astype(int)

            acc = accuracy_score(y_test, y_pred)
            prec = precision_score(y_test, y_pred, zero_division=0)
            rec = recall_score(y_test, y_pred, zero_division=0)
            f1 = f1_score(y_test, y_pred, zero_division=0)
            auc_score = roc_auc_score(y_test, y_prob)

            # Store metrics
            metrics_list["accuracy"].append(acc)
            metrics_list["precision"].append(prec)
            metrics_list["recall"].append(rec)
            metrics_list["f1"].append(f1)
            metrics_list["auc"].append(auc_score)

            all_results.append({
                "Model": model_name,
                "Run": run + 1,
                "Accuracy": acc,
                "Precision": prec,
                "Recall": rec,
                "F1": f1,
                "AUC": auc_score
            })

            # Clear to save memory
            K.clear_session()
            del model
            gc.collect()

        # Plot training curves for last run
        last_history = histories[-1]
        plt.figure(figsize=(12,5))
        plt.subplot(1,2,1)
        plt.plot(last_history.history['accuracy'], label='Train Accuracy')
        plt.plot(last_history.history['val_accuracy'], label='Validation Accuracy')
        plt.title(f'{model_name} Accuracy per Epoch')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend()
        plt.grid(True)

        plt.subplot(1,2,2)
        plt.plot(last_history.history['loss'], label='Train Loss')
        plt.plot(last_history.history['val_loss'], label='Validation Loss')
        plt.title(f'{model_name} Loss per Epoch')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.grid(True)

        plt.tight_layout()
        plt.show()

        # Print mean ± std summary for all runs
        print(f"\n==== Summary for {model_name} (over {num_runs} runs) ====")
        for metric in metrics_list:
            mean_val = np.mean(metrics_list[metric])
            std_val = np.std(metrics_list[metric])
            print(f"{metric.capitalize()}: {mean_val:.4f} ± {std_val:.4f}")

        # Plot boxplots of the 5 runs for metrics
        plt.figure(figsize=(10,6))
        sns.boxplot(data=[metrics_list["accuracy"], metrics_list["f1"], metrics_list["auc"]],
                    palette="Set2")
        plt.xticks([0,1,2], ['Accuracy', 'F1-score', 'AUC'])
        plt.title(f'{model_name} Metrics Boxplot over {num_runs} runs')
        plt.ylabel('Score')
        plt.grid(True)
        plt.show()

    # Save all results to CSV
    df = pd.DataFrame(all_results)
    csv_save_path = os.path.join(save_dir, f"results_all_models_{feature_model_name}.csv")
    df.to_csv(csv_save_path, index=False)
    print(f"\nAll run results saved to: {csv_save_path}")

if __name__ == "__main__":
    main()


In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, GRU, SimpleRNN, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, confusion_matrix, classification_report,
    roc_curve, precision_recall_curve, auc
)
from sklearn.utils import shuffle
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import os
import gc
from tensorflow.keras import backend as K

os.environ["CUDA_VISIBLE_DEVICES"] = ""

# --- Data loading and preprocessing ---
def prepare_data(normal_path, anomaly_path):
    X_normal = np.load(normal_path, allow_pickle=True)
    if X_normal.ndim >= 4:
        X_normal = X_normal.mean(axis=(2, 3))
    X_anomaly = np.load(anomaly_path, allow_pickle=True)
    if X_anomaly.ndim >= 4:
        X_anomaly = X_anomaly.mean(axis=(2, 3))
    X = np.concatenate([X_normal, X_anomaly])
    y = np.concatenate([np.zeros(len(X_normal)), np.ones(len(X_anomaly))])
    X = X.astype(np.float32)
    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
    X, y = shuffle(X, y, random_state=42)
    return X, y

def create_sequences(X, y, seq_len=5):
    X_seq, y_seq = [], []
    for i in range(len(X) - seq_len + 1):
        X_seq.append(X[i:i + seq_len])
        y_seq.append(y[i + seq_len - 1])
    return np.array(X_seq), np.array(y_seq)

# --- Model builders with 32 and 16 units ---
def build_lstm_model(input_shape):
    inputs = Input(shape=input_shape)
    x = LSTM(32, activation='tanh', recurrent_activation='sigmoid', unroll=True)(inputs)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_gru_model(input_shape):
    inputs = Input(shape=input_shape)
    x = GRU(32)(inputs)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_lstm_gru_model(input_shape):
    inputs = Input(shape=input_shape)
    x = LSTM(32, return_sequences=True, unroll=True)(inputs)
    x = Dropout(0.3)(x)
    x = GRU(32)(x)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_simple_rnn_model(input_shape):
    inputs = Input(shape=input_shape)
    x = SimpleRNN(32)(inputs)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

def build_dense_model(input_shape):
    inputs = Input(shape=input_shape)
    x = Flatten()(inputs)
    x = Dense(32, activation='relu')(x)
    x = Dropout(0.3)(x)
    x = Dense(16, activation='relu')(x)
    x = Dropout(0.3)(x)
    outputs = Dense(1, activation='sigmoid')(x)
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(1e-4), loss='binary_crossentropy', metrics=['accuracy'])
    return model

# --- Main execution ---
def main():
    feature_model_name = 'coatnet_0_rw_224'  # Change if needed
    seq_len = 10
    save_dir = '/content/drive/MyDrive/AVA'  # Change to your data path
    normal_path = f"{save_dir}/normal_features_{feature_model_name}.npy"
    anomaly_path = f"{save_dir}/anomaly_features_{feature_model_name}.npy"

    X, y = prepare_data(normal_path, anomaly_path)
    X_seq, y_seq = create_sequences(X, y, seq_len=seq_len)

    model_builders = {
        "LSTM": build_lstm_model,
        "GRU": build_gru_model,
        "LSTM_GRU": build_lstm_gru_model,
        "SimpleRNN": build_simple_rnn_model,
        "Dense": build_dense_model
    }

    num_runs = 5
    all_results = []

    for model_name, builder in model_builders.items():
        print(f"\n==== Training model: {model_name} ====")

        metrics_list = {
            "accuracy": [],
            "precision": [],
            "recall": [],
            "f1": [],
            "auc": []
        }

        histories = []

        for run in range(num_runs):
            print(f" Run {run+1}/{num_runs}")

            X_train, X_test, y_train, y_test = train_test_split(
                X_seq, y_seq, test_size=0.2, stratify=y_seq, random_state=run
            )

            model = builder(X_train.shape[1:])
            es = EarlyStopping(patience=5, restore_best_weights=True, verbose=0)
            history = model.fit(
                X_train, y_train,
                validation_data=(X_test, y_test),
                epochs=50,
                batch_size=32,
                callbacks=[es],
                verbose=1
            )
            histories.append(history)

            y_prob = model.predict(X_test).ravel()
            y_pred = (y_prob > 0.5).astype(int)

            acc = accuracy_score(y_test, y_pred)
            prec = precision_score(y_test, y_pred, zero_division=0)
            rec = recall_score(y_test, y_pred, zero_division=0)
            f1 = f1_score(y_test, y_pred, zero_division=0)
            auc_score = roc_auc_score(y_test, y_prob)

            # Store metrics
            metrics_list["accuracy"].append(acc)
            metrics_list["precision"].append(prec)
            metrics_list["recall"].append(rec)
            metrics_list["f1"].append(f1)
            metrics_list["auc"].append(auc_score)

            all_results.append({
                "Model": model_name,
                "Run": run + 1,
                "Accuracy": acc,
                "Precision": prec,
                "Recall": rec,
                "F1": f1,
                "AUC": auc_score
            })

            # Clear to save memory
            K.clear_session()
            del model
            gc.collect()

        # Plot training curves for last run
        last_history = histories[-1]
        plt.figure(figsize=(12,5))
        plt.subplot(1,2,1)
        plt.plot(last_history.history['accuracy'], label='Train Accuracy')
        plt.plot(last_history.history['val_accuracy'], label='Validation Accuracy')
        plt.title(f'{model_name} Accuracy per Epoch')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend()
        plt.grid(True)

        plt.subplot(1,2,2)
        plt.plot(last_history.history['loss'], label='Train Loss')
        plt.plot(last_history.history['val_loss'], label='Validation Loss')
        plt.title(f'{model_name} Loss per Epoch')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.grid(True)

        plt.tight_layout()
        plt.show()

        # Print mean ± std summary for all runs
        print(f"\n==== Summary for {model_name} (over {num_runs} runs) ====")
        for metric in metrics_list:
            mean_val = np.mean(metrics_list[metric])
            std_val = np.std(metrics_list[metric])
            print(f"{metric.capitalize()}: {mean_val:.4f} ± {std_val:.4f}")

        # Plot boxplots of the 5 runs for metrics
        plt.figure(figsize=(10,6))
        sns.boxplot(data=[metrics_list["accuracy"], metrics_list["f1"], metrics_list["auc"]],
                    palette="Set2")
        plt.xticks([0,1,2], ['Accuracy', 'F1-score', 'AUC'])
        plt.title(f'{model_name} Metrics Boxplot over {num_runs} runs')
        plt.ylabel('Score')
        plt.grid(True)
        plt.show()

    # Save all results to CSV
    df = pd.DataFrame(all_results)
    csv_save_path = os.path.join(save_dir, f"results_all_models_{feature_model_name}.csv")
    df.to_csv(csv_save_path, index=False)
    print(f"\nAll run results saved to: {csv_save_path}")

if __name__ == "__main__":
    main()
