## Machine Learning Hand Keypoint Detector

In [None]:
import os
import onnx
import time
import yaml
import torch
import numpy as np
from pathlib import Path
from ultralytics import YOLO

class HandWristDetector:
    """Thin wrapper around an Ultralytics YOLOv8 pose model for hand/wrist keypoints.

    The values that change between experiments (model size, epochs, image
    size, batch size) are read from the ``model`` section of a YAML config
    file; every other training/inference setting is fixed in the methods below.
    """

    # Per-format keyword arguments handed to ``YOLO.export``.
    _EXPORT_OPTIONS = {
        'onnx': {'dynamic': True, 'simplify': True, 'opset': 11},
        'tflite': {'int8': True},
    }

    def __init__(self, config_path='config.yaml'):
        """
        Initialize HandWristDetector with configuration

        Args:
            config_path (str): Path to the configuration YAML file. It must
                provide ``model.size``; the other methods additionally read
                ``model.epochs``, ``model.image_size`` and ``model.batch_size``.

        Raises:
            OSError: If the configuration file cannot be opened.
            KeyError: If ``model.size`` is missing from the configuration.
        """
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)

        # Initialize YOLO pose detection model
        model_size = self.config['model']['size']
        model_path = f"yolov8{model_size}-pose.pt"

        # Nothing is downloaded here: this only announces that the weights are
        # not present locally. Fetching them is left to the YOLO loader below.
        if not os.path.exists(model_path):
            print(f"Downloading YOLOv8{model_size} pose model...")

        self.model = YOLO(model_path)

    def train(self, data_yaml):
        """
        Train the model with custom configuration

        Args:
            data_yaml (str): Path to the data YAML file containing dataset configuration

        Returns:
            results: Training results object

        Raises:
            Exception: Any error raised by the underlying trainer is printed
                and re-raised unchanged.
        """
        # Set training arguments.
        # NOTE: no ``model=`` entry on purpose. ``self.model.train`` already
        # trains ``self.model``; the ``model`` setting is meant to be a path to
        # a model file, so handing it the live YOLO object is at best ignored
        # and at worst breaks the trainer's path handling.
        args = dict(
            data=data_yaml,                    # Path to data YAML file
            task='pose',                       # Task type for pose detection
            mode='train',                      # Training mode
            epochs=self.config['model']['epochs'],
            imgsz=self.config['model']['image_size'],
            batch=self.config['model']['batch_size'],
            device='',                         # Device to use (auto-select)
            workers=8,                         # Number of worker threads
            optimizer='AdamW',                 # Optimizer to use (AdamW)
            patience=20,                       # Early stopping patience
            verbose=True,                      # Print verbose output
            seed=0,                            # Random seed
            deterministic=True,                # Enable deterministic mode
            single_cls=True,                   # Single class training
            rect=True,                         # Rectangular training
            cos_lr=True,                       # Cosine learning rate scheduler
            close_mosaic=10,                   # Disable mosaic augmentation for final epochs
            resume=False,                      # Resume training
            amp=True,                          # Automatic Mixed Precision

            # Learning rate settings
            lr0=0.001,                         # Initial learning rate
            lrf=0.01,                          # Final learning rate fraction
            momentum=0.937,                    # SGD momentum/Adam beta1
            weight_decay=0.0005,               # Optimizer weight decay
            warmup_epochs=3.0,                 # Warmup epochs
            warmup_momentum=0.8,               # Warmup initial momentum
            warmup_bias_lr=0.1,                # Warmup initial bias learning rate

            # Loss coefficients
            box=7.5,                           # Box loss gain
            cls=0.5,                           # Class loss gain
            pose=12.0,                         # Pose loss gain
            kobj=2.0,                          # Keypoint obj loss gain

            # Augmentation settings
            degrees=10.0,                      # Rotation degrees
            translate=0.2,                     # Translation
            scale=0.7,                         # Scale
            fliplr=0.5,                        # Horizontal flip probability
            mosaic=1.0,                        # Mosaic probability
            mixup=0.0,                         # Mixup probability

            # Saving settings
            project='runs/pose',               # Project name
            name='train',                      # Run name
            exist_ok=False,                    # Allow existing project
            pretrained=True,                   # Use pretrained model
            plots=True,                        # Generate plots
            save=True,                         # Save train checkpoints
            save_period=-1,                    # Save checkpoint every x epochs

            # Validation settings
            val=True,                          # Validate during training
            save_json=False,                   # Save JSON validation results
            conf=None,                         # Confidence threshold
            iou=0.7,                           # NMS IoU threshold
            max_det=300,                       # Maximum detections per image

            # Advanced settings
            fraction=1.0,                      # Dataset fraction to train on
            profile=False,                     # Profile ONNX/TF.js/TensorRT
            overlap_mask=True,                 # Masks should overlap during inference
            mask_ratio=4,                      # Mask downsample ratio
            dropout=0.2,                       # Use dropout regularization
            label_smoothing=0.1,               # Label smoothing epsilon
            nbs=64,                            # Nominal batch size
        )

        # Start training
        try:
            return self.model.train(**args)
        except Exception as e:
            print(f"Training error: {str(e)}")
            raise

    def evaluate(self, data_yaml):
        """
        Evaluate the model on validation/test set

        Args:
            data_yaml (str): Path to the data YAML file

        Returns:
            results: Validation results object

        Raises:
            Exception: Any error raised by the validator is printed and
                re-raised unchanged.
        """
        try:
            return self.model.val(
                data=data_yaml,
                imgsz=self.config['model']['image_size'],
                batch=self.config['model']['batch_size'],
                conf=0.25,
                iou=0.7,
                device='',
                verbose=True,
                save_json=False,
                save_hybrid=False,
                max_det=300,
                half=False
            )
        except Exception as e:
            print(f"Evaluation error: {str(e)}")
            raise

    def export_model(self, format='onnx'):
        """
        Export the model to specified format

        Args:
            format (str): Format to export to ('onnx' or 'tflite')

        Returns:
            Whatever ``YOLO.export`` returns for the chosen format (the
            location of the exported artifact in current Ultralytics releases).

        Raises:
            ValueError: If ``format`` is not one of the supported formats.
                Previously such a request was silently ignored.
            Exception: Any error raised by the exporter is printed and
                re-raised unchanged.
        """
        # Reject unknown formats up front instead of returning without
        # exporting anything, which hid typos such as 'ONNX' or 'tf-lite'.
        if format not in self._EXPORT_OPTIONS:
            supported = ', '.join(sorted(self._EXPORT_OPTIONS))
            raise ValueError(
                f"Unsupported export format {format!r}; expected one of: {supported}"
            )

        try:
            return self.model.export(
                format=format,
                device='cpu',
                **self._EXPORT_OPTIONS[format]
            )
        except Exception as e:
            print(f"Export error: {str(e)}")
            raise

    def predict(self, image_path):
        """
        Run inference on a single image

        Args:
            image_path (str): Path to the input image

        Returns:
            results: Detection results object for the first (only) image

        Raises:
            Exception: Any error raised during inference (including an empty
                result list) is printed and re-raised unchanged.
        """
        try:
            # ``boxes=True`` is intentionally not passed: the option was
            # renamed to ``show_boxes`` in newer Ultralytics releases and its
            # default already draws boxes, so omitting it keeps the behavior
            # while avoiding the deprecated name.
            results = self.model.predict(
                source=image_path,
                conf=0.25,
                iou=0.45,
                imgsz=self.config['model']['image_size'],
                device='',
                verbose=False,
                save=True,
                save_txt=False,
                save_conf=False,
                save_crop=False,
                show_labels=True,
                show_conf=True,
                max_det=300,
                agnostic_nms=False,
                classes=None,
                retina_masks=False
            )
            return results[0]
        except Exception as e:
            print(f"Prediction error: {str(e)}")
            raise

    def predict_batch(self, image_paths):
        """
        Run inference on a batch of images

        Args:
            image_paths (list): List of paths to input images

        Returns:
            results: List of detection results objects; an empty list when
                ``image_paths`` is empty.

        Raises:
            Exception: Any error raised during inference is printed and
                re-raised unchanged.
        """
        # Nothing to do for an empty batch; avoid handing the predictor an
        # empty source.
        if not image_paths:
            return []

        try:
            return self.model.predict(
                source=image_paths,
                conf=0.25,
                iou=0.45,
                imgsz=self.config['model']['image_size'],
                batch=self.config['model']['batch_size']
            )
        except Exception as e:
            print(f"Batch prediction error: {str(e)}")
            raise


## Webcam script to test the model

In [None]:
import cv2
import numpy as np
import time
import logging
from datetime import datetime
import os
import yaml
from ultralytics import YOLO

class WebcamDetector:
    """Live webcam viewer that overlays hand boxes and wrist keypoints.

    Frames are read with OpenCV, passed through a YOLO pose model, annotated
    and shown in a window until ``q`` is pressed or the camera stops
    delivering frames.
    """

    # Weights used when no explicit path is given, relative to the
    # ``paths.output_dir`` entry of the configuration.
    DEFAULT_WEIGHTS_SUBPATH = 'runs/pose/train11/weights/best.pt'

    # Keypoints at or below this confidence are not drawn.
    KEYPOINT_CONF_THRESHOLD = 0.5

    def __init__(self, config_path='config.yaml', weights_path=None):
        """Initialize the webcam detector with configuration

        Args:
            config_path (str): Path to the configuration YAML file.
            weights_path (str | None): Path to the trained weights. When
                omitted, ``DEFAULT_WEIGHTS_SUBPATH`` under the configured
                ``paths.output_dir`` is used.

        Raises:
            FileNotFoundError: If the weights file does not exist.
            Exception: Any other configuration or model loading error is
                logged and re-raised.
        """
        self.logger = logging.getLogger(__name__)
        self.config = self._load_config(config_path)
        self.cap = None
        self.frame_count = 0
        self.fps = 0.0
        # Monotonic clock: FPS is an interval measurement and must not jump
        # when the wall clock is adjusted.
        self.last_time = time.monotonic()

        # Initialize the hand detector model with custom weights
        try:
            if weights_path is None:
                weights_path = os.path.join(
                    self.config['paths']['output_dir'],
                    self.DEFAULT_WEIGHTS_SUBPATH
                )
            if not os.path.exists(weights_path):
                raise FileNotFoundError(f"Weights file not found at {weights_path}")

            self.model = YOLO(weights_path)
            self.logger.info("Custom hand detection model loaded successfully")
        except Exception as e:
            self.logger.error(f"Error loading model: {e}")
            raise

    def _load_config(self, config_path):
        """Load configuration file

        Args:
            config_path (str): Path to the configuration YAML file.

        Returns:
            The parsed YAML document.

        Raises:
            Exception: Any read or parse error is logged and re-raised.
        """
        try:
            with open(config_path, 'r') as f:
                return yaml.safe_load(f)
        except Exception as e:
            self.logger.error(f"Error loading config: {e}")
            raise

    def initialize_camera(self, camera_id=0, width=640, height=640):
        """Initialize the webcam

        Args:
            camera_id (int): Index of the capture device to open.
            width (int): Requested frame width in pixels.
            height (int): Requested frame height in pixels.

        Raises:
            ValueError: If the camera cannot be opened.
        """
        self.cap = cv2.VideoCapture(camera_id)
        if not self.cap.isOpened():
            # Do not keep a dead capture handle around.
            self.cap.release()
            self.cap = None
            raise ValueError(f"Could not open camera {camera_id}")

        # Set camera properties (a request; the driver may pick another size)
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        self.logger.info("Camera initialized successfully")

    def calculate_fps(self):
        """Calculate FPS

        Counts one frame per call. Once more than one second has passed since
        the last update, the rate is recomputed as frames divided by the
        elapsed time and the window restarts; in between, the previous value
        is returned.

        Returns:
            float: Most recently computed frames-per-second value.
        """
        self.frame_count += 1
        now = time.monotonic()
        elapsed = now - self.last_time
        if elapsed > 1.0:
            # Divide by the real window length: with slow frames the window
            # is noticeably longer than one second, and the raw frame count
            # would overstate the rate.
            self.fps = self.frame_count / elapsed
            self.frame_count = 0
            self.last_time = now
        return self.fps

    def draw_detections(self, frame, result):
        """Draw hand detections and wrist keypoints

        Args:
            frame: Image to annotate; it is copied, not modified.
            result: Detection result exposing ``boxes.data`` and
                ``keypoints.data``.

        Returns:
            Annotated copy of ``frame`` including an FPS counter.
        """
        annotated_frame = frame.copy()

        if result.keypoints is not None and result.boxes is not None:
            # Convert once to plain Python numbers instead of converting each
            # tensor element individually inside the loops.
            boxes = result.boxes.data.tolist()
            keypoints = result.keypoints.data.tolist()

            for box, kpts in zip(boxes, keypoints):
                # Get box coordinates
                x1, y1, x2, y2 = (int(v) for v in box[:4])
                box_conf = float(box[4])

                # Draw bounding box for hand
                cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

                # Add confidence score
                cv2.putText(annotated_frame, f"Hand: {box_conf:.2f}",
                            (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX,
                            0.5, (0, 255, 0), 2)

                # Draw wrist keypoint
                for kpt in kpts:
                    # Keypoints trained without a visibility/confidence
                    # channel only carry (x, y); treat those as fully
                    # confident rather than failing on the missing index.
                    kpt_conf = float(kpt[2]) if len(kpt) > 2 else 1.0
                    if kpt_conf > self.KEYPOINT_CONF_THRESHOLD:  # Only draw high-confidence keypoints
                        kx, ky = int(kpt[0]), int(kpt[1])
                        cv2.circle(annotated_frame, (kx, ky), 5, (255, 0, 0), -1)
                        cv2.putText(annotated_frame, f"Wrist: {kpt_conf:.2f}",
                                    (kx, ky - 10), cv2.FONT_HERSHEY_SIMPLEX,
                                    0.5, (255, 0, 0), 2)

        # Add FPS counter
        fps = self.calculate_fps()
        cv2.putText(annotated_frame, f'FPS: {fps:.1f}', (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        return annotated_frame

    def process_frame(self, frame):
        """Process each frame for hand detection

        Args:
            frame: Image captured from the camera.

        Returns:
            The annotated frame, or the unmodified input frame when inference
            yields nothing or fails (the failure is logged, not raised, so a
            single bad frame does not stop the live view).
        """
        try:
            # Run inference. A single frame gains nothing from streaming
            # mode, and the list form avoids leaving a partially consumed
            # generator behind on every frame.
            results = self.model.predict(
                source=frame,
                conf=0.25,  # Confidence threshold
                iou=0.45,   # NMS IoU threshold
                verbose=False
            )
            if not results:
                return frame

            # Draw detections for the first (only) result
            return self.draw_detections(frame, results[0])

        except Exception as e:
            self.logger.error(f"Error processing frame: {e}")
            return frame

    def run(self, camera_id=0):
        """Main loop for webcam detection

        Args:
            camera_id (int): Index of the capture device to use.

        Raises:
            Exception: Any error from camera setup or the display loop is
                logged and re-raised after the camera and windows are released.
        """
        try:
            self.initialize_camera(camera_id)

            while True:
                ret, frame = self.cap.read()
                if not ret:
                    self.logger.error("Failed to grab frame")
                    break

                # Process frame
                processed_frame = self.process_frame(frame)

                # Display the frame
                cv2.imshow('Hand-Wrist Detection', processed_frame)

                # Break loop on 'q' press
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

        except Exception as e:
            self.logger.error(f"Error in webcam detection: {e}")
            raise
        finally:
            if self.cap is not None:
                self.cap.release()
            cv2.destroyAllWindows()
            self.logger.info("Webcam detection stopped")

def main():
    """Configure INFO-level logging and start the webcam detection loop."""
    log_format = '%(asctime)s - %(levelname)s - %(message)s'
    logging.basicConfig(format=log_format, level=logging.INFO)

    # Build the detector with its default configuration and hand over control
    # to its capture/display loop.
    WebcamDetector().run()


if __name__ == '__main__':
    main()