In [None]:
import nltk
nltk.download('all')
!pip install textattack
!pip install datasets



In [None]:
!pip install --upgrade transformers torch textattack


In [None]:
# This cell splits SST-2, applies the TextFooler attack to one half and saves each split to JSON; the combined dataset is built in the next cell and analysed in the one after that.

import torch
import torch.nn as nn
import numpy as np
import json
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification
from datasets import load_dataset, Dataset
from tqdm import tqdm
from itertools import product
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
from textattack.attack_recipes import TextFoolerJin2019
from textattack.models.wrappers import HuggingFaceModelWrapper
import matplotlib.pyplot as plt

# Load SST-2 and keep a 5,000-example slice of the training split.
dataset = load_dataset("glue", "sst2")
train_data = dataset["train"].select(range(5000))  # Take only 5,000 samples

# The first half stays clean; the second half is handed to the attack.
num_samples = len(train_data) // 2
clean_data = train_data.select(range(num_samples))
perturb_data = train_data.select(range(num_samples, len(train_data)))

# Save split datasets
clean_data.to_json("sst2_clean_split.json")
perturb_data.to_json("sst2_perturb_split.json")

# Load pre-trained tokenizer and model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')
# NOTE(review): 'distilbert-base-uncased' is loaded with num_labels=2 and is never
# trained in this cell, so the classification head the attack targets is presumably
# randomly initialised — confirm whether a fine-tuned SST-2 checkpoint was intended.
classifier = DistilBertForSequenceClassification.from_pretrained('distilbert-base-uncased', num_labels=2).to(device)

# Wrap model for TextAttack
wrapped_model = HuggingFaceModelWrapper(classifier, tokenizer)

# Initialize TextFooler Attack
attack = TextFoolerJin2019.build(wrapped_model)

# Apply adversarial attack.
# Every row keeps its source sentence so that later steps can pair the attacked
# text with the text it was derived from, and each outcome is tallied so that
# silent fallbacks (skipped / failed / errored attacks) are visible.
perturbed_texts = []
outcome_counts = {}
for example in tqdm(perturb_data, desc="Applying TextFooler Attack"):
    input_text = example['sentence']
    ground_truth_label = example['label']
    try:
        adv_example = attack.attack(input_text, ground_truth_label)
        adv_text = adv_example.perturbed_text()
        # Result class name, e.g. successful / failed / skipped attack result.
        outcome = type(adv_example).__name__
    except Exception as e:
        # Best effort: keep the pipeline running and fall back to the clean text.
        print(f"Error generating adversarial example: {e}")
        adv_text = input_text
        outcome = "Error"
    outcome_counts[outcome] = outcome_counts.get(outcome, 0) + 1
    perturbed_texts.append({
        "sentence": adv_text,
        "label": ground_truth_label,
        "original_sentence": input_text,
    })

# Report how many rows of the "perturbed" split were actually changed.
unchanged_count = sum(1 for row in perturbed_texts if row["sentence"] == row["original_sentence"])
print(f"Attack outcomes: {outcome_counts}")
print(f"{unchanged_count} of {len(perturbed_texts)} sentences are identical to their input after the attack")

# Convert to Dataset format
perturbed_dataset = Dataset.from_list(perturbed_texts)
perturbed_dataset.to_json("sst2_textfooler_split.json")

In [None]:
# Merge the clean and perturbed datasets to generate balanced dataset

from datasets import Dataset

# Load the clean half and the TextFooler-attacked half from disk.
clean_data = Dataset.from_json("sst2_clean_split.json")
perturbed_data = Dataset.from_json("sst2_textfooler_split.json")


def _tag_rows(rows, is_adversarial):
    """Drop rows without a sentence and attach provenance columns.

    Args:
        rows: list of dict rows, each with at least a "sentence" key.
        is_adversarial: whether the rows come from the attacked split.

    Returns:
        A new list of rows. Each keeps its existing keys and gains
        "original_sentence" (the row's own value if it has one, otherwise its
        sentence) and "is_adversarial" (0 or 1).
    """
    tagged = []
    for row in rows:
        if row["sentence"] is None:
            continue
        tagged.append({
            **row,
            "original_sentence": row.get("original_sentence") or row["sentence"],
            "is_adversarial": int(is_adversarial),
        })
    return tagged


# Convert to lists and filter out invalid entries
clean_list = _tag_rows(clean_data.to_list(), is_adversarial=False)
perturbed_list = _tag_rows(perturbed_data.to_list(), is_adversarial=True)

# Truncate AFTER filtering: truncating first can leave the two halves with
# different sizes once invalid rows are removed.
num_samples = min(len(clean_list), len(perturbed_list))
clean_list = clean_list[:num_samples]
perturbed_list = perturbed_list[:num_samples]

# Merge clean and perturbed datasets
final_dataset = Dataset.from_list(clean_list + perturbed_list)
final_dataset.to_json("sst2_final_balanced_textfooler.json")

print("Final balanced dataset (50% clean, 50% TextFooler-attacked) saved as 'sst2_final_balanced_textfooler.json'")


In [None]:
import inspect
import json
import os
from itertools import product

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from datasets import Dataset
from scipy.spatial.distance import cosine
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
from tqdm import tqdm
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification, Trainer, TrainingArguments

# Set the WANDB_DISABLED switch before any Trainer is constructed further down
# (presumably to keep the Trainer from reporting to Weights & Biases — confirm
# against the installed transformers version).
os.environ["WANDB_DISABLED"] = "true"

# Load combined dataset after TextFooler attack
final_dataset = Dataset.from_json("sst2_final_balanced_textfooler.json")
print("Dataset columns:", final_dataset.column_names)
# Module-level tokenizer; the helper functions below read it as a global.
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')

def tokenize_function(examples):
    """Tokenize one batch of rows with the module-level ``tokenizer``.

    Args:
        examples: mapping of column name -> list of values (the batch format
            passed by ``Dataset.map(..., batched=True)``). Must contain a
            "sentence" column.

    Returns:
        The tokenizer output for the batch, padded/truncated to 512 tokens.

    Raises:
        ValueError: if the batch has no "sentence" column. The message lists
            the columns of the batch that was actually passed in, rather than
            relying on a global dataset variable.
    """
    if "sentence" not in examples:
        raise ValueError("Dataset does not contain a 'sentence' column. Available columns: " + str(list(examples.keys())))
    # str() guards against non-string values slipping into the column.
    return tokenizer([str(s) for s in examples["sentence"]], padding="max_length", truncation=True, max_length=512)

# Filter dataset to remove invalid inputs
filtered_dataset = final_dataset.filter(lambda x: isinstance(x["sentence"], str) and x["sentence"].strip() != "")

# Apply tokenization
try:
    tokenized_dataset = filtered_dataset.map(tokenize_function, batched=True)
except ValueError as e:
    print("Error during tokenization:", e)
    if len(filtered_dataset) > 0:
        print("Example dataset structure:", filtered_dataset[0])  # Debugging info
    # Re-raise instead of calling exit(): exit() shuts down the notebook kernel,
    # while re-raising stops this cell and keeps the session (and the traceback).
    raise

# Define model
classifier = DistilBertForSequenceClassification.from_pretrained('distilbert-base-uncased', num_labels=2)

# Checkpoints and the final model share one directory.
SAVE_PATH = "sst2_trained_model"

# No evaluation-strategy argument is passed: "no" is the default, and the
# `evaluation_strategy` keyword was renamed to `eval_strategy` in newer
# transformers releases, so omitting it works on both old and new versions.
training_args = TrainingArguments(
    output_dir=SAVE_PATH,
    save_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
    logging_dir="logs",
    logging_steps=10
)

# Newer transformers releases take the tokenizer as `processing_class`; older
# ones only know `tokenizer`. Pick whichever the installed Trainer accepts.
trainer_params = inspect.signature(Trainer.__init__).parameters
tokenizer_kwarg = "processing_class" if "processing_class" in trainer_params else "tokenizer"

trainer = Trainer(
    model=classifier,
    args=training_args,
    train_dataset=tokenized_dataset,
    **{tokenizer_kwarg: tokenizer}
)

trainer.train()

# Training leaves the model in train mode (dropout active); switch to eval mode
# so that any inference done after this point is deterministic.
classifier.eval()

classifier.save_pretrained(SAVE_PATH)
tokenizer.save_pretrained(SAVE_PATH)
print(f"Trained model saved to '{SAVE_PATH}'")

# Function to extract logits
def get_logits(text):
    """Return the classifier's logits for ``text`` as a NumPy array.

    Uses the module-level ``tokenizer`` and ``classifier``. The model is put in
    eval mode first: after ``Trainer.train()`` it is still in train mode, and
    with dropout active two calls on the same text return different logits.

    Args:
        text: input string (anything the tokenizer accepts).

    Returns:
        ``outputs.logits`` with size-1 dimensions squeezed out, moved to CPU.
    """
    classifier.eval()  # disable dropout so repeated calls are deterministic
    inputs = tokenizer(text, return_tensors='pt', truncation=True, padding=True, max_length=512).to(classifier.device)
    with torch.no_grad():
        outputs = classifier(**inputs)
    return outputs.logits.squeeze().cpu().numpy()

# Function to extract embeddings from hidden layers
def get_embedding(text):
    """Return a pooled embedding of ``text`` as a NumPy array.

    Uses the module-level ``tokenizer`` and ``classifier``. The model is put in
    eval mode first: after ``Trainer.train()`` it is still in train mode, and
    with dropout active the embedding of one text changes from call to call.

    Args:
        text: input string (anything the tokenizer accepts).

    Returns:
        The last entry of ``outputs.hidden_states`` averaged over dim 1, with
        size-1 dimensions squeezed out, moved to CPU.
    """
    classifier.eval()  # disable dropout so repeated calls are deterministic
    inputs = tokenizer(text, return_tensors='pt', truncation=True, padding=True, max_length=512).to(classifier.device)
    with torch.no_grad():
        outputs = classifier(**inputs, output_hidden_states=True)
    return outputs.hidden_states[-1].mean(dim=1).squeeze().cpu().numpy()

# Function to compute logit and embedding differences
def compute_detection_features(original_text, adversarial_text):
    """Compute the two detection features for a pair of texts.

    Args:
        original_text: reference text.
        adversarial_text: text to compare against the reference.

    Returns:
        A ``(logit_diff, embed_similarity)`` tuple of Python floats:
        the Euclidean norm of the difference between the two logit vectors,
        and one minus the SciPy cosine distance between the two embeddings.
    """
    # Logit-space distance between the two inputs.
    logits_ref, logits_adv = get_logits(original_text), get_logits(adversarial_text)
    logit_gap = np.linalg.norm(logits_ref - logits_adv)

    # Cosine similarity between the pooled embeddings.
    emb_ref, emb_adv = get_embedding(original_text), get_embedding(adversarial_text)
    cosine_sim = 1 - cosine(emb_ref, emb_adv)

    return float(logit_gap), float(cosine_sim)

# Fixed thresholds used for the initial (pre-tuning) detection flag.
INITIAL_LOGIT_THRESHOLD = 0.08
INITIAL_EMBED_THRESHOLD = 0.75


def _is_detected(logit_diff, embed_similarity, logit_thresh, embed_thresh):
    """Flag a sample when either feature crosses its threshold."""
    return bool(logit_diff > logit_thresh or embed_similarity < embed_thresh)


def _detection_target(res):
    """Ground truth for detection: the adversarial flag if recorded, else the label."""
    return res.get("is_adversarial", res["label"])


# Optional provenance columns. When they are missing the evaluation falls back
# to the previous behaviour, which is degenerate, so say so loudly.
has_reference = "original_sentence" in filtered_dataset.column_names
has_adv_flag = "is_adversarial" in filtered_dataset.column_names
if not has_reference:
    print("Warning: no 'original_sentence' column; each sentence is compared with itself, "
          "so logit_diff/embed_similarity carry no signal.")
if not has_adv_flag:
    print("Warning: no 'is_adversarial' column; the sentiment 'label' is used as the detection target.")

# Evaluate detection on final dataset
detection_results = []
for example in tqdm(filtered_dataset, desc="Evaluating Detection"):
    text = example['sentence']
    label = example['label']
    # Compare the (possibly attacked) sentence with the sentence it came from.
    reference_text = (example["original_sentence"] if has_reference else None) or text
    logit_diff, embed_similarity = compute_detection_features(reference_text, text)
    detected = int(_is_detected(logit_diff, embed_similarity,
                                INITIAL_LOGIT_THRESHOLD, INITIAL_EMBED_THRESHOLD))  # Explicit detection flag
    result = {
        "text": text,
        "logit_diff": logit_diff,
        "embed_similarity": embed_similarity,
        "label": label,
        "detected": detected
    }
    if has_adv_flag and example["is_adversarial"] is not None:
        result["is_adversarial"] = int(example["is_adversarial"])
    detection_results.append(result)

# Save detection results (flags reflect the initial fixed thresholds)
with open("sst2_adversarial_detection_results.json", "w") as f:
    json.dump(detection_results, f, indent=4)

# Hyperparameter tuning
LOGIT_DIFF_VALUES = np.arange(0.1, 0.5, 0.08)
EMBED_SIM_VALUES = np.arange(0.7, 0.9, 0.08)
best_f1, best_auc = 0, 0
best_thresholds = (None, None)
all_tuning_results = []

# The targets do not depend on the thresholds, so compute them once.
true_labels = [_detection_target(res) for res in detection_results]
has_both_classes = len(set(true_labels)) > 1

for logit_thresh, embed_thresh in product(LOGIT_DIFF_VALUES, EMBED_SIM_VALUES):
    # Predictions are kept local so the grid search does not overwrite the
    # per-sample "detected" flags as a side effect.
    predicted_labels = [
        int(_is_detected(res["logit_diff"], res["embed_similarity"], logit_thresh, embed_thresh))
        for res in detection_results
    ]

    # AUC is undefined when only one class is present.
    auc = roc_auc_score(true_labels, predicted_labels) if has_both_classes else 0

    accuracy = accuracy_score(true_labels, predicted_labels)
    f1 = f1_score(true_labels, predicted_labels, zero_division=1)
    all_tuning_results.append({
        "logit_threshold": float(logit_thresh),
        "embed_threshold": float(embed_thresh),
        "accuracy": float(accuracy),
        "f1_score": float(f1),
        "auc": float(auc)
    })

    if f1 > best_f1 or (f1 == best_f1 and auc > best_auc):
        best_f1 = float(f1)
        best_auc = float(auc)
        best_thresholds = (float(logit_thresh), float(embed_thresh))

# Re-flag every sample with the selected thresholds so the ASR below reflects
# the best configuration rather than whichever grid point happened to run last.
final_logit_thresh, final_embed_thresh = best_thresholds
if final_logit_thresh is None or final_embed_thresh is None:
    final_logit_thresh, final_embed_thresh = INITIAL_LOGIT_THRESHOLD, INITIAL_EMBED_THRESHOLD
for res in detection_results:
    res["detected"] = int(_is_detected(res["logit_diff"], res["embed_similarity"],
                                       final_logit_thresh, final_embed_thresh))

# Compute ASR: share of positive-target samples that were not flagged.
misclassified_adversarial = sum(1 for res in detection_results if _detection_target(res) == 1 and res["detected"] == 0)
total_adversarial = sum(1 for res in detection_results if _detection_target(res) == 1)
asr = misclassified_adversarial / total_adversarial if total_adversarial > 0 else 0
print(f"Attack Success Rate (ASR): {asr:.2%}")

# Save tuning results
with open("sst2_threshold_tuning_results.json", "w") as f:
    json.dump({
        "best_thresholds": best_thresholds,
        "best_f1_score": best_f1,
        "best_auc": best_auc,
        "attack_success_rate": asr,
        "all_results": all_tuning_results
    }, f, indent=4)

print("Best thresholds, Accuracy, F1 Score, AUC, and Attack Success Rate saved.")
