# Automating feature transformations with SageMaker Data Wrangler, Pipelines, and Feature Store

1. Create a Data Wrangler flow
2. Export the flow and create feature groups
3. Ingest historical data into Feature Store
4. Set up a SageMaker pipeline and an AWS Lambda function triggered by new data arriving in Amazon S3
5. (Optional) Build and deploy a model to show the end-to-end workflow

The first part of the blog post walks the reader through creating a set of feature transformations on the flight delay dataset in Data Wrangler and then exporting that flow to a generated notebook, which creates feature groups and ingests historical flight data into Feature Store. This notebook forms the second half of the example, in which readers create a SageMaker pipeline and an AWS Lambda function that automate the feature transformations and Feature Store ingestion for each day's new data.

## Create a SageMaker Pipeline from the Data Wrangler Flow

In [1]:
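# SageMaker Python SDK version 2.x is required; this notebook pins 2.20.0
import sagemaker
import subprocess
import sys

original_version = sagemaker.__version__
if sagemaker.__version__ != "2.20.0":
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "sagemaker==2.20.0"]
    )
    import importlib
    # reload() only refreshes the top-level module; restart the kernel
    # if you see errors from stale submodules after the install
    importlib.reload(sagemaker)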

In [2]:
import os
import uuid
import json
import time
import boto3
import sagemaker

In [3]:
# S3 bucket for saving processing job outputs
# Feel free to specify a different bucket here if you wish.
sess = sagemaker.Session()
bucket = sess.default_bucket()
prefix = "data_wrangler_flows"
flow_id = f"{time.strftime('%d-%H-%M-%S', time.gmtime())}-{str(uuid.uuid4())[:8]}"
flow_name = f"flow-{flow_id}"
flow_uri = f"s3://{bucket}/{prefix}/{flow_name}.flow"
print(bucket)
print(flow_uri)
print(prefix + '/' + flow_name + '.flow')

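# Local .flow file exported from Data Wrangler
flow_file_name = "flight_data_test_2.flow"

iam_role = sagemaker.get_execution_role()

# Data Wrangler processing container image (this URI is for us-east-1)
container_uri = "663277389841.dkr.ecr.us-east-1.amazonaws.com/sagemaker-data-wrangler-container:1.1.1"

# Processing Job Resources Configurations
processing_job_name = f"data-wrangler-feature-store-processing-{flow_id}"
processing_dir = "/opt/ml/processing"
# Name of the feature group created by the notebook exported from Data Wrangler
feature_group_name = "FG-flow-14-18-14-03-23a5ae4a"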

# URL to use for sagemaker client.
# If this is None, boto will automatically construct the appropriate URL to use
# when communicating with sagemaker.
sagemaker_endpoint_url = None

sagemaker-us-east-1-926082456644
s3://sagemaker-us-east-1-926082456644/data_wrangler_flows/flow-16-15-49-59-82ce8678.flow
data_wrangler_flows/flow-16-15-49-59-82ce8678.flow
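The cell above builds `flow_uri` and names a local `flow_file_name`, but nothing in this section uploads the flow file to that location. A minimal sketch, assuming you want the pipeline's `InputFlow` parameter to point at `flow_uri`: the helpers `split_s3_uri` and `upload_flow` are hypothetical (not part of the SageMaker SDK), and `upload_flow` relies on boto3's `S3.Client.upload_file`.

```python
from typing import Tuple


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split an S3 URI such as s3://bucket/prefix/name.flow into (bucket, key)."""
    scheme = "s3://"
    if not uri.startswith(scheme):
        raise ValueError(f"Not an S3 URI: {uri}")
    bucket_name, _, key = uri[len(scheme):].partition("/")
    if not bucket_name or not key:
        raise ValueError(f"S3 URI needs both a bucket and a key: {uri}")
    return bucket_name, key


def upload_flow(local_path: str, uri: str) -> None:
    """Upload a local .flow file to the given S3 URI (requires AWS credentials)."""
    import boto3  # imported here so split_s3_uri works without boto3 installed

    bucket_name, key = split_s3_uri(uri)
    boto3.client("s3").upload_file(local_path, bucket_name, key)


# In the notebook, with the variables defined above:
# upload_flow(flow_file_name, flow_uri)
```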


In [4]:
## TODO:
## - Capture the feature group name in a variable instead of hard-coding it.
## - Have the reader copy the output_name for the processing job from the exported notebook.
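For the `output_name` TODO, the value used later (`26b67c19-4e8b-401b-a817-db82c20a17ed.default`) looks like a node ID followed by `.default`. A sketch, assuming the `.flow` file is JSON with a top-level `nodes` list whose entries carry `node_id`, `outputs` (`[{"name": ...}]`) and `inputs` (`[{"node_id": ..., "output_name": ...}]`); that layout is an assumption, so check it against your own flow file. The helper names are hypothetical. It lists outputs that no other node consumes, which is where the final transform usually sits.

```python
import json
from typing import Dict, List


def terminal_output_names(flow: Dict) -> List[str]:
    """Return '<node_id>.<output name>' for every node output no other node consumes."""
    consumed = set()
    for node in flow.get("nodes", []):
        for inp in node.get("inputs", []):
            consumed.add(f"{inp['node_id']}.{inp['output_name']}")

    names = []
    for node in flow.get("nodes", []):
        for out in node.get("outputs", []):
            name = f"{node['node_id']}.{out['name']}"
            if name not in consumed:
                names.append(name)
    return names


def load_terminal_output_names(path: str) -> List[str]:
    """Read a local .flow file and return its terminal output names."""
    with open(path) as f:
        return terminal_output_names(json.load(f))


# In the notebook: print(load_terminal_output_names(flow_file_name))
```

If more than one name comes back, pick the output of the node that feeds your feature group.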

In [5]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)


processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.4xlarge"
)
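# Optional input data parameter (not used by the step below); the default value must be an S3 URI
# input_data = ParameterString(
#     name="InputData",
#     default_value="s3://sagemaker-studio-83hjatr6mug/test_data/fake.csv",
# )

# S3 URI of the .flow file to run; override it when starting the pipeline
input_flow = ParameterString(
    name="InputFlow",
    default_value="s3://sagemaker-studio-83hjatr6mug/test_data/fake.flow",
)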

In [6]:
from sagemaker.processing import Processor

processor = Processor(
    role=iam_role,
    image_uri=container_uri,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type
)

In [7]:
from sagemaker.processing import FeatureStoreOutput
feature_store_output = FeatureStoreOutput(feature_group_name=feature_group_name)
feature_store_output

FeatureStoreOutput(feature_group_name='FG-flow-14-18-14-03-23a5ae4a')

In [8]:
output_name = "26b67c19-4e8b-401b-a817-db82c20a17ed.default"
output_content_type = "CSV"

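# Pass the output node and its content type to the Data Wrangler container
# (this helper mirrors the one in the notebook that Data Wrangler exports)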

def create_container_arguments(output_name, output_content_type):
    output_config = {
        output_name: {
            "content_type": output_content_type
        }
    }
    return [f"--output-config '{json.dumps(output_config)}'"]
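To see exactly what the processing step receives as `job_arguments`, here is the same helper called with a placeholder output name (`example-node-id.default` is made up for illustration). Note that the flag and its quoted JSON value form a single list element.

```python
import json


def create_container_arguments(output_name, output_content_type):
    # Same helper as in the cell above: map the output node to its content type
    output_config = {output_name: {"content_type": output_content_type}}
    return [f"--output-config '{json.dumps(output_config)}'"]


args = create_container_arguments("example-node-id.default", "CSV")
print(args[0])
# prints: --output-config '{"example-node-id.default": {"content_type": "CSV"}}'
```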

In [9]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

step_process = ProcessingStep(
    name="DailyFlightDataETL",
    processor=processor,
    inputs=[
        # The .flow file is passed in through the InputFlow pipeline parameter
        ProcessingInput(
            input_name="flow",
            destination=f"{processing_dir}/flow",
            source=input_flow,
            s3_data_type="S3Prefix",
            s3_input_mode="File",
        ),
        # Optional CSV input, enabled together with the input_data parameter:
        # ProcessingInput(
        #     input_name="input_csv",
        #     destination=f"{processing_dir}/input",
        #     source=input_data,
        #     s3_data_type="S3Prefix",
        #     s3_input_mode="File",
        # ),
    ],
    outputs=[
        # Write the flow's output node directly to the feature group
        ProcessingOutput(
            output_name=output_name,
            app_managed=True,
            feature_store_output=feature_store_output,
        )
    ],
    job_arguments=create_container_arguments(output_name, output_content_type),
)

In [10]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name='dw-fs-test-2'

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type, 
        processing_instance_count,
        #input_data,
        input_flow
    ],
    steps=[step_process],
    sagemaker_session=sess
)

In [11]:
import json

# Parse the generated pipeline definition (JSON) so it can be inspected before upserting
definition = json.loads(pipeline.definition())
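A quick way to sanity-check `definition` before upserting is to list its parameters and steps. A minimal sketch, assuming the top-level `Parameters` and `Steps` lists (each entry with `Name` and `Type`) that the SDK emits; `summarize_definition` is a hypothetical helper.

```python
from typing import Dict, List, Tuple


def summarize_definition(definition: Dict) -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]:
    """Return ([(parameter name, type)], [(step name, type)]) from a parsed definition."""
    params = [(p["Name"], p["Type"]) for p in definition.get("Parameters", [])]
    steps = [(s["Name"], s["Type"]) for s in definition.get("Steps", [])]
    return params, steps


# In the notebook:
# params, steps = summarize_definition(definition)
# print(params)  # expect ProcessingInstanceType, ProcessingInstanceCount, InputFlow
# print(steps)   # expect DailyFlightDataETL
```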

In [12]:
pipeline.upsert(sagemaker.get_execution_role())

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:926082456644:pipeline/dw-fs-test-2',
 'ResponseMetadata': {'RequestId': '4c5cb65a-bac6-4172-86b6-b9c4a6d5b632',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '4c5cb65a-bac6-4172-86b6-b9c4a6d5b632',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '80',
   'date': 'Tue, 16 Mar 2021 15:50:01 GMT'},
  'RetryAttempts': 0}}

In [13]:

# Replace this with the S3 URI of your own .flow file; the Lambda function set up below also uses flow_uri
flow_uri = 's3://sagemaker-us-east-1-926082456644/data_wrangler_flows/flow-14-13-21-25-aaf22ec2.flow'

execution = pipeline.start(
    execution_description="Daily flight data transformation and Feature Store ingestion",
    parameters={"InputFlow": flow_uri},
)
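`pipeline.start` returns as soon as the execution is created. To block until it finishes, you can poll its status. A minimal sketch, assuming `execution.describe()` returns the DescribePipelineExecution response with a `PipelineExecutionStatus` of `Executing`, `Stopping`, `Stopped`, `Failed`, or `Succeeded`; `wait_for_status` is a hypothetical helper, and newer SDK releases may also offer `execution.wait()`.

```python
import time
from typing import Callable, Dict

# Statuses after which a pipeline execution no longer changes
TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_status(describe: Callable[[], Dict], poll_seconds: float = 30.0,
                    timeout_seconds: float = 3600.0) -> str:
    """Poll describe() until the status is terminal; return that status."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = describe()["PipelineExecutionStatus"]
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Execution still {status} after {timeout_seconds} s")
        time.sleep(poll_seconds)


# In the notebook: print(wait_for_status(execution.describe))
```

Passing the `describe` callable in, rather than the execution object, keeps the polling loop easy to test without AWS access.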

## Set Up Automation

### Import Libraries

In [14]:
import boto3
from zipfile import ZipFile
import time
import inspect
import Utils  # local helper module (Utils.py) used throughout this section

### Set Up Variables
#### We now set the variables used to set up the automation. The default placeholder values work, but you can change them if you wish.

In [15]:
# feature_group_name = "FG-flow-10-15-59-59-79694afa"
# output_name = "49ca6759-4428-4991-be63-805b3d283301.default"
# flow_name = "flow-10-15-59-59-79694afa.flow"
role_name = f"sm-lambda-role-{time.strftime('%d-%H-%M-%S', time.gmtime())}"
fcn_name = f"sm-lambda-fcn-{time.strftime('%d-%H-%M-%S', time.gmtime())}"
iam_desc = 'IAM Policy for Lambda triggering AWS SageMaker Pipeline'
fcn_desc = 'AWS Lambda function for automatically triggering AWS SageMaker Pipeline'
bucket_arn = f"arn:aws:s3:::{bucket}"
account_num = boto3.client('sts').get_caller_identity()['Account']

### Set Up IAM Roles
#### AWS Lambda needs permissions to call other AWS services. These permissions are granted through an IAM role. We first create the IAM role that AWS Lambda will assume, and then attach permissions to it.
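`Utils.create_role` and `Utils.add_permissions` are not shown here, so the exact policies they attach are unknown. As a sketch of what such a role typically needs, assuming a least-privilege setup: a trust policy that lets the Lambda service assume the role, and a permissions policy that only allows `sagemaker:StartPipelineExecution` on this one pipeline. The helper names are hypothetical; if you want the function's logs in CloudWatch, the role also needs the AWS managed `AWSLambdaBasicExecutionRole` policy or equivalent.

```python
import json
from typing import Dict


def lambda_trust_policy() -> Dict:
    """Trust policy that allows the AWS Lambda service to assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "lambda.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def start_pipeline_policy(region: str, account: str, pipeline: str) -> Dict:
    """Permissions policy that only allows starting executions of one pipeline."""
    # Same ARN format as the PipelineArn returned by pipeline.upsert() above
    pipeline_arn = f"arn:aws:sagemaker:{region}:{account}:pipeline/{pipeline}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "sagemaker:StartPipelineExecution",
                "Resource": pipeline_arn,
            }
        ],
    }


# IAM expects the documents as JSON strings, e.g. for
# boto3.client("iam").create_role(RoleName=role_name, AssumeRolePolicyDocument=trust_json)
trust_json = json.dumps(lambda_trust_policy())
```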

In [16]:
#Create IAM role for the Lambda function
new_role = Utils.create_role(role_name, iam_desc)

#Wait for IAM role to be active
print('Pause for 10 seconds ...')
for i in range(10,0,-1):
    time.sleep(1)
    print('Resuming in {} seconds'.format(i))
print('Resuming now!')
#Add permissions to the IAM role
Utils.add_permissions(new_role['name'])

Creating an IAM role for AWS Lambda function ...
SUCCESS: Successfully created IAM role for AWS Lambda function!
Pause for 10 seconds ...
Resuming in 10 seconds
Resuming in 9 seconds
Resuming in 8 seconds
Resuming in 7 seconds
Resuming in 6 seconds
Resuming in 5 seconds
Resuming in 4 seconds
Resuming in 3 seconds
Resuming in 2 seconds
Resuming in 1 seconds
Resuming now!
Adding permissions to AWS Lambda function's IAM role ...
SUCCESS: Successfully added permissions AWS Lambda function's IAM role!


### Set Up AWS Lambda Function
#### We need an AWS Lambda function to automatically trigger the pipeline in Amazon SageMaker Pipelines whenever a new dataset arrives in Amazon S3. The function code is generated by `Utils.create_lambda_fcn` in `Utils.py`. Once the code is created, we zip it into a deployment package and then create the AWS Lambda function using the IAM role created earlier.
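The generated code lives in `Utils.py` and is not reproduced here. As a rough sketch of what such a handler can look like, assuming it only needs to start the pipeline, optionally overriding `InputFlow`: the environment variables `PIPELINE_NAME` and `FLOW_URI` are hypothetical (the generated code may hard-code these values instead), and how the newly uploaded file reaches the flow is not shown in this section. The event parsing follows the standard S3 notification layout, where object keys arrive URL-encoded.

```python
import os
import urllib.parse
from typing import Dict, List

# Hypothetical configuration; Utils.py may hard-code these values instead
PIPELINE_NAME = os.environ.get("PIPELINE_NAME", "dw-fs-test-2")
FLOW_URI = os.environ.get("FLOW_URI", "")


def new_objects_from_event(event: Dict) -> List[str]:
    """Return s3:// URIs of the objects that triggered an S3 event notification."""
    uris = []
    for record in event.get("Records", []):
        bucket_name = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded (spaces become '+'), so decode them
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
        uris.append(f"s3://{bucket_name}/{key}")
    return uris


def lambda_handler(event, context):
    """Start one pipeline execution per S3 notification delivered to the function."""
    import boto3  # provided by the AWS Lambda Python runtime

    new_files = new_objects_from_event(event)
    kwargs = {
        "PipelineName": PIPELINE_NAME,
        # Record which uploads triggered this run, truncated to stay well under the length limit
        "PipelineExecutionDescription": ("New data: " + ", ".join(new_files))[:1024],
    }
    if FLOW_URI:
        kwargs["PipelineParameters"] = [{"Name": "InputFlow", "Value": FLOW_URI}]
    response = boto3.client("sagemaker").start_pipeline_execution(**kwargs)
    return {"PipelineExecutionArn": response["PipelineExecutionArn"]}
```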

In [17]:
import inspect
# Generate the code for the AWS Lambda function
lambda_code = Utils.create_lambda_fcn(bucket, flow_uri, pipeline_name)

# Write the code to lambda_function.py, stripping the common leading indentation
with open('lambda_function.py', 'w') as f:
    f.write(inspect.cleandoc(lambda_code))
# Compress the file into a zip deployment package
with ZipFile('function.zip', 'w') as z:
    z.write('lambda_function.py')

# Create the AWS Lambda function from the zipped code
with open('function.zip', 'rb') as f:
    fcn_code = f.read()
new_lambda_arn = Utils.create_lambda(fcn_name, fcn_desc, fcn_code, new_role['arn'])

Gathering variables ...
Creating code for AWS Lambda function ...
SUCCESS: Successfully created code for AWS Lambda function!
Creating AWS Lambda function ...
SUCCESS: Successfully created AWS Lambda function!
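The cell above writes the code to disk, zips it, and reads the zip back. The same deployment package can be built in memory. A minimal sketch, assuming the function's handler is configured as `lambda_function.lambda_handler` (which is why the file must sit at the archive root); `build_deployment_package` is a hypothetical helper.

```python
import io
import zipfile


def build_deployment_package(source: str, filename: str = "lambda_function.py") -> bytes:
    """Zip handler source code into an in-memory Lambda deployment package."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        # Store the file at the archive root so "lambda_function.lambda_handler" resolves
        archive.writestr(filename, source)
    return buffer.getvalue()


# In the notebook: fcn_code = build_deployment_package(inspect.cleandoc(lambda_code))
```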


### Set Up Amazon S3
#### Lastly, we set up Amazon S3 to trigger AWS Lambda whenever a new CSV file is uploaded to the bucket specified earlier.
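`Utils.allow_s3` and `Utils.add_notif` are not shown. Typically the first grants S3 permission to invoke the function (`lambda:InvokeFunction` with principal `s3.amazonaws.com`, scoped by the bucket ARN and account), and the second registers a bucket notification. A sketch of such a notification configuration, assuming a `.csv` suffix filter plus an optional key prefix: the default bucket also stores `.flow` files and processing job outputs, so restricting triggers to one folder (the `incoming/` prefix below is hypothetical) avoids firing the pipeline on its own writes. `csv_notification_config` is a hypothetical helper.

```python
from typing import Dict


def csv_notification_config(lambda_arn: str, prefix: str = "") -> Dict:
    """Bucket notification configuration that invokes a Lambda function for new .csv objects."""
    rules = [{"Name": "suffix", "Value": ".csv"}]
    if prefix:
        # Only fire for objects under this prefix
        rules.insert(0, {"Name": "prefix", "Value": prefix})
    return {
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": lambda_arn,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {"Key": {"FilterRules": rules}},
            }
        ]
    }


# In the notebook (this call replaces the bucket's existing notification configuration):
# boto3.client("s3").put_bucket_notification_configuration(
#     Bucket=bucket,
#     NotificationConfiguration=csv_notification_config(new_lambda_arn, prefix="incoming/"),
# )
```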

In [19]:
#Add permission for Amazon S3 to trigger AWS Lambda
Utils.allow_s3(fcn_name, bucket_arn, account_num)

#Setup new CSV upload notifications on Amazon S3
Utils.add_notif(bucket, new_lambda_arn)

Adding permissions to Amazon S3 ...
SUCCESS: Successfully added permissions to Amazon S3!
Initialising Amazon S3 Bucket client ...
SUCCESS: Successfully initilised Amazon S3 Bucket client!
Setting up notifications on Amazon S3 Bucket
SUCCESS: Successfully added notifications to Amazon S3 Bucket!


### Completion
#### You have now successfully set up the automation. Test it by uploading a CSV file to your Amazon S3 bucket and checking whether it triggers a new execution of the SageMaker pipeline.
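One way to check the trigger is to record the upload time, upload a file, and then look for pipeline executions that started afterwards. A sketch, assuming the `PipelineExecutionSummaries` entries returned by `list_pipeline_executions` carry a timezone-aware `StartTime`; the local file name and the `incoming/` key are placeholders, and `executions_started_after` is a hypothetical helper.

```python
from datetime import datetime, timezone
from typing import Dict, List


def executions_started_after(summaries: List[Dict], since: datetime) -> List[Dict]:
    """Keep only execution summaries whose StartTime is at or after `since`."""
    return [s for s in summaries if s["StartTime"] >= since]


# In the notebook (placeholders for the file name and key):
# uploaded_at = datetime.now(timezone.utc)
# boto3.client("s3").upload_file("new_flights.csv", bucket, "incoming/new_flights.csv")
# time.sleep(60)  # give S3 and Lambda time to react
# summaries = boto3.client("sagemaker").list_pipeline_executions(
#     PipelineName=pipeline_name)["PipelineExecutionSummaries"]
# print(executions_started_after(summaries, uploaded_at))
```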

In [20]:
#Print Confirmation
print('GOOD JOB: You are all set!')

GOOD JOB: You are all set!
