# Feature transformation with Amazon SageMaker Processing and Dask

Typically, a machine learning (ML) process consists of a few steps: first, gathering data with various ETL jobs; then pre-processing the data; featurizing the dataset by incorporating standard techniques or prior knowledge; and finally, training an ML model using an algorithm.

Often, distributed data processing frameworks such as Dask are used to pre-process data sets in order to prepare them for training. In this notebook we'll use Amazon SageMaker Processing, and leverage the power of Dask in a managed SageMaker environment to run our preprocessing workload.

### What is Dask Distributed?
Dask.distributed is a lightweight, open-source library for distributed computing in Python. It is also a centrally managed, distributed, dynamic task scheduler. Dask.distributed has three main components:

**dask-scheduler process:** coordinates the actions of several workers. The scheduler is asynchronous and event-driven, simultaneously responding to requests for computation from multiple clients and tracking the progress of multiple workers.

**dask-worker processes:** run tasks and hold their results in memory. Workers are spread across multiple machines and serve the concurrent requests of several clients.

**dask-client process:** the primary entry point for users of dask.distributed.

<img src="https://docs.dask.org/en/latest/_images/dask-overview.svg">

source: https://docs.dask.org/en/latest/


## Contents

1. [Setup](#Setup)
1. [Using Amazon SageMaker Processing to execute a Dask Job](#Using-Amazon-SageMaker-Processing-to-execute-a-Dask-Job)
   1. [Downloading dataset and uploading to S3](#Downloading-dataset-and-uploading-to-S3)
   1. [Build a Dask container for running the preprocessing job](#Build-a-Dask-container-for-running-the-preprocessing-job)
   1. [Run the preprocessing job using Amazon SageMaker Processing](#Run-the-preprocessing-job-using-Amazon-SageMaker-Processing)
      1. [Inspect the preprocessed dataset](#Inspect-the-preprocessed-dataset)

## Setup

Let's start by specifying:
* The S3 bucket and prefixes that you use for training and model data. Use the default bucket specified by the Amazon SageMaker session.
* The IAM role ARN used to give processing and training access to the dataset.

```python
from time import gmtime, strftime

import sagemaker

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
```

```python
bucket
```

```
'sagemaker-us-east-1-877465308896'
```

```python
data_subset = "validation"
input_prefix = 'row-data/' + data_subset + ".csv"
input_preprocessed_prefix = 'processed-data/' + data_subset
```
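The preprocessing script later combines each bucket name with its key prefix into a full `s3://` URI. Below is a minimal sketch of that step using `posixpath`, which always joins with `/` no matter which operating system runs the code. The helper name `s3_uri` is hypothetical and not part of this example's code.

```python
import posixpath


def s3_uri(bucket: str, key_prefix: str) -> str:
    """Build an s3:// URI from a bucket name and a key prefix (hypothetical helper)."""
    # posixpath.join always uses "/", unlike os.path.join on Windows.
    # A leading "/" on the prefix is stripped, otherwise join would discard the bucket.
    return "s3://" + posixpath.join(bucket, key_prefix.lstrip("/"))


input_uri = s3_uri("my-bucket", "row-data/validation.csv")
output_uri = s3_uri("my-bucket", "processed-data/validation")
print(input_uri)   # s3://my-bucket/row-data/validation.csv
print(output_uri)  # s3://my-bucket/processed-data/validation
```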

## Using Amazon SageMaker Processing to execute a Dask job

### Downloading dataset and uploading to Amazon Simple Storage Service (Amazon S3)

The input dataset is a CSV file in which each row holds a character `sequence` and its class label in a `family_accession` column. The preprocessing job converts each sequence into a list of integer character IDs and each family accession into a numeric class label, so that a classification model can be trained on the result. In this example, we use Dask distributed to preprocess and transform the data to make it ready for the training process. The job reads its inputs from your default bucket: the dataset CSV under `input_prefix`, plus the lookup tables `freq_df.csv` and `dict_class.csv` used by the preprocessing script at the bucket root. Upload these files there before running the job so that Amazon SageMaker can access them.

```python
import boto3

s3 = boto3.client('s3')
region = sagemaker_session.boto_region_name
```

### Build a Dask container for running the preprocessing job

An example Dask container is included in the `./container` directory of this example. The container handles bootstrapping the Dask scheduler and mapping each instance to a Dask worker. At a high level, the container provides:

* A set of default worker/scheduler configurations
* A bootstrapping script for configuring and starting up scheduler/worker nodes
* Startup of the Dask cluster across all instances, including the scheduler node
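The container's `bootstrap.py` is not shown in this notebook. The sketch below shows one way such a script could decide each instance's role. It assumes the script reads the SageMaker Processing resource config (`/opt/ml/config/resourceconfig.json`, which lists `current_host` and `hosts`) and treats the first host in sorted order as the scheduler. Both that selection rule and the `dask_role` function are assumptions for illustration only.

```python
import json


def dask_role(resource_config: dict) -> tuple:
    """Return ("scheduler" or "worker", scheduler_host) for the current instance.

    Assumption: the first host in sorted order runs the Dask scheduler.
    """
    hosts = sorted(resource_config["hosts"])
    scheduler_host = hosts[0]
    role = "scheduler" if resource_config["current_host"] == scheduler_host else "worker"
    return role, scheduler_host


# Example contents of /opt/ml/config/resourceconfig.json on a 3-instance job
config = json.loads('{"current_host": "algo-2", "hosts": ["algo-1", "algo-2", "algo-3"]}')
print(dask_role(config))  # ('worker', 'algo-1')
```

Under this scheme, workers would connect to `tcp://<scheduler_host>:8786`, the default Dask scheduler port. The preprocessing script below uses the same port.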


After the container build and push process is complete, use the Amazon SageMaker Python SDK to submit a managed, distributed Dask application that performs our dataset preprocessing.

### Build the example Dask container.

```python
%cd container
!docker build -t sagemaker-dask-example .
%cd ../
```

```
/home/ec2-user/SageMaker/amazon-sagemaker-examples/sagemaker_processing/feature_transformation_with_sagemaker_processing_dask/container
Sending build context to Docker daemon  8.704kB
Step 1/21 : FROM continuumio/miniconda3:4.7.12
 ---> 406f2b43ea59
Step 2/21 : RUN apt-get update
 ---> Using cache
 ---> 6ba1dd8a9b44
Step 3/21 : RUN apt-get install -y curl unzip python3 python3-setuptools python3-pip python-dev python3-dev python-psutil
 ---> Using cache
 ---> cdf2de405d5e
Step 4/21 : RUN pip3 install py4j psutil==5.6.5 numpy==1.17.4
 ---> Using cache
 ---> b3ff4e4ec52f
Step 5/21 : RUN apt-get clean
 ---> Using cache
 ---> 9568dd651e28
Step 6/21 : RUN rm -rf /var/lib/apt/lists/*
 ---> Using cache
 ---> b7939b0f11ab
Step 7/21 : ENV PYTHONHASHSEED 0
 ---> Using cache
 ---> 8e369638938c
Step 8/21 : ENV PYTHONIOENCODING UTF-8
 ---> Using cache
 ---> ab317a7a48ee
Step 9/21 : ENV PIP_DISABLE_PIP_VERSION_CHECK 1
 ---> Using cache
 ---> b698a6a930b3
Step 10/21 : RUN conda install --yes     -c c
```

### Create an Amazon Elastic Container Registry (Amazon ECR) repository for the Dask container and push the image.

```python
import boto3

account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name

ecr_repository = 'sagemaker-dask-example'
tag = ':latest'
uri_suffix = 'amazonaws.com'
if region in ['cn-north-1', 'cn-northwest-1']:
    uri_suffix = 'amazonaws.com.cn'
dask_repository_uri = '{}.dkr.ecr.{}.{}/{}'.format(account_id, region, uri_suffix, ecr_repository + tag)
```
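The URI construction above can be wrapped in a function so that it is easier to reuse and test. The small sketch below reproduces the same format, including the domain suffix used in the AWS China regions. The function name `ecr_image_uri` is hypothetical.

```python
def ecr_image_uri(account_id: str, region: str, repository: str, tag: str = "latest") -> str:
    """Build an Amazon ECR image URI in the same format as the notebook cell above."""
    # The AWS China regions use the amazonaws.com.cn domain
    suffix = "amazonaws.com.cn" if region in ("cn-north-1", "cn-northwest-1") else "amazonaws.com"
    return f"{account_id}.dkr.ecr.{region}.{suffix}/{repository}:{tag}"


print(ecr_image_uri("123456789012", "us-east-1", "sagemaker-dask-example"))
# 123456789012.dkr.ecr.us-east-1.amazonaws.com/sagemaker-dask-example:latest
```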

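```python
# Authenticate Docker to Amazon ECR, create the repository, and push the image.
# `aws ecr get-login` was removed in AWS CLI v2; get-login-password works in v2 and recent v1 releases.
!aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {account_id}.dkr.ecr.{region}.{uri_suffix}
!aws ecr create-repository --repository-name $ecr_repository
!docker tag {ecr_repository + tag} $dask_repository_uri
!docker push $dask_repository_uri
```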

### Run the preprocessing job using Amazon SageMaker Processing on Dask Cluster

Next, use the Amazon SageMaker Python SDK to submit a processing job. Configure the job to use the custom Dask container that you just built and a Python preprocessing script.

#### Create the Dask preprocessing script.

```python
%%writefile preprocess.py
import logging
import os
import sys

import dask
import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client

logging.basicConfig(level=logging.INFO)


def split_sequence(sequence):
    """
    Separate chars in sequence with spaces: 'ABCDEFG' to ' A B C D E F G'
    """
    return "".join(" " + char for char in sequence)


def sequence_to_ID(sequence):
    """
    Replace chars in a space-separated sequence with their IDs
    """
    return list(map(char_to_ID, sequence.split()))


def char_to_ID(char):
    """
    Return 1 + the index label of the df_freq row whose 'feature' equals the lower-cased char
    """
    return df_freq.index[df_freq["feature"] == char.lower()].tolist()[0] + 1


if __name__ == "__main__":
    # Processor script arguments arrive as alternating names and values;
    # the last argument is the Dask scheduler IP
    args_iter = iter(sys.argv[1:])
    script_args = dict(zip(args_iter, args_iter))
    scheduler_ip = sys.argv[-1]

    s3_region = script_args["s3_region"]
    print(f"Using the {s3_region} region")

    # Start the Dask cluster client; stop the job if the scheduler is unreachable
    try:
        client = Client("tcp://{ip}:8786".format(ip=scheduler_ip))
        logging.info("Printing cluster information: {}".format(client))
    except Exception as err:
        logging.exception(err)
        raise

    input_bucket = script_args["s3_input_bucket"]
    input_data_path = "s3://{}".format(os.path.join(input_bucket, script_args["s3_input_key_prefix"]))
    output_data_path = "s3://{}".format(os.path.join(script_args["s3_output_bucket"], script_args["s3_output_key_prefix"]))

    # Character frequency table used by char_to_ID, read from the input bucket
    df_freq = pd.read_csv("s3://{}".format(os.path.join(input_bucket, "freq_df.csv")))

    # Read dictionary mapping family accessions to their values
    dict_class = pd.read_csv("s3://{}".format(os.path.join(input_bucket, "dict_class.csv")))
    dict_class = dict_class.to_dict("dict")

    logging.info("Reading input data from {}".format(input_data_path))
    df_row_data = pd.read_csv(input_data_path)

    # Partition the data so that the transformations run on the Dask workers
    n_workers = len(client.scheduler_info()["workers"])
    ddf = dd.from_pandas(df_row_data, npartitions=max(1, 4 * n_workers))

    logging.info("Running preprocessing and feature engineering transformations in Dask")

    # Perform spacing between the chars of sequences, then convert chars to their IDs
    x_splited_IDs = (
        ddf["sequence"]
        .apply(split_sequence, meta=("sequence", "object"))
        .apply(sequence_to_ID, meta=("sequence", "object"))
    )

    # Construct a series of labels (e.g., true values)
    labels = ddf["family_accession"].apply(lambda x: dict_class[x][0], meta=("family_accession", "object"))

    # Run both task graphs on the cluster and collect the results as pandas Series
    x_splited_IDs, labels = dask.compute(x_splited_IDs, labels)

    logging.info("Done, writing data to: {}".format(output_data_path))
    pd.DataFrame(x_splited_IDs).to_csv(output_data_path + "_x.csv", index=False, mode="w", header=True)
    pd.DataFrame(labels).to_csv(output_data_path + "_y.csv", index=False, mode="w", header=True)

    print(client)
    client.close()
    sys.exit(os.EX_OK)
```
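`char_to_ID` in `preprocess.py` filters the whole `df_freq` table once for every character of every sequence. The sketch below shows an equivalent lookup that builds a dictionary once and then encodes each sequence directly, without the intermediate space-separated string. It assumes `freq_df.csv` has a default 0-based index, so that each row's label equals its position. The function names and the sample feature list are hypothetical.

```python
def build_char_ids(features):
    """Map each feature character to (row position + 1), keeping the first occurrence.

    Equivalent to char_to_ID only if freq_df.csv has a default 0-based index.
    """
    ids = {}
    for position, feature in enumerate(features):
        ids.setdefault(feature, position + 1)
    return ids


def encode_sequence(sequence, char_ids):
    """Encode a sequence such as 'GAL' as a list of integer IDs (case-insensitive)."""
    return [char_ids[char.lower()] for char in sequence]


# Hypothetical stand-in for df_freq['feature']
char_ids = build_char_ids(["l", "a", "g", "v", "e"])
print(encode_sequence("GAL", char_ids))  # [3, 2, 1]
```

An unknown character raises `KeyError` here. The original lookup raises `IndexError` in that case.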


Run a processing job using the Docker image and preprocessing script you just created. When calling the `dask_processor.run()` method, pass the Amazon S3 input and output locations as arguments; the preprocessing script needs them to determine where to read input from and write output to in Amazon S3. Here, you also specify the number of instances and the instance type used for the distributed Dask job.
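The `arguments` list is a flat sequence of alternating names and values. `preprocess.py` rebuilds it into a dictionary by zipping an iterator with itself, and it treats the final command-line argument as the scheduler IP. That final argument is left unpaired when the list has an odd length. Below is a standalone sketch of the parsing. The function name is hypothetical, and the IP is an example value taken from the job log.

```python
def parse_processor_args(argv):
    """Split argv into a dict of name/value pairs plus the trailing scheduler IP.

    Zipping one iterator with itself yields consecutive (name, value) pairs;
    an unpaired final element is dropped by zip.
    """
    args_iter = iter(argv)
    script_args = dict(zip(args_iter, args_iter))
    scheduler_ip = argv[-1]
    return script_args, scheduler_ip


argv = ["s3_input_bucket", "my-bucket", "s3_region", "us-east-1", "10.0.172.50"]
script_args, scheduler_ip = parse_processor_args(argv)
print(script_args)   # {'s3_input_bucket': 'my-bucket', 's3_region': 'us-east-1'}
print(scheduler_ip)  # 10.0.172.50
```

Because the parsing relies on strict alternation, a single missing value shifts every later pair. Checking that all expected names are present in `script_args` catches that early.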

```python
from sagemaker.processing import ProcessingInput, ScriptProcessor

dask_processor = ScriptProcessor(
    base_job_name="dask-preprocessor",
    image_uri=dask_repository_uri,
    command=["/opt/program/bootstrap.py"],
    role=role,
    instance_count=3,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=None,
)
```

```python
dask_processor.run(
    code="preprocess.py",
    arguments=[
        "s3_input_bucket",
        bucket,
        "s3_input_key_prefix",
        input_prefix,
        "s3_output_bucket",
        bucket,
        "s3_output_key_prefix",
        input_preprocessed_prefix,
        "s3_region",
        region
    ],
    logs=True
)
```

```
Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.

Job Name:  dask-preprocessor-2020-10-12-19-39-11-931
Inputs:  [{'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-877465308896/dask-preprocessor-2020-10-12-19-39-11-931/input/code/preprocess.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  []
.......................distributed.nanny - INFO -         Start Nanny at: 'tcp://10.0.131.225:46767'
distributed.worker - INFO -       Start worker at:   tcp://10.0.131.225:34079
distributed.worker - INFO -          Listening to:   tcp://10.0.131.225:34079
distributed.worker - INFO -          dashboard at:         10.0.131.225:36079
distributed.worker - INFO - Waiting to connect to:     tcp://10.0.172.50:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads: 
```