# Face Mask Detection (MLOps Enhanced)

This notebook implements Face Mask Detection with a selectable detector (**Faster R-CNN**, RetinaNet, or YOLOv8), integrating production-grade practices:
- **MLflow**: For experiment tracking (loss, metrics, artifacts).
- **Mixed Precision (AMP)**: For faster training and lower memory usage.
- **Advanced Evaluation**: Confusion Matrix and Classification Reports adapted for Object Detection.


In [None]:
%load_ext autoreload
%autoreload 2

import os
import sys
import torch
import numpy as np
import mlflow
import mlflow.pytorch
import plotly.express as px
import plotly.graph_objects as go
import plotly.figure_factory as ff
from tqdm.auto import tqdm
from PIL import Image
import shutil
import math
from sklearn.metrics import confusion_matrix, classification_report
from torch.amp import autocast, GradScaler

# Add src to path
sys.path.append(os.path.abspath(os.path.join('..')))
from src.dataset import FaceMaskDataset
from src.model import get_model
from src.utils import convert_to_yolo_format
import kagglehub
import torchvision
import torchvision.transforms as T

# System metrics (CPU/GPU/RAM) are optional: report a failure and carry on.
try:
    mlflow.enable_system_metrics_logging()
except Exception as e:
    print(f'Could not enable system metrics: {e}')

# Use one SQLite store located in the parent of the current working
# directory, so that `mlflow ui` started from there reads the same data.
root_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
db_path = os.path.join(root_dir, 'mlflow.db')
tracking_uri = f'sqlite:///{db_path}'
mlflow.set_tracking_uri(tracking_uri)
print(f"MLflow Tracking URI set to: {tracking_uri}")


## 1. Data Preparation
Downloading and setting up the Face Mask Dataset.


In [None]:
# Model Selection
# Options: fasterrcnn_mobilenet, fasterrcnn_resnet50, retinanet, yolov8n
model_name = "retinanet"

# The experiment name is derived from the model name (a '.pt' suffix is
# dropped) and defined here so that later cells can rely on it.
experiment_name = f"FaceMask_{model_name.replace('.pt', '')}"
print(f"Experiment Name: {experiment_name}")

# Download the dataset through kagglehub, then copy it into ../data unless an
# 'images' entry is already present there.
try:
    cache_path = kagglehub.dataset_download("andrewmvd/face-mask-detection")
    target_path = '../data'

    images_dir = os.path.join(target_path, 'images')
    if not os.path.exists(images_dir):
        print(f"Moving data to {target_path}...")
        os.makedirs(target_path, exist_ok=True)
        for entry in os.listdir(cache_path):
            src_entry = os.path.join(cache_path, entry)
            dst_entry = os.path.join(target_path, entry)
            if not os.path.isdir(src_entry):
                # Plain file: copy it together with its metadata.
                shutil.copy2(src_entry, dst_entry)
                continue
            # Directory: replace any previous copy wholesale.
            if os.path.exists(dst_entry):
                shutil.rmtree(dst_entry)
            shutil.copytree(src_entry, dst_entry)
    ROOT_DIR = target_path
except Exception as e:
    # Fall back to whatever is already in ../data.
    print(f"Error: {e}")
    ROOT_DIR = '../data'

print(f"Dataset Root: {ROOT_DIR}")

# Transforms
def get_transform(train):
    """Build the image transform pipeline.

    Args:
        train: Flag intended to switch on training-time augmentation. It
            currently has no effect: both modes return the same pipeline.

    Returns:
        A ``T.Compose`` containing a single ``T.ToTensor()`` step.
    """
    # NOTE: no augmentation is applied in training mode. A
    # T.RandomHorizontalFlip(0.5) step was left disabled here.
    return T.Compose([T.ToTensor()])

# Load Dataset
# `dataset` is the un-split view (used below for sizing the split, and by the
# class-count and visualisation cells). The two *_full instances carry the
# train / eval transform pipelines and are what the subsets wrap.
dataset = FaceMaskDataset(ROOT_DIR, transforms=get_transform(train=False))
dataset_train_full = FaceMaskDataset(ROOT_DIR, transforms=get_transform(train=True))
dataset_test_full = FaceMaskDataset(ROOT_DIR, transforms=get_transform(train=False))

# Split: a seeded permutation, with the last 10% of it held out for testing.
torch.manual_seed(42)
indices = torch.randperm(len(dataset)).tolist()
test_split = int(0.1 * len(dataset))
# Slice at an explicit position. With negative slicing, test_split == 0 turns
# into indices[:-0] (empty) and indices[-0:] (everything), which would leave
# the training set empty and put every sample in the test set.
split_at = len(indices) - test_split
dataset_train = torch.utils.data.Subset(dataset_train_full, indices[:split_at])
dataset_test = torch.utils.data.Subset(dataset_test_full, indices[split_at:])

# Dataloaders
def collate_fn(batch):
    """Regroup a batch of samples field by field.

    A batch of ``(image, target)`` samples becomes ``(images, targets)``,
    each a tuple, instead of being stacked into tensors.

    Args:
        batch: Sequence of equally structured samples.

    Returns:
        A tuple with one tuple per sample field; ``()`` for an empty batch.
    """
    columns = zip(*batch)
    return tuple(columns)

num_epochs = 50  # Default epochs
batch_size = 4
num_workers = 0  # Windows safe

# Options common to both loaders; only shuffling differs between them.
_loader_options = dict(
    batch_size=batch_size,
    num_workers=num_workers,
    collate_fn=collate_fn,
)

train_dataloader = torch.utils.data.DataLoader(
    dataset_train, shuffle=True, **_loader_options)

test_dataloader = torch.utils.data.DataLoader(
    dataset_test, shuffle=False, **_loader_options)

print(f"Train Size: {len(dataset_train)}, Test Size: {len(dataset_test)}")


In [None]:
# Ensure we target the correct experiment.
# Artifacts are stored under <parent of cwd>/mlruns.
root_dir = os.path.abspath(os.path.join(os.getcwd(), '..'))
artifact_path = os.path.join(root_dir, 'mlruns')
# Create the experiment only when it does not exist yet, instead of calling
# create_experiment unconditionally and hiding every failure with a bare except.
if mlflow.get_experiment_by_name(experiment_name) is None:
    try:
        mlflow.create_experiment(
            experiment_name,
            artifact_location=f'file:///{artifact_path.replace(os.sep, "/")}',
        )
    except Exception as e:
        # Still best-effort: set_experiment below will create the experiment
        # with default settings or raise, but the cause is now visible.
        print(f"Could not create experiment '{experiment_name}': {e}")
mlflow.set_experiment(experiment_name)

print("Analyzing Class Distribution...")
with mlflow.start_run(run_name="Data_Distribution_Log"):
    # Visualize Class Distribution (Plotly)
    label_names = {1: "with_mask", 2: "without_mask", 3: "mask_weared_incorrectly"}
    label_counts = dict.fromkeys(label_names, 0)

    # Count annotated objects per class. Samples are fetched by index, so the
    # loop does not depend on how the dataset object signals end-of-iteration.
    for sample_idx in tqdm(range(len(dataset)), desc="Counting Classes"):
        _, target = dataset[sample_idx]
        for label in target['labels'].tolist():
            # Labels outside the known classes are ignored.
            if label in label_counts:
                label_counts[label] += 1

    ordered_ids = sorted(label_counts)
    counts = [label_counts[k] for k in ordered_ids]
    names = [label_names[k] for k in ordered_ids]

    fig = px.bar(
        x=names,
        y=counts,
        title="Class Distribution",
        labels={'x': 'Class', 'y': 'Count'},
        color=names,
        text=counts,
    )
    fig.update_traces(textposition='outside')
    # Written to the working directory, then attached to the run as an artifact.
    fig.write_html("class_distribution.html")
    mlflow.log_artifact("class_distribution.html")
    fig.show()


In [None]:
# Visualize Random Samples with Bounding Boxes (Plotly)
import random

def visualize_sample(dataset, idx):
    """Show one dataset sample with its annotated boxes drawn on top.

    Args:
        dataset: Indexable collection whose items are ``(image, target)``.
            The image is permuted from [C, H, W] to [H, W, C] and scaled by
            255 (assumes ToTensor-style values in 0-1); ``target`` provides
            'boxes' and 'labels' tensors.
        idx: Index of the sample to display.
    """
    class_text = {1: "with_mask", 2: "without_mask", 3: "mask_weared_incorrectly"}
    class_color = {1: 'green', 2: 'red', 3: 'orange'}

    image, annotation = dataset[idx]
    # [C, H, W] float image -> [H, W, C] uint8 pixels for display.
    pixels = (image.permute(1, 2, 0).numpy() * 255).astype(np.uint8)

    fig = px.imshow(pixels)

    box_list = annotation['boxes'].tolist()
    label_list = annotation['labels'].tolist()
    for (xmin, ymin, xmax, ymax), label in zip(box_list, label_list):
        # Unknown label ids fall back to a white 'Unknown' tag.
        colour = class_color.get(label, 'white')

        fig.add_shape(
            type="rect",
            x0=xmin, y0=ymin, x1=xmax, y1=ymax,
            line={'color': colour, 'width': 3},
        )
        # Class name tag anchored at the box's (xmin, ymin) corner.
        fig.add_annotation(
            x=xmin, y=ymin,
            text=class_text.get(label, 'Unknown'),
            showarrow=False,
            yshift=10,
            xanchor='left',
            bgcolor=colour,
            font={'color': 'white'},
        )

    fig.update_layout(title=f"Sample {idx} (Green=Mask, Red=No Mask, Orange=Incorrect)")
    fig.show()

print("Visualizing 3 Random Samples...")
# Use a dedicated name so the `indices` permutation built for the train/test
# split is not overwritten, and cap the sample size so that a dataset with
# fewer than 3 items does not make random.sample raise ValueError.
sample_indices = random.sample(range(len(dataset)), min(3, len(dataset)))
for sample_idx in sample_indices:
    visualize_sample(dataset, sample_idx)


In [None]:
# Prefer the GPU when one is available.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
num_classes = 4  # Background + 3 classes

# get_model returns a wrapper (YOLO or TorchVision) that internalizes setup.
model = get_model(model_name, num_classes)

# Move the model once. The previous bare `except` only repeated the same call,
# so a persistent failure was raised anyway, just from inside the handler.
model.to(device)


## 2. Methodology: Training & Evaluation
We adapt the reference notebook methods:
- **`train_epoch`**: Uses `tqdm` for progress, `autocast` for AMP, and tracks Loss manually since Detection models return loss dicts.
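- **`evaluate`**: Since this is Object Detection, we compute metrics by matching each confident prediction (score > 0.5) to a ground-truth box (IoU > 0.5) and then calculating classification metrics (Confusion Matrix, Precision, Recall). Ground-truth boxes with no match and predictions with no match are counted against the `Background` class.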


In [None]:
def match_boxes(pred_boxes, true_boxes, iou_threshold=0.5):
    """Greedily pair predicted boxes with ground-truth boxes by IoU.

    Predictions are visited in index order. Each one takes the ground-truth
    box it overlaps most, provided that IoU is strictly greater than
    ``iou_threshold``. A claimed ground-truth box is then removed from
    consideration for the remaining predictions.

    Args:
        pred_boxes: Predicted boxes, as accepted by ``torchvision.ops.box_iou``.
        true_boxes: Ground-truth boxes in the same format.
        iou_threshold: Minimum overlap (exclusive) for a pair to count.

    Returns:
        List of ``(pred_index, true_index)`` tuples; empty when either input
        has no boxes.
    """
    if not (len(pred_boxes) and len(true_boxes)):
        return []

    overlap = torchvision.ops.box_iou(pred_boxes, true_boxes)
    pairs = []

    for pred_idx in range(overlap.shape[0]):
        top_iou, top_col = overlap[pred_idx].max(dim=0)
        if not top_iou > iou_threshold:
            continue
        true_idx = top_col.item()
        pairs.append((pred_idx, true_idx))
        # Blank the whole column so no later prediction can reuse this box.
        overlap[:, true_idx] = -1

    return pairs

def evaluate(dataloader, device, model, epoch, class_names,
             score_threshold=0.5, iou_threshold=0.5):
    """Score detections as a classification problem and log results to MLflow.

    Predictions whose score exceeds ``score_threshold`` are paired with
    ground-truth boxes through ``match_boxes``. Every pair contributes one
    (true label, predicted label) sample. A ground-truth box left unmatched
    contributes (its label, 0), and a prediction left unmatched contributes
    (0, its label), where 0 stands for background.

    Side effects: writes ``confusion_matrix_epoch_<epoch+1>.html`` to the
    working directory, logs it as an MLflow artifact, and logs the
    ``val_accuracy`` and ``val_f1_macro`` metrics at step ``epoch``.

    Args:
        dataloader: Iterable of ``(images, targets)`` batches; each target
            provides 'boxes' and 'labels' tensors.
        device: Device the images and ground-truth boxes are moved to.
        model: Object with ``eval()`` that, called on a list of images,
            returns one mapping per image with 'boxes', 'labels', 'scores'.
        epoch: Zero-based epoch index (shown as ``epoch + 1`` in titles).
        class_names: Mapping whose values are the foreground class names.
            Assumed to be ordered by label id starting at 1 — TODO confirm
            against the caller.
        score_threshold: Predictions scoring at or below this are dropped.
        iou_threshold: Overlap threshold forwarded to ``match_boxes``.

    Returns:
        Fraction of collected samples whose predicted label equals the true
        label; 0.0 when nothing was collected.
    """
    model.eval()
    all_preds_cls = []
    all_true_cls = []

    with torch.no_grad():
        for images, targets in tqdm(dataloader, desc="Evaluating"):
            images = [image.to(device) for image in images]
            outputs = model(images)

            for output, target in zip(outputs, targets):
                true_boxes = target['boxes'].to(device)
                true_labels = target['labels'].tolist()

                # Filter low confidence
                keep = output['scores'] > score_threshold
                pred_boxes = output['boxes'][keep]
                pred_labels = output['labels'][keep].tolist()

                matches = match_boxes(pred_boxes, true_boxes, iou_threshold)
                matched_pred_indices = {p_idx for p_idx, _ in matches}
                matched_true_indices = {t_idx for _, t_idx in matches}

                for p_idx, t_idx in matches:
                    all_preds_cls.append(pred_labels[p_idx])
                    all_true_cls.append(true_labels[t_idx])

                # False Negatives (missed ground truths) -> predicted background
                for t_idx, label in enumerate(true_labels):
                    if t_idx not in matched_true_indices:
                        all_true_cls.append(label)
                        all_preds_cls.append(0)

                # False Positives (spurious detections) -> true background
                for p_idx, label in enumerate(pred_labels):
                    if p_idx not in matched_pred_indices:
                        all_true_cls.append(0)
                        all_preds_cls.append(label)

    # 'Background' occupies label 0, followed by the foreground classes.
    display_names = ['Background'] + list(class_names.values())
    # For three foreground classes this is [0, 1, 2, 3], as before.
    label_ids = list(range(len(display_names)))

    # Nothing detected and nothing annotated: there is no sample to score, so
    # log zeros instead of handing empty inputs to the metric functions.
    if not all_true_cls:
        mlflow.log_metric("val_accuracy", 0.0, step=epoch)
        mlflow.log_metric("val_f1_macro", 0.0, step=epoch)
        return 0.0

    # Confusion matrix: rows are true labels, columns are predicted labels.
    cm = confusion_matrix(all_true_cls, all_preds_cls, labels=label_ids)

    fig = ff.create_annotated_heatmap(
        z=cm,
        x=display_names,
        y=display_names,
        colorscale='Blues',
        showscale=True
    )
    fig.update_layout(
        title=f'Confusion Matrix (Epoch {epoch+1})',
        xaxis_title='Predicted',
        yaxis_title='True Label'
    )

    cm_html_path = f"confusion_matrix_epoch_{epoch+1}.html"
    fig.write_html(cm_html_path)
    mlflow.log_artifact(cm_html_path)
    # fig.show() # Optional inside notebook

    report = classification_report(
        all_true_cls,
        all_preds_cls,
        labels=label_ids,
        target_names=display_names,
        output_dict=True,
    )

    # Compute accuracy directly. With an explicit `labels=` list, some
    # scikit-learn versions emit 'micro avg' in place of 'accuracy' when a
    # listed class is absent from the data, so report['accuracy'] may be missing.
    correct = sum(t == p for t, p in zip(all_true_cls, all_preds_cls))
    accuracy = correct / len(all_true_cls)

    # Log metrics
    mlflow.log_metric("val_accuracy", accuracy, step=epoch)
    mlflow.log_metric("val_f1_macro", report['macro avg']['f1-score'], step=epoch)

    return accuracy


In [None]:
# Main Experiment Loop
# Close any run that is still active so this cell always starts a fresh one.
if mlflow.active_run():
    mlflow.end_run()

with mlflow.start_run():
    # Log params
    mlflow.log_param("epochs", num_epochs)
    mlflow.log_param("batch_size", batch_size)
    mlflow.log_param("model_architecture", model_name)

    # Keyword arguments shared by both training paths (Unified Training)
    train_kwargs = {
        "epochs": num_epochs,
        "device": device,
        "project": "../mlruns",
        "name": experiment_name,
    }

    if "yolo" in model_name:
        print("Detected YOLO model. Training with MLflow tracking...")
        # Sync MLflow config for Ultralytics through environment variables
        os.environ["MLFLOW_TRACKING_URI"] = mlflow.get_tracking_uri()
        os.environ["MLFLOW_EXPERIMENT_NAME"] = experiment_name

        yaml_path = convert_to_yolo_format(ROOT_DIR, {})
        mlflow.log_param("dataset_yaml", yaml_path)

        # YOLO Specific Args
        results = model.train(data=yaml_path, workers=0, **train_kwargs)

        # Explicitly validate to get metrics and log them
        print("Running validation to extract metrics...")
        metrics = model.val()  # presumably evaluates the best checkpoint — confirm
        mlflow.log_metric("map50", metrics.box.map50)
        mlflow.log_metric("map50-95", metrics.box.map)
        mlflow.log_metric("precision", metrics.box.mp)
        mlflow.log_metric("recall", metrics.box.mr)

        # Cleanup Ghost Experiment (named after artifact path) if exists
        try:
            client = mlflow.tracking.MlflowClient()
            ghost_exp = client.get_experiment_by_name(str(train_kwargs['project']))
            if ghost_exp:
                client.delete_experiment(ghost_exp.experiment_id)
                print(f"Cleaned up ghost experiment: {train_kwargs['project']}")
        except Exception as e:
            # Cleanup stays best-effort, but the reason for skipping it is
            # reported instead of being silently discarded.
            print(f"Ghost experiment cleanup skipped: {e}")

    else:
        # Standard PyTorch (Unified Wrapper)
        print(f"Detected TorchVision model: {model_name}. Training with Wrapper...")
        mlflow.log_param("framework", "PyTorch")

        # Unified .train() call
        results = model.train(train_dataloader, test_dataloader, **train_kwargs)

    # Evaluation (Common)
    # Note: TorchVision Wrapper handles training loop evaluation if implemented, or we do specific eval here
    # For now, keeping the notebook simple.

    # Enhanced MLflow Model Logging (Versioning + Input Example)
    print("Logging model to MLflow Registry...")
    try:
        # Only the non-YOLO path logs the model explicitly here.
        if "yolo" not in model_name:
            model.model.eval()
            example_input, _ = dataset[0]
            # The example is handed to MLflow as a NumPy batch of one image.
            # No transfer to `device` is needed just to convert it back.
            input_example = example_input.unsqueeze(0).cpu().numpy()

            mlflow.pytorch.log_model(
                pytorch_model=model.model,  # Log the inner model
                artifact_path="model",
                registered_model_name=experiment_name,
                input_example=input_example,
            )
    except Exception as e:
        print(f"MLflow Model Logging Failed: {e}")
