# **Adversarial Attacks Against LLM-Based Spam Filters**

Adapted from EN.650.654 Computer Intrusion Detection

Professor: Dr. Xiangyang Li https://isi.jhu.edu/faculty/xiangyang-li/

TA: Yi He yhe106@jh.edu


## **Introduction**

In this lab, you will:
- Use magic words to generate adversarial emails.
- Evaluate the effectiveness of these attacks against large language model (LLM)-based spam filters.
- Analyze the impact of different insertion positions on attack success rates.

## **1. Loading Dependencies**


Mount your Google Drive.


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Load the dependencies for the BERT model.

In [None]:
import torch
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from transformers import BertTokenizer, BertForSequenceClassification, TrainingArguments, Trainer
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_score, recall_score, f1_score
from tqdm import trange

## **2. Loading & Splitting Dataset**
Split the data into 80% training and 20% validation sets for the experiments.
  - The **Training Dataset** is used to train the spam filter.
  - The **Validation Dataset** is used first to evaluate the spam filter's performance. Magic words are then inserted into emails drawn from it to measure the attack success rate, so it also serves as the **test set** in part 1.
      - Reserve 10 random spam emails from the validation dataset for modification.
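
To make the idea of a stratified 80/20 split concrete, here is a minimal, hand-rolled sketch on toy labels. It is only a stand-in for `train_test_split(..., stratify=y)` used in this notebook: the helper name `stratified_split` and the toy label counts are assumptions made for illustration.

```python
import random
from collections import defaultdict

def stratified_split(labels, test_fraction=0.2, seed=99):
    """Return (train_idx, val_idx) so each class keeps roughly the same share in both parts."""
    rng = random.Random(seed)
    by_class = defaultdict(list)
    for idx, label in enumerate(labels):
        by_class[label].append(idx)

    train_idx, val_idx = [], []
    for indices in by_class.values():
        rng.shuffle(indices)                           # shuffle within each class
        n_val = round(len(indices) * test_fraction)    # same fraction taken from every class
        val_idx.extend(indices[:n_val])
        train_idx.extend(indices[n_val:])
    return sorted(train_idx), sorted(val_idx)

# Toy labels (hypothetical): 80 ham (0) and 20 spam (1)
toy_labels = [0] * 80 + [1] * 20
train_idx, val_idx = stratified_split(toy_labels)
val_spam = sum(toy_labels[i] for i in val_idx)
print(len(train_idx), len(val_idx), val_spam)  # 80 20 4
```

Because the fraction is applied per class, the validation part keeps the same spam ratio as the full data, which matters here since the reserved emails are sampled from the validation spam only.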

<font color='red'> Don't forget to replace the path to messages.csv</font>

In [None]:
from sklearn.model_selection import train_test_split

def data_extraction():

  # Change to the filename you uploaded.
  df = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/Gen/messages.csv')

  x = df.message
  y = df.label

  x_train, x_val, y_train, y_val = train_test_split(x, y, test_size=0.2, random_state=99, stratify=y)

  spam_emails = x_val[y_val == 1]
  reserved_samples = spam_emails.sample(n=10, random_state=2025)

  reserved_samples.to_csv("reserved_samples.csv", index=False, header=True)

  return x_train, x_val, y_train, y_val, reserved_samples

train_inputs, validation_inputs, train_labels, validation_labels, reserved_samples = data_extraction()
print(train_inputs.shape, validation_inputs.shape)

# Display reserved samples
for i, email in enumerate(reserved_samples.tolist()):
    print(f"Sample {i+1}: {email}\n")


## **3. Pre-Process Data**
In this section, we define the preprocessing steps.
- The text is tokenized, converted to token IDs, padded or truncated to a fixed length, and attention masks are generated.

These preprocessing steps ensure that the data is ready for efficient input into the model.
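
The padding, truncation, and attention-mask steps can be sketched in plain Python. This is a simplified illustration, not what the Hugging Face tokenizer does internally: the helper `pad_or_truncate`, the made-up token IDs, and `pad_id=0` are assumptions, and special tokens are ignored.

```python
def pad_or_truncate(token_ids, max_length, pad_id=0):
    """Return (ids, attention_mask), both exactly max_length long."""
    ids = token_ids[:max_length]           # truncation: drop everything past the limit
    mask = [1] * len(ids)                  # 1 marks a real token
    padding = max_length - len(ids)        # 0 when the input was truncated
    return ids + [pad_id] * padding, mask + [0] * padding   # 0 marks padding

# Made-up token IDs, for illustration only
short_ids, short_mask = pad_or_truncate([7, 8, 9, 10], max_length=6)
long_ids, long_mask = pad_or_truncate(list(range(1, 11)), max_length=6)
print(short_ids, short_mask)  # [7, 8, 9, 10, 0, 0] [1, 1, 1, 1, 0, 0]
print(long_ids, long_mask)    # [1, 2, 3, 4, 5, 6] [1, 1, 1, 1, 1, 1]
```

The `preprocessing` function in this section uses `max_length=32` with truncation, so tokens past that limit never reach the model. This is worth keeping in mind when comparing insertion positions later: text inserted late in a long email may be cut off before classification.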

In [None]:
def preprocessing(input_text, tokenizer):
    '''
    Returns <class transformers.tokenization_utils_base.BatchEncoding>
    '''
    return tokenizer(
        input_text,
        truncation=True,
        padding='max_length',
        max_length=32,
        return_tensors='pt'
    )

# Load the BERT tokenizer
bert_tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
# Function to preprocess data for BERT
def preprocessing_for_bert(inputs, labels, tokenizer=bert_tokenizer):
    '''
    inputs: pandas Series of email texts; labels: pandas Series of 0/1 labels.
    Returns a tuple of tensors: (token IDs, attention masks, labels).
    '''
    encoding_dict = preprocessing(inputs.tolist(), tokenizer)
    token_id = encoding_dict['input_ids']
    attention_masks = encoding_dict['attention_mask']
    labels = torch.tensor(labels.tolist())
    return token_id, attention_masks, labels



## **4. Training LLM Spam Filters and Evaluating Adversarial Attack**
In this section, we train a spam filter using the BERT model.

First, we train the model on a labeled spam dataset. The training process fine-tunes the pre-trained model to classify messages as spam or non-spam.

Once the model is trained, we save it to disk for later evaluation. In subsequent steps, we load the saved model to evaluate its performance, including its robustness against adversarial attacks aimed at evading spam detection. This allows the trained model to be reused without retraining each time.

### **Training BERT Spam Filter**

In [None]:
# preprocess the training dataset for bert
train_token_id, train_attention_masks, train_labels = preprocessing_for_bert(train_inputs, train_labels, bert_tokenizer)
# preprocess the validation dataset for bert
validation_token_id, validation_attention_masks, validation_labels = preprocessing_for_bert(validation_inputs, validation_labels, bert_tokenizer)
print(train_token_id.shape, validation_token_id.shape)

# DataLoader initialization
batch_size = 16
train_data = TensorDataset(train_token_id, train_attention_masks, train_labels)
train_sampler = RandomSampler(train_data)
train_dataloader = DataLoader(train_data, sampler=train_sampler, batch_size=batch_size)

validation_data = TensorDataset(validation_token_id, validation_attention_masks, validation_labels)
validation_sampler = SequentialSampler(validation_data)
validation_dataloader = DataLoader(validation_data, sampler=validation_sampler, batch_size=batch_size)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Load the model
model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)
model.to(device)

# Training setup
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)

epochs = 2
for _ in trange(epochs, desc="Epoch"):
    # Set model to training mode
    model.train()
    tr_loss = 0
    nb_tr_steps = 0

    for step, batch in enumerate(train_dataloader):
        batch = tuple(t.to(device) for t in batch)
        b_input_ids, b_input_mask, b_labels = batch

        # Clear out gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(input_ids=b_input_ids, attention_mask=b_input_mask, labels=b_labels)
        loss = outputs.loss

        # Backward pass
        loss.backward()
        optimizer.step()

        # Update tracking variables
        tr_loss += loss.item()
        nb_tr_steps += 1

    # ========== Validation ==========

    # Set model to evaluation mode
    model.eval()

    # Tracking variables
    all_labels = []
    all_preds = []

    for batch in validation_dataloader:
        batch = tuple(t.to(device) for t in batch)
        b_input_ids, b_input_mask, b_labels = batch

        with torch.no_grad():
            # Forward pass
            outputs = model(input_ids=b_input_ids, attention_mask=b_input_mask)
            logits = outputs.logits
            predicted_labels = torch.argmax(logits, dim=1)

        all_labels.extend(b_labels.cpu().numpy())
        all_preds.extend(predicted_labels.cpu().numpy())

    # Calculate evaluation metrics
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average="binary", zero_division=1)
    recall = recall_score(all_labels, all_preds, average="binary", zero_division=1)
    f1 = f1_score(all_labels, all_preds, average="binary", zero_division=1)

    # Calculate False Positive Rate (FPR) and False Negative Rate (FNR)
    tn, fp, fn, tp = confusion_matrix(all_labels, all_preds).ravel()
    fpr = fp / (fp + tn) if (fp + tn) > 0 else 0
    fnr = fn / (fn + tp) if (fn + tp) > 0 else 0

    # Print metrics
    print(f"Epoch {_ + 1}/{epochs}")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"False Positive Rate (FPR): {fpr:.4f}")
    print(f"False Negative Rate (FNR): {fnr:.4f}")
    print("\n\t - Train loss: {:.4f}".format(tr_loss / nb_tr_steps))

    # save model
    model_path = f"Bert_epoch{_ + 1}.pth"
    torch.save(model.state_dict(), model_path)
    print(f"Model saved to {model_path}")


## **5. Adversarial Attack with Magic Words**
Apply different insertion strategies to the 10 reserved spam emails to find the attack success rate. You can do the insertion manually or write code to do it.

- Word-based insertion: insert your magic words as a string.
  - word_0: insert at the beginning of the email
  - word_1: insert after the first sentence
  - word_2: insert after the second sentence
  - word_3: insert after the third sentence
  - word_∞: insert at the end of the email
- Sentence-based insertion: insert your magic sentences (you need to create one or two semantically meaningful sentences from these words).
  - sentence_0: insert at the beginning of the email
  - sentence_1: insert after the first sentence
  - sentence_2: insert after the second sentence
  - sentence_3: insert after the third sentence
  - sentence_∞: insert at the end of the email
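
As a toy illustration of what these positions mean, the sketch below marks the insertion point in a short made-up email. It uses a regex sentence splitter and a hypothetical helper `insert_after_sentence`, both assumptions chosen for readability. Treating an out-of-range position as "end of email" is also a choice of this sketch.

```python
import re

def insert_after_sentence(text, insertion, n):
    """Insert `insertion` after the n-th sentence (n=0: start, n=None: end)."""
    # Split after '.', '!' or '?' followed by whitespace (a simplification).
    sentences = re.split(r'(?<=[.!?])\s+', text.strip())
    if n is None or n > len(sentences):
        n = len(sentences)
    return " ".join(sentences[:n] + [insertion] + sentences[n:])

email = "Win cash now! Click the link. Offer ends soon. Reply today."
for n in (0, 1, 3, None):
    print(n, "->", insert_after_sentence(email, "[MAGIC]", n))
# 0 -> [MAGIC] Win cash now! Click the link. Offer ends soon. Reply today.
# 1 -> Win cash now! [MAGIC] Click the link. Offer ends soon. Reply today.
# 3 -> Win cash now! Click the link. Offer ends soon. [MAGIC] Reply today.
# None -> Win cash now! Click the link. Offer ends soon. Reply today. [MAGIC]
```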

The following is a function for this task.

In [None]:
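def insert_procession(text, insertion, position):
    # Sentence boundaries are approximated by the characters '.', '!' and '?'.
    punctuation = ['.', '!', '?']
    punctuation_indices = [i for i, char in enumerate(text) if char in punctuation]

    if position == "sentence_0" or position == "word_0":
        # Separate the insertion from the email with a space so words do not fuse.
        return insertion + " " + text
    elif position == "sentence_∞" or position == "word_∞":
        return text + " " + insertion
    else:
        # e.g. "word_2" -> index 1 -> insert right after the second punctuation mark.
        position_index = int(position.split('_')[1]) - 1
        if position_index < len(punctuation_indices):
            insert_pos = punctuation_indices[position_index] + 1
            text = text[:insert_pos] + " " + insertion + text[insert_pos:]
        # Emails with fewer sentences than requested are returned unchanged.
    return text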

def insert_magic_word(text, magic_word, magic_sentences, position):
  if "word" in position:
    return insert_procession(text, magic_word, position)
  elif "sentence" in position:
    return insert_procession(text, magic_sentences, position)

In [None]:
# You can create your own magic_sentences by reordering the words and adding prepositions where needed
magic_words = "translation cascadilla workshop proceeding benjamin \
academic ldc chorus native colingacl french sentence \
pkzip euralex linguistic risked ammondt phonetic \
arizona grammar ipa theory linguist"
magic_sentences = "Academic linguist Benjamin pkzips phonetic sentence \
translation grammar theory in Euralex COLING/ACL \
workshop proceeding in Arizona. Native French \
Ammondt risked the linguistic IPA Chorus of LDC \
Cascadilla."

# Apply every modification to every email
modifications = ["word_0", "word_1", "word_2", "word_3", "word_∞", "sentence_0", "sentence_1", "sentence_2", "sentence_3", "sentence_∞"]
# Read reserved_samples from file
samples = pd.read_csv('reserved_samples.csv')

for modification in modifications:
  modified_text = samples['message'].apply(lambda text: insert_magic_word(text, magic_words, magic_sentences, modification))
  # Tabs and newlines would break the tab-separated file written below
  modified_text = modified_text.apply(lambda text: text.replace('\t', ' ').replace('\n', ' '))
  samples[modification] = modified_text
samples.to_csv('modified_samples', sep='\t', index=False, header=True, quotechar='"')
print(samples.head())

## **6. Evaluating the Attack Effectiveness**
In this section, we measure the effectiveness of the adversarial attacks against the LLM-based spam filter through the attack success rate. Because every reserved email is spam, the attack success rate equals the False Negative Rate (FNR): the fraction of spam emails that the filter classifies as non-spam.

Specifically, we assess how well the adversarial emails evade detection when adversarial tokens (magic words or sentences) are inserted at various positions within the email text.
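
When every evaluated email is spam, the attack success rate reduces to counting how many predictions are not the spam label. A minimal sketch follows. The helper name `attack_success_rate` and the prediction lists are made up for illustration and are not results from this notebook.

```python
def attack_success_rate(predictions, spam_label=1):
    """Fraction of spam emails that the filter did NOT flag as spam (the FNR)."""
    if not predictions:
        raise ValueError("need at least one prediction")
    missed = sum(1 for p in predictions if p != spam_label)
    return missed / len(predictions)

# Hypothetical predictions for 10 spam emails (1 = spam, 0 = non-spam)
before = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]   # filter catches every email
after = [1, 0, 0, 1, 0, 1, 0, 0, 1, 0]    # six modified emails slip through
print(attack_success_rate(before))  # 0.0
print(attack_success_rate(after))   # 0.6
```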

In [None]:
print(modified_text.head())


In [None]:
# @title Evaluation on BERT
# Evaluate your result on the BERT model
# Load the model
model_path = "Bert_epoch2.pth"
# or "/content/drive/My Drive/CID_final/Bert_epoch2.pth"
model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)
# map_location lets weights saved on a GPU load on a CPU-only runtime
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)
model.eval()
print("model is loaded.")
batch_size = 10

reserved_samples = pd.read_csv("reserved_samples.csv")
labels = pd.Series([1]*10) # all "1" spam
reserved_token_id, reserved_attention_masks, reserved_labels = preprocessing_for_bert(reserved_samples['message'], labels, bert_tokenizer)
reserved_data = TensorDataset(reserved_token_id, reserved_attention_masks, reserved_labels)
reserved_sampler = SequentialSampler(reserved_data)
reserved_dataloader = DataLoader(reserved_data, sampler=reserved_sampler, batch_size=batch_size)

all_preds = []
all_labels = [] # should be all "1" spam
for batch in reserved_dataloader:
    batch = tuple(t.to(device) for t in batch)
    b_input_ids, b_input_mask, b_labels = batch
    with torch.no_grad():
        outputs = model(input_ids=b_input_ids, attention_mask=b_input_mask)
        logits = outputs.logits
        predicted_labels = torch.argmax(logits, dim=1)
    all_preds.extend(predicted_labels.cpu().numpy())
    all_labels.extend(b_labels.cpu().numpy())

accuracy = accuracy_score(all_labels, all_preds)
precision = precision_score(all_labels, all_preds, average="binary", zero_division=1)
recall = recall_score(all_labels, all_preds, average="binary", zero_division=1)
f1 = f1_score(all_labels, all_preds, average="binary", zero_division=1)

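# Calculate False Positive Rate (FPR) and False Negative Rate (FNR)
# labels=[0, 1] keeps the matrix 2x2 even when only one class appears (all labels here are spam)
tn, fp, fn, tp = confusion_matrix(all_labels, all_preds, labels=[0, 1]).ravel()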
fpr = fp / (fp + tn) if (fp + tn) > 0 else 0
fnr = fn / (fn + tp) if (fn + tp) > 0 else 0

# Print metrics
print("Before modification")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"False Positive Rate (FPR): {fpr:.4f}")
print(f"False Negative Rate (FNR): {fnr:.4f}")

modified_samples = pd.read_csv('modified_samples', sep='\t')

for modification in modifications:
  modified_text = modified_samples[modification]
  # preprocess data
  modified_token_id, modified_attention_masks, modified_labels = preprocessing_for_bert(modified_text, labels, bert_tokenizer)
  # use model to predict
  modified_data = TensorDataset(modified_token_id, modified_attention_masks, modified_labels)
  modified_sampler = SequentialSampler(modified_data)
  modified_dataloader = DataLoader(modified_data, sampler=modified_sampler, batch_size=batch_size)

  all_preds = []

  for batch in modified_dataloader:
      batch = tuple(t.to(device) for t in batch)
      b_input_ids, b_input_mask, b_labels = batch
      with torch.no_grad():
          outputs = model(input_ids=b_input_ids, attention_mask=b_input_mask)
          logits = outputs.logits
          predicted_labels = torch.argmax(logits, dim=1)
      all_preds.extend(predicted_labels.cpu().numpy())
  accuracy = accuracy_score(all_labels, all_preds)
  precision = precision_score(all_labels, all_preds, average="binary", zero_division=1)
  recall = recall_score(all_labels, all_preds, average="binary", zero_division=1)
  f1 = f1_score(all_labels, all_preds, average="binary", zero_division=1)

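  # Calculate False Positive Rate (FPR) and False Negative Rate (FNR)
  # labels=[0, 1] keeps the matrix 2x2 even when only one class appears (all labels here are spam)
  tn, fp, fn, tp = confusion_matrix(all_labels, all_preds, labels=[0, 1]).ravel()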
  fpr = fp / (fp + tn) if (fp + tn) > 0 else 0
  fnr = fn / (fn + tp) if (fn + tp) > 0 else 0

  # Print metrics
  print(f"After modification: {modification}")
  print(f"Accuracy: {accuracy:.4f}")
  print(f"Precision: {precision:.4f}")
  print(f"Recall: {recall:.4f}")
  print(f"F1 Score: {f1:.4f}")
  print(f"False Positive Rate (FPR): {fpr:.4f}")
  print(f"False Negative Rate (FNR): {fnr:.4f}")

