In [2]:
import os
import pathlib
import numpy as np
import cv2
from tensorflow import keras
from keras import layers, preprocessing, callbacks, losses, utils, models
import sys
import PyQt5
from PyQt5.QtWidgets import QApplication, QLabel, QWidget, QVBoxLayout, QHBoxLayout, QGridLayout,QFrame
from PyQt5.QtGui import QPixmap, QPainter, QPen, QColor
from PyQt5.QtCore import Qt
import time
import glob
from collections import defaultdict


In [None]:
# =========================
# 1) Parsing des fichiers
# =========================

def load_sequence_txt(path):
    """
    Read a text file in which every line is one frame: "v1;v2;...;vN;".

    Empty fields (such as the one produced by a trailing ';') and blank
    lines are ignored.

    Parameters
    ----------
    path : str or path-like
        File to read (decoded as UTF-8).

    Returns
    -------
    numpy.ndarray
        float32 array of shape (T, D), where T is the number of frames and
        D the number of features per frame. A file that contains no frame
        yields an empty array of shape (0, 0) so the result is always 2-D.

    Raises
    ------
    ValueError
        If a field cannot be parsed as a float, or if the frames do not all
        have the same number of features.
    """
    feats = []
    with open(path, "r", encoding="utf-8") as f:
        for line_no, line in enumerate(f, start=1):
            parts = [p for p in line.strip().split(";") if p.strip() != ""]
            if not parts:
                continue
            vec = list(map(float, parts))
            # Fail early, with the offending line, instead of letting
            # np.array() reject the ragged list with an opaque message.
            if feats and len(vec) != len(feats[0]):
                raise ValueError(
                    f"{path}, ligne {line_no} : {len(vec)} valeurs, "
                    f"{len(feats[0])} attendues"
                )
            feats.append(vec)
    if not feats:
        # np.array([]) would be 1-D with shape (0,); keep the 2-D contract.
        return np.empty((0, 0), dtype=np.float32)
    return np.array(feats, dtype=np.float32)  # (T, D)

def parse_annotations(path):
    """
    Parse the annotation file into per-sequence gesture segments.

    Each non-empty line has the form
      1;DENY;670;751;CIRCLE;1610;1655; ...
    i.e. a sequence ID followed by any number of (label, start, end) triples.
    Whitespace around fields is ignored, as are empty fields and lines that
    contain only separators.

    Parameters
    ----------
    path : str or path-like
        Annotation file to read (decoded as UTF-8).

    Returns
    -------
    collections.defaultdict
        Maps each sequence ID (str) to a list of (label, start, end) tuples,
        with start and end converted to int. IDs are appended to in file
        order; an ID that appears on several lines accumulates all triples.

    Raises
    ------
    ValueError
        If the fields after the ID do not form complete triples, or if a
        start/end field is not an integer.
    """
    ann = defaultdict(list)
    with open(path, "r", encoding="utf-8") as f:
        for line_no, raw in enumerate(f, start=1):
            # Strip every field so that " 1 ; DENY " still yields ID "1".
            parts = [p.strip() for p in raw.split(";")]
            parts = [p for p in parts if p]
            if not parts:
                # blank line or separators only (e.g. ";;;")
                continue
            # parts = [id, L1, s1, e1, L2, s2, e2, ...]
            seq_id, triples = parts[0], parts[1:]
            if len(triples) % 3 != 0:
                raise ValueError(
                    f"{path}, ligne {line_no} : {len(triples)} champs après l'ID, "
                    "attendu des triplets (label;start;end)"
                )
            # consume the fields three at a time: (label, start, end)
            for i in range(0, len(triples), 3):
                label = triples[i]
                start = int(triples[i + 1])
                end = int(triples[i + 2])
                ann[seq_id].append((label, start, end))
    return ann

In [None]:
# =========================
# 2) Prétraitements
# =========================

def normalize_framewise(X):
    """
    Standardise every frame (row) of X independently.

    For each row, the mean over its features is subtracted and the result is
    divided by the row's standard deviation plus 1e-8 (which avoids a
    division by zero for constant rows). The input array is not modified.

    Note: if the features are concatenated joint coordinates (x, y, z), a
    geometric centring/scaling (e.g. subtracting a reference joint) could be
    used instead.

    X is expected to be a 2-D array (T, D); the result has the same shape.
    """
    frames = X.copy()
    # per-row statistics, kept as (T, 1) columns so they broadcast over D
    centered = frames - frames.mean(axis=1, keepdims=True)
    spread = frames.std(axis=1, keepdims=True) + 1e-8
    return centered / spread

def slice_segment(X, start, end):
    """
    Return the frames X[start:end] (half-open: `end` itself is excluded).

    Both bounds are clamped to the valid range [0, len(X)] to avoid reading
    outside the sequence. When the clamped interval is empty, None is
    returned instead of an empty slice.
    """
    n_frames = len(X)
    lo = start if start > 0 else 0
    hi = end if end < n_frames else n_frames
    if hi > lo:
        return X[lo:hi]
    return None

In [None]:
# =========================
# 3) Construction dataset
# =========================

def build_dataset_from_folder(seq_folder, ann_path, max_len=200):
    """
    Assemble a fixed-length classification dataset from annotated sequences.

    Steps:
    - parse the annotation file at `ann_path`;
    - visit every *.txt file of `seq_folder` in sorted path order, using the
      file name without extension as the sequence ID (files whose ID has no
      annotation are skipped);
    - normalise each sequence frame by frame, cut out every annotated
      segment, then truncate it to `max_len` frames or zero-pad it at the
      end up to `max_len`;
    - give each label an integer id in order of first appearance.

    Returns (X, y, class_names) where X has shape (N, max_len, D), y is an
    int64 vector of N class ids and class_names[i] is the label of id i.

    Raises RuntimeError when no segment at all could be built.
    """
    annotations = parse_annotations(ann_path)
    class_index = {}          # label -> integer id, in order of first use
    segments, targets = [], []

    for seq_path in sorted(glob.glob(os.path.join(seq_folder, "*.txt"))):
        # sequence ID = file name stripped of its extension
        seq_id, _ext = os.path.splitext(os.path.basename(seq_path))
        if seq_id not in annotations:
            continue
        frames = normalize_framewise(load_sequence_txt(seq_path))  # (T, D)

        for label, start, end in annotations[seq_id]:
            seg = slice_segment(frames, start, end)
            if seg is None:
                continue
            # bring the segment to exactly max_len frames
            n_missing = max_len - len(seg)
            if n_missing > 0:
                filler = np.zeros((n_missing, seg.shape[1]), dtype=seg.dtype)
                seg = np.vstack([seg, filler])
            else:
                seg = seg[:max_len]
            # a new label receives the next free id (= current table size)
            targets.append(class_index.setdefault(label, len(class_index)))
            segments.append(seg)

    if len(segments) == 0:
        raise RuntimeError("Aucun segment construit (vérifie les chemins et les IDs).")

    X = np.stack(segments, axis=0)           # (N, max_len, D)
    y = np.array(targets, dtype=np.int64)    # (N,)
    # label names ordered by their id
    class_names = sorted(class_index, key=class_index.get)
    return X, y, class_names

In [None]:
# =========================
# 4) Modèle (BiLSTM)
# =========================

def make_model(T, D, C):
    """
    Build and compile the bidirectional-LSTM sequence classifier.

    T = maximum sequence length (max_len)
    D = number of features per frame
    C = number of classes

    Architecture: Masking -> BiLSTM(128, full sequence) -> BiLSTM(64)
    -> Dense(64, relu) -> Dropout(0.3) -> Dense(C, softmax).
    The model is compiled with Adam, sparse categorical cross-entropy
    (integer class ids as targets) and the accuracy metric.
    """
    inputs = layers.Input(shape=(T, D))
    # mask_value=0.0: time steps equal to 0.0 are treated as padding
    hidden = layers.Masking(mask_value=0.0)(inputs)
    hidden = layers.Bidirectional(layers.LSTM(128, return_sequences=True))(hidden)
    hidden = layers.Bidirectional(layers.LSTM(64))(hidden)
    hidden = layers.Dense(64, activation='relu')(hidden)
    hidden = layers.Dropout(0.3)(hidden)
    # output layer explicitly kept in float32
    outputs = layers.Dense(C, activation='softmax', dtype='float32')(hidden)

    classifier = models.Model(inputs=inputs, outputs=outputs)
    classifier.compile(
        loss='sparse_categorical_crossentropy',
        optimizer='adam',
        metrics=['accuracy'],
    )
    return classifier

In [None]:
# =========================
# 5) Entraînement + test
# =========================

# Folder that contains 1.txt, 2.txt, ... (adjust to your machine if needed)
seq_folder = "/Users/valentindaveau/Documents/UE_Vision/training_set/sequences/"
ann_path   = "annotations_revised_training.txt"

# Fixed seed so the train/validation split is the same on every run.
# (Weight initialisation and batch shuffling inside Keras are not seeded here.)
SEED = 42

# Build the dataset
X, y, class_names = build_dataset_from_folder(seq_folder, ann_path, max_len=200)
print("Dataset:", X.shape, y.shape, class_names)

# Simple shuffled 80/20 train/validation split, drawn from a local seeded
# generator so the global NumPy RNG state is left untouched.
idx = np.random.default_rng(SEED).permutation(len(X))
split = int(0.8 * len(X))
if split == 0:
    # int(0.8 * N) is 0 when N == 1: there would be nothing to train on
    raise RuntimeError("Pas assez de segments pour un split train/val.")
tr, va = idx[:split], idx[split:]
Xtr, ytr = X[tr], y[tr]
Xva, yva = X[va], y[va]

# Model dimensions come straight from the dataset
T, D = X.shape[1], X.shape[2]
C    = len(class_names)

model = make_model(T, D, C)
history = model.fit(Xtr, ytr, validation_data=(Xva, yva), epochs=20, batch_size=32)

In [None]:
# =========================
# 6) Enlève index 3 (pouce) de MediaPipe
# =========================


# MediaPipe landmark indices kept for the dataset: all of 0..20 except 3.
KEEP_IDX_20 = [i for i in range(21) if i != 3]

def mp21_to_dataset20(mp21_xyz: np.ndarray) -> np.ndarray:
    """
    Convert a MediaPipe hand (21 landmarks) to the 20-point dataset layout.

    mp21_xyz: array of shape (21, 3), landmarks in MediaPipe order
              (channels last).
    returns:  array of shape (20, 3) holding the rows listed in KEEP_IDX_20,
              in that order (landmark 3 is dropped).

    An AssertionError is raised when the input shape is not (21, 3).
    """
    shape = mp21_xyz.shape
    assert shape == (21, 3), f"attendu (21,3), reçu {mp21_xyz.shape}"
    # integer-list indexing selects whole rows
    return mp21_xyz[KEEP_IDX_20]

In [None]:
# =========================
# 7) Normalisation données Mediapipe
# =========================


def normalize_landmarks(X20: np.ndarray) -> np.ndarray:
    """
    Make a hand pose translation- and scale-invariant.

    X20: (20, 3) landmarks in dataset order.
    - translate so that point 0 (the wrist) sits at the origin
    - divide by the wrist -> point 8 distance (plus 1e-8 to avoid a division
      by zero); point 8 is the middle-finger MCP, i.e. MediaPipe index 9
      once index 3 has been removed
    """
    centered = X20 - X20[0]
    # reference length: distance from the wrist (now the origin) to point 8
    ref_len = np.linalg.norm(centered[8]) + 1e-8
    return centered / ref_len

In [None]:
# =========================
# 8) Inférence (sortie)
# =========================

def predict_sequence(model, path_txt, max_len=200):
    """
    Classify one sequence file with a trained model.

    The whole file is treated as a single sample: it is loaded, normalised
    frame by frame, then either cut to its first `max_len` frames or
    zero-padded at the end up to `max_len` frames.

    Returns (k, p): the index of the most probable class and its
    probability as a Python float.
    """
    frames = normalize_framewise(load_sequence_txt(path_txt))
    n_kept = min(len(frames), max_len)

    # zero-filled (max_len, D) buffer; the first n_kept rows receive the data
    window = np.zeros((max_len, frames.shape[1]), dtype=frames.dtype)
    window[:n_kept] = frames[:n_kept]

    batch = window[np.newaxis, ...]                    # (1, T, D)
    probs = model.predict(batch, verbose=0)[0]         # (C,)
    best = int(np.argmax(probs))
    return best, float(probs[best])

# Quick check of the inference helper on a single file, using the `model` and
# `class_names` produced by the training cell.
# NOTE(review): "1.txt" is resolved against the current working directory,
# whereas the training sequences are read from `seq_folder` — confirm that
# this is the intended file.
k, p = predict_sequence(model, "1.txt", max_len=200)
print(f"Prédiction pour 1.txt : {class_names[k]} (p={p:.2f})")