In [4]:
import numpy as np
import pandas as pd
from joblib import load
from pyriemann.estimation import Covariances
from pyriemann.tangentspace import TangentSpace

# === 1. Charger les fichiers test ===
guided_X = np.load("guided_testset_X.npy")     # (1660, 8, 500)
free_X = np.load("freemoves_testset_X.npy")    # (1540, 8, 500)

# === 2. Fonction de features temporelles ===
from sklearn.base import BaseEstimator, TransformerMixin
class TimeDomainFeatureExtractor(BaseEstimator, TransformerMixin):
    def __init__(self, mpr_thresh='std'):
        """
        mpr_thresh: threshold for MPR (default: one std per channel)
        """
        self.mpr_thresh = mpr_thresh
    def fit(self, X, y=None):
        # No fitting necessary for this transformer
        return self
    def transform(self, X):
        # X shape: (n_samples, n_channels, window_size)
        n_samples, n_channels, window_size = X.shape
        features = []
        for i in range(n_samples):
            feats = []
            for ch in range(n_channels):
                x = X[i, ch, :]
                # MAV: Mean Absolute Value
                mav = np.mean(np.abs(x))
                # RMS: Root Mean Square
                rms = np.sqrt(np.mean(x**2))
                # VAR: Variance
                var = np.var(x, ddof=1)
                # STD: Standard Deviation
                std = np.std(x, ddof=1)
                # ZC: Zero Crossings (number of times signal changes sign)
                zc = np.sum(np.diff(np.sign(x)) != 0)
                # MPR: Myopulse Percentage Rate (fraction of samples above threshold)
                thresh = std if self.mpr_thresh == 'std' else self.mpr_thresh
                mpr = np.mean(np.abs(x) > thresh)
                feats.extend([mav, rms, var, std, zc, mpr])
            features.append(feats)
        return np.array(features)



# === 4. Extraire les features ===



# === 5. Prédictions



In [5]:
# === 3. Charger les modèles entraînés ===
model_time_guided = load("model_guided_time.joblib")
model_cov_guided = load("model_guided_cov.joblib")
stacking_guided = load("stacking_model_guided.joblib")

model_time_free = load("model_free_time.joblib")
model_cov_free = load("model_free_cov.joblib")

In [10]:
guided_X_reshaped = guided_X.reshape(-1, 8, 500)  # (5*332, 8, 500)
free_X_reshaped = free_X.reshape(-1, 8, 500)  # (5*332, 8, 500)


In [13]:
# --- Guided ---
X_guided_time = TimeDomainFeatureExtractor().transform(guided_X_reshaped)
X_guided_cov = Covariances(estimator='oas').fit_transform(guided_X_reshaped)
X_guided_tan = TangentSpace().fit(Covariances().fit_transform(guided_X_reshaped)).transform(X_guided_cov)

# --- Free ---
X_free_time = TimeDomainFeatureExtractor().transform(free_X_reshaped)
X_free_cov = Covariances(estimator='oas').fit_transform(free_X_reshaped)
X_free_tan = TangentSpace().fit(Covariances().fit_transform(free_X_reshaped)).transform(X_free_cov)

In [14]:
# --- Guided: stacking ---
y_guided_time = model_time_guided.predict(X_guided_time)
y_guided_cov = model_cov_guided.predict(X_guided_tan)
X_guided_meta = np.concatenate([y_guided_time, y_guided_cov], axis=1)
guided_preds = stacking_guided.predict(X_guided_meta)  # (1660, 51)

In [15]:
# --- Free: mean ensemble ---
y_free_time = model_time_free.predict(X_free_time)
y_free_cov = model_cov_free.predict(X_free_tan)
free_preds = (y_free_time + y_free_cov) / 2            # (1540, 51)

# === 6. Fusionner les prédictions
final_preds = np.vstack((guided_preds, free_preds))    # (3200, 51)

In [16]:
# === 7. Sauvegarder dans un fichier CSV
df = pd.DataFrame(final_preds)
df.to_csv("team_submission.csv", index=False, header=False)

print("✅ Fichier 'team_submission.csv' généré avec succès.")

✅ Fichier 'team_submission.csv' généré avec succès.


## Visualisation

sEMG: (5 sessions, 8 électrodes, 230 000 échantillons)
Joint angles: (5 sessions, 51 articulations, 230 000 échantillons)
:
5 sessions : Probablement 5 enregistrements distincts (ex: 5 répétitions de gestes guidés/libres).

8 électrodes : Signaux sEMG capturés par 8 capteurs sur l’avant-bras.

51 articulations : Angles de chaque jointure de la main (doigts, poignet).

230 000 échantillons : Durée d’enregistrement ≈ 225 secondes (à 1024 Hz, 230000/1024 ≈ 224.6s)

In [None]:
# Vérifier que chaque session a le même nombre d’échantillons
assert emg_data.shape[0] == joint_angles.shape[0], "Nombre de sessions différent"
assert emg_data.shape[2] == joint_angles.shape[2], "Nombre d’échantillons différent par session"


In [None]:
emg_merged = emg_data.reshape(-1, 8, 230000)  # Garde la structure (5,8,230k)
# Ou emg_flat = emg_data.reshape(-1, 8) pour un tableau 2D (total_samples, 8)
session_id = 0  # Première session
emg_session = emg_data[session_id]  # Shape: (8, 230000)
angles_session = joint_angles[session_id]  # Shape: (51, 230000)
# Choisir une articulation (ex: index finger) et une électrode
joint_id = 10  # À adapter selon la documentation
electrode_id = 0

plt.figure(figsize=(12, 4))
plt.plot(emg_session[electrode_id, :500], label='sEMG Electrode 0')
plt.plot(angles_session[joint_id, :500], label='Joint Angle', alpha=0.7)
plt.title('sEMG vs Joint Angle (first 500 samples)')
plt.legend()
plt.show()




In [None]:
import numpy as np

def sliding_window(data, window_size, step_size):
    """
    Splits data into overlapping windows.
    Arguments:
        data: np.ndarray, shape (n_channels, n_samples)
        window_size: int, number of samples per window
        step_size: int, number of samples to move the window
    Returns:
        windows: np.ndarray, shape (n_windows, n_channels, window_size)
    """
    n_channels, n_samples = data.shape
    n_windows = 1 + (n_samples - window_size) // step_size
    windows = np.zeros((n_windows, n_channels, window_size))
    for i in range(n_windows):
        start = i * step_size
        end = start + window_size
        windows[i] = data[:, start:end]
    return windows


In [None]:
# Parameters
window_size = 500      # 500 samples per window
overlap = 250          # 250 samples overlap (50%)
step_size = window_size - overlap

# Example: use session 0
emg_session = emg_data[0]        # shape: (8, 230000)
angles_session = joint_angles[0] # shape: (51, 230000)

# Windowing
emg_windows = sliding_window(emg_session, window_size, step_size)         # shape: (n_windows, 8, 500)
angles_windows = sliding_window(angles_session, window_size, step_size)   # shape: (n_windows, 51, 500)

print(f"EMG windows shape: {emg_windows.shape}")
print(f"Angles windows shape: {angles_windows.shape}")


In [None]:
import numpy as np

def extract_time_features(windows):
    """
    Extracts time-domain features from sEMG windows.
    Args:
        windows: np.ndarray, shape (n_windows, n_channels, window_size)
    Returns:
        features: np.ndarray, shape (n_windows, n_channels * n_features)
    """
    n_windows, n_channels, window_size = windows.shape
    feature_list = []
    for w in range(n_windows):
        feats = []
        for c in range(n_channels):
            signal = windows[w, c, :]
            # Mean Absolute Value (MAV)
            mav = np.mean(np.abs(signal))
            # Root Mean Square (RMS)
            rms = np.sqrt(np.mean(signal ** 2))
            # Variance
            var = np.var(signal)
            # Standard Deviation
            std = np.std(signal)
            # Zero Crossing (ZC)
            zc = np.sum(np.diff(np.sign(signal)) != 0)
            feats.extend([mav, rms, var, std, zc])
        feature_list.append(feats)
    features = np.array(feature_list)
    return features


In [None]:
# Suppose emg_windows is already created (shape: n_windows, 8, 500)
emg_features = extract_time_features(emg_windows)  # shape: (n_windows, 8*5) = (n_windows, 40)

print(f"Extracted features shape: {emg_features.shape}")


In [None]:
# angles_windows: (n_windows, 51, 500)
# Take the mean for each joint in each window
target_angles = np.mean(angles_windows, axis=2)  # shape: (n_windows, 51)
print(f"Target angles shape: {target_angles.shape}")


Ridge Regression	Rapide, interprétable	    Limité aux relations linéaires
SVR (RBF)	        Capture des non-linéarités	Lent pour grands jeux de données

## Regression linéaire

In [None]:
from sklearn.linear_model import Ridge
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import numpy as np

# Split data into training and testing sets (80/20)
X_train, X_test, y_train, y_test = train_test_split(
    emg_features,          # Features (shape: n_windows, 40)
    target_angles,         # Targets (shape: n_windows, 51)
    test_size=0.2,
    random_state=42
)

# Initialize and train Ridge Regression model
ridge_model = Ridge(alpha=1.0)  # alpha = regularization strength
ridge_model.fit(X_train, y_train)

# Predict and evaluate
y_pred = ridge_model.predict(X_test)
rmse = np.sqrt(np.mean((y_test - y_pred)**2))
print(f"Ridge RMSE: {rmse:.3f}")  # Ex: 0.456


## Regression de support vectorielle avec noyau RBF

In [None]:
from sklearn.svm import SVR
from sklearn.multioutput import MultiOutputRegressor

# SVR ne gère pas les sorties multiples → On utilise MultiOutputRegressor
svr = SVR(kernel='rbf', C=1.0, epsilon=0.1)  # noyau Gaussien
svr_model = MultiOutputRegressor(svr)        # Entraîne un SVR par articulation

svr_model.fit(X_train, y_train)
y_pred_svr = svr_model.predict(X_test)
rmse_svr = np.sqrt(mean_squared_error(y_test, y_pred_svr))
print(f"SVR RMSE: {rmse_svr:.3f}")  # Ex: 0.482


Ridge  est nettement plus performant ici, avec un RMSE presque 2 fois plus petit que celui de SVR.
Cela signifie que les prédictions Ridge sont, en moyenne, beaucoup plus proches des vraies valeurs.
Moins d'erreur = meilleur ajustement sur les données.

In [None]:
from sklearn.model_selection import GridSearchCV

# Define the grid of alpha values to test
param_grid = {'alpha': [0.01, 0.1, 1, 10, 100]}

ridge = Ridge()
grid_search = GridSearchCV(ridge, param_grid, cv=5, scoring='neg_mean_squared_error')
grid_search.fit(X_train, y_train)

print(f"Best alpha for Ridge: {grid_search.best_params_['alpha']}")
print(f"Best RMSE: {np.sqrt(-grid_search.best_score_):.3f}")


Nous allons faire l'optimisation des paramètres sur la SVR, mais elle ne sera retenue pour la suite.

In [None]:
from sklearn.svm import SVR
from sklearn.multioutput import MultiOutputRegressor

param_grid = {
    'estimator__C': [0.1, 1, 10],
    'estimator__gamma': ['scale', 0.01, 0.1]
}

svr = SVR(kernel='rbf')
multi_svr = MultiOutputRegressor(svr)

grid_search_svr = GridSearchCV(multi_svr, param_grid, cv=3, scoring='neg_mean_squared_error')
grid_search_svr.fit(X_train, y_train)

print(f"Best params for SVR: {grid_search_svr.best_params_}")
print(f"Best RMSE: {np.sqrt(-grid_search_svr.best_score_):.3f}")


Visualisation des predictions pour la premiere articulation

In [None]:
import matplotlib.pyplot as plt

# Choose a joint to visualize (e.g., joint 0)
joint_idx = 0

plt.figure(figsize=(12, 5))
plt.plot(y_test[:, joint_idx], label='True angle')
plt.plot(y_pred[:, joint_idx], label='Ridge prediction', alpha=0.7)
plt.plot(y_pred_svr[:, joint_idx], label='SVR prediction', alpha=0.7)
plt.title(f'Prediction vs True for Joint {joint_idx}')
plt.xlabel('Test window')
plt.ylabel('Angle')
plt.legend()
plt.show()


In [None]:
import numpy as np

# Suppose you have a list of trained models and their predictions:
# predictions_list = [y_pred_model1, y_pred_model2, y_pred_model3, ...]
# Each y_pred_modelX shape: (n_samples, n_targets)

# Simple average ensemble
ensemble_pred_mean = np.mean(predictions_list, axis=0)


In [None]:
from sklearn.linear_model import Ridge
from sklearn.model_selection import train_test_split

# Stack predictions as features for meta-learner
X_meta = np.concatenate(predictions_list, axis=1)  # shape: (n_samples, n_models * n_targets)

# Split for meta-learner training
X_meta_train, X_meta_test, y_meta_train, y_meta_test = train_test_split(
    X_meta, y_true, test_size=0.2, random_state=42
)

meta_learner = Ridge()
meta_learner.fit(X_meta_train, y_meta_train)
ensemble_pred_stack = meta_learner.predict(X_meta_test)


In [None]:
# Suppose you have computed RMSE for each model on each protocol
rmse_guided = [rmse_model1_guided, rmse_model2_guided, ...]
rmse_free = [rmse_model1_free, rmse_model2_free, ...]

best_model_guided = np.argmin(rmse_guided)
best_model_free = np.argmin(rmse_free)

print(f"Best model for guided gestures: Model {best_model_guided}")
print(f"Best model for free gestures: Model {best_model_free}")


In [None]:
plt.scatter(y_true[:, joint_idx], ensemble_pred_mean[:, joint_idx], alpha=0.5)
plt.xlabel('True Values')
plt.ylabel('Predicted Values')
plt.title(f'True vs Predicted for Joint {joint_idx}')
plt.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], 'k--')  # Perfect prediction line
plt.show()


In [None]:
from sklearn.feature_selection import SelectKBest, f_regression

# Sélection des 10 meilleures features pour le premier angle articulaire
selector = SelectKBest(score_func=f_regression, k=10)
selector.fit(X_feat, y[:, 0])

plt.figure(figsize=(12,4))
plt.bar(range(X_feat.shape[1]), selector.scores_)
plt.xlabel("Feature Index")
plt.ylabel("F-score")
plt.title("SelectKBest Feature Scores (first joint angle)")
plt.show()


In [None]:
from sklearn.pipeline import Pipeline
from sklearn.linear_model import Ridge
from sklearn.multioutput import MultiOutputRegressor
from sklearn.feature_selection import SelectKBest, f_regression

pipeline = Pipeline([
    ('features', TimeDomainFeatureExtractor()),
    ('select', SelectKBest(score_func=f_regression, k=20)),
    ('regressor', MultiOutputRegressor(Ridge()))
])


In [None]:
from sklearn.model_selection import GroupKFold, cross_val_score
from sklearn.metrics import make_scorer
import numpy as np

def rmse(y_true, y_pred):
    return np.sqrt(np.mean((y_true - y_pred) ** 2))

rmse_scorer = make_scorer(rmse, greater_is_better=False)
cv = GroupKFold(n_splits=5)

scores = cross_val_score(
    pipeline, X, y,
    groups=groups,
    cv=cv,
    scoring=rmse_scorer
)

print("Pipeline RMSE per fold:", -scores)
print("Pipeline RMSE mean:", -np.mean(scores))


In [None]:
from pyriemann.estimation import Covariances
from pyriemann.tangentspace import TangentSpace

pipeline = Pipeline([
    ('cov', Covariances(estimator='oas')),
    ('ts', TangentSpace()),
    ('reg', MultiOutputRegressor(Ridge()))
])

scores = cross_val_score(
    pipeline, X, y,
    groups=groups, cv=cv,
    scoring=rmse_scorer
)

print("Covariance pipeline RMSE per fold:", -scores)
print("Covariance pipeline RMSE mean:", -np.mean(scores))
