# IVE Face Recognition (InsightFace + Tracking)
โปรเจ็กต์แฮกทอนสัปดาห์ที่ 5: ทำ Face Recognition + Tracking จากวิดีโอสัมภาษณ์ IVE (YouTube) โดยเน้นความนิ่งและความแม่นยำของชื่อที่ติดบนใบหน้า ใช้ชุดอ้างอิงที่เตรียมไว้ใน `project_recognition/ive_reference/`

In [None]:
# Setup: install deps (GPU-first), skip if already installed
import sys, subprocess, pkg_resources, os

def pip_install(requirements):
    """Install every requirement that is missing or at a conflicting version.

    Each specifier is checked with ``pkg_resources.require``. Specifiers that
    raise ``DistributionNotFound`` or ``VersionConflict`` are collected and
    installed together with one quiet ``pip install`` run by the current
    interpreter. When nothing needs installing, only a message is printed.

    Args:
        requirements: iterable of pip requirement specifier strings.

    Raises:
        subprocess.CalledProcessError: if the pip command exits non-zero.
    """
    missing = []
    for spec in requirements:
        try:
            pkg_resources.require(spec)
        except (pkg_resources.DistributionNotFound, pkg_resources.VersionConflict):
            # Absent and wrong-version packages are handled the same way.
            missing.append(spec)

    if not missing:
        print("All deps already present.")
        return

    print("Installing:", missing)
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", *missing])

# Dependencies for this notebook; every entry is pinned to an exact version
# except tqdm, which only has a lower bound.
requirements = [
    "insightface==0.7.3",
    "onnxruntime-gpu==1.17.1",
    "opencv-python==4.8.1.78",
    "scipy==1.10.1",
    "filterpy==1.4.5",
    "tqdm>=4.66.1",
]
# Install whichever of the above are missing or at a conflicting version.
pip_install(requirements)

# Quick CUDA provider sanity check for onnxruntime:
# prints the execution providers that the installed onnxruntime reports.
try:
    import onnxruntime as ort
    print("ORT providers:", ort.get_available_providers())
except Exception as e:
    # Diagnostic only: report the problem and let the notebook continue.
    print("ORT check failed:", e)

# Print the OpenCV version and the number of CUDA devices OpenCV reports.
try:
    import cv2
    print("OpenCV version:", cv2.__version__, "CUDA devices:", cv2.cuda.getCudaEnabledDeviceCount())
except Exception as e:
    # Diagnostic only: report the problem and let the notebook continue.
    print("OpenCV check failed:", e)


Installing: ['scipy==1.10.1', 'yt-dlp==2024.7.16']
ORT providers: ['TensorrtExecutionProvider', 'CUDAExecutionProvider', 'CPUExecutionProvider']
OpenCV version: 4.8.1 CUDA devices: 0


In [None]:
# Imports & basic setup
import os, math, time, glob, json, random
from pathlib import Path
from collections import defaultdict, Counter, deque
import numpy as np
import cv2
from tqdm import tqdm

# InsightFace
from insightface.app import FaceAnalysis

# Seed both RNGs used by this notebook so repeated runs behave the same.
random.seed(42)
np.random.seed(42)

# Paths: assume the notebook sits next to "ive_reference"; if that folder is
# not in the working directory, use the repository-root layout instead.
ROOT = Path.cwd()
REFERENCE_ROOT = ROOT / "ive_reference"
OUTPUT_DIR = ROOT / "outputs"
if not REFERENCE_ROOT.exists():
    REFERENCE_ROOT = ROOT / "week05" / "project_recognition" / "ive_reference"
    OUTPUT_DIR = ROOT / "week05" / "project_recognition" / "outputs"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Input video is expected to already exist locally (nothing is downloaded).
local_video_fallback = OUTPUT_DIR / "ive_interview_input.mp4"

print("Reference dir:", REFERENCE_ROOT)
print("Output dir:", OUTPUT_DIR)


Reference dir: e:\Master_Degree\DL-FOR-COMPUTER-VISION\week05\project_recognition\ive_reference
Output dir: e:\Master_Degree\DL-FOR-COMPUTER-VISION\week05\project_recognition\outputs


In [None]:
# Helpers: video utilities (local file only)

def open_video(path: Path):
    """Open a video with OpenCV and read its basic properties.

    Args:
        path: location of the video file.

    Returns:
        A tuple ``(cap, fps, w, h, total)``: the opened ``cv2.VideoCapture``,
        the reported frame rate (30 is substituted when OpenCV reports a
        falsy value such as 0), frame width, frame height and reported frame
        count. The caller is responsible for releasing ``cap``.

    Raises:
        RuntimeError: if the capture could not be opened.
    """
    capture = cv2.VideoCapture(str(path))
    if not capture.isOpened():
        raise RuntimeError(f"Cannot open video: {path}")

    frame_rate = capture.get(cv2.CAP_PROP_FPS) or 30
    width, height, frame_count = (
        int(capture.get(prop))
        for prop in (
            cv2.CAP_PROP_FRAME_WIDTH,
            cv2.CAP_PROP_FRAME_HEIGHT,
            cv2.CAP_PROP_FRAME_COUNT,
        )
    )
    return capture, frame_rate, width, height, frame_count


def sample_frames(path: Path, num_samples: int = 3):
    """Read a few evenly spaced frames from a video as a quick sanity check.

    Frame indices are spread with ``np.linspace`` between 0 and
    ``max(total - 1, 1)``. Positions that cannot be read are skipped, so the
    result may hold fewer than ``num_samples`` frames.

    Args:
        path: location of the video file.
        num_samples: number of frame positions to try.

    Returns:
        List of the frames that were read successfully.

    Raises:
        RuntimeError: propagated from ``open_video`` when the file cannot be
            opened.
    """
    cap, fps, w, h, total = open_video(path)
    frames = []
    try:
        indices = np.linspace(0, max(total - 1, 1), num=num_samples, dtype=int)
        for idx in indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
            ok, frame = cap.read()
            if ok:
                frames.append(frame)
    finally:
        # Release the capture even when index generation or seeking raises.
        cap.release()
    print(f"Sampled {len(frames)} frames of {total} | {w}x{h} @ {fps:.2f} fps")
    return frames


In [None]:
# Initialize InsightFace models (CPU-only for stability)

def init_face_analysis():
    """Create and prepare the InsightFace ``buffalo_l`` pipeline on CPU.

    The app is built with only ``CPUExecutionProvider`` and prepared with
    ``ctx_id=-1`` and a 640x640 detection size. For diagnostics, the loaded
    model keys are printed, followed by the providers of the recognition
    model when they can be found on the model object.

    Returns:
        ``(app, providers)``: the prepared ``FaceAnalysis`` instance and the
        provider list that was requested.
    """
    providers = ["CPUExecutionProvider"]
    app = FaceAnalysis(name="buffalo_l", providers=providers)
    app.prepare(ctx_id=-1, det_size=(640, 640))

    model_keys = list(app.models.keys())
    print("Model keys:", model_keys)

    # The recognition model may be registered under either of these keys.
    recog_key = next(
        (key for key in ("face_recognition", "recognition") if key in app.models),
        None,
    )
    if not recog_key:
        print("[WARN] recognition model key not found; available:", model_keys)
        return app, providers

    model_obj = app.models[recog_key]
    session = getattr(model_obj, "sess", None)
    if hasattr(session, "providers"):
        prov = session.providers
    elif hasattr(model_obj, "providers"):
        prov = model_obj.providers
    else:
        prov = None
    print("Recognition providers:", prov)
    return app, providers


# Create the FaceAnalysis instance once; other cells read it as the global `app`.
app, providers = init_face_analysis()
print("Using providers:", providers)


Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\feelc/.insightface\models\buffalo_l\1k3d68.onnx landmark_3d_68 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\feelc/.insightface\models\buffalo_l\2d106det.onnx landmark_2d_106 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\feelc/.insightface\models\buffalo_l\det_10g.onnx detection [1, 3, '?', '?'] 127.5 128.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\feelc/.insightface\models\buffalo_l\genderage.onnx genderage ['None', 3, 96, 96] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\feelc/.insightface\models\buffalo_l\w600k_r50.onnx recognition ['None', 3, 112, 112] 127.

In [14]:
# Build gallery from reference images

def normalize_embedding(emb):
    """Return ``emb`` divided by its L2 norm.

    A small constant (1e-9) is added to the norm so an all-zero vector does
    not cause a division by zero.
    """
    return emb / (np.linalg.norm(emb) + 1e-9)


def build_gallery(reference_root: Path, max_imgs_per_id: int = 50):
    """Build one reference embedding per identity from folders of images.

    Every sub-directory of ``reference_root`` is treated as one identity; its
    name, with underscores replaced by spaces, becomes the gallery key. Up to
    ``max_imgs_per_id`` entries per folder (in sorted order) are loaded. The
    global ``app`` detects faces and the largest face in each image supplies
    the embedding. The per-identity embeddings are averaged and re-normalized.

    Args:
        reference_root: directory containing one sub-directory per identity.
        max_imgs_per_id: cap on how many files are considered per identity.

    Returns:
        Dict mapping identity name to its mean, normalized embedding.
        Identities with no detected face are skipped with a warning.
    """

    def face_area(face):
        # Bounding-box area, used to choose the most prominent face.
        return (face.bbox[2] - face.bbox[0]) * (face.bbox[3] - face.bbox[1])

    gallery, stats = {}, {}
    for person_dir in sorted(reference_root.glob("*")):
        if not person_dir.is_dir():
            continue
        name = person_dir.name.replace("_", " ")

        embeds = []
        for fp in sorted(person_dir.glob("*"))[:max_imgs_per_id]:
            img = cv2.imread(str(fp))
            # Unreadable files and images with no detected face add nothing.
            faces = app.get(img) if img is not None else None
            if faces:
                embeds.append(max(faces, key=face_area).normed_embedding)

        if not embeds:
            print(f"[WARN] No face detected for {name}")
            continue

        gallery[name] = normalize_embedding(np.stack(embeds, axis=0).mean(axis=0))
        stats[name] = len(embeds)

    print("Gallery built:", stats)
    return gallery


# Build the name -> mean-embedding gallery from the reference folders.
gallery = build_gallery(REFERENCE_ROOT)
# Notebook display expression: the gallery names and their count.
list(gallery.keys()), len(gallery)


  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4


Gallery built: {'An Yujin': 29, 'Jang Wonyoung': 26, 'Kim Gaeul': 29, 'Kim Jiwon': 38, 'Lee Hyunseo': 39, 'Naoi Rei': 25}


(['An Yujin',
  'Jang Wonyoung',
  'Kim Gaeul',
  'Kim Jiwon',
  'Lee Hyunseo',
  'Naoi Rei'],
 6)

In [15]:
# Recognition + tracking utilities

# Cache of track id -> colour tuple, so a track keeps the same colour for the whole run.
COLORS = {}

def get_color(track_id: int):
    """Return a stable pseudo-random colour for ``track_id``.

    The colour comes from a private ``RandomState`` seeded with
    ``track_id + 123``, so it is deterministic per id. The global NumPy RNG
    is left untouched; reseeding it here would override the notebook-level
    seed every time a new track appears. Results are cached in ``COLORS``.

    Args:
        track_id: integer track identifier.

    Returns:
        Tuple of three ints, each in ``[30, 230)``.
    """
    if track_id not in COLORS:
        rng = np.random.RandomState(track_id + 123)
        COLORS[track_id] = tuple(int(x) for x in rng.randint(30, 230, size=3))
    return COLORS[track_id]


def cosine_sim(a, b):
    """Return the cosine similarity of ``a`` and ``b`` as a Python float.

    1e-9 is added to the product of the norms so zero vectors give 0.0
    instead of a division by zero.
    """
    scale = np.linalg.norm(a) * np.linalg.norm(b) + 1e-9
    return float(np.dot(a, b) / scale)


def best_match(embedding, gallery, threshold=0.38):
    """Find the gallery identity most similar to ``embedding``.

    Args:
        embedding: query face embedding.
        gallery: dict mapping identity name to reference embedding.
        threshold: minimum cosine similarity needed to accept the best match.

    Returns:
        ``(name, score)``. ``name`` is ``"Unknown"`` when the best score is
        below ``threshold``. ``score`` is the best similarity found, or -1
        when no gallery entry scored above -1 (e.g. an empty gallery).
    """
    top_name, top_score = "Unknown", -1
    for candidate, reference in gallery.items():
        similarity = cosine_sim(embedding, reference)
        if similarity > top_score:
            top_name, top_score = candidate, similarity

    if top_score < threshold:
        return "Unknown", top_score
    return top_name, top_score


def iou(boxA, boxB):
    """Return the intersection-over-union of two ``(x1, y1, x2, y2)`` boxes.

    Widths and heights are clamped at zero, so disjoint or inverted boxes
    contribute no area. 1e-6 is added to the union to avoid dividing by zero.
    """

    def area(box):
        return max(0, box[2] - box[0]) * max(0, box[3] - box[1])

    # The overlap is itself a box: the larger top-left and smaller bottom-right corner.
    overlap = (
        max(boxA[0], boxB[0]),
        max(boxA[1], boxB[1]),
        min(boxA[2], boxB[2]),
        min(boxA[3], boxB[3]),
    )
    inter = area(overlap)
    union = area(boxA) + area(boxB) - inter + 1e-6
    return inter / union


class Track:
    """State of one tracked face: latest box, smoothed embedding and label votes.

    The last 15 recognized names and similarity scores are kept, and the
    displayed label and score are derived from them.
    """

    def __init__(self, track_id, bbox, embedding, name, sim):
        """Start a track from its first detection."""
        self.id = track_id
        self.bbox = bbox
        self.embedding = embedding
        # Bounded histories: only the 15 most recent observations are kept.
        self.name_history = deque([name], maxlen=15)
        self.sim_history = deque([sim], maxlen=15)
        self.lost = self.last_update = 0

    def update(self, bbox, embedding, name, sim):
        """Fold a newly matched detection into the track and reset ``lost``."""
        # Blend 70% of the old embedding with 30% of the new one, then
        # re-normalize, so a single noisy frame cannot swing the track.
        blended = 0.7 * self.embedding + 0.3 * embedding
        self.embedding = normalize_embedding(blended)
        self.bbox = bbox
        self.name_history.append(name)
        self.sim_history.append(sim)
        self.lost = self.last_update = 0

    @property
    def stable_name(self):
        """Most frequent non-"Unknown" name in the history, else "Unknown"."""
        votes = Counter(n for n in self.name_history if n != "Unknown")
        if votes:
            return votes.most_common(1)[0][0]
        return "Unknown"

    @property
    def stable_sim(self):
        """Mean of the stored similarity scores (0 when the history is empty)."""
        return float(np.mean(self.sim_history)) if self.sim_history else 0


class SimpleTracker:
    """Greedy tracker that matches detections to tracks by IoU and embedding similarity.

    Args:
        iou_thr: a detection/track pair must have IoU above this to match.
        embed_thr: the pair must also have cosine similarity above this.
        max_lost: a track is dropped once it has gone unmatched for more than
            this many consecutive ``step`` calls.
    """

    def __init__(self, iou_thr=0.4, embed_thr=0.35, max_lost=15):
        self.iou_thr = iou_thr
        self.embed_thr = embed_thr
        self.max_lost = max_lost
        self.tracks = []
        self.next_id = 0

    def step(self, detections):
        """Update the tracker with the detections of one frame.

        Detections are processed in order. Each takes the eligible track with
        the highest ``iou + similarity`` score, or starts a new track when no
        track passes both thresholds. A track takes at most one detection per
        frame, and a track created in this frame is neither re-matched nor
        aged in the same frame.

        Args:
            detections: list of dicts with keys ``bbox``, ``embedding``,
                ``name`` and ``sim``.

        Returns:
            The surviving tracks, including tracks that were not matched in
            this frame but have not yet exceeded ``max_lost``.
        """
        assigned = set()
        for det in detections:
            best_score = 0
            best_track = None
            for trk in self.tracks:
                if trk.id in assigned:
                    # Already claimed (or created) in this frame.
                    continue
                iou_score = iou(det['bbox'], trk.bbox)
                sim_score = cosine_sim(det['embedding'], trk.embedding)
                if iou_score > self.iou_thr and sim_score > self.embed_thr:
                    if iou_score + sim_score > best_score:
                        best_score = iou_score + sim_score
                        best_track = trk
            if best_track is not None:
                best_track.update(det['bbox'], det['embedding'], det['name'], det['sim'])
                assigned.add(best_track.id)
            else:
                trk = Track(self.next_id, det['bbox'], det['embedding'], det['name'], det['sim'])
                self.tracks.append(trk)
                # A brand-new track counts as seen in this frame.
                assigned.add(trk.id)
                self.next_id += 1

        # Age the tracks that were not seen and drop the ones lost for too long.
        alive = []
        for trk in self.tracks:
            if trk.id not in assigned:
                trk.lost += 1
            if trk.lost <= self.max_lost:
                alive.append(trk)
        self.tracks = alive
        return self.tracks


In [16]:
# Drawing helpers

def draw_label(frame, bbox, text, color, sim=0):
    """Draw a bounding box and a filled label banner on ``frame`` in place.

    The banner reads ``"<text> (<sim to 2 decimals>)"`` and normally sits
    directly above the box. If there is no room above it, because the box
    touches the top of the frame, the banner is drawn just inside the top
    edge of the box so the label stays visible.

    Args:
        frame: image to draw on (modified in place).
        bbox: ``(x1, y1, x2, y2)`` box; values are truncated to int.
        text: label text.
        color: colour used for the box outline and banner fill.
        sim: similarity score shown next to the label.
    """
    x1, y1, x2, y2 = map(int, bbox)
    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
    label = f"{text} ({sim:.2f})"
    (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)

    banner_h = th + 8
    # Above the box when it fits, otherwise anchored at the box top (clamped to the frame).
    top = y1 - banner_h if y1 - banner_h >= 0 else max(y1, 0)
    bottom = top + banner_h
    cv2.rectangle(frame, (x1, top), (x1 + tw + 4, bottom), color, -1)
    cv2.putText(frame, label, (x1 + 2, bottom - 4), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 2)


In [17]:
# Main pipeline: detect -> recognize -> track -> render

def process_video(
    video_path: Path,
    output_path: Path,
    gallery: dict,
    det_thresh: float = 0.45,
    rec_thresh: float = 0.4,
    tracker_iou: float = 0.45,
    tracker_embed: float = 0.35,
    max_frames: int = None,
    warmup: int = 0,
):
    """Run detection, recognition and tracking on a video and write an annotated copy.

    Each frame goes through the global ``app`` for face detection and
    embedding. Every detection is matched against ``gallery`` and passed to a
    ``SimpleTracker``. Every live track is drawn on the frame with its stable
    name, id and mean similarity, and the frame is written to ``output_path``.

    Args:
        video_path: input video file.
        output_path: annotated output file (written with the "mp4v" fourcc).
        gallery: dict mapping identity name to reference embedding.
        det_thresh: detections with ``det_score`` below this are ignored.
        rec_thresh: similarity threshold passed to ``best_match``.
        tracker_iou: IoU threshold for the tracker.
        tracker_embed: embedding-similarity threshold for the tracker.
        max_frames: optional cap on the number of frames processed.
        warmup: accepted for interface compatibility; currently unused.

    Returns:
        ``(name_counts, fps_run)``: a ``Counter`` of how many times each
        stable label was drawn, and the average processing speed in frames
        per second.

    Raises:
        RuntimeError: if the input video or the output writer cannot be opened.
    """
    tracker = SimpleTracker(iou_thr=tracker_iou, embed_thr=tracker_embed, max_lost=20)
    cap, fps, w, h, total = open_video(video_path)

    # A non-positive frame count means the container did not report one; in
    # that case read until the stream ends (or until max_frames, if given).
    if total > 0:
        total_iter = total if max_frames is None else min(total, max_frames)
    else:
        total_iter = max_frames

    writer = None
    pbar = None
    frame_idx = 0
    name_counts = Counter()
    try:
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(str(output_path), fourcc, fps, (w, h))
        if not writer.isOpened():
            # Fail early instead of processing everything and saving nothing.
            raise RuntimeError(f"Cannot open video writer: {output_path}")

        start = time.time()
        pbar = tqdm(total=total_iter, desc="Processing", ncols=100)
        while total_iter is None or frame_idx < total_iter:
            ok, frame = cap.read()
            if not ok:
                break
            faces = app.get(frame)
            detections = []
            for f in faces:
                if f.det_score < det_thresh:
                    continue
                bbox = f.bbox.astype(float)
                emb = normalize_embedding(f.normed_embedding)
                name, sim = best_match(emb, gallery, threshold=rec_thresh)
                detections.append({"bbox": bbox, "embedding": emb, "name": name, "sim": sim})

            tracks = tracker.step(detections)
            for trk in tracks:
                color = get_color(trk.id)
                label = trk.stable_name
                name_counts[label] += 1
                draw_label(frame, trk.bbox, f"{label}#{trk.id}", color, trk.stable_sim)

            writer.write(frame)
            frame_idx += 1
            pbar.update(1)
    finally:
        # Always release the progress bar and file handles, including on error.
        if pbar is not None:
            pbar.close()
        cap.release()
        if writer is not None:
            writer.release()

    elapsed = time.time() - start
    fps_run = frame_idx / max(elapsed, 1e-3)
    shown_total = frame_idx if total_iter is None else total_iter
    print(f"Done. Frames: {frame_idx}/{shown_total}, runtime: {elapsed:.1f}s, avg fps: {fps_run:.2f}")
    print("Name counts (rough hit frequency):", name_counts)
    return name_counts, fps_run


In [None]:
# Resolve video path (local file only)
# Optional override; when left as None the fallback file under OUTPUT_DIR is used.
custom_video_path = None  # e.g., Path(r"E:/Downloads/ive_interview.mp4")

# Use the override only when it is set and the file exists on disk.
if custom_video_path and Path(custom_video_path).exists():
    video_path = Path(custom_video_path)
else:
    video_path = local_video_fallback

# Stop here with a clear message rather than failing later inside OpenCV.
if not video_path.exists():
    raise FileNotFoundError(f"Video not found at {video_path}. Set custom_video_path to your MP4.")

print("Using video:", video_path)
# Read two frames to check the file decodes; the frames themselves are discarded.
_ = sample_frames(video_path, num_samples=2)


Downloading from YouTube: https://www.youtube.com/watch?v=cmdMopdk6lo
-> e:\Master_Degree\DL-FOR-COMPUTER-VISION\week05\project_recognition\outputs\ive_interview_input.mp4
[WARN] pytube failed, trying yt-dlp. Error: HTTP Error 400: Bad Request


ERROR: [youtube] cmdMopdk6lo: Failed to extract any player response; please report this issue on  https://github.com/yt-dlp/yt-dlp/issues?q= , filling out the appropriate issue template. Confirm you are on the latest version using  yt-dlp -U


DownloadError: ERROR: [youtube] cmdMopdk6lo: Failed to extract any player response; please report this issue on  https://github.com/yt-dlp/yt-dlp/issues?q= , filling out the appropriate issue template. Confirm you are on the latest version using  yt-dlp -U

In [None]:
# Quick dry-run on first N frames for sanity (adjust N as needed)
# Short run capped at 200 frames, written to a separate file from the full run.
quick_output = OUTPUT_DIR / "ive_quickcheck.mp4"
_ = process_video(
    video_path=video_path,
    output_path=quick_output,
    gallery=gallery,
    det_thresh=0.45,
    rec_thresh=0.40,
    tracker_iou=0.45,
    tracker_embed=0.36,
    max_frames=200,
)
print("Quick sample saved:", quick_output)


In [None]:
# Full run on the whole video (may take several minutes)
# Full-length run with the same thresholds as the quick check above.
output_video = OUTPUT_DIR / "ive_recognized.mp4"
name_counts, fps_run = process_video(
    video_path=video_path,
    output_path=output_video,
    gallery=gallery,
    det_thresh=0.45,
    rec_thresh=0.40,
    tracker_iou=0.45,
    tracker_embed=0.36,
    max_frames=None,  # set small int for quick dry run
)
print("Saved:", output_video)
