In [1]:
# Import necessary libraries
# Import library related to computer vision
import cv2
import os
import torch
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import torch
import torchvision
import numpy as np

In [2]:
# Load the video and step through its frames.
video = cv2.VideoCapture('input_traffic-mini.mp4')

# cv2.VideoCapture does not raise when the file is missing or cannot be
# decoded, so check explicitly instead of silently iterating over nothing.
if not video.isOpened():
    video.release()
    raise IOError("Could not open video file 'input_traffic-mini.mp4'")

try:
    # Properties reported by the container: frames per second and frame count.
    fps = video.get(cv2.CAP_PROP_FPS)
    frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))

    # Wait time in milliseconds between frames. It does not depend on the
    # frame, so it is computed once; a reported fps of 0 would otherwise
    # raise ZeroDivisionError.
    delay_frame = int(1000 / fps) if fps > 0 else 0

    # ret is a boolean saying whether a frame was successfully read;
    # frame holds the image data for that frame.
    for i in range(frame_count):
        ret, frame = video.read()
        if not ret:
            # The reported frame count can exceed the number of frames that
            # actually decode; stop at the first failed read.
            break
finally:
    # Always release the capture object, even if reading raised.
    video.release()


In [3]:
# Remember the notebook's current working directory as the output location.
# NOTE(review): the later save/load cells pass the string literal 'output_dir'
# as a file name rather than using this variable - confirm which is intended.
output_dir = os.getcwd()

In [4]:
# Build a Faster R-CNN (ResNet-50 + FPN) detector starting from pretrained weights.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

# Swap the stock box-prediction head for a fresh one sized to num_classes.
# The new head receives the same number of input features as the old one.
num_classes = 2
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# Optimizer, learning-rate schedule and loss object.
# Only parameters that require gradients are handed to SGD.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(
    params,
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0005,
)
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=3,
    gamma=0.1,
)
criterion = torch.nn.CrossEntropyLoss()

# Persist the model weights.
# NOTE(review): no training loop runs in this cell before saving, so the new
# head is saved as initialised. The target is a file literally named
# 'output_dir' in the working directory, not the `output_dir` variable; the
# loading cell reads the same literal, so change both together if at all.
torch.save(model.state_dict(), 'output_dir')



In [5]:
# Recreate the architecture that was saved: pretrained Faster R-CNN with its
# box head replaced by a num_classes-way predictor, so the saved weights fit.
num_classes = 2
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# Restore the saved weights (file literally named 'output_dir') and switch
# the model to evaluation mode for inference.
saved_weights = torch.load('output_dir')
model.load_state_dict(saved_weights)
model.eval()

# Open the input video and read its frame rate and reported frame count.
cap = cv2.VideoCapture('input_traffic-mini.mp4')
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

# The writer uses the 'mp4v' codec and mirrors the input's frame rate and
# frame size (width, height).
frame_size = (
    int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
    int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
output_video = cv2.VideoWriter('output_video_mini.mp4', fourcc, fps, frame_size)

# Class IDs intended to denote vehicles (originally annotated as 2 cars,
# 3 motorcycles, 6 bus, 8 trucks).
# NOTE(review): this list is not referenced by the visible inference loop, and
# the head above predicts only num_classes = 2 classes - verify these IDs
# against the label map of the weights actually in use.
vehicle_class_ids = [2, 3, 6, 8]

# Run the detector on every frame, draw the confident detections, write the
# annotated frame to the output video and show it in a window.

# Wait time (ms) between displayed frames. It is loop-invariant, so it is
# computed once. It is clamped to at least 1 because cv2.waitKey(0) blocks
# until a key is pressed, and guarded against a reported fps of 0, which
# would raise ZeroDivisionError.
delay = max(1, int(1000 / fps)) if fps > 0 else 1

try:
    # Loop through each frame of the video
    for i in range(frame_count):
        # Get the current frame
        ret, frame = cap.read()
        if not ret:
            # The reported frame count can exceed the number of frames that
            # actually decode; a failed read yields frame=None, so stop here.
            break

        # OpenCV decodes frames in BGR channel order, while the pretrained
        # torchvision detector expects RGB. Convert a copy for the model and
        # keep the BGR frame for drawing and writing.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Convert the frame to a float tensor and add a batch dimension
        frame_tensor = torchvision.transforms.functional.to_tensor(rgb_frame)
        batch = torch.unsqueeze(frame_tensor, 0)

        # Pass the batch through the model without tracking gradients
        with torch.no_grad():
            predictions = model(batch)

        # Get the predicted bounding boxes, class labels, and scores for this frame
        boxes = predictions[0]['boxes'].numpy()
        labels = predictions[0]['labels'].numpy()
        scores = predictions[0]['scores'].numpy()

        # Keep only predictions with a score above 0.5
        high_confidence = scores > 0.5
        boxes = boxes[high_confidence]
        labels = labels[high_confidence]
        scores = scores[high_confidence]

        # Draw each remaining box with its label and score on the frame
        for box, label, score in zip(boxes, labels, scores):
            box = [int(coord) for coord in box]
            cv2.rectangle(frame, (box[0], box[1]), (box[2], box[3]), (0, 255, 0), 2)
            cv2.putText(frame, f'{label} {score:.2f}', (box[0], box[1]-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        # Write the annotated frame to the output video file
        output_video.write(frame)

        # Display the resulting image
        cv2.imshow('frame', frame)

        # Wait between frames; pressing 'q' stops playback early
        if cv2.waitKey(delay) & 0xFF == ord('q'):
            break
finally:
    # Release the reader and the writer and close all windows, even if a
    # frame failed to process. The writer must be released so that the
    # output file is finalized.
    cap.release()
    output_video.release()
    cv2.destroyAllWindows()