In [0]:
!pip install torchtext==0.5.0

In [0]:
!pip install torchtext

In [0]:
pip show torchtext

In [0]:
import pandas as pd
import os
import numpy as np
import matplotlib.pyplot as plt
%pylab inline
from pandas.io.json import json_normalize
import json
import torch
import re
import random
import warnings
import functools
import operator
import seaborn as sns

from collections import defaultdict, Counter
from html import unescape
from matplotlib import pyplot as plt
from torch import nn, optim
from torchtext.data import Field, Dataset, Example, BucketIterator, Iterator, TabularDataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import matthews_corrcoef, confusion_matrix, precision_score, recall_score, f1_score, accuracy_score
from sklearn.utils import shuffle

## Prepare Data

In [0]:
# Use this code block if the input files contain only two columns: text and labels

# Locate data
# file_path = ""
# train_path = ""
# dev_path = ""
# test_path = ""

# # Define fields
# TEXT = Field(sequential=True, tokenize="spacy", lower=True, include_lengths=True) 
# LABEL = Field(sequential=False, use_vocab=False)

# # Create dataset object
# train_set, dev_set, test_set = TabularDataset.splits(path=file_path, train=train_path, validation=dev_path, test=test_path, format='csv', fields=[('text', TEXT), ('labels', LABEL)])

# # Get training words and pretrained vectors (training words without vectors are initialized randomly)
# TEXT.build_vocab(train_set, vectors='fasttext.simple.300d', unk_init=torch.Tensor.normal_)

In [0]:
# Load each split and keep only the two columns the model consumes
train = pd.read_csv("")[["text", "labels"]]
dev = pd.read_csv("")[["text", "labels"]]
test = pd.read_csv("")[["text", "labels"]]

# Field definitions: spaCy-tokenised text (returned together with its lengths)
# and labels that are used as-is, without building a vocabulary
TEXT = Field(sequential=True, tokenize="spacy", include_lengths=True)
LABEL = Field(sequential=False, use_vocab=False)
fields = [('text', TEXT), ('labels', LABEL)]

# Wrap every row of each frame in a torchtext Example and collect them per split
train_set, dev_set, test_set = (
    Dataset([Example.fromlist(row, fields) for row in frame.values.tolist()], fields=fields)
    for frame in (train, dev, test)
)

# Build the vocabulary from the training split only and attach pretrained vectors
# (training words without a pretrained vector are initialized randomly)
TEXT.build_vocab(train_set, vectors='fasttext.simple.300d', unk_init=torch.Tensor.normal_)

## Model Setup

In [0]:
class LSTMClassifier(nn.Module):
    """
    LSTM text classifier that also owns its data iterators and its train / evaluate / save routines.

    When output_dim == 1 the model is treated as a binary classifier (one logit per
    example, BCE-with-logits loss, sigmoid + rounding for predictions). Otherwise it
    is treated as a multiclass classifier (cross-entropy loss, argmax for predictions).
    """

    def __init__(self, train, dev, test, pad_id, input_dim: int, embedding_dim: int=300, hidden_dim: int=256, output_dim: int=4, n_layers: int=2, dropout: float=0.2, bidirectional: bool=True, batch_size: int=64):
        """
        train: dataset object of training data

        dev: dataset object of dev data

        test: dataset object of test data

        pad_id: vocabulary index of the padding token (used as the embedding's padding_idx)

        input_dim (int): number of rows in the embedding table, i.e. the vocabulary size

        embedding_dim (int): number of embedding dimensions defaulted to 300

        hidden_dim (int): number of hidden nodes per direction defaulted to 256

        output_dim (int): number of output logits defaulted to 4; use 1 for binary classification

        n_layers (int): number of recurrent layers defaulted to 2

        dropout (float): dropout probability between recurrent layers defaulted to 0.2

        bidirectional (boolean): whether the LSTM runs in both directions, defaulted to True

        batch_size (int): size of mini batches
        """

        super(LSTMClassifier, self).__init__()
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.train_iter, self.dev_iter, self.test_iter = BucketIterator.splits(
            (train, dev, test),
            batch_size=batch_size,
            device=self.device,
            sort_key=lambda x: len(x.text),
            sort_within_batch=True,
        )

        self.output_dim = output_dim
        self.embedding = nn.Embedding(input_dim, embedding_dim, padding_idx=pad_id)
        # forward() only ever feeds the LSTM a PackedSequence, so no batch_first flag is set.
        # Inter-layer dropout is meaningless (and warns) with a single layer.
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            dropout=dropout if n_layers > 1 else 0.0,
        )
        # The classifier head sees one final hidden state per direction.
        num_directions = 2 if bidirectional else 1
        self.fc = nn.Linear(hidden_dim * num_directions, output_dim)
        self.sig = nn.Sigmoid()


    def forward(self, text, lengths):
        """
        This function sets up the model's forward pass

        text: padded batch of token ids; packed without batch_first, so the first axis is the sequence axis

        lengths: unpadded length of every sequence in the batch (second element of batch.text)

        Returns the raw logits with shape (batch, output_dim)
        """

        embedded = self.embedding(text)
        # pack_padded_sequence expects the lengths on the CPU when they are given as a tensor
        if torch.is_tensor(lengths):
            lengths = lengths.cpu()
        packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, lengths)
        _, (hidden, _) = self.lstm(packed_embedded)

        if self.lstm.bidirectional:
            # last layer: forward direction is hidden[-2], backward direction is hidden[-1]
            hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        else:
            hidden = hidden[-1, :, :]

        return self.fc(hidden)


    def _make_criterion(self):
        """Returns the loss matching the output layout: BCE-with-logits for one logit, cross-entropy otherwise."""
        if self.output_dim == 1:
            return nn.BCEWithLogitsLoss().to(self.device)
        return nn.CrossEntropyLoss().to(self.device)


    def _loss_and_accuracy(self, model, criterion, batch):
        """Runs one forward pass on a batch and returns (loss tensor, batch accuracy as a float)."""
        text, lengths = batch.text
        labels = batch.labels.to(self.device)
        logits = model(text, lengths)

        if self.output_dim == 1:
            # squeeze only the logit axis so a batch of one keeps its batch dimension
            logits = logits.squeeze(1)
            loss = criterion(logits, labels.to(dtype=torch.float))
            accuracy = self.binary_accuracy(logits, labels).item()
        else:
            loss = criterion(logits, labels)
            accuracy = self.batch_accuracy(logits, labels).item()

        return loss, accuracy


    def _predicted_classes(self, outputs):
        """Converts raw logits into predicted class ids (1-D long tensor)."""
        if self.output_dim == 1:
            return torch.round(self.sig(outputs.squeeze(1))).long()
        return outputs.argmax(dim=1)


    def trainer(self, model, learning_rate: float, early_stop_vals, epochs: int=50):
        """
        This function trains the model

        model: Instantiation of model

        learning_rate (float): determines steps size while minimizing loss function

        early_stop_vals: Dictionary containing patience level ("patience") and minimum improvement delta ("delta")

        epochs (int): Number of training epochs defaulted to 50

        Returns the list of per-epoch dev accuracies (also stored in self.val_scores;
        the per-epoch mean dev losses are stored in self.val_losses)
        """

        self.early_stop_vals = early_stop_vals
        model = model.to(self.device)
        optimizer = optim.AdamW(model.parameters(), lr=learning_rate)
        criterion = self._make_criterion()  # built once instead of once per batch

        self.val_scores, self.val_losses = [], []

        for epoch in range(epochs):
            print("Processing epoch {}".format(epoch+1))
            if self.early_stopping():
                print("Early stop.")
                break

            model.train()
            train_acc = 0
            for batch in self.train_iter:  # Loop over mini-batches
                optimizer.zero_grad()
                loss, accuracy = self._loss_and_accuracy(model, criterion, batch)
                loss.backward()  # Backpropagate loss
                optimizer.step()  # Update weights
                train_acc += accuracy

            train_acc /= len(self.train_iter)
            print('Accuracy on train data:\t{:.4f}'.format(train_acc))

            model.eval()  # Compute accuracy on validation data
            val_acc = 0
            batch_val_losses = []
            with torch.no_grad():
                for batch in self.dev_iter:
                    loss, accuracy = self._loss_and_accuracy(model, criterion, batch)
                    batch_val_losses.append(loss.item())
                    val_acc += accuracy

            batch_loss = sum(batch_val_losses)/len(batch_val_losses)
            print("Batch dev loss: {}".format(batch_loss))
            self.val_losses.append(batch_loss)
            val_acc /= len(self.dev_iter)
            self.val_scores.append(val_acc)

            print('Accuracy on dev data:\t{:.4f}'.format(val_acc))

        print("Training complete!")
        return self.val_scores


    def early_stopping(self):
        """
        This function adds early stopping capability

        Returns True (stop) when more than `patience` dev losses have been recorded and
        the latest one is not at least `delta` below the mean of the `patience` losses
        before it; returns False (keep training) otherwise
        """

        patience = self.early_stop_vals["patience"]
        if len(self.val_losses) <= patience:
            return False

        baseline = np.mean(np.array(self.val_losses[-1 - patience:-1]))
        improved = self.val_losses[-1] <= baseline - self.early_stop_vals["delta"]
        return not improved


    def test(self, model):
        """
        This function performs a forward pass on the test data and computes the performance metrics

        model: instantiation of model

        Returns a tuple (metrics dictionary, list of true labels, list of predicted labels)
        """

        model = model.to(self.device)
        model.eval()

        labels = list()
        preds = list()

        with torch.no_grad():
            for batch in self.test_iter:
                text, lengths = batch.text
                output = model(text, lengths)

                preds.extend(self._predicted_classes(output).tolist())
                labels.extend(batch.labels.tolist())

        return self.metrics(labels, preds), labels, preds


    def batch_accuracy(self, predictions, labels):
        """
        Calculates mean batch accuracy for multiclass data

        predictions: model output after forward pass, one row of logits per example

        labels: tensor of class indices, one per example

        Returns a one-element float tensor holding the fraction of correct predictions
        """

        correct = predictions.argmax(dim=1).eq(labels)

        # Computed from `correct` alone so no tensor from another device is mixed in;
        # reshaped to one element to keep the shape callers have always received.
        return correct.float().mean().reshape(1)


    def binary_accuracy(self, predictions, labels):
        """
        Calculates mean batch accuracy for binary data

        predictions: model output after forward pass, one logit per example

        labels: tensor of 0/1 labels, one per example

        Returns a scalar float tensor holding the fraction of correct predictions
        """
        rounded_preds = torch.round(self.sig(predictions))

        # compare in the predictions' dtype so integer labels are handled explicitly
        correct = (rounded_preds == labels.to(rounded_preds.dtype)).float()
        acc = correct.sum() / len(correct)

        return acc


    def metrics(self, labels, preds):
        """
        Returns a dictionary with the Matthews correlation coefficient, accuracy, confusion matrix,
        and the weighted-average precision, recall, and f1 score; the dictionary is also stored in self.results

        labels: list of correct labels

        preds: list of model predictions
        """

        self.results = {
            "mcc": matthews_corrcoef(labels, preds),
            "acc": accuracy_score(labels, preds),
            "confusion_matrix": confusion_matrix(labels, preds),
            "precision": precision_score(labels, preds, average="weighted"),
            "recall": recall_score(labels, preds, average="weighted"),
            "f1": f1_score(labels, preds, average="weighted"),
        }

        return self.results


    def save(self, model, output_directory, name, labels, preds):
        """
        This function saves the model weights, the training history, the test metrics, and the test predictions

        Reads self.val_scores / self.val_losses (set by trainer) and self.results (set by metrics),
        so it is meant to be called after training and testing

        model: model to save

        output_directory: Folder to save files in; an empty string means the current working directory

        name: common prefix of the saved files

        labels: list of ground truth values

        preds: list of predictions made by the model
        """

        # os.makedirs("") raises, so only create a directory when one was actually given
        if output_directory:
            os.makedirs(output_directory, exist_ok=True)

        file_name = os.path.join(output_directory, name)

        torch.save(model.state_dict(), file_name+"_model")

        training_dict = {"Val Accuracy": self.val_scores, "Val Loss": self.val_losses}
        np.save(file_name+"_train_results", training_dict)
        np.save(file_name+"_test_results", self.results)

        test_predictions = pd.DataFrame({'Labels': labels, 'Predictions': preds})
        test_predictions.to_csv(file_name+"_predictions")

        print("Saving complete.")

## Run Model

In [0]:
# Hyperparameters and run configuration
INPUT_DIM = len(TEXT.vocab)  # vocabulary size
PAD_ID, UNK_ID = (TEXT.vocab.stoi[token] for token in (TEXT.pad_token, TEXT.unk_token))
N_EPOCHS = 50
EARLY_STOPPING = dict(patience=10, delta=0.01)
LEARNING_RATE = 1e-3
OUTPUT_DIR = ""
SAVE_NAME = ""

In [0]:
# Initialize LSTM model
model = LSTMClassifier(train_set, dev_set, test_set, PAD_ID, INPUT_DIM)

# Load the pretrained vectors into the embedding layer
model.embedding.weight.data.copy_(TEXT.vocab.vectors)

# Manually initialize UNK and PAD tokens as zero vectors (and NOT randomly as would be done otherwise).
# The rows are zeroed in place so the width always follows the embedding layer instead of a hard-coded 300.
model.embedding.weight.data[UNK_ID].zero_()
model.embedding.weight.data[PAD_ID].zero_()

# Train the model
model.trainer(model, LEARNING_RATE, EARLY_STOPPING, N_EPOCHS)

# Evaluate on the test split, show the metrics, and write everything to disk
results, labels, preds = model.test(model)
print(results)
model.save(model, OUTPUT_DIR, SAVE_NAME, labels, preds)

In [0]:
# `results` is the plain dict returned by model.test(). A dict reloaded from the
# saved "_test_results" file with np.load(..., allow_pickle=True) comes back wrapped
# in an object array, so it is unwrapped only in that case.
results_dict = results.ravel()[0] if isinstance(results, np.ndarray) else results
df_cm = pd.DataFrame(results_dict["confusion_matrix"])

fig = plt.figure(figsize=(16, 12))

plt.rc('axes', labelsize=14)
plt.rc('xtick', labelsize=12)
plt.rc('ytick', labelsize=12)

plt.subplot(2, 2, 1)
g1 = sns.heatmap(df_cm, annot=True, fmt='g', cmap="Greys")
g1.set_xlabel('Predicted Label')
g1.set_ylabel('True Label', rotation=0)
# One (blank) tick label per class, so the cell works for any number of classes
blank_tick_labels = [""] * len(df_cm)
g1.xaxis.set_ticklabels(blank_tick_labels, rotation=0)
g1.yaxis.set_ticklabels(blank_tick_labels, rotation=0)

plt.show()
#fig.savefig('dogwhistle_lstm_cm.png',bbox_inches='tight')