#### Prerequisites 

In [2]:
%%capture

!pip install sagemaker==2.121.2
!pip install boto3==1.26.27

### Imports

In [51]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat
from sagemaker.dataset_definition.inputs import AthenaDatasetDefinition, DatasetDefinition
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.steps import ProcessingStep, TransformStep
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker import MetricsSource, ModelMetrics, ModelPackage
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.functions import Join, JsonGet
from sagemaker.workflow.automl_step import AutoMLStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.functions import Join
from sagemaker.transformer import Transformer
from sagemaker.pipeline import PipelineModel
from sagemaker.processing import Processor
from sagemaker import AutoML, AutoMLInput
from sagemaker.model import Model

import sagemaker
import logging
import boto3
import json
import time
import yaml

In [54]:
 with open('config.yml', 'r') as file_:
    config = yaml.safe_load(file_)

In [58]:
# Show the Data Wrangler processing-step settings read from config.yml.
config['data_wrangler_processing_step']

{'instance_type': 'ml.m5.xlarge', 'instance_count': 2}

In [4]:
# Send SageMaker SDK log records (DEBUG and above) to the notebook output.
logger = logging.getLogger('sagemaker')
logger.setLevel(logging.DEBUG)
# Attach the stream handler only once: re-running this cell would otherwise stack
# another handler each time and print every log line repeatedly.
if not any(type(handler) is logging.StreamHandler for handler in logger.handlers):
    logger.addHandler(logging.StreamHandler())

In [5]:
# Log the installed library versions so they can be compared with the pinned ones.
logger.info(f'Using SageMaker: {sagemaker.__version__}')
logger.info(f'Using Boto3: {boto3.__version__}')

Using SageMaker: 2.121.2
Using Boto3: 1.26.27


### Essentials 

In [6]:
# Regular session: used below for the default-bucket lookup, the Data Wrangler Processor and the Pipeline object.
sagemaker_session = sagemaker.Session()
# Pipeline session: passed to AutoML, the Model objects, the Transformer and the SKLearnProcessor.
pipeline_session = PipelineSession()
# S3 client used to upload the .flow file.
s3_client = boto3.client('s3')

In [7]:
# Execution role of this notebook; reused below wherever a role is required.
ROLE = sagemaker.get_execution_role()
BUCKET = sagemaker_session.default_bucket()  # Can also be a custom S3 bucket
# Key prefix under BUCKET for the inputs and outputs of this notebook.
PREFIX = '01-dw-datasets'
PROCESSING_INPUT_PATH = f's3://{BUCKET}/{PREFIX}'
PROCESSING_INPUT_NAME = 'loans.csv' # file name of the raw dataset under PROCESSING_INPUT_PATH
# Same location as PROCESSING_INPUT_PATH; the processing output adds CURRENT_TIMESTAMP below it.
PROCESSING_OUTPUT_PATH = f's3://{BUCKET}/{PREFIX}'
# NOTE(review): presumably the ID of the output node in loans.flow — confirm against the flow file.
NODE_ID = '47d300b9-fca2-4799-a944-efbbddd827de'
PROCESSING_OUTPUT_NAME = f'{NODE_ID}.default'
# UTC day-hour-minute-second stamp used in job, model and object names.
# It has no year or month component, so values repeat from month to month.
CURRENT_TIMESTAMP = time.strftime('%d-%H-%M-%S', time.gmtime())
CURRENT_TIMESTAMP

'23-06-20-54'

### Define Pipeline Parameters 

In [8]:
# Pipeline parameters and their default values.
instance_count = ParameterInteger(
    name='InstanceCount',
    default_value=1,
)
instance_type = ParameterString(
    name='InstanceType',
    default_value='ml.m5.xlarge',
)
# Approval status given to the model package when it is registered.
model_approval_status = ParameterString(
    name='ModelApprovalStatus',
    default_value='Approved',
)
model_package_group_name = ParameterString(
    name='ModelPackageName',
    default_value='LowCodePipelineModels',
)
# Right-hand side of the >= comparison against the weighted F1 in the condition step.
model_registration_metric_threshold = ParameterFloat(
    name='ModelRegistrationMetricThreshold',
    default_value=0.0,
)

### 1. Create Data Wrangler Processing Step

#### Processing Input and Output

In [9]:
# Raw dataset, placed in the container under /opt/ml/processing/<file name>.
processing_input = ProcessingInput(
    source=f'{PROCESSING_INPUT_PATH}/{PROCESSING_INPUT_NAME}',
    destination=f'/opt/ml/processing/{PROCESSING_INPUT_NAME}',
    input_name=PROCESSING_INPUT_NAME,
    s3_data_type='S3Prefix',
    s3_input_mode='File',
    s3_data_distribution_type='FullyReplicated',
)
# Dataset inputs passed to the Data Wrangler step together with the flow file.
data_sources = [processing_input]

In [10]:
# Output of the Data Wrangler job, written under a timestamped prefix.
processing_job_output = ProcessingOutput(
    source='/opt/ml/processing/output',
    destination=f'{PROCESSING_OUTPUT_PATH}/{CURRENT_TIMESTAMP}',
    output_name=PROCESSING_OUTPUT_NAME,
    s3_upload_mode='EndOfJob',
)

#### Upload original data flow to S3

In [11]:
FLOW_FILE_NAME = 'loans.flow'
# Object key of the uploaded copy of the local flow file; computed once and
# reused for both the upload and the URI.
flow_s3_key = f'{PREFIX}/{CURRENT_TIMESTAMP}-{FLOW_FILE_NAME}'
s3_client.upload_file(FLOW_FILE_NAME, BUCKET, flow_s3_key)
FLOW_S3_URI = f's3://{BUCKET}/{flow_s3_key}'
FLOW_S3_URI

's3://sagemaker-us-east-1-119174016168/01-dw-datasets/23-06-20-54-loans.flow'

In [12]:
# The uploaded flow file, placed in the container under /opt/ml/processing/flow.
flow_input = ProcessingInput(
    source=FLOW_S3_URI,
    destination='/opt/ml/processing/flow',
    input_name='flow',
    s3_data_type='S3Prefix',
    s3_input_mode='File',
    s3_data_distribution_type='FullyReplicated',
)

#### Data Wrangler config parameters

In [13]:
# Passed to the Processor as base_job_name.
PROCESSING_JOB_NAME = f'Data-Wrangler-Processing-job-{CURRENT_TIMESTAMP}'
# NOTE(review): the registry account and region (us-east-1) are hard-coded in this image URI —
# adjust when running in another region.
DW_PROCESSING_CONTAINER_URI = '663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.31.0'
# Cluster sizing for the Data Wrangler job. INSTANCE_COUNT and INSTANCE_TYPE are reassigned in
# later sections, so re-running the Processor cell after those sections picks up the later values.
INSTANCE_COUNT = 2
INSTANCE_TYPE = 'ml.m5.4xlarge'
EBS_VOLUME_SIZE = 30  # in GB
# NOTE(review): not referenced by any other cell visible here — confirm whether it is still needed.
OUTPUT_CONTENT_TYPE = 'CSV'

In [14]:
# Serialised to JSON and passed to the container as --refit-trained-params.
refit_trained_params = {
    'refit': True,
    'output_flow': f'{CURRENT_TIMESTAMP}-refitted-{FLOW_FILE_NAME}',
}

#### Create a Processor

In [15]:
# Processor that runs the Data Wrangler container image on the configured cluster.
processor = Processor(
    role=ROLE,
    image_uri=DW_PROCESSING_CONTAINER_URI,
    base_job_name=PROCESSING_JOB_NAME,
    instance_type=INSTANCE_TYPE,
    instance_count=INSTANCE_COUNT,
    volume_size_in_gb=EBS_VOLUME_SIZE,
    sagemaker_session=sagemaker_session,
)

#### Create the Data Wrangler Processing Step

In [16]:
# Pipeline step that runs the flow: the flow file comes first, followed by the dataset inputs.
data_wrangler_step = ProcessingStep(
    name='DataWranglerProcessingStep',
    processor=processor,
    inputs=[flow_input, *data_sources],
    outputs=[processing_job_output],
    job_arguments=[f"--refit-trained-params '{json.dumps(refit_trained_params)}'"],
)

### 2. Create Autopilot Step

In [17]:
# Content type declared for the AutoML training input.
TRAINING_INPUT_CONTENT_TYPE = 'text/csv;header=present'
# Column the AutoML job is asked to predict.
TARGET_ATTRIBUTE_NAME = 'loan_status'

In [18]:
# AutoML job definition in ENSEMBLING mode, bound to the pipeline session.
auto_ml = AutoML(
    role=ROLE,
    target_attribute_name=TARGET_ATTRIBUTE_NAME,
    mode='ENSEMBLING',
    sagemaker_session=pipeline_session,
)

In [19]:
# Training data location: <step output S3 URI>/<processing job name>/<node id>/default.
# The output name "<node id>.default" becomes a path by turning the dot into a slash.
s3_input = Join(
    on='/',
    values=[
        data_wrangler_step.properties.ProcessingOutputConfig.Outputs[PROCESSING_OUTPUT_NAME].S3Output.S3Uri,
        data_wrangler_step.properties.ProcessingJobName,
        PROCESSING_OUTPUT_NAME.replace(".", "/"),
    ],
)
s3_input

Join(on='/', values=[<sagemaker.workflow.properties.Properties object at 0x7f0095830690>, <sagemaker.workflow.properties.Properties object at 0x7f00956d63d0>, '47d300b9-fca2-4799-a944-efbbddd827de/default'])

In [20]:
# Step arguments for the AutoML job, fed by the Data Wrangler step's output.
train_args = auto_ml.fit(
    inputs=AutoMLInput(
        inputs=s3_input,
        content_type=TRAINING_INPUT_CONTENT_TYPE,
        target_attribute_name=TARGET_ATTRIBUTE_NAME,
    )
)



In [21]:
# Wrap the AutoML step arguments in a pipeline step.
automl_step = AutoMLStep(name='AutoMLStep', step_args=train_args)

### 3. Inference Pipeline Model Creation Step

In [22]:
# Instance type passed to inference_pipeline_model.create below.
# This overwrites the value set earlier for the Data Wrangler job.
INSTANCE_TYPE = 'ml.m5.xlarge'

In [23]:
# Handle to the best candidate of the AutoML step.
best_automl_model = automl_step.get_best_auto_ml_model(
    ROLE,
    sagemaker_session=pipeline_session,
)

In [24]:
# Image, model artifact and environment of the best AutoML candidate, gathered in one mapping.
best_inference_container = dict(
    Image=best_automl_model.image_uri,
    ModelDataUrl=best_automl_model.model_data,
    Environment=best_automl_model.env,
)

In [25]:
# Data Wrangler inference-flow archive used as model data for the first container.
# NOTE(review): bucket name and object key are hard-coded here, unlike the other S3 paths,
# which are built from BUCKET — confirm this artifact exists in the target account.
inference_flow_uri = 's3://sagemaker-us-east-1-119174016168/data_wrangler_inference_flows/loans_2022-12-22-04-37-36.tar.gz'

In [26]:
# Image for the Data Wrangler inference container.
# NOTE(review): uses tag `1.x`, whereas the processing job uses 1.31.0 — confirm the difference is intended.
dw_container_uri = "663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.x"
# Image of the best AutoML candidate, used for the second container.
algo_container_uri = best_automl_model.image_uri

In [27]:
# Model artifact of the best AutoML candidate.
algo_model_uri = best_automl_model.model_data

In [28]:
# The following cells append their models to this list, in container order.
# Re-run this cell before re-running them, otherwise entries are appended twice.
pipeline_models = []
# Same column as TARGET_ATTRIBUTE_NAME; passed to the Data Wrangler container as INFERENCE_TARGET_COLUMN_NAME.
target_column_name = 'loan_status'
data_wrangler_model_name = f"DataWranglerInferencePipelineFlowModel-{CURRENT_TIMESTAMP}"

In [29]:
# First container: the Data Wrangler image with the inference flow as model data.
data_wrangler_model = Model(
    name=data_wrangler_model_name,
    image_uri=dw_container_uri,
    model_data=inference_flow_uri,
    env={"INFERENCE_TARGET_COLUMN_NAME": target_column_name},
    role=ROLE,
    sagemaker_session=pipeline_session,
)
pipeline_models.append(data_wrangler_model)

In [30]:
# Second container: the best AutoML candidate, with its own environment settings.
algo_model_name = f"DataWranglerInferencePipelineAlgoModel-{CURRENT_TIMESTAMP}"
algo_environment = best_inference_container["Environment"]

algo_model = Model(
    name=algo_model_name,
    image_uri=algo_container_uri,
    model_data=algo_model_uri,
    env=algo_environment,
    role=ROLE,
    sagemaker_session=pipeline_session,
)

pipeline_models.append(algo_model)

In [31]:
inference_pipeline_model_name = f"DataWranglerInferencePipelineModel-{CURRENT_TIMESTAMP}"

# Chain the Data Wrangler model and the AutoML model, in that order.
# The containers are listed explicitly instead of reading `pipeline_models`: that list is
# grown with `append` in earlier cells, so re-running one of them would add the same
# model twice and silently change the inference pipeline.
inference_pipeline_model = PipelineModel(
    models=[data_wrangler_model, algo_model],
    role=ROLE,
    name=inference_pipeline_model_name,
    sagemaker_session=pipeline_session,
)

In [32]:
# Step arguments for creating the two-container inference pipeline model.
step_args_create_model = inference_pipeline_model.create(instance_type=INSTANCE_TYPE)

In [33]:
# Pipeline step that creates the model; its ModelName property is used by the batch transformer.
step_create_model = ModelStep(name='InferencePipeline', step_args=step_args_create_model)
step_create_model

ModelStep(name='InferencePipeline', steps=[CreateModelStep(name='InferencePipeline-CreateModel', display_name=None, description=None, step_type=<StepTypeEnum.CREATE_MODEL: 'Model'>, depends_on=None)])

### 4. Batch Scoring Step

In [34]:
# Sizing for the batch transform job; overwrites the values assigned in earlier sections.
INSTANCE_TYPE = 'ml.m5.xlarge'
INSTANCE_COUNT = 2

In [35]:
# Input of the batch transform step.
HOLDOUT_S3_PATH = f's3://{BUCKET}/{PREFIX}/holdout.csv'
# Second input of the evaluation step, mounted next to the predictions.
TRUE_LABELS_S3_PATH = f's3://{BUCKET}/{PREFIX}/true_labels.csv'

In [36]:
# Batch transformer for the model created by the InferencePipeline step;
# results are written under <BUCKET>/<PREFIX>/transform.
transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type=INSTANCE_TYPE,
    instance_count=INSTANCE_COUNT,
    output_path=Join(on='/', values=['s3:/', BUCKET, PREFIX, 'transform']),
    sagemaker_session=pipeline_session,
)

In [37]:
# Score the holdout CSV with the inference pipeline model.
step_batch_transform = TransformStep(
    name='BatchTransformStep',
    step_args=transformer.transform(
        data=HOLDOUT_S3_PATH,
        content_type='text/csv',
    ),
)

### 5. Evaluation Step

In [38]:
# Property file for evaluation_metrics.json in the 'evaluation_metrics' output;
# the condition step reads the weighted F1 from it.
evaluation_report = PropertyFile(
    name='evaluation',
    output_name='evaluation_metrics',
    path='evaluation_metrics.json',
)

In [39]:
# Processor that runs evaluation.py.
# instance_count is the InstanceCount pipeline parameter, while instance_type is the literal
# default of the InstanceType parameter (not the parameter itself), so this processor is
# built with 'ml.m5.xlarge' as a fixed value.
sklearn_processor = SKLearnProcessor(role=ROLE, 
                                     framework_version='1.0-1', 
                                     instance_count=instance_count, 
                                     instance_type=instance_type.default_value, 
                                     sagemaker_session=pipeline_session)

Defaulting to only available Python version: py3


In [40]:
# Step arguments for the evaluation job: evaluation.py receives the batch predictions and
# the true labels as inputs and writes its report to /opt/ml/processing/evaluation.
step_args_sklearn_processor = sklearn_processor.run(
    code='evaluation.py',
    inputs=[
        ProcessingInput(
            source=step_batch_transform.properties.TransformOutput.S3OutputPath,
            destination='/opt/ml/processing/input/predictions',
        ),
        ProcessingInput(
            source=TRUE_LABELS_S3_PATH,
            destination='/opt/ml/processing/input/true_labels',
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name='evaluation_metrics',
            source="/opt/ml/processing/evaluation",
            destination=Join(on='/', values=['s3:/', BUCKET, PREFIX, 'evaluation']),
        ),
    ],
)

In [41]:
# Evaluation step; the property file is attached so later steps can read values from the report.
step_evaluation = ProcessingStep(
    name='ModelEvaluationStep',
    step_args=step_args_sklearn_processor,
    property_files=[evaluation_report],
)

### 6. Conditional Registration Step

In [42]:
# Attach the AutoML step's model-insights and explainability JSON reports to the model package.
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=automl_step.properties.BestCandidateProperties.ModelInsightsJsonReportPath,
        content_type='application/json',
    ),
    explainability=MetricsSource(
        s3_uri=automl_step.properties.BestCandidateProperties.ExplainabilityJsonReportPath,
        content_type='application/json',
    ),
)

In [43]:
# Step arguments for registering the inference pipeline model in the model package group.
step_args_register_model = inference_pipeline_model.register(
    content_types=['text/csv'],
    response_types=['text/csv'],
    inference_instances=[instance_type],
    transform_instances=[instance_type],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

In [44]:
# Registration step; it is only reached through the `if_steps` branch of the condition step.
step_register_model = ModelStep(name='MeetsThreshold', step_args=step_args_register_model)

In [45]:
# Register the model only when the weighted F1 in the evaluation report is >= the
# ModelRegistrationMetricThreshold parameter; otherwise no further step runs.
step_conditional_registration = ConditionStep(
    name='ConditionalStep',
    conditions=[
        ConditionGreaterThanOrEqualTo(
            left=JsonGet(
                step_name=step_evaluation.name,
                property_file=evaluation_report,
                json_path='classification_metrics.weighted_f1.value',
            ),
            right=model_registration_metric_threshold,
        ),
    ],
    if_steps=[step_register_model],
    else_steps=[],
)

### 7. Create SageMaker Pipeline

#### Create Pipeline

In [46]:
# Name under which the pipeline is created or updated.
# A plain string literal: the value has no placeholders, so no f-string is needed.
pipeline_name = 'LowCodePipeline'

In [47]:
# Steps in the order they were defined above.
pipeline_steps = [
    data_wrangler_step,
    automl_step,
    step_create_model,
    step_batch_transform,
    step_evaluation,
    step_conditional_registration,
]
# Assemble the pipeline from its parameters and steps.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        instance_type,
        instance_count,
        model_registration_metric_threshold,
        model_approval_status,
        model_package_group_name,
    ],
    steps=pipeline_steps,
    sagemaker_session=sagemaker_session,
)

#### Examine Pipeline Definition

In [48]:
# Parse the pipeline definition JSON into a dict and display it for inspection.
definition = json.loads(pipeline.definition())
definition

Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.



Job Name:  sagemaker-scikit-learn-2022-12-23-06-20-57-466
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7f009420f790>, 'LocalPath': '/opt/ml/processing/input/predictions', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'input-2', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-119174016168/01-dw-datasets/true_labels.csv', 'LocalPath': '/opt/ml/processing/input/true_labels', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-119174016168/LowCodePipeline/code/52e14e6bdb6691b4f3c5cdc0752c0e6a/evaluation.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'Fully

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'InstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'InstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'ModelRegistrationMetricThreshold',
   'Type': 'Float',
   'DefaultValue': 0.0},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'Approved'},
  {'Name': 'ModelPackageName',
   'Type': 'String',
   'DefaultValue': 'LowCodePipelineModels'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'DataWranglerProcessingStep',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.4xlarge',
      'InstanceCount': 2,
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.31.0',
     'ContainerArguments'

#### Start Pipeline Execution

In [49]:
# Create the pipeline, or update it if it already exists, using the notebook's execution role.
pipeline.upsert(role_arn=ROLE)


Job Name:  sagemaker-scikit-learn-2022-12-23-06-20-57-812
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': <sagemaker.workflow.properties.Properties object at 0x7f009420f790>, 'LocalPath': '/opt/ml/processing/input/predictions', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'input-2', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-119174016168/01-dw-datasets/true_labels.csv', 'LocalPath': '/opt/ml/processing/input/true_labels', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-119174016168/LowCodePipeline/code/52e14e6bdb6691b4f3c5cdc0752c0e6a/evaluation.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'Fully

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:119174016168:pipeline/lowcodepipeline',
 'ResponseMetadata': {'RequestId': '0933d480-cf3d-4fea-8035-4bfbe9412964',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '0933d480-cf3d-4fea-8035-4bfbe9412964',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '83',
   'date': 'Fri, 23 Dec 2022 06:20:57 GMT'},
  'RetryAttempts': 0}}

In [50]:
# Start an execution with the default parameter values; the returned execution object is displayed, not stored.
pipeline.start()

_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:119174016168:pipeline/lowcodepipeline/execution/kru6c2uon469', sagemaker_session=<sagemaker.session.Session object at 0x7f009837d1d0>)