# EE8228 - Neural Info. Proc. Retrieval - Final Project

Model: BERT-Based Aspect-Based Sentiment Classification

Name: Tiago Rodrigues

Student ID: 500963826

# Installing Libraries

In [None]:
# Importing libraries
#pip install -q transformers
!pip install --upgrade pip
!pip install transformers
import numpy as np
import pandas as pd
from sklearn import metrics
import transformers
import torch
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler
from transformers import BertTokenizer, BertModel, BertConfig

# Select the compute device: the GPU when CUDA is usable, otherwise the CPU
from torch import cuda
if cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

## Stemming algorithm (Not Used)
#from nltk.stem import PorterStemmer
#from nltk.tokenize import sent_tokenize, word_tokenize
#import nltk
#nltk.download('punkt')
#ps = PorterStemmer()

# Importing and Pre-Processing the Twitter Dataset

In [2]:
# Keep only letters and whitespace in a piece of text
def cleanText(text):
  """Return `text` with everything except letters and whitespace removed.

  The placeholder tokens '-LRB-' and '-RRB-' (standing for '(' and ')') are
  deleted first, so that their letters do not survive the character filter.

  Args:
    text: the string to clean.

  Returns:
    A new string made of the alphabetic and whitespace characters of `text`,
    in their original order.
  """
  # Drop the bracket placeholders before filtering character by character
  for bracket_token in ('-LRB-', '-RRB-'):
    text = text.replace(bracket_token, '')

  # Letters and any kind of whitespace are kept; digits, punctuation and
  # symbols are discarded
  kept_chars = [ch for ch in text if ch.isalpha() or ch.isspace()]
  cleaned_text = ''.join(kept_chars)

  ## Stem the words with the stemming algorithm (Not Used)
  #words = word_tokenize(cleaned_text)
  #stemmed_words = [ps.stem(w) for w in words]
  #stemmed_sentence = ' '.join(stemmed_words)
  #return(stemmed_sentence)

  return cleaned_text


# Function used to convert the original dataset into a concatenated dataframe
def fixDataFormat(df, classes=None):
  """Turn the raw three-rows-per-sample frame into a model-ready DataFrame.

  The first column of `df` is read as consecutive groups of three rows:
  the tweet (containing the placeholder "$T$"), the target term that replaces
  the placeholder, and the classification label.

  Args:
    df: DataFrame whose first column holds the raw rows.
    classes: optional sequence of label values that fixes the number and the
      order of the one-hot columns. When None (the default) the columns are
      the sorted distinct labels found in `df` itself, so two inputs that do
      not contain the same set of labels produce incompatible encodings.

  Returns:
    A DataFrame with two columns: 'tweet', the string
    "<cleaned tweet> [SEP] <cleaned target>", and 'sentiment', the one-hot
    encoded label as a list.

  Raises:
    ValueError: if the number of rows is not a multiple of three, or if
      `classes` is given and a label is not one of them.
  """
  # Read the column once instead of doing one .iloc row lookup per line
  lines = df.iloc[:, 0].tolist() if df.shape[1] else []
  if len(lines) % 3 != 0:
    raise ValueError(
        "Expected groups of 3 rows (tweet, target, classification) but got "
        f"{len(lines)} rows; the last record is incomplete"
    )

  tweet = lines[0::3]            # First line of each group is the tweet
  target = lines[1::3]           # Second line is the target
  classification = lines[2::3]   # Third line is the classification

  # Convert the classification into a one-hot-encoding using get_dummies
  if classes is None:
    temp_df = pd.DataFrame({'classification': classification})
    one_hot_encoding = pd.get_dummies(temp_df['classification'])
  else:
    # A categorical with explicit categories yields one column per requested
    # class, in the requested order, even for classes absent from this input
    labels = pd.Series(pd.Categorical(classification, categories=list(classes)))
    unknown_mask = labels.isna().tolist()
    if any(unknown_mask):
      unknown = sorted({str(c) for c, bad in zip(classification, unknown_mask) if bad})
      raise ValueError(f"Labels not listed in classes: {unknown}")
    one_hot_encoding = pd.get_dummies(labels)
  one_hot_list = one_hot_encoding.values.tolist()

  # Replace "$T$" with the corresponding target, then concatenate the cleaned
  # tweet and the cleaned target using the [SEP] token
  final_tweet = []
  for text, term in zip(tweet, target):
    replaced_tweet = text.replace("$T$", term)
    final_tweet.append(f"{cleanText(replaced_tweet)} [SEP] {cleanText(term)}")

  # Create the final dataframe
  new_df = pd.DataFrame({
      'tweet': final_tweet,
      'sentiment': one_hot_list
  })

  return new_df

In [3]:
# Load each raw file as a single-column frame (one text line per row), then
# restructure it into a DataFrame with 'tweet' and 'sentiment' columns.
# The one-hot label columns are derived separately for each file, so both
# files need to contain the same set of labels.
# NOTE(review): read_csv applies CSV quote handling by default; a line with an
# unbalanced double quote could merge rows - confirm against the raw files.
df_train = pd.read_csv(filepath_or_buffer='train.raw', sep='\t', header=None, names=['data'])
train_dataset = fixDataFormat(df_train)

df_test = pd.read_csv(filepath_or_buffer='test.raw', sep='\t', header=None, names=['data'])
test_dataset = fixDataFormat(df_test)

In [None]:
# Show the first 5 rows of the training data
train_dataset.head(n=5)

In [None]:
# Show the first 5 rows of the testing data
test_dataset.head(n=5)

# Preparing the Dataset and Dataloader

In [19]:
# Key settings used later when fine-tuning the pre-trained BERT model by
# Hugging Face
MAX_LEN = 200            # handed to the tokenizer as max_length (pad/truncate)
TRAIN_BATCH_SIZE = 8     # batch size of the training DataLoader
VALID_BATCH_SIZE = 4     # batch size of the testing DataLoader
EPOCHS = 4               # intended number of training epochs
# Previously 1e-12. Adam's per-step update is on the order of the learning
# rate, so that value leaves the weights practically unchanged and the model
# never learns. 1e-5 is a usual magnitude for fine-tuning BERT.
LEARNING_RATE = 1e-5
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [7]:
# Create a Dataset class to be passed to the model
class CustomDataset(Dataset):
    """Dataset that tokenizes one tweet per item.

    Each item is a dict with the keys 'ids', 'mask' and 'token_type_ids'
    (long tensors built from the tokenizer output) and 'targets' (a float
    tensor built from the row's `sentiment` value).
    """

    def __init__(self, dataframe, tokenizer, max_len):
        """Store the data source and the tokenization settings.

        Args:
            dataframe: DataFrame providing a `tweet` and a `sentiment` column.
            tokenizer: object exposing `encode_plus`, used to encode each tweet.
            max_len: value passed to the tokenizer as `max_length`.
        """
        self.tokenizer = tokenizer
        self.data = dataframe
        self.tweet = dataframe.tweet
        self.targets = self.data.sentiment
        self.max_len = max_len

    def __len__(self):
        """Return the number of tweets."""
        return len(self.tweet)

    def __getitem__(self, index):
        """Encode the tweet at position `index` and return it with its target."""
        # Positional lookup: `index` is a 0..len-1 position, which only equals
        # the Series label when the frame has a default RangeIndex. Using
        # .iloc keeps this correct for filtered, shuffled or re-indexed frames.
        tweet = str(self.tweet.iloc[index])
        # Collapse every run of whitespace into a single space
        tweet = " ".join(tweet.split())

        # NOTE(review): the text is encoded as a single sequence even though a
        # literal " [SEP] " was inserted when the frame was built; presumably
        # token_type_ids are therefore all zero - confirm this is intended.
        inputs = self.tokenizer.encode_plus(
            tweet,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            padding='max_length',  # Use this instead of pad_to_max_length=True
            truncation=True,  # Explicitly activate truncation
            return_token_type_ids=True
        )
        ids = inputs['input_ids']
        mask = inputs['attention_mask']
        token_type_ids = inputs["token_type_ids"]

        return {
            'ids': torch.tensor(ids, dtype=torch.long),
            'mask': torch.tensor(mask, dtype=torch.long),
            'token_type_ids': torch.tensor(token_type_ids, dtype=torch.long),
            'targets': torch.tensor(self.targets.iloc[index], dtype=torch.float)
        }

In [None]:
# Report the frame sizes, then wrap both frames in the Dataset class that
# tokenizes them for the neural network
print(f"TRAIN Dataset: {train_dataset.shape}")
print(f"TEST Dataset: {test_dataset.shape}")

training_set = CustomDataset(dataframe=train_dataset, tokenizer=tokenizer, max_len=MAX_LEN)
testing_set = CustomDataset(dataframe=test_dataset, tokenizer=tokenizer, max_len=MAX_LEN)

In [9]:
# DataLoader settings for the training data
train_params = dict(batch_size=TRAIN_BATCH_SIZE, shuffle=True, num_workers=0)

# DataLoader settings for the testing data
# NOTE(review): the evaluation code pairs outputs and targets batch by batch,
# so shuffling is not needed here; consider shuffle=False.
test_params = dict(batch_size=VALID_BATCH_SIZE, shuffle=True, num_workers=0)

# Batch iterators over the two datasets
training_loader = DataLoader(training_set, **train_params)
testing_loader = DataLoader(testing_set, **test_params)

# Creating the Neural Network for Fine-Tuning

In [10]:
# Customized classifier: the pre-trained BERT model by HuggingFace with a
# dropout layer and a dense layer stacked on top to produce the final output
class BERTClass(torch.nn.Module):
    """Pre-trained 'bert-base-uncased' followed by dropout and a linear layer."""

    def __init__(self):
        """Load the pre-trained encoder and create the classification head."""
        super().__init__()
        self.l1 = transformers.BertModel.from_pretrained('bert-base-uncased')
        self.l2 = torch.nn.Dropout(p=0.3)
        # 3 classes: negative, neutral, positive
        self.l3 = torch.nn.Linear(in_features=768, out_features=3)

    def forward(self, ids, mask, token_type_ids):
        """Return the output of the linear layer for a batch of encoded inputs.

        Args:
            ids: first positional input of the BERT model.
            mask: passed to BERT as `attention_mask`.
            token_type_ids: passed to BERT as `token_type_ids`.
        """
        # With return_dict=False the encoder returns a tuple; only its second
        # element is fed to the head
        _first_output, second_output = self.l1(
            ids,
            attention_mask=mask,
            token_type_ids=token_type_ids,
            return_dict=False,
        )
        dropped = self.l2(second_output)
        return self.l3(dropped)

In [None]:
# Instantiate the classifier built on HuggingFace's pre-trained BERT model and
# move it to the selected device
model = BERTClass()
model.to(device=device)

In [12]:
# Loss function used during training
def loss_fn(outputs, targets):
    """Return the BCE-with-logits loss between model outputs and targets.

    Args:
        outputs: raw model outputs (logits).
        targets: float tensor of the same shape as `outputs`.
    """
    criterion = torch.nn.BCEWithLogitsLoss()
    return criterion(outputs, targets)

# Adam optimizer over every parameter of the model
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Fine Tuning the Model

In [13]:
# Function used to train and optimize the BERT model
def train(epoch, log_every=156):
    """Run one pass over `training_loader`, updating the global `model`.

    Uses the notebook-level `model`, `training_loader`, `optimizer`,
    `loss_fn` and `device`.

    Args:
        epoch: epoch identifier, only used in the progress message.
        log_every: print the loss every `log_every` batches (starting with
            the first one). A falsy value (0 or None) disables the messages.
    """
    model.train()
    for i, data in enumerate(training_loader, 0):
        ids = data['ids'].to(device, dtype=torch.long)
        mask = data['mask'].to(device, dtype=torch.long)
        token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
        targets = data['targets'].to(device, dtype=torch.float)

        outputs = model(ids, mask, token_type_ids)
        loss = loss_fn(outputs, targets)
        if log_every and i % log_every == 0:
            print(f'Epoch: {epoch}, Loss:  {loss.item()}')

        # Clear the gradients of the previous batch once, right before the
        # backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

In [14]:
# Function used to evaluate the current model
def validation(eval_model=None, loader=None):
    """Run a model over a loader without gradients and collect the results.

    Args:
        eval_model: module called as `eval_model(ids, mask, token_type_ids)`.
            Defaults to the notebook-level `model`.
        loader: iterable of batch dicts with the keys 'ids', 'mask',
            'token_type_ids' and 'targets'. Defaults to the notebook-level
            `testing_loader`.

    Returns:
        A tuple `(fin_outputs, fin_targets)` of Python lists with one entry
        per sample: the sigmoid of the model outputs and the targets.
    """
    if eval_model is None:
        eval_model = model
    if loader is None:
        loader = testing_loader

    eval_model.eval()
    fin_targets = []
    fin_outputs = []
    with torch.no_grad():
        for data in loader:
            ids = data['ids'].to(device, dtype=torch.long)
            mask = data['mask'].to(device, dtype=torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
            targets = data['targets'].to(device, dtype=torch.float)
            outputs = eval_model(ids, mask, token_type_ids)
            # Under no_grad nothing needs detaching; move to the CPU and
            # convert straight to nested lists
            fin_targets.extend(targets.cpu().tolist())
            fin_outputs.extend(torch.sigmoid(outputs).cpu().tolist())
    return fin_outputs, fin_targets

In [None]:
# Fine-tune for EPOCHS epochs. The loop previously ran range(10, 20), a
# leftover from continuing an earlier session, which ignored the EPOCHS
# setting and mislabelled the epochs of a fresh run.
for epoch in range(EPOCHS):
    # Train the model
    train(epoch)

    # Predict the output and set the class with the highest value
    outputs, targets = validation()
    outputs_array = np.array(outputs)
    # Find the index of the maximum value for each item
    max_indices = np.argmax(outputs_array, axis=1)
    # Create a boolean array where only the maximum value for each item is True
    result = np.zeros_like(outputs_array, dtype=bool)
    result[np.arange(len(outputs_array)), max_indices] = True

    # Evaluate the model and print the results
    accuracy = metrics.accuracy_score(targets, result)
    f1_score_macro = metrics.f1_score(targets, result, average='macro')
    print(f"Accuracy Score = {accuracy}")
    print(f"F1 Score (Macro) = {f1_score_macro}")

    # Save the model after each epoch
    torch.save(model, "model" + str(epoch) + ".pth")

# Save the model

In [27]:
# Write the complete model object to disk
model_save_path = "model.pth"
torch.save(obj=model, f=model_save_path)

# Load the model

In [14]:
# Load model and tokenizer
loaded_model_path = "model.pth"
# The file holds the whole pickled model object (not a state_dict), so the
# BERTClass definition must already have been executed in this kernel.
# - map_location=device lets a model saved on a GPU be loaded on a CPU-only
#   machine (and vice versa).
# - weights_only=False is required for full-object checkpoints on PyTorch
#   releases where torch.load defaults to weights_only=True.
# SECURITY: unpickling can execute arbitrary code - only load checkpoints you
# created yourself.
loaded_model = torch.load(loaded_model_path, map_location=device, weights_only=False)

loaded_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

In [None]:
# Function used to evaluate the loaded model
def loaded_validation():
    """Run the notebook-level `loaded_model` over `testing_loader`.

    Returns:
        A tuple `(fin_outputs, fin_targets)` of Python lists with one entry
        per sample: the sigmoid of the model outputs and the targets.
    """
    loaded_model.eval()
    # Move the loaded model to the same device as the batches once, before the
    # loop, instead of repeating the call for every batch
    loaded_model_to_device = loaded_model.to(device)

    fin_targets = []
    fin_outputs = []
    with torch.no_grad():
        for _, data in enumerate(testing_loader, 0):
            ids = data['ids'].to(device, dtype=torch.long)
            mask = data['mask'].to(device, dtype=torch.long)
            token_type_ids = data['token_type_ids'].to(device, dtype=torch.long)
            targets = data['targets'].to(device, dtype=torch.float)

            outputs = loaded_model_to_device(ids, mask, token_type_ids)
            fin_targets.extend(targets.cpu().detach().numpy().tolist())
            fin_outputs.extend(torch.sigmoid(outputs).cpu().detach().numpy().tolist())

    return fin_outputs, fin_targets


# Score every test sample with the loaded model
outputs, targets = loaded_validation()
outputs_array = np.array(outputs)

# Winning class per sample = column holding the largest score
max_indices = np.argmax(outputs_array, axis=1)

# One-hot boolean predictions: picking rows of a boolean identity matrix
# leaves exactly one True per sample, at its winning column
result = np.eye(outputs_array.shape[1], dtype=bool)[max_indices]

# Compare the predictions with the targets and report both metrics
accuracy = metrics.accuracy_score(targets, result)
f1_score_macro = metrics.f1_score(targets, result, average='macro')
print(f"Accuracy Score = {accuracy}")
print(f"F1 Score (Macro) = {f1_score_macro}")