In [None]:
from google.colab import drive
# Mount Google Drive under /content/gdrive so the dataset files stored there can be read.
drive.mount('/content/gdrive', force_remount = True)


Mounted at /content/gdrive


In [None]:

import torch

import random
import numpy as np

SEED = 1234

# Seed Python's, NumPy's and PyTorch's random number generators with the same value.
for seed_rng in (random.seed, np.random.seed, torch.manual_seed):
    seed_rng(SEED)

# Ask cuDNN for deterministic behaviour.
torch.backends.cudnn.deterministic = True

Install the Hugging Face `transformers` package.

In [None]:
!pip install transformers



The **bert base uncased** model will be used for our purpose.

In [None]:

from transformers import BertTokenizer

# Load the pretrained tokenizer that belongs to the 'bert-base-uncased' checkpoint.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [None]:
# Number of entries in the tokenizer's vocabulary.
len(tokenizer.vocab)

30522

In [None]:
# Tokenize a mixed-case sample sentence to inspect what the tokenizer produces.
tokens = tokenizer.tokenize('Hello WORLD how ARE yoU?')

print(tokens)

['hello', 'world', 'how', 'are', 'you', '?']


In [None]:
# Map the sample tokens to their integer ids in the tokenizer's vocabulary.
indexes = tokenizer.convert_tokens_to_ids(tokens)

print(indexes)

[7592, 2088, 2129, 2024, 2017, 1029]


In [None]:
# Special tokens exposed by the tokenizer: the classification token is used as the
# sequence start and the separator token as the sequence end, plus padding and unknown.
init_token = tokenizer.cls_token
eos_token = tokenizer.sep_token
pad_token = tokenizer.pad_token
unk_token = tokenizer.unk_token

print(init_token, eos_token, pad_token, unk_token)

[CLS] [SEP] [PAD] [UNK]


In [None]:
# Integer ids of the special tokens; the torchtext TEXT field is configured with
# these ids because it works on ids rather than on token strings.
init_token_idx = tokenizer.convert_tokens_to_ids(init_token)
eos_token_idx = tokenizer.convert_tokens_to_ids(eos_token)
pad_token_idx = tokenizer.convert_tokens_to_ids(pad_token)
unk_token_idx = tokenizer.convert_tokens_to_ids(unk_token)

print(init_token_idx, eos_token_idx, pad_token_idx, unk_token_idx)

101 102 0 100


In [None]:

# Maximum input length (in tokens) that the tokenizer records for 'bert-base-uncased'.
max_input_length = tokenizer.max_model_input_sizes['bert-base-uncased']

print(max_input_length)

512


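The **BertTokenizer** is used to tokenize each sentence. Each sentence is truncated to `max_input_length - 2` tokens, because two positions are reserved for the start (`[CLS]`) and end (`[SEP]`) tokens that are added around every sequence.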

In [None]:

def tokenize_and_cut(sentence):
    """Tokenize ``sentence`` with the BERT tokenizer and truncate the result.

    At most ``max_input_length - 2`` tokens are kept, leaving two positions
    free for the start and end tokens that are added around every example.
    """
    room_for_tokens = max_input_length - 2
    return tokenizer.tokenize(sentence)[:room_for_tokens]

We don't need to build a vocabulary in this case because we use the embeddings provided by BERT. The tokens are converted to their ids by the BERT tokenizer.

In [None]:
from torchtext import data

# Text field: sentences are tokenized and truncated by tokenize_and_cut, then
# converted to ids by the BERT tokenizer, so torchtext builds no vocabulary of
# its own (use_vocab=False). For the same reason the special tokens are given
# as ids rather than as strings.
TEXT = data.Field(batch_first = True,
                  use_vocab = False,
                  tokenize = tokenize_and_cut,
                  preprocessing = tokenizer.convert_tokens_to_ids,
                  init_token = init_token_idx,
                  eos_token = eos_token_idx,
                  pad_token = pad_token_idx,
                  unk_token = unk_token_idx)

# Label field; its vocabulary (label string -> class index) is built later from the training data.
LABEL = data.LabelField(dtype = torch.long)

In [None]:
# Absolute location of the SST csv files inside the Drive mount ('/content/gdrive'),
# so loading the data does not depend on the notebook's current working directory.
dataset_path = '/content/gdrive/My Drive/Projects/Pytorch/Sentiment_analysis/'

In [None]:
# Load the train / dev / test csv files from dataset_path. The header row of each
# file is skipped and the two columns are bound to the TEXT field (as `text`)
# and the LABEL field (as `truth`).
train_data, valid_data, test_data = data.TabularDataset.splits(
    path=dataset_path, train="sst_train.csv", 
    validation="sst_dev.csv", test="sst_test.csv",format="csv", skip_header=True, 
    fields=[('text', TEXT), ('truth', LABEL)]
)

In [None]:
# Report how many examples each split contains.
print(f'Number of training examples: {len(train_data)}')
print(f'Number of valid examples: {len(valid_data)}')
print(f'Number of testing examples: {len(test_data)}')

Number of training examples: 8544
Number of valid examples: 1101
Number of testing examples: 2210


In [None]:
# Build the label vocabulary from the training split only.
LABEL.build_vocab(train_data)

In [None]:
# Inspect one preprocessed training example: `text` holds token ids, `truth` the raw label string.
print(vars(train_data.examples[6]))

{'text': [2074, 1996, 4428, 2920, 1999, 4526, 1996, 21323, 4138, 2791, 1997, 1996, 13425, 1999, 2023, 9610, 10464, 28817, 3217, 1997, 12013, 1998, 2422, 2003, 26137, 1012], 'truth': '5'}


In [None]:
# Convert the ids of that example back into tokens to sanity-check the preprocessing.
tokens = tokenizer.convert_ids_to_tokens(vars(train_data.examples[6])['text'])

print(tokens)

['just', 'the', 'labour', 'involved', 'in', 'creating', 'the', 'layered', 'rich', '##ness', 'of', 'the', 'imagery', 'in', 'this', 'chi', '##aro', '##scu', '##ro', 'of', 'madness', 'and', 'light', 'is', 'astonishing', '.']


In [None]:
# Mapping from label string to class index, as built by LABEL.build_vocab.
print(LABEL.vocab.stoi)

defaultdict(<function _default_unk_index at 0x7fac583c9a60>, {'4': 0, '2': 1, '3': 2, '5': 3, '1': 4})


A batch size of 8 was found to give the best results.

In [None]:
BATCH_SIZE = 8

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# One iterator per split. sort_key is the number of token ids in an example;
# it has to be given explicitly here because it is what the examples are
# sorted by (sort_within_batch=True sorts each batch with it).
train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, valid_data, test_data), sort_key=lambda x: len(x.text),
    batch_size=BATCH_SIZE,
    sort_within_batch=True,
    device=device)
# Label frequencies collected by LABEL.build_vocab (shown as the cell output).
LABEL.vocab.freqs

Counter({'1': 1092, '2': 2218, '3': 1624, '4': 2322, '5': 1288})

The **BERT model** is loaded with the **`output_hidden_states` flag set to `True`**, because we will use the outputs of the last 4 hidden layers.

In [None]:
from transformers import BertTokenizer, BertModel

# Load pretrained BERT. output_hidden_states=True is required because the
# classifier reads the per-layer hidden states instead of the pooled output.
bert = BertModel.from_pretrained('bert-base-uncased', output_hidden_states=True)

The model is created. The features of this model are as follows:


1.   **Embedding from BERT** is directly used (though made **non-trainable**)
2.   The **pooled output** provided by BERT is **not used**. Instead, the **hidden states of the last four layers are concatenated and passed through a bidirectional GRU**.




In [None]:
import torch.nn as nn

class BERTGRUSentiment(nn.Module):
    """Sentence classifier: BERT hidden states -> (bi)GRU -> linear layer.

    For every token the hidden states of BERT's last four layers are
    concatenated, the resulting sequence is run through a GRU, and the GRU's
    final hidden state is projected onto ``output_dim`` logits. BERT is only
    used as a feature extractor (it is called under ``torch.no_grad()``).

    Args:
        bert: pretrained BERT model. It must return its per-layer hidden
            states as the third output (``output_hidden_states=True``) and
            expose ``config``.
        hidden_dim: size of the GRU hidden state (per direction).
        output_dim: number of output logits.
        n_layers: number of stacked GRU layers.
        bidirectional: whether the GRU runs in both directions.
        dropout: dropout probability, applied between GRU layers (only when
            ``n_layers >= 2``) and on the final hidden state.
    """

    def __init__(self,
                 bert,
                 hidden_dim,
                 output_dim,
                 n_layers,
                 bidirectional,
                 dropout):

        super().__init__()

        self.bert = bert

        embedding_dim = bert.config.to_dict()['hidden_size']

        # Id that marks padded positions in a batch. It is used to build the
        # attention mask; when the config defines none, no mask is passed.
        self.pad_token_id = getattr(bert.config, 'pad_token_id', None)

        # The GRU input is the concatenation of four hidden layers.
        self.rnn = nn.GRU(embedding_dim * 4,
                          hidden_dim,
                          num_layers = n_layers,
                          bidirectional = bidirectional,
                          batch_first = True,
                          dropout = 0 if n_layers < 2 else dropout)

        self.out = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """Return the logits for a batch of token-id sequences.

        Args:
            text: tensor of token ids, shape [batch size, sent len].

        Returns:
            Tensor of shape [batch size, out dim].
        """
        #text = [batch size, sent len]

        # Tell BERT which positions are padding so they are not attended to.
        attention_mask = None
        if self.pad_token_id is not None:
            attention_mask = (text != self.pad_token_id).long()

        with torch.no_grad():
            # Index 2 of BERT's output holds the hidden states of all layers.
            hidden_states = self.bert(text, attention_mask=attention_mask)[2]

        #embedded = [batch size, sent len, emb dim * 4]
        embedded = torch.cat((hidden_states[-1],
                              hidden_states[-2],
                              hidden_states[-3],
                              hidden_states[-4]), dim=2)

        _, hidden = self.rnn(embedded)

        #hidden = [n layers * n directions, batch size, hid dim]

        if self.rnn.bidirectional:
            # Last layer's forward (-2) and backward (-1) final states.
            hidden = self.dropout(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1))
        else:
            hidden = self.dropout(hidden[-1,:,:])

        #hidden = [batch size, hid dim * n directions]

        output = self.out(hidden)

        #output = [batch size, out dim]

        return output

The best set of hyperparameters

In [None]:
# Hyperparameters of the GRU head that sits on top of BERT.
# OUTPUT_DIM is the number of logits the model produces (the labels are '1' to '5').
HIDDEN_DIM = 256
OUTPUT_DIM = 5
N_LAYERS = 2
BIDIRECTIONAL = True
DROPOUT = 0.2

# Wrap the pretrained BERT model in the GRU-based classifier.
model = BERTGRUSentiment(bert,
                         HIDDEN_DIM,
                         OUTPUT_DIM,
                         N_LAYERS,
                         BIDIRECTIONAL,
                         DROPOUT)

In [None]:
def count_parameters(model):
    """Return the total number of elements in ``model``'s trainable parameters.

    Only parameters whose ``requires_grad`` flag is set are counted.
    """
    trainable = 0
    for param in model.parameters():
        if param.requires_grad:
            trainable += param.numel()
    return trainable

# Trainable-parameter count at this point of the notebook (the BERT weights are frozen in a later cell).
print(f'The model has {count_parameters(model):,} trainable parameters')

The model has 6,300,165 trainable parameters


In [None]:
# Freeze BERT: every parameter whose name starts with 'bert' stops receiving
# gradients, so only the rest of the model is trained.
for name, param in model.named_parameters():
    if not name.startswith('bert'):
        continue
    param.requires_grad = False

In [None]:
# Re-count after freezing BERT, reusing the count_parameters helper defined
# earlier in the notebook instead of re-defining an identical copy here.
print(f'The model has {count_parameters(model):,} trainable parameters')

The model has 6,300,165 trainable parameters


In [None]:

import torch.optim as optim



In [None]:
# Put the model and the loss on the target device, then create the optimizer
# over the model's parameters.
model = model.to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

In [None]:
import torch.nn.functional as F
def binary_accuracy(preds1, y):
    """
    Returns accuracy per batch, i.e. if you get 8/10 right, this returns 0.8, NOT 8

    Despite the name, this works for any number of classes: the predicted
    class of an example is the index of its largest score.

    Args:
        preds1: tensor of raw class scores, shape [batch size, n classes].
        y: tensor of true class indices, shape [batch size].

    Returns:
        0-dim float tensor with the fraction of correct predictions.
    """
    # softmax is monotonic, so the arg-max of the raw scores is the arg-max of
    # the probabilities; skipping it also avoids F.softmax's deprecated
    # implicit-dim call, which emitted a warning on every batch.
    _, ind = torch.max(preds1, dim=1)
    correct = (ind == y).float()
    acc = correct.sum() / float(len(correct))
    return acc

In [None]:

def train(model, iterator, optimizer, criterion):
    """Run one training epoch over ``iterator``.

    Returns:
        Tuple ``(loss, accuracy)``, each averaged over the number of batches.
    """
    model.train()

    loss_sum, acc_sum = 0, 0

    for batch in iterator:
        optimizer.zero_grad()

        logits = model(batch.text).squeeze(1)
        batch_loss = criterion(logits, batch.truth)
        batch_acc = binary_accuracy(logits, batch.truth)

        batch_loss.backward()
        optimizer.step()

        loss_sum += batch_loss.item()
        acc_sum += batch_acc.item()

    n_batches = len(iterator)
    return loss_sum / n_batches, acc_sum / n_batches

In [None]:
def evaluate(model, iterator, criterion):
    """Evaluate ``model`` on ``iterator`` without computing gradients.

    Returns:
        Tuple ``(loss, accuracy)``, each averaged over the number of batches.
    """
    model.eval()

    loss_sum, acc_sum = 0, 0

    with torch.no_grad():
        for batch in iterator:
            logits = model(batch.text).squeeze(1)

            loss_sum += criterion(logits, batch.truth).item()
            acc_sum += binary_accuracy(logits, batch.truth).item()

    n_batches = len(iterator)
    return loss_sum / n_batches, acc_sum / n_batches

In [None]:
N_EPOCHS = 5

best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)

    # Keep the weights of the epoch with the lowest validation loss.
    if best_valid_loss > valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'bert-model.pt')

    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

  


	Train Loss: 1.280 | Train Acc: 42.88%
	 Val. Loss: 1.186 |  Val. Acc: 46.25%
	Train Loss: 1.121 | Train Acc: 50.74%
	 Val. Loss: 1.165 |  Val. Acc: 49.15%
	Train Loss: 1.013 | Train Acc: 56.44%
	 Val. Loss: 1.170 |  Val. Acc: 49.82%
	Train Loss: 0.887 | Train Acc: 62.30%
	 Val. Loss: 1.345 |  Val. Acc: 44.47%
	Train Loss: 0.724 | Train Acc: 70.85%
	 Val. Loss: 1.332 |  Val. Acc: 47.01%


The test accuracy is **50.86%**

In [None]:
# Restore the checkpoint with the lowest validation loss. map_location=device
# lets a checkpoint that was saved on a GPU be loaded on a CPU-only runtime.
model.load_state_dict(torch.load('bert-model.pt', map_location=device))

test_loss, test_acc = evaluate(model, test_iterator, criterion)

print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

  


Test Loss: 1.107 | Test Acc: 50.86%
