Request ID: 2300505793480487

In [17]:
# Path: pgd_jailbreak.py

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import random
import time
import os

# Define the model
class LSTM_PGD_Beam_Search(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, n_layers=1):
        super(LSTM_PGD_Beam_Search, self).__init__()
        self.hidden_size = hidden_size
        self.n_layers = n_layers
        self.lstm = nn.LSTM(input_size, hidden_size, n_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)
        self.beam_width = 5
        self.max_length = 20
        self.eps = 0.1
        self.alpha = 0.1
        self.targeted = False
        self.target = None
        self.target_label = None
        self.target_confidence = None
        self.targeted_attack = False
        self.targeted_attack_label = None
        self.targeted_attack_confidence = None
        self.targeted_attack_perturbation = None
        self.targeted_attack_perturbation_norm = None
        self.targeted_attack_perturbation_norm_list = []
        self.targeted_attack_perturbation_norm_list.append(0)

    def forward(self, x, hidden):
        out, hidden = self.lstm(x, hidden)
        out = self.fc(out)
        out = self.softmax(out)
        return out, hidden
    
    def init_hidden(self, batch_size):
        return (torch.zeros(self.n_layers, batch_size, self.hidden_size), torch.zeros(self.n_layers, batch_size, self.hidden_size))
    
    def beam_search(self, x, hidden, max_length, beam_width, eps, alpha, targeted, target, targeted_attack, targeted_attack_label, targeted_attack_confidence):
        batch_size = x.shape[0]
        # Initialize the beam search
        beam = [[[], 0, hidden]]
        # Loop through the max length
        for i in range(max_length):
            # Create a new beam
            new_beam = []
            # Loop through the beam
            for b in beam:
                # If the sequence is not empty
                if len(b[0]) > 0:
                    # If the sequence is finished
                    if b[0][-1] == 1:
                        new_beam.append(b)
                    # If the sequence is not finished
                    else:
                        # Forward pass
                        out, hidden = self.forward(x, b[2])
                        # Get the top k predictions
                        topk = torch.topk(out[0, -1], beam_width)
                        # Loop through the top k predictions
                        for j in range(beam_width):
                            # Get the new sequence
                            new_seq = b[0].copy()
                            new_seq.append(topk.indices[j].item())
                            # Get the new probability
                            new_prob = b[1] + topk.values[j].item()
                            # Get the new hidden state
                            new_hidden = (hidden[0][:, 0, :].unsqueeze(0).clone(), hidden[1][:, 0, :].unsqueeze(0).clone())
                            # If the sequence is finished
                            if new_seq[-1] == 1:
                                # If the sequence is targeted
                                if targeted:
                                    # If the sequence is targeted
                                    if new_seq == target:
                                        new_beam.append([new_seq, new_prob, new_hidden])
                                # If the sequence is not targeted
                                else:
                                    new_beam.append([new_seq, new_prob, new_hidden])
                            # If the sequence is not finished
                            else:
                                new_beam.append([new_seq, new_prob, new_hidden])
            # Sort the new beam
            new_beam = sorted(new_beam, key=lambda x: x[1], reverse=True)
            # Prune the new beam
            beam = new_beam[:beam_width]
        return beam
    
    def pgd(self, x, y, max_length, beam_width, eps, alpha, targeted, target, targeted_attack, targeted_attack_label, targeted_attack_confidence):
        # Get the batch size
        batch_size = x.shape[0]
        # Create the perturbation
        perturbation = torch.zeros(x.shape)
        # Loop through the batch
        for i in range(batch_size):
            # Get the input and the target
            x_i = x[i].unsqueeze(0).clone()
            y_i = y[i].unsqueeze(0).clone()
            # Get the initial hidden state
            hidden = self.init_hidden(1)
            # Beam search
            beam = self.beam_search(x_i, hidden, max_length, beam_width, eps, alpha, targeted, target, targeted_attack, targeted_attack_label, targeted_attack_confidence)
            # Get the best sequence
            best_seq = beam[0][0]
            # Get the best sequence as the perturbation
            perturbation[i] = x_i - x_i
            for j in range(len(best_seq)):
                perturbation[i, j] = best_seq[j]
        return perturbation
    
    def attack(self, x, y, max_length, beam_width, eps, alpha, targeted, target, targeted_attack, targeted_attack_label, targeted_attack_confidence):
        # Get the perturbation
        perturbation = self.pgd(x, y, max_length, beam_width, eps, alpha, targeted, target, targeted_attack, targeted_attack_label, targeted_attack_confidence)
        # Get the adversarial example
        x_adv = x + perturbation
        return x_adv
    
    def train(self, x, y, max_length, beam_width, eps, alpha, targeted, target, targeted_attack, targeted_attack_label, targeted_attack_confidence):
        # Get the adversarial example
        x_adv = self.attack(x, y, max_length, beam_width, eps, alpha, targeted, target, targeted_attack, targeted_attack_label, targeted_attack_confidence)
        # Forward pass
        out, hidden = self.forward(x_adv, self.init_hidden(x.shape[0]))
        # Loss
        loss = F.nll_loss(out[:, -1], y)
        return loss, x_adv
    
    def test(self, x, y, max_length, beam_width, eps, alpha, targeted, target, targeted_attack, targeted_attack_label, targeted_attack_confidence):
        # Get the adversarial example
        x_adv = self.attack(x, y, max_length, beam_width, eps, alpha, targeted, target, targeted_attack, targeted_attack_label, targeted_attack_confidence)
        # Forward pass
        out, hidden = self.forward(x_adv, self.init_hidden(x.shape[0]))
        # Loss
        loss = F.nll_loss(out[:, -1], y)
        # Accuracy
        pred = out[:, -1].argmax(dim=1, keepdim=True)
        correct = pred.eq(y.view_as(pred)).sum().item()
        return loss, correct, x_adv
    
    def attack_all(self, x, y, max_length, beam_width, eps, alpha, targeted, target, targeted_attack, targeted_attack_label, targeted_attack_confidence):
        # Get the adversarial example
        x_adv = self.attack(x, y, max_length, beam_width, eps, alpha, targeted, target, targeted_attack, targeted_attack_label, targeted_attack_confidence)
        # Forward pass
        out, hidden = self.forward(x_adv, self.init_hidden(x.shape[0]))
        # Loss
        loss = F.nll_loss(out[:, -1], y)
        # Accuracy
        pred = out[:, -1].argmax(dim=1, keepdim=True)
        correct = pred.eq(y.view_as(pred)).sum().item()
        return loss, correct, x_adv
    

config = {
    'x': None, # input tensor
    'y': None, # target tensor
    'beam_width': 5, # beam width
    'max_length': 20, # maximum length
    'eps': 0.1, # epsilon
    'alpha': 0.1, # alpha
    'targeted': False, # targeted attack
    'target': None, # target label
    'targeted_attack': False, # targeted attack
    'targeted_attack_label': None, # targeted attack label
    'targeted_attack_confidence': None # targeted attack confidence
}
    
# Train the model
def train(model, device, train_loader, optimizer, epoch, log_interval, model_path):
    # Set the model to train mode
    model.train( config['x'], config['y'], config['max_length'], config['beam_width'], config['eps'], config['alpha'], config['targeted'], config['target'], config['targeted_attack'], config['targeted_attack_label'], config['targeted_attack_confidence'])
    # Loop through the data
    for batch_idx, (data, target) in enumerate(train_loader):
        # Send the data and target to the device
        data, target = data.to(device), target.to(device)
        # Zero the gradients
        optimizer.zero_grad()
        # Forward pass
        loss, x_adv = model.train(data, target, model.max_length, model.beam_width, model.eps, model.alpha, model.targeted, model.target, model.targeted_attack, model.targeted_attack_label, model.targeted_attack_confidence)
        # Backward pass
        loss.backward()
        # Optimize
        optimizer.step()
        # Print the logs
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
            # Save the model with the lowest loss
            if loss.item() < model.best_loss:
                model.best_loss = loss.item()
                torch.save(model.state_dict(), model.model_path)
                print('Model saved')
    return model

# Test the model
def test(model, device, test_loader):
    # Set the model to test mode
    model.eval()
    # Variables
    test_loss = 0
    correct = 0
    # Disable gradient calculation
    with torch.no_grad():
        # Loop through the data
        for data, target in test_loader:
            # Send the data and target to the device
            data, target = data.to(device), target.to(device)
            # Forward pass
            loss, corr, x_adv = model.test(data, target, model.max_length, model.beam_width, model.eps, model.alpha, model.targeted, model.target, model.targeted_attack, model.targeted_attack_label, model.targeted_attack_confidence)
            # Calculate the test loss
            test_loss += loss.item()
            # Calculate the test accuracy
            correct += corr
    # Calculate the average test loss
    test_loss /= len(test_loader.dataset)
    # Print the test results
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
    return test_loss, correct, x_adv

# Test the model on the entire test set
def test_all(model, device, test_loader):
    # Set the model to test mode
    model.eval()
    # Variables
    test_loss = 0
    correct = 0
    # Disable gradient calculation
    with torch.no_grad():
        # Loop through the data
        for data, target in test_loader:
            # Send the data and target to the device
            data, target = data.to(device), target.to(device)
            # Forward pass
            loss, corr, x_adv = model.attack_all(data, target, model.max_length, model.beam_width, model.eps, model.alpha, model.targeted, model.target, model.targeted_attack, model.targeted_attack_label, model.targeted_attack_confidence)
            # Calculate the test loss
            test_loss += loss.item()
            # Calculate the test accuracy
            correct += corr
    # Calculate the average test loss
    test_loss /= len(test_loader.dataset)
    # Print the test results
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
    return test_loss, correct, x_adv

# Plot the adversarial examples
def plot_adversarial_examples(model, device, test_loader, classes, dataset, adv_examples_path):
    # Set the model to test
    model.eval()
    # Initialize the lists
    data_list = []
    target_list = []
    x_adv_list = []
    pred_list = []
    # Loop through the data
    for data, target in test_loader:
        # Send the data to the device
        data, target = data.to(device), target.to(device)
        # Forward pass
        loss, corr, x_adv = model.attack_all(data, target, model.max_length, model.beam_width, model.eps, model.alpha, model.targeted, model.target, model.targeted_attack, model.targeted_attack_label, model.targeted_attack_confidence)
        # Append the data, target, adversarial example, and prediction
        data_list.append(data)
        target_list.append(target)
        x_adv_list.append(x_adv)
        pred_list.append(x_adv.argmax(dim=1))

    # Concatenate the lists
    data = torch.cat(data_list)
    target = torch.cat(target_list)
    x_adv = torch.cat(x_adv_list)
    pred = torch.cat(pred_list)
    # Plot the adversarial examples
    for i in range(len(data)):
        # Get the class name
        class_name = classes[target[i]]
        # Get the adversarial class name
        adv_class_name = classes[pred[i]]
        # Get the image
        img = data[i].cpu().numpy()
        # Get the adversarial image
        adv_img = x_adv[i].cpu().numpy()
        # Reshape the image
        img = np.transpose(img, (1, 2, 0))
        # Reshape the adversarial image
        adv_img = np.transpose(adv_img, (1, 2, 0))
        # Plot the image
        plt.figure()
        plt.subplot(1, 2, 1)
        plt.title(class_name)
        plt.imshow(img, cmap='gray')
        plt.axis('off')
        # Plot the adversarial image
        plt.subplot(1, 2, 2)
        plt.title(adv_class_name)
        plt.imshow(adv_img, cmap='gray')
        plt.axis('off')
        # Save the plot
        plt.savefig(adv_examples_path + '/adversarial_example_' + str(i) + '.png')
        plt.close()
    return

# Main function
def main():
    # Set the seed
    torch.manual_seed(0)
    np.random.seed(0)
    random.seed(0)
    # Set the device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # Hyperparameters
    batch_size = 64
    max_epochs = 10
    learning_rate = 0.01
    momentum = 0.9
    log_interval = 10
    # Load the data
    from torchvision import datasets, transforms
    # Transform the data
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
    # Download and load the data
    train_set = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_set = datasets.MNIST('data', train=False, download=True, transform=transform)
    # Create the data loaders
    train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True)
    test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False)
    # Classes
    classes = ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9')
    # Model path
    model_path = 'models/lstm_pgd_beam_search_mnist.pt'
    # Adversarial examples path
    adv_examples_path = 'results/adversarial_examples'
    # Create the model
    model = LSTM_PGD_Beam_Search(28, 128, 10, 2).to(device)
    # Optimizer
    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)
    # Model path
    model.model_path = model_path
    # Best loss
    model.best_loss = np.inf
    # Training loop
    for epoch in range(1, max_epochs + 1):
        model = train(model, device, train_loader, optimizer, epoch, log_interval, model_path)
        test_loss, correct, x_adv = test(model, device, test_loader)
    # Test the model on the entire test set
    test_loss, correct, x_adv = test_all(model, device, test_loader)
    # Plot the adversarial examples
    plot_adversarial_examples(model, device, test_loader , classes, test_set, adv_examples_path)
    return

# Execute the main function
if __name__ == '__main__':
    main()





    

   



AttributeError: 'NoneType' object has no attribute 'shape'