In [21]:
import torch
import nltk
import numpy as np
import pandas as pd
from torch import nn
from tqdm import tqdm
from sklearn.metrics import f1_score
from torch.optim import Adam
import matplotlib.pyplot as plt
from transformers import BertTokenizer, BertModel, AutoTokenizer
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay


# Plot style, NLTK resources and the dataset location used by every later cell.
# Matplotlib 3.6 renamed its bundled seaborn styles to 'seaborn-v0_8*' and later
# removed the old 'seaborn' alias, so pick whichever name this installation offers.
seaborn_style = 'seaborn' if 'seaborn' in plt.style.available else 'seaborn-v0_8'
plt.style.use(seaborn_style)
nltk.download('stopwords')
nltk.download('punkt')
# NOTE(review): absolute, machine-specific path - point it at the local dataset folder.
BASE_PATH = "D:\\University\\7 Fall 2023\\CMSC516\\Project 1\\Datasets\\"

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\majd2\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\majd2\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [4]:
# Read the preprocessed tweets, discard rows with missing values and renumber
# the remaining rows from 0 so positional and label indexing agree.
csv_path = BASE_PATH + "preprocessed_dataset.csv"
dataset = (
    pd.read_csv(csv_path, encoding='utf_8')
    .dropna()
    .reset_index(drop=True)
)

In [None]:
# Getting the length (in characters) of the longest tweet; it is used as the
# padding/truncation length handed to the tokenizer by the Dataset class.
max_len = max(map(len, dataset['tweet']), default=0)
print("The length of the longest tweet: {}".format(max_len))

# initiating the BertTokenizer
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')

# max_len is measured in characters but consumed as a number of tokens. BERT
# cannot embed more positions than the tokenizer's model_max_length, so never
# pad/truncate beyond that limit (a no-op when the longest tweet is shorter).
max_len = min(max_len, tokenizer.model_max_length)

# Labels
labels_dict = {'negative':0, 'positive':1}

In [11]:
class Dataset(torch.utils.data.Dataset):
    """Tweets tokenized once up front, paired with their integer class labels.

    Relies on the notebook-level ``tokenizer``, ``max_len`` and ``labels_dict``.
    """

    def __init__(self, df):
        """Encode every row of ``df`` (columns ``label`` and ``tweet``)."""
        # Map the textual labels to their integer ids.
        self.labels = list(map(labels_dict.__getitem__, df['label']))

        # Tokenize each tweet to a fixed length of max_len tokens.
        encoded_tweets = []
        for tweet in df['tweet']:
            encoded = tokenizer(
                tweet,
                padding='max_length',
                max_length=max_len,
                truncation=True,
                return_tensors="pt",
            )
            encoded_tweets.append(encoded)
        self.texts = encoded_tweets

    def classes(self):
        """Return the list of integer labels."""
        return self.labels

    def __len__(self):
        """Number of samples."""
        return len(self.labels)

    def get_batch_labels(self, idx):
        """Return the label(s) selected by ``idx`` as a NumPy array."""
        selected = self.labels[idx]
        return np.array(selected)

    def get_batch_texts(self, idx):
        """Return the tokenized input(s) selected by ``idx``."""
        selected = self.texts[idx]
        return selected

    def __getitem__(self, idx):
        """Return the ``(tokenized_text, label)`` pair for ``idx``."""
        return self.get_batch_texts(idx), self.get_batch_labels(idx)

In [12]:
# Shuffle once (reproducibly) and cut the rows 80% / 10% / 10% into
# train / validation / test.
np.random.seed(112)
shuffled = dataset.sample(frac=1, random_state=42)
train_end = int(.8*len(dataset))
val_end = int(.9*len(dataset))
# Positional slices give the same partitions np.split produced, without routing
# the DataFrame through np.split (which depends on the deprecated
# DataFrame.swapaxes).
df_train = shuffled.iloc[:train_end]
df_val = shuffled.iloc[train_end:val_end]
df_test = shuffled.iloc[val_end:]
print("Training data size: {0:0}\nVal data size: {1:11}\nTest data size: {2:10}".format(len(df_train), len(df_val), len(df_test)))

Training data size: 1362588
Val data size:      170324
Test data size:     170324


In [16]:
class BertClassifier(nn.Module):
    """Binary classifier: BERT pooled output -> dropout -> linear(768, 1) -> sigmoid."""

    def __init__(self, dropout=0.5, frozen = False):
        """Build the network.

        Args:
            dropout: Dropout probability applied to BERT's pooled output.
            frozen: When true, BERT's own weights are excluded from gradient updates.
        """
        super().__init__()
        self.bert    = BertModel.from_pretrained('bert-base-cased')
        self.dropout = nn.Dropout(dropout)
        self.linear  = nn.Linear(768, 1)
        self.sigmoid = nn.Sigmoid()
        if frozen:
            # Turn off gradients for every parameter of the BERT encoder.
            self.bert.requires_grad_(False)

    def forward(self, input_id, mask):
        """Return the sigmoid output of the linear head for the given token ids and mask."""
        _sequence_output, pooled = self.bert(
            input_ids=input_id,
            attention_mask=mask,
            return_dict=False,
        )
        logits = self.linear(self.dropout(pooled))
        return self.sigmoid(logits)

In [None]:
def train(model, train_data, val_data, learning_rate, epochs, batch_size=1):
    """Fine-tune ``model`` on ``train_data`` and report metrics on ``val_data`` after every epoch.

    The model is expected to return one probability per sample with shape
    ``(batch, 1)`` (``BertClassifier.forward`` ends in a sigmoid). The loss is
    therefore ``nn.BCELoss`` (not ``BCEWithLogitsLoss``, which would apply a
    second sigmoid) and a prediction is the probability thresholded at 0.5
    (``argmax`` over a single column is always 0).

    Args:
        model: ``nn.Module`` called as ``model(input_ids, attention_mask)``.
        train_data: DataFrame with ``tweet`` and ``label`` columns used for training.
        val_data: DataFrame with the same columns, used for validation only.
        learning_rate: Learning rate passed to Adam.
        epochs: Number of passes over ``train_data``.
        batch_size: Samples per batch for both data loaders (default 1).

    Returns:
        The trained model, on the device that was used for training.
    """
    train_set, val_set = Dataset(train_data), Dataset(val_data)

    train_dataloader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True)
    val_dataloader = torch.utils.data.DataLoader(val_set, batch_size=batch_size)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    criterion = nn.BCELoss()
    optimizer = Adam(model.parameters(), lr=learning_rate)

    def run_batch(batch_input, batch_label):
        """Forward one batch; return (mean loss tensor, correct predictions, batch size)."""
        batch_label = batch_label.to(device)
        # The tokenizer output carries an extra singleton dimension per sample.
        mask = batch_input['attention_mask'].squeeze(1).to(device)
        input_id = batch_input['input_ids'].squeeze(1).to(device)

        probs = model(input_id, mask).reshape(-1)
        loss = criterion(probs, batch_label.float())
        preds = (probs >= 0.5).long()
        correct = (preds == batch_label).sum().item()
        return loss, correct, batch_label.size(0)

    # Guard the per-sample averages against an empty split.
    n_train, n_val = max(len(train_data), 1), max(len(val_data), 1)

    for epoch_num in range(epochs):
        model.train()  # enable dropout for the optimisation pass
        total_acc_train, total_loss_train = 0, 0
        for train_input, train_label in tqdm(train_dataloader):
            batch_loss, correct, n_samples = run_batch(train_input, train_label)

            # The criterion returns the batch mean; weight it so the epoch
            # total is a per-sample sum whatever the batch size.
            total_loss_train += batch_loss.item() * n_samples
            total_acc_train += correct

            optimizer.zero_grad()
            batch_loss.backward()
            optimizer.step()

        model.eval()  # disable dropout while validating
        total_acc_val, total_loss_val = 0, 0
        with torch.no_grad():
            for val_input, val_label in val_dataloader:
                batch_loss, correct, n_samples = run_batch(val_input, val_label)
                total_loss_val += batch_loss.item() * n_samples
                total_acc_val += correct

        print(
            f'Epochs: {epoch_num + 1} | Train Loss: {total_loss_train / n_train: .3f} \
              | Train Accuracy: {total_acc_train / n_train: .3f} \
              | Val Loss: {total_loss_val / n_val: .3f} \
              | Val Accuracy: {total_acc_val / n_val: .3f}')
    return model

In [None]:
def test(model, test_df, batch_size=1):
    """Evaluate ``model`` on ``test_df`` and return its predictions with the true labels.

    The model is expected to return one probability per sample with shape
    ``(batch, 1)`` (``BertClassifier.forward`` ends in a sigmoid). The loss is
    ``nn.BCELoss`` and the predicted class is the probability thresholded at
    0.5; ``argmax`` over a single column would always yield class 0 and
    cross-entropy over a single class is always 0.

    Args:
        model: ``nn.Module`` called as ``model(input_ids, attention_mask)``.
        test_df: DataFrame with ``tweet`` and ``label`` columns.
        batch_size: Samples per batch (default 1).

    Returns:
        ``(outputs, labels)``: two lists of ints, the predicted and the true
        class id of every evaluated sample, in dataset order.
    """
    test_dataloader = torch.utils.data.DataLoader(Dataset(test_df), batch_size=batch_size)
    criterion = nn.BCELoss()
    total_acc_test, total_loss_test = 0, 0

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using {device} device")

    model = model.to(device)
    model.eval()  # dropout must be off, otherwise predictions are randomly perturbed

    outputs, labels = [], []
    with torch.no_grad():
        for test_input, test_label in tqdm(test_dataloader):

            test_label = test_label.to(device)
            # The tokenizer output carries an extra singleton dimension per sample.
            mask = test_input['attention_mask'].squeeze(1).to(device)
            input_id = test_input['input_ids'].squeeze(1).to(device)

            probs = model(input_id, mask).reshape(-1)
            preds = (probs >= 0.5).long()

            outputs.extend(preds.tolist())
            labels.extend(test_label.tolist())

            batch_loss = criterion(probs, test_label.float())
            # Weight the batch mean so the total is a per-sample sum.
            total_loss_test += batch_loss.item() * test_label.size(0)

            total_acc_test += (preds == test_label).sum().item()

    # Guard the averages against an empty test slice.
    n_test = max(len(test_df), 1)
    print(f'Test Loss: {total_loss_test / n_test: .3f} \
          | Test Accuracy: {total_acc_test / n_test: .3f}')
    return outputs, labels

In [None]:
# Hyper-parameters of this run; `portion` is the fraction of each split that is used.
portion = 0.0001
EPOCHS = 4
LR = 1e-6
model = BertClassifier()

# Number of leading rows taken from the (already shuffled) train / validation splits.
train_rows = int(len(df_train)*portion)
val_rows = int(len(df_val)*portion)

trained_model = train(model, df_train[:train_rows], df_val[:val_rows], LR, EPOCHS)

In [None]:
# Persist the trained weights; the file name encodes the data portion and epoch count.
checkpoint_name = "{}_{}_model.pth".format(portion, EPOCHS)
trained_weights = trained_model.state_dict()
torch.save(trained_weights, BASE_PATH + checkpoint_name)

In [None]:
# Loading the saved model
# You can use the trained_model that has been returned by the train() function
# This piece of code is not necessary

# initiating object BertClassifier object
model = BertClassifier()
# the model is loaded for inference only, so switch dropout off
model.eval()
# the model that is going to be loaded
model_path = BASE_PATH + "{}_{}_model.pth".format(str(portion),str(EPOCHS))
# load the saved model; map_location lets a checkpoint written on a GPU machine
# be restored on a CPU-only one (the fresh model lives on the CPU at this point)
model.load_state_dict(torch.load(model_path, map_location="cpu"))

In [None]:
# test_slice is the number of leading test rows to evaluate (same fraction as training).
test_slice = int(len(df_test)*portion)
test_subset = df_test[:test_slice]
outputs, labels = test(model, test_subset)

In [None]:
# F1 score of the predictions returned by test()
outputs_np, labels_np = np.array(outputs), np.array(labels)
f1score = f1_score(labels_np, outputs_np)
# str.format was misspelled ("fromat" raised AttributeError) and the message
# contained the literal text "test_slice" instead of the number of tweets.
print("The F1 score for {} test tweets {}".format(len(labels_np), f1score))

# generating a confusion matrix; passing every class id explicitly keeps it 2x2
# even when a small test slice happens to contain a single class
class_names = sorted(labels_dict, key=labels_dict.get)
class_ids = [labels_dict[name] for name in class_names]
cm = confusion_matrix(labels_np, outputs_np, labels=class_ids)
# plotting the confusion matrix
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
disp.plot()
disp.ax_.set_title("Confusion matrix on the test slice")
plt.show()