                                   BONUS TASK
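This code uses the YOLOv3 object detection model (via OpenCV's DNN module) to estimate the maximum number of people involved in a fight in any frame of a given video. It defines a function load_yolo_model that loads the YOLOv3 configuration, weights, and COCO class names from specified file paths, and extracts the output layer names required for inference. Detected persons are flagged as potential fighters when the centers of their bounding boxes are closer than a distance threshold or when their boxes overlap (IoU above a threshold); the per-frame count is then smoothed with a median over a short sliding window of frames.
Finally, it runs the count_fighting_people_in_video function on a sample video and prints the estimated maximum number of people involved in the brawl.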

In [19]:
import cv2
import numpy as np
from collections import deque

def load_yolo_model(cfg_path=r"C:\Users\Nitro 5\PycharmProjects\pythonProject1\darknet\cfg\yolov3.cfg",
                    weights_path=r"C:\Users\Nitro 5\PycharmProjects\pythonProject1\yolov3.weights",
                    names_path=r"C:\Users\Nitro 5\PycharmProjects\pythonProject1\darknet\data\coco.names"):
    """
    Build a YOLOv3 network with OpenCV's DNN module and read its class labels.

    Args:
        cfg_path: Path to the Darknet ``.cfg`` network definition.
        weights_path: Path to the Darknet weights file.
        names_path: Path to a text file holding one class label per line.

    Returns:
        A ``(net, output_layers, classes)`` tuple: the configured network, the
        names of its unconnected output layers, and the list of class labels
        with surrounding whitespace stripped.
    """
    network = cv2.dnn.readNetFromDarknet(cfg_path, weights_path)
    # Run inference with OpenCV's own backend, targeting the CPU.
    network.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
    network.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)

    # The unconnected-output ids are used as 1-based positions in the list of
    # layer names, hence the "- 1" when looking each name up.
    every_layer = network.getLayerNames()
    output_layers = []
    for layer_id in network.getUnconnectedOutLayers().flatten():
        output_layers.append(every_layer[layer_id - 1])

    # One label per line; line index doubles as the class id.
    with open(names_path, "r") as names_file:
        classes = [raw_line.strip() for raw_line in names_file]

    return network, output_layers, classes

def compute_iou(boxA, boxB):
    """
    Return the Intersection over Union (IoU) of two bounding boxes.

    Each box is indexed as ``(x, y, w, h)``: top-left corner, width and height.
    A small epsilon (1e-6) is added to the union so that two zero-area boxes
    do not cause a division by zero.
    """
    # Edges of the overlapping rectangle (if any).
    left = max(boxA[0], boxB[0])
    top = max(boxA[1], boxB[1])
    right = min(boxA[0] + boxA[2], boxB[0] + boxB[2])
    bottom = min(boxA[1] + boxA[3], boxB[1] + boxB[3])

    # Clamp each extent at zero so disjoint boxes yield an empty intersection.
    overlap_w = max(0, right - left)
    overlap_h = max(0, bottom - top)
    intersection = overlap_w * overlap_h

    union = boxA[2] * boxA[3] + boxB[2] * boxB[3] - intersection + 1e-6
    return intersection / float(union)

def detect_fight_in_frame(boxes, distance_threshold=60, iou_threshold=0.1):
    """
    Flag the persons in one frame who may be fighting.

    Every pair of boxes is examined; both members of a pair are flagged when
    the Euclidean distance between their box centers is below
    ``distance_threshold`` or when their IoU exceeds ``iou_threshold``.

    Args:
        boxes: Sequence of ``(x, y, w, h)`` bounding boxes.
        distance_threshold: Maximum center-to-center distance for a pair to count as close.
        iou_threshold: IoU above which a pair counts as overlapping.

    Returns:
        Set of indices into ``boxes`` for the flagged persons.
    """
    # Box midpoints as NumPy vectors so a pairwise difference is a plain subtraction.
    midpoints = [np.array((bx + bw / 2, by + bh / 2)) for (bx, by, bw, bh) in boxes]

    flagged = set()
    total = len(boxes)
    for first in range(total):
        for second in range(first + 1, total):
            gap = np.linalg.norm(midpoints[first] - midpoints[second])
            overlap = compute_iou(boxes[first], boxes[second])
            # Either cue on its own is enough to flag both people.
            if gap < distance_threshold or overlap > iou_threshold:
                flagged.update((first, second))

    return flagged

def count_fighting_people_in_video(video_path, conf_threshold=0.5, nms_threshold=0.4,
                                   distance_threshold=60, iou_threshold=0.1, smoothing_window=5,
                                   display=False):
    """
    Estimate the peak number of people involved in a fight in a video.

    Persons are detected in every frame with YOLOv3, duplicates are removed with
    non-maximum suppression, and the remaining boxes are passed to
    ``detect_fight_in_frame``. The per-frame count of flagged persons is smoothed
    with a median over the last ``smoothing_window`` frames, and the largest
    smoothed value seen is returned.

    Args:
        video_path: Path of the video file to analyse.
        conf_threshold: Minimum class score for a detection to be kept.
        nms_threshold: Overlap threshold passed to non-maximum suppression.
        distance_threshold: Center distance below which two persons count as close.
        iou_threshold: IoU above which two persons count as overlapping.
        smoothing_window: Number of most recent frames used for the median.
        display: When True, draw the boxes (red = flagged, green = not flagged)
            and show each frame in a window; pressing ``q`` stops early.

    Returns:
        The maximum smoothed count of flagged persons over all processed frames.

    Raises:
        Exception: If the video cannot be opened.
    """
    net, output_layers, classes = load_yolo_model()

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        # Without this check an unreadable path would silently report 0 people.
        cap.release()
        raise Exception("Error opening video file.")

    max_fight_count = 0
    frame_count = 0
    # Sliding window of the most recent per-frame counts.
    fight_count_queue = deque(maxlen=smoothing_window)

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            height, width = frame.shape[:2]

            # Scale pixels by 1/255, resize to 416x416 and swap the R and B channels.
            blob = cv2.dnn.blobFromImage(frame, 1/255.0, (416, 416), swapRB=True, crop=False)
            net.setInput(blob)
            outs = net.forward(output_layers)

            boxes = []
            confidences = []
            # Factors that map the first four detection values to pixel units.
            scale = np.array([width, height, width, height])

            for out in outs:
                for detection in out:
                    # Values from index 5 onward are used as per-class scores.
                    scores = detection[5:]
                    class_id = np.argmax(scores)
                    confidence = scores[class_id]

                    if confidence > conf_threshold and classes[class_id] == "person":
                        center_x, center_y, w, h = (detection[0:4] * scale).astype("int")
                        x = int(center_x - w / 2)
                        y = int(center_y - h / 2)

                        # Store plain Python ints so NMSBoxes never sees NumPy scalars.
                        boxes.append([x, y, int(w), int(h)])
                        confidences.append(float(confidence))

            indices = cv2.dnn.NMSBoxes(boxes, confidences, conf_threshold, nms_threshold)
            # NMSBoxes may hand back an empty tuple (no ``flatten``) when nothing
            # survives, so the empty case is handled before flattening.
            if indices is None or len(indices) == 0:
                final_boxes = []
            else:
                final_boxes = [boxes[int(i)] for i in np.array(indices).flatten()]

            fighting_indices = detect_fight_in_frame(final_boxes, distance_threshold, iou_threshold)
            fight_count_queue.append(len(fighting_indices))

            # The median over the window damps single-frame spikes in the count.
            smoothed_fight_count = int(np.median(list(fight_count_queue)))
            max_fight_count = max(max_fight_count, smoothed_fight_count)

            if display:
                for idx, (x, y, w, h) in enumerate(final_boxes):
                    color = (0, 0, 255) if idx in fighting_indices else (0, 255, 0)
                    cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
                cv2.imshow("Frame", frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
    finally:
        # Release the capture even if inference fails part-way through.
        cap.release()
        if display:
            cv2.destroyAllWindows()

    print(f"Processed {frame_count} frames. Maximum smoothed fighting people count in a frame: {max_fight_count}")
    return max_fight_count

# Example usage:
# The path below is specific to the author's machine; point it at a video that
# exists locally before running this cell.
video_path = r"C:\Users\Nitro 5\PycharmProjects\pythonProject1\Test\Brawl_test\BrawlTest_55.mp4"
# The result is the largest median-smoothed per-frame count of persons flagged as fighting.
fighting_count = count_fighting_people_in_video(video_path)
print("Estimated maximum number of people involved in the brawl (smoothed):", fighting_count)


Processed 153 frames. Maximum smoothed fighting people count in a frame: 6
Estimated maximum number of people involved in the brawl (smoothed): 6


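This code processes an input video to detect and annotate people using YOLOv3, and writes the annotated frames to an output video. It loads the YOLOv3 configuration, weights, and class names from the specified file paths and extracts the output layer names required for inference. Each detected person is drawn in red (potential fighter) or green (bystander), and the total person count and the smoothed fighting count are overlaid on every frame.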

In [20]:
import cv2
import numpy as np
import os
from collections import deque

def load_yolo_model(cfg_path, weights_path, names_path):
    """
    Load a YOLOv3 network through OpenCV's DNN module together with its labels.

    Args:
        cfg_path: Path to the Darknet ``.cfg`` network definition.
        weights_path: Path to the Darknet weights file.
        names_path: Path to a text file holding one class label per line.

    Returns:
        A ``(net, output_layers, classes)`` tuple: the configured network, the
        names of its unconnected output layers, and the stripped class labels.
    """
    yolo = cv2.dnn.readNetFromDarknet(cfg_path, weights_path)
    # Inference uses OpenCV's own backend and runs on the CPU.
    yolo.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
    yolo.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)

    # Each unconnected-output id is used as a 1-based position in the name list.
    names_of_layers = yolo.getLayerNames()
    unconnected_ids = yolo.getUnconnectedOutLayers().flatten()
    output_layers = [names_of_layers[layer_id - 1] for layer_id in unconnected_ids]

    # Collect the labels line by line; the position of a label is its class id.
    classes = []
    with open(names_path, "r") as label_file:
        for entry in label_file:
            classes.append(entry.strip())

    return yolo, output_layers, classes

def compute_iou(boxA, boxB):
    """
    Compute the Intersection over Union (IoU) of two ``(x, y, w, h)`` boxes.

    The union is padded with 1e-6 so the division is defined even when both
    boxes have zero area.
    """
    # Far corners of each box (top-left corner plus width/height).
    a_right, a_bottom = boxA[0] + boxA[2], boxA[1] + boxA[3]
    b_right, b_bottom = boxB[0] + boxB[2], boxB[1] + boxB[3]

    # Overlap extents, clamped at zero for boxes that do not intersect.
    inter_w = max(0, min(a_right, b_right) - max(boxA[0], boxB[0]))
    inter_h = max(0, min(a_bottom, b_bottom) - max(boxA[1], boxB[1]))
    shared = inter_w * inter_h

    area_a = boxA[2] * boxA[3]
    area_b = boxB[2] * boxB[3]
    return shared / float(area_a + area_b - shared + 1e-6)

def detect_fight_in_frame(boxes, distance_threshold=60, iou_threshold=0.1):
    """
    Return the indices of persons flagged as potentially fighting.

    Two persons are flagged together when the Euclidean distance between the
    centers of their boxes is less than ``distance_threshold`` or when the IoU
    of their boxes is greater than ``iou_threshold``.

    Args:
        boxes: Sequence of ``(x, y, w, h)`` bounding boxes.
        distance_threshold: Center-distance cut-off for "close".
        iou_threshold: IoU cut-off for "overlapping".

    Returns:
        Set of indices into ``boxes``.
    """
    centers = [(x + w/2, y + h/2) for (x, y, w, h) in boxes]
    fight_indices = set()

    for i, (box_i, center_i) in enumerate(zip(boxes, centers)):
        # Pair box i only with the boxes that come after it, so each
        # unordered pair is examined exactly once.
        later = zip(boxes[i + 1:], centers[i + 1:])
        for j, (box_j, center_j) in enumerate(later, start=i + 1):
            separation = np.linalg.norm(np.array(center_i) - np.array(center_j))
            shared = compute_iou(box_i, box_j)
            if separation < distance_threshold or shared > iou_threshold:
                fight_indices.add(i)
                fight_indices.add(j)

    return fight_indices

def process_video(input_video_path, output_video_path, conf_threshold=0.5, nms_threshold=0.4,
                  distance_threshold=60, iou_threshold=0.1, smoothing_window=5,
                  cfg_path=r"C:\Users\Nitro 5\PycharmProjects\pythonProject1\darknet\cfg\yolov3.cfg",
                  weights_path=r"C:\Users\Nitro 5\PycharmProjects\pythonProject1\yolov3.weights",
                  names_path=r"C:\Users\Nitro 5\PycharmProjects\pythonProject1\darknet\data\coco.names"):
    """
    Annotate every frame of a video with person detections and write the result.

    Persons are detected with YOLOv3, filtered with non-maximum suppression and
    classified by ``detect_fight_in_frame``. Flagged persons are drawn in red
    and labelled "Fighter", the others in green and labelled "Person"; each
    label carries the detection's confidence. The total person count and the
    median-smoothed fighting count are overlaid on the frame.

    Args:
        input_video_path: Path of the video to read.
        output_video_path: Path of the annotated video to write (``mp4v`` codec).
        conf_threshold: Minimum class score for a detection to be kept.
        nms_threshold: Overlap threshold passed to non-maximum suppression.
        distance_threshold: Center distance below which two persons count as close.
        iou_threshold: IoU above which two persons count as overlapping.
        smoothing_window: Number of most recent frames used for the median.
        cfg_path: Path to the YOLOv3 ``.cfg`` file.
        weights_path: Path to the YOLOv3 weights file.
        names_path: Path to the class-names file.

    Raises:
        FileNotFoundError: If any of the three model files does not exist.
        Exception: If the input video cannot be opened.
        IOError: If the output video writer cannot be opened.
    """
    # Fail early with a clear message if a model file is missing.
    for path in [cfg_path, weights_path, names_path]:
        if not os.path.exists(path):
            raise FileNotFoundError(f"File not found: {path}")

    net, output_layers, classes = load_yolo_model(cfg_path, weights_path, names_path)

    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        cap.release()
        raise Exception("Error opening video file.")

    writer = None
    try:
        # The output keeps the frame rate and frame size of the input.
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        fps = cap.get(cv2.CAP_PROP_FPS)
        width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
        print("VideoWriter object:", writer)
        if not writer.isOpened():
            # Otherwise every write() would be dropped and no file produced.
            raise IOError(f"Could not open video writer for: {output_video_path}")

        # Sliding window of the most recent per-frame fighting counts.
        fight_count_queue = deque(maxlen=smoothing_window)

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Scale pixels by 1/255, resize to 416x416 and swap the R and B channels.
            blob = cv2.dnn.blobFromImage(frame, 1/255.0, (416, 416), swapRB=True, crop=False)
            net.setInput(blob)
            outs = net.forward(output_layers)

            boxes = []
            confidences = []

            for output in outs:
                for detection in output:
                    # Values from index 5 onward are used as per-class scores.
                    scores = detection[5:]
                    class_id = np.argmax(scores)
                    confidence = scores[class_id]

                    if confidence > conf_threshold and classes[class_id] == "person":
                        center_x = int(detection[0] * width)
                        center_y = int(detection[1] * height)
                        w = int(detection[2] * width)
                        h = int(detection[3] * height)

                        x = int(center_x - w / 2)
                        y = int(center_y - h / 2)
                        boxes.append([x, y, w, h])
                        confidences.append(float(confidence))

            indices = cv2.dnn.NMSBoxes(boxes, confidences, conf_threshold, nms_threshold)
            if indices is not None and len(indices) > 0:
                kept = [int(i) for i in np.array(indices).flatten()]
            else:
                kept = []

            # Keep boxes and confidences aligned: after NMS, position ``idx`` in
            # final_boxes no longer matches position ``idx`` in ``confidences``.
            final_boxes = [boxes[i] for i in kept]
            final_confidences = [confidences[i] for i in kept]

            fighting_indices = detect_fight_in_frame(final_boxes, distance_threshold, iou_threshold)

            total_persons = len(final_boxes)
            fighting_persons = len(fighting_indices)

            fight_count_queue.append(fighting_persons)
            # The median over the window damps single-frame spikes in the count.
            smoothed_fight_count = int(np.median(list(fight_count_queue)))

            for idx, (box, score) in enumerate(zip(final_boxes, final_confidences)):
                x, y, w, h = box
                if idx in fighting_indices:
                    color = (0, 0, 255)  # Red for potential fighters
                    label = f"Fighter: {score:.2f}"
                else:
                    color = (0, 255, 0)  # Green for bystanders
                    label = f"Person: {score:.2f}"
                cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)
                cv2.putText(frame, label, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

            # Overlay the raw person count and the smoothed fighting count.
            cv2.putText(frame, f"Total Persons: {total_persons}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
            cv2.putText(frame, f"Fighting Persons: {smoothed_fight_count}", (10, 70),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

            writer.write(frame)
    finally:
        # Release both handles even if processing fails part-way through.
        cap.release()
        if writer is not None:
            writer.release()

    print("Processing complete. Output saved to:", output_video_path)

# Example usage:
# The input path is specific to the author's machine; point it at a video that
# exists locally before running this cell.
input_video = r"C:\Users\Nitro 5\PycharmProjects\pythonProject1\Test\Brawl_test\BrawlTest_55.mp4"
# Relative path: the annotated video is written to the current working directory.
output_video = r"BonusTaskSample_BrawlTest55.mp4"
process_video(input_video, output_video)


VideoWriter object: < cv2.VideoWriter 00000279C8DFBC50>
Processing complete. Output saved to: BonusTaskSample_BrawlTest55.mp4


In [2]:
!jupyter nbconvert --to notebook --inplace Brawl_Detector.ipynb



[NbConvertApp] Converting notebook Brawl_Detector.ipynb to notebook
[NbConvertApp] Writing 64081 bytes to Brawl_Detector.ipynb
