New approach: tracking + pose estimation, with a bounding box around the elbow and wrist keypoints, persistent person IDs, and a log entry when no people are in the frame.

In [None]:
from ultralytics import YOLO
import cv2
import numpy as np
import logging
import json
from collections import defaultdict

class ArmMovementTracker:
    """Per-person, per-arm movement state built from wrist/elbow positions.

    Positions are sampled only on frames whose number is a multiple of 3.
    An arm becomes "active" once the average displacement over its sample
    history has exceeded ``movement_threshold`` on ``frame_memory``
    consecutive sampled frames; a single below-threshold sample resets it.
    """

    def __init__(self, movement_threshold=13, frame_memory=15):
        # person_id -> state dict (layout defined in get_tracker)
        self.trackers = {}
        # Average per-sample displacement an arm must exceed to count as moving
        self.movement_threshold = movement_threshold
        # Used both as the history length cap and as the number of
        # consecutive "moving" samples required to activate an arm
        self.frame_memory = frame_memory

    def get_tracker(self, person_id):
        """Return the state dict for ``person_id``, creating it on first use."""
        state = self.trackers.get(person_id)
        if state is None:
            state = {
                'prev_keypoints': {'left': {'wrist': None, 'elbow': None}, 'right': {'wrist': None, 'elbow': None}},
                'movement_counter': {'left': 0, 'right': 0},
                'active_action': None,
                'keypoint_history': {'left': [], 'right': []},
                'action_active': {'left': False, 'right': False}
            }
            self.trackers[person_id] = state
        return state

    def calculate_keypoint_movement(self, person_id, side, wrist_pos, elbow_pos, frame_count):
        """Record a sample for one arm (every third frame) and return its history.

        The returned list is the live history stored in the tracker state,
        holding at most ``frame_memory`` samples, oldest first.
        """
        state = self.get_tracker(person_id)
        samples = state['keypoint_history'][side]

        # Frames that are not a multiple of 3 leave the state untouched.
        if frame_count % 3 != 0:
            return samples

        samples.append({'wrist': wrist_pos, 'elbow': elbow_pos, 'frame': frame_count})
        if len(samples) > self.frame_memory:
            # Drop the oldest sample to keep the window bounded.
            del samples[0]

        latest = state['prev_keypoints'][side]
        latest['wrist'] = wrist_pos
        latest['elbow'] = elbow_pos
        return samples

    def check_significant_movement(self, history):
        """Return whether the mean step between consecutive samples exceeds the threshold.

        Each step is the average of the wrist and elbow Euclidean
        displacements. Histories with fewer than two samples yield ``False``.
        """
        step_count = len(history) - 1
        if step_count < 1:
            return False

        accumulated = 0
        for earlier, later in zip(history, history[1:]):
            wrist_step = np.sqrt((earlier['wrist'][0] - later['wrist'][0])**2 + (earlier['wrist'][1] - later['wrist'][1])**2)
            elbow_step = np.sqrt((earlier['elbow'][0] - later['elbow'][0])**2 + (earlier['elbow'][1] - later['elbow'][1])**2)
            accumulated += (wrist_step + elbow_step) / 2

        return accumulated / step_count > self.movement_threshold

    def update_and_check_movement(self, person_id, side, wrist_pos, elbow_pos, frame_count):
        """Feed one observation for an arm and return its current active flag.

        The counter and flag are only re-evaluated on sampled frames (multiples
        of 3) once at least two samples exist; otherwise the previous flag is
        returned unchanged.
        """
        state = self.get_tracker(person_id)
        history = self.calculate_keypoint_movement(person_id, side, wrist_pos, elbow_pos, frame_count)

        is_sampled_frame = frame_count % 3 == 0
        if is_sampled_frame and len(history) >= 2:
            counters = state['movement_counter']
            flags = state['action_active']
            if self.check_significant_movement(history):
                counters[side] += 1
                if counters[side] >= self.frame_memory:
                    flags[side] = True
            else:
                # Any quiet sample resets the streak and deactivates the arm.
                counters[side] = 0
                flags[side] = False

        return state['action_active'][side]

def calculate_iou_rotated(points1, points2):
    """Return the intersection-over-union of two polygons via rasterized masks.

    Args:
        points1: Sequence of (x, y) polygon vertices in coordinates
            normalized by the frame size (x / frame_width, y / frame_height).
        points2: Second polygon, same convention.

    Returns:
        IoU as a float in [0, 1]; 0.0 when either polygon is empty, when the
        joint bounding box is smaller than one pixel, or when the union is
        empty.

    Relies on the module-level ``frame_width`` and ``frame_height`` to
    convert normalized coordinates to pixels.
    """
    pts1 = np.array(points1, dtype=np.float32).reshape(-1, 2)
    pts2 = np.array(points2, dtype=np.float32).reshape(-1, 2)
    if len(pts1) == 0 or len(pts2) == 0:
        return 0.0

    # Joint bounding box of both polygons, still in normalized units.
    both = np.vstack((pts1, pts2))
    x_min, y_min = both.min(axis=0)
    x_max, y_max = both.max(axis=0)

    # Mask size = joint bounding box expressed in pixels.
    w = int((x_max - x_min) * frame_width)
    h = int((y_max - y_min) * frame_height)
    if w <= 0 or h <= 0:
        return 0.0

    origin = np.array([x_min, y_min], dtype=np.float32)
    # Normalized -> pixel conversion must use the frame size. Scaling by the
    # mask size (w, h) would shrink the polygons by the bounding-box extent a
    # second time and rasterize them into a tiny corner of the mask.
    to_pixels = np.array([frame_width, frame_height], dtype=np.float32)

    mask1 = np.zeros((h, w), dtype=np.uint8)
    mask2 = np.zeros((h, w), dtype=np.uint8)
    poly1 = ((pts1 - origin) * to_pixels).astype(np.int32)
    poly2 = ((pts2 - origin) * to_pixels).astype(np.int32)
    cv2.fillPoly(mask1, [poly1], 1)
    cv2.fillPoly(mask2, [poly2], 1)

    intersection = np.logical_and(mask1, mask2).sum()
    union = np.logical_or(mask1, mask2).sum()
    return float(intersection) / float(union) if union > 0 else 0.0

# Load models: one detector used for tracking, one pose estimator.
tracker_model = YOLO("yolo11n.pt")
pose_model = YOLO("yolo11n-pose.pt")
video_path = "TACO BELL demo 2.mp4"

# Video setup: the capture is opened only to read the frame size and FPS
# needed to configure the output writer.
cap = cv2.VideoCapture(video_path)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))
out = cv2.VideoWriter("output-1234.mp4", cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))

# Define pose connections (pairs of keypoint indices joined by a line)
skeleton = [(5, 7), (7, 9), (6, 8), (8, 10), (5, 6), (5, 11), (6, 12), (11, 12), (11, 13), (13, 15), (12, 14), (14, 16)]

# Initialize trackers and logs
movement_tracker = ArmMovementTracker(movement_threshold=7, frame_memory=15)
logging.basicConfig(filename="action_log.txt", level=logging.INFO, format="%(message)s")

# Load class labels (category id -> name)
with open('class.json', 'r') as f:
    class_data = json.load(f)
    categories = {cat['id']: cat['name'] for cat in class_data['categories']}

# Load object coordinates.
# Each non-empty line is "<class_id> x1 y1 x2 y2 ...". The drawing code
# multiplies these coordinates by the frame size, so they are treated as
# normalized values.
boxes = []
with open('cord.txt', 'r') as f:
    for line_number, line in enumerate(f, start=1):
        values = list(map(float, line.split()))
        if not values:
            # Blank line (e.g. trailing newline at end of file): nothing to load.
            continue
        class_id = int(values[0])
        points = [(values[i], values[i + 1]) for i in range(1, len(values)-1, 2)]
        if not points:
            # A class id without any coordinates cannot form a polygon.
            print(f"Skipping cord.txt line {line_number}: no polygon points")
            continue
        xs = [p[0] for p in points]
        ys = [p[1] for p in points]
        x_center = sum(xs) / len(xs)
        y_center = sum(ys) / len(ys)
        width = max(xs) - min(xs)
        height = max(ys) - min(ys)
        boxes.append({
            'class_id': class_id,
            'x_center': x_center,
            'y_center': y_center,
            'width': width,
            'height': height,
            'points': points,
            'label': categories.get(class_id, f"Class_{class_id}")
        })
        print(f"Loaded polygon: class_id={class_id}, points={points}")

# Process video
frame_count = 0
cv2.namedWindow("Pose Estimation with Action Detection", cv2.WINDOW_NORMAL)

try:
    # The tracker and the pose model each stream over the same video; zip
    # pairs their per-frame results in lock-step.
    for track_result, pose_result in zip(tracker_model.track(video_path, stream=True, classes=[0], tracker="bytetrack.yaml"),
                                         pose_model(video_path, stream=True)):
        frame_count += 1
        frame = track_result.orig_img.copy()

        # Tracking: Get bounding boxes and track_ids
        tracked_people = {}
        if track_result.boxes is not None and track_result.boxes.id is not None:
            for box, track_id in zip(track_result.boxes.xyxy.cpu().numpy(), track_result.boxes.id.cpu().numpy()):
                x1, y1, x2, y2 = map(int, box)
                center = ((x1 + x2) / 2, (y1 + y2) / 2)
                tracked_people[int(track_id)] = {'box': (x1, y1, x2, y2), 'center': center}
            print(f"Frame {frame_count}: Tracked {len(tracked_people)} people")

        # Draw tracked boxes
        for data in tracked_people.values():
            x1, y1, x2, y2 = data['box']
            cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)

        # Draw object polygons
        for box in boxes:
            points_abs = [(int(x * frame_width), int(y * frame_height)) for x, y in box['points']]
            for i in range(len(points_abs)):
                cv2.line(frame, points_abs[i], points_abs[(i + 1) % len(points_abs)], (0, 255, 0), 2)
            cv2.putText(frame, box['label'], points_abs[0], cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

        # Pose: Match to tracked people and assign PIDs
        if pose_result.keypoints is not None and len(pose_result.keypoints.data) > 0 and pose_result.keypoints.data.shape[1] > 0:

            print(f"Frame {frame_count}: Detected {len(pose_result.keypoints.data)} poses")
            poses = []
            for keypoints in pose_result.keypoints.data.cpu().numpy():
                # Pose center = mean of the confidently detected keypoints.
                valid_points = keypoints[keypoints[:, 2] > 0.5, :2]
                if len(valid_points) > 0:
                    kp_center = np.mean(valid_points, axis=0)
                    poses.append({'keypoints': keypoints, 'center': (kp_center[0], kp_center[1])})

            # Match poses to tracked people: greedy nearest-center assignment,
            # each track used at most once, maximum distance 200 px.
            matched_tracks = {}
            used_track_ids = set()
            for pose in poses:
                min_dist = float('inf')
                matched_track_id = None
                for track_id, data in tracked_people.items():
                    if track_id in used_track_ids:
                        continue
                    dist = np.sqrt((pose['center'][0] - data['center'][0])**2 + (pose['center'][1] - data['center'][1])**2)
                    if dist < min_dist and dist < 200:
                        min_dist = dist
                        matched_track_id = track_id
                if matched_track_id is not None:
                    matched_tracks[matched_track_id] = pose
                    used_track_ids.add(matched_track_id)

            # Assign PIDs and process actions.
            # pid is a per-frame display index (position among the sorted
            # matched track ids); it is used for labels and log text only.
            for pid, track_id in enumerate(sorted(matched_tracks)):
                keypoints = matched_tracks[track_id]['keypoints']

                # Draw keypoints and skeleton
                for i, kp in enumerate(keypoints):
                    if kp[2] > 0.5:
                        x, y = int(kp[0]), int(kp[1])
                        cv2.circle(frame, (x, y), 5, (0, 255, 0), -1)
                        cv2.putText(frame, str(i), (x + 5, y), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (0, 255, 0), 1)
                for start_idx, end_idx in skeleton:
                    if keypoints[start_idx][2] > 0.5 and keypoints[end_idx][2] > 0.5:
                        start = (int(keypoints[start_idx][0]), int(keypoints[start_idx][1]))
                        end = (int(keypoints[end_idx][0]), int(keypoints[end_idx][1]))
                        cv2.line(frame, start, end, (0, 255, 255), 2)

                # Label the tracking box
                x1, y1, x2, y2 = tracked_people[track_id]['box']
                cv2.putText(frame, f"P{pid}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

                # Action detection
                body_part_boxes = []
                for arm_side, wrist_idx, elbow_idx in [('left', 9, 7), ('right', 10, 8)]:
                    if keypoints[wrist_idx][2] > 0.65 and keypoints[elbow_idx][2] > 0.65:
                        wrist = keypoints[wrist_idx][:2]
                        elbow = keypoints[elbow_idx][:2]
                        # Movement history is keyed by the tracker's track_id,
                        # not by pid: pid is re-assigned every frame, so keying
                        # on it would mix different people's histories whenever
                        # someone enters or leaves the frame.
                        is_action_active = movement_tracker.update_and_check_movement(track_id, arm_side, wrist, elbow, frame_count)
                        color = (0, 255, 0) if is_action_active else (0, 0, 255)
                        cv2.line(frame, (int(wrist[0]), int(wrist[1])), (int(elbow[0]), int(elbow[1])), color, 2)

                        # Forearm box: wrist/elbow extent padded by 20 px,
                        # clamped to the frame.
                        min_x = max(0, min(wrist[0], elbow[0]) - 20)
                        min_y = max(0, min(wrist[1], elbow[1]) - 20)
                        max_x = min(frame_width, max(wrist[0], elbow[0]) + 20)
                        max_y = min(frame_height, max(wrist[1], elbow[1]) + 20)
                        arm_points = [
                            (min_x / frame_width, min_y / frame_height),
                            (max_x / frame_width, min_y / frame_height),
                            (max_x / frame_width, max_y / frame_height),
                            (min_x / frame_width, max_y / frame_height)
                        ]
                        body_part_boxes.append({
                            "box": [min_x, min_y, max_x, max_y],
                            "points": arm_points,
                            "name": f"P{pid}_{arm_side}_arm",
                            "hand": arm_side,
                            "person_id": pid,
                            "track_id": track_id
                        })
                        cv2.rectangle(frame, (int(min_x), int(min_y)), (int(max_x), int(max_y)), color, 2)

                # Check interactions with objects.
                # Only arms currently flagged active can produce an
                # interaction, so filter first and skip the mask-based IoU
                # for idle arms.
                active_arms = [
                    part for part in body_part_boxes
                    if keypoints[9 if part['hand'] == 'left' else 10][2] > 0.5
                    and movement_tracker.get_tracker(part['track_id'])['action_active'][part['hand']]
                ]
                person_interactions = defaultdict(set)
                for box in boxes:
                    for part in active_arms:
                        iou = calculate_iou_rotated(part["points"], box['points'])
                        if iou > 0.05:
                            person_interactions[part['person_id']].add(box['label'])

                for person_id, interacted_objects in person_interactions.items():
                    for obj_label in interacted_objects:
                        action = f"P{person_id} working with {obj_label}"
                        for box in boxes:
                            if box['label'] == obj_label:
                                points_abs = [(int(x * frame_width), int(y * frame_height)) for x, y in box['points']]
                                action_x, action_y = points_abs[0][0], points_abs[0][1] + 20
                                cv2.putText(frame, action, (action_x, action_y), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 3)
                                logging.info(f"Frame {frame_count}: {action}, {len(tracked_people)} people")

        else:
            print(f"Frame {frame_count}: No poses detected")
            logging.info(f"Frame {frame_count}: No action detected, 0 people")

        # Display the frame
        cv2.imshow("Pose Estimation with Action Detection", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):  # Press 'q' to quit
            break

        # Write the frame to output video
        out.write(frame)
finally:
    # Cleanup: always release the writer and capture so the output file is
    # finalized and windows are closed even if processing raises.
    out.release()
    cap.release()
    cv2.destroyAllWindows()

In [3]:
import pandas as pd
from collections import defaultdict

# Frame rate used to convert frame numbers into HH:MM:SS timestamps.
# NOTE(review): this was a hard-coded 10 — confirm it matches the frame rate
# of the video that produced action_log.txt.
FRAMES_PER_SECOND = 10

with open('action_log.txt', 'r') as f:
    log_lines = f.readlines()

# Expected line formats:
#   "Frame <n>: P<id> working with <label>, <k> people"
#   "Frame <n>: No action detected, <k> people"
parsed_data = []
skipped_lines = 0
for line in log_lines:
    line = line.strip()
    if not line:
        continue

    frame_part, _, rest = line.partition(": ")
    try:
        frame_num = int(frame_part.split("Frame ")[1])

        # Handle both action and no-action cases
        if "No action detected" in rest:
            num_people = int(rest.split(", ")[1].split()[0])
            person_id = "None"
            station = "None"
        else:
            # The people count is always the last field; split from the right
            # so an object label containing ", " does not break parsing.
            action_text, people_text = rest.rsplit(", ", 1)
            num_people = int(people_text.split()[0])
            person_id, station = action_text.split(" working with ", 1)
    except (IndexError, ValueError):
        # Line does not follow either expected format; skip it rather than
        # aborting the whole summary.
        skipped_lines += 1
        continue

    # Calculate timestamp
    total_seconds = frame_num // FRAMES_PER_SECOND
    hours = total_seconds // 3600
    minutes = (total_seconds % 3600) // 60
    seconds = total_seconds % 60
    timestamp = f"{hours:02d}:{minutes:02d}:{seconds:02d}"

    parsed_data.append({
        "Frame": frame_num,
        "Timestamp": timestamp,
        "Person": person_id,
        "Station": station,
        "NumPeople": num_people
    })

if skipped_lines:
    print(f"Skipped {skipped_lines} unparseable log line(s)")

df = pd.DataFrame(parsed_data)

# Group actions by timestamp and person: all stations a person touched within
# the same second are merged into one "/"-joined, de-duplicated string.
grouped_actions = df.groupby(['Timestamp', 'Person'])['Station'].apply(lambda x: '/'.join(sorted(set(x)))).reset_index()
grouped_actions['Action'] = grouped_actions.apply(
    lambda row: f"{row['Person']} working with {row['Station']}" if row['Person'] != "None" else "No action detected",
    axis=1
)

# Aggregate by timestamp: the people count reported for a second is the
# maximum count logged on any frame within that second.
result = df.groupby('Timestamp').agg({
    'NumPeople': 'max',
}).reset_index()

actions_by_timestamp = grouped_actions.groupby('Timestamp')['Action'].apply(list).reset_index()
result = result.merge(actions_by_timestamp, on='Timestamp')

# Format result: sort chronologically via timedelta, then render back to text.
result.columns = ['Timestamp', 'Number of Unique People', 'Actions']
result['Timestamp'] = pd.to_timedelta(result['Timestamp'])
result = result.sort_values('Timestamp')
result['Timestamp'] = result['Timestamp'].apply(lambda x: f"{x.components.hours:02d}:{x.components.minutes:02d}:{x.components.seconds:02d}")

print(result)
result.to_csv('action_summary.csv', index=False)

    Timestamp  Number of Unique People               Actions
0    00:00:21                        1  [No action detected]
1    00:00:26                        1  [No action detected]
2    00:00:27                        1  [No action detected]
3    00:00:28                        1  [No action detected]
4    00:00:29                        1  [No action detected]
..        ...                      ...                   ...
190  00:04:43                        1  [No action detected]
191  00:04:44                        1  [No action detected]
192  00:04:45                        0  [No action detected]
193  00:04:46                        0  [No action detected]
194  00:04:48                        2  [No action detected]

[195 rows x 3 columns]
