**Bidirectional LSTM - Sentiment Analysis**

In [1]:
import os
import random
import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

from string import punctuation
from collections import Counter

import torch
from torch.utils.data import TensorDataset, DataLoader

from sklearn.model_selection import train_test_split
from keras.preprocessing.sequence import pad_sequences

Using TensorFlow backend.


In [2]:
# Run on the first CUDA GPU when one is available; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

**Import data**

In [3]:
# Build the CSV paths with os.path.join instead of hard-coded backslashes: the
# previous '.\processed_data\...' literals only resolve on Windows and depend on
# the unrecognised escape sequence "\p", which newer Python versions warn about.
DATA_DIR = os.path.join('.', 'processed_data')

data = pd.read_csv(os.path.join(DATA_DIR, 'processed_data.csv'), index_col='Unnamed: 0')
labels = pd.read_csv(os.path.join(DATA_DIR, 'processed_labels.csv'), index_col='Unnamed: 0')

# Rename the column labelled "0" in each file to a descriptive name.
data = data.rename(columns={"0": 'reviews'})
labels = labels.rename(columns={"0": 'sentiment'})

**Preprocess data, define collate function and data loader**

In [4]:
def preprocessed_data(data,labels):
    """
    Prepare the raw reviews and labels for the model.

    Steps:
      1. Lower-case every review and strip ASCII punctuation (string.punctuation).
      2. Build a vocabulary that maps each word to an integer; the most frequent
         word gets index 1. Index 0 is never assigned to a word, so it stays
         free for padding.
      3. Encode each review as a sequence of word indices and each label as
         1 ('positive') or 0 (anything else).
      4. Drop reviews that end up with zero words, together with their labels.

    The input DataFrames are not modified.

    Args:
      data: DataFrame with a 'reviews' column of strings. Missing values are
          treated as empty reviews (and therefore dropped in step 4).
      labels: DataFrame with a 'sentiment' column, aligned with `data` by row
          position.

    Returns:
      (vocab_size, list_of_samples) where vocab_size is the number of distinct
      words plus one (for index 0) and list_of_samples is a list of
      (LongTensor of word indices, label) tuples.

    Raises:
      ValueError: if `data` and `labels` do not have the same number of rows.
    """
    if len(data) != len(labels):
        raise ValueError(
            f"data has {len(data)} rows but labels has {len(labels)}; they must be aligned"
        )

    # Work on local copies of the text so the caller's DataFrame is left untouched.
    strip_punctuation = str.maketrans('', '', punctuation)
    cleaned_reviews = [
        review.lower().translate(strip_punctuation)
        for review in data['reviews'].fillna('')
    ]

    # Tokenise on whitespace.
    list_words = [review.split() for review in cleaned_reviews]

    # Most frequent words get the smallest indices; enumerate starts at 1 so
    # that 0 is never a valid word id.
    counts = Counter(word for words in list_words for word in words)
    vocab = sorted(counts, key=counts.get, reverse=True)
    vocab_to_int = {word: idx for idx, word in enumerate(vocab, 1)}

    encoded_reviews = [[vocab_to_int[word] for word in words] for words in list_words]
    encoded_labels = np.array([1 if x == 'positive' else 0 for x in labels['sentiment'].values])

    # Zero-length reviews carry no signal and cannot be packed, so skip them
    # (and their labels) when assembling the samples.
    list_of_samples = [
        (torch.LongTensor(review), label)
        for review, label in zip(encoded_reviews, encoded_labels)
        if len(review) != 0
    ]

    return len(vocab_to_int)+1,list_of_samples

In [5]:
from torch.nn.utils.rnn import pad_sequence
padding_value = 0
def collate(list_of_samples, max_len=250):
    """Merges a list of samples to form a mini-batch.

    Samples are sorted by sequence length in decreasing order, then every
    sequence is truncated to at most `max_len` tokens (keeping the start) and
    right-padded with `padding_value` up to exactly `max_len`.

    Args:
      list_of_samples: list of tuples (src_seq, label):
          src_seq is a 1-D sequence of token ids of shape (src_seq_length,)
          label is a scalar class label
      max_len: fixed width of the padded batch; longer sequences are truncated.

    Returns:
      src_seq: NumPy int64 array of shape (batch_size, max_len), batch first.
          Row 0 holds the longest sequence and the last row the shortest.
          Callers that need a (max_len, batch_size) layout must transpose it.
      src_seq_lengths: list with the (truncated) length of each row, in the
          same decreasing order.
      tgt_seqs: tensor of shape (batch_size,) with the labels, reordered to
          match the rows of src_seq.
    """
    sorted_samples = sorted(list_of_samples, key=lambda sample: len(sample[0]), reverse=True)

    # Pad with plain NumPy: start from a block of padding and copy each
    # (possibly truncated) sequence into the front of its row.
    src_seq = np.full((len(sorted_samples), max_len), padding_value, dtype=np.int64)
    src_seq_lengths = []
    for row, (tokens, _) in enumerate(sorted_samples):
        kept = min(len(tokens), max_len)
        src_seq[row, :kept] = np.asarray(tokens[:kept])
        src_seq_lengths.append(kept)

    tgt_seqs = torch.from_numpy(np.array([label for _, label in sorted_samples]))

    return src_seq,src_seq_lengths,tgt_seqs

In [6]:
def prepare_data_loader(list_of_samples,train_size,test_size,batch_size=30):
    """Split the samples into train / test / validation sets and wrap each in a DataLoader.

    The split is positional: the first `train_size` fraction of the list is
    the training set; of the remainder, the first `test_size` fraction is the
    test set and whatever is left is the validation set.
    NOTE(review): no shuffling happens before the split, so this assumes
    `list_of_samples` is not ordered by label - confirm against the data.

    Args:
      list_of_samples: list of (sequence, label) samples accepted by `collate`.
      train_size: fraction (0-1) of all samples used for training.
      test_size: fraction (0-1) of the non-training samples used for testing.
      batch_size: mini-batch size for all three loaders.

    Returns:
      (train_loader, test_loader, val_loader)
    """
    n_train = round(len(list_of_samples) * train_size)
    n_test = round((len(list_of_samples) - n_train) * test_size)

    train, remaining = list_of_samples[:n_train], list_of_samples[n_train:]
    test, val = remaining[:n_test], remaining[n_test:]

    # Pinning memory only helps host-to-GPU copies; requesting it without CUDA
    # just produces a warning.
    pin_memory = torch.cuda.is_available()

    # Only the training data needs a fresh order each epoch; keeping the
    # evaluation loaders in a fixed order makes their per-batch averages repeatable.
    train_loader = DataLoader(train, shuffle=True, batch_size=batch_size, collate_fn=collate, pin_memory=pin_memory)
    test_loader = DataLoader(test, shuffle=False, batch_size=batch_size, collate_fn=collate, pin_memory=pin_memory)
    val_loader = DataLoader(val, shuffle=False, batch_size=batch_size, collate_fn=collate, pin_memory=pin_memory)
    return train_loader, test_loader, val_loader

**Define Bidirectional LSTM model**

In [8]:
import torch.nn as nn

class SentimentBidirectLSTM(nn.Module):
    """
    Multi-layer (by default bidirectional) LSTM classifier used for sentiment analysis.

    Pipeline: embedding lookup -> packed LSTM -> final hidden state of the top
    layer (both directions concatenated when bidirectional) -> dropout ->
    linear layer -> sigmoid.

    vocab_size: number of rows in the embedding table; must be larger than the
                biggest token id fed to the model (including the padding id).
    output_size: size of outputs. In this case, the label is either '1' or '0' so output_size=1
    embedding_dim: Number of columns in the embedding lookup table; size of our embeddings
    hidden_dim: Number of units in the hidden state of each LSTM direction.
    n_layers: Number of stacked LSTM layers in the network.
    bidirectional: whether the LSTM also reads each sequence backwards.
    lstm_drop: dropout applied between stacked LSTM layers (ignored when n_layers == 1).
    dropout: dropout applied to the final hidden state before the linear layer.
    """

    def __init__(self, vocab_size, output_size, embedding_dim, hidden_dim, n_layers,bidirectional=True, lstm_drop=0.5,dropout=0.3):
        """Build the embedding, LSTM, dropout, linear and sigmoid layers."""
        super(SentimentBidirectLSTM, self).__init__()

        self.output_size = output_size
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        self.bidirectional = bidirectional

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim,
                            hidden_dim,
                            num_layers=n_layers,
                            bidirectional=bidirectional,
                            # Inter-layer dropout does nothing for a single layer
                            # and only triggers a warning there.
                            dropout=lstm_drop if n_layers > 1 else 0.0,
                            batch_first=False)

        self.dropout = nn.Dropout(dropout)
        # The classifier sees one hidden vector per direction.
        num_directions = 2 if bidirectional else 1
        self.ln = nn.Linear(hidden_dim * num_directions, output_size)

        self.sig = nn.Sigmoid()

    def forward(self, src_seq, seq_lengths):
        """
        Compute one probability per sequence.

        src_seq: LongTensor of token ids of shape (max_seq_length, batch_size),
                 with sequences ordered by decreasing length.
        seq_lengths: true length of each sequence (list or tensor, any device).

        Returns a tensor of shape (batch_size, output_size) with values in [0, 1].
        """
        embeds = self.embedding(src_seq) # (max_seq_length, batch_size, embedding_dim)

        # pack_padded_sequence wants the lengths as a CPU int64 tensor, even
        # when the data itself lives on the GPU.
        lengths = torch.as_tensor(seq_lengths, dtype=torch.int64, device="cpu")
        packed = pack_padded_sequence(embeds, lengths)
        _, (hidden, _) = self.lstm(packed) # hidden: (n_layers * num_directions, batch_size, hidden_dim)

        if self.bidirectional:
            # The last two entries are the forward and backward states of the top layer.
            hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1)
        else:
            # Only one direction: the last entry is the top layer's final state.
            hidden = hidden[-1,:,:]
        hidden = self.dropout(hidden)
        hidden = self.ln(hidden)
        hidden = self.sig(hidden)
        return hidden
    
#     def init_hidden(self, batch_size):
#         hidden=torch.zeros(self.n_layers*2, batch_size, self.hidden_dim)
#         return hidden

In [9]:
# Seed every random number generator in play so weight initialisation, dropout
# and batch shuffling are repeatable across kernel restarts (GPU kernels may
# still introduce small nondeterminism).
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Encode the corpus and split it 80% train / 10% test / 10% validation
# (test_size is the fraction of the non-training remainder).
vocab_size, list_of_samples = preprocessed_data(data,labels)
train_loader, test_loader, val_loader  = prepare_data_loader(list_of_samples,train_size=0.8,test_size=0.5)

# Model hyperparameters.
output_size = 1
embedding_dim = 400
hidden_dim = 256
n_layers = 2

net = SentimentBidirectLSTM(vocab_size, output_size, embedding_dim, hidden_dim, n_layers,bidirectional=True, lstm_drop=0.5,dropout=0.3)
net = net.to(device)

# The model ends in a sigmoid, so plain binary cross-entropy on probabilities is used.
lr=0.001
criterion = nn.BCELoss().to(device)
optimizer = torch.optim.Adam(net.parameters(), lr=lr)


**Define train, evaluation, and test functions**

In [10]:
def train_model(model,optimizer,criterion,train_loader):
    """Run one training epoch and report the mean batch loss and the accuracy.

    Args:
      model: network called as model(src_seq, seq_lengths) with src_seq of
          shape (max_seq_length, batch_size); must return one probability per sample.
      optimizer: optimizer stepping the model's parameters.
      criterion: loss taking (predictions, float targets), e.g. nn.BCELoss.
      train_loader: iterable of (src_seq, src_seq_lengths, tgt_seqs) batches,
          where src_seq is batch-first (batch_size, max_seq_length).

    Returns:
      (mean loss over batches, fraction of correctly classified samples).
      Both are 0 when the loader yields no batches.
    """
    model.train()

    # Follow the model's own device instead of relying on a notebook global.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    epoch_loss = 0.0
    n_correct = 0
    n_samples = 0
    n_batches = 0
    for src_seq, src_seq_lengths, tgt_seqs in train_loader:
        src_seq = torch.as_tensor(src_seq, dtype=torch.long).to(model_device)
        # Lengths stay on the CPU: pack_padded_sequence requires them there.
        src_seq_lengths = torch.as_tensor(src_seq_lengths, dtype=torch.long)
        tgt_seqs = torch.as_tensor(tgt_seqs, dtype=torch.long).unsqueeze(dim=1).to(model_device)

        optimizer.zero_grad()

        # The batch is batch-first; the model expects time-first input.
        preds = model(src_seq.t(), src_seq_lengths)
        loss = criterion(preds, tgt_seqs.float())
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        n_correct += torch.sum(torch.round(preds) == tgt_seqs).item()
        n_samples += tgt_seqs.size(0)
        n_batches += 1

    # Divide by what was actually processed rather than by a hard-coded share
    # of the full dataset.
    return epoch_loss / max(n_batches, 1), n_correct / max(n_samples, 1)


In [11]:
def evaluation_model(model,optimizer,criterion,val_loader):
    """Evaluate the model on the validation data without updating any weights.

    Args:
      model: network called as model(src_seq, seq_lengths) with src_seq of
          shape (max_seq_length, batch_size); must return one probability per sample.
      optimizer: unused; kept so the signature mirrors train_model.
      criterion: loss taking (predictions, float targets), e.g. nn.BCELoss.
      val_loader: iterable of (src_seq, src_seq_lengths, tgt_seqs) batches,
          where src_seq is batch-first (batch_size, max_seq_length).

    Returns:
      (mean loss over batches, fraction of correctly classified samples).
      Both are 0 when the loader yields no batches.
    """
    model.eval()

    # Follow the model's own device instead of relying on a notebook global.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    epoch_loss = 0.0
    n_correct = 0
    n_samples = 0
    n_batches = 0
    with torch.no_grad():
        for src_seq, src_seq_lengths, tgt_seqs in val_loader:
            src_seq = torch.as_tensor(src_seq, dtype=torch.long).to(model_device)
            # Lengths stay on the CPU: pack_padded_sequence requires them there.
            src_seq_lengths = torch.as_tensor(src_seq_lengths, dtype=torch.long)
            tgt_seqs = torch.as_tensor(tgt_seqs, dtype=torch.long).unsqueeze(dim=1).to(model_device)

            # The batch is batch-first; the model expects time-first input.
            preds = model(src_seq.t(), src_seq_lengths)
            loss = criterion(preds, tgt_seqs.float())

            epoch_loss += loss.item()
            n_correct += torch.sum(torch.round(preds) == tgt_seqs).item()
            n_samples += tgt_seqs.size(0)
            n_batches += 1

    # Divide by what was actually processed rather than by a hard-coded share
    # of the full dataset.
    return epoch_loss / max(n_batches, 1), n_correct / max(n_samples, 1)



In [18]:
def test_model(model,optimizer,criterion,test_loader):
    the_model.eval()
    with torch.no_grad():
        epoch_loss = 0
        epoch_acc = 0
        for i, data in enumerate(test_loader, 0):
            src_seq,src_seq_lengths,tgt_seqs= data
            src_seq = torch.LongTensor(src_seq).to(device)
            src_seq_lengths = torch.LongTensor(src_seq_lengths).to(device)
            tgt_seqs=tgt_seqs.unsqueeze(dim=1)
            tgt_seqs = torch.LongTensor(tgt_seqs).to(device)

            preds = model.forward(src_seq.T, src_seq_lengths).to(device)
            loss = criterion(preds,tgt_seqs.float()).to(device)
            acc = torch.sum(torch.round(preds) == tgt_seqs)
            epoch_loss += loss.item()
            epoch_acc += acc.item()

    return  epoch_loss/len(test_loader), epoch_acc/(len(list_of_samples)*0.2*0.5)

**Train model**

In [12]:
# Number of full passes over the training data.
n_epochs = 5

for epoch in range(n_epochs):
    
    # One optimisation pass over the training set, then a no-gradient pass over
    # the validation set to track generalisation.
    train_loss, train_acc = train_model(net,optimizer,criterion,train_loader)
    val_loss, val_acc = evaluation_model(net,optimizer,criterion,val_loader)
    

    
    # NOTE(review): in the recorded run the validation loss rises after epoch 2
    # while the training loss keeps falling - consider early stopping or
    # keeping the best-validation checkpoint.
    print(f'Epoch: {epoch+1} Train Loss: {train_loss:.3f} Train Acc: {train_acc*100:.2f}% Val Loss: {val_loss:.3f} Val Acc: {val_acc*100:.2f}%')

Epoch: 1 Train Loss: 0.486 Train Acc: 75.81% Val Loss: 0.324 Val Acc: 86.18%
Epoch: 2 Train Loss: 0.262 Train Acc: 89.50% Val Loss: 0.271 Val Acc: 88.74%
Epoch: 3 Train Loss: 0.147 Train Acc: 94.68% Val Loss: 0.292 Val Acc: 88.82%
Epoch: 4 Train Loss: 0.078 Train Acc: 97.32% Val Loss: 0.331 Val Acc: 88.34%
Epoch: 5 Train Loss: 0.033 Train Acc: 98.94% Val Loss: 0.500 Val Acc: 87.14%


**Save trained parameters**

In [14]:
# Persist only the learned parameters (the state_dict) of the network as it
# stands after the last epoch; the architecture must be rebuilt in code before loading.
PATH = "trained_LSTM.pt"
torch.save(net.state_dict(),PATH)

**Load trained parameters and test the model's performance on the test data**

In [17]:
# Rebuild the architecture with the same hyperparameters and place it on the
# working device before restoring the saved weights.
the_model =  SentimentBidirectLSTM(vocab_size, output_size, embedding_dim, hidden_dim, n_layers,bidirectional=True, lstm_drop=0.5,dropout=0.3).to(device)
# map_location remaps the stored tensors, so a checkpoint written on a GPU
# machine can still be loaded when only a CPU is available.
the_model.load_state_dict(torch.load("trained_LSTM.pt", map_location=device))

<All keys matched successfully>

In [20]:
# Score the model rebuilt from the saved checkpoint (not the in-memory `net`),
# so this cell actually verifies that the saved parameters reproduce the results.
# .to(device) returns the same module and keeps it on the device the batches are sent to.
test_loss, test_acc = test_model(the_model.to(device),optimizer,criterion,test_loader)
print(f'Test Loss: {test_loss:.3f} Test Acc: {test_acc*100:.2f}%')

Test Loss: 0.521 Test Acc: 87.02%
