In [None]:

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

import checklist
from checklist.editor import Editor
from checklist.perturb import Perturb
from checklist.test_types import INV
import csv
import spacy
import numpy as np

from tqdm import tqdm
from sklearn.metrics import accuracy_score


## Model setup

In [None]:

def load_model_and_tokenizer(name="qwen"):
    """
    Load a pretrained causal language model together with its tokenizer.

    name: key into the table of supported checkpoints ("qwen" or "aya").

    Returns a (model, tokenizer) tuple created with the `from_pretrained`
    loaders. An unknown name fails the assertion with "unknown model".
    """

    path_dict = {
        "qwen" : "Qwen/Qwen1.5-7B-Chat",
        "aya" : "CohereForAI/aya-101",
    }

    assert name in path_dict, "unknown model"

    # Resolve the checkpoint from `name`. The Qwen path used to be hard-coded
    # in both calls below, so the argument was validated but then ignored.
    model_path = path_dict[name]

    # NOTE(review): aya-101 is presumably an encoder-decoder checkpoint and may
    # not load through AutoModelForCausalLM -- confirm before relying on "aya".
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    model = AutoModelForCausalLM.from_pretrained(model_path, torch_dtype="auto")

    return model, tokenizer


# Run on the GPU when torch can see one, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(device)

# Load the Qwen chat model and move its weights to the selected device.
model, tokenizer = load_model_and_tokenizer(name="qwen")
model = model.to(device)


## Create dataset

### Sentiment analysis

In [None]:
# Load and parse airline tweets

def read_tweets(path):
    """
    Read the airline tweets CSV and return the tweet texts with their labels.

    path: location of a CSV file that has at least the columns 'text' and
        'airline_sentiment' (values 'negative', 'neutral' or 'positive').

    Returns (tdata, labels): tdata is a list with the tweet strings and labels
    is an integer numpy array using negative=0, neutral=1, positive=2.
    Raises KeyError when one of the two columns is missing or a sentiment
    value is not one of the three known ones.
    """
    mapping = {'negative': 0, 'positive': 2, 'neutral': 1}

    tdata = []
    labels = []
    # The context manager closes the file handle, which used to be leaked.
    # newline='' is what the csv module asks for, so that line breaks inside
    # quoted tweets are parsed correctly.
    # NOTE(review): assumes the CSV is UTF-8 encoded -- confirm for your copy.
    with open(path, newline='', encoding='utf-8') as f:
        for row in csv.DictReader(f):
            tdata.append(row['text'])
            labels.append(mapping[row['airline_sentiment']])

    return tdata, np.array(labels).astype(int)

# Raw tweet texts and their sentiment labels
data, labels = read_tweets('./Tweets.csv')
sentences = data

# spaCy parses of every tweet, in the same order as `sentences`
nlp = spacy.load('en_core_web_sm')
parsed_data = [doc for doc in nlp.pipe(sentences)]


#### Named Entity Recognition (NER) test using INVariance

In [None]:

# Swap location mentions for other locations
perturb_location_data = Perturb.perturb(
    parsed_data,
    Perturb.change_location,
    nsamples=1000,
    n=5,
).data

# Swap person names for other names
perturb_names_data = Perturb.perturb(
    parsed_data,
    Perturb.change_names,
    nsamples=1000,
    n=5,
).data


#### Robustness using INVariance

In [None]:

import string

def random_string(n):
    """Return a string of n characters picked at random from ASCII letters and digits."""
    alphabet = list(string.ascii_letters + string.digits)
    picked = np.random.choice(alphabet, n)
    return ''.join(picked)

def random_url(n=6):
    """Build a 'https://t.co/...' URL whose last part is a random string of n characters."""
    suffix = random_string(n)
    return 'https://t.co/{}'.format(suffix)

def random_handle(n=6):
    """Build an '@...' handle from a random string of n characters."""
    name = random_string(n)
    return f'@{name}'

def add_irrelevant(sentence):
    """
    Produce variants of `sentence` with one irrelevant token attached.

    Five random URLs and five random handles are generated once per call.
    Each of them, plus the fixed '@airline ' prefix, is put in front of the
    sentence, and each random one is also put behind it (21 strings in total).
    """
    noise = []
    for _ in range(5):
        noise.append(random_url(n=6))
    for _ in range(5):
        noise.append(random_handle())

    prefixed = [f'{token} {sentence}' for token in ['@airline ', *noise]]
    suffixed = [f'{sentence} {token}' for token in noise]
    return prefixed + suffixed


In [None]:

# Attach randomly generated URLs and handles
perturb_irrelevant_data = Perturb.perturb(
    sentences, add_irrelevant, nsamples=1000
).data

# Punctuation changes (runs on the spaCy-parsed tweets)
perturb_punc_data = Perturb.perturb(
    parsed_data, Perturb.punctuation, nsamples=1000
).data

# Typos: 1, 2 and 5 per sentence
perturb_typo1_data = Perturb.perturb(
    sentences, Perturb.add_typos, nsamples=1000, typos=1
).data
perturb_typo2_data = Perturb.perturb(
    sentences, Perturb.add_typos, nsamples=1000, typos=2
).data
perturb_typo5_data = Perturb.perturb(
    sentences, Perturb.add_typos, nsamples=1000, typos=5
).data

# Contract or expand contractions
perturb_contract_data = Perturb.perturb(
    sentences, Perturb.contractions, nsamples=1000
).data


#### Vocab+POS

In [None]:

# Editor used for the lexicons below and for the word-replacement suggestions
# requested in change_neutral.
editor = checklist.editor.Editor()
# Bare attribute access with no assignment.
# NOTE(review): presumably this forces the editor's text generator to load now
# rather than on first use -- confirm against the checklist Editor source.
editor.tg

# Airline-related nouns, registered under the lexicon name 'air_noun'.
air_noun = ['flight', 'seat', 'pilot', 'staff', 'service', 'customer service', 'aircraft', 'plane', 'food', 'cabin crew', 'company', 'airline', 'crew']
editor.add_lexicon('air_noun', air_noun)

# Adjective lists grouped by sentiment polarity.
pos_adj = ['good', 'great', 'excellent', 'amazing', 'extraordinary', 'beautiful', 'fantastic',
           'nice', 'incredible', 'exceptional', 'awesome', 'perfect', 'fun', 'happy', 'adorable',
           'brilliant', 'exciting', 'sweet', 'wonderful']
neg_adj = ['awful', 'bad', 'horrible', 'weird', 'rough', 'lousy', 'unhappy', 'average',
           'difficult', 'poor', 'sad', 'frustrating', 'hard', 'lame', 'nasty', 'annoying',
           'boring', 'creepy', 'dreadful', 'ridiculous', 'terrible', 'ugly', 'unpleasant']
neutral_adj = ['American', 'international',  'commercial', 'British', 'private', 'Italian',
               'Indian', 'Australian', 'Israeli']

# overwrite=True is passed for every lexicon from here on.
# NOTE(review): presumably this lets the cell be re-run without a
# "lexicon already exists" failure -- confirm against add_lexicon.
editor.add_lexicon('pos_adj', pos_adj, overwrite=True)
editor.add_lexicon('neg_adj', neg_adj, overwrite=True)
editor.add_lexicon('neutral_adj', neutral_adj, overwrite=True)

# Verb lists grouped by sentiment polarity, in present and past tense.
pos_verb_present = ['like', 'enjoy', 'appreciate', 'love',  'recommend', 'admire', 'value',
                    'welcome']
neg_verb_present = ['hate', 'dislike', 'regret',  'abhor', 'dread', 'despise' ]
neutral_verb_present = ['see', 'find']
pos_verb_past = ['liked', 'enjoyed', 'appreciated', 'loved', 'admired', 'valued', 'welcomed']
neg_verb_past = ['hated', 'disliked', 'regretted',  'abhorred', 'dreaded', 'despised']
neutral_verb_past = ['saw', 'found']

# One lexicon per tense, plus a combined (present + past) lexicon per polarity.
editor.add_lexicon('pos_verb_present', pos_verb_present, overwrite=True)
editor.add_lexicon('neg_verb_present', neg_verb_present, overwrite=True)
editor.add_lexicon('neutral_verb_present', neutral_verb_present, overwrite=True)
editor.add_lexicon('pos_verb_past', pos_verb_past, overwrite=True)
editor.add_lexicon('neg_verb_past', neg_verb_past, overwrite=True)
editor.add_lexicon('neutral_verb_past', neutral_verb_past, overwrite=True)
editor.add_lexicon('pos_verb', pos_verb_present+ pos_verb_past, overwrite=True)
editor.add_lexicon('neg_verb', neg_verb_present + neg_verb_past, overwrite=True)
editor.add_lexicon('neutral_verb', neutral_verb_present + neutral_verb_past, overwrite=True)

# Tokens that change_neutral is allowed to replace. A few entries are listed
# twice ('in', 'of', 'it', 'this'); the set() removes the duplicates.
neutral_words = set(['.', 'the', 'The', ',', 'a', 'A', 'and', 'of', 'to', 'it', 'that', 'in',
                     'this', 'for',  'you', 'there', 'or', 'an', 'by', 'about', 'flight', 'my',
                     'in', 'of', 'have', 'with', 'was', 'at', 'it', 'get', 'from', 'this',
                     'Flight', 'plane'])

# Replacement words that change_neutral rejects: negations, 'without', 'but'
# and every polar adjective or verb defined above.
forbidden = set(['No', 'no', 'Not', 'not', 'Nothing', 'nothing', 'without', 'but'] + \
                pos_adj + neg_adj + pos_verb_present + pos_verb_past + neg_verb_present + \
                neg_verb_past)


In [None]:

def change_neutral(d):
    """
    Create variants of sentence `d` in which one neutral word is replaced.

    Every whitespace-separated token of `d.capitalize()` that is in
    `neutral_words` is passed to `editor.suggest_replace`; suggestions whose
    replacement word is in `forbidden` are dropped.

    Returns a list with at most 10 randomly chosen rewritten sentences, or
    None when `d` holds no neutral word or no suggestion survives the filter.
    """
    # NOTE(review): capitalize() lower-cases everything after the first
    # character, so the tokens tested here can differ in case from the text of
    # `d` that is handed to suggest_replace -- confirm this is intended.
    words_in = [x for x in d.capitalize().split() if x in neutral_words]
    if not words_in:
        return None

    examples = []
    for w in words_in:
        suggestions = editor.suggest_replace(d, w, beam_size=5, words_and_sentences=True)
        # Each suggestion is indexed as [0] replacement word, [1] rewritten sentence.
        examples.extend(x[1] for x in suggestions if x[0] not in forbidden)

    if not examples:
        return None

    # Sample without replacement so that no variant is returned twice.
    idxs = np.random.choice(len(examples), min(len(examples), 10), replace=False)
    return [examples[i] for i in idxs]


In [None]:

# INV test data: neutral words swapped for other neutral words
perturb_change_neutral_data = Perturb.perturb(
    sentences,
    change_neutral,
    nsamples=1000,
).data


#### Vocab+POS using DIRectional expectation test (DIR)

## Inference

### Run model

In [None]:

def response_from_generate(model, messages):
    """
    Classify a chat prompt by letting the model generate a single token.

    model: object whose .generate() is called with the tokenized prompt.
    messages: chat messages in the form accepted by
        tokenizer.apply_chat_template.

    Relies on the notebook-level `tokenizer` and `device`.

    Returns 0, 1 or 2 for a generated 'A', 'B' or 'C'. Raises KeyError when
    the decoded token is none of these letters.
    """
    text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

    model_inputs = tokenizer([text], return_tensors="pt").to(device)
    # Hand over the attention mask together with the ids so that generate()
    # does not have to infer it.
    generated_ids = model.generate(
        model_inputs.input_ids,
        attention_mask=model_inputs.attention_mask,
        max_new_tokens=1,
    )
    # Drop the leading len(input_ids) entries of each output sequence so that
    # only the newly generated part is decoded.
    generated_ids = [output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)]

    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

    output_mapping = {'A' : 0, 'B' : 1, 'C' : 2}

    # Whitespace around the decoded letter (e.g. ' A') must not break the lookup.
    return output_mapping[response.strip()]


def response_from_forward(model, messages, choice_token_ids=(32, 33, 34)):
    """
    Classify a chat prompt from the logits of the next token.

    model: callable that takes the prompt's input ids and returns an object
        with a `.logits` tensor indexed as [batch, position, vocabulary].
    messages: chat messages in the form accepted by
        tokenizer.apply_chat_template.
    choice_token_ids: vocabulary ids of the answer tokens, in label order.
        The defaults are the ids the notebook used so far, documented there as
        32 = A (positive), 33 = B (negative), 34 = C (neutral).
        NOTE(review): these ids presumably hold for the Qwen tokenizer only --
        pass the right ids when another model is loaded.

    Relies on the notebook-level `tokenizer` and `device`.

    Returns the position within `choice_token_ids` of the answer token with
    the highest logit (0, 1 or 2 with the defaults).
    """
    text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    model_inputs = tokenizer([text], return_tensors="pt").to(device)

    # Only the logits are read, so skip autograd bookkeeping for this pass.
    with torch.no_grad():
        # Call the module itself rather than .forward() so registered hooks run.
        output = model(model_inputs.input_ids)

    # Logits at the last prompt position of the first (only) sequence.
    next_token_logits = output.logits[0, -1]
    choice_logits = next_token_logits[list(choice_token_ids)]
    response = torch.argmax(choice_logits).item()

    return response


def inference(model, data, inference_mode='generate'):
    """
    Perform inference on model using created data samples.

    data: iterable of lists of strings. The model's response to the first
        sentence of each list is used as the gold label for that list; the
        responses to the remaining sentences are its predictions.
    inference_mode: 'generate' means .generate() is used to create a written
        response; 'forward' means the output logits of a forward pass
        determine the response.

    Returns (gold_labels, pred_labels). gold_labels[i] belongs to
    pred_labels[i]; lists in `data` without any sentence are skipped so the
    two results always stay aligned. An unknown inference_mode fails the
    assertion before any sentence is processed.
    """
    responders = {
        'generate': response_from_generate,
        'forward': response_from_forward,
    }
    # Validate up front: the mode used to be checked only once a prompt was
    # being processed, so empty data hid a wrong mode.
    assert inference_mode in responders, 'unknown inference mode'
    respond = responders[inference_mode]

    system_message = "Give the sentiment of the user's prompt. Please only respond with A (positive), B (negative) or C (neutral)."

    gold_labels, pred_labels = [], []

    for sentences in tqdm(data):
        responses = []
        for user_prompt in sentences:
            messages = [
                {"role": "system", "content": system_message},
                {"role": "user", "content": user_prompt}
            ]
            responses.append(respond(model, messages))

        # A list without sentences has no gold label. Recording an empty
        # prediction list for it would shift every later gold/prediction pair.
        if not responses:
            continue

        gold_labels.append(responses[0])
        pred_labels.append(responses[1:])

    return gold_labels, pred_labels


# gold_labels, pred_labels = inference(model, perturb_location_data, inference_mode='forward')

### Evaluate

In [None]:

def evaluate(gold_labels, pred_labels):
    """
    Score predictions against the gold label of the group they belong to.

    gold_labels: one label per group.
    pred_labels: one list of predicted labels per group, in the same order.

    Every prediction is paired with the gold label of its group and the
    flattened pairs are passed to accuracy_score, whose result is returned.
    """
    pairs = [
        (gold_labels[group_idx], predicted)
        for group_idx, group in enumerate(pred_labels)
        for predicted in group
    ]
    y_true = [gold for gold, _ in pairs]
    y_pred = [predicted for _, predicted in pairs]

    return accuracy_score(y_true, y_pred)

# evaluate(gold_labels, pred_labels)


## Testing area

In [None]:
# Sentiment NER INV

# Location perturbations first, then name perturbations
for ner_data in (perturb_location_data, perturb_names_data):
    gold_labels, pred_labels = inference(model, ner_data, inference_mode='forward')
    print(f'Accuracy: {evaluate(gold_labels, pred_labels):.2f}')


In [None]:
# Sentiment Robust. INV

# One accuracy line per robustness perturbation, printed in this order
robustness_sets = (
    perturb_irrelevant_data,
    perturb_punc_data,
    perturb_typo1_data,
    perturb_typo2_data,
    perturb_typo5_data,
    perturb_contract_data,
)
for robustness_data in robustness_sets:
    gold_labels, pred_labels = inference(model, robustness_data, inference_mode='forward')
    print(f'Accuracy: {evaluate(gold_labels, pred_labels):.2f}')


In [None]:

# Vocab+POS INV: neutral-word replacements
gold_labels, pred_labels = inference(
    model,
    perturb_change_neutral_data,
    inference_mode='forward',
)
print('Accuracy: {:.2f}'.format(evaluate(gold_labels, pred_labels)))
