In [None]:
import cv2
import mediapipe as mp
import pandas as pd
import os
import numpy as np
import tensorflow as tf
from PIL import Image
from io import BytesIO
import requests
import time
from sklearn.utils import class_weight
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt

# Definition
# Presumably the camera frame size in pixels (width x height) -- not used in
# the cells visible here; TODO confirm.
res_x = 1280
res_y = 720

# Class names. Each name is also the sub-folder of `training_folder` that
# holds the videos for that class.
labels = []
training_folder = ''

# Endpoints can be overridden through environment variables so that
# credentials do not have to be edited into the notebook.
# NOTE(review): the default camera URL still embeds a username/password.
# Move it out of the notebook (and rotate it) before sharing this file.
url_cam = os.environ.get(
    "URL_CAM", "http://admin:12345@10.100.91.200/image/jpeg.cgi")
url_shelly = os.environ.get(
    "URL_SHELLY", "http://10.100.91.43:8080/rest/items/ShellyLight_Betrieb")

In [None]:
import cv2
import mediapipe as mp
import numpy as np
from collections import defaultdict

# Store of extracted data: label index -> list of per-video sequences.
# Every sequence is an array of shape (num_frames, total_landmarks).
sequences_by_label = defaultdict(list)

# MediaPipe Holistic solution module (hand, pose and face landmarks)
mp_holistic = mp.solutions.holistic

# (attribute on the holistic result, number of landmarks reserved for it),
# in the order the groups are laid out in the feature vector.
_LANDMARK_GROUPS = (
    ("left_hand_landmarks", 21),
    ("right_hand_landmarks", 21),
    ("pose_landmarks", 33),
    ("face_landmarks", 468),
)
_COORDS_PER_LANDMARK = 3  # x, y, z
_VALUES_PER_FRAME = _COORDS_PER_LANDMARK * sum(n for _, n in _LANDMARK_GROUPS)


def extract_landmarks_from_frame(holistic_results):
    """Flatten one holistic result into a fixed-length feature vector.

    The vector is laid out as left hand, right hand, pose, face; every
    landmark contributes its ``x``, ``y`` and ``z`` attributes. A group that
    is missing (falsy) on ``holistic_results`` is left as zeros.

    The output length is always 63 + 63 + 99 + 1404 = 1629. If a detected
    group carries more landmarks than its reserved slot, the extras are
    dropped; if it carries fewer, the remainder stays zero. This keeps every
    frame the same width so frames can be stacked into one array.

    Args:
        holistic_results: Object exposing ``left_hand_landmarks``,
            ``right_hand_landmarks``, ``pose_landmarks`` and
            ``face_landmarks``; each is either falsy or has an iterable
            ``landmark`` attribute whose items have ``x``, ``y``, ``z``.

    Returns:
        A 1-D ``np.float32`` array of length 1629.
    """
    features = np.zeros(_VALUES_PER_FRAME, dtype=np.float32)

    offset = 0
    for attr_name, n_landmarks in _LANDMARK_GROUPS:
        group = getattr(holistic_results, attr_name)
        if group:
            # zip with range() caps the number of landmarks that are copied
            for idx, lm in zip(range(n_landmarks), group.landmark):
                start = offset + idx * _COORDS_PER_LANDMARK
                features[start:start + _COORDS_PER_LANDMARK] = (lm.x, lm.y, lm.z)
        offset += n_landmarks * _COORDS_PER_LANDMARK

    return features


def process_video(video_path, label_idx):
    """Extract a per-frame landmark sequence from one video file.

    Each frame is converted from BGR to RGB, run through MediaPipe Holistic
    and flattened with ``extract_landmarks_from_frame``.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        label_idx: Index into the module-level ``labels`` list; only used
            for the progress message.

    Returns:
        A float32 array of shape ``(num_frames, values_per_frame)``, or
        ``None`` if the video cannot be opened or yields no frames.
    """
    cap = cv2.VideoCapture(video_path)
    frame_landmarks = []

    # try/finally guarantees the capture handle is released even when frame
    # conversion or the holistic model raises part-way through the video.
    try:
        if not cap.isOpened():
            print(f"Error: Could not open video {video_path}")
            return None

        with mp_holistic.Holistic(
                static_image_mode=False,
                model_complexity=1,
                min_detection_confidence=0.5,
                min_tracking_confidence=0.5) as holistic:

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Convert BGR to RGB
                image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                # Read-only flag follows the MediaPipe example code
                image_rgb.flags.writeable = False

                # Process frame with holistic model
                results = holistic.process(image_rgb)

                # Extract landmarks
                frame_landmarks.append(extract_landmarks_from_frame(results))
    finally:
        cap.release()

    if not frame_landmarks:
        print(f"Warning: No frames processed from {video_path}")
        return None

    # Convert list to numpy array: shape (num_frames, num_landmarks)
    sequence = np.array(frame_landmarks, dtype=np.float32)
    print(f"Processed: {video_path} - Shape: {sequence.shape} - Label: {labels[label_idx]}")

    return sequence


# Process all videos in training folder
print("Processing videos...\n")
video_extensions = ('.mp4', '.avi', '.mov', '.mkv')

# enumerate() gives each label its own index without a list search per label
for label_idx, label in enumerate(labels):
    label_folder = os.path.join(training_folder, label)

    if not os.path.isdir(label_folder):
        print(f"Warning: Folder not found {label_folder}")
        continue

    # os.listdir() returns entries in arbitrary order; sorting keeps the
    # dataset order (and therefore the later train/test split) reproducible.
    for video_file in sorted(os.listdir(label_folder)):
        if not video_file.lower().endswith(video_extensions):
            continue

        video_path = os.path.join(label_folder, video_file)
        try:
            sequence = process_video(video_path, label_idx)
            if sequence is not None:
                sequences_by_label[label_idx].append(sequence)
        except Exception as e:
            # Best effort: report the failing video and carry on with the rest
            print(f"Error processing {video_path}: {e}")

# Report how many sequences were collected for each label
print(f"\n=== Dataset Summary ===")
print(f"Total landmarks per frame: 63 (left hand) + 63 (right hand) + 99 (pose) + 1404 (face) = 1629")
print(f"\nSequences per label:")
for label_idx in sorted(sequences_by_label):
    sequences = sequences_by_label[label_idx]
    label_name = labels[label_idx]
    print(f"  {label_name}: {len(sequences)} videos")
    if len(sequences) > 0:
        # Show the shape of the first sequence as an example
        print(f"    Sample sequence shape: {sequences[0].shape}")

# Create lists for model training: all_sequences[i] belongs to class all_labels[i]
all_sequences = []
all_labels = []

for label_idx in sorted(sequences_by_label.keys()):
    for sequence in sequences_by_label[label_idx]:
        all_sequences.append(sequence)
        all_labels.append(label_idx)

# Built outside the f-string: a replacement field that spans several lines
# inside a single-quoted f-string is a SyntaxError before Python 3.12.
label_distribution = {
    labels[i]: len(sequences_by_label[i]) for i in sorted(sequences_by_label.keys())
}

print(f"\nTotal sequences for training: {len(all_sequences)}")
print(f"Label distribution: {label_distribution}")


In [None]:
import sys
print(f"Python version: {sys.version}")
print(f"Python executable: {sys.executable}")


!{sys.executable} -m pip install --upgrade pip
!{sys.executable} -m pip install

Python version: 3.10.9 (tags/v3.10.9:1dd9be6, Dec  6 2022, 20:01:21) [MSC v.1934 64 bit (AMD64)]
Python executable: c:\Users\matth\.pyenv\pyenv-win\versions\3.10.9\python.exe


ERROR: Could not find a version that satisfies the requirement tensorflow.keras.models (from versions: none)
ERROR: No matching distribution found for tensorflow.keras.models


In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split

# Fail early with an actionable message instead of the opaque
# "max() arg is an empty sequence" raised when no video was processed.
if not all_sequences:
    raise ValueError(
        "No training sequences available: set `labels` and `training_folder` "
        "and run the video-processing cell first."
    )

# Pad sequences to the same length (zero rows appended after the last frame)
max_length = max(len(seq) for seq in all_sequences)
padded_sequences = np.array([
    np.pad(seq, ((0, max_length - len(seq)), (0, 0)), mode='constant')
    for seq in all_sequences
])

# Convert labels to categorical
num_classes = len(labels)
y = to_categorical(all_labels, num_classes=num_classes)

# Split into train and test sets
# NOTE(review): the split is not stratified, so a class with few videos can
# end up missing from one of the two sets.
X_train, X_test, y_train, y_test = train_test_split(padded_sequences, y, test_size=0.2, random_state=42)

# Build LSTM model
# Take the per-frame feature width from the data instead of hard-coding it,
# so the model stays in sync with the feature extraction.
num_features = padded_sequences.shape[-1]

model = Sequential([
    LSTM(128, return_sequences=True, input_shape=(max_length, num_features)),
    Dropout(0.5),
    LSTM(64, return_sequences=False),
    Dropout(0.5),
    Dense(32, activation='relu'),
    Dense(num_classes, activation='softmax')
])

model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
# summary() prints the table itself and returns None, so it is not wrapped in print()
model.summary()

# Fit the network; the held-out split doubles as the validation set here
history = model.fit(
    X_train,
    y_train,
    validation_data=(X_test, y_test),
    epochs=50,
    batch_size=8,
    verbose=1,
)

# Score the trained model on that same held-out split
loss, accuracy = model.evaluate(X_test, y_test)
print(f"\nTest Accuracy: {accuracy:.4f}")