In [None]:
import cv2
import os
import torch
import torchvision
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.transforms import functional as F
from PIL import Image
import numpy as np

# Path of the input footage, kept in one place so it is easy to change.
VIDEO_PATH = 'cctv_footage.mp4'

# Open the video file
video_capture = cv2.VideoCapture(VIDEO_PATH)
# cv2.VideoCapture does not raise on a missing or unreadable file; without this
# check the read loop would see no frames and finish without any diagnostic.
if not video_capture.isOpened():
    raise IOError(f"Could not open video file: {VIDEO_PATH}")

# Create a directory to save the frames as image files
output_directory = 'frames_output'
os.makedirs(output_directory, exist_ok=True)

# Running count of frames read so far; used to number the saved images.
frame_count = 0

# Initialize the object detection model: torchvision's Faster R-CNN with a
# ResNet-50 FPN backbone, requested with pretrained weights.
# NOTE(review): newer torchvision releases reportedly deprecate `pretrained=`
# in favour of `weights=` — confirm against the installed version.
model = fasterrcnn_resnet50_fpn(pretrained=True)
# Put the module in evaluation mode before it is used for prediction.
model.eval()

# Frame preprocessing helper: fixed-size resize, colour reorder, rescale.
def preprocess_frame(frame):
    """Return a 224x224 RGB float32 copy of `frame` scaled by x / 127.5 - 1.0.

    Args:
        frame: image array in OpenCV's BGR channel order.

    Returns:
        A new float32 array; the input frame is not modified.
    """
    resized_rgb = cv2.cvtColor(cv2.resize(frame, (224, 224)), cv2.COLOR_BGR2RGB)
    # Divide first, then shift, exactly as x / 127.5 - 1.0.
    return resized_rgb.astype(np.float32) / 127.5 - 1.0

# Registry of tracked persons: object id -> latest (x, y, w, h) box, in
# original-frame pixel coordinates.
from collections import OrderedDict  # local to this cell so it runs on its own

tracked_objects = OrderedDict()
# One tracker instance per object id. A single shared tracker can only follow
# one target, so every tracked person needs its own.
object_trackers = {}
# Monotonic id source; len(tracked_objects) would reuse ids after deletions.
next_object_id = 0

PERSON_LABEL = 1           # label value this notebook treats as "person"
SCORE_THRESHOLD = 0.5      # ignore low-confidence detections
IOU_MATCH_THRESHOLD = 0.3  # overlap above which a detection is an existing track


def create_tracker():
    """Return a fresh KCF tracker, using cv2.legacy when the top-level factory is absent."""
    factory = getattr(cv2, 'TrackerKCF_create', None)
    if factory is None:
        factory = cv2.legacy.TrackerKCF_create
    return factory()


def box_iou(box_a, box_b):
    """Return the intersection-over-union of two (x, y, w, h) boxes."""
    ax, ay, aw, ah = box_a
    bx, by, bw, bh = box_b
    inter_w = min(ax + aw, bx + bw) - max(ax, bx)
    inter_h = min(ay + ah, by + bh) - max(ay, by)
    if inter_w <= 0 or inter_h <= 0:
        return 0.0
    intersection = inter_w * inter_h
    return intersection / float(aw * ah + bw * bh - intersection)


try:
    while True:
        ret, frame = video_capture.read()
        if not ret:
            break

        frame_count += 1

        # Save the frame as an image file in the output directory
        frame_filename = os.path.join(output_directory, f'frame_{frame_count:04d}.jpg')
        cv2.imwrite(frame_filename, frame)

        # Update every object's own tracker on the original frame, so tracker
        # boxes and detector boxes share one coordinate system.
        for object_id in list(tracked_objects):
            success, new_box = object_trackers[object_id].update(frame)
            if success:
                tracked_objects[object_id] = tuple(int(v) for v in new_box)
            else:
                # If tracking fails, drop both the box and its tracker
                del tracked_objects[object_id]
                del object_trackers[object_id]

        # Perform object detection. OpenCV frames are BGR; the detector expects
        # RGB, so convert before building the tensor.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image_tensor = F.to_tensor(Image.fromarray(rgb_frame)).unsqueeze(0)
        with torch.no_grad():
            prediction = model(image_tensor)[0]

        # Start a track for each confident person detection that does not
        # overlap an object we are already following.
        for box, label, score in zip(prediction['boxes'],
                                     prediction['labels'],
                                     prediction['scores']):
            if int(label) != PERSON_LABEL or float(score) < SCORE_THRESHOLD:
                continue
            # The detector returns corner coordinates (x1, y1, x2, y2); the
            # tracker and the drawing code work with (x, y, w, h).
            x1, y1, x2, y2 = (int(v) for v in box.tolist())
            w, h = x2 - x1, y2 - y1
            if w <= 0 or h <= 0:
                continue
            detection = (x1, y1, w, h)
            if any(box_iou(detection, known) >= IOU_MATCH_THRESHOLD
                   for known in tracked_objects.values()):
                continue
            new_tracker = create_tracker()
            new_tracker.init(frame, detection)
            tracked_objects[next_object_id] = detection
            object_trackers[next_object_id] = new_tracker
            next_object_id += 1

        # Draw bounding boxes for tracked persons
        for x, y, w, h in tracked_objects.values():
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

        # Display the frame with tracked persons
        # NOTE(review): cv2.imshow needs a GUI backend; it may fail on a
        # headless notebook host — confirm the target environment.
        cv2.imshow('Object Tracking', frame)

        # Press 'q' to exit the video
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release resources even if detection, tracking or display raised
    video_capture.release()
    cv2.destroyAllWindows()


In [1]:
import cv2
import os
import torch
import torchvision
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.transforms import functional as F
from PIL import Image
import numpy as np

In [2]:
# Path of the input footage, kept in one place so it is easy to change.
VIDEO_PATH = 'cctv_footage.mp4'

# Open the video file
video_capture = cv2.VideoCapture(VIDEO_PATH)
# cv2.VideoCapture does not raise on a missing or unreadable file; without this
# check the read loop would see no frames and finish without any diagnostic.
if not video_capture.isOpened():
    raise IOError(f"Could not open video file: {VIDEO_PATH}")

# Create a directory to save the frames as image files
output_directory = 'frames_output'
os.makedirs(output_directory, exist_ok=True)

# Running count of frames read so far; used to number the saved images.
frame_count = 0


In [3]:
# Initialize the object detection model: torchvision's Faster R-CNN with a
# ResNet-50 FPN backbone, requested with pretrained weights.
# NOTE(review): newer torchvision releases reportedly deprecate `pretrained=`
# in favour of `weights=` — confirm against the installed version.
model = fasterrcnn_resnet50_fpn(pretrained=True)
# Put the module in evaluation mode before it is used for prediction.
model.eval()

# Frame preprocessing helper: fixed-size resize, colour reorder, rescale.
def preprocess_frame(frame):
    """Return a 224x224 RGB float32 copy of `frame` scaled by x / 127.5 - 1.0.

    Args:
        frame: image array in OpenCV's BGR channel order.

    Returns:
        A new float32 array; the input frame is not modified.
    """
    resized_rgb = cv2.cvtColor(cv2.resize(frame, (224, 224)), cv2.COLOR_BGR2RGB)
    # Divide first, then shift, exactly as x / 127.5 - 1.0.
    return resized_rgb.astype(np.float32) / 127.5 - 1.0

Downloading: "https://download.pytorch.org/models/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth" to /root/.cache/torch/hub/checkpoints/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth
100%|██████████| 160M/160M [00:00<00:00, 180MB/s]


In [6]:
from collections import OrderedDict
# Registry of tracked persons, filled by the tracking loop: one bounding box
# per integer object id, kept in insertion order.
tracked_objects = OrderedDict()


In [13]:
# Per-object KCF trackers. A single shared tracker can only follow one target,
# so every tracked person gets its own instance, keyed by object id.
object_trackers = {}
# Monotonic id source; len(tracked_objects) would reuse ids after deletions.
next_object_id = 0
# Boxes left over from an earlier run of this cell have no tracker any more,
# so start from an empty registry.
tracked_objects.clear()

PERSON_LABEL = 1           # label value this notebook treats as "person"
SCORE_THRESHOLD = 0.5      # ignore low-confidence detections
IOU_MATCH_THRESHOLD = 0.3  # overlap above which a detection is an existing track


def create_tracker():
    """Return a fresh KCF tracker, using cv2.legacy when the top-level factory is absent."""
    factory = getattr(cv2, 'TrackerKCF_create', None)
    if factory is None:
        factory = cv2.legacy.TrackerKCF_create
    return factory()


def box_iou(box_a, box_b):
    """Return the intersection-over-union of two (x, y, w, h) boxes."""
    ax, ay, aw, ah = box_a
    bx, by, bw, bh = box_b
    inter_w = min(ax + aw, bx + bw) - max(ax, bx)
    inter_h = min(ay + ah, by + bh) - max(ay, by)
    if inter_w <= 0 or inter_h <= 0:
        return 0.0
    intersection = inter_w * inter_h
    return intersection / float(aw * ah + bw * bh - intersection)


try:
    while True:
        ret, frame = video_capture.read()
        if not ret:
            break

        frame_count += 1

        # Save the frame as an image file in the output directory
        frame_filename = os.path.join(output_directory, f'frame_{frame_count:04d}.jpg')
        cv2.imwrite(frame_filename, frame)

        # Update every object's own tracker on the original frame, so tracker
        # boxes and detector boxes share one coordinate system.
        for object_id in list(tracked_objects):
            success, new_box = object_trackers[object_id].update(frame)
            if success:
                tracked_objects[object_id] = tuple(int(v) for v in new_box)
            else:
                # If tracking fails, drop both the box and its tracker
                del tracked_objects[object_id]
                del object_trackers[object_id]

        # Perform object detection. OpenCV frames are BGR; the detector expects
        # RGB, so convert before building the tensor.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image_tensor = F.to_tensor(Image.fromarray(rgb_frame)).unsqueeze(0)
        with torch.no_grad():
            prediction = model(image_tensor)[0]

        # Start a track for each confident person detection that does not
        # overlap an object we are already following.
        for box, label, score in zip(prediction['boxes'],
                                     prediction['labels'],
                                     prediction['scores']):
            if int(label) != PERSON_LABEL or float(score) < SCORE_THRESHOLD:
                continue
            # The detector returns corner coordinates (x1, y1, x2, y2); the
            # tracker and the drawing code work with (x, y, w, h).
            x1, y1, x2, y2 = (int(v) for v in box.tolist())
            w, h = x2 - x1, y2 - y1
            if w <= 0 or h <= 0:
                continue
            detection = (x1, y1, w, h)
            if any(box_iou(detection, known) >= IOU_MATCH_THRESHOLD
                   for known in tracked_objects.values()):
                continue
            new_tracker = create_tracker()
            new_tracker.init(frame, detection)
            tracked_objects[next_object_id] = detection
            object_trackers[next_object_id] = new_tracker
            next_object_id += 1

        # Draw bounding boxes for tracked persons
        for x, y, w, h in tracked_objects.values():
            cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

        # Display the frame with tracked persons
        # NOTE(review): cv2.imshow needs a GUI backend; it may fail on a
        # headless notebook host — confirm the target environment.
        cv2.imshow('Object Tracking', frame)

        # Press 'q' to exit the video
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release resources even if detection, tracking or display raised
    video_capture.release()
    cv2.destroyAllWindows()


error: ignored