In [0]:
%pip install --quiet datasets==2.20.0 transformers==4.49.0 tf-keras==2.17.0 accelerate==1.4.0 mlflow==2.20.2 torchvision==0.20.1 deepspeed==0.14.4
# Pinned versions so a fresh "Run All" resolves the same training stack every time.
# Restart the Python process so the packages installed above are importable in later cells.
dbutils.library.restartPython()

In [0]:
import pandas as pd
import io
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql import SparkSession
from PIL import Image
# Side length (pixels) of the square images produced by the resize step.
IMAGE_RESIZE = 224

# Unity Catalog catalog and medallion-layer schemas used by this notebook.
landing_catalog = "smart_claims_drv"
landing_schema = "00_landing"
bronze_schema = "01_bronze"
silver_schema = "02_silver"
# The gold layer is `03_gold` (the registered model and claim_images_predicted table live there).
gold_schema = "03_gold"

# Landing volume for raw claim files, and a metadata directory under it
# (not referenced by the cells shown in this notebook).
base_path = f"/Volumes/{landing_catalog}/{landing_schema}/claims"
metadata_path = f"{base_path}/autoloader_metadata"

In [0]:
# Silver-layer training images; the resize cell below reloads this same table before resizing.
training_df = spark.read.table("smart_claims_drv.`02_silver`.training_images")


In [0]:
# Resize every image of the silver `training_images` table to a fixed IMAGE_RESIZE x IMAGE_RESIZE square:
# 1. Each image is center-cropped to a square whose side is its shorter edge.
# 2. The square is resized to the target resolution using nearest-neighbour interpolation.
# 3. The result is re-encoded as JPEG bytes inside a Spark pandas UDF.
# 4. The column is tagged with an image/jpeg content annotation and written as the Delta table
#    `training_images_resized` for downstream ML pipelines.
#
# Purpose: standardizing image sizes gives the model consistent inputs for training and inference.

spark = SparkSession.builder.getOrCreate()

# Fully-qualified silver schema; back-quoted because the schema name starts with a digit.
silver_prefix = f"{landing_catalog}.`{silver_schema}`"


@pandas_udf("binary")
def resize_image_udf(content_series: pd.Series) -> pd.Series:
    """Center-crop and resize encoded images into IMAGE_RESIZE x IMAGE_RESIZE JPEG bytes.

    Args:
        content_series: encoded image bytes, one image per row (nulls allowed).

    Returns:
        A Series of JPEG-encoded bytes; null inputs stay null instead of failing the whole batch.
    """
    def resize_image(content):
        if content is None:
            return None
        with Image.open(io.BytesIO(content)) as image:
            width, height = image.size
            side = min(width, height)
            # Integer offsets guarantee an exact side x side crop. Fractional ".5" box coordinates
            # get rounded by PIL and, for an odd side, can drop a pixel from one edge only.
            left = (width - side) // 2
            top = (height - side) // 2
            square = image.crop((left, top, left + side, top + side))
            resized = square.resize((IMAGE_RESIZE, IMAGE_RESIZE), Image.NEAREST)
        # JPEG cannot store alpha/palette modes (e.g. RGBA, P): convert those to RGB before saving.
        if resized.mode not in ("RGB", "L"):
            resized = resized.convert("RGB")
        output = io.BytesIO()
        resized.save(output, format="JPEG")
        return output.getvalue()

    return content_series.apply(resize_image)


# Column metadata used for previews: marks the bytes as JPEG images.
image_meta = {"spark.contentAnnotation": '{"mimeType": "image/jpeg"}'}

# --- Read the existing silver table ---
training_df = spark.table(f"{silver_prefix}.training_images")

# --- Apply the resize and overwrite the Delta table (batch) ---
(training_df
    .withColumn("content", resize_image_udf(col("content")).alias("content", metadata=image_meta))
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{silver_prefix}.training_images_resized")
)


In [0]:
# Show the first ten resized rows as a quick visual check of the resize step.
resized_preview = spark.table("smart_claims_drv.02_silver.training_images_resized").limit(10)
display(resized_preview)

In [0]:
%sql
-- Schema and Unity Catalog volume that receive the Parquet export of training_images_resized
-- (written to /Volumes/smart_claims_drv/exports/training_images_resized and loaded by Hugging Face).
CREATE SCHEMA IF NOT EXISTS smart_claims_drv.exports;

CREATE VOLUME IF NOT EXISTS smart_claims_drv.exports.training_images_resized;


In [0]:
# Prepare the resized images as a Hugging Face dataset for fine-tuning:
# 1. Export the Delta table "training_images_resized" as Parquet files into a Unity Catalog volume.
# 2. Load those Parquet files with `datasets.load_dataset` and rename the "content" column to "image".
# 3. Split into training (80%) and validation (20%) sets with a fixed seed for reproducibility.
# 4. Select the MLflow experiment that tracks training metrics and artifacts.

from datasets import Dataset, load_dataset
import mlflow
import pyarrow as pa

# UC volume that receives the Parquet export (created by the %sql cell above).
EXPORT_PATH = "/Volumes/smart_claims_drv/exports/training_images_resized"

# Put the experiment in the current user's workspace folder instead of a hard-coded personal
# e-mail, so the notebook runs unchanged for any user.
current_user = spark.sql("SELECT current_user()").first()[0]
mlflow.set_experiment(f"/Users/{current_user}/image-claims-classifier")

# Spark writes the Delta table out as plain Parquet part files.
(
    spark.table("smart_claims_drv.02_silver.training_images_resized")
    .write
    .mode("overwrite")
    .parquet(EXPORT_PATH)
)

# The `*.parquet` glob restricts loading to the Parquet part files in the directory.
# Without an explicit split, load_dataset returns a DatasetDict with a single "train" split.
dataset = load_dataset(
    "parquet",
    data_files=f"{EXPORT_PATH}/*.parquet",
).rename_column("content", "image")

splits = dataset["train"].train_test_split(test_size=0.2, seed=42)
train_ds, val_ds = splits["train"], splits["test"]


In [0]:
# Prepare image transformations for fine-tuning a pre-trained model (ResNet-50):
# 1. Load the checkpoint's image processor to get its normalization parameters (mean, std).
# 2. Define a transformation pipeline: encoded bytes -> RGB PIL image -> tensor -> normalization.
#    No resize/crop happens here: images were already resized to IMAGE_RESIZE x IMAGE_RESIZE upstream.
# 3. Create a preprocess function that applies the pipeline to a batch of images.
# 4. Register it on the training and validation datasets so images are transformed on the fly.
# Purpose: normalize images exactly as the pre-trained model expects, enabling effective fine-tuning.

import io

import torch
from PIL import Image
from torchvision.transforms import CenterCrop, Compose, Lambda, Normalize, RandomResizedCrop, Resize, ToTensor
from transformers import AutoFeatureExtractor, AutoImageProcessor

# Pre-trained model to fine-tune; see https://huggingface.co/microsoft/resnet-50 for details.
model_checkpoint = "microsoft/resnet-50"

# The image processor carries the checkpoint's preprocessing parameters, so switching checkpoints
# needs no code change. AutoImageProcessor replaces the deprecated AutoFeatureExtractor.
model_def = AutoImageProcessor.from_pretrained(model_checkpoint)


def _bytes_to_rgb(image_bytes):
    """Decode encoded image bytes into an RGB PIL image."""
    return Image.open(io.BytesIO(image_bytes)).convert("RGB")


transforms = Compose([
    Lambda(_bytes_to_rgb),
    ToTensor(),
    Normalize(mean=model_def.image_mean, std=model_def.image_std),
])


def preprocess(batch):
    """Apply `transforms` to every image of a batch (invoked lazily via `Dataset.set_transform`)."""
    batch["image"] = [transforms(image) for image in batch["image"]]
    return batch


# Same deterministic preprocessing for training and validation (no augmentation).
train_ds.set_transform(preprocess)
val_ds.set_transform(preprocess)

In [0]:
# Prepare the model for fine-tuning on the claim-image dataset:
# 1. Build label <-> id mappings (label2id / id2label); Hugging Face keeps them in the model config
#    so predictions come back as readable labels.
# 2. Load the pre-trained image classification model (ResNet-50) from the checkpoint.
# 3. Configure it with the dataset's number of classes and the label mappings.
# 4. `ignore_mismatched_sizes=True` lets the checkpoint's classification head be replaced when its
#    number of classes differs from ours.

from transformers import AutoModelForImageClassification, TrainingArguments, Trainer

# `dataset` comes from load_dataset and is a DatasetDict, so labels must be read from its "train"
# split (indexing the dict itself by "label" raises KeyError). Sorting makes the id assignment
# deterministic: a set of strings can iterate in a different order in each Python process.
class_labels = sorted(dataset["train"].unique("label"))
label2id = {label: i for i, label in enumerate(class_labels)}
id2label = {i: label for label, i in label2id.items()}

# Load the base model from its checkpoint with a fresh head sized for our classes.
model = AutoModelForImageClassification.from_pretrained(
    model_checkpoint,
    label2id=label2id,
    id2label=id2label,
    num_labels=len(label2id),
    ignore_mismatched_sizes=True,
)

In [0]:
# Configure the Hugging Face Trainer for fine-tuning:
# 1. Derive the model name from the checkpoint id to build the output directory.
# 2. TrainingArguments:
#    - write checkpoints to /tmp/huggingface/pcb/<model>-finetuned,
#    - train on CPU only (no CUDA) for simplicity,
#    - evaluate and save a checkpoint at the end of every epoch,
#    - train for 20 epochs and reload the best checkpoint at the end.
# Purpose: one explicit training configuration for reproducible fine-tuning.

from transformers import TrainingArguments

model_name = model_checkpoint.split("/")[-1]

args = TrainingArguments(
    f"/tmp/huggingface/pcb/{model_name}-finetuned",
    # `use_cpu` and `eval_strategy` replace the deprecated `no_cuda` and `evaluation_strategy`.
    use_cpu=True,
    # Keep the raw "image"/"label" columns: the custom collate_fn reads them, and they are not
    # forward() arguments of the model.
    remove_unused_columns=False,
    eval_strategy="epoch",
    # load_best_model_at_end requires the save and eval strategies to match.
    save_strategy="epoch",
    num_train_epochs=20,
    load_best_model_at_end=True,
)

In [0]:
import mlflow


class ModelWrapper(mlflow.pyfunc.PythonModel):
    """MLflow pyfunc wrapper around a Hugging Face image-classification pipeline.

    Pre-processing: decodes the raw bytes of the ``content`` column into PIL images
    (any resizing is left to the pipeline itself).
    Post-processing: keeps only the highest-scoring class per image, e.g.
    ``{'score': 0.999038815498352, 'label': 'normal'}``, as one DataFrame row.
    """

    def __init__(self, pipeline):
        self.pipeline = pipeline
        # Switch the wrapped model to evaluation mode for inference.
        self.pipeline.model.eval()

    def predict(self, context, images):
        """Return a DataFrame with the top score/label for every image in ``images['content']``."""
        from PIL import Image

        with torch.no_grad():
            pil_images = [Image.open(io.BytesIO(raw_bytes)) for raw_bytes in images['content']]
            # The pipeline yields, per image, the scores for all candidate classes.
            per_image_scores = self.pipeline.predict(pil_images)
            best_rows = [max(candidates, key=lambda entry: entry['score']) for candidates in per_image_scores]
            return pd.DataFrame(best_rows)
     

In [0]:
from transformers import pipeline, DefaultDataCollator, EarlyStoppingCallback
from mlflow.models import infer_signature
from mlflow import artifacts

# infer_signature only needs the column schema, so a few rows are enough. Scoring the whole
# table here would pull every image to the driver and run CPU inference on all of them.
SIGNATURE_SAMPLE_ROWS = 5

with mlflow.start_run(run_name="hugging_face_new") as run:
    mlflow.log_input(mlflow.data.from_huggingface(train_ds, "training"))

    def collate_fn(examples):
        """Batch preprocessed examples into model inputs.

        Stacks the transformed image tensors into ``pixel_values`` and encodes each label as a
        one-hot float vector over ``len(label2id)`` classes (the real class count).
        NOTE(review): float one-hot targets likely make the Hugging Face model choose a
        multi-label (BCE) loss rather than cross-entropy; confirm this is intended.
        """
        import torch
        pixel_values = torch.stack([e["image"] for e in examples])
        labels = torch.tensor([label2id[e["label"]] for e in examples], dtype=torch.long)
        labels = torch.nn.functional.one_hot(labels, num_classes=len(label2id)).float()
        return {"pixel_values": pixel_values, "labels": labels}

    # `processing_class` replaces Trainer's deprecated `tokenizer` argument.
    trainer = Trainer(
        model,
        args,
        train_dataset=train_ds,
        eval_dataset=val_ds,
        processing_class=model_def,
        data_collator=collate_fn,
    )
    train_results = trainer.train()

    # Build the final HF pipeline from the best checkpoint. If no checkpoint path was recorded,
    # fall back to the in-memory model (the best one, given load_best_model_at_end).
    best_model = trainer.state.best_model_checkpoint or trainer.model
    classifier = pipeline("image-classification", model=best_model, image_processor=model_def)

    wrapped_model = ModelWrapper(classifier)
    signature_sample = (
        spark.table("smart_claims_drv.02_silver.training_images_resized")
        .select("content")
        .limit(SIGNATURE_SAMPLE_ROWS)
        .toPandas()
    )
    predictions = wrapped_model.predict(None, signature_sample)
    signature = infer_signature(signature_sample, predictions)

    reqs = mlflow.transformers.get_default_pip_requirements(model)

    # Log the wrapped model inside the same run and keep its ModelInfo for registration.
    logged = mlflow.pyfunc.log_model(
        artifact_path="model",
        python_model=wrapped_model,
        pip_requirements=reqs,
        signature=signature,
    )

# Sanity checks on what was logged.
print("logged.model_uri:", logged.model_uri)   # e.g. runs:/<run_id>/model
print("logged.run_id  :", logged.run_id)
print("model files    :", artifacts.list_artifacts(logged.model_uri))

In [0]:

from mlflow.tracking import MlflowClient
import mlflow

# Register the logged model in Unity Catalog and point the "prod" alias at the new version.
# Note: this rebinds `model_name`, which previously held the checkpoint short name.
mlflow.set_registry_uri("databricks-uc")
model_name = "smart_claims_drv.03_gold.claims_damage_level"

registered = mlflow.register_model(model_uri=logged.model_uri, name=model_name)

client = MlflowClient()
client.set_registered_model_alias(name=model_name, alias="prod", version=registered.version)

print(f"Registered {model_name} v{registered.version} and set alias 'prod'.")
     

In [0]:
# Score the labelled images with the registered "prod" model so predictions can be compared with
# the ground truth. (`predictions` from the training cell is a small pandas DataFrame without
# path/label columns, so it cannot be used here.)
# NOTE(review): this table feeds both the train and validation splits, so these metrics are
# optimistic; score only held-out rows for an unbiased estimate. Assumes the table has a `path` column.
import mlflow

mlflow.set_registry_uri("databricks-uc")
predict_damage_udf = mlflow.pyfunc.spark_udf(
    spark,
    model_uri="models:/smart_claims_drv.03_gold.claims_damage_level@prod",
    result_type="struct<score:double, label:string>",
)
# Input column names come from the logged model signature (the `content` column).
columns = predict_damage_udf.metadata.get_input_schema().input_names()

labeled_predictions = (
    spark.table("smart_claims_drv.02_silver.training_images_resized")
    .withColumn("damage_prediction", predict_damage_udf(*columns))
)
results = labeled_predictions.selectExpr(
    "path", "label", "damage_prediction.label as predictions", "damage_prediction.score as score"
).toPandas()


In [0]:
import matplotlib.pyplot as plt
import seaborn as sns

# Confusion matrix: rows are ground-truth labels, columns are predicted labels, cells count images.
confusion_matrix = pd.crosstab(results['label'], results['predictions'])

# Explicit figure/axes so the heatmap, title and labels all target the same plot.
fig, ax = plt.subplots()
sns.heatmap(confusion_matrix, annot=True, cmap="Blues", fmt='d', ax=ax)
ax.set(title="Claim damage level: true vs. predicted", xlabel="Predicted label", ylabel="True label")
plt.show()

In [0]:
# Score every claim image with the registered "prod" model, join the image metadata and persist
# the result to the gold layer. The scoring UDF is built here so this cell does not depend on
# state left over from other cells.
import mlflow

mlflow.set_registry_uri("databricks-uc")
predict_damage_udf = mlflow.pyfunc.spark_udf(
    spark,
    model_uri="models:/smart_claims_drv.03_gold.claims_damage_level@prod",
    result_type="struct<score:double, label:string>",
)
# Input column names come from the logged model signature (the `content` column).
columns = predict_damage_udf.metadata.get_input_schema().input_names()

raw_images = (spark.read.table("smart_claims_drv.02_silver.claim_images")
                   .withColumn("damage_prediction", predict_damage_udf(*columns)))

metadata = spark.table("smart_claims_drv.01_bronze.claim_images_meta")

(raw_images.join(metadata, on="image_name")
    .write
    .mode("overwrite")
    .saveAsTable("smart_claims_drv.03_gold.claim_images_predicted"))
     