Imports

In [2]:
import torch
from transformers import BertModel
from transformers import AutoTokenizer
from typing import Dict, List
import numpy as np
import pandas as pd
from util import precision, recall, f1_score

Prepping dataset

In [5]:
file = open("data/arguments-training.tsv", 'r', encoding='utf8')
x_train = [line.strip().split('\t') for line in file.readlines()[1:]]
file.close()
print(x_train[0])

file = open("data/labels-training.tsv", 'r', encoding='utf8')
y_train = [line.strip().split('\t') for line in file.readlines()[1:]]
file.close()

file = open("data/arguments-validation.tsv", 'r', encoding='utf8')
x_valid = [line.strip().split('\t') for line in file.readlines()[1:]]
file.close()
print(x_valid[0])
file = open("data/labels-validation.tsv", 'r', encoding='utf8')
y_valid = [line.strip().split('\t') for line in file.readlines()[1:]]
file.close()
print(y_valid[0])
file = open("data/arguments-test.tsv", 'r', encoding='utf8')
x_test = [line.strip().split('\t') for line in file.readlines()[1:]]
file.close()
print(x_test[0])

['A01001', 'Entrapment should be legalized', 'in favor of', "if entrapment can serve to more easily capture wanted criminals, then why shouldn't it be legal?"]
['A01001', 'Entrapment should be legalized', 'in favor of', "if entrapment can serve to more easily capture wanted criminals, then why shouldn't it be legal?"]
['A01001', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0']
['A26004', 'We should end affirmative action', 'against', 'affirmative action helps with employment equity.']


In [6]:
#Tokenize, conjoin strings, and add special tokens, remove item ids from labels
import spacy

def tokenize(text, labels=None):
    nlp = spacy.load("en_core_web_sm")

    args = []
    labs = []
    if(labels != None):
        for arg, lab in zip(text, labels):
            if arg[3] == 'in favor of':
                sep = ['<PRO>']
            else:
                sep = ['<CON>']
            item = ['<SOS>'] + list(nlp(arg[1])) + sep + list(nlp(arg[3])) + ['<EOS>']
            args.append(item)
            labs.append(lab[1:20])
    else:
        for arg in text:
            if arg[3] == 'in favor of':
                sep = ['<PRO>']
            else:
                sep = ['<CON>']
            item = ['<SOS>'] + list(nlp(arg[1])) + sep + list(nlp(arg[3])) + ['<EOS>']
            args.append(item)

    return args, labs
    
def tokenize_allData(x_train,y_train,x_valid,y_valid,x_test):
    x_train, y_train = tokenize(x_train, y_train)
    x_valid, y_valid = tokenize(x_valid,y_valid)
    x_test, _ = tokenize(x_test)
    print(x_train[0], y_train[0])
    print("x_train size: ",len(x_train)," - x_train size: ",len(y_train))
    print("___________________")
    print(x_valid[0], y_valid[0])
    print("x_valid size: ",len(x_valid)," - y_valid size: ",len(y_valid))
    print("_______________")
    print(x_test[0])
    print("xTest size: ", len(x_test))
    return x_train,y_train,x_valid,y_valid,x_test
x_train,y_train,x_valid,y_valid,x_test = tokenize_allData(x_train,y_train,x_valid,y_valid,x_test)

['<SOS>', Entrapment, should, be, legalized, '<CON>', if, entrapment, can, serve, to, more, easily, capture, wanted, criminals, ,, then, why, should, n't, it, be, legal, ?, '<EOS>'] ['0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0']
x_train size:  5220  - x_train size:  5220
___________________
['<SOS>', Entrapment, should, be, legalized, '<CON>', if, entrapment, can, serve, to, more, easily, capture, wanted, criminals, ,, then, why, should, n't, it, be, legal, ?, '<EOS>'] ['0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0']
x_valid size:  1896  - y_valid size:  1896
_______________
['<SOS>', We, should, end, affirmative, action, '<CON>', affirmative, action, helps, with, employment, equity, ., '<EOS>']
xTest size:  1576


## Label Selection (Which labels to use in models )

In [8]:
def removeLabels_NotWanted(labels, labels_wanted):
    newLabels=[]
    for row in labels:
        newRow = []
        for i in labels_wanted:
            newRow.append(row[i])
        assert len(newRow) == len(labels_wanted)
        newLabels.append(newRow)
    return newLabels
def removeAllEmptyLabelRows(text,labels):
    newText=[]
    newLabels=[]
    for i in range(len(text)):
        intList = [eval(j) for j in labels[i] ]
        if(np.sum(intList)!=0):
            newText.append(text[i])
            newLabels.append(labels[i])
      #else:
            #print(labels[i])
    return newText,newLabels


### Creating different labels for training on     
    labels_starters recommended by Eval:   Self-direction: action, Achievement, Security: personal, Security: societal, Benevolence: caring, Universalism: concern.
 

In [9]:
reference_labels = {
    "Self-direction: thought": 0,
    "Self-direction: action": 1,	
    "Stimulation": 2,
    "Hedonism": 3,
    "Achievement": 4,
    "Power: dominance": 5,
    "Power: resources": 6,
    "Face": 7,	
    "Security: personal": 8,
    "Security: societal": 9,
    "Tradition": 10,
    "Conformity: rules": 11,
    "Conformity: interpersonal": 12,
    "Humility": 13,	
    "Benevolence: caring": 14,
    "Benevolence: dependability": 15,	
    "Universalism: concern": 16,	
    "Universalism: nature": 17,	
    "Universalism: tolerance": 18,
    "Universalism: objectivity": 19
}

recommended_categories = [1, 4, 8, 9, 14, 16]
print("Number of original items:", len(y_train))
labels_starters = removeLabels_NotWanted(y_train, recommended_categories)
x_train_trimmed, labels_starters_train = removeAllEmptyLabelRows(x_train, labels_starters)
print("Number of items with one of the desired labels:", len(labels_starters_train))
labels_starters_valid = removeLabels_NotWanted(y_valid, recommended_categories)
x_valid_trimmed, labels_starters_valid = removeAllEmptyLabelRows(x_valid, labels_starters_valid)

starters_dict = {
    "Self-direction: action":0,
    "Achievement": 1,
    "Security: personal": 2,
    "Security: societal": 3,
    "Benevolence: caring": 4,
    "Universalism: concern": 16
}


Number of original items: 5220
Number of items with one of the desired labels: 4835


In [50]:

security = [8, 9]
print("Number of original items:", len(y_train))
labels_security = removeLabels_NotWanted(y_train, security)
x_train_trimmed, labels_security_train = removeAllEmptyLabelRows(x_train, labels_security)
print("Number of items with one of the desired labels:", len(labels_security_train))
labels_security_valid = removeLabels_NotWanted(y_valid, security)
x_valid_trimmed, labels_security_valid = removeAllEmptyLabelRows(x_valid, labels_security_valid)

security_dict = {
    "Security: personal": 0,
    "Security: societal": 1
}


Number of original items: 5220
Number of items with one of the desired labels: 3090


## Create P3 label set

In [38]:
def makePT3Labels(allClassLabels):
    PT3LabelsDict = {}
    PT3Labels_idx = []
    PT3Labels_vects = []
    label=0
    for row in allClassLabels:
        row_lab = [key for key, value in PT3LabelsDict.items() if value == row]
        if row_lab:
            PT3Labels_idx.append(row_lab[0])
        else:
            PT3LabelsDict[label] = row
            PT3Labels_idx.append(label)
            label += 1
    # for row in PT3Labels_idx:
    #     label_vect = [0]*len(PT3LabelsDict)
    #     label_vect[row] = 1
    #     PT3Labels_vects.append(label_vect)

    return PT3Labels_idx, PT3LabelsDict

In [39]:
PT3LabelsTrain, PT3Dict = makePT3Labels(labels_starters_train)


#PT3Labels_idx = []
PT3LabelsValid = []
label=0
for row in labels_starters_valid:
    row_lab = [key for key, value in PT3Dict.items() if value == row]
    if row_lab:
        PT3LabelsValid.append(row_lab[0])
    else:
        PT3Dict[label] = row
        PT3LabelsValid.append(label)
        label += 1
# for row in PT3Labels_idx:
#     label_vect = [0]*len(PT3Dict)
#     label_vect[row] = 1
#     PT3LabelsValid.append(label_vect)

print("Number of new combination labels:", len(PT3Dict))


Number of new combination labels: 61


In [51]:
PT3LabelsTrain, PT3Dict = makePT3Labels(labels_security_train)


#PT3Labels_idx = []
PT3LabelsValid = []
label=0
for row in labels_security_valid:
    row_lab = [key for key, value in PT3Dict.items() if value == row]
    if row_lab:
        PT3LabelsValid.append(row_lab[0])
    else:
        PT3Dict[label] = row
        PT3LabelsValid.append(label)
        label += 1
# for row in PT3Labels_idx:
#     label_vect = [0]*len(PT3Dict)
#     label_vect[row] = 1
#     PT3LabelsValid.append(label_vect)

print("Number of new combination labels:", len(PT3Dict))


Number of new combination labels: 3


## Get PT4 Labels

In [12]:
#creates two dictionaries, one with positive instances for each class
#and one for negative instances for each class
def makePT4Labels(allClassLabels):
    PT4LabelsPos = {}
    PT4LabelsNeg = {}
    
    for i in range(len(allClassLabels)):
        PT4LabelsPos[i] = []
        PT4LabelsNeg[i] = []

    for row in allClassLabels:
        label_idx = 0
        for label in row:
            if label == '0':
                PT4LabelsPos[label_idx].append(0)
                PT4LabelsNeg[label_idx].append(1)
            if label == '1':
                PT4LabelsPos[label_idx].append(1)
                PT4LabelsNeg[label_idx].append(0)
            label_idx += 1
    assert len(PT4LabelsPos[0]) == len(PT4LabelsNeg[0]) == len(allClassLabels)

    return PT4LabelsPos, PT4LabelsNeg


In [13]:
PT4LabelsPos, PT4LabelsNeg = makePT4Labels(labels_starters)

In [52]:

SPECIAL_TOKENS = ['<UNK>', '<PAD>', '<SOS>', '<EOS>', '<PRO>', '<CON>']
vocab = sorted(set([str(w) for ws in list(x_train_trimmed) + [SPECIAL_TOKENS] for w in ws]))
embeddings_path = 'glove.twitter.27B.200d.txt'

from typing import Dict, Tuple
import torch
import numpy as np

def read_pretrained_embeddings(
    embeddings_path: str,
    vocab
) -> Tuple[Dict[str, int], torch.FloatTensor]:
    """Read the embeddings matrix and make a dict hashing each word.

    Args:
        embeddings_path (str): _description_
        vocab_path (str): _description_

    Returns:
        Tuple[Dict[str, int], torch.FloatTensor]: _description_
    """
    word2i = {}
    vectors = []
    
    print(f"Reading embeddings from {embeddings_path}...")
    with open(embeddings_path, "r", encoding = "utf-8") as f:
        i = 0
        for line in f:
            word, *weights = line.rstrip().split(" ")
            
            if word in vocab:
                word2i[word] = i
                i += 1
                w_weights = [float(i) for i in weights]
                vectors.append(w_weights)

        vectors = torch.FloatTensor(vectors)

    return word2i, vectors

def get_oovs(vocab, word2i: Dict[str, int]) -> List[str]:
    """Find the vocab items that do not exist in the glove embeddings (in word2i).
    Return the List of such (unique) words.

    Args:
        vocab_path: List of batches of sentences.
        word2i (Dict[str, int]): _description_

    Returns:
        List[str]: _description_
    """
    glove_and_vocab = set(word2i.keys())
    vocab_and_not_glove = set(vocab) - glove_and_vocab
    return list(vocab_and_not_glove)

def initialize_new_embedding_weights(num_embeddings: int, dim: int) -> torch.FloatTensor:
    """xavier initialization for the embeddings of words in train, but not in gLove.

    Args:
        num_embeddings (int): _description_
        dim (int): _description_

    Returns:
        torch.FloatTensor: _description_
    """
    #Initialize a num_embeddings x dim matrix with xiavier initiialization
    return torch.FloatTensor(np.random.normal(0, dim**-0.5, size=(num_embeddings, dim)))
    

def update_embeddings(
    glove_word2i: Dict[str, int],
    glove_embeddings: torch.FloatTensor,
    oovs: List[str]
) -> Tuple[Dict[str, int], torch.FloatTensor]:
    #Add the oov words to the dict, assigning a new index to each
        i = len(glove_embeddings)
        for w in oovs:
            glove_word2i[w] = i
            i +=1
    #Concatenate a new row to embeddings for each oov, initialize those new rows with `intialize_new_embedding_weights`
        new_emb = initialize_new_embedding_weights(len(oovs), len(glove_embeddings[0]))
        cat_emb = torch.cat((glove_embeddings, new_emb), 0)
        return (glove_word2i, cat_emb)



In [53]:
# glove_word2i, glove_embeddings = read_pretrained_embeddings(
#     embeddings_path,
#     vocab
# )
# oovs = get_oovs(vocab, glove_word2i)

# # Add the oovs from training data to the word2i encoding, and as new rows
# # to the embeddings matrix
# word2i, embeddings = update_embeddings(glove_word2i, glove_embeddings, oovs)

Reading embeddings from glove.twitter.27B.200d.txt...


Batch Training arguments and pad

In [54]:

import math

def make_batches(sequences: List[List[str]], labels: List[List[int]], batch_size: int) -> (List[List[List[str]]], List[List[List[int]]]):
    """Yield batch_size chunks from sequences."""
    
    num_batch = math.floor(len(sequences)/batch_size)
    batched_sents = []
    batched_labs = []
    
    df = pd.DataFrame(data = {"seq": sequences, "lab": labels})
    for i in range(num_batch):
        batch = df.sample(n=batch_size)
        #print("Batch size: ",batch.shape[0])
        this_batch_sents = []
        this_batch_labs = []
        for index, row in batch.iterrows():
            sent = row['seq']
            label = row['lab']
            #df = df[df.seq != sent]
            this_batch_sents.append(sent)
            this_batch_labs.append(label)
        df = df.drop(batch.index)
        batched_sents.append(this_batch_sents)
        batched_labs.append(this_batch_labs)
        
    return batched_sents, batched_labs


def pad(sents, labs):
    lengths = []
    for sent in sents:
        lengths.append(len(sent))
            
    max_length = max(lengths)
        
    for sent in sents:
        n = max_length - len(sent)
        for i in range(n):
            sent.append("")
        
    return sents


# Set your preferred batch size
batch_size = 8

# We make batches now and use those.
PT3_batches_train = []
# Note: Labels need to be batched in the same way to ensure
# We have train sentence and label batches lining up.
PT3_batched_sents, PT3_batched_labs = make_batches(x_train_trimmed, PT3LabelsTrain, batch_size)
for batch in PT3_batched_sents:
    pad_batch = pad(batch, PT3_batched_labs)
    PT3_batches_train.append(pad_batch)

PT3_batches_valid = []
PT3_batched_sents_valid, PT3_batched_labs_valid = make_batches(x_valid_trimmed, PT3LabelsValid, batch_size)
for batch in PT3_batched_sents_valid:
    pad_batch = pad(batch, PT3_batched_labs_valid)
    PT3_batches_valid.append(pad_batch)


In [None]:
#TODO: Make PT4 batches

In [55]:
import torch

class ValuesClassifier(torch.nn.Module):
    def __init__(self, 
    output_size: int, 
    hidden_size: int,
    embeddings_tensor: torch.FloatTensor,
    pad_idx: int,
    dropout_val: float = 0.3,
    input_dim: int = 200,
    ):
        super().__init__()
        self.output_size = output_size
        self.hidden_size = hidden_size
        # Initialize BERT, which we use instead of a single embedding layer.
        self.bert = BertModel.from_pretrained("prajjwal1/bert-small")
        self.bert_hidden_dimension = self.bert.config.hidden_size
        self.hidden_layer = torch.nn.Linear(self.bert_hidden_dimension, self.hidden_size)
        self.relu = torch.nn.ReLU()
        self.classifier = torch.nn.Linear(self.hidden_size, self.output_size)
        self.log_softmax = torch.nn.LogSoftmax(dim=2)
        self.embeddings = torch.nn.Embedding.from_pretrained(embeddings_tensor, freeze = False, padding_idx = pad_idx)
        self.dropout_val = dropout_val
        self.dropout_layer = torch.nn.Dropout(p=self.dropout_val, inplace=False)
        self.pad_idx = pad_idx
        self.input_dim = input_dim
        self.lstm = torch.nn.LSTM(
            self.input_dim,
            self.hidden_size,
            num_layers=2,
            dropout=dropout_val,
            batch_first=True,
            bidirectional=True,
        )
        


    def encode_text(
        self,
        symbols: torch.Tensor
    ) -> torch.Tensor:
        """Encode the (batch of) sequence(s) of token symbols with an LSTM.
            Then, get the last (non-padded) hidden state for each symbol and return that.

        Args:
            symbols (torch.Tensor): The batch size x sequence length tensor of input tokens

        Returns:
            torch.Tensor: The final hiddens tate of the LSTM, which represents an encoding of
                the entire sentence
        """
        # First we get the embedding for each input symbol
        embedded = self.embeddings(symbols)
        embedded = self.dropout_layer(embedded)
        # Packs embedded source symbols into a PackedSequence.
        # This is an optimization when using padded sequences with an LSTM
        lens = (symbols != self.pad_idx).sum(dim=1).to("cpu")
        packed = torch.nn.utils.rnn.pack_padded_sequence(
            embedded, lens, batch_first=True, enforce_sorted=False
        )
        # -> batch_size x seq_len x encoder_dim, (h0, c0).
        packed_outs, (H, C) = self.lstm(packed)
        encoded, _ = torch.nn.utils.rnn.pad_packed_sequence(
            packed_outs,
            batch_first=True,
            padding_value=self.pad_idx,
            total_length=None,
        )
        # Now we have the representation of eahc token encoded by the LSTM.
        encoded, (H, C) = self.lstm(embedded)
        
        # This part looks tricky. All we are doing is getting a tensor
        # That indexes the last non-PAD position in each tensor in the batch.
        last_enc_out_idxs = lens - 1
        # -> B x 1 x 1.
        last_enc_out_idxs = last_enc_out_idxs.view([encoded.size(0)] + [1, 1])
        # -> 1 x 1 x encoder_dim. This indexes the last non-padded dimension.
        last_enc_out_idxs = last_enc_out_idxs.expand(
            [-1, -1, encoded.size(-1)]
        )
        # Get the final hidden state in the LSTM
        last_hidden = torch.gather(encoded, 1, last_enc_out_idxs)
        return last_hidden

    def forward(
        self,
        symbols: Dict,
    ) -> torch.Tensor:
        """_summary_

        Args:
            symbols (Dict): The Dict of token specifications provided by the HuggingFace tokenizer

        Returns:
            torch.Tensor: _description_
        """
        encoded_sents = self.encode_text(symbols)
        output = self.hidden_layer(encoded_sents)
        output = self.relu(output)
        output = self.classifier(output)
        return self.log_softmax(output)

In [56]:
# For making predictions at test time
def predict(model: torch.nn.Module, sents: torch.Tensor) -> List:
    logits = model(sents)
    return list(torch.argmax(logits, axis=2).squeeze().numpy())

In [57]:
import numpy as np
from numpy import logical_and, sum as t_sum


def precision(predicted_labels, true_labels, which_label=1):
    """
    Precision is True Positives / All Positives Predictions
    """
    pred_which = np.array([pred == which_label for pred in predicted_labels])
    true_which = np.array([lab == which_label for lab in true_labels])
    denominator = t_sum(pred_which)
    if denominator:
        return t_sum(logical_and(pred_which, true_which))/denominator
    else:
        return 0.


def recall(predicted_labels, true_labels, which_label=1):
    """
    Recall is True Positives / All Positive Labels
    """
    pred_which = np.array([pred == which_label for pred in predicted_labels])
    true_which = np.array([lab == which_label for lab in true_labels])
    denominator = t_sum(true_which)
    if denominator:
        return t_sum(logical_and(pred_which, true_which))/denominator
    else:
        return 0.


def f1_score(
    predicted_labels: List[int],
    true_labels: List[int],
    which_label: int
):
    """
    F1 score is the harmonic mean of precision and recall
    """
    P = precision(predicted_labels, true_labels, which_label=which_label)
    R = recall(predicted_labels, true_labels, which_label=which_label)
    if P and R:
        return 2*P*R/(P+R)
    else:
        return 0.


def macro_f1(
    predicted_labels: List[int],
    true_labels: List[int],
    possible_labels: List[int]
):
    scores = [f1_score(predicted_labels, true_labels, l) for l in possible_labels]
    # Macro, so we take the uniform avg.
    return sum(scores) / len(scores)

In [58]:
from transformers import logging

logging.set_verbosity_warning()