# Installing OpenCV and Python Dependencies

## Install OpenCV
To install OpenCV using pip, run the following command:
```bash
pip install opencv-python
```

If you also need the extra modules from the contrib repository, install `opencv-contrib-python` instead:
```bash
pip install opencv-contrib-python
```

In [1]:
# Smoke test: confirms the kernel executes code and shows output.
print("Test")

Test


In [2]:
import cv2


# Open the camera at device index 0.
capture = cv2.VideoCapture(0)

if capture.isOpened():
    # Preview loop: show frames until a read fails or 'q' is pressed.
    while True:
        ret, frame = capture.read()
        if not ret:
            print("Failed to grab frame")
            break

        cv2.imshow('Camera Feed', frame)

        # Keep only the low byte of the key code before comparing it.
        pressed_key = cv2.waitKey(1) & 0xFF
        if pressed_key == ord('q'):
            break
else:
    print("Error: Could not open camera.")

# Cleanup runs whether or not the camera could be opened.
capture.release()
cv2.destroyAllWindows()


## Install MediaPipe
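MediaPipe is required for hand tracking. The cells below also install XGBoost and scikit-learn, which are used later to train and evaluate the gesture classifier: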

In [None]:
!pip install xgboost

In [None]:
!pip install scikit-learn

In [None]:
!pip install mediapipe




[notice] A new release of pip is available: 24.0 -> 25.0.1
[notice] To update, run: python3.exe -m pip install --upgrade pip


In [23]:
import cv2
import mediapipe as mp

def drawLine_landmark(frame, hand_landmarks):
    """Draw the thumb-tip/index-tip segment on ``frame`` and annotate it.

    Landmark 8 is used as the index fingertip and landmark 4 as the thumb
    tip. The segment is drawn in pixel coordinates, its length (measured in
    the landmarks' own x/y units, not pixels) is written at the midpoint,
    and the index fingertip's ``z`` value is written near the top-left
    corner. ``frame`` is modified in place and the distance is also printed.

    Args:
        frame: Image to draw on; ``frame.shape[0]`` / ``frame.shape[1]`` are
            used as height / width.
        hand_landmarks: Object exposing an indexable ``landmark`` sequence
            whose items have ``x``, ``y`` and ``z`` attributes.
    """
    frame_h, frame_w = frame.shape[0], frame.shape[1]
    font = cv2.FONT_HERSHEY_SIMPLEX
    white = (255, 255, 255)

    index_tip = hand_landmarks.landmark[8]
    thumb_tip = hand_landmarks.landmark[4]

    # Scale landmark coordinates by the frame size to get pixel positions.
    index_px = (int(index_tip.x * frame_w), int(index_tip.y * frame_h))
    thumb_px = (int(thumb_tip.x * frame_w), int(thumb_tip.y * frame_h))

    # The distance is taken on the unscaled coordinates on purpose: it is
    # the value displayed and printed below.
    delta_x = index_tip.x - thumb_tip.x
    delta_y = index_tip.y - thumb_tip.y
    dis = (delta_x**2 + delta_y**2)**0.5

    cv2.line(frame, index_px, thumb_px, (205, 55, 120), 5)

    # Centre the distance label on the midpoint of the segment.
    mid_x = (index_px[0] + thumb_px[0]) // 2
    mid_y = (index_px[1] + thumb_px[1]) // 2
    text = f"{dis:.3f}"
    font_scale = 0.6
    font_thickness = 2
    text_w, text_h = cv2.getTextSize(text, font, font_scale, font_thickness)[0]
    label_origin = (mid_x - text_w // 2, mid_y + text_h // 2)
    cv2.putText(frame, text, label_origin, font, font_scale, white, font_thickness, cv2.LINE_AA)

    # Show the index fingertip's z value with three decimals.
    text_Des = f"Depth: {index_tip.z:.3f}"
    cv2.putText(frame, text_Des, (20, 40), font, 0.6, white, 2, cv2.LINE_AA)

    print(f"Drawing is done: Distance = {dis:.3f}")

# Initialize MediaPipe Hands
# Both the detection and the tracking confidence thresholds are set to 0.5.
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils
hands = mp_hands.Hands(min_detection_confidence=0.5, min_tracking_confidence=0.5)

# Open the camera
capture = cv2.VideoCapture(0)

if not capture.isOpened():
    print("Error: Could not open camera.")
else:
    # Main loop: runs until a frame read fails or 'q' is pressed.
    while True:
        ret, frame = capture.read()
        if not ret:
            print("Failed to grab frame")
            break

        heightFrame, widthFrame, _ = frame.shape

        # Convert image to RGB
        # (frames are converted from BGR before being handed to MediaPipe)
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        result = hands.process(rgb_frame)
        
        if result.multi_hand_landmarks:
            # Landmarks and handedness entries are paired up positionally.
            for hand_landmarks, handedness in zip(result.multi_hand_landmarks, result.multi_handedness):
                mp_drawing.draw_landmarks(frame, hand_landmarks, mp_hands.HAND_CONNECTIONS)
                
                # Get hand label (Left or Right)
                hand_label = handedness.classification[0].label
                # The label is anchored 10 px above landmark 0, scaled to pixels.
                x, y = int(hand_landmarks.landmark[0].x * widthFrame), int(hand_landmarks.landmark[0].y * heightFrame)
                cv2.putText(frame, f"{hand_label} Hand", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2)

                # Draw line between index finger and thumb
                drawLine_landmark(frame, hand_landmarks)

        # Create a semi-transparent overlay for the red circle
        # The circle (radius 75) is drawn on a copy, then blended back into
        # `frame` at 50% so the picture underneath stays visible.
        overlay = frame.copy()
        cv2.circle(overlay, (widthFrame // 2, heightFrame // 2), 75, (0, 0, 255), -1)  # Red filled circle
        alpha = 0.5  # Transparency level
        cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0, frame)

        cv2.imshow('Hand Tracking', frame)  # Display the frame
        
        if cv2.waitKey(1) & 0xFF == ord('q'):  # Press 'q' to exit
            break

# Release resources
# Runs on both branches above, including when the camera never opened.
capture.release()
cv2.destroyAllWindows()


Drawing is done: Distance = 0.158
Drawing is done: Distance = 0.177
Drawing is done: Distance = 0.238
Drawing is done: Distance = 0.238
Drawing is done: Distance = 0.191
Drawing is done: Distance = 0.123
Drawing is done: Distance = 0.076
Drawing is done: Distance = 0.141
Drawing is done: Distance = 0.226
Drawing is done: Distance = 0.287
Drawing is done: Distance = 0.230
Drawing is done: Distance = 0.100
Drawing is done: Distance = 0.114
Drawing is done: Distance = 0.278
Drawing is done: Distance = 0.293
Drawing is done: Distance = 0.296
Drawing is done: Distance = 0.108
Drawing is done: Distance = 0.068
Drawing is done: Distance = 0.042
Drawing is done: Distance = 0.136
Drawing is done: Distance = 0.170
Drawing is done: Distance = 0.258
Drawing is done: Distance = 0.269
Drawing is done: Distance = 0.327
Drawing is done: Distance = 0.340
Drawing is done: Distance = 0.376
Drawing is done: Distance = 0.395
Drawing is done: Distance = 0.396
Drawing is done: Distance = 0.375
Drawing is don

---

# I. Hand gesture interface
Make a class or interface that takes hand gestures and assigns a function to each one of them, so that we can then program these functions as we want.

In [2]:
import cv2
import mediapipe as mp
import os
import json
import numpy as np

class HandGesture:
    """A named reference gesture stored as a sequence of landmarks.

    Each landmark is expected to expose ``x`` and ``y`` attributes.
    """

    def __init__(self, name, landmarks=None):
        self.name = name
        # Any falsy value (None, empty sequence) becomes a fresh empty list.
        self.landmarks = landmarks or []

    def set_landmarks(self, landmarks):
        """Replace the stored reference landmarks."""
        self.landmarks = landmarks

    def compare_gesture(self, detected_landmarks):
        """Return whether ``detected_landmarks`` is close to the stored gesture.

        The per-landmark Euclidean distance on (x, y) is averaged over the
        stored landmarks; the gesture matches when that mean is below 0.05.
        Returns ``False`` when either landmark sequence is empty/falsy.
        """
        if not (self.landmarks and detected_landmarks):
            return False

        def planar_gap(idx):
            # Distance between the stored and detected landmark at `idx`.
            stored = self.landmarks[idx]
            seen = detected_landmarks[idx]
            return np.linalg.norm(
                np.array([stored.x, stored.y]) - np.array([seen.x, seen.y])
            )

        distances = []
        for idx in range(len(self.landmarks)):
            distances.append(planar_gap(idx))

        mean_gap = np.mean(distances)
        return mean_gap < 0.05  # Adjust threshold as needed

class HandDetector:
    """Wrapper around a MediaPipe ``Hands`` instance limited to one hand."""

    def __init__(self):
        self.mp_hands = mp.solutions.hands
        # Configured with static_image_mode=True and at most one hand.
        self.hands = self.mp_hands.Hands(static_image_mode=True, max_num_hands=1)

    def detect_hands(self, image):
        """Return the landmarks of the first detected hand, or ``None``.

        ``image`` is converted from BGR to RGB before being processed.
        """
        rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        detection = self.hands.process(rgb_image)

        found_hands = detection.multi_hand_landmarks
        if found_hands:
            return found_hands[0].landmark

        return None  # No hands detected

def load_progress():
    """Return the saved progress dict, or a fresh one if none exists.

    Reads ``progress.json`` from the current working directory. When the
    file is missing, ``{'last_class_index': 0}`` is returned instead.
    """
    progress_file = 'progress.json'

    # No file yet: start from the first class index.
    if not os.path.exists(progress_file):
        return {'last_class_index': 0}

    with open(progress_file, 'r') as handle:
        stored = json.load(handle)
    return stored

def save_progress(progress):
    """Write ``progress`` as JSON to ``progress.json``, replacing any old file.

    The path is relative to the current working directory.
    """
    progress_file = 'progress.json'
    with open(progress_file, mode='w') as handle:
        json.dump(progress, handle)

def create_class_folder(class_index):
    """Create the first unused ``dataset/class_<n>`` folder with n >= class_index.

    Returns:
        A ``(class_folder, class_index)`` tuple: the path that was created
        and the index that was actually used.
    """
    dataset_dir = 'dataset'

    # Walk upwards from the requested index until a free folder name is found.
    while True:
        class_folder = os.path.join(dataset_dir, f'class_{class_index}')
        if not os.path.exists(class_folder):
            break
        class_index += 1

    os.makedirs(class_folder)
    return class_folder, class_index

def capture_and_record(nbr_sample_per_gestures=100):
    """Interactively record webcam frames of a hand gesture into class folders.

    A new ``dataset/class_<n>`` folder is created up front and the index is
    persisted through ``save_progress``. Pressing 's' starts a recording;
    while recording, every frame in which a hand is detected is saved
    (without the on-screen text) as ``1.jpg``, ``2.jpg``, ... until
    ``nbr_sample_per_gestures`` frames are stored. A fresh folder is then
    prepared for the next gesture. Pressing 'q' quits.

    Args:
        nbr_sample_per_gestures: Number of frames to save per recording.
    """
    progress = load_progress()  # Load the current progress
    class_index = progress['last_class_index']  # Get the last used class index

    cap = cv2.VideoCapture(0)

    # Everything after the camera is opened runs inside try/finally so the
    # device and the preview window are released even if a step raises.
    try:
        detector = HandDetector()

        class_folder, class_index = create_class_folder(class_index)
        progress['last_class_index'] = class_index  # Update the progress
        save_progress(progress)  # Save progress to JSON file

        frame_count = 0
        num_frames = nbr_sample_per_gestures
        recording = False

        print("Press 's' to start recording, and 'q' to quit.")

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Keep an untouched copy: text is drawn on `frame` for display,
            # but the saved sample must not contain it.
            frame_to_save = frame.copy()

            # Detect hand landmarks
            landmarks = detector.detect_hands(frame)

            # Status text and saving only happen while a hand is visible.
            if landmarks:
                if recording:
                    cv2.putText(frame, "Recording...", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
                    cv2.putText(frame, f"Frames recorded: {frame_count}/{num_frames}", (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA)

                    # Save the clean frame; file names are 1-based.
                    if frame_count < num_frames:
                        frame_name = f"{frame_count + 1}.jpg"
                        cv2.imwrite(os.path.join(class_folder, frame_name), frame_to_save)
                        frame_count += 1

                    if frame_count >= num_frames:
                        print("Recording finished.")
                        cv2.putText(frame, "Recording Finished", (50, 150), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
                        recording = False
                        # Prepare the folder for the next gesture right away.
                        # NOTE: if the user quits now, this folder stays empty.
                        class_folder, class_index = create_class_folder(class_index)
                        progress['last_class_index'] = class_index  # Update the progress
                        save_progress(progress)
                else:
                    # Show "Press 's' to start" message on the original frame
                    cv2.putText(frame, "Press 's' to start recording", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)

            cv2.imshow("Hand Gesture Recorder", frame)

            key = cv2.waitKey(1) & 0xFF

            if key == ord('s'):  # Start recording when 's' is pressed
                if not recording:
                    print("Starting recording...")
                    recording = True
                    frame_count = 0  # Reset frame count
            elif key == ord('q'):  # Quit when 'q' is pressed
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()

# Example usage
# Starts an interactive session that saves 200 frames per recorded gesture.
capture_and_record(200)


Press 's' to start recording, and 'q' to quit.
Starting recording...
Recording finished.
Starting recording...
Recording finished.
Starting recording...
Recording finished.
Starting recording...
Recording finished.
Starting recording...
Recording finished.
Starting recording...
Recording finished.


---
## Process data

In [3]:
import os
import cv2
import mediapipe as mp
import json

DATA_DIR = 'dataset'  # Define the dataset directory

# Initialize hand detector using MediaPipe
# Configured with static_image_mode=True and at most one hand per image.
mp_hands = mp.solutions.hands
hands = mp_hands.Hands(static_image_mode=True, max_num_hands=1)

def extract_landmarks(image_path):
    """Extract hand landmarks from an image file.

    Args:
        image_path: Path of the image to read.

    Returns:
        A list of ``[x, y]`` pairs, one per landmark of every detected hand,
        or ``None`` when the image cannot be read or no hand is detected.
    """
    img = cv2.imread(image_path)
    # cv2.imread signals an unreadable/missing file by returning None rather
    # than raising; bail out here instead of crashing inside cvtColor.
    if img is None:
        return None

    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    results = hands.process(img_rgb)

    # If no hands are detected, return None
    if not results.multi_hand_landmarks:
        return None

    landmarks = []
    for hand_landmarks in results.multi_hand_landmarks:
        # Only x and y are stored; the z coordinate is deliberately left out.
        landmarks.extend([point.x, point.y] for point in hand_landmarks.landmark)

    return landmarks

def collect_data():
    """Build ``gesture_data.json`` from the images under ``DATA_DIR``.

    Every sub-folder of ``DATA_DIR`` is treated as one class and its name is
    used as the label. Landmarks are extracted from each ``.jpg``/``.png``
    file; images that yield no landmarks are skipped. The result is written
    to ``DATA_DIR/gesture_data.json`` as ``{"data": [...], "labels": [...]}``.
    """
    samples = []      # Landmark lists, one entry per usable image
    sample_labels = []  # Class label (folder name) for each entry in `samples`

    for class_name in os.listdir(DATA_DIR):
        class_folder = os.path.join(DATA_DIR, class_name)
        # Plain files directly under DATA_DIR are not class folders.
        if not os.path.isdir(class_folder):
            continue

        for file_name in os.listdir(class_folder):
            if not file_name.endswith((".jpg", ".png")):
                continue

            landmarks = extract_landmarks(os.path.join(class_folder, file_name))
            if not landmarks:
                continue

            samples.append(landmarks)
            sample_labels.append(class_name)

    dataset = {"data": samples, "labels": sample_labels}

    output_path = os.path.join(DATA_DIR, "gesture_data.json")
    with open(output_path, 'w') as f:
        json.dump(dataset, f, indent=4)

    print("Data saved successfully to gesture_data.json")

# Run the data collection
# Scans DATA_DIR and (re)writes gesture_data.json inside it.
collect_data()


Data saved successfully to gesture_data.json


---
## Clean and split data

In [4]:
import json
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# Read the landmark dataset written by the data-processing step.
with open("dataset/gesture_data.json", "r") as f:
    data_dict = json.load(f)

# Samples arrive as (samples, 21, 2); each one is flattened into a single
# 42-value feature row.
data = np.array(data_dict['data'], dtype=np.float32)
num_samples = data.shape[0]
data = data.reshape(num_samples, -1)

# Folder-name labels are encoded as consecutive integers: 0, 1, 2, ...
label_encoder = LabelEncoder()
labels = label_encoder.fit_transform(np.array(data_dict['labels']))

# 80% / 20% stratified split with a fixed seed.
x_train, x_test, y_train, y_test = train_test_split(
    data,
    labels,
    test_size=0.2,
    shuffle=True,
    stratify=labels,
    random_state=42,
)

# The name -> integer mapping is computed once and used twice below.
class_names = label_encoder.classes_
class_indices = label_encoder.transform(class_names)

print(f"Label Mapping: {dict(zip(class_names, class_indices))}")

# Persist the mapping so predictions can be decoded back to names later.
with open("dataset/label_mapping.json", "w") as f:
    json.dump({label: int(idx) for label, idx in zip(class_names, class_indices)}, f, indent=4)

# Store both splits as plain lists so they are JSON-serialisable.
split_data = {
    "train": {"data": x_train.tolist(), "labels": y_train.tolist()},
    "test": {"data": x_test.tolist(), "labels": y_test.tolist()}
}

for split_name in ("train", "test"):
    with open(f"dataset/{split_name}_data.json", "w") as f:
        json.dump(split_data[split_name], f, indent=4)

print("Data cleaned, converted to numerical labels, and saved successfully!")


Label Mapping: {'9eliza': 0, 'one001': 1, 'one002': 2, 'one003': 3, 'palm001': 4, 'palm002': 5}
Data cleaned, converted to numerical labels, and saved successfully!


---
## Train and save model

In [5]:
from xgboost import XGBClassifier
import pickle
from sklearn.metrics import accuracy_score

In [6]:
# Train an XGBoost classifier with default settings on the training split
# (x_train / y_train come from the data-splitting cell).
model = XGBClassifier()
model.fit(x_train, y_train)

# Save the trained model
# NOTE: pickle files must only ever be loaded from trusted sources.
with open("gesture_classifier.pkl", "wb") as f:
    pickle.dump(model, f)

print("Model saved successfully as gesture_classifier.pkl!")


Model saved successfully as gesture_classifier.pkl!


### Check Performance

In [7]:
# Predict the held-out test split with the trained model.
y_predict = model.predict(x_test)

# accuracy_score expects (y_true, y_pred). Accuracy is symmetric, so the
# value is the same either way, but the documented order avoids silently
# wrong results if this line is later adapted to an asymmetric metric.
score = accuracy_score(y_test, y_predict)
score

0.975

---
## Apply model to test

In [3]:
import json

def get_gesture_label(prediction, label_mapping_path="dataset/label_mapping.json"):
    """Translate a numeric class prediction into its gesture name.

    Args:
        prediction: A class index, or a list / NumPy array whose first
            element is the class index.
        label_mapping_path: JSON file mapping gesture name -> class index.

    Returns:
        The gesture name, or ``"UNKNOWN"`` when the index is not in the
        mapping or the prediction sequence has no first element.
    """
    # Load the label mapping
    with open(label_mapping_path, "r") as f:
        label_mapping = json.load(f)

    # Create reverse mapping (number -> name)
    reverse_mapping = {v: k for k, v in label_mapping.items()}

    # Handle single predictions or array predictions
    if isinstance(prediction, (np.ndarray, list)):
        try:
            prediction = prediction[0]  # Get first element if array
        except IndexError:
            # Empty list/array: there is nothing to decode.
            return "UNKNOWN"

    return reverse_mapping.get(prediction, "UNKNOWN")

In [10]:
import cv2
import mediapipe as mp
import numpy as np
import pickle

# Load the trained model
# NOTE: pickle executes code on load; only load model files you created.
model_path = "gesture_classifier.pkl"
with open(model_path, "rb") as f:
    model = pickle.load(f)

# Initialize Mediapipe Hand module
mp_hands = mp.solutions.hands
hands = mp_hands.Hands()

# Initialize OpenCV Video Capture (Webcam)
cap = cv2.VideoCapture(0)

while True:
    ret, frame = cap.read()
    if not ret:
        # Retrying with `continue` would spin forever on a missing or
        # disconnected webcam and never reach the 'q' check below, so the
        # failure is reported and the loop ends, like the other webcam cells.
        print("Failed to grab frame")
        break

    # Convert the image to RGB for Mediapipe
    img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Process the image to detect hands
    results = hands.process(img_rgb)

    if results.multi_hand_landmarks:
        for landmarks in results.multi_hand_landmarks:
            # Build the flat [x0, y0, x1, y1, ...] feature vector from the
            # 21 hand landmarks, matching the layout used for training.
            data_aux = []
            for i in range(21):  # 21 landmarks in hand
                x = landmarks.landmark[i].x
                y = landmarks.landmark[i].y
                data_aux.append(x)
                data_aux.append(y)

            # Convert the data into the format the model expects (flattened)
            data_point = np.array(data_aux).reshape(1, -1)

            # Make a prediction using the trained model
            label = model.predict(data_point)
            # Get the prediction probabilities
            proba = model.predict_proba(data_point)
            # Extract the max probability and the corresponding class
            max_proba = np.max(proba)
            class_idx = np.argmax(proba)

            # Display the predicted label and the confidence (probability)
            cv2.putText(frame, f"Prediction: {get_gesture_label(label[0])} ({max_proba*100:.2f}%)",
                        (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            # Draw landmarks and connections
            mp.solutions.drawing_utils.draw_landmarks(frame, landmarks, mp_hands.HAND_CONNECTIONS)

    # Show the frame with predictions
    cv2.imshow("Hand Gesture Recognition", frame)

    # Break the loop on pressing 'q'
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release resources
cap.release()
cv2.destroyAllWindows()


--- 
# Test the connection between the ESP32 and my PC using a stream

## I. using webstream

In [1]:
import socket
import struct
import numpy as np
import cv2

# ESP32 IP and TCP port
# (address of the TCP server the script below connects to)
ESP32_IP = "192.168.128.66"
ESP32_PORT = 1234

# Frame size (you set this in the ESP32 code)
# Received frames are reshaped to (FRAME_HEIGHT, FRAME_WIDTH), one byte per pixel.
FRAME_WIDTH = 320
FRAME_HEIGHT = 240

def receive_exact(sock, count):
    """Read exactly ``count`` bytes from ``sock``.

    Keeps calling ``sock.recv`` until ``count`` bytes have been collected.

    Returns:
        The collected bytes, or ``None`` if ``recv`` returns an empty chunk
        (peer closed the connection) before ``count`` bytes arrived.
    """
    received = b""
    remaining = count
    while remaining > 0:
        piece = sock.recv(remaining)
        if not piece:
            return None
        received += piece
        remaining = count - len(received)
    return received

# Number of bytes in one grayscale frame (one byte per pixel).
expected_frame_size = FRAME_WIDTH * FRAME_HEIGHT

# Connect to the ESP32 TCP server
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# The connect call sits inside the try block so the socket is closed even
# when the connection attempt itself fails.
try:
    print(f"Connecting to {ESP32_IP}:{ESP32_PORT}...")
    sock.connect((ESP32_IP, ESP32_PORT))
    print("Connected!")

    while True:
        # Each frame is prefixed with its length: 4 bytes, little-endian unsigned.
        length_bytes = receive_exact(sock, 4)
        if length_bytes is None:
            print("Disconnected")
            break

        frame_size = struct.unpack("<I", length_bytes)[0]

        # Read the full frame
        frame_data = receive_exact(sock, frame_size)
        if frame_data is None:
            print("Failed to receive full frame.")
            break

        if frame_size == expected_frame_size:
            # Convert to numpy array (grayscale image)
            gray_frame = np.frombuffer(frame_data, dtype=np.uint8).reshape((FRAME_HEIGHT, FRAME_WIDTH))

            # Show the image
            cv2.imshow("ESP32-CAM Stream", gray_frame)
        else:
            # reshape() would raise ValueError on a payload of the wrong
            # size; the bytes were already consumed, so skip this frame.
            print(f"Skipping frame with unexpected size {frame_size} (expected {expected_frame_size}).")

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    sock.close()
    cv2.destroyAllWindows()


Connecting to 192.168.128.66:1234...
Connected!


: 

## II. Using MQTT

Note: this does not work well — it is laggy, so the stream approach is much better.

---
# Connect ESP32-CAM with model classification

In [None]:
import cv2
import mediapipe as mp
import numpy as np
import pickle

# Load the trained model
# NOTE: pickle executes code on load; only load model files you created.
model_path = "gesture_classifier.pkl"
with open(model_path, "rb") as f:
    model = pickle.load(f)

# Initialize Mediapipe Hand module
mp_hands = mp.solutions.hands
hands = mp_hands.Hands()

ip_address = "192.168.128.66"
esp32_cam_url = f"http://{ip_address}:81/stream"

# Try to connect to the ESP32 MJPEG stream
cap = cv2.VideoCapture(esp32_cam_url)
# cap = cv2.VideoCapture(0)

if not cap.isOpened():
    print(f"Failed to connect to the ESP32-CAM at {esp32_cam_url}")
    exit()

print(f"Connected to ESP32-CAM at {esp32_cam_url}. Press 'q' to quit.")

while True:
    ret, frame = cap.read()
    if not ret:
        # Failed reads are skipped and the stream is read again.
        continue

    # One resize is enough: the same 320x240 frame is used for detection and
    # for display (a second resize to the identical size was a per-frame no-op).
    frame = cv2.resize(frame, (320, 240))
    img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Process the image to detect hands
    results = hands.process(img_rgb)

    if results.multi_hand_landmarks:
        for landmarks in results.multi_hand_landmarks:
            # Build the flat [x0, y0, x1, y1, ...] feature vector from the
            # 21 hand landmarks.
            data_aux = []
            for i in range(21):  # 21 landmarks in hand
                x = landmarks.landmark[i].x
                y = landmarks.landmark[i].y
                data_aux.append(x)
                data_aux.append(y)

            # Convert the data into the format the model expects (flattened)
            data_point = np.array(data_aux).reshape(1, -1)

            # Make a prediction using the trained model
            label = model.predict(data_point)
            # Get the prediction probabilities
            proba = model.predict_proba(data_point)
            # Extract the max probability and the corresponding class
            max_proba = np.max(proba)
            class_idx = np.argmax(proba)

            # Print the predicted label and the confidence (probability)
            print(f"Prediction: {label[0]} ({max_proba*100:.2f}%)")

    cv2.imshow('ESP32-CAM Stream', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release resources
cap.release()
cv2.destroyAllWindows()


Connected to ESP32-CAM at http://192.168.128.66:81/stream. Press 'q' to quit.


---
## Time calculation for MediaPipe

In [None]:
import cv2
import mediapipe as mp
import numpy as np
import time

# Initialize MediaPipe Hand module
mp_hands = mp.solutions.hands
hands = mp_hands.Hands()

# Connect to the camera (ESP32 or laptop cam)
ip_address = "192.168.128.66"
esp32_cam_url = f"http://{ip_address}:81/stream"
cap = cv2.VideoCapture(esp32_cam_url)
# cap = cv2.VideoCapture(0)

if not cap.isOpened():
    print(f"Failed to connect to ESP32-CAM at {esp32_cam_url}")
    exit()

print(f"Connected to ESP32-CAM at {esp32_cam_url}. Press 'q' to quit.")

# FPS tracking
# The three counters cover the current reporting window and are reset
# each time the FPS line is printed.
last_fps_time = time.time()
frame_count = 0
hand_frame_count = 0
no_hand_frame_count = 0

while True:
    # Time the frame grab on its own.
    start_capture_time = time.time()
    ret, frame = cap.read()
    end_capture_time = time.time()

    if not ret:
        # Failed reads are skipped and are not counted or timed.
        continue

    frame = cv2.resize(frame, (320, 240))
    img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Time only the MediaPipe call (resize and colour conversion excluded).
    start_mp_time = time.time()
    results = hands.process(img_rgb)
    end_mp_time = time.time()

    # Count if hand is detected or not
    frame_count += 1
    if results.multi_hand_landmarks:
        hand_frame_count += 1
    else:
        no_hand_frame_count += 1

    # Time the display call.
    start_show_time = time.time()
    cv2.imshow('ESP32-CAM Stream', frame)
    end_show_time = time.time()

    # Calculate durations
    # (time.time() differences are in seconds; x1000 converts to milliseconds)
    capture_duration = (end_capture_time - start_capture_time) * 1000
    mp_duration = (end_mp_time - start_mp_time) * 1000
    show_duration = (end_show_time - start_show_time) * 1000

    print(f"Capture: {capture_duration:.2f} ms | MediaPipe: {mp_duration:.2f} ms | Show: {show_duration:.2f} ms")

    # Every second, print FPS and reset counters
    # The window is "at least 1.0 s", so the printed counts are frames per
    # window rather than an exact per-second rate.
    current_time = time.time()
    if current_time - last_fps_time >= 1.0:
        print(f"\nFPS: {frame_count} | Hand FPS: {hand_frame_count} | No-Hand FPS: {no_hand_frame_count}\n")
        frame_count = 0
        hand_frame_count = 0
        no_hand_frame_count = 0
        last_fps_time = current_time

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()


In [None]:
import cv2
import mediapipe as mp
import numpy as np
import pickle
import threading
import time
from collections import deque
from datetime import datetime, timedelta

# Load the trained model
# NOTE: pickle executes code on load; only load model files you created.
model_path = "gesture_classifier.pkl"
with open(model_path, "rb") as f:
    model = pickle.load(f)

# Initialize Mediapipe Hand module
mp_hands = mp.solutions.hands
hands = mp_hands.Hands()

ip_address = "192.168.128.66"
esp32_cam_url = f"http://{ip_address}:81/stream"

# Global variables for thread communication
latest_prediction = None  # Most recent prediction dict written by the worker
prediction_lock = threading.Lock()  # Guards reads/writes of latest_prediction
frame_queue = deque(maxlen=1)  # Holds at most one frame: the newest
processing_active = True  # Set to False to make the worker loop exit

def process_frames():
    """Worker loop: classify the newest queued frame about every 200 ms.

    Runs until the module-level ``processing_active`` flag becomes false.
    Each cycle takes the frame currently in ``frame_queue``, runs MediaPipe
    hand detection on a 320x240 copy, classifies every detected hand with
    ``model`` and stores the result in ``latest_prediction`` while holding
    ``prediction_lock``. Each prediction is also printed.
    """
    global latest_prediction, processing_active

    last_processed_time = datetime.now()
    processing_interval = timedelta(milliseconds=200)

    while processing_active:
        current_time = datetime.now()

        # Not yet time for the next frame: wait a bit and re-check.
        if current_time - last_processed_time < processing_interval:
            time.sleep(0.01)
            continue

        # Read the frame directly instead of checking len() first: the
        # producer may empty the deque between a length check and the
        # indexing, which would raise IndexError and kill this thread.
        try:
            frame = frame_queue[0]
        except IndexError:
            # No frame available yet, wait a bit
            time.sleep(0.01)
            continue

        # Process the frame for hand detection
        small_frame = cv2.resize(frame, (320, 240))
        img_rgb = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)

        results = hands.process(img_rgb)

        if results.multi_hand_landmarks:
            for landmarks in results.multi_hand_landmarks:
                # Build the flat [x0, y0, x1, y1, ...] feature vector from
                # the 21 hand landmarks.
                data_aux = []
                for i in range(21):  # 21 landmarks in hand
                    data_aux.append(landmarks.landmark[i].x)
                    data_aux.append(landmarks.landmark[i].y)

                # Convert the data into the format the model expects
                data_point = np.array(data_aux).reshape(1, -1)

                # Predicted class and the highest class probability
                label = model.predict(data_point)
                proba = model.predict_proba(data_point)
                max_proba = np.max(proba)

                # Update the latest prediction
                with prediction_lock:
                    latest_prediction = {
                        'label': label[0],
                        'confidence': max_proba * 100,
                        'timestamp': current_time
                    }

                print(f"Prediction: {get_gesture_label(label[0])} ({max_proba*100:.2f}%)")

        last_processed_time = current_time

# Start the processing thread
processing_thread = threading.Thread(target=process_frames)
processing_thread.start()

# Try to connect to the ESP32 MJPEG stream
cap = cv2.VideoCapture(esp32_cam_url)
# cap = cv2.VideoCapture(0)

if not cap.isOpened():
    print(f"Failed to connect to the ESP32-CAM at {esp32_cam_url}")
    # Stop the worker before leaving so the thread does not keep running.
    processing_active = False
    processing_thread.join()
    exit()

print(f"Connected to ESP32-CAM at {esp32_cam_url}. Press 'q' to quit.")

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            # Failed reads are skipped and the stream is read again.
            continue

        # Resize and display the frame
        frame = cv2.resize(frame, (640, 480))
        cv2.imshow('ESP32-CAM Stream', frame)

        # Hand the newest frame to the worker. frame_queue has maxlen=1, so
        # append() itself discards the previous frame. Popping first would
        # leave the queue momentarily empty, and the worker could hit an
        # IndexError between its length check and its read.
        frame_queue.append(frame.copy())

        # Get the latest prediction if available
        with prediction_lock:
            if latest_prediction:
                # You can use the prediction here if needed
                pass

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

finally:
    # Clean up
    processing_active = False
    processing_thread.join()
    cap.release()
    cv2.destroyAllWindows()

---
---
# Final Code
---
---

In [4]:
import json

def get_gesture_label(prediction, label_mapping_path="dataset/label_mapping.json"):
    """Translate a numeric class prediction into its gesture name.

    Args:
        prediction: A class index, or a list / NumPy array whose first
            element is the class index.
        label_mapping_path: JSON file mapping gesture name -> class index.

    Returns:
        The gesture name, or ``"UNKNOWN"`` when the index is not in the
        mapping or the prediction sequence has no first element.
    """
    # Load the label mapping
    with open(label_mapping_path, "r") as f:
        label_mapping = json.load(f)

    # Create reverse mapping (number -> name)
    reverse_mapping = {v: k for k, v in label_mapping.items()}

    # Handle single predictions or array predictions
    if isinstance(prediction, (np.ndarray, list)):
        try:
            prediction = prediction[0]  # Get first element if array
        except IndexError:
            # Empty list/array: there is nothing to decode.
            return "UNKNOWN"

    return reverse_mapping.get(prediction, "UNKNOWN")

In [5]:
import cv2
import mediapipe as mp
import numpy as np
import pickle
import threading
import time
from collections import deque, Counter
from datetime import datetime, timedelta
import paho.mqtt.client as mqtt

# ==== MQTT Broker Config ====
BROKER_IP = "192.168.128.88"
MQTT_TOPIC = "action"  # Topic the recognised gesture names are published to

# ==== MQTT Setup ====
client = mqtt.Client()
def on_connect(client, userdata, flags, rc):
    """Connection callback: report whether the broker accepted the connection.

    A result code ``rc`` of 0 is treated as success; any other value is
    printed as a failure code.
    """
    if rc == 0:
        print("✅ Connected to MQTT broker!")
    else:
        print(f"❌ Connection failed with code {rc}")
client.on_connect = on_connect

print("🔄 Connecting to MQTT broker...")
# Port 1883 with a 60 s keepalive; loop_start() runs the network loop in the background.
client.connect(BROKER_IP, 1883, 60)
client.loop_start()
time.sleep(1)  # Pause briefly before continuing — presumably to let the connection complete

# ==== Load the model ====
# NOTE: pickle executes code on load; only load model files you created.
model_path = "gesture_classifier.pkl"
with open(model_path, "rb") as f:
    model = pickle.load(f)

# ==== Initialize Mediapipe Hand module ====
mp_hands = mp.solutions.hands
hands = mp_hands.Hands()

# ==== Camera setup ====
ip_address = "192.168.128.66"
esp32_cam_url = f"http://{ip_address}:81/stream"

# ==== Globals ====
latest_prediction = None  # Most recent prediction dict written by the worker
prediction_lock = threading.Lock()  # Guards reads/writes of latest_prediction
frame_queue = deque(maxlen=1)  # Holds at most one frame: the newest
processing_active = True  # Set to False to make the worker loop exit
prediction_buffer = []  # Recent predicted labels awaiting a majority vote

# ==== Helper: Publish most frequent label every 3 ====
def publish_majority_prediction():
    """Publish the most frequent buffered label once three are collected.

    Does nothing while ``prediction_buffer`` holds fewer than three labels.
    Otherwise the most common label is printed, translated to its gesture
    name, published as a string on ``MQTT_TOPIC``, and the buffer is reset.
    """
    global prediction_buffer

    if len(prediction_buffer) < 3:
        return

    majority_label, count = Counter(prediction_buffer).most_common(1)[0]
    print(f"📤 Publishing majority gesture: {majority_label} (count: {count})")

    # The payload is the gesture name, explicitly converted to str.
    gesture_name = get_gesture_label(majority_label)
    client.publish(MQTT_TOPIC, str(gesture_name))

    prediction_buffer = []  # Start a fresh voting window

# ==== Thread: Frame processing and prediction ====
def process_frames():
    """Worker loop: classify the newest queued frame about every 200 ms.

    Runs until the module-level ``processing_active`` flag becomes false.
    Each cycle takes the frame currently in ``frame_queue``, runs MediaPipe
    hand detection on a 320x240 copy and classifies every detected hand
    with ``model``. Each prediction is stored in ``latest_prediction`` while
    holding ``prediction_lock``, printed, appended to ``prediction_buffer``
    and offered to ``publish_majority_prediction``.
    """
    global latest_prediction, processing_active, prediction_buffer
    last_processed_time = datetime.now()
    processing_interval = timedelta(milliseconds=200)

    while processing_active:
        current_time = datetime.now()

        # Not yet time for the next frame: wait a bit and re-check.
        if current_time - last_processed_time < processing_interval:
            time.sleep(0.01)
            continue

        # Read the frame directly instead of checking len() first: the
        # producer may empty the deque between a length check and the
        # indexing, which would raise IndexError and kill this thread.
        try:
            frame = frame_queue[0]
        except IndexError:
            time.sleep(0.01)
            continue

        small_frame = cv2.resize(frame, (320, 240))
        img_rgb = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)
        results = hands.process(img_rgb)

        if results.multi_hand_landmarks:
            for landmarks in results.multi_hand_landmarks:
                # Build the flat [x0, y0, x1, y1, ...] feature vector from
                # the 21 hand landmarks.
                data_aux = []
                for i in range(21):
                    data_aux.append(landmarks.landmark[i].x)
                    data_aux.append(landmarks.landmark[i].y)

                data_point = np.array(data_aux).reshape(1, -1)
                label = model.predict(data_point)
                proba = model.predict_proba(data_point)
                max_proba = np.max(proba)

                with prediction_lock:
                    latest_prediction = {
                        'label': label[0],
                        'confidence': max_proba * 100,
                        'timestamp': current_time
                    }

                print(f"🤖 Prediction: {label[0]} ({max_proba*100:.2f}%)")
                # Feed the vote buffer; a publish happens once it is full.
                prediction_buffer.append(label[0])
                publish_majority_prediction()

        last_processed_time = current_time

# ==== Start processing thread ====
processing_thread = threading.Thread(target=process_frames)
processing_thread.start()

# ==== Connect to camera stream ====
cap = cv2.VideoCapture(esp32_cam_url)
if not cap.isOpened():
    print(f"Failed to connect to the ESP32-CAM at {esp32_cam_url}")
    # Stop the worker and the MQTT network loop before leaving so no
    # background thread keeps running.
    processing_active = False
    processing_thread.join()
    client.loop_stop()
    client.disconnect()
    exit()

print(f"✅ Connected to ESP32-CAM at {esp32_cam_url}. Press 'q' to quit.")

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            # Failed reads are skipped and the stream is read again.
            continue

        frame = cv2.resize(frame, (640, 480))
        cv2.imshow('ESP32-CAM Stream', frame)

        # Hand the newest frame to the worker. frame_queue has maxlen=1, so
        # append() itself discards the previous frame. Popping first would
        # leave the queue momentarily empty, and the worker could hit an
        # IndexError between its length check and its read.
        frame_queue.append(frame.copy())

        with prediction_lock:
            if latest_prediction:
                # Optionally do something with it
                pass

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

finally:
    # Stop the worker first, then release the camera, window and MQTT client.
    processing_active = False
    processing_thread.join()
    cap.release()
    cv2.destroyAllWindows()
    client.loop_stop()
    client.disconnect()
    print("📴 Disconnected from MQTT broker.")
    print("📴 Disconnected from MQTT broker.")


  client = mqtt.Client()


🔄 Connecting to MQTT broker...
✅ Connected to MQTT broker!
✅ Connected to ESP32-CAM at http://192.168.128.66:81/stream. Press 'q' to quit.
🤖 Prediction: 5 (60.86%)
🤖 Prediction: 3 (97.67%)
🤖 Prediction: 3 (63.16%)
📤 Publishing majority gesture: 3 (count: 2)
🤖 Prediction: 5 (50.75%)
🤖 Prediction: 3 (94.91%)
🤖 Prediction: 5 (98.99%)
📤 Publishing majority gesture: 5 (count: 2)
🤖 Prediction: 3 (71.77%)
🤖 Prediction: 3 (99.19%)
🤖 Prediction: 3 (72.66%)
📤 Publishing majority gesture: 3 (count: 3)
🤖 Prediction: 3 (71.23%)
🤖 Prediction: 3 (71.77%)
🤖 Prediction: 3 (71.77%)
📤 Publishing majority gesture: 3 (count: 3)
🤖 Prediction: 5 (99.06%)
🤖 Prediction: 3 (72.66%)
🤖 Prediction: 5 (99.21%)
📤 Publishing majority gesture: 5 (count: 2)
🤖 Prediction: 5 (99.49%)
🤖 Prediction: 5 (99.92%)
🤖 Prediction: 5 (99.90%)
📤 Publishing majority gesture: 5 (count: 3)
🤖 Prediction: 5 (99.91%)
🤖 Prediction: 5 (99.52%)
🤖 Prediction: 5 (99.47%)
📤 Publishing majority gesture: 5 (count: 3)
🤖 Prediction: 5 (99.81%)
🤖 P