In [None]:

!pip install --no-cache-dir transformers datasets
#!pip install --no-cache-dir datasets
import sys
import csv
from transformers import pipeline
from nltk.corpus import wordnet
import nltk
nltk.download('wordnet')
import random
from datasets import load_dataset

class BSNode:
    """Node of the binary tree that records a bisection search.

    Attributes set on construction:
        data: payload handed to the constructor.
        prob: probability handed to the constructor.
        exclusion: list, empty at creation; filled in by the code that
            builds the tree.
        explored: flag, ``False`` at creation.
        left, right: child nodes, ``None`` at creation.
    """

    def __init__(self, data, prob):
        self.data, self.prob = data, prob
        self.exclusion = []
        self.explored = False
        self.left = self.right = None

    def isLeaf(self):
        """Return True when the node has neither a left nor a right child."""
        return not (self.left or self.right)

    def printNode(self):
        """Print the subtree in order: left subtree, this node, right subtree.

        Each node is printed as ``data prob explored``.
        """
        if self.left:
            self.left.printNode()

        print(self.data, self.prob, self.explored)

        if self.right:
            self.right.printNode()

    def lowestProb(self):
        """Return the unexplored leaf with the lowest ``prob`` in this subtree.

        Returns ``None`` when nothing is left to explore. As a side effect,
        an inner node whose children both report nothing left is itself
        marked as explored.
        """
        if self.isLeaf():
            # a leaf is its own (only) candidate unless it was already visited
            return None if self.explored else self

        left_best = self.left.lowestProb() if self.left else None
        right_best = self.right.lowestProb() if self.right else None

        if left_best and right_best:
            # both sides still have candidates: the strictly lower probability
            # wins, the right-hand candidate wins ties
            return left_best if left_best.prob < right_best.prob else right_best

        if left_best or right_best:
            # only one side has a candidate left
            return left_best or right_best

        # both branches are exhausted, so this node is fully explored as well
        self.explored = True
        return None






def binary_select_iterative_BSNode(classifier, query, attack_class = 0, BSStruct = None, initial_prob = None): #for analysis
    """Bisect ``query`` character-wise to find the most influential position.

    Starting from the unexplored node with the lowest probability in the
    search tree, the current span of characters is repeatedly split in two
    halves. Each half is removed from the text in turn, the classifier is
    queried on both shortened texts, and the search descends into the half
    whose removal lowers the score of ``attack_class`` the most. Every split
    is recorded in the tree so a later call can resume from another branch.

    Args:
        classifier: callable such that
            ``classifier(text)[0][attack_class]["score"]`` is the score of
            the attacked class for ``text``.
        query: the text to search; it is split into single characters.
        attack_class: index of the class whose score should drop.
        BSStruct: root node of the tree built by earlier calls, or ``None``
            to start a new tree.
        initial_prob: score of the unmodified query. Only used when a new
            tree is created; the classifier is queried if it is ``None``.

    Returns:
        A tuple ``(position, prob, query_count, BSStruct)``: the selected
        character position, the score measured with the selected span
        removed, the number of classifier calls made by this invocation,
        and the root of the (updated) tree.

    Raises:
        ValueError: if ``query`` is empty or the tree has no unexplored
            node left.
    """
    split_query = list(query)  # character-level split
    if not split_query:
        raise ValueError("query must contain at least one character")

    query_count = 0

    # set up the root node on the first call
    if not BSStruct:
        if initial_prob is None:  # 0.0 is a legitimate probability, so test for None
            initial_prob = classifier(query)[0][attack_class]["score"]
            query_count += 1  # only count queries that were actually issued
        BSStruct = BSNode(query, initial_prob)

    # probability drops are always measured against the root of the tree
    initial_prob = BSStruct.prob

    # resume from the unexplored node with the lowest probability
    cur_struct = BSStruct.lowestProb()
    if cur_struct is None:
        raise ValueError("every node of the search tree has already been explored")

    if len(cur_struct.exclusion) == 1:
        # a single excluded position cannot be split further: that position
        # is the answer and no new query is needed
        cur_struct.explored = True
        return cur_struct.exclusion[0], cur_struct.prob, query_count, BSStruct

    if cur_struct.exclusion:
        start = cur_struct.exclusion[0]
        end = start + len(cur_struct.exclusion) - 1
    else:
        # root node: nothing excluded yet, so search the entire text
        start = 0
        end = len(split_query) - 1

    final_prob = initial_prob
    most_influential_pos = start

    if start >= end:
        # one-character query: there is nothing to bisect, and the node must
        # be marked so the next lowestProb() call does not hand it out again
        cur_struct.explored = True

    while start < end:
        mid = start + (end - start) // 2

        first_exclusion = list(range(start, mid + 1))
        second_exclusion = list(range(mid + 1, end + 1))

        # both exclusions are contiguous index ranges, so the remaining text
        # is just the two slices around the removed span
        first_part = ''.join(split_query[:start] + split_query[mid + 1:])
        second_part = ''.join(split_query[:mid + 1] + split_query[end + 1:])

        first_prob = classifier(first_part)[0][attack_class]["score"]
        second_prob = classifier(second_part)[0][attack_class]["score"]
        query_count += 2  # one query per half

        # record both halves in the tree as soon as they are measured
        # (node data joins the removed characters with spaces)
        cur_struct.left = BSNode(' '.join(split_query[start:mid + 1]), first_prob)
        cur_struct.right = BSNode(' '.join(split_query[mid + 1:end + 1]), second_prob)
        cur_struct.left.exclusion = first_exclusion
        cur_struct.right.exclusion = second_exclusion

        first_drop = initial_prob - first_prob
        second_drop = initial_prob - second_prob

        # descend into the half whose removal hurts the score the most
        if first_drop > second_drop:
            chosen, chosen_exclusion, final_prob = cur_struct.left, first_exclusion, first_prob
        else:
            chosen, chosen_exclusion, final_prob = cur_struct.right, second_exclusion, second_prob

        if len(chosen_exclusion) == 1:
            # the search ends on this node, so it must not be visited again
            chosen.explored = True

        cur_struct = chosen
        start = chosen_exclusion[0]
        end = chosen_exclusion[-1]

        most_influential_pos = start

    return most_influential_pos, final_prob, query_count, BSStruct



def get_synonyms(word):
    """Return the WordNet lemma names of ``word``, excluding ``word`` itself.

    The lemma names of every synset that ``wordnet.synsets(word)`` yields are
    merged into one set, so each name appears once; the result is returned
    as a list in no particular order.
    """
    lemma_names = {
        lemma.name()
        for synset in wordnet.synsets(word)
        for lemma in synset.lemmas()
    }
    return list(lemma_names - {word})

def pred_class(preds):
    """Return the index of the entry in ``preds`` with the highest score.

    ``preds`` is one text-classifier output: a sequence of dicts that each
    carry a ``'score'`` key (needed for multiclass problems, where more than
    two entries are present). When several entries share the highest score,
    the first of them wins.
    """
    scores = [entry['score'] for entry in preds]

    winner = 0
    top_score = scores[0]
    for position, value in enumerate(scores[1:], start=1):
        # strict comparison keeps the earliest entry on ties
        if value > top_score:
            winner, top_score = position, value

    return winner

def _load_char_neighbors(path):
    """Read the tab-separated neighbor table at ``path`` into a ``{char: replacement}`` dict."""
    char_dict = {}
    # the table holds non-ASCII characters, so the encoding is made explicit;
    # the with-block guarantees the handle is closed again
    with open(path, encoding='utf-8') as charfile:
        charfile.readline()  # the first line is skipped (presumably a header row)
        for line in charfile:
            cur_line = line.split('\t')
            if len(cur_line) < 2 or not cur_line[0] or len(cur_line[1]) < 2:
                # blank or truncated row: skip it instead of raising IndexError
                continue
            # key: first character of column one; value: SECOND character of
            # column two (NOTE(review): presumably the column starts with a
            # separator or quote character - confirm against the file format)
            char_dict[cur_line[0][0]] = cur_line[1][1]
    return char_dict


def char_replace(classifier, query, initial_prob, replace_pos, attack_class, neighbors_path='/content/drive/MyDrive/Colab Notebooks/selected.neighbors'):
    """Replace one character of ``query`` and check whether the label flips.

    The character at ``replace_pos`` is swapped for its entry in the neighbor
    table; a character without an entry is swapped for a randomly chosen
    replacement from the table. The classifier is then queried once on the
    modified text.

    Args:
        classifier: callable such that ``classifier(text)[0]`` is the list
            of per-class dicts with a ``'score'`` key.
        query: text to modify.
        initial_prob: not used; kept so existing callers keep working.
        replace_pos: index of the character to replace.
        attack_class: index of the class the attack tries to move away from.
        neighbors_path: location of the tab-separated neighbor table.

    Returns:
        A tuple ``(flipped, new_query, prob, query_count)``: whether the
        predicted class is no longer ``attack_class``, the modified text,
        the score of ``attack_class`` on it, and the number of classifier
        calls made (always 1).
    """
    split_query = list(query)
    replace_char = split_query[replace_pos]
    char_dict = _load_char_neighbors(neighbors_path)

    if replace_char in char_dict:
        syn_char = char_dict[replace_char]
    else:
        # no known neighbor for this character: fall back to a random one
        syn_char = random.choice(list(char_dict.values()))

    split_query[replace_pos] = syn_char
    cur_query = ''.join(split_query)
    query_count = 1
    cur_preds = classifier(cur_query)[0]
    cur_prob = cur_preds[attack_class]['score']

    # the attack succeeded when the predicted class moved away from attack_class
    flipped = pred_class(cur_preds) != attack_class
    return flipped, cur_query, cur_prob, query_count


def BS_CR(classifier, query, attack_class, k = -1):
    """Try to flip the label of ``query`` with binary selection and character replacement.

    Each round asks ``binary_select_iterative_BSNode`` for the next character
    position (always searching the original ``query`` and reusing the search
    tree of the previous rounds) and lets ``char_replace`` swap the character
    at that position in the text modified so far.

    Args:
        classifier: classifier callable handed to both helpers.
        query: original text.
        attack_class: index of the class the attack tries to move away from.
        k: maximum number of replacements; ``-1`` means no limit.

    Returns:
        ``(success, text, query_count, prob)``: whether the label flipped,
        the text after the last replacement, the accumulated query count,
        and the score of ``attack_class`` on that text.
    """
    initial_prob = classifier(query)[0][attack_class]["score"]
    total_queries = 1  # the query that produced initial_prob

    search_tree = None  # created by the first selection call, reused afterwards
    attacked_text = query
    replaced = 0

    while True:
        # select the next position to attack
        replace_pos, final_prob, used, search_tree = binary_select_iterative_BSNode(
            classifier, query, attack_class, BSStruct = search_tree, initial_prob = initial_prob)
        total_queries += used

        # try a character replacement at that position
        success, attacked_text, cur_prob, used = char_replace(
            classifier, attacked_text, initial_prob, replace_pos, attack_class)
        replaced += 1
        total_queries += used

        if success:
            return True, attacked_text, total_queries, cur_prob

        # stop when the whole tree has been explored or the budget k is used up
        tree_exhausted = not search_tree.lowestProb()
        budget_spent = k != -1 and replaced >= k
        if tree_exhausted or budget_spent:
            return False, attacked_text, total_queries, cur_prob


def GreedySelect(classifier, query, attack_class = 0, initial_prob = None):
    """Measure how much removing each single character lowers the class score.

    Args:
        classifier: callable such that
            ``classifier(text)[0][attack_class]["score"]`` is the score of
            the attacked class for ``text``.
        query: text to analyse; it is split into single characters.
        attack_class: index of the class whose score is tracked.
        initial_prob: score of the unmodified query; the classifier is
            queried for it when it is ``None``.

    Returns:
        A dict mapping every character position to ``initial_prob`` minus
        the score obtained with that character removed (one classifier call
        per position).
    """
    # 0.0 is a legitimate probability, so only None means "not supplied"
    if initial_prob is None:
        initial_prob = classifier(query)[0][attack_class]["score"]

    split_query = list(query)  # character-level split

    prob_drops = {}
    for i in range(len(split_query)):
        # drop character i and record how far the score falls
        cur_query = ''.join(split_query[:i] + split_query[i+1:])
        cur_prob = classifier(cur_query)[0][attack_class]["score"]
        prob_drops[i] = initial_prob - cur_prob

    return prob_drops




def GS_CR(classifier, query, attack_class, k = -1):
    """Try to flip the label of ``query`` with greedy selection and character replacement.

    ``GreedySelect`` scores every character position once; the positions are
    then attacked with ``char_replace`` in order of decreasing probability
    drop, each replacement being applied on top of the previous ones.

    Args:
        classifier: classifier callable handed to both helpers.
        query: original text.
        attack_class: index of the class the attack tries to move away from.
        k: maximum number of replacements; ``-1`` means no limit.

    Returns:
        ``(success, text, query_count, prob)``: whether the label flipped,
        the text after the last replacement, the accumulated query count,
        and the score of ``attack_class`` on that text. For a query without
        any position to replace this is ``(False, query, query_count,
        initial score)``.
    """
    initial_prob = classifier(query)[0][attack_class]["score"]
    query_count = 1  # the query that produced initial_prob

    prob_drops = GreedySelect(classifier, query, attack_class, initial_prob)
    query_count += len(prob_drops)

    # positions ordered by decreasing drop; the stable sort keeps the original
    # position order among equal drops
    ranked_positions = sorted(prob_drops, key=prob_drops.get, reverse=True)

    cur_query = query
    cur_prob = initial_prob  # reported unchanged when there is nothing to replace
    chars_changed = 0

    for replace_pos in ranked_positions:
        # try a character replacement at the next most influential position
        success, cur_query, cur_prob, queries = char_replace(classifier, cur_query, initial_prob, replace_pos, attack_class)
        chars_changed += 1
        query_count += queries

        if success:
            return True, cur_query, query_count, cur_prob

        # stop once the replacement budget k is used up
        if k != -1 and chars_changed >= k:
            break

    return False, cur_query, query_count, cur_prob




def testsst2(start = 0):
    """Run the GS-CR attack on one batch of SST-2 validation examples.

    The examples with ids ``start`` to ``start + 24`` (inclusive) are
    processed. Examples that the classifier already gets wrong are logged
    as ``'Skipped'``; all others are attacked with ``GS_CR`` (budget
    ``k = 50``). One row per example is written to a tab-separated file
    whose name contains the id range, and the average number of queries per
    attacked example is printed at the end.

    Args:
        start: id of the first validation example of the batch.
    """
    classifier =  pipeline("text-classification", model="celine98/canine-s-finetuned-sst2", max_length=512, truncation = True, return_all_scores = True)
    dataset = load_dataset('glue', 'sst2')
    test_data = dataset['validation']
    end = start + 24
    # To evaluate BS_CR instead, call BS_CR(...) below and write to a file
    # named "sst2_test_BS-CR_<start>-<end>_k_50.tsv".
    out_path = "/content/drive/MyDrive/Colab Notebooks/sst2_test_GS-CR_" + str(start)+'-'+str(end)+"_k_50.tsv"

    greedy_queries = 0
    total = 0

    # the with-block makes sure the rows are flushed and the file is closed;
    # newline='' is what the csv module expects of the files it writes to
    with open(out_path, 'w', newline='', encoding='utf-8') as out_file:
        greedyout = csv.writer(out_file, delimiter = '\t')

        for cur_id, cur in enumerate(test_data):
            if cur_id < start:
                continue
            # checked before any work so that a skipped example at the end of
            # the batch cannot let the loop run into the next batch
            if cur_id > end:
                break

            if cur_id % 5 == 0:
                print(cur_id)

            label = int(cur['label'])

            # make sure the classifier is not already failing
            cur_pred = classifier(cur['sentence'])[0]
            if pred_class(cur_pred) != label:
                greedyout.writerow([cur_id, 'Skipped', cur['sentence'], 0, 0.0])
                continue

            success, final_query, query_count, final_prob = GS_CR(classifier, cur['sentence'], label, 50)
            greedyout.writerow([cur_id, success, final_query, query_count, final_prob])
            greedy_queries += query_count
            total += 1

    if total:
        print("gs average:", greedy_queries/total)
    else:
        # nothing was attacked (every example skipped or start past the data)
        print("gs average: n/a (no examples were attacked)")


# Run the experiment batch by batch: testsst2 is called with the start ids
# 700, 725, ..., 850 (the range stops before 875).
for i in range(700, 875, 25):
  testsst2(i)

#classifier =  pipeline("text-classification", model="textattack/distilbert-base-uncased-imdb", max_length=512, truncation = True, return_all_scores = True)

#query_count = 0
#classifier =  pipeline("text-classification", model="celine98/canine-s-finetuned-sst2", max_length=512, truncation = True, return_all_scores = True)
#print(GS_CR(classifier, "that loves its characters and communicates something rather beautiful about human nature", 1))
#print(BS_CR(classifier, "that loves its characters and communicates something rather beautiful about human nature", 1))

#text = 'the worst movie ever'
#success, final_query, query_count, final_prob = BS_WNR(classifier, text, 0)
#print(success, final_query, query_count, final_prob)

#text = 'the worst movie ever . . . not good once'
#success, final_query, query_count, final_prob = BS_WNR(classifier, text, 0)
#print(success, final_query, query_count, final_prob)

#text = 'the worst movie ever . . . not good once'
#success, final_query, query_count, final_prob = GS_WNR(classifier, text, 0)
#print(success, final_query, query_count, final_prob)

#text = '''"I Am Curious: Yellow" is a risible and pretentious steaming pile. It doesn't matter what one's political views are because this film can hardly be taken seriously on any level. As for the claim that frontal male nudity is an automatic NC-17, that isn't true. I've seen R-rated films with male nudity. Granted, they only offer some fleeting views, but where are the R-rated films with gaping vulvas and flapping labia? Nowhere, because they don't exist. The same goes for those crappy cable shows: schlongs swinging in the breeze but not a clitoris in sight. And those pretentious indie movies like The Brown Bunny, in which we're treated to the site of Vincent Gallo's throbbing johnson, but not a trace of pink visible on Chloe Sevigny. Before crying (or implying) "double-standard" in matters of nudity, the mentally obtuse should take into account one unavoidably obvious anatomical difference between men and women: there are no genitals on display when actresses appears nude, and the same cannot be said for a man. In fact, you generally won't see female genitals in an American film in anything short of porn or explicit erotica. This alleged double-standard is less a double standard than an admittedly depressing ability to come to terms culturally with the insides of women's bodies.'''
#success, final_query, query_count, final_prob = BS_WNR(classifier, text, 0)
#print(success, final_query, query_count, final_prob)

#text = '''"I Am Curious: Yellow" is a risible and pretentious steaming pile. It doesn't matter what one's political views are because this film can hardly be taken seriously on any level. As for the claim that frontal male nudity is an automatic NC-17, that isn't true. I've seen R-rated films with male nudity. Granted, they only offer some fleeting views, but where are the R-rated films with gaping vulvas and flapping labia? Nowhere, because they don't exist. The same goes for those crappy cable shows: schlongs swinging in the breeze but not a clitoris in sight. And those pretentious indie movies like The Brown Bunny, in which we're treated to the site of Vincent Gallo's throbbing johnson, but not a trace of pink visible on Chloe Sevigny. Before crying (or implying) "double-standard" in matters of nudity, the mentally obtuse should take into account one unavoidably obvious anatomical difference between men and women: there are no genitals on display when actresses appears nude, and the same cannot be said for a man. In fact, you generally won't see female genitals in an American film in anything short of porn or explicit erotica. This alleged double-standard is less a double standard than an admittedly depressing ability to come to terms culturally with the insides of women's bodies.'''
#success, final_query, query_count, final_prob = GS_WNR(classifier, text, 0)
#print(success, final_query, query_count, final_prob)





[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


700
705
710
715
720
725
gs average: 130.32
725
730
735
740
745
750
gs average: 116.04545454545455
750
755
760
765
770
gs average: 112.29411764705883
775
780
785
790
795
gs average: 129.56521739130434
800
805
810
815
820
gs average: 118.17391304347827
825
830
835
840
845
gs average: 116.25
850
855
860
865
870
gs average: 119.22222222222223


In [None]:
import random

# Load the character-neighbor table and show it, together with one randomly
# chosen replacement character.
char_dict = {}

# explicit encoding because the table holds non-ASCII characters; the
# with-block closes the file again
with open('/content/drive/MyDrive/Colab Notebooks/selected.neighbors', encoding='utf-8') as charfile:
    charfile.readline()  # the first line is skipped (presumably a header row)

    for line in charfile:
        cur_line = line.split('\t')
        x = cur_line[0][0]       # first character of the first column
        x_repl = cur_line[1][1]  # second character of the second column
        char_dict[x] = x_repl

print(char_dict)

print(random.choice(list(char_dict.values())))


{'a': 'â', 'b': 'ḃ', 'c': 'ĉ', 'd': 'ḑ', 'e': 'ê', 'f': 'ḟ', 'g': 'ǵ', 'h': 'ĥ', 'i': 'î', 'j': 'ĵ', 'k': 'ǩ', 'l': 'ᶅ', 'm': 'ḿ', 'n': 'ň', 'o': 'ô', 'p': 'ṕ', 'q': 'ʠ', 'r': 'ř', 's': 'ŝ', 't': 'ẗ', 'u': 'ǔ', 'v': 'ṽ', 'w': 'ẘ', 'x': 'ẍ', 'y': 'ŷ', 'z': 'ẑ', 'A': 'Â', 'B': 'Ḃ', 'C': 'Ĉ', 'D': 'Ď', 'E': 'Ê', 'F': 'Ḟ', 'G': 'Ĝ', 'H': 'Ĥ', 'I': 'Î', 'J': 'Ĵ', 'K': 'Ǩ', 'L': 'Ĺ', 'M': 'Ḿ', 'N': 'Ň', 'O': 'Ô', 'P': 'Ṕ', 'Q': 'Q', 'R': 'Ř', 'S': 'Ŝ', 'T': 'Ť', 'U': 'Û', 'V': 'Ṽ', 'W': 'Ŵ', 'X': 'Ẍ', 'Y': 'Ŷ', 'Z': 'Ẑ'}
Ĉ


In [None]:
# Load both models and print the per-class scores for one sample text each.
classifier = pipeline(
    "sentiment-analysis",
    model="textattack/distilbert-base-uncased-imdb",
    max_length=512,
    truncation=True,
    return_all_scores=True,
)
print(classifier("the whip moving-picture_show e'er . . . non adept at_one_time"))

char_classifier = pipeline(
    "text-classification",
    model="celine98/canine-s-finetuned-sst2",
    max_length=512,
    truncation=True,
    return_all_scores=True,
)
print(char_classifier("contains no wit , only labored gags"))

[[{'label': 'LABEL_0', 'score': 0.8160994052886963}, {'label': 'LABEL_1', 'score': 0.1839006543159485}]]
[[{'label': 'LABEL_0', 'score': 0.9941608309745789}, {'label': 'LABEL_1', 'score': 0.005839199293404818}]]
