# Create SageMaker Pipelines from a Data Wrangler Flow

You can use Amazon SageMaker Pipelines to create end-to-end workflows that manage and deploy SageMaker jobs. 
Pipelines come with SageMaker Python SDK integration, so you can build each step of your workflow using a 
Python-based interface.

This notebook creates a SageMaker pipeline that runs your Data Wrangler Flow `gsml-use-case-2-etl.flow` on the 
entire dataset as a data preparation step. Optionally, you can add more steps to the pipeline.
You will execute the pipeline and monitor its status using SageMaker Pipeline APIs.

The pipeline processes data from the step `Join Tables` (**Step Output Name: default**). To save the output of a different step, go to Data Wrangler 
and select a new step to export. After your workflow is deployed, you can view the Directed Acyclic Graph
(DAG) for your pipeline and manage your executions using Amazon SageMaker Studio.

# Install SageMaker Autopilot Dependencies
To use SageMaker Autopilot in your pipeline, you need version 2.118.0 or later of the SageMaker Python SDK (`sagemaker`). The following cell upgrades it.

In [None]:
!pip install -U 'sagemaker>=2.118.0'

In [None]:
import sagemaker
from packaging import version

if version.parse(sagemaker.__version__) >= version.parse("2.118.0"):
    automl_enabled = True
else:
    automl_enabled = False
    print(f"The AutoML pipeline step requires sagemaker >= 2.118.0, but you have version {sagemaker.__version__}. The notebook uses the XGBoost step for training instead. To use the AutoML step, rerun the preceding cell and make sure that you're using sagemaker 2.118.0 or later.")
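The version check above relies on `packaging.version` instead of comparing version strings directly. The following stdlib-only sketch shows why: compared as strings, `"2.99.0"` sorts after `"2.118.0"` even though it is the older release. The helpers `parse_release` and `meets_minimum` are hypothetical and only handle plain `X.Y.Z` strings; unlike `packaging`, they do not understand pre-release or local version suffixes.

```python
def parse_release(version_string: str) -> tuple:
    """Convert a plain 'X.Y.Z' release string into a tuple of ints.

    Simplified, hypothetical helper: it does not handle pre-release
    suffixes such as 'rc1' or local versions such as '+abc'.
    """
    return tuple(int(part) for part in version_string.split("."))


def meets_minimum(installed: str, minimum: str = "2.118.0") -> bool:
    # Tuples compare element by element, so (2, 99, 0) < (2, 118, 0).
    return parse_release(installed) >= parse_release(minimum)


# String comparison gives the wrong answer; tuple comparison does not.
print("2.99.0" >= "2.118.0")    # True (wrong: compares character by character)
print(meets_minimum("2.99.0"))  # False
print(meets_minimum("2.118.0"))  # True
```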

# Inputs and Outputs

The following settings configure the inputs and outputs for the flow export.

<div class="alert alert-info"> 💡 <strong> Configurable Settings </strong>

In <b>Input - Source</b>, you can configure the data sources that Data Wrangler uses as input.

1. For S3 sources, configure the source attribute that points to the input S3 prefixes
2. For all other sources, configure attributes like query_string, database in the source's 
<b>DatasetDefinition</b> object.

If you modify the inputs, the provided data must have the same schema and format as the data used in the Flow. 
You should also re-execute the cells in this section if you have modified the settings in any data sources.

Parametrized data sources will be ignored when creating ProcessingInputs, and will directly read from the source.
Network isolation is not supported for parametrized data sources.
</div>

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.dataset_definition.inputs import AthenaDatasetDefinition, DatasetDefinition, RedshiftDatasetDefinition

data_sources = []

## Input - S3 Source: part-00000-tid-3325758054719273621-f557a9df-d255-4241-bc37-5c43869c3044-879-8.c000.parquet

In [None]:
data_sources.append(ProcessingInput(
    source="s3://dsoaws/nyc-taxi-orig-cleaned-split-parquet-per-year/ride-info/year=2019/part-00000-tid-3325758054719273621-f557a9df-d255-4241-bc37-5c43869c3044-879-8.c000.parquet", # You can override this to point to other dataset on S3
    destination="/opt/ml/processing/part-00000-tid-3325758054719273621-f557a9df-d255-4241-bc37-5c43869c3044-879-8.c000.parquet",
    input_name="part-00000-tid-3325758054719273621-f557a9df-d255-4241-bc37-5c43869c3044-879-8.c000.parquet",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
))

## Input - S3 Source: part-00000-tid-4629508899230787795-13cdd6fc-f68a-4bbe-a8d9-b1f89f5baabd-1054-8.c000.parquet

In [None]:
data_sources.append(ProcessingInput(
    source="s3://dsoaws/nyc-taxi-orig-cleaned-split-parquet-per-year/ride-fare/year=2019/part-00000-tid-4629508899230787795-13cdd6fc-f68a-4bbe-a8d9-b1f89f5baabd-1054-8.c000.parquet", # You can override this to point to other dataset on S3
    destination="/opt/ml/processing/part-00000-tid-4629508899230787795-13cdd6fc-f68a-4bbe-a8d9-b1f89f5baabd-1054-8.c000.parquet",
    input_name="part-00000-tid-4629508899230787795-13cdd6fc-f68a-4bbe-a8d9-b1f89f5baabd-1054-8.c000.parquet",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
))

## Output: S3 settings

<div class="alert alert-info"> 💡 <strong> Configurable Settings </strong>

1. <b>bucket</b>: you can configure the S3 bucket where Data Wrangler saves the output. By default, the notebook uses 
the default bucket of the SageMaker session. 
2. <b>flow_export_id</b>: a randomly generated export ID. The export ID must be unique so that the results do not 
conflict with other flow exports.
3. <b>s3_output_prefix</b>: you can configure the directory name in your bucket where your data is saved.
</div>

In [None]:
import time
import uuid
import boto3
import sagemaker

# Sagemaker session
sess = sagemaker.Session()

region = boto3.Session().region_name

# You can configure this with your own bucket name, e.g.
# bucket = "my-bucket"
bucket = sess.default_bucket()
print(f"Data Wrangler export storage bucket: {bucket}")

# unique flow export ID
flow_export_id = f"{time.strftime('%d-%H-%M-%S', time.gmtime())}-{str(uuid.uuid4())[:8]}"
flow_export_name = f"flow-{flow_export_id}"
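The export ID combines a UTC timestamp (day, hour, minute, and second, with no month or year) and the first 8 characters of a random UUID, so the UUID suffix is what makes a collision between two exports unlikely. A minimal sketch of the same format, using a hypothetical `make_flow_export_id` helper that takes the time and UUID as arguments so its output is deterministic:

```python
import time
import uuid


def make_flow_export_id(now: time.struct_time, unique: uuid.UUID) -> str:
    # Same format as the cell above: day-hour-minute-second (UTC) plus
    # the first 8 characters of the UUID string.
    return f"{time.strftime('%d-%H-%M-%S', now)}-{str(unique)[:8]}"


fixed_time = time.gmtime(0)  # 1970-01-01 00:00:00 UTC
fixed_uuid = uuid.UUID("12345678-1234-5678-1234-567812345678")
export_id = make_flow_export_id(fixed_time, fixed_uuid)
print(export_id)            # 01-00-00-00-12345678
print(f"flow-{export_id}")  # flow-01-00-00-00-12345678
```

In the notebook itself, pass `time.gmtime()` and `uuid.uuid4()` to get a fresh ID on every run.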

The following cell defines the processing job output that the SageMaker Python SDK needs to launch a processing job.

In [None]:
# Output name is auto-generated from the select node's ID + output name from the flow file.
output_name = "885d0017-0c4c-4151-8361-059fefc53fee.default"

s3_output_prefix = f"export-{flow_export_name}/output"
s3_output_base_path = f"s3://{bucket}/{s3_output_prefix}"
print(f"Processing output base path: {s3_output_base_path}\nThe final output location will contain additional subdirectories.")

processing_job_output = ProcessingOutput(
    output_name=output_name,
    source="/opt/ml/processing/output",
    destination=s3_output_base_path,
    s3_upload_mode="EndOfJob"
)

## Upload Flow to S3

To use the Data Wrangler flow as an input to the processing job, first upload your flow file to Amazon S3.

In [None]:
import os
import json
import boto3

# name of the flow file which should exist in the current notebook working directory
flow_file_name = "gsml-use-case-2-etl.flow"

# Load .flow file from current notebook working directory 
!echo "Loading flow file from current notebook working directory: $PWD"

with open(flow_file_name) as f:
    flow = json.load(f)

# Upload flow to S3
s3_client = boto3.client("s3")
s3_client.upload_file(flow_file_name, bucket, f"data_wrangler_flows/{flow_export_name}.flow", ExtraArgs={"ServerSideEncryption": "aws:kms"})

flow_s3_uri = f"s3://{bucket}/data_wrangler_flows/{flow_export_name}.flow"

print(f"Data Wrangler flow {flow_file_name} uploaded to {flow_s3_uri}")

The Data Wrangler Flow is also provided to the processing job as an input source, which the following cell configures.

In [None]:
## Input - Flow: gsml-use-case-2-etl.flow
flow_input = ProcessingInput(
    source=flow_s3_uri,
    destination="/opt/ml/processing/flow",
    input_name="flow",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
)

## Create Processor

In [None]:
from sagemaker import image_uris

# IAM role for executing the processing job.
iam_role = sagemaker.get_execution_role()

# Unique processing job name. Give a unique name every time you re-execute processing jobs.
processing_job_name = f"data-wrangler-flow-processing-{flow_export_id}"

# Data Wrangler Container URL.
container_uri = image_uris.retrieve("data-wrangler", region, version="1.x")
# Pinned Data Wrangler Container URL.
# Note: this pinned URI is specific to us-east-1 and is not used by the processor below.
container_uri_pinned = "663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.33.2"

# Processing Job Instance count and instance type.
instance_count = 2
instance_type = "ml.m5.4xlarge"

# Size in GB of the EBS volume to use for storing data during processing.
volume_size_in_gb = 30

# Content type for each output. Data Wrangler supports CSV (the default) and Parquet.
output_content_type = "CSV"

# Delimiter to use for the output if the output content type is CSV. Uncomment to set.
# delimiter = ","

# Compression to use for the output. Uncomment to set.
# compression = "gzip"

# Configuration for partitioning the output. Uncomment to set.
# "num_partitions" sets the number of partitions/files written in the output.
# "partition_by" sets the column names to partition the output by.
# partition_config = {
#     "num_partitions": 1,
#     "partition_by": ["column_name_1", "column_name_2"],
# }

# Network Isolation mode; default is off.
enable_network_isolation = False

# List of tags to be passed to the processing job.
user_tags = []

# Output configuration used as processing job container arguments. Only applies when writing to S3.
# Uncomment to set additional configurations.
output_config = {
    output_name: {
        "content_type": output_content_type,
        # "delimiter": delimiter,
        # "compression": compression,
        # "partition_config": partition_config,
    }
}

# Refit configuration determines whether Data Wrangler refits the trainable parameters on the entire dataset. 
# When True, the processing job relearns the parameters and outputs a new flow file.
# You can specify the name of the output flow file under 'output_flow'.
# Note: There are length constraints on the container arguments (max 256 characters).
refit_trained_params = {
    "refit": False,
    "output_flow": f"data-wrangler-flow-processing-{flow_export_id}.flow"
}

# KMS key for per object encryption; default is None.
kms_key = None
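The refit comment notes that container arguments are limited to 256 characters. Because `output_config` and `refit_trained_params` are both passed as JSON strings in container arguments, a long output name or output flow name can push an argument past that limit. The following sketch (hypothetical helpers `build_output_config` and `check_argument_length`, assuming the limit applies to each rendered argument string) builds the output configuration with only the options you set and checks the rendered argument length before you start a job:

```python
import json

MAX_ARG_LENGTH = 256  # limit quoted in the refit comment above (assumed per argument)


def build_output_config(output_name, content_type="CSV", delimiter=None,
                        compression=None, partition_config=None):
    """Build the per-output config dict, including only the options that are set."""
    options = {"content_type": content_type}
    if delimiter is not None:
        options["delimiter"] = delimiter
    if compression is not None:
        options["compression"] = compression
    if partition_config is not None:
        options["partition_config"] = partition_config
    return {output_name: options}


def check_argument_length(flag, payload):
    """Render "--flag '<json>'" the same way the notebook does and fail early if it is too long."""
    argument = f"{flag} '{json.dumps(payload)}'"
    if len(argument) > MAX_ARG_LENGTH:
        raise ValueError(f"{flag} argument is {len(argument)} characters; limit is {MAX_ARG_LENGTH}")
    return argument


example_output_name = "885d0017-0c4c-4151-8361-059fefc53fee.default"
cfg = build_output_config(example_output_name, "CSV", compression="gzip")
arg = check_argument_length("--output-config", cfg)
print(arg)

# A very long output flow name triggers the check.
try:
    check_argument_length("--refit-trained-params", {"refit": True, "output_flow": "x" * 300})
except ValueError as err:
    print(err)
```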

### (Optional) Configure Spark Cluster Driver and Executor Memory

In [None]:
# The Spark memory configuration. Change to specify the driver and executor memory in MB for the Spark cluster during processing.
driver_memory_in_mb = 2048
executor_memory_in_mb = 55742

config = json.dumps({
    "Classification": "spark-defaults",
    "Properties": {
        "spark.driver.memory": f"{driver_memory_in_mb}m",
        "spark.executor.memory": f"{executor_memory_in_mb}m"
    }
})

config_file = f"config-{flow_export_id}.json"
with open(config_file, "w") as f:
    f.write(config)

config_s3_path = f"spark_configuration/{processing_job_name}/configuration.json"
config_s3_uri = f"s3://{bucket}/{config_s3_path}"
s3_client.upload_file(config_file, bucket, config_s3_path, ExtraArgs={"ServerSideEncryption": "aws:kms"})
print(f"Spark Config file uploaded to {config_s3_uri}")
os.remove(config_file)

# Provide the Spark config file to the processing job to set the driver and executor memory. Uncomment to set.
# data_sources.append(ProcessingInput(
#     source=config_s3_uri,
#     destination="/opt/ml/processing/input/conf",
#     input_name="spark-config",
#     s3_data_type="S3Prefix",
#     s3_input_mode="File",
#     s3_data_distribution_type="FullyReplicated"
# ))
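The Spark configuration above is a single `spark-defaults` classification with memory sizes written as strings with an `m` suffix. The following sketch wraps the same JSON construction in a hypothetical `build_spark_defaults` helper that rejects non-positive or non-integer memory values before the file is written and uploaded. It does not check the values against the memory available on `instance_type`; keep the driver and executor sizes within what the instance provides.

```python
import json


def build_spark_defaults(driver_memory_mb: int, executor_memory_mb: int) -> str:
    """Return the 'spark-defaults' classification JSON used by the cell above."""
    for label, value in (("driver", driver_memory_mb), ("executor", executor_memory_mb)):
        if not isinstance(value, int) or value <= 0:
            raise ValueError(f"{label} memory must be a positive integer number of MB, got {value!r}")
    return json.dumps({
        "Classification": "spark-defaults",
        "Properties": {
            "spark.driver.memory": f"{driver_memory_mb}m",
            "spark.executor.memory": f"{executor_memory_mb}m",
        },
    })


config_json = build_spark_defaults(2048, 55742)
print(json.loads(config_json)["Properties"])
# {'spark.driver.memory': '2048m', 'spark.executor.memory': '55742m'}
```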

## Create Processor for the Pipeline

Create a Processor object. The pipeline uses the processor to apply the transformations from your flow file to the dataset.


In [None]:
# Setup processing job network configuration
from sagemaker.network import NetworkConfig

network_config = NetworkConfig(
    enable_network_isolation=enable_network_isolation,
    security_group_ids=None,
    subnets=None,
)

In [None]:
from sagemaker.processing import Processor

processor = Processor(
    role=iam_role,
    image_uri=container_uri,
    instance_count=instance_count,
    instance_type=instance_type,
    volume_size_in_gb=volume_size_in_gb,
    network_config=network_config,
    sagemaker_session=sess,
    output_kms_key=kms_key,
    tags=user_tags
)

# Create SageMaker Pipeline
A SageMaker pipeline is composed of a series of steps. You begin by creating a processing step to 
transform your data. If you’re using the notebook to train a model, you also define a training step. 

## Define Pipeline Steps
To create a SageMaker pipeline, create a `ProcessingStep` using the Data Wrangler processor defined 
in the preceding section.

In [None]:
from sagemaker.workflow.steps import ProcessingStep

data_wrangler_step = ProcessingStep(
    name="DataWranglerProcessingStep",
    processor=processor,
    inputs=[flow_input] + data_sources, 
    outputs=[processing_job_output],
    job_arguments=[f"--output-config '{json.dumps(output_config)}'"] 
        + [f"--refit-trained-params '{json.dumps(refit_trained_params)}'"],
)
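The `job_arguments` list holds two strings, each a flag followed by a JSON payload wrapped in single quotes. The following sketch reproduces that construction with hypothetical `build_job_arguments` and `extract_payload` helpers and checks that each payload can be recovered as the original dictionary. It only demonstrates the string format; it does not show how the Data Wrangler container parses these arguments.

```python
import json


def build_job_arguments(output_config: dict, refit_trained_params: dict) -> list:
    # Mirrors the ProcessingStep cell: one string per flag, JSON wrapped in single quotes.
    return [
        f"--output-config '{json.dumps(output_config)}'",
        f"--refit-trained-params '{json.dumps(refit_trained_params)}'",
    ]


def extract_payload(argument: str) -> dict:
    # Split off the flag (split once, since the JSON itself contains spaces),
    # then strip the surrounding single quotes and parse the JSON.
    _flag, quoted = argument.split(" ", 1)
    return json.loads(quoted[1:-1])


args = build_job_arguments(
    {"node-id.default": {"content_type": "CSV"}},
    {"refit": False, "output_flow": "example.flow"},
)
for argument in args:
    print(argument.split(" ", 1)[0], extract_payload(argument))
```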

You can add a `TrainingStep` to the pipeline. The step trains a model on the transformed dataset. By default, the notebook does not add a training step. To add a training step, set `add_training_step` to True.
If you add a training step, you can choose between an AutoML training step and an XGBoost training step.
You can also add more steps. For information about adding steps to a pipeline, see [Define a Pipeline](http://docs.aws.amazon.com/sagemaker/latest/dg/define-pipeline.html).

In [None]:
add_training_step = False

To use the AutoML training step, run the following cells as-is. To use the XGBoost training step, set `use_automl_step` to False.

Currently, the AutoML training step only supports the `ENSEMBLING` mode. For more information about the AutoML training step, 
see [Pipeline Steps](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-automl).

In [None]:
use_automl_step = automl_enabled

For AutoML, specifying the target column of your dataset is required. 
The target column is the column that the model is trained to predict. 
Provide the name of the target column in the following cell.

In [None]:
target_attribute_name = ""  # Provide the target column name here

if add_training_step and use_automl_step and not target_attribute_name:
    raise RuntimeError("You must specify the target column name.")

In [None]:
if add_training_step and use_automl_step:
    from sagemaker import AutoML, AutoMLInput
    from sagemaker.workflow.automl_step import AutoMLStep
    from sagemaker.workflow.functions import Join
    from sagemaker.workflow.pipeline_context import PipelineSession

    pipeline_session = PipelineSession()

    training_input_content_type = None

    if output_content_type == "CSV":
        training_input_content_type = 'text/csv;header=present'
    elif output_content_type == "Parquet":
        training_input_content_type = 'x-application/vnd.amazon+parquet'

    auto_ml = AutoML(
        role=iam_role,
        target_attribute_name=target_attribute_name,
        sagemaker_session=pipeline_session,
        mode="ENSEMBLING"
    )

    s3_input = Join(
        on="/",
        values=[
            data_wrangler_step.properties.ProcessingOutputConfig.Outputs[output_name].S3Output.S3Uri,
            data_wrangler_step.properties.ProcessingJobName,
            f'{output_name.replace(".", "/")}',
        ]
    )

    train_args = auto_ml.fit(
        inputs=AutoMLInput(
            inputs=s3_input,
            content_type=training_input_content_type,
            target_attribute_name=target_attribute_name
        )
    )

    training_step = AutoMLStep(
        name="DataWrangerAutoML",
        step_args=train_args,
    )

The XGBoost algorithm uses the first column as the target column. If this is not the case for 
the data that you’ve processed, use the "Move column" transform to make the target column the 
first column.
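If you want to check the column order locally on a small sample before running the pipeline, the following stdlib sketch moves a target column to the front of CSV text. `move_column_first` is a hypothetical helper, and the column names in `sample` are made up for illustration; in the pipeline itself, use the Data Wrangler "Move column" transform as described above.

```python
import csv
import io


def move_column_first(csv_text: str, target: str) -> str:
    """Return CSV text with the `target` column moved to position 0.

    Keeps the header row; raises ValueError if the column is missing.
    """
    rows = list(csv.reader(io.StringIO(csv_text)))
    header, body = rows[0], rows[1:]
    index = header.index(target)  # raises ValueError if target is not in the header
    order = [index] + [i for i in range(len(header)) if i != index]
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    for row in [header] + body:
        writer.writerow([row[i] for i in order])
    return out.getvalue()


sample = "passenger_count,trip_distance,fare_amount\n1,2.5,11.0\n2,0.8,5.5\n"
print(move_column_first(sample, "fare_amount"))
# fare_amount,passenger_count,trip_distance
# 11.0,1,2.5
# 5.5,2,0.8
```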

In [None]:
if add_training_step and not use_automl_step:
    from sagemaker.estimator import Estimator
    from sagemaker.workflow.functions import Join

    image_uri = sagemaker.image_uris.retrieve(
        framework="xgboost",
        region=region,
        version="1.5-1",
        py_version="py3",
        instance_type=instance_type,
    )
    xgb_train = Estimator(
        image_uri=image_uri,
        instance_type=instance_type,
        instance_count=1,
        role=iam_role,
    )
    xgb_train.set_hyperparameters(
        objective="reg:squarederror",
        num_round=3,
    )

    from sagemaker.inputs import TrainingInput
    from sagemaker.workflow.steps import TrainingStep

    xgb_input_content_type = None

    if output_content_type == "CSV":
        xgb_input_content_type = 'text/csv'
    elif output_content_type == "Parquet":
        xgb_input_content_type = 'application/x-parquet'

    training_step = TrainingStep(
        name="DataWrangerTrain",
        estimator=xgb_train,
        inputs={
            "train": TrainingInput(
                s3_data=Join(
                    on="/",
                    values=[
                        data_wrangler_step.properties.ProcessingOutputConfig.Outputs[output_name].S3Output.S3Uri,
                        data_wrangler_step.properties.ProcessingJobName,
                        f'{output_name.replace(".", "/")}',
                    ]
                ),
                content_type=xgb_input_content_type
            )
        }
    )
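Both training cells map `output_content_type` to a MIME type with an `if`/`elif` chain, so any other value (for example, `"csv"` in lowercase) silently leaves the content type as `None`. The following sketch uses the same mappings as the two cells but raises an error for unsupported values. `training_content_type` and the two dictionaries are hypothetical names introduced for illustration.

```python
# Content types used by the AutoML and XGBoost cells above, keyed by output_content_type.
AUTOML_CONTENT_TYPES = {
    "CSV": "text/csv;header=present",
    "Parquet": "x-application/vnd.amazon+parquet",
}
XGBOOST_CONTENT_TYPES = {
    "CSV": "text/csv",
    "Parquet": "application/x-parquet",
}


def training_content_type(output_content_type: str, use_automl: bool) -> str:
    """Look up the training input content type, failing loudly on unknown values."""
    table = AUTOML_CONTENT_TYPES if use_automl else XGBOOST_CONTENT_TYPES
    try:
        return table[output_content_type]
    except KeyError:
        raise ValueError(
            f"Unsupported output_content_type {output_content_type!r}; "
            f"expected one of {sorted(table)}"
        ) from None


print(training_content_type("CSV", use_automl=True))         # text/csv;header=present
print(training_content_type("Parquet", use_automl=False))    # application/x-parquet
```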

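## Define a Pipeline of Parameters and Steps
Now you create the SageMaker pipeline that combines the steps created above so that it can be executed. 

Define pipeline parameters that you can use to parametrize the pipeline. Parameters enable custom pipeline executions and schedules without having to modify the pipeline definition.

The parameters supported in this notebook include:

- `instance_type` - The ml.* instance type of the processing job.
- `instance_count` - The instance count of the processing job.

A parameter only takes effect where a step references it. The `processor` above was created with the fixed `instance_type` (`"ml.m5.4xlarge"`) and `instance_count` (`2`) values from **Create Processor**, so to let these parameters control the processing job, define them before the processor and pass them to `Processor` in place of those values.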

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
# Define Pipeline Parameters
instance_type = ParameterString(name="InstanceType", default_value="ml.m5.4xlarge")
instance_count = ParameterInteger(name="InstanceCount", default_value=1)

Create a pipeline with the steps and parameters defined above.

In [None]:
import time
import uuid

from sagemaker.workflow.pipeline import Pipeline

# Create a unique pipeline name with flow export name
pipeline_name = f"pipeline-{flow_export_name}"

# Combine pipeline steps
pipeline_steps = [data_wrangler_step]
if add_training_step:
    pipeline_steps.append(training_step)

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[instance_type, instance_count],
    steps=pipeline_steps,
    sagemaker_session=sess
)

### (Optional) Examining the pipeline definition

The JSON of the pipeline definition can be examined to confirm the pipeline is well-defined and 
the parameters and step properties resolve correctly.

In [None]:
import json

definition = json.loads(pipeline.definition())
definition
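The definition JSON lists each pipeline parameter under a `Parameters` key with its `Name`, `Type`, and `DefaultValue`. When you start an execution, you can override parameters by name, for example `pipeline.start(parameters={"InstanceCount": 2})`. The following sketch uses a hypothetical `resolve_parameters` helper to merge overrides with the declared defaults and reject unknown names, so a typo surfaces before you start an execution. `sample_definition` is a hand-written stand-in, not output from this notebook.

```python
def resolve_parameters(definition: dict, overrides: dict = None) -> dict:
    """Merge execution overrides with the defaults declared in a pipeline definition."""
    defaults = {p["Name"]: p.get("DefaultValue") for p in definition.get("Parameters", [])}
    overrides = overrides or {}
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError(f"Unknown pipeline parameters: {sorted(unknown)}")
    return {**defaults, **overrides}


# Hand-written stand-in for json.loads(pipeline.definition()).
sample_definition = {
    "Parameters": [
        {"Name": "InstanceType", "Type": "String", "DefaultValue": "ml.m5.4xlarge"},
        {"Name": "InstanceCount", "Type": "Integer", "DefaultValue": 1},
    ]
}
print(resolve_parameters(sample_definition, {"InstanceCount": 2}))
# {'InstanceType': 'ml.m5.4xlarge', 'InstanceCount': 2}
```

With the real definition, you could pass `definition` from the cell above instead of `sample_definition`.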

## Submit the pipeline to SageMaker and start execution

Submit the pipeline definition to the SageMaker Pipeline service and start an execution. The role passed in 
will be used by the Pipeline service to create all the jobs defined in the steps.

In [None]:
pipeline.upsert(role_arn=iam_role)
execution = pipeline.start()

## Pipeline Operations: Examine and Wait for Pipeline Execution

Describe the pipeline execution and wait for its completion.

In [None]:
try:
    # Waiter will wait for up to 1 hour; increase the delay and max_attempts if necessary
    execution.wait(delay=30, max_attempts=120)
except Exception as e:
    listed_steps = execution.list_steps()
    errors = []
    for step in listed_steps:
        if "FailureReason" in step:
            errors.append(step["FailureReason"])
    raise RuntimeError(str(errors)) from e
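The `except` block above gathers each `FailureReason` into a list, which drops the name of the step that failed. The following sketch keys the reasons by `StepName`, another field in each `list_steps()` entry. `collect_failures` is a hypothetical helper, and `sample_steps` is a hand-written stand-in with an illustrative failure message, not real service output.

```python
def collect_failures(step_summaries: list) -> dict:
    """Map each failed step's name to its FailureReason, ignoring steps without one."""
    return {
        step.get("StepName", "<unnamed>"): step["FailureReason"]
        for step in step_summaries
        if "FailureReason" in step
    }


# Hand-written stand-in for execution.list_steps(); the values are illustrative only.
sample_steps = [
    {"StepName": "DataWrangerTrain", "StepStatus": "Failed",
     "FailureReason": "example failure message"},
    {"StepName": "DataWranglerProcessingStep", "StepStatus": "Succeeded"},
]
print(collect_failures(sample_steps))
# {'DataWrangerTrain': 'example failure message'}
```

In the `except` block, `raise RuntimeError(str(collect_failures(listed_steps))) from e` would report the failing step names alongside their reasons.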

List the steps in the execution. These are the steps in the pipeline that have been resolved by the step 
executor service.

In [None]:
execution.list_steps()

You can visualize the pipeline execution status and details in Studio. For details, see 
[View, Track, and Execute SageMaker Pipelines in SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-studio.html).

## Data Wrangler Step S3 Output Location
The following cell prints the S3 location of your Data Wrangler processing step's output. To prevent the outputs of different processing jobs 
and different output nodes from being overwritten or combined, Data Wrangler writes each output under the name of the processing job and 
the name of the output.

In [None]:
from sagemaker.processing import ProcessingJob

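# Look up the Data Wrangler step by name instead of relying on its position in the list.
data_wrangler_step_summary = next(
    step for step in execution.list_steps() if step["StepName"] == data_wrangler_step.name
)
data_wrangler_job_arn = data_wrangler_step_summary["Metadata"]["ProcessingJob"]["Arn"]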
data_wrangler_job = ProcessingJob.from_processing_arn(sess, data_wrangler_job_arn)
data_wrangler_job_name = data_wrangler_job.describe()["ProcessingJobName"]

s3_job_results_path = f"{s3_output_base_path}/{data_wrangler_job_name}/{output_name.replace('.', '/')}"
print(f"Job results are saved to S3 path: {s3_job_results_path}")
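The results path is the output base path, then the processing job name, then the output name with its `.` replaced by `/` (so `<node id>.default` becomes `<node id>/default`). The `Join` expressions in the training steps build the same layout. A minimal pure-Python sketch with a hypothetical `job_results_path` helper; the bucket and job names are placeholders, and, like the notebook, it assumes the node ID and output name contain no other dots.

```python
def job_results_path(base_path: str, job_name: str, output_name: str) -> str:
    """Build <base>/<job name>/<node id>/<output> from a '<node id>.<output>' name."""
    return f"{base_path.rstrip('/')}/{job_name}/{output_name.replace('.', '/')}"


print(job_results_path(
    "s3://example-bucket/export-flow-example/output",
    "example-processing-job",
    "885d0017-0c4c-4151-8361-059fefc53fee.default",
))
# s3://example-bucket/export-flow-example/output/example-processing-job/885d0017-0c4c-4151-8361-059fefc53fee/default
```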

## (Optional) Pipeline cleanup
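To delete the pipeline created in this notebook, set `pipeline_deletion` to True. Deleting the pipeline does not delete the flow file, Spark configuration, or processing outputs that the notebook wrote to S3.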

In [None]:
pipeline_deletion = False

In [None]:
if pipeline_deletion:
    pipeline.delete()