**Sentiment Analysis**

---

Most of the code, explanations, and figures are taken from Ben Trevett's "PyTorch Sentiment Analysis" tutorial:

https://github.com/bentrevett/pytorch-sentiment-analysis/blob/master/1%20-%20Simple%20Sentiment%20Analysis.ipynb


Mount your Google Drive

In [None]:
# Mount Google Drive at /content/drive. Later cells read and write the cached
# dataset and the saved model through relative 'drive/My Drive/...' paths.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# IPython shell escape: list the contents of the current working directory.
!ls

Import libraries

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

import os
from torchtext.legacy import data, datasets
import random
import json

Define sentiment analysis model

In [None]:
class RNN(nn.Module):
    """Recurrent sentence classifier: embedding -> RNN/LSTM -> linear layer.

    Args:
        input_dim: Number of rows in the embedding table (vocabulary size).
        embedding_dim: Size of each token embedding.
        hidden_dim: Hidden-state size of the recurrent layer, per direction.
        output_dim: Number of classes scored by the final linear layer.
        dropout: Dropout probability applied to the embeddings and to the
            final hidden state.
        rnn_type: ``'rnn'`` to use ``nn.RNN`` or ``'lstm'`` to use ``nn.LSTM``.
        bidirectional: Whether the recurrent layer runs in both directions.
        n_layers: Number of stacked recurrent layers.

    Raises:
        ValueError: If ``rnn_type`` is neither ``'rnn'`` nor ``'lstm'``.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim, output_dim,
                 dropout, rnn_type, bidirectional, n_layers):
        super().__init__()

        recurrent_layers = {'rnn': nn.RNN, 'lstm': nn.LSTM}
        if rnn_type not in recurrent_layers:
            # Fail at construction time instead of building a model that has
            # no recurrent layer and only breaks inside forward().
            raise ValueError(
                "rnn_type must be 'rnn' or 'lstm', got {!r}".format(rnn_type))

        self.rnn_type = rnn_type
        self.bidir = bidirectional
        # A bidirectional layer yields one final hidden state per direction,
        # and forward() concatenates them before the linear layer.
        bi_coeff = 2 if bidirectional else 1

        self.embedding = nn.Embedding(input_dim, embedding_dim)
        self.rnn = recurrent_layers[rnn_type](embedding_dim, hidden_dim,
                                              num_layers=n_layers,
                                              bidirectional=bidirectional)

        self.fc = nn.Linear(bi_coeff * hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

        self.criterion = nn.CrossEntropyLoss()
        self.softmax = nn.Softmax(dim=1)

    def forward(self, text, label=None):
        """Score a batch of token-index sequences.

        Args:
            text: Integer tensor of shape (Timeseq, Batch).
            label: Class-index tensor of shape (Batch,), or ``None`` to skip
                the loss computation (inference).

        Returns:
            A pair ``(loss, probs)``: ``loss`` is the cross-entropy loss, or
            the integer ``0`` when ``label`` is ``None``; ``probs`` holds the
            softmax probabilities with shape (Batch, output_dim).

        Raises:
            ValueError: If ``text`` is not a 2-D tensor.
        """
        if text.dim() != 2:
            raise ValueError(
                "text must have shape (Timeseq, Batch), got {}".format(
                    tuple(text.shape)))

        embedded = self.dropout(self.embedding(text))
        # embedded (Timeseq, Batch, emb_dim)

        if self.rnn_type == 'lstm':
            # nn.LSTM returns (output, (hidden, cell)); only hidden is used.
            _, (hidden, _) = self.rnn(embedded)
        else:
            _, hidden = self.rnn(embedded)

        if self.bidir:
            # The last two entries are the top layer's forward and backward
            # final states.
            hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        else:
            hidden = hidden[-1, :, :]
        hidden = self.dropout(hidden)
        # hidden (Batch, hidden_dim*direction)

        logit = self.fc(hidden)
        # logit (Batch, output_dim)

        loss = self.criterion(logit, label) if label is not None else 0
        probs = self.softmax(logit)
        # probs (Batch, output_dim)

        return loss, probs

In [None]:
def train(model, device, train_loader, optimizer, epoch):
    """Run one training epoch and print the mean loss and accuracy.

    Args:
        model: Module called as ``model(text, label)`` that returns
            ``(loss, probs)``.
        device: Device the batch tensors are moved to.
        train_loader: Iterable of batches exposing ``.text`` and ``.label``;
            it must also expose ``.dataset``, whose length is the accuracy
            denominator.
        optimizer: Optimizer stepped once per batch.
        epoch: Epoch number, used only in the printed summary.

    Returns:
        A pair ``(train_loss, acc)``: the loss averaged over batches and the
        fraction of correctly classified examples.
    """
    model.train()

    correct = 0
    num_batches = 0
    train_loss = 0
    for batch in train_loader:
        num_batches += 1
        text = batch.text.to(device)
        label = batch.label.to(device)

        optimizer.zero_grad()
        loss, probs = model(text, label)
        loss.backward()
        optimizer.step()

        pred = torch.argmax(probs, dim=1)
        correct += (pred == label).float().sum().item()
        train_loss += loss.item()

    acc = correct / len(train_loader.dataset)
    train_loss /= num_batches
    print('Epoch {} Train: Loss: {:.6f} \tAcc.: {:.6f}'.format(
                epoch, train_loss, acc))
    return train_loss, acc
    
            
def test(model, device, test_loader, epoch):
    model.eval()
    test_loss = 0
    correct = 0
    den = 0
    with torch.no_grad():
        for batch in test_loader:
            den += 1
            text = batch.text.to(device)
            label = batch.label.to(device)
            
            loss, probs = model(text, label)
            pred = torch.argmax(probs, dim=1)

            test_loss += loss.item()
            
            correct += (pred == label).float().sum()

    test_loss /= den
    acc = correct / len(test_loader.dataset)

    print('Epoch {} Test : Loss: {:.6f} \tAcc.: {:.6f}\n'.format(
        epoch, test_loss, acc))
    return acc

Download the IMDB dataset and build the data iterators and vocabulary sets

In [None]:
# Tokenize the whole dataset once and cache it as JSON lines, so that the next
# cell can reload the examples from the cached files.
TEXT = data.Field(tokenize='spacy')
LABEL = data.LabelField()

print("Downloading and tokenizing")
train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)

print("Make dataset")
train_examples = [vars(t) for t in train_data]
test_examples = [vars(t) for t in test_data]

print("Storing")
imdb_dir = './drive/My Drive/public/data/imdb'
# makedirs also creates missing parent folders and is a no-op when the
# directory already exists.
os.makedirs(imdb_dir, exist_ok=True)

# One JSON object per line.
for file_name, examples in (('sentiment_train.json', train_examples),
                            ('sentiment_test.json', test_examples)):
    with open(os.path.join(imdb_dir, file_name), 'w') as f:
        for example in examples:
            json.dump(example, f)
            f.write('\n')

print("Done")

In [None]:
# Reload the cached JSON-lines files written by the previous cell.
TEXT = data.Field()
LABEL = data.LabelField()

# Map each JSON key to (attribute name on the example, field that processes it).
fields = {'text': ('text', TEXT), 'label': ('label', LABEL)}

train_data, test_data = data.TabularDataset.splits(
    path = './drive/My Drive/public/data/imdb',
    train = 'sentiment_train.json',
    test = 'sentiment_test.json',
    format = 'json',
    fields = fields
)

print("Splitting for train and validation")
# random.seed() returns None, so seed first and hand the resulting generator
# state to split() explicitly instead of passing the call's return value.
random.seed(1234)
train_data, valid_data = train_data.split(random_state = random.getstate())

MAX_VOCAB_SIZE = 25_000

print("Generating vocabulary sets")
# The vocabulary is built from the training split only.
TEXT.build_vocab(train_data, 
                 max_size = MAX_VOCAB_SIZE, 
                 vectors = "glove.6B.100d", 
                 unk_init = torch.Tensor.normal_)
LABEL.build_vocab(train_data)

print("Done")

Check the dataset and vocabulary

In [None]:
# Sizes of the three splits.
for caption, split in (("Number of training data : ", train_data),
                       ("Number of validation data : ", valid_data),
                       ("Number of test data : ", test_data)):
    print(caption, len(split))

# Sizes of the token and label vocabularies.
for caption, vocab in (("Number of unique tokens of data vocab. : ", TEXT.vocab),
                       ("Number of unique tokens of label vocab. : ", LABEL.vocab)):
    print(caption, len(vocab))

# Show the attributes of the first training example.
first_example = train_data.examples[0]
print("One of the training example")
print(vars(first_example))

# First ten entries of the index-to-token list, and the label-to-index mapping.
print("Vocabulary set")
print(TEXT.vocab.itos[:10])
print(LABEL.vocab.stoi)

Set your hyperparameters

In [None]:
# Number of examples per batch for the train/valid/test iterators.
BATCH_SIZE = 64

# Model dimensions.
# Embedding table rows: one per vocabulary entry.
INPUT_DIM = len(TEXT.vocab)
# Must equal the width of TEXT.vocab.vectors ("glove.6B.100d") when
# PRETRAIN_EMBEDDING is 1, because the vectors are copied into the embedding.
EMBEDDING_DIM = 100
HIDDEN_DIM = 256
# Number of classes scored by the final linear layer.
OUTPUT_DIM = 2
# Dropout probability passed to the model.
DROPOUT = 0.5

# Optimisation settings.
LR = 0.001
# 'RMSprop', 'Adam' or 'Adadelta'; any other value selects SGD.
OPTIMIZER = 'Adam'
MAX_EPOCH = 5

# Architecture switches.
# 'rnn' or 'lstm'.
RNN_TYPE = 'rnn'
# 1 copies TEXT.vocab.vectors into the embedding layer before training.
PRETRAIN_EMBEDDING = 0
BIDIRECTIONAL = False
N_LAYERS = 1

Training

In [None]:
### Training
# Check the device
cuda = torch.cuda.is_available()
device = torch.device("cuda" if cuda else "cpu")

# Datasets
train_loader = data.BucketIterator(train_data, batch_size = BATCH_SIZE)
valid_loader = data.BucketIterator(valid_data, batch_size = BATCH_SIZE)
test_loader = data.BucketIterator(test_data, batch_size = BATCH_SIZE)

# build my model
model = RNN(INPUT_DIM, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM, DROPOUT, RNN_TYPE, BIDIRECTIONAL, N_LAYERS)
if PRETRAIN_EMBEDDING == 1:
    # Start from the vectors attached to the vocabulary.
    pretrained_embeddings = TEXT.vocab.vectors
    model.embedding.weight.data.copy_(pretrained_embeddings)
model.to(device)

# build the optimizer (any unrecognised name falls back to SGD)
optimizer_classes = {
    'RMSprop': optim.RMSprop,
    'Adam': optim.Adam,
    'Adadelta': optim.Adadelta,
}
opt = optimizer_classes.get(OPTIMIZER, optim.SGD)(model.parameters(), lr=LR)

# Create the output folder up front so a missing directory is reported before
# any training time is spent, not at the first checkpoint.
save_dir = 'drive/My Drive/public/results/sentiment_analysis_model_best.pth'
os.makedirs(os.path.dirname(save_dir), exist_ok=True)

# Training..
print("Training Start!")
best_acc = 0
for epoch in range(MAX_EPOCH):
    train(model, device, train_loader, opt, epoch)
    valid_acc = test(model, device, valid_loader, epoch)

    # Keep only the checkpoint with the best validation accuracy so far.
    if best_acc < valid_acc:
        print("We found the best model!\n")
        best_acc = valid_acc
        # NOTE(review): this pickles the whole module, so loading the file
        # needs the RNN class to be available; saving model.state_dict() is
        # the more portable alternative. torch.save replaces an existing file,
        # so no explicit removal is needed first.
        torch.save(model, save_dir)

print("Training is done!!")

Test the model on your own sentence

In [None]:
import spacy

# spaCy 3 removed model shortcuts such as 'en'; fall back to the full package
# name when the shortcut cannot be resolved.
try:
    nlp = spacy.load('en')
except OSError:
    nlp = spacy.load('en_core_web_sm')

def predict_sentiment(model, sentence):
    """Return the model's class probabilities for one raw sentence.

    The sentence is tokenised with the module-level spaCy pipeline ``nlp``,
    mapped to indices through ``TEXT.vocab.stoi`` and fed to the model as a
    batch of size one.

    Args:
        model: Module called as ``model(text, None)`` that returns
            ``(loss, probs)``.
        sentence: Text to classify.

    Returns:
        The ``probs`` tensor produced by the model for the single-sentence
        batch.

    Raises:
        ValueError: If the sentence yields no tokens.
    """
    model.eval()
    tokenized = [tok.text for tok in nlp.tokenizer(sentence)]
    if not tokenized:
        # A zero-length sequence cannot be run through the recurrent layer.
        raise ValueError("sentence must contain at least one token")

    indexed = [TEXT.vocab.stoi[t] for t in tokenized]
    tensor = torch.LongTensor(indexed).to(device)
    # Add the batch axis: (Timeseq,) -> (Timeseq, 1).
    tensor = tensor.unsqueeze(1)

    # Inference only: do not record operations for autograd.
    with torch.no_grad():
        _, probs = model(tensor, None)

    return probs

In [None]:
# Score a single example sentence with the trained model.
probs = predict_sentiment(model, "This film is Wonderful")
# Columns follow the indices in LABEL.vocab.stoi; [neg, pos] is presumed, so
# confirm it against the mapping printed in the vocabulary check above.
print(probs) #[neg, pos]

  **The End!!**