In [1]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from sklearn.model_selection import LeaveOneGroupOut, KFold
from sklearn.metrics import accuracy_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image
import cv2

# Custom Dataset class for hand gestures
class HandGestureDataset(Dataset):
    def __init__(self, sequences, labels):
        self.sequences = sequences
        self.labels = labels
        
    def __len__(self):
        return len(self.sequences)
    
    def __getitem__(self, idx):
        return self.sequences[idx], self.labels[idx]

# Define Mediapipe processing
import mediapipe as mp

mp_hands = mp.solutions.hands
hands = mp_hands.Hands(
    static_image_mode=False,
    max_num_hands=1,
    min_detection_confidence=0.25,
    min_tracking_confidence=0.25
)
mp_drawing = mp.solutions.drawing_utils

def get_hand_landmarks_frame(frame, is_transform=False):
    if is_transform==True:
        frame = 255*frame
        frame = frame.transpose(2,0).numpy().astype('uint8')
    results = hands.process(frame)

    hand_landmarks = []
    if results.multi_hand_landmarks:
        for hand_landmark in results.multi_hand_landmarks[0].landmark:
            hand_landmarks.append([hand_landmark.x, hand_landmark.y])
        hand_landmarks = np.array(hand_landmarks)
    else:
        hand_landmarks = np.zeros((21,2))

    return torch.from_numpy(hand_landmarks.astype('float32'))

def get_trace_frame(frame, is_transform=False):
    trace_frame = torch.zeros(64,64)
    hand_landmarks = get_hand_landmarks_frame(frame, is_transform)
    for (x,y) in hand_landmarks:
        x = int(63*x)
        y = int(63*y)
        
        N = 63
        trace_frame[min(x,N)][min(y,N)] = 1
        
        trace_frame[min(x+1,N)][min(y,N)] = 1
        trace_frame[min(max(x-1,0),N)][min(y,N)] = 1
        trace_frame[min(x,N)][min(y+1,N)] = 1
        trace_frame[min(x,N)][min(max(y-1,0),N)] = 1
        
        trace_frame[min(x+1,N)][min(y+1,N)] = 1
        trace_frame[min(max(x-1,0),N)][min(y+1,N)] = 1
        trace_frame[min(x+1,N)][min(max(y-1,0),N)] = 1
        trace_frame[min(max(x-1,0),N)][min(max(y-1,0),N)] = 1
    return trace_frame

# Function to load and preprocess the dataset
def load_dataset(data_path):
    """
    Load and preprocess the hand gesture dataset with background blurring.
    
    Args:
        data_path: Path to the dataset directory
        
    Returns:
        X: Sequences of images with shape (num_sequences, channels, frames, height, width)
        y: Labels corresponding to gestures (0: down, 1: left, 2: right, 3: up)
        subject_ids: IDs of subjects for cross-validation
    """
    # Placeholder arrays for data, labels, and subject IDs
    sequences = []
    labels = []
    subject_ids = []
    
    # Map gesture names to numerical labels
    gesture_map = {'down': 0, 'left': 1, 'right': 2, 'up': 3}
    
    # Image transformation pipeline
    transform = transforms.Compose([
        transforms.Resize((64, 64)),  # Using power of 2 dimensions for better downsampling
        transforms.ToTensor(),  # Converts to [0,1] range and changes to CxHxW format
        transforms.Normalize(mean=[0.5], std=[0.5])  # Normalize to [-1, 1] range
        #transforms.ColorJitter(brightness=0.5, contrast=1, saturation=0.1, hue=0.5)
    ])
    
    # Iterate through gesture folders
    for gesture in ['down', 'left', 'right', 'up']:
        gesture_path = os.path.join(data_path, gesture)
        
        # Iterate through sequence folders for this gesture
        for seq_folder in os.listdir(gesture_path):
            seq_path = os.path.join(gesture_path, seq_folder)
            
            if os.path.isdir(seq_path):
                # Extract subject ID from sequence folder name (assuming format contains subject ID)
                # Modify this according to your actual folder naming convention
                subject_id = int(seq_folder.split('_')[0])
                
                # Load frames for this sequence
                frame_files = sorted([f for f in os.listdir(seq_path) if f.endswith('.jpg') or f.endswith('.png')])
                
                if len(frame_files) > 0:
                    # Load and normalize frames
                    frames = []
                    for frame_file in frame_files:
                        frame_path = os.path.join(seq_path, frame_file)
                        
                        # Open image with OpenCV for skin detection and background blurring
                        img_cv = cv2.imread(frame_path)
                        if img_cv is not None:
                            img_tensor = get_trace_frame(img_cv)
                            frames.append(img_tensor)
                    
                    # Pad or truncate sequence to fixed length (8 frames - power of 2)
                    target_frames = 8
                    if len(frames) < target_frames:
                        # Pad with zeros if sequence is too short
                        for _ in range(target_frames - len(frames)):
                            frames.append(torch.zeros_like(frames[0]))
                    elif len(frames) > target_frames:
                        # Truncate if sequence is too long
                        frames = frames[:target_frames]
                    
                    # Stack frames along a new dimension
                    sequence_tensor = torch.from_numpy(np.array([frames]))   # Reshape to [channels, frames, height, width]

                    # Add sequence to dataset
                    sequences.append(sequence_tensor)
                    labels.append(gesture_map[gesture])
                    subject_ids.append(subject_id)
    
    # Convert lists to tensors
    X = torch.stack(sequences)
    y = torch.tensor(labels, dtype=torch.long)
    subject_ids = np.array(subject_ids)
    
    return X, y, subject_ids

# Dense Block for 3D DenseNet
class DenseBlock3D(nn.Module):
    def __init__(self, in_channels, growth_rate, num_layers):
        super(DenseBlock3D, self).__init__()
        self.layers = nn.ModuleList()
        for i in range(num_layers):
            self.layers.append(self._make_dense_layer(in_channels + i * growth_rate, growth_rate))
    
    def _make_dense_layer(self, in_channels, growth_rate):
        return nn.Sequential(
            nn.BatchNorm3d(in_channels),
            nn.ReLU(inplace=True),
            nn.Conv3d(in_channels, 4 * growth_rate, kernel_size=1, bias=False),
            nn.BatchNorm3d(4 * growth_rate),
            nn.ReLU(inplace=True),
            nn.Conv3d(4 * growth_rate, growth_rate, kernel_size=3, padding=1, bias=False)
        )
    
    def forward(self, x):
        features = [x]
        for layer in self.layers:
            new_feature = layer(torch.cat(features, 1))
            features.append(new_feature)
        return torch.cat(features, 1)


# Transition Layer for 3D DenseNet
class TransitionLayer3D(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(TransitionLayer3D, self).__init__()
        self.layers = nn.Sequential(
            nn.BatchNorm3d(in_channels),
            nn.ReLU(inplace=True),
            nn.Conv3d(in_channels, out_channels, kernel_size=1, bias=False),
            nn.AvgPool3d(kernel_size=2, stride=2)
        )
    
    def forward(self, x):
        return self.layers(x)


# 3D DenseNet Model
class DenseNet3D(nn.Module):
    def __init__(self, growth_rate=12, block_config=(2, 4, 6), num_init_features=16, 
                 compression_rate=0.5, num_classes=4):
        super(DenseNet3D, self).__init__()
        
        # First convolution and pooling
        self.features = nn.Sequential(
            nn.Conv3d(1, num_init_features, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm3d(num_init_features),
            nn.ReLU(inplace=True),
            nn.MaxPool3d(kernel_size=2, stride=2)
        )
        
        # Dense Blocks
        num_features = num_init_features
        for i, num_layers in enumerate(block_config):
            # Add a dense block
            block = DenseBlock3D(
                in_channels=num_features,
                growth_rate=growth_rate,
                num_layers=num_layers
            )
            self.features.add_module(f'denseblock{i+1}', block)
            num_features += num_layers * growth_rate
            
            # Add a transition layer (except after the last block)
            if i != len(block_config) - 1:
                out_features = int(num_features * compression_rate)
                trans = TransitionLayer3D(num_features, out_features)
                self.features.add_module(f'transition{i+1}', trans)
                num_features = out_features
        
        # Final batch norm
        self.features.add_module('norm_final', nn.BatchNorm3d(num_features))
        self.features.add_module('relu_final', nn.ReLU(inplace=True))
        
        # Global Average Pooling and classifier
        self.avgpool = nn.AdaptiveAvgPool3d((1, 1, 1))
        self.lstm = nn.LSTM(num_features,num_features)
        self.classifier = nn.Linear(num_features, num_classes)
        
        # Initialize weights
        for m in self.modules():
            if isinstance(m, nn.Conv3d):
                nn.init.kaiming_normal_(m.weight)
            elif isinstance(m, nn.BatchNorm3d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.constant_(m.bias, 0)
    
    def forward(self, x):
        features = self.features(x)
        out = self.avgpool(features)
        out = out.view(out.size(0), -1)
        out = self.lstm(out)
        out = self.classifier(out)
        return out


# Function to train the model for one epoch
def train_epoch(model, train_loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        
        # Zero the parameter gradients
        optimizer.zero_grad()
        
        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        
        # Backward pass and optimize
        loss.backward()
        optimizer.step()
        
        # Statistics
        running_loss += loss.item() * inputs.size(0)
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
    
    train_loss = running_loss / total
    train_acc = 100. * correct / total
    
    return train_loss, train_acc


# Function to evaluate the model
def evaluate(model, test_loader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    all_preds = []
    all_targets = []
    
    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            
            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            
            # Statistics
            running_loss += loss.item() * inputs.size(0)
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()
            
            # Save predictions and targets for confusion matrix
            all_preds.extend(predicted.cpu().numpy())
            all_targets.extend(targets.cpu().numpy())
    
    test_loss = running_loss / total
    test_acc = 100. * correct / total
    
    return test_loss, test_acc, all_preds, all_targets


def train_and_evaluate(X, y, subject_ids, model_type='densenet'):
    print("Training started")
    """
    Train and evaluate the 3D-CNN model using k-fold cross-validation
    with stratification across subjects.
    
    Args:
        X: Sequences of images tensor
        y: Labels tensor
        subject_ids: Subject IDs for cross-validation
        model_type: Type of model to use
        
    Returns:
        results: Dictionary containing evaluation results
        best_model_state: State dict of the best performing model
    """
    # Check for CUDA availability
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")
    
    # Initialize KFold cross-validator
    n_splits = 5  # 5-fold cross-validation
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    
    # Initialize lists to store results
    subject_accuracies = []
    all_y_true = []
    all_y_pred = []
    
    # Cross-validation parameters
    num_epochs = 25
    batch_size = 16
    
    # Track the best model across all folds
    best_overall_acc = 0
    final_best_model_state = None
    
    # Iterate through folds
    for fold, (train_idx, test_idx) in enumerate(kf.split(X.numpy()), 1):
        print(f"\n--- Fold {fold}/{n_splits} ---")
        
        # Create PyTorch datasets and dataloaders
        train_dataset = HandGestureDataset(X[train_idx], y[train_idx])
        test_dataset = HandGestureDataset(X[test_idx], y[test_idx])
        
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
        
        # Create the model
        model = DenseNet3D(growth_rate=12, block_config=(2, 4, 4), num_init_features=16).to(device)
        
        # Define loss function and optimizer
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)
        
        # Learning rate scheduler
        scheduler = optim.lr_scheduler.OneCycleLR(
            optimizer, 
            max_lr=0.01,
            steps_per_epoch=len(train_loader),
            epochs=num_epochs
        )
        
        # Variables for early stopping
        best_test_acc = 0
        best_model_state = None
        patience = 15
        patience_counter = 0
        
        # Training loop
        for epoch in range(num_epochs):
            # Train for one epoch
            train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
            
            # Evaluate on test set
            test_loss, test_acc, _, _ = evaluate(model, test_loader, criterion, device)
            
            # Print progress
            print(f'Epoch {epoch+1}/{num_epochs}: '
                  f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%, '
                  f'Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%')
            
            # Check for improvement
            if test_acc > best_test_acc:
                best_test_acc = test_acc
                best_model_state = model.state_dict().copy()
                patience_counter = 0
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print(f"Early stopping at epoch {epoch+1}")
                    break
        
        # Load best model for final evaluation
        model.load_state_dict(best_model_state)
        
        # Final evaluation
        _, final_test_acc, y_pred, y_true = evaluate(model, test_loader, criterion, device)
        
        # Store results
        subject_accuracies.append(final_test_acc / 100.0)
        all_y_true.extend(y_true)
        all_y_pred.extend(y_pred)
        
        print(f"Fold {fold} Final Test Accuracy: {final_test_acc:.2f}%")
        
        # Keep track of the best model across all folds
        if final_test_acc > best_overall_acc:
            best_overall_acc = final_test_acc
            final_best_model_state = best_model_state.copy()
    
    # Calculate overall metrics
    overall_accuracy = np.mean(subject_accuracies)
    conf_matrix = confusion_matrix(all_y_true, all_y_pred)
    
    # Store results
    results = {
        'subject_accuracies': subject_accuracies,
        'overall_accuracy': overall_accuracy,
        'confusion_matrix': conf_matrix,
        'y_true': all_y_true,
        'y_pred': all_y_pred
    }
    
    return results, final_best_model_state


# Function to visualize results
def visualize_results(results, model_name='densenet'):
    """
    Visualize the cross-validation results.
    
    Args:
        results: Dictionary containing evaluation results
        model_name: Name of the model
    """
    # Plot subject accuracies
    plt.figure(figsize=(10, 6))
    plt.bar(range(len(results['subject_accuracies'])), results['subject_accuracies'])
    plt.axhline(y=results['overall_accuracy'], color='r', linestyle='-', 
                label=f"Overall: {results['overall_accuracy']:.4f}")
    plt.xlabel('Subject')
    plt.ylabel('Accuracy')
    plt.title(f'Leave-One-Subject-Out Cross-Validation Results ({model_name})')
    plt.legend()
    plt.ylim(0, 1)
    plt.savefig(f'subject_accuracies_{model_name}.png')
    plt.close()
    
    # Plot confusion matrix
    plt.figure(figsize=(8, 6))
    gesture_names = ['Down', 'Left', 'Right', 'Up']
    conf_matrix = results['confusion_matrix']
    conf_matrix_norm = conf_matrix.astype('float') / conf_matrix.sum(axis=1)[:, np.newaxis]
    
    sns.heatmap(conf_matrix_norm, annot=True, fmt='.2f', cmap='Blues',
                xticklabels=gesture_names, yticklabels=gesture_names)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title(f'Confusion Matrix ({model_name})')
    plt.savefig(f'confusion_matrix_{model_name}.png')
    plt.close()


# Main execution
if __name__ == "__main__":
    # Set random seed for reproducibility
    torch.manual_seed(42)
    np.random.seed(42)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(42)
    
    # Path to your dataset
    data_path = "../../Dataset/STMM/image"
    
    # Load and preprocess the dataset
    X, y, subject_ids = load_dataset(data_path)
    
    # Print dataset info
    print(f"Dataset shape: {X.shape}")
    print(f"Number of sequences: {len(X)}")
    print(f"Number of subjects: {len(np.unique(subject_ids))}")
    
    # Run with DenseNet model
    print("\n--- Training with 3D DenseNet ---")
    densenet_results, best_model_state = train_and_evaluate(X, y, subject_ids, model_type='densenet')
    
    # Print overall results
    print("\n--- Overall Results ---")
    print(f"DenseNet Overall Accuracy: {densenet_results['overall_accuracy']:.4f}")
    
    # Visualize results
    print("\nGenerating visualization plots...")
    visualize_results(densenet_results, 'densenet')
    
    # Save the model
    print("\nSaving model...")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = DenseNet3D(growth_rate=12, block_config=(2, 4, 4), num_init_features=16).to(device)
    model.load_state_dict(best_model_state)
    
    # Save the model state dictionary
    torch.save(model.state_dict(), '../model/densenet_mediapipe_lstm.pth')
    print("Model saved successfully to 'densenet_mediapipe.pth'")
    
    print("\nDone! Results saved as PNG files.")

  warn(


Dataset shape: torch.Size([1840, 1, 8, 64, 64])
Number of sequences: 1840
Number of subjects: 460

--- Training with 3D DenseNet ---
Training started
Using device: cpu

--- Fold 1/5 ---
Epoch 1/25: Train Loss: 1.3268, Train Acc: 38.52%, Test Loss: 1.2769, Test Acc: 40.49%


KeyboardInterrupt: 