In [8]:
#!pip install rfdetr

In [9]:
import cv2
import time
from rfdetr import RFDETRBase
from rfdetr.util.coco_classes import COCO_CLASSES


def _draw_detections(frame, boxes, scores, labels):
    """Draw a green box and a "<class name>: <score>" caption per detection.

    The frame is modified in place. ``boxes`` rows are unpacked as
    ``x1, y1, x2, y2`` pixel coordinates; ``labels`` entries are used as keys
    into ``COCO_CLASSES``.
    """
    for box, score, class_id in zip(boxes, scores, labels):
        x1, y1, x2, y2 = map(int, box)
        caption = f"{COCO_CLASSES[class_id]}: {score:.2f}"

        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 1)
        cv2.putText(
            frame,
            caption,
            # Clamp so the caption stays visible when the box touches the
            # top edge of the image.
            (x1, max(y1 - 10, 15)),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            (0, 255, 0),
            1,
        )


def process_video(
    video_path,
    output_path=None,
    threshold=0.5,
    show_video=True,
    frame_skip=3,
    max_frames=500,
    allowed_classes=(3, 4, 6, 8),
):
    """Run RF-DETR detection on a video and draw the filtered detections.

    Only every ``frame_skip``-th frame is run through the model; the frames in
    between are read and discarded. Annotated frames are optionally written to
    ``output_path`` and/or shown in an OpenCV window.

    Args:
        video_path: Path of the video to read.
        output_path: Where to write the annotated video (``mp4v`` codec).
            Nothing is written when this is falsy.
        threshold: Confidence threshold forwarded to ``model.predict``.
        show_video: Show each annotated frame in a window; pressing ``q``
            stops processing.
        frame_skip: Process one frame out of every ``frame_skip``. The output
            video is written at ``source_fps / frame_skip`` so playback speed
            matches the source.
        max_frames: Maximum number of source frames to read, or ``None`` for
            no limit.
        allowed_classes: Class ids to keep; every other detection is dropped.
            NOTE(review): presumably car/motorcycle/bus/truck under the usual
            COCO ids -- confirm against ``COCO_CLASSES``.

    Returns:
        None. If the input video or the output writer cannot be opened, an
        error is printed and the function returns early.

    Raises:
        ValueError: If ``frame_skip`` is smaller than 1 or ``max_frames`` is
            negative.
    """
    # Validate cheap arguments before touching the model or any file.
    if frame_skip < 1:
        raise ValueError("frame_skip must be a positive integer")
    if max_frames is not None and max_frames < 0:
        raise ValueError("max_frames must be non-negative or None")

    allowed = frozenset(allowed_classes)
    fps_update_interval = frame_skip * 3  # refresh the readout every 3rd processed frame

    # Open the video first so a bad path fails before the model is loaded.
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video {video_path}")
        return

    out = None
    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # The reported rate can be 0 or NaN; fall back to a nominal rate
            # so the writer still gets a usable value.
            fps = 30.0

        if output_path:
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            out = cv2.VideoWriter(
                output_path, fourcc, fps / frame_skip, (width, height)
            )
            if not out.isOpened():
                print(f"Error: Could not open output video {output_path}")
                return

        model = RFDETRBase()  # Default resolution is 800

        frame_count = 0  # source frames read so far
        processed_count = 0  # frames actually run through the model
        fps_display = 0.0
        start_time = time.perf_counter()

        while cap.isOpened() and (max_frames is None or frame_count < max_frames):
            ret, frame = cap.read()
            if not ret:
                break

            frame_index = frame_count
            frame_count += 1
            if frame_index % frame_skip != 0:
                continue

            # OpenCV decodes to BGR; the frame is converted to RGB for the model.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = model.predict(rgb_frame, threshold=threshold)
            processed_count += 1

            boxes = results.xyxy
            scores = results.confidence
            labels = results.class_id

            # Boolean mask selecting only the classes of interest.
            class_mask = [class_id in allowed for class_id in labels]
            _draw_detections(
                frame, boxes[class_mask], scores[class_mask], labels[class_mask]
            )

            # Parenthesised interval: `a % b*3` would parse as `(a % b) * 3`.
            if frame_index % fps_update_interval == 0:
                elapsed_time = time.perf_counter() - start_time
                if elapsed_time > 0:
                    fps_display = processed_count / elapsed_time

            cv2.putText(
                frame,
                f"FPS: {fps_display:.2f}",
                (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.7,
                (0, 0, 255),
                2,
            )

            if out is not None:
                out.write(frame)

            if show_video:
                cv2.imshow("RFDETR Detection", frame)

                # Exit if 'q' is pressed
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
    finally:
        # Release handles even when inference or drawing raises.
        cap.release()
        if out is not None:
            out.release()
        if show_video:
            cv2.destroyAllWindows()


if __name__ == "__main__":
    # Demo run: annotate the sample clip without opening a preview window
    # and save the result next to the script.
    video_path = "data/input/Video1.mp4"
    output_path = "Video1a_output.mp4"

    process_video(
        video_path,
        output_path,
        threshold=0.5,
        show_video=False,
    )

Loading pretrain weights


Model is not optimized for inference. Latency may be higher than expected. You can optimize the model for inference by calling model.optimize_for_inference().
