In [2]:
# Make sure to use `sagemaker==2.93.0`
! pip install --quiet sagemaker==2.93.0

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m22.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [3]:
import boto3
import sagemaker
import sagemaker.session
from sagemaker.workflow.pipeline_context import PipelineSession

In [4]:
# Display the imported SDK version so it can be compared with the pinned install.
sagemaker.__version__

'2.93.0'

In [5]:
# Bootstrap from a single boto3 session so the region, the SageMaker session,
# the execution role and the default bucket all come from the same credentials
# and configuration (no throwaway sessions are created just to read one value).
boto_session = boto3.Session()
region = boto_session.region_name
sagemaker_session = sagemaker.Session(boto_session=boto_session)
role = sagemaker.get_execution_role(sagemaker_session=sagemaker_session)
default_bucket = sagemaker_session.default_bucket()

In [6]:
def get_sagemaker_client(region):
    """Create a boto3 client for the SageMaker service.

    Args:
        region: the AWS region the underlying boto3 session is bound to

    Returns:
        The object returned by ``boto3.Session(...).client("sagemaker")``
    """
    return boto3.Session(region_name=region).client("sagemaker")


def get_session(region, default_bucket):
    """Build a SageMaker session bound to the given region.

    Args:
        region: the AWS region used for the boto3 session and its clients
        default_bucket: the bucket handed to the session as its default bucket

    Returns:
        A `sagemaker.session.Session` instance
    """
    regional_session = boto3.Session(region_name=region)
    # Both service clients are created from the same regional boto3 session.
    session_kwargs = {
        "boto_session": regional_session,
        "sagemaker_client": regional_session.client("sagemaker"),
        "sagemaker_runtime_client": regional_session.client("sagemaker-runtime"),
        "default_bucket": default_bucket,
    }
    return sagemaker.session.Session(**session_kwargs)


def get_pipeline_session(region, default_bucket):
    """Build a PipelineSession bound to the given region.

    Args:
        region: the AWS region used for the boto3 session and its client
        default_bucket: the bucket handed to the session as its default bucket

    Returns:
        A `PipelineSession` instance
    """
    regional_session = boto3.Session(region_name=region)
    return PipelineSession(
        boto_session=regional_session,
        sagemaker_client=regional_session.client("sagemaker"),
        default_bucket=default_bucket,
    )

In [7]:
# Rebuild the SageMaker session and execution role through the region-pinned
# helpers, and create the PipelineSession that the pipeline steps are built on.
sagemaker_session = get_session(region=region, default_bucket=default_bucket)
role = sagemaker.session.get_execution_role(sagemaker_session=sagemaker_session)
pipeline_session = get_pipeline_session(region=region, default_bucket=default_bucket)

In [8]:
# Shared prefix used to build the `base_job_name` of each processor/estimator below.
base_job_name = "intel-image-classification-pipeline"

In [9]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# Data Processing Step

In [10]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.sklearn import SKLearn, SKLearnProcessor
from sagemaker.processing import FrameworkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

In [11]:
# Pipeline parameters for the DVC repository URL and branch; both are handed to
# the preprocessing container through its environment variables.
dvc_repo_url = ParameterString(
    name="DVCRepoURL",
    default_value="codecommit::ap-south-1://sagemaker-intel-image-classification",
)
dvc_branch = ParameterString(name="DVCBranch", default_value="pipeline-processed-dataset")

In [12]:
# Processor for the preprocessing step, built on the pipeline session. The two
# DVC pipeline parameters and the git identity reach the container via `env`.
sklearn_processor = FrameworkProcessor(
    estimator_cls=SKLearn,
    framework_version="0.23-1",
    image_uri="294495367161.dkr.ecr.ap-south-1.amazonaws.com/sagemaker:latest",
    role=role,
    sagemaker_session=pipeline_session,
    instance_type="ml.t3.large",
    instance_count=1,
    base_job_name=f"{base_job_name}/preprocess-dataset",
    env={
        "DVC_REPO_URL": dvc_repo_url,
        "DVC_BRANCH": dvc_branch,
        "GIT_USER": "Vivek Chaudhary",
        "GIT_EMAIL": "vivek.experiotech@gmail.com",
    },
)

In [13]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

In [14]:
# S3 location of the labelled dataset consumed by the preprocessing step. The
# default is derived from the session's default bucket (as the TensorBoard
# output path is) rather than a hard-coded, account-specific bucket name.
# NOTE(review): the parameter name says "Zip" but the default is an S3 prefix —
# confirm which one the preprocessing script expects.
input_dataset = ParameterString(
    name="InputDatasetZip",
    default_value=f"s3://{default_bucket}/labelled_dataset/",
)

In [15]:
# Arguments for the preprocessing step: the dataset parameter is the single
# input, and one output is declared per dataset split.
processing_step_args = sklearn_processor.run(
    code="preprocess.py",
    source_dir="/root/intel_image_classification_sagemaker/",
    inputs=[
        ProcessingInput(
            input_name="data",
            source=input_dataset,
            destination="/opt/ml/processing/input",
        )
    ],
    outputs=[
        ProcessingOutput(output_name=split, source=f"/opt/ml/processing/dataset/{split}")
        for split in ("train", "val", "test")
    ],
)


Job Name:  intel-image-classification-pipeline/pre-2023-01-26-12-27-32-265
Inputs:  [{'InputName': 'data', 'AppManaged': False, 'S3Input': {'S3Uri': ParameterString(name='InputDatasetZip', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://sagemaker-ap-south-1-294495367161/labelled_dataset/'), 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-ap-south-1-294495367161/intel-image-classification-pipeline/pre-2023-01-26-12-27-32-265/source/sourcedir.tar.gz', 'LocalPath': '/opt/ml/processing/input/code/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'entrypoint', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-ap-south-1-294495367161/intel-image-classification-pipeline/pre-



In [16]:
# Wrap the preprocessing arguments into a named pipeline step.
step_process = ProcessingStep(name="PreprocessDataset", step_args=processing_step_args)

In [17]:
# Display the step's repr as a quick check of how it was constructed.
step_process

ProcessingStep(name='PreprocessDataset', display_name=None, description=None, step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)

# Training Step 

In [18]:
from sagemaker.pytorch import PyTorch
from sagemaker.debugger import TensorBoardOutputConfig
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)

In [19]:
# TensorBoard configuration: a local path inside the container and an S3
# output path under the default bucket.
tensorboard_output_config = TensorBoardOutputConfig(
    container_local_output_path="/opt/ml/output/tensorboard",
    s3_output_path=f"s3://{default_bucket}/logs/sagemaker_intel_image_classification",
)

In [20]:
# PyTorch estimator for the training step. It runs `train.py` from the project
# source directory on spot instances; `max_wait` is set above `max_run`.
pt_estimator = PyTorch(
    entry_point="train.py",
    source_dir="/root/intel_image_classification_sagemaker/",
    image_uri="294495367161.dkr.ecr.ap-south-1.amazonaws.com/sagemaker:latest",
    framework_version="1.11.0",
    py_version="py38",
    role=role,
    sagemaker_session=pipeline_session,
    base_job_name=f"{base_job_name}/training_intel_image",
    instance_type="ml.c5.4xlarge",
    instance_count=1,
    use_spot_instances=True,
    max_run=5200,
    max_wait=5500,
    tensorboard_output_config=tensorboard_output_config,
    environment={"GIT_USER": "Vivek Chaudhary", "GIT_EMAIL": "vivek.experiotech@gmail.com"},
)

In [21]:
from sagemaker.inputs import TrainingInput

In [22]:
# Each training channel reads the S3 URI of the same-named output of the
# preprocessing step, which makes training depend on that step's outputs.
estimator_step_args = pt_estimator.fit(
    {
        channel: TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                channel
            ].S3Output.S3Uri,
        )
        for channel in ("train", "test", "val")
    }
)

In [23]:
# Wrap the training arguments into a named pipeline step.
step_train = TrainingStep(name="TrainClassifier", step_args=estimator_step_args)

In [24]:
# Display the step's repr as a quick check of how it was constructed.
step_train

TrainingStep(name='TrainClassifier', display_name=None, description=None, step_type=<StepTypeEnum.TRAINING: 'Training'>, depends_on=None)

# Eval Step

In [25]:
from sagemaker.pytorch.processing import PyTorchProcessor

In [26]:
# Processor that runs the evaluation script, built on the pipeline session and
# the same container image as the other steps.
pytorch_processor = PyTorchProcessor(
    image_uri="294495367161.dkr.ecr.ap-south-1.amazonaws.com/sagemaker:latest",
    framework_version="1.11.0",
    py_version="py38",
    role=role,
    sagemaker_session=pipeline_session,
    instance_type="ml.t3.xlarge",
    instance_count=1,
    base_job_name=f"{base_job_name}/eval-intel_image-classifier-model",
)

In [27]:
# Evaluation inputs: the model artifacts produced by the training step and the
# "test" output of the preprocessing step. The single output is named
# "evaluation".
eval_step_args = pytorch_processor.run(
    code="eval.py",
    source_dir="/root/intel_image_classification_sagemaker/",
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
)


Job Name:  intel-image-classification-pipeline/eva-2023-01-26-12-27-41-430
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7f360c8ddfd0>, 'LocalPath': '/opt/ml/processing/model', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'input-2', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7f360c854790>, 'LocalPath': '/opt/ml/processing/test', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-ap-south-1-294495367161/intel-image-classification-pipeline/eva-2023-01-26-12-27-41-430/source/sourcedir.tar.gz', 'LocalPath': '/opt/ml/processing/input/code/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionTy

In [28]:
from sagemaker.workflow.properties import PropertyFile

In [29]:
# Property file pointing at `evaluation.json` inside the "evaluation" output;
# it is attached to the evaluation step so a condition can read values from it.
evaluation_report = PropertyFile(
    name="IntelImageClassifierEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

step_eval = ProcessingStep(
    name="ModelEvaluation",
    step_args=eval_step_args,
    property_files=[evaluation_report],
)

In [30]:
# Display the step's repr as a quick check of how it was constructed.
step_eval

ProcessingStep(name='ModelEvaluation', display_name=None, description=None, step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)

# Model Metrics

In [31]:
from sagemaker.workflow.properties import PropertyFile
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)

In [32]:
# Model statistics point at `evaluation.json` under the S3 URI of the first
# output declared by the evaluation step.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=(
            f"{step_eval.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']}"
            "/evaluation.json"
        ),
        content_type="application/json",
    )
)

# Register Model Step (Conditional)

In [33]:
from sagemaker.pytorch import PyTorchModel
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer
from sagemaker.workflow.model_step import ModelStep

In [34]:
# Pipeline parameter for the approval status of the registered model package.
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval",
)

In [35]:
# Name of the model package group passed to `model.register(...)`.
model_package_group_name = "IntelImageClassifierModelGroup"

In [36]:
# Model definition: `infer.py` from the project source directory, with the
# artifacts produced by the training step as model data.
model = PyTorchModel(
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    entry_point="infer.py",
    source_dir="/root/intel_image_classification_sagemaker/",
    framework_version="1.11.0",
    py_version="py38",
    role=role,
    sagemaker_session=pipeline_session,
)

In [37]:
# Registration arguments for the model package. The approval status comes from
# the `ModelApprovalStatus` pipeline parameter, so it can be overridden per
# execution; its default ("PendingManualApproval") matches the previously
# hard-coded value, so default executions are unchanged.
model_step_args = model.register(
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.t2.medium"],
    transform_instances=["ml.m4.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

In [38]:
# Wrap the registration arguments into a named model step.
step_register = ModelStep(name="RegisterModel", step_args=model_step_args)

In [39]:
# Display the step's repr; the recorded output lists the sub-steps it contains.
step_register

ModelStep(name='RegisterModel', steps=[_RepackModelStep(name='RegisterModel-RepackModel-0', display_name=None, description='Used to repack a model with customer scripts for a register/create model step', step_type=<StepTypeEnum.TRAINING: 'Training'>, depends_on=None), _RegisterModelStep(name='RegisterModel-RegisterModel', display_name=None, description=None, step_type=<StepTypeEnum.REGISTER_MODEL: 'RegisterModel'>, depends_on=None)])

In [40]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import (
    JsonGet,
)
from sagemaker.workflow.condition_step import (
    ConditionStep,
)

In [41]:
# Condition: the accuracy value read from the evaluation report must be >= 0.6.
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="multiclass_classification_metrics.accuracy.value",
    ),
    right=0.6,
)

# The registration step runs only when the condition holds; the else branch
# has no steps.
step_cond = ConditionStep(
    name="CheckEvaluationAccuracy",
    conditions=[cond_gte],
    if_steps=[step_register],
    else_steps=[],
)

# Pipeline

In [42]:
from sagemaker.workflow.pipeline import Pipeline

In [43]:
# Name under which the pipeline is created/updated.
pipeline_name = "PyTorchLightningKaggleIntelImageClassifier"

In [44]:
# Assemble the pipeline from its parameters and steps. The registration step
# is not listed directly: it is reached through the condition step.
pipeline = Pipeline(
    name=pipeline_name,
    sagemaker_session=pipeline_session,
    parameters=[dvc_repo_url, dvc_branch, input_dataset, model_approval_status],
    steps=[step_process, step_train, step_eval, step_cond],
)

In [45]:
# Create or update the pipeline definition using the execution role.
upsert_response = pipeline.upsert(
    role_arn=role,
    description="testing pytorch intel image pipeline",
)

In [46]:
# Start an execution of the pipeline with its default parameter values.
execution = pipeline.start()

In [47]:
# List the execution's steps. The recorded output is empty, presumably because
# the cell ran immediately after `start()` — re-run later to see the steps.
execution.list_steps()

[]

In [48]:
# Show the execution's details; the recorded output includes its status and ARNs.
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:ap-south-1:294495367161:pipeline/pytorchlightningkaggleintelimageclassifier',
 'PipelineExecutionArn': 'arn:aws:sagemaker:ap-south-1:294495367161:pipeline/pytorchlightningkaggleintelimageclassifier/execution/gm2rftpgwyaj',
 'PipelineExecutionDisplayName': 'execution-1674736066475',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'pytorchlightningkaggleintelimageclassifier',
  'TrialName': 'gm2rftpgwyaj'},
 'CreationTime': datetime.datetime(2023, 1, 26, 12, 27, 46, 417000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 1, 26, 12, 27, 46, 417000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:ap-south-1:294495367161:user-profile/d-e3gzf0a5ytck/default-1673687160619',
  'UserProfileName': 'default-1673687160619',
  'DomainId': 'd-e3gzf0a5ytck'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:ap-south-1:294495367161:user-profile/d-e3gzf0a5ytck/default-1673687160619