In [None]:
import os
import cv2
import torch
from ultralytics import YOLO 
import numpy as np 

In [None]:
class CrowdCounter:
    def _init_(self, model_path=None, person_class_id=0):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device for YOLO model: {self.device}")

        if model_path and os.path.exists(model_path):
            try:
                self.model = YOLO(model_path)
                print(f"YOLO model loaded successfully from {model_path}")
                if "yolov11" in model_path.lower():
                    print("Attempting to use YOLOv11.")
            except Exception as e:
                print(f"Error loading YOLO model from {model_path}: {e}. Please check the path and file format (e.g., .pt).")
                print("Attempting to load default YOLOv8n model as a fallback...")
                self.model = YOLO('yolov8n.pt') 
                print("Default YOLOv8n model loaded.")
        else:
            print(f"Warning: YOLO model not found at {model_path}. Loading default YOLOv8n model as a fallback.")
            self.model = YOLO('yolov8n.pt') 
            print("Default YOLOv8n model loaded.")
        
        self.person_class_id = person_class_id 
        print(f"Configured to count objects with class ID: {self.person_class_id} (typically 'person' in COCO).")

    def predict(self, image_np):
        results = self.model.predict(image_np, verbose=False) 

        pred_count = 0
        boxes_with_conf = []

        for r in results:
            for box in r.boxes:
                class_id = int(box.cls) 
                if class_id == self.person_class_id: 
                    pred_count += 1
                    x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
                    conf = float(box.conf)
                    boxes_with_conf.append({'box': (x1, y1, x2, y2), 'confidence': conf})
        
        return pred_count, boxes_with_conf

In [None]:
def run_video_inference(model_path, video_path, output_video_path, person_class_id=0):

    print(f"Attempting to load YOLO model from: {model_path}")
    counter = CrowdCounter(model_path=model_path, person_class_id=person_class_id)

    if not os.path.exists(video_path):
        print(f"Error: Video file not found at {video_path}")
        return

    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print(f"Error: Could not open video file {video_path}. Please check the path and file format.")
        return

    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    print(f"Processing video: {video_path} ({frame_width}x{frame_height} @ {fps:.2f} FPS)")

    fourcc = cv2.VideoWriter_fourcc(*'mp4v') 
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

    if not out.isOpened():
        print(f"Error: Could not open video writer for {output_video_path}. Check file path, permissions, and codec support.")
        cap.release()
        return

    print(f"Saving output video to: {output_video_path}")

    prev_gray = None 
    hsv = None 

    while True:
        ret, frame = cap.read() 
        if not ret:
            print("End of video stream or error reading frame. Exiting.")
            break

        pred_count, detections = counter.predict(frame)

        current_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        if prev_gray is not None:
            flow = cv2.calcOpticalFlowFarneback(prev_gray, current_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)

            mag, ang = cv2.cartToPolar(flow[...,0], flow[...,1])

            if hsv is None:
                hsv = np.zeros_like(frame)
                hsv[...,1] = 255 

            hsv[...,0] = ang * 180 / np.pi / 2 
            hsv[...,2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX) 

            rgb_flow = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

            frame = cv2.addWeighted(frame, 1, rgb_flow, 0.5, 0) 
        else:
            print("  Skipping optical flow for the first frame (no previous frame to compare).")

        prev_gray = current_gray

        for det in detections:
            x1, y1, x2, y2 = det['box']
            confidence = det['confidence']
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) 
            cv2.putText(frame, f'{confidence:.2f}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        count_text = f"Count: {pred_count}"
        cv2.putText(frame, count_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

        out.write(frame)

        cv2.imshow('YOLO Crowd Counting with Optical Flow', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            print(" 'q' pressed. Exiting video stream.")
            break

    cap.release()
    out.release() 
    cv2.destroyAllWindows()
    print("Video processing completed and saved.")

In [None]:
if _name_ == '_main_':

    your_yolo_model_path = "C:\\Users\\Odwitiyo\\Downloads\\yolov8m.pt"  
    your_input_video_path = "C:\\Users\\Odwitiyo\\Downloads\\crowd_test_data\\1.mp4" 
    your_output_video_path = "yolo_crowd_counting_output_with_flow.mp4" 

    yolo_person_class_id = 0 
    run_video_inference(your_yolo_model_path, 
                        your_input_video_path,
                        your_output_video_path, 
                        yolo_person_class_id)