In [34]:
import cv2
import time
import torch
import torchvision
from torchvision.models.detection import ssdlite320_mobilenet_v3_large, ssd300_vgg16
from torchvision.transforms import functional as F
from PIL import Image
from torchvision.models.detection.retinanet import _COCO_CATEGORIES
from collections import deque
import gc


In [35]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

In [36]:
# Pre-trained SSD300 (VGG16 backbone) detector, moved to `device` and put in
# evaluation mode for inference.
# Alternative detector: ssdlite320_mobilenet_v3_large(pretrained=True)
model = ssd300_vgg16(pretrained=True).to(device).eval()



In [37]:
# Input video to analyse and the path of the annotated copy to produce.
video_path = './data/lange_10.mp4'
output_path = './result/lange_10_SDD.mp4'

cap = cv2.VideoCapture(video_path)
# VideoCapture does not raise on a bad path; fail here instead of silently
# processing zero frames later.
if not cap.isOpened():
    raise IOError(f'Cannot open input video: {video_path}')

# Stream properties, read through the named OpenCV constants
# (CAP_PROP_FRAME_WIDTH == 3, CAP_PROP_FRAME_HEIGHT == 4).
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
# Frame count as reported by the capture backend; the processing loop keeps
# its own counter of frames actually read.
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

# The writer keeps the source frame rate and resolution.
out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))
# VideoWriter also fails silently; catch that before the long inference run.
if not out.isOpened():
    cap.release()
    raise IOError(
        f'Cannot open output video for writing: {output_path} '
        '(check that the target directory exists and the codec is available)'
    )


In [38]:
# COCO category names that are kept by default; every other detection is dropped.
cls_list = ['car', 'motorcycle', 'bus', 'truck']

def get_predictions(frames, model, device, threshold=0.4, classes=None):
    """Run the detector on a batch of frames and keep confident detections of selected classes.

    Args:
        frames: Iterable of images accepted by ``torchvision.transforms.ToTensor``.
            No channel reordering is done here, so the frames must already be
            in the channel order the model expects.
        model: Detection model that takes a list of image tensors and returns,
            per image, a dict with ``'boxes'``, ``'labels'`` and ``'scores'``.
        device: Device the input tensors are moved to before the forward pass.
        threshold: A detection is kept only if its score is strictly greater
            than this value.
        classes: Optional iterable of category names to keep. Defaults to the
            module-level ``cls_list``.

    Returns:
        Three lists with one entry per input frame: the kept boxes (list of
        box tensors), the kept class names (list of str) and the kept scores
        (list of scalar tensors). The returned tensors live on the CPU.
    """
    frames = list(frames)
    if not frames:
        # Nothing to detect: skip the forward pass entirely.
        return [], [], []

    # Set membership is O(1) and is evaluated once per detection below.
    wanted = set(cls_list if classes is None else classes)

    transform = torchvision.transforms.ToTensor()
    frames_tensor = [transform(frame).to(device) for frame in frames]
    with torch.no_grad():
        outputs = model(frames_tensor)

    all_pred_boxes = []
    all_pred_classes = []
    all_pred_scores = []

    for output in outputs:
        # Copy each result to the host once. Comparing or indexing with
        # individual elements of a device tensor inside the Python loop would
        # otherwise trigger a device-to-host transfer per detection.
        boxes = output['boxes'].cpu()
        labels = output['labels'].cpu().tolist()
        scores = output['scores'].cpu()

        pred_boxes = []
        pred_classes = []
        pred_scores = []
        for box, label, score in zip(boxes, labels, scores):
            if score > threshold and _COCO_CATEGORIES[label] in wanted:
                pred_boxes.append(box)
                pred_classes.append(_COCO_CATEGORIES[label])
                pred_scores.append(score)

        all_pred_boxes.append(pred_boxes)
        all_pred_classes.append(pred_classes)
        all_pred_scores.append(pred_scores)

    return all_pred_boxes, all_pred_classes, all_pred_scores

In [39]:
# Number of frames sent through the detector in one forward pass.
batch_size = 32
frames_buffer = deque()
total_frames = 0


def _process_batch(frames):
    """Detect objects in a batch of BGR frames, draw them, and write the frames to `out`."""
    if not frames:
        return

    # OpenCV decodes frames as BGR, while the torchvision detector expects RGB
    # input. Run inference on RGB copies and keep the BGR originals for
    # drawing and writing.
    rgb_frames = [cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) for frame in frames]
    boxes_batch, classes_batch, scores_batch = get_predictions(rgb_frames, model, device)

    for frame, boxes, cls, scores in zip(frames, boxes_batch, classes_batch, scores_batch):
        for box, cl, score in zip(boxes, cls, scores):
            x1, y1, x2, y2 = map(int, box.tolist())
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 6)
            label = f'{cl} {score:.2f}'
            cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 0), 6)
        # Every frame is written, with or without detections.
        out.write(frame)


fps_start_time = time.time()

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        total_frames += 1
        frames_buffer.append(frame)

        if len(frames_buffer) == batch_size:
            _process_batch(list(frames_buffer))
            frames_buffer.clear()

    # Process the remaining frames (the last, partially filled batch).
    _process_batch(list(frames_buffer))
    frames_buffer.clear()

    fps_end_time = time.time()
finally:
    # Release the video handles and cached GPU memory even if inference fails.
    cap.release()
    out.release()
    cv2.destroyAllWindows()
    torch.cuda.empty_cache()
    gc.collect()

# Throughput of the whole pipeline: decoding, inference, drawing and encoding.
average_fps = total_frames / (fps_end_time - fps_start_time)

print("Average FPS:", average_fps)


Average FPS: 1.3504207325220035


0