# Deploying a YOLO11 Model to SageMaker Using PyTorch

This notebook demonstrates end-to-end deployment of the YOLO11 object detection model to Amazon SageMaker using PyTorch. The workflow includes downloading pre-trained weights, staging the custom inference code (`inference.py` and `requirements.txt`, which must already exist next to this notebook), packaging the model artifact, deploying to a real-time SageMaker endpoint with GPU acceleration, and running inference on sample images.


## Load Required Packages


In [None]:
%pip install ultralytics>=8.3.0

## Prepare and Deploy Model and Associated Files


In [None]:
from ultralytics import YOLO

# Load the pretrained YOLO11-large checkpoint. Constructing YOLO from the bare
# filename downloads yolo11l.pt into the current directory, which is where the
# packaging cell further down expects to find the weights file.
model = YOLO("yolo11l.pt")

In [None]:
import shutil
import os

def stage_code_files(
    filenames=("inference.py", "requirements.txt"), code_dir="code", src_dir="."
):
    """Move the inference code files into ``code_dir``; safe to run repeatedly.

    Each file is handled on its own: a file found in ``src_dir`` is moved, a file
    that is absent from ``src_dir`` but already present in ``code_dir`` (for
    example after an earlier run of this cell) is left alone, and anything else is
    reported as missing.

    Args:
        filenames: Names of the files to stage.
        code_dir: Destination directory; created when it does not exist.
        src_dir: Directory in which the files are looked up.

    Returns:
        A ``(moved, already_staged, missing)`` tuple of filename lists.
    """
    os.makedirs(code_dir, exist_ok=True)

    moved, already_staged, missing = [], [], []
    for filename in filenames:
        src = os.path.join(src_dir, filename)
        dst = os.path.join(code_dir, filename)
        if os.path.exists(src):
            shutil.move(src, dst)
            moved.append(filename)
        elif os.path.exists(dst):
            # Staged by a previous run; nothing to do.
            already_staged.append(filename)
        else:
            missing.append(filename)
    return moved, already_staged, missing


# Create code directory and move files
staged_now, staged_before, not_found = stage_code_files()

if staged_now:
    print(f"Files moved to code/ directory: {', '.join(staged_now)}")
if staged_before:
    print(f"Files already in code/ directory: {', '.join(staged_before)}")
if not_found:
    # Best-effort warning (not an exception) so the rest of the notebook can still be inspected.
    print(
        f"Warning: {', '.join(not_found)} not found in the working directory or in code/. "
        "Make sure both inference.py and requirements.txt exist before deploying."
    )

In [None]:
# Programmatically create a model artifact (tar.gz) containing the chosen weights and upload to S3
import tarfile
import os
import sagemaker

# Weight file to package; it has to be present in the working directory already
model_name = "yolo11l.pt"
weights_path = os.path.join(os.getcwd(), model_name)
if not os.path.exists(weights_path):
    raise FileNotFoundError(f"Weights not found: {weights_path}")

# Build a gzip-compressed tarball that holds the weights at the archive root
artifact_path = "model.tar.gz"
with tarfile.open(name=artifact_path, mode="w:gz") as archive:
    archive.add(weights_path, arcname=model_name)

# Push the tarball to the session's default bucket so no bucket name is hard-coded
session = sagemaker.Session()
bucket = session.default_bucket()
model_s3_path = session.upload_data(
    artifact_path,
    bucket=bucket,
    key_prefix="pytorch_models",
)
print("Uploaded model artifact to:", model_s3_path)

In [None]:
import sagemaker
import boto3
import os

session = sagemaker.Session()

# Resolve the IAM role used for deployment: prefer the notebook's execution role and
# fall back to the SAGEMAKER_ROLE environment variable when that lookup fails
# (the error message below points at local runs as the expected case).
try:
    role = sagemaker.get_execution_role()
except Exception as exc:
    role = os.environ.get("SAGEMAKER_ROLE", "").strip()
    # An unset, empty, or whitespace-only variable is treated as "no role" so the
    # failure surfaces here instead of later during deployment.
    if not role:
        raise RuntimeError(
            "SageMaker role not found. Set SAGEMAKER_ROLE env var when running locally."
        ) from exc
region = boto3.Session().region_name

print("RoleArn:", role)
print("Region:", region)
print("Default S3 Bucket:", session.default_bucket())

In [None]:
from sagemaker.pytorch import PyTorchModel
from sagemaker.deserializers import JSONDeserializer
from datetime import datetime, timezone
import sagemaker

# Pick up the session left behind by an earlier cell; build a fresh one otherwise
try:
    session
except NameError:
    session = sagemaker.Session()
bucket = session.default_bucket()
model_destination = f"s3://{bucket}/pytorch_models/model.tar.gz"

# The S3 URI produced by the upload cell (model_s3_path) wins when it is defined;
# the conventional location built above is only a fallback
model_data = model_s3_path if "model_s3_path" in globals() else model_destination

print(f"Using model_data: {model_data}")

pytorch_model = PyTorchModel(
    model_data=model_data,
    role=role,
    framework_version="2.6.0",
    py_version="py312",
    entry_point="inference.py",
    source_dir="code",
    # Environment variables made available inside the serving container
    env=dict(YOLO_MODEL="yolo11l.pt", YOLO_CONF="0.25"),
)

instance_type = "ml.g4dn.xlarge"
# A UTC timestamp down to microseconds keeps each endpoint name distinct
endpoint_name = f"yolov11-pytorch-{datetime.now(timezone.utc):%Y-%m-%d-%H-%M-%S-%f}"

print(f"Deploying to endpoint: {endpoint_name}")

# Should take about 4-6 minutes to deploy
predictor = pytorch_model.deploy(
    endpoint_name=endpoint_name,
    instance_type=instance_type,
    initial_instance_count=1,
    deserializer=JSONDeserializer(),
)

## Real-time Inference

Run real-time inference on every image in the `sample_images` directory, draw the returned bounding boxes and labels on each image, and save the annotated copies to `sample_images_output`.


In [None]:
from sagemaker.serializers import IdentitySerializer
from PIL import Image, ImageDraw, ImageFont
from io import BytesIO
import json
import random
import glob
import os

# Requests carry raw JPEG bytes, so the payload is passed through unchanged
predictor.serializer = IdentitySerializer(content_type="image/jpeg")

base_dir = "sample_images"
out_dir = "sample_images_output"
os.makedirs(out_dir, exist_ok=True)

# Gather every .jpg / .jpeg / .png file directly inside base_dir, in sorted order
image_paths = sorted(
    path
    for pattern in ("*.jpg", "*.jpeg", "*.png")
    for path in glob.glob(os.path.join(base_dir, pattern))
)

if len(image_paths) == 0:
    raise FileNotFoundError(f"No images found in {base_dir}")

print(f"Found {len(image_paths)} images in {base_dir}")

font = ImageFont.load_default(size=24)
IMG_SIZE = 640  # your YOLO imgsz


def resize_long_side(image: Image.Image, max_size: int = 640) -> Image.Image:
    """Downscale ``image`` so its longer side is at most ``max_size`` pixels.

    The aspect ratio is preserved and images that already fit are returned
    unchanged (no upscaling, no copy).

    Args:
        image: Source image.
        max_size: Maximum allowed length, in pixels, of the longer side.

    Returns:
        ``image`` itself when it already fits, otherwise a new image resampled
        with LANCZOS.
    """
    w, h = image.size
    long_side = max(w, h)
    if long_side <= max_size:
        return image  # no upscaling

    # Integer arithmetic makes the long side land exactly on max_size (a float scale
    # factor can truncate it to max_size - 1), and the clamp keeps the short side of
    # a very elongated image from collapsing to 0, which resize() rejects.
    new_w = max(1, int(w * max_size // long_side))
    new_h = max(1, int(h * max_size // long_side))
    return image.resize((new_w, new_h), Image.Resampling.LANCZOS)


# Send each sample image to the endpoint, draw the returned detections on the
# full-resolution original, and save the annotated copy to out_dir.
processed_count = 0  # images that were actually sent and saved (skips excluded)

for image_path in image_paths:
    try:
        # Image.open keeps the file handle open; convert() returns an independent
        # copy, so the handle can be released as soon as the block exits.
        with Image.open(image_path) as opened_image:
            orig_image = opened_image.convert("RGB")
    except Exception as e:
        print(f"Skipping unreadable image: {image_path} - {e}")
        continue

    # Downscale client-side: long side = 640, keep aspect ratio
    send_image = resize_long_side(orig_image, IMG_SIZE)

    buffer = BytesIO()
    send_image.save(buffer, format="JPEG", quality=90)
    payload = buffer.getvalue()

    result = predictor.predict(payload)
    print(json.dumps(result, indent=2))

    # ASSUMPTION: boxes are in coordinates of send_image
    draw = ImageDraw.Draw(orig_image)

    send_w, send_h = send_image.size
    orig_w, orig_h = orig_image.size
    x_ratio = orig_w / send_w
    y_ratio = orig_h / send_h

    # NOTE(review): assumes the endpoint's inference.py returns
    # {"detections": [{"box": [x1, y1, x2, y2], "confidence": ..., "label": ...}]};
    # that handler is not part of this notebook - confirm against it.
    for det in result.get("detections", []):
        x1, y1, x2, y2 = det["box"]
        conf = det["confidence"]
        label = det["label"]

        # Scale from send_image coords back to original
        x1, x2 = int(x_ratio * x1), int(x_ratio * x2)
        y1, y2 = int(y_ratio * y1), int(y_ratio * y2)

        # Random colour per detection (no channel darker than 10)
        color = (
            random.randint(10, 255),
            random.randint(10, 255),
            random.randint(10, 255),
        )

        draw.rectangle([(x1, y1), (x2, y2)], outline=color, width=4)

        # Place the caption just above the box, clamped to the top edge of the image
        text = f"{label} ({int(conf * 100)}%)"
        draw.text((x1, max(0, y1 - 30)), text, fill=color, font=font)

    base_name = os.path.basename(image_path)
    name, ext = os.path.splitext(base_name)
    out_path = os.path.join(out_dir, f"{name}_detected{ext}")
    orig_image.save(out_path, quality=95)
    print(f"Saved: {out_path}")
    processed_count += 1

# Report the real number of processed images; unreadable files were skipped above.
print(
    f"Done. {processed_count} of {len(image_paths)} images processed; results in: {out_dir}"
)