## Building an end-to-end ML Pipeline with AWS SageMaker & a REST API

This notebook shows a basic example of how to build an end-to-end machine learning pipeline on AWS using the [AWS SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/).

The AWS SageMaker components provide a service for running your Python scripts in Docker containers, using either AWS-maintained images or custom images. Additionally, the SageMaker Python SDK implements many convenience functions for handling parameters such as instance size, input/output handling, and deployment. Together they are good building blocks for scalable, consistent, and reproducible ML pipelines, and they can easily be orchestrated with an open-source workflow tool (Airflow, Prefect) or with AWS Step Functions. They are a good fit for typical ML workflows on medium-sized, tabular datasets, and AWS provides images for the most widely used ML frameworks, e.g. scikit-learn, TensorFlow, and PyTorch.

The notebook contains both the source code for preprocessing, training, and deployment, and the calls to the SageMaker API that execute the jobs.

### The Pipeline

##### Data Processing:
- The flow begins with a preprocessing script that uses `pandas` and `scikit-learn` to read a CSV file, apply transformations to the data, split the data into training and test sets, and save the results to S3.
- The preprocessing script is executed with the `SKLearnProcessor`, where the instance size and IO paths are configured.

##### Model Training & Deployment:
- Next, another script for model training and deployment is created. This script includes the algorithm, the training routine, the serialization of the model, and the serving functions that are used for model deployment.
- This script is executed with the `SKLearn` estimator class. Calling `fit()` on it runs the training job; calling `deploy()` deploys the model to a SageMaker endpoint.

##### Model Serving:
- A Lambda function is created as an intermediate layer between your SageMaker model endpoint and your REST API.
- A REST API is configured with API Gateway. It consists of a simple `POST` method that calls the Lambda function with live data as payload. The Lambda function passes the live data to the endpoint, receives the predictions, and returns them to the caller.

<img src="img/flowchart_ml_pipeline.png" alt="Flowchart" width="1200" height="675" style="horizontal-align:middle">

### Prerequisites
To run this demo, you will need access to an AWS account, a user that lets you access the resources needed, and roles that grant permissions to the services.

This demo does not cover how to set up IAM roles and permissions.

The user needs a policy that grants permissions to all services used in this example. The notebook can be run in any environment, provided that authentication is configured; however, the recommended and easiest way is to run this tutorial on an AWS SageMaker notebook instance. You can find more information about setting that up [here](https://docs.aws.amazon.com/sagemaker/latest/dg/howitworks-create-ws.html).
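
When the notebook runs on a SageMaker notebook instance, the SageMaker Python SDK can look up the notebook's own IAM role with `sagemaker.get_execution_role()`; outside SageMaker that call raises an error, so an explicit role ARN is needed there. A minimal sketch, assuming you want an explicitly configured `ROLE` environment variable (as read later in this notebook) to take precedence; the helper name `resolve_role` is hypothetical:

```python
import os


def resolve_role(env_var: str = "ROLE") -> str:
    """Return the IAM role ARN for the SageMaker jobs (hypothetical helper).

    An explicitly configured ARN wins; otherwise fall back to the role of the
    SageMaker notebook instance the code is running on.
    """
    role = os.getenv(env_var)
    if role:
        return role
    # Imported lazily so the explicit-ARN path works without the SDK installed.
    from sagemaker import get_execution_role

    # Only works inside SageMaker; raises an error elsewhere.
    return get_execution_role()
```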

In [None]:
# TODO: Write about IAM configuration needed

#### Import libraries

In [None]:
import os
import pandas as pd
import boto3

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.sklearn import SKLearnModel
from sagemaker.processing import ProcessingInput, ProcessingOutput

#### Load Environment Variables

I am using [dotenv](https://github.com/theskumar/python-dotenv) to handle my environment variables. You can either define them directly in the notebook below, replacing the `os.getenv()` calls (e.g. `preprocessing_source_path = "s3://path/to/your/data"`), or define them in a `.env` file in your root directory.
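
A `.env` file for this notebook contains one `NAME=value` line per variable, e.g. `PREPROCESSING_OUTPUT_PATH=s3://<your-bucket>/<prefix>/` (placeholders). Because `os.getenv()` silently returns `None` for a missing variable, a small check that fails early can save you from a job that fails later with a less helpful error. A sketch with a hypothetical `load_config` helper, using the variable names this notebook reads:

```python
import os

# Variable names read by this notebook (API_URL is only needed for the REST API step).
REQUIRED_VARS = ("ROLE", "PREPROCESSING_SOURCE_PATH", "PREPROCESSING_OUTPUT_PATH")


def load_config(names=REQUIRED_VARS) -> dict:
    """Read the given environment variables and fail early if any are missing."""
    config = {name: os.getenv(name) for name in names}
    missing = [name for name, value in config.items() if not value]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return config
```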

In [None]:
%load_ext dotenv
%dotenv

role = os.getenv("ROLE")  # SageMaker role. TODO: Define separate roles for SageMaker, Lambda and API Gateway
preprocessing_source_path = os.getenv("PREPROCESSING_SOURCE_PATH")
preprocessing_output_path = os.getenv("PREPROCESSING_OUTPUT_PATH")

## Preprocessing

#### Develop preprocessing script

This is an example preprocessing script. It reads the data into a pandas DataFrame and applies a scikit-learn column transformer that one-hot encodes the categorical variables (`species`, `island`) and standardizes the numeric bill and flipper measurements. Columns not listed in the transformer, such as `body_mass_g`, are dropped. Then it splits the data into training and test sets and writes them to CSV files.

When executing the cell, the magic command `%%writefile preprocessing.py` saves the cell's code as a Python file in your current working directory. This allows the SageMaker processing job to run the script in a separate Docker container, where the preprocessing is executed.

In [None]:
%%writefile preprocessing.py

import argparse
import os

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import make_column_transformer

input_columns = [
    "species",
    "island",
    "bill_length_mm",
    "bill_depth_mm",
    "flipper_length_mm",
    "body_mass_g",
    "sex",
]

target = "sex"

if __name__ == "__main__":
    # Parse Arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-test-split-ratio", type=float, default=0.3)
    args, _ = parser.parse_known_args()
    split_ratio = args.train_test_split_ratio
    print("Received arguments {}".format(args))

    # Process input data
    input_data_path = os.path.join("/opt/ml/processing/input", "penguins.csv")
    print("Reading input data from {}".format(input_data_path))
    df = pd.read_csv(input_data_path)
    df = pd.DataFrame(data=df, columns=input_columns)
    df.dropna(inplace=True)
    df.drop_duplicates(inplace=True)

    preprocess = make_column_transformer(
        (["bill_length_mm", "bill_depth_mm", "flipper_length_mm"], StandardScaler()),
        (["species", "island"], OneHotEncoder(sparse=False)),
    )

    X = preprocess.fit_transform(df.drop(columns="sex"))

    # Split data into training and test set
    print("Splitting data into train and test sets with ratio {}".format(split_ratio))
    X_train, X_test, y_train, y_test = train_test_split(
        pd.DataFrame(X),
        df[target],
        test_size=split_ratio,
        random_state=42,
    )

    train_features_output_path: str = os.path.join(
        "/opt/ml/processing/train", "train_features.csv"
    )
    train_labels_output_path: str = os.path.join(
        "/opt/ml/processing/train", "train_labels.csv"
    )
    test_features_output_path: str = os.path.join(
        "/opt/ml/processing/test", "test_features.csv"
    )
    test_labels_output_path: str = os.path.join(
        "/opt/ml/processing/test", "test_labels.csv"
    )

    # Save processed data as csv
    print("Saving training features to {}".format(train_features_output_path))
    X_train.to_csv(train_features_output_path, header=False, index=False)

    print("Saving test features to {}".format(test_features_output_path))
    X_test.to_csv(test_features_output_path, header=False, index=False)

    print("Saving training labels to {}".format(train_labels_output_path))
    y_train.to_csv(train_labels_output_path, header=False, index=False)

    print("Saving test labels to {}".format(test_labels_output_path))
    y_test.to_csv(test_labels_output_path, header=False, index=False)
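
To see what the column transformer produces without starting a processing job, the same transformation can be run locally on a few rows. A minimal sketch on three made-up rows (not taken from the dataset). It assumes a recent scikit-learn, which expects `(transformer, columns)` tuples in `make_column_transformer`, whereas the script above uses the older `(columns, transformer)` order accepted by the 0.20.0 container. `sparse_threshold=0` forces a dense result instead of relying on the `sparse` argument of `OneHotEncoder`, which newer releases renamed:

```python
import pandas as pd
from sklearn.compose import make_column_transformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Three made-up rows using the notebook's column names, one per species/island.
df = pd.DataFrame(
    {
        "species": ["Adelie", "Chinstrap", "Gentoo"],
        "island": ["Torgersen", "Dream", "Biscoe"],
        "bill_length_mm": [39.0, 46.0, 47.0],
        "bill_depth_mm": [18.5, 18.0, 14.0],
        "flipper_length_mm": [180.0, 195.0, 215.0],
        "body_mass_g": [3700.0, 3600.0, 4800.0],
        "sex": ["MALE", "FEMALE", "MALE"],
    }
)

preprocess = make_column_transformer(
    (StandardScaler(), ["bill_length_mm", "bill_depth_mm", "flipper_length_mm"]),
    (OneHotEncoder(), ["species", "island"]),
    sparse_threshold=0,  # always return a dense array
)
X = preprocess.fit_transform(df.drop(columns="sex"))
print(X.shape)
```

The result has 3 standardized columns followed by 3 species and 3 island indicator columns; `body_mass_g` is dropped because the transformer's default is `remainder="drop"`. On the penguins data, which also has three species and three islands, this gives the 9 features per row that the endpoint and the REST API later expect.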


#### Define & Run SKLearn Preprocessor

The `SKLearnProcessor` is the object that lets you configure the processing job, including the `framework_version`, the `instance_type`, and the number of instances (`instance_count`). It uses the scikit-learn image maintained by AWS; if you need a custom Docker image instead, the more general `ScriptProcessor` class accepts an `image_uri`.

When calling `run()`, the preprocessing job is executed. The method takes the path to the preprocessing script defined in the cell above as input. Additionally, the data input and output paths are defined: S3 buckets are used for retrieving the raw data and storing the processed data. The `ProcessingInput` and `ProcessingOutput` objects map the S3 paths to the corresponding paths inside the Docker container.

In [None]:
sklearn_processor = SKLearnProcessor(
    framework_version="0.20.0",
    base_job_name="preprocessing",
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
)

docker_base_path: str = "/opt/ml/processing/"

sklearn_processor.run(
    code="preprocessing.py",
    inputs=[
        ProcessingInput(
            source=preprocessing_source_path, 
            destination=os.path.join(docker_base_path, "input")
        ),
    ],
    outputs=[
        ProcessingOutput(
            destination=preprocessing_output_path,
            output_name="train_data", 
            source=os.path.join(docker_base_path, "train")
        ),
        ProcessingOutput(
            destination=preprocessing_output_path,
            output_name="test_data", 
            source=os.path.join(docker_base_path, "test")
        ),
    ],
)
preprocessing_job_description = sklearn_processor.jobs[-1].describe()
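
The dictionary returned by `describe()` follows the shape of the `DescribeProcessingJob` API response, so the S3 location of each output can be looked up by its `output_name`. A small sketch; `processing_output_uris` is a hypothetical helper:

```python
def processing_output_uris(job_description: dict) -> dict:
    """Map each ProcessingOutput's output_name to the S3 URI it was uploaded to."""
    outputs = job_description["ProcessingOutputConfig"]["Outputs"]
    return {output["OutputName"]: output["S3Output"]["S3Uri"] for output in outputs}
```

Calling `processing_output_uris(preprocessing_job_description)` should return entries for `train_data` and `test_data`. Because both outputs above use `preprocessing_output_path` as destination, both entries point to the same prefix, which is why the following cells read training and test files from that single location.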

#### Inspect generated training data

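Let's have a look at our processed data. Reading `s3://` paths directly with pandas requires the `s3fs` package to be installed.

In [None]:
training_features = pd.read_csv(preprocessing_output_path.rstrip("/") + "/train_features.csv", nrows=10, header=None)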
print("Training features shape: {}".format(training_features.shape))
training_features.head(n=3)

## Model Training

#### Create SKLearn training and deploy script

In order to execute model training and deploy the trained model, we need to write another script.

The script contains the training routine, which ingests the processed training data generated in the SageMaker Processing step above. It reads the data, instantiates the model (here a simple `LogisticRegression`) and calls `fit` on it. The model is then serialized and saved to `/opt/ml/model`; at the end of the job, SageMaker packages this directory as `model.tar.gz` and uploads it to the output path in S3. If no output path is specified, SageMaker uses its default bucket (`sagemaker-<region>-<account-id>`), which it creates if it does not exist yet.

The script also contains several serving functions that SageMaker requires for serving the model through a SageMaker endpoint: `model_fn()` loads the model from file, `input_fn()` converts the request body into a format that can be passed to the model, `predict_fn()` calls `predict` on the model, and `output_fn()` converts the model output into a format that can be sent back to the caller.

The script is also saved to disk with the `%%writefile` magic command.

In [None]:
%%writefile train_and_deploy.py

import os

import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression

try:
    import joblib  # standalone package; sklearn.externals.joblib was removed in scikit-learn 0.23
except ImportError:
    from sklearn.externals import joblib  # fallback for old images such as scikit-learn 0.20


"""
Define model serving functions. More about these functions at:
https://sagemaker.readthedocs.io/en/stable/frameworks/sklearn/using_sklearn.html#load-a-model
"""
def model_fn(model_dir):
    # Load the serialized model from the directory SageMaker extracted model.tar.gz into
    model = joblib.load(os.path.join(model_dir, "model.joblib"))
    return model

def input_fn(request_body, content_type):
    # Custom text/csv format: ',' between feature values, '|' between samples
    if content_type == 'text/csv':
        samples = []
        for r in request_body.split('|'):
            samples.append(list(map(float, r.split(','))))
        return np.array(samples)
    else:
        raise ValueError("This model only supports text/csv input")

def predict_fn(input_data, model):
    return model.predict(input_data)

def output_fn(prediction, content_type):
    # Return the string representation of the prediction array, e.g. "['MALE' 'FEMALE']"
    return str(prediction)


if __name__ == "__main__":
    training_data_directory = "/opt/ml/input/data/train"
    train_features_data = os.path.join(training_data_directory, "train_features.csv")
    train_labels_data = os.path.join(training_data_directory, "train_labels.csv")
    print("Reading input data")
    X_train = pd.read_csv(train_features_data, header=None)
    # Read the labels as a 1-D Series; a single-column DataFrame makes fit() emit a DataConversionWarning
    y_train = pd.read_csv(train_labels_data, header=None).iloc[:, 0]

    model = LogisticRegression(class_weight="balanced", solver="lbfgs")
    model.fit(X_train, y_train)
    # Everything written to /opt/ml/model is uploaded to S3 as model.tar.gz after training
    model_output_directory = os.path.join("/opt/ml/model", "model.joblib")
    print("Saving model to {}".format(model_output_directory))
    joblib.dump(model, model_output_directory)
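
`input_fn()` above expects a custom `text/csv` body: the feature values of one sample are separated by commas and samples are separated by a pipe (`|`). This is the notebook's own convention, not a SageMaker default, and both the endpoint test and the Lambda function further down build this string. A minimal sketch of an encoder and of a decoder that mirrors `input_fn()`, handy for checking the format locally; both function names are hypothetical:

```python
from typing import Sequence

import numpy as np


def to_request_body(rows: Sequence[Sequence[float]]) -> str:
    """Encode rows as input_fn expects: ',' between values, '|' between rows."""
    return "|".join(",".join(str(value) for value in row) for row in rows)


def parse_request_body(body: str) -> np.ndarray:
    """Decode such a body back into a 2-D float array, like input_fn does."""
    return np.array([[float(value) for value in row.split(",")] for row in body.split("|")])


body = to_request_body([[1.0, 2.5], [3.0, 4.0]])
print(body)
print(parse_request_body(body).shape)
```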

The `SKLearn` object is the standard interface for defining and scheduling the training and deployment of scikit-learn models. After specifying the resources needed, the framework version, and the `entry_point`, we call `fit()` to execute the training job. We pass a dictionary with a single key, `"train"`, that specifies the path to the processed data in S3; SageMaker makes that data available inside the training container under `/opt/ml/input/data/train`, where the training script reads it.

In [None]:
sklearn = SKLearn(
    entry_point="train_and_deploy.py",
    framework_version="0.20.0",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    role=role,
)
sklearn.fit({"train": preprocessing_output_path})
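
The keys of the dictionary passed to `fit()` are channel names. SageMaker makes the data of channel `train` available under `/opt/ml/input/data/train`, the path hard-coded in the training script, and the SageMaker training toolkit also exposes that directory as the environment variable `SM_CHANNEL_TRAIN`. A sketch of a hypothetical `channel_dir` helper that the training script could use instead of the hard-coded path:

```python
import os


def channel_dir(channel: str) -> str:
    """Return the local directory of a training input channel (hypothetical helper).

    Prefers SM_CHANNEL_<CHANNEL> when the training toolkit has set it and falls
    back to the standard /opt/ml/input/data/<channel> location otherwise.
    """
    default = os.path.join("/opt/ml/input/data", channel)
    return os.environ.get(f"SM_CHANNEL_{channel.upper()}", default)
```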

Because we did not specify a bucket where our model artifact should be stored, the training job stored it in the default SageMaker bucket. Its URI can be retrieved from the metadata contained in the `sklearn` object:

In [None]:
# Get the S3 URI of the model artifact produced by the training job
model_data_s3_uri = sklearn.model_data
model_data_s3_uri

#### Evaluate Model

Another script is created in order to evaluate the performance of the model trained above. The evaluation is executed as an individual step in our ML pipeline. It loads both the model and the processed test data, computes several metrics (a classification report and the accuracy; a ROC AUC line is prepared but commented out) and stores them in a JSON file.

In [None]:
%%writefile evaluate.py

import json
import os
import tarfile

import pandas as pd
from sklearn.metrics import classification_report, roc_auc_score, accuracy_score

try:
    import joblib  # standalone package; sklearn.externals.joblib was removed in scikit-learn 0.23
except ImportError:
    from sklearn.externals import joblib  # fallback for old images such as scikit-learn 0.20

if __name__ == "__main__":
    # The training job's model artifact is a tarball that contains model.joblib
    model_path = os.path.join("/opt/ml/processing/model", "model.tar.gz")
    print("Extracting model from path: {}".format(model_path))
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    print("Loading model")
    model = joblib.load("model.joblib")

    print("Loading test input data")
    test_features_data = os.path.join("/opt/ml/processing/test", "test_features.csv")
    test_labels_data = os.path.join("/opt/ml/processing/test", "test_labels.csv")

    X_test = pd.read_csv(test_features_data, header=None)
    # Read the labels as a 1-D Series rather than a single-column DataFrame
    y_test = pd.read_csv(test_labels_data, header=None).iloc[:, 0]
    predictions = model.predict(X_test)

    print("Creating classification evaluation report")
    report_dict = classification_report(y_test, predictions, output_dict=True)
    report_dict["accuracy"] = accuracy_score(y_test, predictions)
    # roc_auc_score expects continuous scores (e.g. from model.predict_proba), not predicted labels
    # report_dict["roc_auc"] = roc_auc_score(y_test, predictions)

    print("Classification report:\n{}".format(report_dict))

    evaluation_output_path = os.path.join(
        "/opt/ml/processing/evaluation", "evaluation.json"
    )
    print("Saving classification report to {}".format(evaluation_output_path))

    with open(evaluation_output_path, "w") as f:
        f.write(json.dumps(report_dict))

#### Evaluate Model Performance

Execute the model evaluation with the same processing configuration and the same `sklearn_processor` object instantiated above. Two inputs are specified: one for the model artifact and another one for the test data.

In [None]:
sklearn_processor.run(
    code="evaluate.py",
    inputs=[
        ProcessingInput(
            source=model_data_s3_uri,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source=preprocessing_output_path,
            destination="/opt/ml/processing/test"
        ),
    ],
    outputs=[ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation")],
)
evaluation_job_description = sklearn_processor.jobs[-1].describe()
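
The evaluation script leaves the ROC AUC line commented out: `roc_auc_score` expects continuous scores, so passing the predicted labels (strings such as `"MALE"` here) fails. A minimal local sketch on made-up data of how the score could be computed from `predict_proba()` instead. With string labels, `roc_auc_score` treats the larger of the two sorted labels as the positive class, which matches column 1 of `predict_proba()` because `model.classes_` is sorted the same way:

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score

# Tiny made-up binary problem with string labels, like the penguins' "sex" column.
X = np.array([[0.0], [0.5], [1.0], [1.5], [2.0], [2.5]])
y = np.array(["FEMALE", "FEMALE", "FEMALE", "MALE", "MALE", "MALE"])

model = LogisticRegression().fit(X, y)

# Probability of model.classes_[1] ("MALE"), the class roc_auc_score treats as positive.
scores = model.predict_proba(X)[:, 1]
auc = roc_auc_score(y, scores)
print(model.classes_, auc)
```

Inside `evaluate.py`, the same idea would read `roc_auc_score(y_test, model.predict_proba(X_test)[:, 1])`.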

#### Inspect Evaluation result

The JSON file that was created in the evaluation job can now be read from S3 and inspected.

In [None]:
import json

client = boto3.client('s3')
# S3 prefix that the "evaluation" output was uploaded to
s3_path = evaluation_job_description["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
bucket, key = s3_path.split("//")[1].split("/", 1)
result = client.get_object(Bucket=bucket, Key=key + '/evaluation.json')
json.loads(result['Body'].read().decode('utf-8'))
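
The cell above splits the S3 URI by hand. A slightly more robust sketch uses `urllib.parse.urlparse` from the standard library and rejects anything that is not an `s3://` URI; `split_s3_uri` is a hypothetical helper name:

```python
from urllib.parse import urlparse


def split_s3_uri(uri: str):
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        raise ValueError(f"Not an S3 URI: {uri}")
    return parsed.netloc, parsed.path.lstrip("/")
```

With it, the lookup becomes `bucket, key = split_s3_uri(s3_path)`, followed by the same `get_object` call.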

### Model Deployment

#### Deploy Estimator to SageMaker Endpoint

After evaluating our model, we can now go on and deploy it. To do so, we only have to call `deploy()` on the `sklearn` object that we used for model training.

In [None]:
predictor = sklearn.deploy(instance_type='ml.m4.xlarge', initial_instance_count=1)

#### Test SageMaker Endpoint

We can now run our first test against our model endpoint directly from our Jupyter notebook. To do so, we take some of the training features, add them to a request, and call our model with the `invoke_endpoint` method of the SageMaker Runtime client.

In [None]:
# Load in the deploy_test data
deploy_test = training_features.head(2).values.tolist()

# Format the deploy_test data features
request_body = ""
for sample in deploy_test:
    request_body += ",".join([str(n) for n in sample]) + "|"
request_body = request_body[:-1] 
print("*"*20)
print(f"Calling SageMaker Endpoint with the following request_body: {request_body}")

# create sagemaker client using boto3
client = boto3.client('sagemaker-runtime')

# Specify endpoint and content_type
endpoint_name = predictor.endpoint
content_type = 'text/csv'

# Make call to endpoint
response = client.invoke_endpoint(
    EndpointName=endpoint_name,
    Body=request_body,
    ContentType=content_type
    )
response_from_endpoint = response['Body'].read().decode("utf-8")
print("*"*20)
print(f"Response from Endpoint: {response_from_endpoint}")

#### Delete Endpoint, if no longer in use

Because your endpoint incurs costs for as long as it is running, it is advisable to delete it as soon as it is no longer needed. If you follow this tutorial for testing purposes, make sure that your endpoint is deleted when you stop working on it.

In [None]:
# This call will delete the endpoint
# predictor.delete_endpoint()

Note that calling the model endpoint directly should only be done for testing purposes. If you want to make your model available for live predictions, it is advisable to put a proper REST API in front of it that handles incoming requests. How this can be done is described in the next step.

## Build REST API

#### Create Lambda Function for handling API <-> SageMaker Endpoint traffic

First, we write a Lambda function that handles the traffic between our REST API and our model endpoint. It receives requests from the API as input, invokes the model endpoint, and returns the results.

In [None]:
%%writefile serving_lambda.py

import os
import json

import boto3

# Name of the SageMaker endpoint, set as an environment variable of the Lambda function
endpoint_name = os.environ['ENDPOINT_NAME']
runtime = boto3.client('sagemaker-runtime')

def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))

    # The API passes a body of the form {"data": "[[...], [...]]"};
    # the value of "data" is itself a JSON-encoded list of feature rows
    payload = json.loads(event['data'])
    print(payload)

    # Format the data so that it can be processed by the model endpoint:
    # ',' between feature values, '|' between samples
    request_body = "|".join(",".join(str(n) for n in sample) for sample in payload)
    print("request_body: ", request_body)

    response = runtime.invoke_endpoint(EndpointName=endpoint_name,
                                       ContentType='text/csv',
                                       Body=request_body)

    # The endpoint returns str(prediction_array), e.g. "['MALE']"; strip brackets and quotes
    label = response['Body'].read().decode('utf-8').strip("[]").strip("'")

    return label
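
The notebook does not show how `serving_lambda.py` is deployed. One way to script it is boto3's Lambda `create_function` call. A minimal sketch, assuming `serving_lambda.py` was written by the cell above, a function name of your choice, a Lambda execution role that you created separately and that allows `sagemaker:InvokeEndpoint` on the endpoint, and the `python3.12` runtime. The helper names are hypothetical, and the sketch only builds the deployment package and the call arguments; the actual call needs AWS credentials:

```python
import io
import os
import zipfile


def build_lambda_package(source_file: str = "serving_lambda.py") -> bytes:
    """Zip the handler file in memory, with the module at the archive root."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.write(source_file, arcname=os.path.basename(source_file))
    return buffer.getvalue()


def create_function_kwargs(function_name: str, role_arn: str, endpoint_name: str, zip_bytes: bytes) -> dict:
    """Arguments for boto3.client("lambda").create_function(**kwargs)."""
    return {
        "FunctionName": function_name,
        "Runtime": "python3.12",  # assumption: any currently supported Python runtime
        "Role": role_arn,  # Lambda execution role, not the SageMaker role
        "Handler": "serving_lambda.lambda_handler",  # <module>.<function>
        "Code": {"ZipFile": zip_bytes},
        # Read by the handler via os.environ['ENDPOINT_NAME']
        "Environment": {"Variables": {"ENDPOINT_NAME": endpoint_name}},
    }
```

For example: `boto3.client("lambda").create_function(**create_function_kwargs("penguin-serving", lambda_role_arn, predictor.endpoint, build_lambda_package()))`, where `"penguin-serving"` and `lambda_role_arn` are placeholders.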

#### Configure API Gateway

This tutorial walks you through setting up API Gateway via the management console. If you wish to run this in production, it is advisable to provision and configure this resource with an infrastructure-as-code tool such as AWS CloudFormation or Terraform.

##### Step I: Go to API Gateway & Select Create new REST Endpoint

![REST API](img/REST.png)

##### Step II: Choose a name and create a new API

![REST API](img/CREATE_NEW.png)

##### Step III: Create a new method of type POST and choose your lambda as target

![REST API](img/POST.png)

##### Step IV: Deploy API

![REST API](img/DEPLOY.png)

##### Step V: Go to APIs --> Stages --> Inspect your newly created stage and collect its Invoke URL
Set the invoke URL as the environment variable `API_URL` (e.g. in your `.env` file, then re-run the `%dotenv` cell).

#### Invoke Request against REST API

After you have successfully created your REST API with API Gateway, you can now test it.

In [None]:
import requests

url = os.getenv("API_URL")
payload = json.dumps({"data":"[[-0.6396528091784842, 0.3738717119645826, -0.9980179785096928, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0]]"})
print(f"Calling ML Api with the following payload {payload}")
response = requests.post(url, data=payload)
print("*"*20)
print(f"Return Message. Status code: {response.status_code}, Message: {response.text}")
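
The request body is JSON-encoded twice: the outer object has a single key `data`, whose value is itself a JSON string containing a list of feature rows, because the Lambda function calls `json.loads(event['data'])`. A sketch of a hypothetical `build_api_payload` helper that also checks the row length; the expected 9 features assume the preprocessing above (3 scaled measurements plus one-hot indicators for 3 species and 3 islands):

```python
import json
from typing import Sequence

N_FEATURES = 9  # assumption: 3 scaled columns + 3 species + 3 island indicators


def build_api_payload(rows: Sequence[Sequence[float]], n_features: int = N_FEATURES) -> str:
    """Build the POST body the Lambda expects: {"data": "<JSON list of rows>"}."""
    for row in rows:
        if len(row) != n_features:
            raise ValueError(f"Expected {n_features} features per row, got {len(row)}")
    # Inner json.dumps: the Lambda decodes event["data"] with a second json.loads.
    return json.dumps({"data": json.dumps([[float(v) for v in row] for row in rows])})
```

The payload in the cell above could then be written as `build_api_payload([[-0.6396528091784842, 0.3738717119645826, -0.9980179785096928, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0]])`.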

### Outro 

That's it. After following all steps, you should now have an end-to-end ML pipeline with AWS SageMaker and a REST API that serves your predictions online. WOW!!!