# Task 1

In [None]:
from datasets import Dataset
from datasets.formatting.formatting import LazyBatch
import nest_asyncio
import numpy as np
from numpy.typing import NDArray
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from transformers.data.data_collator import DataCollatorWithPadding
from transformers.models.auto.modeling_auto import AutoModelForSequenceClassification
from transformers.models.auto.tokenization_auto import AutoTokenizer
from transformers.modeling_utils import PreTrainedModel
from transformers.tokenization_utils_base import BatchEncoding, PreTrainedTokenizerBase
from transformers.trainer import Trainer
from transformers.trainer_callback import EarlyStoppingCallback
from transformers.trainer_utils import get_last_checkpoint, set_seed
from transformers.training_args import TrainingArguments
import wandb

from modules import dataset, models_creation, paths
from modules.utils import model as model_utils

In [None]:
# nest_asyncio patches asyncio so that an event loop can be re-entered,
# which is needed because the notebook kernel is already running one
nest_asyncio.apply()

In [None]:
# Hugging Face checkpoint name; used below to load both the tokenizer and the classifier
model_name: str = 'sentence-transformers/paraphrase-TinyBERT-L6-v2'

## Data

### Basic Operations

In [None]:
# Read the train / validation / test splits through the project's dataset helper
full_train_set: pd.DataFrame
full_val_set: pd.DataFrame
full_test_set: pd.DataFrame
full_train_set, full_val_set, full_test_set = (
    dataset.extract_dataset(split) for split in ('train', 'validation', 'test')
)

In [None]:
# Features: every column whose name ends in `_extract`
train_x: pd.DataFrame
val_x: pd.DataFrame
test_x: pd.DataFrame
train_x, val_x, test_x = (
    frame.filter(regex = '_extract$', axis = 'columns')
    for frame in (full_train_set, full_val_set, full_test_set)
)

# Targets: only the train and validation splits are used with a `label` column
train_y: pd.Series = full_train_set['label']
val_y: pd.Series = full_val_set['label']

In [None]:
# Rename the columns
def _strip_extract_suffix(frame: pd.DataFrame) -> pd.DataFrame:
    """
    Returns a copy of `frame` whose column names have the trailing `_extract` removed.

    Only the suffix is stripped (matching the `_extract$` pattern used to select the columns),
    so an `_extract` occurring elsewhere in a name is left untouched. Names without the suffix
    are returned unchanged, which keeps this cell safe to re-run.
    """

    return frame.rename(columns = lambda name: name.removesuffix('_extract'))

train_x = _strip_extract_suffix(train_x)
val_x = _strip_extract_suffix(val_x)
test_x = _strip_extract_suffix(test_x)

### Preprocessing

In [None]:
# Fit the encoder on the training labels, then reuse the fitted mapping for validation
label_encoder: LabelEncoder = LabelEncoder()
train_codes = label_encoder.fit_transform(train_y)
val_codes = label_encoder.transform(val_y)

# Wrap the encoded values in Series that keep the name and index of the original labels
train_y_encoded: pd.Series = pd.Series(train_codes, index = train_y.index, name = train_y.name)    # type: ignore
val_y_encoded: pd.Series = pd.Series(val_codes, index = val_y.index, name = val_y.name)    # type: ignore

In [None]:
# Put the encoded label column next to the features; the test frame is the features alone
train_df: pd.DataFrame = pd.concat((train_x, train_y_encoded), axis = 'columns')
val_df: pd.DataFrame = pd.concat((val_x, val_y_encoded), axis = 'columns')
test_df: pd.DataFrame = test_x

In [None]:
# Load the tokenizer that belongs to the chosen checkpoint
tokenizer: PreTrainedTokenizerBase = AutoTokenizer.from_pretrained(model_name)

# Register one `<column>` marker per feature column as an additional special token
column_tokens: list[str] = [f'<{col}>' for col in train_x.columns]
tokenizer.add_special_tokens({'additional_special_tokens': column_tokens})    # type: ignore

In [None]:
# Define the data collator
# Padding is left to the collator, so `tokenize` below only truncates the sequences
data_collator: DataCollatorWithPadding = DataCollatorWithPadding(tokenizer = tokenizer)

In [None]:
# Tokenize the data
def tokenize(examples: LazyBatch) -> BatchEncoding:
    """
    Builds one string per row of the batch and tokenizes it.

    Each feature column contributes its `<column>` marker followed by the cell value; when the
    value is falsy (e.g. empty or missing) only the marker is emitted. The pieces of a row are
    joined with single spaces, and the resulting strings are tokenized with truncation enabled.
    """

    columns = list(train_x.columns)

    # One tuple of cell values per row, in the same order as `columns`
    records = zip(*(examples[name] for name in columns))

    texts: list[str] = [
        ' '.join(f'<{name}> {cell}' if cell else f'<{name}>'
                 for name, cell in zip(columns, record))
        for record in records
    ]

    return tokenizer(texts, truncation = True)

# Convert each dataframe into a `Dataset` and tokenize it batch by batch
train_data: Dataset
val_data: Dataset
test_data: Dataset
train_data, val_data, test_data = (
    Dataset.from_pandas(frame).map(tokenize, batched = True)
    for frame in (train_df, val_df, test_df)
)

## Model

### Training

In [None]:
# Load the pretrained model
# Seed Python, NumPy and PyTorch *before* instantiating the model: weights that are not present in
# the checkpoint (e.g. a new classification head) are randomly initialised right here, i.e. before
# the Trainer has a chance to apply its own seed. Without this, repeated runs are not reproducible.
set_seed(42)    # same value as the default `seed` of TrainingArguments
pretrained_model: PreTrainedModel = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels = len(label_encoder.classes_),    # one output per class seen by the label encoder
    ignore_mismatched_sizes = True
)

In [None]:
# Resize the model's token embeddings to the tokenizer's current vocabulary size
# (special tokens were added to the tokenizer after loading it, so it may have grown)
pretrained_model.resize_token_embeddings(len(tokenizer), pad_to_multiple_of = 8)    # ideal for fp16 precision

In [None]:
# Initialize wandb
# Starts a run in the given project, with paths.DATA_DIR passed as wandb's `dir`
wandb.init(project = 'Cultural classification on text', dir = paths.DATA_DIR)

In [None]:
# Train the model

trainargs: TrainingArguments = TrainingArguments(
    # Where checkpoints go and where metrics are reported
    output_dir = str(paths.TRANSFORMER_MODEL_DIR),
    report_to = 'wandb',
    # Optimisation
    num_train_epochs = 1000,    # Unlimited epochs: the early-stopping callback below ends the run
    auto_find_batch_size = True,
    fp16 = True,
    # Evaluate every 100 steps and keep a single checkpoint, the best one according to F1
    eval_strategy = 'steps',
    eval_steps = 100,
    save_strategy = 'best',
    save_total_limit = 1,
    metric_for_best_model = 'f1',
    load_best_model_at_end = True
)

trainer: Trainer = Trainer(
    model = pretrained_model,
    args = trainargs,
    train_dataset = train_data,
    eval_dataset = val_data,
    processing_class = tokenizer,
    data_collator = data_collator,
    compute_metrics = models_creation.transformer_metrics,
    callbacks = [EarlyStoppingCallback(early_stopping_patience = 10)]
)

trainer.train()

In [None]:
# Close the wandb run
wandb.finish()

### Results

In [None]:
# Load the model
# `get_last_checkpoint` returns None when the directory contains no checkpoint; fail with a clear
# message instead of handing None to `from_pretrained`.
last_checkpoint = get_last_checkpoint(str(paths.TRANSFORMER_MODEL_DIR))
if last_checkpoint is None:
    raise FileNotFoundError(f"No checkpoint found in {paths.TRANSFORMER_MODEL_DIR}: run the training cell first")

model: PreTrainedModel = AutoModelForSequenceClassification.from_pretrained(last_checkpoint)

In [None]:
# Build a trainer that is only used for evaluation and prediction (no wandb reporting)
results_args: TrainingArguments = TrainingArguments(
    output_dir = str(paths.TRANSFORMER_MODEL_DIR),
    auto_find_batch_size = True,
    report_to = 'none'
)

results_trainer: Trainer = Trainer(
    model = model,
    args = results_args,
    eval_dataset = val_data,
    processing_class = tokenizer,
    data_collator = data_collator,
    compute_metrics = models_creation.transformer_metrics
)

#### Validation

In [None]:
# Score the model on the validation set and report each metric on its own line
val_results: dict[str, float] = results_trainer.evaluate()
print(f"Loss: {val_results['eval_loss']:.3f}",
      f"Accuracy: {val_results['eval_accuracy']:.3f}",
      f"F1 score: {val_results['eval_f1']:.3f}",
      f"Precision: {val_results['eval_precision']:.3f}",
      f"Recall: {val_results['eval_recall']:.3f}",
      sep = '\n'
      )

In [None]:
# Confusion matrix
# The predicted class of each validation row is the index of its largest logit
val_output = results_trainer.predict(val_data)
val_logits: NDArray[np.float32] = np.array(val_output.predictions)    # type: ignore
val_predictions_encoded: NDArray[np.intp] = val_logits.argmax(axis = 1)
model_utils.plot_confusion_matrix(val_y_encoded, val_predictions_encoded, label_encoder)

#### Test

In [None]:
# Predict on the test set: take the arg-max class per row and map it back to the original labels
test_output = results_trainer.predict(test_data)
test_logits: NDArray[np.float32] = np.array(test_output.predictions)    # type: ignore
test_predictions_encoded: NDArray[np.intp] = test_logits.argmax(axis = 1)
test_predictions: NDArray[str] = label_encoder.inverse_transform(test_predictions_encoded)  # type: ignore

# Save the predictions next to the item identifiers, indexed like the test set
predictions_columns = {'item': full_test_set['item'],
                       'name': full_test_set['name'],
                       'label': test_predictions
                       }
test_predictions_df: pd.DataFrame = pd.DataFrame(predictions_columns, index = full_test_set.index)
test_predictions_df.to_csv(paths.TRANSFORMER_PREDICITONS, index_label = 'id')
print(f"Saved the predicitons on test set to {paths.TRANSFORMER_PREDICITONS}")