In [1]:
import os 
import math
import mediapipe as mp 
from yolov5 import YOLOv5
import cv2
import numpy as np 
import pandas as pd 
import warnings 
warnings.filterwarnings(action='ignore')

## Constant 

In [14]:
input_video = './20211118173000-20211118183000/car cam.mp4'
output_video = './20211118173000-20211118183000/car cam processed.mp4'

width,height = 1903,945
fbs = 10

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video,fourcc, fbs,(width, height))

worker_activity = {}  # Format: {person_id: {'standing': 0, 'sitting': 0, 'moving': 0}} # to track the activites for person 

landmark_data = [] # create landmarks to save the landmarks for csv file 
output_dir = './cropped_landmarks'

# for checking if this the first frame for the person
person_stabilization = {}  # Format: {person_id: stabilization_frames}


In [15]:

# Load YOLO model
yolo_model = YOLOv5("yolov5s.pt")  # Pre-trained YOLOv5 model

# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()

# Dictionary to store landmarks from the previous frame
prev_landmarks = {}

YOLOv5  2024-11-25 Python-3.11.5 torch-2.5.1+cpu CPU

Fusing layers... 
YOLOv5s summary: 270 layers, 7235389 parameters, 0 gradients, 16.6 GFLOPs
Adding AutoShape... 


In [4]:
# Function to compute Euclidean distance
def euclidean_distance(point1, point2):
    return math.sqrt((point1[0] - point2[0])**2 + (point1[1] - point2[1])**2)

In [5]:
def classify_activity (landmarks, prev_landmarks, bbox_id):
    text = ''
    left_hip = landmarks[mp_pose.PoseLandmark.LEFT_HIP]
    right_hip = landmarks[mp_pose.PoseLandmark.RIGHT_HIP]
    left_knee = landmarks[mp_pose.PoseLandmark.LEFT_KNEE]
    right_knee = landmarks[mp_pose.PoseLandmark.RIGHT_KNEE]
    left_shoulder = landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER]
    right_shoulder = landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER]
    
    # Compute average positions for hips, knees, and shoulders
    avg_hip_y = (left_hip.y + right_hip.y) / 2
    avg_knee_y = (left_knee.y + right_knee.y) / 2
    avg_shoulder_y = (left_shoulder.y + right_shoulder.y) / 2
    current_shoulder = ((left_shoulder.x + right_shoulder.x) / 2, avg_shoulder_y)
    movement_detection = False
    if bbox_id in prev_landmarks:
        prev_shoulder = prev_landmarks[bbox_id]
        displacement = euclidean_distance(current_shoulder, prev_shoulder)
        if displacement >0.2:
            movement_detection = True
    prev_landmarks[bbox_id] = current_shoulder
        
    if movement_detection:
        text=  'moving'
    elif abs(avg_hip_y - avg_knee_y) < 0.1 and avg_hip_y > avg_shoulder_y:
        text= "sitting"
    elif avg_hip_y < avg_knee_y and avg_shoulder_y < avg_hip_y:
        text= 'standing'
    else:
        text= 'unknown'

    return text, prev_landmarks
    

In [6]:
def extract_landmark(landmarks,activity):
    left_hip = landmarks[mp_pose.PoseLandmark.LEFT_HIP]
    right_hip = landmarks[mp_pose.PoseLandmark.RIGHT_HIP]
    left_knee = landmarks[mp_pose.PoseLandmark.LEFT_KNEE]
    right_knee = landmarks[mp_pose.PoseLandmark.RIGHT_KNEE]
    left_shoulder = landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER]
    right_shoulder = landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER]
    return {'idx':idx,
                           "left_hip_x": left_hip.x,
                          'right_hip_x':right_hip.x,
                          'left_knee_x':left_knee.x,
                          'right_knee_x':right_knee.x,
                          'left_shoulder_x':left_shoulder.x,
                          'right_shoulder_x':right_shoulder.x,
                          "left_hip_y": left_hip.y,
                          'right_hip_y':right_hip.y,
                          'left_knee_y':left_knee.y,
                          'right_knee_y':right_knee.y,
                          'left_shoulder_y':left_shoulder.y,
                          'right_shoulder_y':right_shoulder.y,
                          "left_hip_z": left_hip.z,
                          'right_hip_z':right_hip.z,
                          'left_knee_z':left_knee.z,
                          'right_knee_z':right_knee.z,
                          'left_shoulder_z':left_shoulder.z,
                          'right_shoulder_z':right_shoulder.z,
                          "left_hip_v": left_hip.visibility,
                          'right_hip_v':right_hip.visibility,
                          'left_knee_v':left_knee.visibility,
                          'right_knee_v':right_knee.visibility,
                          'left_shoulder_v':left_shoulder.visibility,
                          'right_shoulder_v':right_shoulder.visibility,
                          
                          'avg_hip_y' : (left_hip.y + right_hip.y) / 2,
                          "avg_knee_y": (left_knee.y + right_knee.y) / 2,
                          'avg_shoulder_y':(left_shoulder.y + right_shoulder.y) / 2,
    
                           "label": activity}
    

In [7]:
def save_img(img, idx, dir_path):
    output_path = os.path.join(dir_path, f"person_{idx}.jpg")
    cv2.imwrite(output_path, cv2.cvtColor(img, cv2.COLOR_RGB2BGR))

In [16]:

# starting the video 
cap = cv2.VideoCapture(input_video)
while True:
    ret, frame = cap.read()
    frame = cv2.resize(frame, (width,height))
    if not ret:
        break
    # step one detect the human by YOLO & create the rectangle 
    detections = yolo_model.predict(frame)
    detections = detections.xyxy[0]
    for idx, (*box, conf, cls) in enumerate(detections.tolist()):
        x1, y1, x2, y2 = map(int, box)   # Bounding box coordinates
        if int(cls) == 0: # check if the object is human
            # cv2.rectangle(frame, (x1,y1),(x2,y2), (0,255,0),2)
            person_id = idx
            if person_id not in person_stabilization: # add the person if this is first time 
                person_stabilization[person_id] = 0
        
            # Step 2: Crop region for each person
            person_crop = frame[y1:y2,x1:x2]
            if person_crop.size>0:
                rgb_crop = cv2.cvtColor(person_crop, cv2.COLOR_BGR2RGB)
                results = pose.process(rgb_crop)
                
                # Draw pose landmarks on the cropped person, and save the landmarks, create activities and label it 
                if results.pose_landmarks:  
                    if person_stabilization[person_id]<5:
                        person_stabilization[person_id]+=1
                        continue
                        
                    landmarks = results.pose_landmarks.landmark
                    # Draw the landmarks
                    mp.solutions.drawing_utils.draw_landmarks(
                        frame[y1:y2, x1:x2], results.pose_landmarks, mp_pose.POSE_CONNECTIONS
                    )

                    
                    # make the activity 
                    activity, prev_landmarks = classify_activity(landmarks, prev_landmarks, idx)
                    label = f"{activity.upper()}"
                    cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

                    # Count the activity
                    if idx not in worker_activity:
                        worker_activity[idx]= {'standing': 0, 'sitting': 0, 'moving': 0, 'unknown':0}
                    worker_activity[idx][activity] +=1
                    
                    # save the img
                    save_img(rgb_crop, idx, output_dir)
                    
                    #extract the landmarks
                    landmark_data.append(extract_landmark(landmarks, activity))


    # Display the frame with bounding boxes and poses
    out.write(frame)
    cv2.imshow("Multi-Worker Activity Recognition", frame)
    
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
out.release()
cv2.destroyAllWindows()
            

## Calculate Time

In [9]:
frame_rate = cap.get(cv2.CAP_PROP_FPS)
frame_to_minutes = 1/(fbs * 60)
workers_time = pd.DataFrame(worker_activity).T
workers_time *= frame_to_minutes


In [10]:
workers_time.sum()

standing    0.069444
sitting     0.031111
moving      0.008889
unknown     0.000556
dtype: float64

In [11]:
int(cap.get(cv2.CAP_PROP_FPS))

0

In [12]:
int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))

0

In [13]:
int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

0