# COS 760 Research Project: Analysing Sentiments for Low-resource African Languages

## Group Members: Mihir Arjun, Troy Clark, Hamza Mokiwa

### Establishing Baselines with Monolingual Long Short-Term Memory networks (LSTMs) and pre-trained Multilingual transformers

#### First we need to install the Hugging Face `datasets` library

In [None]:
%pip install datasets

In [None]:
import os
from datasets import load_from_disk, load_dataset

def load_local_datasets():
    """Load the Swahili, Portuguese and Sesotho datasets from local storage.

    The Swahili and Portuguese datasets are read with ``load_from_disk``; the
    Sesotho dataset is read from a CSV file with ``load_dataset("csv", ...)``.

    Returns:
        A tuple ``(swa_dataset, por_dataset, sot_dataset)``. If any of the
        three paths does not exist, a message is printed and
        ``(None, None, None)`` is returned instead.
    """
    swa_path = "./datasets/afrisenti/swa"
    por_path = "./datasets/afrisenti/por"
    # Check the exact CSV that is loaded below, so the existence test and the
    # load cannot disagree about where the Sesotho data lives.
    sot_path = "datasets/sotho-news/sotho_news_dataset.csv"

    missing = [path for path in (swa_path, por_path, sot_path) if not os.path.exists(path)]
    if missing:
        print("One or more dataset directories not found. Please check the paths.")
        print(f"Missing: {', '.join(missing)}")
        return None, None, None

    print("Loading Swahili (swa) dataset from disk...")
    swa_dataset = load_from_disk(swa_path)
    print("Swahili dataset loaded!")

    print("Loading Portuguese (por) dataset from disk...")
    por_dataset = load_from_disk(por_path)
    print("Portuguese dataset loaded!")

    print("Loading Sesotho (sot) dataset from disk...")
    sot_dataset = load_dataset("csv", data_files=sot_path)
    print("Sesotho dataset loaded!")

    return swa_dataset, por_dataset, sot_dataset

if __name__ == "__main__":
    # Load the three corpora, then report the size of each train split that
    # was actually loaded (every entry is None when a path was missing).
    swa, por, sot = load_local_datasets()

    for _language, _dataset in (("Swahili", swa), ("Portuguese", por), ("Sesotho", sot)):
        if _dataset is None:
            continue
        print(f"{_language} dataset size: {len(_dataset['train'])} examples")

## Now that the datasets have been loaded, we can start creating our LSTM baseline models below:

### First we will build an LSTM model for Swahili

In [None]:
import os
import torch
import numpy as np
import pandas as pd
from datasets import load_from_disk
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from sklearn.metrics import accuracy_score, f1_score, classification_report
from sklearn.model_selection import train_test_split
from collections import Counter
from tqdm import tqdm


# Seed the PyTorch and NumPy random number generators with a fixed value.
torch.manual_seed(42)
np.random.seed(42)


# Hyperparameters consumed by main() in this cell.
EMBEDDING_DIM = 100    # passed to LSTMSentiment as embedding_dim
HIDDEN_DIM = 128       # passed to LSTMSentiment as hidden_dim
NUM_LAYERS = 2         # passed to LSTMSentiment as n_layers
DROPOUT = 0.2          # passed to LSTMSentiment as dropout
LEARNING_RATE = 0.001  # lr of the Adam optimiser
BATCH_SIZE = 32        # batch_size of every DataLoader
NUM_EPOCHS = 10        # epochs argument of train_model

class SwahiliSentimentDataset(Dataset):
    """Dataset yielding ``(token_ids, label_id)`` tensor pairs for tweets.

    Tweets are split on whitespace; each word is looked up in ``vocab`` and
    falls back to the ``'<UNK>'`` entry when absent. Labels are translated to
    integer ids through ``label_map``.
    """

    def __init__(self, tweets, labels, vocab, label_map):
        self.tweets, self.labels = tweets, labels
        self.vocab, self.label_map = vocab, label_map

    def __len__(self):
        return len(self.tweets)

    def __getitem__(self, idx):
        words = self.tweets[idx].split()
        raw_label = self.labels[idx]

        token_ids = []
        for word in words:
            token_ids.append(self.vocab.get(word, self.vocab['<UNK>']))

        text_tensor = torch.tensor(token_ids, dtype=torch.long)
        label_tensor = torch.tensor(self.label_map[raw_label], dtype=torch.long)
        return text_tensor, label_tensor

def collate_fn(batch):
    """Combine ``(token_ids, label)`` pairs into one padded batch.

    Sequences are right-padded with 0 to the longest sequence in the batch
    (batch dimension first); labels are stacked into a single tensor.
    """
    sequences, targets = map(list, zip(*batch))
    padded = pad_sequence(sequences, batch_first=True, padding_value=0)
    stacked = torch.stack(targets)
    return padded, stacked

class LSTMSentiment(nn.Module):
    """Bidirectional LSTM classifier over sequences of token ids.

    Pipeline: embedding -> dropout -> stacked bidirectional LSTM ->
    concatenation of the last two final hidden states -> dropout -> linear
    layer producing ``output_dim`` scores per example.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout, pad_idx):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=n_layers,
            bidirectional=True,
            dropout=dropout,
            batch_first=True,
        )
        # Two directions are concatenated, hence twice the hidden size.
        self.fc = nn.Linear(2 * hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        embedded = self.embedding(text)
        embedded = self.dropout(embedded)

        _, (final_hidden, _) = self.lstm(embedded)

        # The last two entries of the final hidden state are the two
        # directions of the top layer (per the nn.LSTM documentation).
        sentence_repr = torch.cat((final_hidden[-2, :, :], final_hidden[-1, :, :]), dim=1)

        return self.fc(self.dropout(sentence_repr))

def train_model(model, train_loader, val_loader, optimizer, criterion, device, epochs,
                checkpoint_path="best_swahili_lstm_model.pt"):
    """Train ``model`` and checkpoint the weights with the lowest validation loss.

    Args:
        model: Network called as ``model(tweets)`` to produce class scores.
        train_loader: Iterable of ``(tweets, labels)`` tensor batches used for
            optimisation.
        val_loader: Iterable of ``(tweets, labels)`` tensor batches scored
            after every epoch.
        optimizer: Optimiser stepped once per training batch.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.
        epochs: Number of passes over ``train_loader``.
        checkpoint_path: File that receives ``model.state_dict()`` whenever
            the mean validation loss improves. The default is the filename
            this function has always written, so existing callers are
            unaffected.

    Returns:
        The same ``model`` object, holding the weights from the final epoch.
    """
    best_val_loss = float('inf')

    for epoch in range(epochs):
        # ---- training pass ----
        model.train()
        running_loss = 0.0

        for tweets, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs} - Training"):
            tweets, labels = tweets.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(tweets)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        train_loss = running_loss / len(train_loader)

        # ---- validation pass (no gradients, dropout disabled) ----
        model.eval()
        val_loss = 0.0
        all_preds = []
        all_labels = []

        with torch.no_grad():
            for tweets, labels in tqdm(val_loader, desc=f"Epoch {epoch+1}/{epochs} - Validation"):
                tweets, labels = tweets.to(device), labels.to(device)

                outputs = model(tweets)
                loss = criterion(outputs, labels)

                val_loss += loss.item()

                _, preds = torch.max(outputs, 1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        val_loss /= len(val_loader)
        val_accuracy = accuracy_score(all_labels, all_preds)
        val_f1 = f1_score(all_labels, all_preds, average='weighted')

        print(f"Epoch {epoch+1}/{epochs}:")
        print(f"  Train Loss: {train_loss:.4f}")
        print(f"  Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}, Val F1: {val_f1:.4f}")

        # Keep only the weights of the best epoch so far on disk.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), checkpoint_path)
            print("  Saved new best model!")

        print("-" * 60)

    return model

def evaluate_model(model, test_loader, criterion, device, label_list):
    """Score ``model`` on ``test_loader`` and print a per-class report.

    Args:
        model: Network called as ``model(tweets)`` to produce class scores.
        test_loader: Iterable of ``(tweets, labels)`` tensor batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.
        label_list: Class names, where position ``i`` names class id ``i``.

    Returns:
        Tuple ``(test_loss, test_accuracy, test_f1)``: the mean batch loss,
        the accuracy and the weighted F1 score.
    """
    model.eval()
    test_loss = 0.0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for tweets, labels in tqdm(test_loader, desc="Testing"):
            tweets, labels = tweets.to(device), labels.to(device)

            outputs = model(tweets)
            loss = criterion(outputs, labels)

            test_loss += loss.item()

            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    test_loss /= len(test_loader)
    test_accuracy = accuracy_score(all_labels, all_preds)
    test_f1 = f1_score(all_labels, all_preds, average='weighted')

    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test Accuracy: {test_accuracy:.4f}")
    print(f"Test F1 Score: {test_f1:.4f}")

    # The report formats the names as text, so coerce them to str (the raw
    # labels may be integers). Passing `labels` explicitly keeps the names
    # aligned with class ids even if a class never appears in the test data
    # or the predictions.
    class_names = [str(name) for name in label_list]
    report = classification_report(
        all_labels,
        all_preds,
        labels=list(range(len(class_names))),
        target_names=class_names,
    )
    print("\nClassification Report:")
    print(report)

    return test_loss, test_accuracy, test_f1

def main():
    """Train and evaluate the Swahili LSTM baseline end to end.

    Loads the Swahili splits from ``./datasets/afrisenti/swa``, builds a
    vocabulary from the training tweets, trains ``LSTMSentiment``, reloads the
    checkpoint ``best_swahili_lstm_model.pt``, evaluates on the test split and
    writes the metrics and hyperparameters to ``swahili_lstm_results.csv``.
    Returns early (after printing the error) if the dataset cannot be loaded.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    try:
        swa_dataset = load_from_disk("./datasets/afrisenti/swa")
        print("Swahili dataset loaded successfully!")

        print(f"Available columns in train split: {swa_dataset['train'].column_names}")
    except Exception as e:
        print(f"Error loading dataset: {e}")
        return

    print(f"Dataset structure: {swa_dataset}")
    print(f"Train set size: {len(swa_dataset['train'])}")
    print(f"Test set size: {len(swa_dataset['test'])}")
    print(f"Validation set size: {len(swa_dataset['validation'])}")

    # Label columns are materialised as plain lists so they can be
    # concatenated with `+` below.
    train_tweets = swa_dataset['train']['tweet']
    train_labels = list(swa_dataset['train']['label'])
    val_tweets = swa_dataset['validation']['tweet']
    val_labels = list(swa_dataset['validation']['label'])
    test_tweets = swa_dataset['test']['tweet']
    test_labels = list(swa_dataset['test']['label'])

    # Sort the distinct labels so the label -> index mapping is identical on
    # every run; iterating a bare set gives no ordering guarantee.
    unique_labels = sorted(set(train_labels + val_labels + test_labels))
    label_to_idx = {label: idx for idx, label in enumerate(unique_labels)}
    idx_to_label = {idx: label for label, idx in label_to_idx.items()}
    print(f"Label mapping: {label_to_idx}")

    # The vocabulary is built from the training split only.
    word_counts = Counter()
    for tweet in train_tweets:
        word_counts.update(tweet.split())

    # Words seen fewer than min_freq times are left out and map to <UNK>.
    min_freq = 2
    vocabulary = {'<PAD>': 0, '<UNK>': 1}
    vocab_idx = 2

    for word, count in word_counts.items():
        if count >= min_freq:
            vocabulary[word] = vocab_idx
            vocab_idx += 1

    print(f"Vocabulary size: {len(vocabulary)}")

    train_dataset = SwahiliSentimentDataset(train_tweets, train_labels, vocabulary, label_to_idx)
    val_dataset = SwahiliSentimentDataset(val_tweets, val_labels, vocabulary, label_to_idx)
    test_dataset = SwahiliSentimentDataset(test_tweets, test_labels, vocabulary, label_to_idx)

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)

    model = LSTMSentiment(
        vocab_size=len(vocabulary),
        embedding_dim=EMBEDDING_DIM,
        hidden_dim=HIDDEN_DIM,
        output_dim=len(unique_labels),
        n_layers=NUM_LAYERS,
        dropout=DROPOUT,
        pad_idx=vocabulary['<PAD>']
    ).to(device)

    print(f"Model architecture:\n{model}")

    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
    criterion = nn.CrossEntropyLoss()

    print("Starting training...")
    model = train_model(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        optimizer=optimizer,
        criterion=criterion,
        device=device,
        epochs=NUM_EPOCHS
    )

    # Evaluate the checkpointed weights; map_location keeps the load working
    # on whichever device was selected above.
    model.load_state_dict(torch.load("best_swahili_lstm_model.pt", map_location=device))

    print("\nEvaluating on test set...")
    test_loss, test_accuracy, test_f1 = evaluate_model(
        model=model,
        test_loader=test_loader,
        criterion=criterion,
        device=device,
        # Position i holds the name of class id i.
        label_list=[idx_to_label[idx] for idx in range(len(idx_to_label))]
    )

    results = {
        "test_loss": test_loss,
        "test_accuracy": test_accuracy,
        "test_f1": test_f1,
        "embedding_dim": EMBEDDING_DIM,
        "hidden_dim": HIDDEN_DIM,
        "num_layers": NUM_LAYERS,
        "dropout": DROPOUT,
        "learning_rate": LEARNING_RATE,
        "batch_size": BATCH_SIZE,
        "epochs": NUM_EPOCHS,
        "vocab_size": len(vocabulary)
    }

    pd.DataFrame([results]).to_csv("swahili_lstm_results.csv", index=False)
    print("Results saved to swahili_lstm_results.csv")

if __name__ == "__main__":
    main()

### Next, we build an LSTM model for Mozambican Portuguese:

In [None]:
import os
import torch
import numpy as np
import pandas as pd
from datasets import load_from_disk
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from sklearn.metrics import accuracy_score, f1_score, classification_report
from sklearn.model_selection import train_test_split
from collections import Counter
from tqdm import tqdm

# Seed the PyTorch and NumPy random number generators with a fixed value.
torch.manual_seed(42)
np.random.seed(42)

# Hyperparameters consumed by main() in this cell.
EMBEDDING_DIM = 100    # passed to LSTMSentiment as embedding_dim
HIDDEN_DIM = 128       # passed to LSTMSentiment as hidden_dim
NUM_LAYERS = 2         # passed to LSTMSentiment as n_layers
DROPOUT = 0.2          # passed to LSTMSentiment as dropout
LEARNING_RATE = 0.001  # lr of the Adam optimiser
BATCH_SIZE = 32        # batch_size of every DataLoader
NUM_EPOCHS = 10        # epochs argument of train_model

class PorSentimentDataset(Dataset):
    """Dataset yielding ``(token_ids, label_id)`` tensor pairs for tweets.

    Tweets are split on whitespace; each word is looked up in ``vocab`` and
    falls back to the ``'<UNK>'`` entry when absent. Labels are translated to
    integer ids through ``label_map``.
    """

    def __init__(self, tweets, labels, vocab, label_map):
        self.tweets, self.labels = tweets, labels
        self.vocab, self.label_map = vocab, label_map

    def __len__(self):
        return len(self.tweets)

    def __getitem__(self, idx):
        words = self.tweets[idx].split()
        raw_label = self.labels[idx]

        token_ids = []
        for word in words:
            token_ids.append(self.vocab.get(word, self.vocab['<UNK>']))

        text_tensor = torch.tensor(token_ids, dtype=torch.long)
        label_tensor = torch.tensor(self.label_map[raw_label], dtype=torch.long)
        return text_tensor, label_tensor

def collate_fn(batch):
    """Combine ``(token_ids, label)`` pairs into one padded batch.

    Sequences are right-padded with 0 to the longest sequence in the batch
    (batch dimension first); labels are stacked into a single tensor.
    """
    sequences, targets = map(list, zip(*batch))
    padded = pad_sequence(sequences, batch_first=True, padding_value=0)
    stacked = torch.stack(targets)
    return padded, stacked

class LSTMSentiment(nn.Module):
    """Bidirectional LSTM classifier over sequences of token ids.

    Pipeline: embedding -> dropout -> stacked bidirectional LSTM ->
    concatenation of the last two final hidden states -> dropout -> linear
    layer producing ``output_dim`` scores per example.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout, pad_idx):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=n_layers,
            bidirectional=True,
            dropout=dropout,
            batch_first=True,
        )
        # Two directions are concatenated, hence twice the hidden size.
        self.fc = nn.Linear(2 * hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        embedded = self.embedding(text)
        embedded = self.dropout(embedded)

        _, (final_hidden, _) = self.lstm(embedded)

        # The last two entries of the final hidden state are the two
        # directions of the top layer (per the nn.LSTM documentation).
        sentence_repr = torch.cat((final_hidden[-2, :, :], final_hidden[-1, :, :]), dim=1)

        return self.fc(self.dropout(sentence_repr))

def train_model(model, train_loader, val_loader, optimizer, criterion, device, epochs,
                checkpoint_path="best_portuguese_lstm_model.pt"):
    """Train ``model`` and checkpoint the weights with the lowest validation loss.

    Args:
        model: Network called as ``model(tweets)`` to produce class scores.
        train_loader: Iterable of ``(tweets, labels)`` tensor batches used for
            optimisation.
        val_loader: Iterable of ``(tweets, labels)`` tensor batches scored
            after every epoch.
        optimizer: Optimiser stepped once per training batch.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.
        epochs: Number of passes over ``train_loader``.
        checkpoint_path: File that receives ``model.state_dict()`` whenever
            the mean validation loss improves. The default is the filename
            this function has always written, so existing callers are
            unaffected.

    Returns:
        The same ``model`` object, holding the weights from the final epoch.
    """
    best_val_loss = float('inf')

    for epoch in range(epochs):
        # ---- training pass ----
        model.train()
        running_loss = 0.0

        for tweets, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs} - Training"):
            tweets, labels = tweets.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(tweets)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        train_loss = running_loss / len(train_loader)

        # ---- validation pass (no gradients, dropout disabled) ----
        model.eval()
        val_loss = 0.0
        all_preds = []
        all_labels = []

        with torch.no_grad():
            for tweets, labels in tqdm(val_loader, desc=f"Epoch {epoch+1}/{epochs} - Validation"):
                tweets, labels = tweets.to(device), labels.to(device)

                outputs = model(tweets)
                loss = criterion(outputs, labels)

                val_loss += loss.item()

                _, preds = torch.max(outputs, 1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        val_loss /= len(val_loader)
        val_accuracy = accuracy_score(all_labels, all_preds)
        val_f1 = f1_score(all_labels, all_preds, average='weighted')

        print(f"Epoch {epoch+1}/{epochs}:")
        print(f"  Train Loss: {train_loss:.4f}")
        print(f"  Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}, Val F1: {val_f1:.4f}")

        # Keep only the weights of the best epoch so far on disk.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), checkpoint_path)
            print("  Saved new best model!")

        print("-" * 60)

    return model

def evaluate_model(model, test_loader, criterion, device, label_list):
    """Score ``model`` on ``test_loader`` and print a per-class report.

    Args:
        model: Network called as ``model(tweets)`` to produce class scores.
        test_loader: Iterable of ``(tweets, labels)`` tensor batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.
        label_list: Class names, where position ``i`` names class id ``i``.

    Returns:
        Tuple ``(test_loss, test_accuracy, test_f1)``: the mean batch loss,
        the accuracy and the weighted F1 score.
    """
    model.eval()
    test_loss = 0.0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for tweets, labels in tqdm(test_loader, desc="Testing"):
            tweets, labels = tweets.to(device), labels.to(device)

            outputs = model(tweets)
            loss = criterion(outputs, labels)

            test_loss += loss.item()

            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    test_loss /= len(test_loader)
    test_accuracy = accuracy_score(all_labels, all_preds)
    test_f1 = f1_score(all_labels, all_preds, average='weighted')

    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test Accuracy: {test_accuracy:.4f}")
    print(f"Test F1 Score: {test_f1:.4f}")

    # The report formats the names as text, so coerce them to str (the raw
    # labels may be integers). Passing `labels` explicitly keeps the names
    # aligned with class ids even if a class never appears in the test data
    # or the predictions.
    class_names = [str(name) for name in label_list]
    report = classification_report(
        all_labels,
        all_preds,
        labels=list(range(len(class_names))),
        target_names=class_names,
    )
    print("\nClassification Report:")
    print(report)

    return test_loss, test_accuracy, test_f1

def main():
    """Train and evaluate the Mozambican Portuguese LSTM baseline end to end.

    Loads the Portuguese splits from ``./datasets/afrisenti/por``, builds a
    vocabulary from the training tweets, trains ``LSTMSentiment``, reloads the
    checkpoint ``best_portuguese_lstm_model.pt``, evaluates on the test split
    and writes the metrics and hyperparameters to
    ``portuguese_lstm_results.csv``. Returns early (after printing the error)
    if the dataset cannot be loaded.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    try:
        por_dataset = load_from_disk("./datasets/afrisenti/por")
        print("Mozambican Portuguese dataset loaded successfully!")

        print(f"Available columns in train split: {por_dataset['train'].column_names}")
    except Exception as e:
        print(f"Error loading dataset: {e}")
        return

    print(f"Dataset structure: {por_dataset}")
    print(f"Train set size: {len(por_dataset['train'])}")
    print(f"Test set size: {len(por_dataset['test'])}")
    print(f"Validation set size: {len(por_dataset['validation'])}")

    # Label columns are materialised as plain lists so they can be
    # concatenated with `+` below.
    train_tweets = por_dataset['train']['tweet']
    train_labels = list(por_dataset['train']['label'])
    val_tweets = por_dataset['validation']['tweet']
    val_labels = list(por_dataset['validation']['label'])
    test_tweets = por_dataset['test']['tweet']
    test_labels = list(por_dataset['test']['label'])

    # Sort the distinct labels so the label -> index mapping is identical on
    # every run; iterating a bare set gives no ordering guarantee.
    unique_labels = sorted(set(train_labels + val_labels + test_labels))
    label_to_idx = {label: idx for idx, label in enumerate(unique_labels)}
    idx_to_label = {idx: label for label, idx in label_to_idx.items()}
    print(f"Label mapping: {label_to_idx}")

    # The vocabulary is built from the training split only.
    word_counts = Counter()
    for tweet in train_tweets:
        word_counts.update(tweet.split())

    # Words seen fewer than min_freq times are left out and map to <UNK>.
    min_freq = 2
    vocabulary = {'<PAD>': 0, '<UNK>': 1}
    vocab_idx = 2

    for word, count in word_counts.items():
        if count >= min_freq:
            vocabulary[word] = vocab_idx
            vocab_idx += 1

    print(f"Vocabulary size: {len(vocabulary)}")

    train_dataset = PorSentimentDataset(train_tweets, train_labels, vocabulary, label_to_idx)
    val_dataset = PorSentimentDataset(val_tweets, val_labels, vocabulary, label_to_idx)
    test_dataset = PorSentimentDataset(test_tweets, test_labels, vocabulary, label_to_idx)

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)

    model = LSTMSentiment(
        vocab_size=len(vocabulary),
        embedding_dim=EMBEDDING_DIM,
        hidden_dim=HIDDEN_DIM,
        output_dim=len(unique_labels),
        n_layers=NUM_LAYERS,
        dropout=DROPOUT,
        pad_idx=vocabulary['<PAD>']
    ).to(device)

    print(f"Model architecture:\n{model}")

    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
    criterion = nn.CrossEntropyLoss()

    print("Starting training...")
    model = train_model(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        optimizer=optimizer,
        criterion=criterion,
        device=device,
        epochs=NUM_EPOCHS
    )

    # Evaluate the checkpointed weights; map_location keeps the load working
    # on whichever device was selected above.
    model.load_state_dict(torch.load("best_portuguese_lstm_model.pt", map_location=device))

    print("\nEvaluating on test set...")
    test_loss, test_accuracy, test_f1 = evaluate_model(
        model=model,
        test_loader=test_loader,
        criterion=criterion,
        device=device,
        # Position i holds the name of class id i.
        label_list=[idx_to_label[idx] for idx in range(len(idx_to_label))]
    )

    results = {
        "test_loss": test_loss,
        "test_accuracy": test_accuracy,
        "test_f1": test_f1,
        "embedding_dim": EMBEDDING_DIM,
        "hidden_dim": HIDDEN_DIM,
        "num_layers": NUM_LAYERS,
        "dropout": DROPOUT,
        "learning_rate": LEARNING_RATE,
        "batch_size": BATCH_SIZE,
        "epochs": NUM_EPOCHS,
        "vocab_size": len(vocabulary)
    }

    pd.DataFrame([results]).to_csv("portuguese_lstm_results.csv", index=False)
    print("Results saved to portuguese_lstm_results.csv")

if __name__ == "__main__":
    main()

### Lastly, we build an LSTM model for Sesotho

In [None]:
import os
import torch
import numpy as np
import pandas as pd
from datasets import load_from_disk
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from sklearn.metrics import accuracy_score, f1_score, classification_report
from sklearn.model_selection import train_test_split
from collections import Counter
from tqdm import tqdm


# Seed the PyTorch and NumPy random number generators with a fixed value.
torch.manual_seed(42)
np.random.seed(42)


# Hyperparameters consumed by main() in this cell.
EMBEDDING_DIM = 100    # passed to LSTMSentiment as embedding_dim
HIDDEN_DIM = 128       # passed to LSTMSentiment as hidden_dim
NUM_LAYERS = 2         # passed to LSTMSentiment as n_layers
DROPOUT = 0.2          # passed to LSTMSentiment as dropout
LEARNING_RATE = 0.001  # lr of the Adam optimiser
BATCH_SIZE = 32        # batch_size of every DataLoader
NUM_EPOCHS = 10        # epochs argument of train_model

class SotSentimentDataset(Dataset):
    """Dataset yielding ``(token_ids, label_id)`` tensor pairs for headlines.

    Headlines are split on whitespace; each word is looked up in ``vocab`` and
    falls back to the ``'<UNK>'`` entry when absent. Labels are translated to
    integer ids through ``label_map``.
    """

    def __init__(self, headlines, labels, vocab, label_map):
        self.headlines, self.labels = headlines, labels
        self.vocab, self.label_map = vocab, label_map

    def __len__(self):
        return len(self.headlines)

    def __getitem__(self, idx):
        words = self.headlines[idx].split()
        raw_label = self.labels[idx]

        token_ids = []
        for word in words:
            token_ids.append(self.vocab.get(word, self.vocab['<UNK>']))

        text_tensor = torch.tensor(token_ids, dtype=torch.long)
        label_tensor = torch.tensor(self.label_map[raw_label], dtype=torch.long)
        return text_tensor, label_tensor

def collate_fn(batch):
    """Combine ``(token_ids, label)`` pairs into one padded batch.

    Sequences are right-padded with 0 to the longest sequence in the batch
    (batch dimension first); labels are stacked into a single tensor.
    """
    sequences, targets = map(list, zip(*batch))
    padded = pad_sequence(sequences, batch_first=True, padding_value=0)
    stacked = torch.stack(targets)
    return padded, stacked

class LSTMSentiment(nn.Module):
    """Bidirectional LSTM classifier over sequences of token ids.

    Pipeline: embedding -> dropout -> stacked bidirectional LSTM ->
    concatenation of the last two final hidden states -> dropout -> linear
    layer producing ``output_dim`` scores per example.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout, pad_idx):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=n_layers,
            bidirectional=True,
            dropout=dropout,
            batch_first=True,
        )
        # Two directions are concatenated, hence twice the hidden size.
        self.fc = nn.Linear(2 * hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        embedded = self.embedding(text)
        embedded = self.dropout(embedded)

        _, (final_hidden, _) = self.lstm(embedded)

        # The last two entries of the final hidden state are the two
        # directions of the top layer (per the nn.LSTM documentation).
        sentence_repr = torch.cat((final_hidden[-2, :, :], final_hidden[-1, :, :]), dim=1)

        return self.fc(self.dropout(sentence_repr))

def train_model(model, train_loader, val_loader, optimizer, criterion, device, epochs,
                checkpoint_path="best_sesotho_lstm_model.pt"):
    """Train ``model``, checkpoint the best epoch and return it with those weights.

    Args:
        model: Network called as ``model(headlines)`` to produce class scores.
        train_loader: Iterable of ``(headlines, labels)`` tensor batches used
            for optimisation.
        val_loader: Iterable of ``(headlines, labels)`` tensor batches scored
            after every epoch.
        optimizer: Optimiser stepped once per training batch.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.
        epochs: Number of passes over ``train_loader``.
        checkpoint_path: File that receives ``model.state_dict()`` whenever
            the mean validation loss improves. It previously pointed at the
            Portuguese checkpoint file, which this Sesotho run overwrote.

    Returns:
        The same ``model`` object. If at least one checkpoint was written, the
        weights of the best-validation epoch are loaded back into it, so a
        caller that saves or evaluates the returned model uses the best epoch
        rather than the last one.
    """
    best_val_loss = float('inf')
    checkpoint_written = False

    for epoch in range(epochs):
        # ---- training pass ----
        model.train()
        running_loss = 0.0

        for headlines, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs} - Training"):
            headlines, labels = headlines.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(headlines)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        train_loss = running_loss / len(train_loader)

        # ---- validation pass (no gradients, dropout disabled) ----
        model.eval()
        val_loss = 0.0
        all_preds = []
        all_labels = []

        with torch.no_grad():
            for headlines, labels in tqdm(val_loader, desc=f"Epoch {epoch+1}/{epochs} - Validation"):
                headlines, labels = headlines.to(device), labels.to(device)

                outputs = model(headlines)
                loss = criterion(outputs, labels)

                val_loss += loss.item()

                _, preds = torch.max(outputs, 1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        val_loss /= len(val_loader)
        val_accuracy = accuracy_score(all_labels, all_preds)
        val_f1 = f1_score(all_labels, all_preds, average='weighted')

        print(f"Epoch {epoch+1}/{epochs}:")
        print(f"  Train Loss: {train_loss:.4f}")
        print(f"  Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}, Val F1: {val_f1:.4f}")

        # Keep only the weights of the best epoch so far on disk.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_written = True
            print("  Saved new best model!")

        print("-" * 60)

    # Restore the best epoch; skipped when no epoch ever improved on the
    # initial infinite loss (e.g. epochs == 0), as no file was written then.
    if checkpoint_written:
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))

    return model

def evaluate_model(model, test_loader, criterion, device, label_list):
    """Score ``model`` on ``test_loader`` and print a per-class report.

    Args:
        model: Network called as ``model(headlines)`` to produce class scores.
        test_loader: Iterable of ``(headlines, labels)`` tensor batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.
        label_list: Class names, where position ``i`` names class id ``i``.

    Returns:
        Tuple ``(test_loss, test_accuracy, test_f1)``: the mean batch loss,
        the accuracy and the weighted F1 score.
    """
    model.eval()
    test_loss = 0.0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for headlines, labels in tqdm(test_loader, desc="Testing"):
            headlines, labels = headlines.to(device), labels.to(device)

            outputs = model(headlines)
            loss = criterion(outputs, labels)

            test_loss += loss.item()

            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    test_loss /= len(test_loader)
    test_accuracy = accuracy_score(all_labels, all_preds)
    test_f1 = f1_score(all_labels, all_preds, average='weighted')

    print(f"Test Loss: {test_loss:.4f}")
    print(f"Test Accuracy: {test_accuracy:.4f}")
    print(f"Test F1 Score: {test_f1:.4f}")

    # The report formats the names as text, so coerce them to str (the raw
    # labels may be integers). Passing `labels` explicitly keeps the names
    # aligned with class ids even if a class never appears in the test data
    # or the predictions.
    class_names = [str(name) for name in label_list]
    report = classification_report(
        all_labels,
        all_preds,
        labels=list(range(len(class_names))),
        target_names=class_names,
    )
    print("\nClassification Report:")
    print(report)

    return test_loss, test_accuracy, test_f1

def main():
    """Train and evaluate the Sesotho LSTM baseline end to end.

    Loads the Sesotho splits from ``./datasets/sesotho_news_dataset``, builds
    a vocabulary from the training headlines, trains ``LSTMSentiment``, saves
    and reloads ``best_sesotho_lstm_model.pt``, evaluates on the test split
    and writes the metrics and hyperparameters to ``sesotho_lstm_results.csv``.
    Returns early (after printing the error) if the dataset cannot be loaded.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    try:
        sot_dataset = load_from_disk("./datasets/sesotho_news_dataset")
        print("Sesotho dataset loaded successfully!")

        print(f"Available columns in train split: {sot_dataset['train'].column_names}")
    except Exception as e:
        print(f"Error loading dataset: {e}")
        return

    print(f"Dataset structure: {sot_dataset}")
    print(f"Train set size: {len(sot_dataset['train'])}")
    print(f"Test set size: {len(sot_dataset['test'])}")
    print(f"Validation set size: {len(sot_dataset['validation'])}")

    # Label columns are materialised as plain lists so they can be
    # concatenated with `+` below.
    train_headlines = sot_dataset['train']['headline']
    train_labels = list(sot_dataset['train']['label'])
    val_headlines = sot_dataset['validation']['headline']
    val_labels = list(sot_dataset['validation']['label'])
    test_headlines = sot_dataset['test']['headline']
    test_labels = list(sot_dataset['test']['label'])

    # Sort the distinct labels so the label -> index mapping is identical on
    # every run; iterating a bare set gives no ordering guarantee.
    unique_labels = sorted(set(train_labels + val_labels + test_labels))
    label_to_idx = {label: idx for idx, label in enumerate(unique_labels)}
    idx_to_label = {idx: label for label, idx in label_to_idx.items()}
    print(f"Label mapping: {label_to_idx}")

    # The vocabulary is built from the training split only.
    word_counts = Counter()
    for headline in train_headlines:
        word_counts.update(headline.split())

    # Words seen fewer than min_freq times are left out and map to <UNK>.
    min_freq = 2
    vocabulary = {'<PAD>': 0, '<UNK>': 1}
    vocab_idx = 2

    for word, count in word_counts.items():
        if count >= min_freq:
            vocabulary[word] = vocab_idx
            vocab_idx += 1

    print(f"Vocabulary size: {len(vocabulary)}")

    train_dataset = SotSentimentDataset(train_headlines, train_labels, vocabulary, label_to_idx)
    val_dataset = SotSentimentDataset(val_headlines, val_labels, vocabulary, label_to_idx)
    test_dataset = SotSentimentDataset(test_headlines, test_labels, vocabulary, label_to_idx)

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)

    model = LSTMSentiment(
        vocab_size=len(vocabulary),
        embedding_dim=EMBEDDING_DIM,
        hidden_dim=HIDDEN_DIM,
        output_dim=len(unique_labels),
        n_layers=NUM_LAYERS,
        dropout=DROPOUT,
        pad_idx=vocabulary['<PAD>']
    ).to(device)

    print(f"Model architecture:\n{model}")

    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
    criterion = nn.CrossEntropyLoss()

    print("Starting training...")
    model = train_model(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        optimizer=optimizer,
        criterion=criterion,
        device=device,
        epochs=NUM_EPOCHS
    )

    # Persist the weights returned by train_model under the Sesotho checkpoint
    # name and load them back, so the evaluation below uses what is on disk.
    # NOTE(review): this stores whatever weights `model` holds once
    # train_model returns - confirm these are the best-validation weights and
    # not simply those of the final epoch.
    torch.save(model.state_dict(), "best_sesotho_lstm_model.pt")
    model.load_state_dict(torch.load("best_sesotho_lstm_model.pt", map_location=device))

    print("\nEvaluating on test set...")
    test_loss, test_accuracy, test_f1 = evaluate_model(
        model=model,
        test_loader=test_loader,
        criterion=criterion,
        device=device,
        # Position i holds the name of class id i.
        label_list=[idx_to_label[idx] for idx in range(len(idx_to_label))]
    )

    results = {
        "test_loss": test_loss,
        "test_accuracy": test_accuracy,
        "test_f1": test_f1,
        "embedding_dim": EMBEDDING_DIM,
        "hidden_dim": HIDDEN_DIM,
        "num_layers": NUM_LAYERS,
        "dropout": DROPOUT,
        "learning_rate": LEARNING_RATE,
        "batch_size": BATCH_SIZE,
        "epochs": NUM_EPOCHS,
        "vocab_size": len(vocabulary)
    }

    pd.DataFrame([results]).to_csv("sesotho_lstm_results.csv", index=False)
    print("Results saved to sesotho_lstm_results.csv")

if __name__ == "__main__":
    main()

# We now create Baselines with pre-trained Multilingual transformers

## AfroXLMR

In [None]:
!pip install -U transformers datasets peft evaluate plotly --quiet

from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer
from datasets import load_dataset
from peft import LoraConfig, get_peft_model
import numpy as np
from sklearn.manifold import TSNE
import pandas as pd

import torch

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", device)

# por_dataset=load_dataset("HausaNLP/AfriSenti-Twitter", "pt-MZ")
# swa_dataset=load_dataset("HausaNLP/AfriSenti-Twitter", "swa")
# sot_dataset=load_dataset("hamza-student-123/nlp-assignment-news-data",'sot')
#
# for ds in [por_dataset,swa_dataset,sot_dataset]: # Change to all three later
#     for lbl in ["train","validation","test"]:
#         if ds[lbl].column_names[0]== "tweet":
#             ds[lbl] = ds[lbl].rename_column("tweet","text")
#         else:
#             ds[lbl] = ds[lbl].rename_column("headline","text")
#
# por_df = por_dataset["train"].to_pandas()
# swa_df = swa_dataset["train"].to_pandas()
# sot_df = sot_dataset["train"].to_pandas()


#### Baseline with [AfroXLMR](https://huggingface.co/Davlan/afro-xlmr-large)

In [None]:
!pip install -U transformers datasets peft evaluate plotly sentencepiece --quiet


import torch
from torch.utils.data import DataLoader, Dataset
from transformers import XLMRobertaForSequenceClassification, XLMRobertaTokenizer
from datasets import load_dataset
# import torch_optimizer as optim
from transformers import get_linear_schedule_with_warmup
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
# import numpy as np
import pandas as pd
# import matplotlib.pyplot as plt
# import seaborn as sns
# from tqdm import tqdm
import logging

# Set up logging: INFO level and above, each record formatted as
# "<timestamp> - <level> - <message>"; `logger` is this module's logger.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class SentimentDataset(Dataset):
    """Dataset that tokenises one text per item for sequence classification.

    Each item is a dict with ``'input_ids'`` and ``'attention_mask'`` (the
    tokenizer outputs flattened to 1-D) and ``'label'`` (a long tensor).
    The tokenizer is called with truncation and ``'max_length'`` padding, so
    it is asked to produce sequences of ``max_length`` tokens.
    """

    def __init__(self, texts, labels, tokenizer, max_length=128):
        self.texts, self.labels = texts, labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        # str() guards against non-string entries in the text column.
        raw_text = str(self.texts[idx])
        raw_label = self.labels[idx]

        encoded = self.tokenizer(
            raw_text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt',
        )

        # return_tensors='pt' adds a leading batch axis; flatten drops it.
        sample = {key: encoded[key].flatten() for key in ('input_ids', 'attention_mask')}
        sample['label'] = torch.tensor(raw_label, dtype=torch.long)
        return sample

def load_data(language):
    """Download one language's dataset and return its splits as DataFrames.

    The text column (``tweet`` or ``headline``) of every split is renamed to
    ``text`` so downstream code can treat all languages alike.

    Args:
        language: ``'por'``, ``'swa'`` or ``'sot'``.

    Returns:
        Tuple ``(train_df, val_df, test_df)`` of pandas DataFrames.

    Raises:
        ValueError: If ``language`` is not one of the supported codes.
        Exception: Any error raised while loading or renaming is logged and
            re-raised unchanged.
    """
    # language code -> (repository, configuration, extra load_dataset kwargs)
    sources = {
        'por': ("HausaNLP/AfriSenti-Twitter", "por", {"trust_remote_code": True}),
        'swa': ("HausaNLP/AfriSenti-Twitter", "swa", {"trust_remote_code": True}),
        'sot': ("hamza-student-123/nlp-assignment-news-data", 'sot', {}),
    }

    try:
        if language not in sources:
            raise ValueError(
                f"Unsupported language {language!r}; expected one of {sorted(sources)}"
            )

        repo, config, extra_kwargs = sources[language]
        chosen_dataset = load_dataset(repo, config, **extra_kwargs)

        for split in ("train", "validation", "test"):
            columns = chosen_dataset[split].column_names
            if "text" in columns:
                continue  # already in the expected shape
            # Look the column up by name rather than by position, so a change
            # in column order cannot select the wrong branch.
            source_column = "tweet" if "tweet" in columns else "headline"
            chosen_dataset[split] = chosen_dataset[split].rename_column(source_column, "text")

        train_df = chosen_dataset["train"].to_pandas()
        val_df = chosen_dataset["validation"].to_pandas()
        test_df = chosen_dataset["test"].to_pandas()
        logger.info(f"Data loaded successfully: {len(train_df)} training, {len(val_df)} validation, {len(test_df)} test examples")
        return train_df, val_df, test_df
    except Exception as e:
        logger.error(f"Error loading data: {str(e)}")
        # Bare raise keeps the original traceback intact.
        raise

# Function to create DataLoaders
def create_data_loaders(train_df, val_df, test_df, tokenizer, batch_size=16, text_column='text', label_column='label'):
    """Wrap the three DataFrames in ``SentimentDataset`` objects and DataLoaders.

    Args:
        train_df, val_df, test_df: DataFrames holding ``text_column`` and
            ``label_column``.
        tokenizer: Tokenizer handed to every ``SentimentDataset``.
        batch_size: Batch size shared by all three loaders.
        text_column: Name of the column containing the input text.
        label_column: Name of the column containing the labels.

    Returns:
        Tuple ``(train_loader, val_loader, test_loader)``; only the training
        loader shuffles its examples.
    """
    def _as_dataset(frame):
        return SentimentDataset(
            texts=frame[text_column].tolist(),
            labels=frame[label_column].tolist(),
            tokenizer=tokenizer,
        )

    train_set, val_set, test_set = [_as_dataset(df) for df in (train_df, val_df, test_df)]

    return (
        DataLoader(train_set, batch_size=batch_size, shuffle=True),
        DataLoader(val_set, batch_size=batch_size),
        DataLoader(test_set, batch_size=batch_size),
    )

def train_epoch(model, dataloader, optimizer, scheduler, device):
    """Run one training pass over ``dataloader``.

    For each batch: forward pass with labels, backward pass, gradient-norm
    clipping at 1.0, then an optimizer step and a scheduler step.

    Returns:
        Mean of the per-batch losses over the epoch.
    """
    model.train()
    running_loss = 0

    batches = tqdm(dataloader, desc="Training")
    for batch in batches:
        optimizer.zero_grad()

        model_inputs = {
            'input_ids': batch['input_ids'].to(device),
            'attention_mask': batch['attention_mask'].to(device),
            'labels': batch['label'].to(device),
        }
        loss = model(**model_inputs).loss
        batch_loss = loss.item()
        running_loss += batch_loss

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()  # the schedule advances once per batch, not per epoch

        batches.set_postfix({"loss": f"{batch_loss:.4f}"})

    return running_loss / len(dataloader)

def evaluate(model, dataloader, device):
    """
    Run ``model`` over ``dataloader`` in eval mode and compute classification metrics.

    Precision, recall and F1 are weighted averages over the classes.

    Returns:
        Tuple of (loss, accuracy, precision, recall, f1, all_preds, all_labels),
        where loss is the mean per-batch loss and all_preds / all_labels hold
        one entry per evaluated example, in dataloader order.
    """
    model.eval()
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels
            )

            total_loss += outputs.loss.item()

            # Predicted class = index of the largest logit.
            preds = torch.argmax(outputs.logits, dim=1).cpu().numpy()

            all_preds.extend(preds)
            all_labels.extend(labels.cpu().numpy())

    # Calculate metrics
    accuracy = accuracy_score(all_labels, all_preds)
    # zero_division=0 gives the same scores as the default setting but does
    # not emit a warning when some class is never predicted.
    precision, recall, f1, _ = precision_recall_fscore_support(
        all_labels, all_preds, average='weighted', zero_division=0
    )

    return total_loss / len(dataloader), accuracy, precision, recall, f1, all_preds, all_labels

def plot_confusion_matrix(true_labels, predictions, class_names):
    """Render the confusion matrix as an annotated heatmap and save it.

    The figure is written to ``confusion_matrix.png`` in the working
    directory and closed afterwards.
    """
    matrix = confusion_matrix(true_labels, predictions)

    fig, ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(
        matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=class_names,
        yticklabels=class_names,
        ax=ax,
    )
    ax.set_xlabel('Predicted')
    ax.set_ylabel('True')
    ax.set_title('Confusion Matrix')

    fig.tight_layout()
    fig.savefig('confusion_matrix.png')
    plt.close(fig)

def evaluate_afro_xlmr_for_lang(language):
    """Fine-tune ``Davlan/afro-xlmr-base`` on one language and report test metrics.

    Steps: load model and tokenizer, load the data for ``language``, train
    for the configured number of epochs while tracking the weighted
    validation F1, restore the weights of the best epoch, evaluate on the
    test split, write the metrics to ``afroxlmr_results_{language}.csv`` and
    save model + tokenizer to ``./xmlr_sentiment_model_{language}``.

    Args:
        language: Language code forwarded to ``load_data``.

    Returns:
        None. Results are logged and written to disk.
    """
    # NOTE: 'max_grad_norm' and 'class_names' are not read anywhere in this function.
    config = {
        'model_name': 'Davlan/afro-xlmr-base',
        'num_labels': 3,
        'batch_size': 16,
        'learning_rate': 2e-5,
        'epochs': 3,
        'warmup_steps': 0,
        'max_grad_norm': 1.0,
        'text_column': 'text',
        'label_column': 'label',
        'class_names': ['negative', 'neutral', 'positive']
    }

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    logger.info(f"Using device: {device}")

    logger.info(f"Loading model: {config['model_name']}...")
    tokenizer = XLMRobertaTokenizer.from_pretrained(config['model_name'])
    model = XLMRobertaForSequenceClassification.from_pretrained(
        config['model_name'],
        num_labels=config['num_labels']
    )
    model.to(device)

    # Load data
    logger.info("Loading data...")
    train_df, val_df, test_df = load_data(f'{language}')

    # Create data loaders
    logger.info("Creating data loaders...")
    train_loader, val_loader, test_loader = create_data_loaders(
        train_df, val_df, test_df,
        tokenizer,
        batch_size=config['batch_size'],
        text_column=config['text_column'],
        label_column=config['label_column']
    )

    optimizer = torch.optim.AdamW(model.parameters(), lr=config['learning_rate'])
    # The scheduler is stepped once per batch, so it needs the total batch count.
    total_steps = len(train_loader) * config['epochs']
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=config['warmup_steps'],
        num_training_steps=total_steps
    )

    logger.info("Starting training...")
    best_val_f1 = 0
    best_model_state = None

    for epoch in range(config['epochs']):
        logger.info(f"Epoch {epoch + 1}/{config['epochs']}")

        # Train
        start_time = time.time()
        train_loss = train_epoch(model, train_loader, optimizer, scheduler, device)
        train_time = time.time() - start_time

        # Validate
        val_loss, val_accuracy, val_precision, val_recall, val_f1, _, _ = evaluate(model, val_loader, device)

        logger.info(f"Epoch {epoch + 1} results:")
        logger.info(f"Train Loss: {train_loss:.4f}, Time: {train_time:.2f}s")
        logger.info(f"Val Loss: {val_loss:.4f}, Accuracy: {val_accuracy:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, F1: {val_f1:.4f}")

        # Save best model
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            # state_dict() hands back references to the live parameter tensors,
            # and dict.copy() is shallow, so the tensors must be cloned or the
            # snapshot silently follows every later optimizer step.
            best_model_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }
            logger.info(f"New best model with F1: {best_val_f1:.4f}")

    # Load best model for testing
    if best_model_state is not None:
        logger.info("Loading best model for testing...")
        model.load_state_dict(best_model_state)

    # Test evaluation
    logger.info("Evaluating on test set...")
    test_loss, test_accuracy, test_precision, test_recall, test_f1, _, _ = evaluate(model, test_loader, device)

    logger.info("Test Results:")
    logger.info(f"Loss: {test_loss:.4f}")
    logger.info(f"Accuracy: {test_accuracy:.4f}")
    logger.info(f"Precision: {test_precision:.4f}")
    logger.info(f"Recall: {test_recall:.4f}")
    logger.info(f"F1 Score: {test_f1:.4f}")

    results_df_afro = {
        "test_loss": test_loss,
        "test_accuracy": test_accuracy,
        "test_f1": test_f1,
        "test_precision": test_precision,
        "test_recall": test_recall,
        "epochs": config['epochs'],
        "learning_rate": config['learning_rate'],
        "batch_size": config['batch_size'],
    }

    path = f"afroxlmr_results_{language}.csv"
    pd.DataFrame([results_df_afro]).to_csv(path, index=False)
    print(f"Results saved to {path}")

    # Save model
    logger.info("Saving model...")
    model_save_path = f'./xmlr_sentiment_model_{language}'
    model.save_pretrained(model_save_path)
    tokenizer.save_pretrained(model_save_path)
    logger.info(f"Model saved to {model_save_path}")

# Fine-tune and evaluate Afro-XLMR once for every language code listed in `langs`.
langs = ['por','swa']
# Sesotho ('sot') is not part of this run; it is evaluated in the
# "Evaluation for Sesotho" cell further down.
for l in langs:
    evaluate_afro_xlmr_for_lang(l)

##### Evaluation for Sesotho

In [None]:
!pip install -U transformers datasets peft evaluate plotly sentencepiece --quiet


import torch
from torch.utils.data import DataLoader, Dataset
from transformers import XLMRobertaForSequenceClassification, XLMRobertaTokenizer
from datasets import load_dataset
# import torch_optimizer as optim
from transformers import get_linear_schedule_with_warmup
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import time
import logging

# Set up logging: INFO and above, each record prefixed with a timestamp and its level name.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by the functions defined below.
logger = logging.getLogger(__name__)

class SentimentDataset(Dataset):
    """Torch dataset that tokenizes texts on access and maps sentiment names to ids.

    Labels are expected to be the strings ``"negative"``, ``"neutral"`` or
    ``"positive"``; any other value raises ``KeyError`` in ``__getitem__``.
    """

    def __init__(self, texts, labels, tokenizer, max_length=128):
        """Store the examples, the tokenizer and the label-name lookup table."""
        self.texts, self.labels = texts, labels
        self.tokenizer, self.max_length = tokenizer, max_length
        # Label conversion: sentiment name -> class index.
        self.label_mapping = dict(negative=0, neutral=1, positive=2)

    def __len__(self):
        """Return the number of examples."""
        n_examples = len(self.texts)
        return n_examples

    def __getitem__(self, idx):
        """Return the encoded tensors and the integer label for example ``idx``."""
        raw_text = str(self.texts[idx])
        class_id = self.label_mapping[self.labels[idx]]

        tokenizer_kwargs = {
            'truncation': True,
            'padding': 'max_length',
            'max_length': self.max_length,
            'return_tensors': 'pt',
        }
        encoded = self.tokenizer(raw_text, **tokenizer_kwargs)

        # The tokenizer output carries a leading batch axis; flatten drops it.
        item = {key: encoded[key].flatten() for key in ('input_ids', 'attention_mask')}
        item['label'] = torch.tensor(class_id, dtype=torch.long)
        return item

def load_data(language):
    """Load the train/validation/test splits for one language as DataFrames.

    ``'por'`` and ``'swa'`` are read from the ``HausaNLP/AfriSenti-Twitter``
    hub dataset, ``'sot'`` from ``hamza-student-123/nlp-assignment-news-data``.
    In every split the text column (``tweet`` or ``headline``) is renamed to
    ``text`` so downstream code can use a single column name.

    Args:
        language: One of ``'por'``, ``'swa'`` or ``'sot'``.

    Returns:
        Tuple ``(train_df, val_df, test_df)`` of pandas DataFrames.

    Raises:
        ValueError: If ``language`` is not one of the supported codes.
        Exception: Any other loading/renaming error is logged and re-raised.
    """
    try:
        if language in ('por', 'swa'):
            chosen_dataset = load_dataset("HausaNLP/AfriSenti-Twitter", language, trust_remote_code=True)
        elif language == 'sot':
            chosen_dataset = load_dataset("hamza-student-123/nlp-assignment-news-data", 'sot')
        else:
            # A message-less exception would be logged below as an empty error.
            raise ValueError(f"Unsupported language: {language!r} (expected 'por', 'swa' or 'sot')")

        for split in ("train", "validation", "test"):
            columns = chosen_dataset[split].column_names
            if "text" in columns:
                continue  # already normalised
            # Look the column up by name rather than trusting its position.
            source_column = "tweet" if "tweet" in columns else "headline"
            chosen_dataset[split] = chosen_dataset[split].rename_column(source_column, "text")

        train_df = chosen_dataset["train"].to_pandas()
        val_df = chosen_dataset["validation"].to_pandas()
        test_df = chosen_dataset["test"].to_pandas()
        logger.info(f"Data loaded successfully: {len(train_df)} training, {len(val_df)} validation, {len(test_df)} test examples")
        return train_df, val_df, test_df
    except Exception as e:
        logger.error(f"Error loading data: {str(e)}")
        raise  # bare raise keeps the original traceback intact

# Function to create DataLoaders
def create_data_loaders(train_df, val_df, test_df, tokenizer, batch_size=16, text_column='text', label_column='label'):
    """Wrap the three DataFrames in ``SentimentDataset`` objects and DataLoaders.

    Args:
        train_df, val_df, test_df: DataFrames holding ``text_column`` and
            ``label_column``.
        tokenizer: Tokenizer handed to every ``SentimentDataset``.
        batch_size: Batch size shared by all three loaders.
        text_column: Name of the column containing the input text.
        label_column: Name of the column containing the labels.

    Returns:
        Tuple ``(train_loader, val_loader, test_loader)``; only the training
        loader shuffles its examples.
    """
    def _as_dataset(frame):
        return SentimentDataset(
            texts=frame[text_column].tolist(),
            labels=frame[label_column].tolist(),
            tokenizer=tokenizer,
        )

    train_set, val_set, test_set = [_as_dataset(df) for df in (train_df, val_df, test_df)]

    return (
        DataLoader(train_set, batch_size=batch_size, shuffle=True),
        DataLoader(val_set, batch_size=batch_size),
        DataLoader(test_set, batch_size=batch_size),
    )

def train_epoch(model, dataloader, optimizer, scheduler, device):
    """Run one training pass over ``dataloader``.

    For each batch: forward pass with labels, backward pass, gradient-norm
    clipping at 1.0, then an optimizer step and a scheduler step.

    Returns:
        Mean of the per-batch losses over the epoch.
    """
    model.train()
    running_loss = 0

    batches = tqdm(dataloader, desc="Training")
    for batch in batches:
        optimizer.zero_grad()

        model_inputs = {
            'input_ids': batch['input_ids'].to(device),
            'attention_mask': batch['attention_mask'].to(device),
            'labels': batch['label'].to(device),
        }
        loss = model(**model_inputs).loss
        batch_loss = loss.item()
        running_loss += batch_loss

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()  # the schedule advances once per batch, not per epoch

        batches.set_postfix({"loss": f"{batch_loss:.4f}"})

    return running_loss / len(dataloader)

def evaluate(model, dataloader, device):
    """
    Run ``model`` over ``dataloader`` in eval mode and compute classification metrics.

    Precision, recall and F1 are weighted averages over the classes.

    Returns:
        Tuple of (loss, accuracy, precision, recall, f1, all_preds, all_labels),
        where loss is the mean per-batch loss and all_preds / all_labels hold
        one entry per evaluated example, in dataloader order.
    """
    model.eval()
    total_loss = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels
            )

            total_loss += outputs.loss.item()

            # Predicted class = index of the largest logit.
            preds = torch.argmax(outputs.logits, dim=1).cpu().numpy()

            all_preds.extend(preds)
            all_labels.extend(labels.cpu().numpy())

    # Calculate metrics
    accuracy = accuracy_score(all_labels, all_preds)
    # zero_division=0 gives the same scores as the default setting but does
    # not emit a warning when some class is never predicted.
    precision, recall, f1, _ = precision_recall_fscore_support(
        all_labels, all_preds, average='weighted', zero_division=0
    )

    return total_loss / len(dataloader), accuracy, precision, recall, f1, all_preds, all_labels

def plot_confusion_matrix(true_labels, predictions, class_names):
    """Render the confusion matrix as an annotated heatmap and save it.

    The figure is written to ``confusion_matrix.png`` in the working
    directory and closed afterwards.
    """
    matrix = confusion_matrix(true_labels, predictions)

    fig, ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(
        matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=class_names,
        yticklabels=class_names,
        ax=ax,
    )
    ax.set_xlabel('Predicted')
    ax.set_ylabel('True')
    ax.set_title('Confusion Matrix')

    fig.tight_layout()
    fig.savefig('confusion_matrix.png')
    plt.close(fig)

def evaluate_afro_xlmr_for_lang(language):
    """Fine-tune ``Davlan/afro-xlmr-base`` on one language and report test metrics.

    Steps: load model and tokenizer, load the data for ``language``, train
    for the configured number of epochs while tracking the weighted
    validation F1, restore the weights of the best epoch, evaluate on the
    test split, write the metrics to ``afroxlmr_results_{language}.csv`` and
    save model + tokenizer to ``./xmlr_sentiment_model_{language}``.

    Args:
        language: Language code forwarded to ``load_data``.

    Returns:
        None. Results are logged and written to disk.
    """
    # NOTE: 'max_grad_norm' and 'class_names' are not read anywhere in this function.
    config = {
        'model_name': 'Davlan/afro-xlmr-base',
        'num_labels': 3,
        'batch_size': 16,
        'learning_rate': 2e-5,
        'epochs': 3,
        'warmup_steps': 0,
        'max_grad_norm': 1.0,
        'text_column': 'text',
        'label_column': 'label',
        'class_names': ['negative', 'neutral', 'positive']
    }

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    logger.info(f"Using device: {device}")

    logger.info(f"Loading model: {config['model_name']}...")
    tokenizer = XLMRobertaTokenizer.from_pretrained(config['model_name'])
    model = XLMRobertaForSequenceClassification.from_pretrained(
        config['model_name'],
        num_labels=config['num_labels']
    )
    model.to(device)

    # Load data
    logger.info("Loading data...")
    train_df, val_df, test_df = load_data(f'{language}')

    # Create data loaders
    logger.info("Creating data loaders...")
    train_loader, val_loader, test_loader = create_data_loaders(
        train_df, val_df, test_df,
        tokenizer,
        batch_size=config['batch_size'],
        text_column=config['text_column'],
        label_column=config['label_column']
    )

    optimizer = torch.optim.AdamW(model.parameters(), lr=config['learning_rate'])
    # The scheduler is stepped once per batch, so it needs the total batch count.
    total_steps = len(train_loader) * config['epochs']
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=config['warmup_steps'],
        num_training_steps=total_steps
    )

    logger.info("Starting training...")
    best_val_f1 = 0
    best_model_state = None

    for epoch in range(config['epochs']):
        logger.info(f"Epoch {epoch + 1}/{config['epochs']}")

        # Train
        start_time = time.time()
        train_loss = train_epoch(model, train_loader, optimizer, scheduler, device)
        train_time = time.time() - start_time

        # Validate
        val_loss, val_accuracy, val_precision, val_recall, val_f1, _, _ = evaluate(model, val_loader, device)

        logger.info(f"Epoch {epoch + 1} results:")
        logger.info(f"Train Loss: {train_loss:.4f}, Time: {train_time:.2f}s")
        logger.info(f"Val Loss: {val_loss:.4f}, Accuracy: {val_accuracy:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, F1: {val_f1:.4f}")

        # Save best model
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            # state_dict() hands back references to the live parameter tensors,
            # and dict.copy() is shallow, so the tensors must be cloned or the
            # snapshot silently follows every later optimizer step.
            best_model_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }
            logger.info(f"New best model with F1: {best_val_f1:.4f}")

    # Load best model for testing
    if best_model_state is not None:
        logger.info("Loading best model for testing...")
        model.load_state_dict(best_model_state)

    # Test evaluation
    logger.info("Evaluating on test set...")
    test_loss, test_accuracy, test_precision, test_recall, test_f1, _, _ = evaluate(model, test_loader, device)

    logger.info("Test Results:")
    logger.info(f"Loss: {test_loss:.4f}")
    logger.info(f"Accuracy: {test_accuracy:.4f}")
    logger.info(f"Precision: {test_precision:.4f}")
    logger.info(f"Recall: {test_recall:.4f}")
    logger.info(f"F1 Score: {test_f1:.4f}")

    results_df_afro = {
        "test_loss": test_loss,
        "test_accuracy": test_accuracy,
        "test_f1": test_f1,
        "test_precision": test_precision,
        "test_recall": test_recall,
        "epochs": config['epochs'],
        "learning_rate": config['learning_rate'],
        "batch_size": config['batch_size'],
    }

    path = f"afroxlmr_results_{language}.csv"
    pd.DataFrame([results_df_afro]).to_csv(path, index=False)
    print(f"Results saved to {path}")

    # Save model
    logger.info("Saving model...")
    model_save_path = f'./xmlr_sentiment_model_{language}'
    model.save_pretrained(model_save_path)
    tokenizer.save_pretrained(model_save_path)
    logger.info(f"Model saved to {model_save_path}")

# Run the Afro-XLMR fine-tuning/evaluation pipeline for Sesotho ('sot') only.
langs = ['sot']
for l in langs:
    evaluate_afro_xlmr_for_lang(l)

## mBERT

### Baseline with [mBERT](https://huggingface.co/google-bert/bert-base-multilingual-cased)


In [None]:
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from datasets import load_dataset, Dataset, DatasetDict
import torch
import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import os
import time
import json
import random
from lime.lime_text import LimeTextExplainer
import torch.nn.functional as F

# One timestamp per run; it is embedded in every output path below.
run_timestamp = time.strftime("%Y%m%d_%H%M%S")

os.makedirs("./benchmark_results", exist_ok=True)
os.makedirs("./lime_explanations", exist_ok=True)

output_csv = os.path.abspath(f"./benchmark_results/mbert_benchmark_results_{run_timestamp}.csv")
explanations_dir = os.path.abspath(f"./lime_explanations/explanations_{run_timestamp}")
os.makedirs(explanations_dir, exist_ok=True)

# Empty frame that fixes the column layout of the benchmark results.
results_df = pd.DataFrame(columns=[
    "Dataset", "Model", "Loss", "Accuracy", "F1", "Precision", "Recall"
])

# Each dataset is loaded independently. A failure leaves the variable as None
# (the dataset is then skipped later) but is reported instead of being hidden.
try:
    swa_dataset = load_dataset("masakhane/afrisenti", "swa")
except Exception as e:
    print(f"Failed to load Swahili dataset: {e}")
    swa_dataset = None

try:
    por_dataset = load_dataset("masakhane/afrisenti", "por")
except Exception as e:
    print(f"Failed to load Portuguese dataset: {e}")
    por_dataset = None

try:
    sot_dataset = load_dataset("csv", data_files={"train": "./datasets/sotho-news/sotho_news_dataset.csv"})
except Exception as e:
    print(f"Failed to load Sesotho dataset: {e}")
    sot_dataset = None

def compute_metrics(pred):
    """Compute accuracy and weighted precision/recall/F1 for a prediction object.

    Args:
        pred: Object exposing ``label_ids`` (gold labels) and ``predictions``
            (per-class scores; the arg-max over the last axis is the
            predicted class).

    Returns:
        Dict with the keys ``accuracy``, ``f1``, ``precision`` and ``recall``.
    """
    y_true = pred.label_ids
    y_pred = pred.predictions.argmax(-1)
    weighted = precision_recall_fscore_support(y_true, y_pred, average='weighted')
    precision_w, recall_w, f1_w = weighted[0], weighted[1], weighted[2]
    accuracy = accuracy_score(y_true, y_pred)
    return {
        'accuracy': accuracy,
        'f1': f1_w,
        'precision': precision_w,
        'recall': recall_w,
    }

class ModelPredictor:
    """Wrapper class for LIME explanations.

    Exposes a ``predict_proba`` callable that takes raw texts and returns
    class probabilities as a NumPy array.
    """

    def __init__(self, model, tokenizer, device, num_labels, batch_size=32):
        """Store the model and tokenizer and switch the model to eval mode.

        Args:
            model: Classifier whose output exposes ``.logits``.
            tokenizer: Tokenizer matching ``model``.
            device: Device the tokenized inputs are moved to.
            num_labels: Number of classes (used for the empty-input result).
            batch_size: Maximum number of texts per forward pass.
        """
        self.model = model
        self.tokenizer = tokenizer
        self.device = device
        self.num_labels = num_labels
        self.batch_size = max(1, int(batch_size))
        self.model.eval()

    def predict_proba(self, texts):
        """Predict probabilities for LIME.

        Args:
            texts: A single string or an iterable of strings.

        Returns:
            Array of shape ``(len(texts), n_classes)`` with softmax
            probabilities; shape ``(0, num_labels)`` for empty input.
        """
        if isinstance(texts, str):
            texts = [texts]
        texts = list(texts)

        if not texts:
            return np.empty((0, self.num_labels), dtype=np.float32)

        # The caller may pass a very large list of texts in one call; running
        # them through the model in fixed-size chunks keeps memory bounded.
        chunks = []
        with torch.no_grad():
            for start in range(0, len(texts), self.batch_size):
                inputs = self.tokenizer(
                    texts[start:start + self.batch_size],
                    padding=True,
                    truncation=True,
                    max_length=128,
                    return_tensors="pt"
                ).to(self.device)
                outputs = self.model(**inputs)
                chunks.append(F.softmax(outputs.logits, dim=-1).cpu().numpy())

        return np.concatenate(chunks, axis=0)

def generate_lime_explanations(model_predictor, test_texts, test_labels, dataset_name, 
                              label_names=None, num_samples=10, seed=None):
    """Generate LIME explanations for sample predictions.

    Picks up to ``num_samples`` random test examples, explains the model's
    predicted class for each one, writes one HTML file per example plus a
    JSON file with all explanations into ``explanations_dir``.

    Args:
        model_predictor: Object exposing ``predict_proba(texts)``.
        test_texts: Indexable collection of texts.
        test_labels: Indexable collection of integer labels aligned with
            ``test_texts``.
        dataset_name: Used as prefix for the output file names.
        label_names: Optional class names; generic ``Class_i`` names are used
            when omitted.
        num_samples: Maximum number of examples to explain.
        seed: Optional seed for reproducible sampling; ``None`` keeps using
            the global ``random`` state.

    Returns:
        List with one dict per successfully explained example.
    """
    n_classes = getattr(model_predictor, "num_labels", 3)
    class_names = label_names or [f"Class_{i}" for i in range(n_classes)]
    explainer = LimeTextExplainer(class_names=class_names)

    rng = random if seed is None else random.Random(seed)
    sample_indices = rng.sample(range(len(test_texts)), min(num_samples, len(test_texts)))
    explanations_data = []

    print(f"\nGenerating LIME explanations for {dataset_name}...")

    for i, idx in enumerate(sample_indices):
        try:
            text = test_texts[idx]
            true_label = test_labels[idx]

            pred_proba = model_predictor.predict_proba([text])[0]
            pred_label = int(np.argmax(pred_proba))

            # LIME explains label 1 unless told otherwise; request the label
            # the model actually predicted so the stored feature weights
            # belong to the recorded 'predicted_label'.
            exp = explainer.explain_instance(
                text,
                model_predictor.predict_proba,
                labels=(pred_label,),
                num_features=10,
                num_samples=1000
            )

            explanation_data = {
                'sample_id': idx,
                'text': text,
                'true_label': int(true_label),
                'predicted_label': pred_label,
                'prediction_probability': float(pred_proba[pred_label]),
                'all_probabilities': pred_proba.tolist(),
                'lime_explanation': [
                    {'feature': feature, 'importance': float(importance)}
                    for feature, importance in exp.as_list(label=pred_label)
                ]
            }

            explanations_data.append(explanation_data)

            html_file = os.path.join(explanations_dir, f"{dataset_name}_sample_{idx}_explanation.html")
            exp.save_to_file(html_file)

            print(f"  Generated explanation {i+1}/{len(sample_indices)} for sample {idx}")

        except Exception as e:
            # Best effort: one failing sample must not abort the remaining ones.
            print(f"  Error generating explanation for sample {idx}: {str(e)}")

    # Save explanations as JSON
    json_file = os.path.join(explanations_dir, f"{dataset_name}_lime_explanations.json")
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(explanations_data, f, indent=2, ensure_ascii=False)

    return explanations_data

def get_benchmark_metrics(dataset_name, dataset, num_labels):
    """Evaluate ``bert-base-multilingual-cased`` on the test split and explain samples.

    No training happens here: the checkpoint is loaded with a
    ``num_labels``-way classification head and evaluated as-is, so the
    numbers serve as a pre-fine-tuning baseline.

    Steps: detect the text/label columns, create any missing
    train/validation/test split from ``train``, map labels to integers,
    tokenize, evaluate on ``test`` and generate LIME explanations.

    Args:
        dataset_name: Name used in output paths and messages.
        dataset: Mapping of split name to dataset, or ``None``.
        num_labels: Number of target classes.

    Returns:
        Tuple ``(benchmark_results, lime_explanations)``; ``(None, None)`` if
        the dataset is unusable or evaluation fails.
    """
    if dataset is None:
        return None, None

    # --- Detect the text and label columns by name ---
    text_column = None
    label_column = None

    text_candidates = ["text", "content", "tweet", "sentence", "document"]
    for split in dataset:
        columns = dataset[split].column_names
        text_column = next((c for c in text_candidates if c in columns), None)
        if text_column:
            break

    label_candidates = ["label", "sentiment", "class", "category"]
    for split in dataset:
        columns = dataset[split].column_names
        label_column = next((c for c in label_candidates if c in columns), None)
        if label_column:
            break

    if text_column is None:
        # Fall back to the first string-valued column, never the label column.
        for split in dataset:
            for col in dataset[split].column_names:
                if col != label_column and isinstance(dataset[split][col][0], str):
                    text_column = col
                    break
            if text_column:
                break

    if text_column is None or label_column is None:
        return None, None

    # --- Make sure train/validation/test all exist ---
    required_splits = ["train", "validation", "test"]
    if any(split not in dataset for split in required_splits):
        if "train" not in dataset:
            return None, None

        train_valid_test = {
            split: dataset[split] for split in dataset if split in required_splits
        }
        has_validation = "validation" in train_valid_test
        has_test = "test" in train_valid_test

        if not has_validation and not has_test:
            # 80/10/10 split carved out of train.
            split_datasets = dataset["train"].train_test_split(test_size=0.2, seed=42)
            test_valid_split = split_datasets["test"].train_test_split(test_size=0.5, seed=42)
            train_valid_test["train"] = split_datasets["train"]
            train_valid_test["validation"] = test_valid_split["train"]
            train_valid_test["test"] = test_valid_split["test"]
        elif not has_validation:
            split_datasets = dataset["train"].train_test_split(test_size=0.1, seed=42)
            train_valid_test["train"] = split_datasets["train"]
            train_valid_test["validation"] = split_datasets["test"]
        else:
            # Only test is missing. The held-out rows must also leave train,
            # otherwise the new test set overlaps the training data.
            split_datasets = dataset["train"].train_test_split(test_size=0.1, seed=42)
            train_valid_test["train"] = split_datasets["train"]
            train_valid_test["test"] = split_datasets["test"]

        dataset = DatasetDict(train_valid_test)

    # Load the model only after the dataset is known to be usable.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    tokenizer = AutoTokenizer.from_pretrained("bert-base-multilingual-cased")
    model = AutoModelForSequenceClassification.from_pretrained(
        "bert-base-multilingual-cased",
        num_labels=num_labels
    ).to(device)

    # --- Map string labels to integer ids ---
    label_mapping = None
    label_names = None
    has_string_labels = any(
        len(dataset[split]) > 0 and isinstance(dataset[split][label_column][0], str)
        for split in dataset
    )
    if has_string_labels:
        # Collect labels over every split so a label that is absent from one
        # split still receives an id.
        all_labels = set()
        for split in dataset:
            all_labels.update(dataset[split][label_column])

        sorted_labels = sorted(all_labels)
        label_mapping = {label: i for i, label in enumerate(sorted_labels)}
        label_names = sorted_labels

    processed_dataset = DatasetDict()
    for split_name, split_dataset in dataset.items():
        texts = split_dataset[text_column]

        if label_mapping:
            labels = [label_mapping[label] for label in split_dataset[label_column]]
        else:
            labels = []
            for label in split_dataset[label_column]:
                if isinstance(label, (int, np.integer)):
                    labels.append(int(label))
                elif isinstance(label, str):
                    try:
                        labels.append(int(label))
                    except ValueError:
                        labels.append(0)
                else:
                    # Labels that cannot be interpreted fall back to class 0.
                    labels.append(0)

        processed_dataset[split_name] = Dataset.from_dict({
            "text": texts,
            "label": labels
        })

    def tokenize_function(examples):
        # Plain lists are returned; no tensor conversion is requested here.
        return tokenizer(
            examples["text"],
            padding="max_length",
            truncation=True,
            max_length=128
        )

    tokenized_dataset = DatasetDict()
    for split_name, split_dataset in processed_dataset.items():
        tokenized_split = split_dataset.map(
            tokenize_function,
            batched=True,
            remove_columns=["text"]
        )
        tokenized_dataset[split_name] = tokenized_split

        for required_col in ["input_ids", "attention_mask", "label"]:
            if required_col not in tokenized_split.column_names:
                return None, None

    eval_args = TrainingArguments(
        output_dir=f"./benchmark_results/{dataset_name}_{run_timestamp}",
        per_device_eval_batch_size=16,
        logging_dir=f"./benchmark_results/{dataset_name}_{run_timestamp}/logs",
        report_to="none",
        remove_unused_columns=True
    )

    trainer = Trainer(
        model=model,
        args=eval_args,
        compute_metrics=compute_metrics,
    )

    try:
        benchmark_results = trainer.evaluate(eval_dataset=tokenized_dataset["test"])

        model_predictor = ModelPredictor(model, tokenizer, device, num_labels)
        test_texts = processed_dataset["test"]["text"]
        test_labels = processed_dataset["test"]["label"]

        lime_explanations = generate_lime_explanations(
            model_predictor, test_texts, test_labels, dataset_name,
            label_names=label_names, num_samples=10
        )

        return benchmark_results, lime_explanations
    except Exception as e:
        print(f"Error in benchmarking {dataset_name}: {str(e)}")
        return None, None

# Each entry: (display name, dataset, number of sentiment classes).
available_datasets = []
if swa_dataset:
    available_datasets.append(("Swahili", swa_dataset, 3))
if por_dataset:
    available_datasets.append(("Portuguese", por_dataset, 3))
if sot_dataset:
    available_datasets.append(("Sesotho", sot_dataset, 3))

all_results = []
all_explanations = {}
# Rows are collected here and turned into a DataFrame once, which avoids the
# private DataFrame._append API.
result_rows = []

print("Starting benchmarking with LIME explanations...")

for dataset_name, dataset, num_labels in available_datasets:
    try:
        print(f"\nProcessing {dataset_name} dataset...")
        results, explanations = get_benchmark_metrics(dataset_name, dataset, num_labels)

        if results:
            result_rows.append({
                "Dataset": dataset_name,
                "Model": "mBERT",
                "Loss": results.get("eval_loss"),
                "Accuracy": results.get("eval_accuracy"),
                "F1": results.get("eval_f1"),
                "Precision": results.get("eval_precision"),
                "Recall": results.get("eval_recall")
            })

            all_results.append({"Dataset": dataset_name, "Results": results})
            if explanations:
                all_explanations[dataset_name] = explanations
        else:
            print(f"Failed to process {dataset_name} dataset")
    except Exception as e:
        print(f"Error processing {dataset_name}: {str(e)}")

if result_rows:
    results_df = pd.DataFrame(result_rows, columns=results_df.columns)

try:
    results_df.to_csv(output_csv, index=False)
    print(f"\nResults saved to: {output_csv}")
except Exception as e:
    # Fall back to the working directory so the results are not lost.
    print(f"\nCould not write {output_csv}: {e}")
    emergency_path = f"./emergency_results_{run_timestamp}.csv"
    results_df.to_csv(emergency_path, index=False)
    print(f"\nEmergency results saved to: {emergency_path}")

if all_explanations:
    summary_file = os.path.join(explanations_dir, "explanations_summary.json")
    explanation_summary = {}

    for dataset_name, explanations in all_explanations.items():
        summary_stats = {
            'total_explanations': len(explanations),
            'average_prediction_confidence': float(np.mean([exp['prediction_probability'] for exp in explanations])),
            'correct_predictions': sum(1 for exp in explanations if exp['true_label'] == exp['predicted_label']),
            'top_important_features': {}
        }

        # Gather the absolute LIME weight of every feature across all samples.
        feature_importance = {}
        for exp in explanations:
            for feature_data in exp['lime_explanation']:
                feature_importance.setdefault(feature_data['feature'], []).append(
                    abs(feature_data['importance'])
                )

        avg_feature_importance = {
            feature: float(np.mean(importances))
            for feature, importances in feature_importance.items()
        }

        # Keep the ten features with the highest mean absolute weight.
        sorted_features = sorted(avg_feature_importance.items(), key=lambda x: x[1], reverse=True)
        summary_stats['top_important_features'] = dict(sorted_features[:10])

        explanation_summary[dataset_name] = summary_stats

    with open(summary_file, 'w', encoding='utf-8') as f:
        json.dump(explanation_summary, f, indent=2, ensure_ascii=False)

print("\nResults:")
print(results_df)

### Fine-Tuning BERT


In [None]:
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from datasets import load_dataset, Dataset, DatasetDict
import torch
import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import os
import logging
import time
import sys
import json
import shutil

# Route log records (INFO and above) to stdout.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)


def _load_or_none(language, origin, *loader_args, **loader_kwargs):
    """Call ``load_dataset`` with the given arguments and log the outcome.

    Returns the loaded dataset, or ``None`` if loading raised an exception.
    """
    try:
        loaded = load_dataset(*loader_args, **loader_kwargs)
        logger.info(f"Successfully loaded {language} dataset from {origin}")
        return loaded
    except Exception as e:
        logger.error(f"Failed to load {language} dataset: {e}")
        return None


# A dataset that fails to load is left as None and skipped further down.
swa_dataset = _load_or_none("Swahili", "HuggingFace Hub", "masakhane/afrisenti", "swa")
por_dataset = _load_or_none("Portuguese", "HuggingFace Hub", "masakhane/afrisenti", "por")
sot_dataset = _load_or_none(
    "Sesotho",
    "CSV",
    "csv",
    data_files={"train": "./datasets/sotho-news/sotho_news_dataset.csv"},
)

def compute_metrics(pred):
    """Score a prediction object against its reference labels.

    The predicted class is the argmax over the last axis of
    ``pred.predictions``; references come from ``pred.label_ids``.
    Precision, recall and F1 use weighted averaging.

    Returns:
        dict with keys ``accuracy``, ``f1``, ``precision`` and ``recall``.
    """
    y_true = pred.label_ids
    y_pred = pred.predictions.argmax(-1)
    # Index 0/1/2 of the result are precision/recall/F1; the fourth entry is unused.
    weighted = precision_recall_fscore_support(y_true, y_pred, average='weighted')
    return {
        'accuracy': accuracy_score(y_true, y_pred),
        'f1': weighted[2],
        'precision': weighted[0],
        'recall': weighted[1],
    }

def finetune_and_evaluate(dataset_name, dataset, num_labels):
    """Fine-tune ``bert-base-multilingual-cased`` on one dataset and evaluate it.

    The function detects the text and label columns, creates any missing
    train/validation/test split from the train split, converts labels to
    integer ids, tokenizes to 128 tokens, trains for 3 epochs, writes the
    model weights, tokenizer, label mapping, config and metrics under
    ``./models/<dataset_name lowercased>`` and evaluates on the validation
    and test splits.

    Args:
        dataset_name: Display name; also used to derive output directories.
        dataset: Mapping of split name to dataset, or ``None``.
        num_labels: Requested size of the classification head. It is widened
            (with a warning) when the data contains a larger label id, since
            a smaller head cannot score those labels.

    Returns:
        A dict with keys ``dataset``, ``training_time``, ``training_loss``,
        ``validation_results``, ``test_results`` and ``model_path``, or
        ``None`` when the dataset is unusable or training/evaluation fails.
    """
    logger.info(f"Starting fine-tuning for {dataset_name}")

    if dataset is None:
        logger.error(f"Dataset {dataset_name} is None, cannot proceed with fine-tuning")
        return None

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logger.info(f"Using device: {device}")

    model_save_dir = f"./models/{dataset_name.lower()}"
    os.makedirs(model_save_dir, exist_ok=True)

    logger.info(f"Dataset {dataset_name} structure:")
    for split in dataset:
        logger.info(f"  - Split: {split}, Examples: {len(dataset[split])}")
        logger.info(f"  - Features: {dataset[split].features}")
        logger.info(f"  - Columns: {dataset[split].column_names}")

    def first_matching_column(candidates):
        # First candidate (in priority order) present in the first split that has one.
        for split in dataset:
            columns = dataset[split].column_names
            for candidate in candidates:
                if candidate in columns:
                    return candidate
        return None

    text_column = first_matching_column(
        ["text", "content", "tweet", "sentence", "document", "headline"]
    )
    label_column = first_matching_column(["label", "sentiment", "class", "category"])

    logger.info(f"For {dataset_name}, using text_column={text_column}, label_column={label_column}")

    if text_column is None:
        # Fall back to the first string-valued column. The label column is
        # excluded so string labels are never mistaken for the input text.
        for split in dataset:
            if len(dataset[split]) == 0:
                continue
            first_row = dataset[split][0]
            for col in dataset[split].column_names:
                if col != label_column and isinstance(first_row[col], str):
                    text_column = col
                    logger.info(f"Using '{col}' as text column based on string data")
                    break
            if text_column:
                break

    if text_column is None:
        logger.error(f"Could not identify a text column for {dataset_name}")
        return None

    if label_column is None:
        logger.error(f"Could not identify a label column for {dataset_name}")
        return None

    required_splits = ["train", "validation", "test"]
    missing_splits = [split for split in required_splits if split not in dataset]

    if missing_splits:
        logger.info(f"Creating missing splits: {missing_splits} for {dataset_name}")
        if "train" not in dataset:
            logger.error(f"Dataset {dataset_name} has no train split and cannot create splits")
            return None

        train_valid_test = {
            split: dataset[split] for split in dataset if split in required_splits
        }
        has_validation = "validation" in train_valid_test
        has_test = "test" in train_valid_test

        if not has_validation and not has_test:
            # 80/10/10 split carved out of train.
            split_datasets = dataset["train"].train_test_split(test_size=0.2, seed=42)
            test_valid_split = split_datasets["test"].train_test_split(test_size=0.5, seed=42)
            train_valid_test["train"] = split_datasets["train"]
            train_valid_test["validation"] = test_valid_split["train"]
            train_valid_test["test"] = test_valid_split["test"]
        elif not has_validation:
            split_datasets = dataset["train"].train_test_split(test_size=0.1, seed=42)
            train_valid_test["train"] = split_datasets["train"]
            train_valid_test["validation"] = split_datasets["test"]
        else:
            split_datasets = dataset["train"].train_test_split(test_size=0.1, seed=42)
            # Shrink train as well; otherwise the held-out test rows would
            # also be trained on and the test scores would be inflated.
            train_valid_test["train"] = split_datasets["train"]
            train_valid_test["test"] = split_datasets["test"]

        dataset = DatasetDict(train_valid_test)

    # String labels are mapped to ids using the labels of ALL splits, so a
    # label that only occurs outside the first split still gets an id.
    label_mapping = None
    has_string_labels = any(
        len(dataset[split]) > 0 and isinstance(dataset[split][0][label_column], str)
        for split in dataset
    )
    if has_string_labels:
        all_labels = set()
        for split in dataset:
            all_labels.update(dataset[split][label_column])

        label_mapping = {label: i for i, label in enumerate(sorted(all_labels))}
        logger.info(f"Created label mapping for {dataset_name}: {label_mapping}")

    processed_dataset = DatasetDict()
    max_label_id = -1
    for split_name, split_dataset in dataset.items():
        texts = split_dataset[text_column]

        if label_mapping:
            labels = [label_mapping[label] for label in split_dataset[label_column]]
        else:
            labels = []
            for label in split_dataset[label_column]:
                if isinstance(label, (int, np.integer)):
                    labels.append(int(label))
                elif isinstance(label, str):
                    try:
                        labels.append(int(label))
                    except ValueError:
                        logger.warning(f"Unexpected string label found in {split_name}: {label}")
                        labels.append(0)
                else:
                    logger.warning(f"Unexpected label type in {split_name}: {type(label)}")
                    labels.append(0)

        if labels:
            max_label_id = max(max_label_id, max(labels))

        processed_dataset[split_name] = Dataset.from_dict({
            "text": texts,
            "label": labels
        })

        logger.info(f"Processed {split_name} split: {len(processed_dataset[split_name])} examples")

    # The classification head must cover every label id present in the data.
    if max_label_id + 1 > num_labels:
        logger.warning(
            f"{dataset_name} contains label id {max_label_id} but num_labels={num_labels}; "
            f"using {max_label_id + 1} output classes instead"
        )
        num_labels = max_label_id + 1

    tokenizer = AutoTokenizer.from_pretrained("bert-base-multilingual-cased")
    model = AutoModelForSequenceClassification.from_pretrained(
        "bert-base-multilingual-cased",
        num_labels=num_labels
    ).to(device)

    def tokenize_function(examples):
        return tokenizer(
            examples["text"],
            padding="max_length",
            truncation=True,
            max_length=128,
        )

    tokenized_dataset = DatasetDict()
    for split_name, split_dataset in processed_dataset.items():
        tokenized_split = split_dataset.map(
            tokenize_function,
            batched=True,
            remove_columns=["text"]
        )
        tokenized_dataset[split_name] = tokenized_split

        logger.info(f"Tokenized {split_name} split: {len(tokenized_split)} examples")
        logger.info(f"Columns after tokenization: {tokenized_split.column_names}")

        for required_col in ["input_ids", "attention_mask", "label"]:
            if required_col not in tokenized_split.column_names:
                logger.error(f"Required column {required_col} missing after tokenization")
                return None

    # Start from an empty scratch directory for the Trainer.
    model_output_dir = f"./tmp_model_dir_{dataset_name}"
    if os.path.exists(model_output_dir):
        shutil.rmtree(model_output_dir)
    os.makedirs(model_output_dir, exist_ok=True)

    training_args = TrainingArguments(
        output_dir=model_output_dir,
        learning_rate=2e-5,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=16,
        num_train_epochs=3,
        weight_decay=0.01,
        logging_dir="./tmp_logs",
        logging_steps=10,
        save_strategy="no",
        save_steps=1000000,
        eval_steps=100,
        do_eval=True,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset["train"],
        eval_dataset=tokenized_dataset["validation"],
        compute_metrics=compute_metrics,
    )

    logger.info(f"Starting fine-tuning {dataset_name} with {len(tokenized_dataset['train'])} examples")
    start_time = time.time()

    # Defined before training so the tolerated serialization-error path below
    # does not leave the name unbound when the metrics are assembled.
    training_loss = None

    try:
        try:
            train_output = trainer.train()

            try:
                if hasattr(train_output, "metrics") and "loss" in train_output.metrics:
                    training_loss = train_output.metrics["loss"]
                elif isinstance(train_output, dict) and "loss" in train_output:
                    training_loss = train_output["loss"]

                # Otherwise use the most recent "loss" entry of the trainer log.
                if training_loss is None and hasattr(trainer, "state"):
                    if hasattr(trainer.state, "log_history") and trainer.state.log_history:
                        for log in reversed(trainer.state.log_history):
                            if "loss" in log:
                                training_loss = log["loss"]
                                break
            except Exception as e:
                logger.warning(f"Could not extract training loss: {e}")
        except RuntimeError as e:
            if "PytorchStreamWriter failed writing file" in str(e):
                logger.warning("Caught PyTorch serialization error during training. Will proceed with model saving.")
            else:
                raise

        model_path = os.path.join(model_save_dir, "model.pt")
        try:
            torch.save(model.state_dict(), model_path)
            logger.info(f"Model saved to {model_path}")
        except RuntimeError as e:
            if "PytorchStreamWriter failed writing file" in str(e):
                logger.warning("PyTorch serialization error when saving model state dict. Trying alternative method.")

                try:
                    # Retry with the legacy (non-zipfile) serialization format.
                    with open(model_path, 'wb') as f:
                        torch.save(model.state_dict(), f, _use_new_zipfile_serialization=False)
                    logger.info(f"Model saved to {model_path} using legacy serialization")
                except Exception as e2:
                    logger.error(f"Failed to save model with alternative method: {e2}")
            else:
                raise

        tokenizer.save_pretrained(model_save_dir)
        logger.info(f"Tokenizer saved to {model_save_dir}")

        if label_mapping:
            label_mapping_path = os.path.join(model_save_dir, "label_mapping.json")
            with open(label_mapping_path, "w") as f:
                json_mapping = {str(k): int(v) for k, v in label_mapping.items()}
                json.dump(json_mapping, f, indent=2)
            logger.info(f"Label mapping saved to {label_mapping_path}")

        model_config = {
            "base_model": "bert-base-multilingual-cased",
            "num_labels": num_labels,
            "text_column": text_column,
            "label_column": label_column,
            "max_length": 128,

            "dataset_name": dataset_name,
        }

        config_path = os.path.join(model_save_dir, "config.json")
        with open(config_path, "w") as f:
            json.dump(model_config, f, indent=2)
        logger.info(f"Model config saved to {config_path}")

        training_time = time.time() - start_time
        logger.info(f"Training completed in {training_time:.2f} seconds")

        logger.info(f"Evaluating on validation set with {len(tokenized_dataset['validation'])} examples")
        validation_results = trainer.evaluate(eval_dataset=tokenized_dataset["validation"])

        logger.info(f"Evaluating on test set with {len(tokenized_dataset['test'])} examples")
        test_results = trainer.evaluate(eval_dataset=tokenized_dataset["test"])

        def to_jsonable(results):
            # Keep plain JSON scalars as they are; coerce anything else to float.
            return {
                k: v if isinstance(v, (int, float, str, bool)) or v is None else float(v)
                for k, v in results.items()
            }

        metrics = {
            "training_time": float(training_time),
            "training_loss": float(training_loss) if training_loss is not None else None,
            "validation_results": to_jsonable(validation_results),
            "test_results": to_jsonable(test_results),
        }

        metrics_path = os.path.join(model_save_dir, "metrics.json")
        with open(metrics_path, "w") as f:
            json.dump(metrics, f, indent=2)
        logger.info(f"Model metrics saved to {metrics_path}")

        logger.info(f"============== RESULTS FOR {dataset_name} ==============")
        logger.info(f"Training loss: {training_loss}")
        logger.info(f"Validation results: {validation_results}")
        logger.info(f"Test results: {test_results}")

        return {
            "dataset": dataset_name,
            "training_time": training_time,
            "training_loss": training_loss,
            "validation_results": validation_results,
            "test_results": test_results,
            "model_path": model_path,
        }
    except Exception as e:
        logger.error(f"Fine-tuning failed for {dataset_name}: {e}", exc_info=True)
        return None

# Queue every dataset that loaded successfully as (name, dataset, number of labels).
available_datasets = [
    (name, data, 3)
    for name, data in (
        ("Swahili", swa_dataset),
        ("Portuguese", por_dataset),
        ("Sesotho", sot_dataset),
    )
    if data
]

logger.info(f"Available datasets: {[name for name, _, _ in available_datasets]}")

# Fine-tune on each dataset in turn; a failure on one does not stop the others.
all_results = []
for dataset_name, dataset, num_labels in available_datasets:
    logger.info(f"\n==== FINE-TUNING ON {dataset_name.upper()} DATASET ====")
    try:
        results = finetune_and_evaluate(dataset_name, dataset, num_labels)
    except Exception as e:
        logger.error(f"Unexpected error during fine-tuning {dataset_name}: {e}", exc_info=True)
        continue

    if not results:
        logger.warning(f"No results returned for {dataset_name}")
        continue
    all_results.append(results)

logger.info("\n==== FINE-TUNING COMPLETE ====")

# Summary table: validation/test accuracy and F1 per dataset (nan when a metric is missing).
print(f"{'Dataset':<12} | {'Acc (val)':<10} | {'F1 (val)':<10} | {'Acc (test)':<10} | {'F1 (test)':<10} | {'Model Dir':<20}")

for result in all_results:
    dataset = result["dataset"]
    score_cells = [
        format(result[section].get(metric, float('nan')), "<10.4f")
        for section in ("validation_results", "test_results")
        for metric in ("eval_accuracy", "eval_f1")
    ]
    model_dir = f"./models/{dataset.lower()}"

    print(" | ".join([format(dataset, "<12"), *score_cells, model_dir]))

## XLM-RoBERTa

### Baseline with [XLM-RoBERTa](https://huggingface.co/docs/transformers/en/model_doc/xlm-roberta)


In [None]:
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from datasets import load_dataset, Dataset, DatasetDict
import torch
import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import os
import time

# Timestamp shared by every output path of this benchmark run.
run_timestamp = time.strftime("%Y%m%d_%H%M%S")

os.makedirs("./benchmark_results", exist_ok=True)

output_csv = os.path.abspath(f"./benchmark_results/xlmroberta_benchmark_results_{run_timestamp}.csv")

# One row per benchmarked dataset is added to this table further down.
results_df = pd.DataFrame(columns=[
    "Dataset", "Model", "Loss", "Accuracy", "F1", "Precision", "Recall"
])


def _load_benchmark_dataset(language, *loader_args, **loader_kwargs):
    """Call ``load_dataset`` and return its result, or ``None`` on failure.

    The failure reason is printed so that a dataset missing from the final
    results table can be traced back to its load error.
    """
    try:
        return load_dataset(*loader_args, **loader_kwargs)
    except Exception as e:
        print(f"Failed to load {language} dataset: {e}")
        return None


swa_dataset = _load_benchmark_dataset("Swahili", "masakhane/afrisenti", "swa")
por_dataset = _load_benchmark_dataset("Portuguese", "masakhane/afrisenti", "por")
sot_dataset = _load_benchmark_dataset(
    "Sesotho",
    "csv",
    data_files={"train": "./datasets/sotho-news/sotho_news_dataset.csv"},
)
    
def compute_metrics(pred):
    """Return accuracy plus weighted precision, recall and F1 for ``pred``.

    ``pred.label_ids`` holds the reference labels; the predicted class is the
    argmax along the last axis of ``pred.predictions``.
    """
    references = pred.label_ids
    predicted = pred.predictions.argmax(-1)
    # The fourth element of the returned tuple is not needed here.
    prec, rec, f_score = precision_recall_fscore_support(
        references, predicted, average='weighted'
    )[:3]
    metrics = {'accuracy': accuracy_score(references, predicted)}
    metrics['f1'] = f_score
    metrics['precision'] = prec
    metrics['recall'] = rec
    return metrics

def get_benchmark_metrics(dataset_name, dataset, num_labels):
    """Evaluate ``xlm-roberta-base`` on the test split of one dataset.

    No training is performed in this function: the model is loaded and
    passed straight to ``Trainer.evaluate``. The text and label columns are
    detected by name, missing validation/test splits are carved out of the
    train split, labels are converted to integer ids, and the test split is
    tokenized to 128 tokens.

    Args:
        dataset_name: Display name; also used in the output directory name.
        dataset: Mapping of split name to dataset, or ``None``.
        num_labels: Requested size of the classification head. It is widened
            when the test labels contain a larger id.

    Returns:
        The metrics dict returned by ``Trainer.evaluate``, or ``None`` when
        the dataset is unusable or evaluation fails (the reason is printed).
    """
    def report(message):
        print(f"[{dataset_name}] {message}")

    if dataset is None:
        report("No dataset provided; skipping benchmark")
        return None

    def first_matching_column(candidates):
        # First candidate (in priority order) present in the first split that has one.
        for split in dataset:
            columns = dataset[split].column_names
            for candidate in candidates:
                if candidate in columns:
                    return candidate
        return None

    # "headline" is included so the benchmark reads the same column as the
    # fine-tuning cells of this notebook.
    text_column = first_matching_column(
        ["text", "content", "tweet", "sentence", "document", "headline"]
    )
    label_column = first_matching_column(["label", "sentiment", "class", "category"])

    if text_column is None:
        # Fall back to the first string-valued column. The label column is
        # excluded so string labels are never mistaken for the input text.
        for split in dataset:
            if len(dataset[split]) == 0:
                continue
            first_row = dataset[split][0]
            for col in dataset[split].column_names:
                if col != label_column and isinstance(first_row[col], str):
                    text_column = col
                    break
            if text_column:
                break

    if text_column is None:
        report("Could not identify a text column")
        return None

    if label_column is None:
        report("Could not identify a label column")
        return None

    required_splits = ["train", "validation", "test"]
    missing_splits = [split for split in required_splits if split not in dataset]

    if missing_splits:
        if "train" not in dataset:
            report(f"Missing splits {missing_splits} and no train split to create them from")
            return None

        train_valid_test = {
            split: dataset[split] for split in dataset if split in required_splits
        }
        has_validation = "validation" in train_valid_test
        has_test = "test" in train_valid_test

        if not has_validation and not has_test:
            # 80/10/10 split carved out of train.
            split_datasets = dataset["train"].train_test_split(test_size=0.2, seed=42)
            test_valid_split = split_datasets["test"].train_test_split(test_size=0.5, seed=42)
            train_valid_test["train"] = split_datasets["train"]
            train_valid_test["validation"] = test_valid_split["train"]
            train_valid_test["test"] = test_valid_split["test"]
        elif not has_validation:
            split_datasets = dataset["train"].train_test_split(test_size=0.1, seed=42)
            train_valid_test["train"] = split_datasets["train"]
            train_valid_test["validation"] = split_datasets["test"]
        else:
            split_datasets = dataset["train"].train_test_split(test_size=0.1, seed=42)
            # Remove the held-out rows from train as well so the splits stay disjoint.
            train_valid_test["train"] = split_datasets["train"]
            train_valid_test["test"] = split_datasets["test"]

        dataset = DatasetDict(train_valid_test)

    # String labels are mapped to ids using the labels of ALL splits, so a
    # label that only occurs outside the first split still gets an id.
    label_mapping = None
    has_string_labels = any(
        len(dataset[split]) > 0 and isinstance(dataset[split][0][label_column], str)
        for split in dataset
    )
    if has_string_labels:
        all_labels = set()
        for split in dataset:
            all_labels.update(dataset[split][label_column])
        label_mapping = {label: i for i, label in enumerate(sorted(all_labels))}

    # Only the test split is evaluated, so only it is converted and tokenized.
    test_split = dataset["test"]
    if label_mapping:
        labels = [label_mapping[label] for label in test_split[label_column]]
    else:
        labels = []
        for label in test_split[label_column]:
            if isinstance(label, (int, np.integer)):
                labels.append(int(label))
            elif isinstance(label, str):
                try:
                    labels.append(int(label))
                except ValueError:
                    labels.append(0)
            else:
                labels.append(0)

    # The classification head must cover every label id being scored.
    if labels and max(labels) + 1 > num_labels:
        report(
            f"Label id {max(labels)} does not fit num_labels={num_labels}; "
            f"using {max(labels) + 1} output classes instead"
        )
        num_labels = max(labels) + 1

    processed_test = Dataset.from_dict({
        "text": test_split[text_column],
        "label": labels
    })

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    tokenizer = AutoTokenizer.from_pretrained("xlm-roberta-base")
    model = AutoModelForSequenceClassification.from_pretrained(
        "xlm-roberta-base",
        num_labels=num_labels
    ).to(device)

    def tokenize_function(examples):
        return tokenizer(
            examples["text"],
            padding="max_length",
            truncation=True,
            max_length=128,
        )

    tokenized_test = processed_test.map(
        tokenize_function,
        batched=True,
        remove_columns=["text"]
    )

    for required_col in ["input_ids", "attention_mask", "label"]:
        if required_col not in tokenized_test.column_names:
            report(f"Required column {required_col} missing after tokenization")
            return None

    eval_args = TrainingArguments(
        output_dir=f"./benchmark_results/{dataset_name}_{run_timestamp}",
        per_device_eval_batch_size=16,
        logging_dir=f"./benchmark_results/{dataset_name}_{run_timestamp}/logs",
        report_to="none",
        remove_unused_columns=True
    )

    trainer = Trainer(
        model=model,
        args=eval_args,
        compute_metrics=compute_metrics,
    )

    try:
        return trainer.evaluate(eval_dataset=tokenized_test)
    except Exception as e:
        report(f"Evaluation failed: {e}")
        return None

# Queue every dataset that loaded successfully as (name, dataset, number of labels).
available_datasets = []
if swa_dataset:
    available_datasets.append(("Swahili", swa_dataset, 3))
if por_dataset:
    available_datasets.append(("Portuguese", por_dataset, 3))
if sot_dataset:
    available_datasets.append(("Sesotho", sot_dataset, 3))

# Benchmark each dataset; a failure on one is reported and does not stop the others.
all_results = []
result_rows = []
for dataset_name, dataset, num_labels in available_datasets:
    try:
        results = get_benchmark_metrics(dataset_name, dataset, num_labels)
    except Exception as e:
        print(f"Benchmark failed for {dataset_name}: {e}")
        continue

    if not results:
        print(f"No benchmark results for {dataset_name}")
        continue

    result_rows.append({
        "Dataset": dataset_name,
        "Model": "XLM-RoBERTa",
        "Loss": results.get("eval_loss"),
        "Accuracy": results.get("eval_accuracy"),
        "F1": results.get("eval_f1"),
        "Precision": results.get("eval_precision"),
        "Recall": results.get("eval_recall")
    })
    all_results.append({"Dataset": dataset_name, "Results": results})

# Build the table with public pandas API only (DataFrame._append is private).
if result_rows:
    new_rows = pd.DataFrame(result_rows, columns=results_df.columns)
    if results_df.empty:
        results_df = new_rows
    else:
        results_df = pd.concat([results_df, new_rows], ignore_index=True)

# If the primary location cannot be written, say why and fall back to a
# timestamped file in the working directory.
try:
    results_df.to_csv(output_csv, index=False)
except Exception as e:
    print(f"Could not save results to {output_csv}: {e}")
    emergency_path = f"./emergency_results_{run_timestamp}.csv"
    results_df.to_csv(emergency_path, index=False)

print("\n==== BENCHMARKING COMPLETE ====")
print("\nResults Summary:")
print(results_df)

### Fine-Tuning XLM-RoBERTa

In [None]:
# Run this on Google Colab; my GPU doesn't have enough VRAM. - Troy
import pandas as pd
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from datasets import load_dataset, Dataset, DatasetDict
import torch
import numpy as np
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import os
import logging
import time
import sys
import json
import shutil

# Log INFO and above to stdout with a timestamp and level prefix.
_stdout_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[_stdout_handler],
)
logger = logging.getLogger(__name__)


def _try_load(language, origin, loader):
    """Run ``loader()`` and log whether it succeeded.

    Returns whatever ``loader`` returns, or ``None`` if it raised.
    """
    try:
        data = loader()
        logger.info(f"Successfully loaded {language} dataset from {origin}")
    except Exception as e:
        logger.error(f"Failed to load {language} dataset: {e}")
        data = None
    return data


# Each dataset is loaded independently; a failed load leaves None behind.
swa_dataset = _try_load(
    "Swahili", "HuggingFace Hub",
    lambda: load_dataset("masakhane/afrisenti", "swa"),
)
por_dataset = _try_load(
    "Portuguese", "HuggingFace Hub",
    lambda: load_dataset("masakhane/afrisenti", "por"),
)
sot_dataset = _try_load(
    "Sesotho", "CSV",
    lambda: load_dataset("csv", data_files={"train": "./datasets/sotho-news/sotho_news_dataset.csv"}),
)
    
def compute_metrics(pred):
    """Compute evaluation metrics for a prediction object.

    Reference labels are read from ``pred.label_ids`` and predicted classes
    are the argmax over the last axis of ``pred.predictions``. Returns a dict
    with ``accuracy`` and the weighted ``f1``, ``precision`` and ``recall``.
    """
    gold, guessed = pred.label_ids, pred.predictions.argmax(-1)
    # Unpack precision/recall/F1; the trailing element is discarded.
    p, r, f, _unused = precision_recall_fscore_support(gold, guessed, average='weighted')
    return dict(
        accuracy=accuracy_score(gold, guessed),
        f1=f,
        precision=p,
        recall=r,
    )

def _first_matching_column(dataset, candidates):
    """Return the first name in ``candidates`` that is a column of some split, else None.

    Splits are scanned in iteration order; within a split the candidates are
    tried in the order given.
    """
    for split in dataset:
        columns = dataset[split].column_names
        for candidate in candidates:
            if candidate in columns:
                return candidate
    return None


def _with_required_splits(dataset):
    """Return a DatasetDict with train/validation/test, carving missing splits out of train.

    The caller guarantees that ``dataset`` has a "train" split and lacks at
    least one of "validation" / "test". Whenever examples are carved out of
    train, train itself is replaced by the remainder so that no held-out
    example is also used for training.
    """
    required = ("train", "validation", "test")
    splits = {name: dataset[name] for name in dataset if name in required}
    train = dataset["train"]

    if "validation" not in splits and "test" not in splits:
        # 80% train, then the remaining 20% is halved into validation and test.
        outer = train.train_test_split(test_size=0.2, seed=42)
        inner = outer["test"].train_test_split(test_size=0.5, seed=42)
        splits["train"] = outer["train"]
        splits["validation"] = inner["train"]
        splits["test"] = inner["test"]
    elif "validation" not in splits:
        carved = train.train_test_split(test_size=0.1, seed=42)
        splits["train"] = carved["train"]
        splits["validation"] = carved["test"]
    elif "test" not in splits:
        carved = train.train_test_split(test_size=0.1, seed=42)
        # Previously train was left untouched here, so the new test rows were
        # also trained on (train/test leakage).
        splits["train"] = carved["train"]
        splits["test"] = carved["test"]

    return DatasetDict(splits)


def _json_safe_metrics(results):
    """Copy a metrics dict, converting values that are not JSON primitives to ``float``."""
    return {
        key: value if isinstance(value, (int, float, str, bool)) or value is None else float(value)
        for key, value in results.items()
    }


def finetune_and_evaluate(dataset_name, dataset, num_labels):
    """Fine-tune ``xlm-roberta-base`` on one dataset and evaluate the result.

    The function detects the text and label columns, creates any missing
    validation/test split from the train split, converts labels to integer
    ids, tokenizes to 128 tokens, trains for 3 epochs with ``Trainer`` and
    evaluates on the validation and test splits. The model state dict,
    tokenizer, label mapping (if string labels were mapped), a small config
    and the metrics are written to ``./models/<dataset_name lower-cased>``.

    Args:
        dataset_name: Display name; also used to derive output directories.
        dataset: Mapping of split name to dataset (e.g. a ``DatasetDict``),
            or ``None``.
        num_labels: Number of output classes of the classification head.

    Returns:
        A dict with the keys ``dataset``, ``training_time``,
        ``training_loss``, ``validation_results``, ``test_results`` and
        ``model_path``; or ``None`` when the dataset is unusable or
        fine-tuning fails (the reason is logged).
    """
    logger.info(f"Starting fine-tuning for {dataset_name}")

    if dataset is None:
        logger.error(f"Dataset {dataset_name} is None, cannot proceed with fine-tuning")
        return None

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    logger.info(f"Using device: {device}")

    model_save_dir = f"./models/{dataset_name.lower()}"
    os.makedirs(model_save_dir, exist_ok=True)

    tokenizer = AutoTokenizer.from_pretrained("xlm-roberta-base")
    model = AutoModelForSequenceClassification.from_pretrained(
        "xlm-roberta-base",
        num_labels=num_labels
    ).to(device)

    logger.info(f"Dataset {dataset_name} structure:")
    for split in dataset:
        logger.info(f"  - Split: {split}, Examples: {len(dataset[split])}")
        logger.info(f"  - Features: {dataset[split].features}")
        logger.info(f"  - Columns: {dataset[split].column_names}")

    # --- Column detection -------------------------------------------------
    text_column = _first_matching_column(
        dataset, ["text", "content", "tweet", "sentence", "document", "headline"]
    )
    label_column = _first_matching_column(
        dataset, ["label", "sentiment", "class", "category"]
    )

    logger.info(f"For {dataset_name}, using text_column={text_column}, label_column={label_column}")

    if text_column is None:
        # Fallback: take the first column whose first value is a string.
        for split in dataset:
            split_data = dataset[split]
            if len(split_data) == 0:
                # Nothing to inspect; indexing [0] would raise.
                continue
            for col in split_data.column_names:
                if isinstance(split_data[col][0], str):
                    text_column = col
                    logger.info(f"Using '{col}' as text column based on string data")
                    break
            if text_column:
                break

    if text_column is None:
        logger.error(f"Could not identify a text column for {dataset_name}")
        return None

    if label_column is None:
        logger.error(f"Could not identify a label column for {dataset_name}")
        return None

    # --- Split creation ---------------------------------------------------
    required_splits = ["train", "validation", "test"]
    missing_splits = [split for split in required_splits if split not in dataset]

    if missing_splits:
        logger.info(f"Creating missing splits: {missing_splits} for {dataset_name}")
        if "train" not in dataset:
            logger.error(f"Dataset {dataset_name} has no train split and cannot create splits")
            return None
        dataset = _with_required_splits(dataset)

    # --- Label encoding ---------------------------------------------------
    # String labels are mapped to ids in sorted order. The label set is
    # collected over every split so that a label absent from the first split
    # cannot cause a KeyError later.
    label_mapping = None
    string_labels = set()
    for split in dataset:
        label_values = dataset[split][label_column]
        if len(label_values) > 0 and isinstance(label_values[0], str):
            string_labels.update(label_values)

    if string_labels:
        label_mapping = {label: i for i, label in enumerate(sorted(string_labels))}
        logger.info(f"Created label mapping for {dataset_name}: {label_mapping}")

    processed_dataset = DatasetDict()
    for split_name, split_dataset in dataset.items():
        texts = list(split_dataset[text_column])

        if label_mapping:
            labels = [label_mapping[label] for label in split_dataset[label_column]]
        else:
            labels = []
            for label in split_dataset[label_column]:
                if isinstance(label, (int, np.integer)):
                    labels.append(int(label))
                elif isinstance(label, str):
                    try:
                        labels.append(int(label))
                    except ValueError:
                        logger.warning(f"Unexpected string label found in {split_name}: {label}")
                        labels.append(0)
                else:
                    logger.warning(f"Unexpected label type in {split_name}: {type(label)}")
                    labels.append(0)

        # A label id outside [0, num_labels) cannot be scored by the
        # classification head; stop with a clear message before training.
        invalid_labels = sorted({label for label in labels if not 0 <= label < num_labels})
        if invalid_labels:
            logger.error(
                f"Label ids {invalid_labels} in {split_name} split of {dataset_name} "
                f"are outside the range allowed by num_labels={num_labels}"
            )
            return None

        processed_dataset[split_name] = Dataset.from_dict({
            "text": texts,
            "label": labels
        })

        logger.info(f"Processed {split_name} split: {len(processed_dataset[split_name])} examples")

    # --- Tokenization -----------------------------------------------------
    def tokenize_function(examples):
        return tokenizer(
            examples["text"],
            padding="max_length",
            truncation=True,
            max_length=128,
        )

    tokenized_dataset = DatasetDict()
    for split_name, split_dataset in processed_dataset.items():
        tokenized_split = split_dataset.map(
            tokenize_function,
            batched=True,
            remove_columns=["text"]
        )
        tokenized_dataset[split_name] = tokenized_split

        logger.info(f"Tokenized {split_name} split: {len(tokenized_split)} examples")
        logger.info(f"Columns after tokenization: {tokenized_split.column_names}")

        for required_col in ["input_ids", "attention_mask", "label"]:
            if required_col not in tokenized_split.column_names:
                logger.error(f"Required column {required_col} missing after tokenization")
                return None

    # --- Trainer setup ----------------------------------------------------
    # Start from an empty scratch directory for Trainer output.
    model_output_dir = f"./tmp_model_dir_{dataset_name}"
    if os.path.exists(model_output_dir):
        shutil.rmtree(model_output_dir)
    os.makedirs(model_output_dir, exist_ok=True)

    training_args = TrainingArguments(
        output_dir=model_output_dir,
        learning_rate=2e-5,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=16,
        num_train_epochs=3,
        weight_decay=0.01,
        logging_dir="./tmp_logs",
        logging_steps=10,
        save_strategy="no",
        save_steps=1000000,
        eval_steps=100,
        do_eval=True,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset["train"],
        eval_dataset=tokenized_dataset["validation"],
        compute_metrics=compute_metrics,
    )

    logger.info(f"Starting fine-tuning {dataset_name} with {len(tokenized_dataset['train'])} examples")
    start_time = time.time()

    # Bound before training: if training stops with the tolerated
    # serialization error below, the reporting code still needs this name.
    training_loss = None

    try:
        # --- Training -----------------------------------------------------
        try:
            train_output = trainer.train()
        except RuntimeError as e:
            if "PytorchStreamWriter failed writing file" not in str(e):
                raise
            logger.warning("Caught PyTorch serialization error during training. Will proceed with model saving.")
        else:
            try:
                if hasattr(train_output, "metrics") and "loss" in train_output.metrics:
                    training_loss = train_output.metrics["loss"]
                elif isinstance(train_output, dict) and "loss" in train_output:
                    training_loss = train_output["loss"]

                # Otherwise fall back to the most recent logged "loss" entry.
                if training_loss is None and hasattr(trainer, "state"):
                    if hasattr(trainer.state, "log_history") and trainer.state.log_history:
                        for log in reversed(trainer.state.log_history):
                            if "loss" in log:
                                training_loss = log["loss"]
                                break
            except Exception as e:
                logger.warning(f"Could not extract training loss: {e}")

        # --- Persist model, tokenizer and metadata --------------------------
        model_path = os.path.join(model_save_dir, "model.pt")
        try:
            torch.save(model.state_dict(), model_path)
            logger.info(f"Model saved to {model_path}")
        except RuntimeError as e:
            if "PytorchStreamWriter failed writing file" not in str(e):
                raise
            logger.warning("PyTorch serialization error when saving model state dict. Trying alternative method.")
            try:
                # Retry with the legacy (non-zipfile) serialization format.
                with open(model_path, 'wb') as f:
                    torch.save(model.state_dict(), f, _use_new_zipfile_serialization=False)
                logger.info(f"Model saved to {model_path} using legacy serialization")
            except Exception as e2:
                logger.error(f"Failed to save model with alternative method: {e2}")

        tokenizer.save_pretrained(model_save_dir)
        logger.info(f"Tokenizer saved to {model_save_dir}")

        if label_mapping:
            label_mapping_path = os.path.join(model_save_dir, "label_mapping.json")
            with open(label_mapping_path, "w") as f:
                json_mapping = {str(k): int(v) for k, v in label_mapping.items()}
                json.dump(json_mapping, f, indent=2)
            logger.info(f"Label mapping saved to {label_mapping_path}")

        model_config = {
            "base_model": "xlm-roberta-base",
            "num_labels": num_labels,
            "text_column": text_column,
            "label_column": label_column,
            "max_length": 128,
            "dataset_name": dataset_name,
        }

        config_path = os.path.join(model_save_dir, "config.json")
        with open(config_path, "w") as f:
            json.dump(model_config, f, indent=2)
        logger.info(f"Model config saved to {config_path}")

        training_time = time.time() - start_time
        logger.info(f"Training completed in {training_time:.2f} seconds")

        # --- Evaluation -----------------------------------------------------
        logger.info(f"Evaluating on validation set with {len(tokenized_dataset['validation'])} examples")
        validation_results = trainer.evaluate(eval_dataset=tokenized_dataset["validation"])

        logger.info(f"Evaluating on test set with {len(tokenized_dataset['test'])} examples")
        test_results = trainer.evaluate(eval_dataset=tokenized_dataset["test"])

        metrics = {
            "training_time": float(training_time),
            "training_loss": float(training_loss) if training_loss is not None else None,
            "validation_results": _json_safe_metrics(validation_results),
            "test_results": _json_safe_metrics(test_results),
        }

        metrics_path = os.path.join(model_save_dir, "metrics.json")
        with open(metrics_path, "w") as f:
            json.dump(metrics, f, indent=2)
        logger.info(f"Model metrics saved to {metrics_path}")

        logger.info(f"============== RESULTS FOR {dataset_name} ==============")
        logger.info(f"Training loss: {training_loss}")
        logger.info(f"Validation results: {validation_results}")
        logger.info(f"Test results: {test_results}")

        return {
            "dataset": dataset_name,
            "training_time": training_time,
            "training_loss": training_loss,
            "validation_results": validation_results,
            "test_results": test_results,
            "model_path": model_path,
        }
    except Exception as e:
        logger.error(f"Fine-tuning failed for {dataset_name}: {e}", exc_info=True)
        return None

# Queue every dataset that loaded successfully as
# (display name, dataset, number of classes).
# NOTE(review): all three are declared as 3-class problems -- confirm that
# this holds for the Sesotho CSV.
available_datasets = []
if swa_dataset:
    available_datasets.append(("Swahili", swa_dataset, 3))
if por_dataset:
    available_datasets.append(("Portuguese", por_dataset, 3))
if sot_dataset:
    available_datasets.append(("Sesotho", sot_dataset, 3))

logger.info(f"Available datasets: {[name for name, _, _ in available_datasets]}")

# Fine-tune on each dataset in turn; a failure on one dataset is logged and
# does not stop the remaining runs.
all_results = []
for dataset_name, dataset, num_labels in available_datasets:
    # A stray character after this call used to make the whole cell a SyntaxError.
    logger.info(f"\n==== FINE-TUNING ON {dataset_name.upper()} DATASET ====")
    try:
        results = finetune_and_evaluate(dataset_name, dataset, num_labels)

        if results:
            all_results.append(results)
        else:
            logger.warning(f"No results returned for {dataset_name}")
    except Exception as e:
        logger.error(f"Unexpected error during fine-tuning {dataset_name}: {e}", exc_info=True)

logger.info("\n==== FINE-TUNING COMPLETE ====")

# Summary table: one row per successfully fine-tuned dataset. Metrics missing
# from the evaluation output are shown as nan.
print(f"{'Dataset':<12} | {'Acc (val)':<10} | {'F1 (val)':<10} | {'Acc (test)':<10} | {'F1 (test)':<10} | {'Model Dir':<20}")

for result in all_results:
    dataset = result["dataset"]
    val_acc = result["validation_results"].get("eval_accuracy", float('nan'))
    val_f1 = result["validation_results"].get("eval_f1", float('nan'))
    test_acc = result["test_results"].get("eval_accuracy", float('nan'))
    test_f1 = result["test_results"].get("eval_f1", float('nan'))
    model_dir = f"./models/{dataset.lower()}"

    print(f"{dataset:<12} | {val_acc:<10.4f} | {val_f1:<10.4f} | {test_acc:<10.4f} | {test_f1:<10.4f} | {model_dir}")

# Data Augmentation

Firstly, we want to check the actual distribution of the classes for Mozambican Portuguese

In [None]:
import os
from datasets import load_from_disk, load_dataset
from collections import Counter

def analyze_sentiment_distribution(dataset, split='train'):
    """Print the count and percentage of each label in ``dataset[split]``.

    The labels 'negative', 'neutral' and 'positive' are reported first, in
    that order and capitalized; any other label value follows as
    "Other (<label>)". Nothing is printed when ``dataset`` is None.
    """
    if dataset is None:
        return

    tally = Counter(dataset[split]['label'])

    print(f"\nSentiment Distribution:")
    n_examples = len(dataset[split])

    expected = ['negative', 'neutral', 'positive']

    # (heading, label) pairs: expected labels that occur, then everything else
    # in the order the Counter holds them.
    rows = [(name.capitalize(), name) for name in expected if name in tally]
    rows.extend((f"Other ({name})", name) for name in tally if name not in expected)

    for heading, name in rows:
        hits = tally[name]
        share = (hits / n_examples) * 100
        print(f"{heading}: {hits} ({share:.2f}%)")

if __name__ == "__main__":
    swa, por, sot = load_local_datasets()

    # Only the Portuguese corpus is inspected here: size, schema, one sample
    # row and the label distribution of its train split.
    if por is not None:
        por_train = por['train']
        print(f"Portuguese dataset size: {len(por_train)} examples")
        print(f"Dataset features: {por_train.features}")
        print("\nSample entry:", por_train[0], sep="\n")
        analyze_sentiment_distribution(por)

### There is a clear class imbalance, so data augmentation techniques such as dropout and back-translation will be used to create synthetic data for the minority classes

In [None]:
import os
import random
from datasets import load_from_disk, load_dataset, Dataset
from collections import Counter

def load_local_datasets():
    """Load the Swahili, Portuguese and Sesotho datasets from local storage.

    Swahili and Portuguese are read with ``load_from_disk``; Sesotho is read
    from a CSV file with ``load_dataset("csv", ...)``.

    Returns:
        ``(swa_dataset, por_dataset, sot_dataset)``, or
        ``(None, None, None)`` when any of the three paths does not exist.
    """
    swa_path = "./datasets/afrisenti/swa"
    por_path = "./datasets/afrisenti/por"
    # Check the file that is actually read below. The check used to test an
    # unrelated "./datasets/news" directory that was never loaded.
    sot_path = "datasets/sotho-news/sotho_news_dataset.csv"

    missing_paths = [path for path in (swa_path, por_path, sot_path) if not os.path.exists(path)]
    if missing_paths:
        print("One or more dataset directories not found. Please check the paths.")
        print(f"Missing: {', '.join(missing_paths)}")
        return None, None, None

    print("Loading Swahili (swa) dataset from disk...")
    swa_dataset = load_from_disk(swa_path)
    print("Swahili dataset loaded!")

    print("Loading Portuguese (por) dataset from disk...")
    por_dataset = load_from_disk(por_path)
    print("Portuguese dataset loaded!")

    print("Loading Sesotho (sot) dataset from disk...")
    sot_dataset = load_dataset("csv", data_files=sot_path)
    print("Sesotho dataset loaded!")

    return swa_dataset, por_dataset, sot_dataset

def analyze_sentiment_distribution(dataset, split='train'):
    """Print the count and percentage of each label.

    ``dataset`` may either contain ``split`` (in which case that split is
    analysed) or be a single dataset without splits (analysed as a whole).
    The labels 'negative', 'neutral' and 'positive' are reported first, in
    that order and capitalized; any other label value follows as
    "Other (<label>)". Nothing is printed when ``dataset`` is None.
    """
    if dataset is None:
        return

    # Use the requested split when there is one, otherwise the object itself.
    data = dataset[split] if split in dataset else dataset

    tally = Counter(data['label'])

    print(f"\nSentiment Distribution:")
    n_examples = len(data)

    expected = ['negative', 'neutral', 'positive']

    # (heading, label) pairs: expected labels that occur, then everything else
    # in the order the Counter holds them.
    rows = [(name.capitalize(), name) for name in expected if name in tally]
    rows.extend((f"Other ({name})", name) for name in tally if name not in expected)

    for heading, name in rows:
        hits = tally[name]
        share = (hits / n_examples) * 100
        print(f"{heading}: {hits} ({share:.2f}%)")

def simple_augment_text(text):
    """Return ``text`` after applying one randomly chosen augmentation.

    The candidates are word deletion (p=0.1), adjacent-word swapping (p=0.1)
    and a punctuation change; none of them uses a translation model.
    """
    def _drop_words(t):
        return word_deletion(t, p=0.1)

    def _swap_words(t):
        return word_swap(t, p=0.1)

    def _touch_punctuation(t):
        return add_punctuation(t)

    # One uniform draw picks which technique is applied.
    chosen = random.choice([_drop_words, _swap_words, _touch_punctuation])
    return chosen(text)

def word_deletion(text, p=0.1):
    """Drop each whitespace-separated word independently with probability ``p``.

    Texts of three words or fewer are returned unchanged. If every word
    happens to be dropped, a single randomly chosen original word is returned
    instead of an empty string.
    """
    tokens = text.split()
    if len(tokens) <= 3:
        # Too short to lose a word.
        return text

    # One random draw per word; the word survives when the draw exceeds p.
    survivors = [token for token in tokens if random.random() > p]

    return ' '.join(survivors) if survivors else random.choice(tokens)

def word_swap(text, p=0.1):
    """Walk the words left to right, swapping each adjacent pair with probability ``p``.

    Swaps are applied in place as the walk proceeds, so a word can be carried
    several positions to the right. Texts with at most one word are returned
    unchanged.
    """
    tokens = text.split()
    if len(tokens) <= 1:
        return text

    for left, right in zip(range(len(tokens) - 1), range(1, len(tokens))):
        if random.random() < p:
            tokens[left], tokens[right] = tokens[right], tokens[left]

    return ' '.join(tokens)

def add_punctuation(text):
    """Add or change the final punctuation mark of ``text``.

    * Empty text is returned unchanged (previously this raised IndexError).
    * Text not ending in '!', '?' or '.' gets '!' or '.' appended, each with
      probability 0.5.
    * A trailing '.' is replaced by '!' with probability 0.3.
    * Anything else is returned unchanged.
    """
    if not text:
        # There is no last character to inspect.
        return text

    last_char = text[-1]
    if last_char not in '!?.':
        return text + ('!' if random.random() < 0.5 else '.')
    if last_char == '.' and random.random() < 0.3:
        return text[:-1] + '!'

    return text

def balance_and_augment_dataset(dataset, target_neutral_ratio=0.7, split='train'):
    """Undersample the neutral class and augment the negative/positive classes.

    1. A random ``target_neutral_ratio`` fraction of the neutral examples is
       kept (rounded down).
    2. The negative and positive classes are topped up with augmented copies
       of their own examples (see ``simple_augment_text``) until each reaches
       the size of the largest of the three classes after step 1.
    3. The combined examples are shuffled.

    A class with no examples cannot be augmented and is left empty.

    Args:
        dataset: Mapping of split name to a dataset with 'tweet' and 'label'
            columns; labels must be 'negative', 'neutral' or 'positive'.
        target_neutral_ratio: Fraction of neutral examples to keep.
        split: Name of the split to process.

    Returns:
        A list of ``{'tweet': ..., 'label': ...}`` dicts (not a Dataset).

    Raises:
        KeyError: If a label other than the three expected ones occurs.
    """
    split_data = dataset[split]

    # Read each column once instead of indexing the split row by row.
    tweets = split_data['tweet']
    labels = split_data['label']

    data_by_label = {
        'negative': [],
        'neutral': [],
        'positive': []
    }
    for tweet, label in zip(tweets, labels):
        data_by_label[label].append({'tweet': tweet, 'label': label})

    # Undersample the neutral class.
    neutral_target_size = int(len(data_by_label['neutral']) * target_neutral_ratio)
    sampled_neutral = random.sample(data_by_label['neutral'], neutral_target_size)

    # Start from the originals (with the reduced neutral class).
    augmented_data = data_by_label['negative'] + sampled_neutral + data_by_label['positive']

    # Every minority class is grown to the size of the largest class.
    target_count = max(
        len(data_by_label['negative']),
        len(data_by_label['positive']),
        neutral_target_size,
    )

    for minority_label in ('negative', 'positive'):
        originals = data_by_label[minority_label]
        if not originals:
            # No source examples to derive synthetic ones from;
            # random.choice([]) would raise IndexError.
            continue
        # The same original may be picked (and augmented) several times.
        for _ in range(target_count - len(originals)):
            sample = random.choice(originals)
            augmented_data.append({
                'tweet': simple_augment_text(sample['tweet']),
                'label': minority_label,
            })

    random.shuffle(augmented_data)

    return augmented_data

if __name__ == "__main__":
    swa, por, sot = load_local_datasets()

    if por is not None:
        # Imported here because this cell's own import line only brings in Dataset.
        from datasets import DatasetDict

        print("Original Portuguese dataset:")
        analyze_sentiment_distribution(por)

        # Balance and augment the train split (returns a list of row dicts).
        augmented_data = balance_and_augment_dataset(por)

        # Convert the rows back into a columnar Dataset.
        augmented_dataset = Dataset.from_dict({
            'tweet': [item['tweet'] for item in augmented_data],
            'label': [item['label'] for item in augmented_data]
        })

        print("\nAfter balancing and augmentation:")
        analyze_sentiment_distribution(augmented_dataset)  # a single Dataset, no splits

        # Dataset.from_dict expects {column name: values}; a collection of
        # splits has to be a DatasetDict. The untouched non-train splits are
        # carried over so the saved dataset still offers validation/test data
        # to code that loads it later.
        full_dataset_with_splits = {"train": augmented_dataset}
        full_dataset_with_splits.update(
            {name: part for name, part in por.items() if name != "train"}
        )
        full_dataset = DatasetDict(full_dataset_with_splits)

        # Save the balanced and augmented dataset
        full_dataset.save_to_disk("./datasets/afrisenti/por_balanced_augmented")
        print("\nBalanced and augmented dataset saved!")
        

These text augmentation techniques (word deletion, word swap, punctuation changes) are more appropriate for Mozambican Portuguese since they don't rely on external translation models that might not understand the dialect. They preserve the unique characteristics of Mozambican Portuguese while still creating useful variations of the original text.

Now let's see whether the augmented data improves the results of Afro-XLMR, which was the best-performing BERT model on the Mozambican Portuguese data:

In [None]:
# %pip install -U transformers datasets peft evaluate plotly sentencepiece --quiet


import torch
from torch.utils.data import DataLoader, Dataset
from transformers import XLMRobertaForSequenceClassification, XLMRobertaTokenizer
from datasets import load_dataset
# import torch_optimizer as optim
from transformers import get_linear_schedule_with_warmup
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
# import numpy as np
import pandas as pd
# import matplotlib.pyplot as plt
# import seaborn as sns
# from tqdm import tqdm
import logging

# Set up logging
# NOTE: logging.basicConfig only configures the root logger when it has no
# handlers yet, so this call has no effect if logging was already configured
# earlier in the same session.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class SentimentDataset(Dataset):
    """Map-style dataset that tokenizes one text per item on access.

    Each item is a dict with ``input_ids`` and ``attention_mask`` (the
    tokenizer output flattened to 1-D) and ``label`` (a ``torch.long``
    tensor). Texts are padded/truncated to ``max_length`` tokens.
    """

    def __init__(self, texts, labels, tokenizer, max_length=128):
        self.texts, self.labels = texts, labels
        self.tokenizer, self.max_length = tokenizer, max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        # str() guards against non-string entries in the text column.
        raw_text = str(self.texts[idx])
        target = self.labels[idx]

        tokenizer_options = dict(
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt',
        )
        encoded = self.tokenizer(raw_text, **tokenizer_options)

        # The tokenizer output is flattened so every item is 1-D.
        item = {field: encoded[field].flatten() for field in ('input_ids', 'attention_mask')}
        item['label'] = torch.tensor(target, dtype=torch.long)
        return item

def load_data(language):
    """Load the balanced/augmented dataset for ``language`` as three DataFrames.

    Only ``'por'`` is supported. In each of the train/validation/test splits
    the text column ('tweet' or 'headline') is renamed to 'text' unless a
    'text' column already exists.

    Args:
        language: Language code; must be ``'por'``.

    Returns:
        ``(train_df, val_df, test_df)`` as pandas DataFrames.

    Raises:
        ValueError: If ``language`` is unsupported or a split has no
            recognisable text column.
        Exception: Any error raised while loading is logged and re-raised.
    """
    aug_por_path = "./datasets/afrisenti/por_balanced_augmented"

    try:
        if language != 'por':
            raise ValueError(
                f"Unsupported language {language!r}: only 'por' has a balanced/augmented dataset"
            )

        # Function-scope import so this function does not rely on another
        # notebook cell having imported load_from_disk.
        from datasets import load_from_disk

        chosen_dataset = load_from_disk(aug_por_path)

        for split_name in ("train", "validation", "test"):
            columns = chosen_dataset[split_name].column_names
            if "text" in columns:
                continue
            # Look the column up by name rather than by its position.
            for source_column in ("tweet", "headline"):
                if source_column in columns:
                    chosen_dataset[split_name] = chosen_dataset[split_name].rename_column(source_column, "text")
                    break
            else:
                raise ValueError(
                    f"Split {split_name!r} has no 'tweet', 'headline' or 'text' column: {columns}"
                )

        train_df = chosen_dataset["train"].to_pandas()
        val_df = chosen_dataset["validation"].to_pandas()
        test_df = chosen_dataset["test"].to_pandas()
        logger.info(f"Data loaded successfully: {len(train_df)} training, {len(val_df)} validation, {len(test_df)} test examples")
        return train_df, val_df, test_df
    except Exception as e:
        logger.error(f"Error loading data: {str(e)}")
        raise

# Function to create DataLoaders
def create_data_loaders(train_df, val_df, test_df, tokenizer, batch_size=16, text_column='text', label_column='label'):
    """Wrap the three DataFrames in ``SentimentDataset`` objects and DataLoaders.

    Args:
        train_df, val_df, test_df: DataFrames holding the text and label columns.
        tokenizer: Tokenizer handed to every ``SentimentDataset``.
        batch_size: Batch size used by all three loaders.
        text_column: Name of the column containing the texts.
        label_column: Name of the column containing the labels.

    Returns:
        ``(train_loader, val_loader, test_loader)``; only the training loader
        shuffles its data.
    """
    def _as_dataset(frame):
        return SentimentDataset(
            texts=frame[text_column].tolist(),
            labels=frame[label_column].tolist(),
            tokenizer=tokenizer,
        )

    train_set = _as_dataset(train_df)
    val_set = _as_dataset(val_df)
    test_set = _as_dataset(test_df)

    return (
        DataLoader(train_set, batch_size=batch_size, shuffle=True),
        DataLoader(val_set, batch_size=batch_size),
        DataLoader(test_set, batch_size=batch_size),
    )

def train_epoch(model, dataloader, optimizer, scheduler, device):
    """Train ``model`` for one pass over ``dataloader``.

    For every batch: gradients are reset, the loss returned by the model is
    back-propagated, the gradient norm is clipped to 1.0, and the optimizer
    and the scheduler each take one step.

    Returns:
        The mean of the per-batch losses.
    """
    model.train()
    running_loss = 0

    batches = tqdm(dataloader, desc="Training")
    for batch in batches:
        optimizer.zero_grad()

        model_inputs = {
            'input_ids': batch['input_ids'].to(device),
            'attention_mask': batch['attention_mask'].to(device),
            'labels': batch['label'].to(device),
        }
        batch_loss = model(**model_inputs).loss
        batch_loss_value = batch_loss.item()
        running_loss += batch_loss_value

        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()  # the schedule advances once per batch

        batches.set_postfix({"loss": f"{batch_loss_value:.4f}"})

    return running_loss / len(dataloader)

def evaluate(model, dataloader, device):
    """Evaluate ``model`` on ``dataloader`` without computing gradients.

    Returns:
        Tuple of (mean batch loss, accuracy, precision, recall, f1,
        predictions, labels). Precision, recall and F1 use
        ``average='weighted'``; predictions and labels are flat lists
        covering every example seen.
    """
    model.eval()
    loss_sum = 0
    predicted, gold = [], []

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            token_ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            targets = batch['label'].to(device)

            result = model(input_ids=token_ids, attention_mask=mask, labels=targets)
            loss_sum += result.loss.item()

            # Predicted class = arg-max over the class dimension.
            batch_predictions = torch.argmax(result.logits, dim=1).cpu().numpy()
            predicted.extend(batch_predictions)
            gold.extend(targets.cpu().numpy())

    # Calculate metrics
    accuracy = accuracy_score(gold, predicted)
    weighted_scores = precision_recall_fscore_support(gold, predicted, average='weighted')
    precision, recall, f1 = weighted_scores[0], weighted_scores[1], weighted_scores[2]

    mean_loss = loss_sum / len(dataloader)
    return mean_loss, accuracy, precision, recall, f1, predicted, gold

def plot_confusion_matrix(true_labels, predictions, class_names):
    """Render the confusion matrix as a heatmap and save it to 'confusion_matrix.png'.

    Rows are the true classes and columns the predicted classes; both axes
    are labelled with ``class_names``. The figure is closed after saving.
    """
    matrix = confusion_matrix(true_labels, predictions)

    plt.figure(figsize=(10, 8))
    sns.heatmap(
        matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=class_names,
        yticklabels=class_names,
    )

    # Axis captions and title.
    for apply_caption, caption in (
        (plt.xlabel, 'Predicted'),
        (plt.ylabel, 'True'),
        (plt.title, 'Confusion Matrix'),
    ):
        apply_caption(caption)

    plt.tight_layout()
    plt.savefig('confusion_matrix.png')
    plt.close()

def evaluate_afro_xlmr_for_lang(language):
    """
    Fine-tune Davlan/afro-xlmr-base for 3-class sentiment on one language and test it.

    Loads the train/validation/test splits for `language` through `load_data`,
    trains for `config['epochs']` epochs, restores the weights of the epoch with
    the best validation F1, evaluates on the test split, writes the test metrics
    to `afroxlmr_results_{language}_augmented.csv` and saves the model and
    tokenizer to `./xmlr_sentiment_model_{language}_augmented`.

    Args:
        language: Language code handed to `load_data` and used in output names.
    """
    config = {
        'model_name': 'Davlan/afro-xlmr-base',
        'num_labels': 3,
        'batch_size': 16,
        'learning_rate': 2e-5,
        'epochs': 3,
        'warmup_steps': 0,
        'max_grad_norm': 1.0,
        'text_column': 'text',
        'label_column': 'label',
        'class_names': ['negative', 'neutral', 'positive']
    }

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    logger.info(f"Using device: {device}")

    logger.info(f"Loading model: {config['model_name']}...")
    tokenizer = XLMRobertaTokenizer.from_pretrained(config['model_name'])
    model = XLMRobertaForSequenceClassification.from_pretrained(
        config['model_name'],
        num_labels=config['num_labels']
    )
    model.to(device)

    # Load data
    logger.info("Loading data...")
    train_df, val_df, test_df = load_data(f'{language}')

    # Create data loaders
    logger.info("Creating data loaders...")
    train_loader, val_loader, test_loader = create_data_loaders(
        train_df, val_df, test_df,
        tokenizer,
        batch_size=config['batch_size'],
        text_column=config['text_column'],
        label_column=config['label_column']
    )

    optimizer = torch.optim.AdamW(model.parameters(), lr=config['learning_rate'])
    # The scheduler is stepped once per batch, so it spans every batch of every epoch.
    total_steps = len(train_loader) * config['epochs']
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=config['warmup_steps'],
        num_training_steps=total_steps
    )

    logger.info("Starting training...")
    best_val_f1 = 0
    best_model_state = None

    for epoch in range(config['epochs']):
        logger.info(f"Epoch {epoch + 1}/{config['epochs']}")

        # Train
        start_time = time.time()
        train_loss = train_epoch(model, train_loader, optimizer, scheduler, device)
        train_time = time.time() - start_time

        # Validate
        val_loss, val_accuracy, val_precision, val_recall, val_f1, _, _ = evaluate(model, val_loader, device)

        logger.info(f"Epoch {epoch + 1} results:")
        logger.info(f"Train Loss: {train_loss:.4f}, Time: {train_time:.2f}s")
        logger.info(f"Val Loss: {val_loss:.4f}, Accuracy: {val_accuracy:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, F1: {val_f1:.4f}")

        # Save best model
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            # state_dict() returns references to the live parameter tensors, so a
            # shallow dict copy would keep changing as training continues. Clone
            # every tensor (onto the CPU, to spare accelerator memory) to get a
            # real snapshot of this epoch.
            best_model_state = {
                name: tensor.detach().cpu().clone()
                for name, tensor in model.state_dict().items()
            }
            logger.info(f"New best model with F1: {best_val_f1:.4f}")

    # Load best model for testing
    if best_model_state is not None:
        logger.info("Loading best model for testing...")
        model.load_state_dict(best_model_state)

    # Test evaluation
    logger.info("Evaluating on test set...")
    test_loss, test_accuracy, test_precision, test_recall, test_f1, test_preds, test_labels = evaluate(model, test_loader, device)

    logger.info("Test Results:")
    logger.info(f"Loss: {test_loss:.4f}")
    logger.info(f"Accuracy: {test_accuracy:.4f}")
    logger.info(f"Precision: {test_precision:.4f}")
    logger.info(f"Recall: {test_recall:.4f}")
    logger.info(f"F1 Score: {test_f1:.4f}")

    # One-row summary of the test metrics and the hyper-parameters used.
    results_df_afro = {
        "test_loss": test_loss,
        "test_accuracy": test_accuracy,
        "test_f1": test_f1,
        "test_precision": test_precision,
        "test_recall": test_recall,
        "epochs": config['epochs'],
        "learning_rate": config['learning_rate'],
        "batch_size": config['batch_size'],
    }

    path = f"afroxlmr_results_{language}_augmented.csv"
    pd.DataFrame([results_df_afro]).to_csv(path, index=False)
    print(f"Results saved to {path}")

    # Save model
    logger.info("Saving model...")
    model_save_path = f'./xmlr_sentiment_model_{language}_augmented'
    model.save_pretrained(model_save_path)
    tokenizer.save_pretrained(model_save_path)
    logger.info(f"Model saved to {model_save_path}")

# Fine-tune and evaluate AfroXLM-R once for each language code listed here.
langs = ['por']

for l in langs:
    evaluate_afro_xlmr_for_lang(l)

## Next, we evaluate the use of Adapters to perform cross-lingual transfer
This section explores the effect of adapters on model performance in cross-lingual transfer. The evaluation uses the AfroXLM-R model (`Davlan/afro-xlmr-base`), because it achieved the highest accuracy on the Swahili data.

We first evaluate the model that was trained on the Swahili data directly on the Sesotho dataset, before any adapters are added.

In [6]:
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import XLMRobertaForSequenceClassification, XLMRobertaTokenizer
from datasets import load_dataset
from transformers import get_linear_schedule_with_warmup
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm import tqdm
import time
import logging
from adapters import AutoAdapterModel,AdapterConfig
from torch.nn import CrossEntropyLoss


# Temp
class SentimentDataset(Dataset):
    """
    Torch dataset that tokenizes one text per item and pairs it with its label.

    When `lang` is 'sot' the labels are looked up in a
    negative/neutral/positive -> 0/1/2 mapping; for any other `lang` the labels
    are used exactly as given.
    """

    def __init__(self, texts, labels, tokenizer, max_length=128, lang=None):
        self.texts, self.labels = texts, labels
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.lang = lang
        # The mapping attribute only exists for the 'sot' data.
        if lang == 'sot':
            self.label_mapping = {"negative": 0, "neutral": 1, "positive": 2}  # Label conversion

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = str(self.texts[idx])
        raw_label = self.labels[idx]
        label = self.label_mapping[raw_label] if self.lang == 'sot' else raw_label

        # Pad/truncate every example to the same fixed length.
        encoded = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt',
        )

        # The tokenizer output has a leading batch axis of size one; flatten it away.
        item = {key: encoded[key].flatten() for key in ('input_ids', 'attention_mask')}
        item['labels'] = torch.tensor(label, dtype=torch.long)
        return item

def load_data(language):
    """
    Load the train/validation/test splits for one language as pandas DataFrames.

    'por' and 'swa' come from the "HausaNLP/AfriSenti-Twitter" dataset and 'sot'
    from "hamza-student-123/nlp-assignment-news-data". In every split the text
    column ("tweet" or "headline") is renamed to "text".

    Args:
        language: One of 'por', 'swa' or 'sot'.

    Returns:
        Tuple of (train_df, val_df, test_df).

    Raises:
        ValueError: If `language` is not supported, or a split has no
            recognisable text column.
        Exception: Any other loading error is logged and re-raised unchanged.
    """
    # language -> (dataset repository, configuration name, extra load_dataset kwargs)
    # NOTE(review): trust_remote_code=True allows the dataset repository's
    # loading script to run locally; keep it only for repositories you trust.
    sources = {
        'por': ("HausaNLP/AfriSenti-Twitter", "por", {"trust_remote_code": True}),
        'swa': ("HausaNLP/AfriSenti-Twitter", "swa", {"trust_remote_code": True}),
        'sot': ("hamza-student-123/nlp-assignment-news-data", 'sot', {}),
    }

    try:
        if language not in sources:
            # A bare `raise Exception` carried no message, leaving the error log empty.
            raise ValueError(f"Unsupported language: {language!r}")

        repository, subset, extra_kwargs = sources[language]
        chosen_dataset = load_dataset(repository, subset, **extra_kwargs)

        for split in ["train", "validation", "test"]:
            columns = chosen_dataset[split].column_names
            # Look the text column up by name rather than by position, so a
            # different column order cannot select the wrong branch.
            if "tweet" in columns:
                chosen_dataset[split] = chosen_dataset[split].rename_column("tweet", "text")
            elif "headline" in columns:
                chosen_dataset[split] = chosen_dataset[split].rename_column("headline", "text")
            elif "text" not in columns:
                raise ValueError(
                    f"No 'tweet', 'headline' or 'text' column in the {split} split: {columns}"
                )

        train_df = chosen_dataset["train"].to_pandas()
        val_df = chosen_dataset["validation"].to_pandas()
        test_df = chosen_dataset["test"].to_pandas()
        logger.info(f"Data loaded successfully: {len(train_df)} training, {len(val_df)} validation, {len(test_df)} test examples")
        return train_df, val_df, test_df
    except Exception as e:
        logger.error(f"Error loading data: {e}")
        # Bare `raise` keeps the original traceback intact.
        raise


def create_data_loaders(train_df, val_df, test_df, tokenizer, batch_size=16, text_column='text', label_column='label', lang=None):
    """
    Wrap the three DataFrames in SentimentDataset objects and DataLoaders.

    Only the training loader shuffles its examples.

    Returns:
        Tuple of (train_loader, val_loader, test_loader).
    """
    def to_dataset(frame):
        # Pull the text and label columns out as plain Python lists.
        return SentimentDataset(
            texts=frame[text_column].tolist(),
            labels=frame[label_column].tolist(),
            tokenizer=tokenizer,
            lang=lang,
        )

    train_set, val_set, test_set = (to_dataset(df) for df in (train_df, val_df, test_df))

    return (
        DataLoader(train_set, batch_size=batch_size, shuffle=True),
        DataLoader(val_set, batch_size=batch_size),
        DataLoader(test_set, batch_size=batch_size),
    )

def evaluate(model, dataloader, device):
    """
    Score the model on every batch of a dataloader with gradients disabled.

    Returns:
        Tuple of (mean loss per batch, accuracy, precision, recall, f1,
        predictions, labels). Precision, recall and f1 are weighted averages.
    """
    model.eval()
    running_loss = 0
    predictions, references = [], []

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            # Move the tensors the model consumes onto the target device.
            ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            targets = batch['labels'].to(device)

            result = model(input_ids=ids, attention_mask=mask, labels=targets)
            running_loss += result.loss.item()

            # Highest-scoring class per example.
            predictions.extend(torch.argmax(result.logits, dim=1).cpu().numpy())
            references.extend(targets.cpu().numpy())

    # Aggregate metrics over the whole dataloader.
    accuracy = accuracy_score(references, predictions)
    precision, recall, f1, _ = precision_recall_fscore_support(
        references, predictions, average='weighted'
    )

    return running_loss / len(dataloader), accuracy, precision, recall, f1, predictions, references


def evaluate_afro_xlmr_before_adapter(source_language,target_language):
    """
    Score the model fine-tuned on `source_language` on the `target_language` test set.

    The checkpoint at `./xmlr_sentiment_model_{source_language}` is loaded as an
    AutoAdapterModel and evaluated as-is: no adapters are added and nothing is
    trained. The test metrics are written to the log.

    Args:
        source_language: Language code of the saved fine-tuned model.
        target_language: Language code of the dataset to evaluate on.
    """
    config = {
        'extern_model_name': f'Davlan/afro-xlmr-base',
        'model_name': f'./xmlr_sentiment_model_{source_language}',
        'adapter_model_name': f'xlmr_sentiment_model_swa',
        'num_labels': 3,
        'batch_size': 16,
        'learning_rate': 2e-5,
        'epochs': 3,
        'warmup_steps': 0,
        'max_grad_norm': 1.0,
        'text_column': 'text',
        'label_column': 'label',
        'class_names': ['negative', 'neutral', 'positive']
    }

    tokenizer = XLMRobertaTokenizer.from_pretrained(config['extern_model_name'])

    model = AutoAdapterModel.from_pretrained(config['model_name'], num_labels=config['num_labels'])
    logger.info("Successfully loaded as AutoAdapterModel")

    logger.info("Loading target data...")
    train_df, val_df, test_df = load_data(f'{target_language}')

    logger.info("Creating data loaders...")
    # Only the test loader is needed for this baseline.
    _, _, test_loader = create_data_loaders(
        train_df, val_df, test_df,
        tokenizer,
        batch_size=config['batch_size'],
        text_column=config['text_column'],
        label_column=config['label_column'],
        lang=target_language
    )

    # Test evaluation: baseline results before any adapters are added.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # evaluate() moves every batch to `device`, so the model has to live there
    # too; otherwise a CUDA run fails with a device mismatch.
    model.to(device)
    logger.info("Evaluating on test set...")
    test_loss, test_accuracy, test_precision, test_recall, test_f1, test_preds, test_labels = evaluate(model, test_loader, device)

    logger.info("Test Results:")
    logger.info(f"Loss: {test_loss:.4f}")
    logger.info(f"Accuracy: {test_accuracy:.4f}")
    logger.info(f"Precision: {test_precision:.4f}")
    logger.info(f"Recall: {test_recall:.4f}")
    logger.info(f"F1 Score: {test_f1:.4f}")


# Baseline: score the Swahili-trained model on Sesotho before any adapters are added.
evaluate_afro_xlmr_before_adapter('swa','sot')

2025-06-14 18:51:46,468 - INFO - Adding head 'default' with config {'head_type': 'classification', 'num_labels': 3, 'layers': 2, 'activation_function': 'tanh', 'label2id': {'LABEL_0': 0, 'LABEL_1': 1, 'LABEL_2': 2}, 'use_pooler': False, 'bias': True, 'dropout_prob': None}.
Some weights of XLMRobertaAdapterModel were not initialized from the model checkpoint at ./xmlr_sentiment_model_swa and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
2025-06-14 18:51:46,535 - INFO - Successfully loaded as AutoAdapterModel
2025-06-14 18:51:46,536 - INFO - Loading target data...
2025-06-14 18:51:54,712 - INFO - Data loaded successfully: 1740 training, 349 validation, 349 test examples
2025-06-14 18:51:54,713 - INFO - Creating data loaders...
2025-06-14 18:51:54,714 - INFO - Evaluating on test set...
Evaluating: 100%|██████████| 22/22 [00:22<00:00,  1.02s/i

### We then add adapters to the model

In [None]:
# !pip install -U transformers datasets peft evaluate plotly sentencepiece adapters adapter-transformers --quiet

import torch
from torch.utils.data import DataLoader, Dataset
from transformers import XLMRobertaForSequenceClassification, XLMRobertaTokenizer
from datasets import load_dataset
from transformers import get_linear_schedule_with_warmup
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm import tqdm
import time
import logging
from adapters import AutoAdapterModel,AdapterConfig
from torch.nn import CrossEntropyLoss

# Set up logging: INFO level, each record prefixed with a timestamp and its level.
# `logger` is the module-level logger used by the functions in this cell.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class SentimentDataset(Dataset):
    """
    Map-style dataset yielding tokenized text together with its sentiment label.

    For `lang == 'sot'` the labels are translated through a
    negative/neutral/positive -> 0/1/2 table; for every other `lang` they are
    passed through untouched.
    """

    def __init__(self, texts, labels, tokenizer, max_length=128, lang=None):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.lang = lang
        # The lookup table is only created for the 'sot' data.
        if lang == 'sot':
            self.label_mapping = {"negative": 0, "neutral": 1, "positive": 2}  # Label conversion

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = str(self.texts[idx])

        label = self.labels[idx]
        if self.lang == 'sot':
            label = self.label_mapping[label]

        # Every example is padded or truncated to `max_length` tokens.
        tokens = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt',
        )

        input_ids = tokens['input_ids'].flatten()
        attention_mask = tokens['attention_mask'].flatten()
        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'labels': torch.tensor(label, dtype=torch.long),
        }

def load_data(language):
    """
    Load the train/validation/test splits for one language as pandas DataFrames.

    'por' and 'swa' come from the "HausaNLP/AfriSenti-Twitter" dataset and 'sot'
    from "hamza-student-123/nlp-assignment-news-data". In every split the text
    column ("tweet" or "headline") is renamed to "text".

    Args:
        language: One of 'por', 'swa' or 'sot'.

    Returns:
        Tuple of (train_df, val_df, test_df).

    Raises:
        ValueError: If `language` is not supported, or a split has no
            recognisable text column.
        Exception: Any other loading error is logged and re-raised unchanged.
    """
    # language -> (dataset repository, configuration name, extra load_dataset kwargs)
    # NOTE(review): trust_remote_code=True allows the dataset repository's
    # loading script to run locally; keep it only for repositories you trust.
    sources = {
        'por': ("HausaNLP/AfriSenti-Twitter", "por", {"trust_remote_code": True}),
        'swa': ("HausaNLP/AfriSenti-Twitter", "swa", {"trust_remote_code": True}),
        'sot': ("hamza-student-123/nlp-assignment-news-data", 'sot', {}),
    }

    try:
        if language not in sources:
            # A bare `raise Exception` carried no message, leaving the error log empty.
            raise ValueError(f"Unsupported language: {language!r}")

        repository, subset, extra_kwargs = sources[language]
        chosen_dataset = load_dataset(repository, subset, **extra_kwargs)

        for split in ["train", "validation", "test"]:
            columns = chosen_dataset[split].column_names
            # Look the text column up by name rather than by position, so a
            # different column order cannot select the wrong branch.
            if "tweet" in columns:
                chosen_dataset[split] = chosen_dataset[split].rename_column("tweet", "text")
            elif "headline" in columns:
                chosen_dataset[split] = chosen_dataset[split].rename_column("headline", "text")
            elif "text" not in columns:
                raise ValueError(
                    f"No 'tweet', 'headline' or 'text' column in the {split} split: {columns}"
                )

        train_df = chosen_dataset["train"].to_pandas()
        val_df = chosen_dataset["validation"].to_pandas()
        test_df = chosen_dataset["test"].to_pandas()
        logger.info(f"Data loaded successfully: {len(train_df)} training, {len(val_df)} validation, {len(test_df)} test examples")
        return train_df, val_df, test_df
    except Exception as e:
        logger.error(f"Error loading data: {e}")
        # Bare `raise` keeps the original traceback intact.
        raise


def create_data_loaders(train_df, val_df, test_df, tokenizer, batch_size=16, text_column='text', label_column='label', lang=None):
    """
    Build one DataLoader per split from the given DataFrames.

    Each DataFrame's text and label columns are wrapped in a SentimentDataset;
    the training loader shuffles, the validation and test loaders do not.

    Returns:
        Tuple of (train_loader, val_loader, test_loader).
    """
    datasets_by_split = [
        SentimentDataset(
            texts=frame[text_column].tolist(),
            labels=frame[label_column].tolist(),
            tokenizer=tokenizer,
            lang=lang,
        )
        for frame in (train_df, val_df, test_df)
    ]
    train_set, val_set, test_set = datasets_by_split

    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_set, batch_size=batch_size)
    test_loader = DataLoader(test_set, batch_size=batch_size)
    return train_loader, val_loader, test_loader

def train_epoch(model, dataloader, optimizer, scheduler, device):
    """
    Train the model for one pass over `dataloader`.

    For every batch: forward pass with labels, backward pass, gradient-norm
    clipping at 1.0, then one optimizer step and one scheduler step.

    Args:
        model: Model whose output exposes `.loss` when called with labels.
        dataloader: Yields dicts with 'input_ids', 'attention_mask' and the
            targets under 'labels' (or 'label').
        optimizer: Optimizer stepped once per batch.
        scheduler: Learning-rate scheduler stepped once per batch.
        device: Device the batch tensors are moved to.

    Returns:
        Mean training loss per batch.
    """
    model.train()
    epoch_loss = 0

    progress_bar = tqdm(dataloader, desc="Training")
    for batch in progress_bar:
        optimizer.zero_grad()

        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        # The SentimentDataset defined alongside this function emits the targets
        # under 'labels'; 'label' is still accepted so batches keyed the old way
        # keep working.
        label_key = 'labels' if 'labels' in batch else 'label'
        labels = batch[label_key].to(device)

        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            labels=labels
        )

        loss = outputs.loss
        epoch_loss += loss.item()

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()

        progress_bar.set_postfix({"loss": f"{loss.item():.4f}"})

    return epoch_loss / len(dataloader)

def evaluate(model, dataloader, device):
    """
    Run the model over a dataloader without gradients and compute its metrics.

    Returns:
        Tuple of (mean loss per batch, accuracy, precision, recall, f1,
        predictions, labels). Precision, recall and f1 are weighted averages.
    """
    model.eval()
    loss_sum = 0
    predicted, expected = [], []

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            # Same three tensors, in the same order, moved onto the device.
            inputs = {
                key: batch[key].to(device)
                for key in ('input_ids', 'attention_mask', 'labels')
            }

            outputs = model(**inputs)
            loss_sum += outputs.loss.item()

            batch_preds = torch.argmax(outputs.logits, dim=1).cpu().numpy()
            predicted.extend(batch_preds)
            expected.extend(inputs['labels'].cpu().numpy())

    accuracy = accuracy_score(expected, predicted)
    precision, recall, f1, _ = precision_recall_fscore_support(
        expected, predicted, average='weighted'
    )

    return loss_sum / len(dataloader), accuracy, precision, recall, f1, predicted, expected

def setup_crosslingual_adapters(config, source_language, target_language):
    """
    Load the fine-tuned model and attach language and task adapters to it.

    Adds one "pfeiffer" adapter per language (reduction factor 2), one
    "pfeiffer" task adapter named "sentiment_task" (reduction factor 16) and a
    classification head carrying the task adapter's name.

    Returns:
        Tuple of (model, tokenizer, source adapter name, target adapter name,
        task adapter name).
    """
    logger.info(f"Loading model: {config['model_name']}...")
    tokenizer = XLMRobertaTokenizer.from_pretrained(config['extern_model_name'])
    model = AutoAdapterModel.from_pretrained(config['model_name'], num_labels=config['num_labels'])
    logger.info("Successfully loaded as AutoAdapterModel")

    # Both language adapters share one configuration.
    language_cfg = AdapterConfig.load("pfeiffer", reduction_factor=2)

    source_adapter_name = f"lang_{source_language}"
    model.add_adapter(source_adapter_name, config=language_cfg)
    logger.info(f"Added source language adapter: {source_adapter_name}")

    target_adapter_name = f"lang_{target_language}"
    model.add_adapter(target_adapter_name, config=language_cfg)
    logger.info(f"Added target language adapter: {target_adapter_name}")

    # Task adapter plus a classification head registered under the same name.
    task_adapter_name = "sentiment_task"
    model.add_adapter(task_adapter_name, config=AdapterConfig.load("pfeiffer", reduction_factor=16))
    index_to_label = dict(enumerate(config['class_names']))
    model.add_classification_head(
        task_adapter_name,
        num_labels=config['num_labels'],
        id2label=index_to_label,
    )

    return model, tokenizer, source_adapter_name, target_adapter_name, task_adapter_name

def setup_madx_adapter(config, source_language, target_language):
    """
    Load the fine-tuned model and attach MAD-X style adapters to it.

    Adds a language adapter for the source and for the target language, a task
    adapter named "sentiment" and a classification head of the same name.

    Returns:
        Tuple of (model, tokenizer).
    """
    tokenizer = XLMRobertaTokenizer.from_pretrained(config['extern_model_name'])
    model = AutoAdapterModel.from_pretrained(config['model_name'], num_labels=config['num_labels'])

    # MAD-X style setup built from the standard AdapterConfig: the language
    # adapters get the smaller reduction factor, the task adapter the larger one.
    language_cfg = AdapterConfig.load("pfeiffer", reduction_factor=2)
    task_cfg = AdapterConfig.load("pfeiffer", reduction_factor=16)

    source_name = f"lang_{source_language}"
    target_name = f"lang_{target_language}"
    model.add_adapter(source_name, config=language_cfg)
    model.add_adapter(target_name, config=language_cfg)

    task_name = "sentiment"
    model.add_adapter(task_name, config=task_cfg)
    model.add_classification_head(task_name, num_labels=config['num_labels'])

    return model, tokenizer

def _run_adapter_phase(model, train_loader, val_loader, params, optimizer,
                       criterion, config, device, phase_name):
    """Train `params` for config['epochs'] epochs on `train_loader`, validating after each epoch."""
    for epoch in range(config['epochs']):
        # validate_model() switches to eval mode, so re-enable training every epoch.
        model.train()
        total_loss = 0

        progress_bar = tqdm(train_loader, desc=f"{phase_name} Epoch {epoch+1}/{config['epochs']}")

        for batch in progress_bar:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            optimizer.zero_grad()

            # The loss is computed here rather than by passing labels to the model.
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            loss = criterion(outputs.logits, labels)

            loss.backward()
            torch.nn.utils.clip_grad_norm_(params, config['max_grad_norm'])
            optimizer.step()

            total_loss += loss.item()
            progress_bar.set_postfix({'loss': loss.item()})

        avg_loss = total_loss / len(train_loader)
        logger.info(f"{phase_name} Epoch {epoch+1}: Average Loss = {avg_loss:.4f}")

        val_accuracy = validate_model(model, val_loader, device)
        logger.info(f"{phase_name} Epoch {epoch+1}: Validation Accuracy = {val_accuracy:.4f}")


def train_cross_lingual_transfer(model, source_train_loader, target_train_loader,
                               val_loader, config, source_adapter, target_adapter, task_adapter):
    """
    Train the adapters in two sequential phases and return the model.

    Phase 1 trains the source-language adapter together with the task adapter on
    `source_train_loader`. Phase 2 trains only the target-language adapter on
    `target_train_loader`, at a tenth of the learning rate, with the task adapter
    still active in the forward pass. Both phases run for `config['epochs']`
    epochs and log validation accuracy on `val_loader` after every epoch.

    Args:
        model: Adapter model that already contains the three named adapters.
        source_train_loader: Training batches in the source language.
        target_train_loader: Training batches in the target language.
        val_loader: Batches used for the per-epoch validation accuracy.
        config: Dict providing 'learning_rate', 'epochs' and 'max_grad_norm'.
        source_adapter: Name of the source-language adapter.
        target_adapter: Name of the target-language adapter.
        task_adapter: Name of the task adapter.

    Returns:
        The trained model, moved to CUDA when available and otherwise the CPU.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    criterion = CrossEntropyLoss()

    logger.info("Phase 1: Training source language + task adapters...")

    # train_adapter() also activates the adapters it is given, so the active
    # setup is always (re)applied after it.
    model.train_adapter([source_adapter, task_adapter])
    model.set_active_adapters([source_adapter, task_adapter])

    # Everything train_adapter() left trainable is optimized in phase 1.
    trainable_params = [param for _, param in model.named_parameters() if param.requires_grad]
    optimizer_source = torch.optim.AdamW(trainable_params, lr=config['learning_rate'])

    _run_adapter_phase(
        model, source_train_loader, val_loader, trainable_params, optimizer_source,
        criterion, config, device, "Source",
    )

    logger.info("Phase 2: Training target language adapter...")

    # Order matters: train_adapter([target]) makes the target adapter the only
    # active one, so the language + task setup has to be restored afterwards.
    # Doing it the other way round drops the task adapter from the forward pass.
    model.train_adapter([target_adapter])
    model.set_active_adapters([target_adapter, task_adapter])

    # Restrict the optimizer to the target adapter's own weights.
    target_params = [
        param for name, param in model.named_parameters()
        if param.requires_grad and target_adapter in name
    ]
    optimizer_target = torch.optim.AdamW(target_params, lr=config['learning_rate'] * 0.1)

    _run_adapter_phase(
        model, target_train_loader, val_loader, target_params, optimizer_target,
        criterion, config, device, "Target",
    )

    return model

def validate_model(model, val_loader, device):
    """
    Compute the model's accuracy over all batches of `val_loader`.

    The model is put into eval mode and run without gradients.

    Returns:
        Fraction of examples whose highest-scoring class equals the label.
    """
    model.eval()
    hits, seen = 0, 0

    with torch.no_grad():
        for batch in val_loader:
            ids = batch['input_ids'].to(device)
            mask = batch['attention_mask'].to(device)
            targets = batch['labels'].to(device)

            logits = model(input_ids=ids, attention_mask=mask).logits
            # Index of the largest logit along the class axis.
            best_class = logits.data.max(dim=1).indices

            seen += targets.size(0)
            hits += (best_class == targets).sum().item()

    return hits / seen

def load_pretrained_lang_adapters(model, source_language, target_language):
    """
    Try to load a pretrained language adapter for the source and target language.

    Each adapter is requested as "lang/{language}" with source "hf" and
    registered under the name "lang_{language}". A failed load is logged as a
    warning, together with its cause, and does not stop the run.

    Args:
        model: Adapter model to load into.
        source_language: Language code of the source-language adapter.
        target_language: Language code of the target-language adapter.

    Returns:
        The same model object.
    """
    for language in (source_language, target_language):
        try:
            model.load_adapter(f"lang/{language}", source="hf", load_as=f"lang_{language}")
        except Exception as err:
            # Best effort by design, but only for ordinary errors: a bare
            # `except:` would also swallow KeyboardInterrupt and SystemExit.
            logger.warning(
                f"No pretrained adapter found for {language}, using random initialization ({err})"
            )
        else:
            logger.info(f"Loaded pretrained {language} adapter")

    return model

def train_afro_xlmr_adapter(source_language, target_language='sot'):
    """
    Train cross-lingual adapters on top of the fine-tuned model and test them.

    Sets up the adapters with `setup_crosslingual_adapters`, trains them with
    `train_cross_lingual_transfer` (validating on the target-language validation
    split), evaluates on the target-language test split, writes the metrics to
    `afroxlmr_results_{target_language}_adapters.csv` and saves the three
    adapters under `./adapters/`. Saving the full model and tokenizer to
    `./cross_lingual_model_{source_language}_{target_language}` is best effort.

    Args:
        source_language: Language code of the fine-tuned model and source data.
        target_language: Language code of the data to transfer to.
    """
    config = {
        'extern_model_name': f'Davlan/afro-xlmr-base',
        'model_name': f'./xmlr_sentiment_model_{source_language}',
        'num_labels': 3,
        'batch_size': 16,
        'learning_rate': 2e-5,
        'epochs': 3,
        'warmup_steps': 0,
        'max_grad_norm': 1.0,
        'text_column': 'text',
        'label_column': 'label',
        'class_names': ['negative', 'neutral', 'positive']
    }

    model, tokenizer, src_adapter, tgt_adapter, task_adapter = setup_crosslingual_adapters(
        config, source_language, target_language)

    logger.info("Loading source data...")
    source_frames = load_data(f'{source_language}')

    logger.info("Creating data loaders...")
    # Only the source training split is used; validation and testing happen on
    # the target language.
    source_train_loader, _, _ = create_data_loaders(
        *source_frames,
        tokenizer,
        batch_size=config['batch_size'],
        text_column=config['text_column'],
        label_column=config['label_column'],
        lang=source_language
    )

    logger.info("Loading target data...")
    target_frames = load_data(f'{target_language}')

    logger.info("Creating data loaders...")
    target_train_loader, val_loader, test_loader = create_data_loaders(
        *target_frames,
        tokenizer,
        batch_size=config['batch_size'],
        text_column=config['text_column'],
        label_column=config['label_column'],
        lang=target_language
    )

    print("Starting sequential cross-lingual training...")
    model = train_cross_lingual_transfer(
        model, source_train_loader, target_train_loader, val_loader,
        config, src_adapter, tgt_adapter, task_adapter
    )

    save_dir = f"./cross_lingual_model_{source_language}_{target_language}"
    try:
        logger.info("Attempting to save model and tokenizer")
        model.save_pretrained(save_dir)
        tokenizer.save_pretrained(save_dir)
    except Exception:
        # Still best effort, but keep the traceback and let KeyboardInterrupt /
        # SystemExit through instead of swallowing everything.
        logger.exception("Failed to save model and tokenizer")

    # Test evaluation: results after adapter training.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    logger.info("Evaluating on test set...")
    test_loss, test_accuracy, test_precision, test_recall, test_f1, test_preds, test_labels = evaluate(model, test_loader, device)

    logger.info("Test Results:")
    logger.info(f"Loss: {test_loss:.4f}")
    logger.info(f"Accuracy: {test_accuracy:.4f}")
    logger.info(f"Precision: {test_precision:.4f}")
    logger.info(f"Recall: {test_recall:.4f}")
    logger.info(f"F1 Score: {test_f1:.4f}")

    # One-row summary of the test metrics and the hyper-parameters used.
    results_df_afro = {
        "test_loss": test_loss,
        "test_accuracy": test_accuracy,
        "test_f1": test_f1,
        "test_precision": test_precision,
        "test_recall": test_recall,
        "epochs": config['epochs'],
        "learning_rate": config['learning_rate'],
        "batch_size": config['batch_size'],
    }

    path = f"afroxlmr_results_{target_language}_adapters.csv"
    pd.DataFrame([results_df_afro]).to_csv(path, index=False)
    print(f"Results saved to {path}")

    # Saving the adapters
    for adapter_name in (src_adapter, tgt_adapter, task_adapter):
        model.save_adapter(f"./adapters/{adapter_name}", adapter_name)


# Train the adapters with Swahili as the source language and Sesotho as the target.
train_afro_xlmr_adapter('swa','sot')

In [None]:
def evaluate(model, dataloader, device):
    """
    Run the model over a dataloader without gradients and compute its metrics.

    Returns:
        Tuple of (mean loss per batch, accuracy, precision, recall, f1,
        predictions, labels). Precision, recall and f1 are weighted averages.
    """
    model.eval()
    loss_sum = 0
    predicted, expected = [], []

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            # Same three tensors, in the same order, moved onto the device.
            inputs = {
                key: batch[key].to(device)
                for key in ('input_ids', 'attention_mask', 'labels')
            }

            outputs = model(**inputs)
            loss_sum += outputs.loss.item()

            batch_preds = torch.argmax(outputs.logits, dim=1).cpu().numpy()
            predicted.extend(batch_preds)
            expected.extend(inputs['labels'].cpu().numpy())

    # Calculate metrics
    accuracy = accuracy_score(expected, predicted)
    precision, recall, f1, _ = precision_recall_fscore_support(
        expected, predicted, average='weighted'
    )

    return loss_sum / len(dataloader), accuracy, precision, recall, f1, predicted, expected


def evaluate_afro_xlmr_adapter(source_language, target_language):
    """Evaluate a saved cross-lingual adapter model on the target test set.

    Loads the tokenizer from ``Davlan/afro-xlmr-base`` and the model from
    ``./cross_lingual_model_{source_language}_{target_language}``, builds the
    data loaders for ``target_language``, runs ``evaluate`` on the test
    loader, logs the metrics and writes them as a one-row CSV to
    ``afroxlmr_results_{target_language}_adapters.csv``.

    Args:
        source_language: Language code used to locate the saved model
            directory (e.g. ``'swa'``).
        target_language: Language code whose data is loaded and evaluated,
            and which names the results file (e.g. ``'sot'``).

    Returns:
        None. Results are logged and written to disk.

    Raises:
        Exception: Whatever ``AutoAdapterModel.from_pretrained`` raises when
            the saved model cannot be loaded; it is logged and re-raised.
    """
    config = {
        'extern_model_name': 'Davlan/afro-xlmr-base',
        'adapter_model_name': f'./cross_lingual_model_{source_language}_{target_language}',
        'num_labels': 3,
        'batch_size': 16,
        # No training happens here; these two are only copied into the
        # results row below.
        'learning_rate': 2e-5,
        'epochs': 3,
        'text_column': 'text',
        'label_column': 'label',
    }

    tokenizer = XLMRobertaTokenizer.from_pretrained(config['extern_model_name'])

    try:
        model = AutoAdapterModel.from_pretrained(
            config['adapter_model_name'], num_labels=config['num_labels']
        )
    except Exception:
        # Surface the real cause instead of exiting with an opaque status.
        logger.exception(
            "Failed to load adapter model from %s", config['adapter_model_name']
        )
        raise
    logger.info("Successfully loaded as AutoAdapterModel")

    logger.info("Loading target data...")
    train_df, val_df, test_df = load_data(f'{target_language}')

    logger.info("Creating data loaders...")
    # Only the test loader is needed for evaluation.
    _, _, test_loader = create_data_loaders(
        train_df, val_df, test_df,
        tokenizer,
        batch_size=config['batch_size'],
        text_column=config['text_column'],
        label_column=config['label_column'],
        lang=target_language,
    )

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    # evaluate() moves each batch to `device` but not the model, so the
    # freshly loaded model must be placed there first.
    model.to(device)

    logger.info("Evaluating on test set...")
    test_loss, test_accuracy, test_precision, test_recall, test_f1, _, _ = evaluate(
        model, test_loader, device
    )

    logger.info("Test Results:")
    logger.info("Loss: %.4f", test_loss)
    logger.info("Accuracy: %.4f", test_accuracy)
    logger.info("Precision: %.4f", test_precision)
    logger.info("Recall: %.4f", test_recall)
    logger.info("F1 Score: %.4f", test_f1)

    results_df_afro = {
        "test_loss": test_loss,
        "test_accuracy": test_accuracy,
        "test_f1": test_f1,
        "test_precision": test_precision,
        "test_recall": test_recall,
        "epochs": config['epochs'],
        "learning_rate": config['learning_rate'],
        "batch_size": config['batch_size'],
    }

    # NOTE(review): train_afro_xlmr_adapter writes to this same filename, so
    # running this overwrites its results file -- confirm that is intended.
    path = f"afroxlmr_results_{target_language}_adapters.csv"
    pd.DataFrame([results_df_afro]).to_csv(path, index=False)
    print(f"Results saved to {path}")

# Evaluate the saved Swahili -> Sesotho cross-lingual model on the Sesotho data.
evaluate_afro_xlmr_adapter(source_language='swa', target_language='sot')