In [None]:
# https://www.analyticsvidhya.com/blog/2020/01/first-text-classification-in-pytorch/

In [1]:
import random
import time

In [2]:
import re
import pandas as pd
from sklearn.utils import shuffle
import seaborn as sns
from matplotlib import pyplot as plt
from nltk.corpus import stopwords

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchtext.legacy import data

In [4]:
def loader(file, is_number = False):
    """Read a UTF-8 text file and return its lines as a list.

    Lines are split on newline characters only. ``str.splitlines`` would also
    break on characters such as U+2028, U+0085, ``\\x0b`` or ``\\x0c``; if one
    of those appears inside a text, the text file would yield more entries
    than its label file and every later text/label pair would be misaligned.

    Args:
        file: Path of the file to read.
        is_number: When true, every line is converted with ``int``.

    Returns:
        A list with one ``str`` per line, or one ``int`` per line when
        ``is_number`` is true. Line terminators are not included.

    Raises:
        ValueError: If ``is_number`` is true and a line is not an integer.
    """
    with open(file, encoding="utf8") as my_file:
        # Iterating a text-mode handle yields lines terminated by "\n" only
        # ("\r\n" and "\r" are translated to "\n" by universal newlines).
        lines = [line.rstrip("\n") for line in my_file]
    if is_number:
        return [int(value) for value in lines]
    return lines

In [5]:
# Label ids found in the *_labels.txt files and the emotion each one denotes.
mappings = {0: "anger", 1: "joy", 2: "optimism", 3: "sadness"}

# Every split comes as two parallel files: one text per line in *_text.txt
# and the integer label for that line in *_labels.txt.
train_data, train_labels = loader("data/train_text.txt"), loader("data/train_labels.txt", True)
val_data, val_labels = loader("data/val_text.txt"), loader("data/val_labels.txt", True)
test_data, test_labels = loader("data/test_text.txt"), loader("data/test_labels.txt", True)

In [6]:
def _to_frame(texts, labels):
    """Bundle parallel text/label lists into a frame with a readable emotion column."""
    return pd.DataFrame({
        "text": texts,
        "target": labels,
        # Translate each numeric label through `mappings` for display purposes.
        "emotion": [mappings[label] for label in labels],
    })

df_test = _to_frame(test_data, test_labels)
df_val = _to_frame(val_data, val_labels)
df_train = _to_frame(train_data, train_labels)

In [7]:
# Stack the train and validation splits into a single frame; the test split
# is kept out. Row labels are not renumbered here, so they repeat.
df = pd.concat([df_train, df_val])

In [10]:
# Shuffle with a fixed seed so the frame (and the CSV written from it) is the
# same on every run, then renumber the rows 0..n-1 because the concatenation
# left repeated index labels.
df = shuffle(df, random_state=2019).reset_index(drop=True)

In [11]:
# Inspect the merged, shuffled frame.
df

Unnamed: 0,text,target,emotion
0,"@user happy bday Ruth, hope you have an amazin...",1,joy
1,Banger sit in 2013 reason why we great doings ...,1,joy
2,Height of irritation when a person makes a hil...,0,anger
3,"#internationaldayofpeace Want peace,prepare fo...",0,anger
4,Oi @user you've absolutely fucking killed me.....,1,joy
...,...,...,...
3626,@user @user My heart goes out to that woman f...,3,sadness
3627,@user @user @user @user indeed &amp; is sadnes...,3,sadness
3628,the rappers who stayed true to the game is rich.,2,optimism
3629,Will WHU be old bill free by the time the game...,1,joy


In [12]:
# Persist the merged frame; index=False keeps the row numbers out of the file.
df.to_csv("data/emotions.csv", index=False)

In [None]:
# Preview the first rows of the test split.
df_test.head()

In [None]:
# Preview the first rows of the validation split.
df_val.head()

In [None]:
# Preview the first rows of the training split.
df_train.head()

In [None]:
# Sanity check: report every training text that is an empty string.
empty_texts = [txt for txt in df_train["text"] if len(txt) < 1]
for txt in empty_texts:
    print("SHIT")
    print(txt)

In [None]:
# Number of training rows per emotion, to check class balance.
df_train["emotion"].value_counts()

In [None]:
# Bar chart of label frequencies in the training split.
fig, ax = plt.subplots()
sns.countplot(x="emotion", data=df_train, ax=ax)
ax.set(title="Value count of each label on Train dataset")
plt.show()

In [None]:
# Bar chart of label frequencies in the test split.
fig, ax = plt.subplots()
sns.countplot(x="emotion", data=df_test, ax=ax)
ax.set(title="Value count of each label on Test dataset")
plt.show()

In [None]:
# Bar chart of label frequencies in the validation split.
fig, ax = plt.subplots()
sns.countplot(x="emotion", data=df_val, ax=ax)
ax.set(title="Value count of each label on Valid dataset")
plt.show()

In [None]:
def print_sample_text(df, count):
    """Print the first ``count`` entries of the ``text`` column, one per line.

    Rows are selected by position, so the function works on frames whose
    index is not ``0..n-1`` (for example after filtering or shuffling). If
    ``count`` exceeds the number of rows, every row is printed.

    Args:
        df: DataFrame with a ``text`` column.
        count: Maximum number of texts to print; values below 1 print nothing.
    """
    # iloc slices by position; df["text"][i] would look up the *label* i and
    # raise KeyError when that label does not exist.
    for text in df["text"].iloc[:max(count, 0)]:
        print(text)

In [None]:
# Show ten raw training texts before any cleaning is applied.
print_sample_text(df_train, 10)

In [None]:
# Matches any character that is NOT an ASCII letter, a hyphen, an apostrophe
# or a space; preprocess_text replaces every match with a space.
ONLY_KEEP_ALPHA_SPACE = re.compile("[^a-zA-Z-' ]")
# NLTK's English stop-word list, held in a set for fast membership tests.
STOPWORDS = set(stopwords.words('english'))

def preprocess_text(text):
    """Normalise one raw text for the model.

    Steps, in order: lowercase, replace every character matched by
    ONLY_KEEP_ALPHA_SPACE with a space, collapse runs of spaces, and drop
    the words found in STOPWORDS.

    Args:
        text: The raw string.

    Returns:
        The remaining words joined by single spaces; an empty string when
        nothing survives.
    """
    lowered = text.lower()
    # Digits, punctuation and symbols become spaces so they still separate words.
    letters_only = ONLY_KEEP_ALPHA_SPACE.sub(' ', lowered)
    single_spaced = re.sub(' +', ' ', letters_only)
    kept_words = [word for word in single_spaced.split() if word not in STOPWORDS]
    return ' '.join(kept_words)

In [None]:
# Clean the text column of every split with the same preprocessing function.
for split_frame in (df_train, df_test, df_val):
    split_frame['text'] = split_frame['text'].apply(preprocess_text)

In [None]:
# Show the same ten training texts again, now after preprocessing.
print_sample_text(df_train, 10)

In [None]:
# Preprocessing can leave a text empty; drop those rows from the two splits
# used for fitting and renumber the remaining rows from zero.
train_has_text = df_train["text"].str.len() > 0
val_has_text = df_val["text"].str.len() > 0
df_train = df_train[train_has_text].reset_index(drop=True)
df_val = df_val[val_has_text].reset_index(drop=True)

In [None]:
#Reproducing same results
SEED = 2019
#Python's own RNG: the torchtext legacy iterators take their shuffling state
#from the `random` module, so without this the batch order changes per run
random.seed(SEED)
#Torch
torch.manual_seed(SEED)
#Cuda algorithms: restrict cuDNN to deterministic implementations
torch.backends.cudnn.deterministic = True

In [None]:
!python -m spacy download en_core_web_sm

In [None]:
import spacy
# Load the small English spaCy pipeline; predict() uses its tokenizer.
nlp = spacy.load("en_core_web_sm")

In [None]:
# Text field: spaCy tokenisation, batch-first tensors, and (because of
# include_lengths=True) the sequence lengths returned next to each batch.
TEXT = data.Field(tokenize='spacy',batch_first=True,include_lengths=True)
# Label field, stored as float.
# NOTE(review): `mappings` defines four emotion classes, yet float targets are
# combined further down with a single sigmoid output and BCELoss, which is a
# binary set-up. Confirm whether a multi-class head (integer targets with
# CrossEntropyLoss) was intended.
LABEL = data.LabelField(dtype = torch.float,batch_first=True)

In [None]:
# source : https://gist.github.com/lextoumbourou/8f90313cbc3598ffbabeeaa1741a11c8
# to use DataFrame as a Data source

class DataFrameDataset(data.Dataset):
    """torchtext dataset built from the ``text`` and ``target`` columns of a DataFrame."""

    def __init__(self, df, fields, is_test=False, **kwargs):
        """Convert every row of ``df`` into a torchtext ``Example``.

        Args:
            df: Frame with a ``text`` column and, unless ``is_test`` is true,
                a ``target`` column.
            fields: ``(name, field)`` pairs given to ``Example.fromlist`` in
                the order text, label.
            is_test: When true the ``target`` column is not read and every
                example gets ``None`` as its label.
            **kwargs: Forwarded to ``data.Dataset.__init__``.
        """
        texts = df["text"]
        labels = [None] * len(df) if is_test else df["target"]
        examples = [
            data.Example.fromlist([text, label], fields)
            for text, label in zip(texts, labels)
        ]

        super().__init__(examples, fields, **kwargs)

    @staticmethod
    def sort_key(ex):
        """Sort examples by the length of their ``text`` attribute."""
        return len(ex.text)

    @classmethod
    def splits(cls, fields, train_df, val_df=None, test_df=None, **kwargs):
        """Build one dataset per frame that was supplied.

        Args:
            fields: ``(name, field)`` pairs shared by all datasets.
            train_df: Training frame, or ``None`` to skip it.
            val_df: Validation frame, or ``None`` to skip it.
            test_df: Test frame, or ``None`` to skip it; built with
                ``is_test=True``.
            **kwargs: Forwarded to the constructor.

        Returns:
            Tuple of the datasets that were built, in the order train,
            validation, test; frames passed as ``None`` are left out.
        """
        built = []
        # Each dataset is built from a copy of its frame.
        if train_df is not None:
            built.append(cls(train_df.copy(), fields, **kwargs))
        if val_df is not None:
            built.append(cls(val_df.copy(), fields, **kwargs))
        if test_df is not None:
            built.append(cls(test_df.copy(), fields, True, **kwargs))
        return tuple(built)

In [None]:
# Pair each example attribute name with the field that processes it; the
# order (text, then label) is the order DataFrameDataset passes the values in.
fields = [('text',TEXT), ('target',LABEL)]

# Only train and validation frames are supplied, so exactly two datasets
# come back; no test dataset is built here.
train_ds, val_ds = DataFrameDataset.splits(fields, train_df=df_train, val_df=df_val)

In [None]:
# Build the vocabularies from the training set only. min_freq=3 is the
# frequency threshold for a token to get its own entry, and the pretrained
# 100-dimensional GloVe vectors are attached to the vocabulary.
TEXT.build_vocab(train_ds,min_freq=3,vectors = "glove.6B.100d")
LABEL.build_vocab(train_ds)

# Number of entries in the text vocabulary
print("Size of TEXT vocabulary:",len(TEXT.vocab))

# Number of entries in the label vocabulary
print("Size of LABEL vocabulary:",len(LABEL.vocab))

# The ten most frequent tokens with their counts
print(TEXT.vocab.freqs.most_common(10))

# Token -> index mapping. NOTE(review): this prints the whole mapping, which
# can be a very long output.
print(TEXT.vocab.stoi)

In [None]:
# Use the GPU when one is available, otherwise the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Number of examples per batch
BATCH_SIZE = 64

# Bucketed iterators for the train and validation datasets. Examples are
# grouped by text length (sort_key) and each batch is sorted internally
# (sort_within_batch=True); the model passes the batch lengths to
# pack_padded_sequence without enforce_sorted=False, which presumably relies
# on this ordering (TODO confirm).
train_iterator, valid_iterator = data.BucketIterator.splits(
    (train_ds, val_ds),
    batch_size = BATCH_SIZE,
    sort_key = lambda x: len(x.text),
    sort_within_batch=True,
    device = device)

In [None]:
import torch.nn as nn

class classifier(nn.Module):
    """Embedding -> stacked (bi)LSTM -> linear -> sigmoid text classifier.

    The final hidden state of the last LSTM layer (both directions when
    bidirectional) is fed to a linear layer, and a sigmoid is applied to its
    output, so every output value lies in [0, 1].

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each embedding vector.
        hidden_dim: Hidden size of the LSTM, per direction.
        output_dim: Number of output units.
        n_layers: Number of stacked LSTM layers.
        bidirectional: Whether the LSTM also reads the sequence backwards.
        dropout: Dropout probability between stacked LSTM layers.
    """

    #define all the layers used in model
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers,
                 bidirectional, dropout):

        #Constructor
        super().__init__()

        # Remembered so forward() knows which final hidden states to read.
        self.bidirectional = bool(bidirectional)
        num_directions = 2 if self.bidirectional else 1

        #embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        #lstm layer
        self.lstm = nn.LSTM(embedding_dim,
                            hidden_dim,
                            num_layers=n_layers,
                            bidirectional=self.bidirectional,
                            # nn.LSTM applies dropout between stacked layers
                            # only and warns when given with a single layer.
                            dropout=dropout if n_layers > 1 else 0.0,
                            batch_first=True)

        #dense layer: its input is one hidden state per direction
        self.fc = nn.Linear(hidden_dim * num_directions, output_dim)

        #activation function
        self.act = nn.Sigmoid()

    def forward(self, text, text_lengths):
        """Compute the sigmoid outputs for a padded batch of token ids.

        Args:
            text: Integer tensor of shape [batch size, sent len].
            text_lengths: Length of every sequence in the batch (tensor or
                list), in decreasing order as pack_padded_sequence requires
                by default.

        Returns:
            Tensor of shape [batch size, output dim] with values in [0, 1].
        """
        #text = [batch size, sent len]
        embedded = self.embedding(text)
        #embedded = [batch size, sent len, emb dim]

        #packed sequence; the lengths must live on the CPU even when the
        #batch itself is on the GPU
        lengths = torch.as_tensor(text_lengths).cpu()
        packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, lengths, batch_first=True)

        packed_output, (hidden, cell) = self.lstm(packed_embedded)
        #hidden = [num layers * num directions, batch size, hid dim]
        #cell = [num layers * num directions, batch size, hid dim]

        if self.bidirectional:
            #last layer: hidden[-2] is the forward direction, hidden[-1] the backward one
            hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        else:
            #last layer, single direction
            hidden = hidden[-1, :, :]

        #hidden = [batch size, hid dim * num directions]
        dense_outputs = self.fc(hidden)

        #Final activation function
        outputs = self.act(dense_outputs)

        return outputs

In [None]:
#define hyperparameters
size_of_vocab = len(TEXT.vocab)
# Must equal the width of the pretrained vectors copied into the embedding
# layer (the vocabulary is built with "glove.6B.100d").
embedding_dim = 100
num_hidden_nodes = 32
# NOTE(review): `mappings` lists four emotion classes, but one output unit
# with a sigmoid and BCELoss models a binary problem. Confirm the intended
# number of outputs.
num_output_nodes = 1
num_layers = 2
bidirection = True
dropout = 0.2

#instantiate the model (the flag defined above is now actually passed on
#instead of a second, hard-coded True)
model = classifier(size_of_vocab, embedding_dim, num_hidden_nodes, num_output_nodes, num_layers,
                   bidirectional = bidirection, dropout = dropout)

In [None]:
# Print the model's layers as nn.Module renders them.
print(model)

#No. of trainable parameters
def count_parameters(model):
    """Return the total number of elements over all parameters that require gradients."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total
    
# Report the parameter count with thousands separators.
print(f'The model has {count_parameters(model):,} trainable parameters')

# Overwrite the randomly initialised embedding weights in place with the
# pretrained vectors attached to the text vocabulary.
pretrained_embeddings = TEXT.vocab.vectors
model.embedding.weight.data.copy_(pretrained_embeddings)

# Shape of the pretrained vector table.
print(pretrained_embeddings.shape)

In [None]:
import torch.optim as optim

#define optimizer and loss
# Adam over all model parameters, with its default settings.
optimizer = optim.Adam(model.parameters())
# Binary cross-entropy computed on probabilities, which matches the sigmoid
# at the end of the model.
# NOTE(review): the data has four emotion classes (see `mappings`); a binary
# loss does not model that. Confirm the intended objective.
criterion = nn.BCELoss()

#define metric
def binary_accuracy(preds, y):
    """Return the fraction of rounded predictions that equal the targets.

    Args:
        preds: Tensor of probabilities; each value is rounded to the nearest
            integer before comparison.
        y: Tensor of targets with the same shape as ``preds``.

    Returns:
        A 0-dim float tensor with the share of matching elements.
    """
    #round predictions to the closest integer
    rounded_preds = torch.round(preds)

    correct = (rounded_preds == y).float()
    # numel() also works for a 0-dim tensor (a single squeezed prediction),
    # where len() raises TypeError.
    acc = correct.sum() / correct.numel()
    return acc
    
# Move the model and the loss module to the selected device (GPU when
# available, otherwise CPU).
model = model.to(device)
criterion = criterion.to(device)

In [None]:
def train(model, iterator, optimizer, criterion):
    """Run one training epoch over ``iterator``.

    Args:
        model: Module called as ``model(text, text_lengths)``.
        iterator: Iterable of batches exposing ``batch.text`` (a
            ``(tokens, lengths)`` pair) and ``batch.target``; must support
            ``len()``.
        optimizer: Optimizer that updates the model parameters.
        criterion: Loss called as ``criterion(predictions, targets)``.

    Returns:
        Tuple ``(loss, accuracy)``, each the plain average of the per-batch
        values (batches are not weighted by their size).
    """
    #initialize every epoch
    epoch_loss = 0
    epoch_acc = 0

    #set the model in training phase
    model.train()

    for batch in iterator:

        #resets the gradients after every batch
        optimizer.zero_grad()

        #retrieve text and no. of words
        text, text_lengths = batch.text

        #drop only the trailing output dimension; a bare squeeze() would also
        #remove the batch dimension of a one-example batch and the loss would
        #then reject the shape mismatch with the targets
        predictions = model(text, text_lengths).squeeze(-1)

        #compute the loss
        loss = criterion(predictions, batch.target)

        #compute the binary accuracy
        acc = binary_accuracy(predictions, batch.target)

        #backpropagate the loss and compute the gradients
        loss.backward()

        #update the weights
        optimizer.step()

        #loss and accuracy
        epoch_loss += loss.item()
        epoch_acc += acc.item()

    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [None]:
def evaluate(model, iterator, criterion):
    """Compute loss and accuracy over ``iterator`` without updating the model.

    Args:
        model: Module called as ``model(text, text_lengths)``.
        iterator: Iterable of batches exposing ``batch.text`` (a
            ``(tokens, lengths)`` pair) and ``batch.target``; must support
            ``len()``.
        criterion: Loss called as ``criterion(predictions, targets)``.

    Returns:
        Tuple ``(loss, accuracy)``, each the plain average of the per-batch
        values (batches are not weighted by their size).
    """
    #initialize every epoch
    epoch_loss = 0
    epoch_acc = 0

    #deactivating dropout layers
    model.eval()

    #deactivates autograd
    with torch.no_grad():

        for batch in iterator:

            #retrieve text and no. of words
            text, text_lengths = batch.text

            #drop only the trailing output dimension; a bare squeeze() would
            #also remove the batch dimension of a one-example batch
            predictions = model(text, text_lengths).squeeze(-1)

            #compute loss and accuracy
            loss = criterion(predictions, batch.target)
            acc = binary_accuracy(predictions, batch.target)

            #keep track of loss and accuracy
            epoch_loss += loss.item()
            epoch_acc += acc.item()

    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [None]:
N_EPOCHS = 50
# Lowest validation loss seen so far; decides when a checkpoint is written.
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):

    #train the model
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion)

    #evaluate the model
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)

    #save the best model: overwrite the checkpoint only when validation loss improves
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'saved_weights.pt')

    #label each pair of metric lines with its epoch so the log can be read
    print(f'Epoch: {epoch + 1:02} / {N_EPOCHS}')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

In [None]:
#load weights
path="saved_weights.pt"
# map_location puts the stored tensors on the active device, so a checkpoint
# written on a GPU machine also loads on a CPU-only one.
model.load_state_dict(torch.load(path, map_location=device));
# Switch to evaluation mode (dropout off) for inference.
model.eval();

#inference
# spaCy pipeline whose tokenizer predict() uses to split the input sentence.
import spacy
nlp = spacy.load("en_core_web_sm")

def predict(model, sentence):
    """Return the model's output for one raw sentence as a Python float.

    The sentence is cleaned with ``preprocess_text`` first, exactly like the
    training texts were before the vocabulary was built; without that step,
    capitalised words would not be found in the vocabulary.

    Args:
        model: Trained model called as ``model(tokens, lengths)``.
        sentence: Raw input string.

    Returns:
        The single value produced by the model for this sentence.

    Raises:
        ValueError: If no token is left after preprocessing (an empty
            sequence cannot be packed for the LSTM).
    """
    cleaned = preprocess_text(sentence)
    tokenized = [tok.text for tok in nlp.tokenizer(cleaned)]   #tokenize the sentence
    if not tokenized:
        raise ValueError("sentence contains no tokens after preprocessing")
    indexed = [TEXT.vocab.stoi[t] for t in tokenized]          #convert to integer sequence
    tensor = torch.LongTensor(indexed).unsqueeze(0).to(device) #shape [1, no. of words]
    length_tensor = torch.LongTensor([len(indexed)])           #kept on the CPU
    #inference only: no gradients needed
    with torch.no_grad():
        prediction = model(tensor, length_tensor)
    return prediction.item()

In [None]:
# Smoke test: score a single short input with the loaded model.
predict(model, "haha")