In [None]:
import pandas as pd
from transformers import AutoTokenizer,AutoModel, BertTokenizer, BertModel
from datasets import load_dataset, Dataset
import nltk
from nltk.corpus import stopwords
import torch

In [None]:
# Fetch the NLTK stop-word corpus, then keep the English list as a set so the
# membership test in remove_stop_words is a hash lookup.
nltk.download('stopwords')
stop_words = {*stopwords.words('english')}

In [None]:
from sklearn.metrics import precision_score, recall_score, accuracy_score, f1_score
import numpy as np

In [None]:
# Work on shard 1 of 8 of the IMDB training split to keep the notebook light.
imdb = load_dataset('imdb', split='train').shard(num_shards=8, index=1)
# Format the 'text' and 'label' columns as torch on access.
imdb.set_format(type="torch", columns=['text', 'label'])

In [None]:
def remove_stop_words(example, stopword_set=None):
    """Return a new example whose text has its stop words removed.

    The text is split on single spaces, every token whose lowercase form is a
    stop word is dropped, and the remaining tokens are re-joined with single
    spaces. Empty tokens produced by consecutive spaces are kept, so spacing
    between surviving words is unchanged.

    Args:
        example: mapping with a ``'text'`` string and a ``'label'`` value.
        stopword_set: optional collection of lowercase stop words. When
            omitted, the module-level ``stop_words`` set is used.

    Returns:
        dict with the filtered ``'text'`` and the untouched ``'label'``.
    """
    # The conditional is lazy, so the global is only read when no explicit
    # set was supplied.
    active_stop_words = stop_words if stopword_set is None else stopword_set
    kept = [
        word
        for word in example['text'].split(' ')
        if word.lower() not in active_stop_words
    ]
    return {'text': ' '.join(kept), 'label': example['label']}

In [None]:
# Sanity check: show the concrete type of the object held in `imdb`.
print(type(imdb))

In [None]:
# Dataset.map returns a new dataset instead of modifying `imdb` in place, so the
# result must be bound to a name for the stop-word removal to take effect.
# NOTE(review): if Models/fbert.pth was trained on unfiltered text, confirm that
# evaluating on filtered text is what is intended.
imdb = imdb.map(remove_stop_words)

In [None]:
# Fixed seed so the held-out 20% is the same on every Restart & Run All; without
# it each run scores the model on a different random subset.
# NOTE(review): the split should match the one used when the saved model was
# trained, otherwise test rows may have been seen during training - confirm.
SPLIT_SEED = 42
imdbb = imdb.train_test_split(test_size=0.2, stratify_by_column='label', seed=SPLIT_SEED)
imdb_train = imdbb['train']
imdb_test = imdbb['test']

In [None]:
# Rows per batch when pre-tokenizing the test split.
TEST_BATCH_SIZE = 4
# Rows per training batch.
TRAIN_BATCH_SIZE = 12

In [None]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

In [None]:
# Tokenizer for the 'bert-base-uncased' checkpoint - the same checkpoint name
# that FBert passes to BertModel.from_pretrained.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [None]:
# Pre-tokenize the test split into batches of TEST_BATCH_SIZE rows.
#
# Only complete batches are built, so up to TEST_BATCH_SIZE - 1 trailing rows
# are left out of the evaluation.
# NOTE(review): confirm the model and metric code handle a short final batch
# before including the remainder.
n_test = imdb_test.shape[0] // TEST_BATCH_SIZE

# Integer step of at least 1: avoids a modulo-by-zero when there are fewer
# than ten batches.
progress_step = max(1, n_test // 10)

# Read each column once so the loop slices an already-loaded column instead of
# indexing the dataset by column name twice per batch.
test_texts = imdb_test['text']
test_labels = imdb_test['label']

tokenized_test_batches = []
tokenized_test_batches_y = []

for i in range(n_test):
    start = TEST_BATCH_SIZE * i
    stop = start + TEST_BATCH_SIZE

    btch = test_texts[start:stop]
    # Every sequence is padded/truncated to 512 tokens so all batches share a shape.
    tp = tokenizer.batch_encode_plus(btch, max_length=512, padding='max_length', truncation=True, return_tensors='pt')
    tokenized_test_batches.append(tp)
    tokenized_test_batches_y.append(test_labels[start:stop])

    if i % progress_step == 0:
        print(f' finished {int((i / (n_test)) * 100)}%')

print(len(tokenized_test_batches))
print(len(tokenized_test_batches_y))

In [None]:
class FBert(torch.nn.Module):
    """BERT encoder followed by a small head that outputs one probability per row.

    Layer chain: ``bert-base-uncased`` -> dropout(0.2) -> linear(768, 64)
    -> linear(64, 1) -> sigmoid.
    """

    def __init__(self) -> None:
        super().__init__()
        # NOTE(review): attribute names l1..l5 are left untouched; saved
        # checkpoints of this class presumably depend on them.
        self.l1 = BertModel.from_pretrained('bert-base-uncased')
        self.l2 = torch.nn.Dropout(0.2)
        self.l3 = torch.nn.Linear(768, 64)
        self.l4 = torch.nn.Linear(64, 1)
        self.l5 = torch.nn.Sigmoid()

    def forward(self, input):
        """Score a tokenized batch.

        Args:
            input: mapping holding the ``'input_ids'``, ``'token_type_ids'``
                and ``'attention_mask'`` tensors for the batch.

        Returns:
            1-D tensor with one sigmoid output per row of the batch. The batch
            dimension is kept even when the batch holds a single row.
        """
        ids = input['input_ids']
        tto = input['token_type_ids']
        attn = input['attention_mask']

        # With return_dict=False the encoder returns a tuple; only its second
        # element is fed to the head.
        _, pooled = self.l1(ids, attention_mask=attn, token_type_ids=tto, return_dict=False)

        hidden = self.l3(self.l2(pooled))
        probs = self.l5(self.l4(hidden))

        # Drop only the trailing size-1 feature dimension. A bare squeeze()
        # would also drop the batch dimension for a batch of one, producing a
        # 0-d tensor that no longer lines up with a length-1 label tensor.
        return probs.squeeze(-1)
        

In [None]:
# Binary cross-entropy on probabilities: FBert.forward ends in a Sigmoid, so
# the model output is passed to the loss as-is.
loss_fn = torch.nn.BCELoss()

In [None]:
# Maps epoch number -> mean test loss; evaluate() writes into it.
test_loss = {}

In [None]:
def evaluate(model, epoch=0):
    """Compute the mean loss of ``model`` over the pre-tokenized test batches.

    The model is switched to eval mode and run without gradient tracking. A
    running mean is printed after every batch, and the final mean is stored in
    the module-level ``test_loss`` dict under ``epoch``.

    Args:
        model: callable taking one tokenized batch and returning predictions
            accepted by ``loss_fn``.
        epoch: key under which the mean loss is recorded (default 0).

    Returns:
        Tuple ``(mean_loss, running_means, batch_losses)``: the mean loss, the
        running mean after each batch, and the loss of each batch, all as
        returned by ``loss_fn`` arithmetic.
    """
    running_means = []
    batch_losses = []
    running_total = 0

    model.eval()

    with torch.no_grad():
        batch_pairs = zip(tokenized_test_batches, tokenized_test_batches_y)

        for step, (batch, targets) in enumerate(batch_pairs, start=1):
            batch.to(device)
            scores = model(batch)
            expected = torch.as_tensor(targets, device=device, dtype=torch.float)

            batch_loss = loss_fn(scores, expected)
            batch_losses.append(batch_loss)
            running_total = running_total + batch_loss

            print(f'Batch {step}, Test loss: {running_total/step}')
            running_means.append(running_total / step)

        running_total /= len(tokenized_test_batches)
        test_loss[epoch] = running_total
        print(f'Epoch {epoch}, Test loss: {running_total}')

    return running_total, running_means, batch_losses

In [None]:
def evaluate_preds(model, epoch=0):
    """Collect the model's outputs and the true labels for every test batch.

    The model is switched to eval mode and run without gradient tracking over
    the module-level ``tokenized_test_batches`` / ``tokenized_test_batches_y``.

    Args:
        model: callable taking one tokenized batch and returning a tensor of
            per-row outputs.
        epoch: unused; kept so the signature parallels ``evaluate``.

    Returns:
        Tuple ``(predicted, actual)`` of two flat Python lists: the model
        outputs and the corresponding labels, in batch order.
    """
    predicted = []
    actual = []

    model.eval()

    with torch.no_grad():
        for batch, targets in zip(tokenized_test_batches, tokenized_test_batches_y):
            batch = batch.to(device)
            scores = model(batch)

            # reshape(-1) turns a 0-d output (single-row batch) into a 1-element
            # vector so tolist() always yields a list that extend() can consume.
            predicted.extend(scores.reshape(-1).cpu().tolist())

            # as_tensor accepts labels stored either as a tensor or as a plain
            # list, mirroring how evaluate() reads them.
            actual.extend(torch.as_tensor(targets).flatten().tolist())

    return predicted, actual

In [None]:
# Ask PyTorch to release cached GPU memory it is not currently using, before
# the model is loaded.
torch.cuda.empty_cache()

In [None]:
# NOTE(security): torch.load unpickles arbitrary Python objects - only load
# checkpoints that come from a trusted source.
# The file holds a whole pickled module (it is moved to a device and called as
# a model below), presumably an FBert instance, so that class must already be
# defined. weights_only=False is required on PyTorch >= 2.6, where the default
# became True and rejects pickled custom classes.
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
nm = torch.load("Models/fbert.pth", map_location=device, weights_only=False)
nm = nm.to(device)

In [None]:
# Release unused cached GPU memory again now that the model is on the device.
torch.cuda.empty_cache()

In [None]:
# Run the loaded model over the pre-tokenized test batches:
# p = model outputs, a = true labels (both flat lists).
p,a = evaluate_preds(nm)

In [None]:
# Sanity check: there should be exactly one prediction per label.
print(len(p),len(a))

In [None]:
# Show what container the predictions came back in.
print(type(p))
# Threshold at 0.5: strictly greater becomes class 1, everything else class 0.
pclass = [int(score > 0.5) for score in p]

In [None]:
# Distinct predicted classes - a quick check that the model does not predict
# a single class for every row.
set(pclass)

In [None]:
def calculate_metrics(p,a):
    """Score predicted class labels against the true labels.

    Args:
        p: predicted class labels.
        a: true class labels, aligned with ``p``.

    Returns:
        Tuple ``(precision, recall, f1, accuracy)``.
    """
    # Each scorer is called with the true labels first and the predictions second.
    scorers = (precision_score, recall_score, f1_score, accuracy_score)
    return tuple(scorer(a, p) for scorer in scorers)

In [None]:
# Prints (precision, recall, f1, accuracy) for the thresholded predictions.
print(calculate_metrics(pclass,a))

In [None]:
# Mean loss over the test batches; epoch defaults to 0, so the value is also
# recorded in test_loss[0].
evaluate(model = nm)