# Task 2 Live Demo
Use this notebook to start/stop the webcam overlay and optionally record annotated frames.

In [2]:
import sys
from pathlib import Path
import threading
import time

import cv2
import numpy as np
import torch

# Put the parent of the current working directory on the import path so the
# `src.*` imports below can be resolved.
REPO_ROOT = Path('..').resolve()
_repo_root = str(REPO_ROOT)
if _repo_root not in sys.path:
    sys.path.insert(0, _repo_root)

from src.models.keypoint_resnet import KeypointResNet
from src.utils.emotion import EmotionClassifier
from src.utils.keypoints import denormalize_keypoints

# Per-channel normalisation statistics, shaped (3, 1, 1) so they broadcast
# against a channels-first (3, H, W) image tensor.
IMAGENET_MEAN = torch.tensor([0.485, 0.456, 0.406]).reshape(3, 1, 1)
IMAGENET_STD = torch.tensor([0.229, 0.224, 0.225]).reshape(3, 1, 1)

def resolve_device(name: str) -> torch.device:
    """Map a device name to a usable ``torch.device``, falling back to CPU.

    Args:
        name: Requested device, compared case-insensitively against
            ``'cuda'`` and ``'mps'``.

    Returns:
        The requested accelerator when it is available, otherwise
        ``torch.device('cpu')`` (this includes any unrecognised name).
    """
    name = name.lower()
    if name == 'cuda' and torch.cuda.is_available():
        return torch.device('cuda')
    if name == 'mps':
        # Older PyTorch builds do not ship `torch.backends.mps`; treat a
        # missing backend module as "not available" instead of raising.
        mps_backend = getattr(torch.backends, 'mps', None)
        if mps_backend is not None and mps_backend.is_available():
            return torch.device('mps')
    return torch.device('cpu')

class LiveDemo:
    def __init__(self, checkpoint: str, device: str = 'mps', backbone: str = 'resnet18', image_size: int = 224):
        self.device = resolve_device(device)
        self.image_size = image_size
        self.model = KeypointResNet(pretrained=False, dropout=0.0, backbone_name=backbone)
        state = torch.load(checkpoint, map_location=self.device)
        self.model.load_state_dict(state['model_state'], strict=False)
        self.model.to(self.device)
        self.model.eval()
        self.emotion = EmotionClassifier()
        self.norm_factors = torch.tensor([[image_size, image_size]], dtype=torch.float32, device=self.device)
        self._thread = None
        self._stop = threading.Event()
        self.smooth_alpha = 0.5
        self.emotion_hold = 15

    def start(self, camera_index: int = 0, record: bool = False, record_path: str | None = None):
        if self._thread and self._thread.is_alive():
            print('Demo already running')
            return
        self._stop.clear()
        self._camera_index = camera_index
        self._record_path = record_path if record else None
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        if not self._thread:
            return
        self._stop.set()
        self._thread.join(timeout=2)
        self._thread = None

    def _preprocess(self, frame):
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        resized = cv2.resize(rgb, (self.image_size, self.image_size), interpolation=cv2.INTER_LINEAR)
        tensor = torch.as_tensor(resized, dtype=torch.float32).permute(2, 0, 1) / 255.0
        tensor = (tensor - IMAGENET_MEAN) / IMAGENET_STD
        return tensor, resized

    def _run(self):
        cap = cv2.VideoCapture(self._camera_index)
        if not cap.isOpened():
            print(f'Unable to open camera index {self._camera_index}')
            return
        writer = None
        if self._record_path:
            fourcc = cv2.VideoWriter_fourcc(*'MJPG')
            writer = cv2.VideoWriter(self._record_path, fourcc, 20, (self.image_size, self.image_size))
        smoothed = None
        emotion_history = []
        try:
            while not self._stop.is_set():
                ret, frame = cap.read()
                if not ret:
                    break
                tensor, display_rgb = self._preprocess(frame)
                tensor = tensor.unsqueeze(0).to(self.device)
                with torch.no_grad():
                    preds = self.model(tensor).view(1, -1, 2)
                preds_px = denormalize_keypoints(preds, self.norm_factors).cpu().numpy()[0]
                if smoothed is None:
                    smoothed = preds_px.copy()
                else:
                    alpha = max(0.0, min(0.99, self.smooth_alpha))
                    smoothed = alpha * smoothed + (1 - alpha) * preds_px
                overlay = display_rgb.copy()
                for (x, y) in smoothed:
                    cv2.circle(overlay, (int(x), int(y)), 2, (0, 255, 0), -1)
                emotion = self.emotion.predict(smoothed)
                emotion_history.append(emotion)
                if len(emotion_history) > self.emotion_hold:
                    emotion_history.pop(0)
                dominant = max(set(emotion_history), key=emotion_history.count)
                cv2.putText(overlay, dominant, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
                cv2.putText(overlay, 'Press q to exit', (overlay.shape[1]-180, overlay.shape[0]-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200, 200, 200), 1)
                bgr = cv2.cvtColor(overlay, cv2.COLOR_RGB2BGR)
                if writer:
                    writer.write(bgr)
                cv2.imshow('Notebook Live Demo', bgr)
                key = cv2.waitKey(1) & 0xFF
                if key == ord('q'):
                    break
        finally:
            cap.release()
            cv2.destroyAllWindows()
            if writer:
                writer.release()

# Checkpoint to load; update if needed.
# NOTE(review): this is a relative path, so it is resolved against the
# notebook's working directory rather than REPO_ROOT — confirm the location.
checkpoint_path = 'artifacts/task1_resnet18/best_model.pt'
demo = LiveDemo(checkpoint_path, device='mps', backbone='resnet18')


FileNotFoundError: [Errno 2] No such file or directory: 'artifacts/task1_hpc/best_model.pt'

In [None]:
import ipywidgets as widgets

# Action buttons.
start_button = widgets.Button(description='Start preview', button_style='success')
stop_button = widgets.Button(description='Stop', button_style='danger')
# Capture options read when the start button is clicked.
camera_index = widgets.IntText(value=0, description='Camera id')
record_toggle = widgets.ToggleButton(value=False, description='Record video')
record_path = widgets.Text(value='notebook_demo.mp4', description='Record path')

def on_start(_):
    """Start the demo with the current widget values; the click argument is unused."""
    cam_id = camera_index.value
    should_record = record_toggle.value
    output_path = record_path.value
    demo.start(camera_index=cam_id, record=should_record, record_path=output_path)

def on_stop(_):
    """Call ``demo.stop()``; the click argument is unused."""
    demo.stop()

# Hook the buttons up to their handlers.
start_button.on_click(on_start)
stop_button.on_click(on_stop)
# Buttons on the first row, options on the second; the VBox is the final
# expression of the cell so the notebook displays it.
button_row = widgets.HBox([start_button, stop_button])
options_row = widgets.HBox([record_toggle, record_path, camera_index])
widgets.VBox([button_row, options_row])
