In [None]:
# Import the latest sagemaker and boto3 SDKs.
import sys

!{sys.executable} -m pip install --upgrade pip
!{sys.executable} -m pip install -qU awscli boto3 "sagemaker>=2.1.0<3" tqdm
!{sys.executable} -m pip install -qU "stepfunctions==2.0.0"
!{sys.executable} -m pip show sagemaker stepfunctions

Restart your SageMaker kernel, then continue with this notebook.

In [None]:
# Replace `None` with the project name when creating SageMaker Project
# You can find it from the left panel in Studio

PROJECT_NAME = None

# isinstance() already rejects None; the strip() check also rejects an empty or blank
# name, which would otherwise be used as an empty Service Catalog search query below.
assert isinstance(PROJECT_NAME, str) and PROJECT_NAME.strip(), (
    "Please specify the project name as string"
)

In [None]:
import boto3

# IPython.display is the public location of HTML/display; importing `display` from
# IPython.core.display is deprecated.
from IPython.display import HTML, display


def get_provisioned_product_name(project_name):
    """Return the Service Catalog provisioned product name for a SageMaker project.

    Args:
        project_name: SageMaker project name, used as the Service Catalog search query.

    Returns:
        The ``Name`` of the single provisioned product matched by the search.

    Raises:
        ValueError: If the search does not match exactly one provisioned product. A link
            to the Service Catalog console is displayed first so the name can be found
            manually.
    """
    region = boto3.Session().region_name
    sc = boto3.client("servicecatalog")
    products = sc.search_provisioned_products(
        Filters={
            "SearchQuery": [
                project_name,
            ]
        }
    )
    pp = products["ProvisionedProducts"]
    if len(pp) != 1:
        print("Invalid provisioned product name. Open the link below and search manually")
        display(
            HTML(
                f'<a target="_blank" href="https://{region}.console.aws.amazon.com/servicecatalog/home?region={region}#provisioned-products">Service Catalog</a>'
            )
        )
        raise ValueError("Invalid provisioned product")

    return pp[0]["Name"]


PROVISIONED_PRODUCT_NAME = get_provisioned_product_name(PROJECT_NAME)
print(
    f"The associated Service Catalog Provisioned Product Name to this SageMaker project: {PROVISIONED_PRODUCT_NAME}"
)

In case of any errors, you can examine the Service Catalog console from the above link, find the associated provisioned product name (which looks something like `example-p-1v7hbpwe594n`), and assign it to `PROVISIONED_PRODUCT_NAME` manually.

## Data Prep
 
In this section of the notebook, you will download the publicly available New York Taxi dataset in preparation for uploading it to S3.

### Download Dataset

First, download a sample of the New York City Taxi [dataset](https://registry.opendata.aws/nyc-tlc-trip-records-pds/)⇗ to this notebook instance. This dataset contains information on trips taken by taxis and for-hire vehicles in New York City, including pick-up and drop-off times and locations, fares, distance traveled, and more. 

In [None]:
# Copy one month (2018-02) of green-taxi trip records from the public `nyc-tlc` S3 bucket
# to a local CSV, which the next cell reads.
# NOTE(review): public datasets can be reorganized; confirm this object key still exists.
!aws s3 cp 's3://nyc-tlc/trip data/green_tripdata_2018-02.csv' 'nyc-tlc.csv'

Now load the dataset into a pandas data frame, taking care to parse the dates correctly.

In [None]:
import pandas as pd

# Read the CSV, converting the pick-up and drop-off columns to datetimes while loading.
datetime_columns = ["lpep_dropoff_datetime", "lpep_pickup_datetime"]
trip_df = pd.read_csv("nyc-tlc.csv", parse_dates=datetime_columns)

trip_df.head()

### Data manipulation

Instead of the raw date and time features for pick-up and drop-off, let's use these features to calculate the total time of the trip in minutes, which will be easier to work with for our model.

In [None]:
# Trip length in minutes. total_seconds() covers the whole timedelta (days included, sign
# preserved). `.dt.seconds` only returns the 0-86399 seconds component, so a trip lasting
# e.g. one day and ten minutes would wrongly come out as 10 minutes and survive the
# duration filter below.
trip_df["duration_minutes"] = (
    trip_df["lpep_dropoff_datetime"] - trip_df["lpep_pickup_datetime"]
).dt.total_seconds() / 60

The dataset contains a lot of columns we don't need, so let's select a sample of columns for our machine learning model. Keep only `total_amount` (fare), `duration_minutes`, `passenger_count`, and `trip_distance`.

In [None]:
# total_amount is placed first; later cells use the remaining columns
# (test_df.columns[1:]) as the model inputs.
cols = [
    "total_amount",
    "duration_minutes",
    "passenger_count",
    "trip_distance",
]
data_df = trip_df[cols]
print(data_df.shape)
data_df.head()

Generate some quick statistics for the dataset to understand the quality.

In [None]:
# Per-column summary statistics (count, mean, std, min/max, quartiles) to spot outliers.
summary_stats = data_df.describe()
summary_stats

The table above shows some clear outliers, e.g. -400 or 2626 as fare, or 0 passengers. There are many intelligent methods for identifying and removing outliers, but data cleaning is not the focus of this notebook, so just remove the outliers by setting some min and max values which seem more reasonable. Removing the outliers results in a final dataset of 754,671 rows.

In [None]:
# Keep only rows whose values lie strictly inside plausible ranges, then drop any
# rows that still contain missing values.
plausible = (
    data_df["total_amount"].gt(0)
    & data_df["total_amount"].lt(200)
    & data_df["duration_minutes"].gt(0)
    & data_df["duration_minutes"].lt(120)
    & data_df["trip_distance"].gt(0)
    & data_df["trip_distance"].lt(121)
    & data_df["passenger_count"].gt(0)
)
data_df = data_df[plausible].dropna()
print(data_df.shape)

### Data visualization

Since this notebook will build a regression model for the taxi data, it's a good idea to check if there is any correlation between the variables in our data. Use scatter plots on a sample of the data to compare trip distance with duration in minutes, and total amount (fare) with duration in minutes.

In [None]:
import seaborn as sns

# A fixed random_state makes the plotted sample, and therefore the figures, reproducible
# across runs. 42 is the same seed the train/validation/test split uses.
sample_df = data_df.sample(1000, random_state=42)
sns.scatterplot(data=sample_df, x="duration_minutes", y="trip_distance")

In [None]:
# Fare against trip duration, using the same sample as the previous plot.
sns.scatterplot(x="duration_minutes", y="total_amount", data=sample_df)

These scatter plots look fine and show at least some correlation between our variables. 

### Data splitting and saving

We are now ready to split the dataset into train, validation, and test sets. 

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the rows, then carve 5% of that hold-out off as the test set
# (about 1% of all rows). The rest of the hold-out becomes the validation set.
train_df, holdout_df = train_test_split(data_df, test_size=0.20, random_state=42)
val_df, test_df = train_test_split(holdout_df, test_size=0.05, random_state=42)

# Give the test set a fresh 0..n-1 index; later cells join predictions onto it by index.
test_df = test_df.reset_index(drop=True)

print(
    f"Size of\n train: {train_df.shape[0]},\n val: {val_df.shape[0]},\n test: {test_df.shape[0]} "
)

In [None]:
# Headerless CSVs for training, validation and test; total_amount is the first column.
train_df.to_csv("train.csv", index=False, header=False)
val_df.to_csv("validation.csv", index=False, header=False)
test_df.to_csv("test.csv", index=False, header=False)

# The baseline is the training data written again, this time with a header row.
train_df.to_csv("baseline.csv", index=False, header=True)

Now upload these CSV files to your default SageMaker S3 bucket. 

In [None]:
import sagemaker

# Get the session and default bucket
session = sagemaker.session.Session()
bucket = session.default_bucket()

# Specify data prefix and version
prefix = "nyc-tlc/v1"


def _upload_split(local_path, folder):
    """Upload one local CSV under <prefix>/data/<folder> in the default bucket; return its S3 URI."""
    return session.upload_data(local_path, bucket, prefix + "/data/" + folder)


s3_train_uri = _upload_split("train.csv", "training")
s3_val_uri = _upload_split("validation.csv", "validation")
s3_test_uri = _upload_split("test.csv", "test")
s3_baseline_uri = _upload_split("baseline.csv", "baseline")

You will use the datasets which you have prepared and saved in this section to trigger the pipeline to train and deploy a model in the next section.

In [None]:
import boto3
from botocore.exceptions import ClientError
import os
import time


def get_config(provisioned_product_name):
    """Collect the outputs of a Service Catalog provisioned product into a dict.

    Args:
        provisioned_product_name: Name of the provisioned product to read.

    Returns:
        Mapping of each output's ``OutputKey`` to its ``OutputValue``.
    """
    sc = boto3.client("servicecatalog")
    config = {}
    request = {"ProvisionedProductName": provisioned_product_name}
    # get_provisioned_product_outputs is paginated. Follow NextPageToken so that keys
    # on later pages are not silently missing from the config.
    while True:
        response = sc.get_provisioned_product_outputs(**request)
        for out in response.get("Outputs", []):
            config[out["OutputKey"]] = out["OutputValue"]
        next_token = response.get("NextPageToken")
        if not next_token:
            break
        request["PageToken"] = next_token
    return config


config = get_config(PROVISIONED_PRODUCT_NAME)
region = config["Region"]
artifact_bucket = config["ArtifactBucket"]
pipeline_name = config["PipelineName"]
model_name = config["ModelName"]
workflow_pipeline_arn = config["WorkflowPipelineARN"]

print("region: {}".format(region))
print("artifact bucket: {}".format(artifact_bucket))
print("pipeline: {}".format(pipeline_name))
print("model name: {}".format(model_name))
print("workflow: {}".format(workflow_pipeline_arn))

In [None]:
from io import BytesIO
import zipfile
import json

# S3 locations of the datasets uploaded above.
input_data = {
    "TrainingUri": s3_train_uri,
    "ValidationUri": s3_val_uri,
    "TestUri": s3_test_uri,
    "BaselineUri": s3_baseline_uri,
}

hyperparameters = {"num_round": 50}

# Package both JSON documents into a new in-memory zip archive, then rewind the
# buffer so it can be read from the start.
archive_members = {
    "inputData.json": input_data,
    "hyperparameters.json": hyperparameters,
}
zip_buffer = BytesIO()
with zipfile.ZipFile(zip_buffer, "w") as archive:
    for member_name, payload in archive_members.items():
        archive.writestr(member_name, json.dumps(payload))
zip_buffer.seek(0)

data_source_key = f"{pipeline_name}/data-source.zip"

Now upload the zip package to your artifact S3 bucket - this action will trigger the pipeline to train and deploy a model.

In [None]:
s3 = boto3.client("s3")
# getvalue() returns the whole archive regardless of the buffer's read position.
# read() would return b"" once a previous run had consumed the buffer, so re-running
# this cell would upload an empty zip and trigger the pipeline with it.
s3.put_object(Bucket=artifact_bucket, Key=data_source_key, Body=zip_buffer.getvalue())

In [None]:
from IPython.display import HTML

# Link to this pipeline in the CodePipeline console for the current region.
pipeline_console_url = (
    f"https://{region}.console.aws.amazon.com/codesuite/codepipeline/pipelines/"
    f"{pipeline_name}/view?region={region}"
)
HTML(f'<a target="_blank" href="{pipeline_console_url}">Code Pipeline</a>')

In [None]:
codepipeline = boto3.client("codepipeline")


def get_pipeline_stage(pipeline_name, stage_name):
    """Return the current state of one stage of a CodePipeline pipeline.

    Args:
        pipeline_name: Name of the CodePipeline pipeline.
        stage_name: Name of the stage to look up (e.g. "Build").

    Returns:
        The ``stageStates`` entry from ``get_pipeline_state`` whose ``stageName`` equals
        ``stage_name``, or ``None`` when the pipeline has no such stage.
    """
    response = codepipeline.get_pipeline_state(name=pipeline_name)
    for stage in response["stageStates"]:
        if stage["stageName"] == stage_name:
            return stage
    return None


# Get the latest execution of the Build stage
build_stage = get_pipeline_stage(pipeline_name, "Build")
if build_stage is None:
    raise Exception("Pipeline {} has no Build stage".format(pipeline_name))
if "latestExecution" not in build_stage:
    raise Exception("Please wait.  Build not started")

# Check the first action as well. Indexing straight into its latestExecution /
# externalExecutionUrl would raise KeyError if the action has not reported them yet.
build_url = (
    build_stage["actionStates"][0].get("latestExecution", {}).get("externalExecutionUrl")
)
if build_url is None:
    raise Exception("Please wait.  Build not started")

# Output a link to the code build logs
HTML('<a target="_blank" href="{0}">Code Build Logs</a>'.format(build_url))

The AWS CodeBuild process is responsible for creating a number of AWS CloudFormation templates, which we will explore in more detail in the next section. Two of these templates are used to set up the **Train** step by creating the AWS Step Functions workflow and the custom AWS Lambda functions used within this workflow.

## Train Model



In [None]:
from stepfunctions.workflow import Workflow

# Poll every 10 seconds until the Step Functions workflow exists and can be attached;
# each failed attempt prints the AWS error message.
while True:
    try:
        workflow = Workflow.attach(workflow_pipeline_arn)
    except ClientError as err:
        print(err.response["Error"]["Message"])
        time.sleep(10)
    else:
        break

workflow

In [None]:
# Persist these variables with IPython's %store so another notebook/kernel can reload them with `%store -r`.
%store input_data PROVISIONED_PRODUCT_NAME

### Training Analytics

Once the training and baseline jobs are complete (they are shown in green in the Step Functions workflow; this takes around 5 minutes), you can inspect the experiment metrics. The code below will display all experiments in a table. Note that the baseline processing job won't have RMSE metrics: it calculates metrics based on the training data, but does not train a machine learning model.

You will [explore the baseline](#Explore-Baseline) results later in this notebook. <a id="validation-results"></a>

In [None]:
from sagemaker import analytics

experiment_name = "mlops-{}".format(model_name)
model_analytics = analytics.ExperimentAnalytics(experiment_name=experiment_name)
analytics_df = model_analytics.dataframe()

if analytics_df.shape[0] == 0:
    raise Exception("Please wait.  No training or baseline jobs")

pd.set_option("display.max_colwidth", 100)  # Increase column width to show full component name
cols = [
    "TrialComponentName",
    "DisplayName",
    "SageMaker.InstanceType",
    "train:rmse - Last",
    "validation:rmse - Last",
]  # return the last rmse for training and validation
# Keep only the requested columns that actually exist (the baseline job has no rmse
# metrics). `Index & list` used to be a deprecated alias for this set intersection and
# no longer behaves as one in recent pandas, so call intersection() explicitly.
analytics_df[analytics_df.columns.intersection(cols)].head(2)

## Deploy Dev

### Test Dev Deployment

When the pipeline has finished training a model, it automatically moves to the next step, where the model is deployed as a SageMaker Endpoint. This endpoint is part of your dev deployment. In this section, you will run some tests on the endpoint to decide whether you want to deploy this model into production.

First, run the cell below to fetch the name of the SageMaker Endpoint.

In [None]:
codepipeline = boto3.client("codepipeline")

deploy_dev = get_pipeline_stage(pipeline_name, "DeployDev")
# get_pipeline_stage returns None for an unknown stage. Fail with a clear message
# rather than a TypeError from the membership test below.
if deploy_dev is None:
    raise Exception("Pipeline {} has no DeployDev stage".format(pipeline_name))
if "latestExecution" not in deploy_dev:
    raise Exception("Please wait.  Deploy dev not started")

# The dev endpoint name embeds the pipeline execution id of the latest DeployDev run.
execution_id = deploy_dev["latestExecution"]["pipelineExecutionId"]
dev_endpoint_name = "mlops-{}-dev-{}".format(model_name, execution_id)

print("endpoint name: {}".format(dev_endpoint_name))

In [None]:
sm = boto3.client("sagemaker")

# Poll every 10 seconds until the dev endpoint is InService. Describe errors (e.g. the
# endpoint not existing yet) are printed and retried.
while True:
    try:
        response = sm.describe_endpoint(EndpointName=dev_endpoint_name)
        status = response["EndpointStatus"]
        print("Endpoint status: {}".format(status))
        if status == "InService":
            break
        # A failed endpoint never becomes InService; stop instead of polling forever.
        if status == "Failed":
            raise RuntimeError(
                "Endpoint {} failed: {}".format(
                    dev_endpoint_name, response.get("FailureReason", "unknown reason")
                )
            )
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)

Now that your endpoint is ready, let's write some code to run the test data (which you split off from the dataset and saved to file at the start of this notebook) through the endpoint for inference. The code below uses v2 of the SageMaker SDK (`sagemaker.predictor.Predictor` and `sagemaker.serializers.CSVSerializer` do not exist in v1), and we recommend using v2 of the SDK in all of your future projects.

In [None]:
import math

import numpy as np
from tqdm import tqdm

from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer


def get_predictor(endpoint_name):
    """Create a predictor for an existing endpoint that sends request payloads as CSV.

    Args:
        endpoint_name: Name of a deployed SageMaker endpoint.

    Returns:
        A ``sagemaker.predictor.Predictor`` using ``CSVSerializer``.
    """
    xgb_predictor = Predictor(endpoint_name)
    xgb_predictor.serializer = CSVSerializer()
    return xgb_predictor


def predict(predictor, data, rows=500):
    """Send ``data`` to ``predictor`` in batches and collect the numeric predictions.

    Args:
        predictor: Predictor, e.g. from ``get_predictor``.
        data: Array of feature rows (callers pass ``DataFrame.values``).
        rows: Maximum number of rows sent per request.

    Returns:
        1-D float numpy array of the values parsed from the responses, in request order.
    """
    n_rows = data.shape[0]
    if n_rows == 0:
        return np.array([], dtype=float)
    # ceil() keeps every batch at most `rows` long and is >= 1 here. round() could give
    # 0 (array_split then raises) or produce batches larger than `rows`.
    n_batches = math.ceil(n_rows / rows)
    predictions = []
    for batch in tqdm(np.array_split(data, n_batches)):
        body = predictor.predict(batch).decode("utf-8")
        # Accept comma- and/or newline-separated values in the response body.
        predictions.extend(
            float(token) for token in body.replace("\n", ",").split(",") if token.strip()
        )
    return np.array(predictions, dtype=float)

Now use the `predict` function, which was defined in the code above, to run the test data through the endpoint and generate the predictions.

In [None]:
dev_predictor = get_predictor(dev_endpoint_name)
# Every column except the first (total_amount, the value being predicted) is a model input.
feature_values = test_df[test_df.columns[1:]].values
predictions = predict(dev_predictor, feature_values)

Next, load the predictions into a data frame, and join it with your test data. Then, calculate absolute error as the difference between the actual taxi fare and the predicted taxi fare. Display the results in a table, sorted by the highest absolute error values.

In [None]:
# Predictions are in test_df row order, so joining on the default 0..n-1 index lines them up.
pred_df = test_df.join(pd.DataFrame({"total_amount_predictions": predictions}))
pred_df["error"] = (pred_df["total_amount"] - pred_df["total_amount_predictions"]).abs()

pred_df.sort_values("error", ascending=False).head()

From this table, we note that some short trip distances have large errors because the low predicted fare does not match the high actual fare. This could be the result of a generous tip which we haven't included in this dataset.

You can also analyze the results by plotting the absolute error to visualize outliers. In this graph, we see that most of the outliers are cases where the model predicted a much lower fare than the actual fare. There are only a few outliers where the model predicted a higher fare than the actual fare.

In [None]:
# Actual vs. predicted fare, coloured by absolute error.
sns.scatterplot(x="total_amount_predictions", y="total_amount", hue="error", data=pred_df)

If you want one overall measure of quality for the model, you can calculate the root mean square error (RMSE) for the predicted fares compared to the actual fares. Compare this to the [results calculated on the validation set](#validation-results) in the 'Training Analytics' section.

In [None]:
from math import sqrt
from sklearn.metrics import mean_squared_error


def rmse(pred_df):
    """Root mean squared error between ``total_amount`` and ``total_amount_predictions``."""
    mse = mean_squared_error(
        pred_df["total_amount"],
        pred_df["total_amount_predictions"],
    )
    return sqrt(mse)


print("RMSE: {}".format(rmse(pred_df)))

In [None]:
from sklearn.metrics import r2_score


def accuracy(pred_df):
    """Coefficient of determination (R^2) of the predicted fares in ``pred_df``.

    Labelled "accuracy" in the output, but this is R^2 rather than a classification
    accuracy: 1.0 is a perfect fit and the value can be negative.
    """
    actual = pred_df["total_amount"]
    predicted = pred_df["total_amount_predictions"]
    return r2_score(actual, predicted)


print("Accuracy (R-squared): {}".format(accuracy(pred_df)))

## Deploy Prod

### Approve Deployment to Production

If you are happy with the results of the model, you can go ahead and approve the model to be deployed into production. You can do so by clicking the **Review** button in the CodePipeline UI, leaving a comment to explain why you approve this model, and clicking on **Approve**. 

Alternatively, you can create a Jupyter widget which (when enabled) allows you to comment and approve the model directly from this notebook. Run the cell below to see this in action.

In [None]:
import ipywidgets as widgets


def on_click(obj):
    """Submit the approval decision for the DeployDev/ApproveDeploy action.

    Args:
        obj: The clicked ``ipywidgets.Button``. Its ``description`` ("Approved" or
            "Rejected") is sent as the approval status, with the text box as summary.
    """
    result = {"summary": approval_text.value, "status": obj.description}
    codepipeline.put_approval_result(
        pipelineName=pipeline_name,
        stageName="DeployDev",
        actionName="ApproveDeploy",
        result=result,
        token=approval_action["token"],
    )
    # Hide the buttons once a decision has been submitted.
    button_box.close()
    print(result)


# Create the widget if we are ready for approval
deploy_dev = get_pipeline_stage(pipeline_name, "DeployDev")
if deploy_dev is None:
    raise Exception("Pipeline {} has no DeployDev stage".format(pipeline_name))
# This code treats the last action of the DeployDev stage as the approval action.
approval_state = deploy_dev["actionStates"][-1]
if "latestExecution" not in approval_state:
    raise Exception("Please wait.  Deploy dev not complete")

approval_action = approval_state["latestExecution"]
if approval_action["status"] == "Succeeded":
    # The approval message is optional, so `summary` may be absent.
    print("Dev approved: {}".format(approval_action.get("summary", "")))
elif "token" in approval_action:
    approval_text = widgets.Text(placeholder="Optional approval message")
    approve_btn = widgets.Button(description="Approved", button_style="success", icon="check")
    reject_btn = widgets.Button(description="Rejected", button_style="danger", icon="close")
    approve_btn.on_click(on_click)
    reject_btn.on_click(on_click)
    button_box = widgets.HBox([approval_text, approve_btn, reject_btn])
    display(button_box)
else:
    raise Exception("Please wait. No dev approval")

### Test Production Deployment



In [None]:
deploy_prd = get_pipeline_stage(pipeline_name, "DeployPrd")
# get_pipeline_stage returns None for an unknown stage. Fail with a clear message
# rather than a TypeError from the membership tests below.
if deploy_prd is None:
    raise Exception("Pipeline {} has no DeployPrd stage".format(pipeline_name))
if "latestExecution" not in deploy_prd or "latestExecution" not in deploy_prd["actionStates"][0]:
    raise Exception("Please wait.  Deploy prd not started")

execution_id = deploy_prd["latestExecution"]["pipelineExecutionId"]

In [None]:
from datetime import datetime, timedelta
from dateutil.tz import tzlocal


def get_event_dataframe(events):
    """Tabulate CloudFormation stack events together with their age.

    Args:
        events: The ``StackEvents`` list returned by ``describe_stack_events``.

    Returns:
        DataFrame with ``LogicalResourceId``, ``ResourceStatus``,
        ``ResourceStatusReason`` and ``TimeAgo`` (now minus the event ``Timestamp``).
        Missing values are filled with "".
    """
    stack_cols = [
        "LogicalResourceId",
        "ResourceStatus",
        "ResourceStatusReason",
        "Timestamp",
    ]
    # reindex instead of [...] so that a key absent from every event becomes an empty
    # column rather than raising KeyError.
    stack_event_df = pd.DataFrame(events).reindex(columns=stack_cols).fillna("")
    stack_event_df["TimeAgo"] = datetime.now(tzlocal()) - stack_event_df["Timestamp"]
    return stack_event_df.drop("Timestamp", axis=1)


cfn = boto3.client("cloudformation")

stack_name = "{}-deploy-prd".format(pipeline_name)
print("stack name: {}".format(stack_name))

# Get latest stack events, retrying every 10 seconds until the stack can be described
while True:
    try:
        response = cfn.describe_stack_events(StackName=stack_name)
        break
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)

get_event_dataframe(response["StackEvents"]).head()

Use the code below to fetch the name of the endpoint, then run a loop to wait for the endpoint to be fully deployed. You need the status to be 'InService'.

In [None]:
# The prod endpoint name embeds the latest DeployPrd pipeline execution id.
prd_endpoint_name = f"mlops-{model_name}-prd-{execution_id}"
print(f"prod endpoint: {prd_endpoint_name}")

In [None]:
sm = boto3.client("sagemaker")

# Poll every 10 seconds until the prod endpoint is InService with data capture enabled.
# Describe errors (e.g. the endpoint not existing yet) are printed and retried.
while True:
    try:
        response = sm.describe_endpoint(EndpointName=prd_endpoint_name)
        status = response["EndpointStatus"]
        print("Endpoint status: {}".format(status))
        # Wait until the endpoint is in service with data capture enabled
        if (
            status == "InService"
            and "DataCaptureConfig" in response
            and response["DataCaptureConfig"]["EnableCapture"]
        ):
            break
        # A failed endpoint never becomes InService; stop instead of polling forever.
        if status == "Failed":
            raise RuntimeError(
                "Endpoint {} failed: {}".format(
                    prd_endpoint_name, response.get("FailureReason", "unknown reason")
                )
            )
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)

When the endpoint status is 'InService', you can continue. Earlier in this notebook, you created some code to send data to the dev endpoint. Reuse this code now to send a sample of the test data to the production endpoint. Since data capture is enabled on this endpoint, you want to send single records at a time, so the model monitor can map these records to the baseline. 

You will [inspect the model monitor](#Inspect-Model-Monitor) later in this notebook. For now, just check if you can send data to the endpoint and receive predictions in return.

In [None]:
prd_predictor = get_predictor(prd_endpoint_name)
# Send 100 test records one per request (rows=1) so the captured records can be mapped
# to the baseline. random_state makes the chosen sample reproducible across runs.
sample_values = test_df[test_df.columns[1:]].sample(100, random_state=42).values
predictions = predict(prd_predictor, sample_values, rows=1)
predictions

## Cleanup

Execute the following cell to delete the stacks created in the pipeline. For a model name of **nyctaxi** these would be:

1. *nyctaxi*-deploy-prd
2. *nyctaxi*-deploy-dev
3. *nyctaxi*-workflow
4. sagemaker-custom-resource

In [None]:
cfn = boto3.client("cloudformation")

# Stacks to remove, in this order: prod deployment, dev deployment, the workflow, then
# the SageMaker custom-resource stack. Each deletion is awaited before the next starts.
stacks_to_delete = [
    f"{pipeline_name}-deploy-prd",
    f"{pipeline_name}-deploy-dev",
    f"{pipeline_name}-workflow",
    f"mlops-{model_name}-{config['SageMakerProjectId']}-sagemaker-custom-resource",
]
delete_waiter = cfn.get_waiter("stack_delete_complete")
for stack_name in stacks_to_delete:
    print("Deleting stack: {}".format(stack_name))
    cfn.delete_stack(StackName=stack_name)
    delete_waiter.wait(StackName=stack_name)

The following code will delete the dashboard.

In [None]:
# NOTE(review): no cell visible in this notebook defines `dashboard_name` or `cloudwatch`,
# so calling them unconditionally raises NameError under Restart & Run All. Delete the
# dashboard only when a name has been defined; otherwise skip with a message.
dashboard_name = globals().get("dashboard_name")
if dashboard_name:
    boto3.client("cloudwatch").delete_dashboards(DashboardNames=[dashboard_name])
    print("Dashboard deleted")
else:
    print("No dashboard_name defined; skipping dashboard deletion")

The following code will clean up all objects in the artifact bucket and delete the SageMaker project.

In [None]:
# Remove every object version from the artifact bucket (the bucket itself is kept).
artifact_versions = boto3.resource("s3").Bucket(artifact_bucket).object_versions
artifact_versions.delete()
print("Artifact bucket objects deleted")

sm.delete_project(ProjectName=PROJECT_NAME)
print("SageMaker Project deleted")

Finally, close this notebook.