In [13]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [14]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt

# Model Set Up:

In [15]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [16]:
class DenoiseBlock(nn.Module):
    def __init__(self, channels):
        super().__init__()
        self.conv1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)

    def forward(self, x):
        return x - self.conv2(self.relu(self.conv1(x)))
        
class CORblock_S_Robust(nn.Module):
    def __init__(self, in_channels, out_channels, scale=4, steps=2):
        super().__init__()
        self.steps = steps

        self.conv_input = nn.Conv2d(in_channels, out_channels, 1, bias=False)
        self.skip = nn.Conv2d(out_channels, out_channels, 1, stride=2, bias=False)
        self.norm_skip = nn.BatchNorm2d(out_channels)

        self.conv1 = nn.Conv2d(out_channels, out_channels * scale, 1, bias=False)
        self.conv2 = nn.Conv2d(out_channels * scale, out_channels * scale, 3, stride=2, padding=1, bias=False)
        self.conv3 = nn.Conv2d(out_channels * scale, out_channels, 1, bias=False)

        self.denoise = DenoiseBlock(out_channels * scale)
        self.skip_gate = nn.Conv2d(out_channels, out_channels, 1)
        self.nonlin = nn.ReLU(inplace=True)

        self.norm1 = nn.ModuleList([nn.BatchNorm2d(out_channels * scale) for _ in range(steps)])
        self.norm2 = nn.ModuleList([nn.BatchNorm2d(out_channels * scale) for _ in range(steps)])
        self.norm3 = nn.ModuleList([nn.BatchNorm2d(out_channels) for _ in range(steps)])

    def forward(self, x):
        x = self.conv_input(x)

        for step in range(self.steps):
            residual = self.skip(x)
            residual = self.norm_skip(residual)

            out = self.conv1(x)
            out = self.norm1[step](out)
            out = self.nonlin(out)

            out = self.conv2(out)
            out = self.norm2[step](out)
            out = self.nonlin(out)

            out = self.denoise(out)

            out = self.conv3(out)
            out = self.norm3[step](out)

            gate = torch.sigmoid(self.skip_gate(out))
            x = self.nonlin(gate * residual + (1 - gate) * out)

        return x

In [17]:
class CORNet_S_Robust(nn.Module):
    def __init__(self, num_classes=10):
        super().__init__()
        self.V1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Identity()
        )

        self.V2 = CORblock_S_Robust(64, 128, steps=2)
        self.V4 = CORblock_S_Robust(128, 256, steps=4)
        self.IT = CORblock_S_Robust(256, 512, steps=2)

        self.decoder = nn.Sequential(
            nn.AdaptiveAvgPool2d(output_size=1),
            nn.Flatten(),
            nn.Linear(512, num_classes),
            nn.Identity()
        )

    def forward(self, x):
        x = self.V1(x)
        x = self.V2(x)
        x = self.V4(x)
        x = self.IT(x)
        x = self.decoder(x)
        return x

In [18]:
model = CORNet_S_Robust(num_classes=10).to(device)
print(model)

CORNet_S_Robust(
  (V1): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (4): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (5): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU(inplace=True)
    (7): Identity()
  )
  (V2): CORblock_S_Robust(
    (conv_input): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (skip): Conv2d(128, 128, kernel_size=(1, 1), stride=(2, 2), bias=False)
    (norm_skip): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (conv1): Conv2d(128, 512, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=Fal

# Dataset

In [19]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),              # Resize to what CORNet-S expects
    transforms.Grayscale(num_output_channels=3),# Convert 1-channel MNIST to 3
    transforms.ToTensor()
])

train_set = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_set  = datasets.MNIST(root='./data', train=False, download=True, transform=transform)

train_loader = DataLoader(train_set, batch_size=32, shuffle=True)
test_loader = DataLoader(test_set, batch_size=1000, shuffle=False)

# Training

In [20]:
from tqdm import tqdm

def train(model, device, train_loader, optimizer, epoch, criterion):
    model.train()
    pbar = tqdm(train_loader, desc=f"Train Epoch {epoch}")
    running_loss, correct, total = 0, 0, 0

    for data, target in pbar:
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * data.size(0)
        pred = output.argmax(dim=1)
        correct += pred.eq(target).sum().item()
        total += target.size(0)

        pbar.set_postfix(loss=running_loss/total, acc=correct/total)

    return running_loss / total, correct / total


def test(model, device, test_loader, criterion):
    model.eval()
    pbar = tqdm(test_loader, desc="Testing")
    running_loss, correct, total = 0, 0, 0

    with torch.no_grad():
        for data, target in pbar:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)

            running_loss += loss.item() * data.size(0)
            pred = output.argmax(dim=1)
            correct += pred.eq(target).sum().item()
            total += target.size(0)

            pbar.set_postfix(loss=running_loss/total, acc=correct/total)

    return running_loss / total, correct / total

In [None]:
criterion = nn.CrossEntropyLoss()
epochs = 5

# Robust CORNet-S
robust_model = CORNet_S_Robust(num_classes=10).to(device)
robust_optimizer = optim.Adam(robust_model.parameters(), lr=0.001)

best_acc = 0.0  # Track best test accuracy

for epoch in range(1, epochs + 1):
    print(f"\n--- Epoch {epoch} ---")
    print("\nTraining CORNet-S-Robust")
    train(robust_model, device, train_loader, robust_optimizer, epoch, criterion)
    
    print("Testing CORNet-S-Robust")
    test_loss, test_acc = test(robust_model, device, test_loader, criterion)  # <- updated unpacking

    print(f"Test Accuracy: {test_acc * 100:.2f}%")

    # Save best model
    if test_acc > best_acc:
        best_acc = test_acc
        torch.save(robust_model.state_dict(), "best_robust_model.pth")
        print(f"✅ New best model saved with accuracy: {best_acc * 100:.2f}%")
    else:
        print(f"No improvement. Best remains: {best_acc * 100:.2f}%")


--- Epoch 1 ---

Training CORNet-S-Robust


Train Epoch 1:  69%|██████▉   | 1291/1875 [19:44<08:54,  1.09it/s, acc=0.876, loss=0.405]

In [10]:
torch.save(robust_model.state_dict(), "/kaggle/working/cornet_s_robust_mnist.pth")

In [11]:
!pip install -q torchattacks

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.1/50.1 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m142.0/142.0 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.2/61.2 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m0:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m0:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m211.5/211.5 MB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m0:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.3/56.3 MB[0m [31m28.7 MB/s[0m eta [36m0:00:00[0m:00:01[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m127.9/127.9 MB[0m [3

In [15]:
model.load_state_dict(torch.load("/kaggle/input/cornet_s__robust_mnist/pytorch/default/1/cornet_s_robust_mnist.pth", map_location = device))

<All keys matched successfully>

In [12]:
criterion = nn.CrossEntropyLoss()
test(robust_model, device, test_loader, criterion)

Testing: 100%|██████████| 10/10 [00:08<00:00,  1.20it/s, acc=0.647, loss=0.894]


(0.893600982427597, 0.6469)

In [14]:
from torchattacks import PGD, CW

def evaluate_attack(model, attack, test_loader, name="Attack"):
    model.eval()
    correct, total = 0, 0
    pbar = tqdm(test_loader, desc=name)

    for data, target in pbar:
        data, target = data.to(device), target.to(device)
        adv_data = attack(data, target)

        output = model(adv_data)
        pred = output.argmax(dim=1)
        correct += pred.eq(target).sum().item()
        total += target.size(0)

        pbar.set_postfix(acc=correct/total)

    acc = correct / total
    print(f"{name} Accuracy: {acc*100:.2f}%")
    return acc

Adversarial Test (PGD): 100%|██████████| 10/10 [01:17<00:00,  7.71s/it]

📊 PGD Accuracy (ε=0.1): 0.0534





In [17]:
# Re-load if needed:
# baseline_model.load_state_dict(torch.load("/kaggle/working/cornet_s_mnist.pth"))
# robust_model.load_state_dict(torch.load("/kaggle/working/cornet_s_robust_mnist.pth"))
# baseline_model.to(device).eval()
# robust_model.to(device).eval()

pgd_baseline = PGD(baseline_model, eps=0.3, alpha=2/255, steps=40)
pgd_robust = PGD(robust_model, eps=0.3, alpha=2/255, steps=40)

cw_baseline = CW(baseline_model, c=1e-4, kappa=0, steps=100, lr=0.01)
cw_robust = CW(robust_model, c=1e-4, kappa=0, steps=100, lr=0.01)

print("\n=== PGD Attack ===")
evaluate_attack(baseline_model, pgd_baseline, test_loader, name="PGD Baseline")
evaluate_attack(robust_model, pgd_robust, test_loader, name="PGD Robust")

print("\n=== CW Attack ===")
evaluate_attack(baseline_model, cw_baseline, test_loader, name="CW Baseline")
evaluate_attack(robust_model, cw_robust, test_loader, name="CW Robust")

Adversarial Test (CW): 100%|██████████| 10/10 [04:25<00:00, 26.59s/it]

📊 CW Accuracy (c=0.1, kappa=0): 0.9458





# Another Model

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt

In [2]:
class GatedRecurrentBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3, timesteps=3):
        super().__init__()
        self.timesteps = timesteps
        self.conv_input = nn.Conv2d(in_channels, out_channels, kernel_size, padding=1)
        self.conv_gate = nn.Conv2d(out_channels, out_channels, kernel_size, padding=1)
        self.norm = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        h = self.norm(self.conv_input(x))
        h_list = []
        for _ in range(self.timesteps):
            gate = torch.sigmoid(self.conv_gate(h))
            h = gate * self.norm(self.conv_input(x)) + (1 - gate) * h
            h_list.append(h)
        return torch.mean(torch.stack(h_list), dim=0) 

In [3]:
class ResidualDenoiseBlock(nn.Module):
    def __init__(self, channels):
        super().__init__()
        self.conv1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        self.scale = nn.Parameter(torch.tensor(0.5))  # learnable weight

    def forward(self, x):
        denoised = self.conv2(self.relu(self.conv1(x)))
        return x + self.scale * denoised 

In [4]:
class CORNet_S_RD_V2(nn.Module):
    def __init__(self, num_classes=10, timesteps=3):
        super().__init__()
        self.input_filter = nn.Conv2d(1, 1, kernel_size=1)  # learnable input prefilter

        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.relu = nn.ReLU()
        self.pool = nn.MaxPool2d(2, 2)

        self.block1 = GatedRecurrentBlock(32, 64, timesteps=timesteps)
        self.block2 = GatedRecurrentBlock(64, 128, timesteps=timesteps)
        self.block3 = GatedRecurrentBlock(128, 128, timesteps=timesteps)

        self.denoise = ResidualDenoiseBlock(128)
        self.gap = nn.AdaptiveAvgPool2d(1)
        self.dropout = nn.Dropout(p=0.2)
        self.fc = nn.Linear(128, num_classes)

    def forward(self, x):
        x = self.input_filter(x)
        x = self.pool(self.relu(self.conv1(x)))
        x = self.block1(x)
        x = self.pool(x)
        x = self.block2(x)
        x = self.block3(x)
        x = self.denoise(x)
        x = self.gap(x).squeeze(-1).squeeze(-1)
        x = self.dropout(x)
        x = self.fc(x)
        return x

In [5]:
transform = transforms.ToTensor()

train_set = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_set = datasets.MNIST(root='./data', train=False, download=True, transform=transform)

train_loader = DataLoader(train_set, batch_size=64, shuffle=True)
test_loader = DataLoader(test_set, batch_size=1000, shuffle=False)


100%|██████████| 9.91M/9.91M [00:00<00:00, 37.8MB/s]
100%|██████████| 28.9k/28.9k [00:00<00:00, 1.00MB/s]
100%|██████████| 1.65M/1.65M [00:00<00:00, 9.64MB/s]
100%|██████████| 4.54k/4.54k [00:00<00:00, 6.88MB/s]


In [6]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = CORNet_S_RD_V2().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

for epoch in range(5):
    model.train()
    total_loss = 0
    for x, y in train_loader:
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        output = model(x)
        loss = criterion(output, y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1}: Loss = {total_loss / len(train_loader):.4f}")


Epoch 1: Loss = 0.2463
Epoch 2: Loss = 0.0846
Epoch 3: Loss = 0.0625
Epoch 4: Loss = 0.0522
Epoch 5: Loss = 0.0431


In [7]:
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for x, y in test_loader:
        x, y = x.to(device), y.to(device)
        output = model(x)
        pred = output.argmax(dim=1)
        correct += (pred == y).sum().item()
        total += y.size(0)
print(f"Clean Accuracy: {correct / total * 100:.2f}%")


Clean Accuracy: 98.61%


In [10]:
torch.save(model.state_dict(), "cornet_s_rd_mnist_new.pth")

In [13]:
import torchattacks
from tqdm import tqdm

# ---------------- PGD Attack Setup ----------------
pgd_eps = 0.001        # Default MNIST perturbation
pgd_alpha = 2/255
pgd_steps = 40

pgd_attack = torchattacks.PGD(model, eps=pgd_eps, alpha=pgd_alpha, steps=pgd_steps)

# ---------------- PGD Adversarial Accuracy Function ----------------
def adversarial_test_pgd(attack, loader):
    model.eval()
    correct = 0
    total = 0
    
    for inputs, labels in tqdm(loader, desc=f"Adversarial Test (PGD)"):
        inputs, labels = inputs.to(device), labels.to(device)
        adv_inputs = attack(inputs, labels)
        outputs = model(adv_inputs)
        _, predicted = outputs.max(1)
        correct += predicted.eq(labels).sum().item()
        total += labels.size(0)
    
    acc = correct / total
    print(f"📊 PGD Accuracy (ε={pgd_eps}): {acc:.4f}")
    return acc

# ---------------- Run PGD Evaluation ----------------
pgd_acc = adversarial_test_pgd(pgd_attack, test_loader)


Adversarial Test (PGD): 100%|██████████| 10/10 [01:25<00:00,  8.55s/it]

📊 PGD Accuracy (ε=0.001): 0.9858





In [16]:
# ---------------- CW Attack Setup ----------------
cw_c = 1e-3          # L2 perturbation penalty
cw_kappa = 0         # Confidence
cw_steps = 100
cw_lr = 0.01

cw_attack = torchattacks.CW(model, c=cw_c, kappa=cw_kappa, steps=cw_steps, lr=cw_lr)

# ---------------- CW Adversarial Accuracy Function ----------------
def adversarial_test_cw(attack, loader):
    model.eval()
    correct = 0
    total = 0

    for inputs, labels in tqdm(loader, desc=f"Adversarial Test (CW)"):
        inputs, labels = inputs.to(device), labels.to(device)
        adv_inputs = attack(inputs, labels)
        outputs = model(adv_inputs)
        _, predicted = outputs.max(1)
        correct += predicted.eq(labels).sum().item()
        total += labels.size(0)

    acc = correct / total
    print(f"📊 CW Accuracy (c={cw_c}, kappa={cw_kappa}): {acc:.4f}")
    return acc

# ---------------- Run CW Evaluation ----------------
cw_acc = adversarial_test_cw(cw_attack, test_loader)


Adversarial Test (CW): 100%|██████████| 10/10 [00:35<00:00,  3.51s/it]

📊 CW Accuracy (c=0.001, kappa=0): 0.9857



