# Introduction
This Jupyter notebook should always contain the latest implementation of our attack.
If you want to use it, work on a duplicate. If you make an improvement, please also update this notebook.

In [None]:
import csv
import os, sys
from pathlib import Path
import numpy as np

import torch
from transformers import BertTokenizerFast, AutoModelForMaskedLM, AutoTokenizer, pipeline

#### Paths Setup

In [None]:
# Resolve REPO_PATH: fall back to a fixed base directory unless the notebook
# was started from inside the repository itself.
if not Path.cwd().absolute().match("ml4nlp2-adversarial-attack"):
    # Specify the path to the repository
    BASE_PATH = Path("/kaggle/working/")
    REPO_PATH = BASE_PATH.joinpath("ml4nlp2-adversarial-attack")
    assert BASE_PATH.exists(), "Base path not found. Please change, where you want to have the repo installed."
else:
    # started in repository
    REPO_PATH = Path.cwd().absolute()

# Locations derived from the repository root.
DATA_BASE_PATH = REPO_PATH.joinpath("clef2024-checkthat-lab", "task6", "incrediblAE_public_release")
OUTPUT_DIR = REPO_PATH.joinpath("output")

# used for the victim models
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"DEVICE: {DEVICE}")

##### Create Repository: (In case this notebook is uploaded without the repository and files:)
Note: Skip this section if the repository already exists on your local machine

In [None]:
# Guard: if the repository directory already exists, switch into it and abort
# this cell by raising, so the clone below is not attempted a second time.
if REPO_PATH.exists():
    os.chdir(REPO_PATH)
    raise Exception("Repository already exists.")

# Cloning Repository
# Replace the <User Email>, <User Name> and <Github Token> placeholders before running.
# NOTE(review): the token is embedded in the clone URL -- do not commit a filled-in version.
!git config --global user.email "<User Email>"
!git config --global user.name "<User Name>"
!git clone --recurse-submodules https://<Github Token>@github.com/<User Name>/ml4nlp2-adversarial-attack

##### Special Imports
(Imports that need to be installed manually because they are not available by default in cloud computing environments such as Colab or Kaggle.)

In [None]:
# Install the dependencies listed in the repository's requirements.txt;
# the relative file name is resolved against REPO_PATH, hence the chdir first.
os.chdir(REPO_PATH)
%pip install -r requirements.txt

In [None]:
# Work from the repository root and make the BODEGA directory importable
# for the imports that follow.
os.chdir(REPO_PATH)
sys.path.append(str(REPO_PATH / "BODEGA"))

from BODEGA.victims.bert import VictimBERT
from BODEGA.victims.bilstm import VictimBiLSTM
from BODEGA.victims.caching import VictimCache
from bleurt_pytorch import BleurtForSequenceClassification, BleurtTokenizer


from lime.lime_text import LimeTextExplainer

import OpenAttack
from OpenAttack import Victim
from OpenAttack.attack_assist.goal import ClassifierGoal


In [None]:
from beam_attack.infrastructure_helper_functions import get_incredible_dataset, free_up_model_space
import beam_attack.attack as beam_attack
from beam_attack import BodegaAttackEvaluations

In [None]:
import numpy

from datasets import Dataset, DatasetDict, concatenate_datasets
from transformers import AutoTokenizer, DataCollatorWithPadding, AutoConfig
from transformers import AutoModelForSequenceClassification
from torch.utils.data import DataLoader
from tqdm.auto import tqdm

from utils.data_mappings import SEPARATOR
import pathlib

BATCH_SIZE = 16  # number of texts per forward pass in VictimRoBERTa.get_prob
MAX_LEN = 512  # tokenizer max_length (with truncation) in VictimRoBERTa.get_prob
EPOCHS = 5  # not referenced by the code in this notebook section -- presumably a training setting
MAX_BATCHES = -1  # eval_loop stops after the batch with this index; -1 never matches, so every batch is evaluated
pretrained_model = "roberta-base"  # model name used for the config and tokenizer in VictimRoBERTa




def eval_loop(model, eval_dataloader, device, skip_visual=False):
    """Evaluate a binary classifier on a dataloader and report accuracy and F1.

    Each batch is moved to ``device`` and passed to the model as keyword
    arguments; predictions are the argmax over ``outputs.logits`` and are
    compared with ``batch["labels"]``. Label 1 is treated as the positive class
    for the F1 computation.

    Args:
        model: Model called as ``model(**batch)`` whose output exposes ``logits``.
        eval_dataloader: Iterable of dict batches of tensors, each containing a
            ``"labels"`` entry; must support ``len()``.
        device: Device the batch tensors are moved to before the forward pass.
        skip_visual: When true, the progress bar is disabled.

    Returns:
        dict with keys ``'Accuracy'`` and ``'F1'``. A metric whose denominator
        is zero (no samples, or no positive labels and predictions) is reported
        as ``0.0`` instead of raising or yielding NaN.
    """
    print("Evaluating...")
    model.eval()
    progress_bar = tqdm(range(len(eval_dataloader)), ascii=True, disable=skip_visual)
    correct = 0
    size = 0
    TPs = 0
    FPs = 0
    FNs = 0
    for i, batch in enumerate(eval_dataloader):
        batch = {k: v.to(device) for k, v in batch.items()}
        with torch.no_grad():
            outputs = model(**batch)

        pred = torch.argmax(outputs.logits, dim=-1).detach().to(torch.device('cpu')).numpy()
        Y = batch["labels"].to(torch.device('cpu')).numpy()

        predicted_positive = numpy.equal(pred, 1.0)
        actual_positive = numpy.equal(Y, 1.0)
        # Vectorised counts converted to plain ints so the metrics below are
        # ordinary Python numbers.
        size += len(Y)
        correct += int(numpy.sum(numpy.equal(Y, pred)))
        TPs += int(numpy.sum(numpy.logical_and(actual_positive, predicted_positive)))
        FPs += int(numpy.sum(numpy.logical_and(numpy.equal(Y, 0.0), predicted_positive)))
        FNs += int(numpy.sum(numpy.logical_and(actual_positive, numpy.equal(pred, 0.0))))
        progress_bar.update(1)

        # MAX_BATCHES is the index of the last batch to evaluate; a negative
        # value never matches, so the whole dataloader is consumed.
        if i == MAX_BATCHES:
            break
    progress_bar.close()

    # Guard the denominators: an empty dataloader or a run without any positive
    # label/prediction would otherwise divide by zero.
    accuracy = correct / size if size else 0.0
    f1_denominator = 2 * TPs + FPs + FNs
    f1 = 2 * TPs / f1_denominator if f1_denominator else 0.0

    print('Accuracy: ' + str(accuracy))
    print('F1: ' + str(f1))
    print(correct, size, TPs, FPs, FNs)

    results = {
        'Accuracy': accuracy,
        'F1': f1
    }
    return results


class VictimRoBERTa(OpenAttack.Classifier):
    """OpenAttack classifier backed by a fine-tuned sequence-classification model.

    The architecture and tokenizer are taken from the module-level
    ``pretrained_model`` name; the weights are read from a state-dict file.
    """

    def __init__(self, path, task, device=torch.device('cpu')):
        """Build the model, load its weights and prepare the tokenizer.

        Args:
            path: Path to a state dict saved with ``torch.save``.
            task: Task identifier; ``'FC'`` and ``'C19'`` inputs are treated as
                text pairs joined by ``SEPARATOR``.
            device: Device the model is placed on and inputs are moved to.
        """
        self.device = device
        config = AutoConfig.from_pretrained(pretrained_model)
        self.model = AutoModelForSequenceClassification.from_config(config)
        # map_location makes a checkpoint that was saved on a GPU loadable on a
        # CPU-only machine (and vice versa).
        self.model.load_state_dict(torch.load(path, map_location=device))
        self.model.to(device)
        self.model.eval()
        self.tokenizer = AutoTokenizer.from_pretrained(pretrained_model)
        self.with_pairs = (task == 'FC' or task == 'C19')

    def get_pred(self, input_):
        """Return the index of the most probable class for every input text."""
        return self.get_prob(input_).argmax(axis=1)

    def get_prob(self, input_):
        """Return class probabilities for a list of texts.

        The texts are processed in chunks of ``BATCH_SIZE`` and truncated to
        ``MAX_LEN`` tokens.

        Args:
            input_: Sequence of input strings. For pair tasks each string is
                split on ``SEPARATOR``; if the split does not yield exactly two
                parts, the second segment is an empty string.

        Returns:
            numpy array with one row of softmax probabilities per input; an
            empty ``(0, num_labels)`` array when ``input_`` is empty.
        """
        prob_chunks = []
        for start in range(0, len(input_), BATCH_SIZE):
            batched_input = input_[start:start + BATCH_SIZE]
            if self.with_pairs:
                parts = [x.split(SEPARATOR) for x in batched_input]
                tokenised = self.tokenizer([x[0] for x in parts],
                                           [(x[1] if len(x) == 2 else '') for x in parts],
                                           truncation=True, padding=True,
                                           max_length=MAX_LEN,
                                           return_tensors="pt")
            else:
                tokenised = self.tokenizer(batched_input, truncation=True, padding=True,
                                           max_length=MAX_LEN,
                                           return_tensors="pt")
            with torch.no_grad():
                tokenised = {k: v.to(self.device) for k, v in tokenised.items()}
                outputs = self.model(**tokenised)
            probs_here = torch.nn.functional.softmax(outputs.logits, dim=-1)
            prob_chunks.append(probs_here.to(torch.device('cpu')).numpy())

        if not prob_chunks:
            # No input: return an empty array so get_pred's argmax still works.
            return numpy.empty((0, self.model.config.num_labels), dtype=numpy.float32)
        # Join all chunks once instead of re-copying the result per batch.
        return numpy.concatenate(prob_chunks)

# CODE: Beam Attack Usage

In [None]:
class BeamAttacker(OpenAttack.attackers.ClassificationAttacker):
    """OpenAttack attacker that delegates to ``beam_attack.attack_text_bfs``.

    The attacker owns the ``roberta-large`` masked language model used by the
    search and, when needed, a BLEURT model for picking among several
    candidates. Every call to :meth:`attack` is logged in ``self.result_out``.
    """

    # imp options: "lime", "random", "bert" (for the bert-style word importances)
    def __init__(self, device, verbose=False, k=20, width=None, early_stop=1, temperature=1.0, 
                 imp="lime", lime_num_features=5000, lime_num_samples=5000, lime_kernel_width=25,
                 filter_beams_with_only_negative=False, only_positive_importances=False, remove_words=True, keep_original=True, 
                 positional=True, add_semantic_pruning=False, semantic_pruning_ratio=0.0):
        """Store the search settings and load the required models.

        Unless noted otherwise, the arguments are stored unchanged and
        forwarded to ``beam_attack.attack_text_bfs`` by :meth:`attack`.

        Args:
            device: Device the masked language model is moved to; also passed
                to the search and to the BLEURT scoring helper.
            verbose: Verbosity flag for the search and the LIME explainer.
            k: Search parameter ``k``.
            width: Beam width; falls back to ``k`` when not given (or falsy).
            early_stop: Search parameter ``early_stop``; a value above 1 also
                triggers loading BLEURT.
            temperature: Search parameter ``temperature``.
            imp: Word-importance method: ``"lime"``, ``"random"`` or ``"bert"``.
            lime_num_features: LIME setting forwarded to the search.
            lime_num_samples: LIME setting forwarded to the search.
            lime_kernel_width: Kernel width of the ``LimeTextExplainer``.
            filter_beams_with_only_negative: Forwarded to the search.
            only_positive_importances: Forwarded to the search.
            remove_words: Forwarded to the search.
            keep_original: Forwarded to the search.
            positional: Forwarded to the search; the LIME explainer is built
                with ``bow=not positional``.
            add_semantic_pruning: Forwarded to the search; when true, BLEURT
                is loaded.
            semantic_pruning_ratio: Forwarded to the search.
        """
        self.imp = imp
        self.lime_num_features = lime_num_features
        self.lime_num_samples = lime_num_samples
        self.temperature = temperature
        self.verbose = verbose
        self.k = k
        self.positional = positional
        self.width = width if width else k
        self.early_stop = early_stop
        self.filter_beams_with_only_negative = filter_beams_with_only_negative
        self.only_positive_importances = only_positive_importances
        self.remove_words = remove_words
        self.keep_original = keep_original
        self.add_semantic_pruning = add_semantic_pruning
        self.semantic_pruning_ratio = semantic_pruning_ratio
        self.device = device

        # Masked language model handed to the search.
        # Alternative checkpoint tried previously: 'vinai/bertweet-large'.
        self.roberta_tokenizer = AutoTokenizer.from_pretrained('roberta-large')
        self.roberta_model = AutoModelForMaskedLM.from_pretrained('roberta-large').to(device)

        # The LIME explainer is only needed for the "lime" importance method.
        if self.imp == "lime":
            self.explainer = LimeTextExplainer(verbose=verbose, kernel_width=lime_kernel_width, bow=not positional)
        else:
            self.explainer = None

        self.result_out = []

        # Always define the BLEURT attributes: attack() reads them on every
        # call, even when BLEURT was not loaded.
        self.bleurt_model = None
        self.bleurt_tokenizer = None

        # With early_stop > 1 the search can return several candidates, which
        # attack() ranks by BLEURT similarity to the original text; semantic
        # pruning needs the same model.
        if self.early_stop > 1 or self.add_semantic_pruning:
            # Load BLEURT model and tokenizer
            self.bleurt_model = BleurtForSequenceClassification.from_pretrained('lucadiliello/BLEURT-20')
            self.bleurt_tokenizer = BleurtTokenizer.from_pretrained('lucadiliello/BLEURT-20')
        else:
            self.scorer = None

        print("Beam_Attack: Initialized: ", self.k, self.width, self.early_stop)

    def attack(self, victim : VictimCache, input_ : str, goal : ClassifierGoal):
        """Run the beam search on one text and return the chosen adversarial text.

        Args:
            victim: Victim classifier handed to the search.
            input_: Text to attack.
            goal: Attack goal supplied by OpenAttack; not used here.

        Returns:
            ``None`` when the search produced no non-empty output. Otherwise the
            first element of the selected beam (presumably the adversarial
            text): with several beams and BLEURT loaded, the one with the
            highest BLEURT score against ``input_``; else the first beam.
        """
        beam_outputs, target_class, initial_proba = beam_attack.attack_text_bfs(text=input_, victim=victim, 
                device=self.device, roberta_model=self.roberta_model, roberta_tokenizer = self.roberta_tokenizer, k=self.k, width=self.width, 
                verbose=self.verbose, early_stop=self.early_stop,  imp=self.imp, temperature=self.temperature, explainer=self.explainer,
                lime_num_features=self.lime_num_features, lime_num_samples=self.lime_num_samples,
                filter_beams_with_only_negative=self.filter_beams_with_only_negative, only_positive_importances=self.only_positive_importances, 
                remove_words=self.remove_words, keep_original=self.keep_original, add_semantic_pruning=self.add_semantic_pruning, 
                semantic_pruning_ratio=self.semantic_pruning_ratio, bleurt_model=self.bleurt_model, bleurt_tokenizer=self.bleurt_tokenizer, positional=self.positional)

        self.result_out.append({"output":beam_outputs, "target_class": target_class, "initial_probability": initial_proba})

        if not any(beam_outputs):
            return None

        # Rank several candidates by semantic similarity; without a loaded
        # BLEURT model fall back to the first candidate instead of failing.
        if len(beam_outputs) > 1 and self.bleurt_model is not None:
            semantic_similarities = beam_attack.calculate_bleurt_score(self.bleurt_model, self.bleurt_tokenizer, 
                            [input_]*len(beam_outputs), [beam[0] for beam in beam_outputs], device=self.device)
            return beam_outputs[int(np.argmax(semantic_similarities))][0]

        return beam_outputs[0][0]

    def save_results(self, path):
        """Write the per-attack log collected in ``self.result_out`` to a CSV file.

        Args:
            path: Destination file (``str`` or ``Path``); overwritten if it exists.
        """
        path = Path(path)
        fieldnames = ["output", "target_class", "initial_probability"]
        with open(path, 'w', newline='', encoding='utf-8') as file:
            # DictWriter writes each entry's values; a plain csv.writer would
            # iterate over the dict and emit its keys on every row.
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.result_out)

        print("Beam_Attack: Results saved at: ", path.absolute())


# Testing

In [None]:
# Experiment configuration for this run.
task = "RD"
first_n_samples = None

task_path = DATA_BASE_PATH / task

victim_model = 'surprise' # BERT or BiLSTM or surprise
model_path = task_path / f"{victim_model}-512.pth"

# Prepare victim
print("Loading up victim model...")
if victim_model == 'BERT':
    victim = VictimCache(model_path, VictimBERT(model_path, task, DEVICE))
    victim_model_generator ="bert-style"
elif victim_model == 'BiLSTM':
    victim = VictimCache(model_path, VictimBiLSTM(model_path, task, DEVICE))
    victim_model_generator ="bert-style"
elif victim_model == 'surprise':
    victim = VictimCache(model_path, VictimRoBERTa(model_path, task, DEVICE))
    victim_model_generator ="surprise"
else:
    # Fail here with a clear message instead of a NameError further down.
    raise ValueError(f"Unsupported victim_model {victim_model!r}; expected 'BERT', 'BiLSTM' or 'surprise'.")

# Load the attack subset of the task data.
dataset, with_pairs = get_incredible_dataset(task, task_path, victim_model_generator=victim_model_generator,subset="attack", first_n_samples=first_n_samples, randomised=False)
# NOTE(review): this rebinds the module-level SEPARATOR that VictimRoBERTa.get_prob
# reads at call time -- presumably identical to utils.data_mappings.SEPARATOR; confirm.
SEPARATOR_CHAR = '~'
SEPARATOR = ' ' + SEPARATOR_CHAR + ' '

# NOTE(review): the first 300 samples are skipped by a hard-coded offset; with
# first_n_samples <= 300 this leaves an empty dataset.
dataset = dataset.select(indices=range(300, len(dataset)))

print("Dataset: ", dataset.shape, dataset.features)

# Output locations; the parent of OUTPUT_DIR must already exist (parents=False).
OUTPUT_DIR.mkdir(parents=False, exist_ok=True)
# NOTE(review): this file name contains a double underscore ("results_<task>__<victim>.txt");
# kept unchanged in case downstream tooling expects it.
results_path = OUTPUT_DIR / ("results_" + task + "_" + f"_{victim_model}.txt")
beam_attack_log_path = OUTPUT_DIR / ("beam_attack_log_" + task + f"_{victim_model}" + ".csv")

In [None]:
# Build the attacker with the settings for this run.
attacker = BeamAttacker(
    device=DEVICE,
    verbose=False,
    k=10,
    width=30,
    early_stop=10,
    imp="bert",
    lime_num_samples=500,
    temperature=1,
)

# Run the attack over the dataset, then compute the metrics from the returned scorer.
scorer = BodegaAttackEvaluations.setup_bodega_scorer_and_run_attacks(
    attacker,
    dataset,
    DEVICE,
    OUTPUT_DIR,
    victim,
    results_path=results_path,
    task=task,
    victim_name=victim_model,
)

BodegaAttackEvaluations.calculate_metrics(scorer, results_path=results_path)

In [None]:
# Finalise the victim wrapper and persist the attacker's per-sample log
# before dropping the references to both objects.
victim.finalise()
attacker.save_results(beam_attack_log_path)

# Remove the notebook-level names, then call the project's cleanup helper.
del victim, attacker
free_up_model_space()