In [None]:
import time
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from sklearn.metrics import classification_report, ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import DataLoader, Dataset
from transformers import AutoModelForSequenceClassification, AutoTokenizer, logging, pipeline

In [None]:
# --- Experiment configuration ---
# Pretrained checkpoint that gets fine-tuned
MODEL_NAME = "bert-base-multilingual-uncased"
# Single seed used for the data splits and the random number generators
RANDOM_STATE = 9
BATCH_SIZE = 64
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
NUM_EPOCHS = 1
LEARNING_RATE = 5e-5

# --- Reproducibility ---
torch.backends.cudnn.deterministic = True
# cuDNN autotuning may select different kernels from run to run
torch.backends.cudnn.benchmark = False
# Seed every RNG the notebook can touch, not only PyTorch's
np.random.seed(RANDOM_STATE)
torch.manual_seed(RANDOM_STATE)

# Only surface errors from the transformers library
logging.set_verbosity_error()

In [None]:
# Read the three pre-split CSV files (train / test / valid)
train_data, test_data, valid_data = (
    pd.read_csv(csv_path)
    for csv_path in (
        "/content/sample_data/train.csv",
        "/content/sample_data/test.csv",
        "/content/sample_data/valid.csv",
    )
)

In [None]:
# Stack the three frames row-wise under a fresh 0..n-1 index
lang_data = pd.concat(
    objs=[train_data, test_data, valid_data],
    ignore_index=True,
)

In [None]:
# Peek at 30 reproducibly sampled rows of the combined frame
print("Combined Dataset Summary:")
sampled_rows = lang_data.sample(n=30, random_state=12345)
print(sampled_rows)

In [None]:
# Relative frequency of every language label in the combined data
lang_counts = lang_data["labels"].value_counts(normalize=True)

# Bar chart of the class shares, configured through the returned Axes
ax = lang_counts.plot.bar()
ax.set_title("Shares of objects in each language class", fontsize=15)
ax.set_ylabel("Proportion of objects")
plt.tight_layout()
plt.show()

In [None]:
# Raw label values, taken as-is from the frame
labels_data = lang_data["labels"].values
# Texts coerced to a NumPy unicode array
texts_data = lang_data["text"].values.astype("U")

In [None]:
# Fit the encoder on all labels and convert them to integer ids in one pass
label_encoder = LabelEncoder()
labels_data_encoded = label_encoder.fit_transform(labels_data)

# Class names ordered by their encoded id
class_names = label_encoder.classes_

# Show the id => language mapping
for idx in range(len(class_names)):
    print(f"{idx:<2} => {class_names[idx]}")

In [None]:
# First cut: 80% for training, the remaining 20% held out (stratified by label)
train_texts, val_test_texts, train_labels, val_test_labels = train_test_split(
    texts_data,
    labels_data_encoded,
    train_size=0.8,
    stratify=labels_data_encoded,
    random_state=RANDOM_STATE,
)

# Second cut: the held-out part is halved into validation and testing sets
validation_texts, testing_texts, validation_labels, testing_labels = train_test_split(
    val_test_texts,
    val_test_labels,
    test_size=0.5,
    stratify=val_test_labels,
    random_state=RANDOM_STATE,
)

In [None]:
# Number of objects in the training, validation and testing sets
split_sizes = tuple(
    len(split) for split in (train_texts, validation_texts, testing_texts)
)

# The three splits together must account for every original text
assert sum(split_sizes) == len(texts_data)

split_sizes

In [None]:
# Loading the tokenizer for the checkpoint named by MODEL_NAME so that the
# token ids produced below match the model instantiated from the same name
bert_tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

In [None]:
# Tokenizer settings shared by all three splits: every text is truncated or
# padded to exactly 128 token ids, special tokens included
tokenizer_kwargs = dict(
    add_special_tokens=True,
    max_length=128,
    truncation=True,
    padding="max_length",
)

# Tokenizing the training, validation and testing examples
training_encodings = bert_tokenizer(list(train_texts), **tokenizer_kwargs)
validation_encodings = bert_tokenizer(list(validation_texts), **tokenizer_kwargs)
testing_encodings = bert_tokenizer(list(testing_texts), **tokenizer_kwargs)

In [None]:
def display_encodings_info(
    tokenizer,
    encodings,
    texts,
    labels,
    text_idx,
    label_names=None,
):
    """Shows the original, encoded and decoded texts of one example.

    Args:
        tokenizer: Object exposing ``convert_ids_to_tokens``.
        encodings: Mapping whose ``"input_ids"`` entry is indexable by example.
        texts: Indexable collection of the original texts.
        labels: Indexable collection of encoded (integer) labels.
        text_idx: Position of the example to display.
        label_names: Optional sequence mapping an encoded label to its
            language name. When omitted, the notebook-level ``class_names``
            is used.

    Raises:
        IndexError: If ``text_idx`` does not address an element of ``texts``.
    """
    # Fail with a readable message instead of a bare indexing error
    num_texts = len(texts)
    if not -num_texts <= text_idx < num_texts:
        raise IndexError(
            f"text_idx={text_idx} is out of range for a split of {num_texts} texts"
        )

    # Fall back to the global mapping only when the caller supplied none
    if label_names is None:
        label_names = class_names

    # Displaying the original text
    text = texts[text_idx]
    print(f"Input text:\n{text}\n")

    # Displaying the language of the text
    lang = label_names[labels[text_idx]]
    print(f"Language: {lang}\n")

    # Displaying the encoded text
    text_encoded = encodings["input_ids"][text_idx]
    print(f"Tokenized input text (encoded):\n{text_encoded}\n")

    # Displaying the decoded text
    text_decoded = tokenizer.convert_ids_to_tokens(text_encoded)
    print(f"Tokenized input text (decoded):\n{text_decoded}")

In [None]:
# Inspecting one training example; position 990 is a hard-coded pick and
# must exist in the training split
display_encodings_info(
    tokenizer=bert_tokenizer,
    encodings=training_encodings,
    texts=train_texts,
    labels=train_labels,
    text_idx=990,
)

In [None]:
# Inspecting one validation example; position 1033 is a hard-coded pick and
# must exist in the validation split
display_encodings_info(
    tokenizer=bert_tokenizer,
    encodings=validation_encodings,
    texts=validation_texts,
    labels=validation_labels,
    text_idx=1033,
)

In [None]:
# Inspecting one testing example; position 1010 is a hard-coded pick and
# must exist in the testing split
display_encodings_info(
    tokenizer=bert_tokenizer,
    encodings=testing_encodings,
    texts=testing_texts,
    labels=testing_labels,
    text_idx=1010,
)

In [None]:
# Dataset wrapper that pairs tokenizer encodings with their labels
class LanguageDataset(Dataset):
    """Map-style dataset over tokenized texts and their encoded labels."""

    def __init__(self, encodings, labels):
        """Stores the encodings mapping and the label sequence as given."""
        self.labels = labels
        self.encodings = encodings

    def __len__(self):
        """Returns the number of examples, i.e. the number of labels."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Builds the tensors of the example stored at position ``idx``."""
        sample = {}
        # One tensor per encoding field, keyed by the field name
        for field_name, field_values in self.encodings.items():
            sample[field_name] = torch.tensor(field_values[idx])
        # The target goes under the "labels" key
        sample["labels"] = torch.tensor(self.labels[idx])

        return sample

In [None]:
# Initializing the training, validation and testing datasets from their
# (encodings, labels) pairs
training_dataset, validation_dataset, testing_dataset = (
    LanguageDataset(encodings=split_encodings, labels=split_labels)
    for split_encodings, split_labels in (
        (training_encodings, train_labels),
        (validation_encodings, validation_labels),
        (testing_encodings, testing_labels),
    )
)

In [None]:
# Training batches are reshuffled on every pass over the data
training_dataloader = DataLoader(training_dataset, batch_size=BATCH_SIZE, shuffle=True)

# Validation and testing batches keep the dataset order
validation_dataloader, testing_dataloader = (
    DataLoader(eval_dataset, batch_size=BATCH_SIZE, shuffle=False)
    for eval_dataset in (validation_dataset, testing_dataset)
)

In [None]:
# Size summary of the training loader
num_training_examples = len(training_dataloader.dataset)
num_training_batches = len(training_dataloader)
print(f"Training data examples: {num_training_examples:,}")
print(f"Number of batches: {num_training_batches}")
print(f"Batch size: {BATCH_SIZE}")

In [None]:
# Size summary of the validation loader; the example count uses the same
# thousands-separated format as the training and testing summaries
print(f"Validation data examples: {len(validation_dataloader.dataset):,}")
print(f"Number of batches: {len(validation_dataloader)}")
print(f"Batch size: {BATCH_SIZE}")

In [None]:
# Size summary of the testing loader, emitted as three lines
print(
    f"Testing data examples: {len(testing_dataloader.dataset):,}",
    f"Number of batches: {len(testing_dataloader)}",
    f"Batch size: {BATCH_SIZE}",
    sep="\n",
)

In [None]:
# Predicted class id -> language name, in the order of class_names
id2label_mappings = dict(enumerate(class_names))
id2label_mappings

In [None]:
 # Computing the number of classes
num_labels = len(class_names)

# Inverse of id2label, so that the model config maps language names back to
# the same ids instead of keeping unrelated placeholder names
label2id_mappings = {
    str(name): int(idx) for idx, name in id2label_mappings.items()
}

# Instantiating the BERT model with a classification head of num_labels outputs
bert_model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_NAME,
    num_labels=num_labels,
    id2label=id2label_mappings,
    label2id=label2id_mappings,
)

# Moving the model to DEVICE (GPU/CUDA when available, CPU otherwise)
bert_model.to(DEVICE)

# Defining the optimization algorithm over all model parameters
optimizer = torch.optim.Adam(bert_model.parameters(), lr=LEARNING_RATE)

In [None]:
# Putting the freshly loaded model into evaluation mode.
# NOTE(review): as the last expression of the cell this presumably also
# renders the model's architecture as the cell output.
bert_model.eval()

In [None]:
def acc_score(model, dataloader, device=DEVICE):
    """Computes the accuracy score for a DataLoader.

    The model is evaluated in ``eval`` mode with gradients disabled; the
    train/eval mode it had on entry is restored before returning.

    Args:
        model: Model called as ``model(input_ids, attention_mask=...)`` whose
            output exposes a ``"logits"`` entry.
        dataloader: Iterable of batches holding ``"input_ids"``,
            ``"attention_mask"`` and ``"labels"`` tensors.
        device: Device the batch tensors are moved to before the forward pass.

    Returns:
        A 0-dim float tensor with the share of correct predictions, or NaN
        when the dataloader yields no examples.
    """
    # Tensor accumulator: stays on `device` and is defined even with no batches
    correct_predictions = torch.zeros((), dtype=torch.long, device=device)
    num_examples = 0

    # Remember the current mode so the caller's state is left untouched
    was_training = model.training
    model.eval()
    try:
        # Turning off computing gradients
        with torch.no_grad():
            # Iteratively computing accuracy score (batch by batch)
            for batch in dataloader:
                # Selecting the batch data (encodings, attention mask, labels)
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                labels = batch["labels"].to(device)

                # Using the model to compute logits
                outputs = model(input_ids, attention_mask=attention_mask)
                logits = outputs["logits"]

                # The predicted class is the one with the largest logit
                predicted_labels = torch.argmax(logits, dim=1)

                # Updating the example and correct-prediction counters
                num_examples += labels.size(0)
                correct_predictions += (predicted_labels == labels).sum()
    finally:
        model.train(was_training)

    # Accuracy over zero examples is undefined
    if num_examples == 0:
        return torch.tensor(float("nan"), device=device)

    # Computing the final accuracy score
    accuracy_score = correct_predictions.float() / num_examples

    return accuracy_score

In [None]:
def train_bert_model(
    model,
    optimizer,
    training_dataloader,
    validation_dataloader,
    accuracy_score_func=acc_score,
    epochs=2,
    batch_log_freq=100,
    device=DEVICE
):
    """Launches the fine-tuning of BERT.

    Each epoch runs one pass over ``training_dataloader`` and then reports
    the accuracy on the training and validation dataloaders.

    Args:
        model: Model called as ``model(input_ids, attention_mask=...,
            labels=...)`` whose output exposes a ``"loss"`` entry.
        optimizer: Optimizer already bound to the model parameters.
        training_dataloader: Batches with ``"input_ids"``,
            ``"attention_mask"`` and ``"labels"`` tensors used for training.
        validation_dataloader: Batches of the same layout used for validation.
        accuracy_score_func: Callable invoked as
            ``func(model=..., dataloader=..., device=...)`` that returns a
            value formattable with ``:.4f``.
        epochs: Number of passes over the training data.
        batch_log_freq: The loss is printed every ``batch_log_freq`` batches;
            ``0`` or ``None`` disables batch logging.
        device: Device used for training and for the accuracy computation.

    Returns:
        The same ``model`` object, left in evaluation mode whenever at least
        one epoch ran.
    """
    # Starting the timer
    start_time = time.time()

    # Going through all epochs
    for epoch in range(epochs):

        # Setting the model in the training mode
        model.train()

        # Going through all batches
        for batch_idx, batch in enumerate(training_dataloader):

            # Selecting the batch
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            # Forward pass; passing the labels makes the model return the loss
            outputs = model(
                input_ids, attention_mask=attention_mask, labels=labels
            )
            loss = outputs["loss"]

            # Backward pass and parameter update
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Logging the progress (guarded so a frequency of 0 cannot divide by zero)
            if batch_log_freq and batch_idx % batch_log_freq == 0:
                print (f"Epoch {epoch+1:03d}/{epochs:03d} | "
                       f"Batch {batch_idx:03d}/{len(training_dataloader):03d} | "
                       f"Loss = {loss.item():.4f}")

        # Setting the model in the evaluation mode
        model.eval()

        # Disabling computing gradients
        with torch.no_grad():
            # The accuracy must be computed on the device used for training,
            # not on the accuracy function's own default
            training_accuracy_score = accuracy_score_func(
                model=model,
                dataloader=training_dataloader,
                device=device,
            )
            validation_accuracy_score = accuracy_score_func(
                model=model,
                dataloader=validation_dataloader,
                device=device,
            )
            # Logging the accuracy scores
            print(f"\nTraining accuracy = "
                  f"{training_accuracy_score:.4f}"
                  f"\nValid accuracy = "
                  f"{validation_accuracy_score:.4f}\n")

        # Printing the time passed at the end of the epoch
        time_elapsed_epoch = (time.time() - start_time) / 60
        print(f'Time elapsed: {time_elapsed_epoch:.2f} min\n')

    # Printing the total time spent on BERT fine-tuning
    time_elapsed_total = (time.time() - start_time) / 60
    print(f'\nTotal training Time: {time_elapsed_total:.2f} min')

    return model

In [None]:
# Fine-tuning the BERT model for NUM_EPOCHS epochs; the loss is logged every
# 10th batch, and the accuracy on the training and validation loaders is
# printed after each epoch
bert_model = train_bert_model(
    model=bert_model,
    optimizer=optimizer,
    training_dataloader=training_dataloader,
    validation_dataloader=validation_dataloader,
    epochs=NUM_EPOCHS,
    batch_log_freq=10,
)

In [None]:
def evaluate_test(model, dataloader, device=DEVICE):
    """Predicts the labels for the DataLoader and scores them.

    The model is evaluated in ``eval`` mode with gradients disabled; the
    train/eval mode it had on entry is restored before returning.

    Args:
        model: Model called as ``model(input_ids, attention_mask=...)`` whose
            output exposes a ``"logits"`` entry.
        dataloader: Iterable of batches holding ``"input_ids"``,
            ``"attention_mask"`` and ``"labels"`` tensors.
        device: Device the batch tensors are moved to before the forward pass.

    Returns:
        A tuple ``(accuracy, predictions)``: a 0-dim float CPU tensor and a
        1-D CPU tensor of predicted class ids in dataloader order. For a
        dataloader that yields no examples the accuracy is NaN and the
        predictions tensor is empty.
    """
    # CPU-side counter, defined even when no batch is seen
    correct_preds = torch.zeros((), dtype=torch.long)
    num_examples = 0
    # Per-batch predictions, concatenated at the end
    test_predictions = []

    # Remember the current mode so the caller's state is left untouched
    was_training = model.training
    model.eval()
    try:
        # Disabling computing gradients
        with torch.no_grad():
            # Iterating through all batches
            for batch in dataloader:
                # Selecting the batch
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                labels = batch["labels"].to(device)

                # Computing logits
                outputs = model(input_ids, attention_mask=attention_mask)
                logits = outputs["logits"]

                # The predicted class is the one with the largest logit
                predicted_labels_batch = torch.argmax(logits, dim=1)

                # Keeping the batch predictions on the CPU
                test_predictions.append(predicted_labels_batch.cpu())

                # Iteratively computing accuracy determinants
                num_examples += labels.size(0)
                correct_preds += (predicted_labels_batch == labels).sum().cpu()
    finally:
        model.train(was_training)

    # Nothing to score or concatenate for an empty dataloader
    if num_examples == 0:
        return torch.tensor(float("nan")), torch.empty(0, dtype=torch.long)

    # Computing final accuracy score
    test_accuracy_score = correct_preds.float() / num_examples

    # Transforming a list of tensors into one tensor
    test_predictions_tensor = torch.cat(test_predictions)

    return test_accuracy_score, test_predictions_tensor

In [None]:
# Running the fine-tuned model over the testing loader; evaluate_test returns
# the overall accuracy together with the predicted class id of every example
accuracy_test, predictions_test = evaluate_test(
    model=bert_model, dataloader=testing_dataloader
)

# Reporting the accuracy with four decimals
print(f"Test accuracy: {accuracy_test:.4f}")