In [2]:
import cv2
from ultralytics import YOLO  # Ensure YOLO is imported correctly
import numpy as np
from utilities import is_inside, draw_detections_on_frame
from shapely.geometry import Point
from shapely.geometry.polygon import Polygon

In [3]:
class State:
    """Integer markers for the phase of the pickup/drop cycle used by process_video."""

    # 1: no pickup has been registered yet; 2: a pickup was seen, drop is awaited.
    WAIT_FOR_PICKUP, WAIT_FOR_DROP = 1, 2

def draw_sides(frame, region, sides, color):
    """Draw the requested edges of a rectangular region onto ``frame``.

    Args:
        frame: Image passed straight to ``cv2.line``.
        region: Indexed as ``[[x_start, y_start], [x_end, y_end]]``.
        sides: Container tested with ``in`` for 'left', 'right', 'top', 'bottom'.
        color: Line colour handed to ``cv2.line``.
    """
    left, top = region[0][0], region[0][1]
    right, bottom = region[1][0], region[1][1]

    # Edge name -> segment endpoints, listed in the order the edges are drawn.
    edges = (
        ('left', (left, top), (left, bottom)),
        ('right', (right, top), (right, bottom)),
        ('top', (left, top), (right, top)),
        ('bottom', (left, bottom), (right, bottom)),
    )
    for name, start, end in edges:
        if name in sides:
            cv2.line(frame, start, end, color, 2)

def point_line_distance(point, line_start, line_end):
    """Calculate the minimum distance from a point to a line segment.

    Args:
        point: Coordinates of the query point (any sequence numpy can convert).
        line_start: Coordinates of the first end of the segment.
        line_end: Coordinates of the second end of the segment.

    Returns:
        The Euclidean distance from ``point`` to the nearest point on the
        segment. When ``line_start`` and ``line_end`` coincide, this is the
        distance from ``point`` to that single location.
    """
    start = np.asarray(line_start, dtype=float)
    end = np.asarray(line_end, dtype=float)
    target = np.asarray(point, dtype=float)

    segment = end - start
    segment_len2 = segment.dot(segment)
    if segment_len2 == 0:
        # Degenerate segment: projecting would divide by zero and yield NaN,
        # which makes every later "< threshold" comparison silently False.
        return np.linalg.norm(target - start)

    # Fraction along the segment of the orthogonal projection, clamped to
    # [0, 1] so the closest point never leaves the segment.
    fraction = np.clip((target - start).dot(segment) / segment_len2, 0.0, 1.0)
    closest_point = start + fraction * segment
    return np.linalg.norm(closest_point - target)

def is_entering_from_side(box, region, sides, threshold=50):
    """Tell whether a box's centre lies closer than ``threshold`` to any listed region edge.

    Args:
        box: Indexed as ``[x1, y1, x2, y2]``; only the centre point is used.
        region: Indexed as ``[[x_start, y_start], [x_end, y_end]]``.
        sides: Iterable of edge names among 'left', 'right', 'top', 'bottom'.
        threshold: Distance below which the centre counts as near an edge.

    Returns:
        True if at least one listed edge is within ``threshold``, else False.
    """
    center = ((box[0] + box[2]) / 2, (box[1] + box[3]) / 2)

    x_lo, y_lo = region[0][0], region[0][1]
    x_hi, y_hi = region[1][0], region[1][1]

    # Each edge is described by its two end points.
    edge_segments = {
        'left': ((x_lo, y_lo), (x_lo, y_hi)),
        'right': ((x_hi, y_lo), (x_hi, y_hi)),
        'top': ((x_lo, y_lo), (x_hi, y_lo)),
        'bottom': ((x_lo, y_hi), (x_hi, y_hi)),
    }

    return any(
        point_line_distance(center, *edge_segments[side]) < threshold
        for side in sides
    )

def process_video(video_url, pickup_coords, drop_coords, pickup_sides, drop_sides, output_path):
    """Count pickup -> drop -> pickup hand cycles in a video and write an annotated copy.

    Roughly five frames per second are run through the 'handDetection.pt'
    YOLO model. Only those sampled frames are annotated and written to
    ``output_path``.

    Args:
        video_url: Source passed to ``cv2.VideoCapture``.
        pickup_coords: Pickup region, ``[[x_start, y_start], [x_end, y_end]]``.
        drop_coords: Drop region, same layout as ``pickup_coords``.
        pickup_sides: Edge names of the pickup region a hand must be near.
        drop_sides: Edge names of the drop region a hand must be near.
        output_path: Destination of the annotated 'mp4v' video.

    Raises:
        ValueError: If the video source cannot be opened.
    """
    model = YOLO('handDetection.pt')
    cap = cv2.VideoCapture(video_url)
    if not cap.isOpened():
        raise ValueError(f"Couldn't open video stream from URL: {video_url}")

    # Output size and overlay layout are fixed to this resolution.
    width = 854
    height = 480

    fps = cap.get(cv2.CAP_PROP_FPS)
    # Sample about 5 frames per second. The step is clamped to 1 because a
    # reported FPS below 5 (or 0) would otherwise make the modulo below
    # divide by zero.
    frame_step = max(1, int(fps) // 5)

    frame_count = 0
    state = State.WAIT_FOR_PICKUP
    count = 0
    hand_was_in_drop = False

    # NOTE(review): only every ``frame_step``-th frame is written while the
    # writer is opened at the source FPS, so the output plays faster than the
    # source. Confirm whether that is intended.
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    small_font = (cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % frame_step == 0:
                results = model(frame, conf=0.3, iou=0.2)
                # Convert the detections once and reuse them for logic and drawing.
                detections = results[0].boxes.data.cpu().numpy()
                hands_in_frame = [list(map(int, box_data[:4])) for box_data in detections if len(box_data) >= 4]

                hand_detected_in_pickup = any(is_inside(hand, pickup_coords) for hand in hands_in_frame)
                hand_detected_in_drop = any(is_inside(hand, drop_coords) for hand in hands_in_frame)
                entering_pickup = any(is_entering_from_side(hand, pickup_coords, pickup_sides) for hand in hands_in_frame)
                entering_drop = any(is_entering_from_side(hand, drop_coords, drop_sides) for hand in hands_in_frame)

                # First pickup arms the cycle.
                if state == State.WAIT_FOR_PICKUP and hand_detected_in_pickup and entering_pickup:
                    state = State.WAIT_FOR_DROP

                # A drop seen after a pickup is remembered until the next pickup.
                if state == State.WAIT_FOR_DROP and hand_detected_in_drop and entering_drop:
                    hand_was_in_drop = True

                # Returning to the pickup region after a drop completes one cycle.
                if hand_was_in_drop and hand_detected_in_pickup and entering_pickup:
                    count += 1
                    state = State.WAIT_FOR_DROP
                    hand_was_in_drop = False

                for box_data in detections:
                    frame = draw_detections_on_frame(frame, box_data, results[0].names)

                cv2.putText(frame, f"Cycle Count: {count}", (width - 250, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                status_lines = (
                    f"Pickup: {hand_detected_in_pickup}",
                    f"Drop: {hand_detected_in_drop}",
                    f"Was in Drop: {hand_was_in_drop}",
                    f"Entering Pickup: {entering_pickup}",
                    f"Entering Drop: {entering_drop}",
                    f"State: {state}",
                )
                # Status rows sit 30 px apart, starting below the cycle counter.
                for row, text in enumerate(status_lines, start=2):
                    cv2.putText(frame, text, (width - 200, 30 * row), *small_font)

                cv2.rectangle(frame, pickup_coords[0], pickup_coords[1], (0, 255, 0), 2)
                cv2.rectangle(frame, drop_coords[0], drop_coords[1], (0, 255, 0), 2)
                draw_sides(frame, pickup_coords, pickup_sides, (0, 0, 255))
                draw_sides(frame, drop_coords, drop_sides, (0, 0, 255))

                # The writer was opened for (width, height); frames of another
                # size are resized so they match what the writer expects.
                if (frame.shape[1], frame.shape[0]) != (width, height):
                    frame = cv2.resize(frame, (width, height))
                out.write(frame)

            frame_count += 1

    finally:
        cap.release()
        out.release()
        cv2.destroyAllWindows()

# Example run: source clip and destination of the annotated video.
video_url = 'videos/t3_resized2.mp4'
output_path = 'output/test_output.mp4'

# Regions of interest, each given as [[x_start, y_start], [x_end, y_end]].
pickup_coords = [[235, 204], [394, 327]]
drop_coords = [[404, 130], [602, 210]]

# Edges of each region that a hand has to be near for the visit to count.
pickup_sides = ["right", "bottom"]
drop_sides = ["bottom"]

process_video(
    video_url=video_url,
    pickup_coords=pickup_coords,
    drop_coords=drop_coords,
    pickup_sides=pickup_sides,
    drop_sides=drop_sides,
    output_path=output_path,
)


0: 384x640 1 Human hand, 5.5ms
Speed: 1.5ms preprocess, 5.5ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 Human hand, 5.8ms
Speed: 1.3ms preprocess, 5.8ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 Human hands, 7.5ms
Speed: 1.6ms preprocess, 7.5ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 Human hands, 5.8ms
Speed: 1.5ms preprocess, 5.8ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 2 Human hands, 5.8ms
Speed: 1.2ms preprocess, 5.8ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 Human hand, 6.5ms
Speed: 1.3ms preprocess, 6.5ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 Human hand, 5.8ms
Speed: 1.5ms preprocess, 5.8ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 Human hand, 5.6ms
Speed: 1.7ms preprocess, 5.6ms inference, 0.9ms postprocess per image