**Video Annotation Pipeline Using MediaPipe and SVM**

In [1]:
!pip install mediapipe==0.10.14



### **Use this code with the `svm_winner.pkl` or `xgb_model.pkl` model**

In [6]:
# ── Run mode and file locations ──────────────────────────────────────
COLAB = True                            # True: upload/download through google.colab
INPUT_VIDEO = "input_video.mp4"         # read only when COLAB is False
OUTPUT_VIDEO = "output_annotated.mp4"   # annotated result is written here
MODEL_PATH = "/content/svm_winner.pkl"  # classifier file loaded with joblib

# ── MediaPipe Hands settings ─────────────────────────────────────────
MAX_HANDS = 1           # Only ONE hand
MIN_DETECT_CONF = 0.7   # passed as min_detection_confidence
MIN_TRACK_CONF = 0.6    # passed as min_tracking_confidence

# ── Prediction smoothing and display ─────────────────────────────────
MIN_PROBA_SHOW = 0.3       # smoothed confidence needed before a label is drawn
SMOOTH_WINDOW = 10         # number of recent predictions that take part in the vote
LABEL_PERSIST_FRAMES = 8   # passed to PredictionSmoother as `persist`
FONT_SCALE = 0.85          # cv2.putText font scale for the label
THICKNESS = 2              # label stroke thickness (the shadow uses THICKNESS + 2)


import subprocess, sys
def _pip(pkg):
    """Quietly pip-install *pkg* using the interpreter that is running this code.

    Invoking ``<python> -m pip`` (rather than a bare ``pip``) makes sure the
    package lands in the same environment the notebook imports from.

    Raises:
        subprocess.CalledProcessError: if pip exits with a non-zero status.
    """
    pip_install = [sys.executable, "-m", "pip", "install", "-q"]
    subprocess.check_call(pip_install + [pkg])

if COLAB:
    # Pair every pip distribution with the module it actually provides.
    # The names differ for OpenCV: the distribution "opencv-python-headless"
    # is imported as "cv2", so deriving the module name from the package
    # name never matches and would trigger a reinstall on every run.
    for _pkg, _module in (
        ("mediapipe", "mediapipe"),
        ("opencv-python-headless", "cv2"),
    ):
        try:
            __import__(_module)
        except ImportError:
            # Only install what is genuinely missing.
            _pip(_pkg)

import cv2
import mediapipe as mp
import numpy as np
import joblib
import os
import time
from collections import deque, Counter


# Short aliases for the two MediaPipe solution modules used in this file:
#   mp_hands   – provides the Hands detector and the HAND_CONNECTIONS edge list
#   mp_drawing – provides draw_landmarks() and DrawingSpec for the overlay
mp_hands   = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils


# Numeric → Text Mapping
# The position of each name in the tuple is its class index (0..17);
# predict_raw() uses this table when the model returns an integer class.
CLASS_NAMES = dict(enumerate((
    "call",
    "dislike",
    "fist",
    "four",
    "like",
    "mute",
    "ok",
    "one",
    "palm",
    "peace",
    "peace_inverted",
    "rock",
    "stop",
    "stop_inverted",
    "three",
    "three2",
    "two_up",
    "two_up_inverted",
)))


def _load_joblib(path, label):
    """Load a joblib-serialised object from *path*, reporting the outcome.

    Args:
        path: File path of the serialised object; may be empty or None.
        label: Human-readable name used in the printed status line.

    Returns:
        The deserialised object, or None when *path* is empty or does not
        exist on disk.
    """
    unavailable = not path or not os.path.exists(path)
    if unavailable:
        print(f"  [--] {label:<20} NOT found at '{path}'")
        return None

    loaded = joblib.load(path)
    print(f"  [OK] {label:<20} loaded from '{path}'")
    return loaded


# Load the classifier once at start-up. USE_MODEL records whether loading
# succeeded so the per-frame code can skip prediction when it did not.
print("\n── Loading model files ──────────────────────────────")
model = _load_joblib(MODEL_PATH, "Model")
USE_MODEL = model is not None

print(
    "  [OK] Gesture prediction ENABLED"
    if USE_MODEL
    else "  [--] Gesture prediction DISABLED"
)
print("─────────────────────────────────────────────────────\n")


class PredictionSmoother:
    """Stabilise per-frame predictions with a sliding-window majority vote.

    State is kept per hand slot and addressed by ``hand_idx``. Each slot
    stores the most recent ``window`` ``(label, proba)`` pairs, the last
    winning label and its probability, and a count of consecutive frames
    for which ``get_persisted`` was called since the last ``update``.

    Args:
        window: Maximum number of recent predictions kept per hand.
        persist: Number of consecutive ``get_persisted`` calls for which
            the last winning label is still returned.
        max_hands: Number of hand slots to allocate. Defaults to 1, which
            matches the previous single-slot behaviour.

    Raises:
        ValueError: if ``max_hands`` is smaller than 1.
    """

    def __init__(self, window=10, persist=8, max_hands=1):
        if max_hands < 1:
            raise ValueError("max_hands must be at least 1")
        self.window      = window
        self.persist     = persist
        # One independent slot per hand; each deque must be its own object.
        self.history     = [deque(maxlen=window) for _ in range(max_hands)]
        self.last_label  = [None] * max_hands
        self.last_proba  = [0.0] * max_hands
        self.frames_gone = [0] * max_hands

    def update(self, hand_idx, label, proba):
        """Record a new prediction and return the smoothed ``(label, proba)``.

        The smoothed label is the most frequent label in the window (on a
        tie, the one seen first in the window). The smoothed probability is
        the mean probability of the window entries carrying that label.
        """
        votes = self.history[hand_idx]
        votes.append((label, proba))
        self.frames_gone[hand_idx] = 0  # the hand was seen this frame

        winner = Counter(lbl for lbl, _ in votes).most_common(1)[0][0]
        avg_p  = float(np.mean([p for lbl, p in votes if lbl == winner]))

        self.last_label[hand_idx] = winner
        self.last_proba[hand_idx] = avg_p
        return winner, avg_p

    def get_persisted(self, hand_idx):
        """Return the last smoothed result while it is still fresh.

        Every call counts as one frame without the hand. The last
        ``(label, proba)`` is returned for the first ``persist`` such calls
        after an ``update``; afterwards (or if nothing was ever recorded)
        ``(None, 0.0)`` is returned.
        """
        self.frames_gone[hand_idx] += 1
        still_fresh = self.frames_gone[hand_idx] <= self.persist
        if self.last_label[hand_idx] is not None and still_fresh:
            return self.last_label[hand_idx], self.last_proba[hand_idx]
        return None, 0.0

    def reset_hand(self, hand_idx):
        """Forget all history and the last result for one hand slot."""
        self.history[hand_idx].clear()
        self.last_label[hand_idx]  = None
        self.last_proba[hand_idx]  = 0.0
        self.frames_gone[hand_idx] = 0


# Single module-level smoother shared by every processed frame;
# process_frame() feeds it the raw prediction for hand index 0.
smoother = PredictionSmoother(window=SMOOTH_WINDOW, persist=LABEL_PERSIST_FRAMES)


def preprocess_hand(hand_landmarks):
    """Turn one hand's landmarks into a normalised, flat feature vector.

    The x/y coordinates are shifted so that landmark 0 (the wrist) sits at
    the origin, then divided by the x/y distance from the wrist to
    landmark 12 (presumably the middle-finger tip in MediaPipe's hand
    model — confirm against the training pipeline). The z coordinates are
    passed through unchanged.

    Args:
        hand_landmarks: Object whose ``landmark`` attribute iterates over
            points exposing ``x``, ``y`` and ``z``.

    Returns:
        A 1-D float32 array laid out as ``[x0, y0, z0, x1, y1, z1, ...]``,
        or None when any coordinate is non-finite or the reference
        distance is smaller than 1e-6.
    """
    coords = np.asarray(
        [(pt.x, pt.y, pt.z) for pt in hand_landmarks.landmark],
        dtype=np.float32,
    )
    if not np.all(np.isfinite(coords)):
        return None

    # Translate x/y so the wrist becomes the origin. Copy the wrist row
    # first: it is part of the array that is being modified in place.
    coords[:, :2] -= coords[0, :2].copy()

    reference = coords[12, :2]
    scale = float(np.sqrt(reference[0] ** 2 + reference[1] ** 2))
    if scale < 1e-6:
        # Degenerate hand: the reference landmark coincides with the wrist.
        return None

    coords[:, :2] /= scale
    return coords.reshape(-1).astype(np.float32)


def predict_raw(features):
    """Classify one feature vector and return ``(label, confidence)``.

    Args:
        features: 1-D array of features; it is reshaped to a single-row
            batch before being handed to the module-level ``model``.

    Returns:
        ``(label_str, proba)``. Integer predictions are translated through
        ``CLASS_NAMES`` (unmapped integers become ``"Unknown"``); any other
        prediction is converted with ``str``. ``proba`` is chosen as:

        * with ``predict_proba``: the probability of the class that
          ``predict`` returned, looked up through ``model.classes_``; if
          that lookup is not possible, the largest probability is used;
        * otherwise with ``decision_function``: the largest softmax value
          of the decision scores;
        * otherwise ``1.0``.

        If anything raises, ``("Unknown", 0.0)`` is returned and a
        ``RuntimeWarning`` describing the failure is issued.
    """
    import warnings  # local import keeps this block self-contained

    try:
        x = features.reshape(1, -1)
        pred = model.predict(x)[0]

        # Convert numeric → text
        if isinstance(pred, (int, np.integer)):
            label_str = CLASS_NAMES.get(int(pred), "Unknown")
        else:
            label_str = str(pred)

        if hasattr(model, "predict_proba"):
            class_probs = np.asarray(model.predict_proba(x))[0]
            proba = float(class_probs.max())
            # The most probable class is not guaranteed to be the class
            # predict() returned (scikit-learn documents this for SVC), so
            # report the probability of the label that is actually shown.
            classes = getattr(model, "classes_", None)
            if classes is not None:
                hits = np.flatnonzero(np.asarray(classes) == pred)
                if hits.size == 1:
                    proba = float(class_probs[hits[0]])
        elif hasattr(model, "decision_function"):
            scores = model.decision_function(x).ravel()
            # Subtract the maximum before exponentiating to avoid overflow.
            exp = np.exp(scores - scores.max())
            proba = float(exp.max() / exp.sum())
        else:
            proba = 1.0

        return label_str, proba

    except Exception as exc:
        # Best effort: a failed prediction must not abort video processing,
        # but it should not vanish silently either. With Python's default
        # warning filters a repeated identical message is shown only once,
        # so this does not flood the output on every frame.
        warnings.warn(f"Gesture prediction failed: {exc!r}", RuntimeWarning)
        return "Unknown", 0.0


# Colour used for the landmark dots and the gesture label text. It is drawn
# onto the OpenCV frame itself (frames are converted BGR→RGB only for the
# MediaPipe call), so the tuple is read in (B, G, R) order.
HAND_COLOR = (0, 220, 100)


def draw_landmarks(frame, hand_landmarks):
    """Draw the hand's landmarks and their connections onto *frame* in place.

    Args:
        frame: Image to draw on; it is modified directly.
        hand_landmarks: Landmark list for one hand, as found in
            ``results.multi_hand_landmarks``.
    """
    # First spec styles the landmark points, second the connecting lines.
    point_spec = mp_drawing.DrawingSpec(
        color=HAND_COLOR, thickness=2, circle_radius=4
    )
    line_spec = mp_drawing.DrawingSpec(
        color=(220, 220, 220), thickness=1, circle_radius=2
    )
    mp_drawing.draw_landmarks(
        frame,
        hand_landmarks,
        mp_hands.HAND_CONNECTIONS,
        point_spec,
        line_spec,
    )


def draw_gesture_label(frame, label, proba, hand_landmarks):
    """Draw ``"<label>  <proba as percent>"`` near the wrist, kept on-screen.

    The text is drawn twice: a thicker black copy offset by one pixel as a
    drop shadow, then the coloured text on top.

    Args:
        frame: Image to draw on; it is modified in place.
        label: Gesture name to display.
        proba: Confidence in [0, 1]; rendered as a whole-number percentage.
        hand_landmarks: Landmarks of the hand; landmark 0 (the wrist)
            anchors the text position.
    """
    h, w  = frame.shape[:2]
    wrist = hand_landmarks.landmark[0]

    text = f"{label}  {proba:.0%}"
    font = cv2.FONT_HERSHEY_SIMPLEX

    # Measure the rendered text (using the thicker shadow stroke) instead of
    # assuming a fixed width: long labels such as "two_up_inverted  100%"
    # would otherwise run off the right edge of the frame.
    (text_w, _text_h), _baseline = cv2.getTextSize(
        text, font, FONT_SCALE, THICKNESS + 2
    )

    margin = 5
    # max(...) keeps each upper bound at or above its lower bound, so the
    # clamp stays well-defined on frames narrower/shorter than the text.
    max_x = max(margin, w - text_w - margin)
    max_y = max(30, h - 10)
    cx    = int(np.clip(wrist.x * w,      margin, max_x))
    cy    = int(np.clip(wrist.y * h - 25, 30,     max_y))

    cv2.putText(frame, text, (cx+1, cy+1),
                font, FONT_SCALE, (0,0,0), THICKNESS+2, cv2.LINE_AA)
    cv2.putText(frame, text, (cx, cy),
                font, FONT_SCALE, HAND_COLOR, THICKNESS, cv2.LINE_AA)


def process_frame(frame, hands_model):
    """Detect a hand in *frame*, annotate it, and return the frame.

    The frame is converted BGR→RGB for the detector; all drawing happens on
    the original frame in place. Only the first detected hand is used.

    When no hand is detected, the miss is reported to the smoother. Once the
    hand has been absent for more than the smoother's ``persist`` frames,
    its vote history is cleared so that a hand appearing later is not
    labelled from stale predictions.

    Args:
        frame: Image to analyse and draw on.
        hands_model: Detector exposing ``process(rgb_image)`` whose result
            has a ``multi_hand_landmarks`` attribute.

    Returns:
        The same frame object, annotated.
    """
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = hands_model.process(rgb)

    if not results.multi_hand_landmarks:
        if USE_MODEL:
            # Count this frame as "hand gone"; a None label means the gap
            # outlived the persistence window (or nothing was recorded yet).
            persisted_label, _ = smoother.get_persisted(0)
            if persisted_label is None:
                smoother.reset_hand(0)
        return frame

    hand_landmarks = results.multi_hand_landmarks[0]  # Only first hand
    draw_landmarks(frame, hand_landmarks)

    if not USE_MODEL:
        return frame

    features = preprocess_hand(hand_landmarks)
    if features is None:
        # Landmarks were unusable (non-finite or degenerate); skip this frame.
        return frame

    raw_label, raw_proba = predict_raw(features)
    stable_label, stable_proba = smoother.update(0, raw_label, raw_proba)

    # Hide low-confidence labels rather than showing a likely-wrong gesture.
    if stable_proba >= MIN_PROBA_SHOW:
        draw_gesture_label(frame, stable_label, stable_proba, hand_landmarks)

    return frame


def process_video(input_path, output_path):
    """Annotate every frame of a video and write the result as MP4.

    Args:
        input_path: Path of the video to read.
        output_path: Path of the annotated video to write (``mp4v`` codec,
            same frame size as the input; the input's FPS, or 30 when the
            reader reports 0).

    Raises:
        OSError: if the input video cannot be opened or the output writer
            cannot be created.
    """
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        cap.release()
        raise OSError(f"Could not open input video: {input_path!r}")

    writer = None
    try:
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        fw  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        fh  = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(output_path, fourcc, fps, (fw, fh))
        if not writer.isOpened():
            raise OSError(f"Could not open output video for writing: {output_path!r}")

        # The smoother is module-level state; clear it so predictions from a
        # previously processed video cannot influence this one.
        smoother.reset_hand(0)

        with mp_hands.Hands(
            static_image_mode=False,
            max_num_hands=MAX_HANDS,
            min_detection_confidence=MIN_DETECT_CONF,
            min_tracking_confidence=MIN_TRACK_CONF,
        ) as hands:

            while True:
                ret, frame = cap.read()
                if not ret:
                    # End of stream (or a read failure): stop processing.
                    break

                frame = process_frame(frame, hands)
                writer.write(frame)
    finally:
        # Release the reader/writer even if processing fails part-way, so
        # file handles are freed and the output container is finalised.
        cap.release()
        if writer is not None:
            writer.release()

    print("Done. Output saved to:", output_path)


def run_colab():
    """Run the pipeline interactively in Google Colab.

    Prompts for a file upload, processes the first uploaded file, and
    offers ``OUTPUT_VIDEO`` for download. If nothing was uploaded (for
    example, the dialog was cancelled), a message is printed and nothing is
    processed.
    """
    # Imported here because the module is only needed on this code path.
    from google.colab import files

    uploaded = files.upload()
    if not uploaded:
        # Indexing the first key of an empty result would raise IndexError.
        print("No file uploaded; nothing to process.")
        return

    # The result is keyed by uploaded file name; use the first entry.
    input_path = next(iter(uploaded))
    process_video(input_path, OUTPUT_VIDEO)
    files.download(OUTPUT_VIDEO)


if __name__ == "__main__":
    # Entry point: process the local INPUT_VIDEO file directly, or go through
    # the interactive upload/download flow when the COLAB flag is set.
    if not COLAB:
        process_video(INPUT_VIDEO, OUTPUT_VIDEO)
    else:
        run_colab()


── Loading model files ──────────────────────────────
  [OK] Model                loaded from '/content/svm_winner.pkl'
  [OK] Gesture prediction ENABLED
─────────────────────────────────────────────────────



Saving Gesture Video.mp4 to Gesture Video.mp4




Done. Output saved to: output_annotated.mp4




<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>