In [65]:
from yolo_threat import YoloThreat
import torch
import torch.optim as optim
import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim
import matplotlib.pyplot as plt
from tune import Tuner
import numpy as np
from sklearn.metrics import precision_score, recall_score, f1_score
from IPython.display import display, clear_output
%matplotlib inline

In [59]:
def fgsm_attack(model, images, labels, epsilon):
    """Craft single-step FGSM adversarial examples for a binary classifier.

    The loss is ``BCEWithLogitsLoss`` between ``model.forward(x).squeeze(-1)``
    and ``labels.float()``; each input element is moved by ``epsilon`` in the
    direction of the sign of the loss gradient and the result is clamped to
    ``[0, 1]``.

    Args:
        model: Callable module exposing ``forward``; its output is squeezed on
            the last dimension before the loss is computed.
        images: Input tensor. It is NOT modified: neither its values, its
            ``requires_grad`` flag, nor its ``.grad`` attribute.
        labels: Target tensor, cast to float for the loss.
        epsilon: Step size of the perturbation.

    Returns:
        A new detached tensor with the same shape as ``images``.
    """
    # Work on a private leaf copy so the caller's tensor is never flagged
    # requires_grad or left holding a gradient buffer.
    adv_input = images.clone().detach().requires_grad_(True)

    # The attack needs gradients even when the caller is inside
    # torch.no_grad() (e.g. an evaluation loop).
    with torch.enable_grad():
        outputs = model.forward(adv_input)
        loss = torch.nn.BCEWithLogitsLoss()(outputs.squeeze(-1), labels.float())
        # Differentiate w.r.t. the input only; model parameter .grad buffers
        # are left untouched.
        (input_grad,) = torch.autograd.grad(loss, adv_input)

    perturbation = epsilon * input_grad.sign()
    return torch.clamp(adv_input.detach() + perturbation, 0, 1).detach()

def pgd_attack(model, images, labels, epsilon, alpha, iters):
    """Craft iterative PGD adversarial examples for a binary classifier.

    Starting from a copy of ``images``, repeats ``iters`` times: take a step
    of size ``alpha`` along the sign of the input gradient of
    ``BCEWithLogitsLoss(model.forward(x).squeeze(-1), labels.float())``,
    project the total perturbation back into ``[-epsilon, epsilon]``, and
    clamp the result to ``[0, 1]``.

    Args:
        model: Callable module exposing ``forward``.
        images: Clean input tensor; it is not modified.
        labels: Target tensor, cast to float for the loss.
        epsilon: Maximum absolute per-element perturbation.
        alpha: Step size of each iteration.
        iters: Number of gradient steps; ``0`` returns a copy of ``images``.

    Returns:
        A new detached tensor (``requires_grad`` is False) with the same
        shape as ``images``.
    """
    criterion = torch.nn.BCEWithLogitsLoss()  # built once, not per iteration
    targets = labels.float()
    clean = images.clone().detach()
    adv_images = clean.clone()

    for _ in range(iters):
        adv_images.requires_grad_(True)
        # Gradients are required even if the caller is inside torch.no_grad().
        with torch.enable_grad():
            outputs = model.forward(adv_images)
            loss = criterion(outputs.squeeze(-1), targets)
            # Input gradient only: model parameter .grad buffers stay untouched.
            (input_grad,) = torch.autograd.grad(loss, adv_images)

        stepped = adv_images.detach() + alpha * input_grad.sign()
        # Project back into the epsilon-ball around the clean input, then into
        # the valid pixel range.
        perturbation = torch.clamp(stepped - clean, -epsilon, epsilon)
        adv_images = torch.clamp(clean + perturbation, 0, 1).detach()

    return adv_images


def add_gaussian_noise(images, noise_level=0.05):
    """Return ``images`` with additive Gaussian noise, clipped to ``[0, 1]``.

    Noise is drawn from a standard normal with the same shape as ``images``
    and scaled by ``noise_level`` before being added.
    """
    jitter = torch.randn_like(images) * noise_level
    return (images + jitter).clamp(0, 1)

In [60]:
class AdversarialTrainer:
    """Fine-tune ``tuner.model`` on a mix of clean and perturbed training data.

    The augmented set is built once, up front, from four views of every
    training batch: clean, FGSM, PGD and Gaussian-noise. The model is then
    trained on that fixed set while per-epoch losses are recorded in
    ``train_losses`` and ``val_losses`` and plotted.

    Args:
        tuner: Object exposing ``model`` (the network) and ``get_loss(lam)``
            (returns the training criterion).
        Xtrain, ytrain: Training inputs and labels (tensors).
        epsilon: Perturbation budget passed to the FGSM and PGD attacks.
        alpha: PGD step size.
        attack_iters: Number of PGD iterations.
        noise_level: Scale passed to ``add_gaussian_noise``.
        Xval, yval: Optional held-out inputs/labels used for the per-epoch
            "validation" loss. When omitted, that loss is computed on the
            adversarial training set itself (model in eval mode), so it is
            not a generalisation estimate.
    """

    def __init__(self, tuner, Xtrain, ytrain, epsilon=0.1, alpha=0.01, attack_iters=5, noise_level=0.05,
                 Xval=None, yval=None):
        if (Xval is None) != (yval is None):
            raise ValueError("Xval and yval must be provided together")
        self.tuner = tuner
        self.Xtrain = Xtrain
        self.ytrain = ytrain
        self.epsilon = epsilon
        self.alpha = alpha
        self.attack_iters = attack_iters
        self.noise_level = noise_level
        self.Xval = Xval
        self.yval = yval
        self.train_losses = []
        self.val_losses = []

    def adversarial_training(self, batch_size=32):
        """Build the augmented dataset (clean + FGSM + PGD + noisy copies).

        Returns:
            ``(adv_X, adv_y)``: detached tensors holding four examples per
            training example, concatenated batch by batch.
        """
        train_dataset = TensorDataset(self.Xtrain, self.ytrain)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        adv_X, adv_y = [], []
        self.tuner.model.train()  # Ensure the model is in training mode

        for batch_idx, (X_batch, y_batch) in enumerate(train_loader):
            print(f"Generating adversarial examples for batch {batch_idx + 1}/{len(train_loader)}")

            X_batch, y_batch = X_batch.detach(), y_batch.detach()

            # Clean examples
            adv_X.append(X_batch)
            adv_y.append(y_batch)

            # FGSM examples. The attack receives its own copy: an attack that
            # enables requires_grad on its input in place would otherwise flag
            # the stored clean batch and leave a .grad buffer attached to it.
            fgsm_X = fgsm_attack(self.tuner.model, X_batch.clone(), y_batch, self.epsilon)
            adv_X.append(fgsm_X.detach())  # Detach after creating adversarial example
            adv_y.append(y_batch)

            # PGD examples
            pgd_X = pgd_attack(self.tuner.model, X_batch, y_batch, self.epsilon, self.alpha, self.attack_iters)
            adv_X.append(pgd_X.detach())
            adv_y.append(y_batch)

            # Gaussian Noise examples
            noisy_X = add_gaussian_noise(X_batch, self.noise_level)
            adv_X.append(noisy_X.detach())
            adv_y.append(y_batch)

        # Combine all adversarial examples; detach so the dataset never
        # carries an autograd graph into the training loop.
        adv_X = torch.cat(adv_X).detach()
        adv_y = torch.cat(adv_y).detach()
        print(f"Adversarial examples generated. Total examples: {adv_X.shape[0]}")
        return adv_X, adv_y

    def fine_tune_with_adversarial(self, optimizer=optim.Adam, epochs=50, batch_size=32, lr=0.001, **kwargs):
        """Generate the augmented set and fine-tune ``tuner.model`` on it.

        Args:
            optimizer: Optimizer class, instantiated as
                ``optimizer(model.parameters(), lr=lr)``.
            epochs: Number of passes over the augmented set.
            batch_size: Batch size for generation, training and evaluation.
            lr: Learning rate handed to the optimizer.
            **kwargs: Only ``lam`` (default ``0.01``) is used; it is passed to
                ``tuner.get_loss``. Any other key is reported and ignored.

        Returns:
            The fine-tuned ``tuner.model``.
        """
        ignored = sorted(key for key in kwargs if key != "lam")
        if ignored:
            # Make dropped options visible instead of discarding them silently.
            print(f"Note: ignoring unsupported keyword arguments: {', '.join(ignored)}")

        # Generate adversarial examples
        print("Starting adversarial example generation...")
        adv_X, adv_y = self.adversarial_training(batch_size=batch_size)

        # Fine-tune the model with adversarial examples
        print("Starting adversarial fine-tuning...")
        optimizer = optimizer(self.tuner.model.parameters(), lr=lr)

        train_dataset = TensorDataset(adv_X, adv_y)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        # Use the held-out set when one was supplied; otherwise fall back to
        # re-scoring the training set. Built once rather than once per epoch.
        if self.Xval is not None:
            val_dataset = TensorDataset(self.Xval, self.yval)
        else:
            val_dataset = train_dataset
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

        criterion = self.tuner.get_loss(kwargs.get("lam", 0.01))

        for epoch in range(epochs):
            print(f"Epoch {epoch + 1}/{epochs}")
            self.tuner.model.train()

            epoch_loss = 0
            for batch_idx, (X_batch, y_batch) in enumerate(train_loader):
                optimizer.zero_grad()
                X_batch, y_batch = X_batch.detach(), y_batch.detach()  # Detach to prevent graph accumulation
                y_pred = self.tuner.model.forward(X_batch).squeeze(-1)
                loss = criterion(y_pred, y_batch)
                loss.backward()  # Backward pass
                optimizer.step()

                epoch_loss += loss.item()
                print(f"Batch {batch_idx + 1}/{len(train_loader)} | Loss: {loss.item():.4f}")

            avg_epoch_loss = epoch_loss / len(train_loader)
            self.train_losses.append(avg_epoch_loss)
            print(f"Epoch {epoch + 1} completed | Average Loss: {avg_epoch_loss:.4f}")

            # Generate validation loss
            self.tuner.model.eval()
            val_loss = 0
            with torch.no_grad():
                for X_batch, y_batch in val_loader:
                    y_pred = self.tuner.model.forward(X_batch).squeeze(-1)
                    val_loss += criterion(y_pred, y_batch).item()
            avg_val_loss = val_loss / len(val_loader)
            self.val_losses.append(avg_val_loss)
            print(f"Validation Loss: {avg_val_loss:.4f}")

            # Dynamic Plotting
            clear_output(wait=True)
            self.plot_error_trace(dynamic=True)

        print("Adversarial fine-tuning completed.")
        self.plot_error_trace(dynamic=False)
        return self.tuner.model

    def plot_error_trace(self, dynamic=False):
        """Plot the recorded training and validation loss curves.

        Args:
            dynamic: When True, render via ``IPython.display.display`` and
                close the figure immediately, so repeated per-epoch calls do
                not accumulate open figures. When False, use ``plt.show()``.
        """
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.plot(self.train_losses, label="Training Loss")
        ax.plot(self.val_losses, label="Validation Loss")
        ax.set_xlabel("Epochs")
        ax.set_ylabel("Loss")
        ax.set_title("Training and Validation Loss")
        ax.legend()
        ax.grid(True)
        if dynamic:
            display(fig)  # Display dynamically
            plt.close(fig)  # Release the figure; one is created per epoch
        else:
            plt.show()



In [62]:
# Evaluate Model with Metrics
def evaluate_model_with_metrics(model, X, y, epsilon, batch_size=32):
    """Report binary-classification metrics on clean and FGSM-perturbed data.

    The model is put in eval mode and run batch by batch over ``(X, y)``.
    Predictions are ``round(sigmoid(logit))``. For each batch an FGSM
    adversarial copy is generated with budget ``epsilon`` and scored too.

    Args:
        model: Module exposing ``forward``; output is squeezed on the last dim.
        X: Input tensor.
        y: Binary label tensor.
        epsilon: FGSM perturbation budget.
        batch_size: Evaluation batch size.

    Returns:
        ``{"clean": {...}, "adversarial": {...}}`` where each inner dict has
        ``accuracy`` (percent), ``precision``, ``recall`` and ``f1_score``.
    """
    model.eval()
    data_loader = DataLoader(TensorDataset(X, y), batch_size=batch_size, shuffle=False)

    def _predict(batch):
        # Inference only: no autograd graph needed for scoring.
        with torch.no_grad():
            logits = model.forward(batch).squeeze(-1)
        return torch.round(torch.sigmoid(logits)).cpu().numpy().astype(int)

    all_preds_clean = []
    all_preds_adv = []
    all_labels = []  # identical for clean and adversarial runs

    for X_batch, y_batch in data_loader:
        all_labels.extend(y_batch.cpu().numpy())

        # Clean predictions
        all_preds_clean.extend(_predict(X_batch))

        # Adversarial predictions (FGSM). The attack back-propagates to the
        # input, so it must run with gradients enabled -- generating it
        # inside torch.no_grad() makes loss.backward() raise.
        with torch.enable_grad():
            X_adv = fgsm_attack(model, X_batch, y_batch, epsilon)
        all_preds_adv.extend(_predict(X_adv.detach()))

    # Convert predictions and labels to NumPy arrays
    all_preds_clean = np.array(all_preds_clean)
    all_preds_adv = np.array(all_preds_adv)
    all_labels = np.array(all_labels)

    # Compute metrics for clean predictions
    clean_precision = precision_score(all_labels, all_preds_clean)
    clean_recall = recall_score(all_labels, all_preds_clean)
    clean_f1 = f1_score(all_labels, all_preds_clean)
    clean_acc = np.mean(all_preds_clean == all_labels) * 100

    # Compute metrics for adversarial predictions
    adv_precision = precision_score(all_labels, all_preds_adv)
    adv_recall = recall_score(all_labels, all_preds_adv)
    adv_f1 = f1_score(all_labels, all_preds_adv)
    adv_acc = np.mean(all_preds_adv == all_labels) * 100

    print("Clean Data Metrics:")
    print(f"Accuracy: {clean_acc:.2f}%, Precision: {clean_precision:.2f}, Recall: {clean_recall:.2f}, F1 Score: {clean_f1:.2f}")
    print("Adversarial Data Metrics (FGSM):")
    print(f"Accuracy: {adv_acc:.2f}%, Precision: {adv_precision:.2f}, Recall: {adv_recall:.2f}, F1 Score: {adv_f1:.2f}")

    return {
        "clean": {
            "accuracy": clean_acc,
            "precision": clean_precision,
            "recall": clean_recall,
            "f1_score": clean_f1
        },
        "adversarial": {
            "accuracy": adv_acc,
            "precision": adv_precision,
            "recall": adv_recall,
            "f1_score": adv_f1
        }
    }

In [63]:

# Instantiate the network that is handed to Tuner / AdversarialTrainer below.
model = YoloThreat.load_new_model()

Using cache found in /Users/harshitsingh/.cache/torch/hub/ultralytics_yolov5_master
YOLOv5 🚀 2024-11-22 Python-3.12.2 torch-2.2.2 CPU

Fusing layers... 
YOLOv5n6 summary: 280 layers, 3239884 parameters, 0 gradients
Adding AutoShape... 


In [64]:
# Load the pre-built train/test tensors and their labels from disk.
# NOTE(review): torch.load unpickles its input, so only load files from a
# trusted source (consider weights_only=True if these are plain tensors).
Xtrain = torch.load('../data/danger/raw/train.pt')
ytrain = torch.load('../data/danger/raw/train_labels.pt')
Xtest = torch.load('../data/danger/raw/test.pt')
ytest = torch.load('../data/danger/raw/test_labels.pt')
# Display the shapes as a sanity check (last expression is the cell output).
Xtrain.shape, ytrain.shape, Xtest.shape, ytest.shape

(torch.Size([8736, 3, 128, 128]),
 torch.Size([8736]),
 torch.Size([2185, 3, 128, 128]),
 torch.Size([2185]))

In [44]:
# Xtrain = torch.nn.functional.interpolate(Xtrain, size=(128, 128), mode='bilinear', align_corners=False)
# Xtest = torch.nn.functional.interpolate(Xtest, size=(128, 128), mode='bilinear', align_corners=False)

# print(f"Xtrain shape: {Xtrain.shape}, ytrain shape: {ytrain.shape}")
# print(f"Xtest shape: {Xtest.shape}, ytest shape: {ytest.shape}")

Xtrain shape: torch.Size([8736, 3, 128, 128]), ytrain shape: torch.Size([8736])
Xtest shape: torch.Size([2185, 3, 128, 128]), ytest shape: torch.Size([2185])


In [67]:
# Wrap the model in a Tuner; AdversarialTrainer uses tuner.model and tuner.get_loss.
tuner = Tuner(model, data_dir=(Xtrain, ytrain))
# Initialize AdversarialTrainer
adversarial_trainer = AdversarialTrainer(
    tuner=tuner,
    Xtrain=Xtrain,
    ytrain=ytrain,
    epsilon=0.1,
    alpha=0.01,
    attack_iters=3,
    noise_level=0.02
)

# Build the augmented dataset and fine-tune; the returned model is consumed by
# the evaluation cell, so this cell must finish before that one is run.
# NOTE(review): lr=1 looks unusually large for Adam -- confirm it is intended.
# NOTE(review): fine_tune_with_adversarial only reads `lam` from its extra
# keyword arguments; `freeze_limit` and `warmup` are not used by it.
tuned_model = adversarial_trainer.fine_tune_with_adversarial(
    optimizer=optim.Adam,
    epochs=1,
    batch_size=512,
    lr=1,
    freeze_limit=20,
    warmup=0.1,
    lam=0.01
)

Starting adversarial example generation...
Generating adversarial examples for batch 1/18


In [57]:
# Score the fine-tuned model on the test split, on clean and FGSM inputs.
# Requires `tuned_model` from the fine-tuning cell: run that cell to completion
# first, otherwise this cell fails with NameError.
epsilon = 0.1
evaluate_model_with_metrics(tuned_model, Xtest, ytest, epsilon)

NameError: name 'tuned_model' is not defined