# Deploying RF-DETR Model to SageMaker Using PyTorch

**Designed for use in Amazon SageMaker Studio Environment**


This notebook demonstrates end-to-end deployment of the RF-DETR object detection model to Amazon SageMaker using PyTorch. The workflow includes using the pre-trained rf-detr-large checkpoint, creating custom inference code, packaging the model artifact, and deploying to a real-time SageMaker endpoint with GPU acceleration.

This notebook has been successfully tested in Amazon SageMaker Studio.


## Load Required Packages


In [1]:
# Use this version of pip to avoid compatibility issues
%pip install pip==24.0 -q


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [2]:
%pip install -r requirements-deploy.txt -q


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


## Initialize SageMaker Configuration


In [3]:
import sagemaker
import os

# Session plus the default bucket and region that later cells reuse.
session = sagemaker.Session()
bucket = session.default_bucket()
region = session.boto_region_name

# Inside SageMaker the execution role can be looked up directly; anywhere else
# (e.g. a local kernel) fall back to the SAGEMAKER_ROLE environment variable.
try:
    role = sagemaker.get_execution_role()
except Exception:
    role = os.environ.get("SAGEMAKER_ROLE")
    # Treat an empty value like an unset one so the problem is reported here
    # instead of surfacing later when the role is handed to the model.
    if not role:
        raise RuntimeError(
            "SageMaker role not found. Set SAGEMAKER_ROLE env var when running locally."
        )

print(f"RoleArn: {role}")
print(f"Region: {region}")
print(f"Default S3 Bucket: {bucket}")

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
RoleArn: arn:aws:iam::676164205626:role/service-role/AmazonSageMaker-ExecutionRole-20221107T192794
Region: us-east-1
Default S3 Bucket: sagemaker-us-east-1-676164205626


## Compress and Copy Model Artifacts to Amazon S3


In [1]:
# Registry of the RF-DETR checkpoints this notebook knows about, keyed by variant
# name. Every entry follows the same naming scheme for the weights file and the
# variant type; the resolution is kept as a string and converted with int() by
# the cells that need a number.
VARIANTS = {
    variant: {
        "file": f"rf-detr-{variant}.pth",
        "type": f"rfdetr-{variant}",
        "resolution": resolution,
    }
    for variant, resolution in (
        ("nano", "384"),
        ("small", "512"),
        ("medium", "576"),
        ("base", "560"),
        ("large", "560"),
    )
}

In [None]:
%%time

from rfdetr import RFDETRLarge

# Download the model once: per the original note, constructing RFDETRLarge is the
# step that fetches the pre-trained weights. The packaging cell below looks for
# the checkpoint file in the current working directory.
model = RFDETRLarge()
print(model.model_config)

In [None]:
%%time

# Programmatically create a model artifact (tar.gz) containing the chosen weights and upload to S3
import tarfile
import os

# Select the checkpoint to package (one of the downloaded files) and resolve it
# against the notebook's working directory.
model_name = VARIANTS["large"]["file"]
weights_path = os.path.join(os.getcwd(), model_name)

if os.path.exists(weights_path):
    # The archive contains only the weights, stored under their bare file name.
    artifact_path = "model.tar.gz"
    with tarfile.open(artifact_path, mode="w:gz") as tar:
        tar.add(weights_path, arcname=model_name)
else:
    # Stop early rather than uploading an archive without weights.
    raise FileNotFoundError(f"Weights not found: {weights_path}")

In [None]:
%%time

# Upload to SageMaker session default S3 bucket.
# The value returned by upload_data is printed as the artifact location and is
# later picked up by the deploy cell (through the model_s3_path global) as model_data.
model_s3_path = session.upload_data(
    path=artifact_path, bucket=bucket, key_prefix="pytorch_models_rf-detr"
)
print(f"Uploaded model artifact to: {model_s3_path}")

## Create the SageMaker Real-Time Endpoint


In [None]:
%%time

from sagemaker.pytorch import PyTorchModel
from sagemaker.deserializers import JSONDeserializer
from datetime import datetime, timezone
import sagemaker


# Reuse the session, bucket and region created earlier in the notebook when they
# exist; otherwise derive them from a fresh session so they are always defined.
# NOTE: `role` is not recreated here and must come from the configuration cell.
if "session" not in globals():
    session = sagemaker.Session()
bucket = globals().get("bucket") or session.default_bucket()
region = globals().get("region") or session.boto_region_name
model_destination = f"s3://{bucket}/pytorch_models_rf-detr/model.tar.gz"

# Prefer the artifact uploaded earlier by the notebook (model_s3_path);
# otherwise fall back to the constructed model_destination.
model_data = globals().get("model_s3_path", model_destination)

print(f"Using model_data: {model_data}")

# Pinned PyTorch inference image (2.6.0, GPU, py312, cu124) in the session's region.
image_uri = f"763104351884.dkr.ecr.{region}.amazonaws.com/pytorch-inference:2.6.0-gpu-py312-cu124-ubuntu22.04-sagemaker-v1.56"

# inference.py inside the local "code" directory is the entry point; the two
# environment variables are passed through to the container.
pytorch_model = PyTorchModel(
    model_data=model_data,
    role=role,
    image_uri=image_uri,
    entry_point="inference.py",
    source_dir="code",
    env={
        "RFDETR_VARIANT": "large",
        "RFDETR_CONF": "0.25",
    },
)

instance_type = "ml.g4dn.xlarge"

# Endpoint name: "<variant type>-pytorch-<UTC timestamp>". The variant type already
# starts with "rfdetr-", so adding another "rfdetr-" prefix would duplicate it.
endpoint_name = "{}-pytorch-{}".format(
    VARIANTS["large"]["type"],
    datetime.now(timezone.utc).strftime("%Y-%m-%d-%H-%M-%S-%f"),
)

print(f"Deploying to endpoint: {endpoint_name}")

# Should take about 7-8 minutes to deploy
predictor = pytorch_model.deploy(
    initial_instance_count=1,
    instance_type=instance_type,
    endpoint_name=endpoint_name,
    deserializer=JSONDeserializer(),
)

## Real-time Inference


In [None]:
# Your SageMaker real-time endpoint name.
# If the deploy cell has run in this kernel, use the endpoint it just created;
# otherwise fall back to a previously deployed endpoint (edit the literal to match yours).
ENDPOINT_NAME = globals().get(
    "endpoint_name", "rfdetr-large-pytorch-2025-12-29-21-19-19-286016"
)

### Simple Object Detection

Perform real-time inference on the sample image directory and display the results with bounding box visualization. We are not passing any additional optional parameters.


In [None]:
%%time

from sagemaker.serializers import IdentitySerializer
from PIL import Image, ImageDraw, ImageFont
from io import BytesIO
import json
import random
import glob
import os

# Connect to the endpoint named by ENDPOINT_NAME (set in the cell above). This builds a
# fresh Predictor rather than reusing the object returned by the deploy cell.
from sagemaker.predictor import Predictor
from sagemaker.deserializers import JSONDeserializer

# Predictor for the existing endpoint: raw JPEG bytes go out, parsed JSON comes back.
predictor = Predictor(
    endpoint_name=ENDPOINT_NAME,
    sagemaker_session=session,
    serializer=IdentitySerializer(content_type="image/jpeg"),
    deserializer=JSONDeserializer(),
)

# Input folder with the sample images and the folder for the annotated copies.
base_dir = "sample_images"
out_dir = "sample_images_output"
os.makedirs(out_dir, exist_ok=True)

# Collect every .jpg/.jpeg/.png file in the input folder, ordered by path.
image_paths = sorted(
    path
    for pattern in ("*.jpg", "*.jpeg", "*.png")
    for path in glob.glob(os.path.join(base_dir, pattern))
)

if len(image_paths) == 0:
    raise FileNotFoundError(f"No images found in {base_dir}")

print(f"Found {len(image_paths)} images in {base_dir}")

# Label font and the long-side cap (from the "large" variant) used before upload.
font = ImageFont.load_default(size=24)
IMG_SIZE = int(VARIANTS["large"]["resolution"])


def resize_long_side(image: Image.Image, max_size: int = 640) -> Image.Image:
    """Downscale ``image`` so that its longer side is at most ``max_size`` pixels.

    The aspect ratio is preserved and images that already fit are returned
    unchanged (no upscaling).

    Args:
        image: Source PIL image; only its ``size`` and ``resize`` are used.
        max_size: Upper bound, in pixels, for the longer side.

    Returns:
        ``image`` itself when no resize is needed, otherwise a new image
        resampled with LANCZOS.
    """
    w, h = image.size
    long_side = max(w, h)
    if long_side <= max_size:
        return image  # no upscaling
    scale = max_size / float(long_side)
    # int() truncates, so the short side of a very elongated image could become 0,
    # which is not a valid target size; keep both dimensions at one pixel or more.
    new_w = max(1, int(w * scale))
    new_h = max(1, int(h * scale))
    return image.resize((new_w, new_h), Image.Resampling.LANCZOS)


# Send every sample image to the endpoint and save a copy with the boxes drawn on it.
for image_path in image_paths:
    try:
        orig_image = Image.open(image_path).convert("RGB")
    except Exception as e:
        print(f"Skipping unreadable image: {image_path} - {e}")
        continue

    # Downscale client-side (long side capped at IMG_SIZE, aspect ratio kept) and
    # re-encode as JPEG before sending the bytes to the endpoint.
    send_image = resize_long_side(orig_image, IMG_SIZE)
    buffer = BytesIO()
    send_image.save(buffer, format="JPEG", quality=90)
    payload = buffer.getvalue()

    result = predictor.predict(payload)

    # ASSUMPTION: boxes are in the coordinates of send_image, so they are scaled
    # back to the original image size before drawing.
    send_w, send_h = send_image.size
    orig_w, orig_h = orig_image.size
    x_ratio = orig_w / send_w
    y_ratio = orig_h / send_h
    draw = ImageDraw.Draw(orig_image)

    for det in result.get("detections", []):
        x1, y1, x2, y2 = det["box"]
        conf = det["confidence"]
        label = det["label"]

        # Map the box from send_image coordinates to the original image.
        x1 = int(x_ratio * x1)
        x2 = int(x_ratio * x2)
        y1 = int(y_ratio * y1)
        y2 = int(y_ratio * y2)

        # Random dark colour: each RGB channel is drawn from 10..100.
        color = tuple(random.randint(10, 100) for _ in range(3))

        draw.rectangle([(x1, y1), (x2, y2)], outline=color, width=3)

        # Caption sits just above the box, clamped to the top edge of the image.
        text = f"{label} ({int(conf * 100)}%)"
        draw.text((x1, max(0, y1 - 30)), text, fill=color, font=font)

    # Output keeps the input's file name and extension, with "_detected" appended.
    base_name = os.path.basename(image_path)
    name, ext = os.path.splitext(base_name)
    out_path = os.path.join(out_dir, f"{name}_detected{ext}")
    orig_image.save(out_path, quality=95)
    print(f"Saved: {out_path}")

print(f"Done. {len(image_paths)} images processed; results in: {out_dir}")

### Real-time Inference with JSON Format and Dynamic Confidence Threshold

The inference endpoint supports a JSON request format that allows you to dynamically specify the confidence threshold per request. This is useful for:

- Testing different confidence thresholds without redeploying
- Adjusting sensitivity based on use case
- A/B testing different threshold values

**JSON Request Format:**

```json
{
  "image": "base64_encoded_image_data",
  "confidence": 0.35 // Optional: defaults to 0.25 if not provided
}
```

The example below demonstrates how to use the JSON format with different confidence thresholds.


In [None]:
%%time

from sagemaker.serializers import JSONSerializer
from sagemaker.predictor import Predictor
from sagemaker.deserializers import JSONDeserializer
from PIL import Image
from io import BytesIO
import json
import base64
import glob
import os

# Connect to the existing endpoint (JSON request, JSON response)
predictor = Predictor(
    endpoint_name=ENDPOINT_NAME,
    sagemaker_session=session,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer(),
)

# Load a sample image. glob returns matches in a filesystem-dependent order, so
# sort first to make "the first image" the same file on every run.
base_dir = "sample_images"
image_paths = sorted(glob.glob(os.path.join(base_dir, "*.jpg")))[:1]  # Test with first image
IMG_SIZE = int(VARIANTS["large"]["resolution"])

if not image_paths:
    raise FileNotFoundError(f"No images found in {base_dir}")

image_path = image_paths[0]
print(f"Testing with image: {image_path}")

# Load and prepare image
orig_image = Image.open(image_path).convert("RGB")


# Downscale client-side: long side = 560, keep aspect ratio
def resize_long_side(image: Image.Image, max_size: int = 560) -> Image.Image:
    """Downscale ``image`` so that its longer side is at most ``max_size`` pixels.

    The aspect ratio is preserved and images that already fit are returned
    unchanged (no upscaling).

    Args:
        image: Source PIL image; only its ``size`` and ``resize`` are used.
        max_size: Upper bound, in pixels, for the longer side.

    Returns:
        ``image`` itself when no resize is needed, otherwise a new image
        resampled with LANCZOS.
    """
    w, h = image.size
    long_side = max(w, h)
    if long_side <= max_size:
        return image
    scale = max_size / float(long_side)
    # int() truncates, so the short side of a very elongated image could become 0,
    # which is not a valid target size; keep both dimensions at one pixel or more.
    new_w = max(1, int(w * scale))
    new_h = max(1, int(h * scale))
    return image.resize((new_w, new_h), Image.Resampling.LANCZOS)


send_image = resize_long_side(orig_image, IMG_SIZE)

buffer = BytesIO()
send_image.save(buffer, format="JPEG", quality=90)
image_bytes = buffer.getvalue()

# The image is the same for every request, so base64-encode it once up front
# instead of re-encoding it on each loop iteration.
image_b64 = base64.b64encode(image_bytes).decode("utf-8")

# Test with different confidence thresholds
confidence_thresholds = [0.25, 0.5, 0.90, 0.94]

print("\nTesting with different confidence thresholds:\n")

for confidence in confidence_thresholds:
    # JSON payload: base64-encoded image plus the per-request confidence threshold
    payload = {
        "image": image_b64,
        "confidence": confidence,
    }

    # Send request
    result = predictor.predict(payload)

    # Missing metadata falls back to a zero count and "N/A" timing.
    metadata = result.get("metadata", {})
    detection_count = metadata.get("count", 0)
    inference_time = metadata.get("inference_time_ms", "N/A")

    print(
        f"Confidence: {confidence:.2f} | Detections: {detection_count:2d} | Inference time: {inference_time} ms"
    )

    # Print first detection for reference
    if result.get("detections"):
        first_det = result["detections"][0]
        print(
            f"  → First detection: {first_det['label']} ({first_det['confidence']:.3f})"
        )

print(
    "\nNote: Higher confidence thresholds typically result in fewer but more confident detections."
)

### Advanced Filtering: Classes, Max Detections, and Minimum Box Area

In addition to dynamic confidence thresholds, the endpoint supports powerful post-processing filters:

1. **`classes`**: Filter to specific object classes (e.g., only "person", "car"). Set to `null` to return all classes.
2. **`max_detections`**: Limit the number of detections returned (returns top N by confidence)
3. **`min_box_area`**: Filter out small detections below a minimum bounding box area (in pixels²)

These filters are applied after inference, so they don't affect inference speed but can significantly reduce response payload size.

**Example JSON Request:**

```json
{
  "image": "base64_encoded_image",
  "confidence": 0.3,
  "classes": ["person", "car"], // Only return people and cars (or null for all)
  "max_detections": 10, // Return top 10 detections only
  "min_box_area": 1000 // Filter out boxes smaller than 1000 px²
}
```


In [None]:
%%time

from sagemaker.serializers import JSONSerializer
from sagemaker.predictor import Predictor
from sagemaker.deserializers import JSONDeserializer
from PIL import Image
from io import BytesIO
import json
import base64
import glob
import os

# Connect to the existing endpoint (JSON request, JSON response)
predictor = Predictor(
    endpoint_name=ENDPOINT_NAME,
    sagemaker_session=session,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer(),
)

# Load a sample image. glob returns matches in a filesystem-dependent order, so
# sort first to make "the first image" the same file on every run.
base_dir = "sample_images"
image_paths = sorted(glob.glob(os.path.join(base_dir, "*.jpg")))[:1]
IMG_SIZE = int(VARIANTS["large"]["resolution"])

if not image_paths:
    raise FileNotFoundError(f"No images found in {base_dir}")

image_path = image_paths[0]
print(f"Testing with image: {image_path}\n")

# Load and prepare image
orig_image = Image.open(image_path).convert("RGB")


def resize_long_side(image: Image.Image, max_size: int = 560) -> Image.Image:
    """Downscale ``image`` so that its longer side is at most ``max_size`` pixels.

    The aspect ratio is preserved and images that already fit are returned
    unchanged (no upscaling).

    Args:
        image: Source PIL image; only its ``size`` and ``resize`` are used.
        max_size: Upper bound, in pixels, for the longer side.

    Returns:
        ``image`` itself when no resize is needed, otherwise a new image
        resampled with LANCZOS.
    """
    w, h = image.size
    long_side = max(w, h)
    if long_side <= max_size:
        return image
    scale = max_size / float(long_side)
    # int() truncates, so the short side of a very elongated image could become 0,
    # which is not a valid target size; keep both dimensions at one pixel or more.
    new_w = max(1, int(w * scale))
    new_h = max(1, int(h * scale))
    return image.resize((new_w, new_h), Image.Resampling.LANCZOS)


send_image = resize_long_side(orig_image, IMG_SIZE)

# Encode once: every example below sends the same base64 JPEG.
buffer = BytesIO()
send_image.save(buffer, format="JPEG", quality=90)
image_bytes = buffer.getvalue()
image_b64 = base64.b64encode(image_bytes).decode("utf-8")

# Example 1: Return all classes (no class filtering)
print("=" * 80)
print("Example 1: Return all classes (classes=null)")
print("=" * 80)
payload1 = {
    "image": image_b64,
    "confidence": 0.25,
    "classes": None,  # Explicitly request all classes
}
result1 = predictor.predict(payload1)
print(f"Detections: {result1['metadata']['count']}")
# Sorted so the listed class names come out in a stable order.
print(f"Classes found: {sorted({d['label'] for d in result1['detections']})}")
print()

# Example 2: Filter to specific classes only
print("=" * 80)
print("Example 2: Filter to specific classes (scissors, bottle, knife)")
print("=" * 80)
payload2 = {
    "image": image_b64,
    "confidence": 0.25,
    "classes": ["scissors", "bottle", "knife"],
}
result2 = predictor.predict(payload2)
print(f"Detections: {result2['metadata']['count']}")
# Report the classes of the *filtered* response (result2), not those of Example 1.
print(f"Classes found: {sorted({d['label'] for d in result2['detections']})}")
if result2["metadata"].get("applied_filters"):
    print(f"Applied filters: {result2['metadata']['applied_filters']}")
print()

# Example 3: Limit to top 5 detections
print("=" * 80)
print("Example 3: Limit to top 5 detections by confidence")
print("=" * 80)
payload3 = {
    "image": image_b64,
    "confidence": 0.2,  # Lower confidence to get more candidates
    "max_detections": 5,
}
result3 = predictor.predict(payload3)
print(f"Detections returned: {result3['metadata']['count']}")
if result3["metadata"].get("applied_filters"):
    print(f"Applied filters: {result3['metadata']['applied_filters']}")
if result3["detections"]:
    print(
        f"Top detection: {result3['detections'][0]['label']} ({result3['detections'][0]['confidence']:.3f})"
    )
print()

# Example 4: Filter by minimum box area
print("=" * 80)
print("Example 4: Filter out small detections (min_box_area=8000 px²)")
print("=" * 80)
payload4 = {"image": image_b64, "confidence": 0.25, "min_box_area": 8000}
result4 = predictor.predict(payload4)
print(f"Detections (large objects only): {result4['metadata']['count']}")
if result4["detections"]:
    for det in result4["detections"]:
        print(
            f"  - {det['label']}: {det['area']:.1f} px² (conf: {det['confidence']:.3f})"
        )
if result4["metadata"].get("applied_filters"):
    print(f"Applied filters: {result4['metadata']['applied_filters']}")
print()

# Example 5: Combine all filters
print("=" * 80)
print("Example 5: Combine all filters")
print("=" * 80)
payload5 = {
    "image": image_b64,
    "confidence": 0.2,
    "classes": ["scissors", "bottle", "knife"],
    "max_detections": 3,
    "min_box_area": 1000,
}
result5 = predictor.predict(payload5)
print(f"Final detections: {result5['metadata']['count']}")
print(f"Applied filters: {result5['metadata'].get('applied_filters', {})}")
for det in result5["detections"]:
    print(
        f"  - {det['label']}: area={det['area']:.1f}px², conf={det['confidence']:.3f}"
    )

print("\n" + "=" * 80)
print("Summary: Filters allow you to customize responses without redeploying!")
print("=" * 80)

In [None]:
from sagemaker.serializers import JSONSerializer
from sagemaker.predictor import Predictor
from sagemaker.deserializers import JSONDeserializer
from PIL import Image
from io import BytesIO
import base64
import supervision as sv
import numpy as np

# Load and prepare image
# NOTE(review): this path is relative to the notebook's working directory, not the
# sample_images folder used by the earlier cells — confirm the file is present.
orig_image = Image.open("objects_small.jpg").convert("RGB")

# Long-side cap for the image sent to the endpoint, taken from the "large" variant entry.
IMG_SIZE = int(VARIANTS["large"]["resolution"])


def resize_long_side(image: Image.Image, max_size: int = 560) -> Image.Image:
    """Downscale ``image`` so that its longer side is at most ``max_size`` pixels.

    The aspect ratio is preserved and images that already fit are returned
    unchanged (no upscaling).

    Args:
        image: Source PIL image; only its ``size`` and ``resize`` are used.
        max_size: Upper bound, in pixels, for the longer side.

    Returns:
        ``image`` itself when no resize is needed, otherwise a new image
        resampled with LANCZOS.
    """
    w, h = image.size
    long_side = max(w, h)
    if long_side <= max_size:
        return image
    scale = max_size / float(long_side)
    # int() truncates, so the short side of a very elongated image could become 0,
    # which is not a valid target size; keep both dimensions at one pixel or more.
    new_w = max(1, int(w * scale))
    new_h = max(1, int(h * scale))
    return image.resize((new_w, new_h), Image.Resampling.LANCZOS)


send_frame = resize_long_side(orig_image, IMG_SIZE)
buffer = BytesIO()
send_frame.save(buffer, format="JPEG", quality=90)

# Calculate scaling factors for box coordinates
scale_x = orig_image.size[0] / send_frame.size[0]
scale_y = orig_image.size[1] / send_frame.size[1]

# Prepare and send request
payload = {
    "image": base64.b64encode(buffer.getvalue()).decode("utf-8"),
    "confidence": 0.5,
}

predictor = Predictor(
    endpoint_name=ENDPOINT_NAME,
    sagemaker_session=session,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer(),
)

result = predictor.predict(payload)
print(f"Detections: {result['metadata']['count']}")

# Rescale boxes to original image coordinates
boxes_rescaled = [
    [x1 * scale_x, y1 * scale_y, x2 * scale_x, y2 * scale_y]
    for x1, y1, x2, y2 in (d["box"] for d in result["detections"])
]

# Create detections and labels. The explicit reshape keeps the box array
# two-dimensional, shape (n, 4), even when the endpoint returns no detections;
# a bare np.array([]) would be one-dimensional.
detections = sv.Detections(
    xyxy=np.array(boxes_rescaled, dtype=float).reshape(-1, 4),
    class_id=np.array([d["class_id"] for d in result["detections"]], dtype=int),
    confidence=np.array([d["confidence"] for d in result["detections"]], dtype=float),
)

labels = [f"{d['label']} {d['confidence'] * 100:.1f}%" for d in result["detections"]]

# Annotate image (labels first, then boxes). With nothing detected there is
# nothing to draw, so the image is saved as-is.
if result["detections"]:
    annotated = sv.BoxAnnotator().annotate(
        sv.LabelAnnotator(smart_position=True).annotate(
            orig_image, detections=detections, labels=labels
        ),
        detections=detections,
    )
else:
    annotated = orig_image

# Save result
annotated.save("annotated_output.jpg")

## Video Object Detection with Amazon SageMaker


### Simple Video Object Detection


In [None]:
import os
import base64
from io import BytesIO

import numpy as np
from PIL import Image

import supervision as sv
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer

# Input/output video files and the long-side cap applied to frames before upload.
SOURCE_VIDEO_PATH = "sample_video.mp4"
TARGET_VIDEO_PATH = "sample_video_annotated.mp4"
IMG_SIZE = int(VARIANTS["large"]["resolution"])

# Single SageMaker predictor shared by every frame (JSON request, JSON response).
predictor = Predictor(
    endpoint_name=ENDPOINT_NAME,
    sagemaker_session=session,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer(),
)

# Annotators are created once. The callback below draws with the box and label
# annotators; mask_annotator is created but not referenced by that callback.
mask_annotator = sv.MaskAnnotator(opacity=0.4)
box_annotator = sv.BoxAnnotator(thickness=2)
# smart_position tries to avoid overlapping labels.
label_annotator = sv.LabelAnnotator(smart_position=True)


def resize_long_side(image: Image.Image, max_size: int = 560) -> Image.Image:
    """Downscale ``image`` so that its longer side is at most ``max_size`` pixels.

    The aspect ratio is preserved and images that already fit are returned
    unchanged (no upscaling).

    Args:
        image: Source PIL image; only its ``size`` and ``resize`` are used.
        max_size: Upper bound, in pixels, for the longer side.

    Returns:
        ``image`` itself when no resize is needed, otherwise a new image
        resampled with LANCZOS.
    """
    w, h = image.size
    if max(w, h) <= max_size:
        return image
    scale = max_size / float(max(w, h))
    # int() truncates, so the short side of a very elongated frame could become 0,
    # which is not a valid target size; keep both dimensions at one pixel or more.
    new_w = max(1, int(w * scale))
    new_h = max(1, int(h * scale))
    return image.resize((new_w, new_h), Image.Resampling.LANCZOS)


def callback(frame: np.ndarray, index: int) -> np.ndarray:
    """Annotate one video frame with the detections returned by the endpoint.

    Args:
        frame: Frame array supplied by the video processor; treated as BGR and
            indexed as (height, width, channels).
        index: Frame number, used only for periodic progress logging.

    Returns:
        An annotated copy of ``frame``, or ``frame`` itself when the endpoint
        reports no detections.
    """
    if index % 50 == 0:
        print(f"[cb] frame {index}, shape={frame.shape}")

    frame_h, frame_w = frame.shape[:2]

    # Reverse the channel axis (BGR -> RGB), wrap as a PIL image and shrink it
    # for inference.
    send_frame = resize_long_side(Image.fromarray(frame[:, :, ::-1]), IMG_SIZE)
    sent_w, sent_h = send_frame.size

    # Encode the resized frame as JPEG and ship it base64-encoded in a JSON body.
    encoded = BytesIO()
    send_frame.save(encoded, format="JPEG", quality=90)
    request = {
        "image": base64.b64encode(encoded.getvalue()).decode("utf-8"),
        "confidence": 0.25,
    }

    detections_raw = predictor.predict(request)["detections"]
    if not detections_raw:
        return frame

    # Boxes arrive as [x1, y1, x2, y2] in the resized frame; scale them back to
    # the original frame size.
    scale_x = frame_w / sent_w
    scale_y = frame_h / sent_h
    boxes_orig = [
        [x1 * scale_x, y1 * scale_y, x2 * scale_x, y2 * scale_y]
        for x1, y1, x2, y2 in (d["box"] for d in detections_raw)
    ]

    detections = sv.Detections(
        xyxy=np.array(boxes_orig),
        class_id=np.array([d["class_id"] for d in detections_raw]),
        confidence=np.array([d["confidence"] for d in detections_raw]),
    )
    labels = [f"{d['label']} {d['confidence'] * 100:.1f}%" for d in detections_raw]

    # Draw on a copy so the incoming frame is left untouched: boxes, then labels.
    with_boxes = box_annotator.annotate(scene=frame.copy(), detections=detections)
    return label_annotator.annotate(
        scene=with_boxes,
        detections=detections,
        labels=labels,
    )


# Run the callback over the source video and write the frames it returns to
# TARGET_VIDEO_PATH, then report the size of the written file.
print("Starting video processing...")
sv.process_video(
    source_path=SOURCE_VIDEO_PATH,
    target_path=TARGET_VIDEO_PATH,
    callback=callback,
    show_progress=True,  # supported in recent supervision versions
)
print("Processing complete.")
print("Final file size (bytes):", os.path.getsize(TARGET_VIDEO_PATH))

### Video Object Detection with Counts


In [None]:
import os
import base64
from io import BytesIO

import cv2
import numpy as np
from PIL import Image

import supervision as sv
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer

# Input/output video files and the long-side cap applied to frames before upload.
# NOTE: the target path is the same one the "Simple Video Object Detections" cell
# writes to, so running this cell replaces that output file.
SOURCE_VIDEO_PATH = "sample_video.mp4"
TARGET_VIDEO_PATH = "sample_video_annotated.mp4"
IMG_SIZE = int(VARIANTS["large"]["resolution"])

# Single SageMaker predictor shared by every frame (JSON request, JSON response).
predictor = Predictor(
    endpoint_name=ENDPOINT_NAME,
    sagemaker_session=session,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer(),
)

# Annotators are created once. The callback below draws with the box and label
# annotators; mask_annotator is created but not referenced by that callback.
mask_annotator = sv.MaskAnnotator(opacity=0.4)
box_annotator = sv.BoxAnnotator(thickness=2)
# smart_position tries to avoid overlapping labels.
label_annotator = sv.LabelAnnotator(smart_position=True)


def resize_long_side(image: Image.Image, max_size: int = 560) -> Image.Image:
    """Downscale ``image`` so that its longer side is at most ``max_size`` pixels.

    The aspect ratio is preserved and images that already fit are returned
    unchanged (no upscaling).

    Args:
        image: Source PIL image; only its ``size`` and ``resize`` are used.
        max_size: Upper bound, in pixels, for the longer side.

    Returns:
        ``image`` itself when no resize is needed, otherwise a new image
        resampled with LANCZOS.
    """
    w, h = image.size
    if max(w, h) <= max_size:
        return image
    scale = max_size / float(max(w, h))
    # int() truncates, so the short side of a very elongated frame could become 0,
    # which is not a valid target size; keep both dimensions at one pixel or more.
    new_w = max(1, int(w * scale))
    new_h = max(1, int(h * scale))
    return image.resize((new_w, new_h), Image.Resampling.LANCZOS)


def callback(frame: np.ndarray, index: int) -> np.ndarray:
    """Annotate one video frame and overlay how often each label occurs in it.

    Args:
        frame: Frame array supplied by the video processor; treated as BGR and
            indexed as (height, width, channels).
        index: Frame number, used only for periodic progress logging.

    Returns:
        An annotated copy of ``frame`` with boxes, labels and a per-frame count
        list, or ``frame`` itself when the endpoint reports no detections.
    """
    if index % 50 == 0:
        print(f"[cb] frame {index}, shape={frame.shape}")

    frame_h, frame_w = frame.shape[:2]

    # Reverse the channel axis (BGR -> RGB), wrap as a PIL image and shrink it
    # for inference.
    send_frame = resize_long_side(Image.fromarray(frame[:, :, ::-1]), IMG_SIZE)
    sent_w, sent_h = send_frame.size

    # Encode the resized frame as JPEG and ship it base64-encoded in a JSON body.
    # Add a "classes" entry (e.g. ["person"]) to restrict the returned labels.
    encoded = BytesIO()
    send_frame.save(encoded, format="JPEG", quality=90)
    request = {
        "image": base64.b64encode(encoded.getvalue()).decode("utf-8"),
        "confidence": 0.5,
    }

    detections_raw = predictor.predict(request)["detections"]
    if not detections_raw:
        return frame

    # Boxes arrive as [x1, y1, x2, y2] in the resized frame; scale them back to
    # the original frame size.
    scale_x = frame_w / sent_w
    scale_y = frame_h / sent_h
    boxes_orig = [
        [x1 * scale_x, y1 * scale_y, x2 * scale_x, y2 * scale_y]
        for x1, y1, x2, y2 in (d["box"] for d in detections_raw)
    ]

    detections = sv.Detections(
        xyxy=np.array(boxes_orig),
        class_id=np.array([d["class_id"] for d in detections_raw]),
        confidence=np.array([d["confidence"] for d in detections_raw]),
    )
    labels = [f"{d['label']} {d['confidence'] * 100:.1f}%" for d in detections_raw]

    # Count the occurrences of each label within this frame only.
    names, tallies = np.unique(
        [d["label"] for d in detections_raw], return_counts=True
    )
    label_counts = dict(zip(names, tallies))

    # Draw on a copy so the incoming frame is left untouched: boxes, then labels.
    annotated = label_annotator.annotate(
        scene=box_annotator.annotate(scene=frame.copy(), detections=detections),
        detections=detections,
        labels=labels,
    )

    # Overlay the per-frame counts, one "label: count" line per label, from the top-left.
    first_row_y = 30
    for row, (name, tally) in enumerate(label_counts.items()):
        cv2.putText(
            annotated,
            f"{name}: {tally}",
            (20, first_row_y + row * 25),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.8,
            (0, 255, 255),
            2,
            cv2.LINE_AA,
        )

    return annotated


# Run the callback over the source video and write the frames it returns to
# TARGET_VIDEO_PATH, then report the size of the written file.
print("Starting video processing...")
sv.process_video(
    source_path=SOURCE_VIDEO_PATH,
    target_path=TARGET_VIDEO_PATH,
    callback=callback,
    show_progress=True,  # supported in recent supervision versions
)
print("Processing complete.")
print("Final file size (bytes):", os.path.getsize(TARGET_VIDEO_PATH))

### Video Object Detection with Class Replacement


In [None]:
%%time

import os
import base64
from io import BytesIO

import cv2
import numpy as np
from PIL import Image

import supervision as sv
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer

# Input/output video files and the labels that always appear in the on-screen summary.
SOURCE_VIDEO_PATH = "kling_santa_slowed_more.mp4"
TARGET_VIDEO_PATH = "kling_santa_slowed_more_annotated.mp4"
SUMMARY_LABELS = ["magic sleigh", "santa", "reindeer", "gift", "sack of gifts"]

# Model label -> label text shown on screen.
LABEL_REMAP = {
    "boat": "magic sleigh",
    "bench": "magic sleigh",
    "couch": "magic sleigh",
    "train": "magic sleigh",
    "person": "santa",
    "teddy bear": "santa",
    "sheep": "reindeer",
    "dog": "reindeer",
    "cow": "reindeer",
    "horse": "reindeer",
    "suitcase": "gift",
    "handbag": "sack of gifts",
}

# Optional: class-id remapping applied alongside the label remap. Which id means
# which class depends on the model's class index mapping.
# Example COCO-style mapping: 17=cat, 18=dog, 19=horse, 20=sheep, etc.
CLASS_ID_REMAP = {
    17: 20,
    18: 20,
    19: 20,
    9: 7,
    14: 7,
    58: 7,
    78: 1,
}

# Single SageMaker predictor shared by every frame (JSON request, JSON response).
predictor = Predictor(
    endpoint_name=ENDPOINT_NAME,
    sagemaker_session=session,
    serializer=JSONSerializer(),
    deserializer=JSONDeserializer(),
)

# Annotators are created once. The callback below draws with the box and label
# annotators; mask_annotator is created but not referenced by that callback.
mask_annotator = sv.MaskAnnotator(opacity=0.4)
box_annotator = sv.BoxAnnotator(thickness=2)
# smart_position tries to avoid overlapping labels.
label_annotator = sv.LabelAnnotator(smart_position=True)


def resize_long_side(image: Image.Image, max_size: int = 560) -> Image.Image:
    """Downscale ``image`` so that its longer side is at most ``max_size`` pixels.

    The aspect ratio is preserved and images that already fit are returned
    unchanged (no upscaling).

    Args:
        image: Source PIL image; only its ``size`` and ``resize`` are used.
        max_size: Upper bound, in pixels, for the longer side.

    Returns:
        ``image`` itself when no resize is needed, otherwise a new image
        resampled with LANCZOS.
    """
    w, h = image.size
    if max(w, h) <= max_size:
        return image
    scale = max_size / float(max(w, h))
    # int() truncates, so the short side of a very elongated frame could become 0,
    # which is not a valid target size; keep both dimensions at one pixel or more.
    new_w = max(1, int(w * scale))
    new_h = max(1, int(h * scale))
    return image.resize((new_w, new_h), Image.Resampling.LANCZOS)


def _draw_count_summary(image: np.ndarray, counts: dict) -> np.ndarray:
    """Write one "label: count" line per entry of ``counts`` onto ``image`` (in place)."""
    y0 = 60
    for i, (lbl, cnt) in enumerate(counts.items()):
        cv2.putText(
            image,
            f"{lbl}: {cnt}",
            (40, y0 + i * 50),
            cv2.FONT_HERSHEY_SIMPLEX,
            1.4,
            (57, 254, 20),
            2,
            cv2.LINE_AA,
        )
    return image


def callback(frame: np.ndarray, index: int) -> np.ndarray:
    """Annotate one frame using remapped labels and overlay per-frame counts.

    Detections are requested for a fixed list of classes, their labels and
    class ids are rewritten through LABEL_REMAP / CLASS_ID_REMAP, and a count
    for every entry of SUMMARY_LABELS is drawn on the frame.

    Args:
        frame: Frame array supplied by the video processor; treated as BGR and
            indexed as (height, width, channels).
        index: Frame number, used only for periodic progress logging.

    Returns:
        An annotated copy of ``frame``. Frames without detections still get the
        count summary (all zeros) so the overlay does not vanish between frames.
    """
    if index % 50 == 0:
        print(f"[cb] frame {index}, shape={frame.shape}")

    h_orig, w_orig = frame.shape[:2]

    # BGR -> RGB PIL
    rgb = frame[:, :, ::-1]
    pil_frame = Image.fromarray(rgb)

    # Resize for inference
    send_frame = resize_long_side(pil_frame, 560)
    w_inf, h_inf = send_frame.size

    # Scale from inference coords -> original coords
    scale_x = w_orig / w_inf
    scale_y = h_orig / h_inf

    # JPEG buffer
    buf = BytesIO()
    send_frame.save(buf, format="JPEG", quality=90)

    # NOTE(review): LABEL_REMAP also has entries for "bench", "couch" and
    # "teddy bear", which are not requested here — confirm that is intentional.
    payload = {
        "image": base64.b64encode(buf.getvalue()).decode("utf-8"),
        "confidence": 0.18,
        "classes": [
            "person",
            "train",
            "boat",
            "sheep",
            "dog",
            "cow",
            "horse",
            "suitcase",
            "handbag",
        ],
    }

    result = predictor.predict(payload)
    detections_raw = result["detections"]

    if not detections_raw:
        # Nothing detected: still show the summary with zero counts, drawn on a
        # copy so the incoming frame is not modified.
        empty_counts = {lbl: 0 for lbl in SUMMARY_LABELS}
        return _draw_count_summary(frame.copy(), empty_counts)

    # Rewrite the label text and, where configured, the class id of every detection.
    for d in detections_raw:
        if d["label"] in LABEL_REMAP:
            d["label"] = LABEL_REMAP[d["label"]]

        cid = d.get("class_id")
        if cid in CLASS_ID_REMAP:
            d["class_id"] = CLASS_ID_REMAP[cid]

    # Boxes arrive as [x1, y1, x2, y2] in the resized frame; scale them back to
    # the original frame size.
    boxes_orig = [
        [x1 * scale_x, y1 * scale_y, x2 * scale_x, y2 * scale_y]
        for x1, y1, x2, y2 in (d["box"] for d in detections_raw)
    ]

    detections = sv.Detections(
        xyxy=np.array(boxes_orig),
        class_id=np.array([d["class_id"] for d in detections_raw]),
        confidence=np.array([d["confidence"] for d in detections_raw]),
    )

    # Labels shown above boxes, using remapped names
    labels = [f"{d['label']} {d['confidence'] * 100:.1f}%" for d in detections_raw]

    # Per-frame counts, using remapped names
    unique_labels, counts = np.unique(
        [d["label"] for d in detections_raw],
        return_counts=True,
    )
    label_counts = dict(zip(unique_labels, counts))

    # Ensure all summary labels exist, with 0 if missing
    full_counts = {lbl: label_counts.get(lbl, 0) for lbl in SUMMARY_LABELS}

    annotated = box_annotator.annotate(
        scene=frame.copy(),
        detections=detections,
    )
    annotated = label_annotator.annotate(
        scene=annotated,
        detections=detections,
        labels=labels,
    )

    # Draw per-frame count summary
    return _draw_count_summary(annotated, full_counts)


# Run the callback over the source video and write the frames it returns to
# TARGET_VIDEO_PATH, then report the size of the written file.
print("Starting video processing...")
sv.process_video(
    source_path=SOURCE_VIDEO_PATH,
    target_path=TARGET_VIDEO_PATH,
    callback=callback,
    show_progress=True,
)
print("Processing complete.")
print("Final file size (bytes):", os.path.getsize(TARGET_VIDEO_PATH))