## SageMaker Pipeline for Feature Store Processing

This notebook shows how to create a SageMaker Pipeline that automates feature transformations and ingestion into Feature Store. The pipeline is designed to be triggered when new data files are uploaded to S3.


This notebook creates a SageMaker Pipeline which:

1. Performs the transformations contained in a Data Wrangler .flow file stored in Amazon S3, using a SageMaker Processing Job
2. Stores the transformed features in the Amazon SageMaker Feature Store
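
The trigger itself is not part of this notebook. As a rough sketch, assuming the trigger is an AWS Lambda function subscribed to S3 event notifications, the uploaded object's URI could be extracted from the event as follows. The function name `s3_uris_from_event` and the sample event are hypothetical; only the `Records[].s3.bucket.name` and `Records[].s3.object.key` fields of the notification are used.

```python
from urllib.parse import unquote_plus


def s3_uris_from_event(event):
    """Return the s3:// URI of every object referenced in an S3 event notification."""
    uris = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded (for example, spaces become '+').
        key = unquote_plus(record["s3"]["object"]["key"])
        uris.append(f"s3://{bucket}/{key}")
    return uris


# Hypothetical event, trimmed to the fields used above.
sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "my-bucket"}, "object": {"key": "incoming/new+data.csv"}}}
    ]
}
print(s3_uris_from_event(sample_event))
```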


In [None]:
# SageMaker Python SDK version 2.x is required
import sagemaker
import subprocess
import sys
import os
import uuid
import json
import time
import boto3
from zipfile import ZipFile
import inspect

First, copy the following values from the output generated by Data Wrangler in the previous step:

In [None]:
feature_group_name = "<FEATURE GROUP NAME>"
output_name = "<OUTPUT NAME>"
flow_uri='<FLOW URI>'

In [None]:
sess = sagemaker.Session()
bucket = sess.default_bucket()
sm_client = boto3.client('sagemaker')
iam_role = sagemaker.get_execution_role()
region = sess.boto_region_name

## Create a SageMaker Pipeline from the Data Wrangler Flow

The transformations we defined in Data Wrangler are encapsulated in a .flow file. We parameterize the SageMaker pipeline with the S3 URI of an input flow file, so that each execution can point at a new flow file created on the fly once new data becomes available in S3.
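
Creating a flow file on the fly can be sketched as copying a template flow and pointing its source at the new data. The snippet below is only an illustration: it assumes the flow file is JSON with a `nodes` list, and that each S3 source node stores its location under `parameters.dataset_definition.s3ExecutionContext.s3Uri`. Check this against your own .flow file before relying on it. The helper name `retarget_flow` and the trimmed `template_flow` are hypothetical.

```python
import copy
import json


def retarget_flow(flow, new_s3_uri):
    """Return a copy of a flow definition whose S3 source nodes read from new_s3_uri.

    Assumption: each S3 source node stores its location under
    parameters -> dataset_definition -> s3ExecutionContext -> s3Uri.
    """
    updated = copy.deepcopy(flow)  # leave the template untouched
    for node in updated.get("nodes", []):
        context = (
            node.get("parameters", {})
            .get("dataset_definition", {})
            .get("s3ExecutionContext")
        )
        if context is not None:
            context["s3Uri"] = new_s3_uri
    return updated


# Hypothetical, heavily trimmed flow definition for illustration only.
template_flow = {
    "nodes": [
        {
            "type": "SOURCE",
            "parameters": {
                "dataset_definition": {
                    "s3ExecutionContext": {"s3Uri": "s3://placeholder-bucket/old.csv"}
                }
            },
        },
        {"type": "TRANSFORM", "parameters": {}},
    ]
}

new_flow = retarget_flow(template_flow, "s3://placeholder-bucket/new.csv")
print(json.dumps(new_flow["nodes"][0]["parameters"], indent=2))
```

The resulting dictionary would then be serialized with `json.dumps`, uploaded to S3, and its URI supplied as the pipeline's input flow.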

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.4xlarge"
)

# Placeholder default; pass the real flow file URI when starting an execution.
input_flow = ParameterString(
    name='InputFlow',
    default_value='s3://placeholder-bucket/placeholder.flow'
)

In [None]:
def get_container(region):
    """Return the AWS account ID of the ECR registry that hosts the
    Data Wrangler container image for the given region."""
    registries = {
        "af-south-1": "143210264188",
        "ap-east-1": "707077482487",
        "ap-northeast-1": "649008135260",
        "ap-northeast-2": "131546521161",
        "ap-south-1": "089933028263",
        "ap-southeast-1": "119527597002",
        "ap-southeast-2": "422173101802",
        "ca-central-1": "557239378090",
        "eu-central-1": "024640144536",
        "eu-north-1": "054986407534",
        "eu-south-1": "488287956546",
        "eu-west-1": "245179582081",
        "eu-west-2": "894491911112",
        "eu-west-3": "807237891255",
        "me-south-1": "376037874950",
        "sa-east-1": "424196993095",
        "us-east-1": "663277389841",
        "us-east-2": "415577184552",
        "us-west-1": "926135532090",
        "us-west-2": "174368400705",
        "cn-north-1": "245909111842",
        "cn-northwest-1": "249157047649",
    }

    # Raises KeyError for regions that are not listed above.
    return registries[region]

In [None]:
# Data Wrangler container URI (floating 1.x tag).
container_reg = get_container(region)

container_uri = container_reg + ".dkr.ecr." + region + ".amazonaws.com/sagemaker-data-wrangler-container:1.x"
print(container_uri)

# Data Wrangler container URI pinned to a specific version.
container_uri_pinned = container_reg + ".dkr.ecr." + region + ".amazonaws.com/sagemaker-data-wrangler-container:1.20.1"
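
The floating and pinned URIs differ only in their tag, so the string building can be factored into one helper. The sketch below uses a hypothetical function name, `data_wrangler_image_uri`, and additionally accounts for the China regions, whose ECR endpoints use the `amazonaws.com.cn` domain rather than `amazonaws.com`. The account IDs in the usage lines are taken from the registry table above.

```python
def data_wrangler_image_uri(account_id, region, tag="1.x"):
    """Build the ECR image URI for the Data Wrangler container."""
    # ECR endpoints in the China regions use the amazonaws.com.cn domain.
    domain = "amazonaws.com.cn" if region.startswith("cn-") else "amazonaws.com"
    return f"{account_id}.dkr.ecr.{region}.{domain}/sagemaker-data-wrangler-container:{tag}"


print(data_wrangler_image_uri("174368400705", "us-west-2"))
print(data_wrangler_image_uri("245909111842", "cn-north-1", tag="1.20.1"))
```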

In [None]:
from sagemaker.processing import Processor


processor = Processor(
    role=iam_role,
    image_uri=container_uri,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type
)

In [None]:
from sagemaker.processing import FeatureStoreOutput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
    

step_process = ProcessingStep(
    name="ReadmissionHealthETLFeatureStore",
    processor=processor,
    inputs=[
        ProcessingInput(
            input_name='flow',
            destination='/opt/ml/processing/flow',
            source=input_flow,
            s3_data_type='S3Prefix',
            s3_input_mode='File'
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name=output_name,
            app_managed=True, 
            feature_store_output=FeatureStoreOutput(feature_group_name=feature_group_name))
    ]
)

In [None]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = f"healthcare-etl-pipeline-{time.strftime('%d-%H-%M-%S', time.gmtime())}"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type, 
        processing_instance_count,
        input_flow
    ],
    steps=[step_process],
    sagemaker_session=sess
    
)

In [None]:
pipeline.upsert(iam_role)
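
Once the pipeline definition has been upserted, an execution can be started with the `InputFlow` parameter overridden. The following is a minimal sketch, not part of the original notebook, built around the SageMaker `start_pipeline_execution` API. The helper names `build_execution_request` and `start_execution`, and the example pipeline name and flow URI, are hypothetical.

```python
def build_execution_request(pipeline_name, flow_s3_uri):
    """Assemble keyword arguments for the SageMaker StartPipelineExecution API."""
    return {
        "PipelineName": pipeline_name,
        # Overrides the placeholder default of the InputFlow pipeline parameter.
        "PipelineParameters": [{"Name": "InputFlow", "Value": flow_s3_uri}],
    }


def start_execution(sm_client, pipeline_name, flow_s3_uri):
    """Start one pipeline execution and return its ARN."""
    response = sm_client.start_pipeline_execution(
        **build_execution_request(pipeline_name, flow_s3_uri)
    )
    return response["PipelineExecutionArn"]


request = build_execution_request(
    "healthcare-etl-pipeline-example",  # hypothetical pipeline name
    "s3://placeholder-bucket/flows/new-data.flow",  # hypothetical flow URI
)
print(request)
```

With the `sm_client`, `pipeline_name`, and `flow_uri` variables defined earlier in this notebook, calling `start_execution(sm_client, pipeline_name, flow_uri)` would start a run against the real service.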