In this notebook, we will take the .flow files that define the transformations to the raw data. and apply them using a SageMaker Processing job that will apply those transformations to the raw data deposited in the S3 bucket as .csv files.

# 0. SETUP
## 0.1 Install required and/or update third-party libraries

In [94]:
!python -m pip install -Uq pip
!python -m pip install -q awswrangler==2.2.0 imbalanced-learn==0.7.0 sagemaker==2.41.0 boto3==1.17.70

  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes


## 0.2 Loading stored variables
If you ran this notebook before, you may want to re-use the resources you aready created with AWS. Run the cell below to load any prevously created variables. You should see a print-out of the existing variables. If you don’t see anything printed then it’s probably the first time you are running the notebook!

In [95]:
%store -r
%store

Stored variables and their in-db values:
bucket             -> 'sagemaker-eu-west-1-688316159674'
prefix             -> 'fraud-detect-demo'


## 0.3 Import Libraries

In [96]:
import json
import time
import boto3
import string
import sagemaker
import pandas as pd
import awswrangler as wr

from sagemaker.feature_store.feature_group import FeatureGroup

## 0.4 Getting started: Creating Resources

In order to successfully run this notebook you will need to create some AWS resources. First, an S3 bucket will be created to store all the data for this tutorial. Once created, you will then need to create an AWS Glue role using the IAM console then attach a policy to the S3 bucket to allow FeatureStore access to this notebook. If you’ve already run this notebook and are picking up where you left off, then running the cells below should pick up the resources you already created without creating any additional resources.

1. In a separate brower tab go to the IAM section of the AWS Console
2. Navigate to the Roles section and select the execution role you’re using for your SageMaker Studio user
    - If you’re not sure what role you’re using, run the cell below to print it out
3.Attach the AmazonSageMakerFeatureStoreAccess policy to this role. Once attached, the changes take effect immediately.

In [97]:
print("SageMaker Role:", sagemaker.get_execution_role().split("/")[-1])

SageMaker Role: AmazonSageMaker-ExecutionRole-FiftyFive


In [98]:
# You can change this to a region of your choice
import sagemaker

region = sagemaker.Session().boto_region_name
print("Using AWS Region: {}".format(region))

Using AWS Region: eu-west-1


In [99]:
boto3.setup_default_session(region_name=region)

boto_session = boto3.Session(region_name=region)

s3_client = boto3.client("s3", region_name=region)

sagemaker_boto_client = boto_session.client("sagemaker")

sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session, sagemaker_client=sagemaker_boto_client
)

In [100]:
"""
Note: if you are not running this notebook from SageMaker Studio or SageMaker Classic Notebooks you will need to instanatiate
the sagemaker_execution_role_name with an AWS role that has SageMakerFullAccess and SageMakerFeatureStoreFullAccess
"""
sagemaker_execution_role_name = "AmazonSageMaker-ExecutionRole-20210107T234882"
try:
    sagemaker_role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client("iam")
    sagemaker_role = iam.get_role(RoleName=sagemaker_execution_role_name)["Role"]["Arn"]
    print(f"\n instantiating sagemaker_role with supplied role name : {sagemaker_role}")

account_id = boto3.client("sts").get_caller_identity()["Account"]

## 0.5 Create a directory in the SageMaker default bucket for this tutorial

In [101]:
if 'bucket' not in locals():
    bucket = sagemaker_session.default_bucket()
    prefix = 'fraud-detect-demo'
    %store bucket
    %store prefix
    print(f'Creating bucket: {bucket}...')

If you want to use your own S3 bucket that’s already existing, uncomment and utilize the following example code.

In [102]:
"""
try:
    s3_client.create_bucket(Bucket=bucket, ACL='private', CreateBucketConfiguration={'LocationConstraint': region})
    print('Create S3 bucket: SUCCESS')

except Exception as e:
    if e.response['Error']['Code'] == 'BucketAlreadyOwnedByYou':
        print(f'Using existing bucket: {bucket}/{prefix}')
    else:
        raise(e)
"""

"\ntry:\n    s3_client.create_bucket(Bucket=bucket, ACL='private', CreateBucketConfiguration={'LocationConstraint': region})\n    print('Create S3 bucket: SUCCESS')\n\nexcept Exception as e:\n    if e.response['Error']['Code'] == 'BucketAlreadyOwnedByYou':\n        print(f'Using existing bucket: {bucket}/{prefix}')\n    else:\n        raise(e)\n"

In [103]:
# ======> Tons of output_paths
traing_job_output_path = f"s3://{bucket}/{prefix}/training_jobs"
bias_report_1_output_path = f"s3://{bucket}/{prefix}/clarify-bias-1"
bias_report_2_output_path = f"s3://{bucket}/{prefix}/clarify-bias-2"
explainability_output_path = f"s3://{bucket}/{prefix}/clarify-explainability"

train_data_uri = f"s3://{bucket}/{prefix}/data/train/train.csv"
test_data_uri = f"s3://{bucket}/{prefix}/data/test/test.csv"

# =======> variables used for parameterizing the notebook run
train_instance_count = 1
train_instance_type = "ml.m4.xlarge"

claify_instance_count = 1
clairfy_instance_type = "ml.c5.xlarge"

predictor_instance_count = 1
predictor_instance_type = "ml.c5.xlarge"

## 0.6 Upload raw data to S3

In [104]:
s3_client.upload_file(
    Filename="data/claims.csv", Bucket=bucket, Key=f"{prefix}/data/raw/claims.csv"
)
s3_client.upload_file(
    Filename="data/customers.csv", Bucket=bucket, Key=f"{prefix}/data/raw/customers.csv"
)

# 1 DATA WRANGLING
## 1.1 Update attributes within the .flow file

DataWrangler will generate a .flow file. It contains a reference to an S3 bucket used during the Wrangling. This may be different from the one you have as a default in this notebook eg if the Wrangling was done by someone else, you will probably not have access to their bucket and you now need to point to your own S3 bucket so you can actually load the .flow file into Wrangler or access the data.

Running the cell below wil genereate the data wrangler flow file and export the data to S3.

## 1.2 Save to S3 with a SageMaker Processing Job

<div class="alert alert-info"> 💡 <strong> Quick Start </strong>
To save your processed data to S3, select the Run menu above and click <strong>Run all cells</strong>. 
<strong><a style="color: #0397a7 " href="#Job-Status-&-S3-Output-Location">
    <u>View the status of the export job and the output S3 location</u></a>.
</strong>
</div>


This notebook executes your Data Wrangler Flow `customers.flow` on the entire dataset using a SageMaker 
Processing Job and will save the processed data to S3.

This notebook saves data from the step `Cast Single Data Type` from `Source: Customers.Csv`. To save from a different step, go to Data Wrangler 
to select a new step to export. 

---

## Contents

1. [Inputs and Outputs](#Inputs-and-Outputs)
1. [Run Processing Job](#Run-Processing-Job)
   1. [Job Configurations](#Job-Configurations)
   1. [Create Processing Job](#Create-Processing-Job)
   1. [Job Status & S3 Output Location](#Job-Status-&-S3-Output-Location)
1. [Optional Next Steps](#(Optional)Next-Steps)
    1. [Load Processed Data into Pandas](#(Optional)-Load-Processed-Data-into-Pandas)
    1. [Train a model with SageMaker](#(Optional)Train-a-model-with-SageMaker)
---

# 1.3 Inputs and Outputs

The below settings configure the inputs and outputs for the flow export.

<div class="alert alert-info"> 💡 <strong> Configurable Settings </strong>

In <b>Input - Source</b> you can configure the data sources that will be used as input by Data Wrangler

1. For S3 sources, configure the source attribute that points to the input S3 prefixes
2. For all other sources, configure attributes like query_string, database in the source's 
<b>DatasetDefinition</b> object.

If you modify the inputs the provided data must have the same schema and format as the data used in the Flow. 
You should also re-execute the cells in this section if you have modified the settings in any data sources.
</div>

In [105]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.dataset_definition.inputs import AthenaDatasetDefinition, DatasetDefinition, RedshiftDatasetDefinition
import time
import uuid
import sagemaker
import re
import os
import json
import boto3


### 1.3.1 Input - S3 data Sources

In [106]:
#list all the datasources
data_sources = []

data_sources.append(ProcessingInput(
    source="s3://sagemaker-eu-west-1-688316159674/fraud-detect-demo/data/raw/customers.csv", # You can override this to point to other dataset on S3
    destination="/opt/ml/processing/customers.csv",
    input_name="customers.csv",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
))

### 1.3.2 Output: S3 settings

<div class="alert alert-info"> 💡 <strong> Configurable Settings </strong>

1. <b>bucket</b>: you can configure the S3 bucket where Data Wrangler will save the output. The default bucket from 
the SageMaker notebook session is used. 
2. <b>flow_export_id</b>: A randomly generated export id. The export id must be unique to ensure the results do not 
conflict with other flow exports 
3. <b>s3_ouput_prefix</b>:  you can configure the directory name in your bucket where your data will be saved.
</div>

In [107]:
# Sagemaker session
sess = sagemaker.Session()

# You can configure this with your own bucket name, e.g.
# bucket = "my-bucket"
bucket = sess.default_bucket()
print(f"Data Wrangler export storage bucket: {bucket}")




Data Wrangler export storage bucket: sagemaker-eu-west-1-688316159674


Below are the inputs required by the SageMaker Python SDK to launch a processing job.

In [108]:


# Output name is auto-generated from the select node's ID + output name from the flow file.
output_name = "a38c7be6-7fdb-4445-95b9-065b4a88eab5.default"

s3_output_prefix = f"export-{flow_export_name}/output"
s3_output_path = f"s3://{bucket}/{s3_output_prefix}"
print(f"Flow S3 export result path: {s3_output_path}")

processing_job_output = ProcessingOutput(
    output_name=output_name,
    source="/opt/ml/processing/output",
    destination=s3_output_path,
    s3_upload_mode="EndOfJob"
)

Flow S3 export result path: s3://sagemaker-eu-west-1-688316159674/export-flow-2021-02-12-54-26-claims/output


### 1.3.3 Upload Flow to S3

To use the Data Wrangler as an input to the processing job,  first upload your flow file to Amazon S3.

In [109]:

# name of the flow file which should exist in the current notebook working directory
flow_file_name = "customers.flow"

# unique flow export ID
flow_export_id = f"{time.strftime('%Y-%d-%H-%M-%S', time.gmtime())}"
flow_export_name = f"flow-{flow_export_id}-{flow_file_name}".replace(".csv", "")
# Load .flow file from current notebook working directory 
!echo "Loading flow file from current notebook working directory: $PWD"

with open(flow_file_name) as f:
    flow = json.load(f)

# Upload flow to S3
s3_client = boto3.client("s3")
s3_client.upload_file(flow_file_name, bucket, f"data_wrangler_flows/{flow_file_name}")

flow_s3_uri = f"s3://{bucket}/data_wrangler_flows/{flow_file_name}"

print(f"Data Wrangler flow {flow_file_name} uploaded to {flow_s3_uri}")

Loading flow file from current notebook working directory: /root/full-ml-process
Data Wrangler flow customers.flow uploaded to s3://sagemaker-eu-west-1-688316159674/data_wrangler_flows/customers.flow


The Data Wrangler Flow is also provided to the Processing Job as an input source which we configure below.

In [110]:
## Input - Flow: customers.flow
flow_input = ProcessingInput(
    source=flow_s3_uri,
    destination="/opt/ml/processing/flow",
    input_name="flow",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
)

## 1.4 Run Processing Job 
### 1.4.1 Job Configurations

<div class="alert alert-info"> 💡 <strong> Configurable Settings </strong>

You can configure the following settings for Processing Jobs. If you change any configurations you will 
need to re-execute this and all cells below it by selecting the Run menu above and click 
<b>Run Selected Cells and All Below</b>

1. IAM role for executing the processing job. 
2. A unique name of the processing job. Give a unique name every time you re-execute processing jobs
3. Data Wrangler Container URL.
4. Instance count, instance type and storage volume size in GB.
5. Content type for each output. Data Wrangler supports CSV as default and Parquet.
6. Network Isolation settings
</div>

In [111]:
# IAM role for executing the processing job.
iam_role = sagemaker.get_execution_role()

# Unique processing job name. Give a unique name every time you re-execute processing jobs
processing_job_name = f"data-wrangler-flow-processing-{flow_export_id}"

# Data Wrangler Container URL.
container_uri = "245179582081.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-data-wrangler-container:1.x"
# Pinned Data Wrangler Container URL. 
container_uri_pinned = "245179582081.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-data-wrangler-container:1.6.2"

# Processing Job Instance count and instance type.
instance_count = 2
instance_type = "ml.m5.4xlarge"

# Size in GB of the EBS volume to use for storing data during processing
volume_size_in_gb = 30

# Content type for each output. Data Wrangler supports CSV as default and Parquet.
output_content_type = "CSV"

# Network Isolation mode; default is off
enable_network_isolation = False

# Output configuration used as processing job container arguments 
output_config = {
    output_name: {
        "content_type": output_content_type
    }
}

### 1.4.2 Create Processing Job

To launch a Processing Job, you will use the SageMaker Python SDK to create a Processor function.

In [112]:
from sagemaker.processing import Processor
from sagemaker.network import NetworkConfig

processor = Processor(
    role=iam_role,
    image_uri=container_uri,
    instance_count=instance_count,
    instance_type=instance_type,
    volume_size_in_gb=volume_size_in_gb,
    network_config=NetworkConfig(enable_network_isolation=enable_network_isolation),
    sagemaker_session=sess
)

# Start Job
processor.run(
    inputs=[flow_input] + data_sources, 
    outputs=[processing_job_output],
    arguments=[f"--output-config '{json.dumps(output_config)}'"],
    wait=False,
    logs=False,
    job_name=processing_job_name
)


Job Name:  data-wrangler-flow-processing-2021-03-07-34-15
Inputs:  [{'InputName': 'flow', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-eu-west-1-688316159674/data_wrangler_flows/customers.flow', 'LocalPath': '/opt/ml/processing/flow', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'customers.csv', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-eu-west-1-688316159674/fraud-detect-demo/data/raw/customers.csv', 'LocalPath': '/opt/ml/processing/customers.csv', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'a38c7be6-7fdb-4445-95b9-065b4a88eab5.default', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-eu-west-1-688316159674/export-flow-2021-02-12-54-26-claims/output', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]


### 1.4.4 Job Status & S3 Output Location

Below you wait for processing job to finish. If it finishes successfully, the raw parameters used by the 
Processing Job will be printed

In [None]:
s3_job_results_path = f"s3://{bucket}/{s3_output_prefix}/{processing_job_name}"
print(f"Job results are saved to S3 path: {s3_job_results_path}")

job_result = sess.wait_for_processing_job(processing_job_name)
job_result

Job results are saved to S3 path: s3://sagemaker-eu-west-1-688316159674/export-flow-2021-02-12-54-26-claims/output/data-wrangler-flow-processing-2021-03-07-34-15
......................................

## Claims data source

In [None]:
data_sources=[]

data_sources.append(ProcessingInput(
    source="s3://sagemaker-eu-west-1-688316159674/fraud-detect-demo/data/raw/claims.csv", # You can override this to point to other dataset on S3
    destination="/opt/ml/processing/claims.csv",
    input_name="claims.csv",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
))

In [None]:
# Sagemaker session
sess = sagemaker.Session()

# You can configure this with your own bucket name, e.g.
# bucket = "my-bucket"
bucket = sess.default_bucket()
print(f"Data Wrangler export storage bucket: {bucket}")

# unique flow export ID
flow_export_id = f"{time.strftime('%Y-%d-%H-%M-%S', time.gmtime())}"
flow_export_name = f"flow-{flow_export_id}-{flow_file_name}".replace(".csv", "")

In [None]:
# Output name is auto-generated from the select node's ID + output name from the flow file.
output_name = "9ff4c2a4-e1fe-4a66-819f-f7d8df7a5fed.default"


s3_output_prefix = f"export-{flow_export_name}/output"
s3_output_path = f"s3://{bucket}/{s3_output_prefix}"
print(f"Flow S3 export result path: {s3_output_path}")

processing_job_output = ProcessingOutput(
    output_name=output_name,
    source="/opt/ml/processing/output",
    destination=s3_output_path,
    s3_upload_mode="EndOfJob"
)

In [None]:
# name of the flow file which should exist in the current notebook working directory
flow_file_name = "claims.flow"

# Load .flow file from current notebook working directory 
!echo "Loading flow file from current notebook working directory: $PWD"

with open(flow_file_name) as f:
    flow = json.load(f)

# Upload flow to S3
s3_client = boto3.client("s3")
s3_client.upload_file(flow_file_name, bucket, f"data_wrangler_flows/{flow_file_name}")

flow_s3_uri = f"s3://{bucket}/data_wrangler_flows/{flow_file_name}"

print(f"Data Wrangler flow {flow_file_name} uploaded to {flow_s3_uri}")

In [None]:
## Input - Flow: claims.flow
flow_input = ProcessingInput(
    source=flow_s3_uri,
    destination="/opt/ml/processing/flow",
    input_name="flow",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
    s3_data_distribution_type="FullyReplicated"
)

In [None]:
# IAM role for executing the processing job.
iam_role = sagemaker.get_execution_role()

# Unique processing job name. Give a unique name every time you re-execute processing jobs
processing_job_name = f"data-wrangler-flow-processing-{flow_export_id}"

# Data Wrangler Container URL.
container_uri = "245179582081.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-data-wrangler-container:1.x"
# Pinned Data Wrangler Container URL. 
container_uri_pinned = "245179582081.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-data-wrangler-container:1.6.2"

# Processing Job Instance count and instance type.
instance_count = 2
instance_type = "ml.m5.4xlarge"

# Size in GB of the EBS volume to use for storing data during processing
volume_size_in_gb = 30

# Content type for each output. Data Wrangler supports CSV as default and Parquet.
output_content_type = "CSV"

# Network Isolation mode; default is off
enable_network_isolation = False

# Output configuration used as processing job container arguments 
output_config = {
    output_name: {
        "content_type": output_content_type
    }
}

In [None]:
from sagemaker.processing import Processor
from sagemaker.network import NetworkConfig

processor = Processor(
    role=iam_role,
    image_uri=container_uri,
    instance_count=instance_count,
    instance_type=instance_type,
    volume_size_in_gb=volume_size_in_gb,
    network_config=NetworkConfig(enable_network_isolation=enable_network_isolation),
    sagemaker_session=sess
)

# Start Job
processor.run(
    inputs=[flow_input] + data_sources, 
    outputs=[processing_job_output],
    arguments=[f"--output-config '{json.dumps(output_config)}'"],
    wait=False,
    logs=False,
    job_name=processing_job_name
)

In [None]:
s3_job_results_path = f"s3://{bucket}/{s3_output_prefix}/{processing_job_name}"
print(f"Job results are saved to S3 path: {s3_job_results_path}")

job_result = sess.wait_for_processing_job(processing_job_name)
job_result

### Load Processed Data into Pandas

We use the [AWS Data Wrangler library](https://github.com/awslabs/aws-data-wrangler) to load the exported 
dataset into a Pandas dataframe for a preview of first 1000 rows.

In [61]:
!pip install -q awswrangler pandas
import awswrangler as wr

  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes


In [62]:
chunksize = 1000

if output_content_type.upper() == "CSV":
    dfs = wr.s3.read_csv(s3_output_path, chunksize=chunksize)
elif output_content_type.upper() == "PARQUET":
    dfs = wr.s3.read_parquet(s3_output_path, chunked=chunksize)
else:
    print(f"Unexpected output content type {output_content_type}") 

df = next(dfs)
df

Unnamed: 0,policy_id,customer_age,customer_education,months_as_customer,policy_deductable,policy_annual_premium,policy_liability,auto_year,num_claims_past_year,num_insurers_past_5_years,customer_gender_male,customer_gender_female,customer_gender_unkown,customer_gender_other,policy_state_ca,policy_state_wa,policy_state_az,policy_state_or,policy_state_nv,policy_state_id
0,1,54,2,94,750,3000,1,2006,0,1,0,0,1,0.0,0.0,1.0,0.0,0.0,0.0,0.0
1,2,41,3,165,750,2950,0,2012,0,1,1,0,0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
2,3,57,3,155,750,3000,0,2017,0,1,0,1,0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
3,4,39,4,80,750,3000,2,2020,0,1,0,1,0,0.0,0.0,0.0,1.0,0.0,0.0,0.0
4,5,39,1,60,750,3000,0,2018,0,1,0,1,0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,996,41,3,91,750,3000,0,2016,0,1,1,0,0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
996,997,45,3,60,750,3000,3,2019,0,1,0,1,0,0.0,0.0,1.0,0.0,0.0,0.0,0.0
997,998,41,4,96,750,3000,0,2019,0,1,1,0,0,0.0,1.0,0.0,0.0,0.0,0.0,0.0
998,999,63,3,184,750,3000,2,2019,0,1,0,1,0,0.0,0.0,1.0,0.0,0.0,0.0,0.0
