# Imports and uploads

In [None]:
# for Google Colab
# install specific versions for compatibility
!pip install accelerate==0.23.0 -U
!pip install transformers==4.33.0

In [None]:
# install for Japanese BERT
!pip install "fugashi[unidic-lite]"

In [None]:
import os
import numpy as np
import pandas as pd
import torch
from tokenizers import Tokenizer
from tokenizers.models import WordPiece
from tokenizers.trainers import WordPieceTrainer
from tokenizers.pre_tokenizers import Whitespace
from transformers import BertTokenizer, Trainer, TrainingArguments, BertForSequenceClassification, AutoConfig, TrainerCallback
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.metrics import accuracy_score, classification_report
from torch.utils.data import Dataset
import accelerate

# Seed the NumPy and PyTorch global random number generators with a fixed value.
np.random.seed(42)
torch.manual_seed(42)
# cuDNN is switched off entirely here.
# NOTE(review): with cuDNN disabled the `deterministic` flag below presumably
# has no effect unless cuDNN is re-enabled later — confirm against the PyTorch docs.
torch.backends.cudnn.enabled = False
torch.backends.cudnn.deterministic = True

In [None]:
# import for Japanese BERT
import fugashi

In [11]:
# Create the directories used for tokenizers, results and logs.
# exist_ok=True makes this cell safe to re-run and avoids the
# check-then-create race of os.path.exists() followed by os.makedirs().
directories = ["tokenizers/", "results/", "logs/"]

for dir_name in directories:
    os.makedirs(dir_name, exist_ok=True)

# Load the dataset from GitHub; the "_ - index" column is used as the index.
df = pd.read_csv("https://raw.githubusercontent.com/ARomach/classification-of-akkadian-texts/main/ORACC-catalogues-030524.csv",
                  encoding="utf-8", index_col="_ - index")

In [4]:
def combine_tokenizers(tokenizer_name):
    """Extend a pretrained BERT tokenizer with the trained WordPiece vocabularies.

    For each word level ("lemm", "norm", "seg_uni", "unseg_uni") the tokenizer
    stored in ``tokenizers/{word_level}_wordpiece.tokenizer`` is read, every
    token that the pretrained tokenizer does not already know is added to it,
    and the result is saved to ``tokenizers/updated_{tokenizer_name}_{word_level}``.

    Args:
        tokenizer_name: Model name or path passed to
            ``BertTokenizer.from_pretrained``.
    """
    for word_level in ("lemm", "norm", "seg_uni", "unseg_uni"):
        # Start from a fresh pretrained tokenizer for every word level.
        # Reusing a single instance across iterations would carry the tokens
        # added for earlier word levels into the tokenizers saved for later ones.
        original_tokenizer = BertTokenizer.from_pretrained(tokenizer_name)

        # Build the lookup once per word level instead of once per candidate token.
        known_tokens = set(original_tokenizer.get_vocab())

        # Load the trained word-level tokenizer
        new_tokenizer = Tokenizer.from_file(f"tokenizers/{word_level}_wordpiece.tokenizer")
        trained_vocab = new_tokenizer.get_vocab()

        # Sort by trained id so the order in which tokens are added (and hence
        # the ids they receive) does not depend on dict iteration order.
        new_vocab = [token for token in sorted(trained_vocab, key=trained_vocab.get)
                     if token not in known_tokens]

        # Add new tokens to the pretrained tokenizer
        num_added_toks = original_tokenizer.add_tokens(new_vocab)
        print("Added", num_added_toks, "new tokens for", word_level, "with", tokenizer_name)

        # Save the updated tokenizer
        original_tokenizer.save_pretrained(f"tokenizers/updated_{tokenizer_name}_{word_level}")

# Fine-tune BERT Transformer

In [5]:
# Prepare dataset
class AkkadianDataset(Dataset):
    """Torch dataset pairing tokenizer encodings with integer labels.

    Args:
        encodings: Mapping from field name to a sequence indexable per example.
        labels: Sequence of labels, one per example.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        # The number of examples is defined by the label sequence.
        return len(self.labels)

    def __getitem__(self, idx):
        # Convert every encoded field of example `idx` to a tensor, then attach its label.
        sample = {}
        for field, values in self.encodings.items():
            sample[field] = torch.tensor(values[idx])
        sample["labels"] = torch.tensor(self.labels[idx])
        return sample

class MetricsCallback(TrainerCallback):
    """Trainer callback that collects training and evaluation losses.

    Attributes:
        train_losses: Training loss values, one per training log event.
        eval_losses: Evaluation loss values, one per evaluation run.
    """

    def __init__(self):
        self.train_losses = []
        self.eval_losses = []

    def on_log(self, args, state, control, logs=None, **kwargs):
        # Record each logged training loss exactly once.
        # Inspecting state.log_history[-1] after every step instead would
        # append the same value on each step until the next log entry, and
        # would skip values whenever the latest entry is an evaluation record.
        if logs and "loss" in logs:
            self.train_losses.append(logs["loss"])

    def on_evaluate(self, args, state, control, metrics=None, **kwargs):
        # Record the evaluation loss after each evaluation
        if metrics and "eval_loss" in metrics:
            self.eval_losses.append(metrics["eval_loss"])

In [6]:
def _filter_category_rows(category):
    """Return a copy of the global ``df`` without the classes excluded for ``category``."""
    # Classes filtered out of specific categories because they have too few examples.
    excluded = {
        "superperiod_160424": ["Unknown", "First Millennium"],
        "superprovenience_160424": ["East", "Unknown"],
    }.get(category, [])
    return df[~df[category].isin(excluded)].copy()


def _stratified_split(frame, test_size):
    """Split ``frame`` once into (remaining rows, held-out rows), stratified on ``labels``."""
    splitter = StratifiedShuffleSplit(n_splits=1, test_size=test_size, random_state=42)
    remaining_idx, held_out_idx = next(splitter.split(frame, frame["labels"]))
    return frame.iloc[remaining_idx], frame.iloc[held_out_idx]


def _write_report(path, accuracy, report, metrics_callback, split_sizes):
    """Write accuracy, classification report, recorded losses and split sizes to ``path``."""
    train_size, val_size, test_size = split_sizes
    # Explicit encoding so the report does not depend on the platform default.
    with open(path, "w", encoding="utf-8") as file:
        file.write(f"Accuracy: {str(accuracy)}\n")
        file.write("Classification Report:\n")
        file.write(report+"\n")

        file.write("Training Loss:\n")
        file.write(f"{metrics_callback.train_losses}\n")
        file.write("Validation Loss:\n")
        file.write(f"{metrics_callback.eval_losses}\n")

        file.write("Train, Validation, Test split:\n")
        file.write(f"Train: {train_size}\n")
        file.write(f"Validation: {val_size}\n")
        file.write(f"Test: {test_size}\n")


def fine_tune_model(model_name, tokenizer_name):
    """Fine-tune a BERT classifier for every category / word-level combination.

    Reads the module-level DataFrame ``df``. For each of the three prediction
    categories the rows are filtered, label codes are derived, and the data is
    split into train / validation / test sets with stratified shuffling. For
    each word level a model is then trained, evaluated on the test split, and
    a report is written to ``Report_{model}_{category}_{word_level}.txt`` and
    printed.

    Args:
        model_name: Model name or path passed to
            ``BertForSequenceClassification.from_pretrained``.
        tokenizer_name: Name used to locate the combined tokenizers under
            ``tokenizers/updated_{tokenizer_name}_{word_level}``; these must
            already exist on disk.
    """
    # Loop through prediction categories
    for category in ("supergenre_160424", "superperiod_160424", "superprovenience_160424"):
        filtered_df = _filter_category_rows(category)

        # Derive integer label codes and the matching class names from one categorical.
        category_values = filtered_df[category].astype("category")
        filtered_df["labels"] = category_values.cat.codes
        target_names = category_values.cat.categories.tolist()
        label_ids = list(range(len(target_names)))

        # Hold out 10% as test data, then 22% of the remainder as validation data.
        train_val_df, test_df = _stratified_split(filtered_df, test_size=0.1)
        train_df, val_df = _stratified_split(train_val_df, test_size=0.22)

        # Loop through word levels
        for word_level in ("lemm", "norm", "seg_uni", "unseg_uni"):
            # Encode the texts and prepare labels
            tokenizer = BertTokenizer.from_pretrained(f"tokenizers/updated_{tokenizer_name}_{word_level}")

            print("tokenizing...")
            train_encodings = tokenizer(train_df[word_level].tolist(), truncation=True, padding=True, max_length=128)
            val_encodings = tokenizer(val_df[word_level].tolist(), truncation=True, padding=True, max_length=128)
            test_encodings = tokenizer(test_df[word_level].tolist(), truncation=True, padding=True, max_length=128)

            # Get labels
            train_labels = train_df["labels"].tolist()
            val_labels = val_df["labels"].tolist()
            test_labels = test_df["labels"].tolist()

            # Create Datasets
            print("creating datasets...")
            train_dataset = AkkadianDataset(train_encodings, train_labels)
            val_dataset = AkkadianDataset(val_encodings, val_labels)
            test_dataset = AkkadianDataset(test_encodings, test_labels)

            # Load model; the embedding matrix is resized to cover the added tokens
            model = BertForSequenceClassification.from_pretrained(model_name,
                                                                  num_labels=filtered_df["labels"].nunique())
            model.resize_token_embeddings(len(tokenizer), pad_to_multiple_of=8)

            # Setting results and logs subdirectories ("/" in model ids is not path-safe)
            model_name_to_save = model_name.replace("/", "-")
            run_name = f"{model_name_to_save}_{category}_{word_level}"
            output_dir = f"results/{run_name}/"
            logging_dir = f"logs/{run_name}/"
            os.makedirs(output_dir, exist_ok=True)
            os.makedirs(logging_dir, exist_ok=True)

            # Set up training arguments
            training_args = TrainingArguments(
                save_strategy="no",            # use "epoch" or "steps" to save checkpoints
                output_dir=output_dir,
                num_train_epochs=10,
                per_device_train_batch_size=8,
                per_device_eval_batch_size=8,
                warmup_steps=500,
                weight_decay=0.01,
                logging_dir=logging_dir,
                logging_steps=10,
                evaluation_strategy="epoch",
                load_best_model_at_end=False  # True reloads the best checkpoint (needs checkpoints to be saved)
            )

            # Initialize callback
            metrics_callback = MetricsCallback()

            # Initialize the Trainer
            trainer = Trainer(
                model=model,
                args=training_args,
                train_dataset=train_dataset,
                eval_dataset=val_dataset,
                callbacks=[metrics_callback]
            )

            # Train the model
            print("training...")
            trainer.train()

            # Make predictions
            predictions, labels, _ = trainer.predict(test_dataset)

            # Decode predictions
            predictions = np.argmax(predictions, axis=1)

            accuracy = accuracy_score(labels, predictions)
            # Passing `labels` keeps the report aligned with `target_names` even
            # when a class is missing from both the test split and the
            # predictions; without it classification_report raises ValueError.
            report = classification_report(labels, predictions, labels=label_ids,
                                           target_names=target_names)

            # Save to file and print
            _write_report(
                f"Report_{run_name}.txt",
                accuracy,
                report,
                metrics_callback,
                (train_df.shape[0], val_df.shape[0], test_df.shape[0]),
            )

            print(f"{model_name}_{category}_{word_level} Accuracy:", accuracy)
            print("Classification Report:\n", report)

# Main

## Prepare tokenizers

The code block below trains the WordPiece tokenizers and saves them in the `tokenizers/` folder. You only need to run this once.

In [7]:
# Initialize a tokenizer with WordPiece, using "UNK" as the unknown token,
# and split the input on whitespace before WordPiece is applied.
tokenizer = Tokenizer(WordPiece(unk_token="UNK"))
tokenizer.pre_tokenizer = Whitespace()

# Create a trainer: vocabulary capped at 20000 entries, tokens must occur at
# least twice, and the listed placeholder tokens are kept as special tokens.
trainer = WordPieceTrainer(
    vocab_size=20000,
    min_frequency=2,
    special_tokens=["UNK", "PN", "RN", "DN", "GN", "MN", "SN", "NUM", "X"]
)

# Train and save one tokenizer file per word-level column of `df`.
# NOTE(review): the same `tokenizer` object is retrained for every word level;
# this assumes training replaces the previously learned vocabulary — confirm.
for word_level in ("lemm", "norm", "seg_uni", "unseg_uni"):
    # Train the tokenizer on all texts of this column
    texts = df[word_level].tolist()
    tokenizer.train_from_iterator(texts, trainer)

    # Save the tokenizer; combine_tokenizers reads this file back
    tokenizer.save(f"tokenizers/{word_level}_wordpiece.tokenizer")

## Combine tokenizers

The code block below uses the `combine_tokenizers` function to combine the Akkadian tokenizers created above for each word level with the original tokenizer of the pre-trained model that will be fine-tuned. The resulting tokenizers are saved in the `tokenizers/` folder.

Models that were used (from Hugging Face):

- `bert-base-multilingual-cased`
- `CAMeL-Lab/bert-base-arabic-camelbert-mix`
- `tohoku-nlp/bert-base-japanese-char-v2`

In [None]:
# Writes one combined tokenizer per word level to tokenizers/updated_<tokenizer_name>_<word_level>
combine_tokenizers("bert-base-multilingual-cased")

## Fine-tune model

The code block below fine-tunes an existing BERT model on all classification categories and all word levels at once. Runtime with GPU is around 3.5-4 hours.

The function takes two parameters, `model_name` and `tokenizer_name`. Give the name of the original model from HuggingFace. If the tokenizers have not been combined yet, it will throw an error (see code block above).

In [None]:
# Loads the combined tokenizers from tokenizers/updated_<tokenizer_name>_<word_level>
fine_tune_model(model_name="bert-base-multilingual-cased",
                tokenizer_name="bert-base-multilingual-cased")