# Predictions

## Prérequis

In [1]:

%pip install pandas numpy scikit-learn matplotlib
%pip install pyyaml h5py

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
%pip install tensorflow[and-cuda]

Note: you may need to restart the kernel to use updated packages.


## Initialisation

In [3]:
from IPython.display import HTML

import os 
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

import tensorflow as tf

2026-01-19 15:52:00.316060: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [4]:
# Constants
# Joint names; the position of a name in this list is the joint index used in SQUELETTE.
parties_du_corps = [
    "Bassin",
    "HancheD", "GenouD", "ChevilleD",
    "HancheG", "GenouG", "ChevilleG",
    "Colonne", "Thorax", "Cou", "Tete",
    "EpauleG", "CoudeG", "PoignetG",
    "EpauleD", "CoudeD", "PoignetD",
]
# Directory scanned for keypoint files by the loading cell.
path = "./data/points"
# Skeleton bones as (joint index, joint index) pairs into parties_du_corps.
SQUELETTE = [
    (0, 1), (1, 2), (2, 3),            # right leg
    (0, 4), (4, 5), (5, 6),            # left leg
    (0, 7), (7, 8), (8, 9), (9, 10),   # spine
    (8, 11), (11, 12), (12, 13),       # left arm
    (8, 14), (14, 15), (15, 16),       # right arm
]


In [5]:
# Load the keypoint files.
# os.listdir() returns entries in arbitrary order; the names are sorted so that the
# order of `datas` (and of every later slice of the dataset) is the same on every run.
fichiers_points = sorted(
    nom for nom in os.listdir(path) if os.path.isfile(os.path.join(path, nom))
)


def _lire_keypoints(chemin):
    """Read one JSON file into a DataFrame with one column per joint.

    For every row of the file, the 'keypoints' entry of the first element of
    'instances' is kept; columns are named after parties_du_corps.
    """
    brut = pd.read_json(chemin)
    df = pd.DataFrame(brut['instances'].str[0].str.get('keypoints').to_list())
    df.columns = parties_du_corps
    return df


datas = [_lire_keypoints(os.path.join(path, nom)) for nom in fichiers_points]
# Only `datas` is needed afterwards (no loop variables are left behind, so this
# also works when the directory contains no file).
del fichiers_points

### formatage des données
Nous considérerons un jeu de données séquentielles par vidéo.
Chaque vecteur d'entrée du modèle sera de dimension 17x3, soit les coordonnées des 17 points du corps enregistrés.


`in` : $(\mathbb{R}^{3})^{17}$

On ressort une prédiction du même type, mais centrée sur le bassin :

`out` : $(\mathbb{R}^{3})^{17}$


In [6]:
def centrage_bassin(seq):
    """Express every joint relative to joint 0 of the same frame.

    `seq` is indexed [frame, joint, coordinate]; joint 0 is the first entry of
    parties_du_corps ("Bassin"). The result has the same shape as `seq` and its
    joint 0 is all zeros.
    """
    # Slice with :1 (not 0) to keep the joint axis so the subtraction broadcasts.
    origine = seq[:, :1, :]
    return seq - origine


def compute_velocity(sequence):
    """Return frame-to-frame differences of `sequence` along axis 0.

    The output has the same shape and dtype as `sequence`; entry 0 is zero and
    entry t (t >= 1) is sequence[t] - sequence[t - 1].
    """
    deltas = np.zeros_like(sequence)
    # Write the differences straight into rows 1.. of the zero-filled output.
    np.subtract(sequence[1:], sequence[:-1], out=deltas[1:])
    return deltas

# Data formatting:
# each DataFrame is converted in one pass to an array indexed
# [frame, joint, coordinate] and centred on joint 0. Converting the whole table
# at once replaces the former cell-by-cell df[col][i] lookups, which cost one
# pandas indexing call per joint per frame.
fdatas = [
    centrage_bassin(np.array(df.to_numpy().tolist()))
    for df in datas
]
del datas

# Alias used in the type annotations of the helper functions below.
PointSequence : type = type(fdatas[0])
print(PointSequence)

<class 'numpy.ndarray'>


In [None]:
# Dataset statistics
def _stats_par_feature(frames):
    """Flatten `frames` to rows of 51 values; return (flat, column mean, column std + 1e-8)."""
    flat = frames.reshape(-1, 51)
    return flat, flat.mean(axis=0), flat.std(axis=0) + 1e-8


# Position statistics over every frame of every sequence
all_pos = np.concatenate(fdatas, axis=0)
all_pos_flat, MEAN_POS, STD_POS = _stats_par_feature(all_pos)

# Same for the velocities
fdatas_vel = [compute_velocity(frames) for frames in fdatas]
all_vel = np.concatenate(fdatas_vel, axis=0)
all_vel_flat, MEAN_VEL, STD_VEL = _stats_par_feature(all_vel)

# Mean length of every bone of SQUELETTE over all frames
BONE_LENGTHS = {}
for i, j in SQUELETTE:
    dists = np.linalg.norm(all_pos[:, i] - all_pos[:, j], axis=1)
    BONE_LENGTHS[i, j] = np.mean(dists)

print(f"frames traitées : {len(all_pos)}")

Nombre total de frames traitées : 384446


### Fonctions utilitaires

In [None]:
def plot_animation(sequence, title=""):
    """Build a 3D matplotlib animation of a skeleton sequence.

    Parameters
    ----------
    sequence : array indexed [frame, joint, coordinate]; the three coordinates
        are plotted as X, Y and Z.
    title : title of the 3D axes.

    Returns
    -------
    FuncAnimation with one frame per entry of `sequence` and a 33 ms interval.
    Joints are drawn as red points and the bones listed in SQUELETTE as black
    segments.
    """
    T, _, _ = sequence.shape

    fig = plt.figure()
    ax = fig.add_subplot(111, projection="3d")
    ax.set_title(title)

    # fixed axis limits, computed over the whole sequence so the view does not
    # rescale from one frame to the next
    mins = sequence.min(axis=(0,1))
    maxs = sequence.max(axis=(0,1))

    ax.set_xlim(mins[0], maxs[0])
    ax.set_ylim(mins[1], maxs[1])
    ax.set_zlim(mins[2], maxs[2])

    ax.set_xlabel("X")
    ax.set_ylabel("Y")
    ax.set_zlabel("Z")

    # artists are created once from the first frame and updated in place afterwards
    pts0 = sequence[0]

    points = ax.scatter(
        pts0[:,0], pts0[:,1], pts0[:,2],
        c="red", s=40
    )

    lines = []
    for i, j in SQUELETTE:
        line, = ax.plot(
            [pts0[i,0], pts0[j,0]],
            [pts0[i,1], pts0[j,1]],
            [pts0[i,2], pts0[j,2]],
            c="black"
        )
        lines.append(line)

    def update(frame):
        """Move the scatter points and bone segments to frame number `frame`."""
        pts = sequence[frame]

        # NOTE(review): _offsets3d is a private attribute of the 3D scatter artist
        points._offsets3d = (pts[:,0], pts[:,1], pts[:,2])

        for line, (i, j) in zip(lines, SQUELETTE):
            # x/y go through set_data, z through set_3d_properties
            line.set_data([pts[i,0], pts[j,0]],
                          [pts[i,1], pts[j,1]])
            line.set_3d_properties([pts[i,2], pts[j,2]])

        return [points] + lines

    return FuncAnimation(
        fig,
        update,
        frames=T,
        interval=33,
        blit=False
    )



## Premier Jet

Nous allons tenter d'utiliser un [LSTM](https://en.wikipedia.org/wiki/Long_short-term_memory)

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Input

In [None]:
# Number of past frames fed to the model for each prediction.
WINDOW_SIZE = 30

# Two stacked LSTM layers followed by a dense layer with 51 outputs.
# Input: WINDOW_SIZE frames of 51 values each.
lstm_simple = Sequential([
    Input((WINDOW_SIZE,51)),
    LSTM(128, return_sequences=True),
    LSTM(128),
    Dense(51)
])

lstm_simple.compile(optimizer='adam', loss='mse')

Nous faisons entrer dans notre modèle les 30 dernières positions connues, pour qu'il prédise la suivante.

In [None]:

def make_data_for_lstm_simple(data: list[PointSequence], window=WINDOW_SIZE):
    """Build normalised (window -> next frame) training pairs.

    Parameters
    ----------
    data : list of arrays indexed [frame, joint, coordinate].
    window : number of consecutive frames in each input sample.

    Returns
    -------
    X_all : array (n_samples, window, n_features), normalised.
    Y_all : array (n_samples, n_features), the frame following each window,
        normalised with the same statistics as X_all.
    mean, std : per-feature statistics computed on X_all (std includes a 1e-8
        offset to avoid division by zero).

    Raises
    ------
    ValueError : if no sequence is longer than `window`.
    """
    X_all, Y_all = [], []

    for seq in data:
        n_samples = len(seq) - window
        if n_samples <= 0:
            # Too short to yield a single pair. Appending an empty array here
            # would have a different rank and break np.concatenate below.
            continue
        # sliding windows, one per start index
        X_all.append(np.array([seq[i : i + window] for i in range(n_samples)]))
        # the target of window i is frame i + window, i.e. seq[window:]
        Y_all.append(np.asarray(seq[window:]))

    if not X_all:
        raise ValueError(
            f"no sequence is longer than window={window}; cannot build any sample"
        )

    X_all = np.concatenate(X_all)
    Y_all = np.concatenate(Y_all)

    # flatten the joints: (17, 3) -> 51
    X_all = X_all.reshape(X_all.shape[0], X_all.shape[1], -1)
    Y_all = Y_all.reshape(Y_all.shape[0], -1)

    # normalisation (statistics taken on the inputs, applied to both)
    EPSILON = 1e-8
    mean = X_all.mean(axis=(0, 1))
    std = X_all.std(axis=(0, 1)) + EPSILON

    X_all = (X_all - mean) / std
    Y_all = (Y_all - mean) / std

    return X_all, Y_all, mean, std


def inverse_lstm_output(pred_seq, mean, std):
    """Undo the normalisation of predictions and return them as (T, 17, 3).

    `pred_seq` may be given as (T, 17, 3) or with 51 values per frame;
    `mean` and `std` must each hold 51 values.
    """
    arr = np.array(pred_seq)

    # Skeleton-shaped input is flattened so it lines up with the 51 statistics.
    est_squelette = arr.ndim == 3 and arr.shape[1:] == (17, 3)
    if est_squelette:
        arr = arr.reshape(arr.shape[0], 51)

    echelle = std.reshape(1, 51)
    decalage = mean.reshape(1, 51)

    # de-normalisation, then back to one (17, 3) skeleton per frame
    restaure = arr * echelle + decalage
    return restaure.reshape(restaure.shape[0], 17, 3)


In [None]:
# model training
# NOTE(review): train_test_split is called without random_state, so the split
# differs from one run to the next.
train, test= train_test_split(fdatas[:150])

# NOTE(review): the normalisation statistics returned here are discarded; any
# later cell that feeds this model must recompute them from `train`.
X, Y, _, _ = make_data_for_lstm_simple(train)

lstm_simple.fit(X, Y, epochs=20, batch_size=64, validation_split=0.2)

In [None]:
# test
# Build the test windows (normalised with the test set's own statistics) ...
X_t, Y_t, mean_test, std_test = make_data_for_lstm_simple(test)
# ... then re-express them with the statistics of the training set, which are
# the ones the model was fitted with. They are recomputed from `train` because
# the training cell does not keep them.
_, _, mean_t, std_t = make_data_for_lstm_simple(train)
X_t = (X_t * std_test + mean_test - mean_t) / std_t
Y_t = (Y_t * std_test + mean_test - mean_t) / std_t

Y_pred = lstm_simple.predict(X_t)
# Every sample has the same number of features, so the mean of the per-sample
# MSEs equals the MSE over the whole array: one call replaces the Python loop.
mean_mse = mean_squared_error(Y_t, Y_pred)

print(f"mean mse : {mean_mse}")


Comparaison des prédictions avec les vraies valeurs

In [None]:
# Ground truth: 120 frames of the first test sequence, starting at frame 30.
anim1 = plot_animation(test[0][30:150], title="True")
# First 120 one-step predictions, mapped back to coordinates.
# assumes these 120 predictions all belong to test[0] (i.e. it has at least 150 frames) — TODO confirm
anim2 = plot_animation(inverse_lstm_output(Y_pred, mean_t, std_t)[0:120], title="Pred")

In [None]:
# Render both animations as interactive HTML, one after the other.
HTML(anim1.to_jshtml()+anim2.to_jshtml())

Passage en auto-régression  
le modèle s'appuie sur ses dernières valeurs prédites pour en prédire de nouvelles

In [None]:

def auto_pred(model, input, nbframe):
    """Roll a one-step model forward on its own predictions.

    Parameters
    ----------
    model : object whose predict(batch, verbose=0) returns one row per window.
    input : initial window, shape (window, n_features); it is not modified.
    nbframe : number of prediction steps.

    Returns
    -------
    Array stacking the raw output of every predict call, i.e. shape
    (nbframe, 1, n_features) when predict returns a (1, n_features) array.
    """
    courant = input.copy()
    sorties = []

    etape = 0
    while etape < nbframe:
        suivant = model.predict(courant[None], verbose=0)
        sorties.append(suivant)
        # drop the oldest frame and append the prediction as the newest one
        courant = np.vstack([courant[1:], suivant])
        etape += 1

    return np.array(sorties)


In [None]:
# Seed window: the first WINDOW_SIZE normalised targets of the test set.
# NOTE(review): `input` shadows the Python builtin of the same name.
input = Y_t[0:WINDOW_SIZE]
# 120 auto-regressive steps; this overwrites the Y_pred of the evaluation cell.
Y_pred = auto_pred(lstm_simple, input, 120)

In [None]:
# Map the auto-regressive predictions back to coordinates and animate them.
Y_pred_seq = inverse_lstm_output(Y_pred, mean_t, std_t)
anim = plot_animation(Y_pred_seq)
HTML(anim.to_jshtml())


Le résultat n'est pas très bon...

Identifions les différents problèmes :
- le modèle ne "sait" pas que les os ont une taille fixe.
- les erreurs s'accumulent avec le temps
- au lieu de prédire 1 frame à la fois, nous ferions mieux d'entraîner le modèle à prédire plusieurs frames à la fois


In [None]:
# save the model; the target directory is created first so the save does not
# depend on ./models already existing
os.makedirs('./models', exist_ok=True)
lstm_simple.save('./models/lstm_simple.keras')

## 2ème jet :
Nous allons prendre en entrée/sortie la vitesse, au lieu de la position.

In [None]:
def compute_position(
    vel_seq: PointSequence, star_pos: np.ndarray, bassin_en_zero=True
) -> PointSequence:
    """Integrate per-frame velocities into positions.

    Parameters
    ----------
    vel_seq : velocities indexed [frame, joint, coordinate], 17 joints of 3 values.
    star_pos : position preceding the first velocity, shape (17, 3).
    bassin_en_zero : when True, joint 0 is reset to zero after every step.

    Returns
    -------
    Array of shape (T, 17, 3) where frame t is the previous frame (star_pos
    for t = 0) plus vel_seq[t].
    """
    nb_frames = vel_seq.shape[0]
    poses = np.zeros((nb_frames, 17, 3))

    # first frame starts from the supplied position
    poses[0] = star_pos + vel_seq[0]
    if bassin_en_zero:
        poses[0, 0] = 0

    # every following frame starts from the (already re-centred) previous one
    for precedent, pas in enumerate(vel_seq[1:]):
        courant = precedent + 1
        poses[courant] = poses[precedent] + pas
        if bassin_en_zero:
            poses[courant, 0] = 0

    return poses

Nous prédirons aussi plusieurs frames à l'avance

In [None]:
# Number of future frames predicted in one shot.
HORIZON = 10
# Number of past frames given as input.
WINDOW_SIZE = 30
# Three values per joint.
NB_FEATURES = len(parties_du_corps)*3

Changeons aussi l'architecture du modèle, afin d'avoir une architecture plus pertinente

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import (
    Conv1D,
    LSTM,
    Dense,
    Input,
    BatchNormalization,
    Dropout,
    Reshape,
)

def build_conv_lstm(window_size=WINDOW_SIZE, n_features=NB_FEATURES, horizon=HORIZON):
    """Build and compile the Conv1D + LSTM model.

    Input shape is (window_size, n_features); the output is reshaped to
    (horizon, n_features). Compiled with the adam optimizer and an MSE loss.
    """
    couches = [
        Input((window_size, n_features)),
        # two same-padded convolutions along the time axis
        Conv1D(128, 5, padding="same", activation="relu"),
        BatchNormalization(),
        Conv1D(128, 3, padding="same", activation="relu"),
        BatchNormalization(),
        LSTM(128),
        Dropout(0.3),
        # one dense vector holding all predicted frames, then split per frame
        Dense(horizon * n_features),
        Reshape((horizon, n_features)),
    ]
    model = Sequential(couches)

    model.compile(loss="mse", optimizer="adam")
    return model

In [None]:
def make_data_for_conv_lstm(data: list[PointSequence], window=WINDOW_SIZE, horizon=HORIZON):
    """Build normalised (window -> next `horizon` frames) training pairs.

    Parameters
    ----------
    data : list of arrays indexed [frame, joint, coordinate].
    window : number of consecutive frames in each input sample.
    horizon : number of consecutive frames in each target.

    Returns
    -------
    X_all : array (n_samples, window, n_features), normalised.
    Y_all : array (n_samples, horizon, n_features), normalised with the
        statistics of X_all.
    mean, std : per-feature statistics computed on X_all (std includes a 1e-8
        offset).

    Raises
    ------
    ValueError : if no sequence has at least window + horizon frames.
    """
    X_all, Y_all = [], []

    for seq in data:
        # The last valid start index is len(seq) - window - horizon (its target
        # ends exactly on the last frame), hence the + 1 in the range bound.
        for i in range(len(seq) - window - horizon + 1):
            X_all.append(seq[i:i+window])
            Y_all.append(seq[i + window : i + window + horizon])

    if not X_all:
        raise ValueError(
            f"no sequence has at least window + horizon = {window + horizon} frames"
        )

    X_all = np.array(X_all)
    Y_all = np.array(Y_all)

    # flatten the joints of every frame
    X_all = X_all.reshape(X_all.shape[0], window, -1)
    Y_all = Y_all.reshape(Y_all.shape[0], horizon, -1)

    # normalisation (statistics taken on the inputs, applied to both)
    EPSILON = 1e-8
    mean = X_all.mean(axis=(0,1))
    std = (X_all.std(axis=(0,1))) + EPSILON

    X_all = (X_all - mean) / std
    Y_all = (Y_all - mean) / std

    return X_all, Y_all, mean, std


def inverse_conv_lstm_output(pred, mean, std):
    """Undo the normalisation of a batch of predictions.

    `pred` holds 51 normalised values per frame on its last axis; `mean` and
    `std` are the matching per-feature statistics. Returns an array of shape
    (batch, horizon, 17, 3). The horizon is inferred from `pred` itself rather
    than read from the global HORIZON, which later cells reassign.
    """
    pred = np.asarray(pred) * std + mean
    return pred.reshape(pred.shape[0], -1, 17, 3)



In [None]:
# so we work on velocities

# velocities of the pelvis-centred sequences, in the same order as fdatas
fdatas_vel = [compute_velocity(centrage_bassin(d)) for d in fdatas]
# NOTE(review): no random_state, so the split differs from one run to the next;
# only the velocity arrays are split, the matching position arrays are not kept.
train, test = train_test_split(fdatas_vel[:150])

# mean/std are the training statistics, reused later to normalise model inputs
X_train, Y_train, mean, std = make_data_for_conv_lstm(train)

In [None]:
# Instantiate the model with the default window, feature count and horizon.
conv_lstm = build_conv_lstm()

In [None]:
# Train on the velocity windows; 20% of the samples are held out for validation.
conv_lstm.fit(X_train, Y_train, epochs=20, batch_size=64, validation_split=0.2)

In [None]:
def auto_pred_horizon(model, input, nb_frame):
    """Roll a multi-step model forward on its own predictions.

    Parameters
    ----------
    model : object whose predict(batch, verbose=0)[0] is an array of predicted
        frames, shape (n_predicted, n_features).
    input : initial window, shape (window, n_features); it is not modified.
    nb_frame : number of frames to return.

    Returns
    -------
    Array of shape (nb_frame, n_features).

    Raises
    ------
    ValueError : if the model returns no frame (the loop could never finish).
    """
    window = input.copy()
    preds = []

    while len(preds) < nb_frame:
        y = model.predict(window[None], verbose=0)[0]
        if len(y) == 0:
            raise ValueError("model.predict returned no frame")
        preds.extend(y)

        # Slide by the number of frames actually predicted (not by the global
        # HORIZON, which may not match this model) and keep the window length
        # constant.
        window = np.vstack([window, y])[-len(window):]

    return np.array(preds[:nb_frame])


In [None]:
# Index of the test sequence used for the demonstration.
TEST_INDEX = 1

# Seed window: first WINDOW_SIZE frames of that sequence, flattened per frame
# and normalised with the training statistics.
# NOTE(review): `input` shadows the Python builtin of the same name.
input = test[TEST_INDEX][:WINDOW_SIZE]
input = input.reshape(WINDOW_SIZE, -1)
input = (input - mean) / std

In [None]:
V_pred = auto_pred_horizon(conv_lstm, input, 120)
# The model works on normalised velocities: undo the normalisation before
# integrating, otherwise the reconstructed positions are meaningless.
V_pred = (V_pred * std + mean).reshape(120, 17, 3)

V_true = test[TEST_INDEX][WINDOW_SIZE:WINDOW_SIZE + 120]

# `test` only holds velocity arrays and was shuffled by train_test_split, so
# fdatas[TEST_INDEX] is not the matching position sequence. fdatas_vel is in
# the same order as fdatas: locate the velocity array there to get its index.
vel_seq = test[TEST_INDEX]
src_idx = next(
    k for k, v in enumerate(fdatas_vel)
    if v.shape == vel_seq.shape and np.array_equal(v, vel_seq)
)

# Last known (pelvis-centred) position before the predicted frames; the same
# starting point is used for both reconstructions.
P0 = fdatas[src_idx][WINDOW_SIZE - 1]
P0_true = P0

P_pred = compute_position(V_pred, P0)
P_true = compute_position(V_true, P0_true)


anim_true = plot_animation(P_true, title="true")
anim_pred = plot_animation(P_pred, title="pred")

In [None]:
# Render the true and predicted animations as interactive HTML.
HTML(anim_true.to_jshtml()+anim_pred.to_jshtml())

Le modèle est encore moins bon, nous allons donc faire en sorte de passer la vitesse + la position, afin d'avoir une loss qui prend en compte la distance entre les points, pour voir si celle-ci est réaliste (vis à vis des os)


In [None]:
# save the model; the target directory is created first so the save does not
# depend on ./models already existing
os.makedirs('./models', exist_ok=True)
conv_lstm.save('./models/conv_lstm.keras')


## 3ème jet 

Nous allons prendre un jeu de données qui garde à la fois la vitesse et la position

In [None]:
# Number of future frames predicted in one shot.
HORIZON = 10
# Number of past frames given as input.
WINDOW_SIZE = 30
# Six values per joint: position and velocity.
NB_FEATURES = len(parties_du_corps)*6

In [None]:
def bone_length_loss(skeleton):
    """Return a loss comparing predicted and true bone lengths.

    `skeleton` is an iterable of (joint, joint) index pairs. The returned
    function reshapes both tensors to (batch, time, 17, 3), measures the
    length of every bone and returns the squared length error, averaged over
    batch and time and then over the bones.
    """
    def _en_articulations(tenseur):
        # keep the time dimension of the incoming tensor, split features per joint
        return tf.reshape(tenseur, (-1, tf.shape(tenseur)[1], 17, 3))

    def _longueur(articulations, a, b):
        return tf.norm(articulations[:, :, a] - articulations[:, :, b], axis=-1)

    def loss(y_true, y_pred):
        vrai = _en_articulations(y_true)
        predit = _en_articulations(y_pred)

        total = 0.0
        for a, b in skeleton:
            ecart = _longueur(predit, a, b) - _longueur(vrai, a, b)
            total += tf.reduce_mean(tf.square(ecart))

        return total / len(skeleton)

    return loss

def smooth_vel_loss(horizon=HORIZON):
    """Return a loss penalising frame-to-frame changes of the prediction.

    The returned function ignores the ground truth, reshapes the prediction to
    (batch, horizon, 17, 3) and returns the mean squared difference between
    consecutive frames.
    """
    forme = (-1, horizon, 17, 3)

    def loss(_y_true, y_pred):
        frames = tf.reshape(y_pred, forme)
        pas = frames[:, 1:] - frames[:, :-1]
        return tf.reduce_mean(tf.square(pas))

    return loss


def total_loss(skeleton, base_loss = tf.losses.mse, bone_loss_w= 0.05, smooth_loss_w= 0.01, horizon=HORIZON):
    """Combine a base loss with weighted bone-length and smoothness terms.

    Returns loss(y_true, y_pred) = base_loss + bone_loss_w * bone length loss
    + smooth_loss_w * smoothness loss.
    """
    bone_loss = bone_length_loss(skeleton)
    smooth_loss = smooth_vel_loss(horizon=horizon)

    def loss(y_true, y_pred):
        terme_base = base_loss(y_true, y_pred)
        terme_os = bone_loss_w * bone_loss(y_true, y_pred)
        terme_lissage = smooth_loss_w * smooth_loss(y_true, y_pred)
        return terme_base + terme_os + terme_lissage

    return loss

In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import (
    Conv1D,
    LSTM,
    Dense,
    Input,
    BatchNormalization,
    RepeatVector
)
from tensorflow.keras.callbacks import (EarlyStopping,ReduceLROnPlateau)

def build_conv_pos_vel(window_size=WINDOW_SIZE, n_features=NB_FEATURES, horizon=HORIZON, bone_loss_weight = 0.01):
    """Build and compile the position+velocity model.

    Input shape is (window_size, n_features). An LSTM summary of the input is
    repeated `horizon` times and decoded by a second LSTM into `horizon`
    frames of n_features // 2 values each. The model is compiled with adam
    and total_loss over SQUELETTE, using `bone_loss_weight` for the bone term.

    Returns
    -------
    (model, callbacks) where callbacks is an EarlyStopping / ReduceLROnPlateau
    list intended to be passed to fit().
    """
    couches = [
        Input((window_size, n_features)),

        Conv1D(128, 5),
        BatchNormalization(),
        Conv1D(128, 3),
        BatchNormalization(),

        # encoder
        LSTM(256),

        # decoder: one copy of the encoding per predicted frame
        RepeatVector(horizon),
        LSTM(256, return_sequences=True),

        Dense(n_features//2),
    ]
    model = Sequential(couches)

    arret_precoce = EarlyStopping(patience=10, restore_best_weights=True)
    reduction_lr = ReduceLROnPlateau(patience=5, factor=0.5)
    callbacks = [arret_precoce, reduction_lr]

    model.compile(loss=total_loss(SQUELETTE, bone_loss_w=bone_loss_weight), optimizer="adam")
    return (model, callbacks)

In [None]:
def centrage_bassin(seq: PointSequence)->PointSequence:
    """Express every joint relative to joint 0 of the same frame.

    NOTE(review): this re-defines the function of the same name declared
    earlier in the notebook, with the same body plus type annotations.
    """
    # Slice with :1 (not 0) to keep the joint axis so the subtraction broadcasts.
    origine = seq[:, :1, :]
    return seq - origine


def make_data_for_pos_vel(data_pos: list[PointSequence], window=WINDOW_SIZE, horizon=HORIZON,stats=None):
    """Build normalised (position+velocity window -> future positions) pairs.

    Parameters
    ----------
    data_pos : list of position arrays indexed [frame, joint, coordinate].
    window : number of consecutive frames in each input sample.
    horizon : number of consecutive frames in each target.
    stats : optional (mean_x, std_x, mean_y, std_y) to reuse, e.g. the
        training statistics when preparing validation or test data. When None
        they are computed from the data built here.

    Returns
    -------
    X_norm : array (n_samples, window, n_joints * 6); for each joint the 3
        position values are followed by the 3 velocity values.
    Y_norm : array (n_samples, horizon, n_joints * 3), positions only.
    stats : the (mean_x, std_x, mean_y, std_y) tuple that was applied.

    Raises
    ------
    ValueError : if no sequence has at least window + horizon frames.
    """
    X_all, Y_all = [],[]

    for pos_seq in data_pos:
        # centre on the pelvis
        pos_seq = centrage_bassin(pos_seq)
        # velocity = frame-to-frame difference, zero for the first frame
        vel_seq = np.zeros_like(pos_seq)
        vel_seq[1:] = pos_seq[1:] - pos_seq[:-1]

        # concatenate along the coordinate axis
        pos_vel = np.concatenate([pos_seq, vel_seq], axis=-1)

        # sliding windows. The last valid start index is
        # len - window - horizon (its target ends exactly on the last frame),
        # hence the + 1 in the range bound.
        for i in range(len(pos_vel) - window - horizon + 1):
            X_all.append(pos_vel[i : i + window])
            Y_all.append(pos_seq[i + window : i + window + horizon, :])

    if not X_all:
        raise ValueError(
            f"no sequence has at least window + horizon = {window + horizon} frames"
        )

    X_all = np.array(X_all).reshape(len(X_all), window, -1)
    Y_all = np.array(Y_all).reshape(len(Y_all), horizon, -1)

    # normalisation
    EPSILON = 1e-8
    if stats is None:
        # training mode: compute the statistics
        mean_x = X_all.mean(axis=(0, 1))
        std_x = X_all.std(axis=(0, 1)) + EPSILON

        mean_y = Y_all.mean(axis=(0, 1))
        std_y  = Y_all.std(axis=(0, 1)) + EPSILON

    else:
        # test mode: reuse the statistics that were provided
        mean_x, std_x, mean_y, std_y = stats

    X_norm = (X_all - mean_x) / std_x
    Y_norm = (Y_all - mean_y) / std_y

    return X_norm, Y_norm, (mean_x, std_x, mean_y, std_y)

def inverse_pos_vel_output(pred_norm, stats):
    """Undo the target normalisation and return one (17, 3) skeleton per frame.

    `pred_norm` is either (batch, horizon, features) or (frames, features);
    a 3D input is flattened to (batch * horizon, features) first. `stats` is
    the (mean_x, std_x, mean_y, std_y) tuple; only the last two are used.

    Raises ValueError for any other number of dimensions.
    """
    _mean_x, _std_x, mean_y, std_y = stats

    pred = np.array(pred_norm)

    if pred.ndim not in (2, 3):
        raise ValueError("Format inattendu pour pred_norm")

    if pred.ndim == 3:
        lots, pas, features = pred.shape
        pred = pred.reshape(lots * pas, features)

    # de-normalise the positions, then split every frame into joints
    positions = pred * std_y + mean_y
    return positions.reshape(positions.shape[0], 17, 3)


In [None]:
# NOTE(review): no random_state, so the split differs from one run to the next.
train, test = train_test_split(fdatas[0:100])
# `stats` holds the training normalisation statistics (mean_x, std_x, mean_y, std_y).
X_train, Y_train, stats = make_data_for_pos_vel(train)
# Sanity check on the scale of the normalised arrays.
print("X mean abs:", np.mean(np.abs(X_train)))
print("Y mean abs:", np.mean(np.abs(Y_train)))

In [None]:
# Test data are normalised with the training statistics.
X_test, Y_test,_ = make_data_for_pos_vel(test, stats=stats)

In [None]:
# Build the model together with the callbacks to pass to fit().
model_pos_vel,callbacks = build_conv_pos_vel()

In [None]:
# Initial training; the test split serves as validation data for the callbacks.
model_pos_vel.fit(X_train, Y_train, 
          validation_data=(X_test, Y_test),
          epochs=20, 
          batch_size=64,
          callbacks = callbacks
          )

In [None]:
# Number of sequences loaded per training chunk.
TRAINING_WINDOW = 50
# Chunks are processed while their start index is below this share of the dataset.
TRAIN_PERCENTAGE = 0.70
EPOCHS_PER_WINDOW = 5
# training, chunk by chunk to limit memory use
i = 0
while(i*TRAINING_WINDOW < len(fdatas) * TRAIN_PERCENTAGE):
    i+=1
    train, test = train_test_split(fdatas[(i-1)*TRAINING_WINDOW:i*TRAINING_WINDOW])
    # Every chunk is normalised with the statistics of the initial training
    # (`stats`). Recomputing them per chunk would present the same network with
    # differently scaled inputs and targets from one chunk to the next.
    X_train, Y_train, _ = make_data_for_pos_vel(train, stats=stats)
    X_test, Y_test, _ = make_data_for_pos_vel(test, stats=stats)
    del train
    del test
    model_pos_vel.fit(X_train, Y_train, 
          validation_data=(X_test, Y_test),
          epochs=EPOCHS_PER_WINDOW, 
          batch_size=64)
    # free the chunk before building the next one
    del X_train
    del Y_train
    del X_test
    del Y_test

In [None]:
def auto_pred_pos_vel(model, seq_pos, stats, nb_frames,
                      window=WINDOW_SIZE, horizon=HORIZON):
    """Auto-regressive prediction with the position+velocity model.

    Parameters
    ----------
    model : object whose predict(batch, verbose=0)[0] holds the normalised
        predicted positions, one row of 51 values per frame.
    seq_pos : position sequence indexed [frame, joint, coordinate]; its first
        `window` frames seed the prediction.
    stats : (mean_x, std_x, mean_y, std_y) normalisation statistics.
    nb_frames : number of frames to return.
    window : number of frames fed to the model.
    horizon : kept for backward compatibility; the window now slides by the
        number of frames the model actually returns.

    Returns
    -------
    Array of shape (nb_frames, 17, 3) with the predicted positions.

    Raises
    ------
    ValueError : if `seq_pos` has fewer than `window` frames, or if the model
        returns no frame.
    """
    mean_x, std_x, _, _ = stats

    if len(seq_pos) < window:
        raise ValueError(
            f"seq_pos has {len(seq_pos)} frames; at least window={window} are needed"
        )

    # --- initial window ---
    seq_pos = centrage_bassin(seq_pos)

    vel = np.zeros_like(seq_pos)
    vel[1:] = seq_pos[1:] - seq_pos[:-1]

    pos_vel = np.concatenate([seq_pos, vel], axis=-1)

    window_data = pos_vel[:window]        # (window,17,6)

    preds = []

    while len(preds) < nb_frames:
        # normalisation
        X = window_data.reshape(1, window, -1)
        X = (X - mean_x) / std_x

        # normalised prediction of the next frames
        Y_norm = model.predict(X, verbose=0)[0]

        # back to coordinates: (n_predicted,17,3)
        Y = inverse_pos_vel_output(Y_norm, stats)
        if len(Y) == 0:
            raise ValueError("model.predict returned no frame")

        preds.extend(Y)

        # --- window update ---
        # rebuild pos+vel from the new positions; the first new velocity is
        # taken relative to the last position of the current window
        last_pos = window_data[-1, :, :3]

        new_vel = Y - np.vstack([last_pos[None], Y[:-1]])
        new_pos_vel = np.concatenate([Y, new_vel], axis=-1)

        # keep the most recent `window` frames
        window_data = np.vstack([window_data, new_pos_vel])[-window:]

    return np.array(preds[:nb_frames])


In [None]:
# Sequence used for the demonstration: the last one of the dataset.
TEST_INDEX = -1
NB_FRAMES = 120

seq = fdatas[TEST_INDEX]

# NOTE(review): the statistics are recomputed here from a fresh random split of
# fdatas[0:100] (no random_state), so they are not guaranteed to equal the ones
# the model was trained with — TODO confirm / pin the split.
train, _ = train_test_split(fdatas[0:100])
_, _, stats = make_data_for_pos_vel(train)

# long auto-regressive prediction
P_pred = auto_pred_pos_vel(
    model_pos_vel,
    seq_pos=seq,
    stats=stats,
    nb_frames=NB_FRAMES
)

# ground truth for the same frames (those following the seed window)
P_true = centrage_bassin(seq)[WINDOW_SIZE:WINDOW_SIZE+NB_FRAMES]

anim_true = plot_animation(P_true, "true")
anim_pred = plot_animation(P_pred, "pred")


In [None]:
# Render the true and predicted animations as interactive HTML.
HTML(anim_true.to_jshtml() + anim_pred.to_jshtml())

In [None]:
# save the model; the target directory is created first so the save does not
# depend on ./models already existing
os.makedirs('./models', exist_ok=True)
model_pos_vel.save('./models/model_pos_vel.keras')