In [1]:
import torch
import cv2
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt 
from moviepy import ImageSequenceClip
import os 

import seaborn as sns
from pathlib import Path
import os

# Plot appearance: seaborn "whitegrid" theme with a wide default figure size.
sns.set_style("whitegrid")
plt.rcParams.update({'figure.figsize': (12, 6)})

# Size every video frame is resized to; used as the default `target_size`
# of process_video, which hands it to cv2.resize.
TARGET_SIZE = (224, 224)

In [2]:
# Root of the raw dataset, plus one sample "carry" clip and one sample "walk"
# clip opened for inspection.
# NOTE(review): neither capture is released in the code visible here.
dataset_path = "dataset_weight_estimation\\dataset_weight_estimation"
vid_carry = cv2.VideoCapture(f"{dataset_path}\\empty\\ope1\\carry\\angle3.mp4")
vid_walk = cv2.VideoCapture(f"{dataset_path}\\empty\\ope1\\walk\\angle1.mp4")

In [48]:
def process_video(input_path, output_path, target_size=TARGET_SIZE, target_frames=1000, fps=25):
    """Resize a video and force it to an exact number of frames.

    Each frame read from ``input_path`` is resized to ``target_size``. A clip
    shorter than ``target_frames`` is padded by repeating its last frame; a
    longer clip is cut after ``target_frames`` frames. The result is written
    to ``output_path`` with the libx264 codec.

    Args:
        input_path: Path of the video to read.
        output_path: Path of the video file to write.
        target_size: Size passed to ``cv2.resize`` for every frame.
        target_frames: Exact number of frames in the written video.
        fps: Frame rate of the written video (25 was previously hard-coded).

    Raises:
        ValueError: If ``target_frames`` is not positive.
        OSError: If the input cannot be opened or yields no frames. Previously
            this surfaced as an ``IndexError`` from ``frames[-1]``.
    """
    if target_frames <= 0:
        raise ValueError(f"target_frames must be positive, got {target_frames}")

    cap = cv2.VideoCapture(input_path)
    frames = []
    try:
        if not cap.isOpened():
            raise OSError(f"Could not open video: {input_path}")

        # Frames beyond target_frames would be trimmed anyway, so stop
        # reading (and resizing) as soon as enough have been collected.
        while len(frames) < target_frames:
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(cv2.resize(frame, target_size))
    finally:
        # Release the capture even if opening or resizing fails.
        cap.release()

    if not frames:
        raise OSError(f"No frames could be read from video: {input_path}")

    # OpenCV delivers BGR, the clip is built from RGB. Converting before
    # padding means the repeated last frame is converted only once.
    rgb_frames = [cv2.cvtColor(f, cv2.COLOR_BGR2RGB) for f in frames]

    # Pad short clips by repeating the last frame (no-op when already full).
    rgb_frames.extend([rgb_frames[-1]] * (target_frames - len(rgb_frames)))

    # Save as video
    clip = ImageSequenceClip(rgb_frames, fps=fps)
    clip.write_videofile(output_path, codec="libx264")


# Example
#process_video("dataset_weight_estimation\\dataset_weight_estimation\\empty\\ope1\\carry\\angle3.mp4","processed_empty_224_1000.mp4")

In [23]:
# Build the resized, fixed-length version of one "heavy" carry clip.
process_video(
    input_path="dataset_weight_estimation\\dataset_weight_estimation\\heavy\\ope1\\carry\\angle3.mp4",
    output_path="processed_heavy_224_1000.mp4",
)


MoviePy - Building video processed_heavy_224_1000.mp4.
MoviePy - Writing video processed_heavy_224_1000.mp4



                                                                           

MoviePy - Done !
MoviePy - video ready processed_heavy_224_1000.mp4
Saved processed video: processed_heavy_224_1000.mp4


## Preprocessing the dataset
### Target frame shape: (224, 224); 1000 frames per video

In [42]:
# Root folder of the raw dataset (note the trailing separator) and the folder
# that receives the preprocessed copies.
DATA_DIR = "dataset_weight_estimation\\dataset_weight_estimation\\"
OUT_PATH = "Preprocessed_data"
# exist_ok=True keeps this cell re-runnable: a plain os.makedirs raises
# FileExistsError as soon as the folder exists from an earlier run.
os.makedirs(OUT_PATH, exist_ok=True)

In [None]:
# Preprocess every "carry" clip: labels empty/heavy/light, folders ope1..ope8,
# files angle1.mp4..angle3.mp4. The folder layout is mirrored under OUT_PATH.
for label in ["empty", "heavy", "light"]:
    for i in range(8):
        # os.path.join avoids the doubled separator that plain concatenation
        # produced (DATA_DIR already ends with one).
        src_dir = os.path.join(DATA_DIR, label, f"ope{i+1}", "carry")
        out_dir = os.path.join(OUT_PATH, label, f"ope{i+1}", "carry")
        # Created once per folder; exist_ok replaces the check-then-create pair.
        os.makedirs(out_dir, exist_ok=True)

        for j in range(3):
            vid_path = os.path.join(src_dir, f"angle{j+1}.mp4")
            process_video(vid_path, os.path.join(out_dir, f"angle{j+1}.mp4"))

In [None]:
# Preprocess every "walk" clip: labels empty/heavy/light, folders ope1..ope8,
# files angle1.mp4..angle3.mp4. The folder layout is mirrored under OUT_PATH.
for label in ["empty", "heavy", "light"]:
    for i in range(8):
        # os.path.join avoids the doubled separator that plain concatenation
        # produced (DATA_DIR already ends with one).
        src_dir = os.path.join(DATA_DIR, label, f"ope{i+1}", "walk")
        out_dir = os.path.join(OUT_PATH, label, f"ope{i+1}", "walk")
        # Created once per folder; exist_ok replaces the check-then-create pair.
        os.makedirs(out_dir, exist_ok=True)

        for j in range(3):
            vid_path = os.path.join(src_dir, f"angle{j+1}.mp4")
            process_video(vid_path, os.path.join(out_dir, f"angle{j+1}.mp4"))

In [25]:
### KEYPOINTS EXTRACTION
import torch
from random import random
import torchvision 
from ultralytics import YOLO
from tqdm import tqdm

In [None]:
# --- Configuration ---
# NOTE: VID must already exist on disk. The only process_video call that
# would write this file name is commented out earlier in the notebook.
VID = "processed_empty_224_1000.mp4"
OUTPUT_VIDEO_PATH = "keypoint_visualization_yolo.mp4"
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# --- Load YOLOv11-Pose Model ---
try:
    print("Loading YOLOv11-pose model...")
    # The weights loaded here are 'yolo11n-pose.pt'; change the file name to
    # use a different pose checkpoint (e.g. 'yolov8n-pose.pt').
    model = YOLO("yolo11n-pose.pt")
    model.to(DEVICE)
    print(f"YOLO model loaded successfully on device: {DEVICE}")
except Exception as e:
    print(f"Error loading YOLO model: {e}")
    # Re-raise instead of calling exit(): in a notebook exit() shuts the
    # kernel down and hides the traceback. Raising stops the cell and keeps
    # the session, and later cells still cannot run with an undefined model.
    raise

# --- Keypoint Drawing Utilities ---
# Keypoint name -> index of that keypoint within a keypoint array.
KEYPOINT_DICT = {
    name: index
    for index, name in enumerate((
        'nose', 'left_eye', 'right_eye', 'left_ear', 'right_ear',
        'left_shoulder', 'right_shoulder', 'left_elbow', 'right_elbow',
        'left_wrist', 'right_wrist', 'left_hip', 'right_hip',
        'left_knee', 'right_knee', 'left_ankle', 'right_ankle',
    ))
}

# Pairs of keypoint indices that are joined by a line when drawing a skeleton.
SKELETON_EDGES = [
    (KEYPOINT_DICT[start], KEYPOINT_DICT[end])
    for start, end in (
        # torso
        ('left_shoulder', 'right_shoulder'), ('left_hip', 'right_hip'),
        ('left_shoulder', 'left_hip'), ('right_shoulder', 'right_hip'),
        # arms
        ('left_shoulder', 'left_elbow'), ('left_elbow', 'left_wrist'),
        ('right_shoulder', 'right_elbow'), ('right_elbow', 'right_wrist'),
        # legs
        ('left_hip', 'left_knee'), ('left_knee', 'left_ankle'),
        ('right_hip', 'right_knee'), ('right_knee', 'right_ankle'),
        # head
        ('nose', 'left_eye'), ('nose', 'right_eye'),
        ('left_eye', 'left_ear'), ('right_eye', 'right_ear'),
    )
]

def draw_keypoints(frame, keypoints, confidence_threshold=0.2):
    """Mark every sufficiently confident keypoint on ``frame`` with a filled dot.

    Args:
        frame: Image with a three-element ``shape`` (height, width, channels);
            it is drawn on in place.
        keypoints: Iterable of ``(y, x, confidence)`` triples where ``y`` and
            ``x`` are fractions of the frame height and width.
        confidence_threshold: Keypoints whose confidence is not strictly
            greater than this value are skipped.
    """
    height, width, _channels = frame.shape
    for rel_y, rel_x, score in keypoints:
        if not score > confidence_threshold:
            continue
        # Scale the fractional coordinates back to pixel positions (x first,
        # as cv2 expects).
        center = (int(rel_x * width), int(rel_y * height))
        cv2.circle(frame, center, 4, (0, 255, 0), -1)

def draw_skeleton(frame, keypoints, confidence_threshold=0.2):
    """Draw a line on ``frame`` for every skeleton edge whose two ends are confident.

    Args:
        frame: Image with a three-element ``shape`` (height, width, channels);
            it is drawn on in place.
        keypoints: Indexable collection of ``(y, x, confidence)`` entries with
            ``y``/``x`` as fractions of the frame height/width, addressed by
            the indices stored in ``SKELETON_EDGES``.
        confidence_threshold: An edge is drawn only when both of its keypoints
            have a confidence strictly greater than this value.
    """
    height, width, _channels = frame.shape

    def to_pixel(kp):
        # (y, x) fractions -> (x, y) pixel coordinates.
        return (int(kp[1] * width), int(kp[0] * height))

    for first_idx, second_idx in SKELETON_EDGES:
        first_kp = keypoints[first_idx]
        second_kp = keypoints[second_idx]
        if not (first_kp[2] > confidence_threshold and second_kp[2] > confidence_threshold):
            continue
        first_point = to_pixel(first_kp)
        second_point = to_pixel(second_kp)
        cv2.line(frame, first_point, second_point, (255, 0, 0), 2)

def get_keypoints_from_result(result, frame_h, frame_w):
    """
    Pull one person's keypoints out of a YOLO pose result, scaled by frame size.

    When several people are detected, the one whose keypoints have the highest
    mean confidence is kept.

    Args:
        result: Object exposing ``keypoints.data``, indexed here as
            (person, keypoint, [x, y, confidence]).
        frame_h: Frame height used to divide the y coordinates.
        frame_w: Frame width used to divide the x coordinates.

    Returns:
        np.ndarray: float32 array of shape (17, 3) holding
        [y / frame_h, x / frame_w, confidence] per keypoint; all zeros when
        the result carries no keypoints.
    """
    detections = result.keypoints
    if detections is None or len(detections.data) == 0:
        return np.zeros((17, 3), dtype=np.float32)

    all_people = detections.data

    # Default to the only detection; with several, keep the most confident one.
    chosen = 0
    if all_people.shape[0] > 1:
        chosen = all_people[:, :, 2].mean(dim=1).argmax()

    pixel_kps = all_people[chosen].cpu().numpy()

    # Swap (x, y) -> (y, x) and divide by the frame size, one keypoint at a time.
    rows = [
        (y / frame_h, x / frame_w, conf)
        for x, y, conf in (pixel_kps[i] for i in range(17))
    ]
    return np.array(rows, dtype=np.float32)

def process_and_visualize_video(video_path: str, output_path: str, yolo_model):
    """Run pose estimation on every frame of a video and save an annotated copy.

    Each frame is passed to ``yolo_model``; the keypoints of the first result
    are extracted with ``get_keypoints_from_result`` and drawn (dots plus
    skeleton) on a copy of the frame, which is appended to the output video.

    Args:
        video_path: Path of the video to read.
        output_path: Path of the annotated ``mp4v`` video to write.
        yolo_model: Callable invoked as ``yolo_model(frame, verbose=False)``
            whose first returned element is a pose result.

    Returns:
        None. An error is printed and nothing is processed when the input
        or the output video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    out = None
    try:
        if not cap.isOpened():
            print(f"Error: Could not open video {video_path}")
            return

        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Keep the frame rate as a float: int() turned e.g. 29.97 into 29 and
        # changed the playback speed. Fall back to 25 fps when the container
        # reports nothing usable (0 or NaN).
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 25.0

        # The reported frame count only sizes the progress bar; it is not
        # trusted as a loop bound because it can be 0 or inaccurate.
        reported_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        num_frames = int(reported_frames) if reported_frames > 0 else None

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
        if not out.isOpened():
            print(f"Error: Could not open output video {output_path}")
            return

        print(f"Processing video: {os.path.basename(video_path)}")
        with tqdm(total=num_frames, desc="Visualizing with YOLO") as progress:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # --- Run YOLO inference on the BGR frame from cv2 ---
                results = yolo_model(frame, verbose=False)

                # --- Extract and scale keypoints from the first result ---
                keypoints = get_keypoints_from_result(results[0], frame_height, frame_width)

                # --- Draw on a copy so the frame that was read is untouched ---
                vis_frame = frame.copy()
                draw_keypoints(vis_frame, keypoints)
                draw_skeleton(vis_frame, keypoints)
                out.write(vis_frame)
                progress.update(1)
    finally:
        # Always release both handles, including when inference raises, so
        # the output file is finalized and the input is not left locked.
        cap.release()
        if out is not None:
            out.release()

    print(f"\nVisualization complete! Video saved to: {output_path}")

# Entry point: annotate the configured video with the loaded pose model.
if __name__ == "__main__":
    process_and_visualize_video(
        video_path=VID,
        output_path=OUTPUT_VIDEO_PATH,
        yolo_model=model,
    )

Loading YOLOv11-pose model...
YOLO model loaded successfully on device: cuda
Processing video: processed_empty_224_1000.mp4


Visualizing with YOLO: 100%|██████████| 1000/1000 [00:16<00:00, 61.69it/s]


Visualization complete! Video saved to: keypoint_visualization_yolo.mp4





In [41]:
### Background subtraction
# For every frame after the first, compute a MOG2 foreground mask and the
# Farneback optical flow against the previous frame. The masks are written to
# "<VID_NAME>_fg_mask.mp4"; masks and flows are also saved as .npy arrays.
# The first frame only seeds the optical flow, so each saved array has one
# entry fewer than the number of frames read.
VID = "processed_heavy_224_1000.mp4"
VID_NAME  = "processed_heavy_224_1000"
cap = cv2.VideoCapture(VID)
ret, prev_frame = cap.read()

if not ret:
    # Stop here: continuing would crash on `prev_frame.shape` with an
    # unrelated AttributeError because prev_frame is None.
    cap.release()
    raise RuntimeError(f"[ERROR] Cannot read first frame of {VID}")
h, w = prev_frame.shape[:2]

bg = cv2.createBackgroundSubtractorMOG2(history=400, varThreshold=40, detectShadows=False)
prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)

fg_masks = []
flows = []
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out_mask = cv2.VideoWriter(
    f"{VID_NAME}_fg_mask.mp4",
    fourcc,
    cap.get(cv2.CAP_PROP_FPS),
    (w, h),
    isColor=False
)

# The structuring element is the same for both morphology steps and for every
# frame, so it is built once instead of twice per iteration.
morph_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # ===== 1. Background Subtraction =====
        fg_mask = bg.apply(frame)
        _, fg_mask = cv2.threshold(fg_mask, 250, 255, cv2.THRESH_BINARY)
        # Opening then closing with the small elliptical kernel.
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, morph_kernel)
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, morph_kernel)
        out_mask.write(fg_mask)

        # ===== 2. Optical Flow =====
        # Positional arguments after the (unused) output buffer follow the
        # OpenCV signature: pyr_scale, levels, winsize, iterations, poly_n,
        # poly_sigma, flags.
        flow = cv2.calcOpticalFlowFarneback(prev_gray, gray,
                                            None,
                                            0.5, 3, 15, 3, 5, 1.2, 0)

        # Save for deep learning
        fg_masks.append(fg_mask.astype(np.uint8))         # (H, W)
        flows.append(flow.astype(np.float32))            # (H, W, 2)

        prev_gray = gray
finally:
    # Release both handles even if a frame fails to process, so the mask
    # video is finalized and the input file is not left open.
    out_mask.release()
    cap.release()

video_name = os.path.splitext(VID)[0]
np.save(os.path.join(".", f"{video_name}_fg.npy"), np.array(fg_masks))
np.save(os.path.join(".", f"{video_name}_flow.npy"), np.array(flows))

print("Saved processed data for video:", VID)


Saved processed data for video: processed_heavy_224_1000.mp4
