# FastScan AWS SageMaker Pipeline Implementation

The implementation in this Jupyter notebook can only be run in the AWS SageMaker Studio Code Editor.

https://sagemaker-examples.readthedocs.io/en/latest/sagemaker-pipelines/tabular/abalone_build_train_deploy/sagemaker-pipelines-preprocess-train-evaluate-batch-transform.html#Define-a-Processing-Step-for-Feature-Engineering

## 1.2 Import Packages

In [2]:

import os
import numpy
import sagemaker
import sys




# 2. Create SageMaker Pipeline Session

In [3]:
from sagemaker.workflow.pipeline_context import PipelineSession

# Regular SDK session: used in this cell to read the region and the default bucket.
sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
default_bucket = sagemaker_session.default_bucket()

# Execution role that is passed as `role` to the processors and the estimator.
role = sagemaker.get_execution_role()

# Pipeline session: handed to the processors in later cells as `sagemaker_session`.
pipeline_session = PipelineSession()

# Nothing is interpolated into this name, so it is a plain string literal.
model_package_group_name = "FastScanModelPackageGroupName"

In [6]:
# Inspect the `context` attribute of the pipeline session; the recorded output of this cell is `None`.
print(pipeline_session.context)

None


# 3. Define Parameters in the Pipeline for Pipeline Execution

In [25]:
# Single place to change the bucket/prefix; every dataset URI below is derived from it.
S3_DATASET_ROOT: str = "s3://angkokleong-bucket/datasets"
FASTSCAN_IMAGE_ROOT: str = f"{S3_DATASET_ROOT}/fastscandataset/images"

# Raw inputs: the unsplit images and their label files.
image_input_data_uri: str = f"{S3_DATASET_ROOT}/raw_custom_image_dataset/"
label_input_data_uri: str = f"{S3_DATASET_ROOT}/raw_label_data/"

# Image folders of the train / test / val split.
train_image_dataset_input_uri: str = f"{FASTSCAN_IMAGE_ROOT}/train/"
test_image_dataset_input_uri: str = f"{FASTSCAN_IMAGE_ROOT}/test/"
val_image_dataset_input_uri: str = f"{FASTSCAN_IMAGE_ROOT}/val/"

In [149]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

# A pipeline can take several input data locations; each one is its own parameter below.

# Plain Python values, not pipeline parameters.
# NOTE(review): presumably kept static because of the recorded
# "Pipeline variables do not support __int__ operation" TypeError - confirm.
processing_instance_count = 1
instance_type = "ml.m5.xlarge"

# Approval status parameter; defaults to manual approval.
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",
)

# Raw image and label locations.
input_image_data = ParameterString(name="InputImageData", default_value=image_input_data_uri)
input_label_data = ParameterString(name="InputLabelData", default_value=label_input_data_uri)

# Image folders of the train / test / val split.
input_train_image_dataset_s3_uri = ParameterString(
    default_value=train_image_dataset_input_uri,
    name="InputTrainImageDataset_S3_URI",
)
input_test_image_dataset_s3_uri = ParameterString(
    default_value=test_image_dataset_input_uri,
    name="InputTestImageDataset_S3_URI",
)
input_val_image_dataset_s3_uri = ParameterString(
    default_value=val_image_dataset_input_uri,
    name="InputValImageDataset_S3_URI",
)

# Metric thresholds exposed as float parameters.
mAP50_threshold = ParameterFloat(
    name="mAP50Threshold",
    default_value=0.9,
)
mAP50to95_threshold = ParameterFloat(
    name="mAP50to95threshold",
    default_value=0.8,
)

TypeError: Pipeline variables do not support __int__ operation.

# 4. Video Data Processing to extract video frame and convert to images

This step extracts frames from the video file and converts each extracted frame to an image.

Each image is resized from 4K resolution to 640 by 640.

In [None]:
from sagemaker.processing import FrameworkProcessor
from sagemaker.pytorch.estimator import PyTorch

# Processor that runs the preprocessing script in an AWS Deep Learning Container.
# Available images: https://github.com/aws/deep-learning-containers/blob/master/available_images.md
pytorch_processor = FrameworkProcessor(
    estimator_cls=PyTorch,
    framework_version="2.5.1",
    # CPU build of the image: the job runs on ml.m5.xlarge, which has no GPU, and this is
    # the same image the PyTorch estimator in section 7 uses.
    image_uri="763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:2.5.1-cpu-py311-ubuntu22.04-sagemaker",
    role=role,
    py_version="py3",
    instance_count=processing_instance_count,
    instance_type="ml.m5.xlarge",
    sagemaker_session=pipeline_session,
    # Hyphen-separated like the other processors' job names; SageMaker job names
    # do not accept underscores.
    base_job_name="stratified-splitting-image-data-processing",
)


In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
import project_library.file_manager

# Shared project package on the Studio file system.
# NOTE(review): `get_absolute_folder_location` is a project helper that is not shown in
# this notebook - presumably it normalises the given folder path; confirm.
project_library_folder_path: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/home/sagemaker-user/user-default-efs/FastScan/project_library"
    )
)

# Folder layout inside the processing container.
ROOT_INPUT_FOLDER_PATH: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/input"
    )
)
ROOT_OUTPUT_FOLDER_PATH: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/output"
    )
)
aws_dataset_folder_path: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/input/aws_datasets"
    )
)
fastscan_dataset_folder_path: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/input/aws_datasets/fastscandataset"
    )
)
raw_image_dataset_folder_path: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/input/aws_datasets/fastscandataset/raw_custom_image_dataset"
    )
)
raw_label_dataset_folder_path: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/input/aws_datasets/fastscandataset/raw_custom_label_dataset"
    )
)

# How SageMaker Processing configures input and output for the processing container:
#   https://docs.aws.amazon.com/sagemaker/latest/dg/byoc-input-and-output.html
# Visual guide for ProcessingInput and ProcessingOutput:
#   https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker_processing/scikit_learn_data_processing_and_model_evaluation/scikit_learn_data_processing_and_model_evaluation.ipynb
# NOTE(review): the script and requirements paths here sit directly under FastScan/,
# while the other processing cells use FastScan/project_library/ - confirm this is intended.
video_data_processor_args = pytorch_processor.run(
    code="/home/sagemaker-user/user-default-efs/FastScan/aws_sagemaker_pipeline/preprocessing/image_preprocessing_script.py",
    dependencies=[
        "/home/sagemaker-user/user-default-efs/FastScan/project_library",
        "/home/sagemaker-user/user-default-efs/FastScan/aws_sagemaker_pipeline/preprocessing/requirements.txt",
    ],
    inputs=[
        ProcessingInput(
            source=input_image_data,
            destination=raw_image_dataset_folder_path,
        ),
        ProcessingInput(
            source=input_label_data,
            destination=raw_label_dataset_folder_path,
        ),
    ],
    outputs=[
        # Image splits.
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/input/aws_datasets/fastscandataset/raw/train",
            destination="s3://angkokleong-bucket/datasets/fastscandataset/images/train",
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/input/aws_datasets/fastscandataset/raw/test",
            destination="s3://angkokleong-bucket/datasets/fastscandataset/images/test",
        ),
        ProcessingOutput(
            output_name="val",
            source="/opt/ml/processing/input/aws_datasets/fastscandataset/raw/val",
            destination="s3://angkokleong-bucket/datasets/fastscandataset/images/val",
        ),
        # Label splits.
        ProcessingOutput(
            output_name="train_label",
            source="/opt/ml/processing/input/aws_datasets/fastscandataset/labels/train",
            destination="s3://angkokleong-bucket/datasets/fastscandataset/labels/train",
        ),
        ProcessingOutput(
            output_name="test_label",
            source="/opt/ml/processing/input/aws_datasets/fastscandataset/labels/test",
            destination="s3://angkokleong-bucket/datasets/fastscandataset/labels/test",
        ),
        ProcessingOutput(
            output_name="val_label",
            source="/opt/ml/processing/input/aws_datasets/fastscandataset/labels/val",
            destination="s3://angkokleong-bucket/datasets/fastscandataset/labels/val",
        ),
    ],
)

# Wrap the captured run arguments in a pipeline step.
video_data_process_step = ProcessingStep(
    name="video_data_processing",
    step_args=video_data_processor_args,
)

In [42]:
from sagemaker import image_uris

# Look up the image URI for the scikit-learn 1.2-1 framework in us-east-1.
# The result is only displayed as the cell output; it is not assigned to a variable.
image_uris.retrieve(framework="sklearn", region="us-east-1", version="1.2-1", image_scope="training")

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.


'683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3'

# 5. Prepare Train, Val and Test dataset using train-test-split (done)

Split the image files into train, test and val datasets for YOLO model training.

In [113]:
from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn

# Available images: https://github.com/aws/deep-learning-containers/blob/master/available_images.md

# scikit-learn processor that runs the train/test/val image split.
sklearn_train_test_split_processor = FrameworkProcessor(
    base_job_name="train-test-split-image-data-processing-job",
    estimator_cls=SKLearn,
    framework_version="1.2-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    role=role,
    # Pipeline session from section 2.
    sagemaker_session=pipeline_session,
)


In [114]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
import project_library.file_manager

# Shared project package on the Studio file system.
# NOTE(review): `get_absolute_folder_location` is a project helper that is not shown in
# this notebook - presumably it normalises the given folder path; confirm.
project_library_folder_path: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/home/sagemaker-user/user-default-efs/FastScan/project_library"
    )
)

# Folder layout inside the processing container.
ROOT_INPUT_FOLDER_PATH: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/input"
    )
)
ROOT_OUTPUT_FOLDER_PATH: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/output"
    )
)
aws_dataset_folder_path: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/input/aws_datasets"
    )
)
fastscan_dataset_folder_path: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/input/aws_datasets/fastscandataset"
    )
)
raw_image_dataset_folder_path: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/input/aws_datasets/fastscandataset/raw_custom_image_dataset"
    )
)
raw_label_dataset_folder_path: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/input/aws_datasets/fastscandataset/raw_custom_label_dataset"
    )
)

# How SageMaker Processing configures input and output for the processing container:
#   https://docs.aws.amazon.com/sagemaker/latest/dg/byoc-input-and-output.html
sklearn_train_test_split_processor_args = sklearn_train_test_split_processor.run(
    code="/home/sagemaker-user/user-default-efs/FastScan/project_library/aws_sagemaker_pipeline/train_test_split_image_data_processing/processing_script.py",
    dependencies=[
        "/home/sagemaker-user/user-default-efs/FastScan/project_library",
        "/home/sagemaker-user/user-default-efs/FastScan/project_library/aws_sagemaker_pipeline/train_test_split_image_data_processing/requirements.txt",
    ],
    # Only the raw images are an input to this step.
    inputs=[
        ProcessingInput(
            source=input_image_data,
            destination=raw_image_dataset_folder_path,
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/input/aws_datasets/fastscandataset/raw/train",
            destination="s3://angkokleong-bucket/datasets/fastscandataset/images/train",
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/input/aws_datasets/fastscandataset/raw/test",
            destination="s3://angkokleong-bucket/datasets/fastscandataset/images/test",
        ),
        ProcessingOutput(
            output_name="val",
            source="/opt/ml/processing/input/aws_datasets/fastscandataset/raw/val",
            destination="s3://angkokleong-bucket/datasets/fastscandataset/images/val",
        ),
    ],
)

# Wrap the captured run arguments in a pipeline step.
train_test_split_image_data_process_step = ProcessingStep(
    name="train-test-split-image-data-processing",
    step_args=sklearn_train_test_split_processor_args,
)

# 6. Prepare Label data for each image based on the dataset created by train-test-split

This processing step uses the image files in the train, val and test dataset folders as a reference and sorts the label data files into the same train, val and test split.

In [121]:
from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn.estimator import SKLearn

# Available images: https://github.com/aws/deep-learning-containers/blob/master/available_images.md

# scikit-learn processor that runs the label-data sorting script.
sklearn_label_data_sorting_processor = FrameworkProcessor(
    base_job_name="label-data-sorting-processing-job",
    estimator_cls=SKLearn,
    framework_version="1.2-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    role=role,
    # Pipeline session from section 2.
    sagemaker_session=pipeline_session,
)


In [122]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
import project_library.file_manager

# Shared project package on the Studio file system.
# NOTE(review): `get_absolute_folder_location` is a project helper that is not shown in
# this notebook - presumably it normalises the given folder path; confirm.
project_library_folder_path: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/home/sagemaker-user/user-default-efs/FastScan/project_library"
    )
)

# Folder layout inside the processing container.
ROOT_INPUT_FOLDER_PATH: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/input"
    )
)
ROOT_OUTPUT_FOLDER_PATH: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/output"
    )
)
aws_dataset_folder_path: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/input/aws_datasets"
    )
)
fastscan_dataset_folder_path: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/input/aws_datasets/fastscandataset"
    )
)
raw_label_dataset_folder_path: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/input/aws_datasets/fastscandataset/raw_label_data"
    )
)

# Container folders that receive the already-split image sets.
fastscan_dataset_train_folder_path: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/input/aws_datasets/fastscandataset/images/train"
    )
)
fastscan_dataset_test_folder_path: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/input/aws_datasets/fastscandataset/images/test"
    )
)
fastscan_dataset_val_folder_path: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/input/aws_datasets/fastscandataset/images/val"
    )
)

label_data_sorting_processor_args = sklearn_label_data_sorting_processor.run(
    code="/home/sagemaker-user/user-default-efs/FastScan/project_library/aws_sagemaker_pipeline/label_data_processing/processing_script.py",
    dependencies=[
        "/home/sagemaker-user/user-default-efs/FastScan/project_library",
        "/home/sagemaker-user/user-default-efs/FastScan/project_library/aws_sagemaker_pipeline/label_data_processing/requirements.txt",
    ],
    # The three image splits plus the raw label files.
    inputs=[
        ProcessingInput(
            source=input_train_image_dataset_s3_uri,
            destination=fastscan_dataset_train_folder_path,
        ),
        ProcessingInput(
            source=input_test_image_dataset_s3_uri,
            destination=fastscan_dataset_test_folder_path,
        ),
        ProcessingInput(
            source=input_val_image_dataset_s3_uri,
            destination=fastscan_dataset_val_folder_path,
        ),
        ProcessingInput(
            source=input_label_data,
            destination=raw_label_dataset_folder_path,
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="train_label",
            source="/opt/ml/processing/input/aws_datasets/fastscandataset/labels/train",
            destination="s3://angkokleong-bucket/datasets/fastscandataset/labels/train",
        ),
        ProcessingOutput(
            output_name="test_label",
            source="/opt/ml/processing/input/aws_datasets/fastscandataset/labels/test",
            destination="s3://angkokleong-bucket/datasets/fastscandataset/labels/test",
        ),
        ProcessingOutput(
            output_name="val_label",
            source="/opt/ml/processing/input/aws_datasets/fastscandataset/labels/val",
            destination="s3://angkokleong-bucket/datasets/fastscandataset/labels/val",
        ),
    ],
)

# This step is declared to depend on the train/test split step.
label_data_sorting_process_step = ProcessingStep(
    name="label-data-sorting-processing",
    step_args=label_data_sorting_processor_args,
    depends_on=[train_test_split_image_data_process_step],
)



# 7. Prepare YOLO Model Training

In [293]:
from sagemaker.pytorch.estimator import PyTorch
import time
import os
# Available images: https://github.com/aws/deep-learning-containers/blob/master/available_images.md
# PyTorch estimator reference: https://sagemaker.readthedocs.io/en/stable/frameworks/pytorch/sagemaker.pytorch.html
# Framework base class of the PyTorch estimator:
#   https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.Framework

# Local-time stamp, e.g. day-month name-year-hour-minute-second.
timestamp = time.strftime("%d-%B-%Y-%H-%M-%S", time.localtime())

# Estimator for the YOLO training job.
pytorch_estimator: PyTorch = PyTorch(
    # Training script, given relative to `source_dir`.
    source_dir="/home/sagemaker-user/user-default-efs/FastScan/project_library",
    entry_point="aws_sagemaker_pipeline/model_training/model_training_script.py",
    dependencies=[
        "/home/sagemaker-user/user-default-efs/FastScan/project_library/aws_sagemaker_pipeline/model_training/requirements.txt",
        "/home/sagemaker-user/user-default-efs/FastScan/project_library",
    ],
    # CPU build of the PyTorch 2.5.1 training image.
    image_uri="763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:2.5.1-cpu-py311-ubuntu22.04-sagemaker",
    framework_version="2.5.1",
    py_version="py3",
    # Instance settings come from the parameters cell in section 3.
    instance_type=instance_type,
    instance_count=processing_instance_count,
    role=role,
    output_path="s3://angkokleong-bucket/model/train",
    hyperparameters={
        "epochs": 30,
        "batch-size": 32,
        "learning-rate": 0.001,
    },
)

In [294]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
import project_library.file_manager

# Shared project package on the Studio file system.
# NOTE(review): `get_absolute_folder_location` is a project helper that is not shown in
# this notebook - presumably it normalises the given folder path; confirm.
project_library_folder_path: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/home/sagemaker-user/user-default-efs/FastScan/project_library"
    )
)

# Container folder layout (same processing paths as the processing cells).
ROOT_INPUT_FOLDER_PATH: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/input"
    )
)
ROOT_OUTPUT_FOLDER_PATH: str = str(
    project_library.file_manager.FileInformation.get_absolute_folder_location(
        "/opt/ml/processing/output"
    )
)

# Local-time stamp, refreshed for this cell.
timestamp = time.strftime("%d-%B-%Y-%H-%M-%S", time.localtime())

# References:
#   TrainingInput: https://sagemaker.readthedocs.io/en/stable/api/utility/inputs.html#sagemaker.inputs.TrainingInput
#   Framework estimator: https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.Framework
#   How SageMaker runs the training container: https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-training-algo-running-container.html
#   EstimatorBase.fit: https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html#sagemaker.estimator.EstimatorBase.fit
#     "Train a model using the input training dataset. The API calls the Amazon SageMaker
#     CreateTrainingJob API to start model training. The API uses configuration you provided
#     to create the estimator and the specified input training data to send the
#     CreateTrainingJob request to Amazon SageMaker."

# One "train" channel that points at the whole fastscandataset prefix.
inputs = {
    "train": TrainingInput(
        s3_data="s3://angkokleong-bucket/datasets/fastscandataset",
        input_mode="File",
    ),
}

# The step is created without dependencies on the processing steps; the disabled option was:
#   depends_on=[train_test_split_image_data_process_step, label_data_sorting_process_step]
yolo_model_training_process_step = TrainingStep(
    name="YOLO_model_training",
    display_name="YOLO_model_training_step",
    estimator=pytorch_estimator,
    inputs=inputs,
)





# 8. Model Tuning

In [None]:
from sagemaker.workflow.steps import TuningStep



# 12. Define the Pipeline

In [295]:
from sagemaker.workflow.pipeline import Pipeline

# Nothing is interpolated into the name, so it is a plain string literal.
pipeline_name = "FastScanPipeline"

# Pipeline definition. Only the YOLO training step is included in `steps`.
pipeline = Pipeline(
    name=pipeline_name,
    # Only pipeline Parameter objects are listed here. `processing_instance_count` and
    # `instance_type` are a plain int and a plain str in this notebook; they are passed
    # directly to the processors and the estimator, so they are not pipeline parameters.
    parameters=[
        model_approval_status,
        input_image_data,
        input_label_data,
        input_train_image_dataset_s3_uri,
        input_test_image_dataset_s3_uri,
        input_val_image_dataset_s3_uri,
        mAP50_threshold,
        mAP50to95_threshold,
    ],
    steps=[yolo_model_training_process_step],
)

In [296]:
# Submit the pipeline definition using the execution role; the recorded response
# contains the pipeline ARN and an HTTP 200 status.
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-east-1:396913742348:pipeline/FastScanPipeline',
 'ResponseMetadata': {'RequestId': '5207d85d-0335-4810-bc82-f0f97aadf1f3',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '5207d85d-0335-4810-bc82-f0f97aadf1f3',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '84',
   'date': 'Fri, 28 Feb 2025 07:53:18 GMT'},
  'RetryAttempts': 0}}

# Start the pipeline

In [297]:
# Start a pipeline execution and keep the handle for the status cells that follow.
execution = pipeline.start()

In [298]:
# Show the execution's description; the recorded output reports the status 'Executing'.
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:396913742348:pipeline/FastScanPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:396913742348:pipeline/FastScanPipeline/execution/n3d5yjg5tg8t',
 'PipelineExecutionDisplayName': 'execution-1740729201025',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'fastscanpipeline',
  'TrialName': 'n3d5yjg5tg8t'},
 'CreationTime': datetime.datetime(2025, 2, 28, 7, 53, 20, 919000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2025, 2, 28, 7, 53, 20, 919000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:396913742348:user-profile/d-3gd8xgeqnewi/kokleong-1739789074256',
  'UserProfileName': 'kokleong-1739789074256',
  'DomainId': 'd-3gd8xgeqnewi',
  'IamIdentity': {'Arn': 'arn:aws:sts::396913742348:assumed-role/AmazonSageMaker-ExecutionRole-20250217T183323/SageMaker',
   'PrincipalId': 'AROAVY2PHCIGFAVRENP5G:SageMaker'}},
 'LastModifiedBy': {'UserProfileAr

In [299]:
# Wait on the started execution.
# NOTE(review): the recorded run of this cell ended with a KeyboardInterrupt.
execution.wait()


KeyboardInterrupt: 

In [127]:
# List the steps of the execution with their status.
# NOTE(review): the recorded output (cell count 127, dated 2025-02-27) lists the two
# processing steps, so it appears to come from an earlier run than the training-only
# pipeline defined above - re-run to refresh.
execution.list_steps()

[{'StepName': 'label-data-sorting-processing',
  'StartTime': datetime.datetime(2025, 2, 27, 12, 1, 49, 258000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 2, 27, 12, 7, 5, 273000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:396913742348:processing-job/pipelines-zyqh52aoo7wi-label-data-sorting-p-1SWS2IKbQN'}},
  'AttemptCount': 1},
 {'StepName': 'train-test-split-image-data-processing',
  'StartTime': datetime.datetime(2025, 2, 27, 11, 59, 15, 531000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 2, 27, 12, 1, 48, 775000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:396913742348:processing-job/pipelines-zyqh52aoo7wi-train-test-split-ima-94lU5nGCPj'}},
  'AttemptCount': 1}]