In [1]:
!pip install -r requirements.txt

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import os
import sys
import nltk
from typing import List, Tuple, Union
import pandas as pd
import numpy as np
import ast
import matplotlib.pyplot as plt
import json
import random
import torch
import pickle
from sklearn.model_selection import train_test_split
from torch.nn import Module, ReLU, Linear, Sigmoid, BCELoss
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from torch.optim import Adam
from torch.nn.utils import clip_grad_norm_
from tqdm import tqdm
from transformers import BertTokenizer, BertModel, TFBertModel
import spacy

In [3]:
train_data = pd.read_csv('tsd_train.csv')
test_data = pd.read_csv('tsd_test.csv')

In [4]:
train_data["spans"] = [ast.literal_eval(x) for x in train_data["spans"]]
test_data["spans"] = [ast.literal_eval(x) for x in test_data["spans"]]

In [5]:
def new_spans(data_text, data_span):
    spans_new = []
    for idx in range(len(data_text)):
      sentence = []
      start = None
      for idx_arr in range(len(data_span[idx])):
        if start is None or data_span[idx][idx_arr] != data_span[idx][idx_arr-1] + 1:
          if start is not None:
            sentence.append(data_text[idx][start:data_span[idx][idx_arr-1]+1])
          start = data_span[idx][idx_arr]
      if start is not None:
        sentence.append(data_text[idx][start:data_span[idx][-1]+1])
      spans_new.append(sentence)
    return spans_new

In [6]:
train_spans_new = new_spans(train_data["text"], train_data["spans"])
test_spans_new = new_spans(test_data["text"], test_data["spans"])

In [7]:
nlp = spacy.blank("en")

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased', do_lower_case=True)

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

In [8]:
def text_2_tokens(text):
  text = nlp(text)
  text_tokens = []
  for token in text:
      text_subtokens = tokenizer.tokenize(token.text)
      text_tokens += text_subtokens
  return text_tokens

In [9]:
train_text_tokens, test_text_tokens = [], []
for idx in range(len(train_data)):
  train_text_token = text_2_tokens(train_data["text"][idx])
  train_text_tokens.append(train_text_token)

for idx in range(len(test_data)):
  test_text_token = text_2_tokens(test_data["text"][idx])
  test_text_tokens.append(test_text_token)

In [10]:
def bin_tokens(tokens, text, spans):
  token_idx = []
  start, end = 0, 0
  for token in tokens:
    sent = text.lower()
    token = token.strip('#')
    start = sent.find(token, start)
    end = start + len(token)
    token_idx.append((start,end))
    start = end

  toxic_words = []
  for word, (start,end) in zip(tokens, token_idx):
    k = [i for i in range(start,end)]
    if any(i in spans for i in k):
      toxic_words.append(1)
    else:
      toxic_words.append(0)

  return toxic_words

In [11]:
toxic_binary_test = []
for idx in range(len(test_data)):
  toxic_words = bin_tokens(test_text_tokens[idx], test_data["text"][idx], test_data["spans"][idx])
  toxic_binary_test.append(toxic_words)

In [14]:
maxlen = max([len(x) for x in train_text_tokens])
train_tokens1 = [['[CLS]'] + t[:maxlen - 2] + ['[SEP]'] for t in train_text_tokens]
test_tokens = [['[CLS]'] + t[:maxlen - 2] + ['[SEP]'] for t in test_text_tokens]

print(len(train_tokens1), len(test_tokens))

7939 2000


In [15]:
def pad_tokens(tokens, max_len=maxlen):
    length = len(tokens)
    pad_length = max_len - length
    if pad_length < 0:
        pad_length = 0
    
    tokens += [0] * pad_length                                 # pad tokens with zeros if necessary
    
    if length > max_len:                                       # truncate tokens if necessary
        tokens = tokens[:max_len]
    
    mask = [1] * length + [0] * pad_length                     # create attention mask to indicate which tokens are padding (0) and which are not (1)

    tokens = np.array(tokens, dtype="int")
    mask = np.array(mask, dtype="int")
    
    return tokens, mask

In [16]:
def token_ids_and_masks(text_tokens):
    token_ids, masks = [], []

    for token in text_tokens:
        tokens, mask = pad_tokens(tokenizer.convert_tokens_to_ids(token), maxlen)
        token_ids.append(tokens)
        masks.append(mask)

    return np.array(token_ids), np.array(masks)

tr_token_ids, tr_masks = token_ids_and_masks(train_tokens1)
te_token_ids, te_masks = token_ids_and_masks(test_tokens)

In [18]:
class BertClassifier(Module):
  
    def __init__(self, hidden_units):
        super().__init__()

        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.hidden_layer = Linear(BertModel.from_pretrained('bert-base-uncased').config.hidden_size, hidden_units)
        self.activation = ReLU()
        self.out = Linear(hidden_units, 1)
        self.final_activation = Sigmoid()

    def forward(self, input, mask, labels=None):
        outputs = self.bert(input_ids=input, attention_mask=mask)          # Inputs are passed to bert model
        hid_layer_output = self.hidden_layer(outputs.last_hidden_state)    # Outputs of the last hidden state are the inputs to the next hidden state
        activation_output = self.activation(hid_layer_output)              # Those outputs are send through activation layer (ReLU)
        output_layer = self.out(activation_output)                         # Output from the Linear layer
        final_output = self.final_activation(output_layer)                 # This is paased through final activation function (sigmoid)

        bce_loss = BCELoss()
        loss = 0
        if labels is not None:
            loss = bce_loss(final_output, labels.float())
            
        return loss, final_output

hidden_units = 32
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BertClassifier(hidden_units).to(device)
model.load_state_dict(torch.load('NLP_project_checkpoint.pt'))

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.decoder.weight', 'cls.predictions.transform.dense.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.bias', 'cls.predictions.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.seq_relationship.bias', 'cls.predictions.transfo

<All keys matched successfully>

In [19]:
BATCH_SIZE = 8

test_dataset = TensorDataset(torch.tensor(te_token_ids), torch.tensor(te_masks))
test_sampler = SequentialSampler(test_dataset)
test_dataloader = DataLoader(test_dataset, sampler=test_sampler, batch_size=BATCH_SIZE)

In [20]:
optimizer = Adam(model.parameters(), lr=3e-6)

In [21]:
model.eval()                                                         # switch the model to evaluation mode

test_tokens_ids, test_masks, test_preds = [], [], []

with torch.no_grad():                                                # temporarily disable gradient calculations within the for loop
    for step, batch_data in enumerate(tqdm(test_dataloader)):
        token_ids, masks = tuple(t.to(device) for t in batch_data)
        _, output = model(token_ids, masks)
        test_tokens_ids += token_ids.tolist()
        test_masks += masks.tolist()
        test_preds += output[:, :, 0].tolist()

100%|██████████| 250/250 [00:46<00:00,  5.33it/s]


In [22]:
# toxic_binary_test, toxic_phrases_test = [], []
# for idx in range(len(test_data)):
#   toxic_words, toxic_sentence = bin_tokens(test_data["text"][idx], test_data["spans"][idx])
#   toxic_binary_test.append(toxic_words)
#   toxic_phrases_test.append(toxic_sentence)

TypeError: ignored

In [23]:
import copy
def threshold_fun(preds, threshold):
    # testy_preds = []
    # for i in test_preds:
    #     testy_preds.append(np.mean(i))         # converting the predicted values into binary values given a threshold
    pred = copy.deepcopy(preds)
    for i in range(len(pred)):
        for j in range(len(pred[0])):
          if pred[i][j] >= threshold:
              pred[i][j] = 1
          else:
              pred[i][j] = 0
    return pred

In [62]:
test_pred = threshold_fun(test_preds, 0.6)

In [63]:
def truncated(preds, masks): 
  new_preds = []
  for pr, ma in zip(preds, masks):
    pr = [pr[i] for i in range(len(pr)) if ma[i]==1]   #truncate
    new_preds.append(pr)
  return new_preds

In [64]:
test_new_preds = truncated(test_pred, te_masks)

In [65]:
def id_2_tokens(token_ids):
    id2token=[]
    for i in range(len(token_ids)):
        id2token.append(tokenizer.convert_ids_to_tokens(token_ids[i]))
    return id2token

In [66]:
test_tokens_new = id_2_tokens(te_token_ids)

In [67]:
def toxic_tokens(tokens, predicts, texts):
    final_toxic_tokens = []
    for (token, pred, text) in zip(tokens, predicts, texts):
        toxic_tokens_spans = []
        start = 0
        for i in range(len(token)):
            tk = token[i].strip('#')
            start = text.find(tk,start)
            end = start + len(tk)
            if pred[i] == 1 and start!=-1:
                toxic_tokens_spans.extend(range(start,end))
            start=end
        final_toxic_tokens.append(toxic_tokens_spans)
    return final_toxic_tokens

In [68]:
test_toxic_tokens = toxic_tokens(test_tokens, test_new_preds, test_data["text"].tolist())

In [69]:
def f1_score(ground_truth, prediction):
    
    #Calculates F1 score for a set of spans
    true_positives = 0
    if len(ground_truth) == 0 and len(prediction) == 0:
        return 1.0
    elif len(ground_truth) == 0 and len(prediction) != 0:
        return 0.0
    elif len(ground_truth) != 0 and len(prediction) == 0:
        return 0.0
    else:
        true_positives_set = set(ground_truth).intersection(set(prediction))
        true_positives_len = len(true_positives_set)

        pred_cardinal = len(prediction)
        grou_cardinal = len(ground_truth)
        precision = true_positives_len / pred_cardinal
        recall = true_positives_len / grou_cardinal
        if precision + recall == 0:
            return 0.0
        else:
            f1_score = 2 * precision * recall / (precision + recall)
            return f1_score

In [70]:
f1_score_test = []
for i in range(len(test_toxic_tokens)):
    f1_score_test.append(f1_score(test_data["spans"].tolist()[i], test_toxic_tokens[i]))
print(np.mean(f1_score_test))

0.5916448782642862
