### YOLO modeling

after testing the fealability of using pose estimators I now move to a newer model one that can support video anylsis better then the MPI heatmap-appraoch models

In [2]:
import torch
import numpy as np
import cv2
import ultralytics

print(f"âœ… NumPy: {np.__version__}")       # Expect 1.26.x
print(f"âœ… OpenCV: {cv2.__version__}")     # Expect 4.10.x
print(f"ðŸš€ GPU Active: {torch.backends.mps.is_available()}")
print(ultralytics.__version__)                # Expect 8.2.0+

âœ… NumPy: 1.26.4
âœ… OpenCV: 4.10.0
ðŸš€ GPU Active: False
8.3.241


In [3]:
# Helpers 
from typing import List, Union
import matplotlib.pyplot as plt
def show_image(img_or_list: Union[np.ndarray, List[np.ndarray]], row_plot: int = 1, titles: List[str] = None):
    """
    Display one or multiple images in a grid.
    - img_or_list: single array or list of arrays
    - row_plot: number of rows
    - titles: optional list of titles (same length as images)
    """
    imgs = img_or_list if isinstance(img_or_list, list) else [img_or_list]
    n = len(imgs)
    cols = int(np.ceil(n / row_plot))
    rows = row_plot

    fig, axes = plt.subplots(rows, cols, figsize=(4 * cols, 4 * rows))
    axes = np.array(axes).reshape(-1) if isinstance(axes, np.ndarray) else np.array([axes])

    for i, im in enumerate(imgs):
        if im.ndim == 2:
            axes[i].imshow(im, cmap='gray')
        else:
            axes[i].imshow(im)
        axes[i].axis('off')
        if titles is not None and i < len(titles):
            axes[i].set_title(titles[i])
    for j in range(i + 1, len(axes)):
        axes[j].axis('off')

    plt.tight_layout()
    plt.show()



### Downloading the Model

In [4]:
from ultralytics import YOLO # type: ignore

# 1. Load the model (YOLO11 is the 2025 standard)
model = YOLO('yolo11n-pose.pt') 

# Social Synchrony
The first metric I would examine would be joint velocity: 

In [5]:
import os
import cv2
import numpy as np
import pandas as pd

from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional

# Signal processing + stats
from scipy.signal import savgol_filter, correlate
from scipy.stats import pearsonr

from ultralytics import YOLO # type: ignore

# -----------------------------
# User config 
# -----------------------------
VIDEO_PATH = "./images/pope_and_bibi.mp4"     # <-- change
MODEL_WEIGHTS = "yolov8n-pose.pt" # or yolov8s-pose.pt for stronger results
CONF_THRES = 0.25
IOU_THRES  = 0.5

# Rolling window for synchrony (in seconds)
ROLL_WIN_SEC = 2.0

# Max lag for cross-correlation (in seconds)
MAX_LAG_SEC = 1.0

# Joint groups (COCO-17 indices; adjust if your model differs)
# COCO keypoints order (common): 
# 0 nose, 
# 1 l_eye, 
# 2 r_eye, 
# 3 l_ear, 
# 4 r_ear,
# 5 l_shoulder,
#  6 r_shoulder,
#  7 l_elbow, 
# 8 r_elbow, 
# 9  l_wrist, 
# 10 r_wrist,
# 11 l_hip,
#  12 r_hip, 
# 13 l_knee, 
# 14 r_knee, 
# 15 l_ankle, 
# 16 r_ankle
UPPER_BODY = [5, 6, 7, 8, 9, 10]  # shoulders, elbows, wrists
TORSO      = [11, 12, 5, 6]       # hips + shoulders
LOWER_BODY = [13, 14, 15, 16]     # knees, ankles


### Loading the video 

since I want to asses synchrony I have you know the fps of the video processer in order for me to accurantly define what I consider to be a synchrenize movement. 


In [6]:
# --- Chunk 1: Video I/O + FPS ---

cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    raise RuntimeError("Could not open video.")

fps = cap.get(cv2.CAP_PROP_FPS)
if fps is None or fps <= 1e-6:
    # Fallback if metadata missing
    fps = 30.0

dt = 1.0 / fps
print(f"FPS={fps:.3f}  dt={dt:.4f}s")


FPS=29.970  dt=0.0334s


The videoCapture function process around 30 frames per second, and alternativley every frame is 0.333 second interval from the previous one. 

Next, we would prcess the video through the YOLO model

In [7]:
model = YOLO(MODEL_WEIGHTS)


output_path = "./data_output/bibi_and_pope_skeleton.mp4"
# Define the codec and create VideoWriter object
fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # type: ignore


# We'll store raw estimators per frame in a list.
# Each item: a list of estimators, where estimator = (bbox_xyxy, kpts_xy, kpts_conf)
raw_estimators: List[List[Tuple[np.ndarray, np.ndarray, np.ndarray]]] = []


# Stuff for Video I/O

cap = cv2.VideoCapture(VIDEO_PATH)
fps = cap.get(cv2.CAP_PROP_FPS)
if fps is None or fps <= 0:
    fps = 30.0  # fallback (important!)
frame_idx = 0
    # add a video writer to fetch the skelaton video
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))

# starting the every frame processing: 
while True:
    ret, frame = cap.read()
    
    if not ret:
        break
    


    # Run pose prediction
    # Note: model(frame) returns a Results list (usually len 1 for one image)

    # before passing the frame into the model I want to have a filter to improve the quality:
    blurred_frame = cv2.GaussianBlur(frame, (7, 7), 1.0)
    high_freq = cv2.subtract(frame.astype(float), blurred_frame.astype(float))
    alpha = 1.5
    sharpened = frame.astype(float) + alpha * high_freq
    sharpened = np.clip(sharpened, 0, 255).astype(np.uint8)
        
    results = model.predict(
        source=sharpened,
        conf=CONF_THRES,
        iou=IOU_THRES,
        verbose=False
    )

    dets_this_frame = []
    r0 = results[0]

    # r0.boxes.xyxy -> Nx4
    # r0.keypoints.xy -> NxKx2
    # r0.keypoints.conf -> NxK
    if r0.keypoints is not None and len(r0.keypoints) > 0:
        boxes_xyxy = r0.boxes.xyxy.cpu().numpy() if r0.boxes is not None else None # type: ignore

        kpts_xy    = r0.keypoints.xy.cpu().numpy() # type: ignore

        kpts_conf  = r0.keypoints.conf.cpu().numpy()  # type: ignore

        n_people = kpts_xy.shape[0]
        for i in range(n_people):
            bbox = boxes_xyxy[i] if boxes_xyxy is not None else None
            dets_this_frame.append((bbox, kpts_xy[i], kpts_conf[i]))

    raw_estimators.append(dets_this_frame)

    annotated_frame = results[0].plot()

    if annotated_frame.shape[:2] != (frame_height, frame_width):
        annotated_frame = cv2.resize(annotated_frame, (frame_width, frame_height))
    if annotated_frame.dtype != np.uint8:
        annotated_frame = annotated_frame.astype(np.uint8)
    
    
    # write a video frame with skeleton

    out.write(annotated_frame)
    frame_idx += 1

cap.release()
out.release()
n_frames = len(raw_estimators)
times = np.arange(n_frames) * dt
print(f"Loaded {n_frames} frames of estimators.")


Loaded 1122 frames of estimators.


The next section is needed only if the video contains 2 people. In my case I know that the SRL labs have 3 videos for each interactione ( 1 video containing the both side of the dyad while the other 2 videos are capturing each individual seperately). therefore, the next chulk might come handy if i would use only the dyad video - although that I intend to use all 3 videos.


the purpose of the next chulk enables the model to overcome of seeing the world in snapshots-  with it, the model understands a sequence. that means that since the video is divided into frames we need to form a way in which the model now that the person in frame(n ) and the person in frame(n+1) is the same person. basically a Name Tag phase.

### Identity tracking for dyads

First, let Understand whats this model yields: 
the first parameter is the frame index (remember that the model run 30 fps )and the second parameter is the Person. within each frame (element in the list) and person index theres 17 elements findicating the joint of the corresponding body part. 

In [13]:
raw_estimators[1000][0] 

(array([     834.34,      143.87,      1205.4,      711.51], dtype=float32),
 array([[     943.77,      237.66],
        [     961.87,      220.35],
        [     941.71,      222.49],
        [     1011.3,      216.71],
        [     954.74,      217.16],
        [       1105,      289.04],
        [     933.67,       278.3],
        [     1165.3,      422.64],
        [     900.03,      382.93],
        [     1041.3,      465.94],
        [     943.89,      448.62],
        [     1094.3,      506.03],
        [     976.57,       493.2],
        [     1041.6,      575.51],
        [     893.79,      557.94],
        [     1077.8,       715.9],
        [     958.32,      694.53]], dtype=float32),
 array([    0.94467,      0.9245,     0.64433,     0.88572,     0.15267,     0.99369,     0.95836,     0.98598,     0.88566,     0.97664,     0.87879,     0.98274,     0.96508,     0.87681,     0.77472,     0.41018,     0.31426], dtype=float32))

In [None]:
# Helpers for tracking
import numpy as np
from typing import List, Tuple, Optional


#bbox_centroid-  compute centroid from bbox - each box represents [x1,y1,x2,y2] which are the top-left and bottom-right corners
# each box is a person detected in the frame
def bbox_centroid(bbox_xyxy: np.ndarray) -> np.ndarray:
    """Compute centroid from [x1,y1,x2,y2]."""
    x1, y1, x2, y2 = bbox_xyxy
    return np.array([(x1 + x2) / 2.0, (y1 + y2) / 2.0], dtype=float)
#----------------------------

#bbox_area- compute area from bbox
def bbox_area(bbox_xyxy: np.ndarray) -> float:
    """Compute area of bounding box."""
    x1, y1, x2, y2 = bbox_xyxy
    return max(0.0, x2 - x1) * max(0.0, y2 - y1)
#----------------------------
# Greedy assignment function - assigns detected people in current frame to those in previous frame based on centroid distances
def greedy_assign(prev_centroids: List[np.ndarray],
                  curr_centroids: List[np.ndarray]) -> List[Optional[int]]:
    """
    Greedy assignment: returns mapping from prev index -> curr index (or None).
    """
    if len(prev_centroids) == 0 or len(curr_centroids) == 0:
        return [None] * len(prev_centroids)

    # Distance matrix (prev x curr)
    D = np.zeros((len(prev_centroids), len(curr_centroids)), dtype=float)
    for i, pc in enumerate(prev_centroids):
        for j, cc in enumerate(curr_centroids):
            D[i, j] = np.linalg.norm(pc - cc)

    assigned_curr = set()
    mapping = [None] * len(prev_centroids)

    for _ in range(min(len(prev_centroids), len(curr_centroids))):
        # Find the global minimum distance remaining in D
        i, j = np.unravel_index(np.argmin(D), D.shape)
        if D[i, j] == np.inf:
            break
            
        mapping[i] = j 
        assigned_curr.add(j)
        # "Remove" this row and column from future consideration
        D[i, :] = np.inf
        D[:, j] = np.inf

    return mapping # pyright: ignore[reportReturnType]


First let's create an empty list catch the raw_estimators for 2 persons given that the input would include a dyad so no need to over-complecate it. 

In [9]:

# --- Tracking two main subjects (IDs 0 and 1) ---
# 1. Initialize data containers
K = 17  # Standard COCO keypoints
n_frames = len(raw_estimators)
tracks = {
    0: {"kpts_xy": np.full((n_frames, K, 2), np.nan, dtype=float),
        "kpts_conf": np.full((n_frames, K), np.nan, dtype=float)},
    1: {"kpts_xy": np.full((n_frames, K, 2), np.nan, dtype=float),
        "kpts_conf": np.full((n_frames, K), np.nan, dtype=float)},
}


then  I would find the FIRST frame that actually contains a detection


In [None]:



first_valid_idx = None
for i, frame_dets in enumerate(raw_estimators):
    if len(frame_dets) > 0:
        first_valid_idx = i
        break

if first_valid_idx is None:
    raise RuntimeError("No detections found in any frame. Check your video or CONF_THRES.")

print(f"Starting tracking from frame index: {first_valid_idx}")


Starting tracking from frame index: 0


0

In [None]:

# 3. Initialize IDs 0 and 1 from that first valid frame (by largest area)

first_frame_dets = raw_estimators[first_valid_idx]


# Sort by area so the main subjects get IDs 0 and 1 - the largest two bboxes 
first_sorted = sorted(first_frame_dets, key=lambda d: bbox_area(d[0]) if d[0] is not None else 0, reverse=True)



# Assign to tracks - getting the condinate of the first frame identified of the 2 peson and STORE by SIZE
# arranging the first_sorted(which is raw_estimetors) so that the "0" would be the one how is the larger box in the 
#first frame identified with people

# so here we have the keypoints of the first frame identified of the 2 peson and STORE by SIZE
for pid in [0, 1]:
    if pid < len(first_sorted):
        bbox, kxy, kcf = first_sorted[pid]
        tracks[pid]["kpts_xy"][first_valid_idx] = kxy
        tracks[pid]["kpts_conf"][first_valid_idx] = kcf





[(array([     371.38,      104.53,      797.69,      662.71], dtype=float32),
  array([[     551.93,      240.41],
         [     587.16,      219.35],
         [     539.47,      211.19],
         [     644.91,      230.39],
         [     522.09,      209.09],
         [     709.83,      350.54],
         [     453.23,      334.01],
         [     763.28,      547.99],
         [     437.84,      536.56],
         [     677.92,      617.19],
         [     567.18,      624.48],
         [     652.79,      664.99],
         [     479.45,      655.01],
         [     683.69,      711.48],
         [     453.24,      678.58],
         [     689.54,         720],
         [     562.87,      680.47]], dtype=float32),
  array([    0.98782,     0.98007,     0.94901,     0.91652,     0.57789,     0.99126,     0.97965,       0.935,     0.84354,     0.87143,     0.73743,     0.72691,      0.6455,    0.054678,    0.039682,   0.0072372,    0.005859], dtype=float32)),
 (array([          0,      6

Now with the heavy lifting; It plays the video forward and ensures that "Person 0" in Frame 10 remains "Person 0" in Frame 11, even if they move.


In [None]:

# 4. Main Tracking Loop

for t in range(first_valid_idx + 1, n_frames): # It starts exactly one frame after your "Casting" frame and runs until the end of the video.
    dets = raw_estimators[t]
    if not dets:
        continue

    # Prepare previous centroids for matching
    prev_centroids = []
    prev_valid_pids = []

    # The "Memory" Phase (Where were they?):
    for pid in [0, 1]:
        # Get last known position from the tracks we are building
        last_kpts = tracks[pid]["kpts_xy"][t-1]
        if not np.all(np.isnan(last_kpts)):
            # Use the bounding box of the keypoints as the centroid
            x1, y1 = np.nanmin(last_kpts[:, 0]), np.nanmin(last_kpts[:, 1])
            x2, y2 = np.nanmax(last_kpts[:, 0]), np.nanmax(last_kpts[:, 1])
            prev_centroids.append(np.array([(x1+x2)/2, (y1+y2)/2]))
            prev_valid_pids.append(pid)

    # Prepare current detections
    curr_bboxes = [d[0] for d in dets if d[0] is not None]
    curr_centroids = [bbox_centroid(b) for b in curr_bboxes]

    if not curr_centroids or not prev_centroids:
        # Fallback: if matching fails, just assign first available detections
        for pid in [0, 1]:
            if pid < len(dets):
                _, kxy, kcf = dets[pid]
                tracks[pid]["kpts_xy"][t] = kxy
                tracks[pid]["kpts_conf"][t] = kcf
        continue

    # Match previous frame people to current frame people
    mapping = greedy_assign(prev_centroids, curr_centroids)

    used_det_indices = set()
    for i, j in enumerate(mapping):
        if j is None: continue
        
        pid = prev_valid_pids[i]
        target_bbox = curr_bboxes[j]
        
        # Find which detection index in 'dets' this belongs to
        for di, d in enumerate(dets):
            if d[0] is not None and np.allclose(d[0], target_bbox) and di not in used_det_indices:
                used_det_indices.add(di)
                _, kxy, kcf = d
                tracks[pid]["kpts_xy"][t] = kxy
                tracks[pid]["kpts_conf"][t] = kcf
                break

print("Tracking done (dyad IDs: 0 and 1).")

Tracking done (dyad IDs: 0 and 1).


now we have  tracks object is now a dictionary containing two main matrices for each person - keypooint and confidance.
If my video is 30 seconds long at 30 FPS, you have 900 rows of data

kpts_xy:
    Rows: Frame Number (Time).
    Columns: Body Part (Nose, Left Eye, Right Shoulder, etc.).
    The values are the coordinates $(x, y)$.
kpts_conf:
    Rows: Frame Number (Time).
    Columns: Body Part (Nose, Left Eye, Right Shoulder, etc.).
    the value is a single number (0.0 to 1.0) representing how "confident" the model is



## Preprocessing- data handling and noise reduction 
Now lets deal with noise and uncertainty 


In [None]:
# Helpers for cleaning keypoints
def apply_conf_mask(kpts_xy: np.ndarray, kpts_conf: np.ndarray, conf_min: float = 0.3) -> np.ndarray:
    """Set (x,y) to NaN where confidence < conf_min."""
    out = kpts_xy.copy()
    bad = (kpts_conf < conf_min) | np.isnan(kpts_conf)
    out[bad, :] = np.nan
    return out

def interp_nan_1d(y: np.ndarray, max_gap: int = 10) -> np.ndarray:
    """
    Interpolate NaNs in a 1D array; only fill gaps <= max_gap.
    """
    y2 = y.copy()
    n = len(y2)
    idx = np.arange(n)

    nan_mask = np.isnan(y2)
    if nan_mask.all():
        return y2

    # Interpolate all NaNs first
    y2[nan_mask] = np.interp(idx[nan_mask], idx[~nan_mask], y2[~nan_mask])

    # Re-mask large gaps
    # Find consecutive NaN runs in original
    runs = []
    start = None
    for i in range(n):
        if nan_mask[i] and start is None:
            start = i
        if (not nan_mask[i]) and start is not None:
            runs.append((start, i-1))
            start = None
    if start is not None:
        runs.append((start, n-1))

    for a, b in runs:
        if (b - a + 1) > max_gap:
            y2[a:b+1] = np.nan

    return y2

def smooth_1d(y: np.ndarray, window: int = 11, poly: int = 2) -> np.ndarray:
    """
    Savitzky-Golay smoothing, safe with NaNs (smooth only the valid part).
    """
    y2 = y.copy()
    valid = ~np.isnan(y2)
    if valid.sum() < window:
        return y2
    y2[valid] = savgol_filter(y2[valid], window_length=window, polyorder=poly)
    return y2


In [None]:

# Clean each person, each joint, both x and y
CONF_THER= 0.3
MAX_GAP_FRAMES = int(0.3 * fps)  # fill up to ~300ms gaps

clean_xy = {}
for pid in [0, 1]:
    xy = tracks[pid]["kpts_xy"]          # (T,K,2)
    cf = tracks[pid]["kpts_conf"]        # (T,K)

    # Apply confidence mask frame-by-frame
    xy_masked = np.full_like(xy, np.nan)
    for t in range(n_frames):
        xy_masked[t] = apply_conf_mask(xy[t], cf[t], conf_min=CONF_THER)

    # Interpolate + smooth per joint, per coordinate over time
    xy_filt = xy_masked.copy()
    for j in range(K):
        for c in [0, 1]:  # x,y
            series = xy_masked[:, j, c]
            series = interp_nan_1d(series, max_gap=MAX_GAP_FRAMES)
            series = smooth_1d(series, window=11, poly=2)
            xy_filt[:, j, c] = series

    clean_xy[pid] = xy_filt

print("Keypoints cleaned (confidence mask + interpolation + smoothing).")
