In [3]:
import ultralytics
import cv2
import pandas as pd
from ultralytics import YOLO
import numpy as np
from shapely.geometry import Point, Polygon
from PIL import Image, ImageDraw, ImageFont
import supervision as sv

segmentation_model = YOLO('models/platform-segmentation-model.pt')
pose_model = YOLO('models/yolov8x-pose.pt')
video_path = 'dataset/platform-crossing/3_jump.mp4'
image_path = 'frames/platform-crossing-3-jump/frame_149.jpg'


In [4]:
def is_point_inside_polygon(point, polygon):
    point = Point(point)
    polygon = Polygon(polygon)
    return polygon.contains(point)

def callback(image_slice: np.ndarray) -> sv.KeyPoints:
    result = pose_model(image_slice)[0]
    return sv.KeyPoints.from_ultralytics(result)
slicer = sv.InferenceSlicer(callback = callback)

def use_sahi(image):
    detections = slicer(image)
    points = []

    for detection in detections.xyxy:
        left_bottom_corner = (int(detection[0]), int(detection[3]))
        right_bottom_corner = (int(detection[2]), int(detection[3]))  
        # print(left_bottom_corner, right_bottom_corner)
        cv2.circle(image, left_bottom_corner, 5, (0, 255, 0), -1)
        cv2.circle(image, right_bottom_corner, 5, (0, 255, 0), -1)
        points.append(left_bottom_corner)
        points.append(right_bottom_corner)
    # cv2.imwrite('output.jpg', image)

    return points, image
 

def check_boundaries(image, warning_mask, danger_mask):
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 3
    color = (255, 0, 0)  # BGR color format
    thickness = 2
    
    leg_points, image = use_sahi(image)
    if len(leg_points) == 0:
        return image
    for point in leg_points:
        # cv2.circle(image, tuple(point.astype(np.int32)), 5, (0, 255, 0), -1)
        # is_inside_safe = is_point_inside_polygon(point, safe_mask)
        
        is_inside_warning = is_point_inside_polygon(point, warning_mask)
        is_inside_danger = is_point_inside_polygon(point, danger_mask)
        
        if is_inside_warning:
            cv2.putText(image, 'Warning', (50, 50), font, font_scale, color, thickness)
            print('Warning')
            # cv2.circle(image, tuple(point.astype(np.int32)), 5, (255, 255, 0), -1)
        elif is_inside_danger:
            cv2.putText(image, 'Danger', (50, 200), font, font_scale, color, thickness)
            print('Danger')
            # cv2.circle(image, tuple(point.astype(np.int32)), 5, (255, 0, 0), -1)
        # elif is_inside_safe:
        #     cv2.circle(image, tuple(point.astype(np.int32)), 5, (0, 255, 0), -1)
    return image



In [5]:
# Open the video file
video = cv2.VideoCapture(video_path)
fps = video.get(cv2.CAP_PROP_FPS)
width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
# Read the first frame
ret, frame = video.read()

# Check if the frame was successfully read
if ret:
    # Display the first frame
    result = segmentation_model.predict(frame, save=True)[0]
else:
    print('Error reading the video file.')

# Release the video file
video.release()



Speed: 3.6ms preprocess, 680.6ms inference, 706.7ms postprocess per image at shape (1, 3, 384, 640)
Results saved to [1mruns\segment\predict15[0m


In [6]:
classes = np.array(result.boxes.cls)
# safe_index = np.where(classes == 0)[0]
warning_index = np.where(classes == 2)[0]
danger_index = np.where(classes == 1)[0]

# safe_mask = result.masks.xy[safe_index[0]].astype(np.int32)
warning_mask = result.masks.xy[warning_index[0]].astype(np.int32)
danger_mask = result.masks.xy[danger_index[0]].astype(np.int32)
# scene = image.copy()
# for i in range(len(results[0].masks.xy)):
#     mask = results[0].masks.xy[0].astype(np.int32)
#     polygon = sv.PolygonZone(mask)
#     # scene = image.copy()
#     image = sv.draw_polygon(scene, polygon.polygon, sv.Color(r=255, g=255, b=0), 2)
    
# with sv.ImageSink(target_dir_path='output') as sink:
#     sink.save_image(image=scene, image_name='platform_segments.jpg')



In [8]:
detections = slicer(frame)


0: 640x640 (no detections), 1323.0ms
Speed: 5.9ms preprocess, 1323.0ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 (no detections), 1219.8ms
Speed: 4.5ms preprocess, 1219.8ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 1219.8ms
Speed: 5.5ms preprocess, 1219.8ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 (no detections), 1275.7ms
Speed: 5.0ms preprocess, 1275.7ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 1250.0ms
Speed: 5.5ms preprocess, 1250.0ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 (no detections), 1257.1ms
Speed: 5.5ms preprocess, 1257.1ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 (no detections), 1286.7ms
Speed: 6.5ms preprocess, 1286.7ms inference, 1.5ms postprocess per image at shape (1, 3, 640, 640)

0: 640x256 (no detections), 553.1ms
Speed: 3.0ms prepr

AttributeError: 'KeyPoints' object has no attribute 'xyxy'

In [None]:
vertex_annotator = sv.VertexAnnotator(color=sv.Color.GREEN, radius=10)
annotated_frame = vertex_annotator.annotate(
    scene=frame.copy(),
    key_points=key_points
)

In [7]:
image = check_boundaries(frame, warning_mask, danger_mask)
cv2.imwrite('output.jpg', image)


0: 640x640 (no detections), 1305.6ms
Speed: 5.0ms preprocess, 1305.6ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 (no detections), 1257.5ms
Speed: 7.2ms preprocess, 1257.5ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 1277.2ms
Speed: 3.5ms preprocess, 1277.2ms inference, 2.5ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 (no detections), 1288.8ms
Speed: 5.0ms preprocess, 1288.8ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 1266.6ms
Speed: 6.0ms preprocess, 1266.6ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 (no detections), 1264.5ms
Speed: 4.0ms preprocess, 1264.5ms inference, 0.5ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 (no detections), 1240.0ms
Speed: 4.0ms preprocess, 1240.0ms inference, 1.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x256 (no detections), 566.3ms
Speed: 1.5ms prepr

AttributeError: 'KeyPoints' object has no attribute 'xyxy'

In [5]:
# import cv2
# from tqdm import tqdm

# # Path to the video file
# # video_path = 'dataset/platform-crossing/5_jump.avi'
# output_path = 'video_with_new_image.avi'
# results = pose_model.predict(video_path, stream=True, vid_stride=10)  # predict on an image

# # Open the video file
# frame_count = 0
# # Get the video properties



# # Create a tqdm progress bar
# progress_bar = tqdm(total=total_frames, desc='Processing Frames')
# fourcc = cv2.VideoWriter_fourcc(*'XVID')
# output_video = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# # Read the new image

# # Iterate over the frames in the video
# for pose_result in results:
#     frame = check_boundaries(pose_result, warning_mask, danger_mask)

#     # Write the modified frame to the output video
#     output_video.write(frame)
    
#     frame_count += 1
#     progress_bar.update(1)
# # Release the video file and the output video
# output_video.release()

In [6]:
import cv2

# Open the video file

cap = cv2.VideoCapture(video_path)

# Create a VideoWriter object to save the modified video
output_path = 'path_to_save_modified_video.mp4'
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# Loop through each frame of the video
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    image = check_boundaries(frame, warning_mask, danger_mask)
    out.write(image)

# Release the video capture and writer objects
cap.release()
out.release()



0: 640x640 (no detections), 1947.8ms
Speed: 12.5ms preprocess, 1947.8ms inference, 0.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 (no detections), 1884.0ms
Speed: 5.0ms preprocess, 1884.0ms inference, 1.5ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 1740.0ms
Speed: 6.5ms preprocess, 1740.0ms inference, 5.2ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 (no detections), 1917.6ms
Speed: 6.6ms preprocess, 1917.6ms inference, 1.4ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 person, 1913.2ms
Speed: 6.0ms preprocess, 1913.2ms inference, 2.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 (no detections), 1936.8ms
Speed: 7.5ms preprocess, 1936.8ms inference, 2.5ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 (no detections), 1845.0ms
Speed: 5.5ms preprocess, 1845.0ms inference, 2.3ms postprocess per image at shape (1, 3, 640, 640)

0: 640x256 (no detections), 852.1ms
Speed: 3.0ms prep

In [None]:
# bounding_box_annotator = sv.BoundingBoxAnnotator()
# annotated_frame = bounding_box_annotator.annotate(
#     scene=image.copy(),
#     detections=detections
# )

# with sv.ImageSink(target_dir_path='output') as sink:
#     sink.save_image(image=annotated_frame, image_name='platform_segments.jpg')