In [None]:
import os
import time
import numpy as np
from tqdm import tqdm
from string import punctuation
from collections import Counter
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
torch.manual_seed(123)

<torch._C.Generator at 0x110db2b50>

In [None]:
import random
from torchtext import (data, datasets)
# for recent version of torchtext
# from torchtext.legacy import (data, datasets) 

In [None]:
TEXT_FIELD = data.Field(tokenize = data.get_tokenizer("basic_english"), include_lengths = True)
LABEL_FIELD = data.LabelField(dtype = torch.float)

train_dataset, test_dataset = datasets.IMDB.splits(TEXT_FIELD, LABEL_FIELD)
train_dataset, valid_dataset = train_dataset.split(random_state = random.seed(123))

In [None]:
MAX_VOCABULARY_SIZE = 25000

TEXT_FIELD.build_vocab(train_dataset, 
                 max_size = MAX_VOCABULARY_SIZE)

LABEL_FIELD.build_vocab(train_dataset)

In [None]:
B_SIZE = 64

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_data_iterator, valid_data_iterator, test_data_iterator = data.BucketIterator.splits(
    (train_dataset, valid_dataset, test_dataset), 
    batch_size = B_SIZE,
    sort_within_batch = True,
    device = device)

In [None]:
## If you are training using GPUs, we need to use the following function for the pack_padded_sequence method to work 
## (reference : https://discuss.pytorch.org/t/error-with-lengths-in-pack-padded-sequence/35517/3)
if torch.cuda.is_available():
    torch.set_default_tensor_type(torch.cuda.FloatTensor)
from torch.nn.utils.rnn import pack_padded_sequence, PackedSequence

def cuda_pack_padded_sequence(input, lengths, batch_first=False, enforce_sorted=True):
    lengths = torch.as_tensor(lengths, dtype=torch.int64)
    lengths = lengths.cpu()
    if enforce_sorted:
      sorted_indices = None
    else:
      lengths, sorted_indices = torch.sort(lengths, descending=True)
      sorted_indices = sorted_indices.to(input.device)
      batch_dim = 0 if batch_first else 1
      input = input.index_select(batch_dim, sorted_indices)

    data, batch_sizes = \
    torch._C._VariableFunctions._pack_padded_sequence(input, lengths, batch_first)
    return PackedSequence(data, batch_sizes, sorted_indices)

In [None]:
class LSTM(nn.Module):
    def __init__(self, vocabulary_size, embedding_dimension, hidden_dimension, output_dimension, dropout, pad_index):
        super().__init__()
        self.embedding_layer = nn.Embedding(vocabulary_size, embedding_dimension, padding_idx = pad_index)
        self.lstm_layer = nn.LSTM(embedding_dimension, 
                           hidden_dimension, 
                           num_layers=1, 
                           bidirectional=True, 
                           dropout=dropout)
        self.fc_layer = nn.Linear(hidden_dimension * 2, output_dimension)
        self.dropout_layer = nn.Dropout(dropout)
        
    def forward(self, sequence, sequence_lengths=None):
        if sequence_lengths is None:
            sequence_lengths = torch.LongTensor([len(sequence)])
        
        # sequence := (sequence_length, batch_size)
        embedded_output = self.dropout_layer(self.embedding_layer(sequence))
        
        
        # embedded_output := (sequence_length, batch_size, embedding_dimension)
        if torch.cuda.is_available():
            packed_embedded_output = cuda_pack_padded_sequence(embedded_output, sequence_lengths)
        else:
            packed_embedded_output = nn.utils.rnn.pack_padded_sequence(embedded_output, sequence_lengths)
        
        packed_output, (hidden_state, cell_state) = self.lstm_layer(packed_embedded_output)
        # hidden_state := (num_layers * num_directions, batch_size, hidden_dimension)
        # cell_state := (num_layers * num_directions, batch_size, hidden_dimension)
        
        op, op_lengths = nn.utils.rnn.pad_packed_sequence(packed_output)
        # op := (sequence_length, batch_size, hidden_dimension * num_directions)
        
        hidden_output = torch.cat((hidden_state[-2,:,:], hidden_state[-1,:,:]), dim = 1)        
        # hidden_output := (batch_size, hidden_dimension * num_directions)
        
        return self.fc_layer(hidden_output)

    
INPUT_DIMENSION = len(TEXT_FIELD.vocab)
EMBEDDING_DIMENSION = 100
HIDDEN_DIMENSION = 32
OUTPUT_DIMENSION = 1
DROPOUT = 0.5
PAD_INDEX = TEXT_FIELD.vocab.stoi[TEXT_FIELD.pad_token]

lstm_model = LSTM(INPUT_DIMENSION, 
            EMBEDDING_DIMENSION, 
            HIDDEN_DIMENSION, 
            OUTPUT_DIMENSION, 
            DROPOUT, 
            PAD_INDEX)

  "num_layers={}".format(dropout, num_layers))


In [None]:
UNK_INDEX = TEXT_FIELD.vocab.stoi[TEXT_FIELD.unk_token]

lstm_model.embedding_layer.weight.data[UNK_INDEX] = torch.zeros(EMBEDDING_DIMENSION)
lstm_model.embedding_layer.weight.data[PAD_INDEX] = torch.zeros(EMBEDDING_DIMENSION)

In [None]:
optim = torch.optim.Adam(lstm_model.parameters())
loss_func = nn.BCEWithLogitsLoss()

lstm_model = lstm_model.to(device)
loss_func = loss_func.to(device)

In [None]:
def accuracy_metric(predictions, ground_truth):
    """
    Returns 0-1 accuracy for the given set of predictions and ground truth
    """
    # round predictions to either 0 or 1
    rounded_predictions = torch.round(torch.sigmoid(predictions))
    success = (rounded_predictions == ground_truth).float() #convert into float for division 
    accuracy = success.sum() / len(success)
    return accuracy

In [None]:
def train(model, data_iterator, optim, loss_func):
    loss = 0
    accuracy = 0
    model.train()
    
    for curr_batch in data_iterator:
        optim.zero_grad()
        sequence, sequence_lengths = curr_batch.text
        preds = lstm_model(sequence, sequence_lengths).squeeze(1)
        
        loss_curr = loss_func(preds, curr_batch.label)
        accuracy_curr = accuracy_metric(preds, curr_batch.label)
        
        loss_curr.backward()
        optim.step()
        
        loss += loss_curr.item()
        accuracy += accuracy_curr.item()
        
    return loss/len(data_iterator), accuracy/len(data_iterator)

In [None]:
def validate(model, data_iterator, loss_func):
    loss = 0
    accuracy = 0
    model.eval()
    
    with torch.no_grad():
        for curr_batch in data_iterator:
            sequence, sequence_lengths = curr_batch.text
            preds = model(sequence, sequence_lengths).squeeze(1)
            
            loss_curr = loss_func(preds, curr_batch.label)
            accuracy_curr = accuracy_metric(preds, curr_batch.label)

            loss += loss_curr.item()
            accuracy += accuracy_curr.item()
        
    return loss/len(data_iterator), accuracy/len(data_iterator)

In [None]:
num_epochs = 10
best_validation_loss = float('inf')

for ep in range(num_epochs):

    time_start = time.time()
    
    training_loss, train_accuracy = train(lstm_model, train_data_iterator, optim, loss_func)
    validation_loss, validation_accuracy = validate(lstm_model, valid_data_iterator, loss_func)
    
    time_end = time.time()
    time_delta = time_end - time_start 
    
    if validation_loss < best_validation_loss:
        best_validation_loss = validation_loss
        torch.save(lstm_model.state_dict(), 'lstm_model.pt')
    
    print(f'epoch number: {ep+1} | time elapsed: {time_delta}s')
    print(f'training loss: {training_loss:.3f} | training accuracy: {train_accuracy*100:.2f}%')
    print(f'validation loss: {validation_loss:.3f} |  validation accuracy: {validation_accuracy*100:.2f}%')
    print()

In [None]:
lstm_model.load_state_dict(torch.load('../../mastering_pytorch_packt/04_deep_recurrent_net_architectures/lstm_model.pt'))

test_loss, test_accuracy = validate(lstm_model, test_data_iterator, loss_func)

print(f'test loss: {test_loss:.3f} | test accuracy: {test_accuracy*100:.2f}%')

test loss: 0.585 | test accuracy: 76.19%


In [None]:
def sentiment_inference(model, sentence):
    model.eval()
    
    # text transformations
    tokenized = data.get_tokenizer("basic_english")(sentence)
    tokenized = [TEXT_FIELD.vocab.stoi[t] for t in tokenized]
    
    # model inference
    model_input = torch.LongTensor(tokenized).to(device)
    model_input = model_input.unsqueeze(1)
    
    pred = torch.sigmoid(model(model_input))
    
    return pred.item()

In [None]:
print(sentiment_inference(lstm_model, "This film is horrible"))
print(sentiment_inference(lstm_model, "Director tried too hard but this film is bad"))
print(sentiment_inference(lstm_model, "Decent movie, although could be shorter"))
print(sentiment_inference(lstm_model, "This film will be houseful for weeks"))
print(sentiment_inference(lstm_model, "I loved the movie, every part of it"))

0.06318538635969162
0.015872443094849586
0.37745001912117004
0.8425034284591675
0.9304025769233704


In [None]:
from captum.attr import LayerIntegratedGradients, TokenReferenceBase, visualization

In [None]:
import spacy
nlp = spacy.load('en')

In [None]:
lig = LayerIntegratedGradients(lstm_model, lstm_model.embedding_layer)

In [None]:
def forward_with_sigmoid(input, l):
    return torch.sigmoid(lstm_model(input, l))

In [None]:
token_reference = TokenReferenceBase(reference_token_idx=PAD_INDEX)

In [None]:
# accumalate couple samples in this array for visualization purposes
vis_data_records_ig = []

def interpret_sentence(model, sentence, min_len = 7, label = 0):
    # text transformations
    tokenized = data.get_tokenizer("basic_english")(sentence)
    tokenized = [TEXT_FIELD.vocab.stoi[t] for t in tokenized]
    
    # model inference
    model_input = torch.LongTensor(tokenized).to(device)
    model_input = model_input.unsqueeze(1)
    length_input = torch.LongTensor([len(tokenized)])
    pred = torch.sigmoid(model(model_input, length_input))

    # generate reference indices for each sample
    reference_indices = token_reference.generate_reference(len(tokenized), device=device).unsqueeze(0)
    
    
    print(model_input.shape)
    print(reference_indices)
    # compute attributions and approximation delta using layer integrated gradients
    attributions_ig, delta = lig.attribute(model_input,
                                           reference_indices.reshape(model_input.shape[1], model_input.shape[0]), 
                                           n_steps=500, return_convergence_delta=True)

    print('pred: ', Label.vocab.itos[pred_ind], '(', '%.2f'%pred, ')', ', delta: ', abs(delta))

    add_attributions_to_visualizer(attributions_ig, text, pred, pred_ind, label, delta, vis_data_records_ig)
    
def add_attributions_to_visualizer(attributions, text, pred, pred_ind, label, delta, vis_data_records):
    attributions = attributions.sum(dim=2).squeeze(0)
    attributions = attributions / torch.norm(attributions)
    attributions = attributions.cpu().detach().numpy()

    # storing couple samples in an array for visualization purposes
    vis_data_records.append(visualization.VisualizationDataRecord(
                            attributions,
                            pred,
                            Label.vocab.itos[pred_ind],
                            Label.vocab.itos[label],
                            Label.vocab.itos[1],
                            attributions.sum(),       
                            text,
                            delta))

In [None]:
interpret_sentence(lstm_model, 'It was a fantastic performance !', label=1)
interpret_sentence(lstm_model, 'Best film ever', label=1)
interpret_sentence(lstm_model, 'Such a great show!', label=1)
interpret_sentence(lstm_model, 'It was a horrible movie', label=0)
interpret_sentence(lstm_model, 'I\'ve never watched something as bad', label=0)
interpret_sentence(lstm_model, 'It is a disgusting movie!', label=0)

torch.Size([6, 1])
tensor([[1, 1, 1, 1, 1, 1]])


RuntimeError: Expected `len(lengths)` to be equal to batch_size, but got 1 (batch_size=6)