### <font color='blue'>Part 1: Preprocessing and Exploration of the AG_News Dataset</font>

In [None]:
from torch import cuda

# Report up front whether PyTorch can see a CUDA device.
cuda_ready = cuda.is_available()
print("Cuda availablity is:", cuda_ready)

#### Load the Dataset

In [None]:
import pandas as pd

# AG News CSV files are read directly from their GitHub raw URLs
train_url = "https://raw.githubusercontent.com/mhjabreel/CharCnn_Keras/master/data/ag_news_csv/train.csv"
test_url = "https://raw.githubusercontent.com/mhjabreel/CharCnn_Keras/master/data/ag_news_csv/test.csv"

# The files are read without a header row, so the column names are supplied here
ag_news_columns = ["label", "title", "description"]
train_df = pd.read_csv(train_url, header=None, names=ag_news_columns)
test_df = pd.read_csv(test_url, header=None, names=ag_news_columns)

for split_name, split_df in (("Train", train_df), ("Test", test_df)):
    print(f"{split_name} dataset shape:", split_df.shape)

#### Combine the title and description columns in both the train and test dataframes

In [None]:
# Add a "text" column to each split: title and description joined by one space.
for news_df in (train_df, test_df):
    news_df['text'] = news_df['title'] + " " + news_df['description']

#### Tokenize the text using the Hugging Face Transformers library

In [None]:
from transformers import AutoTokenizer


# Choose a pre-trained model architecture (e.g., BERT)
model_name = "bert-base-uncased"

# Instantiate the tokenizer that belongs to the chosen checkpoint
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Tokenize only a small preview here. The encodings that are actually used for
# training and evaluation are built in the DataLoader cell further down, which
# assigns train_encodings / test_encodings itself. Tokenizing the complete
# training frame at this point would spend time and memory on a result that is
# overwritten before anything reads it.
preview_texts = train_df['text'].head(3).tolist()
preview_encodings = tokenizer(preview_texts, truncation=True, padding=True, max_length=256)
print("Padded token count per preview example:", len(preview_encodings['input_ids'][0]))

#### Convert the labels into numerical format

In [None]:
# Shift the labels down by one so they run 0..3 rather than 1..4
train_labels = train_df['label'].to_numpy() - 1
test_labels = test_df['label'].to_numpy() - 1

#### Create PyTorch DataLoader objects for training and testing

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

class AGNewsDataset(Dataset):
    """Map-style dataset pairing tokenizer encodings with class labels.

    Args:
        encodings: Mapping from field name (e.g. ``input_ids``) to a sequence
            holding one entry per example.
        labels: Sequence of labels, aligned index-for-index with ``encodings``.
        tokenizer: Optional object with a ``decode`` method, used by
            ``get_text_and_label`` to turn token ids back into text.
    """

    def __init__(self, encodings, labels, tokenizer=None):
        self.encodings = encodings
        self.labels = labels
        # Kept on the dataset itself: the encodings mapping does not carry a
        # reference to the tokenizer that produced it.
        self.tokenizer = tokenizer

    def __getitem__(self, idx):
        """Return example ``idx`` as a dict of tensors, including a ``labels`` entry."""
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        """Number of examples, taken from the label sequence."""
        return len(self.labels)

    def get_text_and_label(self, idx, tokenizer=None):
        """Decode example ``idx`` back to text and return ``(text, label)``.

        Args:
            idx: Position of the example.
            tokenizer: Optional tokenizer overriding the one given at
                construction time.

        Raises:
            ValueError: If no tokenizer was supplied here or to ``__init__``.
        """
        decoder = tokenizer if tokenizer is not None else self.tokenizer
        if decoder is None:
            raise ValueError(
                "A tokenizer is required to decode text: pass tokenizer=... to "
                "AGNewsDataset(...) or to get_text_and_label(...)."
            )
        text = decoder.decode(self.encodings['input_ids'][idx])
        label = self.labels[idx]
        return text, label
    
# Work with a 0.1% random sample of the training frame (fixed seed for repeatability)
train_df_sample = train_df.sample(frac=0.001, random_state=42)

# Tokenizer settings shared by both splits
tokenize_options = dict(truncation=True, padding=True, max_length=256)

# Training split: encodings, zero-based labels, dataset, shuffled loader
train_encodings = tokenizer(train_df_sample['text'].tolist(), **tokenize_options)
train_labels = train_df_sample['label'].values - 1
train_dataset = AGNewsDataset(train_encodings, train_labels)
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)

# Test split: the full test frame, iterated in its original order
test_encodings = tokenizer(test_df['text'].tolist(), **tokenize_options)
test_labels = test_df['label'].values - 1
test_dataset = AGNewsDataset(test_encodings, test_labels)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

### <font color='blue'>Part 2: Train and validate the text classification model using PyTorch</font>

#### Define the model, loss function, and optimizer

In [None]:
import torch
from transformers import AutoModelForSequenceClassification
from torch.optim import AdamW

# from transformers import AdamW
# AdamW optimizer from the transformers library is deprecated

# Load the checkpoint with a 4-label sequence-classification head
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=4)

# Run on the GPU when one is available, otherwise on the CPU
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
model.to(device)

# Cross-entropy loss and AdamW optimizer over all model parameters
criterion = torch.nn.CrossEntropyLoss()
optimizer = AdamW(model.parameters(), lr=5e-5)

torch.cuda.empty_cache()

for notice in (
    "\nWhen initializing the BertForSequenceClassification model from the pre-trained BERT model, some weights are not used and some are newly initialized, as expected",
    "\nThis model should be fine-tuned on a downstream task for better performance",
):
    print(notice)


#### Monitor GPU memory usage

In [None]:
# The allocator report is only requested when a CUDA device is present.
cuda_present = torch.cuda.is_available()
if cuda_present:
    memory_report = torch.cuda.memory_summary(device=None, abbreviated=False)
    print(memory_report)

#### Train the model on the preprocessed dataset for several epochs

In [None]:
def train_epoch(model, dataloader, criterion, optimizer, device):
    """Run one optimisation pass over ``dataloader`` and return the mean batch loss.

    Args:
        model: Module called as ``model(input_ids, attention_mask=...)`` whose
            output exposes ``.logits``.
        dataloader: Iterable of batches; each batch is a mapping holding
            ``input_ids``, ``attention_mask`` and ``labels`` tensors. It does
            not need to support ``len()``.
        criterion: Loss callable invoked as ``criterion(logits, labels)``.
        optimizer: Optimizer whose gradients are reset and stepped per batch.
        device: Device the batch tensors are moved to before the forward pass.

    Returns:
        float: Mean of the per-batch loss values, or ``nan`` when the
        dataloader yielded no batches.
    """
    model.train()
    running_loss = 0.0
    num_batches = 0

    for batch in dataloader:
        optimizer.zero_grad()

        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        outputs = model(input_ids, attention_mask=attention_mask)
        loss = criterion(outputs.logits, labels)

        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        # Counted here instead of using len(dataloader), so iterables without
        # __len__ work and an empty loader cannot trigger a division by zero.
        num_batches += 1

    if num_batches == 0:
        return float("nan")
    return running_loss / num_batches

# 50 epochs when a GPU is available, 3 otherwise
num_epochs = 50 if torch.cuda.is_available() else 3

for epoch_number in range(1, num_epochs + 1):
    train_loss = train_epoch(model, train_loader, criterion, optimizer, device)
    print(f"Epoch {epoch_number}/{num_epochs}, Loss: {train_loss:.4f}")

#### Save the trained model for future use

In [None]:
# Write the model's state dict (not the model object itself) to disk.
trained_weights = model.state_dict()
torch.save(trained_weights, "model.pt")

#### Evaluate the model's performance on the test set

In [None]:
def evaluate(model, dataloader, device):
    """Collect ground-truth and predicted class ids over ``dataloader``.

    The model is put in evaluation mode and run without gradient tracking.
    Each batch must provide ``input_ids``, ``attention_mask`` and ``labels``;
    the prediction for a row is the index of its largest logit.

    Returns:
        tuple: ``(true_labels, pred_labels)``, two flat lists with one entry
        per example, in iteration order.
    """
    model.eval()
    collected_truth, collected_preds = [], []

    with torch.no_grad():
        for batch in dataloader:
            ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            targets = batch['labels'].to(device)

            logits = model(ids, attention_mask=mask).logits
            predicted = torch.max(logits, dim=1).indices

            collected_truth.extend(targets.cpu().numpy())
            collected_preds.extend(predicted.cpu().numpy())

    return collected_truth, collected_preds

# Evaluate the model on the test set
true_labels, pred_labels = evaluate(model, test_loader, device)

#### Calculate performance metrics

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Precision, recall and F1 all use the same 'weighted' averaging
weighted = {"average": 'weighted'}
accuracy = accuracy_score(true_labels, pred_labels)
precision = precision_score(true_labels, pred_labels, **weighted)
recall = recall_score(true_labels, pred_labels, **weighted)
f1 = f1_score(true_labels, pred_labels, **weighted)

# Report every metric with four decimal places
metric_report = (
    ("Accuracy", accuracy),
    ("Precision", precision),
    ("Recall", recall),
    ("F1-score", f1),
)
for metric_name, metric_value in metric_report:
    print(f"{metric_name}: {metric_value:.4f}")

### <font color='blue'>Part 3: Implement the TextFooler-based Attack</font>

#### Select a subset of the test set for the TextFooler attack

In [None]:
# Draw a fixed-seed sample of 100 test rows to attack, then tokenize and wrap it
subset_test_df = test_df.sample(n=100, random_state=42)
subset_test_texts = subset_test_df['text'].tolist()
subset_test_encodings = tokenizer(subset_test_texts, truncation=True, padding=True, max_length=256)
subset_test_labels = subset_test_df['label'].values - 1
subset_test_dataset = AGNewsDataset(subset_test_encodings, subset_test_labels)
subset_test_loader = DataLoader(dataset=subset_test_dataset, batch_size=1, shuffle=False)

#### Custom dataset class for TextAttack

In [None]:
class SimpleTextDataset:
    """Sequence of ``(text, label)`` pairs decoded from a tokenized dataset.

    Every item of ``dataset`` must be a mapping with ``input_ids`` (token ids)
    and ``labels`` (a scalar exposing ``.item()``). All items are decoded once,
    eagerly, in the constructor.

    Args:
        dataset: Iterable of tokenized examples.
        tokenizer: Object with ``decode(ids, skip_special_tokens=...)``.
        skip_special_tokens: Whether special tokens are dropped while decoding.
            Defaults to True so the recovered text does not contain literal
            ``[CLS]`` / ``[SEP]`` / ``[PAD]`` markers, which would otherwise
            be fed back into the model when the text is tokenized again.
    """

    def __init__(self, dataset, tokenizer, skip_special_tokens=True):
        self.dataset = dataset
        self.tokenizer = tokenizer
        self.texts = []
        self.labels = []

        for item in dataset:
            text = self.tokenizer.decode(
                item['input_ids'], skip_special_tokens=skip_special_tokens
            )
            self.texts.append(text)
            self.labels.append(item['labels'].item())

    def __len__(self):
        """Number of decoded examples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the ``(text, label)`` pair at ``idx``; raises IndexError past the end."""
        return self.texts[idx], self.labels[idx]

#### Create a TextAttack dataset from the subset of the test set using the custom dataset class

In [None]:
# Decode the tokenized test subset back into (text, label) pairs for TextAttack.
subset_test_dataset_custom = SimpleTextDataset(dataset=subset_test_dataset, tokenizer=tokenizer)

#### Prepare the model for TextFooler and generate adversarial examples

In [None]:
import textattack
from textattack.attack_recipes import TextFoolerJin2019
from textattack.models.wrappers import HuggingFaceModelWrapper
import pickle

# Expose the fine-tuned classifier to TextAttack through its Hugging Face wrapper
model_wrapper = HuggingFaceModelWrapper(model, tokenizer)

# Build the TextFooler recipe against the wrapped model
attack = TextFoolerJin2019.build(model_wrapper)

# Attack every (text, label) pair of the decoded test subset, keeping each result
adversarial_examples = []
for original_text, ground_truth_label in subset_test_dataset_custom:
    adversarial_examples.append(attack.attack(original_text, ground_truth_label))

# Persist the attack results so later cells can reload them
with open("adversarial_examples.pkl", "wb") as pickle_file:
    pickle.dump(adversarial_examples, pickle_file)


### <font color='blue'>Part 4: Evaluate the Impact on the Model's Performance</font>

#### Load the adversarial examples:


In [None]:
# Reload the attack results written by the generation cell.
# NOTE: unpickling executes arbitrary code, so only load a file this notebook produced.
with open("adversarial_examples.pkl", "rb") as pickle_file:
    adversarial_examples = pickle.load(pickle_file)

#### Define a function to compute accuracy

In [None]:
def compute_accuracy(model, tokenizer, dataset):
    """Return the fraction of ``(text, label)`` pairs the model classifies correctly.

    The model is moved to the GPU when one is available (CPU otherwise),
    switched to evaluation mode for the duration of the call and run without
    gradient tracking. Its previous train/eval mode is restored afterwards.

    Args:
        model: Classifier called as ``model(**inputs)`` whose output exposes
            ``.logits``.
        tokenizer: Callable turning one text into a mapping of tensors; called
            with ``return_tensors="pt", padding=True, truncation=True``.
        dataset: Iterable of ``(text, true_label)`` pairs. It does not need to
            support ``len()``.

    Returns:
        float: Accuracy in ``[0, 1]``, or ``nan`` when ``dataset`` is empty
        (for example when no attack succeeded).
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # Remember the caller's mode so evaluation does not silently change it.
    was_training = model.training
    model.eval()

    correct_count = 0
    total_count = 0
    try:
        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            for text, true_label in dataset:
                inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
                # Move inputs to the same device as the model
                inputs = {key: tensor.to(device) for key, tensor in inputs.items()}
                outputs = model(**inputs)
                predicted_label = torch.argmax(outputs.logits, dim=1).item()

                if predicted_label == true_label:
                    correct_count += 1
                total_count += 1
    finally:
        model.train(was_training)

    if total_count == 0:
        # Accuracy over zero examples is undefined; avoid ZeroDivisionError.
        return float("nan")
    return correct_count / total_count

#### Evaluate the model on the original test dataset

In [None]:
# Baseline: accuracy on the unperturbed (text, label) pairs of the test subset.
original_accuracy = compute_accuracy(model, tokenizer, subset_test_dataset_custom)
print(f"Original Accuracy: {original_accuracy * 100:.2f}%")

#### Create a dataset with the adversarial examples:

In [None]:
from textattack.attack_results import SuccessfulAttackResult

# Keep only the results that are SuccessfulAttackResult instances.
successful_attacks = []
for candidate in adversarial_examples:
    if isinstance(candidate, SuccessfulAttackResult):
        successful_attacks.append(candidate)

# Pair each perturbed text with the ground-truth label recorded on the original result.
# NOTE(review): this set contains successful attacks only, so accuracy measured on it
# is presumably near zero for the model that was attacked. Confirm that is intended.
adversarial_dataset = [
    (hit.perturbed_text(), hit.original_result.ground_truth_output)
    for hit in successful_attacks
]


#### Evaluate the model on the adversarial examples

In [None]:
# Accuracy of the same model on the perturbed texts.
adversarial_accuracy = compute_accuracy(model, tokenizer, adversarial_dataset)
print(f"Adversarial Accuracy: {adversarial_accuracy * 100:.2f}%")

#### Case Study: Visualizing the difference between original and adversarial text

In [None]:
num_examples = 5  # Number of examples to display
separator = "-" * 80

print("Case Study: Visualizing the difference between original and adversarial text\n")

# Show the first few successful attacks: both texts, then the three labels.
for position, attack_result in enumerate(successful_attacks[:num_examples], start=1):
    original_side = attack_result.original_result
    perturbed_side = attack_result.perturbed_result

    print(f"Example {position}:")
    print(separator)
    print("Original Text:")
    print(attack_result.original_text())
    print("\nAdversarial Text:")
    print(attack_result.perturbed_text())
    print("\nGround Truth Label:", original_side.ground_truth_output)
    print("Predicted Label (Original):", original_side.output)
    print("Predicted Label (Adversarial):", perturbed_side.output)
    print(separator)
    print("\n")

### <font color='blue'>Part 5: Adversarial Training</font>

#### Generate adversarial examples using the training dataset

In [None]:
from torch.utils.data import DataLoader
from tqdm.auto import tqdm

# Attack only the first 10 examples of the (already sampled) training dataset
train_subset = torch.utils.data.Subset(train_dataset, range(10))
train_subset_custom = SimpleTextDataset(train_subset, tokenizer)

adversarial_train_examples = []

# Keep the perturbed text of each successful attack, paired with its original label
attack_progress = tqdm(train_subset_custom, desc="Generating adversarial examples")
for original_text, ground_truth_label in attack_progress:
    attack_result = attack.attack(original_text, ground_truth_label)
    if not isinstance(attack_result, SuccessfulAttackResult):
        continue
    adversarial_train_examples.append((attack_result.perturbed_text(), ground_truth_label))


#### Mix the original training dataset with the generated adversarial examples.


In [None]:
# Original (text, label) pairs first, followed by the adversarial pairs.
original_train_pairs = list(train_subset_custom)
mixed_train_dataset = original_train_pairs + adversarial_train_examples

#### Monitor GPU memory usage

In [None]:
# Release cached allocator memory, then print the allocator report if a GPU is present.
torch.cuda.empty_cache()
gpu_present = torch.cuda.is_available()
if gpu_present:
    gpu_memory_report = torch.cuda.memory_summary(device=None, abbreviated=False)
    print(gpu_memory_report)

#### Train the model on the mixed dataset.


In [None]:
# Use PyTorch's AdamW, the same optimizer class the initial fine-tuning cell uses.
# The AdamW that ships with `transformers` is deprecated, and importing it here
# would also shadow the torch implementation imported earlier.
from torch.optim import AdamW

# Move the model to the GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Use a DataLoader to handle batching of the mixed (text, label) pairs
mixed_train_dataloader = DataLoader(mixed_train_dataset, batch_size=1, shuffle=True)

# Set up a fresh optimizer for the adversarial fine-tuning stage
optimizer = AdamW(model.parameters(), lr=5e-5)

# Train the model: 50 epochs on a GPU, a single epoch on the CPU
num_epochs = 1
if torch.cuda.is_available():
    num_epochs = 50
model.train()

for epoch in range(num_epochs):
    print(f"Epoch {epoch+1}/{num_epochs}")
    for batch in tqdm(mixed_train_dataloader, desc="Training"):
        optimizer.zero_grad()
        texts, labels = batch
        # Default collation returns the batch's texts as a sequence of str;
        # pass the tokenizer an explicit list.
        inputs = tokenizer(list(texts), return_tensors="pt", padding=True, truncation=True)
        inputs = {key: tensor.to(device) for key, tensor in inputs.items()}
        # Default collation has already turned the integer labels into a tensor;
        # as_tensor reuses it instead of copy-constructing (which emits a warning).
        labels = torch.as_tensor(labels).to(device)

        # Passing labels makes the model compute the classification loss itself
        outputs = model(**inputs, labels=labels)
        loss = outputs.loss
        loss.backward()
        optimizer.step()


#### Evaluate the model's performance on the test dataset and adversarial examples.

In [None]:
# Put the model in evaluation mode before measuring accuracy
model.eval()

# Accuracy on the unperturbed test subset
test_accuracy = compute_accuracy(model, tokenizer, subset_test_dataset_custom)
test_accuracy_pct = test_accuracy * 100
print(f"Accuracy on the test dataset: {test_accuracy_pct:.2f}%")

# Accuracy on the adversarial examples generated from that subset
adversarial_accuracy = compute_accuracy(model, tokenizer, adversarial_dataset)
adversarial_accuracy_pct = adversarial_accuracy * 100
print(f"Accuracy on the adversarial examples: {adversarial_accuracy_pct:.2f}%")
