## Attacking Image Models

### FGSM Attack

#### Library Imports

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import torch.nn.functional as F
import matplotlib.pyplot as plt
from torchvision import datasets, transforms


#### FGSM Generation Function

In [None]:
def Generate_FGSM_Image(model,
                        x,
                        epsilon):
    """Craft an FGSM adversarial example for ``x`` against ``model``.

    The label used in the loss is the model's own prediction on ``x``, so no
    ground-truth labels are needed.

    Args:
        model: Callable mapping an input batch to per-class scores (logits).
        x: Input batch tensor.
        epsilon: Magnitude of the per-element perturbation. ``0`` disables
            the attack.

    Returns:
        ``x`` itself when ``epsilon == 0``. Otherwise a new float tensor,
        detached from the autograd graph, equal to
        ``x + epsilon * sign(d loss / d x)``.
    """
    # No perturbation requested: skip the gradient computation entirely.
    if epsilon == 0:
        return x

    # Work on a private float copy of the input that tracks gradients.
    x = x.clone().detach().to(torch.float)
    x.requires_grad_(True)

    # One forward pass supplies both the pseudo-label and the loss input.
    logits = model(x)
    _, y = torch.max(logits, 1)

    # Compute the loss against the model's own prediction.
    loss = nn.CrossEntropyLoss()(logits, y)

    # Differentiate with respect to the input only, so the attack does not
    # accumulate gradients into the model's parameters.
    (gradient,) = torch.autograd.grad(loss, x)

    # FGSM equation: step of size epsilon along the sign of the gradient.
    perturbation = epsilon * torch.sign(gradient)

    # Detach so callers receive a plain tensor with no graph attached.
    return (x + perturbation).detach()


#### Basic CNN Classifier

In [None]:
class BasicImageNetCNN(nn.Module):
    """Small CNN classifier: three ReLU convolutions and one linear head.

    The linear head expects the convolution stack to produce 128 feature
    maps of size 3x3 (which is what unpadded 32x32 inputs yield) and emits
    10 class scores per sample.
    """

    def __init__(self, in_channels=1):
        super().__init__()

        # Convolution stack (no padding, so each layer shrinks the image)
        self.conv1 = nn.Conv2d(in_channels, 64, kernel_size=8, stride=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=6, stride=2)
        self.conv3 = nn.Conv2d(128, 128, kernel_size=5, stride=2)

        # Classification head over the flattened 128 x 3 x 3 features
        self.fc = nn.Linear(128 * 3 * 3, 10)

    def forward(self, x):
        # Pass the image through the convolutional layers one by one,
        # applying a ReLU after each of them
        for conv in (self.conv1, self.conv2, self.conv3):
            x = F.relu(conv(x))

        # Flatten the convolutional output to rows of 128 * 3 * 3 values
        # and feed them to the fully connected layer
        flattened = x.view(-1, self.fc.in_features)
        return self.fc(flattened)


#### Dataset Setup

In [None]:
def load_cifar10_datasets(datapath, batch_size=128, num_workers=2):
    """Build the CIFAR-10 train and test data loaders.

    Args:
        datapath: Root directory where the dataset is stored; it is
            downloaded there if not already present.
        batch_size: Number of samples per batch for both loaders.
        num_workers: Number of loader worker processes for both loaders.

    Returns:
        A ``(train_loader, test_loader)`` tuple. Only the training loader
        shuffles its samples.
    """
    # The file imports `datasets` and `transforms` directly from torchvision
    # (the bare name `torchvision` is never bound), so use those names.
    # The only transformation applied is the conversion to tensors.
    train_transforms = transforms.Compose([transforms.ToTensor()])
    test_transforms = transforms.Compose([transforms.ToTensor()])

    # Obtain the datasets
    # Download them if they are not present
    train_dataset = datasets.CIFAR10(root=datapath, train=True,
                                     transform=train_transforms, download=True)
    test_dataset = datasets.CIFAR10(root=datapath, train=False,
                                    transform=test_transforms, download=True)

    # Create Data Loaders
    train_loader = torch.utils.data.DataLoader(train_dataset,
                                               batch_size=batch_size,
                                               shuffle=True,
                                               num_workers=num_workers)
    test_loader = torch.utils.data.DataLoader(test_dataset,
                                              batch_size=batch_size,
                                              shuffle=False,
                                              num_workers=num_workers)

    return train_loader, test_loader


#### Training Base Model

In [None]:
NUM_EPOCHS = 10

train_data, test_data = load_cifar10_datasets(datapath="./data")

# Use the GPU when one is available
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Move the model to its device before the optimizer captures its parameters
model = BasicImageNetCNN(in_channels=3).to(device)
loss_function = torch.nn.CrossEntropyLoss(reduction="mean")
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

model.train()

for epoch in range(NUM_EPOCHS):
    # Sum of the per-batch mean losses seen during this epoch
    train_loss = 0.0
    for x, y in train_data:

        # Move image and labels to device if applicable
        x = x.to(device)
        y = y.to(device)

        # Zero out the gradients left over from the previous batch
        optimizer.zero_grad()

        # Calculate predicted value and loss
        y_pred = model(x)
        loss = loss_function(y_pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()

        # Keep track of the loss
        train_loss = train_loss + loss.item()

    # Log once per epoch, after all batches have been accumulated
    # (logging inside the batch loop printed a partial running sum
    # for every single batch)
    print("EPOCH: {} ---------- Loss: {}".format(epoch, train_loss))


#### Evaluating FGSM Attack

In [None]:
# Switch to inference mode before measuring accuracy
model.eval()
clean_correct = 0
fgsm_correct = 0
total = 0
for x, y in test_data:

    # Move image and labels to device if applicable
    x = x.to(device)
    y = y.to(device)

    # Calculate the adversarial images
    # (this must stay outside no_grad: the attack needs input gradients)
    x_fgsm = Generate_FGSM_Image(model, x, epsilon=0.005)

    # Run inference for predicted values on clean and adversarial examples.
    # No gradients are needed here, so do not build autograd graphs.
    with torch.no_grad():
        _, y_pred_clean = torch.max(model(x), 1)
        _, y_pred_fgsm = torch.max(model(x_fgsm), 1)

    # Count correct clean and adversarial predictions
    clean_correct = clean_correct + y_pred_clean.eq(y).sum().item()
    fgsm_correct = fgsm_correct + y_pred_fgsm.eq(y).sum().item()
    total = total + y.size(0)

# Fraction of test samples classified correctly in each setting
clean_accuracy = clean_correct / total
fgsm_accuracy = fgsm_correct / total


### PGD Attack

#### Modified FGSM Attack Function

In [None]:
def Generate_FGSM_Image_V2(model,
                           x,
                           y,  # New parameter: labels used in the loss
                           epsilon):
    """Craft an FGSM adversarial example for ``x`` using given labels ``y``.

    Unlike ``Generate_FGSM_Image``, the labels are supplied by the caller
    instead of being recomputed from the model's prediction on ``x``.

    Args:
        model: Callable mapping an input batch to per-class scores (logits).
        x: Input batch tensor.
        y: Class-index labels the loss is computed against.
        epsilon: Magnitude of the per-element perturbation. ``0`` disables
            the attack.

    Returns:
        ``x`` itself when ``epsilon == 0``. Otherwise a new float tensor,
        detached from the autograd graph, equal to
        ``x + epsilon * sign(d loss / d x)``.
    """
    # No perturbation requested: skip the gradient computation entirely.
    if epsilon == 0:
        return x

    # Work on a private float copy of the input that tracks gradients.
    x = x.clone().detach().to(torch.float)
    x.requires_grad_(True)

    # Compute the loss against the supplied labels.
    loss = nn.CrossEntropyLoss()(model(x), y)

    # Differentiate with respect to the input only, so the attack does not
    # accumulate gradients into the model's parameters.
    (gradient,) = torch.autograd.grad(loss, x)

    # FGSM equation: step of size epsilon along the sign of the gradient.
    perturbation = epsilon * torch.sign(gradient)

    # Detach so callers receive a plain tensor with no graph attached.
    return (x + perturbation).detach()


#### PGDM Attack Function

In [None]:
def Generate_PGDM_Image(model,
                        x,
                        epsilon,
                        num_iterations,
                        step_size=0.01):
    """Craft an adversarial example by iterating FGSM steps with clipping.

    Each iteration takes one FGSM step of size ``step_size`` and then clips
    the accumulated perturbation element-wise to ``[-epsilon, epsilon]``.

    Args:
        model: Callable mapping an input batch to per-class scores (logits).
        x: Clean input batch tensor.
        epsilon: Maximum absolute perturbation allowed per element.
        num_iterations: Number of FGSM-and-clip iterations to run.
        step_size: Perturbation magnitude of each individual FGSM step.

    Returns:
        A tensor detached from the autograd graph whose element-wise
        distance from ``x`` does not exceed ``epsilon``.
    """
    # Obtain actual clean predictions from model; they stay fixed for the
    # whole attack and need no gradient tracking
    with torch.no_grad():
        _, y = torch.max(model(x), 1)

    # Start from the clean image, i.e. a zero perturbation
    x_adv = x.clone().detach()

    # For every iteration, do FGSM and clipping
    for _ in range(num_iterations):

        # Note that the FGSM function signature has changed
        # We are passing it the predicted value y as a parameter
        # Thus this will not be recomputed
        x_adv = Generate_FGSM_Image_V2(model,
                                       x_adv,
                                       y,
                                       epsilon=step_size)

        # Clip the total perturbation back into the allowed range
        eta = torch.clamp(x_adv - x, -epsilon, epsilon)
        x_adv = x + eta

    # Return the final image without any autograd graph attached
    return x_adv.detach()


## Attacking Text Models

### Dataset Setup

In [None]:
import pandas as pd
import numpy as np

# Skip the first row of the file and apply explicit column names instead
column_names = ["TweetId", "Toxicity", "Tweet"]
df = pd.read_csv("FinalBalancedDataset.csv", skiprows=1, names=column_names)
df.head()

In [None]:
# Number of tweets (non-null TweetId values) per Toxicity class
df.groupby("Toxicity")["TweetId"].count()

### TF-IDF Feature Extraction

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer
def Extract_TF_IDF(train_data, test_data):
    """Convert raw documents into TF-IDF feature matrices.

    The vectorizer is fitted on ``train_data`` only; ``test_data`` is then
    transformed with that same fitted vectorizer.

    Returns:
        A ``(train_features, test_features)`` tuple.
    """
    vectorizer = TfidfVectorizer()

    # Fit on the training documents and transform them in one go
    train_features = vectorizer.fit_transform(train_data)

    # Reuse the fitted vectorizer for the test documents
    test_features = vectorizer.transform(test_data)

    return train_features, test_features

### Attack Strategies

#### Double Last Letter

In [None]:
def double_last_letter(sentences, max_perturbations = 3):
    """Perturb sentences by repeating the last letter of random words.

    For every sentence, ``max_perturbations`` word positions are drawn at
    random (with replacement, so a word may be picked more than once) and
    each non-empty picked word gets its final character appended again.

    Returns:
        A list with one perturbed sentence per input sentence.
    """
    perturbed = []

    for text in sentences:

        # Tokenise on single spaces
        tokens = text.split(' ')

        # Draw the positions of the tokens to alter
        chosen = np.random.randint(0, len(tokens), max_perturbations)

        for position in chosen:
            token = tokens[position]

            # Blank tokens have no last letter to repeat
            if token:
                tokens[position] = token + token[-1]

        # Rebuild the sentence from its tokens
        perturbed.append(' '.join(tokens))

    return perturbed



#### Double Vowel

In [None]:

def double_vowel(sentences, max_perturbations = 3):
    """Perturb sentences by doubling one vowel in their leading words.

    Words are visited left to right. In each word that contains a lowercase
    vowel, the first occurrence of the first vowel found (checked in the
    order a, e, i, o, u) is doubled. At most ``max_perturbations`` words are
    changed per sentence; words without a lowercase vowel are left alone and
    do not count towards the limit.

    Args:
        sentences: Iterable of sentence strings.
        max_perturbations: Maximum number of words to alter in each sentence.

    Returns:
        A list with one perturbed sentence per input sentence.
    """
    vowels = ('a', 'e', 'i', 'o', 'u')

    # Output array
    modified_sentences = []

    for sentence in sentences:

        # Split into words
        words = sentence.split(' ')

        # The budget is tracked per sentence. Sharing one counter across all
        # sentences would exhaust it on the first sentence(s) and leave the
        # rest of the corpus unperturbed.
        perturbations = 0

        for i, word in enumerate(words):

            # Check if maximum perturbations done
            # If so, break the loop and don't do any more!
            if perturbations >= max_perturbations:
                break

            for vowel in vowels:
                if vowel in word:
                    words[i] = word.replace(vowel, vowel + vowel, 1)
                    perturbations += 1

                    # Replace only one vowel per word, so stop scanning
                    # vowels once a replacement has been made
                    break

        modified_sentences.append(' '.join(words))

    return modified_sentences

### Dataset Preparation

In [None]:
from sklearn.model_selection import train_test_split

# Tweets are the inputs, toxicity values are the labels
X = df["Tweet"].tolist()
y = df["Toxicity"].tolist()

# Hold out 30% of the samples for testing, stratified on the labels
X_train, X_test, Y_train, Y_test = train_test_split(
    X,
    y,
    stratify=y,
    test_size=0.3,
)

# TF-IDF features fitted on the training tweets
X_train_features, X_test_features = Extract_TF_IDF(X_train, X_test)

### Evaluation Helper Function

In [None]:
from sklearn.metrics import confusion_matrix

def evaluate_model(actual, predicted):
    """Compute accuracy, precision, recall and F1 from two label sequences.

    The four confusion-matrix cells are unpacked as ``tn, fp, fn, tp``, so
    the confusion matrix must flatten to exactly four values. Precision,
    recall and F1 fall back to ``0`` whenever their denominator would be
    zero.

    Returns:
        A dict with the keys 'accuracy', 'precision', 'recall' and 'f1'.
    """
    tn, fp, fn, tp = confusion_matrix(actual, predicted).ravel()

    predicted_positive = tp + fp
    actual_positive = tp + fn
    total = tp + fp + tn + fn

    accuracy = 1.0 * (tp + tn) / total

    # Guard each ratio against an empty denominator
    precision = tp / predicted_positive if predicted_positive != 0 else 0
    recall = tp / actual_positive if actual_positive != 0 else 0

    # F1 is only defined when both precision and recall are non-zero
    if precision != 0 and recall != 0:
        f1 = 2 * precision * recall / (precision + recall)
    else:
        f1 = 0

    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1,
    }

### Base Model Performance

In [None]:
from sklearn.ensemble import RandomForestClassifier
# Fit a 100-tree random forest on the clean training features
model = RandomForestClassifier(n_estimators=100)
model.fit(X_train_features, Y_train)

# Predict on the clean test features and score the predictions
Y_predicted = model.predict(X_test_features)
evaluation = evaluate_model(Y_test, Y_predicted)

# Report every metric on its own line
for template, metric in (("Accuracy: {}", 'accuracy'),
                         ("Precision: {}", 'precision'),
                         ("Recall: {}", 'recall'),
                         ("F-1: {}", 'f1')):
    print(template.format(str(evaluation[metric])))

### Adversarial Attack Performance

In [None]:
# Perturb the test tweets with the double-vowel attack
X_test_adversarial = double_vowel(X_test, max_perturbations=5)

# Re-extract features: fitted on the clean training tweets,
# applied to the adversarial test tweets
X_train_features, X_test_features = Extract_TF_IDF(X_train, X_test_adversarial)

# Train a fresh 100-tree random forest on the training features
model = RandomForestClassifier(n_estimators=100)
model.fit(X_train_features, Y_train)

# Predict on the adversarial samples and score against the true labels
Y_predicted = model.predict(X_test_features)
evaluation = evaluate_model(Y_test, Y_predicted)

# Report every metric on its own line
for template, metric in (("Accuracy: {}", 'accuracy'),
                         ("Precision: {}", 'precision'),
                         ("Recall: {}", 'recall'),
                         ("F-1: {}", 'f1')):
    print(template.format(str(evaluation[metric])))