# Smart Mirror Agent ‚Äì Minimal, Stylish GUI with Camera Preview and Fashion Tips
This notebook sets up a live webcam preview and uses the Qwen/Qwen3-VL-2B-Instruct vision-language model to provide tailored fashion tips based on the captured user image and a topic they provide.

# 1) Install and Import Dependencies
Below commands install required packages. If already installed, they will be skipped. Then we import needed modules.

In [15]:
# Install completed; import libraries
import os
import io
import time
import base64
import threading
from typing import Optional

import cv2
import numpy as np
from PIL import Image
import ipywidgets as widgets
from IPython.display import display, clear_output

import torch
from transformers import AutoProcessor, AutoModelForCausalLM

# Configure M1 Max for maximum performance
os.environ["PYTORCH_MPS_HIGH_WATERMARK_RATIO"] = "0.0"  # Use all available GPU memory
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"  # Fallback for unsupported ops

# Helper: ensure that processor/model exist even after kernel restarts
processor = globals().get("processor", None)
model = globals().get("model", None)

# Device selection - prioritize MPS for M1 Max
if torch.backends.mps.is_available():
    device = "mps"
elif torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
device = globals().get("device", device)

# Use Qwen2.5-VL-3B-Instruct for facial feature recognition (VLM)
selected_model_name = "Qwen/Qwen2.5-VL-3B-Instruct"

# Path to fine-tuned LoRA weights (set after training)
LORA_WEIGHTS_PATH = globals().get("CHECKPOINT_PATH", None)


def ensure_model_ready():
    global processor, model, device, selected_model_name, LORA_WEIGHTS_PATH
    if processor is None or model is None:
        # Use float32 on MPS for stability, bfloat16 on CUDA
        if device == "mps":
            dtype = torch.float32  # MPS works best with float32
        elif device == "cuda":
            dtype = torch.bfloat16
        else:
            dtype = torch.float32
        
        print(f"Loading vision-language model: {selected_model_name} on {device}‚Ä¶")
        processor = AutoProcessor.from_pretrained(selected_model_name, trust_remote_code=True)
        
        base_model = AutoModelForCausalLM.from_pretrained(
            selected_model_name,
            trust_remote_code=True,
            torch_dtype=dtype,
        )
        
        # Check if fine-tuned LoRA weights exist
        if LORA_WEIGHTS_PATH and os.path.exists(LORA_WEIGHTS_PATH):
            print(f"Loading fine-tuned LoRA weights from {LORA_WEIGHTS_PATH}...")
            try:
                from peft import PeftModel
                model = PeftModel.from_pretrained(base_model, LORA_WEIGHTS_PATH)
                print("‚úì Fine-tuned model loaded!")
            except Exception as e:
                print(f"Could not load LoRA weights: {e}. Using base model.")
                model = base_model
        else:
            model = base_model
            print("Using base model (no fine-tuned weights found).")
        
        model.to(device)
        model.eval()
        
        # Optimize for M1 Max
        if device == "mps":
            torch.mps.empty_cache()  # Clear any cached memory
            print("‚úì Model loaded on M1 Max GPU (MPS)")
        
        print("Model ready.")

# Optional: explicit preload to avoid load on first capture
def preload_model():
    ensure_model_ready()
    return "Model ready."

In [16]:
# 2) Runtime Device Setup - Optimized for M1 Max
# Prioritize MPS (Metal Performance Shaders) for Apple Silicon

if torch.backends.mps.is_available():
    device = "mps"
    print("‚úì Using M1 Max GPU (MPS) - Apple Silicon optimized")
    print(f"  MPS available: {torch.backends.mps.is_available()}")
    print(f"  MPS built: {torch.backends.mps.is_built()}")
elif torch.cuda.is_available():
    device = "cuda"
    print(f"Using CUDA GPU: {torch.cuda.get_device_name(0)}")
else:
    device = "cpu"
    print("‚ö† Using CPU (GPU not available)")

# Set high-performance mode for M1 Max
if device == "mps":
    # Enable async execution for better GPU utilization
    import os
    os.environ["PYTORCH_MPS_HIGH_WATERMARK_RATIO"] = "0.0"  # Allow full GPU memory usage
    
print(f"\nDevice set to: {device}")
print("Model will be loaded on first analysis via ensure_model_ready()")

‚úì Using M1 Max GPU (MPS) - Apple Silicon optimized
  MPS available: True
  MPS built: True

Device set to: mps
Model will be loaded on first analysis via ensure_model_ready()


# 2A) Fine-tune Qwen2.5-VL-3B on CelebA Facial Attributes
This section downloads CelebA-HQ, creates training pairs (image + attribute labels), and fine-tunes the VLM using **LoRA** (Low-Rank Adaptation) for efficient training. Training uses your **M1 Max GPU (MPS)** and stops after **15 minutes** with checkpoint saving.

In [17]:
# 2A-1) Install fine-tuning dependencies and download CelebA-HQ
# ‚ö†Ô∏è IMPORTANT: After running this cell, you MUST restart the kernel!
# Go to: Kernel -> Restart Kernel, then run cells from Cell 1 again.

import subprocess
import sys

print("Installing compatible package versions...")
print("=" * 60)

# Uninstall first to clear any conflicting versions
subprocess.run([sys.executable, "-m", "pip", "uninstall", "-y", "transformers", "peft", "accelerate"], 
               capture_output=True)

# Install SPECIFIC compatible versions ‚Äì tested combo that avoids optional backend import errors
result = subprocess.run([
    sys.executable, "-m", "pip", "install", 
    "transformers==4.47.0",  # Supports qwen2_5_vl and avoids is_flute_available import
    "peft==0.14.0",          # Compatible with transformers 4.47.0
    "accelerate==0.34.2",    # Stable with both
    "kagglehub", 
    "torchvision",
    "qwen-vl-utils"
], capture_output=True, text=True)

print(result.stdout[-2000:] if len(result.stdout) > 2000 else result.stdout)
if result.returncode != 0:
    print("STDERR:", result.stderr[-1000:])

print("=" * 60)
print("\n‚ö†Ô∏è  KERNEL RESTART REQUIRED!")
print("Please restart the kernel now: Kernel ‚Üí Restart Kernel")
print("Then run cells from Cell 1 again.")
print("=" * 60)

# Try to download CelebA dataset (this part doesn't need restart)
try:
    import kagglehub
    print("\nDownloading CelebA-HQ dataset...")
    celeba_path = kagglehub.dataset_download("ipythonx/celebamaskhq")
    print(f"Dataset downloaded to: {celeba_path}")
except Exception as e:
    print(f"Dataset download will happen after restart: {e}")

Installing compatible package versions...
m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[90m‚ï∫[0m[90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m1/3[0m [transformers]
[2K   [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[90m‚ï∫[0m[90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m1/3[0m [transformers]
[2K   [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[90m‚ï∫[0m[90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m1/3[0m [transformers]
[2K   [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[90m‚ï∫[0m[90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m1/3[0m [transformers]
[2K   [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[90m‚ï∫[0m[90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m1/3[0m [transformers]
[2K   [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0

In [18]:
# 2A-1b) Verify packages are correctly installed (run after kernel restart)
# Use importlib.metadata to avoid importing heavy packages (which can trigger optional backends)
from importlib.metadata import version, PackageNotFoundError


def check_pkg_version(dist_name, expected=None):
    try:
        ver = version(dist_name)
        status = "‚úì"
        if expected and ver != expected:
            status = f"‚ö†Ô∏è (expected {expected}, got {ver})"
        print(f"  {status} {dist_name}: {ver}")
        return expected is None or ver == expected
    except PackageNotFoundError as e:
        print(f"  ‚úó {dist_name}: MISSING - {e}")
        return False

print("Checking package versions (without importing modules)...")
print("=" * 50)
ok = True
ok &= check_pkg_version("transformers", "4.47.0")
ok &= check_pkg_version("peft", "0.14.0")
ok &= check_pkg_version("accelerate", "0.34.2")
ok &= check_pkg_version("torch")
ok &= check_pkg_version("torchvision")
print("=" * 50)

if ok:
    print("\n‚úì All packages OK! Continue to next cell.")
    
    # Also set celeba_path if dataset was downloaded
    import kagglehub
    celeba_path = kagglehub.dataset_download("ipythonx/celebamaskhq")
    print(f"Dataset path: {celeba_path}")
else:
    print("\n‚ö†Ô∏è Version mismatch detected!")
    print("Run the previous cell (Cell 6), restart kernel, then run from Cell 1.")

Checking package versions (without importing modules)...
  ‚úì transformers: 4.47.0
  ‚úì peft: 0.14.0
  ‚úì accelerate: 0.34.2
  ‚úì torch: 2.9.1
  ‚úì torchvision: 0.24.1

‚úì All packages OK! Continue to next cell.
Dataset path: /Users/ahmad/.cache/kagglehub/datasets/ipythonx/celebamaskhq/versions/1


In [19]:
# 2A-2) Parse CelebA attribute labels and create training dataset
import os
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image as PILImage

# Paths inside downloaded dataset - CelebAMask-HQ subfolder structure
CELEBA_ROOT = os.path.join(celeba_path, "CelebAMask-HQ")
IMG_DIR = os.path.join(CELEBA_ROOT, "CelebA-HQ-img")
ATTR_FILE = os.path.join(CELEBA_ROOT, "CelebAMask-HQ-attribute-anno.txt")

# Verify paths exist
print(f"Checking paths...")
print(f"  Images dir: {IMG_DIR} - exists: {os.path.exists(IMG_DIR)}")
print(f"  Attributes file: {ATTR_FILE} - exists: {os.path.exists(ATTR_FILE)}")

# Read attribute labels
with open(ATTR_FILE, "r") as f:
    lines = f.read().strip().split("\n")
num_images = int(lines[0])
attr_names = lines[1].split()
data_rows = [line.split() for line in lines[2:]]
attr_df = pd.DataFrame(data_rows, columns=["filename"] + attr_names)

# Convert to int and map -1 to 0
for col in attr_names:
    attr_df[col] = attr_df[col].astype(int)
attr_df[attr_names] = attr_df[attr_names].replace(-1, 0)

print(f"\nLoaded {len(attr_df)} images with {len(attr_names)} attributes")
print("Sample attributes:", attr_names[:10])

# Human-readable attribute mapping
ATTR_READABLE = {
    "5_o_Clock_Shadow": "5 o'clock shadow",
    "Arched_Eyebrows": "arched eyebrows",
    "Attractive": "attractive appearance",
    "Bags_Under_Eyes": "bags under eyes",
    "Bald": "bald head",
    "Bangs": "bangs hairstyle",
    "Big_Lips": "full lips",
    "Big_Nose": "prominent nose",
    "Black_Hair": "black hair",
    "Blond_Hair": "blonde hair",
    "Blurry": "blurry image",
    "Brown_Hair": "brown hair",
    "Bushy_Eyebrows": "bushy eyebrows",
    "Chubby": "round face",
    "Double_Chin": "double chin",
    "Eyeglasses": "wearing eyeglasses",
    "Goatee": "goatee beard",
    "Gray_Hair": "gray hair",
    "Heavy_Makeup": "heavy makeup",
    "High_Cheekbones": "high cheekbones",
    "Male": "masculine features",
    "Mouth_Slightly_Open": "mouth slightly open",
    "Mustache": "mustache",
    "Narrow_Eyes": "narrow eyes",
    "No_Beard": "clean shaven",
    "Oval_Face": "oval face shape",
    "Pale_Skin": "pale skin tone",
    "Pointy_Nose": "pointy nose",
    "Receding_Hairline": "receding hairline",
    "Rosy_Cheeks": "rosy cheeks",
    "Sideburns": "sideburns",
    "Smiling": "smiling expression",
    "Straight_Hair": "straight hair",
    "Wavy_Hair": "wavy hair",
    "Wearing_Earrings": "wearing earrings",
    "Wearing_Hat": "wearing a hat",
    "Wearing_Lipstick": "wearing lipstick",
    "Wearing_Necklace": "wearing a necklace",
    "Wearing_Necktie": "wearing a necktie",
    "Young": "youthful appearance",
}

def attrs_to_description(row):
    """Convert attribute row to natural language description."""
    present = [ATTR_READABLE.get(col, col.replace("_", " ").lower()) 
               for col in attr_names if row[col] == 1 and col not in ["Blurry"]]
    if not present:
        return "a face with no distinctive features detected"
    return "a face with " + ", ".join(present[:12])  # limit for token length

# Create training examples
attr_df["description"] = attr_df.apply(attrs_to_description, axis=1)
print("\nSample descriptions:")
for i in range(3):
    print(f"  {attr_df.iloc[i]['filename']}: {attr_df.iloc[i]['description'][:80]}...")

Checking paths...
  Images dir: /Users/ahmad/.cache/kagglehub/datasets/ipythonx/celebamaskhq/versions/1/CelebAMask-HQ/CelebA-HQ-img - exists: True
  Attributes file: /Users/ahmad/.cache/kagglehub/datasets/ipythonx/celebamaskhq/versions/1/CelebAMask-HQ/CelebAMask-HQ-attribute-anno.txt - exists: True

Loaded 30000 images with 40 attributes
Sample attributes: ['5_o_Clock_Shadow', 'Arched_Eyebrows', 'Attractive', 'Bags_Under_Eyes', 'Bald', 'Bangs', 'Big_Lips', 'Big_Nose', 'Black_Hair', 'Blond_Hair']

Sample descriptions:
  0.jpg: a face with arched eyebrows, attractive appearance, bags under eyes, full lips, ...
  1.jpg: a face with arched eyebrows, attractive appearance, blonde hair, heavy makeup, m...
  2.jpg: a face with attractive appearance, bags under eyes, full lips, brown hair, high ...


In [20]:
# 2A-3) Create VLM fine-tuning dataset with image-text pairs
import random

class CelebAVLMDataset(Dataset):
    """Dataset for VLM fine-tuning: image + question -> attribute description answer."""
    
    def __init__(self, df, img_dir, processor, max_samples=2000):
        self.df = df.head(max_samples).reset_index(drop=True)
        self.img_dir = img_dir
        self.processor = processor
        
        # Different question variations for diversity
        self.questions = [
            "Describe the facial features visible in this image.",
            "What facial attributes can you observe in this person?",
            "Analyze the face and list the visible features.",
            "What do you notice about this person's facial features?",
            "Describe the hair, face shape, and other visible attributes.",
        ]
    
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        img_path = os.path.join(self.img_dir, row["filename"])
        
        # Load and resize image
        image = PILImage.open(img_path).convert("RGB")
        image = image.resize((384, 384))  # Resize for efficiency
        
        # Random question for variety
        question = random.choice(self.questions)
        answer = row["description"]
        
        return {
            "image": image,
            "question": question,
            "answer": answer,
        }

# We'll create the dataset after loading the model
print("CelebAVLMDataset class defined. Dataset will be created after model loading.")

CelebAVLMDataset class defined. Dataset will be created after model loading.


In [21]:
# 2A-4) Load model and configure LoRA for efficient fine-tuning on M1 Max
# Compatibility shim for transformers optional backend import changes
import transformers
import transformers.utils as _tf_utils
# Older/alternate names used across versions (typo in some releases): is_soundfile_availble
if not hasattr(_tf_utils, "is_soundfile_availble") and hasattr(_tf_utils, "is_soundfile_available"):
    setattr(_tf_utils, "is_soundfile_availble", _tf_utils.is_soundfile_available)

from peft import LoraConfig, get_peft_model, TaskType
from transformers import AutoProcessor, AutoModelForCausalLM
import torch
import os

# Configure M1 Max for maximum GPU utilization
os.environ["PYTORCH_MPS_HIGH_WATERMARK_RATIO"] = "0.0"  # Use all GPU memory
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"  # Fallback for unsupported ops

# Use MPS (M1 Max GPU) for training - prioritize Apple Silicon
if torch.backends.mps.is_available():
    train_device = torch.device("mps")
    print("‚úì Using M1 Max GPU (MPS) for fine-tuning")
    print(f"  MPS available: {torch.backends.mps.is_available()}")
    print(f"  MPS built: {torch.backends.mps.is_built()}")
elif torch.cuda.is_available():
    train_device = torch.device("cuda")
    print(f"‚úì Using CUDA GPU: {torch.cuda.get_device_name(0)}")
else:
    train_device = torch.device("cpu")
    print("‚ö† Using CPU for fine-tuning (will be slower)")

# Load processor - using Qwen2.5-VL-3B-Instruct (publicly available)
MODEL_NAME = "Qwen/Qwen2.5-VL-3B-Instruct"
print(f"\nLoading processor for {MODEL_NAME}...")
ft_processor = AutoProcessor.from_pretrained(
    MODEL_NAME,
    trust_remote_code=True
)

# Load model - use float32 for MPS stability and performance
print(f"Loading {MODEL_NAME} model...")
dtype = torch.float32  # MPS works best with float32

ft_model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    trust_remote_code=True,
    torch_dtype=dtype,
    device_map=None,  # Manual placement for MPS
)

# Configure LoRA - target the attention layers for efficient fine-tuning
print("Configuring LoRA adapters...")
lora_config = LoraConfig(
    r=16,  # LoRA rank
    lora_alpha=32,  # Scaling factor
    lora_dropout=0.1,
    bias="none",
    task_type=TaskType.CAUSAL_LM,
    target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],  # Attention layers
)

# Apply LoRA to model
ft_model = get_peft_model(ft_model, lora_config)
ft_model.to(train_device)

# Clear MPS cache for optimal memory usage
if train_device.type == "mps":
    torch.mps.empty_cache()
    torch.mps.synchronize()
    print("‚úì MPS cache cleared for optimal memory")

# Print trainable parameters
trainable_params = sum(p.numel() for p in ft_model.parameters() if p.requires_grad)
total_params = sum(p.numel() for p in ft_model.parameters())
print(f"\nTrainable parameters: {trainable_params:,} / {total_params:,} ({100 * trainable_params / total_params:.2f}%)")

# Create dataset
print("\nCreating training dataset...")
train_dataset = CelebAVLMDataset(attr_df, IMG_DIR, ft_processor, max_samples=2000)
print(f"Training samples: {len(train_dataset)}")
print(f"\n‚úì Ready to fine-tune on {train_device}!")

‚úì Using M1 Max GPU (MPS) for fine-tuning
  MPS available: True
  MPS built: True

Loading processor for Qwen/Qwen2.5-VL-3B-Instruct...
Loading Qwen/Qwen2.5-VL-3B-Instruct model...


ValueError: The checkpoint you are trying to load has model type `qwen2_5_vl` but Transformers does not recognize this architecture. This could be because of an issue with the checkpoint, or because your version of Transformers is out of date.

In [None]:
# 2A-5) Fine-tune with 15-minute timeout, checkpointing, and M1 Max optimization
import time as time_module
from torch.optim import AdamW
import gc

# Training settings
CHECKPOINT_DIR = os.path.join(celeba_path, "qwen2_5_vl_finetuned")
os.makedirs(CHECKPOINT_DIR, exist_ok=True)
CHECKPOINT_PATH = os.path.join(CHECKPOINT_DIR, "lora_weights")
MAX_TRAIN_SECONDS = 15 * 60  # 15 minutes

# Optimizer with settings optimized for M1 Max
optimizer = AdamW(ft_model.parameters(), lr=2e-5, weight_decay=0.01)

# Training loop
ft_model.train()
start_time = time_module.time()
global_step = 0
best_loss = float("inf")
running_loss = 0.0
log_interval = 10

print(f"\n{'='*60}")
print(f"Starting fine-tuning on M1 Max (MPS)")
print(f"Max training time: {MAX_TRAIN_SECONDS // 60} minutes")
print(f"{'='*60}\n")

# Shuffle indices for random sampling
indices = list(range(len(train_dataset)))
random.shuffle(indices)

for idx in indices:
    # Check time limit
    elapsed = time_module.time() - start_time
    if elapsed >= MAX_TRAIN_SECONDS:
        print(f"\n‚è± Time limit reached ({elapsed / 60:.1f} min). Stopping training.")
        break
    
    # Get sample
    sample = train_dataset[idx]
    image = sample["image"]
    question = sample["question"]
    answer = sample["answer"]
    
    # Build conversation format for Qwen
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image", "image": image},
                {"type": "text", "text": question},
            ]
        },
        {
            "role": "assistant",
            "content": answer
        }
    ]
    
    try:
        # Process inputs
        text_prompt = ft_processor.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=False
        )
        
        inputs = ft_processor(
            text=[text_prompt],
            images=[image],
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=512,
        )
        
        # Move to device
        inputs = {k: v.to(train_device) if isinstance(v, torch.Tensor) else v for k, v in inputs.items()}
        
        # Forward pass
        optimizer.zero_grad()
        
        outputs = ft_model(**inputs, labels=inputs["input_ids"])
        loss = outputs.loss
        
        # Backward pass
        loss.backward()
        torch.nn.utils.clip_grad_norm_(ft_model.parameters(), 1.0)
        optimizer.step()
        
        running_loss += loss.item()
        global_step += 1
        
        # Log progress
        if global_step % log_interval == 0:
            avg_loss = running_loss / log_interval
            elapsed_min = (time_module.time() - start_time) / 60
            remaining_min = (MAX_TRAIN_SECONDS / 60) - elapsed_min
            print(f"Step {global_step}: loss={avg_loss:.4f}, elapsed={elapsed_min:.1f}min, remaining={remaining_min:.1f}min")
            
            # Save checkpoint if improved
            if avg_loss < best_loss:
                best_loss = avg_loss
                ft_model.save_pretrained(CHECKPOINT_PATH)
                print(f"  ‚Üí Checkpoint saved (best_loss={best_loss:.4f})")
            
            running_loss = 0.0
            
    except Exception as e:
        print(f"Error at step {global_step}: {e}")
        continue
    
    # Periodic MPS memory management for M1 Max optimization
    if global_step % 25 == 0:
        gc.collect()
        if train_device.type == "mps":
            torch.mps.empty_cache()
            torch.mps.synchronize()  # Ensure GPU operations complete

# Final save
print(f"\n{'='*60}")
print("‚úì Fine-tuning complete!")
ft_model.save_pretrained(CHECKPOINT_PATH)
print(f"Final checkpoint saved to: {CHECKPOINT_PATH}")
print(f"Total steps: {global_step}")
print(f"Best loss: {best_loss:.4f}")
print(f"Total time: {(time_module.time() - start_time) / 60:.1f} minutes")
print(f"{'='*60}")

In [None]:
# 2A-6) Load fine-tuned model for inference on M1 Max
from peft import PeftModel
import gc

# Clean up training model to free memory
del ft_model
del optimizer
gc.collect()

# Clear MPS cache
if torch.backends.mps.is_available():
    torch.mps.empty_cache()
    torch.mps.synchronize()

print("Loading fine-tuned model for inference on M1 Max...")

# Use MPS for inference
if torch.backends.mps.is_available():
    inference_device = "mps"
elif torch.cuda.is_available():
    inference_device = "cuda"
else:
    inference_device = "cpu"

inference_dtype = torch.float32  # MPS works best with float32

# Use the same model name as training
MODEL_NAME = "Qwen/Qwen2.5-VL-3B-Instruct"

base_model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    trust_remote_code=True,
    torch_dtype=inference_dtype,
)

# Load LoRA weights
finetuned_model = PeftModel.from_pretrained(base_model, CHECKPOINT_PATH)
finetuned_model.to(inference_device)
finetuned_model.eval()

# Clear cache after loading
if inference_device == "mps":
    torch.mps.empty_cache()
    torch.mps.synchronize()

# Update global model references for the GUI
processor = ft_processor
model = finetuned_model
device = inference_device

print(f"‚úì Fine-tuned model loaded on {inference_device}")
print(f"‚úì M1 Max GPU fully utilized for inference!")
print(f"‚úì Ready for facial feature analysis with improved accuracy!")

In [None]:
# 3) Initialize Camera and Live Preview
import atexit

# Gracefully stop any previous preview loop if re-running the cell
try:
    stop_preview()
except Exception:
    pass

camera_index = 0  # default webcam
cap = cv2.VideoCapture(camera_index)
if not cap.isOpened():
    raise RuntimeError("Failed to open webcam. Check permissions and camera availability.")

preview_running = False
preview_thread: Optional[threading.Thread] = None
current_frame_rgb: Optional[np.ndarray] = None

# Widgets
preview_image = widgets.Image(layout=widgets.Layout(width='100%', height='auto'))
status_label = widgets.HTML(value="<span style='color:#888'>Preview stopped.</span>")


def bgr_to_png_bytes(frame_bgr: np.ndarray) -> bytes:
    rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
    pil_img = Image.fromarray(rgb)
    buf = io.BytesIO()
    pil_img.save(buf, format='PNG')
    return buf.getvalue()


def preview_loop():
    global preview_running, current_frame_rgb
    try:
        while preview_running:
            ok, frame = cap.read()
            if not ok:
                time.sleep(0.03)
                continue
            # Convert to PNG for the Image widget
            png_bytes = bgr_to_png_bytes(frame)
            preview_image.value = png_bytes
            current_frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            time.sleep(0.03)  # ~33 FPS cap to reduce CPU/GPU load
    except Exception:
        # Fail-safe: stop loop on unexpected errors to avoid kernel crash
        preview_running = False


def start_preview():
    global preview_running, preview_thread
    if preview_running:
        return
    preview_running = True
    status_label.value = "<span style='color:#0a0'>Preview running</span>"
    preview_thread = threading.Thread(target=preview_loop, daemon=True)
    preview_thread.start()


def stop_preview():
    global preview_running, preview_thread
    preview_running = False
    status_label.value = "<span style='color:#c00'>Preview stopped</span>"
    # Give the thread a moment to exit gracefully
    time.sleep(0.1)


# Ensure cleanup on kernel exit
def _release_camera():
    try:
        cap.release()
    except Exception:
        pass
atexit.register(_release_camera)

start_preview()
display(preview_image, status_label)



Image(value=b'', layout="Layout(height='auto', width='100%')")

HTML(value="<span style='color:#0a0'>Preview running</span>")

In [None]:
# 4) Build Minimal GUI
title = widgets.HTML(value="""
<div style='font-family:Inter,system-ui,Arial; font-weight:600; font-size:22px; color:#111; margin-bottom:8px;'>
Smart Mirror ‚Äì Image-Only Appearance Analysis</div>
<div style='font-family:Inter,system-ui,Arial; font-size:13px; color:#555; margin-bottom:16px;'>
Live preview below. Tap Capture for fast, image-only analysis.</div>
""")

# Capture button and output
capture_btn = widgets.Button(description="Capture & Analyze", button_style='',
                              layout=widgets.Layout(width='100%'),
                              tooltip="Capture current frame and generate image-only appearance analysis")
output_area = widgets.Output(layout=widgets.Layout(border='1px solid #eee', padding='10px'))

style_html = widgets.HTML(value="""
<style>
  .smart-card {
    background:#fff; border:1px solid #eaeaea; border-radius:14px;
    padding:16px; box-shadow:0 2px 10px rgba(0,0,0,0.06);
  }
  .row { display:flex; gap:16px; align-items:flex-start; }
  .col { flex:1; }
  .preview { border-radius:12px; overflow:hidden; border:1px solid #ddd; }
  .label { font-family:Inter,system-ui,Arial; font-size:12px; color:#666; margin:6px 0 4px; }
</style>
""")

ui = widgets.VBox([
    style_html,
    title,
    widgets.VBox([
        widgets.HTML(value="<div class='label'>Live Camera Preview</div>"),
        widgets.Box([preview_image], layout=widgets.Layout(css_classes=['preview'])),
        status_label,
    ], layout=widgets.Layout(css_classes=['smart-card'])),
    widgets.VBox([
        widgets.HTML(value="<div class='label'>Actions</div>"),
        capture_btn,
    ], layout=widgets.Layout(css_classes=['smart-card'])),
    widgets.VBox([
        widgets.HTML(value="<div class='label'>Result</div>"),
        output_area,
    ], layout=widgets.Layout(css_classes=['smart-card'])),
], layout=widgets.Layout(width='800px'))

display(ui)

VBox(children=(HTML(value='\n<style>\n  .smart-card {\n    background:#fff; border:1px solid #eaeaea; border-r‚Ä¶

In [None]:
# 5 & 6) Capture Frame, Tailored Prompt, and Run Inference

# Comprehensive facial feature analysis from image only using Qwen2.5-VL-3B
def compose_messages(image_pil: Image.Image):
    # Encode image to base64 for processor consumption via 'image' content
    buf = io.BytesIO()
    image_pil.save(buf, format='PNG')
    img_bytes = buf.getvalue()
    img_b64 = base64.b64encode(img_bytes).decode('utf-8')

    # System instruction: facial feature recognition and grooming tips
    system_instruction = (
        "You are an expert facial feature analyst and grooming assistant. "
        "Analyze the face in the image and describe ALL visible facial features in detail. "
        "Include: face shape (oval, round, square, heart, oblong), forehead (high/low, wide/narrow), "
        "eyebrows (shape, thickness, arch), eyes (shape, size, spacing), nose (shape, size, bridge), "
        "cheekbones (high/low, prominent), lips (shape, fullness), jawline (defined/soft, angular/rounded), "
        "chin (shape, prominence), skin texture and tone, facial hair if present (beard, mustache, stubble), "
        "hair (length, texture, color, style, parting, hairline), and any accessories (glasses, earrings). "
        "For any detail you cannot determine from the image, state 'cannot tell from image'. "
        "Avoid sensitive attributes (age, health status, ethnicity). Be respectful and precise."
    )

    # Instruction: describe facial features first, then provide personalized tips
    instruction = (
        "First, list 10-15 specific facial feature observations you can clearly see in the image. "
        "Then provide 5-8 personalized grooming/styling tips based on those features "
        "(e.g., hairstyles that complement face shape, beard styles, eyebrow grooming, skincare for skin type, "
        "glasses frame suggestions if applicable). Use generic product types, not brands."
    )

    # Build chat messages with system + image + task request
    content = [
        {"type": "text", "text": system_instruction},
        {"type": "image", "image_base64": img_b64, "mime_type": "image/png"},
        {"type": "text", "text": (
            "Task: Analyze this face and describe all visible facial features in detail. "
            "What do you observe about the face shape, forehead, eyebrows, eyes, nose, cheekbones, lips, jawline, chin, skin, facial hair, and hair? "
            "Then provide personalized grooming and styling recommendations.\n\n" + instruction
        )},
    ]
    messages = [{"role": "user", "content": content}]
    return messages


@torch.no_grad()
def generate_tips_from_frame(frame_rgb: np.ndarray) -> str:
    ensure_model_ready()
    pil_img = Image.fromarray(frame_rgb)
    messages = compose_messages(pil_img)
    inputs = processor.apply_chat_template(
        messages,
        add_generation_prompt=True,
        tokenize=True,
        return_dict=True,
        return_tensors="pt",
    )
    inputs = inputs.to(device)

    # Generation settings optimized for detailed facial analysis
    gen_kwargs = dict(
        max_new_tokens=300,
        do_sample=True,
        temperature=0.7,
        top_p=0.9,
        repetition_penalty=1.05,
        no_repeat_ngram_size=3,
    )
    outputs = model.generate(**inputs, **gen_kwargs)

    # Strip prompt tokens when decoding
    prompt_len = inputs["input_ids"].shape[-1]
    text = processor.decode(outputs[0][prompt_len:])
    return text.strip()

In [None]:
# 7) Display Results in GUI ‚Äì facial feature analysis via VLM

def on_capture_clicked(btn):
    with output_area:
        clear_output()
        try:
            if current_frame_rgb is None:
                print("No frame available. Try again.")
                return

            # Show captured thumbnail
            pil_img = Image.fromarray(current_frame_rgb)
            thumb = pil_img.copy()
            thumb.thumbnail((240, 240))
            buf = io.BytesIO()
            thumb.save(buf, format='PNG')
            print("üîç Analyzing your facial features‚Ä¶")
            display(Image.open(io.BytesIO(buf.getvalue())))

            # Temporarily pause preview to free resources during generation
            stop_preview()

            # Run Qwen2.5-VL-3B for facial feature analysis and tips
            analysis = generate_tips_from_frame(current_frame_rgb)
            print("\nüß† Facial Feature Analysis & Tips:")
            print(analysis)
        except RuntimeError as re:
            print("RuntimeError during analysis:", re)
            print("Trying a safer re-run on CPU‚Ä¶")
            try:
                global device
                device = "cpu"
                analysis = generate_tips_from_frame(current_frame_rgb)
                print(analysis)
            except Exception as e2:
                print("Fallback failed:", e2)
        except Exception as e:
            print("Error during analysis:", e)
        finally:
            start_preview()

capture_btn.on_click(on_capture_clicked)
print("UI ready. Use the Capture & Analyze button.")

UI ready. Use the Capture & Analyze button.


In [None]:
# 8) Cleanup Resources on Kernel Stop
def cleanup():
    stop_preview()
    try:
        cap.release()
    except Exception:
        pass
    print("Camera released.")
 
# Optional manual cleanup button
cleanup_btn = widgets.Button(description="Stop Preview & Cleanup", button_style='warning')
def on_cleanup_clicked(btn):
    cleanup()
display(cleanup_btn)
cleanup_btn.on_click(on_cleanup_clicked)

