**BERT MODEL TRAINED USING A RNN NETWORK**

Peter Lau - 
Nov 2019


In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
pip install torch



Setting Up the Environment

In [0]:
pip install transformers



In [0]:
import torch
import numpy as np
import random
import torch.nn as nn
import torch.optim as optimizer
from transformers import BertTokenizer, BertModel
from torchtext import datasets

In [0]:
SEED = 1234
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

In [0]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
theModel = BertModel.from_pretrained('bert-base-uncased').to('cuda')

init_token_idx = tokenizer.cls_token_id
eos_token_idx = tokenizer.sep_token_id
pad_token_idx = tokenizer.pad_token_id
unk_token_idx = tokenizer.unk_token_id
max_input_length = 512

In [0]:
def tokenize(sentence):
    tokens = tokenizer.tokenize(sentence) 
    tokens = tokens[:max_input_length-2]
    return tokens

In [0]:
pip install torchtext



In [0]:
from torchtext import data

TEXT = data.Field(batch_first = True, #requirement of bert
                  use_vocab = False, #bert deals with the vocab
                  tokenize = tokenize, #tokenize the word
                  preprocessing = tokenizer.convert_tokens_to_ids,
                  init_token = init_token_idx,
                  eos_token = eos_token_idx,
                  pad_token = pad_token_idx,
                  unk_token = unk_token_idx)

LABEL = data.LabelField(dtype = torch.float)


In [0]:
from torchtext import datasets

train, test = datasets.IMDB.splits(TEXT, LABEL) #test train split
train, valid = train.split(random_state = random.seed(SEED))
LABEL.build_vocab(train,)

Building the NN


In [0]:
batchSize = 32 #batchsize 

train_iterator, valid_iterator, test_iterator = data.BucketIterator.splits(
    (train, valid, test), 
    batch_size = batchSize, 
    device = 'cuda') #???

In [0]:
class NN(nn.Module):
    def __init__(self,bert,hidden_dim,output_dim,n_layers,bidirectional,dropout):

        super().__init__()
        
        self.bert = bert
        
        embedding_dim = bert.config.to_dict()['hidden_size']
        
        self.rnn = nn.GRU(embedding_dim,
                          hidden_dim,
                          num_layers = n_layers,
                          bidirectional = bidirectional,
                          batch_first = True,
                          dropout = 0 if n_layers < 2 else dropout)
        
        self.out = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
        
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, text):
                
        with torch.no_grad():
            embedded = theModel(text)[0]
        
        _, hidden = self.rnn(embedded)
    
        if self.rnn.bidirectional:
            hidden = self.dropout(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1))
        else:
            hidden = self.dropout(hidden[-1,:,:])
                
        #hidden = [batch size, hid dim]
        
        output = self.out(hidden)
    
        return output

In [0]:
#Getting an instance of the model and disabling paramaters to make training faster

trained = NN(theModel,256,1,2,True,0.25)

for name, param in trained.named_parameters():                
    if name.startswith('bert'):
        param.requires_grad = False

Running the NN

In [0]:
criterion = nn.BCEWithLogitsLoss().to('cuda') #setup loss criterion
trained = trained.to('cuda')
optimizer = optimizer.Adam(trained.parameters()) #Adam optimizer -> Used for everything:)
numberOfRuns = 6
bestLoss = 0.0

In [0]:
#Function to test error & findout test Accuracy
def TestAccuracy(predicted, label):
    prediction = torch.round(torch.sigmoid(predicted)) #round predicted value to integer
    correctNum = (predicted == label).float() #convert into float for division 
    predictionAccuracy = correctNum.sum() / len(correctNum) #divide correct by total
    return predictionAccuracy

def evaluateModel(model, iterator, criterion):
    epoch_loss = 0
    epoch_acc = 0
    
    trained.eval()
    
    with torch.no_grad():
    
        for batch in iterator:

            predictions = trained(batch.text).squeeze(1)
            
            loss = criterion(predictions, batch.label)
            
            acc = TestAccuracy(predictions, batch.label)

            epoch_loss += loss.item()
            epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [0]:
#Function to Train #todo
def trainModel(model, iterator, optimizer, criterion):
    epoch_loss = 0
    epoch_acc = 0
    
    trained.train()
    
    for batch in iterator:
        
        optimizer.zero_grad()
        
        predictions = trained(batch.text).squeeze(1)
        
        loss = criterion(predictions, batch.label)
        
        acc = TestAccuracy(predictions, batch.label)
        
        loss.backward()
        
        optimizer.step()
        
        epoch_loss += loss.item()
        epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [0]:
import time

def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [0]:
#Running the previous 2 functions #todo
#~30mins for 1 epoch
for epoch in range(numberOfRuns):
    start_time = time.time()
    
    train_loss, train_acc = trainModel(trained, train_iterator, optimizer, criterion)
    valid_loss, valid_acc = evaluateModel(trained, valid_iterator, criterion)
        
    end_time = time.time()
        
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
        
    if valid_loss < bestLoss:
        bestLoss = valid_loss
        torch.save(model.state_dict(), 'bestModel.pt')
    
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

bestTrained.load_state_dict(torch.load('bestModel.pt'))
test_loss, test_acc = evaluate(bestTrained, test_iterator, criterion)
print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

Predicting the Analysis

In [0]:
def analysis(sentence):
    trained.eval()
    sentence2 = tokenizer.tokenize(sentence)
    sentence3 = [init_token_idx] + tokenizer.convert_tokens_to_ids(sentence2) + [eos_token_idx]
    sentence4 = torch.LongTensor(sentence3).to('cuda')
    sentence5 = sentence4.unsqueeze(0)
    prediction = torch.sigmoid(trained(sentence5))
    result = round(prediction.item(), 4)
    return result

In [0]:
path = F"/content/gdrive/My Drive/KWHS2019/Colab_Models/BERTModel.pt" 
model.load_state_dict(torch.load(path))

NameError: ignored

In [0]:
analysis("This film is terrible")

In [0]:
analysis("This film is Great")

In [0]:
model_save_name = 'BERTModel.pt'
path = F"/content/gdrive/My Drive/KWHS2019/Colab_Models/{model_save_name}" 
torch.save(trained.state_dict(), path)