In [3]:
!pip install --upgrade pip
!pip install --upgrade boto3
!pip install --upgrade sagemaker
!pip install --upgrade tensorflow

Collecting pip
  Using cached pip-24.3.1-py3-none-any.whl.metadata (3.7 kB)
Using cached pip-24.3.1-py3-none-any.whl (1.8 MB)
Installing collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 24.2
    Uninstalling pip-24.2:
      Successfully uninstalled pip-24.2
Successfully installed pip-24.3.1
Collecting boto3
  Using cached boto3-1.35.54-py3-none-any.whl.metadata (6.7 kB)
Collecting botocore<1.36.0,>=1.35.54 (from boto3)
  Using cached botocore-1.35.54-py3-none-any.whl.metadata (5.7 kB)
Using cached boto3-1.35.54-py3-none-any.whl (139 kB)
Using cached botocore-1.35.54-py3-none-any.whl (12.7 MB)
Installing collected packages: botocore, boto3
  Attempting uninstall: botocore
    Found existing installation: botocore 1.34.131
    Uninstalling botocore-1.34.131:
      Successfully uninstalled botocore-1.34.131
  Attempting uninstall: boto3
    Found existing installation: boto3 1.34.131
    Uninstalling boto3-1.34.131:
      Successfully uninstalled bo

In [4]:
# Standard libraries
import os
import sys
import json
from datetime import datetime

# AWS SDK for Python (Boto3) and SageMaker SDK
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.inputs import TrainingInput, CreateModelInput
from sagemaker.model import Model
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
from sagemaker.tensorflow import TensorFlow
from sagemaker.tensorflow.model import TensorFlowModel
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# Additional libraries
import tensorflow as tf

# Report interpreter and library versions so the run can be reproduced later.
print(f"Python version: {sys.version}")
print(f"Boto3 version: {boto3.__version__}")
print(f"SageMaker version: {sagemaker.__version__}")
print(f"TensorFlow version: {tf.__version__}")

# A single SageMaker session is the source of truth for region, role and bucket.
# Reading the region from it (instead of a separate boto3.Session()) guarantees
# the value matches the session that actually talks to SageMaker.
session = sagemaker.Session()
region = session.boto_region_name
role = get_execution_role(sagemaker_session=session)

print(f"SageMaker session initialized in region: {region}")
print(f"Using IAM role: {role}")
bucket = session.default_bucket()
print(f"Using S3 bucket: {bucket}")

# Session used by pipeline steps so that job calls are captured as step
# definitions instead of being launched immediately.
pipeline_session = PipelineSession()

2024-11-03 12:20:26.128749: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-11-03 12:20:26.132650: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-11-03 12:20:26.144062: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1730636426.164940     150 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1730636426.170900     150 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-11-03 12:20:26.192360: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU ins

Python version: 3.11.9 | packaged by conda-forge | (main, Apr 19 2024, 18:36:13) [GCC 12.3.0]
Boto3 version: 1.34.131
SageMaker version: 2.227.0
TensorFlow version: 2.18.0
SageMaker session initialized in region: eu-central-1
Using IAM role: arn:aws:iam::631045770794:role/service-role/AmazonSageMaker-ExecutionRole-20240829T134213
Using S3 bucket: sagemaker-eu-central-1-631045770794


In [5]:
# Pipeline parameters: each one is registered on the Pipeline object and carries
# the default used when an execution does not supply its own value.

# -- Instance types -----------------------------------------------------------
training_instance = ParameterString(name="TrainingInstanceType", default_value="ml.g4dn.xlarge")
processing_instance = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
deployment_instance = ParameterString(name="DeploymentInstanceType", default_value="ml.g4dn.xlarge")

# -- Model registry -----------------------------------------------------------
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")

# -- Job sizing and hyperparameters -------------------------------------------
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1,
)
batch_size = ParameterInteger(
    name="BatchSize",
    default_value=32,
)
epochs = ParameterInteger(
    name="Epochs",
    default_value=10,
)

# -- Quality gate -------------------------------------------------------------
accuracy_threshold = ParameterFloat(
    name="AccuracyThreshold",
    default_value=0.85,
)

In [6]:
from sagemaker import image_uris
from sagemaker.sklearn.processing import ScriptProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

def create_pipeline(pipeline_name):
    """Assemble the preprocessing -> training -> evaluation SageMaker pipeline.

    The function relies on notebook-level names defined in earlier cells:
    ``session``, ``pipeline_session``, ``region``, ``role`` and the pipeline
    parameter objects (``training_instance``, ``processing_instance``,
    ``processing_instance_count``, ``batch_size``, ``epochs``, ...).

    Args:
        pipeline_name: Name assigned to the resulting ``Pipeline``.

    Returns:
        A ``Pipeline`` with three steps (``PreprocessData``, ``TrainModel``,
        ``EvaluateModel``). The pipeline is only constructed here; it is not
        upserted or started.
    """
    tensorflow_version = "2.16"
    # Resolve the default bucket once; every S3 destination below lives in it.
    bucket_name = session.default_bucket()

    # image_uris.retrieve runs while the pipeline is being built, so it needs a
    # concrete instance type, not a pipeline parameter. Passing the parameter's
    # default makes that explicit. Consequence: overriding
    # ProcessingInstanceType at execution time does not change the image.
    tensorflow_image_uri = image_uris.retrieve(
        framework="tensorflow",
        region=region,
        version=tensorflow_version,
        instance_type=processing_instance.default_value,
        image_scope="training",
    )

    # ---- Step 1: preprocessing ------------------------------------------------
    tensorflow_processor = ScriptProcessor(
        image_uri=tensorflow_image_uri,
        command=["python3"],
        instance_type=processing_instance,
        instance_count=processing_instance_count,
        base_job_name="tensorflow-preprocessing",
        role=role,
        sagemaker_session=pipeline_session,
    )

    preprocessing_step = ProcessingStep(
        name="PreprocessData",
        processor=tensorflow_processor,
        inputs=[],
        outputs=[
            ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
            ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
            # NOTE(review): fixed destination, so every execution writes its
            # run_id output to the same S3 prefix.
            ProcessingOutput(
                output_name="run_id",
                source="/opt/ml/processing/output",
                destination=f"s3://{bucket_name}/pipeline-output/run_id",
            ),
        ],
        code="code/preprocessing.py",
        description="Data preprocessing step for preparing CIFAR-10 data for training.",
    )
    preprocessing_outputs = preprocessing_step.properties.ProcessingOutputConfig.Outputs

    # ---- Step 2: training -----------------------------------------------------
    # NOTE(review): unlike the processors, this estimator is not given
    # pipeline_session and therefore uses the SDK's default session - confirm
    # this is intended.
    estimator = TensorFlow(
        entry_point="train.py",
        source_dir="code",
        role=role,
        instance_count=1,
        instance_type=training_instance,
        framework_version=tensorflow_version,
        py_version="py310",
        hyperparameters={
            "epochs": epochs,
            "batch_size": batch_size,
        },
        output_path=f"s3://{bucket_name}/demo-pipeline/training-output",
    )

    training_step = TrainingStep(
        name="TrainModel",
        estimator=estimator,
        inputs={
            "train": TrainingInput(
                s3_data=preprocessing_outputs["train"].S3Output.S3Uri,
                content_type="application/python-pickle",
            ),
            "test": TrainingInput(
                s3_data=preprocessing_outputs["test"].S3Output.S3Uri,
                content_type="application/python-pickle",
            ),
            "run_id": TrainingInput(
                s3_data=preprocessing_outputs["run_id"].S3Output.S3Uri,
                content_type="text/plain",
            ),
        },
        description="Model training step using TensorFlow on CIFAR-10 data.",
    )

    # ---- Step 3: evaluation ---------------------------------------------------
    evaluation_processor = ScriptProcessor(
        image_uri=tensorflow_image_uri,
        command=["python3"],
        instance_type=processing_instance,
        instance_count=1,
        base_job_name="model-evaluation",
        role=role,
        sagemaker_session=pipeline_session,
    )

    # Exposes evaluation.json from the "evaluation" output to the pipeline.
    evaluation_report = PropertyFile(
        name="EvaluationReport",
        output_name="evaluation",
        path="evaluation.json",
    )

    evaluation_step = ProcessingStep(
        name="EvaluateModel",
        processor=evaluation_processor,
        inputs=[
            ProcessingInput(
                source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
                destination="/opt/ml/processing/model",
            ),
            ProcessingInput(
                source=preprocessing_outputs["test"].S3Output.S3Uri,
                destination="/opt/ml/processing/test",
            ),
        ],
        outputs=[
            ProcessingOutput(
                output_name="evaluation",
                source="/opt/ml/processing/evaluation",
                destination=f"s3://{bucket_name}/evaluation",
            )
        ],
        code="code/evaluation.py",
        property_files=[evaluation_report],
        description="Evaluate the model performance on test data and log results.",
    )

    # deployment_instance, model_approval_status and accuracy_threshold are
    # registered as parameters but no step in this function reads them yet.
    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            training_instance,
            processing_instance,
            deployment_instance,
            model_approval_status,
            batch_size,
            epochs,
            accuracy_threshold,
            processing_instance_count,
        ],
        steps=[preprocessing_step, training_step, evaluation_step],
    )

    return pipeline

In [7]:
def main():
    """Build, register and start a freshly named pipeline.

    Returns:
        The execution object returned by ``pipeline.start()``, so the caller
        can monitor or wait on the run. Earlier versions returned ``None``.
    """
    # The timestamp suffix makes the name unique, so each call registers a new
    # pipeline rather than updating an existing one.
    timestamp = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    pipeline_name = f"DogCatClassification-{timestamp}"

    pipeline = create_pipeline(pipeline_name)
    print(f"Pipeline created with name: {pipeline_name}")

    # Create (or update) the pipeline definition in SageMaker.
    pipeline.upsert(role_arn=role)
    print("Pipeline upserted successfully")

    # Start pipeline execution
    execution = pipeline.start()
    print(f"Pipeline execution started with ARN: {execution.arn}")

    return execution


if __name__ == "__main__":
    # Keep the handle in the notebook namespace for follow-up cells.
    execution = main()

INFO:sagemaker.image_uris:Defaulting to only available Python version: py310


Pipeline created with name: DogCatClassification-2024-11-03-12-20-58


INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Pipeline upserted successfully
Pipeline execution started with ARN: arn:aws:sagemaker:eu-central-1:631045770794:pipeline/DogCatClassification-2024-11-03-12-20-58/execution/lt2g9gpkcspo
