In [1]:
# Identifiers exported from the Data Wrangler flow that this notebook operationalises.
# Name of the Feature Store feature group used throughout this notebook
feature_group_name = 'FG-job-aggs-d34eb034'
# Output name passed to the ProcessingOutput of the processing step
output_name = '45e79823-e575-4f06-9723-0e7fb6f1e6b2.default'
# S3 location of the Data Wrangler .flow file
flow_uri = (
    's3://sagemaker-us-east-1-769265885190/'
    'data_wrangler_flows/flow-01-01-10-10-33a59ad7.flow'
)

In [2]:
# SageMaker Python SDK version 2.x is required
import sagemaker
import subprocess
import sys
import os
import uuid
import json
import time
import boto3
from zipfile import ZipFile
import inspect

#module containing utility functions for this notebook
import pipeline_utils

# Pin the SageMaker Python SDK to 2.20.0 for this notebook.
# original_version remembers what was installed before any change was made.
original_version = sagemaker.__version__
if sagemaker.__version__ != "2.20.0":
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "sagemaker==2.20.0"]
    )
    # Re-import the already-loaded module after the install
    import importlib
    importlib.reload(sagemaker)

# S3 bucket for saving processing job outputs
# Feel free to specify a different bucket here if you wish.
sess = sagemaker.Session()
# Reuse the session created above rather than constructing a second, throwaway
# Session just to look up the default bucket.
default_bucket = sess.default_bucket()
sm_client = boto3.client('sagemaker')
iam_role = sagemaker.get_execution_role()
region = sess.boto_region_name
base_job_prefix="sagemaker/DEMO-xgboost-banking"

In [4]:
# Display the current record count for the feature group, as reported by the
# pipeline_utils helper (the cell output shows the query it runs).
pipeline_utils.get_historical_record_count(feature_group_name)

s3://sagemaker-us-east-1-769265885190/offline-store/query_results/
sagemaker_featurestore
Running query:
 SELECT COUNT(*) FROM "fg-job-aggs-d34eb034-1646097010"


12

In [5]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

# Workflow parameters; all three are registered on the Pipeline object further down.

# How many instances the processing job uses
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# Which instance type the processing job uses
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.4xlarge")

# S3 URI of the Data Wrangler .flow file fed to the processing step
input_flow = ParameterString(name='InputFlow', default_value=flow_uri)



In [6]:
from sagemaker.processing import Processor

# Look up the registry id for this region via the helper and assemble the
# Data Wrangler container image URI from it.
container_id = pipeline_utils.get_container(region)
container_uri = (
    f"{container_id}.dkr.ecr.{region}.amazonaws.com"
    "/sagemaker-data-wrangler-container:1.x"
)

# Processor that runs the Data Wrangler container; instance count and type are
# pipeline parameters rather than fixed values.
processor = Processor(
    image_uri=container_uri,
    role=iam_role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
)

In [7]:
from sagemaker.processing import FeatureStoreOutput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
    

# Input: the .flow file (taken from the InputFlow pipeline parameter) is placed
# at /opt/ml/processing/flow inside the container.
flow_input = ProcessingInput(
    input_name='flow',
    source=input_flow,
    destination='/opt/ml/processing/flow',
    s3_data_type='S3Prefix',
    s3_input_mode='File',
)

# Output: app-managed output that targets the feature group.
feature_store_sink = ProcessingOutput(
    output_name=output_name,
    app_managed=True,
    feature_store_output=FeatureStoreOutput(feature_group_name=feature_group_name),
)

# Single processing step of the pipeline (step name was previously DailyFlightDataETL).
step_process = ProcessingStep(
    name="DailyJobDataETL",
    processor=processor,
    inputs=[flow_input],
    outputs=[feature_store_sink],
)

In [8]:
from sagemaker.workflow.pipeline import Pipeline

# Fixed pipeline name. Until 3/6/2022 the name carried a time.strftime('%d-%H-%M-%S')
# suffix ("daily-job-ETL-pipeline-<timestamp>"); it was replaced to determine whether
# any other modifications are needed when the name has no date timestamp.
pipeline_name = "daily-featurestore-preprocessing-ETL"

# Parameters exposed by the pipeline
pipeline_parameters = [
    processing_instance_type,
    processing_instance_count,
    input_flow,
]

pipeline = Pipeline(
    name=pipeline_name,
    parameters=pipeline_parameters,
    steps=[step_process],
    sagemaker_session=sess,
)

In [9]:
# Register the pipeline definition under iam_role; the response (shown below)
# contains the PipelineArn.
pipeline.upsert(iam_role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:769265885190:pipeline/daily-job-etl-pipeline-02-18-04-03',
 'ResponseMetadata': {'RequestId': '92220fb2-122f-4f70-b552-ea12d927daea',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '92220fb2-122f-4f70-b552-ea12d927daea',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '102',
   'date': 'Wed, 02 Mar 2022 18:04:05 GMT'},
  'RetryAttempts': 0}}

In [11]:
# Prefix passed to create_s3_trigger and used as the upload destination later on
prefix = 'daily_data'

# Compute the timestamp suffix once and share it: two separate strftime calls can
# land on different seconds and give the role and the function mismatched suffixes.
resource_suffix = time.strftime('%d-%H-%M-%S', time.gmtime())
role_name = f"sm-lambda-role-{resource_suffix}"
fcn_name = f"sm-lambda-fcn-{resource_suffix}"

account_num = boto3.client('sts').get_caller_identity()['Account']

#Create IAM role for the Lambda function
lambda_role = pipeline_utils.create_role(role_name)

#Wait for the role to be activated
# Fixed pause before the role is used by the Lambda creation cell
role_wait_seconds = 30
print(f'Waiting for {role_wait_seconds} seconds for the newly created role to be active.')
time.sleep(role_wait_seconds)
print(f'{role_wait_seconds} seconds are up; proceeding with rest of the execution.')

Creating an IAM role for AWS Lambda function ...
SUCCESS: Successfully created IAM role for AWS Lambda function!
Adding permissions to AWS Lambda function's IAM role ...
SUCCESS: Successfully added permissions AWS Lambda function's IAM role!
Waiting for 30 seconds for the newly created role to be active.
30 seconds are up; proceeding with rest of the execution.


In [13]:
#Create code for AWS Lambda function
# The helper receives the flow location and the pipeline name and returns the
# handler source as a string (printed in the next cell).
lambda_code = pipeline_utils.create_lambda_fcn(flow_uri, pipeline_name)

Gathering variables ...
Creating code for AWS Lambda function ...
SUCCESS: Successfully created code for AWS Lambda function!


In [14]:
# Show the generated handler source for inspection before it is packaged
print(lambda_code)


    import json
    import boto3

    s3 = boto3.resource('s3')
    sm = boto3.client('sagemaker')

    def lambda_handler(event, context):

        #Check version of Boto3 - It must be at least 1.16.55
        print(f"The version of Boto3 is {boto3.__version__}")

        #Get location for where the new data (csv) file was uploaded
        data_bucket = event['Records'][0]['s3']['bucket']['name']
        data_key = event['Records'][0]['s3']['object']['key']
        print(f"A new file named {data_key} was just uploaded to Amazon S3 in {data_bucket}")

        #Update values for where Data Wrangler .flow is saved
        flow_bucket = 'sagemaker-us-east-1-769265885190'
        flow_key = 'data_wrangler_flows/flow-01-01-10-10-33a59ad7.flow'
        pipeline_name = 'daily-job-ETL-pipeline-02-18-04-03'
        execution_display = f"{data_key.split('/')[-1].replace('_','').replace('.csv','')}"


        #Get .flow file from Amazon S3
        get_object = s3.Object(flow_bucket,flow_key)
   

In [15]:
#Zip AWS Lambda function code
#Write code to a .py file
# inspect.cleandoc strips the common leading indentation of the generated source,
# so the written module starts at column 0.
with open('lambda_function.py', 'w') as f:
    f.write(inspect.cleandoc(lambda_code))
#Compress file into a zip
with ZipFile('function.zip','w') as z:
    z.write('lambda_function.py')
# NOTE: the previous version re-opened lambda_function.py here and overwrote it with
# the raw, still-indented lambda_code. That did not affect the zip, but it left a
# local file that no longer matched the deployed code; the overwrite is removed.

#Create AWS Lambda function
#Use zipped code as AWS Lambda function code
with open('function.zip', 'rb') as f:
    fcn_code = f.read()
lambda_arn = pipeline_utils.create_lambda(fcn_name, fcn_code, lambda_role['arn'])

Creating AWS Lambda function ...
SUCCESS: Successfully created AWS Lambda function!


In [16]:
# Show the Lambda function name generated earlier (first argument to create_s3_trigger)
print(fcn_name)

sm-lambda-fcn-02-18-07-33


In [18]:
# Show the session's default bucket (the bucket the trigger and the upload use)
print(default_bucket)

sagemaker-us-east-1-769265885190


In [19]:
# Show the prefix used for the trigger and for the upload destination
print(prefix)

daily_data


In [20]:
# Show the AWS account id returned by STS get_caller_identity
print(account_num)

769265885190


In [21]:
# Show the ARN returned when the Lambda function was created
print(lambda_arn)

arn:aws:lambda:us-east-1:769265885190:function:sm-lambda-fcn-02-18-07-33


In [22]:
#Add permission for Amazon S3 to trigger AWS Lambda and set up trigger
# Arguments: function name, bucket, prefix, account id, function ARN; the helper's
# log output below reports the permission and bucket-notification steps.
pipeline_utils.create_s3_trigger(fcn_name, default_bucket, prefix, account_num, lambda_arn)

Adding permissions to Amazon S3 ...
SUCCESS: Successfully added permissions to Amazon S3!
Initialising Amazon S3 Bucket client ...
SUCCESS: Successfully initilised Amazon S3 Bucket client!
Setting up notifications on Amazon S3 Bucket
SUCCESS: Successfully added notifications to Amazon S3 Bucket!


In [24]:
# Upload the sample CSV under the watched prefix of the default bucket
local_csv_path = "../sagemaker-banking-classification-p-apjvzlbx2a9o-modelbuild/bank-additional-full.csv"
upload_destination = f"s3://{default_bucket}/{prefix}"
sagemaker.s3.S3Uploader.upload(local_csv_path, upload_destination)

# Short pause after the upload before the next cell checks for a pipeline execution
time.sleep(5)

In [25]:
# check pipeline execution
# Request newest-first ordering explicitly so that index 0 really is the latest
# execution instead of relying on the service's default sort.
execution_summaries = sm_client.list_pipeline_executions(
    PipelineName=pipeline_name,
    SortBy='CreationTime',
    SortOrder='Descending',
).get('PipelineExecutionSummaries', [])

# Fail with a readable message (instead of a bare IndexError) when no execution
# has been started yet.
if not execution_summaries:
    raise RuntimeError(
        f"No executions found for pipeline {pipeline_name!r}; "
        "the upload may not have started one yet - re-run this cell shortly."
    )

latest_execution = execution_summaries[0]['PipelineExecutionArn']
sm_client.describe_pipeline_execution(PipelineExecutionArn=latest_execution)

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:769265885190:pipeline/daily-job-etl-pipeline-02-18-04-03',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:769265885190:pipeline/daily-job-etl-pipeline-02-18-04-03/execution/9nz01atwzybk',
 'PipelineExecutionDisplayName': 'bank-additional-full',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExecutionDescription': 'daily_data/bank-additional-full.csv',
 'CreationTime': datetime.datetime(2022, 3, 2, 18, 18, 54, 926000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2022, 3, 2, 18, 18, 54, 926000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': '776bea64-68f4-4e6f-90aa-ce101bb879e3',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '776bea64-68f4-4e6f-90aa-ce101bb879e3',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '499',
   'date': 'Wed, 02 Mar 2022 18:19:15 GMT'},
  'RetryAttempts': 0}}

In [26]:
# Fetch one record from the feature group by its record identifier
record_id = 'services'
featurestore_runtime = sess.boto_session.client(
    'sagemaker-featurestore-runtime',
    region_name=region,
)
sample_record = featurestore_runtime.get_record(
    FeatureGroupName=feature_group_name,
    RecordIdentifierValueAsString=str(record_id),
)


In [27]:
# Display the raw get_record response fetched in the previous cell
sample_record

{'ResponseMetadata': {'RequestId': '1811159c-94b2-452a-9a3a-93be71422d83',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '1811159c-94b2-452a-9a3a-93be71422d83',
   'content-type': 'application/json',
   'content-length': '193',
   'date': 'Wed, 02 Mar 2022 18:24:42 GMT'},
  'RetryAttempts': 0},
 'Record': [{'FeatureName': 'RECORD_ID', 'ValueAsString': 'services'},
  {'FeatureName': 'y', 'ValueAsString': '0.08138070042831948'},
  {'FeatureName': 'EVENT_TIME', 'ValueAsString': '2022-03-02T18:24:11Z'}]}