# INIT

In [1]:
from modules.node import VideoReader, Tracker, Render
from modules.FaceMesh import FaceMeshHandler
from queue import Queue
from multiprocessing import Queue as mpQueue
from pathlib import Path
from typing import cast

from omegaconf import OmegaConf, DictConfig, ListConfig

def load_dict_config(path: Path) -> DictConfig:
    conf = OmegaConf.load(path)
    if not isinstance(conf, DictConfig):
        raise TypeError(f"Config at {path} must be a DictConfig, got {type(conf).__name__}")
    return cast(DictConfig, conf)

video_reader_conf = load_dict_config(Path('configs/video_reader.json'))
tracker_conf = load_dict_config(Path('configs/tracker.json'))
render_conf = load_dict_config(Path('configs/render.json'))
face_mesh_conf = load_dict_config(Path('configs/face_mesh.json'))

frame_queue = Queue(maxsize=2)
tracker_queue = mpQueue(maxsize=2)

video_reader = VideoReader(video_reader_conf, frame_queue)
face_mesh_handler = FaceMeshHandler(face_mesh_conf)
tracker = Tracker(tracker_conf, face_mesh_handler, frame_queue, tracker_queue)
render = Render(render_conf, tracker_queue)

# LANDMARKS VISUALIZATION

In [2]:
import threading
import cv2
import queue

thread_video_reader = threading.Thread(target=video_reader.start, daemon=True)
thread_tracker = threading.Thread(target=tracker.start, daemon=True)

thread_video_reader.start()
thread_tracker.start()

try:
    while True:
        try:
            tracker_payload = tracker_queue.get(timeout=0.5)
        except queue.Empty:
            if not thread_video_reader.is_alive():
                break
            continue

        if tracker_payload.frame is None:
            continue

        frame = tracker_payload.frame
        if video_reader_conf.target_format == "rgb":
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

        h, w = frame.shape[:2]
        if tracker_payload.state != "LOST":
            for lm in tracker_payload.landmarks:
                x = int(lm.x * w)
                y = int(lm.y * h)
                if 0 <= x < w and 0 <= y < h:
                    cv2.circle(frame, (x, y), 1, (0, 255, 0), -1)

        cv2.imshow("Frame", frame)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
finally:
    video_reader.stop()
    tracker.stop()
    thread_video_reader.join(timeout=1)
    thread_tracker.join(timeout=1)
    cv2.destroyAllWindows()