## Preliminary Notes

The aim of the InCrediblAE shared task is to build your own custom attack method that will generate adversarial examples to fool a victim classifier. This notebook is intended as an easy way for you to get started.

<br>

### Using GPU
It is recommended that you run this notebook with a GPU. To do this, click on "additional connection options" (next to Connect / RAM usage), select "change runtime type", and select a GPU.
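
Once the runtime type has been changed, it can be worth confirming that PyTorch actually sees the GPU. The helper below is a minimal sketch; `describe_device` is a hypothetical name, not part of the notebook or of BODEGA.

```python
def describe_device() -> str:
    """Report which device PyTorch would use, without failing if torch is missing."""
    try:
        import torch  # imported lazily so the check also runs before dependencies are installed
    except ImportError:
        return "torch not installed"
    return "cuda" if torch.cuda.is_available() else "cpu"


print("Selected device:", describe_device())
```

If this prints `cpu` on Colab, the runtime type is probably still set to CPU.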

<br>

### (optional) Mounting Google Drive (skip this if you are running the notebook for the first time)
If you will be re-running this notebook many times, it may be convenient to mount your personal Google Drive. This will allow you to
1. load data/victim files quickly rather than re-downloading them with each session
2. save output files to a permanent location

Instructions for mounting are in the 'Making your own attack' section.
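
As a rough illustration of the two storage options, the sketch below picks a Drive folder when it exists and falls back to the session-local folder otherwise. `pick_storage_root` is a hypothetical helper and both paths are assumptions that should be adapted to your own layout.

```python
from pathlib import Path


def pick_storage_root(drive_root: str, local_root: str) -> Path:
    """Return the Drive folder if it exists (i.e. Drive is mounted), else the local folder."""
    drive = Path(drive_root)
    return drive if drive.is_dir() else Path(local_root)


# Illustrative paths only; adjust them to where your files actually live.
root = pick_storage_root("/content/drive/MyDrive/BODEGA", "/content/BODEGA")
print(f"Using {root} for data, models and outputs")
```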


# Setup (installing dependencies)

In [None]:
# !git clone https://github.com/piotrmp/BODEGA

In [None]:
%pip install OpenAttack
%pip install editdistance
%pip install bert-score
%pip install git+https://github.com/lucadiliello/bleurt-pytorch.git


Collecting git+https://github.com/lucadiliello/bleurt-pytorch.git
  Cloning https://github.com/lucadiliello/bleurt-pytorch.git to /tmp/pip-req-build-0g8aqk3u
  Running command git clone --filter=blob:none --quiet https://github.com/lucadiliello/bleurt-pytorch.git /tmp/pip-req-build-0g8aqk3u
  Resolved https://github.com/lucadiliello/bleurt-pytorch.git to commit 279ca1bb4106bde5a89f0f82723197e23d8446cb
  Preparing metadata (setup.py) ... done


In [None]:
# !git clone https://gitlab.clarin-pl.eu/syntactic-tools/lambo.git
# %pip install ./lambo


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Downloading victim models and data

Data and models are downloaded by cloning the [clef2024-checkthat repo](https://gitlab.com/checkthat_lab/clef2024-checkthat-lab.git)
* alternative [google drive folder link](https://drive.google.com/drive/folders/1ZsDHSejiv4USae0viTsfeLpvqXdeq0FL?usp=sharing)

Data and models are downloaded then moved to /content/BODEGA/incrediblAE_public_release

In [None]:
# # temporary folder for downloading victim models and data
# ! mkdir /content/clef2024-checkthat-lab

# import os, sys
# os.chdir("/content/clef2024-checkthat-lab")

# ! git init
# ! git remote add -f origin https://gitlab.com/checkthat_lab/clef2024-checkthat-lab.git
# ! git sparse-checkout init
# ! git sparse-checkout set "task6/incrediblAE_public_release"
# ! git pull origin main


In [None]:
# # move downloaded files to /content/BODEGA
# ! mv /content/clef2024-checkthat-lab/task6/incrediblAE_public_release /content/BODEGA/incrediblAE_public_release

# ! mv /content/BODEGA /content/drive/MyDrive/BODEGA

Miscellaneous setup

In [None]:
import os

In [None]:
# folder for storing results of the attack method (-p: no error if it already exists)
! mkdir -p /content/drive/MyDrive/BODEGA/outputs

# code below assumes we are working from the BODEGA repo
os.chdir("/content/drive/MyDrive/BODEGA")


# Making your own attack

## Imports

In [None]:
import gc
import os
import pathlib
import sys
import time
import random
import numpy as np

import OpenAttack
import torch
import datasets
from datasets import Dataset

from OpenAttack.tags import Tag
from OpenAttack.text_process.tokenizer import PunctTokenizer

from metrics.BODEGAScore import BODEGAScore
from utils.data_mappings import dataset_mapping, dataset_mapping_pairs, SEPARATOR_CHAR
from utils.no_ssl_verify import no_ssl_verify
from victims.bert import VictimBERT, readfromfile_generator
from victims.bilstm import VictimBiLSTM
from victims.caching import VictimCache
from victims.unk_fix_wrapper import UNK_TEXT

#imports for BodegaAttackEval wrapper
from typing import Any, Dict, Generator, Iterable, List, Optional, Union
from tqdm import tqdm
from OpenAttack.utils import visualizer, result_visualizer, get_language, language_by_name
from OpenAttack.tags import *

In [None]:
using_mounted_drive = False
print('Cuda device available', torch.cuda.is_available())

Cuda device available False


## (do not change) Wrapper for producing submission file

In [None]:
class BodegaAttackEval(OpenAttack.AttackEval):
  '''
  wrapper for OpenAttack.AttackEval to produce a submission.tsv file for shared task evaluation

  To perform evaluation, we use a new method: eval_and_save_tsv() rather than the usual AttackEval.eval()
  submission.tsv file consists of 4 columns for each sample in attack set: succeeded, num_queries, original_text and modified text (newlines are escaped)

  '''
  def eval_and_save_tsv(self, dataset: Iterable[Dict[str, Any]], total_len : Optional[int] = None, visualize : bool = False, progress_bar : bool = False, num_workers : int = 0, chunk_size : Optional[int] = None, tsv_file_path: Optional[os.PathLike] = None):
      """
      Evaluation function of `AttackEval`.

      Args:
          dataset: An iterable dataset.
          total_len: Total length of dataset (will be used if dataset doesn't have a `__len__` attribute).
          visualize: Display a pretty result for each data in the dataset.
          progress_bar: Display a progress bar if `True`.
          num_workers: The number of processes running the attack algorithm. Default: 0 (running on the main process).
          chunk_size: Processing pool chunk size.

          tsv_file_path: path to save submission tsv

      Returns:
          A dict of attack evaluation summaries.

      """


      if hasattr(dataset, "__len__"):
          total_len = len(dataset)

      def tqdm_writer(x):
          return tqdm.write(x, end="")

      if progress_bar:
          result_iterator = tqdm(self.ieval(dataset, num_workers, chunk_size), total=total_len)
      else:
          result_iterator = self.ieval(dataset, num_workers, chunk_size)

      total_result = {}
      total_result_cnt = {}
      total_inst = 0
      success_inst = 0

      #list for tsv
      x_orig_list = []
      x_adv_list = []
      num_queries_list = []
      succeed_list = []

      # Begin for
      for i, res in enumerate(result_iterator):
          total_inst += 1
          success_inst += int(res["success"])

          if TAG_Classification in self.victim.TAGS:
              x_orig = res["data"]["x"]
              if res["success"]:
                  x_adv = res["result"]
                  if Tag("get_prob", "victim") in self.victim.TAGS:
                      self.victim.set_context(res["data"], None)
                      try:
                          probs = self.victim.get_prob([x_orig, x_adv])
                      finally:
                          self.victim.clear_context()
                      y_orig = probs[0]
                      y_adv = probs[1]
                  elif Tag("get_pred", "victim") in self.victim.TAGS:
                      self.victim.set_context(res["data"], None)
                      try:
                          preds = self.victim.get_pred([x_orig, x_adv])
                      finally:
                          self.victim.clear_context()
                      y_orig = int(preds[0])
                      y_adv = int(preds[1])
                  else:
                      raise RuntimeError("Invalid victim model")
              else:
                  y_adv = None
                  x_adv = None
                  if Tag("get_prob", "victim") in self.victim.TAGS:
                      self.victim.set_context(res["data"], None)
                      try:
                          probs = self.victim.get_prob([x_orig])
                      finally:
                          self.victim.clear_context()
                      y_orig = probs[0]
                  elif Tag("get_pred", "victim") in self.victim.TAGS:
                      self.victim.set_context(res["data"], None)
                      try:
                          preds = self.victim.get_pred([x_orig])
                      finally:
                          self.victim.clear_context()
                      y_orig = int(preds[0])
                  else:
                      raise RuntimeError("Invalid victim model")
              info = res["metrics"]
              info["Succeed"] = res["success"]
              if visualize:
                  if progress_bar:
                      visualizer(i + 1, x_orig, y_orig, x_adv, y_adv, info, tqdm_writer, self.tokenizer)
                  else:
                      visualizer(i + 1, x_orig, y_orig, x_adv, y_adv, info, sys.stdout.write, self.tokenizer)

              #list for tsv
              succeed_list.append(res["success"])
              num_queries_list.append(res["metrics"]["Victim Model Queries"])
              x_orig_list.append(x_orig)

              if res["success"]:
                x_adv_list.append(x_adv)
              else:
                x_adv_list.append("ATTACK_UNSUCCESSFUL")



          for kw, val in res["metrics"].items():
              if val is None:
                  continue

              if kw not in total_result_cnt:
                  total_result_cnt[kw] = 0
                  total_result[kw] = 0
              total_result_cnt[kw] += 1
              total_result[kw] += float(val)
      # End for

      summary = {}
      summary["Total Attacked Instances"] = total_inst
      summary["Successful Instances"] = success_inst
      summary["Attack Success Rate"] = success_inst / total_inst
      for kw in total_result_cnt.keys():
          if kw in ["Succeed"]:
              continue
          if kw in ["Query Exceeded"]:
              summary["Total " + kw] = total_result[kw]
          else:
              summary["Avg. " + kw] = total_result[kw] / total_result_cnt[kw]

      if visualize:
          result_visualizer(summary, sys.stdout.write)


      #saving tsv
      if tsv_file_path is not None:
        with open(tsv_file_path, 'w') as f:
          f.write('succeeded' + '\t' + 'num_queries' + '\t' + 'original_text' + '\t' + 'modified_text' + '\t'+ '\n') #header
          for success, num_queries, x_orig, x_adv in zip(succeed_list, num_queries_list, x_orig_list, x_adv_list):
            escaped_x_orig = x_orig.replace('\n', '\\n') #escaping newlines
            escaped_x_adv = x_adv.replace('\n', '\\n')
            f.write(str(success) + '\t' + str(num_queries) + '\t' + escaped_x_orig + '\t' + escaped_x_adv + '\t'+ '\n')

      return summary
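
To inspect a submission file locally, it can be read back with a few lines of Python. The sketch below assumes the layout written by `eval_and_save_tsv` above: a header row, four tab-separated columns, a trailing tab on every line, and newlines escaped as `\n`. `read_submission` is a hypothetical helper, not part of BODEGA or OpenAttack.

```python
def read_submission(path):
    """Parse a submission TSV into a list of dicts, undoing the newline escaping."""
    rows = []
    with open(path, encoding="utf-8") as f:
        next(f)  # skip the header row
        for line in f:
            # every line ends with an extra tab, so keep only the first four fields
            succeeded, num_queries, x_orig, x_adv = line.rstrip("\n").split("\t")[:4]
            rows.append({
                "succeeded": succeeded == "True",
                "num_queries": int(float(num_queries)),
                "original_text": x_orig.replace("\\n", "\n"),
                "modified_text": x_adv.replace("\\n", "\n"),
            })
    return rows


# Usage (path is illustrative):
# rows = read_submission("outputs/submission_PR2_False_custom_BERT.tsv")
# print(sum(r["succeeded"] for r in rows), "successful attacks out of", len(rows))
```

Note that the unescaping is only approximate: a text that originally contained a literal backslash followed by `n` cannot be told apart from an escaped newline.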

## (optional) Mounting Google Drive


Steps to use a mounted Google Drive:
1. create a folder in your Google Drive (e.g. `incrediblAE_public_release`)  
2. download all directories from the download link (see [Download section above](https://colab.research.google.com/drive/1juHWIL44z8O3C5wDAE45vzlJgX51KI5D?authuser=3#scrollTo=eVVE2-64rKuS&line=3&uniqifier=1://)) and upload them to your Google Drive folder
3. create an empty subdirectory called `outputs` (`incrediblAE_public_release/outputs/`). At this point, your Google Drive folder should have 6 subdirectories (C19, FC, HN, PR2, RD, and outputs)
4. in the code below, set `using_mounted_drive = True` and set `path_to_mounted_folder` to the path of your folder (e.g. `/content/drive/My Drive/incrediblAE_public_release`)
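
The expected folder layout (five task folders plus `outputs`) can be checked before running anything expensive. This is a minimal sketch; `missing_subdirs` is a hypothetical helper and the path in the usage line is only an example.

```python
from pathlib import Path

# Subdirectory names taken from the steps above
EXPECTED_SUBDIRS = ("C19", "FC", "HN", "PR2", "RD", "outputs")


def missing_subdirs(root):
    """Return the expected subdirectories that do not exist under `root`."""
    root = Path(root)
    return [name for name in EXPECTED_SUBDIRS if not (root / name).is_dir()]


# Example path; replace it with your own folder.
missing = missing_subdirs("/content/drive/MyDrive/BODEGA/incrediblAE_public_release")
print("Missing subdirectories:", missing if missing else "none")
```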



In [None]:
using_mounted_drive = True
path_to_mounted_folder = '/content/drive/MyDrive/BODEGA/incrediblAE_public_release'


You can also comment out the download commands in the 'Downloading victim models and data' section, so the notebook doesn't re-download data each time you run it.

## Making custom attacker (token shuffler)

Here's an example of how to create a custom attack method.
Your attacker will need to subclass `OpenAttack.attackers.ClassificationAttacker`  

(See also OpenAttack framework docs: https://openattack.readthedocs.io/en/latest/)
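
Before writing a real attacker, it may help to see the call pattern in isolation. The sketch below does not import OpenAttack: `StubVictim` and `StubGoal` are hypothetical stand-ins that only mimic the two calls the attackers in this notebook rely on (`victim.get_pred([...])` and `goal.check(x_new, y_new)`), and `shuffle_attack` is a plain function rather than a `ClassificationAttacker` subclass.

```python
import random


class StubVictim:
    """Hypothetical stand-in for a victim: predicts 1 if the text starts with 'breaking'."""

    def get_pred(self, texts):
        return [int(t.lower().startswith("breaking")) for t in texts]


class StubGoal:
    """Hypothetical untargeted goal: succeed when the prediction differs from the original label."""

    def __init__(self, original_label):
        self.original_label = original_label

    def check(self, x_new, y_new):
        return y_new != self.original_label


def shuffle_attack(victim, input_, goal, rng):
    """Shuffle the tokens once; return the new text if it flips the label, else None."""
    tokens = input_.split()
    rng.shuffle(tokens)
    x_new = " ".join(tokens)
    y_new = victim.get_pred([x_new])[0]  # one input, so take the single prediction
    return x_new if goal.check(x_new, y_new) else None


result = shuffle_attack(StubVictim(), "breaking news from the city", StubGoal(1), random.Random(10))
print(result)  # either a reordered sentence or None if the label did not change
```

The same shape (perturb, query the victim, check the goal, return the text or `None`) is what the `attack` methods below follow.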

In [None]:
'''
This example code shows how to design a customized attack model (that shuffles the tokens in the original sentence).
Taken from https://github.com/thunlp/OpenAttack/blob/master/examples/custom_attacker.py
'''

class BaseCaseSwapWordsAttack(OpenAttack.attackers.ClassificationAttacker):
    @property
    def TAGS(self):
        # returns tags that help OpenAttack check your parameters automatically
        return { self.lang_tag, Tag("get_pred", "victim") }

    def __init__(self, tokenizer = None):
        if tokenizer is None:
            tokenizer = PunctTokenizer()
        self.tokenizer = tokenizer
        self.lang_tag = OpenAttack.utils.get_language([self.tokenizer])
        # We add parameter ``processor`` to specify the :py:class:`.TextProcessor` which is used for tokenization and detokenization.
        # By default, :py:class:`.DefaultTextProcessor` is used.

    def attack(self, victim, input_, goal):
        # Generate a potential adversarial example
        x_new = self.tokenizer.detokenize(
            self.swap( self.tokenizer.tokenize(input_, pos_tagging=False) )
        )

        # Get the predictions of victim classifier
        y_new = victim.get_pred([ x_new ])

        # Check for attack goal
        if goal.check(x_new, y_new):
            return x_new
        # Failed
        return None

    def swap(self, sentence):
        # Shuffle tokens to generate a potential adversarial example
        random.shuffle(sentence)

        # Return the potential adversarial example
        return sentence



In [None]:
import nltk
from nltk.corpus import stopwords

nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [None]:
class ReplaceALL_LettersAttack(OpenAttack.attackers.ClassificationAttacker):
    @property
    def TAGS(self):
        # Returns tags to help OpenAttack check your parameters automatically
        return {self.lang_tag, Tag("get_pred", "victim")}

    def __init__(self, tokenizer=None):
        if tokenizer is None:
            # Fall back to PunctTokenizer if no tokenizer is provided
            tokenizer = PunctTokenizer()
        self.tokenizer = tokenizer
        self.lang_tag = OpenAttack.utils.get_language([self.tokenizer])
        # Load stop words from NLTK
        self.stop_words = set(stopwords.words('english'))

    def attack(self, victim, input_, goal):
        tokens = self.tokenizer.tokenize(input_)
        adv_tokens = self.swap(tokens, -1, replace_all=True)
        x_new = self.tokenizer.detokenize(adv_tokens)
        print(x_new)
        y_new = victim.get_pred([x_new])

        if goal.check(x_new, y_new):
            return x_new
        return None

    def swap(self, tokens, k=3, replace_all=False):
        homoglyphs = {
            'a': ['а', 'ɑ', 'а'],
            'e': ['е'],
            'o': ['о', 'ο', 'о'],
            'c': ['с', 'ϲ'],
            'p': ['р'],
            'x': ['х'],
            'y': ['у'],
            'i': ['і', 'í'],
        }

        def get_word(token):
            return token[0] if isinstance(token, tuple) else token

        candidates = [
            token for token in tokens
            if get_word(token).lower() not in self.stop_words and any(char in homoglyphs for char in get_word(token))
        ]
        selected_words = random.sample(candidates, min(k, len(candidates))) if k > 0 else candidates

        for i in range(len(tokens)):
            original_token = tokens[i]
            current_word = get_word(original_token)
            if original_token in selected_words:
                new_word = []
                for char in current_word:
                    if char in homoglyphs:
                        new_word.append(random.choice(homoglyphs[char]) if replace_all else homoglyphs[char][0])
                    else:
                        new_word.append(char)
                if isinstance(original_token, tuple):
                    tokens[i] = ("".join(new_word), original_token[1])
                else:
                    tokens[i] = "".join(new_word)
                if not replace_all:
                    # remove the original token: tokens[i] now holds the modified one,
                    # which is not in selected_words and would raise ValueError
                    selected_words.remove(original_token)

        return tokens

In [None]:
class ReplaceLettersAttack(OpenAttack.attackers.ClassificationAttacker):
    @property
    def TAGS(self):
        # Returns tags to help OpenAttack check your parameters automatically
        return {self.lang_tag, Tag("get_pred", "victim")}

    def __init__(self, tokenizer=None):
        if tokenizer is None:
            # Fall back to PunctTokenizer if no tokenizer is provided
            tokenizer = PunctTokenizer()
        self.tokenizer = tokenizer
        self.lang_tag = OpenAttack.utils.get_language([self.tokenizer])
        # Load stop words from NLTK
        self.stop_words = set(stopwords.words('english'))

    def attack(self, victim, input_, goal):
        tokens = self.tokenizer.tokenize(input_)
        adv_tokens = self.swap(tokens)
        x_new = self.tokenizer.detokenize(adv_tokens)
        y_new = victim.get_pred([x_new])

        if goal.check(x_new, y_new):
            return x_new
        return None

    def swap(self, tokens, k=3, replace_all=False):
        homoglyphs = {
            'a': ['а', 'ɑ', 'а'],
            'e': ['е'],
            'o': ['о', 'ο', 'о'],
            'c': ['с', 'ϲ'],
            'p': ['р'],
            'x': ['х'],
            'y': ['у'],
            'i': ['і', 'í'],
        }

        # Define function to extract the text part of a token
        def get_word(token):
            return token[0] if isinstance(token, tuple) else token

        # Select indices of tokens to be replaced
        candidate_indices = [
            i for i, token in enumerate(tokens)
            if get_word(token).lower() not in self.stop_words and any(char in homoglyphs for char in get_word(token))
        ]
        selected_indices = random.sample(candidate_indices, min(k, len(candidate_indices))) if k > 0 else candidate_indices

        # Replace homoglyphs in selected tokens
        for i in selected_indices:
            current_word = get_word(tokens[i])
            new_word = ''.join(random.choice(homoglyphs[char]) if char in homoglyphs and replace_all else homoglyphs.get(char, [char])[0] for char in current_word)
            if isinstance(tokens[i], tuple):
                tokens[i] = (new_word, tokens[i][1])  # Maintain any tuple structure
            else:
                tokens[i] = new_word

        return tokens
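
Homoglyph substitutions are hard to see on screen, so the sketch below writes the replacement characters as Unicode escapes and prints their code points. The mapping is an illustrative subset using Cyrillic look-alikes; it is an assumption and may not match the table in the classes above character for character.

```python
# Latin letter -> Cyrillic look-alike, written as escapes so the difference is visible in source
HOMOGLYPHS = {
    "a": "\u0430",  # CYRILLIC SMALL LETTER A
    "e": "\u0435",  # CYRILLIC SMALL LETTER IE
    "o": "\u043e",  # CYRILLIC SMALL LETTER O
    "c": "\u0441",  # CYRILLIC SMALL LETTER ES
    "p": "\u0440",  # CYRILLIC SMALL LETTER ER
}


def to_homoglyphs(word):
    """Replace every character that has a look-alike; leave the rest untouched."""
    return "".join(HOMOGLYPHS.get(ch, ch) for ch in word)


original = "peace"
perturbed = to_homoglyphs(original)
print(original, perturbed, "equal:", original == perturbed)
print([hex(ord(ch)) for ch in perturbed])
```

The two strings render almost identically but compare as different, which is why a tokenizer built on Latin text is likely to treat the perturbed word as unknown.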

In [None]:
%pip install replicate



In [None]:
# from google.colab import userdata
# userdata.get('REPLICATE_API_TOKEN')

In [None]:
import os
os.environ["REPLICATE_API_TOKEN"] = "your_token_here"

In [None]:
import replicate

In [None]:
class LLAMA2ParaphraseAttack(OpenAttack.attackers.ClassificationAttacker):
    @property
    def TAGS(self):
        return {self.lang_tag, Tag("get_pred", "victim")}

    def __init__(self, tokenizer=None):
        if tokenizer is None:
            tokenizer = PunctTokenizer()
        self.tokenizer = tokenizer
        self.lang_tag = OpenAttack.utils.get_language([self.tokenizer])

    def attack(self, victim, input_, goal):
        x_new = self.paraphrase_with_llama(input_)

        if x_new is None:
            return None

        y_new = victim.get_pred([x_new])

        if goal.check(x_new, y_new):
            return x_new

        return None

    def paraphrase_with_llama(self, sentence):
        token_count = len(sentence.split())

        print("token count:", token_count)

        input_params = {
            "top_p": 1,
            "prompt": f"Paraphrase the following sentence with similar length {token_count}: {sentence}",
            "temperature": 0.5,
            "max_new_tokens": token_count + 10
        }

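        try:
            # `replicate` is imported in the cell above
            output = replicate.run(
                "meta/meta-llama-3-70b-instruct",
                input=input_params
            )
            # join once: the output may be an iterator that can only be consumed a single time
            paraphrase = "".join(output)
            print(paraphrase)
            return paraphrase
        except Exception as e:
            print(f"Failed to paraphrase with Llama 3: {str(e)}")
            return None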



In [None]:
class LLAMA3MistralCheckerCollabAttack(OpenAttack.attackers.ClassificationAttacker):


#   ┌──────────────────────────────────────────────────────┐
#   │                                                      ▼
#   │           ┌─LLAMA3───┐   PARAPHRASED TEXT  ┌─MISTRAL─────┐  IF SAME
# INPUT────────►│PARAPHRASE├────────────────────►│MEANING CHECK├──────────►OUTPUT
#               │TEXT      │                     └──────┬──────┘
#               └──────────┘                            │
#                     ▲                                 │
#                     │            IF NOT               │
#                     └─────────────────────────────────┘

    @property
    def TAGS(self):
        return {self.lang_tag, Tag("get_pred", "victim")}

    def __init__(self, tokenizer=None):
        if tokenizer is None:
            tokenizer = PunctTokenizer()
        self.tokenizer = tokenizer
        self.lang_tag = OpenAttack.utils.get_language([self.tokenizer])

    def attack(self, victim, input_, goal):
        x_new = self.paraphrase_with_llama(input_)

        if x_new is None:
            return None

        y_new = victim.get_pred([x_new])

        if goal.check(x_new, y_new):
            return x_new

        return None

    def paraphrase_with_llama(self, sentence):
        token_count = len(sentence.split())

        print("token count:", token_count)

        input_params = {
            "top_p": 1,
            "prompt": f"Paraphrase the following sentence with similar length {token_count} only give sentence as output: {sentence}",
            "temperature": 0.7,
            "max_new_tokens": token_count + 10
        }



        try:
            # NOTE: max_attempts is an added safeguard (the value 5 is arbitrary) so that
            # the loop cannot keep calling the API forever when no paraphrase is accepted
            max_attempts = 5
            for _ in range(max_attempts):
                llama3_out = "".join(replicate.run(
                    "meta/meta-llama-3-70b-instruct",
                    input=input_params
                ))
                print(llama3_out)

                input_mistral_params = {
                    "top_p": 1,
                    "top_k": 20,
                    "prompt": f"does given sentence has same meaning as \"{sentence}\" only output \"yes\" or \"no\" nothing else: {llama3_out}",
                    "temperature": 0.3,
                    "max_new_tokens": 3
                }

                mistral_out = "".join(replicate.run(
                    "mistralai/mistral-7b-instruct-v0.2",
                    input=input_mistral_params
                )).lower()
                print(f"meaning checker result:{mistral_out}")

                if "yes" in mistral_out:
                    print("found the one")
                    return llama3_out

            # no paraphrase passed the meaning check
            return None

        except Exception as e:
            print(f"Failed to paraphrase with Llama 3 / Mistral: {str(e)}")
            return None
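
The generate-then-verify loop in the diagram can also be written independently of any model API, which makes it easy to test without spending API credits. This is a minimal sketch: `paraphrase_until_accepted`, its parameters and the default of five attempts are all assumptions, and the two callables stand in for the paraphrasing model and the meaning checker.

```python
from typing import Callable, Optional


def paraphrase_until_accepted(
    sentence: str,
    generate: Callable[[str], str],
    same_meaning: Callable[[str, str], bool],
    max_attempts: int = 5,
) -> Optional[str]:
    """Ask `generate` for candidates until `same_meaning` accepts one or attempts run out."""
    for _ in range(max_attempts):
        candidate = generate(sentence)
        if same_meaning(sentence, candidate):
            return candidate
    return None  # nothing was accepted within the attempt budget


# Toy stand-ins: the "model" upper-cases the text, the "checker" compares case-insensitively.
demo = paraphrase_until_accepted(
    "the market fell sharply",
    generate=lambda s: s.upper(),
    same_meaning=lambda original, candidate: original.lower() == candidate.lower(),
)
print(demo)
```

Bounding the number of attempts matters here because every iteration costs two remote model calls.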



In [None]:




class LLAMA3StyleConverterAttack(OpenAttack.attackers.ClassificationAttacker):

#             ┌─LLAMA3────┐  PARAPHRASED TEXT  ┌─LLAMA3────┐
# INPUT──────►│PARAPHRASE ├───────────────────►│MATCH STYLE├───►OUTPUT
#   │         │TEXT       │                    │OF INPUT   │
#   │         └───────────┘                    └───────────┘
#   │                                                ▲
#   │                                                │
#   └────────────────────────────────────────────────┘

    @property
    def TAGS(self):
        return {self.lang_tag, Tag("get_pred", "victim")}

    def __init__(self, tokenizer=None):
        if tokenizer is None:
            tokenizer = PunctTokenizer()
        self.tokenizer = tokenizer
        self.lang_tag = OpenAttack.utils.get_language([self.tokenizer])

    def attack(self, victim, input_, goal):
        x_new = self.paraphrase_with_llama(input_)

        if x_new is None:
            return None

        y_new = victim.get_pred([x_new])

        if goal.check(x_new, y_new):
            return x_new

        return None

    def paraphrase_with_llama(self, sentence):
        token_count = len(sentence.split())

        print("token count:", token_count)

        # input_params = {
        #     "top_p": 1,
        #     "prompt": f"Paraphrase the following sentence with similar length {token_count} only give sentence as output: {sentence}",
        #     "temperature": 0.7,
        #     "max_new_tokens": token_count + 10
        # }

        paraphraser = LLamaWrapper(prompt=f"Paraphrase the following sentence without changing the sentence structure with similar length {token_count}. change the words with their synonyms. only give sentence as output: {sentence}",top_p=1,top_k=5,temperature=0.7,max_new_tokens=token_count + 10)

        ret = paraphraser.run()

        style_matcher = LLamaWrapper(prompt=f"Match the exact structure of the sentence \"{ret}\" to the sentence \"{sentence}\" with max length of {token_count}. only give the sentence as output.",top_p=1,top_k=5,temperature=0.7,max_new_tokens=token_count + 10)
        end_res = style_matcher.run()

        return end_res
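
`LLamaWrapper` is used by `paraphrase_with_llama` but is not defined anywhere in this notebook. The sketch below is one possible definition, inferred only from how it is called (keyword arguments plus a `run()` method); the model name is copied from the other attackers, while the `model` and `runner` parameters are added assumptions. The original helper may differ.

```python
from typing import Callable, Iterable, Optional


class LLamaWrapper:
    """Hypothetical helper: bundles a prompt with sampling parameters for one model call."""

    def __init__(self, prompt, top_p=1, top_k=5, temperature=0.7, max_new_tokens=64,
                 model="meta/meta-llama-3-70b-instruct",
                 runner: Optional[Callable[..., Iterable[str]]] = None):
        self.model = model
        self.runner = runner  # injectable for testing; defaults to replicate.run
        self.input_params = {
            "prompt": prompt,
            "top_p": top_p,
            "top_k": top_k,
            "temperature": temperature,
            "max_new_tokens": max_new_tokens,
        }

    def run(self) -> str:
        runner = self.runner
        if runner is None:
            import replicate  # imported lazily so the class can be defined without the package
            runner = replicate.run
        # the model output is treated as a sequence of text chunks, as elsewhere in this notebook
        return "".join(runner(self.model, input=self.input_params))


# Offline usage with a fake runner (no API call is made):
fake_runner = lambda model, input: ["A short ", "paraphrase."]
print(LLamaWrapper(prompt="Paraphrase: hello", runner=fake_runner).run())
```

Passing a `runner` makes it possible to exercise the attack logic without a `REPLICATE_API_TOKEN`.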


In [None]:
class BasakSwapWordsAttack(OpenAttack.attackers.ClassificationAttacker):
    @property
    def TAGS(self):
        # returns tags that help OpenAttack check your parameters automatically
        return { self.lang_tag, Tag("get_pred", "victim") }

    def __init__(self, tokenizer = None):
        if tokenizer is None:
            tokenizer = PunctTokenizer()
        self.tokenizer = tokenizer
        self.lang_tag = OpenAttack.utils.get_language([self.tokenizer])
        # We add parameter ``processor`` to specify the :py:class:`.TextProcessor` which is used for tokenization and detokenization.
        # By default, :py:class:`.DefaultTextProcessor` is used.

    def attack(self, victim, input_, goal):
        # Generate a potential adversarial example
        # x_new = self.tokenizer.detokenize(
        #     self.swap( self.tokenizer.tokenize(input_, pos_tagging=False) )
        # )

        # swap roughly half of the words (count words, not characters)
        x_new = self.change_n_words_randomly(input_, len(input_.split()) // 2)

        # Get the predictions of victim classifier
        y_new = victim.get_pred([ x_new ])

        # Check for attack goal
        if goal.check(x_new, y_new):
            return x_new
        # Failed
        return None

    @staticmethod
    def change_n_words_randomly(text, i):


      lst = text.split()
      indexes = random.sample(range(0, len(lst)), i)

      if i == 1:
          k = indexes[0]
          l = indexes[0]
          while l == k:
              x = random.sample(range(0, len(lst)), i)
              l = x[0]

          #print(k,l)
          if k >l:
              a = lst[0:l+1] + lst[k:k+1] + lst[l+1:k] + lst[k+1:]
              return ' '.join(a)
          else:
              a = lst[0:k] + lst[k+1:l+1] + lst[k:k+1] + lst[l+1:]
              return ' '.join(a)



      tnk = 0
      if len(indexes)%2 != 0:
          tnk=1

      k = tnk
      while k <len(indexes):
          tmp = lst[indexes[k+1]]
          lst[indexes[k+1]] = lst[indexes[k]]
          lst[indexes[k]] =tmp
          k+=2

      if tnk == 1:
          tmp = lst[indexes[0]]
          lst[indexes[0]] = lst[indexes[len(indexes)-1]]
          lst[indexes[len(indexes)-1]] = tmp

      return ' '.join(lst)
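
A word-swapping perturbation only reorders words, so the set of words and their counts should be unchanged. The sketch below is a simplified variant written for illustration (it is not the method above): `swap_random_pairs` is a hypothetical function that swaps disjoint random pairs and takes an explicit random generator so results are reproducible.

```python
import random
from collections import Counter


def swap_random_pairs(text, n_pairs, rng):
    """Swap up to `n_pairs` disjoint pairs of words chosen at random."""
    words = text.split()
    n_pairs = min(n_pairs, len(words) // 2)  # cannot form more pairs than words allow
    positions = rng.sample(range(len(words)), 2 * n_pairs)
    for a, b in zip(positions[0::2], positions[1::2]):
        words[a], words[b] = words[b], words[a]
    return " ".join(words)


text = "officials confirmed the report late on friday"
swapped = swap_random_pairs(text, 2, random.Random(10))
print(swapped)
print("same words:", Counter(swapped.split()) == Counter(text.split()))
```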

## Testing your attack

The code below will test your custom attacker (one of the classes defined above) on the victim classifier, compute the BODEGA score, and output results to /content/BODEGA/outputs.

WARNING: files in the default output directory (/content/BODEGA/outputs) do not persist after you disconnect from the Colab runtime session. To keep them, you can either:

1. download them manually or
2. set `out_dir` to a mounted Google Drive directory (will automatically save files to your Google Drive)



### Choose task + victim classifier

In [None]:
# determinism
random.seed(10)
torch.manual_seed(10)
np.random.seed(0)

# Change these variables to what you want
task = 'PR2' # PR2, HN, FC, RD, C19
victim_model = 'BERT' # BERT or BiLSTM
using_custom_attacker = True # change to False if you want to test out OpenAttack's pre-implemented attackers (e.g. BERTattack)
attack = 'custom' # if using custom attack, this name can be whatever you want. If using pre-implemented attack, set to name of attacker ('BERTattack')

# misc variables - no need to change
targeted = False # this shared task evaluates performance in an untargeted scenario
visualize_adv_examples = True # prints adversarial samples as they are generated, showing the difference between original and modified text
using_first_n_samples = False # used when you want to evaluate on a subset of the full eval set.
first_n_samples = 20
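
A typo in `task` or `victim_model` only surfaces later as a missing file, so a quick check of the settings can save a failed run. This is a minimal sketch; `check_config` is a hypothetical helper and the allowed values are the ones listed in the comments above.

```python
VALID_TASKS = {"PR2", "HN", "FC", "RD", "C19"}
VALID_VICTIMS = {"BERT", "BiLSTM"}


def check_config(task, victim_model):
    """Return a list of human-readable problems; an empty list means the settings look fine."""
    problems = []
    if task not in VALID_TASKS:
        problems.append(f"unknown task {task!r}, expected one of {sorted(VALID_TASKS)}")
    if victim_model not in VALID_VICTIMS:
        problems.append(f"unknown victim model {victim_model!r}, expected one of {sorted(VALID_VICTIMS)}")
    return problems


for problem in check_config("PR2", "BERT"):
    print("Config problem:", problem)
```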


### Run to evaluate attacker


In [None]:
import gc
gc.collect()

27

In [None]:

if using_mounted_drive:
    data_path =  pathlib.Path(f"{path_to_mounted_folder}/{task}")
    model_path = pathlib.Path(f"{path_to_mounted_folder}/{task}/{victim_model}-512.pth")
    out_dir = pathlib.Path(f"{path_to_mounted_folder}/outputs")

else:
  data_path =  pathlib.Path(f"/content/BODEGA/incrediblAE_public_release/{task}")
  model_path = pathlib.Path(f"/content/BODEGA/incrediblAE_public_release/{task}/{victim_model}-512.pth")
  out_dir = pathlib.Path("/content/BODEGA/outputs")



RESULTS_FILE_NAME = 'results_' + task + '_' + str(targeted) + '_' + attack + '_' + victim_model + '.txt' #stores BODEGA metrics
SUBMISSION_FILE_NAME = 'submission_' + task + '_' + str(targeted) + '_' + attack + '_' + victim_model + '.tsv' #stores original and modified text, to be submitted to shared task organizers

results_path = out_dir / RESULTS_FILE_NAME if out_dir else None
submission_path = out_dir / SUBMISSION_FILE_NAME if out_dir else None

if out_dir:
    if (out_dir / RESULTS_FILE_NAME).exists():
      print(f"Existing results file found. This script will overwrite previous file: {str(results_path)}")
    if submission_path.exists():
      print(f"Existing submission file found. This script will overwrite previous file: {str(submission_path)}")




# Prepare task data
with_pairs = (task == 'FC' or task == 'C19')

# Choose device
print("Setting up the device...")

using_TF = (attack in ['TextFooler', 'BAE'])
if using_TF:
    # Disable GPU usage by TF to avoid memory conflicts
    import tensorflow as tf

    tf.config.set_visible_devices(devices=[], device_type='GPU')

if torch.cuda.is_available():
    print('using GPU')
    victim_device = torch.device("cuda")
    attacker_device = torch.device("cuda")
else:
    victim_device = torch.device("cpu")
    attacker_device = torch.device('cpu')

# Prepare victim
print("Loading up victim model...")
if victim_model == 'BERT':
    victim = VictimCache(model_path, VictimBERT(model_path, task, victim_device))
elif victim_model == 'BiLSTM':
    victim = VictimCache(model_path, VictimBiLSTM(model_path, task, victim_device))

# Load data
print("Loading data...")
test_dataset = Dataset.from_generator(readfromfile_generator,
                                      gen_kwargs={'subset': 'attack', 'dir': data_path, 'trim_text': True,
                                                  'with_pairs': with_pairs})
if not with_pairs:
    dataset = test_dataset.map(dataset_mapping)
    dataset = dataset.remove_columns(["text"])
else:
    dataset = test_dataset.map(dataset_mapping_pairs)
    dataset = dataset.remove_columns(["text1", "text2"])

dataset = dataset.remove_columns(["fake"])

# Filter data
if using_first_n_samples:
  dataset = dataset.select(range(first_n_samples))

if targeted:
    dataset = [inst for inst in dataset if inst["y"] == 1 and victim.get_pred([inst["x"]])[0] == inst["y"]]

print("Subset size: " + str(len(dataset)))

# Prepare attack
print("Setting up the attacker...")

# Necessary to bypass the outdated SSL certificate on the OpenAttack servers
with no_ssl_verify():
  if using_custom_attacker:
    attacker = LLAMA3StyleConverterAttack()
  else:
    filter_words = OpenAttack.attack_assist.filter_words.get_default_filter_words('english') + [SEPARATOR_CHAR]
    if attack == 'PWWS':
        attacker = OpenAttack.attackers.PWWSAttacker(token_unk=UNK_TEXT, lang='english', filter_words=filter_words)
    elif attack == 'SCPN':
        os.environ["TOKENIZERS_PARALLELISM"] = "false"
        attacker = OpenAttack.attackers.SCPNAttacker(device=attacker_device)
    elif attack == 'TextFooler':
        attacker = OpenAttack.attackers.TextFoolerAttacker(token_unk=UNK_TEXT, lang='english',
                                                           filter_words=filter_words)
    elif attack == 'DeepWordBug':
        attacker = OpenAttack.attackers.DeepWordBugAttacker(token_unk=UNK_TEXT)
    elif attack == 'VIPER':
        attacker = OpenAttack.attackers.VIPERAttacker()
    elif attack == 'GAN':
        attacker = OpenAttack.attackers.GANAttacker()
    elif attack == 'Genetic':
        attacker = OpenAttack.attackers.GeneticAttacker(lang='english', filter_words=filter_words)
    elif attack == 'PSO':
        attacker = OpenAttack.attackers.PSOAttacker(lang='english', filter_words=filter_words)
    elif attack == 'BERTattack':
        attacker = OpenAttack.attackers.BERTAttacker(filter_words=filter_words, use_bpe=False, device=attacker_device)
    elif attack == 'BAE':
        attacker = OpenAttack.attackers.BAEAttacker(device=attacker_device, filter_words=filter_words)
    else:
        attacker = None

# Run the attack
print("Evaluating the attack...")
RAW_FILE_NAME = 'raw_' + task + '_' + str(targeted) + '_' + attack + '_' + victim_model + '.tsv'
raw_path = out_dir / RAW_FILE_NAME if out_dir else None

scorer = BODEGAScore(victim_device, task, align_sentences=True, semantic_scorer="BLEURT", raw_path = raw_path)
with no_ssl_verify():
    attack_eval = BodegaAttackEval(attacker, victim, language='english', metrics=[
        scorer  # , OpenAttack.metric.EditDistance()
    ])
    start = time.time()
    summary = attack_eval.eval_and_save_tsv(dataset, visualize=visualize_adv_examples, progress_bar=False, tsv_file_path = submission_path)
    end = time.time()
attack_time = end - start
attacker = None

# Remove unused stuff
victim.finalise()
del victim
gc.collect()
torch.cuda.empty_cache()
if "TOKENIZERS_PARALLELISM" in os.environ:
    del os.environ["TOKENIZERS_PARALLELISM"]

# Evaluate
start = time.time()
score_success, score_semantic, score_character, score_BODEGA = scorer.compute()
end = time.time()
evaluate_time = end - start

# Print results
print("Subset size: " + str(len(dataset)))
print("Success score: " + str(score_success))
print("Semantic score: " + str(score_semantic))
print("Character score: " + str(score_character))
print("BODEGA score: " + str(score_BODEGA))
print("Queries per example: " + str(summary['Avg. Victim Model Queries']))
print("Total attack time: " + str(attack_time))
print("Time per example: " + str((attack_time) / len(dataset)))
print("Total evaluation time: " + str(evaluate_time))

if out_dir:
  with open(results_path, 'w') as f:
      f.write("Subset size: " + str(len(dataset)) + '\n')
      f.write("Success score: " + str(score_success) + '\n')
      f.write("Semantic score: " + str(score_semantic) + '\n')
      f.write("Character score: " + str(score_character) + '\n')
      f.write("BODEGA score: " + str(score_BODEGA) + '\n')
      f.write("Queries per example: " + str(summary['Avg. Victim Model Queries']) + '\n')
      # `start` and `end` were reused to time the evaluation, so use attack_time here
      f.write("Total attack time: " + str(attack_time) + '\n')
      f.write("Time per example: " + str(attack_time / len(dataset)) + '\n')
      f.write("Total evaluation time: " + str(evaluate_time) + '\n')

  print('-')
  print('Bodega metrics saved to', results_path)
  print('Submission file saved to', submission_path)

Existing submission file found. This script will overwrite previous file: /content/drive/MyDrive/BODEGA/incrediblAE_public_release/outputs/submission_PR2_False_custom_BERT.tsv
Setting up the device...
Loading up victim model...
Victim caching: file found, loading...
Loading data...
Subset size: 416
Setting up the attacker...
Evaluating the attack...


The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'BleurtSPTokenizer'. 
The class this function is called from is 'BertTokenizer'.


token count: 23
Saucier emphasized that the U.S. Constitution guarantees citizens inherent rights to protection from government persecution.
yes. both
meaning checker result:yes. both
found the one
Label: 0 (98.07%) --> Failed!               |                                   
                                            | Running Time:            1.8953   
“ The U . S . Constitution clearly states   | Query Exceeded:          no       
that all citizens are born with inalienable | Victim Model Queries:    2        
rights to be free from persecution by the   | BODEGA Score:            (later)  
government ,” Saucier said .                | Succeed:                 no       
                                            |                                   
token count: 10
The paper's sensationalism was also on full display.
yes. both
meaning checker result:yes. both
found the one
Label: 0 (97.23%) --> 1 (99.94%)            |                                   
          

KeyboardInterrupt: 

Once the run completes, your final output should look like the results below.
The custom attack has a very low BODEGA score, which indicates that the attack was not very successful: it has both a low success rate and poor preservation of meaning.

VictimBERT on PR2:
```
Subset size: 416
Success score: 0.1778846153846154
Semantic score: 0.40792732766351186
Character score: 0.3001644500157
BODEGA score: 0.02308437726605881
Queries per example: 2.1778846153846154
Total attack time: 19.421820878982544
Time per example: 0.04668706942063112
Total evaluation time: 10.617336988449097
```
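
To see how these numbers relate to each other, here is a minimal sketch of the aggregation. It assumes, following the description in the BODEGA preprint, that each example's BODEGA score is the product of its success, semantic and character scores (so failed attacks score 0), that the BODEGA score is averaged over all examples, and that the semantic and character scores are averaged over successful attacks only. `bodega_summary` is a hypothetical helper and the input values are invented for illustration; this is not the `BODEGAScore` implementation.

```python
def bodega_summary(examples):
    """Aggregate per-example scores.

    `examples` is a list of (success, semantic, character) tuples, where
    `success` is a bool and the other two values lie in [0, 1].
    """
    total = len(examples)
    successful = [(sem, char) for ok, sem, char in examples if ok]

    success_score = len(successful) / total
    # Assumption: semantic and character scores are averaged over successes only
    if successful:
        semantic_score = sum(sem for sem, _ in successful) / len(successful)
        character_score = sum(char for _, char in successful) / len(successful)
    else:
        semantic_score = character_score = 0.0
    # Assumption: per-example product, averaged over ALL examples (failures count as 0)
    bodega_score = sum(sem * char for sem, char in successful) / total
    return success_score, semantic_score, character_score, bodega_score


# Invented values: two successful attacks and two failures
scores = bodega_summary([
    (True, 0.5, 0.5),
    (False, 0.0, 0.0),
    (True, 1.0, 0.5),
    (False, 0.0, 0.0),
])
print(scores)
```

Under these assumptions the BODEGA score is a mean of products, so it will generally be close to, but not equal to, the product of the three reported averages.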

## Submission Files

Whenever you run an attack on a dataset, a submission file (for example `submission_PR2_False_custom_BERT.tsv`) is saved to your outputs directory. At the end of the test phase, you will need to submit your final attack's submission files to the shared task organisers for evaluation: one file for each combination of dataset and victim classifier.

The submission file contains four pieces of information per attacked text:
1. whether the attack was successful
2. the number of queries to the victim model used to generate the adversarial sample
3. the original text
4. the adversarial text (or ATTACK_UNSUCCESSFUL if the attack failed)
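
If you want to sanity-check a submission file before sending it, something like the sketch below may help. It assumes the file is tab-separated with the four fields in the order listed above. The field names in `FIELDS` and the helper `summarise_submission` are hypothetical, so check them against your own file before relying on this.

```python
import csv
import io

# Hypothetical field names, in the order described above
FIELDS = ["succeeded", "num_queries", "original_text", "modified_text"]


def summarise_submission(handle):
    """Count examples, successes and average queries in a submission TSV."""
    reader = csv.reader(handle, delimiter="\t", quoting=csv.QUOTE_NONE)
    # Keep only rows that have exactly four fields
    rows = [dict(zip(FIELDS, row)) for row in reader if len(row) == len(FIELDS)]
    # Skip any row whose query count is not a number (e.g. a header row)
    rows = [r for r in rows if r["num_queries"].isdigit()]

    successes = sum(1 for r in rows if r["modified_text"] != "ATTACK_UNSUCCESSFUL")
    total_queries = sum(int(r["num_queries"]) for r in rows)
    return {
        "examples": len(rows),
        "successes": successes,
        "avg_queries": total_queries / len(rows) if rows else 0.0,
    }


# Invented two-row file used only to demonstrate the helper
sample = io.StringIO(
    "True\t2\toriginal text a\tadversarial text a\n"
    "False\t3\toriginal text b\tATTACK_UNSUCCESSFUL\n"
)
print(summarise_submission(sample))
```

For a real file, pass an open handle instead, e.g. `with open(submission_path, newline="") as f: summarise_submission(f)`.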

## Final tips:

### Using a subset of eval dataset
Testing your attack on the entire eval dataset can take a while. To speed things up, you can test on only the first n samples of the dataset by setting `using_first_n_samples` to `True` and `first_n_samples` to the number of samples you want.
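
The evaluation cell picks the subset with `dataset.select(range(first_n_samples))`. If you experiment with several datasets of different sizes, it may be convenient to cap the requested number at the dataset size. The sketch below shows one way to do that; `subset_indices` is a hypothetical helper, not part of BODEGA.

```python
def subset_indices(dataset_size, first_n_samples):
    """Return the indices of the first n samples, capped at the dataset size."""
    if first_n_samples < 0:
        raise ValueError("first_n_samples must be non-negative")
    return range(min(first_n_samples, dataset_size))


print(list(subset_indices(416, 5)))   # first five indices
print(len(subset_indices(3, 10)))     # capped at the dataset size
```

In the notebook this would be used as `dataset = dataset.select(subset_indices(len(dataset), first_n_samples))`.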

### Running pre-implemented attacks

BODEGA supports a number of pre-existing attacks. Trying these might be useful if you want to:
- compare your performance with existing methods (also reported in the [BODEGA preprint](https://arxiv.org/abs/2303.08032))
- get inspiration from observing their substitutions

Using an existing attack requires only two changes to the code above:
1. set `using_custom_attacker` to `False`
2. set `attack` to the name of a supported attack
   (`PWWS`, `SCPN`, `TextFooler`, `DeepWordBug`, `VIPER`, `GAN`, `Genetic`, `PSO`, `BERTattack` or `BAE`)
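
Attack names are matched exactly, including case, and the evaluation cell sets `attacker = None` when it does not recognise the name. A small check before the run can catch typos early. The sketch below uses the names handled in the evaluation cell; `check_attack_name` is a hypothetical helper, not part of BODEGA or OpenAttack.

```python
# Names handled by the if/elif chain in the evaluation cell
SUPPORTED_ATTACKS = {
    "PWWS", "SCPN", "TextFooler", "DeepWordBug", "VIPER",
    "GAN", "Genetic", "PSO", "BERTattack", "BAE",
}


def check_attack_name(attack, using_custom_attacker):
    """Return `attack` unchanged, or raise if it is not a supported name."""
    if using_custom_attacker:
        # The custom attacker is used regardless of the attack name
        return attack
    if attack not in SUPPORTED_ATTACKS:
        raise ValueError(
            "Unknown attack '" + attack + "'. Supported: "
            + ", ".join(sorted(SUPPORTED_ATTACKS))
        )
    return attack


print(check_attack_name("BERTattack", using_custom_attacker=False))
```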

Note that `BAE` and `TextFooler` rely on TensorFlow, so you will need to install additional dependencies before using them:

- tensorflow >= 2.0.0
- tensorflow_hub

See the [OpenAttack installation guide](https://openattack.readthedocs.io/en/latest/quickstart/installation.html) for details.
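
To find out whether these optional packages are already available in your runtime, you could run a quick check like the one below. It only tests whether the modules can be located and does not verify the installed version; `missing_modules` is a hypothetical helper.

```python
import importlib.util


def missing_modules(module_names):
    """Return the names of top-level modules that cannot be found."""
    return [name for name in module_names
            if importlib.util.find_spec(name) is None]


missing = missing_modules(["tensorflow", "tensorflow_hub"])
if missing:
    print("Install before running BAE or TextFooler:", ", ".join(missing))
else:
    print("TensorFlow dependencies found.")
```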
