## Install requisite libraries and import all necessary packages

In [None]:
!pip install -q transformers sentencepiece

In [None]:
import torch
import torch.nn as nn
import numpy as np
import xgboost as xgb
import os
import re

from transformers import DistilBertModel, DistilBertTokenizer, XLMRobertaModel, XLMRobertaTokenizer

## Get GPU for fast training

In [None]:
# Set up for GPU training

def try_gpu():
    """Return the CUDA device when torch reports one, otherwise the CPU device."""
    if torch.cuda.is_available():
        return torch.device("cuda")
    return torch.device("cpu")


# Device used by the rest of the notebook for models and batches.
device = try_gpu()

## Data Preparation Routines
1. Fetch Data
2. Map Data to appropriate label formats
3. Tokenize and encode to indices
4. Create train and validation splits
5. Create train and validation dataloaders

In [None]:
import re
import string

# Translation table that deletes every ASCII punctuation character.
translator = str.maketrans('', '', string.punctuation)

# All patterns are compiled once at import time rather than on every call.
_HANDLE_RE = re.compile(r'@\S+')
_URL_RE = re.compile(r'(www\.\S+)|(https?://\S+)')
# A character repeated three or more times in a row ("soooo").
_ELONGATION_RE = re.compile(r'(.)\1{2,}')
_DIGITS_RE = re.compile(r'[0-9]+')
_EMOJI_RE = re.compile(
    "["
    u"\U0001F600-\U0001F64F"  # emoticons
    u"\U0001F300-\U0001F5FF"  # symbols & pictographs
    u"\U0001F680-\U0001F6FF"  # transport & map symbols
    u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
    u"\U00002500-\U00002BEF"  # box drawing, geometric shapes, misc symbols, arrows
    u"\U00002702-\U000027B0"  # dingbats
    u"\U000024C2-\U0001F251"
    u"\U0001f926-\U0001f937"
    u"\U00010000-\U0010ffff"
    u"\u2640-\u2642"
    u"\u2600-\u2B55"
    u"\u200d"                 # zero-width joiner
    u"\u23cf"
    u"\u23e9"
    u"\u231a"
    u"\ufe0f"                 # variation selector-16
    u"\u3030"
    "]+",
    re.UNICODE,
)


def clean_text(text: str) -> str:
    """Normalise a tweet for tokenisation.

    Steps, in order: lowercase, drop @handles, replace URLs with a space,
    strip ASCII punctuation, shorten character elongations, drop ASCII
    digits, drop emoji/pictographs, then trim surrounding whitespace.

    Handles are removed before punctuation because ``@`` is itself a
    punctuation character; URLs are removed before punctuation so the
    ``://`` and ``.`` markers are still present when the URL pattern runs.

    Args:
        text: Raw tweet text. ``None`` and float NaN (what pandas uses for a
            missing cell) are treated as an empty tweet.

    Returns:
        The cleaned string (possibly empty).
    """
    # Missing values have nothing to clean; avoid AttributeError on .lower().
    if text is None or (isinstance(text, float) and text != text):
        return ""

    text = str(text).lower()

    # remove twitter handles (usernames)
    text = _HANDLE_RE.sub('', text)

    # remove urls
    text = _URL_RE.sub(' ', text)

    # remove punctuations
    text = text.translate(translator)

    # Shorten elongations ("sooooo" -> "soo"). Runs of exactly two are kept
    # on purpose: collapsing every double letter would corrupt ordinary
    # words that legitimately contain one.
    text = _ELONGATION_RE.sub(r'\1\1', text)

    # remove numbers
    text = _DIGITS_RE.sub('', text)

    # remove emoji and other pictographic symbols
    text = _EMOJI_RE.sub('', text)

    return text.strip()


In [None]:
# Colab-only step: mount Google Drive at /content/drive. The dataset path and
# the checkpoint paths used further down all live under this mount point.
from google.colab import drive
drive.mount("/content/drive")

KeyboardInterrupt: 

In [None]:
import pandas as pd


# Location of the Setswana SAfriSenti tweets CSV on the mounted Drive.
dataset_path = "/content/drive/MyDrive/SAfriSenti_Dataset/final_setswana_safrisenti_tweets_clean_v3.csv"
# Alternative (Twi) dataset, kept for reference:
#dataset_path = "final_all_twi_tweets.csv"
# NOTE: the label encoding applied later in this notebook is
# {neutral: 0, positive: 1, negative: 2}.



# Read the CSV file into a pandas DataFrame.
# The file is parsed as semicolon-separated and decoded as cp437.
# NOTE(review): cp437 is an unusual choice for tweet text - confirm the file
# really uses this encoding.
df = pd.read_csv(dataset_path, sep=";", encoding='cp437', engine='python')
# Legacy column rename for a dataset with different headers (disabled):
#df.rename(columns={
 #   'Clean_tweets': 'tweets',
#    'Labels': 'labels'
#}, inplace=True)

# Display the loaded frame as the cell output.
df

In [None]:
# Clean every tweet in place; `clean_text` is passed directly, no wrapper needed.
df["sentence"] = df["sentence"].apply(clean_text)

In [None]:
import pandas as pd
from torch.utils.data import DataLoader


# Standardise the column names used by the rest of the notebook.
df.rename(columns={
    'sentence': 'tweets',
    'Final_Label': 'labels'
}, inplace=True)

# Integer encoding of the sentiment classes.
label_map = {"neutral": 0, "positive": 1, "negative": 2}

# Map the labels to integers; any label not in `label_map` becomes NaN.
df["labels"] = df["labels"].map(label_map)

# Drop rows with NaN labels. The explicit .copy() makes `df` an independent
# frame, so the column assignments below do not raise SettingWithCopyWarning.
df = df.dropna(subset=["labels"]).copy()

# Convert the labels column to integers (the NaN step above made it float).
df["labels"] = df["labels"].astype(int)

# NOTE(review): this column was already cleaned under its old name `sentence`
# in the previous cell; this second pass looks redundant - confirm before
# removing it.
df["tweets"] = df["tweets"].apply(clean_text)
df

In [None]:
# Create a function to tokenize a set of texts
def preprocess_input(tokenizer, data, max_len):
    """Tokenise a collection of texts into fixed-length id and mask tensors.

    The whole collection is encoded with a single ``tokenizer(...)`` call
    (the per-sentence ``encode_plus`` API is deprecated). The tokenizer:
      (1) tokenises each text and adds the model's special tokens,
      (2) truncates / pads every sequence to exactly ``max_len`` tokens,
      (3) maps tokens to their ids and builds the attention mask.

    Args:
        tokenizer: A Hugging Face tokenizer (callable on a list of strings).
        data: Iterable of texts (list, numpy array, pandas Series, ...).
        max_len: Length every sequence is padded or truncated to.

    Returns:
        Tuple ``(input_ids, attention_masks)`` of integer tensors, each of
        shape ``(len(data), max_len)``.
    """
    texts = list(data)

    # Nothing to encode: return correctly shaped empty tensors instead of
    # handing an empty batch to the tokenizer.
    if not texts:
        empty = torch.empty((0, max_len), dtype=torch.long)
        return empty, empty.clone()

    encoded = tokenizer(
        texts,
        add_special_tokens=True,      # add the model's start/end tokens
        max_length=max_len,           # max length to truncate/pad
        padding="max_length",         # pad every text to max length
        truncation=True,
        return_attention_mask=True,
    )

    # Every row has length `max_len`, so the nested lists are rectangular.
    input_ids = torch.tensor(encoded["input_ids"])
    attention_masks = torch.tensor(encoded["attention_mask"])

    return input_ids, attention_masks

In [None]:
# `AutoTokenizer` is used below but is not part of the notebook's import cell.
from transformers import AutoTokenizer

# Hugging Face hub ids of the two pretrained backbones.
afroxml_model_path = "Davlan/afro-xlmr-base"
afrolm_model_path = "bonadossou/afrolm_active_learning"

# Tokenizer and backbone for Afro-XLM-R.
# NOTE(review): `do_lower_case` is passed through unchanged; confirm it has
# any effect for this tokenizer.
afroxml_tokenizer = AutoTokenizer.from_pretrained(afroxml_model_path, do_lower_case=True)
afroxlmr_classifier = XLMRobertaModel.from_pretrained(afroxml_model_path)

# Tokenizer for AfroLM.
afrolm_tokenizer = XLMRobertaTokenizer.from_pretrained(afrolm_model_path)


In [None]:
# Specify max len
# Every tokenised sequence is padded / truncated to this many tokens.
MAX_LEN = 256

from sklearn.model_selection import train_test_split

# Raw texts and integer labels as numpy arrays.
X = df.tweets.values
y = df.labels.values

print(df.shape)
df.head()
# 80/20 train/validation split; the fixed seed keeps the split reproducible.
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.20, random_state=2020, shuffle=True)

# Run function `preprocess_input` on the train set and the validation set.
# Only the AfroLM tokenizer is applied here; the DistilAbena calls are disabled.

#abena_train_xs, abena_train_masks = preprocess_input(abena_tokenizer, X_train, MAX_LEN)
#abena_val_xs, abena_val_masks = preprocess_input(abena_tokenizer, X_val, MAX_LEN)

afrolm_train_xs, afrolm_train_masks = preprocess_input(afrolm_tokenizer, X_train, MAX_LEN)
afrolm_val_xs, afrolm_val_masks = preprocess_input(afrolm_tokenizer, X_val, MAX_LEN)

In [None]:
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler

# Convert other data types to torch.Tensor
# Convert the label arrays to torch.Tensor so they can sit in a TensorDataset.
train_labels = torch.tensor(y_train)
val_labels = torch.tensor(y_val)

# For fine-tuning BERT, the authors recommend a batch size of 16 or 32.
batch_size = 32

# Create the DataLoader for our training set (DistilAbena variant disabled).
#abena_train_data = TensorDataset(abena_train_xs, abena_train_masks, train_labels)
#abena_train_sampler = RandomSampler(abena_train_data)
#abena_train_dataloader = DataLoader(abena_train_data, sampler=abena_train_sampler, batch_size=batch_size)

# NOTE: RandomSampler yields rows in shuffled order, so batches from this
# loader do NOT follow the row order of `train_labels` / `y_train`.
afrolm_train_data = TensorDataset(afrolm_train_xs, afrolm_train_masks, train_labels)
afrolm_train_sampler = RandomSampler(afrolm_train_data)
afrolm_train_dataloader = DataLoader(afrolm_train_data, sampler=afrolm_train_sampler, batch_size=batch_size)

# Create the DataLoader for our validation set (DistilAbena variant disabled).
#abena_val_data = TensorDataset(abena_val_xs, abena_val_masks, val_labels)
#abena_val_sampler = SequentialSampler(abena_val_data)
#abena_val_dataloader = DataLoader(abena_val_data, sampler=abena_val_sampler, batch_size=batch_size)

# SequentialSampler keeps validation rows in their original order.
afrolm_val_data = TensorDataset(afrolm_val_xs, afrolm_val_masks, val_labels)
afrolm_val_sampler = SequentialSampler(afrolm_val_data)
afrolm_val_dataloader = DataLoader(afrolm_val_data, sampler=afrolm_val_sampler, batch_size=batch_size)

## Define the DistilAbenaSenti and AfroLMSenti classes for sentiment analysis.

In [None]:
class DistilAbenaSenti(nn.Module):
    """Pretrained transformer encoder with a small feed-forward sentiment head.

    The encoder output at sequence position 0 is fed through
    ``Linear(768, 64) -> ReLU -> Linear(64, 3)`` to produce 3-class logits.

    NOTE(review): the default ``model_name`` is the Afro-XLM-R hub id while
    the backbone class is ``DistilBertModel`` - confirm this pairing matches
    the checkpoint being loaded.

    Args:
        model_name: Hub id or local path passed to
            ``DistilBertModel.from_pretrained``.
        dropout: Currently unused; kept for interface compatibility.
        freeze_abena: When truthy, the backbone parameters are excluded from
            gradient updates so only the classification head trains.
    """

    def __init__(self, model_name=afroxml_model_path, dropout=0.2, freeze_abena=False):
        super().__init__()
        input_dim, hidden_size, output_dim = 768, 64, 3

        # Get pretrained model
        self.abena = DistilBertModel.from_pretrained(model_name)

        # Feed-forward classification head (one hidden layer).
        self.sentiment_classifier = nn.Sequential(
            nn.Linear(input_dim, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_dim),
        )

        # Freeze the backbone
        if freeze_abena:
            for param in self.abena.parameters():
                param.requires_grad = False

    def forward(self, input_ids, attention_mask=None):
        """Return classification logits for a batch of token ids."""
        outputs = self.abena(input_ids, attention_mask=attention_mask)

        # First output is the last hidden state; take the vector at position 0
        # (the leading special token) as the sequence representation.
        last_hidden_state_cls = outputs[0][:, 0, :]

        # Feed into classifier to compute logits
        return self.sentiment_classifier(last_hidden_state_cls)

    @classmethod
    def from_pretrained(cls, checkpoint_path, map_location="cpu"):
        """Build a model with default arguments and load a saved state dict.

        Args:
            checkpoint_path: Path to a file written with ``torch.save`` that
                holds this module's ``state_dict``.
            map_location: Forwarded to ``torch.load``. Defaults to ``"cpu"``
                so a checkpoint saved on a GPU also loads on a CPU-only
                machine; move the model afterwards with ``.to(device)``.

        Returns:
            The model with the checkpoint weights loaded.
        """
        model = cls()
        # SECURITY: torch.load unpickles the file - only load trusted checkpoints.
        state_dict = torch.load(checkpoint_path, map_location=map_location)
        model.load_state_dict(state_dict)
        return model

In [None]:
# Pretrained AfroLM backbone, loaded once at module level. Every AfroLMSenti
# instance holds a reference to this same object.
afrolm_model = XLMRobertaModel.from_pretrained(afrolm_model_path)


class AfroLMSenti(nn.Module):
    """AfroLM encoder with a small feed-forward sentiment head.

    The encoder output at sequence position 0 is fed through
    ``Linear(768, 64) -> ReLU -> Linear(64, 3)`` to produce 3-class logits.

    The backbone is the module-level ``afrolm_model``; it is shared, not
    copied, so freezing it or loading a state dict into one instance affects
    every other instance as well.

    Args:
        dropout: Currently unused; kept for interface compatibility.
        freeze: When truthy (the default), the backbone parameters are
            excluded from gradient updates so only the head trains.
    """

    def __init__(self, dropout=0.2, freeze=True):
        super().__init__()
        input_dim, hidden_size, output_dim = 768, 64, 3

        # Get pretrained model (shared module-level instance)
        self.afrolm = afrolm_model

        # Feed-forward classification head (one hidden layer).
        self.sentiment_classifier = nn.Sequential(
            nn.Linear(input_dim, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_dim),
        )

        # Freeze the backbone
        if freeze:
            for param in self.afrolm.parameters():
                param.requires_grad = False

    def forward(self, input_ids, attention_mask=None):
        """Return classification logits for a batch of token ids."""
        outputs = self.afrolm(input_ids, attention_mask=attention_mask)

        # First output is the last hidden state; take the vector at position 0
        # (the leading special token) as the sequence representation.
        last_hidden_state_cls = outputs[0][:, 0, :]

        # Feed into classifier to compute logits
        return self.sentiment_classifier(last_hidden_state_cls)

    @classmethod
    def from_pretrained(cls, checkpoint_path, map_location="cpu"):
        """Build a model with default arguments and load a saved state dict.

        Args:
            checkpoint_path: Path to a file written with ``torch.save`` that
                holds this module's ``state_dict``.
            map_location: Forwarded to ``torch.load``. Defaults to ``"cpu"``
                so a checkpoint saved on a GPU also loads on a CPU-only
                machine; move the model afterwards with ``.to(device)``.

        Returns:
            The model with the checkpoint weights loaded.
        """
        model = cls()
        # SECURITY: torch.load unpickles the file - only load trusted checkpoints.
        state_dict = torch.load(checkpoint_path, map_location=map_location)
        model.load_state_dict(state_dict)
        return model

## Load finetuned models

In [None]:
# Paths to the fine-tuned checkpoints on the mounted Google Drive.
afroxlmr_senti_path = "/content/drive/My Drive/final/afroxlmr2-09.pth"
# NOTE(review): this file name also says "afroxlmr" although it is loaded into
# the AfroLM model - confirm it is the intended checkpoint.
afrolm_senti_path = "/content/drive/My Drive/final/afroxlmr2-04.pth"

In [None]:
# Rebuild both sentiment models from their fine-tuned checkpoints and move
# them to the selected device.
distilabena_senti = DistilAbenaSenti.from_pretrained(afroxlmr_senti_path).to(device)
afrolm_senti = AfroLMSenti.from_pretrained(afrolm_senti_path).to(device)

## Extract features from finetuned models and convert into numpy arrays for XGBoost.

In [None]:
def get_model_predictions(model, dataloader):
    """Run `model` over `dataloader` and return all logits as one numpy array.

    The model is switched to eval mode and run without gradient tracking.
    Batches are moved to the device the model's own parameters live on, so
    the function does not depend on any notebook-level device variable.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that
            returns a 2-D logits tensor.
        dataloader: Iterable yielding ``(input_ids, attention_mask, labels)``
            batches; the labels are ignored.

    Returns:
        Array with one row of logits per sample, stacked in the order the
        dataloader yields them. Pair the result with a label array only if
        the dataloader iterates in that same order (i.e. is not shuffled).

    Raises:
        ValueError: If the dataloader yields no batches (``np.vstack`` on an
            empty list).
    """
    model.eval()

    # Follow the model's device; a parameter-free model runs on the CPU.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    batch_logits = []
    with torch.no_grad():
        for batch_input_ids, batch_attn_masks, _ in dataloader:
            logits = model(
                batch_input_ids.to(target_device),
                batch_attn_masks.to(target_device),
            )
            # Bring the logits back to host memory as numpy.
            batch_logits.append(logits.cpu().numpy())

    return np.vstack(batch_logits)

## Get model predictions and ensemble with XGBoost.

In [None]:
# Get each model's logits on the training set and blend them.
#
# The features must line up row-for-row with `y_train`, so both loaders below
# iterate in dataset order (shuffle=False). The shuffled training loader must
# not be used here: its batch order does not match `y_train`.

# Inputs for the first model, tokenised with the tokenizer that belongs to the
# backbone it is built on (`afroxml_model_path`).
afroxlmr_train_xs, afroxlmr_train_masks = preprocess_input(afroxml_tokenizer, X_train, MAX_LEN)
afroxlmr_train_data = TensorDataset(afroxlmr_train_xs, afroxlmr_train_masks, train_labels)
abena_train_dataloader = DataLoader(afroxlmr_train_data, batch_size=batch_size, shuffle=False)

# Ordered view over the existing AfroLM training tensors.
afrolm_train_eval_dataloader = DataLoader(afrolm_train_data, batch_size=batch_size, shuffle=False)

distilabena_features = get_model_predictions(distilabena_senti, abena_train_dataloader)
afrolm_features = get_model_predictions(afrolm_senti, afrolm_train_eval_dataloader)

# Weighted average of the two models' logits (40% / 60%).
#ensembled_features = np.hstack((distilabena_features, afrolm_features))
ensembled_features = (0.4*distilabena_features) + (0.6*afrolm_features)
target_labels = y_train

In [None]:
# Create and fit XGB Classifier
# Gradient-boosted classifier trained on the blended logits.
# NOTE(review): the three colsample_* fractions are applied to a very small
# feature matrix (one column per class logit) - confirm these values are
# intentional.
xgb_classifier = xgb.XGBClassifier(
    learning_rate=1e-2,
    max_depth=32,
    colsample_bytree=0.1,
    colsample_bylevel=0.2,
    colsample_bynode=0.2,
    n_estimators=20,
)

# Fit on the training features against the integer sentiment labels.
xgb_classifier.fit(
    ensembled_features, y_train)

In [None]:
# Ensemble test set and evaluate XGBoost classifier
# Validation inputs for the first model, tokenised with the tokenizer that
# belongs to the backbone it is built on. Iterated in dataset order so the
# features line up with `y_val`.
afroxlmr_val_xs, afroxlmr_val_masks = preprocess_input(afroxml_tokenizer, X_val, MAX_LEN)
afroxlmr_val_data = TensorDataset(afroxlmr_val_xs, afroxlmr_val_masks, val_labels)
abena_val_dataloader = DataLoader(afroxlmr_val_data, batch_size=batch_size, shuffle=False)

# `afrolm_val_dataloader` already uses a SequentialSampler, so it is ordered.
abenasenti_test_fatures = get_model_predictions(distilabena_senti, abena_val_dataloader)
afrolm_test_features = get_model_predictions(afrolm_senti, afrolm_val_dataloader)

# Same 40% / 60% blend as used for the training features.
ensembled_test_features = (0.4 * abenasenti_test_fatures) + (0.6 * afrolm_test_features)

# Predictions keyed by split name.
preds = {}
preds['validation'] = xgb_classifier.predict(ensembled_test_features)

## Evaluate Classifier

In [None]:
# Evaluate the classifier
from sklearn.metrics import accuracy_score, classification_report


# Overall accuracy of the XGBoost predictions on the validation split.
accuracy = accuracy_score(y_val, preds['validation'])
print("Accuracy:", accuracy)

# Print classification report for more detailed evaluation
print(classification_report(y_val, preds['validation']))