In [1]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torchvision.datasets import ImageFolder
import torchvision.transforms as transforms
from tqdm import tqdm
from scipy.optimize import differential_evolution
from scipy import stats

# Seed every RNG the notebook relies on so that shuffling, weight
# initialisation and SciPy's default (NumPy-backed) random search are
# repeatable across "Restart Kernel -> Run All".
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)  # seeds the CPU generator and all CUDA devices

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Every image is resized to 128x128 and converted to a float tensor.
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
])

dataset_path = 'dog-breeds'
dataset = ImageFolder(root=dataset_path, transform=transform)
# Pinned host memory only speeds up host->GPU copies; on a CPU-only machine
# requesting it merely triggers a warning, so enable it only for CUDA.
train_loader = DataLoader(
    dataset,
    batch_size=32,
    shuffle=True,
    num_workers=0,
    pin_memory=(device.type == "cuda"),
)

In [2]:
class SimpleCNN(nn.Module):
    """Three-stage convolutional image classifier.

    Each stage is Conv(3x3, padding=1) -> BatchNorm -> ReLU -> 2x2 max-pool,
    taking the channel count 3 -> 32 -> 64 -> 128, followed by two fully
    connected layers that produce ``num_classes`` raw logits.

    Args:
        num_classes: Number of output logits.
        image_height: Height in pixels of the images the network will be fed.
            Defaults to 128, the size the classifier head was originally
            hard-coded for.
        image_width: Width in pixels of the input images. Defaults to 128.

    Raises:
        ValueError: If either spatial dimension is smaller than 8 pixels, in
            which case nothing would be left after the three poolings.
    """

    def __init__(self, num_classes, image_height=128, image_width=128):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # The padded 3x3 convolutions keep the spatial size; each of the three
        # 2x2 poolings halves it (flooring), i.e. an overall floor division by 8.
        self.pooled_height = image_height // 8
        self.pooled_width = image_width // 8
        if self.pooled_height < 1 or self.pooled_width < 1:
            raise ValueError(
                "image_height and image_width must both be at least 8, got "
                f"{image_height}x{image_width}"
            )
        self.fc1 = nn.Linear(128 * self.pooled_height * self.pooled_width, 256)
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for a batch of images."""
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.pool(x)
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.pool(x)
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.pool(x)
        # Flatten everything except the batch dimension for the linear head.
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

class CoordinateCNN(nn.Module):
    """Convolutional regressor that maps an image to two numbers (a pixel position).

    The feature extractor is three Conv(3x3, padding=1) -> BatchNorm -> ReLU
    -> 2x2 max-pool stages (``input_channels`` -> 32 -> 64 -> 128 channels).
    The flattened features go through a 256-unit hidden layer and a final
    2-unit linear layer.
    """

    def __init__(self, input_channels, image_height, image_width):
        super().__init__()
        # Layers are created in the same order as they are applied.
        self.conv1 = nn.Conv2d(input_channels, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Spatial size left after the three stride-2 poolings.
        self.pooled_height = image_height // 8
        self.pooled_width = image_width // 8
        flat_features = 128 * self.pooled_height * self.pooled_width

        self.fc1 = nn.Linear(flat_features, 256)
        self.fc2 = nn.Linear(256, 2)  # Output for pixel coordinates (x, y)

    def forward(self, x):
        """Return a (batch, 2) tensor of predicted coordinates."""
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
        )
        for conv, norm in stages:
            x = self.pool(F.relu(norm(conv(x))))

        flattened = x.view(x.size(0), -1)
        hidden = F.relu(self.fc1(flattened))
        return self.fc2(hidden)

In [3]:
def train_and_validate(model, train_loader, num_epochs=10):
    """Train ``model`` as a classifier with Adam and cross-entropy loss.

    Makes ``num_epochs`` passes over ``train_loader``. After each pass the
    sample-weighted mean training loss is printed and handed to a
    ReduceLROnPlateau scheduler. Despite the name, no separate validation
    pass is run here. Batches are moved to the module-level ``device``.
    """
    loss_fn = nn.CrossEntropyLoss()
    adam = torch.optim.Adam(model.parameters(), lr=0.001)
    plateau_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        adam, mode='min', factor=0.1, patience=3
    )

    for epoch_number in range(1, num_epochs + 1):
        model.train()
        weighted_loss_sum = 0.0
        progress = tqdm(train_loader, desc=f"Epoch {epoch_number}/{num_epochs} [Training]")

        for batch_inputs, batch_labels in progress:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            adam.zero_grad()
            batch_loss = loss_fn(model(batch_inputs), batch_labels)
            batch_loss.backward()
            adam.step()

            # Weight by batch size so a shorter final batch is averaged correctly.
            weighted_loss_sum += batch_loss.item() * batch_inputs.size(0)

        mean_loss = weighted_loss_sum / len(train_loader.dataset)
        print(f"Epoch {epoch_number}/{num_epochs}, Training Loss: {mean_loss:.4f}")
        plateau_scheduler.step(mean_loss)


    
def test_model(model, dataset):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch in DataLoader(dataset, batch_size=64, shuffle=False, num_workers=0, pin_memory=True):
            if len(batch) == 3:  
                inputs, labels, _ = batch
            else:  
                inputs, labels = batch
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = correct / total
    print(f'Test Accuracy: {accuracy:.4f}')

def train_coordinate_model(model, optimizer, attacked_loader, num_epochs=10):
    """Fit ``model`` to regress the coordinates supplied by ``attacked_loader``.

    Each batch from the loader is an ``(images, labels, coords)`` triple; the
    labels are ignored and the mean-squared error between the model output
    and ``coords`` is minimised with the caller-provided ``optimizer``. The
    sample-weighted mean loss of every epoch is printed and passed to a
    ReduceLROnPlateau scheduler. Batches are moved to the module-level
    ``device``.
    """
    mse = nn.MSELoss()
    plateau_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.1, patience=3
    )

    for epoch_number in range(1, num_epochs + 1):
        model.train()
        weighted_loss_sum = 0.0
        progress = tqdm(attacked_loader, desc=f"Epoch {epoch_number}/{num_epochs} [Training]")

        for attacked_images, _unused_labels, target_coords in progress:
            attacked_images = attacked_images.to(device)
            target_coords = target_coords.to(device)

            optimizer.zero_grad()
            batch_loss = mse(model(attacked_images), target_coords)
            batch_loss.backward()
            optimizer.step()

            # Weight by batch size so a shorter final batch is averaged correctly.
            weighted_loss_sum += batch_loss.item() * attacked_images.size(0)

        mean_loss = weighted_loss_sum / len(attacked_loader.dataset)
        print(f"Epoch {epoch_number}/{num_epochs}, Loss: {mean_loss:.4f}")
        plateau_scheduler.step(mean_loss)

def train_with_amp(model, train_loader, num_epochs=10):
    """Train ``model`` as a classifier using CUDA automatic mixed precision.

    Same recipe as the plain training loop (Adam at lr=0.001, cross-entropy,
    ReduceLROnPlateau on the epoch loss), except that the forward pass and
    loss run under ``torch.cuda.amp.autocast`` and the backward pass and
    optimizer step go through a ``GradScaler``. Batches are moved to the
    module-level ``device``.
    """
    loss_fn = nn.CrossEntropyLoss()
    adam = torch.optim.Adam(model.parameters(), lr=0.001)
    plateau_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        adam, mode='min', factor=0.1, patience=3
    )
    grad_scaler = torch.cuda.amp.GradScaler()

    for epoch_number in range(1, num_epochs + 1):
        model.train()
        weighted_loss_sum = 0.0
        progress = tqdm(train_loader, desc=f"Epoch {epoch_number}/{num_epochs} [Training]")

        for batch_inputs, batch_labels in progress:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            adam.zero_grad()
            with torch.cuda.amp.autocast():
                batch_logits = model(batch_inputs)
                batch_loss = loss_fn(batch_logits, batch_labels)

            # Scale the loss before backward, then let the scaler drive the
            # optimizer step and refresh its scale factor.
            grad_scaler.scale(batch_loss).backward()
            grad_scaler.step(adam)
            grad_scaler.update()

            weighted_loss_sum += batch_loss.item() * batch_inputs.size(0)

        mean_loss = weighted_loss_sum / len(train_loader.dataset)
        print(f"Epoch {epoch_number}/{num_epochs}, Training Loss: {mean_loss:.4f}")
        plateau_scheduler.step(mean_loss)

In [4]:
# One-pixel attack dataset
class OnePixelAttackDataset(Dataset):
    """Wrap an image dataset so every item comes back with a one-pixel attack applied.

    For each ``(image, label)`` of the wrapped dataset a differential-evolution
    search looks for the single pixel position and RGB colour that minimises
    the model's softmax confidence in the true ``label`` (an untargeted
    one-pixel attack). The perturbation found for an index is cached, so
    fetching the same item again returns the same attacked image and does not
    repeat the search.

    Args:
        dataset: Map-style dataset yielding ``(image, label)`` where ``image``
            is a float CHW tensor scaled to [0, 1].
        model: ``nn.Module`` classifier returning logits of shape (1, classes)
            for a batch of one image.
        maxiter: Maximum number of differential-evolution generations
            (default mirrors SciPy's default; lower it for a faster attack).
        popsize: Population-size multiplier passed to SciPy (SciPy default).
        seed: Optional base seed; item ``idx`` is searched with ``seed + idx``
            so results are reproducible. ``None`` uses SciPy's default RNG.

    Each item is ``(attacked_image, label, coords)`` where ``attacked_image``
    is a float CHW tensor in [0, 1] and ``coords`` is a float32 tensor
    ``[x, y]`` holding the attacked pixel's row and column index.
    """

    def __init__(self, dataset, model, maxiter=1000, popsize=15, seed=None):
        self.dataset = dataset
        self.model = model
        self.maxiter = maxiter
        self.popsize = popsize
        self.seed = seed
        # idx -> (x, y, r, g, b); r, g, b are 8-bit colour values.
        self._perturbations = {}

    def __len__(self):
        return len(self.dataset)

    def _model_device(self):
        """Device of the model's parameters (CPU for a parameter-free model)."""
        first_param = next(self.model.parameters(), None)
        return first_param.device if first_param is not None else torch.device("cpu")

    @staticmethod
    def _apply_pixel(img, perturbation):
        """Return a copy of CHW ``img`` with one pixel overwritten."""
        x, y, r, g, b = perturbation
        attacked = img.clone()
        # The image lives in [0, 1], so the 8-bit colour must be rescaled
        # before it is written (writing raw 0-255 values would mix scales).
        attacked[:, x, y] = torch.tensor([r, g, b], dtype=img.dtype) / 255.0
        return attacked

    def _search_perturbation(self, img, label, idx):
        """Run the differential-evolution search for one image."""
        model_device = self._model_device()
        height, width = img.shape[1], img.shape[2]

        def true_class_confidence(p):
            candidate = self._apply_pixel(img, tuple(int(v) for v in p))
            output = self.model(candidate.unsqueeze(0).to(model_device))
            # SciPy minimises the objective, so return the confidence itself:
            # the lower the true-class probability, the stronger the attack.
            return F.softmax(output, dim=1)[0][label].item()

        bounds = [(0, height - 1), (0, width - 1), (0, 255), (0, 255), (0, 255)]

        # Query the model in eval mode (BatchNorm must use running statistics
        # for single-image batches) and restore the caller's mode afterwards.
        was_training = self.model.training
        self.model.eval()
        try:
            with torch.no_grad():
                result = differential_evolution(
                    true_class_confidence,
                    bounds,
                    maxiter=self.maxiter,
                    popsize=self.popsize,
                    seed=None if self.seed is None else self.seed + idx,
                    # The objective is piecewise constant (parameters are
                    # truncated to ints), so gradient polishing cannot help.
                    polish=False,
                )
        finally:
            self.model.train(was_training)

        return tuple(int(v) for v in result.x)

    def __getitem__(self, idx):
        img, label = self.dataset[idx]
        img = img.detach().cpu().float()

        if idx not in self._perturbations:
            self._perturbations[idx] = self._search_perturbation(img, label, idx)
        perturbation = self._perturbations[idx]

        attacked_img = self._apply_pixel(img, perturbation)
        x, y = perturbation[0], perturbation[1]
        return attacked_img, label, torch.tensor([x, y], dtype=torch.float32)

In [5]:
class RepairedDataset(Dataset):
    """Undo one-pixel attacks by repainting the patch around the predicted pixel.

    For every item of ``attacked_dataset`` the ``coordinate_model`` predicts
    the position of the attacked pixel. A ``patch_size`` x ``patch_size``
    window centred there (clipped at the image border) is then filled with the
    per-channel mode of the values inside that window.

    Args:
        attacked_dataset: Map-style dataset yielding
            ``(image, label, coords)`` with ``image`` a CHW float tensor.
        coordinate_model: ``nn.Module`` mapping a (1, C, H, W) batch to a
            (1, 2) tensor ``[x, y]`` of row and column indices.
        patch_size: Side length of the repainted window (default 5).

    Each item is ``(repaired_image, label)``.
    """

    def __init__(self, attacked_dataset, coordinate_model, patch_size=5):
        self.attacked_dataset = attacked_dataset
        self.coordinate_model = coordinate_model
        self.patch_size = patch_size

    def __len__(self):
        return len(self.attacked_dataset)

    def _predict_pixel(self, img):
        """Return the predicted (row, col), rounded and clamped into the image."""
        model = self.coordinate_model
        first_param = next(model.parameters(), None)
        model_device = first_param.device if first_param is not None else torch.device("cpu")

        # Single-image inference must use BatchNorm's running statistics, so
        # force eval mode for the call and restore the previous mode after.
        was_training = model.training
        model.eval()
        try:
            with torch.no_grad():
                predicted = model(img.unsqueeze(0).to(model_device)).squeeze(0)
        finally:
            model.train(was_training)

        # A regressor can output NaN/inf or positions outside the image;
        # sanitise so the slice below is always a valid, non-empty window.
        predicted = torch.nan_to_num(predicted.float().cpu())
        height, width = img.shape[1], img.shape[2]
        x = min(max(int(round(predicted[0].item())), 0), height - 1)
        y = min(max(int(round(predicted[1].item())), 0), width - 1)
        return x, y

    def __getitem__(self, idx):
        img, label, _ = self.attacked_dataset[idx]
        x, y = self._predict_pixel(img)

        # Copy: ``.numpy()`` shares memory with the tensor, and the source
        # dataset's image must not be modified by the in-place repair.
        img_np = img.detach().permute(1, 2, 0).cpu().numpy().copy()

        half = self.patch_size // 2
        x_start = max(0, x - half)
        x_end = min(img_np.shape[0], x + half + 1)
        y_start = max(0, y - half)
        y_end = min(img_np.shape[1], y + half + 1)

        patch = img_np[x_start:x_end, y_start:y_end]

        # ``.mode`` is shaped (1, C) on older SciPy and (C,) on newer releases;
        # flattening yields one value per channel on both.
        mode_values = np.ravel(stats.mode(patch.reshape(-1, img_np.shape[2]), axis=0).mode)

        img_np[x_start:x_end, y_start:y_end] = mode_values

        repaired_img = torch.from_numpy(img_np.transpose(2, 0, 1)).float().contiguous()
        return repaired_img, label

In [6]:
# One output logit per class folder discovered by ImageFolder.
num_classes = len(dataset.classes)
cnn_model = SimpleCNN(num_classes).to(device)

# Mixed-precision training needs CUDA; otherwise use the plain training loop.
(train_with_amp if torch.cuda.is_available() else train_and_validate)(cnn_model, train_loader)

# Test the model
# NOTE: this evaluates on the very dataset the model was trained on.
print("Testing original model:")
test_model(cnn_model, dataset)

#

Epoch 1/10 [Training]: 100%|██████████| 31/31 [00:09<00:00,  3.43it/s]


Epoch 1/10, Training Loss: 7.3359


Epoch 2/10 [Training]: 100%|██████████| 31/31 [00:01<00:00, 15.98it/s]


Epoch 2/10, Training Loss: 0.9255


Epoch 3/10 [Training]: 100%|██████████| 31/31 [00:02<00:00, 15.00it/s]


Epoch 3/10, Training Loss: 0.2038


Epoch 4/10 [Training]: 100%|██████████| 31/31 [00:02<00:00, 14.75it/s]


Epoch 4/10, Training Loss: 0.0612


Epoch 5/10 [Training]: 100%|██████████| 31/31 [00:02<00:00, 14.87it/s]


Epoch 5/10, Training Loss: 0.0284


Epoch 6/10 [Training]: 100%|██████████| 31/31 [00:02<00:00, 14.68it/s]


Epoch 6/10, Training Loss: 0.0168


Epoch 7/10 [Training]: 100%|██████████| 31/31 [00:02<00:00, 14.82it/s]


Epoch 7/10, Training Loss: 0.0095


Epoch 8/10 [Training]: 100%|██████████| 31/31 [00:02<00:00, 14.78it/s]


Epoch 8/10, Training Loss: 0.0056


Epoch 9/10 [Training]: 100%|██████████| 31/31 [00:01<00:00, 16.71it/s]


Epoch 9/10, Training Loss: 0.0042


Epoch 10/10 [Training]: 100%|██████████| 31/31 [00:01<00:00, 16.85it/s]


Epoch 10/10, Training Loss: 0.0030
Testing original model:
Test Accuracy: 1.0000


In [7]:
# Switch the classifier to evaluation mode before the attack dataset queries it.
cnn_model.eval()
# Wrap the original dataset; the attack is computed lazily, when an item is indexed.
attacked_dataset = OnePixelAttackDataset(dataset, cnn_model)
# Fixed iteration order (shuffle=False) over the attacked samples.
attacked_loader = DataLoader(attacked_dataset, batch_size=32, shuffle=False, num_workers=0, pin_memory=True)



In [8]:
# Measure the classifier's accuracy on the one-pixel-attacked samples.
# NOTE(review): the saved output for this cell shows only the header line and
# no accuracy value - presumably the run did not finish; confirm by re-running.
print("Testing attacked dataset:")
test_model(cnn_model, attacked_dataset)

#

Testing attacked dataset:


In [None]:
# Build the pixel-locator network for 3-channel 128x128 inputs (the size
# produced by the Resize transform) and move it to the selected device.
coordinate_model = CoordinateCNN(input_channels=3, image_height=128, image_width=128).to(device)
optimizer = torch.optim.Adam(coordinate_model.parameters(), lr=0.001)
# Regress the attacked-pixel coordinates that attacked_loader yields with each batch.
train_coordinate_model(coordinate_model, optimizer, attacked_loader)



In [None]:
# Wrap the attacked samples so that each one is repaired on access: a 5x5
# patch around the coordinate model's predicted pixel is repainted.
repaired_dataset = RepairedDataset(attacked_dataset, coordinate_model, patch_size=5)
# Fixed iteration order (shuffle=False) over the repaired samples.
repaired_loader = DataLoader(repaired_dataset, batch_size=32, shuffle=False, num_workers=0, pin_memory=True)


In [None]:

# Measure the classifier's accuracy on the repaired samples, for comparison
# with the accuracies printed for the original and attacked datasets.
print("Testing repaired dataset:")
test_model(cnn_model, repaired_dataset)