### Imports

In [8]:
import boto3
import sagemaker
import pickle
import pandas as pd

from time import gmtime, strftime

from sagemaker import get_execution_role
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.amazon.amazon_estimator import get_image_uri

### Environment Setup

In [9]:
sagemaker_session = sagemaker.Session()
role = get_execution_role()

# Get headerless CSV
!wget --directory-prefix=./titanic_data https://github.com/bryanfree66/titanic-sagemaker-inference/blob/master/data/train_no_header.csv

--2020-07-24 22:58:58--  https://github.com/bryanfree66/titanic-sagemaker-inference/blob/master/data/train_no_header.csv
Resolving github.com (github.com)... 140.82.112.4
Connecting to github.com (github.com)|140.82.112.4|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/html]
Saving to: ‘./titanic_data/train_no_header.csv.1’

train_no_header.csv     [ <=>                ] 503.78K  --.-KB/s    in 0.02s   

2020-07-24 22:58:59 (21.9 MB/s) - ‘./titanic_data/train_no_header.csv.1’ saved [515873]



### Endpoint Functions

In [10]:
# Format prediction output
def output_fn(prediction, accept):
    if accept == "application/json":
        instances = []
        for row in prediction.tolist():
            instances.append({"features": row})

        json_output = {"instances": instances}

        return worker.Response(json.dumps(json_output), accept, mimetype=accept)
    elif accept == 'text/csv':
        return worker.Response(encoders.encode(prediction, accept), accept, mimetype=accept)
    else:
        raise RuntimeException("{} accept type is not supported by this script.".format(accept))

In [11]:
#Parse input data payload
def input_fn(input_data, content_type):
    if content_type == 'text/csv':
        # Read the raw input data as CSV.
        df = pd.read_csv(StringIO(input_data), 
                         header=None)

        if len(df.columns) == len(feature_columns_names) + 1:
            # This is a labelled example, includes the ring label
            df.columns = feature_columns_names + [label_column]
        elif len(df.columns) == len(feature_columns_names):
            # This is an unlabelled example.
            df.columns = feature_columns_names

        return df
    else:
        raise ValueError("{} not supported by script!".format(content_type))

In [12]:
# Preprocess input data
def predict_fn(input_data, model):
    
    features = model.transform(input_data)

    if label_column in input_data:
        # Return the label (as the first column) and the set of features.
        return np.insert(features, 0, input_data[label_column], axis=1)
    else:
        # Return only the set of features
        return features

In [13]:
# Deserialize fitted model
def model_fn(model_dir):
    preprocessor = joblib.load(os.path.join(model_dir, "model.joblib"))
    return preprocessor

### Create Inference Pipeline

In [14]:
SCRIPT_PATH = 'feature_eng/titanic_featurizer.py'
BUCKET = 'bf-titanic-data'
WORK_DIRECTORY = 'titanic_data'
PREFIX = 'titanic-inference-pipeline'

train_data = sagemaker_session.upload_data(
    path='{}/{}'.format(WORK_DIRECTORY, 'train_no_header.csv'), 
    bucket=BUCKET,
    key_prefix='{}/{}'.format(PREFIX, 'train'))

sklearn_preprocessor = SKLearn(
    entry_point=SCRIPT_PATH,
    role=role,
    train_instance_type="ml.c4.xlarge",
    sagemaker_session=sagemaker_session,
    framework_version="0.23-1")

sklearn_preprocessor.fit({'train': train_data})

's3_input' class will be renamed to 'TrainingInput' in SageMaker Python SDK v2.


2020-07-24 22:59:00 Starting - Starting the training job...
2020-07-24 22:59:02 Starting - Launching requested ML instances......
2020-07-24 23:00:15 Starting - Preparing the instances for training......
2020-07-24 23:01:27 Downloading - Downloading input data...
2020-07-24 23:01:57 Training - Downloading the training image.....[34m2020-07-24 23:02:37,614 sagemaker-training-toolkit INFO     Imported framework sagemaker_sklearn_container.training[0m
[34m2020-07-24 23:02:37,615 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2020-07-24 23:02:37,625 sagemaker_sklearn_container.training INFO     Invoking user training script.[0m
[34m2020-07-24 23:02:37,968 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2020-07-24 23:02:38,191 sagemaker-training-toolkit INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2020-07-24 23:02:38,202 sagemaker-training-toolkit INFO     No GPUs detected (norm

UnexpectedStatusException: Error for Training job sagemaker-scikit-learn-2020-07-24-22-58-59-543: Failed. Reason: AlgorithmError: ExecuteUserScriptError:
Command "/miniconda3/bin/python titanic_featurizer.py"

In [None]:
timestamp_prefix = strftime("%m/%d/%Y, %H:%M:%S")
model_name = 'inference-pipeline-' + timestamp_prefix
endpoint_name = 'inference-pipeline-ep-' + timestamp_prefix
prod_model = PipelineModel(
    name=model_name, 
    role=role,
    models=inference_model)

sklearn_preprocessor.fit({'train': train_input})

### Batch Transform Training Data

In [None]:
# Define a SKLearn Transformer from the trained SKLearn Estimator
transformer = sklearn_preprocessor.transformer(
    instance_count=1, 
    instance_type='ml.m4.xlarge',
    assemble_with = 'Line',
    accept = 'text/csv')

# Preprocess training input
transformer.transform(train_input, content_type='text/csv')
print('Waiting for transform job: ' + transformer.latest_transform_job.job_name)
transformer.wait()
preprocessed_train = transformer.output_path