In [21]:
!pip install nltk
!pip install transformers

In [23]:
import pandas as pd
import numpy as np
import torch
import nltk
import string
from torch import nn
from torch.optim import Adam
from tqdm import tqdm
from transformers import BertModel, BertTokenizer
from nltk.corpus import stopwords
from nltk import pos_tag, word_tokenize
from nltk.stem.porter import PorterStemmer

In [26]:
# Load the CSV straight from Google Drive, addressed by file id.
#   original data : 1pqa8tsY5kP6AmWRiXazkk1rHIewe40UH   (the one read below)
#   small data    : 1K6xnL8mOENuqgJmSmrr3RiLbuI7U1aa0
data_url = 'https://drive.google.com/uc?export=view&id=1pqa8tsY5kP6AmWRiXazkk1rHIewe40UH'
df = pd.read_csv(data_url)

# Sanity check: number of rows, followed by the first few rows.
row_count = len(df)
print(f'size : {row_count}\n', df.head())

In [25]:
# truncate the data to a class-balanced subset
new_size = 1000            # total number of rows to keep
new_size = new_size // 2   # rows to keep per sentiment class

# First `new_size` rows of each class, in their current order.
pos = df[df['sentiment'] == 'positive'].iloc[:new_size]
neg = df[df['sentiment'] == 'negative'].iloc[:new_size]

# DataFrame.append was removed in pandas 2.0; pd.concat is the supported
# equivalent. sample(frac=1) then returns all rows in shuffled order.
df = pd.concat([pos, neg]).sample(frac=1)
print(f'size : {len(df)}\n', df.head())

In [27]:
# data preprocessing
# NLTK resources used by preprocess(): the English stop-word list and the
# tokenizer data behind word_tokenize. Recent NLTK releases load the
# 'punkt_tab' resource instead of 'punkt', so both are requested to keep this
# cell working across NLTK versions.
nltk.download('stopwords')
nltk.download('punkt')
nltk.download('punkt_tab')
porter = PorterStemmer()
# Stored as a set: preprocess() tests every token for membership, and a set
# makes each test O(1) instead of a scan over the whole list.
stop_words = set(stopwords.words('english'))
def preprocess(text):
    """Clean a single review string.

    Steps, in order: lower-case the text, delete every character listed in
    ``string.punctuation``, tokenize with ``word_tokenize``, discard tokens
    found in ``stop_words``, stem the remaining tokens with ``porter`` and
    join them back together with single spaces.
    """
    # One translation pass removes all punctuation characters.
    no_punct = text.lower().translate(str.maketrans('', '', string.punctuation))

    stems = []
    for token in word_tokenize(no_punct):
        if token in stop_words:
            continue
        stems.append(porter.stem(token))
    return " ".join(stems)
# Overwrite each review with its cleaned form, then preview the result.
df['review'] = df['review'].map(preprocess)
df.head()

In [28]:
# split the data
# Split each class separately so train/val/test all get the same class ratio.
pos = df[df['sentiment'] == 'positive']
neg = df[df['sentiment'] == 'negative']

print(len(pos), len(neg))


def _split_70_10_20(frame):
    """Cut ``frame`` by row position into (first 70%, next 10%, last 20%).

    The cut points are ``int(.7 * n)`` and ``int(.8 * n)``. Plain ``iloc``
    slicing is used instead of ``np.split`` because passing a DataFrame to
    ``np.split`` relies on the deprecated ``DataFrame.swapaxes``.
    """
    train_end = int(.7 * len(frame))
    val_end = int(.8 * len(frame))
    return frame.iloc[:train_end], frame.iloc[train_end:val_end], frame.iloc[val_end:]


pos_train, pos_val, pos_test = _split_70_10_20(pos)
neg_train, neg_val, neg_test = _split_70_10_20(neg)

print(len(pos_train), len(pos_val), len(pos_test))
print(len(neg_train), len(neg_val), len(neg_test))
print("")

# DataFrame.append was removed in pandas 2.0, so the class halves are merged
# with pd.concat; sample(frac=1) returns all rows in shuffled order.
df_train = pd.concat([pos_train, neg_train]).sample(frac=1)
df_test = pd.concat([pos_test, neg_test]).sample(frac=1)
df_val = pd.concat([pos_val, neg_val]).sample(frac=1)
print(len(df_train), df_train.head(), end="\n\n")
print(len(df_val), df_val.head(), end="\n\n")
print(len(df_test), df_test.head())

In [29]:
# Integer targets for the two values of the 'sentiment' column.
labels = dict(negative=0, positive=1)

# Tokenizer for the 'bert-base-cased' checkpoint (the same checkpoint name
# BertClassifier loads).
# NOTE(review): preprocess() lower-cases and stems the text, yet this is the
# *cased* checkpoint -- confirm that combination is intended.
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')

class Dataset(torch.utils.data.Dataset):
    """In-memory dataset pairing tokenized reviews with integer sentiment labels.

    All reviews are tokenized once, at construction time, with the module-level
    ``tokenizer`` (padded/truncated to 512 tokens, returned as PyTorch tensors).
    Labels are looked up in the module-level ``labels`` mapping.
    """

    def __init__(self, df):
        """Encode the ``sentiment`` and ``review`` columns of ``df``."""
        self.labels = [labels[sentiment] for sentiment in df['sentiment']]

        encoded_reviews = []
        for review in df['review']:
            encoded_reviews.append(
                tokenizer(
                    review,
                    padding='max_length',
                    max_length=512,
                    truncation=True,
                    return_tensors="pt",
                )
            )
        self.texts = encoded_reviews

    def classes(self):
        """Return the list of integer labels."""
        return self.labels

    def __len__(self):
        """Return the number of examples."""
        return len(self.labels)

    def get_batch_labels(self, idx):
        """Return the label(s) selected by ``idx`` wrapped in a NumPy array."""
        selected = self.labels[idx]
        return np.array(selected)

    def get_batch_texts(self, idx):
        """Return the tokenizer output(s) selected by ``idx``."""
        return self.texts[idx]

    def __getitem__(self, idx):
        """Return the pair ``(tokenizer_output, label_array)`` for ``idx``."""
        return self.get_batch_texts(idx), self.get_batch_labels(idx)

In [30]:
class BertClassifier(nn.Module):
    """Pretrained BERT encoder followed by a small MLP head with a sigmoid output.

    The head narrows BERT's pooled output 768 -> 512 -> 256 -> 128 -> 64 -> 1
    and squashes the final value with a sigmoid, so ``forward`` returns values
    in (0, 1) with a last dimension of size 1 (suitable for ``nn.BCELoss``).

    Args:
        dropout: dropout probability applied to the pooled BERT output.
    """

    def __init__(self, dropout=0.5):

        super(BertClassifier, self).__init__()

        self.bert = BertModel.from_pretrained('bert-base-cased')
        self.dropout = nn.Dropout(dropout)
        self.linear1 = nn.Linear(768, 512)
        self.linear2 = nn.Linear(512, 256)
        self.linear3 = nn.Linear(256, 128)
        self.linear4 = nn.Linear(128, 64)
        self.linear5 = nn.Linear(64, 1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, input_id, mask):
        """Return the sigmoid output of the head for a batch.

        Args:
            input_id: token ids, passed to BERT as ``input_ids``.
            mask: attention mask, passed to BERT as ``attention_mask``.
        """
        # With return_dict=False BERT returns a tuple; only the pooled output
        # (second element) feeds the classification head.
        _, pooled_output = self.bert(input_ids=input_id, attention_mask=mask, return_dict=False)
        hidden = self.dropout(pooled_output)

        # Apply ReLU after every hidden layer. Without a non-linearity the
        # stacked Linear layers collapse into a single affine map, making the
        # extra layers pointless.
        for layer in (self.linear1, self.linear2, self.linear3, self.linear4):
            hidden = self.relu(layer(hidden))

        # The output layer stays linear; the sigmoid maps it into (0, 1).
        return self.sigmoid(self.linear5(hidden))

In [None]:
def train(model, train_data, val_data, learning_rate, epochs, batch_size=8):
    """Fine-tune ``model`` on ``train_data`` and report validation metrics each epoch.

    Args:
        model: module called as ``model(input_ids, attention_mask)`` whose output
            is a ``(batch, 1)`` tensor of probabilities (it is fed to ``nn.BCELoss``).
        train_data: DataFrame accepted by ``Dataset`` (``review`` / ``sentiment`` columns).
        val_data: DataFrame in the same format, used for validation only.
        learning_rate: learning rate handed to Adam.
        epochs: number of passes over ``train_data``.
        batch_size: examples per batch for both loaders; defaults to 8, the
            value that was previously hard-coded.

    Returns:
        None. For every epoch the number of correct predictions, the mean
        per-batch loss and the accuracy are printed for both splits.
    """
    train_set, val_set = Dataset(train_data), Dataset(val_data)

    train_dataloader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True)
    val_dataloader = torch.utils.data.DataLoader(val_set, batch_size=batch_size)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Move the model first so the optimizer is built from the on-device parameters.
    model = model.to(device)
    criterion = nn.BCELoss()
    optimizer = Adam(model.parameters(), lr=learning_rate)

    for epoch_num in range(epochs):

        total_acc_train = 0
        total_loss_train = 0

        # Training mode: enables dropout.
        model.train()
        for train_input, train_label in tqdm(train_dataloader):
            # Integer labels of shape (batch,) -> float targets of shape (batch, 1),
            # matching the model output for BCELoss.
            target = train_label.to(device).to(torch.float32).unsqueeze(1)
            # Each example was tokenized on its own, so the collated tensors carry
            # an extra singleton dimension at position 1; drop it for BERT.
            mask = train_input['attention_mask'].squeeze(1).to(device)
            input_id = train_input['input_ids'].squeeze(1).to(device)

            output = model(input_id, mask)

            batch_loss = criterion(output, target)
            total_loss_train += batch_loss.item()

            # Threshold at 0.5 and count matches; both tensors are (batch, 1),
            # so the comparison is element-wise with no broadcasting.
            total_acc_train += ((output > 0.5).float() == target).sum().item()

            optimizer.zero_grad()
            batch_loss.backward()
            optimizer.step()

        total_acc_val = 0
        total_loss_val = 0

        # Evaluation mode: disables dropout so validation metrics are not noisy.
        model.eval()
        with torch.no_grad():

            for val_input, val_label in val_dataloader:
                target = val_label.to(device).to(torch.float32).unsqueeze(1)
                mask = val_input['attention_mask'].squeeze(1).to(device)
                input_id = val_input['input_ids'].squeeze(1).to(device)

                output = model(input_id, mask)
                batch_loss = criterion(output, target)
                total_loss_val += batch_loss.item()

                total_acc_val += ((output > 0.5).float() == target).sum().item()

        # BCELoss already averages within a batch, so the epoch loss is the mean
        # over batches (not over samples). max(..., 1) guards an empty split.
        train_loss = total_loss_train / max(len(train_dataloader), 1)
        val_loss = total_loss_val / max(len(val_dataloader), 1)
        train_accuracy = total_acc_train / max(len(train_data), 1)
        val_accuracy = total_acc_val / max(len(val_data), 1)

        print("train: " , total_acc_train )
        print("val: " , total_acc_val )
        print(
            f'Epochs: {epoch_num + 1} | Train Loss: {train_loss: .3f} \
                | Train Accuracy: {train_accuracy: .3f} \
                | Val Loss: {val_loss: .3f} \
                | Val Accuracy: {val_accuracy: .3f}')
                  
# Hyper-parameters for the fine-tuning run.
LR = 1e-6
EPOCHS = 5

# Build a fresh classifier (loads the pretrained BERT weights) and train it.
model = BertClassifier()
train(model, train_data=df_train, val_data=df_val, learning_rate=LR, epochs=EPOCHS)

In [None]:
def _safe_ratio(numerator, denominator):
    """Return ``numerator / denominator``, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


def evaluate(model, test_data, batch_size=8):
    """Print the confusion matrix and summary metrics of ``model`` on ``test_data``.

    Args:
        model: module called as ``model(input_ids, attention_mask)`` whose output
            is a ``(batch, 1)`` tensor of probabilities.
        test_data: DataFrame accepted by ``Dataset`` (``review`` / ``sentiment`` columns).
        batch_size: examples per batch; defaults to 8, the value that was
            previously hard-coded.

    Returns:
        None. The confusion matrix (rows: predicted positive / predicted
        negative), accuracy, specificity, precision, recall and F1 are printed.
        All metrics are fractions in [0, 1]; a metric whose denominator is zero
        is reported as 0.0 instead of raising ``ZeroDivisionError``.
    """
    test = Dataset(test_data)

    test_dataloader = torch.utils.data.DataLoader(test, batch_size=batch_size)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    # Evaluation mode: disables dropout so predictions are deterministic.
    model.eval()

    tp, tn, fp, fn = 0, 0, 0, 0
    with torch.no_grad():
        for test_input, test_label in test_dataloader:
            test_label = test_label.to(device)
            # Each example was tokenized on its own, so the collated tensors carry
            # an extra singleton dimension at position 1; drop it for BERT.
            mask = test_input['attention_mask'].squeeze(1).to(device)
            input_id = test_input['input_ids'].squeeze(1).to(device)

            # (batch, 1) probabilities -> (batch,) hard 0/1 predictions.
            predicted = torch.round(model(input_id, mask)).squeeze(1)
            says_positive = predicted == 1
            is_correct = predicted == test_label

            tp += (is_correct & says_positive).sum().item()
            tn += (is_correct & ~says_positive).sum().item()
            fp += (~is_correct & says_positive).sum().item()
            fn += (~is_correct & ~says_positive).sum().item()

    print("Confusion Matrix :-")
    print(f"{tp}   {fp}")
    print(f"{fn}   {tn}")
    precision, recall = _safe_ratio(tp, tp + fp), _safe_ratio(tp, tp + fn)
    print(f"Accuracy percentage is: {_safe_ratio(tp + tn, tp + tn + fp + fn):.3f}")
    print(f"Specificity percentage is: {_safe_ratio(tn, tn + fp):.3f} %")
    print(f"Precision is: {precision:.3f}")
    print(f"Recall is: {recall :.3f}")
    print(f"F1 score is: {_safe_ratio(2 * (precision * recall), precision + recall):.3f}")
    
# Report test-set metrics for the model trained above.
evaluate(model, test_data=df_test)