# Import libraries


In [None]:
import os
import torch
import json
import numpy as np
import pandas as pd

from langdetect import detect

from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, recall_score, roc_auc_score

from datasets import Dataset

from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer
from transformers import EarlyStoppingCallback

In [None]:
# Pick the GPU when torch can see one, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print("Using device:", device)

# Data loading and dataframe construction

In [None]:
# Folder holding the dataset; each regular file is parsed as one JSON record
folder_path = '/content/drive/MyDrive/Colab Notebooks/code_classification_dataset'
data_list = []

# sorted(): os.listdir returns entries in arbitrary order, and every seeded
# sample/split further down depends on the row order of the dataframe.
for filename in sorted(os.listdir(folder_path)):
    filepath = os.path.join(folder_path, filename)
    # Skip sub-directories (e.g. .ipynb_checkpoints), which open() cannot read
    if not os.path.isfile(filepath):
        continue
    # Explicit encoding so parsing does not depend on the platform default
    with open(filepath, 'r', encoding='utf-8') as f:
        data = json.load(f)
        data_list.append(data)

# One row per loaded JSON record
df = pd.DataFrame(data_list)
df.head()

# Data preprocessing

In [None]:
# The eight tags we are trying to predict; every other tag is dropped
tags_to_keep = [
    'math', 'graphs', 'strings', 'number theory',
    'trees', 'geometry', 'games', 'probabilities',
]


def _only_target_tags(taglist):
    """Return the entries of `taglist` that appear in `tags_to_keep`, order preserved."""
    return [tag for tag in taglist if tag in tags_to_keep]


df['tags'] = df['tags'].apply(_only_target_tags)

# Multilabel binarization: one float32 indicator column per entry of mlb.classes_
mlb = MultiLabelBinarizer(classes=tags_to_keep)
label_matrix = mlb.fit_transform(df['tags'])
y = pd.DataFrame(label_matrix, columns=mlb.classes_).astype('float32')

In [None]:
# Deleting rows with non-English descriptions
from langdetect import DetectorFactory, LangDetectException

# langdetect is non-deterministic by default; fixing the seed makes the set of
# rows kept by the language filter identical from one run to the next.
DetectorFactory.seed = 0


def is_english(text):
    """Return True when langdetect classifies `text` as English ('en').

    Non-string or blank input, and text for which langdetect cannot detect a
    language, yield False instead of raising.
    """
    if not isinstance(text, str) or not text.strip():
        return False
    try:
        return detect(text) == 'en'
    except LangDetectException:
        # Raised e.g. when the text contains no usable features
        return False

# Boolean mask over the rows of df: True where the description is detected as English
descriptions = df['prob_desc_description']
english_mask = descriptions.apply(is_english)

# Apply the same mask to features and labels so they stay aligned
df, y = df[english_mask], y[english_mask]

In [None]:
# Rows whose label vector sums to zero, i.e. examples with none of the eight tags
no_tag_mask = y.sum(axis=1) == 0

# Every example with at least one target tag is kept
with_tag_df = df[~no_tag_mask]
with_tag_y = y[~no_tag_mask]

# Undersampling: keep a seeded random 40% of the examples without any target tag
no_tag_df = df[no_tag_mask].sample(frac=0.4, random_state=42)
no_tag_y = y.loc[no_tag_df.index]

# Stack tagged and (undersampled) untagged examples back together
df_balanced = pd.concat([with_tag_df, no_tag_df])
y_balanced = pd.concat([with_tag_y, no_tag_y])

# Shuffle so the untagged rows are not all at the end; the labels follow the
# same permutation through the shuffled index before both indexes are reset.
df_balanced = df_balanced.sample(frac=1, random_state=42)
shuffled_idx = df_balanced.index
y_balanced = y_balanced.loc[shuffled_idx].reset_index(drop=True)
df_balanced = df_balanced.reset_index(drop=True)

In [None]:
# Keep only the useful feature: the problem description text
X = df_balanced['prob_desc_description']

# Train/val/test split: 20% is held out for test, then 13% of the remaining
# rows become the validation set. Both splits are seeded.
X_train_val, X_test, y_train_val, y_test = train_test_split(
    X,
    y_balanced,
    test_size=0.2,
    random_state=42,
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train_val,
    y_train_val,
    test_size=0.13,
    random_state=42,
)

# Pretrained model and tokenizer import

In [None]:
model_path = "bert-base-uncased"

tokenizer = AutoTokenizer.from_pretrained(model_path)

# Index <-> tag mappings, following the order of tags_to_keep
id2label = dict(enumerate(tags_to_keep))
label2id = {tag: index for index, tag in id2label.items()}

# Sequence-classification head with one output per tag, configured as multi-label
model = AutoModelForSequenceClassification.from_pretrained(
    model_path,
    num_labels=len(tags_to_keep),
    problem_type="multi_label_classification",
    id2label=id2label,
    label2id=label2id,
)

# Freezing base model parameters
for param in model.base_model.parameters():
    param.requires_grad = False

# Unfreezing the last four encoder layers (indices 8 to 11)
for layer_idx in (8, 9, 10, 11):
    encoder_layer = model.base_model.encoder.layer[layer_idx]
    for param in encoder_layer.parameters():
        param.requires_grad = True

# Tokenization

In [None]:
def tokenization_function(examples):
    """Tokenize the "text" field of `examples` with the module-level tokenizer.

    The tokenizer is called with truncation=True and padding="max_length".
    """
    texts = examples["text"]
    return tokenizer(texts, padding="max_length", truncation=True)

def _build_split(texts, labels):
    """Wrap one split as a Dataset with "text"/"labels" columns and tokenize it in batches."""
    split = Dataset.from_dict({"text": texts.tolist(), "labels": labels.values.tolist()})
    return split.map(tokenization_function, batched=True)


train_dataset = _build_split(X_train, y_train)
val_dataset = _build_split(X_val, y_val)
test_dataset = _build_split(X_test, y_test)

# Definition of the metrics

In [None]:
def compute_metrics(eval_pred):
    """Compute micro-averaged F1 at decision thresholds 0.2, 0.3, ..., 0.9.

    Args:
        eval_pred: pair ``(predictions, labels)``. ``predictions`` holds the raw
            logits (or a tuple whose first element is assumed to be the logits).

    Returns:
        dict mapping ``"f1_micro_<threshold in percent>"`` (e.g. ``"f1_micro_50"``)
        to the micro F1 obtained by binarizing the sigmoid probabilities at
        that threshold.
    """
    predictions, labels = eval_pred
    # Some models return several outputs; assume the logits come first
    if isinstance(predictions, tuple):
        predictions = predictions[0]

    # Sigmoid. exp() may overflow for very negative logits; the result then
    # saturates to probability 0, so the warning is suppressed.
    with np.errstate(over='ignore'):
        probabilities = 1 / (1 + np.exp(-predictions))

    # We compute for different thresholds to find the best
    thresholds = np.round(np.arange(0.2, 1, 0.1), 1)

    metrics = {}
    for t in thresholds:
        # round() before int(): the key must not depend on float truncation,
        # since metric_for_best_model refers to it by exact name.
        key = f"f1_micro_{int(round(float(t) * 100))}"
        binarized = (probabilities >= t).astype(int)
        metrics[key] = f1_score(labels, binarized, average='micro', zero_division=0)
    return metrics

# Modification of the loss function

In [None]:
# Computing class weights (pos_weight = #negatives / #positives per tag).
# They are measured on the training split only: `y` still holds the data from
# before the undersampling and includes validation/test rows, so its ratios do
# not match what the model is trained on.
n_train = len(y_train)
tag_count = {tag: float(y_train[tag].sum()) for tag in y_train.columns}
class_weights = [
    # A tag with no positive training example gets the neutral weight 1.0
    # instead of a division by zero.
    (n_train - count) / count if count > 0 else 1.0
    for count in tag_count.values()
]

# Changing the loss function to handle class imbalance
class WeightedTrainer(Trainer):
    """Trainer whose loss is binary cross-entropy with logits and a per-class pos_weight.

    Args:
        class_weights: sequence with one positive-class weight per label.
        focal_alpha, focal_gamma: accepted for backward compatibility and stored
            on the instance; the loss below does not use them.
        **kwargs: forwarded unchanged to ``Trainer.__init__``.
    """

    def __init__(self, class_weights, focal_alpha=0.25, focal_gamma=2.0, **kwargs):
        super().__init__(**kwargs)
        self.class_weights = torch.tensor(class_weights, dtype=torch.float32)
        # Kept so the arguments are not silently discarded; unused by compute_loss
        self.focal_alpha = focal_alpha
        self.focal_gamma = focal_gamma

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Return the weighted BCE loss (and the model outputs if `return_outputs`)."""
        labels = inputs["labels"]
        # Forward without the labels so the model is not asked to also compute
        # its own loss; `inputs` itself is left untouched.
        features = {key: value for key, value in inputs.items() if key != "labels"}
        outputs = model(**features)
        logits = outputs["logits"]
        loss = torch.nn.functional.binary_cross_entropy_with_logits(
            logits,
            labels.to(logits.dtype),  # BCE needs targets in the logits' dtype
            pos_weight=self.class_weights.to(device=logits.device, dtype=logits.dtype),
            reduction='mean',
        )
        return (loss, outputs) if return_outputs else loss


# Model training

In [None]:
# Set before the Trainer is built
os.environ["WANDB_DISABLED"] = "true"

# Hyperparameters
lr = 2e-5
batch_size = 16
num_epochs = 10

# Log, evaluate and checkpoint once per epoch; at the end, reload the
# checkpoint with the highest f1_micro_50.
training_config = dict(
    output_dir="bert-codeforces-tags",
    learning_rate=lr,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=num_epochs,
    logging_strategy="epoch",
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="f1_micro_50",
    greater_is_better=True,
    weight_decay=0.01,  # Regularization
)
training_args = TrainingArguments(**training_config)


# Early stopping with a patience of 3
early_stopping = EarlyStoppingCallback(early_stopping_patience=3)

# NOTE(review): recent transformers releases rename `tokenizer=` to
# `processing_class=` — confirm against the installed version.
trainer = WeightedTrainer(
    class_weights=class_weights,
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    callbacks=[early_stopping],
)

trainer.train()