In [None]:
# Guide: https://towardsdatascience.com/fine-tuning-bert-for-text-classification-54e7df642894

In [3]:
# finetuning bert language model for classification
!pip3 install transformers

[33mDEPRECATION: Configuring installation scheme with distutils config files is deprecated and will no longer work in the near future. If you are using a Homebrew or Linuxbrew Python, please see discussion at https://github.com/Homebrew/homebrew-core/issues/76621[0m[33m
[33mDEPRECATION: Configuring installation scheme with distutils config files is deprecated and will no longer work in the near future. If you are using a Homebrew or Linuxbrew Python, please see discussion at https://github.com/Homebrew/homebrew-core/issues/76621[0m[33m
[0m

In [5]:
# import libraries
import os

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel, BertForSequenceClassification
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt

In [None]:
# fetch the pretrained 'bert-base-uncased' tokenizer together with a
# sequence-classification model configured with three output labels
# NOTE(review): num_labels=3 presumably matches the number of distinct values in the
# dataset's `target` column, encoded as 0..2 -- confirm before training
tokenizer, model = (
    BertTokenizer.from_pretrained('bert-base-uncased'),
    BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=3),
)

In [None]:
# load in dataset
# The CSV location can be overridden with the TWITTER_SENTIMENT_CSV environment variable so the
# notebook can run on machines other than the one it was written on; when the variable is unset
# the original absolute path is used.
DATA_CSV_PATH = os.environ.get(
    "TWITTER_SENTIMENT_CSV",
    "/Users/jeremyhudsonchan/Dropbox/Files/Github_Repos/Twitter-Sentiment-Analysis/data/sampled/training.1600000.processed.noemoticon.csv",
)
df = pd.read_csv(DATA_CSV_PATH, encoding="latin-1", low_memory=False)

In [None]:
# separate into train and test sets (80/20, fixed seed so the split is reproducible)
# train_test_split shuffles the rows but keeps their original index labels; renumber each
# split 0..n-1 so that integer lookups such as `series[i]` address existing rows.
train, test = (
    split.reset_index(drop=True)
    for split in train_test_split(df, test_size=0.2, random_state=42)
)

In [None]:
# create a custom dataset class
class CustomDataset(Dataset):
    """Map-style dataset that tokenizes one row of text per item.

    Args:
        dataframe: frame providing a ``text`` column and a ``target`` column.
        tokenizer: object exposing ``encode_plus`` (e.g. a ``BertTokenizer``).
        max_len: fixed sequence length; every example is padded or truncated
            to exactly this many tokens.
    """

    def __init__(self, dataframe, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.data = dataframe
        self.text = dataframe.text
        self.targets = self.data.target
        self.max_len = max_len

    def __len__(self):
        """Return the number of rows in the dataset."""
        return len(self.text)

    def __getitem__(self, index):
        """Encode the row at position ``index``.

        Returns:
            dict with long tensors ``ids``, ``mask`` and ``token_type_ids``
            (each of length ``max_len``) and a scalar long tensor ``targets``.
        """
        # .iloc: `index` is a position in [0, len(self)). The frame may carry
        # arbitrary index labels (e.g. after a shuffled train/test split), so a
        # label lookup such as self.text[index] could raise KeyError.
        text = str(self.text.iloc[index])
        # collapse any run of whitespace into a single space
        text = " ".join(text.split())

        inputs = self.tokenizer.encode_plus(
            text,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            # padding='max_length' replaces the deprecated pad_to_max_length=True;
            # explicit truncation guarantees a fixed length so batches can be stacked
            padding='max_length',
            truncation=True,
            return_token_type_ids=True
        )

        ids = inputs['input_ids']
        mask = inputs['attention_mask']
        token_type_ids = inputs['token_type_ids']

        return {
            'ids': torch.tensor(ids, dtype=torch.long),
            'mask': torch.tensor(mask, dtype=torch.long),
            'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long),
            'targets': torch.tensor(self.targets.iloc[index], dtype=torch.long)
        }

In [None]:
# wrap each split in a CustomDataset; every example is encoded with max_len=64
training_set, testing_set = (
    CustomDataset(split_frame, tokenizer, 64) for split_frame in (train, test)
)

In [None]:
# keyword arguments for the train and test DataLoaders:
# batches of 32, shuffled, loaded in the main process (num_workers=0)
train_params = dict(batch_size=32, shuffle=True, num_workers=0)
test_params = dict(batch_size=32, shuffle=True, num_workers=0)

In [None]:
# batch iterators over both datasets, configured from their parameter dicts
training_loader = DataLoader(dataset=training_set, **train_params)
testing_loader = DataLoader(dataset=testing_set, **test_params)

In [None]:
# define device: the first CUDA GPU when one is available, otherwise the CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# The training and evaluation loops move every batch to `device`, so the model's weights
# must live there as well; otherwise the forward pass fails with a device mismatch on GPU.
model = model.to(device)

In [None]:
# optimizer: Adam over all model parameters with learning rate 1e-5
optimizer = optim.Adam(model.parameters(), lr=1e-5)
# standalone cross-entropy criterion
criterion = nn.CrossEntropyLoss()

In [None]:
# define training function
def train(epoch):
    """Run one optimisation pass over ``training_loader``.

    Relies on the notebook-level ``model``, ``optimizer``, ``training_loader``
    and ``device``.

    Args:
        epoch: epoch number; used only in the printed progress line.

    Returns:
        tuple(float, float): mean per-batch loss and the fraction of correctly
        classified examples, both measured during the pass (model in train
        mode, weights changing between batches).
    """
    model.train()
    train_loss = 0.0
    num_correct, num_examples = 0, 0
    for data in training_loader:
        ids = data['ids'].to(device, dtype = torch.long)
        mask = data['mask'].to(device, dtype = torch.long)
        token_type_ids = data['token_type_ids'].to(device, dtype = torch.long)
        targets = data['targets'].to(device, dtype = torch.long)

        optimizer.zero_grad()
        # passing labels makes the model return the loss first and the logits second
        outputs = model(ids, mask, token_type_ids, labels=targets)
        loss, logits = outputs[0], outputs[1]
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        num_correct += (logits.detach().argmax(dim=1) == targets).sum().item()
        num_examples += targets.size(0)

    # max(..., 1) keeps an empty loader from raising ZeroDivisionError
    avg_loss = train_loss / max(len(training_loader), 1)
    avg_accuracy = num_correct / max(num_examples, 1)
    print(f'Epoch: {epoch}, Loss:  {avg_loss}')
    return avg_loss, avg_accuracy

In [None]:
# define testing function
def test(epoch):
    model.eval()
    eval_loss, eval_accuracy = 0, 0
    nb_eval_steps, nb_eval_examples = 0, 0
    predictions , true_labels = [], []
    for _, data in enumerate(testing_loader, 0):
        ids = data['ids'].to(device, dtype = torch.long)
        mask = data['mask'].to(device, dtype = torch.long)
        token_type_ids = data['token_type_ids'].to(device, dtype = torch.long)
        targets = data['targets'].to(device, dtype = torch.long)

        with torch.no_grad():
            outputs = model(ids, mask, token_type_ids, labels=targets)
            loss = outputs[0]
            logits = outputs[1]

        logits = logits.detach().cpu().numpy()
        label_ids = targets.to('cpu').numpy()

        predictions.append(logits)
        true_labels.append(label_ids)

        tmp_eval_accuracy = flat_accuracy(logits, label_ids)

        eval_loss += loss.mean
        eval_accuracy += tmp_eval_accuracy
        
        nb_eval_steps += 1
    print("Validation loss: {}".format(eval_loss/nb_eval_steps))
    print("Validation Accuracy: {}".format(eval_accuracy/nb_eval_steps))
    return eval_loss/nb_eval_steps, eval_accuracy/nb_eval_steps

In [None]:
# define accuracy function
def flat_accuracy(preds, labels):
    """Return the fraction of rows whose highest-scoring column equals the label.

    Args:
        preds: 2-D array of scores; the predicted class of a row is its argmax
            along axis 1.
        labels: array of expected class ids; it is flattened before comparison.
    """
    expected = labels.flatten()
    predicted = np.argmax(preds, axis=1).flatten()
    hits = predicted == expected
    return np.sum(hits) / len(expected)

In [None]:
# define function to plot loss and accuracy
def plot_loss_accuracy(train_loss, test_loss, train_accuracy, test_accuracy):
    """Draw loss and accuracy curves side by side in one 15x5 figure.

    The left panel plots ``train_loss`` and ``test_loss``; the right panel plots
    ``train_accuracy`` and ``test_accuracy``. Each argument is a sequence of
    values drawn in order. The figure is shown before returning.
    """
    panels = (
        ('Loss', train_loss, test_loss),
        ('Accuracy', train_accuracy, test_accuracy),
    )
    plt.figure(figsize=(15,5))
    for column, (panel_title, train_series, test_series) in enumerate(panels, start=1):
        plt.subplot(1, 2, column)
        plt.plot(train_series, label='Train')
        plt.plot(test_series, label='Test')
        plt.title(panel_title)
        plt.legend()
    plt.show()

In [None]:
# train and test model
NUM_EPOCHS = 5

train_loss, test_loss, train_accuracy, test_accuracy = [], [], [], []
for epoch in range(NUM_EPOCHS):
    print(f'Epoch {epoch + 1}/{NUM_EPOCHS}')
    print('-' * 10)
    train_metrics = train(epoch)
    epoch_test_loss, epoch_test_accuracy = test(epoch)

    # Keep the training curves separate from the evaluation curves. If train()
    # does not return (loss, accuracy), record NaN instead of copying the test
    # metrics, which would draw two identical lines in the plots.
    if train_metrics is None:
        train_metrics = (float('nan'), float('nan'))
    epoch_train_loss, epoch_train_accuracy = train_metrics

    train_loss.append(epoch_train_loss)
    test_loss.append(epoch_test_loss)
    train_accuracy.append(epoch_train_accuracy)
    test_accuracy.append(epoch_test_accuracy)

In [None]:
# draw the recorded loss and accuracy curves
plot_loss_accuracy(
    train_loss=train_loss,
    test_loss=test_loss,
    train_accuracy=train_accuracy,
    test_accuracy=test_accuracy,
)

In [None]:
# write the model's state dict (not the whole module) to model.pt in the working directory
torch.save(obj=model.state_dict(), f='model.pt')