# Data Wrangler Export to SageMaker Pipelines Notebook

You can use Amazon SageMaker Pipelines to create
end-to-end workflows that manage and deploy SageMaker jobs. Pipelines
come with SageMaker Python SDK integration, so you can build each step
of your workflow using a Python-based interface.

After your workflow is deployed, you can view the Directed Acyclic Graph
(DAG) for your pipeline and manage your executions using Amazon SageMaker Studio.

Use this notebook to create a SageMaker pipeline with a data preparation step
defined by your Data Wrangler flow.

In this notebook, you will do the following:
* Upload your Data Wrangler .flow file to S3 so that it can be used to define
a processing job step.
* Define a processing job step that runs your flow. This step is used to create a pipeline.
* Define a pipeline that includes the data preparation step defined by your
Data Wrangler flow. Optionally, you can add more steps to your pipeline.
* Execute the pipeline and monitor its status using SageMaker Pipeline APIs.

Installing dependencies...

In [1]:
# SageMaker Python SDK version 2.x is required
import sagemaker
import subprocess
import sys

original_version = sagemaker.__version__
if sagemaker.__version__ != "2.20.0":
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "sagemaker==2.20.0"]
    )
    import importlib
    importlib.reload(sagemaker)

In [2]:
import json
import os
import time
import uuid

import boto3
import sagemaker

## Parameters
The following parameters are used throughout this notebook.
You can optionally edit the cell below to configure them:
* `bucket` - The S3 bucket used to save the output returned
from the processing job and the flow file you exported from Data Wrangler.
* `prefix` - This is the prefix your .flow file is saved under in S3.
* `flow_id` and `flow_name` - used to name your flow file when it is saved
to S3.
* `instance_type` - The instance type used in your processing job.
* `output_content_type` - The format type used to save the output of the
processing job.
* `sagemaker_endpoint_url` - An endpoint URL used to communicate with SageMaker.

In [3]:
# The S3 bucket and location used to save processing job outputs and your .flow file.
# Specify a different bucket here if you wish.
sess = sagemaker.Session()
bucket = sess.default_bucket()
prefix = "data_wrangler_flows"
flow_id = f"{time.strftime('%d-%H-%M-%S', time.gmtime())}-{str(uuid.uuid4())[:8]}"
flow_name = f"flow-{flow_id}"
flow_uri = f"s3://{bucket}/{prefix}/{flow_name}.flow"

# Do not modify flow_file_name
flow_file_name = "retail_sales_preprocessing.flow"

iam_role = sagemaker.get_execution_role()

container_uri = "415577184552.dkr.ecr.us-east-2.amazonaws.com/sagemaker-data-wrangler-container:1.1.1"

# Processing Job Resources Configurations
# Data wrangler processing job only supports 1 instance.
instance_count = 1
instance_type = "ml.m5.4xlarge"

# Processing Job Path URI Information
output_prefix = f"export-{flow_name}/output"
output_path = f"s3://{bucket}/{output_prefix}"
output_name = "68ca69fe-f780-4e58-a8ed-2c813229be06.default"

processing_dir = "/opt/ml/processing"

# Modify the variable below to specify the content type to be used for writing each output
# Currently supported options are 'CSV' or 'PARQUET', and the default is 'CSV'
output_content_type = "CSV"

# URL to use for sagemaker client.
# If this is None, boto will automatically construct the appropriate URL to use
# when communicating with sagemaker.
sagemaker_endpoint_url = None

Upload the Data Wrangler .flow file to Amazon S3 so that it can be used as an input to the
processing job.

In [4]:
# Load .flow file
with open(flow_file_name) as f:
    flow = json.load(f)

# Upload to S3
s3_client = boto3.client("s3")
s3_client.upload_file(flow_file_name, bucket, f"{prefix}/{flow_name}.flow")

print(f"Data Wrangler Flow uploaded to {flow_uri}")

Data Wrangler Flow uploaded to s3://sagemaker-us-east-2-645431112437/data_wrangler_flows/flow-04-15-33-41-dae3c834.flow


## Create Boto3 Processing Job arguments

This notebook submits a processing job using boto, which will require an argument dictionary to
submit to the boto client. Below, utility methods are defined for creating processing job inputs
for the following sources: S3, Athena, and Redshift. Then, the argument dictionary is generated
using the parsed inputs and job configurations such as instance type.

In [5]:
def create_flow_notebook_processing_input(base_dir, flow_s3_uri):
    return {
        "InputName": "flow",
        "S3Input": {
            "LocalPath": f"{base_dir}/flow",
            "S3Uri": flow_s3_uri,
            "S3DataType": "S3Prefix",
            "S3InputMode": "File",
        },
    }

def create_s3_processing_input(base_dir, name, dataset_definition):
    return {
        "InputName": name,
        "S3Input": {
            "LocalPath": f"{base_dir}/{name}",
            "S3Uri": dataset_definition["s3ExecutionContext"]["s3Uri"],
            "S3DataType": "S3Prefix",
            "S3InputMode": "File",
        },
    }

def create_redshift_processing_input(base_dir, name, dataset_definition):
    return {
        "InputName": name,
        "DatasetDefinition": {
            "RedshiftDatasetDefinition": {
                "ClusterId": dataset_definition["clusterIdentifier"],
                "Database": dataset_definition["database"],
                "DbUser": dataset_definition["dbUser"],
                "QueryString": dataset_definition["queryString"],
                "ClusterRoleArn": dataset_definition["unloadIamRole"],
                "OutputS3Uri": f'{dataset_definition["s3OutputLocation"]}{name}/',
                "OutputFormat": dataset_definition["outputFormat"].upper(),
            },
            "LocalPath": f"{base_dir}/{name}",
        },
    }

def create_athena_processing_input(base_dir, name, dataset_definition):
    return {
        "InputName": name,
        "DatasetDefinition": {
            "AthenaDatasetDefinition": {
                "Catalog": dataset_definition["catalogName"],
                "Database": dataset_definition["databaseName"],
                "QueryString": dataset_definition["queryString"],
                "OutputS3Uri": f'{dataset_definition["s3OutputLocation"]}{name}/',
                "OutputFormat": dataset_definition["outputFormat"].upper(),
            },
            "LocalPath": f"{base_dir}/{name}",
        },
    }

def create_processing_inputs(processing_dir, flow, flow_uri):
    """Helper function for creating processing inputs
    :param processing_dir: base directory inside the processing container
    :param flow: loaded Data Wrangler flow (parsed .flow JSON)
    :param flow_uri: S3 URI of the Data Wrangler .flow file
    """
    processing_inputs = []
    flow_processing_input = create_flow_notebook_processing_input(processing_dir, flow_uri)
    processing_inputs.append(flow_processing_input)

    for node in flow["nodes"]:
        if "dataset_definition" in node["parameters"]:
            data_def = node["parameters"]["dataset_definition"]
            name = data_def["name"]
            source_type = data_def["datasetSourceType"]

            if source_type == "S3":
                s3_processing_input = create_s3_processing_input(
                    processing_dir, name, data_def)
                processing_inputs.append(s3_processing_input)
            elif source_type == "Athena":
                athena_processing_input = create_athena_processing_input(
                    processing_dir, name, data_def)
                processing_inputs.append(athena_processing_input)
            elif source_type == "Redshift":
                redshift_processing_input = create_redshift_processing_input(
                    processing_dir, name, data_def)
                processing_inputs.append(redshift_processing_input)
            else:
                raise ValueError(f"{source_type} is not supported for Data Wrangler Processing.")
    return processing_inputs

def create_container_arguments(output_name, output_content_type):
    output_config = {
        output_name: {
            "content_type": output_content_type
        }
    }
    return [f"--output-config '{json.dumps(output_config)}'"]

# Create Processing Job Arguments
processing_job_arguments = {
    "AppSpecification": {
        "ContainerArguments": create_container_arguments(output_name, output_content_type),
        "ImageUri": container_uri,
    },
    "ProcessingInputs": create_processing_inputs(processing_dir, flow, flow_uri),
    "ProcessingOutputConfig": {
        "Outputs": [
            {
                "OutputName": output_name,
                "S3Output": {
                    "S3Uri": output_path,
                    "LocalPath": os.path.join(processing_dir, "output"),
                    "S3UploadMode": "EndOfJob",
                }
            },
        ],
    },
    "ProcessingResources": {
        "ClusterConfig": {
            "InstanceCount": instance_count,
            "InstanceType": instance_type,
            "VolumeSizeInGB": 30,
        }
    },
    "RoleArn": iam_role,
    "StoppingCondition": {
        "MaxRuntimeInSeconds": 86400,
    },
}
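
To see what the container receives, the following sketch rebuilds the `--output-config` argument without calling AWS. It repeats `create_container_arguments` from the cell above so that it runs on its own, and it uses the output name from the parameters cell.

```python
import json


def create_container_arguments(output_name, output_content_type):
    # Same logic as the helper above, repeated so this sketch is self-contained.
    output_config = {output_name: {"content_type": output_content_type}}
    return [f"--output-config '{json.dumps(output_config)}'"]


container_args = create_container_arguments(
    "68ca69fe-f780-4e58-a8ed-2c813229be06.default", "CSV"
)
print(container_args[0])
```

The whole flag and its JSON payload travel as a single list element, with the JSON wrapped in single quotes.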

The following cell creates a processing step using your exported Data Wrangler flow.
This step will be used to create a SageMaker Pipeline.

In [11]:
from sagemaker.processing import Processor
from sagemaker.workflow.steps import ProcessingStep, Step, StepTypeEnum

processor = Processor(
    role=iam_role,
    image_uri=container_uri,
    instance_count=instance_count,
    instance_type=instance_type
)

class DataWranglerStep(ProcessingStep):

    def __init__(self, name, processor, step_args):
        # Let ProcessingStep register the name and processor; the prebuilt
        # boto-style arguments are stored separately and returned as-is below.
        super().__init__(name, processor)
        self.step_args = step_args

    @property
    def arguments(self):
        return self.step_args

wrangler_step = DataWranglerStep(
    name="DataWranglerProcessingStep",
    processor=processor,
    step_args=processing_job_arguments
)
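
The subclass above works by overriding the `arguments` property so that the step emits the prebuilt dictionary instead of arguments derived from the processor. The following is a minimal sketch of that pattern using plain Python. `BaseStep` and `PrebuiltArgumentsStep` are hypothetical stand-ins, not SageMaker SDK classes, and the role ARN is a placeholder.

```python
class BaseStep:
    """Hypothetical stand-in for a pipeline step that derives its own arguments."""

    def __init__(self, name):
        self.name = name

    @property
    def arguments(self):
        # A real step would build this from its processor configuration.
        return {"Derived": True}

    def to_request(self):
        # Serialization reads the property, so a subclass override takes effect here.
        return {"Name": self.name, "Type": "Processing", "Arguments": self.arguments}


class PrebuiltArgumentsStep(BaseStep):
    """Returns a caller-supplied argument dictionary unchanged."""

    def __init__(self, name, step_args):
        super().__init__(name)
        self.step_args = step_args

    @property
    def arguments(self):
        return self.step_args


step = PrebuiltArgumentsStep(
    "Example", {"RoleArn": "arn:aws:iam::123456789012:role/example"}
)
request = step.to_request()
print(request)
```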

## Workflow Creation
The following cell defines a new pipeline with the processing step.
Use this cell to add additional steps to the pipeline. To learn more about adding
steps to a pipeline, see
[Define a Pipeline](http://docs.aws.amazon.com/sagemaker/latest/dg/define-pipeline.html)
in the SageMaker documentation.


In [8]:
import time

from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import CreateModelStep, ProcessingStep, TrainingStep
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = f"datawrangler-pipeline-{int(time.time() * 10**7)}"
instance_type = ParameterString(name="InstanceType", default_value="ml.m5.4xlarge")
instance_count = ParameterInteger(name="InstanceCount", default_value=1)

boto_session = boto3.session.Session()
region = boto_session.region_name

sagemaker_client = boto_session.client("sagemaker")
runtime_client = boto_session.client("sagemaker-runtime")

sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_runtime_client=runtime_client,
)

In [9]:
split_data_script_uri = f's3://{bucket}/{prefix}/code/preprocessing.py'
s3_client.upload_file(Filename='preprocessing_3.py', Bucket=bucket, Key=f'{prefix}/code/preprocessing.py')

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="sklearn-retail-sales-process",
    role=iam_role
)

In [12]:
from sagemaker.workflow.properties import PropertyFile
state_report = PropertyFile(
    name="PipelineState",
    output_name="state",
    path="state.json"
)

split_data_step = ProcessingStep(
    name="TrainTestSplit",
    processor=sklearn_processor,
    outputs=[
        sagemaker.processing.ProcessingOutput(output_name="train_data", source="/opt/ml/processing/output/train"),
        sagemaker.processing.ProcessingOutput(output_name="test_data", source="/opt/ml/processing/output/test"),
        sagemaker.processing.ProcessingOutput(output_name="state", source="/opt/ml/processing/output/pipeline_state")
    ],
    job_arguments=[
        "--data-s3-uri", wrangler_step.properties.ProcessingOutputConfig.Outputs[output_name].S3Output.S3Uri
        ],
    code=split_data_script_uri,
    property_files=[state_report]
)
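
The step above passes the Data Wrangler output location to the script through `--data-s3-uri`. The script itself (`preprocessing_3.py`) is not shown in this notebook, so the following is only a sketch of how such a script might read that argument. The `parse_args` helper and the example URI are assumptions.

```python
import argparse


def parse_args(argv=None):
    # Hypothetical argument parser for a train/test split script.
    parser = argparse.ArgumentParser(description="Train/test split (sketch)")
    parser.add_argument(
        "--data-s3-uri",
        required=True,
        help="S3 URI of the Data Wrangler processing output",
    )
    return parser.parse_args(argv)


# argparse exposes --data-s3-uri as the attribute data_s3_uri.
args = parse_args(["--data-s3-uri", "s3://example-bucket/export-flow/output"])
print(args.data_s3_uri)
```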

In [13]:
from sagemaker.workflow.condition_step import JsonGet
n_features = JsonGet(
    step=split_data_step,
    property_file=state_report,
    json_path="n_features",
)
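
For `JsonGet` to resolve `n_features`, the processing script has to write a `state.json` file containing that key into the directory mapped to the `state` output. The following sketch shows one way a script might do this. The `write_pipeline_state` helper and the value `12` are illustrative assumptions, and a temporary directory stands in for `/opt/ml/processing/output/pipeline_state`.

```python
import json
import os
import tempfile


def write_pipeline_state(output_dir, n_features):
    """Write a small JSON report that a PropertyFile can point at."""
    os.makedirs(output_dir, exist_ok=True)
    state_path = os.path.join(output_dir, "state.json")
    with open(state_path, "w") as f:
        json.dump({"n_features": n_features}, f)
    return state_path


# Inside the processing container the directory would be
# /opt/ml/processing/output/pipeline_state; a temp dir is used here instead.
demo_dir = os.path.join(tempfile.mkdtemp(), "pipeline_state")
state_path = write_pipeline_state(demo_dir, n_features=12)

with open(state_path) as f:
    state = json.load(f)
print(state["n_features"])
```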

In [14]:
model_container = sagemaker.image_uris.retrieve("factorization-machines", region=boto_session.region_name)
train_output_prefix  = f's3://{bucket}/personalization/output'

fm = sagemaker.estimator.Estimator(
    model_container,
    iam_role, 
    instance_count=1, 
    instance_type='ml.c5.xlarge',
    output_path=train_output_prefix,
    sagemaker_session=sagemaker_session
)

fm.set_hyperparameters(
    feature_dim=n_features,
    predictor_type='regressor',
    mini_batch_size=1000,
    num_factors=64,
    epochs=20
)

In [15]:
train_step = TrainingStep(
    name="TrainingStep",
    estimator=fm,
    inputs={
        "train": sagemaker.inputs.TrainingInput(
            s3_data=split_data_step.properties.ProcessingOutputConfig.Outputs['train_data'].S3Output.S3Uri
        ),
        "test": sagemaker.inputs.TrainingInput(
            s3_data=split_data_step.properties.ProcessingOutputConfig.Outputs['test_data'].S3Output.S3Uri
        )
    }
)

In [None]:
model = sagemaker.model.Model(
    name='retail-personalization-factorization-machine',
    image_uri=train_step.properties.AlgorithmSpecification.TrainingImage,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=iam_role
)

inputs = sagemaker.inputs.CreateModelInput(
    instance_type="ml.m4.xlarge"
)

create_model_step = CreateModelStep(
    name="CreateModel",
    model=model,
    inputs=inputs
)

In [None]:
register_step = RegisterModel(
    name="RegisterModel",
    estimator=fm,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["application/x-recordio-protobuf", "application/json"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name='Retail-Recommendation',
)

In [None]:
s3_client.upload_file(Filename='deploy.py', Bucket=bucket, Key=f'{prefix}/code/deploy.py')
deploy_script_uri = f's3://{bucket}/{prefix}/code/deploy.py'

deployment_processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=iam_role,
    instance_type="ml.t3.medium",
    instance_count=1,
    base_job_name='personalization-deploy',
    sagemaker_session=sagemaker_session)

deploy_step = ProcessingStep(
    name='DeployModel',
    processor=deployment_processor,
    job_arguments=[
        "--model-name", create_model_step.properties.ModelName,
        "--region", region,
        "--endpoint-instance-type", "ml.m4.xlarge",
        "--endpoint-name", 'retail-recommendations-endpoint'],
    code=deploy_script_uri)

In [16]:
pipeline_name = f"datawrangler-pipeline-{int(time.time() * 10**7)}"
instance_type = ParameterString(name="InstanceType", default_value="ml.m5.4xlarge")
instance_count = ParameterInteger(name="InstanceCount", default_value=1)

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[instance_type, instance_count],
    steps=[
        wrangler_step,
        split_data_step,
        train_step,
#         create_model_step,
#         register_step,
#         deploy_step
        ],
    sagemaker_session=sagemaker_session
)

Run the following to validate the pipeline definition.

In [17]:
import json


definition = json.loads(pipeline.definition())
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'InstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.4xlarge'},
  {'Name': 'InstanceCount', 'Type': 'Integer', 'DefaultValue': 1}],
 'Steps': [{'Name': 'DataWranglerProcessingStep',
   'Type': 'Processing',
   'Arguments': {'AppSpecification': {'ContainerArguments': ['--output-config \'{"68ca69fe-f780-4e58-a8ed-2c813229be06.default": {"content_type": "CSV"}}\''],
     'ImageUri': '415577184552.dkr.ecr.us-east-2.amazonaws.com/sagemaker-data-wrangler-container:1.1.1'},
    'ProcessingInputs': [{'InputName': 'flow',
      'S3Input': {'LocalPath': '/opt/ml/processing/flow',
       'S3Uri': 's3://sagemaker-us-east-2-645431112437/data_wrangler_flows/flow-04-15-33-41-dae3c834.flow',
       'S3DataType': 'S3Prefix',
       'S3InputMode': 'File'}},
     {'InputName': 'Online_Retail.csv',
      'S3Input': {'LocalPath': '/opt/ml/processing/Online_Retail.csv',
       'S3Uri': 's3://sagemaker-personalization-demo/Online R
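
The printed definition is long, so a quick way to validate it is to list only the step names and types. The following sketch does that on a trimmed, hand-written stand-in for the dictionary above. `summarize_steps` is a hypothetical helper, and in the notebook you would pass `definition` instead of `example_definition`.

```python
def summarize_steps(definition):
    """Return (name, type) pairs for each step in a parsed pipeline definition."""
    return [(step["Name"], step["Type"]) for step in definition["Steps"]]


# Trimmed, hand-written stand-in for the parsed pipeline definition.
example_definition = {
    "Version": "2020-12-01",
    "Steps": [
        {"Name": "DataWranglerProcessingStep", "Type": "Processing", "Arguments": {}},
        {"Name": "TrainTestSplit", "Type": "Processing", "Arguments": {}},
        {"Name": "TrainingStep", "Type": "Training", "Arguments": {}},
    ],
}

step_summary = summarize_steps(example_definition)
for name, step_type in step_summary:
    print(f"{step_type:<12}{name}")
```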

## Run Pipeline
Use the following cell to submit a pipeline creation job.
You can check the progress of the pipeline with the pipeline Amazon Resource Name (ARN).


In [18]:
from botocore.exceptions import ClientError

try:
    response = pipeline.create(role_arn=iam_role)
except ClientError as e:
    error = e.response["Error"]
    if error["Code"] == "ValidationError" and "Pipeline names must be unique" in error["Message"]:
        print(error["Message"])
        response = pipeline.describe()
    else:
        raise

pipeline_arn = response["PipelineArn"]
print(pipeline_arn)

arn:aws:sagemaker:us-east-2:645431112437:pipeline/datawrangler-pipeline-16124529492267638


## Pipeline Operations: Examine and Wait for Pipeline Execution

The higher-level resources of the pipeline instance provide a way for the Data Scientist and
Machine Learning Engineer to define a workflow that can be executed by SageMaker.

To monitor operations of this execution, we use the lower-level, raw workflow boto3 client of the
pipeline to describe the pipeline execution and list the pipeline execution steps.


In [19]:
start_response = pipeline.start()
pipeline_execution_arn = start_response.arn
print(pipeline_execution_arn)

arn:aws:sagemaker:us-east-2:645431112437:pipeline/datawrangler-pipeline-16124529492267638/execution/ponwbk5s1yt3


## Pipeline Status
You can use [describe_pipeline_execution][1] to view a pipeline's execution status.
To view a pipeline's execution steps, you can use [list_pipeline_execution_steps][2].
The following cell checks the pipeline status and execution steps using these functions.

[1]: https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipelineExecution.html
[2]: https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ListPipelineExecutionSteps.html


In [20]:
from pprint import pprint

execution_response = sagemaker_session.sagemaker_client.describe_pipeline_execution(
    PipelineExecutionArn=pipeline_execution_arn
)
print("Pipeline: {}.".format(execution_response["PipelineExecutionStatus"]))
print()

execution_steps_response = sagemaker_session.sagemaker_client.list_pipeline_execution_steps(
    PipelineExecutionArn=pipeline_execution_arn
)
execution_steps = execution_steps_response["PipelineExecutionSteps"]
print("Execution steps:")
pprint(execution_steps)

Pipeline: Executing.

Execution steps:
[]


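Use the following cells to define and run a function that waits until the pipeline execution status
changes to `Succeeded` or `Failed`. With the default arguments, the waiter polls every 24 seconds
for at most 60 attempts, roughly 24 minutes in total. A `Failed` status raises a `WaiterError`.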


In [21]:
import botocore.waiter

def get_waiter(pipeline, delay=24, max_attempts=60):
    waiter_id = "PipelineExecutionComplete"
    model = botocore.waiter.WaiterModel({
        "version": 2,
        "waiters": {
            waiter_id: {
                "delay": delay,
                "maxAttempts": max_attempts,
                "operation": 'DescribePipelineExecution',
                "acceptors": [
                    {
                        "expected": "Succeeded",
                        "matcher": "path",
                        "state": "success",
                        "argument": "PipelineExecutionStatus"
                    },
                    {
                        "expected": "Failed",
                        "matcher": "path",
                        "state": "failure",
                        "argument": "PipelineExecutionStatus"
                    },
                ]
            }
        }
    })
    return botocore.waiter.create_waiter_with_client(
        waiter_id, model, sagemaker_session.sagemaker_client
    )

In [22]:
waiter = get_waiter(pipeline)
waiter.wait(PipelineExecutionArn=pipeline_execution_arn)

WaiterError: Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"

In [None]:
execution_steps_response = sagemaker_session.sagemaker_client.list_pipeline_execution_steps(
    PipelineExecutionArn=pipeline_execution_arn
)
execution_steps = execution_steps_response["PipelineExecutionSteps"]
print("Execution steps:")
pprint(execution_steps)
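
When the execution fails, the step list is usually the fastest way to find out why. The following sketch filters the records for failed steps. It assumes each record carries the `StepName`, `StepStatus`, and `FailureReason` fields returned by `list_pipeline_execution_steps`. The `failed_step_reasons` helper and the sample records are illustrative, not real output.

```python
def failed_step_reasons(execution_steps):
    """Map each failed step name to its failure reason, if one was reported."""
    return {
        step["StepName"]: step.get("FailureReason", "no reason reported")
        for step in execution_steps
        if step.get("StepStatus") == "Failed"
    }


# Illustrative records only; real entries come from list_pipeline_execution_steps.
sample_steps = [
    {"StepName": "DataWranglerProcessingStep", "StepStatus": "Succeeded"},
    {"StepName": "TrainTestSplit", "StepStatus": "Failed", "FailureReason": "example reason"},
]

reasons = failed_step_reasons(sample_steps)
print(reasons)
```

In the notebook, you would call `failed_step_reasons(execution_steps)` on the list retrieved in the previous cell.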

### Cleanup
Uncomment the following code cell to revert the SageMaker Python SDK to the original version used
before running this notebook. This notebook upgrades the SageMaker Python SDK to 2.x, which may
cause other example notebooks to break. To learn more about the changes introduced in the
SageMaker Python SDK 2.x update, see
[Use Version 2.x of the SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/v2.html).

In [None]:
# _ = subprocess.check_call(
#     [sys.executable, "-m", "pip", "install", f"sagemaker=={original_version}"]
# )