In [5]:
%cd ..

/Users/jashtandel/SEM6/Approach2


## Testing Inference on Video

In [5]:
import cv2
import numpy as np
import streamlit as st
import torch
import time
from src.detection_keypoint import DetectKeypoint
from src.classification_keypoint import AngleLSTMNet, AngleFeatureExtractor
from sklearn.preprocessing import LabelEncoder
import pandas as pd

# Initialize label encoder with the same classes used during training
def initialize_label_encoder(
    train_csv_path='/Users/jashtandel/SEM6/Approach2/datasets/train_action_pose_keypoint.csv',
    label_column='label',
):
    """
    Build a LabelEncoder fitted on the label column of the training CSV.

    The encoder is later used to map predicted class indices back to label
    names, so it should be fitted on the same labels the classifier was
    trained with (assumption: training encoded its labels the same way).

    Args:
        train_csv_path: CSV file to read the labels from. Defaults to the
            training-set location this notebook was originally written for.
        label_column: Name of the CSV column that holds the class labels.

    Returns:
        LabelEncoder: Encoder fitted on the values of ``label_column``.
    """
    train_data = pd.read_csv(train_csv_path)
    label_encoder = LabelEncoder()
    label_encoder.fit(train_data[label_column])
    return label_encoder

def extract_keypoints_and_angles(results, person_idx):
    """
    Turn one detected person's pose keypoints into the angle-feature tensor
    fed to the classifier.

    The person's (x, y) keypoints are flattened to [x0, y0, x1, y1, ...],
    divided by the original image width / height, passed through
    AngleFeatureExtractor.calculate_angles, and packed into a float32 tensor
    of shape (1, 1, 8). Any exception is printed and reported as None.
    """
    try:
        person = results.keypoints[person_idx]
        # flatten() returns a copy, so the in-place scaling below never
        # writes back into the detector's own result arrays.
        flat_xy = person.xy[0].cpu().numpy().flatten()

        # orig_shape is unpacked as (height, width). After flattening, the x
        # values sit at even offsets and the y values at odd offsets.
        frame_h, frame_w = results.orig_shape
        xs = flat_xy[0::2]
        ys = flat_xy[1::2]
        xs /= frame_w  # views: these writes land in flat_xy
        ys /= frame_h

        extractor = AngleFeatureExtractor()
        joint_angles = extractor.calculate_angles(flat_xy)
        return torch.tensor(joint_angles, dtype=torch.float32).reshape(1, 1, 8)
    except Exception as e:
        print(f"Error extracting keypoints and angles: {str(e)}")
        return None

def pose_classification_video(
    video_path,
    output_path,
    model_path='/Users/jashtandel/SEM6/YoloV8-Pose-Keypoint-Classification-master/models/HAAD_Pose_Angle.pt',
):
    """
    Classify the action of every person detected in a video.

    Each frame is run through the keypoint detector, every detected person is
    classified by the angle-based LSTM, and the annotated frame is both shown
    in a Streamlit placeholder and appended to the output video.

    Args:
        video_path: Input video, opened with cv2.VideoCapture.
        output_path: Destination of the annotated video ('mp4v' codec, source
            resolution and source frame rate).
        model_path: State-dict checkpoint for AngleLSTMNet. Defaults to the
            checkpoint location this notebook was originally written for.

    Returns:
        None. The capture and the writer are released even if processing
        fails part-way through.
    """
    label_encoder = initialize_label_encoder()
    detection_keypoint = DetectKeypoint()

    model = AngleLSTMNet(
        input_size=8,
        hidden_size=128,
        num_layers=3,
        num_classes=len(label_encoder.classes_),
        lstm_dropout=0.3,
        fc_dropout=0.5
    )

    # NOTE: torch.load unpickles the file, so only load checkpoints you trust.
    # map_location='cpu' keeps loading working on CPU-only machines even when
    # the checkpoint was saved from a GPU; the model and its inputs stay on
    # the CPU in this function.
    model.load_state_dict(torch.load(model_path, map_location='cpu'))
    model.eval()

    cap = cv2.VideoCapture(video_path)
    out = None
    try:
        st_frame = st.empty()

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        # Keep the fractional frame rate (e.g. 29.97): truncating it to an int
        # makes the written video play at the wrong speed. Fall back to 30
        # when the container reports nothing usable (0 / NaN).
        source_fps = cap.get(cv2.CAP_PROP_FPS)
        if not source_fps > 0:
            source_fps = 30.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        out = cv2.VideoWriter(output_path, fourcc, source_fps, (width, height))
        if not out.isOpened():
            print(f"Warning: could not open video writer for {output_path}")

        # Rolling processing-rate estimate over the last 30 frame intervals.
        prev_time = None
        fps_window = []

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # The first frame has no previous timestamp, so it contributes no
            # sample (instead of a fake 0 that drags the average down), and a
            # zero-length interval is skipped rather than dividing by zero.
            now = time.perf_counter()
            if prev_time is not None and now > prev_time:
                fps_window.append(1.0 / (now - prev_time))
                if len(fps_window) > 30:
                    fps_window.pop(0)
            prev_time = now
            avg_fps = sum(fps_window) / len(fps_window) if fps_window else 0.0

            results = detection_keypoint(frame)
            classifications = []
            probabilities = []  # Confidence of the predicted class per person
            num_persons = len(results.boxes)

            for i in range(num_persons):
                try:
                    angles_tensor = extract_keypoints_and_angles(results, i)

                    if angles_tensor is not None:
                        with torch.no_grad():
                            outputs = model(angles_tensor)
                            probabilities_tensor = torch.nn.functional.softmax(outputs, dim=1)
                            prob_values, predicted = torch.max(probabilities_tensor, 1)
                            action_label = label_encoder.inverse_transform([predicted.item()])[0]
                            classifications.append(action_label)
                            probabilities.append(prob_values.item())
                    else:
                        classifications.append("unknown")
                        probabilities.append(0.0)
                except Exception as e:
                    # Best effort: one bad detection must not abort the video.
                    print(f"Error processing person {i}: {str(e)}")
                    classifications.append("unknown")
                    probabilities.append(0.0)

            frame_draw = results.plot(boxes=False)

            # Draw FPS
            cv2.putText(
                frame_draw,
                f'FPS: {avg_fps:.1f}',
                (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                1, (0, 255, 0), 2
            )

            # Draw bounding boxes and labels for each person. The two lists
            # hold exactly one entry per detected box, in box order.
            for i, (action_label, probability) in enumerate(zip(classifications, probabilities)):
                x_min, y_min, x_max, y_max = (
                    int(v) for v in results.boxes.xyxy[i].cpu().numpy()
                )

                # Draw bounding box
                frame_draw = cv2.rectangle(
                    frame_draw,
                    (x_min, y_min), (x_max, y_max),
                    (0, 0, 255), 2
                )

                # Create label with probability
                label_text = f'{action_label.upper()} ({probability:.2%})'

                # Draw a filled label background sized to the text width
                (text_w, _), _ = cv2.getTextSize(
                    label_text,
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2
                )
                frame_draw = cv2.rectangle(
                    frame_draw,
                    (x_min, y_min - 20), (x_min + text_w, y_min),
                    (0, 0, 255), -1
                )

                # Draw label text with probability
                frame_draw = cv2.putText(
                    frame_draw,
                    label_text,
                    (x_min, y_min - 4),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.5, (255, 255, 255),
                    thickness=2
                )

            # Update Streamlit display (OpenCV frames are BGR)
            frame_draw_rgb = cv2.cvtColor(frame_draw, cv2.COLOR_BGR2RGB)
            st_frame.image(frame_draw_rgb, channels="RGB")

            out.write(frame_draw)
    finally:
        # Always give the file handles back, even after an exception.
        cap.release()
        if out is not None:
            out.release()

# Streamlit app layout
import os  # needed in this cell only to sanitise the uploaded file name

st.set_page_config(layout="wide", page_title="YoloV8 Multi-Person Keypoint Classification Video")
st.write("## YoloV8 Multi-Person Keypoint Classification in Video")
st.write("Upload a video to classify actions for multiple people in real-time.")

# Sidebar for video upload
st.sidebar.write("## Upload Video :gear:")
video_upload = st.sidebar.file_uploader("Upload a video", type=["mp4", "mov", "avi"])

if video_upload is not None:
    # The file name is supplied by the client: keep only its last path
    # component so it cannot steer the write outside /tmp, and fall back to a
    # fixed name when nothing usable is left.
    safe_name = os.path.basename(video_upload.name)
    if safe_name in ("", ".", ".."):
        safe_name = "uploaded_video.mp4"
    video_path = os.path.join("/tmp", safe_name)
    with open(video_path, "wb") as f:
        f.write(video_upload.getbuffer())
else:
    # Nothing uploaded: fall back to a fixed local sample video.
    video_path = '/Users/jashtandel/Downloads/PETS09-S2L1-raw.mp4'

output_path = '/Users/jashtandel/SEM6/Approach2/output/annotated_output_multi_Impor.mp4'

# Run pose classification
pose_classification_video(video_path, output_path)

2024-11-22 17:41:43.108 
  command:

    streamlit run /opt/anaconda3/lib/python3.12/site-packages/ipykernel_launcher.py [ARGUMENTS]



0: 480x640 2 persons, 480.9ms
Speed: 77.2ms preprocess, 480.9ms inference, 5599.9ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 2 persons, 280.0ms
Speed: 6.5ms preprocess, 280.0ms inference, 120.7ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 130.8ms
Speed: 2.2ms preprocess, 130.8ms inference, 382.0ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 128.9ms
Speed: 2.2ms preprocess, 128.9ms inference, 81.3ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 2 persons, 120.3ms
Speed: 1.9ms preprocess, 120.3ms inference, 50.1ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 120.0ms
Speed: 1.9ms preprocess, 120.0ms inference, 57.1ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 122.7ms
Speed: 3.3ms preprocess, 122.7ms inference, 51.2ms postprocess per image at shape (1, 3, 480, 640)

0: 480x640 1 person, 118.5ms
Speed: 2.1ms preprocess, 118.5ms inference, 103.1ms postpro

## Testing Inference using Webcam

In [None]:
import cv2
import numpy as np
import torch
import time
from src.detection_keypoint import DetectKeypoint
from src.classification_keypoint import AngleLSTMNet, AngleFeatureExtractor
from sklearn.preprocessing import LabelEncoder
import pandas as pd

def initialize_label_encoder(
    train_csv_path='/Users/jashtandel/SEM6/Approach2/datasets/train_action_pose_keypoint.csv',
    label_column='label',
):
    """
    Initialize a label encoder with the classes found in the training data.

    The encoder is later used to map predicted class indices back to label
    names, so it should be fitted on the same labels the classifier was
    trained with (assumption: training encoded its labels the same way).

    Args:
        train_csv_path: CSV file to read the labels from. Defaults to the
            training-set location this notebook was originally written for.
        label_column: Name of the CSV column that holds the class labels.

    Returns:
        LabelEncoder: Fitted label encoder
    """
    train_data = pd.read_csv(train_csv_path)
    label_encoder = LabelEncoder()
    label_encoder.fit(train_data[label_column])
    return label_encoder

def extract_keypoints_and_angles(results, person_idx):
    """
    Convert one detected person's pose keypoints into the angle-feature
    tensor fed to the classifier.

    The person's (x, y) keypoints are flattened to [x0, y0, x1, y1, ...],
    divided by the original image width / height, and passed through
    AngleFeatureExtractor.calculate_angles.

    Args:
        results: Detection results exposing ``keypoints`` and ``orig_shape``
        person_idx: Index of the person whose keypoints are extracted

    Returns:
        torch.Tensor: float32 tensor of shape (1, 1, 8) holding the angle
        features, or None if any step raises (the error is printed)
    """
    try:
        person = results.keypoints[person_idx]
        # flatten() returns a copy, so the in-place scaling below never
        # writes back into the detector's own result arrays.
        flat_xy = person.xy[0].cpu().numpy().flatten()

        # orig_shape is unpacked as (height, width). After flattening, the x
        # values sit at even offsets and the y values at odd offsets.
        frame_h, frame_w = results.orig_shape
        xs = flat_xy[0::2]
        ys = flat_xy[1::2]
        xs /= frame_w  # views: these writes land in flat_xy
        ys /= frame_h

        extractor = AngleFeatureExtractor()
        joint_angles = extractor.calculate_angles(flat_xy)
        return torch.tensor(joint_angles, dtype=torch.float32).reshape(1, 1, 8)
    except Exception as e:
        print(f"Error extracting keypoints and angles: {str(e)}")
        return None

def pose_classification_webcam(
    camera_index=0,
    model_path='/Users/jashtandel/SEM6/YoloV8-Pose-Keypoint-Classification-master/models/HAAD_Pose_Angle.pt',
):
    """
    Perform real-time pose classification using webcam input.

    Frames are read from the camera, every detected person is classified by
    the angle-based LSTM, and the annotated frame is shown in an OpenCV
    window until 'q' is pressed or the camera stops delivering frames.

    Args:
        camera_index: Device index passed to cv2.VideoCapture (0 is the
            default webcam).
        model_path: State-dict checkpoint for AngleLSTMNet. Defaults to the
            checkpoint location this notebook was originally written for.

    Returns:
        None. The camera and the display window are released even when the
        loop is interrupted or raises.
    """
    # Initialize components
    label_encoder = initialize_label_encoder()
    detection_keypoint = DetectKeypoint()

    # Load pre-trained model
    model = AngleLSTMNet(
        input_size=8,
        hidden_size=128,
        num_layers=3,
        num_classes=len(label_encoder.classes_),
        lstm_dropout=0.3,
        fc_dropout=0.5
    )

    # NOTE: torch.load unpickles the file, so only load checkpoints you trust.
    # map_location='cpu' keeps loading working on CPU-only machines even when
    # the checkpoint was saved from a GPU; the model and its inputs stay on
    # the CPU in this function.
    model.load_state_dict(torch.load(model_path, map_location='cpu'))
    model.eval()

    # Open webcam
    cap = cv2.VideoCapture(camera_index)

    try:
        if not cap.isOpened():
            # The first read below fails and ends the loop; say why.
            print(f"Could not open camera {camera_index}")

        # Rolling processing-rate estimate over the last 30 frame intervals.
        prev_time = None
        fps_window = []

        while True:
            # Read frame from webcam
            ret, frame = cap.read()
            if not ret:
                break

            # The first frame has no previous timestamp, so it contributes no
            # sample (instead of a fake 0 that drags the average down), and a
            # zero-length interval is skipped rather than dividing by zero.
            now = time.perf_counter()
            if prev_time is not None and now > prev_time:
                fps_window.append(1.0 / (now - prev_time))
                if len(fps_window) > 30:
                    fps_window.pop(0)
            prev_time = now
            avg_fps = sum(fps_window) / len(fps_window) if fps_window else 0.0

            # Perform pose detection
            results = detection_keypoint(frame)
            classifications = []
            probabilities = []  # Confidence of the predicted class per person
            num_persons = len(results.boxes)

            # Process each detected person
            for i in range(num_persons):
                try:
                    angles_tensor = extract_keypoints_and_angles(results, i)

                    if angles_tensor is not None:
                        with torch.no_grad():
                            outputs = model(angles_tensor)
                            probabilities_tensor = torch.nn.functional.softmax(outputs, dim=1)
                            prob_values, predicted = torch.max(probabilities_tensor, 1)
                            action_label = label_encoder.inverse_transform([predicted.item()])[0]
                            classifications.append(action_label)
                            probabilities.append(prob_values.item())
                    else:
                        classifications.append("unknown")
                        probabilities.append(0.0)
                except Exception as e:
                    # Best effort: one bad detection must not stop the stream.
                    print(f"Error processing person {i}: {str(e)}")
                    classifications.append("unknown")
                    probabilities.append(0.0)

            # Draw detection results
            frame_draw = results.plot(boxes=False)

            # Draw FPS
            cv2.putText(
                frame_draw,
                f'FPS: {avg_fps:.1f}',
                (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                1, (0, 255, 0), 2
            )

            # Draw bounding boxes and labels for each person. The two lists
            # hold exactly one entry per detected box, in box order.
            for i, (action_label, probability) in enumerate(zip(classifications, probabilities)):
                x_min, y_min, x_max, y_max = (
                    int(v) for v in results.boxes.xyxy[i].cpu().numpy()
                )

                # Draw bounding box
                frame_draw = cv2.rectangle(
                    frame_draw,
                    (x_min, y_min), (x_max, y_max),
                    (0, 0, 255), 2
                )

                # Create label with probability
                label_text = f'{action_label.upper()} ({probability:.2%})'

                # Draw a filled label background sized to the text width
                (text_w, _), _ = cv2.getTextSize(
                    label_text,
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2
                )
                frame_draw = cv2.rectangle(
                    frame_draw,
                    (x_min, y_min - 20), (x_min + text_w, y_min),
                    (0, 0, 255), -1
                )

                # Draw label text with probability
                frame_draw = cv2.putText(
                    frame_draw,
                    label_text,
                    (x_min, y_min - 4),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.5, (255, 255, 255),
                    thickness=2
                )

            # Display frame
            cv2.imshow('Multi-Person Pose Classification', frame_draw)

            # Break loop on 'q' key press
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release resources even on an exception or a kernel interrupt, so
        # the camera is not left held by this process.
        cap.release()
        cv2.destroyAllWindows()

# Start the webcam pose classification loop only when this code is executed
# as the main module, not when it is imported from elsewhere.
if __name__ == "__main__":
    pose_classification_webcam()


0: 384x640 1 person, 581.7ms
Speed: 32.5ms preprocess, 581.7ms inference, 1537.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 230.4ms
Speed: 4.5ms preprocess, 230.4ms inference, 64.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 123.7ms
Speed: 2.0ms preprocess, 123.7ms inference, 41.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 105.8ms
Speed: 1.8ms preprocess, 105.8ms inference, 40.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 97.2ms
Speed: 1.8ms preprocess, 97.2ms inference, 40.9ms postprocess per image at shape (1, 3, 384, 640)


2024-11-26 15:03:23.952 python[4984:58799] +[IMKClient subclass]: chose IMKClient_Modern
2024-11-26 15:03:23.952 python[4984:58799] +[IMKInputSession subclass]: chose IMKInputSession_Modern



0: 384x640 1 person, 95.9ms
Speed: 1.8ms preprocess, 95.9ms inference, 41.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 97.8ms
Speed: 1.5ms preprocess, 97.8ms inference, 41.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 97.8ms
Speed: 1.5ms preprocess, 97.8ms inference, 40.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 100.3ms
Speed: 1.9ms preprocess, 100.3ms inference, 41.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 97.7ms
Speed: 1.8ms preprocess, 97.7ms inference, 40.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 98.2ms
Speed: 1.8ms preprocess, 98.2ms inference, 40.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 96.8ms
Speed: 1.7ms preprocess, 96.8ms inference, 41.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 person, 100.4ms
Speed: 1.9ms preprocess, 100.4ms inference, 37.2ms postprocess per image at sha