In [10]:
pip install torch torchvision transformers supervision tqdm opencv-python




In [11]:
import cv2
import numpy as np
import torch
from tqdm import tqdm
from collections import defaultdict, deque
from PIL import Image
from transformers import DetrImageProcessor, DetrForObjectDetection
import supervision as sv

# Download assets for video processing
from supervision.assets import VideoAssets, download_assets
download_assets(VideoAssets.VEHICLES)

# File paths and constants
SOURCE_VIDEO_PATH = "vehicles.mp4"
TARGET_VIDEO_PATH = "vehicles-result.mp4"
CONFIDENCE_THRESHOLD = 0.3
IOU_THRESHOLD = 0.5
MODEL_RESOLUTION = 1280
SOURCE = np.array([
    [1252, 787],
    [2298, 803],
    [5039, 2159],
    [-550, 2159]
])

TARGET_WIDTH = 25
TARGET_HEIGHT = 250
TARGET = np.array([
    [0, 0],
    [TARGET_WIDTH - 1, 0],
    [TARGET_WIDTH - 1, TARGET_HEIGHT - 1],
    [0, TARGET_HEIGHT - 1],
])

# Load the Swin Transformer model for object detection
processor = DetrImageProcessor.from_pretrained("facebook/detr-resnet-50")
model = DetrForObjectDetection.from_pretrained("facebook/detr-resnet-50")
model.eval()

# Initialize video processing
frame_generator = sv.get_video_frames_generator(source_path=SOURCE_VIDEO_PATH)
video_info = sv.VideoInfo.from_video_path(video_path=SOURCE_VIDEO_PATH)

# Annotation and tracking setup
thickness = sv.calculate_optimal_line_thickness(resolution_wh=video_info.resolution_wh)
text_scale = sv.calculate_optimal_text_scale(resolution_wh=video_info.resolution_wh)
bounding_box_annotator = sv.BoundingBoxAnnotator(thickness=thickness)
label_annotator = sv.LabelAnnotator(
    text_scale=text_scale,
    text_thickness=thickness,
    text_position=sv.Position.BOTTOM_CENTER
)
trace_annotator = sv.TraceAnnotator(
    thickness=thickness,
    trace_length=video_info.fps * 2,
    position=sv.Position.BOTTOM_CENTER
)

polygon_zone = sv.PolygonZone(polygon=SOURCE)
coordinates = defaultdict(lambda: deque(maxlen=video_info.fps))

# Perspective transformation setup
class ViewTransformer:
    def __init__(self, source: np.ndarray, target: np.ndarray) -> None:
        source = source.astype(np.float32)
        target = target.astype(np.float32)
        self.m = cv2.getPerspectiveTransform(source, target)

    def transform_points(self, points: np.ndarray) -> np.ndarray:
        if points.size == 0:
            return points
        reshaped_points = points.reshape(-1, 1, 2).astype(np.float32)
        transformed_points = cv2.perspectiveTransform(reshaped_points, self.m)
        return transformed_points.reshape(-1, 2)

view_transformer = ViewTransformer(source=SOURCE, target=TARGET)

# Process video and perform object detection
with sv.VideoSink(TARGET_VIDEO_PATH, video_info) as sink:
    for frame in tqdm(frame_generator, total=video_info.total_frames):
        # Convert frame to PIL image for processing with Swin
        pil_frame = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

        # Preprocess frame and perform inference
        inputs = processor(images=pil_frame, return_tensors="pt")
        with torch.no_grad():
            outputs = model(**inputs)

        # Convert outputs to detection format
        logits = outputs.logits[0]
        boxes = outputs.pred_boxes[0]
        scores = torch.softmax(logits, dim=-1)[:, :-1].max(dim=1).values
        labels = torch.argmax(logits[:, :-1], dim=1)

Downloading vehicles.mp4 assets 



  0%|          | 0/35345757 [00:00<?, ?it/s]

Some weights of the model checkpoint at facebook/detr-resnet-50 were not used when initializing DetrForObjectDetection: ['model.backbone.conv_encoder.model.layer1.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer2.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer3.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer4.0.downsample.1.num_batches_tracked']
- This IS expected if you are initializing DetrForObjectDetection from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DetrForObjectDetection from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
100%|██████████| 538/538 [26:03<00:00,  2.91s/it]


In [18]:
# Process video and perform object detection
with sv.VideoSink(TARGET_VIDEO_PATH, video_info) as sink:
    for frame in tqdm(frame_generator, total=video_info.total_frames):
        # Convert frame to PIL image for processing with Swin
        pil_frame = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

        # Preprocess frame and perform inference
        inputs = processor(images=pil_frame, return_tensors="pt")
        with torch.no_grad():
            outputs = model(**inputs)

        # Extract outputs and convert them into detections
        logits = outputs.logits[0]
        boxes = outputs.pred_boxes[0]
        scores = torch.softmax(logits, dim=-1)[:, :-1].max(dim=1).values
        labels = torch.argmax(logits[:, :-1], dim=1)

        # Collect detections
        detections = []
        for score, label, box in zip(scores, labels, boxes):
            if score > CONFIDENCE_THRESHOLD and label != 0:  # Filter detections
                x_min, y_min, x_max, y_max = box.tolist()
                detections.append(sv.Detection(
                    x_min=int(x_min * frame.shape[1]),
                    y_min=int(y_min * frame.shape[0]),
                    x_max=int(x_max * frame.shape[1]),
                    y_max=int(y_max * frame.shape[0]),
                    confidence=float(score),
                    class_id=int(label)
                ))

        sv_detections = sv.Detections(detections=detections)

        # Filter detections inside the polygon zone
        sv_detections = sv_detections[polygon_zone.trigger(sv_detections)]

        # Pass detections through the tracker
        byte_track = sv.ByteTrack(frame_rate=video_info.fps)
        tracked_detections = byte_track.update_with_detections(detections=sv_detections)

        # Calculate the detections position inside the target RoI
        points = tracked_detections.get_anchors_coordinates(anchor=sv.Position.BOTTOM_CENTER)
        points = view_transformer.transform_points(points=points).astype(int)

        # Store detections position for speed calculation
        for tracker_id, [_, y] in zip(tracked_detections.tracker_id, points):
            coordinates[tracker_id].append(y)

        # Format labels and calculate speeds
        labels = []
        for tracker_id in tracked_detections.tracker_id:
            if len(coordinates[tracker_id]) < video_info.fps / 2:
                labels.append(f"#{tracker_id}")
            else:
                # Calculate speed
                coordinate_start = coordinates[tracker_id][-1]
                coordinate_end = coordinates[tracker_id][0]
                distance = abs(coordinate_start - coordinate_end)
                time = len(coordinates[tracker_id]) / video_info.fps
                speed = distance / time * 3.6  # Convert to km/h
                labels.append(f"#{tracker_id} {int(speed)} km/h")

        # Annotate frame
        annotated_frame = frame.copy()
        annotated_frame = trace_annotator.annotate(
            scene=annotated_frame, detections=tracked_detections
        )
        annotated_frame = bounding_box_annotator.annotate(
            scene=annotated_frame, detections=tracked_detections
        )
        annotated_frame = label_annotator.annotate(
            scene=annotated_frame, detections=tracked_detections, labels=labels
        )

        # Write annotated frame to output video
        sink.write_frame(annotated_frame)

  0%|          | 0/538 [00:00<?, ?it/s]
