# In [2]:
import cv2
import torch
from PIL import Image
from torchvision import transforms, models
import numpy as np
from deep_sort_realtime.deepsort_tracker import DeepSort

# Run inference on the GPU when one is available, otherwise fall back to CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load the pre-trained Faster R-CNN detector from torchvision, move it to the
# chosen device and switch it to inference mode.
# NOTE(review): `pretrained=True` is reported as deprecated by recent
# torchvision releases in favour of `weights=`; it is left unchanged here so
# the script keeps working on older installs -- confirm the target version.
model = models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
model.to(device)
model.eval()

# Define the video file path
video_path = r"cctv_feed_2.mp4"

# Initialize the video capture
cap = cv2.VideoCapture(video_path)

# Abort early when the input cannot be read; nothing below can work without it.
if not cap.isOpened():
    print("Error: Could not open video.")
    raise SystemExit(1)

# Get the video frame rate and size
fps = cap.get(cv2.CAP_PROP_FPS)
if not fps >= 1:
    # The capture may report 0 (or NaN) when the frame rate is unknown. The
    # frame-sampling logic divides by int(fps), so substitute a usable default
    # instead of crashing later. `not fps >= 1` is deliberate: it is also true
    # for NaN, which `fps < 1` would miss.
    print("Warning: Could not determine video FPS; assuming 30.")
    fps = 30.0
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Initialize video writer
# NOTE(review): the writer is opened at the source frame rate, but the main
# loop only writes the sampled frames, so the output will play back faster
# than real time -- confirm whether that is intended.
output_file = 'output_video.mp4'
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_file, fourcc, fps, (frame_width, frame_height))
if not out.isOpened():
    # A writer that failed to open silently discards every frame written to it.
    print("Error: Could not open output video for writing.")
    cap.release()
    raise SystemExit(1)

# Initialize DeepSORT with tuned parameters
tracker = DeepSort(max_age=30, n_init=3, nms_max_overlap=1.0, max_iou_distance=0.7, nn_budget=100)

# Frame selection logic
# `frame_count` is initialised here but not referenced by the visible code.
frame_count = 0
# Offsets inside each window of int(fps) frames that get processed: roughly
# one third of the way in, two thirds of the way in, and the last frame.
selected_frames = [int(fps / 3), int(fps * 2 / 3), int(fps - 1)]

# Custom ID mapping: tracker-assigned track id -> small sequential display id.
id_mapping = {}
next_id = 1

# Initialize the transformation (PIL image -> tensor)
transform = transforms.Compose([
    transforms.ToTensor(),
])

# Initialize dictionary to store trajectories: display id -> list of centre
# points, one per processed frame in which the track was drawn.
trajectories = {}

# Sectioning the video
# Each section is a pair of opposite corners in frame pixel coordinates. As
# written, the first corner is the top-right one (larger x, smaller y) and
# the second is the bottom-left one.
sections = {
    'Sect1': ((1900, 10), (1000, 1050)),
    'Sect2': ((750, 400), (10, 1050)),
    'Sect3': ((900, 10), (10, 350))
}

def person_within_section(point, section_box):
    """Return whether *point* lies inside *section_box*, edges included.

    Args:
        point: ``(x, y)`` coordinates of the point to test.
        section_box: Two opposite corners ``((x1, y1), (x2, y2))`` of an
            axis-aligned rectangle. The corners may be supplied in any
            order (top-right/bottom-left, top-left/bottom-right, ...).

    Returns:
        True when the point is within the rectangle or on its border,
        otherwise False.
    """
    px, py = point
    (ax, ay), (bx, by) = section_box
    # Normalise the corners so the result does not depend on which pair of
    # opposite corners the caller happened to pass.
    left, right = min(ax, bx), max(ax, bx)
    top, bottom = min(ay, by), max(ay, by)
    return (left <= px <= right) and (top <= py <= bottom)

def process_frame(frame):
    """Detect, track and annotate people in one video frame.

    The frame is run through the module-level detector, person detections
    with a score of at least 0.5 are handed to the module-level DeepSORT
    tracker, and the frame is then annotated in place with detection
    centres, section outlines and per-section counts, confirmed track boxes
    with their display ids, accumulated trajectories and a total count.

    Side effects: updates the module-level ``tracker``, ``id_mapping``,
    ``next_id`` and ``trajectories``.

    Args:
        frame: BGR image array as read by OpenCV. It is modified in place.

    Returns:
        The same ``frame`` object, with the annotations drawn on it.
    """
    global next_id

    # OpenCV delivers BGR; convert to an RGB PIL image, then to a tensor with
    # a leading batch dimension on the inference device.
    pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    image = transform(pil_image).unsqueeze(0).to(device)

    # Perform object detection
    with torch.no_grad():
        outputs = model(image)

    # Extract the bounding boxes, labels, and scores
    boxes = outputs[0]['boxes'].cpu().numpy()
    labels = outputs[0]['labels'].cpu().numpy()
    scores = outputs[0]['scores'].cpu().numpy()

    # Collect person detections in the (left, top, width, height) layout that
    # is passed to the tracker, remembering each box centre for drawing later.
    detections = []
    centre_points = []
    for box, label, score in zip(boxes, labels, scores):
        if score >= 0.5 and label == 1:  # 1 is the label for person
            x1, y1, x2, y2 = map(int, box)
            width, height = x2 - x1, y2 - y1
            detections.append((np.array([x1, y1, width, height]), score))
            centre_points.append((x1 + width // 2, y1 + height // 2))

    # Update the tracker BEFORE drawing anything: the frame is handed to the
    # tracker (presumably to compute appearance features from the detection
    # crops), so it should not already contain overlays.
    tracks = tracker.update_tracks(detections, frame=frame)

    # Mark every detection centre and count how many fall in each section.
    section_counts = dict.fromkeys(sections, 0)
    for centre_point in centre_points:
        cv2.circle(frame, centre_point, radius=5, color=(0, 255, 0), thickness=-1)
        for lab, section_box in sections.items():
            if person_within_section(centre_point, section_box):
                section_counts[lab] += 1

    # Outline the sections once per frame, regardless of whether anything
    # was detected, then label each one with its count.
    for (corner_ax, corner_ay), (corner_bx, corner_by) in sections.values():
        cv2.rectangle(frame, (corner_ax, corner_ay), (corner_bx, corner_by), color=(255, 0, 0), thickness=2)
    for lab, ((corner_ax, corner_ay), _) in sections.items():
        cv2.putText(frame, f'{lab} Count: {section_counts[lab]}', (corner_ax - 250, corner_ay + 30), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2)

    # Draw bounding boxes and unique IDs, update trajectories
    person_count = 0
    for track in tracks:
        # Skip tentative tracks and tracks not matched in the latest updates.
        if not track.is_confirmed() or track.time_since_update > 1:
            continue
        person_count += 1
        track_id = track.track_id

        # Map the tracker's id to a small sequential id on first sight.
        if track_id not in id_mapping:
            id_mapping[track_id] = next_id
            next_id += 1
        custom_id = id_mapping[track_id]

        x1, y1, x2, y2 = map(int, track.to_tlbr())
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(frame, f'ID: {custom_id}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        # Update trajectory with the centre of the tracked box
        centre_point = (x1 + (x2 - x1) // 2, y1 + (y2 - y1) // 2)
        trajectories.setdefault(custom_id, []).append(centre_point)

    # Draw all trajectories recorded so far (including tracks that are no
    # longer active): a filled dot at the start and a polyline through the
    # remaining points.
    for points in trajectories.values():
        if points:
            cv2.circle(frame, points[0], radius=8, color=(255, 0, 0), thickness=-1)
        for previous_point, current_point in zip(points, points[1:]):
            cv2.line(frame, previous_point, current_point, (0, 0, 255), 2)

    # Display the person count
    cv2.putText(frame, f'Total Persons: {person_count}', (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

    return frame

# Main processing loop
# Frames are sampled by their offset inside consecutive windows of int(fps)
# frames. Clamp the window length to at least 1 so a frame rate below 1
# (e.g. an unknown rate reported as 0) cannot cause a division by zero.
frames_per_window = max(int(fps), 1)

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # End of stream or read failure.
            break

        # After a read the capture position refers to the next frame, so
        # subtract one to get the index of the frame just read.
        frame_number = int(cap.get(cv2.CAP_PROP_POS_FRAMES)) - 1
        if frame_number % frames_per_window not in selected_frames:
            continue

        processed_frame = process_frame(frame)

        # Write the processed frame to the output video
        out.write(processed_frame)

        # Display the resulting frame; pressing 'q' stops processing.
        cv2.imshow('Frame', processed_frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release resources even when processing raises, so the output file is
    # finalised and the capture handle and preview window are not leaked.
    cap.release()
    out.release()
    cv2.destroyAllWindows()

