In [None]:
# Install YOLOv5 and dependencies
!pip install git+https://github.com/mikel-brostrom/Yolov5_DeepSort_Pytorch.git
!git clone https://github.com/ultralytics/yolov5.git
!pip install -r requirements.txt
%cd yolov5

In [None]:
import cv2
import os
import numpy as np
from deep_sort_realtime.deepsort_tracker import DeepSort
import torch
import IPython.display as ipd
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Load the YOLOv5s model from the local yolov5 checkout (source='local' makes
# torch.hub read the repository from disk instead of fetching it from GitHub).
# Make sure this path matches where the repository was cloned, and also check
# the output directory configured in this notebook before running.
model = torch.hub.load('/kaggle/working/yolov5', 'yolov5s', source='local')

In [None]:
# Root directory for all generated results; created up front so later cells
# can write into it without checking.
output_dir = '/kaggle/working/output_videos'
os.makedirs(output_dir, exist_ok=True)

# Directory that holds the input videos (despite the name, this is a folder,
# not a single file; it is listed with os.listdir further down).
video_path = '/kaggle/input/correct-dataset-cogniable/'

# DeepSORT tracker shared by the processing code.
# Per the deep_sort_realtime documentation: n_init is the number of
# consecutive detections before a track is confirmed, and max_age is the
# number of missed frames after which a track is deleted.
tracker = DeepSort(n_init=2, max_age=30)

In [None]:
def is_human_like(bbox, min_ratio=1.5, max_ratio=4.0):
    """Return True when a bounding box has the tall, narrow shape of a standing person.

    Args:
        bbox: Sequence ``(x_min, y_min, x_max, y_max)`` describing the box corners.
        min_ratio: Smallest accepted height/width ratio (inclusive).
        max_ratio: Largest accepted height/width ratio (inclusive).

    Returns:
        bool: True if ``min_ratio <= height / width <= max_ratio``. Degenerate
        boxes (zero or negative width or height) are never human-like and
        return False instead of dividing by zero.
    """
    x_min, y_min, x_max, y_max = bbox
    width = x_max - x_min
    height = y_max - y_min
    # Guard against degenerate boxes: a zero width would otherwise raise
    # ZeroDivisionError (or yield inf with a warning for numpy scalars).
    if width <= 0 or height <= 0:
        return False
    aspect_ratio = height / width
    # bool() so numpy scalar inputs still produce a plain Python bool.
    return bool(min_ratio <= aspect_ratio <= max_ratio)

In [None]:
def perform_detection(frame):
    """Detect persons in a frame using YOLOv5 and filter out non-human-shaped boxes.

    Args:
        frame: Image as read by OpenCV (``cv2.VideoCapture.read``), i.e. a
            BGR numpy array.

    Returns:
        list: One ``[x_min, y_min, x_max, y_max, confidence]`` entry per
        detection whose class id is 0 (checked below) and whose box passes
        ``is_human_like``.
    """
    # OpenCV delivers BGR frames, while the YOLOv5 hub model expects RGB for
    # numpy input; convert so the colour channels are not swapped.
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = model(rgb_frame)

    detections = []
    # results.xyxy[0] holds the detections for the single image passed in;
    # each row unpacks to box corners, confidence and class id.
    for x_min, y_min, x_max, y_max, confidence, class_id in results.xyxy[0].cpu().numpy():
        # Keep only class 0 detections that also have a human-like shape.
        if class_id == 0 and is_human_like([x_min, y_min, x_max, y_max]):
            detections.append([x_min, y_min, x_max, y_max, confidence])
    return detections

In [4]:
# Main per-video pipeline: detection, tracking, frame annotation and writing
# of the output video all happen here, frame by frame.
def process_video(video_file):
    """Detect and track people in one video, writing an annotated copy and person crops.

    For every frame the function runs ``perform_detection``, feeds the result
    to the shared ``tracker``, draws a box and ID label for each confirmed
    track, and appends the annotated frame to ``<video_name>_output.mp4``
    inside ``output_dir/<video_name>``. The first time a track ID is seen, a
    crop of that person is saved under ``persons/person<N>/person_<id>.jpg``.

    Errors are reported with ``print`` rather than raised (best-effort
    processing). The resulting video is displayed inline when it was written.

    Args:
        video_file: Path to the input video.
    """
    video_name = os.path.splitext(os.path.basename(video_file))[0]
    video_output_dir = os.path.join(output_dir, video_name)
    persons_folder = os.path.join(video_output_dir, 'persons')
    # Creating the nested folder also creates video_output_dir.
    os.makedirs(persons_folder, exist_ok=True)
    output_video_path = os.path.join(video_output_dir, f'{video_name}_output.mp4')

    # Initialised up front so the cleanup below is safe even when opening fails.
    cap = None
    out = None
    try:
        # The tracker is shared across calls: drop tracks left over from a
        # previously processed video so they cannot leak into this one.
        tracker.delete_all_tracks()

        # Set up video capture and the writer for the annotated output.
        cap = cv2.VideoCapture(video_file)
        if not cap.isOpened():
            raise ValueError(f"Error opening video file: {video_file}")

        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Keep the source frame rate so playback speed is preserved; fall
        # back to 20 FPS when the container does not report a usable value.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 20.0

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))
        captured_images = {}  # track_id -> path of the saved crop
        person_id_counter = 1

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # deep_sort_realtime expects ([left, top, width, height], confidence, class),
            # whereas perform_detection returns corner coordinates.
            formatted_detections = [
                ([x_min, y_min, x_max - x_min, y_max - y_min], confidence, 'person')
                for x_min, y_min, x_max, y_max, confidence in perform_detection(frame)
            ]

            tracked_objects = tracker.update_tracks(formatted_detections, frame=frame)

            # Draw on a copy so person crops come from the clean frame.
            annotated = frame.copy()
            for track in tracked_objects:
                if not track.is_confirmed():
                    continue

                track_id = track.track_id
                # Bounding box in [x_min, y_min, x_max, y_max] format.
                x1, y1, x2, y2 = (int(v) for v in track.to_tlbr())
                cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(annotated, f"ID: {track_id}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

                if track_id not in captured_images:
                    # Clamp to >= 0: negative indices would wrap around in
                    # numpy slicing and produce a wrong (or empty) crop.
                    person_image = frame[max(y1, 0):max(y2, 0), max(x1, 0):max(x2, 0)]
                    if person_image.size > 0:
                        person_folder = os.path.join(persons_folder, f'person{person_id_counter}')
                        os.makedirs(person_folder, exist_ok=True)
                        person_image_path = os.path.join(person_folder, f'person_{track_id}.jpg')
                        cv2.imwrite(person_image_path, person_image)

                        captured_images[track_id] = person_image_path
                        person_id_counter += 1

            out.write(annotated)

    except Exception as e:
        print(f"An error occurred while processing {video_file}: {e}")
    finally:
        # Release only what was actually created.
        if cap is not None:
            cap.release()
        if out is not None:
            out.release()

    # Display the video, but only if a writer was created and produced a file.
    if out is not None and os.path.exists(output_video_path):
        ipd.display(ipd.Video(output_video_path, width=800))

In [None]:
# Collect every video in the input directory. The extension check is
# case-insensitive (so e.g. ".MP4" is included) and the list is sorted because
# os.listdir returns entries in arbitrary order.
VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov')
video_files = sorted(
    os.path.join(video_path, f)
    for f in os.listdir(video_path)
    if f.lower().endswith(VIDEO_EXTENSIONS)
)

# Process the videos one by one; a failure in one video is reported and does
# not stop the remaining ones.
for i, video_file in enumerate(video_files, start=1):
    try:
        print(f"Working on Video named {i}: {video_file}")
        process_video(video_file)
    except Exception as e:
        print(f"Failed to process {video_file}: {e}")