In [1]:
import numpy as np
import pandas as pd
import xgboost
import joblib
import tarfile
from time import gmtime, strftime
import time
import sys
import os
import boto3
from sagemaker import Session
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker import get_execution_role
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker import get_execution_role

### Loading the pre-trained model

In [3]:
model_pkl=joblib.load('../models/flow_mosal.pkl')
booster=model_pkl.get_booster()
booster.save_model('xgboost-model')
# add xgboost-model to tar.gz file, the model file also need to tar-zipped.
fp = tarfile.open("model.tar.gz","w:gz")
fp.add('xgboost-model')
fp.close()

### Upload the model to S3

In [5]:
bucket = Session().default_bucket()
prefix = "sagemaker/iris"
model_file_name = 'xgboost-model'

In [6]:
fObj = open("model.tar.gz", "rb")
key = os.path.join(prefix, model_file_name, "model.tar.gz")
boto3.Session().resource("s3").Bucket(bucket).Object(key).upload_fileobj(fObj)

### Set up hosting for the model
This involves creating a SageMaker model from the model file previously uploaded to S3.

In [7]:
container = get_image_uri(boto3.Session().region_name, "xgboost", "1.5-1")

The method get_image_uri has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


In [9]:
region = Session().boto_region_name
#xgboost_container = sagemaker.image_uris.retrieve("xgboost", region, "1.5-1")

role = get_execution_role()

In [10]:
%%time
model_name = model_file_name + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
model_url = "https://s3-{}.amazonaws.com/{}/{}".format(region, bucket, key)
sm_client = boto3.client("sagemaker")

print(model_url)

primary_container = {
    "Image": container,
    "ModelDataUrl": model_url,
}

create_model_response2 = sm_client.create_model(
    ModelName=model_name, ExecutionRoleArn=role, PrimaryContainer=primary_container
)

print(create_model_response2["ModelArn"])

https://s3-ca-central-1.amazonaws.com/sagemaker-ca-central-1-013272036621/sagemaker/iris/xgboost-model/model.tar.gz
arn:aws:sagemaker:ca-central-1:013272036621:model/xgboost-model2022-12-13-21-47-57
CPU times: user 64.1 ms, sys: 26.1 ms, total: 90.2 ms
Wall time: 502 ms


### Create endpoint configuration

SageMaker supports configuring REST endpoints in hosting with multiple models, e.g. for A/B testing purposes. In order to support this, you can create an endpoint configuration, that describes the distribution of traffic across the models, whether split, shadowed, or sampled in some wa

In [11]:
endpoint_config_name = "DEMO-XGBoostEndpointConfig-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_config_name)
create_endpoint_config_response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            "InstanceType": "ml.m4.xlarge",
            "InitialInstanceCount": 1,
            "InitialVariantWeight": 1,
            "ModelName": model_name,
            "VariantName": "AllTraffic",
        }
    ],
)

print("Endpoint Config Arn: " + create_endpoint_config_response["EndpointConfigArn"])

DEMO-XGBoostEndpointConfig-2022-12-13-21-48-01
Endpoint Config Arn: arn:aws:sagemaker:ca-central-1:013272036621:endpoint-config/demo-xgboostendpointconfig-2022-12-13-21-48-01


### Create endpoint
Needs:
- The name and configuration defined above.
- The end result is an endpoint that can be validated and incorporated into production applications. 

*This takes 9-11 minutes to complete.*

In [12]:
%%time

endpoint_name = "DEMO-XGBoostEndpoint-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_name)
create_endpoint_response = sm_client.create_endpoint(
    EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
)
print(create_endpoint_response["EndpointArn"])

resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
status = resp["EndpointStatus"]
print("Status: " + status)

while status == "Creating":
    time.sleep(60)
    resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
    status = resp["EndpointStatus"]
    print("Status: " + status)

print("Arn: " + resp["EndpointArn"])
print("Status: " + status)

DEMO-XGBoostEndpoint-2022-12-13-21-48-06
arn:aws:sagemaker:ca-central-1:013272036621:endpoint/demo-xgboostendpoint-2022-12-13-21-48-06
Status: Creating
Status: Creating
Status: Creating
Status: InService
Arn: arn:aws:sagemaker:ca-central-1:013272036621:endpoint/demo-xgboostendpoint-2022-12-13-21-48-06
Status: InService
CPU times: user 51.2 ms, sys: 7.44 ms, total: 58.6 ms
Wall time: 3min


In [12]:
runtime_client = boto3.client("runtime.sagemaker")

### Validate the model for use: Batch processing
- Get the endpoint from the client library using the result from previous operations
- Prepare the data
- Generate classifications from the model using that endpoint

In [5]:
point_X = np.array([[5.3,3.7,1.6],
                   [4.8,3.0,0.5],
                   [5.9,3,5.1]
                   ])   

In [6]:
np.savetxt("../data/test_point.csv", point_X, delimiter=",")

In [31]:
# batch processing
def do_predict(data, endpoint_name, content_type):
    payload = "\n".join(data)
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name, ContentType=content_type, Body=payload
    )
    result = response["Body"].read()
    result = result.decode("utf-8")
    result = result.strip("\n0").split("\n")
    preds = list(map(float,result))
    return  preds


def batch_predict(data, batch_size, endpoint_name, content_type):
    items = len(data)
    arrs = []

    for offset in range(0, items, batch_size):
        if offset + batch_size < items:
            results = do_predict(data[offset : (offset + batch_size)], endpoint_name, content_type)
            arrs.extend(results)
        else:
            arrs.extend(do_predict(data[offset:items], endpoint_name, content_type))
        sys.stdout.write(".")
    return arrs

In [33]:
%%time
import json
import numpy as np


FILE_TEST = '../data/test_point.csv'

def predict_batch(test_file):
    with open(test_file, "r") as f:
        payload = f.read().strip()

    labels = [line.split(",")[0] for line in payload.split("\n")]
    test_data = [line for line in payload.split("\n")]
    preds = batch_predict(test_data, 5, 'DEMO-XGBoostEndpoint-2022-12-13-21-48-06', "csv")
    
    return preds

    # print(
    #     "\n Median Absolute Percent Error (MdAPE) = ",
    #     np.median(np.abs(np.array(labels) - np.array(preds)) / np.array(labels)),
    # )

CPU times: user 7 µs, sys: 0 ns, total: 7 µs
Wall time: 11.7 µs


In [33]:
preds


[0, 0, 0]

In [13]:
#sm_client.delete_endpoint(EndpointName=endpoint_name)


In [None]:
### Testing

In [4]:
test_features_file = '../data/features_iris.csv'
test_targt_file = '../data/target_iris.csv'

test_iris_features = pd.read_csv(test_features_file,header = None)
test_iris_target = pd.read_csv(test_targt_file)

In [58]:
pkl_model_predict = list(model_pkl.predict(test_iris_features))


In [62]:
predict_proba_list = predict_batch(test_features_file)


......

In [59]:
end_point_predict = list(map(int,map(lambda x:x>0.5,predict_proba_list)))

In [65]:
assert pkl_model_predict ==end_point_predict , 'end point do not result the same outputs as the pickle model' 