## 1. Import Python Libraries

In [None]:
import os, sagemaker, subprocess, boto3, time

## 2. (OPTIONAL) Download Images from S3

In [None]:
bucket_name = "uniben-data"
prefix = "yolo-validation/"
local_dir = "images-test"

# Create the download target up front so this cell also works on a fresh
# checkout; download_file() does not create missing parent directories.
os.makedirs(local_dir, exist_ok=True)

s3 = boto3.client('s3')
paginator = s3.get_paginator('list_objects_v2')

# Walk every result page under the prefix and download each object.
for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
    # 'Contents' is read with a default so a page without objects is skipped cleanly.
    for obj in page.get('Contents', []):
        key = obj['Key']
        if key.endswith('/'):
            continue  # Skip "folder" placeholder keys
        # Only the base name is kept, so the key hierarchy is flattened into local_dir.
        # NOTE(review): keys in different sub-prefixes that share a file name
        # would overwrite each other locally - confirm the bucket layout.
        filename = os.path.basename(key)
        local_path = os.path.join(local_dir, filename)
        print('.', end='')  # one dot per object as a lightweight progress indicator
        s3.download_file(bucket_name, key, local_path)
print("\nDone downloading images!")

## 3. TRAININGS

### 3.1. Install Ultralytics for YOLO11 model

https://docs.ultralytics.com/models/yolo11/#supported-tasks-and-modes

In [None]:
!pip3 install ultralytics
from ultralytics import YOLO

model_name = 'yolo11x.pt'
model = YOLO(model_name)

### 3.2. Train the model

In [None]:
# Log the wall-clock start so the cell output shows when the run began.
print("Start time:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
train_start_time = time.time()

# Training settings gathered in one place so they are easy to tweak.
train_options = dict(
    data="uniben-label.yaml",
    epochs=1000,
    imgsz=720,
    augment=True,
    batch=10,
    verbose=False,  # Toggle the epoch summaries
)
model.train(**train_options)

train_end_time = time.time()
elapsed_minutes = (train_end_time - train_start_time) / 60
print(f"Training Time = {elapsed_minutes:0.2f} minutes.")

### 3.3. (OPTIONAL) Resume training from the latest epoch

### Get the latest train folder

In [None]:
runs_dir = "runs/detect"

# Every sub-directory of runs_dir whose name starts with "train" is a candidate run.
train_dirs = [
    d for d in os.listdir(runs_dir)
    if os.path.isdir(os.path.join(runs_dir, d)) and d.startswith("train")
]

# Fail with a clear message instead of the opaque ValueError that max()
# raises on an empty sequence.
if not train_dirs:
    raise RuntimeError(f"No 'train*' subfolders in {runs_dir}")

# "Latest" means the most recently modified run directory, not the highest suffix.
latest_train_attempt = max(
    train_dirs,
    key=lambda d: os.path.getmtime(os.path.join(runs_dir, d))
)

print("Latest train folder: ", latest_train_attempt)

### Resume the training process of the latest train

In [None]:
# Load the last.pt checkpoint of the most recent run and continue its training.
last_checkpoint = f'runs/detect/{latest_train_attempt}/weights/last.pt'
model = YOLO(last_checkpoint)
model.train(resume=True)

### 3.4. (IMPORTANT, OPTIONAL) Local Inference

### Get the latest best.pt from latest train, put it into latest-best-model

In [None]:
import shutil

runs_dir   = "runs/detect"
target_dir = "latest-best-model"

# Collect the full paths of all "train*" run directories.
candidates = [
    os.path.join(runs_dir, name)
    for name in os.listdir(runs_dir)
    if name.startswith("train") and os.path.isdir(os.path.join(runs_dir, name))
]

if not candidates:
    raise RuntimeError(f"No 'train*' subfolders in {runs_dir}")

# The most recently modified run directory is treated as the latest run.
latest = max(candidates, key=os.path.getmtime)
print(f"Latest train folder: {latest}")

src = os.path.join(latest, "weights", "best.pt")
if not os.path.isfile(src):
    # Surface a clear message when the latest run has no best.pt to copy.
    raise FileNotFoundError(f"No best.pt found in latest train folder: {src}")

# copy2 does not create the destination directory, so make sure it exists.
os.makedirs(target_dir, exist_ok=True)
dst = os.path.join(target_dir, "best.pt")
shutil.copy2(src, dst)
print(f"Copied:\n  {src}\n→ {dst}")

### Create local model to invoke

In [25]:
import cv2, numpy as np, matplotlib.pyplot as plt, random
import base64, json

# Load the weights previously copied into latest-best-model/ for local inference.
best_weights_path = './latest-best-model/best.pt'
model = YOLO(best_weights_path)

In [None]:
# Single test image used for the local inference check.
img_path = 'images-test/z6686604803522_8d40b41b8489d82e13f15d9db092d1d8.jpg'

# Run the local model on that image; the returned object is iterated in the
# drawing cell that follows.
result = model.predict(source=img_path, save=False)

print(result)

In [None]:
orig_image = cv2.imread(img_path)
# cv2.imread reports an unreadable or missing file by returning None rather
# than raising, so check here instead of failing later inside cv2.rectangle.
if orig_image is None:
    raise FileNotFoundError(f"Could not read image: {img_path}")

# Box colour per class id. The image is converted with COLOR_BGR2RGB before
# display, so these tuples are in OpenCV's BGR order.
CLASS_COLORS = {
    0: (0,255,0),
    1: (255,0,0),
    2: (0,0,255),
    3: (0,0,0),
}

# Class id -> label name. Not used by the drawing loop in this cell.
CLASS_NAMES = {
    0: "abben",
    1: "boncha",
    2: "joco",
    3: "shelf",
}

for r in result:  # `result` is iterated as a sequence of per-image results
    boxes = r.boxes.xyxy.cpu().numpy()
    classes = r.boxes.cls.cpu().numpy().astype(int)
    for (x1, y1, x2, y2), classID in zip(boxes, classes):
        # Normalize the numpy integer to a plain int for the dict lookup;
        # unknown class ids fall back to white.
        color = CLASS_COLORS.get(int(classID), (255, 255, 255))
        cv2.rectangle(
            orig_image,
            (int(x1), int(y1)),
            (int(x2), int(y2)),
            color,
            thickness=2
        )

plt.figure(figsize=(12,8))
plt.imshow(cv2.cvtColor(orig_image, cv2.COLOR_BGR2RGB))
plt.axis('off')
plt.show()

# out_path = 'output_with_boxes.jpg'
# cv2.imwrite(out_path, orig_image)
# print(f"Annotated image saved to {out_path}")


# -----------------------------
# Deploy endpoint

## 4. Zip the code and model into `model.tar.gz` and upload it to specific S3 bucket
Note: the notebook role is granted permission only on the S3 bucket created with CDK, not on any other bucket.

In [13]:
model_zip = "latest-best-model/best.pt"
code_zip = "code/"

# Bundle the weights and the inference code into model.tar.gz.
# NOTE(review): the weights are archived under "latest-best-model/best.pt",
# while the deployment cell sets YOLO11_MODEL to 'best.pt' - confirm that
# inference.py resolves the file at that path inside the extracted archive.
bashCommand = f"tar -cpzf  model.tar.gz {model_zip} {code_zip}"

# Capture both streams and check the exit status so a failed archive step
# stops the notebook here instead of uploading a stale or missing tarball.
process = subprocess.run(
    bashCommand.split(),
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
output, error = process.stdout, process.stderr
if process.returncode != 0:
    raise RuntimeError(
        f"tar failed with exit code {process.returncode}: "
        f"{error.decode(errors='replace').strip()}"
    )

## 5. Select yolo11 bucket

In [14]:
s3_client = boto3.client('s3')
response = s3_client.list_buckets()

# Pick the first bucket whose name contains 'yolo11'. A separate loop name is
# used so `bucket` is never left holding a raw bucket record.
yolo_bucket_name = next(
    (entry["Name"] for entry in response['Buckets'] if 'yolo11' in entry["Name"]),
    None,
)
if yolo_bucket_name is None:
    # Without this check the later upload cell would fail with an unrelated error.
    raise RuntimeError("No S3 bucket with 'yolo11' in its name was returned by list_buckets()")

# Later cells expect `bucket` to be a full s3:// URI string.
bucket = 's3://' + yolo_bucket_name
print(f'Bucket: {bucket}')

Bucket: s3://yolo11sagemakerstack-yolo11s31ff79126-3g6ezijge4tc


## 6. Upload model to S3

In [None]:
from sagemaker import s3
from sagemaker import get_execution_role

# Clients for the SageMaker control plane and for endpoint invocation.
sm_client = boto3.client(service_name="sagemaker")
runtime_sm_client = boto3.client(service_name="sagemaker-runtime")

# Report which account and execution role this notebook is running under.
caller_identity = boto3.client("sts").get_caller_identity()
account_id = caller_identity["Account"]
print(f'Account ID: {account_id}')
role = get_execution_role()
print(f'Role: {role}')

# Upload the packaged model under <bucket>/yolo11 and keep the returned
# value for the model definition.
prefix = "yolo11"
upload_destination = "/".join((bucket, prefix))
model_data = s3.S3Uploader.upload("model.tar.gz", upload_destination)
print(f'Model Data: {model_data}')

## 7. Create the SageMaker PyTorchModel

In [16]:
from sagemaker.pytorch import PyTorchModel

# The session wants a bare bucket name, so drop the 's3://' scheme from the URI.
default_bucket_name = bucket.split('s3://')[-1]
sess = sagemaker.Session(default_bucket=default_bucket_name)

# Environment variables passed to the model container.
container_env = {
    'TS_MAX_RESPONSE_SIZE': '20000000',
    'YOLO11_MODEL': 'best.pt',
}

# Model definition that points at the uploaded archive and the inference entry point.
model = PyTorchModel(
    entry_point='inference.py',
    model_data=model_data,
    framework_version='1.12',
    py_version='py38',
    role=role,
    env=container_env,
    sagemaker_session=sess,
)

## 8. Deploy the model on SageMaker Endpoint:

In [None]:
from sagemaker.deserializers import JSONDeserializer

INSTANCE_TYPE = 'ml.c5.xlarge'
ENDPOINT_NAME = 'yolo11-pytorch-' + str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))

# Store the endpoint name in the history to be accessed by 2_TestEndpoint.ipynb notebook
%store ENDPOINT_NAME
print(f'Endpoint Name: {ENDPOINT_NAME}')

predictor = model.deploy(initial_instance_count=1, 
                         instance_type=INSTANCE_TYPE,
                         deserializer=JSONDeserializer(),
                         endpoint_name=ENDPOINT_NAME)

## 9. (OPTIONAL) Cleanup by removing Endpoint, Endpoint Config and Model

In [None]:
# Look up the endpoint configuration using the endpoint's own name.
# NOTE(review): this assumes the config was created under the same name as
# the endpoint - confirm for endpoints not created by this notebook.
response = sm_client.describe_endpoint_config(EndpointConfigName=ENDPOINT_NAME)
print(response)
endpoint_config_name = response['EndpointConfigName']

# Model names come from the config's production variants.
variant_model_names = [variant['ModelName'] for variant in response['ProductionVariants']]

# Tear down in order: endpoint, then its configuration, then the model(s).
sm_client.delete_endpoint(EndpointName=ENDPOINT_NAME)
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
for model_name in variant_model_names:
    sm_client.delete_model(ModelName=model_name)