### Capacity test XGBoost Batch transform
This sample tests SageMaker batch transform on 10,000,000 (10 million) rows with 69 feature columns each, using the XGBoost built-in algorithm.

## Set up

#### Configure the inputs

In [1]:
# Name of the completed SageMaker training job whose model artifacts are scored below.
# Replace with your own training job name.
sagemaker_training_job_name = "xgboost-2019-01-21-00-33-32-349"

In [2]:
# Size of the synthetic data set. The column count must match the number of
# features the trained model expects.
columns_size, rows_size = 69, 10000000

In [None]:
%%time
import sagemaker
from sagemaker import get_execution_role

role = get_execution_role()
print(role)

sess = sagemaker.Session()
bucket=sess.default_bucket() 

prefix = "xgboost_capacity_test_2"


In [5]:
# Every S3 location of this run lives under s3://<bucket>/<prefix>/<training job>/batch_transform/
s3_batch_transform_prefix = "s3://{}/{}/{}/batch_transform/".format(bucket, prefix, sagemaker_training_job_name)
s3_batch_transform_input_prefix = s3_batch_transform_prefix + "input/"
s3_batch_transform_output_prefix = s3_batch_transform_prefix + "output/"
s3_batch_transform_output_merged_prefix = s3_batch_transform_prefix + "mergedoutput/"

## Create Model

In [24]:
# The SageMaker model reuses the training job's name.
model_name = job_name = sagemaker_training_job_name


In [None]:
import boto3
from sagemaker.amazon.amazon_estimator import get_image_uri

# Built-in XGBoost container image for the region of the current boto3 session.
# NOTE(review): get_image_uri is a SageMaker Python SDK v1 API; confirm it exists in the installed SDK.
region_name = boto3.Session().region_name
hosting_image = get_image_uri(region_name, 'xgboost')

#### This is a one time operation per training job.

In [None]:
%%time
import boto3
from time import gmtime, strftime
from sagemaker.amazon.amazon_estimator import get_image_uri

sage = boto3.Session().client(service_name='sagemaker') 


info = sage.describe_training_job(TrainingJobName=job_name)
model_data = info['ModelArtifacts']['S3ModelArtifacts']
print(model_data)


primary_container = {
    'Image': hosting_image,
    'ModelDataUrl': model_data,
}

create_model_response = sage.create_model(
    ModelName = model_name,
    ExecutionRoleArn = role,
    PrimaryContainer = primary_container)

print(create_model_response['ModelArn'])

### Create a large dummy data set and upload it to S3

In [6]:
# Local scratch directory for the generated input chunks, the downloaded results and the merged results.
tempdir = "tmp_xgboost"

In [7]:
import pandas as pd
import numpy as np

In [8]:
%%time 
df = pd.DataFrame(np.random.randint(0,100,size=(rows_size, columns_size)))
df.head(n=3)

CPU times: user 6.19 s, sys: 2.01 s, total: 8.2 s
Wall time: 8.2 s


In [9]:
# Sanity check: the frame has exactly the configured number of rows and columns.
assert df.shape == (rows_size, columns_size)
df.shape

(10000000, 69)

#### Split the large DataFrame into smaller CSV files

In [10]:
%%time 

input_dir= "{}/{}".format(tempdir, "input_parts")


CPU times: user 2 µs, sys: 1 µs, total: 3 µs
Wall time: 5.96 µs


In [13]:
!rm -rf $input_dir
!mkdir -p $input_dir

In [16]:
#### Slow operation: takes about 9 minutes on an ml.m4.xlarge notebook instance

In [15]:
%%time 

import numpy as np

chunk_size_num_records = 10000


for i in range(0, df.shape[0], chunk_size_num_records):
    df.iloc[i:i + chunk_size_num_records,].to_csv('{}/input_csv_part_{}.csv'.format(input_dir, i), index=False, header = False) 


CPU times: user 9min 23s, sys: 45 s, total: 10min 8s
Wall time: 8min 55s


In [17]:
!ls -lh $input_dir | head -3

total 1.9G
-rw-rw-r-- 1 ec2-user ec2-user 2.0M Mar 25 09:50 input_csv_part_0.csv
-rw-rw-r-- 1 ec2-user ec2-user 2.0M Mar 25 09:51 input_csv_part_1000000.csv
ls: write error: Broken pipe


This creates smaller files of about 2 MB each (10,000 rows per file).

#### Copy data to S3

In [18]:
%%time

!aws s3 sync $input_dir $s3_batch_transform_input_prefix --quiet

CPU times: user 226 ms, sys: 96.8 ms, total: 322 ms
Wall time: 15.5 s


### Redshift: unload data from Redshift to S3 as smaller CSV files

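- See the examples at https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD_command_examples.html.
- Restrict the maximum file size, e.g. to 2 MB.
- Use a comma delimiter. UNLOAD writes pipe-delimited files by default, but the batch transform below is called with `text/csv`.

```sql
unload ('select * from venue')
to 's3://mybucket/unload/' 
iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole'
delimiter ','
maxfilesize 2 mb;
```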


## S3 utilities
S3 utilities to upload and download files in parallel.

In [19]:
import boto3

def uploadfile(localpath, s3path):
    """
    Upload a single local file to S3.

    :param localpath: Path of the local file to upload.
    :param s3path: Destination in the format s3://mybucket/mydir/mysample.txt.
        If the key part ends with "/", or is the bucket root, the local file's
        base name is appended to form the object key.
    """
    # Imported locally so this function does not depend on a later notebook
    # cell having imported `os` at module level.
    import os

    bucket, key = get_bucketname_key(s3path)
    file_name = os.path.basename(localpath)

    if key in ("", "/"):
        # Bucket root. Without this branch the key would be "/<name>" (leading
        # slash) or an empty key.
        key = file_name
    elif key.endswith("/"):
        key = "{}{}".format(key, file_name)

    # A fresh Session per call: this function runs concurrently in a thread pool, and
    # creating clients from the shared default session is not thread-safe.
    s3 = boto3.Session().client('s3')
    s3.upload_file(localpath, bucket, key)

def get_bucketname_key(uripath):
    """
    Split an S3 URI into its bucket name and object key.

    :param uripath: URI of the form s3://bucket/key (AssertionError otherwise).
    :return: (bucket_name, key). The key is everything after the first "/" that
        follows the bucket name, which may be "". If there is no "/" after the
        bucket name at all, the key is "/".
    """
    assert uripath.startswith("s3://")

    bucket_name, separator, key = uripath[len("s3://"):].partition("/")
    if not separator:
        # URI names only a bucket, with no trailing slash.
        return bucket_name, "/"
    return bucket_name, key


def download_file(s3path, local_dir):
    """
    Download a single S3 object into a local directory.

    The local file name is the last "/"-separated segment of s3path.

    :param s3path: Source object in the format s3://mybucket/mydir/myfile.
    :param local_dir: Existing local directory to write the file into.
    :return: The local file path written, or None if s3path ends with "/" and was skipped.
    """
    # Imported locally so this function does not depend on another cell importing `os`.
    import os

    # Keys ending in "/" (typically folder placeholders) have no file name.
    # Downloading them would target the directory path itself and fail.
    if s3path.endswith("/"):
        return None

    bucket, key = get_bucketname_key(s3path)
    local_file = os.path.join(local_dir, s3path.split("/")[-1])

    # A fresh Session per call: this runs concurrently in a thread pool, and creating
    # clients from the shared default session is not thread-safe.
    s3 = boto3.Session().client('s3')
    s3.download_file(bucket, key, local_file)
    return local_file
    

def list_files(s3path_prefix):
    """
    Lazily list every object stored under an S3 prefix.

    :param s3path_prefix: Prefix in the format s3://mybucket/mydir/ (must end with "/").
    :return: Generator of (bucket_name, key) tuples, one per object under the prefix.
    """
    assert s3path_prefix.startswith("s3://")
    assert s3path_prefix.endswith("/")

    bucket_name, key_prefix = get_bucketname_key(s3path_prefix)

    s3_bucket = boto3.resource('s3').Bucket(name=bucket_name)
    matching_objects = s3_bucket.objects.filter(Prefix=key_prefix)

    return ((obj.bucket_name, obj.key) for obj in matching_objects)
        


In [20]:
import glob
from multiprocessing.dummy import Pool as ThreadPool


def upload_files(local_dir, s3_prefix, num_threads=100):    
    input_tuples = ( (f,  s3_prefix) for f in glob.glob("{}/*".format(local_dir)))
    
    with ThreadPool(num_threads) as pool:
        pool.starmap(uploadfile, input_tuples)
    


def download_files(s3_prefix, local_dir, num_threads=100):
    """
    Download every object under an S3 prefix into a local directory in parallel.

    :param s3_prefix: Source prefix in the format s3://mybucket/mydir/ (must end with "/").
    :param local_dir: Local directory to download into; created if it does not exist.
    :param num_threads: Number of worker threads performing downloads.
    """
    import os

    # download_file writes into local_dir but does not create it.
    os.makedirs(local_dir, exist_ok=True)

    download_args = [("s3://{}/{}".format(s3_bucket, s3_key), local_dir)
                     for s3_bucket, s3_key in list_files(s3_prefix)]

    with ThreadPool(num_threads) as pool:
        pool.starmap(download_file, download_args)




In [48]:
%%time
# Python alternative to the `aws s3 sync` cell above. It uploads the same files from
# input_dir to the same S3 prefix, so running both uploads everything twice.
upload_files( input_dir, s3_batch_transform_input_prefix, num_threads=15)

CPU times: user 33.8 s, sys: 14.7 s, total: 48.5 s
Wall time: 23.8 s


### Start here: run batch transform

In [21]:
%%time
import boto3
import sagemaker
import json
import os
from time import strftime , gmtime


fmttime= strftime("%Y-%m-%d-%H-%M-%S", gmtime())

input_location =  s3_batch_transform_input_prefix
output_location = "{}{}/".format(s3_batch_transform_output_prefix, fmttime)



CPU times: user 27 µs, sys: 2 µs, total: 29 µs
Wall time: 31 µs


In [22]:
print("Results will be saved to " + output_location)

Results will be saved to s3://sagemaker-us-east-2-324346001917/xgboost_capacity_test_2/xgboost-2019-01-21-00-33-32-349/batch_transform/output/2019-03-25-10-00-19/


#### Run transform on a single ml.c4.xlarge instance

In [26]:
%%time

import boto3
import sagemaker

# Initialize the transformer object
transformer =sagemaker.transformer.Transformer(
    base_transform_job_name='Batch-Transform',
    model_name=model_name,
    max_payload = 5,
    instance_count=1,
    instance_type='ml.c4.xlarge',
    output_path=output_location
    )
# To start a transform job:
transformer.transform(input_location, content_type='text/csv', split_type='Line')
# Then wait until transform job is completed
transformer.wait()

INFO:sagemaker:Creating transform job with name: Batch-Transform-2019-03-25-10-01-46-604


....................................................................!
CPU times: user 281 ms, sys: 32.1 ms, total: 313 ms
Wall time: 5min 42s


#### Copy batch results locally

In [27]:
# Local directory that receives the downloaded batch transform results.
local_output_dir = tempdir + "/batch_out"

In [28]:
!rm -rf local_output_dir
!mkdir -p $local_output_dir

In [29]:
%%time
download_files( output_location , local_output_dir, num_threads=130)

CPU times: user 25.8 s, sys: 2.58 s, total: 28.4 s
Wall time: 19.1 s


In [30]:
# Peek at one result file. Each output is named after its input file plus ".out",
# and the number in the name is the first row of that input chunk.
!head $local_output_dir/input_csv_part_9960000.csv.out

0.958344399929
0.958344399929
0.950590014458
0.954473614693
0.950590014458
0.925055503845
0.950590014458
0.958697319031
0.966336548328
0.950590014458


#### Merge the results from batch transform.

The batch transform results contain only the predictions, not the corresponding input features. You need to merge the results (confidence scores) with the id of each input record, so that they can later be used to update Redshift or any other system.


In [31]:
# To fetch validation result 
import glob
import os
import pandas
from multiprocessing.dummy import Pool as ThreadPool
import pathlib



# def merge_input_output(input_id_file, batch_results_file, output_dir):
#     """
#     Merge the id of the input and the results (confidence score), this is slow..
#     """
#     df_i = pd.read_csv(input_id_file, header=None)
#     df_o = pd.read_csv(batch_results_file, header= None)
    
#     df_i["confidence_score"] = df_o.iloc[:, 0]
    
    
    
#     df_i.to_csv(results_path)
    
#     return results_path
  

def merge_input_output_raw(input_id_file, batch_results_file, output_dir):
    """
    Merge the input rows with their batch transform results (confidence scores).

    Row N of the output is row N of `input_id_file` with the fields of row N of
    `batch_results_file` appended. The merged file has the same base name as
    `input_id_file` and is written into `output_dir`.

    :param input_id_file: CSV file that was sent to batch transform.
    :param batch_results_file: Matching batch transform output, one result row per input row.
    :param output_dir: Existing directory where the merged CSV is written.
    :return: Path of the merged CSV file.
    :raises ValueError: If the two files have a different number of rows. A plain
        zip() would silently drop the unmatched rows. The merged file may be
        partially written when this is raised.
    """
    import csv
    from itertools import zip_longest

    results_path = os.path.join(output_dir, os.path.basename(input_id_file))

    # newline='' lets the csv module handle line endings itself, as its docs require.
    with open(input_id_file, 'r', newline='') as input_csv, \
            open(batch_results_file, 'r', newline='') as scores_csv, \
            open(results_path, 'w', newline='') as merged_csv:
        i_reader = csv.reader(input_csv, delimiter=',')
        s_reader = csv.reader(scores_csv, delimiter=',')
        writer = csv.writer(merged_csv, delimiter=',')

        missing = object()  # sentinel marking "this file ran out of rows"
        pairs = zip_longest(i_reader, s_reader, fillvalue=missing)
        for row_number, (i, s) in enumerate(pairs, start=1):
            if i is missing or s is missing:
                raise ValueError(
                    "Row count mismatch between {} and {} at row {}".format(
                        input_id_file, batch_results_file, row_number))
            i.extend(s)
            writer.writerow(i)

    return results_path
  
    
  
    

def get_input_confidence_file_tuples(input_file_dir, local_batch_output_dir):
    """
    Pair each downloaded batch transform output file with the input file it came from.

    Output files are named after their input file plus a ".out" suffix
    (e.g. input_csv_part_0.csv -> input_csv_part_0.csv.out).

    :param input_file_dir: Directory containing the original input CSV files.
    :param local_batch_output_dir: Directory containing the downloaded *.out files.
    :return: Generator of (input_file_path, output_file_path) tuples.
    """
    suffix = ".out"
    for f in glob.glob("{}/*{}".format(local_batch_output_dir, suffix)):
        # Strip only the trailing suffix. str.replace would also delete ".out"
        # occurring elsewhere in the name (e.g. "x.output.csv.out" -> "xput.csv").
        i_file_name = os.path.basename(f)[:-len(suffix)]
        input_file_path = os.path.join(input_file_dir, i_file_name)
        yield (input_file_path, f)
    

def merge_files(num_threads , input_file_dir, local_batch_dir, out_dir):
    """
    Merge every downloaded batch transform output with its input file, in parallel.

    :param num_threads: Number of worker threads.
    :param input_file_dir: Directory containing the original input CSV files.
    :param local_batch_dir: Directory containing the downloaded *.out result files.
    :param out_dir: Directory for the merged CSV files; created if missing.
    """
    pathlib.Path(out_dir).mkdir(parents=True, exist_ok=True)

    with ThreadPool(num_threads) as pool:
        file_pairs = get_input_confidence_file_tuples(input_file_dir, local_batch_dir)
        merge_args = ((input_path, output_path, out_dir) for input_path, output_path in file_pairs)
        pool.starmap(merge_input_output_raw, merge_args)
    

    




#### Takes about 1–2 minutes, depending on the instance type

In [49]:
%%time
results_dir = os.path.join(tempdir, "results")

merge_files(num_threads = 30, input_file_dir = input_dir, local_batch_dir= local_output_dir, out_dir=results_dir)
    

CPU times: user 1min 16s, sys: 4.47 s, total: 1min 20s
Wall time: 1min 50s


In [39]:
%%time

upload_files(results_dir, s3_batch_transform_output_merged_prefix, num_threads=25)

CPU times: user 33.5 s, sys: 15 s, total: 48.5 s
Wall time: 23 s


### Redshift COPY command to load the results from S3 into Redshift

This command loads all files under s3://mybucket/prefix into the table temp_customer. The merged results are comma-separated, so the `csv` option is needed (COPY expects pipe-delimited data by default).
For more detail see https://docs.aws.amazon.com/redshift/latest/dg/t_loading-tables-from-s3.html 

```sql
copy temp_customer 
from 's3://mybucket/prefix' 
iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole'
csv;
```