In [1]:
# --- Cell 1: Imports, config, indices, base helpers ---

import os, json, math
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional, Any
from src.keypoints_extractor import PoseEstimator

import cv2
import numpy as np

# ---------- Main configuration ----------
TARGET_FPS = 15          # frame rate the video is downsampled to (default of iter_video_frames)
VIS_TH = 0.5             # landmark visibility threshold
SMOOTH_WIN = 7           # "soft" temporal smoothing window (in frames) applied to derived series

# Kalman (default constants for the 1D constant-velocity model)
KALMAN_Q = 1e-3          # process noise
KALMAN_R = 4e-3          # measurement noise

# ---------- Indici MediaPipe Pose ----------
class POSE:
    """Named integer indices of the MediaPipe Pose landmarks (0..32).

    Used as keys into the per-frame landmark dictionaries built in this notebook.
    """
    NOSE = 0
    LEFT_EYE_INNER = 1
    LEFT_EYE = 2
    LEFT_EYE_OUTER = 3
    RIGHT_EYE_INNER = 4
    RIGHT_EYE = 5
    RIGHT_EYE_OUTER = 6
    LEFT_EAR = 7
    RIGHT_EAR = 8
    LEFT_MOUTH = 9
    RIGHT_MOUTH = 10
    LEFT_SHOULDER = 11
    RIGHT_SHOULDER = 12
    LEFT_ELBOW = 13
    RIGHT_ELBOW = 14
    LEFT_WRIST = 15
    RIGHT_WRIST = 16
    LEFT_PINKY = 17
    RIGHT_PINKY = 18
    LEFT_INDEX = 19
    RIGHT_INDEX = 20
    LEFT_THUMB = 21
    RIGHT_THUMB = 22
    LEFT_HIP = 23
    RIGHT_HIP = 24
    LEFT_KNEE = 25
    RIGHT_KNEE = 26
    LEFT_ANKLE = 27
    RIGHT_ANKLE = 28
    LEFT_HEEL = 29
    RIGHT_HEEL = 30
    LEFT_FOOT_INDEX = 31
    RIGHT_FOOT_INDEX = 32

# ---------- Helpers geometrici ----------
def _xy_from_landmark(lmk, frame_shape):
    h, w = frame_shape[:2]
    return np.array([lmk.x*w, lmk.y*h], dtype=np.float32)

def angle3p_2d(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> float:
    """Return the angle at vertex ``b`` (degrees) between rays b->a and b->c.

    A small epsilon is added to the denominator so zero-length rays do not
    divide by zero; the cosine is clipped to [-1, 1] before ``acos``.
    """
    ray_to_a = a - b
    ray_to_c = c - b
    scale = np.linalg.norm(ray_to_a) * np.linalg.norm(ray_to_c) + 1e-9
    cosine = np.clip(np.dot(ray_to_a, ray_to_c) / scale, -1.0, 1.0)
    return math.degrees(math.acos(cosine))

def angle3p_3d(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> float:
    """Return the angle at vertex ``b`` (degrees) formed by points a, b, c.

    Same formula as the 2D variant: arccos of the normalised dot product of
    (a - b) and (c - b), with an epsilon in the denominator and clipping of
    the cosine to [-1, 1].
    """
    first_ray, second_ray = a - b, c - b
    norms_product = np.linalg.norm(first_ray) * np.linalg.norm(second_ray) + 1e-9
    cos_value = np.dot(first_ray, second_ray) / norms_product
    radians = math.acos(np.clip(cos_value, -1.0, 1.0))
    return math.degrees(radians)

def smooth_signal(x: np.ndarray, win: int) -> np.ndarray:
    """Centered moving average that is unbiased at the edges and NaN-aware.

    The window is ``min(win, len(x))`` forced to an odd length. Each output
    sample is the mean of the *finite* input samples inside its window, so:
      * near the borders the mean is taken over the samples that exist
        (no implicit zero padding that would pull the ends toward zero);
      * NaN samples are ignored; an output is NaN only when its whole window
        contains no finite value.

    Args:
        x: 1D signal.
        win: requested window length in samples.

    Returns:
        ``x`` itself when no smoothing applies (``win <= 1``, fewer than 3
        samples, or an effective window of 1); otherwise a new float64 array
        of the same length.
    """
    if win <= 1 or len(x) < 3:
        return x
    k = min(win, len(x))
    if k % 2 == 0:
        k -= 1
    if k <= 1:
        return x

    values = np.asarray(x, dtype=np.float64)
    valid = np.isfinite(values)
    kernel = np.ones(k, dtype=np.float64)
    # Windowed sum of the finite samples and how many of them each window holds.
    sums = np.convolve(np.where(valid, values, 0.0), kernel, mode='same')
    counts = np.convolve(valid.astype(np.float64), kernel, mode='same')
    out = np.full(values.shape, np.nan, dtype=np.float64)
    # Round the counts: they are small integers up to floating-point noise.
    np.divide(sums, counts, out=out, where=np.rint(counts) > 0)
    return out

In [2]:
# --- Cell V0: verbose helper ---
def vprint(verbose: bool, *args):
    """Print ``args`` (space-separated, like ``print``) only when ``verbose`` is truthy."""
    if not verbose:
        return
    print(*args)

In [3]:
# Switch the working directory to the parent of the directory the notebook was
# started in (in the recorded run: .../ai_trainer/notebooks -> .../ai_trainer).
# The start directory is remembered the first time this cell runs, so re-running
# the cell does not climb one more level each time.
if "_NOTEBOOK_START_DIR" not in globals():
    _NOTEBOOK_START_DIR = os.getcwd()
current_directory = _NOTEBOOK_START_DIR
print("La directory corrente è:", current_directory)
parent_directory = os.path.dirname(current_directory)
print("La directory genitore è:", parent_directory)
os.chdir(parent_directory)
print("La nuova directory corrente è:", os.getcwd())

La directory corrente è: /Users/giorgio/PersonalProjects/ai_trainer/notebooks
La directory genitore è: /Users/giorgio/PersonalProjects/ai_trainer
La nuova directory corrente è: /Users/giorgio/PersonalProjects/ai_trainer


In [4]:
# --- Cell 2: IO helpers ---

def load_meta_json(path: str) -> Dict[str, Any]:
    """Read the UTF-8 JSON file at ``path`` and return the parsed object."""
    with open(path, 'r', encoding='utf-8') as handle:
        payload = json.load(handle)
    return payload

def save_json(obj: Dict[str, Any], path: str) -> None:
    """Serialise ``obj`` as 2-space-indented JSON (non-ASCII kept as is) to ``path`` in UTF-8."""
    with open(path, 'w', encoding='utf-8') as sink:
        json.dump(obj, fp=sink, indent=2, ensure_ascii=False)

def iter_video_frames(path: str, target_fps: int = TARGET_FPS):
    """
    Iterate over the frames of a video/GIF, downsampled to about ``target_fps``.

    Every ``step``-th source frame is yielded, where
    ``step = max(1, round(src_fps / target_fps))``; timestamps are computed
    from the resulting effective rate ``src_fps / step``.

    Note: this notebook defines ``iter_video_frames`` again in a later cell;
    when all cells are run in order, the later definition is the one in effect.

    Args:
        path: path of the media file, opened with ``cv2.VideoCapture``.
        target_fps: desired output frame rate; must be > 0.

    Yields:
        ``(frame_bgr, t_sec, idx)``: the frame as returned by ``cap.read()``,
        its timestamp in seconds, and its index in the downsampled sequence.

    Raises:
        ValueError: if ``target_fps`` is not positive.
        RuntimeError: if the file cannot be opened.
    """
    if target_fps <= 0:
        raise ValueError(f"target_fps must be > 0, got {target_fps}")

    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        cap.release()
        raise RuntimeError(f"Impossibile aprire: {path}")

    # try/finally so the capture is released even when the consumer stops
    # iterating early (generator closed) or an exception propagates.
    try:
        src_fps = cap.get(cv2.CAP_PROP_FPS)
        if src_fps is None or not math.isfinite(src_fps) or src_fps <= 0:
            src_fps = 30.0  # fallback when the container reports no usable FPS
        step = max(1, int(round(src_fps / target_fps)))
        eff_fps = src_fps / step

        raw_idx = 0
        out_idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if raw_idx % step == 0:
                t = out_idx / eff_fps
                yield frame, float(t), int(out_idx)
                out_idx += 1
            raw_idx += 1
    finally:
        cap.release()

In [5]:
# --- Cell 2: Video reader + presence detection ---

def iter_video_frames(path: str, target_fps: int = TARGET_FPS):
    """
    Iterate over the frames of a video, downsampled to about ``target_fps``.

    Every ``step``-th source frame is yielded, where
    ``step = max(1, round(src_fps / target_fps))``; timestamps use the
    effective rate ``src_fps / step``. A source FPS of 0 falls back to 30.

    Note: this redefines the ``iter_video_frames`` declared in an earlier
    cell of this notebook; this later definition is the one in effect.

    Args:
        path: path of the media file, opened with ``cv2.VideoCapture``.
        target_fps: desired output frame rate; must be > 0.

    Yields:
        ``(frame_bgr, t_sec, idx)``: frame, timestamp in seconds, and index
        in the downsampled sequence.

    Raises:
        ValueError: if ``target_fps`` is not positive.
        RuntimeError: if the file cannot be opened.
    """
    if target_fps <= 0:
        raise ValueError(f"target_fps must be > 0, got {target_fps}")

    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        cap.release()
        raise RuntimeError(f"Impossibile aprire: {path}")

    # try/finally so the capture is released even when the consumer stops
    # iterating early (generator closed) or an exception propagates.
    try:
        src_fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        step = max(1, int(round(src_fps / target_fps)))
        eff_fps = src_fps / step

        raw_idx = 0
        out_idx = 0
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            if raw_idx % step == 0:
                yield frame, float(out_idx / eff_fps), int(out_idx)
                out_idx += 1
            raw_idx += 1
    finally:
        cap.release()

def presence_score(kp2d: Dict[int, Dict[str,float]]) -> float:
    """Fraction of the entries in ``kp2d`` whose 'v' (visibility) is >= VIS_TH.

    Entries without a 'v' key count as visibility 0.0. An empty or falsy
    ``kp2d`` gives 0.0.
    """
    if not kp2d:
        return 0.0
    visibilities = [entry.get('v', 0.0) for entry in kp2d.values()]
    n_visible = len([value for value in visibilities if value >= VIS_TH])
    return n_visible / max(len(visibilities), 1)

In [6]:
# --- Cell V1: Estrazione 2D+3D con stampe ---
def extract_pose_sequence(video_path: str, pose_estimator, vis_th: float = VIS_TH, verbose: bool = True):
    """Run the pose estimator on the downsampled video and collect 2D/3D keypoints.

    For every frame produced by ``iter_video_frames(video_path, TARGET_FPS)``:
      * ``keypoints``: ``{landmark_idx: {'x', 'y', 'v'}}`` in pixels, only for
        landmarks whose visibility is >= ``vis_th``;
      * ``world``: ``{landmark_idx: {'X', 'Y', 'Z', 'v'}}`` for every world
        landmark; 'v' is the 2D visibility, or 0.0 when the landmark was
        filtered out of ``keypoints``;
      * ``presence``: fraction of ALL detected 2D landmarks that pass
        ``vis_th`` (0.0 when no pose is detected). It is computed against the
        total landmark count, not against the already-filtered ``keypoints``
        dict, which would always give 1.0.

    Args:
        video_path: path of the clip.
        pose_estimator: object whose ``process(frame_bgr)`` returns a result
            exposing ``pose_landmarks`` and ``pose_world_landmarks``.
        vis_th: visibility threshold for keeping a 2D landmark.
        verbose: print a progress line every 30 frames and a final summary.

    Returns:
        ``(frames, shapes)``: list of per-frame dicts and list of frame shapes.
    """
    frames, shapes = [], []
    for frame_bgr, t_sec, idx in iter_video_frames(video_path, TARGET_FPS):
        res = pose_estimator.process(frame_bgr)
        shapes.append(frame_bgr.shape)

        kp2d = {}
        n_landmarks = 0
        if res.pose_landmarks:
            for i, lm in enumerate(res.pose_landmarks.landmark):
                n_landmarks += 1
                if lm.visibility >= vis_th:
                    x, y = _xy_from_landmark(lm, frame_bgr.shape)
                    kp2d[i] = {'x': float(x), 'y': float(y), 'v': float(lm.visibility)}

        kp3d = {}
        if res.pose_world_landmarks:
            for i, lm in enumerate(res.pose_world_landmarks.landmark):
                v = kp2d.get(i, {}).get('v', 0.0)
                kp3d[i] = {'X': float(lm.x), 'Y': float(lm.y), 'Z': float(lm.z), 'v': float(v)}

        # Visible landmarks over all detected landmarks.
        presence = (len(kp2d) / n_landmarks) if n_landmarks else 0.0
        frames.append({
            'frame_idx': idx,
            't': float(t_sec),
            'keypoints': kp2d,
            'world': kp3d,
            'presence': presence
        })
        if verbose and idx % 30 == 0:
            print(f"[extract] frame={idx:4d} t={t_sec:5.2f}s  visible%={presence*100:4.1f}  kp2d={len(kp2d)}  kp3d={len(kp3d)}")

    if verbose:
        print(f"[extract] tot_frame_analizzati={len(frames)}  (downsample a ~{TARGET_FPS} fps)")
    return frames, shapes

In [7]:
# --- Cell V2: Rep detection (robusta) su segnali essenziali 3D/body-centric ---

from dataclasses import dataclass
from typing import List, Tuple, Optional

# Parameters (tune them if needed)
MIN_REL_ROM = 0.14              # min prominence of a rep, as a fraction of the signal's global range
MIN_REP_DURATION_S   = 0.50     # min duration of a whole rep (start top -> end top), seconds
MIN_PHASE_DURATION_S = 0.18     # min duration of each phase (top -> bottom, bottom -> top), seconds
MIN_GAP_BETWEEN_REPS_S = 0.20   # min time between the end of an accepted rep and the start of the next
SMOOTH_WIN = 7  # already defined in the config cell: same value, kept as is

@dataclass
class Rep:
    """One detected repetition: a local maximum, the following minimum, the next maximum.

    Indices refer to positions in the analysed (smoothed) signal; times are
    the corresponding timestamps in seconds.
    """
    start_idx: int      # index of the local maximum preceding the bottom
    bottom_idx: int     # index of the local minimum (bottom of the rep)
    end_idx: int        # index of the local maximum following the bottom
    start_t: float      # timestamp at start_idx
    bottom_t: float     # timestamp at bottom_idx
    end_t: float        # timestamp at end_idx
    signal_key: str     # name of the signal the rep was detected on
    prom_min: float     # smaller of the two drops (start->bottom, end->bottom)
    rom: float          # max(start value, end value) minus bottom value
    duration: float     # end_t - start_t

def _find_local_extrema(x: np.ndarray) -> Tuple[List[int], List[int]]:
    maxima, minima = [], []
    for i in range(1, len(x)-1):
        if x[i] > x[i-1] and x[i] > x[i+1]: maxima.append(i)
        if x[i] < x[i-1] and x[i] < x[i+1]: minima.append(i)
    return maxima, minima

def _detect_reps_on_signal(t: np.ndarray, x: np.ndarray, key: str) -> List[Rep]:
    """Detect repetitions (max -> min -> max patterns) on one signal.

    The signal is smoothed with ``smooth_signal(x, SMOOTH_WIN)``; each strict
    local minimum is paired with the closest local maximum before and after
    it, and the candidate is kept only if it passes, in this order: the
    refractory check against the last accepted rep, the prominence threshold
    (``MIN_REL_ROM`` times the global range of the smoothed signal), the
    minimum rep/phase durations, and the velocity-sign check (median velocity
    negative going down, positive going up).

    Args:
        t: timestamps in seconds, same length as ``x``.
        x: signal values; reps are expected to be dips (callers negate the
           signal when the movement shows up as a peak).
        key: name stored in ``Rep.signal_key``.

    Returns:
        Accepted reps in order of their bottom index; empty when the signal
        has fewer than 5 samples or no interior maxima/minima.
    """
    if len(x) < 5: return []
    xs = smooth_signal(x.copy(), SMOOTH_WIN)
    maxima, minima = _find_local_extrema(xs)
    if not maxima or not minima: return []
    glob_rom = float(np.nanmax(xs) - np.nanmin(xs))
    thr_prom = float(MIN_REL_ROM * max(glob_rom, 1e-6))
    v = np.zeros_like(xs);  # numerical velocity: central differences inside, one-sided at the ends
    for i in range(1,len(xs)-1):
        dt = max(t[i+1]-t[i-1], 1e-6)
        v[i] = (xs[i+1]-xs[i-1])/dt
    v[0] = (xs[1]-xs[0])/max(t[1]-t[0],1e-6)
    v[-1]= (xs[-1]-xs[-2])/max(t[-1]-t[-2],1e-6)

    reps = []
    last_end_t = -1e9
    last_end_idx = -10**9
    for mi in minima:
        tops_before = [j for j in maxima if j < mi]
        tops_after  = [j for j in maxima if j > mi]
        if not tops_before or not tops_after: continue
        s = tops_before[-1]; e = tops_after[0]
        # refractory period relative to the last accepted rep
        # NOTE(review): a candidate whose start maximum is the same sample as the
        # previous accepted rep's end maximum (s == last_end_idx, gap 0 s) is
        # rejected here, so back-to-back reps that share a single top are
        # dropped -- confirm this is intended.
        if s <= last_end_idx or (t[s]-last_end_t) < MIN_GAP_BETWEEN_REPS_S:
            continue
        # prominence: drop from each top to the bottom
        prom_left  = float(xs[s] - xs[mi])
        prom_right = float(xs[e] - xs[mi])
        prom_min = min(prom_left, prom_right)
        if prom_left < thr_prom or prom_right < thr_prom: continue
        # durations of the descending phase, the ascending phase and the whole rep
        t_ecc = float(t[mi]-t[s]); t_con = float(t[e]-t[mi]); t_rep = float(t[e]-t[s])
        if t_rep < MIN_REP_DURATION_S or t_ecc < MIN_PHASE_DURATION_S or t_con < MIN_PHASE_DURATION_S: continue
        # velocity sign must be coherent with the phase
        if np.median(v[s:mi+1]) >= 0: continue  # must go down
        if np.median(v[mi:e+1]) <= 0: continue  # must go up
        rep_rom = float(max(xs[s], xs[e]) - xs[mi])
        reps.append(Rep(s, mi, e, float(t[s]), float(t[mi]), float(t[e]), key, float(prom_min), float(rep_rom), float(t_rep)))
        last_end_t = t[e]; last_end_idx = e
    return reps

def detect_reps_essential(frames_out: List[Dict[str,Any]], verbose: bool = True) -> Tuple[List[Rep], str]:
    """
    Pick the best rep-detection signal among the per-frame 3D features.

    Candidates are 'pelvis_y_body' (negated before detection) and the 3D joint
    angles of knee / hip / shoulder / elbow / ankle (used as they are). Each
    one is run through ``_detect_reps_on_signal``; the winner is the candidate
    with the largest tuple (rep count, median prominence, median ROM).

    Side effect: the frames at the start / bottom / end / phase midpoints of
    each winning rep get a 'label' entry, unless they already have a truthy one.

    Returns:
        ``(reps, signal_key)``; ``signal_key`` is 'none' when nothing was detected.
    """
    signal_specs = [
        ('pelvis_y_body', True),
        ('knee_L_deg_3d', False), ('knee_R_deg_3d', False),
        ('hip_L_deg_3d', False),  ('hip_R_deg_3d', False),
        ('shoulder_L_deg_3d', False), ('shoulder_R_deg_3d', False),
        ('elbow_L_deg_3d', False),    ('elbow_R_deg_3d', False),
        ('ankle_L_deg_3d', False),    ('ankle_R_deg_3d', False),
    ]
    timestamps = np.array([frame['t'] for frame in frames_out], dtype=np.float32)
    best_reps, best_key = [], None
    best_score = (-1, -np.inf, -np.inf)

    for key, invert in signal_specs:
        raw_values = [frame['features_3d'].get(key) for frame in frames_out]
        signal = np.array([np.nan if raw is None else float(raw) for raw in raw_values],
                          dtype=np.float32)
        if np.all(np.isnan(signal)):
            continue
        # smoothing is done inside the detector
        found = _detect_reps_on_signal(timestamps, -signal if invert else signal, key)
        if not found:
            continue
        candidate_score = (
            len(found),
            float(np.median([rep.prom_min for rep in found])),
            float(np.median([rep.rom for rep in found])),
        )
        if candidate_score > best_score:
            best_score, best_reps, best_key = candidate_score, found, key

    if verbose:
        if best_key:
            print(f"[reps] segnale='{best_key}'  reps={len(best_reps)}  med_prom={best_score[1]:.2f}  med_rom={best_score[2]:.2f}")
        else:
            print("[reps] nessuna ripetizione rilevata con i segnali essenziali")

    def _tag_if_unlabeled(position, tag):
        frame = frames_out[position]
        frame['label'] = frame.get('label') or tag

    # label the key frames directly on frames_out (first label written wins)
    for rep in best_reps:
        anchors = (
            (rep.start_idx, 'top_start'),
            (rep.bottom_idx, 'bottom'),
            (rep.end_idx, 'top_end'),
            ((rep.start_idx + rep.bottom_idx)//2, 'mid_eccentric'),
            ((rep.bottom_idx + rep.end_idx)//2, 'mid_concentric'),
        )
        for position, tag in anchors:
            _tag_if_unlabeled(position, tag)

    return best_reps, (best_key or 'none')

In [8]:
# --- Cell V3: Visualizzazione keyframe con overlay e label ---

def extract_keyframe_indices_from_reps(reps: List[Rep]) -> List[int]:
    """Return the sorted, de-duplicated key-frame indices of all reps.

    Each rep contributes five indices: start, midpoint of start/bottom,
    bottom, midpoint of bottom/end, and end (midpoints use floor division).
    """
    collected = set()
    for rep in reps:
        collected.update((
            rep.start_idx,
            (rep.start_idx + rep.bottom_idx)//2,
            rep.bottom_idx,
            (rep.bottom_idx + rep.end_idx)//2,
            rep.end_idx,
        ))
    return sorted(collected)

def save_keyframe_overlays_from_frames(video_path: str,
                                       frames_raw: List[Dict[str,Any]],
                                       frames_out: List[Dict[str,Any]],
                                       key_indices: List[int],
                                       pose_estimator,
                                       out_dir: str = "debug_keyframes",
                                       visibility_th: float = VIS_TH,
                                       verbose: bool = True):
    """Save the selected key frames as JPEGs with pose overlay and a text label.

    The video is re-read with the same cadence used during extraction
    (``step = max(1, round(src_fps / TARGET_FPS))``); for every downsampled
    index in ``key_indices`` the pose is re-estimated, drawn with
    ``pose_estimator.draw_landmarks`` and written to
    ``<out_dir>/kf_<index>.jpg``. The frame's 'label' from ``frames_out``
    (if any) is written in the top-left corner next to the index.

    Args:
        video_path: path of the clip.
        frames_raw: unused; kept for interface compatibility.
        frames_out: per-frame dicts, read for 'frame_idx' and 'label'.
        key_indices: downsampled frame indices to save.
        pose_estimator: object providing ``process`` and ``draw_landmarks``.
        out_dir: output directory (created if missing).
        visibility_th: forwarded to ``draw_landmarks``.
        verbose: print a summary line at the end.
    """
    os.makedirs(out_dir, exist_ok=True)
    # Rebuild an idx -> label map so the label can be written on the image
    labels = {f['frame_idx']: (f.get('label') or '') for f in frames_out if f.get('label')}

    sel = set(key_indices)
    saved = 0
    if sel:
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                # Make the failure visible instead of silently saving nothing.
                print(f"[viz] impossibile aprire il video: {video_path}")
            else:
                # Same fallback/step rule as the extraction reader, so indices line up.
                src_fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
                step = max(1, int(round(src_fps / TARGET_FPS)))
                last_wanted = max(sel)

                raw_idx = 0
                out_idx = 0
                # Stop decoding as soon as the last requested key frame is passed.
                while out_idx <= last_wanted:
                    ok, frame = cap.read()
                    if not ok:
                        break
                    if raw_idx % step == 0:
                        if out_idx in sel:
                            res = pose_estimator.process(frame)
                            overlay = pose_estimator.draw_landmarks(frame, res.pose_landmarks if res else None, visibility_th=visibility_th)
                            # label in the top-left corner
                            tag = labels.get(out_idx, '')
                            text = f"{out_idx:04d} | {tag}" if tag else f"{out_idx:04d}"
                            cv2.putText(overlay, text, (10,30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0,255,0), 2, cv2.LINE_AA)
                            out_path = os.path.join(out_dir, f"kf_{out_idx:04d}.jpg")
                            # imwrite reports failure through its return value.
                            if cv2.imwrite(out_path, overlay):
                                saved += 1
                        out_idx += 1
                    raw_idx += 1
        finally:
            cap.release()
    if verbose:
        print(f"[viz] keyframes salvati: {saved} in '{out_dir}'")

In [9]:
# --- Cell 4: Kalman 1D cost-vel per 3D landmarks ---

class Kalman1D:
    """Scalar Kalman filter with a [position, velocity] state.

    All state is kept in float64 and ``step`` always returns a Python float
    (including on the first, initialising call).

    Attributes:
        x: (2, 1) state vector [pos, vel].
        P: (2, 2) state covariance.
        q: process-noise scale used to build Q in ``predict``.
        r: measurement-noise variance used in ``update``.
        initialized: False until the first measurement has been seen.
    """

    def __init__(self, q=KALMAN_Q, r=KALMAN_R):
        # state = [pos, vel]
        self.x = np.zeros((2, 1), dtype=np.float64)
        self.P = np.eye(2, dtype=np.float64)
        self.q = float(q)
        self.r = float(r)
        self.initialized = False

    def predict(self, dt):
        """Propagate state and covariance forward by ``dt`` seconds."""
        F = np.array([[1.0, dt],
                      [0.0, 1.0]], dtype=np.float64)
        Q = np.array([[self.q*dt*dt, 0.0],
                      [0.0,          self.q]], dtype=np.float64)
        self.x = F @ self.x
        self.P = F @ self.P @ F.T + Q

    def update(self, z):
        """Correct the state with the position measurement ``z``."""
        H = np.array([[1.0, 0.0]], dtype=np.float64)
        innovation = np.array([[float(z)]], dtype=np.float64) - H @ self.x
        S = H @ self.P @ H.T + self.r
        # S is 1x1, so multiplying by its inverse is a plain division.
        K = (self.P @ H.T) / S
        self.x = self.x + K @ innovation
        self.P = (np.eye(2, dtype=np.float64) - K @ H) @ self.P

    def step(self, z, dt):
        """Feed one measurement and return the filtered position as a float.

        The first call only initialises the state (position = z, velocity = 0,
        identity covariance) and returns ``float(z)``; ``dt`` is ignored then.
        """
        if not self.initialized:
            self.x = np.array([[float(z)], [0.0]], dtype=np.float64)
            self.P = np.eye(2, dtype=np.float64)
            self.initialized = True
            return float(z)
        self.predict(dt)
        self.update(z)
        return float(self.x[0, 0])

def kalman_smooth_world(frames: List[Dict[str,Any]], joints: List[int] = list(range(33))) -> List[Dict[int, Dict[str,float]]]:
    """
    Kalman-smooth the world landmarks over time.

    One ``Kalman1D`` filter is kept per (joint, axis). The time step given to
    a filter is the time elapsed since THAT joint was last observed (never
    less than ``1 / TARGET_FPS``), so a joint that disappears for some frames
    is predicted over the real gap when it comes back. When every joint is
    present in every frame this equals the frame-to-frame step.

    Args:
        frames: per-frame dicts with 't' (seconds) and 'world'
            (``{joint: {'X', 'Y', 'Z', 'v'}}``).
        joints: joint indices to filter.

    Returns:
        One dict per frame, ``{joint: {'X', 'Y', 'Z', 'v'}}``, containing only
        the joints present in that frame's 'world'; 'v' is copied unchanged.
    """
    # one filter per (joint, axis)
    filters = {(j, ax): Kalman1D() for j in joints for ax in 'XYZ'}
    min_dt = 1.0 / TARGET_FPS
    last_seen_t: Dict[int, float] = {}

    out = []
    for f in frames:
        world = f['world']
        t_now = f['t']
        sm = {}
        for j in joints:
            if j not in world:
                continue
            point = world[j]
            dt = max(t_now - last_seen_t.get(j, t_now), min_dt)
            last_seen_t[j] = t_now
            sm[j] = {
                'X': filters[(j, 'X')].step(point['X'], dt),
                'Y': filters[(j, 'Y')].step(point['Y'], dt),
                'Z': filters[(j, 'Z')].step(point['Z'], dt),
                'v': point['v'],
            }
        out.append(sm)
    return out

In [10]:
# --- Cell 5: Body-centric transform + view estimation ---

def body_frame_from_world(world_pts: Dict[int, Dict[str,float]]) -> Optional[Tuple[np.ndarray, np.ndarray]]:
    """Build an orthonormal body frame from hips and shoulders.

    Axes (expressed in the world basis, unit length):
      * x: from left hip to right hip;
      * z: cross(x, hip-midpoint -> shoulder-midpoint direction);
      * y: cross(z, x), i.e. the torso direction made orthogonal to x.

    Returns:
        ``(R, origin)`` where the rows of the 3x3 ``R`` are the x, y, z axes
        and ``origin`` is the hip midpoint; ``None`` if any of the four
        required landmarks is missing from ``world_pts``.
    """
    required = (POSE.LEFT_HIP, POSE.RIGHT_HIP, POSE.LEFT_SHOULDER, POSE.RIGHT_SHOULDER)
    for idx in required:
        if idx not in world_pts:
            return None

    def _vec(idx):
        entry = world_pts[idx]
        return np.array([entry[axis] for axis in 'XYZ'], dtype=np.float32)

    def _unit(vec):
        # epsilon avoids division by zero for degenerate (zero-length) vectors
        vec /= (np.linalg.norm(vec) + 1e-9)
        return vec

    left_hip, right_hip = _vec(POSE.LEFT_HIP), _vec(POSE.RIGHT_HIP)
    left_sh, right_sh = _vec(POSE.LEFT_SHOULDER), _vec(POSE.RIGHT_SHOULDER)

    hip_center = 0.5*(left_hip + right_hip)
    shoulder_center = 0.5*(left_sh + right_sh)

    axis_x = _unit(right_hip - left_hip)
    torso_dir = _unit(shoulder_center - hip_center)
    axis_z = _unit(np.cross(axis_x, torso_dir))
    axis_y = _unit(np.cross(axis_z, axis_x))

    rotation = np.stack([axis_x, axis_y, axis_z], axis=0)  # rows = body axes in world basis
    return rotation, hip_center

def to_body_coords(world_pts: Dict[int, Dict[str,float]]) -> Optional[Dict[int, np.ndarray]]:
    """Express every landmark of ``world_pts`` in the body frame.

    Each point is translated by the body-frame origin and rotated with the
    matrix returned by ``body_frame_from_world``. Returns ``None`` when the
    body frame cannot be built.
    """
    frame = body_frame_from_world(world_pts)
    if frame is None:
        return None
    rotation, center = frame
    return {
        idx: rotation @ (np.array([pt['X'], pt['Y'], pt['Z']], dtype=np.float32) - center)  # world->body
        for idx, pt in world_pts.items()
    }

# --- Patch 1: Robust view estimation (3D + face cues + smoothing-ready) ---

def _yaw_from_shoulders_3d(world_pts: Dict[int, Dict[str,float]]) -> Optional[float]:
    """Yaw of the shoulder line in degrees: ``atan2(dz, dx)`` of (right - left shoulder).

    The result lies in (-180, 180]: values near 0 or near +/-180 mean the
    shoulder line is roughly parallel to the X axis, values near +/-90 mean it
    is roughly parallel to the Z axis. Returns ``None`` if a shoulder is missing.
    """
    if POSE.LEFT_SHOULDER not in world_pts or POSE.RIGHT_SHOULDER not in world_pts:
        return None
    left_sh = world_pts[POSE.LEFT_SHOULDER]
    right_sh = world_pts[POSE.RIGHT_SHOULDER]
    delta_x = float(right_sh['X'] - left_sh['X'])
    delta_z = float(right_sh['Z'] - left_sh['Z'])
    return float(math.degrees(math.atan2(delta_z, delta_x + 1e-9)))

def _face_visibility_score(kp2d: Dict[int, Dict[str,float]]) -> float:
    """Sum of the 'v' (visibility) values of NOSE, LEFT_EYE and RIGHT_EYE.

    Landmarks missing from ``kp2d`` (or lacking 'v') contribute 0.0. Used as a
    cue that the face is towards the camera.
    """
    total = 0
    for landmark_id in (POSE.NOSE, POSE.LEFT_EYE, POSE.RIGHT_EYE):
        total += kp2d.get(landmark_id, {}).get('v', 0.0)
    return float(total)

def estimate_view_robust(world_pts: Dict[int, Dict[str,float]],
                         kp2d: Optional[Dict[int, Dict[str,float]]] = None) -> Dict[str, Optional[float]]:
    """
    Classify the camera view for one frame.

    Returns ``{'view', 'yaw_deg', 'roll_deg', 'pitch_deg', 'front_score'}``:
      - ``yaw_deg`` is the raw shoulder-line yaw from ``_yaw_from_shoulders_3d``
        (range (-180, 180]);
      - the label is chosen from the yaw FOLDED into [0, 90]
        (``min(|yaw|, 180 - |yaw|)``), so a shoulder line parallel to the X
        axis counts as "front/back" whichever shoulder is on which side;
        front vs back is then decided by ``front_score``;
      - ``front_score`` is the face-visibility sum scaled to about [0, 1];
      - ``roll_deg`` / ``pitch_deg`` come from the body frame and are kept
        for debugging only.

    If hips/shoulders are missing the view is 'unknown' and the numeric
    fields are ``None``.
    """
    # Without the required 3D landmarks, fall back to "unknown"
    need = [POSE.LEFT_HIP, POSE.RIGHT_HIP, POSE.LEFT_SHOULDER, POSE.RIGHT_SHOULDER]
    if any(i not in world_pts for i in need):
        return {'view':'unknown','yaw_deg':None,'roll_deg':None,'pitch_deg':None,'front_score':None}

    # Body frame for roll/pitch (not used for the label, kept for debugging).
    Rinfo = body_frame_from_world(world_pts)
    if Rinfo is None:
        return {'view':'unknown','yaw_deg':None,'roll_deg':None,'pitch_deg':None,'front_score':None}
    R, _ = Rinfo
    x_body, y_body = R[0], R[1]
    roll  = math.degrees(math.atan2(x_body[1], np.linalg.norm([x_body[0], x_body[2]]) + 1e-9))
    pitch = math.degrees(math.atan2(y_body[2], y_body[1] + 1e-9))

    # Continuous yaw from the 3D shoulders
    yaw = _yaw_from_shoulders_3d(world_pts)
    if yaw is None:
        return {'view':'unknown','yaw_deg':None,'roll_deg':float(roll),'pitch_deg':float(pitch),'front_score':None}
    yaw_abs = abs(yaw)
    # atan2 gives |yaw| near 180 when the shoulder line is parallel to X but
    # reversed (right - left has negative X). Fold into [0, 90] so both
    # orientations land in the front/back band instead of the "side" band.
    yaw_fold = min(yaw_abs, 180.0 - yaw_abs)

    # "Frontness" cue from the face landmarks.
    # Rough normalisation: the sum of three visibilities is divided by 1.5 and
    # capped at 1.
    face_vis = _face_visibility_score(kp2d or {})
    front_score = min(face_vis / 1.5, 1.0)  # roughly [0..1]

    # Soft rules for the label (thresholds are tunable):
    # - small folded yaw  => front/back zone, face score picks the side
    # - medium folded yaw => semi_front / side / semi_back
    # - large folded yaw  => side / semi_back
    T_SEMI = 25.0   # below ~25 deg: "almost front/back"
    T_SIDE = 65.0   # above ~65 deg: "almost side"

    if yaw_fold <= T_SEMI:
        # front/back band: discriminate with front_score
        if front_score >= 0.6:
            view = 'front'
        elif front_score >= 0.35:
            view = 'semi_front'
        else:
            view = 'back'  # few facial features visible => likely back or face turned away
    elif yaw_fold <= T_SIDE:
        # intermediate zone
        if front_score >= 0.5:
            view = 'semi_front'
        elif front_score <= 0.2:
            view = 'semi_back'
        else:
            view = 'side'
    else:
        # almost fully side-on
        view = 'side' if front_score >= 0.25 else 'semi_back'

    return {'view':view, 'yaw_deg':float(yaw), 'roll_deg':float(roll), 'pitch_deg':float(pitch), 'front_score':float(front_score)}

In [11]:
# --- Patch 2: Temporal smoothing for view estimation ---

from collections import deque, Counter

def smooth_yaw_series(yaws: List[Optional[float]], win: int = 9) -> List[Optional[float]]:
    """Causal sliding-window smoothing of a yaw series given in degrees.

    For every non-None sample the output is the CIRCULAR mean of the last
    ``win`` non-None samples (direction of the summed unit vectors), so values
    on the two sides of the +/-180 wrap-around (e.g. 179 and -179) average to
    about +/-180 instead of 0. None samples are passed through and do not
    enter the window.

    Args:
        yaws: yaw angles in degrees, or None where unavailable.
        win: maximum number of recent valid samples averaged.

    Returns:
        List of the same length with smoothed angles in [-180, 180] or None.
    """
    buf = deque(maxlen=win)
    out: List[Optional[float]] = []
    for y in yaws:
        if y is None:
            out.append(None)
            continue
        buf.append(math.radians(y))
        sin_sum = sum(math.sin(a) for a in buf)
        cos_sum = sum(math.cos(a) for a in buf)
        out.append(float(math.degrees(math.atan2(sin_sum, cos_sum))))
    return out

def smooth_labels(labels: List[str], win: int = 9) -> List[str]:
    """Causal majority vote over the last ``win`` labels that are not 'unknown'.

    'unknown' labels never enter the voting window. While the window is still
    empty the input label is passed through unchanged; afterwards every
    position (including 'unknown' ones) gets the current most common label.
    """
    window = deque(maxlen=win)
    smoothed = []
    for label in labels:
        if label != 'unknown':
            window.append(label)
        if window:
            # majority vote
            winner, _votes = Counter(window).most_common(1)[0]
            smoothed.append(winner)
        else:
            smoothed.append(label)
    return smoothed

def estimate_view_sequence(frames: List[Dict[str,Any]]) -> List[Dict[str,Any]]:
    """
    Estimate the view for every frame and smooth it over time.

    Each frame is classified with ``estimate_view_robust`` (using its 'world'
    and 'keypoints' entries); then 'yaw_deg' is replaced by the output of
    ``smooth_yaw_series`` (where not None) and 'view' by the output of
    ``smooth_labels``, both with a window of 9.

    Returns a list of view-info dicts parallel to ``frames``.
    """
    infos = [
        estimate_view_robust(frame.get('world', {}), kp2d=frame.get('keypoints', {}))
        for frame in frames
    ]

    # overwrite yaw with its smoothed value
    raw_yaws = [info.get('yaw_deg') for info in infos]
    for info, yaw_sm in zip(infos, smooth_yaw_series(raw_yaws, win=9)):
        if yaw_sm is not None:
            info['yaw_deg'] = float(yaw_sm)

    # overwrite the label with the majority-voted one
    voted = smooth_labels([info['view'] for info in infos], win=9)
    for info, label in zip(infos, voted):
        info['view'] = label

    return infos

In [12]:
# --- Cell 6: Essential, view-agnostic features (per-frame) ---

# Feature name -> landmark triplet (a, b, c). The feature is the 3D angle at the
# middle landmark b between the segments b->a and b->c (see angle3p_3d).
ESSENTIAL_JOINT_ANGLES_3D = {
    'shoulder_L_deg_3d': (POSE.LEFT_ELBOW,  POSE.LEFT_SHOULDER,  POSE.LEFT_HIP),
    'shoulder_R_deg_3d': (POSE.RIGHT_ELBOW, POSE.RIGHT_SHOULDER, POSE.RIGHT_HIP),
    'elbow_L_deg_3d':    (POSE.LEFT_SHOULDER,  POSE.LEFT_ELBOW,  POSE.LEFT_WRIST),
    'elbow_R_deg_3d':    (POSE.RIGHT_SHOULDER, POSE.RIGHT_ELBOW, POSE.RIGHT_WRIST),
    'hip_L_deg_3d':      (POSE.LEFT_SHOULDER,  POSE.LEFT_HIP,  POSE.LEFT_KNEE),
    'hip_R_deg_3d':      (POSE.RIGHT_SHOULDER, POSE.RIGHT_HIP, POSE.RIGHT_KNEE),
    'knee_L_deg_3d':     (POSE.LEFT_HIP,  POSE.LEFT_KNEE,  POSE.LEFT_ANKLE),
    'knee_R_deg_3d':     (POSE.RIGHT_HIP, POSE.RIGHT_KNEE, POSE.RIGHT_ANKLE),
    'ankle_L_deg_3d':    (POSE.LEFT_KNEE,  POSE.LEFT_ANKLE,  POSE.LEFT_FOOT_INDEX),
    'ankle_R_deg_3d':    (POSE.RIGHT_KNEE, POSE.RIGHT_ANKLE, POSE.RIGHT_FOOT_INDEX),
}

# --- Patch 3: usa stima view sequenziale smussata in compute_essential_features ---

def compute_essential_features(frames: List[Dict[str,Any]],
                               world_smoothed: List[Dict[int, Dict[str,float]]],
                               user_hint: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
    """Compute the per-frame 3D features, view info and quality.

    For each frame the 'features_3d' dict ALWAYS contains every key of
    ``ESSENTIAL_JOINT_ANGLES_3D`` plus 'stance_width_norm', 'pelvis_y_body'
    and 'torso_incline_deg'; a value is ``None`` when the landmarks it needs
    are missing (including frames with no world landmarks at all).
    'user_height_cm' is added when ``user_hint['height_cm']`` is a number.

    Args:
        frames: raw per-frame dicts ('frame_idx', 't', 'presence', 'keypoints').
        world_smoothed: smoothed world landmarks, parallel to ``frames``.
        user_hint: optional hints; only 'height_cm' is read here.

    Returns:
        One dict per frame with 'frame_idx', 't', 'presence', 'quality',
        'view' (smoothed view info) and 'features_3d'.
    """
    # The view estimator needs the smoothed world points together with the 2D keypoints.
    frames_for_view = [
        {
            'world': w3,
            'keypoints': f.get('keypoints', {}),
            't': f['t'],
            'frame_idx': f['frame_idx'],
            'presence': f['presence'],
        }
        for f, w3 in zip(frames, world_smoothed)
    ]
    # Temporally smoothed view estimate
    views_seq = estimate_view_sequence(frames_for_view)

    def _point(w3, i):
        """World point ``i`` as a float32 vector, or None if absent."""
        if i in w3:
            return np.array([w3[i]['X'], w3[i]['Y'], w3[i]['Z']], dtype=np.float32)
        return None

    body_needed = [POSE.LEFT_ANKLE, POSE.RIGHT_ANKLE, POSE.LEFT_HIP, POSE.RIGHT_HIP,
                   POSE.LEFT_SHOULDER, POSE.RIGHT_SHOULDER]

    out = []
    for f, w3, vinfo in zip(frames, world_smoothed, views_seq):
        # quality/presence
        q = {'visible_ratio': float(f['presence'])}

        # body-centric transform
        body = to_body_coords(w3)

        feats = {}
        # View-invariant 3D joint angles; None when any of the three points is
        # missing, so the key set is the same for every frame.
        for name, (a, b, c) in ESSENTIAL_JOINT_ANGLES_3D.items():
            pa, pb, pc = _point(w3, a), _point(w3, b), _point(w3, c)
            if pa is not None and pb is not None and pc is not None:
                feats[name] = float(angle3p_3d(pa, pb, pc))
            else:
                feats[name] = None

        # Normalised distances in the body frame
        if body is not None and all(k in body for k in body_needed):
            ankle_L, ankle_R = body[POSE.LEFT_ANKLE], body[POSE.RIGHT_ANKLE]
            hip_L, hip_R     = body[POSE.LEFT_HIP],   body[POSE.RIGHT_HIP]
            sh_L, sh_R       = body[POSE.LEFT_SHOULDER], body[POSE.RIGHT_SHOULDER]
            hip_w = np.linalg.norm(hip_R - hip_L) + 1e-6
            stance = np.linalg.norm(ankle_R - ankle_L)
            feats['stance_width_norm'] = float(stance / hip_w)
            hip_mid = 0.5*(hip_L + hip_R)
            sh_mid  = 0.5*(sh_L + sh_R)
            # NOTE(review): to_body_coords uses the hip midpoint as origin, so
            # hip_mid in body coordinates is ~0 by construction and this value
            # carries only rounding noise -- confirm the intended reference
            # for pelvis height.
            feats['pelvis_y_body'] = float(hip_mid[1])
            v = sh_mid - hip_mid
            y_axis = np.array([0,1,0], dtype=np.float32)
            denom = (np.linalg.norm(v)*np.linalg.norm(y_axis)+1e-9)
            feats['torso_incline_deg'] = float(math.degrees(math.acos(np.clip(np.dot(v,y_axis)/denom, -1,1))))
        else:
            feats['stance_width_norm'] = None
            feats['pelvis_y_body'] = None
            feats['torso_incline_deg'] = None

        if user_hint and isinstance(user_hint.get('height_cm'), (int,float)):
            feats['user_height_cm'] = float(user_hint['height_cm'])

        out.append({
            'frame_idx': f['frame_idx'],
            't': f['t'],
            'presence': f['presence'],
            'quality': q,
            'view': vinfo,           # smoothed label + yaw/pitch/roll/front_score
            'features_3d': feats
        })
    return out

In [13]:
# --- Cell 7: Time-series essenziali + quality score clip ---

# Keys of 'features_3d' exported as time series by series_package.
ESSENTIAL_KEYS_TS = [
    # main 3D joint angles
    'shoulder_L_deg_3d','shoulder_R_deg_3d',
    'elbow_L_deg_3d','elbow_R_deg_3d',
    'hip_L_deg_3d','hip_R_deg_3d',
    'knee_L_deg_3d','knee_R_deg_3d',
    'ankle_L_deg_3d','ankle_R_deg_3d',
    # normalised / body-centric distances
    'stance_width_norm','pelvis_y_body','torso_incline_deg'
]

def extract_series(frames_out: List[Dict[str,Any]], key: str) -> Tuple[np.ndarray, np.ndarray]:
    """Return ``(t, x)`` float32 arrays for feature ``key`` across all frames.

    ``t`` holds each frame's 't'; ``x`` holds ``features_3d[key]``, with NaN
    where the key is missing or its value is None.
    """
    def _as_float(raw):
        return np.nan if raw is None else float(raw)

    timestamps = np.array([frame['t'] for frame in frames_out], dtype=np.float32)
    samples = [_as_float(frame['features_3d'].get(key)) for frame in frames_out]
    return timestamps, np.array(samples, dtype=np.float32)

def clip_quality(frames_out: List[Dict[str,Any]]) -> Dict[str, float]:
    """Summarise per-frame 'presence': mean, minimum and frame count.

    With no frames, both presence statistics are 0.0 and 'frames' is 0.
    """
    if frames_out:
        presence = np.array([frame['presence'] for frame in frames_out], dtype=np.float32)
    else:
        presence = np.array([])
    has_data = bool(presence.size)
    mean_presence = float(np.mean(presence)) if has_data else 0.0
    min_presence = float(np.min(presence)) if has_data else 0.0
    return {
        'mean_presence': mean_presence,
        'min_presence': min_presence,
        'frames': int(len(frames_out)),
    }

def series_package(frames_out: List[Dict[str,Any]]) -> Dict[str, Any]:
    """Build the JSON-ready time-series package for every key in ESSENTIAL_KEYS_TS.

    For each key the entry contains:
      * 't': timestamps as a list of floats;
      * 'series': the smoothed signal (``smooth_signal`` with ``SMOOTH_WIN``),
        with non-finite samples replaced by ``None`` so the package
        serialises to standard JSON (``null``) instead of the non-standard
        ``NaN`` token;
      * 'rom': max minus min over the finite samples, or ``None`` when the
        series has no finite sample.
    """
    pkg = {}
    for k in ESSENTIAL_KEYS_TS:
        t, x = extract_series(frames_out, k)
        xs = np.asarray(smooth_signal(x, SMOOTH_WIN))
        finite = np.isfinite(xs)
        if finite.any():
            good = xs[finite]
            rom = float(np.max(good) - np.min(good))
        else:
            # empty or all-missing series: no range to report
            rom = None
        pkg[k] = {
            't': t.tolist(),
            'series': [value if ok else None for value, ok in zip(xs.tolist(), finite.tolist())],
            'rom': rom
        }
    return pkg

In [14]:
# --- Cell V4: Pipeline completa con stampe, rep count e keyframe overlay ---

def process_clip_crossview_verbose(meta_in: Dict[str,Any],
                                   pose_estimator,
                                   out_json_path: Optional[str] = None,
                                   user_hint: Optional[Dict[str,Any]] = None,
                                   save_keyframes: bool = True,
                                   keyframes_dir: str = "debug_keyframes",
                                   verbose: bool = True) -> Dict[str,Any]:
    """Run the whole analysis pipeline on one clip.

    Steps: pose extraction -> Kalman smoothing of world landmarks -> per-frame
    features/view -> rep detection and key frames -> time series and clip
    quality -> dominant view -> optional key-frame overlays -> output dict
    (optionally written to ``out_json_path``).

    Args:
        meta_in: metadata dict; must contain ``media.clip_path``. It is not
            modified: the result is a copy with an updated 'analysis' entry.
        pose_estimator: estimator passed to the extraction/visualisation steps.
        out_json_path: where to write the result as JSON, or None to skip.
        user_hint: optional hints ('height_cm', 'view_hint'); a truthy
            'view_hint' overrides the detected dominant view.
        save_keyframes: save overlay images of the key frames.
        keyframes_dir: output directory for the overlays.
        verbose: print progress messages.

    Returns:
        Copy of ``meta_in`` whose 'analysis' holds 'pose', 'clip_quality',
        'rep_detection', 'frame_features' and 'time_series'.

    Raises:
        ValueError: if ``media.clip_path`` is missing from ``meta_in``.
        FileNotFoundError: if the clip does not exist.
    """
    if 'media' not in meta_in or 'clip_path' not in meta_in['media']:
        raise ValueError("meta_in deve contenere media.clip_path")
    video_path = meta_in['media']['clip_path']
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video non trovato: {video_path}")

    vprint(verbose, f"[start] file='{video_path}'  TARGET_FPS={TARGET_FPS}")

    # 1) 2D+3D pose, with logging
    frames_raw, _ = extract_pose_sequence(video_path, pose_estimator, vis_th=VIS_TH, verbose=verbose)
    # Guard the mean: an empty clip would otherwise give NaN and a RuntimeWarning.
    mean_presence = float(np.mean([f['presence'] for f in frames_raw])) if frames_raw else 0.0
    vprint(verbose, f"[presence] media visibilità: {mean_presence:.3f}")

    # Real sampling rate, measured from the timestamps: the reader keeps every
    # n-th source frame, so it generally differs from the nominal TARGET_FPS.
    fps_effective = float(TARGET_FPS)
    if len(frames_raw) >= 2:
        span = frames_raw[-1]['t'] - frames_raw[0]['t']
        if span > 0:
            fps_effective = (len(frames_raw) - 1) / span

    # 2) Kalman on the 3D landmarks
    vprint(verbose, "[kalman] smoothing world landmarks ...")
    world_smoothed = kalman_smooth_world(frames_raw)

    # 3) Essential 3D features + view + quality
    vprint(verbose, "[features] calcolo feature 3D essenziali ...")
    frames_out = compute_essential_features(frames_raw, world_smoothed, user_hint=user_hint)

    # 4) Rep detection + key frames
    vprint(verbose, "[reps] rilevazione ripetizioni ...")
    reps, used_signal = detect_reps_essential(frames_out, verbose=verbose)
    kf_idxs = extract_keyframe_indices_from_reps(reps)
    vprint(verbose, f"[reps] conteggio ripetizioni = {len(reps)}  | keyframes estratti = {len(kf_idxs)}")

    # 5) Time series + clip quality
    vprint(verbose, "[timeseries] estrazione serie e ROM ...")
    ts = series_package(frames_out)
    qclip = clip_quality(frames_out)

    # 6) Dominant view (most frequent per-frame label, unless overridden by the hint)
    views = [f['view'].get('view') for f in frames_out if f.get('view')]
    dom_view = None
    if views:
        vals, counts = np.unique([v or 'unknown' for v in views], return_counts=True)
        dom_view = str(vals[np.argmax(counts)])
    if user_hint and user_hint.get('view_hint'):
        dom_view = user_hint['view_hint']
    vprint(verbose, f"[view] dominante: {dom_view}")

    # 7) Save key frames with overlay
    if save_keyframes and len(kf_idxs) > 0:
        vprint(verbose, "[viz] salvataggio keyframes con overlay ...")
        save_keyframe_overlays_from_frames(video_path, frames_raw, frames_out, kf_idxs, pose_estimator, out_dir=keyframes_dir, visibility_th=VIS_TH, verbose=verbose)

    # 8) Final output. Copy the nested 'analysis' dict too, so the caller's
    #    meta_in is left untouched.
    out_obj = dict(meta_in)
    analysis = dict(out_obj.get('analysis') or {})
    analysis.update({
        'pose': {'fps_effective': fps_effective, 'dominant_view': dom_view},
        'clip_quality': qclip,
        'rep_detection': {
            'signal_used': used_signal,
            'rep_count': len(reps),
            'keyframes_indices': kf_idxs
        },
        'frame_features': frames_out,
        'time_series': ts
    })
    out_obj['analysis'] = analysis

    if out_json_path is not None:
        save_json(out_obj, out_json_path)
        vprint(verbose, f"[done] output salvato in: {out_json_path}")

    return out_obj



In [15]:
# Example usage: run the full pipeline on one benchmark clip, write the result
# to JSON, save key-frame overlays in 'kf_out' and print the rep count.
pose_est = PoseEstimator(static_image_mode=False, model_complexity=1, enable_segmentation=False)
meta = load_meta_json("data/benchmark/scraped_json/BWSQUAT.json")
result = process_clip_crossview_verbose(meta, pose_estimator=pose_est, out_json_path="output_crossview_verbose.json",
                                        user_hint={'height_cm': 160}, save_keyframes=True, keyframes_dir="kf_out", verbose=True)
print("Reps:", result['analysis']['rep_detection']['rep_count'])

[start] file='data/benchmark/clip/10526.mp4'  TARGET_FPS=15
[extract] frame=   0 t= 0.00s  visible%=100.0  kp2d=33  kp3d=33


I0000 00:00:1757494693.117180 8636024 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 89.4), renderer: Apple M4 Max
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
W0000 00:00:1757494693.153926 8636144 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1757494693.162110 8636144 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1757494693.174010 8636145 landmark_projection_calculator.cc:186] Using NORM_RECT without IMAGE_DIMENSIONS is only supported for the square ROI. Provide IMAGE_DIMENSIONS or use PROJECTION_MATRIX.


[extract] frame=  30 t= 2.00s  visible%=100.0  kp2d=33  kp3d=33
[extract] frame=  60 t= 4.00s  visible%=100.0  kp2d=33  kp3d=33
[extract] frame=  90 t= 6.01s  visible%=100.0  kp2d=33  kp3d=33
[extract] frame= 120 t= 8.01s  visible%=100.0  kp2d=33  kp3d=33
[extract] frame= 150 t=10.01s  visible%=100.0  kp2d=33  kp3d=33
[extract] frame= 180 t=12.01s  visible%=100.0  kp2d=33  kp3d=33
[extract] tot_frame_analizzati=204  (downsample a ~15 fps)
[presence] media visibilità: 1.000
[kalman] smoothing world landmarks ...
[features] calcolo feature 3D essenziali ...
[reps] rilevazione ripetizioni ...
[reps] segnale='hip_L_deg_3d'  reps=4  med_prom=128.33  med_rom=129.05
[reps] conteggio ripetizioni = 4  | keyframes estratti = 20
[timeseries] estrazione serie e ROM ...
[view] dominante: side
[viz] salvataggio keyframes con overlay ...
[viz] keyframes salvati: 20 in 'kf_out'
[done] output salvato in: output_crossview_verbose.json
Reps: 4


In [16]:
# Display the whole result dict (metadata plus the per-frame 'analysis'); the output is large.
result

{'canonical_key': ['WeightExercises', 'BWSquat'],
 'name': 'Squat',
 'url': 'https://exrx.net/WeightExercises/GluteusMaximus/BWSquat',
 'tools': ['bodyweight'],
 'sections': {'classification': {'utility': 'Basic',
   'mechanics': 'Compound',
   'force': 'Push'},
  'preparation': 'Stand with arms extended forward.',
  'execution': 'Squat down by bending hips back while allowing knees to bend forward slightly, keeping back straight and knees pointed same direction as feet. Descend until thighs are just past parallel to floor. Squat up by extending knees and hips until legs are straight. Return and repeat.',
  'comments': 'Keep head facing forward, back straight, chest high, and feet flat on surface with equal distribution of weight through forefoot and heel. Knees should point same direction as feet throughout movement. Arms positioned forward allows torso to be positioned more upright. See Squat Analysis .\nEasier\nMovement can be made easier by holding onto rails or back of chairs to e