In [None]:
import random

# Fixed seed for Python's built-in `random` module so anything driven by it is repeatable.
# NOTE(review): only `random` is seeded here; no numpy or torch seed is set in the visible notebook.
SEED = 32
random.seed(SEED)

import numpy as np 
import pandas as pd
import spacy

from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score

from torch import nn
import torch
from torchtext import data
from torch.nn  import functional as F
import torch.optim as  optim 
# Pick the compute device. The device-type string must be "cuda" for NVIDIA GPUs:
# torch.device() rejects "gpu", so the previous value crashed exactly when a GPU was present.
dev = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(dev)
print(dev)

In [None]:
import nltk
# Fetch the NLTK "punkt" resource (the data behind nltk's sentence/word tokenizers).
# NOTE(review): word_tokenize / sent_tokenize are imported below but never called in the visible cells.
nltk.download("punkt")

import re
from spacy.tokenizer import Tokenizer
from spacy.lang.en import English
# Blank English pipeline; only its vocab is used, to construct the tokenizer below.
nlp = English()

# Tokenizer created from the vocab alone: no prefix/suffix/infix rules are supplied here.
tokenizer = Tokenizer(nlp.vocab)

from nltk import word_tokenize,sent_tokenize
from nltk.stem  import PorterStemmer


from nltk.corpus import stopwords
from nltk.stem import SnowballStemmer

nltk.download('stopwords')
# Kept as a frozenset: myTokenizer tests every token with `in stops`, and a set makes that
# lookup constant-time instead of scanning the whole stop-word list for each token.
stops = frozenset(stopwords.words("english"))


def removepunc(my_str):
    """Return ``my_str`` with every punctuation character removed.

    The characters dropped are exactly: ! ( ) - [ ] { } ; : ' " \\ , < > . / ? @ # $ % ^ & * _ ~
    Any other character (letters, digits, whitespace, ``+``, ``=``, ``|``, backtick, ...) is kept.

    Parameters
    ----------
    my_str : str
        Text to clean.

    Returns
    -------
    str
        The input with the listed characters stripped out; ``""`` for empty input.
    """
    # Raw string: the backslash is meant literally. The non-raw form relied on the invalid
    # escape sequence "\," which modern Python versions warn about.
    punctuations = r'''!()-[]{};:'"\,<>./?@#$%^&*_~'''
    # join() builds the result in one pass instead of repeated string concatenation.
    return "".join(char for char in my_str if char not in punctuations)

def hasNumbers(inputString):
    """Tell whether ``inputString`` contains at least one digit (regex ``\\d``)."""
    digit_match = re.search(r'\d', inputString)
    return digit_match is not None
# English stemmers. snowstem is the one applied inside myTokenizer;
# NOTE(review): portstem is not referenced anywhere else in the visible notebook.
snowstem = SnowballStemmer("english")
portstem = PorterStemmer()


In [None]:
# Read the train/test CSVs and discard the "Unnamed: 0" column from each in the same expression,
# so the raw frames are never mutated in place.
traindata = pd.read_csv("AuxData/toxic_train.csv").drop("Unnamed: 0", axis=1)
testdata = pd.read_csv("AuxData/toxic_test.csv").drop("Unnamed: 0", axis=1)

In [None]:
# NOTE(review): `test` is read on the right-hand side but is not defined in any earlier cell of the
# visible notebook, so on a fresh kernel this line raises NameError. The frame it builds
# (columns 'tweet' / 'toxic') is also not used by the later visible cells, which work on `testdata`.
# This looks like a leftover from another dataset - confirm and remove, or define `test` first.
test = pd.DataFrame({'tweet': test.tweet.values.tolist(), 'toxic': test['class'].values.tolist()})

In [None]:
def myTokenizer(x):
    """Turn a raw text into a list of Snowball stems.

    Steps, in order: lowercase; replace every character outside the allowed set with a
    space; collapse runs of two or more whitespace characters into one space; strip
    punctuation with removepunc and trim the ends; tokenize; drop tokens that are stop
    words or contain a digit; stem what is left.
    """
    lowered = x.lower()
    allowed_only = re.sub(r"[^A-Za-z0-9()!?\'\`\"\r+\n+]", " ", lowered)
    collapsed = re.sub(r"\s+\s+", " ", allowed_only)
    cleaned = removepunc(collapsed).strip()

    stems = []
    for word in tokenizer(cleaned):
        # The stop-word and digit checks are made on the unstemmed token text.
        if word.text in stops or hasNumbers(word.text):
            continue
        stems.append(snowstem.stem(word.text))
    return stems

In [None]:
# Text field: tokenized by myTokenizer, batch dimension first, fix_length=140
# (the same 140 that TextTransformer hard-codes for its position embedding and linear3).
TEXT = data.Field(tokenize=myTokenizer,batch_first=True,fix_length=140)
# Label field produced as float tensors, which is what F.binary_cross_entropy is fed in the training loop.
LABEL = data.LabelField(dtype=torch.float ,batch_first=True)

class DataFrameDataset(data.Dataset):
    """torchtext Dataset built from a DataFrame that has `comment_text` and `toxic` columns."""

    def __init__(self, df, text_field, label_field, is_test=False, **kwargs):
        """Convert each row of ``df`` into an Example with fields ('comment_text', 'toxic').

        ``text_field`` / ``label_field`` are the torchtext fields attached to the two columns.
        ``is_test`` is accepted but not used by this implementation.
        Extra keyword arguments are forwarded to ``data.Dataset.__init__``.
        """
        fields = [('comment_text', text_field), ('toxic', label_field)]

        def build_example(record):
            # Read the label first, then the text, and hand them over as [text, label].
            label = record.toxic
            text = record.comment_text
            return data.Example.fromlist([text, label], fields)

        examples = [build_example(record) for _, record in df.iterrows()]
        super().__init__(examples, fields, **kwargs)

# Wrap the train and test frames as torchtext datasets; both share the same TEXT and LABEL fields.
torchdataset = DataFrameDataset(traindata, TEXT,LABEL)
torchtest    = DataFrameDataset(testdata, TEXT,LABEL)

In [None]:
# 80/20 train/validation split. random.seed() returns None, so passing its result as
# random_state handed None to split(); seed first, then pass the RNG state explicitly.
random.seed(SEED)
train_data, valid_data = torchdataset.split(split_ratio=0.8, random_state=random.getstate())

In [None]:
# Vocabularies are built from the training split only (validation/test data are not passed in).
# min_freq=3: presumably tokens seen fewer than 3 times are left out of the vocabulary - see torchtext docs.
TEXT.build_vocab(train_data,min_freq=3)  
LABEL.build_vocab(train_data)

In [None]:
# Number of entries in the text vocabulary
print("Size of TEXT vocabulary:",len(TEXT.vocab))

# Number of entries in the label vocabulary
print("Size of LABEL vocabulary:",len(LABEL.vocab))

# The 10 most frequent tokens together with their counts
print(TEXT.vocab.freqs.most_common(10))  

In [None]:
# Batch size shared by the train, validation and test iterators
BATCH_SIZE = 128

# sort=False and shuffle=False: presumably examples are then iterated in dataset order.
# That matters for the test iterator, because the last cell writes the collected predictions
# back onto `testdata` by position.
# NOTE(review): the same flags apply to the training iterator, so batches would not be reshuffled between epochs.
train_iterator,valid_iterator,test_iterator = data.BucketIterator.splits(
    (train_data,valid_data,torchtest), 
    batch_size = BATCH_SIZE,
    device = device,
    sort =False,
shuffle=False)

In [None]:
class TextTransformer(nn.Module):
  """Binary text classifier: word + position embeddings -> one Transformer encoder layer -> MLP head.

  Each token is represented by its word embedding concatenated with a learned position
  embedding. The encoder layer attends over the tokens of each sentence, every token is
  reduced to one scalar (linear1 -> linear2), and the per-token scalars are mapped to a
  single sigmoid probability (linear3 -> linear4).

  Parameters
  ----------
  vocab_size : int, optional
      Number of rows of the word embedding table. Defaults to ``len(TEXT.vocab)``.
  seq_len : int
      Fixed number of tokens per example (default 140).
  word_dim : int
      Word embedding width (default 140).
  pos_dim : int
      Position embedding width (default 20).
  nhead : int
      Number of attention heads; must divide ``word_dim + pos_dim`` (default 8).
  """
  def __init__(self, vocab_size=None, seq_len=140, word_dim=140, pos_dim=20, nhead=8):
    super(TextTransformer,self).__init__()
    if vocab_size is None:
      vocab_size = len(TEXT.vocab)
    d_model = word_dim + pos_dim  # width after concatenating word and position embeddings
    self.seq_len = seq_len
    self.wordEmbeddings = nn.Embedding(vocab_size, word_dim)
    self.positionEmbeddings = nn.Embedding(seq_len, pos_dim)
    self.transformerLayer = nn.TransformerEncoderLayer(d_model, nhead)
    self.linear1 = nn.Linear(d_model, 64)
    self.linear2 = nn.Linear(64, 1)
    self.linear3 = nn.Linear(seq_len, 16)
    self.linear4 = nn.Linear(16, 1)

  def forward(self,x):
    """Map token ids of shape (batch, seq_len) to probabilities of shape (batch, 1)."""
    batch_size = x.shape[0]
    # Position ids are created directly on the input's device as integers, embedded once,
    # then broadcast over the batch.
    position_ids = torch.arange(self.seq_len, device=x.device)
    position_vectors = self.positionEmbeddings(position_ids).unsqueeze(0).expand(batch_size, -1, -1)
    sentence = torch.cat((self.wordEmbeddings(x.long()), position_vectors), dim=2)
    # TransformerEncoderLayer defaults to (seq, batch, feature) input. `sentence` is
    # (batch, seq, feature), so swap the first two axes around the call; otherwise attention
    # is computed across the examples of a batch instead of across the tokens of a sentence.
    attended = self.transformerLayer(sentence.transpose(0, 1)).transpose(0, 1)
    linear1 = F.relu(self.linear1(attended))
    # (batch, seq_len, 1) -> (batch, seq_len): one scalar per token
    linear2 = F.relu(self.linear2(linear1)).squeeze(-1)
    linear3 = F.relu(self.linear3(linear2))
    out = torch.sigmoid(self.linear4(linear3))
    return out

# Build the model (vocabulary size is taken from TEXT.vocab inside the constructor)
# and move its parameters to the selected device.
myTransformer = TextTransformer()
myTransformer.to(device)

In [None]:
# Per-epoch history, filled in by the training loop and calculateMetrics.
# (The "lstm_" prefix is kept because other cells refer to these names; the model trained is TextTransformer.)
lstm_loss = {split: [] for split in ('train', 'validation')}
lstm_accuracy = {split: [] for split in ('train', 'validation')}

In [None]:
def calculateMetrics(ypred,ytrue,ty=None):
    """Score binary predictions and, for train/validation calls, record the accuracy.

    Parameters
    ----------
    ypred : array-like
        Predicted labels (callers pass probabilities already rounded to 0/1).
    ytrue : array-like
        Ground-truth labels.
    ty : str, optional
        'train' appends the rounded accuracy to ``lstm_accuracy['train']``;
        'val' appends it to ``lstm_accuracy['validation']``.
        Any other value, including the default ``None``, records nothing. The default exists
        because the test-set evaluation calls this function with only two arguments.

    Returns
    -------
    str
        Summary with the F1 score, macro-averaged F1 and accuracy, each rounded to 3 decimals.
    """
    acc  = accuracy_score(ytrue,ypred)
    f1  = f1_score(ytrue,ypred)
    f1_average  = f1_score(ytrue,ypred,average="macro")

    if ty == 'train':
        lstm_accuracy['train'].append(round(acc,3))
    elif ty == 'val':
        lstm_accuracy['validation'].append(round(acc,3))

    return " f1 score: "+str(round(f1,3))+" f1 average: "+str(round(f1_average,3))+" accuracy: "+str(round(acc,3))

In [None]:
# Adagrad is used because it assigns larger updates to weights that are updated less often
# (for example, the embeddings of words that rarely occur).
from progressbar import progressbar
optimizer = optim.Adagrad(myTransformer.parameters(),lr = 0.001)

for i in progressbar(range(20)):
    # ---- training pass ----
    # Training mode: the transformer layer's dropout is active while fitting.
    myTransformer.train()
    trainpreds = torch.tensor([])
    traintrues = torch.tensor([])
    for batch in train_iterator:
        X = batch.comment_text
        y = batch.toxic
        myTransformer.zero_grad()
        # Drop only the trailing size-1 dimension; a bare squeeze() would turn a final batch
        # of one example into a 0-d tensor and break both the loss and torch.cat below.
        pred = myTransformer(X).squeeze(-1)
        trainpreds = torch.cat((trainpreds,pred.cpu().detach()))
        traintrues = torch.cat((traintrues,y.cpu().detach()))
        err = F.binary_cross_entropy(pred,y)
        err.backward()
        optimizer.step()
    # Epoch-level loss over all predictions collected while training
    err = F.binary_cross_entropy(trainpreds,traintrues)
    lstm_loss['train'].append(err.item())
    print("train BCE loss: ",err.item(),calculateMetrics(torch.round(trainpreds).numpy(),traintrues.numpy(), 'train'))

    # ---- validation pass ----
    # Evaluation mode switches dropout off so the validation metrics are deterministic;
    # no_grad() avoids building autograd graphs that are never used.
    myTransformer.eval()
    valpreds = torch.tensor([])
    valtrues = torch.tensor([])
    with torch.no_grad():
        for batch in valid_iterator:
            X = batch.comment_text
            y = batch.toxic
            valtrues = torch.cat((valtrues,y.cpu().detach()))
            pred = myTransformer(X).squeeze(-1).cpu().detach()
            valpreds = torch.cat((valpreds,pred))
    err = F.binary_cross_entropy(valpreds,valtrues)
    lstm_loss['validation'].append(err.item())
    print("validation BCE loss: ",err.item(),calculateMetrics(torch.round(valpreds).numpy(),valtrues.numpy(), 'val'))

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
plt.style.use('fivethirtyeight')

# Loss per epoch, training vs. validation, drawn through the explicit Axes interface.
# NOTE(review): the title says "(LSTM)" although the model trained above is TextTransformer.
loss_fig, loss_ax = plt.subplots()
for split_name, legend_label in (('train', 'Treino'), ('validation', 'Validação')):
    loss_ax.plot(lstm_loss[split_name], label=legend_label)
loss_ax.set_xlabel('\nÉpocas')
loss_ax.set_ylabel('Perda\n')
loss_ax.set_title('Perda por época (LSTM)\n')
loss_ax.legend()
plt.show()

In [None]:
# Accuracy per epoch, training vs. validation, drawn through the explicit Axes interface.
# NOTE(review): the title says "(LSTM)" although the model trained above is TextTransformer.
acc_fig, acc_ax = plt.subplots()
for split_name, legend_label in (('train', 'Treino'), ('validation', 'Validação')):
    acc_ax.plot(lstm_accuracy[split_name], label=legend_label)
acc_ax.set_xlabel('\nÉpocas')
acc_ax.set_ylabel('Acurácia\n')
acc_ax.set_title('Acurácia por época (LSTM)\n')
acc_ax.legend()
plt.show()

In [None]:
# Final evaluation on the held-out test set (single pass, so no progress bar is needed).
# Evaluation mode turns dropout off; no_grad() skips autograd bookkeeping.
myTransformer.eval()
testpreds = torch.tensor([])
testtrues = torch.tensor([])
with torch.no_grad():
    for batch in test_iterator:
        X = batch.comment_text
        y = batch.toxic
        testtrues = torch.cat((testtrues,y.cpu().detach()))
        # squeeze(-1) keeps a final batch of one example 1-D so it can still be concatenated
        pred = myTransformer(X).squeeze(-1).cpu().detach()
        testpreds = torch.cat((testpreds,pred))
err = F.binary_cross_entropy(testpreds,testtrues)
# calculateMetrics takes a third argument naming the split; it was missing here, which raised
# a TypeError. 'test' matches neither the 'train' nor the 'val' branch, so no history is recorded.
print("test BCE loss: ",err.item(),calculateMetrics(torch.round(testpreds).numpy(),testtrues.numpy(), 'test'))

In [None]:
# Round the test probabilities to 0/1, convert them to Python ints and store them as a new column.
# The assignment is positional: row k of `testdata` receives the k-th collected prediction.
LSTM = [int(label) for label in torch.round(testpreds).numpy().tolist()]

testdata['LSTM_HS'] = LSTM
testdata.LSTM_HS.value_counts()