In [1]:
import mediapipe as mp
import numpy as np
import torch
# Importations nécessaires
import os
import torch
from torch.utils.data import Dataset
from PIL import Image
import torchvision.transforms as transforms
import mediapipe as mp
import numpy as np
import torch
import cv2


In [7]:
class HandFeatureExtractor:
    def __init__(self):
        self.mp_hands = mp.solutions.hands
        self.hands = self.mp_hands.Hands(
            static_image_mode=True,
            max_num_hands=1,
            min_detection_confidence=0.5
        )

    def extract_landmarks(self, image):
        # Convertir le tensor PyTorch en numpy array
        image_np = image.squeeze().numpy()  # Enlever la dimension du canal
        
        # Normaliser entre 0-255 et convertir en uint8
        image_np = (image_np * 255).astype(np.uint8)
        
        # Convertir en RGB en répétant le canal
        image_rgb = np.stack((image_np,) * 3, axis=-1)
        
        # Traitement MediaPipe
        results = self.hands.process(image_rgb)
        
        landmarks = []
        if results.multi_hand_landmarks:
            for hand_landmarks in results.multi_hand_landmarks:
                for landmark in hand_landmarks.landmark:
                    landmarks.extend([landmark.x, landmark.y, landmark.z])
        
        # Si aucun landmark n'est détecté, retourner des zéros
        if not landmarks:
            landmarks = [0.0] * 63  # 21 points * 3 coordonnées
            
        return torch.tensor(landmarks, dtype=torch.float32)


In [8]:

# Définition de la classe BriareoDataset
class BriareoDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.sequences = []  # Au lieu de samples, on stocke des séquences
        self.feature_extractor = HandFeatureExtractor()

        
        print(f"Chargement du dataset depuis {root_dir}")

        for person_id in range(0, 26):
            person_dir = f"{str(person_id).zfill(3)}"
            person_path = os.path.join(root_dir, person_dir)
            
            if not os.path.exists(person_path):
                continue
                
            for gesture_id in range(12):
                gesture_dir = f"g{str(gesture_id).zfill(2)}"
                gesture_path = os.path.join(person_path, gesture_dir)
                
                if not os.path.exists(gesture_path):
                    print(f"Skipping {gesture_path}")
                    continue
                
                for repetition_id in range(3):
                    repetition_dir = f"{str(repetition_id).zfill(2)}"
                    repetition_path = os.path.join(gesture_path, repetition_dir)
                    
                    if not os.path.exists(repetition_path):
                        print(f"Skipping {repetition_path}")
                        continue
                    
                    sequence = {
                        'person_id': person_id,
                        'gesture_id': gesture_id,
                        'repetition_id': repetition_id,
                        'frames': []
                    }
                    
                    valid_sequence = True
                    for frame_id in range(40):
                        l_img_path = os.path.join(repetition_path, 'L', 'raw', f"{str(frame_id).zfill(3)}_rl.png")
                        r_img_path = os.path.join(repetition_path, 'R', 'raw', f"{str(frame_id).zfill(3)}_rr.png")

                        
                        if os.path.exists(l_img_path) and os.path.exists(r_img_path):
                            sequence['frames'].append({
                                'frame_id': frame_id,
                                'l_img_path': l_img_path,
                                'r_img_path': r_img_path
                            })
                        else:
                            valid_sequence = False
                            print(f"Séquence incomplète : {l_img_path} ou {r_img_path} manquant")
                            break
                    
                    if valid_sequence and len(sequence['frames']) ==  40:
                        self.sequences.append(sequence)


    def __len__(self):
        return len(self.sequences)

   
    def __getitem__(self, idx):
        sequence = self.sequences[idx]
        sequence_features = []
        
        for frame in sequence['frames']:
            l_image = Image.open(frame['l_img_path']).convert('L')
            r_image = Image.open(frame['r_img_path']).convert('L')

            if self.transform:
                l_image = self.transform(l_image)
                r_image = self.transform(r_image)
            
            # Extraire les landmarks pour les deux mains
            l_landmarks = self.feature_extractor.extract_landmarks(l_image)
            r_landmarks = self.feature_extractor.extract_landmarks(r_image)
            
            # Concaténer les features des deux mains
            frame_features = torch.cat([l_landmarks, r_landmarks])
            sequence_features.append(frame_features)
        
        return {
            'features': torch.stack(sequence_features),  # [40, 126]
            'gesture_id': sequence['gesture_id']
        }
    

In [9]:

# Définir les transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

# Créer l'instance du dataset
dataset = BriareoDataset(root_dir='leap_motion/train', transform=transform)

Chargement du dataset depuis leap_motion/train


I0000 00:00:1740148995.460480 7149444 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 89.3), renderer: Apple M2 Pro
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
W0000 00:00:1740148995.478338 7152903 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1740148995.483799 7152904 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


In [10]:
sample = dataset[0]
print(sample)

W0000 00:00:1740148998.075193 7152908 landmark_projection_calculator.cc:186] Using NORM_RECT without IMAGE_DIMENSIONS is only supported for the square ROI. Provide IMAGE_DIMENSIONS or use PROJECTION_MATRIX.


{'features': tensor([[ 0.0000,  0.0000,  0.0000,  ...,  0.4644,  0.4240, -0.0042],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.4646,  0.4225, -0.0089],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.4636,  0.4215, -0.0075],
        ...,
        [ 0.0000,  0.0000,  0.0000,  ...,  0.4577,  0.4256, -0.0046],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.4584,  0.4265, -0.0079],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.4582,  0.4266, -0.0069]]), 'gesture_id': 0}


In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class GestureClassifier(nn.Module):
    def __init__(self, input_size=126, hidden_size=256, num_layers=2, num_classes=12):
        super(GestureClassifier, self).__init__()
        
        self.lstm = nn.LSTM(
            input_size=input_size,      # 126 features (63 landmarks par main)
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,           # Format: [batch, sequence, features]
            dropout=0.3,
            bidirectional=True          # LSTM bidirectionnel pour mieux capturer le contexte
        )
        
        # Attention layer
        self.attention = nn.Sequential(
            nn.Linear(hidden_size * 2, 64),  # *2 car bidirectionnel
            nn.Tanh(),
            nn.Linear(64, 1),
            nn.Softmax(dim=1)
        )
        
        # Classification layers
        self.classifier = nn.Sequential(
            nn.Linear(hidden_size * 2, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        # x shape: [batch, 40, 126]
        
        # LSTM output
        lstm_out, _ = self.lstm(x)  # [batch, 40, hidden_size*2]
        
        # Attention weights
        attention_weights = self.attention(lstm_out)  # [batch, 40, 1]
        
        # Apply attention
        context = torch.sum(attention_weights * lstm_out, dim=1)  # [batch, hidden_size*2]
        
        # Classification
        output = self.classifier(context)  # [batch, num_classes]
        return output

In [12]:
import torch
from torch.utils.data import DataLoader
import torch.nn as nn
from tqdm import tqdm  # Changement ici

def train_model(model, train_loader, val_loader, num_epochs=50):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Utilisation de : {device}")
    
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    
    best_val_acc = 0
    history = {'train_acc': [], 'train_loss': [], 'val_acc': [], 'val_loss': []}
    
    # Barre de progression principale pour les epochs
    for epoch in tqdm(range(num_epochs), desc="Epochs"):
        # Mode entraînement
        model.train()
        train_loss = 0
        train_correct = 0
        train_total = 0
        
        # Barre de progression pour les batches
        train_loop = tqdm(train_loader, desc=f"Train Epoch {epoch+1}", 
                         leave=False, position=1)
        
        for batch in train_loop:
            features = batch['features'].to(device)
            labels = batch['gesture_id'].to(device)
            
            optimizer.zero_grad()
            outputs = model(features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
            _, predicted = outputs.max(1)
            train_total += labels.size(0)
            train_correct += predicted.eq(labels).sum().item()
            
            # Mise à jour de la barre de progression des batches
            train_loop.set_postfix(
                loss=f"{train_loss/train_total:.4f}",
                acc=f"{100.*train_correct/train_total:.2f}%"
            )
        
        # Validation
        model.eval()
        val_loss = 0
        val_correct = 0
        val_total = 0
        
        val_loop = tqdm(val_loader, desc=f"Val Epoch {epoch+1}", 
                       leave=False, position=1)
        
        with torch.no_grad():
            for batch in val_loop:
                features = batch['features'].to(device)
                labels = batch['gesture_id'].to(device)
                
                outputs = model(features)
                loss = criterion(outputs, labels)
                
                val_loss += loss.item()
                _, predicted = outputs.max(1)
                val_total += labels.size(0)
                val_correct += predicted.eq(labels).sum().item()
                
                # Mise à jour de la barre de validation
                val_loop.set_postfix(
                    loss=f"{val_loss/val_total:.4f}",
                    acc=f"{100.*val_correct/val_total:.2f}%"
                )
        
        # Calcul des métriques finales de l'epoch
        train_acc = 100. * train_correct / train_total
        train_loss = train_loss / len(train_loader)
        val_acc = 100. * val_correct / val_total
        val_loss = val_loss / len(val_loader)
        
        # Mise à jour de l'historique
        history['train_acc'].append(train_acc)
        history['train_loss'].append(train_loss)
        history['val_acc'].append(val_acc)
        history['val_loss'].append(val_loss)
        
        # Sauvegarde du meilleur modèle
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), 'best_model.pth')
            
        # Affichage des métriques de l'epoch
        print(f"\nEpoch {epoch+1}/{num_epochs}:")
        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
        print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")
    
    return history


In [77]:
from tqdm import tqdm

train_x = range(100)
train_y = range(200)

train_iter = zip(train_x, train_y)

# Notice `train_iter` can only be iter over once, so i get `total` in this way.
total = min(len(train_x), len(train_y))

with tqdm(total=total) as pbar:
    for item in train_iter:
        # do something ...
        pbar.update(1)




100%|██████████| 100/100 [00:00<00:00, 819200.00it/s]


In [13]:

def evaluate_model(model, val_loader, criterion, device):
    # Phase de validation
    model.eval()
    val_loss = 0
    val_correct = 0
    val_total = 0
    
    with torch.no_grad():
        for batch in val_loader:
            features = batch['features'].to(device)
            labels = batch['gesture_id'].to(device)
            
            outputs = model(features)
            loss = criterion(outputs, labels)
            
            val_loss += loss.item()
            _, predicted = outputs.max(1)
            val_total += labels.size(0)
            val_correct += predicted.eq(labels).sum().item()
    
    # Calcul des métriques de validation
    avg_val_loss = val_loss / len(val_loader)
    val_acc = 100. * val_correct / val_total

    return avg_val_loss, val_acc

In [19]:
def train_batch(model, train_loader, criterion, optimizer, device, progress_bar):
    model.train()
    train_loss = 0
    train_correct = 0
    train_total = 0
    
    for batch in train_loader:
        features = batch['features'].to(device)
        labels = batch['gesture_id'].to(device)
        
        optimizer.zero_grad()
        outputs = model(features)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        train_loss += loss.item()
        _, predicted = outputs.max(1)
        train_total += labels.size(0)
        train_correct += predicted.eq(labels).sum().item()


        progress_bar.write(
            f"Train Loss: {train_loss/train_total:.4f} | Train Acc: {100.*train_correct/train_total:.2f}%"
        )
        progress_bar.update(1)
    
    # Calcul des métriques d'entraînement
    avg_train_loss = train_loss / len(train_loader)
    train_acc = 100. * train_correct / train_total

    return train_acc, avg_train_loss
    

In [20]:
from tqdm.auto import tqdm  # tqdm.auto s'adapte automatiquement à l'environnement
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, random_split

def train_model(model, train_loader, val_loader, num_epochs=50):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Utilisation de : {device}")
    
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    best_val_acc = 0
    history = {'train_acc': [], 'train_loss': [], 'val_acc': [], 'val_loss': []}
    
    print("Entraînement du modèle...")
    with tqdm(range(num_epochs), desc="Entrainement") as pbar:
        train_acc, avg_train_loss = train_batch(model, train_loader, criterion, optimizer, device, pbar)
        avg_val_loss, val_acc = evaluate_model(model, val_loader, criterion, device)
        
        # Mise à jour de l'historique
        history['train_acc'].append(train_acc)
        history['train_loss'].append(avg_train_loss)
        history['val_acc'].append(val_acc)
        history['val_loss'].append(avg_val_loss)
        
        # # Sauvegarde du meilleur modèle
        # if val_acc > best_val_acc:
        #     best_val_acc = val_acc
        #     torch.save(model.state_dict(), 'best_model.pth')
    
    return history


In [21]:
from torch.utils.data import random_split

# Utilisation :
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=512, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=512)

# Entraînement du modèle
model = GestureClassifier()
history = train_model(model, train_loader, val_loader)

Utilisation de : cpu
Entraînement du modèle...


Entrainement:   0%|          | 0/50 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [6]:
from time import sleep
from tqdm.auto import tqdm

values = range(3)
with tqdm(total=len(values)) as pbar:
    for i in values:
        pbar.write('processed: %d' %i)
        pbar.update(1)
        sleep(1)

  0%|          | 0/3 [00:00<?, ?it/s]

processed: 0
processed: 1
processed: 2


In [None]:
# import torch
# from torch.utils.data import DataLoader
# from torch.optim import Adam
# from torch.optim.lr_scheduler import ReduceLROnPlateau
# import numpy as np
# from sklearn.metrics import confusion_matrix
# import seaborn as sns
# import matplotlib.pyplot as plt

# def train_model(model, train_loader, val_loader, num_epochs=50):
#     device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#     print(f"Utilisation de : {device}")
    
#     model = model.to(device)
#     criterion = nn.CrossEntropyLoss()
#     optimizer = Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
#     scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=5, factor=0.5)
    
#     best_val_acc = 0
    
#     for epoch in range(num_epochs):
#         # Mode entraînement
#         model.train()
#         train_loss = 0
#         train_correct = 0
#         train_total = 0
        
#         for batch in train_loader:
#             features = batch['features'].to(device)
#             labels = batch['gesture_id'].to(device)
            
#             optimizer.zero_grad()
#             outputs = model(features)
#             loss = criterion(outputs, labels)
#             loss.backward()
#             optimizer.step()
            
#             train_loss += loss.item()
#             _, predicted = outputs.max(1)
#             train_total += labels.size(0)
#             train_correct += predicted.eq(labels).sum().item()
        
#         # Mode validation
#         model.eval()
#         val_loss = 0
#         val_correct = 0
#         val_total = 0
        
#         with torch.no_grad():
#             for batch in val_loader:
#                 features = batch['features'].to(device)
#                 labels = batch['gesture_id'].to(device)
                
#                 outputs = model(features)
#                 loss = criterion(outputs, labels)
                
#                 val_loss += loss.item()
#                 _, predicted = outputs.max(1)
#                 val_total += labels.size(0)
#                 val_correct += predicted.eq(labels).sum().item()
        
#         # Calcul des métriques
#         train_acc = 100. * train_correct / train_total
#         val_acc = 100. * val_correct / val_total
        
#         print(f'Epoch [{epoch+1}/{num_epochs}]')
#         print(f'Train Loss: {train_loss/len(train_loader):.4f}, Train Acc: {train_acc:.2f}%')
#         print(f'Val Loss: {val_loss/len(val_loader):.4f}, Val Acc: {val_acc:.2f}%')
        
#         # Mise à jour du scheduler
#         scheduler.step(val_loss)
        
#         # Sauvegarde du meilleur modèle
#         if val_acc > best_val_acc:
#             best_val_acc = val_acc
#             torch.save(model.state_dict(), 'best_model.pth')

In [None]:
# from tqdm.notebook import tqdm  # Changé pour une meilleure intégration avec Jupyter
# import torch
# from torch.utils.data import DataLoader
# import torch.nn as nn

# def train_model(model, train_loader, val_loader, num_epochs=50):
#     device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#     print(f"Utilisation de : {device}")
    
#     model = model.to(device)
#     criterion = nn.CrossEntropyLoss()
#     optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    
#     # Configuration de la barre de progression principale
#     epoch_pbar = tqdm(range(num_epochs), desc='Training Progress', position=0)
    
#     best_val_acc = 0
#     history = {'train_acc': [], 'train_loss': [], 'val_acc': [], 'val_loss': []}
    
#     for epoch in epoch_pbar:
#         # Phase d'entraînement
#         model.train()
#         train_loss = 0
#         train_correct = 0
#         train_total = 0
        
#         # Désactiver les sous-barres de progression pour plus de clarté
#         for batch in train_loader:
#             try:
#                 features = batch['features'].to(device)
#                 labels = batch['gesture_id'].to(device)
                
#                 optimizer.zero_grad()
#                 outputs = model(features)
#                 loss = criterion(outputs, labels)
#                 loss.backward()
#                 optimizer.step()
                
#                 train_loss += loss.item()
#                 _, predicted = outputs.max(1)
#                 train_total += labels.size(0)
#                 train_correct += predicted.eq(labels).sum().item()
#             except Exception as e:
#                 print(f"Erreur dans le batch d'entraînement : {str(e)}")
#                 continue
        
#         # Phase de validation
#         model.eval()
#         val_loss = 0
#         val_correct = 0
#         val_total = 0
        
#         with torch.no_grad():
#             for batch in val_loader:
#                 try:
#                     features = batch['features'].to(device)
#                     labels = batch['gesture_id'].to(device)
                    
#                     outputs = model(features)
#                     loss = criterion(outputs, labels)
                    
#                     val_loss += loss.item()
#                     _, predicted = outputs.max(1)
#                     val_total += labels.size(0)
#                     val_correct += predicted.eq(labels).sum().item()
#                 except Exception as e:
#                     print(f"Erreur dans le batch de validation : {str(e)}")
#                     continue
        
#         # Calcul des métriques
#         train_acc = 100. * train_correct / train_total if train_total > 0 else 0
#         train_loss = train_loss / len(train_loader) if len(train_loader) > 0 else 0
#         val_acc = 100. * val_correct / val_total if val_total > 0 else 0
#         val_loss = val_loss / len(val_loader) if len(val_loader) > 0 else 0
        
#         # Mise à jour de l'historique
#         history['train_acc'].append(train_acc)
#         history['train_loss'].append(train_loss)
#         history['val_acc'].append(val_acc)
#         history['val_loss'].append(val_loss)
        
#         # Mise à jour de la barre de progression
#         epoch_pbar.set_postfix({
#             'train_loss': f'{train_loss:.4f}',
#             'train_acc': f'{train_acc:.2f}%',
#             'val_loss': f'{val_loss:.4f}',
#             'val_acc': f'{val_acc:.2f}%'
#         })
        
#         # Sauvegarde du meilleur modèle
#         if val_acc > best_val_acc:
#             best_val_acc = val_acc
#             torch.save(model.state_dict(), 'best_model.pth')
    
#     return history


In [None]:
# from torch.utils.data import random_split
# # Préparation des données avec worker_init_fn
# def worker_init_fn(worker_id):
#     np.random.seed(np.random.get_state()[1][0] + worker_id)

# # Création des DataLoaders
# train_size = int(0.8 * len(dataset))
# val_size = len(dataset) - train_size
# train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# # DataLoaders sans parallélisation pour le moment
# train_loader = DataLoader(
#     train_dataset, 
#     batch_size=32, 
#     shuffle=True,
#     num_workers=0  # Changé de 4 à 0
# )
# val_loader = DataLoader(
#     val_dataset, 
#     batch_size=32,
#     num_workers=0  # Changé de 4 à 0
# )

# # Création du modèle
# model = GestureClassifier()

# # Lancement de l'entraînement
# train_model(model, train_loader, val_loader, num_epochs=50)

Utilisation de : cpu


ImportError: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html