In [1]:
import argparse
import boto3
import json
import os
import urllib.request
import zipfile

import sagemaker
import sagemaker.image_uris
import sagemaker.s3
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput, TransformInput
from sagemaker.model import Model
from sagemaker.processing import ProcessingOutput, ProcessingInput
from sagemaker.sklearn.processing import SKLearnProcessor, ScriptProcessor
from sagemaker.transformer import Transformer
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, TransformStep

# Model package group that register_model_step() registers the trained model under.
model_package_group_name = "AbaloneModelPackageGroupName"

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


In [2]:
def prepare_data(bucket_name: str):
    """Stage the Abalone training and batch-transform datasets in S3.

    The UCI Abalone archive is downloaded and extracted into the working
    directory, ``abalone.data`` is renamed to ``abalone-dataset.csv`` and the
    CSV is uploaded under ``s3://<bucket_name>/sagemaker-abalone``. The batch
    file ``dataset/abalone-dataset-batch`` is then copied from the
    ``sagemaker-servicecatalog-seedcode-us-east-1`` bucket to the working
    directory and uploaded under ``s3://<bucket_name>/abalone-batch``.

    Args:
        bucket_name: Name of the destination S3 bucket (without ``s3://``).

    Returns:
        A ``(upload_s3_uri, batch_data_uri)`` tuple holding the values returned
        by ``S3Uploader.upload`` for the training CSV and the batch file.

    Raises:
        Whatever ``urllib``, ``zipfile``, ``boto3`` or the SageMaker uploader
        raise on network, archive or permission failures; nothing is caught.
    """
    s3_prefix = "sagemaker-abalone"
    s3_uri = f"s3://{bucket_name}/{s3_prefix}"

    input_data_dir = "./"
    # Create the directory itself: os.path.dirname() of a path without a
    # trailing separator is "" and os.makedirs("") raises.
    os.makedirs(input_data_dir, exist_ok=True)

    # Announce the work before starting it, not after it has finished.
    print("Downloading dataset and uploading to Amazon S3...")
    path_to_zip_file = os.path.join(input_data_dir, "abalone.zip")
    dataset_url = "https://archive.ics.uci.edu/static/public/1/abalone.zip"
    urllib.request.urlretrieve(dataset_url, path_to_zip_file)
    with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
        zip_ref.extractall(input_data_dir)

    # The preprocessing code uses a different file name (per the original
    # author's note). os.replace overwrites an existing target on every
    # platform, so re-running the cell does not fail on a leftover CSV.
    input_data_path = os.path.join(input_data_dir, "abalone-dataset.csv")
    os.replace(os.path.join(input_data_dir, "abalone.data"), input_data_path)
    upload_s3_uri = sagemaker.s3.S3Uploader.upload(input_data_path, s3_uri)
    print("---upload_s3_uri---", upload_s3_uri)

    # Batch-transform input.
    # NOTE(review): the seed-code bucket name embeds us-east-1 - confirm this
    # is intended when the pipeline is run in another region.
    local_path_batch = os.path.join(input_data_dir, "abalone-dataset-batch")
    s3 = boto3.resource("s3")
    s3.Bucket("sagemaker-servicecatalog-seedcode-us-east-1").download_file(
        "dataset/abalone-dataset-batch", local_path_batch
    )

    base_uri = f"s3://{bucket_name}/abalone-batch"
    batch_data_uri = sagemaker.s3.S3Uploader.upload(
        local_path=local_path_batch,
        desired_s3_uri=base_uri,
    )
    print("---upload_s3_uri Batch---", batch_data_uri)
    return upload_s3_uri, batch_data_uri

In [3]:
def create_preprocessing_step(pipeline_sess: PipelineSession, role_arn: str, input_data: str):
    """Build the ``AbalonePreprocess`` processing step.

    An ``SKLearnProcessor`` (framework ``0.23-1``, one ``ml.m5.xlarge``
    instance) runs ``preprocessing.py`` with ``input_data`` mounted at
    ``/opt/ml/processing/input`` and exposes three named outputs:
    ``train``, ``validation`` and ``test``.

    Args:
        pipeline_sess: Session handed to the processor.
        role_arn: IAM role the processor is created with.
        input_data: Source passed to the single ``ProcessingInput``.

    Returns:
        The configured ``ProcessingStep``.
    """
    processor = SKLearnProcessor(
        framework_version="0.23-1",
        instance_type="ml.m5.xlarge",
        instance_count=1,
        base_job_name="sklearn-abalone-preprocess",
        sagemaker_session=pipeline_sess,
        role=role_arn,
    )

    raw_input = ProcessingInput(source=input_data, destination="/opt/ml/processing/input")
    split_outputs = [
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ]

    run_args = processor.run(
        inputs=[raw_input],
        outputs=split_outputs,
        code="preprocessing.py",
    )
    return ProcessingStep(name="AbalonePreprocess", step_args=run_args)

In [4]:
def get_estimator(pipeline_sess: PipelineSession, region: str, role_arn: str, model_path: str):
    """Create the XGBoost ``Estimator`` used for training.

    Args:
        pipeline_sess: Session handed to the estimator.
        region: Region used to resolve the XGBoost ``1.0-1`` image URI.
        role_arn: IAM role the estimator is created with.
        model_path: Passed as the estimator's ``output_path``.

    Returns:
        The ``Estimator`` with its hyperparameters already set.
    """
    instance_type = "ml.m5.xlarge"

    # NOTE(review): newer XGBoost releases name this objective
    # "reg:squarederror" - confirm before bumping the container version.
    hyperparameters = dict(
        objective="reg:linear",
        num_round=50,
        max_depth=5,
        eta=0.2,
        gamma=4,
        min_child_weight=6,
        subsample=0.7,
        silent=0,
    )

    container_uri = sagemaker.image_uris.retrieve(
        framework="xgboost",
        region=region,
        version="1.0-1",
        py_version="py3",
        instance_type=instance_type,
    )
    estimator = Estimator(
        image_uri=container_uri,
        instance_type=instance_type,
        instance_count=1,
        output_path=model_path,
        sagemaker_session=pipeline_sess,
        role=role_arn,
    )
    estimator.set_hyperparameters(**hyperparameters)
    return estimator

In [5]:
def create_training_step(estimator: Estimator, train_data_uri: str, valid_data_uri: str):
    """Build the ``AbaloneTrain`` training step.

    Args:
        estimator: Estimator whose ``fit`` call supplies the step arguments.
        train_data_uri: Data location for the ``train`` channel.
        valid_data_uri: Data location for the ``validation`` channel.

    Returns:
        The configured ``TrainingStep``.
    """
    channel_sources = (
        ("train", train_data_uri),
        ("validation", valid_data_uri),
    )
    # Both channels are declared as CSV.
    channels = {
        channel: TrainingInput(s3_data=uri, content_type="text/csv")
        for channel, uri in channel_sources
    }
    fit_args = estimator.fit(inputs=channels)
    return TrainingStep(name="AbaloneTrain", step_args=fit_args)

In [6]:
def create_evaluation_step(pipeline_sess: PipelineSession, region: str, role_arn: str, model_uri: str,
                           test_data_uri: str):
    """Build the ``AbaloneEval`` processing step.

    A ``ScriptProcessor`` on the XGBoost ``1.0-1`` image runs
    ``evaluation.py`` with the model mounted at ``/opt/ml/processing/model``
    and the test data at ``/opt/ml/processing/test``. The step declares one
    output named ``evaluation`` and a property file ``EvaluationReport``
    pointing at ``evaluation.json`` in that output.

    Args:
        pipeline_sess: Session handed to the processor.
        region: Region used to resolve the image URI.
        role_arn: IAM role the processor is created with.
        model_uri: Source of the model input.
        test_data_uri: Source of the test-data input.

    Returns:
        The configured ``ProcessingStep``.
    """
    instance_type = "ml.m5.xlarge"
    container_uri = sagemaker.image_uris.retrieve(
        framework="xgboost",
        region=region,
        version="1.0-1",
        py_version="py3",
        instance_type=instance_type,
    )
    evaluator = ScriptProcessor(
        image_uri=container_uri,
        command=["python3"],
        instance_type=instance_type,
        instance_count=1,
        base_job_name="script-abalone-evaluation",
        sagemaker_session=pipeline_sess,
        role=role_arn,
    )

    model_input = ProcessingInput(source=model_uri, destination="/opt/ml/processing/model")
    test_input = ProcessingInput(source=test_data_uri, destination="/opt/ml/processing/test")
    report_output = ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation")
    run_args = evaluator.run(
        inputs=[model_input, test_input],
        outputs=[report_output],
        code="evaluation.py",
    )

    report_file = PropertyFile(
        name="EvaluationReport", output_name="evaluation", path="evaluation.json"
    )
    return ProcessingStep(
        name="AbaloneEval",
        step_args=run_args,
        property_files=[report_file],
    )

In [7]:
def create_model_step(pipeline_sess: PipelineSession, region: str, role_arn: str, model_uri: str):
    """Build the ``AbaloneCreateModel`` step and the ``Model`` behind it.

    Args:
        pipeline_sess: Session handed to the model.
        region: Region used to resolve the XGBoost ``1.0-1`` image URI.
        role_arn: IAM role the model is created with.
        model_uri: Passed as the model's ``model_data``.

    Returns:
        A ``(step, model)`` tuple: the ``ModelStep`` and the ``Model`` object,
        so callers can reuse the model (e.g. to register it).
    """
    xgb_model = Model(
        image_uri=sagemaker.image_uris.retrieve(
            framework="xgboost",
            region=region,
            version="1.0-1",
            py_version="py3",
            instance_type="ml.m5.xlarge",
        ),
        model_data=model_uri,
        sagemaker_session=pipeline_sess,
        role=role_arn,
    )
    # NOTE(review): an Elastic Inference accelerator type is requested here -
    # confirm it is still wanted for this model.
    create_args = xgb_model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium")
    model_step = ModelStep(name="AbaloneCreateModel", step_args=create_args)
    return model_step, xgb_model

In [9]:
def create_transform_step(pipeline_sess: PipelineSession, model_name: str, output_path: str, batch_data_uri: str):
    """Build the ``AbaloneTransform`` batch-transform step.

    The transformer is bound to ``pipeline_sess`` and the step is built from
    ``transformer.transform(...)`` via ``step_args``, matching how the other
    steps in this notebook are constructed. Previously ``pipeline_sess`` was
    accepted but never used, so the transformer fell back to a default
    session.

    Args:
        pipeline_sess: Pipeline session the transformer runs under. It must be
            a ``PipelineSession`` so that ``transform`` only records the job
            arguments for the step instead of launching a job.
        model_name: Name of the model to run; ``execute`` passes the
            ``ModelName`` property of the create-model step.
        output_path: S3 location the transform output is written to.
        batch_data_uri: S3 location of the batch input data.

    Returns:
        The configured ``TransformStep``.
    """
    transformer = Transformer(
        model_name=model_name,
        instance_type="ml.m5.xlarge",
        instance_count=1,
        output_path=output_path,
        sagemaker_session=pipeline_sess,
    )
    transform_args = transformer.transform(data=batch_data_uri)
    return TransformStep(name="AbaloneTransform", step_args=transform_args)

In [10]:
def register_model_step(model: Model):
    """Build the ``AbaloneRegisterModel`` step for ``model``.

    The model is registered into the group named by the module-level
    ``model_package_group_name``, with CSV as both content and response type.

    Args:
        model: Model whose ``register`` call supplies the step arguments.

    Returns:
        The configured ``ModelStep``.
    """
    csv_only = ["text/csv"]
    registration_args = model.register(
        content_types=list(csv_only),
        response_types=list(csv_only),
        inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
        transform_instances=["ml.m5.xlarge"],
        model_package_group_name=model_package_group_name,
    )
    return ModelStep(name="AbaloneRegisterModel", step_args=registration_args)

In [11]:
def execute(
    region: str = "us-east-1",
    role: str = "arn:aws:iam::703051050276:role/service-role/AmazonSageMaker-ExecutionRole-20231129T151175",
    pipeline_name: str = "AbalonePipeline",
):
    """Assemble, upsert and run the Abalone SageMaker pipeline.

    Stages the datasets in the session's default bucket, wires up the
    preprocess -> train -> evaluate / create-model -> register / transform
    steps, prints the pipeline definition, upserts the pipeline, starts an
    execution and waits for it (polling every 30 s, at most 120 attempts).

    Args:
        region: AWS region to run in. It is also written to the
            ``AWS_DEFAULT_REGION`` environment variable, as before, so that
            clients created without an explicit session pick it up.
        role: IAM role ARN used by every step and by the pipeline itself.
            NOTE(review): the default is account-specific - pass your own
            role when running in another account.
        pipeline_name: Name the pipeline is created/updated under.

    Calling ``execute()`` with no arguments uses the values that were
    previously hard-coded in the body.
    """
    os.environ["AWS_DEFAULT_REGION"] = region

    boto_session = boto3.session.Session(region_name=region)
    sagemaker_session = sagemaker.session.Session(boto_session=boto_session)
    default_bucket = sagemaker_session.default_bucket()
    print("---default_bucket---", default_bucket)

    # Reuse the bucket name resolved above instead of looking it up again.
    pipeline_session = PipelineSession(boto_session=boto_session,
                                       sagemaker_client=boto_session.client("sagemaker"),
                                       default_bucket=default_bucket)
    print("---pipeline_session values---", pipeline_session.sagemaker_config.values())

    s3_data_loc, batch_data_uri = prepare_data(default_bucket)
    print("---s3_data_loc---", s3_data_loc, "\n---s3_data_loc Batch---", batch_data_uri)

    step_preprocess = create_preprocessing_step(
        pipeline_sess=pipeline_session, role_arn=role, input_data=s3_data_loc)

    # Output locations of the preprocessing step, referenced as step properties.
    preprocess_outputs = step_preprocess.properties.ProcessingOutputConfig.Outputs
    train_data_loc = preprocess_outputs["train"].S3Output.S3Uri
    valid_data_loc = preprocess_outputs["validation"].S3Output.S3Uri
    test_data_loc = preprocess_outputs["test"].S3Output.S3Uri

    xgb_estimator: Estimator = get_estimator(pipeline_sess=pipeline_session, region=region,
                                             role_arn=role, model_path=f"s3://{default_bucket}/AbaloneTrain")

    step_training = create_training_step(estimator=xgb_estimator, train_data_uri=train_data_loc,
                                         valid_data_uri=valid_data_loc)
    step_training.add_depends_on([step_preprocess])

    model_loc = step_training.properties.ModelArtifacts.S3ModelArtifacts
    step_evaluation = create_evaluation_step(pipeline_sess=pipeline_session, region=region,
                                             role_arn=role, model_uri=model_loc, test_data_uri=test_data_loc)
    step_evaluation.add_depends_on([step_training])

    step_model, model = create_model_step(pipeline_sess=pipeline_session, region=region,
                                          role_arn=role, model_uri=model_loc)

    step_register_model = register_model_step(model)

    step_transform = create_transform_step(pipeline_sess=pipeline_session, model_name=step_model.properties.ModelName,
                                           output_path=f"s3://{default_bucket}/AbaloneTransform",
                                           batch_data_uri=batch_data_uri)

    # Bind the pipeline to the same session its steps were built with rather
    # than letting it create a default one.
    pipeline = Pipeline(
        name=pipeline_name,
        steps=[step_preprocess, step_training, step_evaluation,
               step_register_model, step_model, step_transform],
        sagemaker_session=pipeline_session,
    )

    definition = json.loads(pipeline.definition())
    print(definition)
    pipeline.upsert(role_arn=role)
    execution = pipeline.start()
    execution.wait(delay=30, max_attempts=120)
    print("execution completed. steps:\n", execution.list_steps())

In [12]:
# Build, upsert and start the Abalone pipeline, then wait for the execution
# (execute() polls every 30 s for at most 120 attempts) and print its steps.
execute()

---default_bucket--- sagemaker-us-east-1-703051050276
---pipeline_session values--- dict_values([])

Downloading dataset and uploading to Amazon S3...
---upload_s3_uri--- s3://sagemaker-us-east-1-703051050276/sagemaker-abalone/abalone-dataset.csv


INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


---upload_s3_uri Batch--- s3://sagemaker-us-east-1-703051050276/abalone-batch/abalone-dataset-batch
---s3_data_loc--- s3://sagemaker-us-east-1-703051050276/sagemaker-abalone/abalone-dataset.csv 
---s3_data_loc Batch--- s3://sagemaker-us-east-1-703051050276/abalone-batch/abalone-dataset-batch




{'Version': '2020-12-01', 'Metadata': {}, 'Parameters': [], 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'}, 'TrialName': {'Get': 'Execution.PipelineExecutionId'}}, 'Steps': [{'Name': 'AbalonePreprocess', 'Type': 'Processing', 'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge', 'InstanceCount': 1, 'VolumeSizeInGB': 30}}, 'AppSpecification': {'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3', 'ContainerEntrypoint': ['python3', '/opt/ml/processing/input/code/preprocessing.py']}, 'RoleArn': 'arn:aws:iam::703051050276:role/service-role/AmazonSageMaker-ExecutionRole-20231129T151175', 'ProcessingInputs': [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-703051050276/sagemaker-abalone/abalone-dataset.csv', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicate



execution completed. steps:
 [{'StepName': 'AbaloneTransform', 'StartTime': datetime.datetime(2024, 5, 25, 15, 6, 56, 355000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2024, 5, 25, 15, 12, 8, 844000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:us-east-1:703051050276:transform-job/pipelines-695w0lkcqvr6-AbaloneTransform-x72cCkLTjR'}}, 'AttemptCount': 1}, {'StepName': 'AbaloneEval', 'StartTime': datetime.datetime(2024, 5, 25, 15, 6, 54, 664000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2024, 5, 25, 15, 11, 38, 474000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:703051050276:processing-job/pipelines-695w0lkcqvr6-AbaloneEval-wwITFXvbu4'}}, 'AttemptCount': 1}, {'StepName': 'AbaloneRegisterModel-RegisterModel', 'StartTime': datetime.datetime(2024, 5, 25, 15, 6, 54, 664000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2024, 5, 25, 15, 6, 55, 650000, tzi