## Import required libraries including SageMaker Python SDK

In [None]:
import json
import time
import pathlib
import numpy as np
import pandas as pd
from time import gmtime, strftime

import boto3
import sagemaker
from sagemaker.inputs import TrainingInput
from sagemaker.processing import ProcessingInput, ProcessingOutput, FeatureStoreOutput
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep

import utils

### Update SageMaker SDK if necessary 

In [2]:
if int(sagemaker.__version__.split('.')[0]) != 2:
    !pip install sagemaker==2.24.1
    print("Updating SageMakerVersion. Please restart the kernel")
else:
    print("SageMaker SDK version is good")

SageMaker SDK version is good


### Set region, boto3 and SageMaker SDK variables

In [3]:
# A single boto3 session backs the AWS clients created in this cell.
boto_session = boto3.Session()
region = boto_session.region_name
print("Region = {}".format(region))

# Role passed as the execution role to the SageMaker resources created below.
sagemaker_role = sagemaker.get_execution_role()

# Service clients: S3 (used later for file uploads) and the SageMaker API.
s3_client = boto3.client('s3', region_name=region)
sagemaker_boto_client = boto_session.client('sagemaker')

# SageMaker SDK session built on the boto3 session and SageMaker client above.
sagemaker_session = sagemaker.session.Session(
    sagemaker_client=sagemaker_boto_client,
    boto_session=boto_session,
)

Region = us-west-2


### Create directories in the SageMaker default bucket for this tutorial


In [4]:
# Constants that do not depend on the bucket.
processing_dir = "/opt/ml/processing"
deploy_model_instance_type = "ml.m4.xlarge"

# Key prefix under which all files pertaining to this workshop are stored.
prefix = 'sagemaker-tutorial'
data_prefix = prefix + '/data'

# Alternatively, you can use your own custom bucket here.
default_bucket = sagemaker_session.default_bucket()

# S3 URIs for the training job output and the processing scripts.
training_job_output_path = f's3://{default_bucket}/{prefix}/training_jobs'
create_dataset_script_uri = f's3://{default_bucket}/{prefix}/code/create_dataset.py'
deploy_model_script_uri = f's3://{default_bucket}/{prefix}/code/deploy_model.py'

# Preprocess data

Use the following code snippet to download the dataset to the `./data/` folder.

In [5]:
!mkdir ./data
!wget -O ./data/default_of_credit_card.xls  https://archive.ics.uci.edu/ml/machine-learning-databases/00350/default%20of%20credit%20card%20clients.xls

mkdir: cannot create directory ‘./data’: File exists
--2021-03-05 05:03:36--  https://archive.ics.uci.edu/ml/machine-learning-databases/00350/default%20of%20credit%20card%20clients.xls
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5539328 (5.3M) [application/x-httpd-php]
Saving to: ‘./data/default_of_credit_card.xls’


2021-03-05 05:03:37 (19.7 MB/s) - ‘./data/default_of_credit_card.xls’ saved [5539328/5539328]



In [6]:
# Read the downloaded workbook into a DataFrame. header=1 takes the column names
# from the second row of the sheet (row index 1).
local_data_path = './data/default_of_credit_card.xls'

df = pd.read_excel(io=local_data_path, header=1)
df.head()

Unnamed: 0,ID,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_0,PAY_2,PAY_3,PAY_4,...,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,default payment next month
0,1,20000,2,2,1,24,2,2,-1,-1,...,0,0,0,0,689,0,0,0,0,1
1,2,120000,2,2,2,26,-1,2,0,0,...,3272,3455,3261,0,1000,1000,1000,0,2000,1
2,3,90000,2,2,2,34,0,0,0,0,...,14331,14948,15549,1518,1500,1000,1000,1000,5000,0
3,4,50000,2,2,1,37,0,0,0,0,...,28314,28959,29547,2000,2019,1200,1100,1069,1000,0
4,5,50000,1,2,1,57,-1,0,-1,0,...,20940,19146,19131,2000,36681,10000,9000,689,679,0


In [7]:
# Give every row the same EVENT_TIME value (the current time as a POSIX
# timestamp), then cast all columns to float64.
timestamp = pd.to_datetime('now').timestamp()
df = df.assign(EVENT_TIME=timestamp).astype(np.float64)

In [21]:
# Rename the target column to LABEL and move it to the first position.
# The rename is guarded so the cell is idempotent: once the column has been
# renamed, re-running the cell no longer fails while looking up the old name.
label_source_column = 'default payment next month'
if label_source_column in df.columns:
    df = df.rename(columns={label_source_column: "LABEL"})

cols = ['LABEL'] + [col for col in df.columns if col != 'LABEL']
df = df.loc[:, cols]

Now we need to upload the raw csv data to S3

In [8]:
# Write the frame to a local CSV file (index column omitted) and upload that
# file to the default bucket under the data prefix.
local_dataset_path = './data/dataset.csv'
df.to_csv(local_dataset_path, index=False)

response = sagemaker_session.upload_data(
    path=local_dataset_path,
    bucket=default_bucket,
    key_prefix=data_prefix,
)
print(response)

s3://sagemaker-us-west-2-367158743199/sagemaker-tutorial/data/dataset.csv


# SageMaker Feature Store

Amazon SageMaker Feature Store is a purpose-built repository where you can store and access features so it’s much easier to name, organize, and reuse them across teams. SageMaker Feature Store provides a unified store for features during training and real-time inference without the need to write additional code or create manual processes to keep features consistent. SageMaker Feature Store keeps track of the metadata of stored features (e.g. feature name or version number) so that you can query the features for the right attributes in batches or in real time using Amazon Athena, an interactive query service. SageMaker Feature Store also keeps features updated, because as new data is generated during inference, the single repository is updated so new features are always available for models to use during training and inference.

A feature store consists of an offline component stored in S3 and an online component stored in a low-latency database. The online database is optional, but very useful if you need supplemental features to be available at inference. In this section, we will create a feature group for our credit card default dataset. After inserting the data into the feature group, you can query the offline store with Athena to build the training dataset.

You can reference the [SageMaker Developer Guide](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store.html) for more information about the SageMaker Feature Store.

In [16]:
# Feature Store runtime client; used later in the notebook to read a record back.
featurestore_runtime = boto_session.client(
    'sagemaker-featurestore-runtime', region_name=region
)

# SDK session that carries the Feature Store runtime client alongside the
# SageMaker client; handed to the FeatureGroup below.
feature_store_session = sagemaker.Session(
    sagemaker_featurestore_runtime_client=featurestore_runtime,
    sagemaker_client=sagemaker_boto_client,
    boto_session=boto_session,
)

## Configure the feature groups

When you set up your feature groups, you need to customize the feature names with a unique name and set up each feature group with the FeatureGroup class. 


The datatype for each feature is set by passing a dataframe and inferring the proper datatype. Feature data types can also be set via a config variable, but they will have to match the corresponding Python data types in the Pandas dataframe when it is ingested into the Feature Group.

In [24]:
# The name is 'credit-default' followed by the current UTC day-hour-minute-second,
# so names differ between runs.
creation_suffix = strftime('%d-%H-%M-%S', gmtime())
fg_name = 'credit-default' + creation_suffix

credit_feature_group = FeatureGroup(name=fg_name, sagemaker_session=feature_store_session)

# Load the feature definitions from the data frame that holds the feature data.
credit_feature_group.load_feature_definitions(data_frame=df)

[FeatureDefinition(feature_name='ID', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='LIMIT_BAL', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='SEX', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='EDUCATION', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='MARRIAGE', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='AGE', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='PAY_0', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='PAY_2', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='PAY_3', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='PAY_4', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 Featu

# Create the feature groups

You must tell the Feature Group which columns in the dataframe correspond to the required record identifier and event time features.

In [25]:
# Show which feature group name the following cells operate on.
print(f"{fg_name} is the feature group name in use")

credit-default05-05-15-04 is the feature group name in use


In this step, you use the create function to create the feature group. The following code shows all of the available parameters. The online store is not created by default, so you must set this as True if you want to enable it. The s3_uri is the S3 bucket location of your offline store.

In [26]:
# Names of the dataframe columns that serve as the feature group's record
# identifier and event time features (both are passed to create() below).
record_identifier_feature_name = 'ID'
event_time_feature_name = 'EVENT_TIME'


def wait_for_feature_group_creation_complete(feature_group, poll_interval=5):
    """Block until ``feature_group`` is no longer in the "Creating" status.

    Args:
        feature_group: Object exposing ``describe()`` (returning a dict that has
            a "FeatureGroupStatus" key) and a ``name`` attribute.
        poll_interval: Seconds to sleep between status checks. Defaults to 5.

    Raises:
        RuntimeError: If the final status is anything other than "Created". The
            message contains the final status and, when the describe response
            includes one, its "FailureReason".
    """
    description = feature_group.describe()
    status = description.get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        time.sleep(poll_interval)
        description = feature_group.describe()
        status = description.get("FeatureGroupStatus")
    if status != "Created":
        # Surface what the service reported so the failure can be diagnosed
        # without a separate describe() call.
        reason = description.get("FailureReason")
        detail = f"status: {status}"
        if reason:
            detail += f", reason: {reason}"
        raise RuntimeError(f"Failed to create feature group {feature_group.name} ({detail})")
    print(f"FeatureGroup {feature_group.name} successfully created.")


# Report the S3 location handed to create() as s3_uri, then create the feature
# group with the online store enabled.
print(f"\n Using s3://{default_bucket}/{prefix}")
credit_feature_group.create(
    role_arn=sagemaker_role,
    s3_uri=f"s3://{default_bucket}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    enable_online_store=True,
)

# Poll until the feature group reports that creation has finished.
wait_for_feature_group_creation_complete(credit_feature_group)


 Using s3://sagemaker-us-west-2-367158743199/sagemaker-tutorial
Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup credit-default05-05-15-04 successfully created.


In [27]:
# Display the description of the feature group that was just created.
credit_feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:us-west-2:367158743199:feature-group/credit-default05-05-15-04',
 'FeatureGroupName': 'credit-default05-05-15-04',
 'RecordIdentifierFeatureName': 'ID',
 'EventTimeFeatureName': 'EVENT_TIME',
 'FeatureDefinitions': [{'FeatureName': 'ID', 'FeatureType': 'Fractional'},
  {'FeatureName': 'LIMIT_BAL', 'FeatureType': 'Fractional'},
  {'FeatureName': 'SEX', 'FeatureType': 'Fractional'},
  {'FeatureName': 'EDUCATION', 'FeatureType': 'Fractional'},
  {'FeatureName': 'MARRIAGE', 'FeatureType': 'Fractional'},
  {'FeatureName': 'AGE', 'FeatureType': 'Fractional'},
  {'FeatureName': 'PAY_0', 'FeatureType': 'Fractional'},
  {'FeatureName': 'PAY_2', 'FeatureType': 'Fractional'},
  {'FeatureName': 'PAY_3', 'FeatureType': 'Fractional'},
  {'FeatureName': 'PAY_4', 'FeatureType': 'Fractional'},
  {'FeatureName': 'PAY_5', 'FeatureType': 'Fractional'},
  {'FeatureName': 'PAY_6', 'FeatureType': 'Fractional'},
  {'FeatureName': 'BILL_AMT1', 'FeatureType': 'Fractional'}

### Ingest records into the Feature Groups

After the Feature Groups have been created, we can put data into each store by using the PutRecord API. This API can handle high TPS and is designed to be called by different streams. The data from all of these Put requests is buffered and written to S3 in chunks. The files will be written to the offline store within a few minutes of ingestion.

In [28]:
# Ingest the frame unless a variable named `credit_table` already exists.
# NOTE(review): `credit_table` is not assigned anywhere in the visible notebook,
# so this guard appears to always take the ingest branch - confirm the intent.
if 'credit_table' not in locals():
    credit_feature_group.ingest(data_frame=df, max_workers=5, wait=True)
else:
    print(
        "You may have already ingested the data into your Feature Groups. If you'd like to do this again, you can run the ingest methods outside of the 'if/else' statement.")

# Create a SageMaker Pipeline to Automate All the Steps from Data Prep to Model Deployment
Now that you've manually done each step in our machine learning workflow, you can create a pipeline which trains a new model, persists the model in SageMaker and then adds the model to the registry.

### Pipeline parameters
An important feature of SageMaker Pipelines is the ability to define the steps ahead of time, but be able to change the parameters to those steps at execution without having to re-define the pipeline. This can be achieved by using ParameterInteger, ParameterFloat or ParameterString to define a value upfront which can be modified when you call pipeline.start(parameters=parameters) later. Only certain parameters can be defined this way.

In [29]:
# Pipeline parameters: defaults are declared here and can be overridden when an
# execution is started.
train_instance_param = ParameterString(name="TrainingInstance", default_value="ml.m4.xlarge")
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")

# Look up the image URI of the XGBoost container for this region.
# Change the version string to select a different container release.
xgboost_container = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.2-1",
)

# Step 1: Preprocess

In [38]:
# Upload the preprocessing script so the processing step can reference it by S3 URI.
s3_client.upload_file(Filename='preprocessing.py', Bucket=default_bucket, Key=f'{prefix}/code/preprocessing.py')

create_dataset_script_uri = f's3://{default_bucket}/{prefix}/code/preprocessing.py'

# S3 location of the raw CSV uploaded earlier in this notebook. It is derived from
# the bucket and data prefix in use rather than hard-coded to one account's bucket,
# so the notebook runs unchanged in any account or region.
raw_dataset_uri = f's3://{default_bucket}/{data_prefix}/dataset.csv'

create_dataset_processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=sagemaker_role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name='credit-create-dataset',
    sagemaker_session=sagemaker_session)

# The step reads the raw CSV and exposes two named outputs, 'train_data' and
# 'test_data', which later steps reference through the step's properties.
create_dataset_step = ProcessingStep(
    name='CreateDataset',
    processor=create_dataset_processor,
    inputs=[ProcessingInput(
                        source=raw_dataset_uri,
                        destination='/opt/ml/processing/input')],
    outputs=[ProcessingOutput(output_name='train_data', source='/opt/ml/processing/output/train'),
             ProcessingOutput(output_name='test_data',  source='/opt/ml/processing/output/test')],
    job_arguments=["--train-test-split-ratio", '0.8'],
    code=create_dataset_script_uri)

# Step 2: Train XGBoost Model

In [39]:
train_instance_count = 1
content_type = "text/csv"

# XGBoost hyperparameters, passed to the container as strings.
hyperparameters = {
    "max_depth": "5",
    "eta": "0.2",
    "gamma": "4",
    "min_child_weight": "6",
    "subsample": "0.7",
    "objective": "binary:logistic",
    "num_round": "50"}

# Construct a SageMaker estimator that runs the XGBoost container.
# The instance type comes from the `TrainingInstance` pipeline parameter, so it
# can be overridden per execution; a hard-coded type would leave that declared
# parameter without any effect.
xgb_estimator = sagemaker.estimator.Estimator(image_uri=xgboost_container,
                                              hyperparameters=hyperparameters,
                                              role=sagemaker_role,
                                              instance_count=train_instance_count,
                                              instance_type=train_instance_param,
                                              volume_size=5,  # 5 GB
                                              output_path=training_job_output_path)


# Train on the 'train_data' output of the dataset-creation step.
train_step = TrainingStep(
    name='XgboostTrain',
    estimator=xgb_estimator,
    inputs={
        'train': TrainingInput(
            s3_data=create_dataset_step.properties.ProcessingOutputConfig.Outputs['train_data'].S3Output.S3Uri,
            content_type=content_type)
    }
)

# Step 3: Model Pre-Deployment Step

In [40]:
# Model built from the training step's outputs: its container image and its
# model artifacts are both read from the step's properties.
model = sagemaker.model.Model(
    name='credit-default-demo-pipeline-xgboost',
    role=sagemaker_role,
    sagemaker_session=sagemaker_session,
    image_uri=train_step.properties.AlgorithmSpecification.TrainingImage,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
)

inputs = sagemaker.inputs.CreateModelInput(instance_type="ml.m4.xlarge")

# Pipeline step that creates the model; its ModelName property is passed to the deploy step.
create_model_step = CreateModelStep(name="ModelPreDeployment", model=model, inputs=inputs)

# Step 4: Run Bias Metrics with Clarify

In [41]:
# Clarify configuration: assemble the analysis config, write it to a local JSON
# file and upload that file to S3 for the Clarify processing step.
bias_report_output_path = f's3://{default_bucket}/{prefix}/clarify-output/bias'
s3_client = boto3.client('s3', region_name=region)

# Dataset description: the training split, with LABEL as the label column.
bias_data_config = sagemaker.clarify.DataConfig(
    s3_data_input_path=create_dataset_step.properties.ProcessingOutputConfig.Outputs['train_data'].S3Output.S3Uri,
    s3_output_path=bias_report_output_path,
    label='LABEL',
    dataset_type='text/csv')

# Bias settings: SEX is the facet column.
bias_config = sagemaker.clarify.BiasConfig(
    facet_name='SEX',
    facet_values_or_threshold=[1],
    label_values_or_threshold=[0])

# Merge both configs and request all pre-training bias methods.
analysis_config = bias_data_config.get_config()
analysis_config.update(bias_config.get_config())
analysis_config["methods"] = {"pre_training_bias": {"methods": "all"}}

# Persist the config locally (the directory is created if missing).
clarify_config_dir = pathlib.Path('config')
clarify_config_dir.mkdir(exist_ok=True)
(clarify_config_dir / 'analysis_config.json').write_text(json.dumps(analysis_config))

s3_client.upload_file(
    Filename='config/analysis_config.json',
    Bucket=default_bucket,
    Key=f'{prefix}/clarify-config/analysis_config.json',
)

In [42]:
# Clarify processing step: runs the Clarify container on the uploaded analysis
# config and the training split, and publishes analysis.json as 'analysis_result'.
clarify_image_uri = sagemaker.clarify.image_uris.retrieve(framework='clarify', region=region)

clarify_processor = sagemaker.processing.Processor(
    image_uri=clarify_image_uri,
    role=sagemaker.get_execution_role(),
    instance_type='ml.c5.xlarge',
    instance_count=1,
    base_job_name='fraud-detection-demo-clarify-processor')

# Inputs: the analysis config uploaded to S3 and the 'train_data' output of the
# dataset-creation step.
clarify_inputs = [
    ProcessingInput(
        input_name="analysis_config",
        source=f's3://{default_bucket}/{prefix}/clarify-config/analysis_config.json',
        destination="/opt/ml/processing/input/config"),
    ProcessingInput(
        input_name="dataset",
        source=create_dataset_step.properties.ProcessingOutputConfig.Outputs['train_data'].S3Output.S3Uri,
        destination="/opt/ml/processing/input/data"),
]

# Output: the analysis file, delivered to the bias report location.
clarify_outputs = [
    ProcessingOutput(
        output_name="analysis_result",
        source="/opt/ml/processing/output/analysis.json",
        destination=bias_report_output_path),
]

clarify_step = ProcessingStep(
    name="ClarifyProcessor",
    processor=clarify_processor,
    inputs=clarify_inputs,
    outputs=clarify_outputs,
)

# Step 5: Register Model

In [43]:
# Attach the Clarify analysis output to the registered model as its bias metrics.
bias_metrics_source = sagemaker.model_metrics.MetricsSource(
    s3_uri=clarify_step.properties.ProcessingOutputConfig.Outputs['analysis_result'].S3Output.S3Uri,
    content_type="application/json",
)
model_metrics = utils.ModelMetrics(bias=bias_metrics_source)

# Default the model package group name to the project prefix unless one is already set.
if 'mpg_name' not in locals():
    mpg_name = prefix
    print(f'Model Package Group name: {mpg_name}')

# Register the trained model artifacts in the model package group with the
# approval status taken from the pipeline parameter.
register_step = RegisterModel(
    name="XgboostRegisterModel",
    estimator=xgb_estimator,
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    model_package_group_name=mpg_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
)

Model Package Group name: sagemaker-tutorial


# Step 6: Deploy Model

In [44]:
endpoint_name = "xgboost-model-pipeline-0120"

# Upload the deployment script to the S3 key that deploy_model_script_uri points to.
s3_client.upload_file(Filename='deploy_model.py', Bucket=default_bucket, Key=f'{prefix}/code/deploy_model.py')

deploy_model_processor = SKLearnProcessor(
    base_job_name='fraud-detection-demo-deploy-model',
    framework_version='0.23-1',
    instance_type="ml.t3.medium",
    instance_count=1,
    role=sagemaker_role,
    sagemaker_session=sagemaker_session)

# Command-line arguments handed to deploy_model.py; the model name is resolved
# from the create-model step when the pipeline runs.
deploy_job_arguments = [
    "--model-name", create_model_step.properties.ModelName,
    "--region", region,
    "--endpoint-instance-type", deploy_model_instance_type,
    "--endpoint-name", endpoint_name]

deploy_step = ProcessingStep(
    name='DeployModel',
    processor=deploy_model_processor,
    job_arguments=deploy_job_arguments,
    code=deploy_model_script_uri)

# Combine the Pipeline Steps and Run

In [47]:
pipeline_name = 'credit-default'

# Assemble the pipeline from the steps defined in this notebook.
# Every entry in `steps` must be a step object created above: a name that is not
# defined in this notebook (such as `credit_flow_step`) raises NameError when the
# notebook is run on a fresh kernel.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        train_instance_param,
        model_approval_status],
    steps=[
        create_dataset_step,
        train_step,
        create_model_step,
        clarify_step,
        register_step,
        deploy_step
    ])

## Submit the pipeline definition to the SageMaker Pipeline service

Note: If an existing pipeline has the same name it will be overwritten.


In [48]:
# Submit the pipeline definition under `pipeline_name`, using the notebook's execution role.
pipeline.upsert(role_arn=sagemaker_role)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'PipelineArn': 'arn:aws:sagemaker:us-west-2:367158743199:pipeline/credit-default',
 'ResponseMetadata': {'RequestId': '7fc0d912-07f3-45f2-832f-83d5229a7b89',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '7fc0d912-07f3-45f2-832f-83d5229a7b89',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '82',
   'date': 'Fri, 05 Mar 2021 05:20:57 GMT'},
  'RetryAttempts': 0}}

In [49]:
# Start a pipeline execution; no parameter overrides are passed.
start_response = pipeline.start()

# Wait for the execution, then display its description.
# NOTE(review): wait() appears to block the cell for the whole run (the saved
# output shows a KeyboardInterrupt) - confirm expected duration.
start_response.wait()
start_response.describe()

KeyboardInterrupt: 

#  View results of Clarify job

# Make a prediction

In [None]:
# Client-side handle for invoking the endpoint named in the deploy step.
predictor = sagemaker.predictor.Predictor(
    sagemaker_session=sagemaker_session,
    endpoint_name=endpoint_name,
)

#### Sample an application from the test data and get its features from the online feature store

In [None]:
# Pick one random application from the ingested frame. `df` is used because no
# `df_test` frame is created anywhere in this notebook, so referencing it
# raises NameError on a fresh kernel.
sample_id = df.sample(1).iloc[0]['ID']

# Fetch that application's record from the feature group by its identifier.
response = featurestore_runtime.get_record(
    FeatureGroupName=fg_name,
    RecordIdentifierValueAsString=str(sample_id)
)

In [None]:
# Turn the returned record into a frame indexed by feature name, drop the label,
# and join the remaining values into one comma-separated line.
# NOTE(review): only LABEL is removed; the ID and EVENT_TIME values stay in the
# payload - confirm the model expects them.
record_frame = pd.DataFrame(response['Record'])
df_sample = record_frame.set_index('FeatureName').drop('LABEL')
data_input = ','.join(df_sample['ValueAsString'])

In [None]:
# Send the comma-separated feature values to the endpoint as CSV and parse the
# response as JSON.
results = predictor.predict(data_input, initial_args={"ContentType": "text/csv"})
prediction = json.loads(results)

# Report the prediction as a rounded percentage ("Probability" was misspelled
# in the message shown to the user).
print(f'Probability of the default payment next month for ID:{int(sample_id)} is : {round(prediction * 100)}%')

# Clean up

After running the demo, you should remove the resources which were created. You can also delete all the objects in the project's S3 directory by passing the keyword argument delete_s3_objects=True.



In [None]:
from utils import delete_project_resources

In [None]:
# Left commented out so resources are not removed by accident when running all
# cells. Uncomment to delete the resources created by this notebook; set
# delete_s3_objects=True to also delete the objects in the project's S3 directory.
# delete_project_resources(
#     sagemaker_boto_client=sagemaker_boto_client,
#     endpoint_name=endpoint_name, 
#     pipeline_name=pipeline_name, 
#     mpg_name=mpg_name, 
#     prefix=prefix,
#     delete_s3_objects=False,
#     bucket_name=default_bucket)