In [1]:
import cv2
from ultralytics import YOLO
import pandas as pd
import numpy as np
import os
import subprocess
from tqdm import tqdm
import supervision as sv
import datetime
import torch

In [2]:
def filter_tracks(centers, patience):
    """Function to filter track history"""
    filter_dict = {}
    for k, i in centers.items():
        d_frames = i.items()
        filter_dict[k] = dict(list(d_frames)[-patience:])
    return filter_dict

In [3]:
def update_tracking(centers_old,obj_center, thr_centers, lastKey, frame, frame_max):
    """Function to update track of objects"""
    is_new = 0
    lastpos = [(k, list(center.keys())[-1], list(center.values())[-1]) for k, center in centers_old.items()]
    lastpos = [(i[0], i[2]) for i in lastpos if abs(i[1] - frame) <= frame_max]
    # Calculating distance from existing centers points
    previous_pos = [(k,obj_center) for k,centers in lastpos if (np.linalg.norm(np.array(centers) - np.array(obj_center)) < thr_centers)]
    # if distance less than a threshold, it will update its positions
    if previous_pos:
        id_obj = previous_pos[0][0]
        centers_old[id_obj][frame] = obj_center
    # Else a new ID will be set to the given object
    else:
        if lastKey:
            last = lastKey.split('D')[1]
            id_obj = 'ID' + str(int(last)+1)
        else:
            id_obj = 'ID0'
        is_new = 1
        centers_old[id_obj] = {frame:obj_center}
        lastKey = list(centers_old.keys())[-1]
    return centers_old, id_obj, is_new, lastKey

## Generate Text File w/ Timestamps

In [4]:
def detect_pedestrians(video_path, target_dir):
    
    ### Configurations #Verbose during prediction
    verbose = False
    # Scaling percentage of original frame
    scale_percent = 100
    # model confidence level
    conf_level = 0.3
    # Threshold of centers ( old\new)
    thr_centers = 30
    # Number of max frames to consider a object lost
    frame_max = 50
    # Number of max tracked centers stored
    patience = 100
    # ROI area color transparency
    alpha = 0.3
    # ------------------------------------------------------- # Reading video with cv2
    video = cv2.VideoCapture(video_path)

    # Objects to detect Yolo
    class_IDS = [0, 1] # person and bike
    # Auxiliary variables
    centers_old = {}

    obj_id = 0
    end = []
#     frames_list = []
    count_p = 0
    lastKey = ''
    print(f'[INFO] - Verbose during Prediction: {verbose}')

    # Original information of video
    height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
    width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
    fps = video.get(cv2.CAP_PROP_FPS)
    print('[INFO] - Original Dim: ', (width, height, fps))

    # Scaling Video for better performance
    print(scale_percent)
    if scale_percent != 100:
        print('[INFO] - Scaling change may cause errors in pixels lines ')
        width = int(width * scale_percent / 100)
        height = int(height * scale_percent / 100)
        print('[INFO] - Dim Scaled: ', (width, height))
    print(scale_percent)

    if '/' in video_path:
        video_name = video_path.split("/")[-1].split(".")[0]
    else:
        video_name = video_path.split(".")[0]

#     result_video_name = video_name + ".mp4"
    result_directory = target_dir
#     annotated_video = result_directory + "/Annotated_" + result_video_name
#     VIDEO_CODEC = "MP4V"

#     output_video = cv2.VideoWriter(annotated_video,
#                                    cv2.VideoWriter_fourcc(*VIDEO_CODEC),
#                                    fps, (width, height))
    
    dict_classes = model.model.names
    # rois = extract_roi_from_video(video_path=video_path, regions=regions)
    # roi_counts = {roi['name']: 0 for roi in rois}
    count_p_roi = 0
    
    # Keep track of when objects were detected
    object_time_tracker = {}
    
    for i in tqdm(range(int(video.get(cv2.CAP_PROP_FRAME_COUNT)))):  # Outer loop iterating through each frame
        # print(i)
        # _, frame = video.read()

        isFrame, frame = video.read()

        if not isFrame:
            break

        # for roi in rois:  # Inner loop iterating through each region of interest

        #     area_roi = [np.array(roi['polygon'], dtype=np.int32)]

        # x_range, y_range = roi['range']
        # ROI = frame[y_range[0]:y_range[1], x_range[0]:x_range[1]]

        if verbose:
            print('Dimension Scaled(frame): ', (frame.shape[1], frame.shape[0]))

        # y_hat = model.predict(frame, conf=conf_level, classes=class_IDS, device='cpu', verbose=False, tracker="bytetrack.yaml")
        y_hat = model.track(frame, persist=True, conf=conf_level, classes=class_IDS, iou=0.5, show=False, verbose=False, tracker="bytetrack.yaml")

        boxes = y_hat[0].boxes.xyxy.cpu().numpy()
        conf = y_hat[0].boxes.conf.cpu().numpy()
        classes = y_hat[0].boxes.cls.cpu().numpy()

        positions_frame = pd.DataFrame(
            np.concatenate(
                [boxes, conf.reshape(-1, 1), classes.reshape(-1, 1)]
                , axis=1
            ),
            columns=['xmin', 'ymin', 'xmax', 'ymax', 'conf', 'class']
        )

        labels = [dict_classes[i] for i in classes]

        for ix, row in enumerate(positions_frame.iterrows()):
            xmin, ymin, xmax, ymax, confidence, category, = row[1].astype('int')
            center_x, center_y = int(((xmax + xmin)) / 2), int((ymax + ymin) / 2)

            centers_old, id_obj, is_new, lastKey = update_tracking(
                centers_old,
                (center_x, center_y),
                thr_centers,
                lastKey,
                i, frame_max
            )
            
            # Save time stamps of objects in frame
            curr_frame_num = int(video.get(cv2.CAP_PROP_POS_FRAMES))
            obj_idx = str(dict_classes[category]) + "_" + str(id_obj)
            if(not(obj_idx in object_time_tracker)):
                object_time_tracker[obj_idx] = str(datetime.timedelta(seconds=curr_frame_num/fps))
            
            # roi_counts[roi['name']] += is_new

            # Draw rectangle around object (person, bicycle, etc...)
            cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (0, 0, 255), 2)
            # for center_x, center_y in centers_old[id_obj].values():
            #     cv2.circle(frame, (center_x, center_y), 5, (0, 0, 255), -1) # Adds circle in person

            cv2.putText(
                img=frame,
                text=id_obj + ':' + str(np.round(conf[ix], 2)),
                org=(xmin, ymin - 10),
                fontFace=cv2.FONT_HERSHEY_TRIPLEX,
                fontScale=0.8,
                color=(0, 0, 255),
                thickness=1
            )

        # # Update count for the current ROI in the dictionary
        # # roi_counts[roi['name']] = count_p_roi
        # y_coordinate = 40
        # for region, person_count in roi_counts.items():
        #     cv2.putText(img=frame, text=f'Counts People in ROI {region}:{person_count}',
        #                 org=(30, y_coordinate), fontFace=cv2.FONT_HERSHEY_SIMPLEX,
        #                 fontScale=1, color=(255, 0, 0), thickness=1)
        #     y_coordinate += 50

        centers_old = filter_tracks(centers_old, patience)
        # if verbose:
        #     print(counter_in, counter_out)

        overlay = frame.copy()
        # cv2.polylines(overlay, pts=area_roi, isClosed=True, color=(255, 0, 0), thickness=2)
        # cv2.fillPoly(overlay, area_roi, (255, 0, 0))
        frame = cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)

#         frames_list.append(frame)
#         output_video.write(frame)
        
#     print(object_time_tracker)
    obj_tracker_file_ptr = open(result_directory + "/" + video_name+"_obj_timestamps", "w")
    obj_tracker_file_ptr.write(str(object_time_tracker))
    obj_tracker_file_ptr.close()

#     output_video.release()

    # for region, person_count in roi_counts.items():
    #     print(f"Number of persons moving {region} is {person_count}")
#     print(f"Annotated video saved at {annotated_video}")

In [6]:
# Device to run in
device = None
if(torch.cuda.is_available()):
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("using", device, "device")

model = YOLO('yolov9e-seg.pt')
# use gpu for model
model.to(device)
# torch.cuda.synchronize()

# Run
# detect_pedestrians("/home/servicer/Documents/ouput_video.mp4", "/home/servicer/Documents")
detect_pedestrians("/home/servicer/Documents/ouput_video1.mp4", "/home/servicer/Documents")
# detect_pedestrians("./sources/ped_sample_vid.mp4", "./sources")
torch.cuda.synchronize()

using cuda device
[INFO] - Verbose during Prediction: False
[INFO] - Original Dim:  (2560, 1440, 24.95034939315925)
100
100


100%|██████████████████████████████████████████████████████████████████████████████████| 2226/2226 [01:16<00:00, 29.12it/s]


## Display Pedestrians Detected

In [7]:
def display_pedestrians(video_path, target_dir):
    
    ### Configurations #Verbose during prediction
    verbose = False
    # Scaling percentage of original frame
    scale_percent = 100
    # model confidence level
    conf_level = 0.3
    # Threshold of centers ( old\new)
    thr_centers = 30
    # Number of max frames to consider a object lost
    frame_max = 30
    # Number of max tracked centers stored
    patience = 100
    # ROI area color transparency
    alpha = 0.3
    # ------------------------------------------------------- # Reading video with cv2
    video = cv2.VideoCapture(video_path)

    # Objects to detect Yolo
    class_IDS = [0, 1] # person and bike
    # Auxiliary variables
    centers_old = {}

    obj_id = 0
    end = []
    count_p = 0
    lastKey = ''
    print(f'[INFO] - Verbose during Prediction: {verbose}')

    # Original information of video
    height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
    width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
    fps = video.get(cv2.CAP_PROP_FPS)
    print('[INFO] - Original Dim: ', (width, height, fps))

    # Scaling Video for better performance
    print(scale_percent)
    if scale_percent != 100:
        print('[INFO] - Scaling change may cause errors in pixels lines ')
        width = int(width * scale_percent / 100)
        height = int(height * scale_percent / 100)
        print('[INFO] - Dim Scaled: ', (width, height))
    print(scale_percent)

    if '/' in video_path:
        video_name = video_path.split("/")[-1].split(".")[0]
    else:
        video_name = video_path.split(".")[0]

    result_video_name = video_name + ".mp4"
    result_directory = target_dir
    annotated_video = result_directory + "/Annotated_" + result_video_name
    VIDEO_CODEC = "MP4V"

    output_video = cv2.VideoWriter(annotated_video,
                                   cv2.VideoWriter_fourcc(*VIDEO_CODEC),
                                   fps, (width, height))
    
    dict_classes = model.model.names
    # rois = extract_roi_from_video(video_path=video_path, regions=regions)
    # roi_counts = {roi['name']: 0 for roi in rois}
    count_p_roi = 0
    
    # Keep track of when objects were detected
    object_time_tracker = {}
    
    for i in tqdm(range(int(video.get(cv2.CAP_PROP_FRAME_COUNT)))):  # Outer loop iterating through each frame
        # print(i)
        # _, frame = video.read()

        isFrame, frame = video.read()

        if not isFrame:
            break

        # for roi in rois:  # Inner loop iterating through each region of interest

        #     area_roi = [np.array(roi['polygon'], dtype=np.int32)]

        # x_range, y_range = roi['range']
        # ROI = frame[y_range[0]:y_range[1], x_range[0]:x_range[1]]

        if verbose:
            print('Dimension Scaled(frame): ', (frame.shape[1], frame.shape[0]))

        # y_hat = model.predict(frame, conf=conf_level, classes=class_IDS, device='cpu', verbose=False, tracker="bytetrack.yaml")
        y_hat = model.track(frame, persist=True, conf=conf_level, classes=class_IDS, iou=0.5, show=False, verbose=False, tracker="bytetrack.yaml")
        
#         # DISPLAY ###############
#         annotated_frame = y_hat[0].plot()
#         cv2.imshow('YOLOv9 Tracking', annotated_frame)

#         # Press 'q' to exit
#         if cv2.waitKey(1) & 0xFF == ord('q'):
#             break
#         ########################
        
        boxes = y_hat[0].boxes.xyxy.cpu().numpy()
        conf = y_hat[0].boxes.conf.cpu().numpy()
        classes = y_hat[0].boxes.cls.cpu().numpy()

        positions_frame = pd.DataFrame(
            np.concatenate(
                [boxes, conf.reshape(-1, 1), classes.reshape(-1, 1)]
                , axis=1
            ),
            columns=['xmin', 'ymin', 'xmax', 'ymax', 'conf', 'class']
        )

        labels = [dict_classes[i] for i in classes]

        for ix, row in enumerate(positions_frame.iterrows()):
            xmin, ymin, xmax, ymax, confidence, category, = row[1].astype('int')
            center_x, center_y = int(((xmax + xmin)) / 2), int((ymax + ymin) / 2)

            centers_old, id_obj, is_new, lastKey = update_tracking(
                centers_old,
                (center_x, center_y),
                thr_centers,
                lastKey,
                i, frame_max
            )
            
            # Save time stamps of objects in frame
            curr_frame_num = int(video.get(cv2.CAP_PROP_POS_FRAMES))
            obj_idx = str(dict_classes[category]) + "_" + str(id_obj)
            if(not(obj_idx in object_time_tracker)):
                object_time_tracker[obj_idx] = str(datetime.timedelta(seconds=curr_frame_num/fps))
            
            # roi_counts[roi['name']] += is_new

            # Draw rectangle around object (person, bicycle, etc...)
            cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (0, 0, 255), 2)
            # for center_x, center_y in centers_old[id_obj].values():
            #     cv2.circle(frame, (center_x, center_y), 5, (0, 0, 255), -1) # Adds circle in person

            cv2.putText(
                img=frame,
                text=id_obj + ':' + str(np.round(conf[ix], 2)),
                org=(xmin, ymin - 10),
                fontFace=cv2.FONT_HERSHEY_TRIPLEX,
                fontScale=0.8,
                color=(0, 0, 255),
                thickness=1
            )

        # # Update count for the current ROI in the dictionary
        # # roi_counts[roi['name']] = count_p_roi
        # y_coordinate = 40
        # for region, person_count in roi_counts.items():
        #     cv2.putText(img=frame, text=f'Counts People in ROI {region}:{person_count}',
        #                 org=(30, y_coordinate), fontFace=cv2.FONT_HERSHEY_SIMPLEX,
        #                 fontScale=1, color=(255, 0, 0), thickness=1)
        #     y_coordinate += 50

        centers_old = filter_tracks(centers_old, patience)
        # if verbose:
        #     print(counter_in, counter_out)

        overlay = frame.copy()
        # cv2.polylines(overlay, pts=area_roi, isClosed=True, color=(255, 0, 0), thickness=2)
        # cv2.fillPoly(overlay, area_roi, (255, 0, 0))
        frame = cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)
        output_video.write(frame)
        
#     print(object_time_tracker)
    obj_tracker_file_ptr = open(result_directory + "/" + video_name+"_obj_timestamps", "w")
    obj_tracker_file_ptr.write(str(object_time_tracker))
    obj_tracker_file_ptr.close()

    output_video.release()

    # for region, person_count in roi_counts.items():
    #     print(f"Number of persons moving {region} is {person_count}")
#     print(f"Annotated video saved at {annotated_video}")

In [8]:
# Device to run in
device = None
if(torch.cuda.is_available()):
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("using", device, "device")

model = YOLO('yolov9e-seg.pt')
# use gpu for model
model.to(device)
# torch.cuda.synchronize()

# Run
# display_pedestrians("/home/servicer/Documents/ouput_video.mp4", "/home/servicer/Documents")
display_pedestrians("/home/servicer/Documents/ouput_video1.mp4", "/home/servicer/Documents")
# display_pedestrians("./sources/ped_sample_vid.mp4", "./sources")
torch.cuda.synchronize()

using cuda device


OpenCV: FFMPEG: tag 0x5634504d/'MP4V' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'


[INFO] - Verbose during Prediction: False
[INFO] - Original Dim:  (2560, 1440, 24.95034939315925)
100
100


100%|██████████████████████████████████████████████████████████████████████████████████| 2226/2226 [02:01<00:00, 18.38it/s]


In [27]:
!ls /home/servicer/Documents/

Annotated_ouput_video1.mp4    ouput_video_1.mp4_obj_timestamps
Annotated_ouput_video.mp4     ouput_video1_obj_timestamps
Annotated_ped_sample_vid.mp4  ouput_video.mp4
input.mp4		      ouput_video.mp4_obj_timestamps
ouput_video1.mp4


In [30]:
torch.cuda.synchronize()