In [2]:
#기본 IMPORT
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms, datasets
from sklearn.metrics import f1_score, confusion_matrix

In [4]:
# Device, hyperparameters, and MNIST dataset / loader setup
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
BATCH_SIZE = 32
EPOCHS = 10

train_dataset = datasets.MNIST(root='data/MNIST', train=True, download=True, transform=transforms.ToTensor())
test_dataset = datasets.MNIST(root='data/MNIST', train=False, transform=transforms.ToTensor())
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Inspect a single batch only. Iterating the whole loader here (a `for` loop
# without `break`) would walk the entire training set and print once per batch.
X_train, y_train = next(iter(train_loader))
print('X_train:', X_train.size(), 'type:', X_train.type())
print('y_train:', y_train.size(), 'type:', y_train.type())

pltsize = 1
plt.figure(figsize=(10 * pltsize, pltsize))  # one row, wide enough for 10 digits

for i in range(10):
    plt.subplot(1, 10, i + 1)  # plt.subplot(rows, columns, index)
    plt.axis('off')
    plt.imshow(X_train[i, :, :, :].numpy().reshape(28, 28), cmap="gray_r")
    plt.title('Class: ' + str(y_train[i].item()))

In [7]:
# CNN classifier: two Conv -> BatchNorm -> ReLU -> MaxPool stages, then three fully connected layers
class Net(nn.Module):
    """Convolutional classifier that returns log-probabilities over 10 classes.

    The flatten size 64 * 7 * 7 fits 1-channel 28x28 inputs: the 3x3
    convolutions use padding=1 (spatial size preserved) and each of the two
    2x2 max-pools halves height and width (28 -> 14 -> 7).
    """

    def __init__(self):
        super().__init__()
        # Convolutional feature extractor
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)

        # Fully connected classifier head
        self.fc1 = nn.Linear(64 * 7 * 7, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 10)

        # The same dropout module is applied after both hidden FC layers
        self.dropout = nn.Dropout(0.5)

        # Batch normalization for the conv stages and the hidden FC layers
        self.bn1 = nn.BatchNorm2d(32)
        self.bn2 = nn.BatchNorm2d(64)
        self.bn_fc1 = nn.BatchNorm1d(512)
        self.bn_fc2 = nn.BatchNorm1d(256)

    def forward(self, x):
        """Map an image batch to per-class log-probabilities of shape (batch, 10)."""
        features = self.pool(F.relu(self.bn1(self.conv1(x))))
        features = self.pool(F.relu(self.bn2(self.conv2(features))))

        # Flatten the feature maps into one vector per sample
        flat = features.view(-1, 64 * 7 * 7)

        hidden = self.dropout(F.relu(self.bn_fc1(self.fc1(flat))))
        hidden = self.dropout(F.relu(self.bn_fc2(self.fc2(hidden))))

        return F.log_softmax(self.fc3(hidden), dim=1)


In [None]:
# Model, optimizer, and loss-function setup
model = Net()
model.to(DEVICE)  # moves the module's parameters and buffers in place
optimizer = torch.optim.SGD(params=model.parameters(), lr=0.01, momentum=0.5)
# Net.forward already returns log-probabilities; CrossEntropyLoss applies
# log_softmax again internally, which leaves log-probabilities unchanged.
criterion = nn.CrossEntropyLoss()

print(model)

In [9]:
# Standard (clean-data) training loop
def train(model, device, train_loader, optimizer, criterion, epochs):
    """Train `model` on unperturbed batches.

    Args:
        model: network to optimize; switched to training mode once up front.
        device: device each batch is moved to before the forward pass.
        train_loader: iterable yielding `(inputs, targets)` batches.
        optimizer: optimizer stepping the parameters of `model`.
        criterion: callable `criterion(output, target)` returning the loss.
        epochs: number of full passes over `train_loader`.
    """
    model.train()
    for epoch in range(epochs):
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            batch_loss = criterion(model(inputs), labels)
            batch_loss.backward()
            optimizer.step()
        print(f"Epoch {epoch + 1} completed.")

In [16]:
# FGSM attack
def fgsm_attack(image, epsilon, data_grad):
    """Build an FGSM adversarial example.

    Args:
        image: input tensor to perturb.
        epsilon: step size applied to every element.
        data_grad: gradient of the loss with respect to `image`.

    Returns:
        `image + epsilon * sign(data_grad)`, clamped to the range [0, 1].
    """
    # Only the sign of the gradient matters: every element moves by +/- epsilon
    # (or stays put where the gradient is exactly zero).
    step = epsilon * data_grad.sign()
    return torch.clamp(image + step, 0, 1)

def train_with_fgsm(model, device, train_loader, optimizer, criterion, epochs, epsilon):
    """Adversarially train `model` on FGSM-perturbed batches.

    For every batch, the gradient of the clean loss with respect to the input
    is used to build an FGSM example, and the optimizer step is taken on the
    loss of that perturbed batch.

    Args:
        model: network to optimize; switched to training mode once up front.
        device: device each batch is moved to.
        train_loader: iterable yielding `(inputs, targets)` batches.
        optimizer: optimizer stepping the parameters of `model`.
        criterion: callable `criterion(output, target)` returning the loss.
        epochs: number of full passes over `train_loader`.
        epsilon: FGSM step size passed to `fgsm_attack`.
    """
    model.train()
    for epoch in range(epochs):
        for data, target in train_loader:
            data, target = data.to(device), target.to(device)

            # Attack pass: differentiate the clean loss w.r.t. the input only.
            # torch.autograd.grad does not write to parameter .grad, so nothing
            # from this pass is mixed into the optimizer step below.
            attack_input = data.clone().detach().requires_grad_(True)
            attack_loss = criterion(model(attack_input), target)
            (data_grad,) = torch.autograd.grad(attack_loss, attack_input)

            # Detach so the training backward pass stops at the adversarial batch
            # instead of flowing back through the attack.
            perturbed_data = fgsm_attack(data, epsilon, data_grad).detach()

            # Training pass on the perturbed batch
            optimizer.zero_grad()
            output = model(perturbed_data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
        print(f"Epoch {epoch + 1} completed with FGSM attack.")




In [26]:
# PGD attack
def pgd_attack(model, image, label, epsilon, alpha, attack_iters, loss_fn=None):
    """Build PGD adversarial examples inside an L-infinity ball around `image`.

    Each iteration takes a signed-gradient ascent step of size `alpha` on the
    loss, projects the total perturbation back into [-epsilon, epsilon], and
    clamps the result to the range [0, 1].

    Args:
        model: network under attack; its parameter gradients are not modified.
        image: clean input batch; it is never modified.
        label: targets passed to the loss for `image`.
        epsilon: maximum absolute per-element perturbation.
        alpha: step size of each iteration.
        attack_iters: number of iterations; 0 returns a copy of `image`.
        loss_fn: callable `loss_fn(output, label)`. When omitted, the
            notebook-level `criterion` is used, as in the original code.

    Returns:
        A detached tensor with the same shape and device as `image`.
    """
    if loss_fn is None:
        loss_fn = criterion  # backward-compatible fallback to the notebook global

    original_image = image.clone().detach()
    perturbed_image = original_image.clone()

    for _ in range(attack_iters):
        # Each iterate is a fresh leaf tensor, so its gradient is always defined
        perturbed_image.requires_grad_(True)

        # enable_grad keeps the attack usable from inside a torch.no_grad() block
        with torch.enable_grad():
            loss = loss_fn(model(perturbed_image), label)
            # Differentiate w.r.t. the input only; parameter .grad is untouched
            (input_grad,) = torch.autograd.grad(loss, perturbed_image)

        with torch.no_grad():
            stepped = perturbed_image + alpha * input_grad.sign()
            # Project back into the epsilon-ball, then into the valid pixel range
            perturbation = torch.clamp(stepped - original_image, min=-epsilon, max=epsilon)
            perturbed_image = torch.clamp(original_image + perturbation, 0, 1)

    return perturbed_image.detach()


def train_with_pgd(model, device, train_loader, optimizer, criterion, epochs, epsilon, alpha, attack_iters):
    """Adversarially train `model` on PGD-perturbed batches.

    Args:
        model: network to optimize; switched to training mode once up front.
        device: device each batch is moved to.
        train_loader: iterable yielding `(inputs, targets)` batches.
        optimizer: optimizer stepping the parameters of `model`.
        criterion: callable `criterion(output, target)` returning the loss.
        epochs: number of full passes over `train_loader`.
        epsilon: maximum per-element perturbation passed to `pgd_attack`.
        alpha: PGD step size passed to `pgd_attack`.
        attack_iters: number of PGD iterations passed to `pgd_attack`.
    """
    model.train()
    for epoch in range(epochs):
        for data, target in train_loader:
            data, target = data.to(device), target.to(device)

            # pgd_attack works on its own copy of the batch. Detach the result so
            # the training backward pass does not also compute input gradients.
            perturbed_data = pgd_attack(model, data, target, epsilon, alpha, attack_iters).detach()

            # Clear gradients only AFTER the attack: any parameter gradients the
            # attack's backward passes left behind must not enter this update.
            optimizer.zero_grad()

            # Training pass on the perturbed batch
            output = model(perturbed_data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

        print(f"Epoch {epoch + 1} completed with PGD attack.")

In [40]:
# Side-by-side comparison of original and perturbed images
def visualize_comparison(original_images, perturbed_images, original_labels, perturbed_labels, num_images=5):
    """Plot original images (left column) next to their perturbed versions (right column).

    Args:
        original_images: sequence of single-image tensors.
        perturbed_images: sequence of single-image tensors, aligned with `original_images`.
        original_labels: label shown in the title of each original image.
        perturbed_labels: prediction shown in the title of each perturbed image.
        num_images: maximum number of rows to draw; fewer rows are drawn when
            the inputs contain fewer entries.
    """
    # Never index past the shortest input
    num_images = min(
        num_images,
        len(original_images),
        len(perturbed_images),
        len(original_labels),
        len(perturbed_labels),
    )
    if num_images <= 0:
        print("No images to visualize.")
        return

    # squeeze=False keeps `axes` two-dimensional even when only one row is drawn
    fig, axes = plt.subplots(num_images, 2, figsize=(10, num_images * 3), squeeze=False)

    for i in range(num_images):
        # Original image (detach: tensors that still require grad cannot be plotted)
        axes[i, 0].imshow(original_images[i].detach().cpu().squeeze(), cmap='gray')
        axes[i, 0].set_title(f"Original Image - Label: {original_labels[i]}")
        axes[i, 0].axis('off')

        # Adversarial (perturbed) image
        axes[i, 1].imshow(perturbed_images[i].detach().cpu().squeeze(), cmap='gray')
        axes[i, 1].set_title(f"Perturbed Image - Predicted: {perturbed_labels[i]}")
        axes[i, 1].axis('off')

    plt.tight_layout()
    plt.show()

In [12]:
# Plain evaluation on clean data
def evaluate(model, test_loader):
    """Evaluate `model` on `test_loader` and print loss, accuracy, macro F1 and confusion matrix.

    Uses the notebook-level `criterion` and `DEVICE`.

    Args:
        model: network to evaluate; switched to evaluation mode.
        test_loader: iterable yielding `(images, labels)` batches.
    """
    model.eval()
    total_loss = 0.0
    correct = 0
    num_samples = 0
    all_labels = []
    all_predictions = []

    with torch.no_grad():
        for image, label in test_loader:
            image = image.to(DEVICE)
            label = label.to(DEVICE)
            output = model(image)
            batch_size = label.size(0)

            # `criterion` is assumed to return a per-batch mean (the default for
            # nn.CrossEntropyLoss). Weighting by the batch size gives an exact
            # per-sample average even when the last batch is smaller.
            total_loss += criterion(output, label).item() * batch_size
            prediction = output.argmax(dim=1)
            correct += prediction.eq(label).sum().item()
            num_samples += batch_size

            # Flat Python ints for the F1 score and confusion matrix
            all_labels.extend(label.cpu().tolist())
            all_predictions.extend(prediction.cpu().tolist())

    if num_samples == 0:
        print("No samples to evaluate.")
        return

    test_loss = total_loss / num_samples
    test_accuracy = 100. * correct / num_samples

    # Calculate F1 Score (macro-averaged)
    f1 = f1_score(all_labels, all_predictions, average='macro')

    # Calculate Confusion Matrix
    cm = confusion_matrix(all_labels, all_predictions)

    # Print the results
    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test Accuracy: {test_accuracy:.2f}%")
    print(f"F1 Score (Macro): {f1:.4f}")
    print("Confusion Matrix:")
    print(cm)

In [43]:
# Evaluation of the model under FGSM and PGD attacks
# Evaluation on clean and FGSM-perturbed test data, with visualization
def evaluate_with_fgsm_attack(model, test_loader, criterion, epsilon):
    """Evaluate `model` on clean and FGSM-perturbed data, print metrics and plot examples.

    Prints loss, accuracy, macro F1 and the confusion matrix for both the clean
    and the adversarial inputs, then shows the first 10 clean/perturbed image
    pairs via `visualize_comparison`. Batches are moved to the notebook-level
    `DEVICE`.

    Args:
        model: network to evaluate; switched to evaluation mode.
        test_loader: iterable yielding `(images, labels)` batches.
        criterion: callable `criterion(output, target)` returning the loss.
        epsilon: FGSM step size passed to `fgsm_attack`.
    """
    model.eval()
    num_to_show = 10  # number of image pairs kept for the comparison plot
    clean_loss, adv_loss = 0.0, 0.0
    clean_correct, adv_correct = 0, 0
    num_samples = 0
    # Clean and adversarial predictions share the same ground-truth labels
    labels, clean_preds, adv_preds = [], [], []

    # Individual images (not whole batches) kept for visualization
    original_images_list = []
    perturbed_images_list = []
    original_labels_list = []
    perturbed_preds_list = []

    for data, target in test_loader:
        data, target = data.to(DEVICE), target.to(DEVICE)
        batch_size = target.size(0)

        # One forward pass provides both the clean metrics and the FGSM gradient.
        # torch.autograd.grad differentiates w.r.t. the input only.
        attack_input = data.clone().detach().requires_grad_(True)
        output = model(attack_input)
        loss = criterion(output, target)
        (data_grad,) = torch.autograd.grad(loss, attack_input)

        # `criterion` is assumed to return a per-batch mean; weighting by the
        # batch size keeps the average exact for a smaller last batch.
        clean_loss += loss.item() * batch_size
        clean_pred = output.argmax(dim=1)
        clean_correct += clean_pred.eq(target).sum().item()

        # Generate adversarial examples using FGSM
        perturbed_data = fgsm_attack(data, epsilon, data_grad).detach()

        # Adversarial test data evaluation
        with torch.no_grad():
            adv_output = model(perturbed_data)
            adv_loss += criterion(adv_output, target).item() * batch_size
        adv_pred = adv_output.argmax(dim=1)
        adv_correct += adv_pred.eq(target).sum().item()

        num_samples += batch_size

        # Store for F1 score and confusion matrix
        labels.extend(target.cpu().tolist())
        clean_preds.extend(clean_pred.cpu().tolist())
        adv_preds.extend(adv_pred.cpu().tolist())

        # Keep the first `num_to_show` individual examples for the plot
        remaining = num_to_show - len(original_images_list)
        for idx in range(min(remaining, batch_size)):
            original_images_list.append(data[idx].detach().cpu())
            perturbed_images_list.append(perturbed_data[idx].cpu())
            original_labels_list.append(target[idx].item())
            perturbed_preds_list.append(adv_pred[idx].item())

    if num_samples == 0:
        print("No samples to evaluate.")
        return

    # Calculate average loss
    clean_loss /= num_samples
    adv_loss /= num_samples

    # Calculate accuracy
    clean_accuracy = 100. * clean_correct / num_samples
    adv_accuracy = 100. * adv_correct / num_samples

    # Calculate F1 score
    clean_f1 = f1_score(labels, clean_preds, average='macro')
    adv_f1 = f1_score(labels, adv_preds, average='macro')

    # Confusion matrices
    clean_cm = confusion_matrix(labels, clean_preds)
    adv_cm = confusion_matrix(labels, adv_preds)

    # Print results
    print(f"Clean Test Loss: {clean_loss:.4f}, Clean Accuracy: {clean_accuracy:.2f}%")
    print(f"Clean F1 Score (Macro): {clean_f1:.4f}")
    print("Clean Confusion Matrix:")
    print(clean_cm)

    print(f"Adversarial Test Loss: {adv_loss:.4f}, Adversarial Accuracy: {adv_accuracy:.2f}%")
    print(f"Adversarial F1 Score (Macro): {adv_f1:.4f}")
    print("Adversarial Confusion Matrix:")
    print(adv_cm)

    # Visualize the stored original / perturbed image pairs
    visualize_comparison(
        original_images_list,
        perturbed_images_list,
        original_labels_list,
        perturbed_preds_list,
        num_images=len(original_images_list),
    )
    
# Evaluation on clean and PGD-perturbed test data, with image comparison
def evaluate_with_pgd_attack(model, test_loader, criterion, epsilon, alpha, attack_iters):
    """Evaluate `model` on clean and PGD-perturbed data, print metrics and plot examples.

    Prints loss, accuracy, macro F1 and the confusion matrix for both the clean
    and the adversarial inputs, then shows the first 5 clean/perturbed image
    pairs via `visualize_comparison`. Batches are moved to the notebook-level
    `DEVICE`.

    Args:
        model: network to evaluate; switched to evaluation mode.
        test_loader: iterable yielding `(images, labels)` batches.
        criterion: callable `criterion(output, target)` used for the reported losses.
        epsilon: maximum per-element perturbation passed to `pgd_attack`.
        alpha: PGD step size passed to `pgd_attack`.
        attack_iters: number of PGD iterations passed to `pgd_attack`.
    """
    model.eval()
    num_to_show = 5  # number of image pairs kept for the comparison plot
    clean_loss, adv_loss = 0.0, 0.0
    clean_correct, adv_correct = 0, 0
    num_samples = 0
    # Clean and adversarial predictions share the same ground-truth labels
    labels, clean_preds, adv_preds = [], [], []

    # Individual images (not whole batches) kept for visualization
    original_images_list = []
    perturbed_images_list = []
    original_labels_list = []
    perturbed_preds_list = []

    for data, target in test_loader:
        data, target = data.to(DEVICE), target.to(DEVICE)
        batch_size = target.size(0)

        # Clean test data evaluation (no autograd graph needed)
        with torch.no_grad():
            output = model(data)
            # `criterion` is assumed to return a per-batch mean; weighting by the
            # batch size keeps the average exact for a smaller last batch.
            clean_loss += criterion(output, target).item() * batch_size
        clean_pred = output.argmax(dim=1)
        clean_correct += clean_pred.eq(target).sum().item()

        # Generate adversarial examples using PGD. This must run outside
        # torch.no_grad() because the attack needs input gradients. Detach the
        # result so it can be evaluated and plotted without an autograd graph.
        perturbed_data = pgd_attack(model, data, target, epsilon, alpha, attack_iters).detach()

        # Adversarial test data evaluation
        with torch.no_grad():
            adv_output = model(perturbed_data)
            adv_loss += criterion(adv_output, target).item() * batch_size
        adv_pred = adv_output.argmax(dim=1)
        adv_correct += adv_pred.eq(target).sum().item()

        num_samples += batch_size

        # Store for F1 score and confusion matrix
        labels.extend(target.cpu().tolist())
        clean_preds.extend(clean_pred.cpu().tolist())
        adv_preds.extend(adv_pred.cpu().tolist())

        # Keep the first `num_to_show` individual examples for the plot
        remaining = num_to_show - len(original_images_list)
        for idx in range(min(remaining, batch_size)):
            original_images_list.append(data[idx].detach().cpu())
            perturbed_images_list.append(perturbed_data[idx].cpu())
            original_labels_list.append(target[idx].item())
            perturbed_preds_list.append(adv_pred[idx].item())

    if num_samples == 0:
        print("No samples to evaluate.")
        return

    # Calculate average loss
    clean_loss /= num_samples
    adv_loss /= num_samples

    # Calculate accuracy
    clean_accuracy = 100. * clean_correct / num_samples
    adv_accuracy = 100. * adv_correct / num_samples

    # Calculate F1 score
    clean_f1 = f1_score(labels, clean_preds, average='macro')
    adv_f1 = f1_score(labels, adv_preds, average='macro')

    # Confusion matrices
    clean_cm = confusion_matrix(labels, clean_preds)
    adv_cm = confusion_matrix(labels, adv_preds)

    # Print results
    print(f"Clean Test Loss: {clean_loss:.4f}, Clean Accuracy: {clean_accuracy:.2f}%")
    print(f"Clean F1 Score (Macro): {clean_f1:.4f}")
    print("Clean Confusion Matrix:")
    print(clean_cm)

    print(f"Adversarial Test Loss (PGD): {adv_loss:.4f}, Adversarial Accuracy (PGD): {adv_accuracy:.2f}%")
    print(f"Adversarial F1 Score (Macro, PGD): {adv_f1:.4f}")
    print("Adversarial Confusion Matrix (PGD):")
    print(adv_cm)

    # Visualize the stored original / perturbed image pairs
    visualize_comparison(
        original_images_list,
        perturbed_images_list,
        original_labels_list,
        perturbed_preds_list,
        num_images=len(original_images_list),
    )


In [13]:
import random

def fgsm_attack(image, epsilon, data_grad):
    """Return the FGSM-perturbed version of `image`, clamped to [0, 1].

    NOTE(review): this re-defines `fgsm_attack` from the earlier FGSM cell with
    the same behavior; whichever cell ran last is the one in effect.

    Args:
        image: input tensor to perturb.
        epsilon: step size applied to every element.
        data_grad: gradient of the loss with respect to `image`.
    """
    # Move every element by epsilon in the direction of the gradient's sign
    shifted = image + epsilon * data_grad.sign()
    # Clip back into the [0, 1] range
    return torch.clamp(shifted, 0, 1)

# Post-processing and visualization: collect FGSM examples that flip the prediction and plot them
def post_training_visualization(model, data_loader, epsilon=0.3, device='cuda'):
    """Find inputs whose predicted class changes under FGSM and plot them.

    Iterates over `data_loader` until 5 examples with a changed prediction have
    been collected, then hands them to `visualize_adversarial_examples`. Uses
    the notebook-level `criterion` for the attack loss.

    Args:
        model: network under attack; switched to evaluation mode.
        data_loader: iterable yielding `(images, labels)` batches.
        epsilon: FGSM step size passed to `fgsm_attack`.
        device: device each batch is moved to.
    """
    model.eval()
    max_examples = 5  # stop collecting once this many examples are found
    images, adv_images, labels, adv_labels = [], [], [], []

    for data, target in data_loader:
        data, target = data.to(device), target.to(device)

        # Prediction on the original images
        attack_input = data.clone().detach().requires_grad_(True)
        output = model(attack_input)
        init_pred = output.argmax(dim=1)

        # Gradient of the loss w.r.t. the input only (parameter .grad is untouched)
        loss = criterion(output, target)
        (data_grad,) = torch.autograd.grad(loss, attack_input)

        # Build the adversarial examples and classify them
        perturbed_data = fgsm_attack(data, epsilon, data_grad).detach()
        with torch.no_grad():
            final_pred = model(perturbed_data).argmax(dim=1)

        # Keep only examples whose predicted class changed, up to the cap
        changed_indices = (init_pred != final_pred).nonzero(as_tuple=True)[0].tolist()
        for i in changed_indices:
            if len(images) >= max_examples:
                break
            images.append(data[i].detach())
            adv_images.append(perturbed_data[i])
            # "Original" label here is the model's prediction on the clean image
            labels.append(init_pred[i].item())
            adv_labels.append(final_pred[i].item())

        if len(images) >= max_examples:
            break

    if not images:
        print("No prediction changed under the FGSM attack; nothing to visualize.")
        return

    # The collected examples were previously discarded; plot them here.
    visualize_adversarial_examples(
        model,
        images,
        adv_images,
        labels,
        adv_labels,
        f"FGSM adversarial examples (epsilon={epsilon})",
    )

def visualize_adversarial_examples(model, original_images, adversarial_images, original_labels, adversarial_labels, title):
    """Plot original images (top row) above their adversarial versions (bottom row).

    Args:
        model: unused; kept so existing callers keep working.
        original_images: sequence of single-image tensors.
        adversarial_images: sequence of single-image tensors, aligned with `original_images`.
        original_labels: label shown above each original image.
        adversarial_labels: label shown above each adversarial image.
        title: figure title.
    """
    num_examples = len(original_images)
    # Keep the original 2x5 layout for up to 5 examples; widen the grid beyond
    # that, where a fixed 5-column grid would run out of subplot slots.
    num_cols = max(num_examples, 5)

    plt.figure(figsize=(2 * num_cols, 5))
    for i in range(num_examples):
        # Original image (top row)
        plt.subplot(2, num_cols, i + 1)
        plt.title(f"Orig: {original_labels[i]}")
        # detach: tensors that still require grad cannot be converted to numpy
        plt.imshow(original_images[i].detach().squeeze().cpu().numpy(), cmap='gray')
        plt.axis('off')

        # Adversarial image (bottom row, same column)
        plt.subplot(2, num_cols, num_cols + i + 1)
        plt.title(f"Adv: {adversarial_labels[i]}")
        plt.imshow(adversarial_images[i].detach().squeeze().cpu().numpy(), cmap='gray')
        plt.axis('off')

    plt.suptitle(title)
    plt.show()


In [None]:
# Run standard training (no attack)
train(
    model=model,
    device=DEVICE,
    train_loader=train_loader,
    optimizer=optimizer,
    criterion=criterion,
    epochs=EPOCHS,
)

# ------------------------------------
# Everything below is intentionally disabled: for now PGD and FGSM are applied
# at test time only, not during training.

# FGSM adversarial training
# epsilon = 0.3  # Perturbation parameter
# train_with_fgsm(model, DEVICE, train_loader, optimizer, criterion, EPOCHS, epsilon)

# PGD adversarial training
# epsilon = 0.3  # Perturbation limit
# alpha = 0.01   # Step size
# attack_iters = 40  # Number of iterations for PGD
# train_with_pgd(model, DEVICE, train_loader, optimizer, criterion, EPOCHS, epsilon, alpha, attack_iters)

In [None]:
# evaluate(model, test_loader)

# Example usage after training the model
# evaluate_with_fgsm_attack(model, test_loader, criterion, epsilon=0.3)

# Evaluate the trained model under a PGD attack
evaluate_with_pgd_attack(
    model,
    test_loader,
    criterion,
    epsilon=0.3,
    alpha=0.1,
    attack_iters=40,
)

In [19]:
# Run the visualization
#post_training_visualization(model, test_loader, 3, DEVICE)