# Scikit-Learn and TextAttack

## Complex Phrasal Features Experiments

In [None]:
import numpy as np
import random
import datasets
import os
import pandas as pd
import re
import pickle

from sklearn.feature_extraction.text import CountVectorizer, ENGLISH_STOP_WORDS
from sklearn import preprocessing
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score, f1_score, r2_score, mean_squared_error
from sklearn.linear_model import LinearRegression
import concurrent.futures

import stanza
import textstat

import spacy
import neuralcoref

import textattack

In [None]:
stanza.download('en')  # This can be important if you have multiple versions of stanza kicking around

In [None]:
nlp_lem = stanza.Pipeline(lang='en', processors='tokenize,lemma')

In [None]:
const_nlp = stanza.Pipeline(lang='en', processors='tokenize,pos,constituency')

In [None]:
nlp_spacy = spacy.load('en')
neuralcoref.add_to_pipe(nlp_spacy)

Helpful lemmatizer functions for later.

In [None]:
def lemmatize_text(text):
    return [b.lemma for b in nlp_lem(text.lower()).iter_words()]

# TextAttack Model Wrapper

This is used to provide hooks for TextAttack to tokenize input text and attack the model.

In [None]:
from textattack.models.wrappers import ModelWrapper

class BetterSklearnModelWrapper(ModelWrapper):
    """Loads a scikit-learn model and tokenizer (tokenizer implements
    `transform` and model implements `predict_proba`).
    May need to be extended and modified for different types of
    tokenizers.
    """

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def __call__(self, text_input_list):
        encoded_text_matrix = self.tokenizer.transform(text_input_list)

        predictions = self.model.predict_proba(encoded_text_matrix)
        return predictions

    def get_grad(self, text_input):
        raise NotImplementedError()

# Phrasal Vectorizer
This is a "tokenizer" in TextAttack parlance.  It converts an input sequence of words into a feature vector that is input to a model.

In [None]:
class FeatureVectorizer:
    def generate_freq_features(self, b):
        freq_features = []
        
        if type(b) == 'str':
            print("Error: string type, expected list of lemmas")
            raise TypeError

        tokens, counts = np.unique(b, return_counts=True)

        log_counts = sorted(np.log(counts), reverse=True)
        ranks = np.log(np.arange(1, len(log_counts)+1)).reshape(-1,1)
        reg = LinearRegression().fit(ranks, log_counts)

        preds = reg.predict(ranks)

        r2 = r2_score(log_counts, preds)
        slope = reg.coef_[0]
        mse = mean_squared_error(log_counts, preds)

        freq_features = (slope, r2, mse)

        return freq_features
    
    def generate_verb_phrase_ratios(self, s):
        phrasal_count = 0
        doc = const_nlp(s)

        for sentence in doc.sentences:
            c = sentence.constituency
            for p in c.preterminals():
                if p.label == 'RP':
                    phrasal_count += 1

        phrasal_ratio = phrasal_count/doc.num_words
        return phrasal_ratio
    
    def generate_fluency_features(self, s):
        gunning_fog_score = textstat.gunning_fog(s)
        flesch_reading_ease_score = textstat.flesch_reading_ease(s)

        return gunning_fog_score, flesch_reading_ease_score
    
    def generate_coref_ratios(self, s):
        coref_ratios = []
        doc = nlp_spacy(s)
        coref_ratio = len(doc._.coref_clusters)/len(doc)
        return coref_ratio

    def array_match_count(self, an, search):
        """
        Find matching sequences of elements in an array, fairly efficiently
        """
        if len(search) == 0:
            return 0

        try:
            search_index = 0
            found_count = 0

            while search_index < len(an):
                first_word_index = an.index(search[0], search_index)
                if (an[first_word_index:first_word_index+len(search)] == search):
                    found_count += 1

                search_index = first_word_index+1
        except ValueError:
            pass

        return found_count
    
    def get_rate_of_wordlist(self, text, wordlist):
        """
        Given a text and a list of words (or phrases),
        get the number of occurences of these in the relevant dataframe column,
        divided by the length of the list
        """
        ratio = 0
        count = 0
        
        if (len(text) > 0):
            for w in wordlist:
                count += self.array_match_count(text, w)
            ratio = count/len(text)

        return ratio

    def get_all_wordlist_ratios(self, text):
        ratio_archaisms = self.get_rate_of_wordlist(text, self.archaisms)
        ratio_idioms = self.get_rate_of_wordlist(text, self.wiki_english_idioms)
        ratio_cliches = self.get_rate_of_wordlist(text, self.cliches)
        
        return (ratio_archaisms, ratio_idioms, ratio_cliches)
    
    def __init__(self):
        #self.cliches, self.archaisms, self.idioms = self.load_phrasal_features()
        # Load initial lookup tables and similar
        with open("intermediate_data/cliche_lemmas.pkl", "rb") as f:
            self.cliches = pickle.load(f)
        with open("intermediate_data/archaisms_lemmas.pkl", "rb") as f:
            self.archaisms = pickle.load(f)
        with open("intermediate_data/idiom_lemmas.pkl", "rb") as f:
            self.wiki_english_idioms = pickle.load(f)
        with open("models/linear_svm_3k_of_10k_scaler.pkl", "rb") as f:
            self.scaler = pickle.load(f)
    
    def transform(self, text_list):
        """
        Transform text into a feature vector
        """
        #print(f"Received text_list of length {len(text_list)}")
        #print(f"Starting string: {text_list[:22]}")
        
        vectors = []
        
        # Processing individually... for now
        for text in text_list:
            # Lemmatize text as some steps require lemmas
            lemma_text = lemmatize_text(text)

            # Call various processing components
            _verb_phrase = self.generate_verb_phrase_ratios(text)
            _coref_ratios = self.generate_coref_ratios(text)
            _gf_scores, _fre_scores = self.generate_fluency_features(text)
            _phrasal_ratios = self.get_all_wordlist_ratios(lemma_text)
            _freq_features = self.generate_freq_features(lemma_text)

            # Construct feature vector from results
            vector = []
            vector.extend(_freq_features)
            vector.append(_verb_phrase)
            vector.append(_coref_ratios)
            vector.extend(_phrasal_ratios)
            vector.append(_gf_scores)
            vector.append(_fre_scores)
                        
            vectors.append(vector)
        
        #print("Transform returned vectors")
        #print(len(vectors))
        vectors = self.scaler.transform(vectors)
        return vectors


    def get_feature_names(self):
        return  ["Slope", "R2", "MSE", "Verb Phrase", "Coreference", "Archaisms", "Idioms", "Cliches", "Gunning-Fog", "Flesch"]

# Attacking Statistical Model


## Data and Model Loading

In [None]:
MAX_CHAR=400 # Too large and character-based attacks really don't work

In [None]:
gpt2_345m_test = pd.read_json("./data/gpt-2-output-dataset/data/medium-345M-k40.test.jsonl", lines=True)
webtext_test = pd.read_json("./data/gpt-2-output-dataset/data/webtext.test.jsonl", lines=True)

machine_samples = [[a[:MAX_CHAR], 1] for a in list(gpt2_345m_test['text'])]
human_samples = [[a[:MAX_CHAR], 0] for a in list(webtext_test['text'])]

dataset = human_samples.copy()
dataset.extend(machine_samples)
random.seed(0)
random.shuffle(dataset)

In [None]:
ta_dataset = textattack.datasets.Dataset(dataset[200:215]) # TODO: Reset this when done processing the rest

Load pre-trained SVM model

In [None]:
with open("models/linear_svm_3k_of_10k_proba.pickle", "rb") as f:
    model = pickle.load(f)

## Pre-Attack Tests

Check a single feature vector

In [None]:
f = FeatureVectorizer()
v = f.transform([dataset[0][0]])
v

Check a single evaluation

In [None]:
feature_vectorizer = FeatureVectorizer()
model_wrapper = BetterSklearnModelWrapper(model,feature_vectorizer)

In [None]:
model_wrapper.__call__([dataset[0][0]])

## Run Attacks

In [None]:
from textattack.attack_recipes import DeepWordBugGao2018, TextFoolerJin2019
dwb_attack = DeepWordBugGao2018.build(model_wrapper)
tf_attack = TextFoolerJin2019.build(model_wrapper) # sudo ln -s /usr/local/cuda-11.0/targets/x86_64-linux/lib/libcusolver.so.10 /usr/local/cuda-11.0/targets/x86_64-linux/lib/libcusolver.so.11

In [None]:
attack_args = textattack.AttackArgs(
num_examples=-1,
attack_n=False,
log_to_csv="attack_logs/log.csv",
checkpoint_interval=5,
checkpoint_dir="attack_checkpoints",
disable_stdout=False,
parallel=False  # can try multi GPU here
)

**Note that in between runs here we're manually backing up the "log" files.**

In [None]:
# Try an attack
attacker = textattack.Attacker(tf_attack, ta_dataset, attack_args)
results = attacker.attack_dataset()
finished_results = []
for idx, result in enumerate(results):
    print(f'Result for sample {idx}:')
    finished_results.append(result)
    print(result.__str__(color_method='ansi'))
    print('\n\n')

In [None]:
attacker = textattack.Attacker(dwb_attack, ta_dataset, attack_args)
results = attacker.attack_dataset()
finished_results = []
for idx, result in enumerate(results):
    print(f'Result for sample {idx}:')
    finished_results.append(result)
    print(result.__str__(color_method='ansi'))
    print('\n\n')