# Fine-Tuning a Transformer Model on Azure Databricks

## Overview
This notebook guides you through the process of preparing data and executing the fine-tuning of a transformer model using Azure Databricks.

## Objectives
- Set up the environment.
- Load the datasets.
- Configure and train a transformer model.
- Evaluate model performance and save results.


## Author
- Name: Alessandro Armillotta
- Date: 09/10/2025

# Steps
1. Data loading and preprocessing.
2. Model configuration and fine-tuning.
3. Model evaluation and saving.
4. Load Model

In [0]:
# Declare the Databricks text widgets that make this notebook configurable.
#   experiment_name - MLflow experiment under which training runs are logged
#   base_model      - pretrained Hugging Face checkpoint used as the starting point
#   run_name        - suffix of the MLflow run name ("trainer_<run_name>")
widget_defaults = {
    "experiment_name": "fine_tuning_transformer_model",
    "base_model": "google-bert/bert-base-uncased",
    "run_name": "google-bert/bert-base-uncased",
}
for widget_name, default_value in widget_defaults.items():
    dbutils.widgets.text(widget_name, default_value)

In [0]:
# Pull the current widget values into plain Python variables so the rest of
# the notebook can be driven by them.
experiment_name, base_model, run_name = (
    dbutils.widgets.get(widget_key)
    for widget_key in ("experiment_name", "base_model", "run_name")
)

## Step 0: Setup Environments

In [0]:
import json

# Hugging Face, Spark, PyTorch and NumPy components used throughout the notebook.
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer, DataCollatorWithPadding, pipeline, EarlyStoppingCallback
from pyspark.sql import functions as F
import datasets
import os
import torch
import numpy as np
import evaluate


# MLflow is used for experiment tracking and model logging.
import mlflow
mlflow.set_registry_uri("databricks-uc") # Use Unity Catalog as MLflow model registry
mlflow.set_tracking_uri("databricks") # Enable experiment tracking in Databricks

# Reduce log noise: silence all Python warnings, restrict transformers logging
# to errors, and set the TensorFlow C++ log level variable to its quietest value.
# NOTE(review): filterwarnings("ignore") also hides deprecation warnings that
# may be useful while upgrading libraries.
import warnings
warnings.filterwarnings("ignore")
import transformers
transformers.logging.set_verbosity_error()
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

# GPU/CPU setup
# Detect whether a CUDA GPU is available; when none is found, training falls
# back to CPU.
USE_CUDA = torch.cuda.is_available()
DEVICE = "cuda" if USE_CUDA else "cpu"

# The status messages below previously contained mis-decoded bytes; the
# emoji and the em dash are restored to their intended characters.
print(f"🔥 Device in uso: {DEVICE}")
if USE_CUDA:
    print(f"GPU rilevata: {torch.cuda.get_device_name(0)}")
else:
    print("⚠️ Nessuna GPU rilevata — training su CPU")

# Disable tokenizer parallelism to avoid warnings/deadlocks when the
# DataLoader forks worker processes.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Restrict the process to the first GPU (or hide GPUs entirely on CPU).
# NOTE(review): CUDA has already been queried above, so this assignment most
# likely does not affect the current process; it would need to be set before
# the first torch.cuda call to take effect - confirm the intended behaviour.
os.environ["CUDA_VISIBLE_DEVICES"] = "0" if USE_CUDA else ""

print("PyTorch version:", torch.__version__)
print("Transformers version:", transformers.__version__)

# Mixed-precision capability flags; BF16 support is only probed when CUDA is
# available, and FP16 is the fallback on GPUs without BF16.
# NOTE(review): TrainingArguments further down passes fp16=USE_CUDA and
# bf16=False, so these two flags are currently not consumed.
USE_BF16 = torch.cuda.is_bf16_supported() if USE_CUDA else False
USE_FP16 = USE_CUDA and not USE_BF16

In [0]:
# MLflow experiment names on Databricks are workspace paths, so make sure the
# name is absolute. The guard keeps this cell idempotent: re-running it (or
# supplying a widget value that already starts with "/") no longer produces
# names such as "//fine_tuning_transformer_model".
# NOTE(review): workspaces often require experiments to live under
# /Users/<user>/ or /Shared/ - confirm that a root-level path is accepted.
if not experiment_name.startswith("/"):
    experiment_name = f"/{experiment_name}"

# All intermediate artifacts live under a single Unity Catalog volume folder.
VOLUME_TMP_DIR = "/Volumes/main/fine_tuning_transformer_model/tmp"

# Cache locations used when converting the Spark DataFrames to HF datasets.
train_cache_dir = f"{VOLUME_TMP_DIR}/train"
val_cache_dir   = f"{VOLUME_TMP_DIR}/val"

# Output locations for the fine-tuned model, Trainer checkpoints and the
# inference pipeline, plus the artifact path used when logging to MLflow.
model_output_dir    = f"{VOLUME_TMP_DIR}/output_model"
model_artifact_path = "classification"
training_output_dir = f"{VOLUME_TMP_DIR}/trainer"
pipeline_output_dir = f"{VOLUME_TMP_DIR}/pipeline"

In [0]:
# Resolve the MLflow experiment (creating it on first use) and make it the
# active experiment for this notebook.
try:
    experiment = mlflow.get_experiment_by_name(experiment_name)

    if experiment is None:
        experiment_id = mlflow.create_experiment(
            name=experiment_name,
            tags={'exp_name': experiment_name}
        )
        # Re-fetch the experiment so `experiment` is a real Experiment object.
        # The training cell reads `experiment.experiment_id`, which would fail
        # on the first execution if `experiment` were left as None.
        experiment = mlflow.get_experiment(experiment_id)
        print(f"Experiment {experiment_name} created.")
    else:
        print(f"Experiment {experiment_name} already exists.")

    mlflow.set_experiment(experiment_id=experiment.experiment_id)
except Exception as e:
    # Report the problem but do not continue: every later cell depends on a
    # valid experiment, so swallowing the error only defers the failure.
    print(f"An error occurred: {e}")
    raise

## Step 1: Read Dataset

In [0]:
# Read the train and test Delta tables as Spark DataFrames and print the
# number of rows in each as a quick sanity check.
train_df = spark.read.table("main.fine_tuning_transformer_model.train_data")
train_row_count = train_df.count()
print(train_row_count)

test_df = spark.read.table("main.fine_tuning_transformer_model.test_data")
test_row_count = test_df.count()
print(test_row_count)

In [0]:
# Collect the label table to the driver and build the two lookup dictionaries
# expected by the model config: position -> label and label -> position.
# NOTE(review): ids are assigned from the order in which rows are returned;
# no explicit ordering is applied here - confirm this matches the ids stored
# in the training data.
labels = spark.read.table("main.fine_tuning_transformer_model.labels").collect()

id2label = dict(enumerate(row.label for row in labels))
label2id = {label_name: label_index for label_index, label_name in id2label.items()}

In [0]:
# Optional: if the tables carry a numeric `label_id` column instead of `label`,
# expose it under the name `label` first (needed especially for text labels):
#train_dataset = train_df.select("text","label_id").withColumn("label", F.col("label_id")).drop("label_id")
#test_dataset  = test_df.select("text","label_id").withColumn("label", F.col("label_id")).drop("label_id")

# Alternative for serverless compute: go through pandas instead of Spark.
#train_df = train_df.toPandas()
#test_df = test_df.toPandas()
#train_dataset = datasets.Dataset.from_pandas(train_df)
#test_dataset  = datasets.Dataset.from_pandas(test_df)

# Non-serverless compute: convert the Spark DataFrames into Hugging Face
# datasets, each with its own cache directory.
train_dataset = datasets.Dataset.from_spark(
    train_df,
    cache_dir=train_cache_dir,
)
test_dataset = datasets.Dataset.from_spark(
    test_df,
    cache_dir=val_cache_dir,
)

## Step 2: Training Configuration

In [0]:
# Load the tokenizer that matches the base model; transformer models expect
# tokenized input.
tokenizer = AutoTokenizer.from_pretrained(base_model)

def tokenize_function(examples):
    """Tokenize a batch of rows using their "text" column.

    Sequences are truncated to at most 512 tokens. Padding is intentionally
    not applied here: the DataCollatorWithPadding passed to the Trainer pads
    each batch to its longest sequence, which avoids storing and processing
    512 tokens for every short example. No tensor conversion is requested
    either, because `Dataset.map` stores the result as plain columns.

    Args:
        examples: batch mapping provided by `Dataset.map(batched=True)`;
            must contain a "text" entry.

    Returns:
        The tokenizer output for the batch.
    """
    return tokenizer(examples["text"], truncation=True, max_length=512)


train_tokenized  = train_dataset.map(tokenize_function, batched=True)
test_tokenized   = test_dataset.map(tokenize_function, batched=True)

In [0]:
# Load the pretrained checkpoint with a classification head sized to the
# label set, embedding both label mappings in the model config.
label_config = {
    "num_labels": len(label2id),
    "label2id": label2id,
    "id2label": id2label,
}
model = AutoModelForSequenceClassification.from_pretrained(base_model, **label_config)

Training arguments are the set of hyperparameters that control how a model is trained. They are passed to the `TrainingArguments` class and used by the `Trainer` API to manage the training loop (see the Hugging Face Transformers documentation at huggingface.co for the full list).

Think of training arguments as the recipe for model training: each parameter influences how the model learns, how fast it converges, and how well it generalizes.

In [0]:
# Hyperparameters and runtime settings consumed by the Trainer.
training_args = TrainingArguments(
    output_dir=training_output_dir,
    # Larger batches on GPU, smaller ones on CPU.
    per_device_train_batch_size=64 if USE_CUDA else 32,
    per_device_eval_batch_size=64 if USE_CUDA else 32,
    do_eval=True,
    do_train=True,
    num_train_epochs=2,
    learning_rate=2e-5,
    optim="adamw_torch",
    # Mixed precision: FP16 whenever a GPU is present.
    # NOTE(review): USE_FP16 / USE_BF16 are computed in the setup cell but not
    # used here - confirm whether BF16 should be enabled on supporting GPUs.
    fp16=USE_CUDA,
    bf16=False,
    max_grad_norm=1.0,

    # Background data loading and pinned memory are only enabled on GPU.
    dataloader_num_workers=8 if USE_CUDA else 0,
    dataloader_pin_memory=USE_CUDA,
    # Evaluate and checkpoint once per epoch; log every `logging_steps` steps.
    eval_strategy="epoch",
    save_strategy="epoch",
    logging_strategy="steps",
    #eval_steps=10, # only if eval_strategy is step
    #save_steps=10, # only if save_strategy is step
    # Reload the checkpoint with the highest evaluation accuracy at the end.
    load_best_model_at_end=True,
    metric_for_best_model="eval_accuracy",
    greater_is_better=True,
    logging_steps=2,
    report_to="mlflow",

    # `use_cpu` replaces the deprecated `no_cuda` flag.
    use_cpu=not USE_CUDA
)

In [0]:
# Collator that pads every batch using the notebook's tokenizer.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [0]:
from sklearn.metrics import accuracy_score
def compute_metrics(eval_pred):
    """Compute classification accuracy for the Trainer's evaluation loop.

    Uses NumPy only, so it does not depend on any import made outside the
    notebook's main import cell.

    Args:
        eval_pred: pair ``(logits, labels)``. ``logits`` may also be a tuple
            of model outputs, in which case its first element is used.

    Returns:
        dict with a single key ``"accuracy"`` holding the fraction of
        examples whose arg-max prediction equals the label, as a Python float.
    """
    logits, labels = eval_pred
    # Models that return extra outputs hand over a tuple; logits come first.
    if isinstance(logits, tuple):
        logits = logits[0]
    predictions = np.argmax(logits, axis=-1)
    return {"accuracy": float(np.mean(predictions == np.asarray(labels)))}

In [0]:
# Stop training once the monitored metric has not improved for 3 evaluations.
# NOTE(review): with num_train_epochs=2 and one evaluation per epoch, a
# patience of 3 cannot be reached - confirm the intended values.
early_stopping = EarlyStoppingCallback(early_stopping_patience=3)

# Assemble the Trainer from the model, arguments, tokenized datasets, metric
# function and padding collator defined in the previous cells.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    train_dataset=train_tokenized,
    eval_dataset=test_tokenized,
    callbacks=[early_stopping],
)

In [0]:
# Resolve the target experiment by name. `set_experiment` returns the
# Experiment object, so this cell works even when the experiment was created
# moments ago and no Experiment object was kept around.
active_experiment = mlflow.set_experiment(experiment_name)

# Train, persist and log the model inside a single MLflow run. The `with`
# block ends the run on exit (also when an exception is raised), so no
# explicit end_run() call is needed. Errors are deliberately not swallowed:
# the following cells rely on `run` and on the logged model, and continuing
# after a failure could silently reuse a run from an earlier execution.
with mlflow.start_run(experiment_id=active_experiment.experiment_id, run_name=f"trainer_{run_name}") as run:
    trainer.train()
    trainer.save_model(model_output_dir)

    # Wrap the saved model and the tokenizer in an inference pipeline.
    pipe = pipeline(
        "text-classification",
        model=AutoModelForSequenceClassification.from_pretrained(model_output_dir),
        batch_size=1,
        tokenizer=tokenizer,
    )

    pipe.save_pretrained(pipeline_output_dir)

    # Log the pipeline to the run so it can be loaded and registered later.
    model_info = mlflow.transformers.log_model(
        transformers_model=pipe,
        artifact_path=model_artifact_path,
        input_example="Hi there!",
    )



## Step 4: Load Logged Model

In [0]:
# URI of the model logged by the training run, then the same model wrapped
# as a Spark UDF that returns string predictions.
# NOTE(review): this rebinds `model`, which earlier held the Hugging Face model.
logged_model = "runs:/{}/{}".format(run.info.run_id, model_artifact_path)
model = mlflow.pyfunc.spark_udf(
    spark,
    model_uri=logged_model,
    result_type='string',
)

In [0]:
# Score the test texts with the UDF and show them next to the true labels.
prediction_col = model(test_df.text).alias("prediction")
test = test_df.select(test_df.text, test_df.label, prediction_col)
display(test)

In [0]:
# Persist the scored test set, replacing any previous contents of the table.
prediction_writer = test.write.mode("overwrite")
prediction_writer.saveAsTable("main.fine_tuning_transformer_model.prediction")

### Register Model

In [0]:
# Register the logged model under its three-level registry name and report
# the name and version that were assigned.
mv = mlflow.register_model(
    model_uri=logged_model,
    name="main.fine_tuning_transformer_model.classification_model",
)
print("Name: {}".format(mv.name))
print("Version: {}".format(mv.version))