# Attribute-Based Access Control with Amazon SageMaker Multi-Model Endpoints
This notebook is based on the [XGBoost Multi-Model example](https://github.com/aws/amazon-sagemaker-examples/blob/master/advanced_functionality/multi_model_xgboost_home_value/xgboost_multi_model_endpoint_home_value.ipynb). Please
see that notebook for details on Multi-Model Endpoints; this one focuses on attribute-based access control (ABAC).

### Contents

1. [Generate synthetic data for housing models](#Generate-synthetic-data-for-housing-models)
1. [Train multiple house value prediction models](#Train-multiple-house-value-prediction-models)
1. [Create the Amazon SageMaker MultiDataModel entity](#Create-the-Amazon-SageMaker-MultiDataModel-entity)
  1. [Create the Multi-Model Endpoint](#Create-the-multi-model-endpoint)
  1. [Deploy the Multi-Model Endpoint](#deploy-the-multi-model-endpoint)
1. [Get Predictions from the endpoint](#Get-predictions-from-the-endpoint)
1. [Additional Information](#Additional-information)
1. [Clean up](#Clean-up)

# Generate synthetic data

The code below contains helper functions that generate synthetic data, one house at a time, as a list of 7 values.

The first entry in the list is the price of the house, computed from its randomly generated features. The remaining entries are the features themselves (year built, square feet, number of bedrooms, number of bathrooms, lot acres, and garage spaces).

These functions are used to generate synthetic data for training, validation, and testing. They also let us submit synthetic payloads for inference to test our multi-model endpoint.

In [1]:
!pwd
!pip install -Uq sagemaker

/home/ec2-user/SageMaker/abac_multi_model_xgboost_home_value
You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.[0m


In [2]:
import sagemaker
import boto3
import numpy as np
import pandas as pd
import time

In [3]:
# Set TRAIN = False to reuse an endpoint that has already been deployed
# instead of training and deploying again.
TRAIN = True
if TRAIN:
    print("Training")
else:
    print("Not training. Will use existing endpoint")

# ABAC: a tenant should only be able to invoke the model that matches its
# tenant ID. Models on the endpoint are addressed by artifact filename, so
# that filename doubles as the tenant ID. (The IAM policy enforcing this is
# defined outside this notebook.)
TENANT_ID = "LosAngeles_CA.tar.gz"

caller_identity = boto3.client('sts').get_caller_identity()
account_num = caller_identity.get('Account')
role = sagemaker.get_execution_role()

# Role assumed (with a TenantID session tag) before calling the endpoint.
tenant_role_to_assume = "arn:aws:iam::{}:role/TestInvokeEndpointRole".format(account_num)

print('Execution role for this Notebook', role, ' will assume ', tenant_role_to_assume)

Training
Execution role for this Notebook arn:aws:iam::649592902942:role/TrackingServiceRole  will assume  arn:aws:iam::649592902942:role/TestInvokeEndpointRole


In [4]:
# Number of synthetic houses generated for each location's dataset.
NUM_HOUSES_PER_LOCATION = 1000

# Four locations are enough to demonstrate ABAC. More can be added the same
# way, e.g. Dallas_TX, Phoenix_AZ, Philadelphia_PA, SanAntonio_TX,
# SanDiego_CA, SanFrancisco_CA.
LOCATIONS = ["NewYork_NY", "LosAngeles_CA", "Chicago_IL", "Houston_TX"]

# How many training jobs are launched at once; len(LOCATIONS) also works if
# your account's training-job limits allow it.
PARALLEL_TRAINING_JOBS = 4

# Latest possible construction year; also the reference year for the
# age-based price reduction in gen_price.
MAX_YEAR = 2019

In [5]:
def gen_price(house):
    """Return a synthetic integer price for ``house``.

    The price starts at $150 per square foot, adds a fixed amount per
    bedroom, bathroom, lot acre and garage space, and subtracts $5,000 for
    every year between ``YEAR_BUILT`` and ``MAX_YEAR``.

    :param house: dict with keys SQUARE_FEET, NUM_BEDROOMS, NUM_BATHROOMS,
        LOT_ACRES, GARAGE_SPACES and YEAR_BUILT.
    :return: the price, truncated to an int.
    """
    per_unit_values = (
        ("NUM_BEDROOMS", 10000),
        ("NUM_BATHROOMS", 15000),
        ("LOT_ACRES", 15000),
        ("GARAGE_SPACES", 15000),
    )
    price = int(house["SQUARE_FEET"] * 150)
    # Accumulate strictly left to right so float rounding matches a single
    # chained sum of the same terms.
    for feature, unit_value in per_unit_values:
        price = price + unit_value * house[feature]
    price = price - 5000 * (MAX_YEAR - house["YEAR_BUILT"])
    return int(price)

def gen_random_house():
    """Draw one random house and return it as ``[price, *features]``.

    After the price (from ``gen_price``) the features follow in this order:
    YEAR_BUILT, SQUARE_FEET, NUM_BEDROOMS, NUM_BATHROOMS, LOT_ACRES,
    GARAGE_SPACES.
    """
    # Keep the random draws in this exact order so a seeded NumPy run
    # reproduces the same houses.
    house = {}
    house["SQUARE_FEET"] = int(np.random.normal(3000, 750))
    house["NUM_BEDROOMS"] = np.random.randint(2, 7)
    # randint(2, 7) yields 2..6, so bathrooms range 1.0..3.0 in half steps.
    house["NUM_BATHROOMS"] = np.random.randint(2, 7) / 2
    house["LOT_ACRES"] = round(np.random.normal(1.0, 0.25), 2)
    house["GARAGE_SPACES"] = np.random.randint(0, 4)
    house["YEAR_BUILT"] = min(MAX_YEAR, int(np.random.normal(1995, 10)))

    feature_order = (
        "YEAR_BUILT",
        "SQUARE_FEET",
        "NUM_BEDROOMS",
        "NUM_BATHROOMS",
        "LOT_ACRES",
        "GARAGE_SPACES",
    )
    return [gen_price(house)] + [house[name] for name in feature_order]


def gen_houses(num_houses):
    """Return a DataFrame with ``num_houses`` synthetic houses, one per row.

    Column "PRICE" (the training target) comes first, followed by the
    feature columns in the order produced by ``gen_random_house``.
    """
    column_names = [
        "PRICE",
        "YEAR_BUILT",
        "SQUARE_FEET",
        "NUM_BEDROOMS",
        "NUM_BATHROOMS",
        "LOT_ACRES",
        "GARAGE_SPACES",
    ]
    rows = [gen_random_house() for _ in range(num_houses)]
    return pd.DataFrame(rows, columns=column_names)

# Train multiple house value prediction models

We will launch multiple training jobs asynchronously.

In [6]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.amazon.amazon_estimator import image_uris
import boto3

from sklearn.model_selection import train_test_split

s3 = boto3.resource("s3")

sagemaker_session = sagemaker.Session()
role = get_execution_role()

BUCKET = sagemaker_session.default_bucket()

region = boto3.Session().region_name
print(region)
# Image URI of the AWS-managed XGBoost container, used for training and hosting.
XGBOOST_IMAGE = image_uris.retrieve(region=region, framework="xgboost", version="1.0-1")

# S3 key prefixes under BUCKET. Despite the name, the data is the synthetic
# housing data generated in this notebook.
DATA_PREFIX = "XGBOOST_BOSTON_HOUSING"
MULTI_MODEL_ARTIFACTS = "multi_model_artifacts"

TRAIN_INSTANCE_TYPE = "ml.m4.xlarge"
ENDPOINT_INSTANCE_TYPE = "ml.m4.xlarge"

# The SageMaker model and the endpoint share a single name.
ENDPOINT_NAME = "mme-xgboost-housing1"
MODEL_NAME = ENDPOINT_NAME

us-west-2


### Split a given dataset into train, validation, and test

The code below splits the data into 3 sets: 60% for training, 30% for validation, and 10% for testing.

In [7]:
SEED = 7
# Fractions of the rows assigned to [train, validation, test].
SPLIT_RATIOS = [0.6, 0.3, 0.1]


def split_data(df):
    """Shuffle ``df`` and split it into train, validation and test arrays.

    Column 0 of ``df`` is the target and the remaining columns are features.
    Rows are shuffled with ``SEED`` and divided according to ``SPLIT_RATIOS``
    (train / validation / test).

    :param df: DataFrame with the target in its first column.
    :return: ``(train, val, test)`` NumPy arrays, each with the target in the
        first column followed by the features.
    """
    val_size = SPLIT_RATIOS[1]
    test_size = SPLIT_RATIOS[2]

    values = df.values
    features = values[:, 1:]  # all rows, every column except the target
    target = values[:, :1]  # all rows, target column only (kept 2-D)

    # First split: training rows vs. all held-out rows (validation + test).
    X_train, X_holdout, y_train, y_holdout = train_test_split(
        features, target, test_size=(test_size + val_size), random_state=SEED
    )
    # Second split: divide the held-out rows between validation and test in
    # proportion to their ratios. Both halves are kept, so the validation and
    # test sets are disjoint.
    X_val, X_test, y_val, y_test = train_test_split(
        X_holdout, y_holdout, test_size=(test_size / (test_size + val_size)), random_state=SEED
    )

    # Reassemble each split with the target back in the first column.
    _train = np.concatenate([y_train, X_train], axis=1)
    _val = np.concatenate([y_val, X_val], axis=1)
    _test = np.concatenate([y_test, X_test], axis=1)

    return _train, _val, _test

### Launch a training job for a given housing location

In [8]:
NUM_ROUNDS = 2  # Set to 25 for better performance. This number is just to save time and cost in testing ABAC.


def launch_training_job(location):
    """Upload the local data for ``location`` and start its XGBoost training job.

    The job runs asynchronously (``fit(..., wait=False)``). Callers track it
    through the returned estimator's ``latest_training_job``.

    :param location: location name such as "Chicago_IL". Its data is read from
        the local folder ``data/<location>``, which holds ``train``, ``val``
        and ``test`` subfolders.
    :return: the ``sagemaker.estimator.Estimator`` that launched the job.
    """
    # Clear out old versions of the data so only this run's files are under the prefix.
    s3_bucket = s3.Bucket(BUCKET)
    full_input_prefix = f"{DATA_PREFIX}/model_prep/{location}"
    s3_bucket.objects.filter(Prefix=full_input_prefix + "/").delete()

    # Upload the entire local folder; its subfolders become the channels below.
    local_folder = f"data/{location}"
    inputs = sagemaker_session.upload_data(path=local_folder, key_prefix=full_input_prefix)
    print(f"Training data uploaded: {inputs}")

    _job = "xgb-{}".format(location.replace("_", "-"))
    full_output_prefix = f"{DATA_PREFIX}/model_artifacts/{location}"
    s3_output_path = f"s3://{BUCKET}/{full_output_prefix}"

    xgb = sagemaker.estimator.Estimator(
        XGBOOST_IMAGE,
        role,
        instance_count=1,
        instance_type=TRAIN_INSTANCE_TYPE,
        output_path=s3_output_path,
        base_job_name=_job,
        sagemaker_session=sagemaker_session,
    )

    xgb.set_hyperparameters(
        max_depth=5,
        eta=0.2,
        gamma=4,
        min_child_weight=6,
        subsample=0.8,
        silent=0,
        early_stopping_rounds=5,
        # Squared-error regression. "reg:linear" is a deprecated alias for this
        # objective in XGBoost 1.0, so use the current name.
        objective="reg:squarederror",
        num_round=NUM_ROUNDS,
    )

    DISTRIBUTION_MODE = "FullyReplicated"

    train_input = sagemaker.inputs.TrainingInput(
        s3_data=inputs + "/train", distribution=DISTRIBUTION_MODE, content_type="csv"
    )

    val_input = sagemaker.inputs.TrainingInput(
        s3_data=inputs + "/val", distribution=DISTRIBUTION_MODE, content_type="csv"
    )

    remote_inputs = {"train": train_input, "validation": val_input}

    # Return immediately; the caller waits on all jobs together.
    xgb.fit(remote_inputs, wait=False)

    return xgb

### Kick off a model training job for each housing location

In [9]:
import shutil
import os

def save_data_locally(location, train, val, test):
    """Write the train/val/test arrays for ``location`` to local CSV files.

    Each split is written to ``data/<location>/<split>/<location>_<split>.csv``
    (``<split>`` is "train", "val" or "test"), comma-separated with two
    decimal places. ``os.makedirs`` is called without ``exist_ok``, so it
    raises if a split folder already exists; the script removes ``data/``
    before calling this.
    """
    splits = (("train", train), ("val", val), ("test", test))
    for split_name, rows in splits:
        folder = f"data/{location}/{split_name}"
        os.makedirs(folder)
        np.savetxt(f"{folder}/{location}_{split_name}.csv", rows, delimiter=",", fmt="%.2f")
estimators = []

# Start every run from an empty local data directory.
shutil.rmtree("data", ignore_errors=True)
if TRAIN:
    # One asynchronous training job per location, up to the parallel limit.
    for loc in LOCATIONS[:PARALLEL_TRAINING_JOBS]:
        _train, _val, _test = split_data(gen_houses(NUM_HOUSES_PER_LOCATION))
        save_data_locally(loc, _train, _val, _test)
        estimator = launch_training_job(loc)
        estimators.append(estimator)

launched_job_names = [x.latest_training_job.job_name for x in estimators]
print()
print(f"{len(estimators)} training jobs launched: {launched_job_names}")

Training data uploaded: s3://sagemaker-us-west-2-649592902942/XGBOOST_BOSTON_HOUSING/model_prep/NewYork_NY
Training data uploaded: s3://sagemaker-us-west-2-649592902942/XGBOOST_BOSTON_HOUSING/model_prep/LosAngeles_CA
Training data uploaded: s3://sagemaker-us-west-2-649592902942/XGBOOST_BOSTON_HOUSING/model_prep/Chicago_IL
Training data uploaded: s3://sagemaker-us-west-2-649592902942/XGBOOST_BOSTON_HOUSING/model_prep/Houston_TX

4 training jobs launched: ['xgb-NewYork-NY-2021-08-18-14-32-42-345', 'xgb-LosAngeles-CA-2021-08-18-14-32-43-014', 'xgb-Chicago-IL-2021-08-18-14-32-45-770', 'xgb-Houston-TX-2021-08-18-14-32-47-100']


### Wait for all model training to finish

In [10]:
def wait_for_training_job_to_complete(estimator):
    """Block until the estimator's latest training job leaves "InProgress".

    Polls the job description every 45 seconds and prints the status while
    the job is still running. Prints the final status once any other status
    (for example "Completed") is reported.

    :param estimator: estimator whose ``latest_training_job`` is polled.
    """
    def _status():
        return estimator.latest_training_job.describe()["TrainingJobStatus"]

    job_name = estimator.latest_training_job.job_name
    print(f"Waiting for job: {job_name}")

    current = _status()
    while current == "InProgress":
        time.sleep(45)
        current = _status()
        if current != "InProgress":
            break
        print(f"{job_name} job status: {current}")
    print(f"DONE. Status for {job_name} is {current}\n")

In [11]:
# Block until every training job launched above has left the "InProgress" state.
if TRAIN:
    for est in estimators:
        wait_for_training_job_to_complete(est)

Waiting for job: xgb-NewYork-NY-2021-08-18-14-32-42-345
xgb-NewYork-NY-2021-08-18-14-32-42-345 job status: InProgress
xgb-NewYork-NY-2021-08-18-14-32-42-345 job status: InProgress
xgb-NewYork-NY-2021-08-18-14-32-42-345 job status: InProgress
xgb-NewYork-NY-2021-08-18-14-32-42-345 job status: InProgress
DONE. Status for xgb-NewYork-NY-2021-08-18-14-32-42-345 is Completed

Waiting for job: xgb-LosAngeles-CA-2021-08-18-14-32-43-014
DONE. Status for xgb-LosAngeles-CA-2021-08-18-14-32-43-014 is Completed

Waiting for job: xgb-Chicago-IL-2021-08-18-14-32-45-770
DONE. Status for xgb-Chicago-IL-2021-08-18-14-32-45-770 is Completed

Waiting for job: xgb-Houston-TX-2021-08-18-14-32-47-100
DONE. Status for xgb-Houston-TX-2021-08-18-14-32-47-100 is Completed



# Create the multi-model endpoint with the SageMaker SDK

### Create a SageMaker Model from one of the Estimators

The models will be added to the endpoint later.

In [12]:
if TRAIN:
    # Any trained estimator works here: this model only supplies the
    # container image for the multi-model endpoint. Individual model
    # artifacts are added to the endpoint later.
    estimator = estimators[0]
    model = estimator.create_model(role=role, image_uri=XGBOOST_IMAGE)

### Create the Amazon SageMaker MultiDataModel entity

We create the multi-model endpoint using the [```MultiDataModel```](https://sagemaker.readthedocs.io/en/stable/api/inference/multi_data_model.html) class.

We later add more models to the endpoint.

In [13]:
from sagemaker.multidatamodel import MultiDataModel

# S3 prefix from which the multi-model endpoint loads models. A model is
# addressed by its filename relative to this prefix (e.g. "Chicago_IL.tar.gz").

model_data_prefix = f"s3://{BUCKET}/{DATA_PREFIX}/{MULTI_MODEL_ARTIFACTS}/"
print(f"Model artifacts in {model_data_prefix}")

Model artifacts in s3://sagemaker-us-west-2-649592902942/XGBOOST_BOSTON_HOUSING/multi_model_artifacts/


In [14]:
if TRAIN:
    # One SageMaker model that serves the artifacts found under
    # model_data_prefix, using the container image taken from `model`.
    mme = MultiDataModel(
        name=MODEL_NAME,
        model_data_prefix=model_data_prefix,
        model=model,  # passing our model - passes container image needed for the endpoint
        sagemaker_session=sagemaker_session,
    )

# Deploy the Multi Model Endpoint

In [15]:
if TRAIN:
    # Creates the endpoint ENDPOINT_NAME. CreateEndpoint raises a
    # ValidationException if an endpoint with that name already exists (as in
    # the recorded output of this cell). In that case, set TRAIN = False or
    # delete the existing endpoint first.
    predictor = mme.deploy(
        initial_instance_count=1, instance_type=ENDPOINT_INSTANCE_TYPE, endpoint_name=ENDPOINT_NAME
    )

Using already existing model: mme-xgboost-housing1


ClientError: An error occurred (ValidationException) when calling the CreateEndpoint operation: Cannot create already existing endpoint "arn:aws:sagemaker:us-west-2:649592902942:endpoint/mme-xgboost-housing1".

### Our endpoint has launched

Let's look at what models are available to the endpoint.

By 'available', we mean the model artifacts currently stored under the S3 prefix we defined when setting up the `MultiDataModel` above, i.e. `model_data_prefix`.

Because no artifacts (i.e. `tar.gz` files) are stored under our S3 prefix yet, the endpoint has no models 'available' to serve inference requests.

We will demonstrate how to make models 'available' to our endpoint below.

In [None]:
if TRAIN:
    # On a fresh run nothing has been copied under model_data_prefix yet,
    # so this list is typically empty.
    available_models = list(mme.list_models())
    print("Models are", available_models)

### Let's deploy model artifacts to be found by the endpoint

We are now using the `.add_model()` method of the `MultiDataModel` to copy over our model artifacts from where they were initially stored, during training, to where our endpoint will source model artifacts for inference requests.

`model_data_source` refers to the location of our model artifact (i.e. where it was deposited on S3 after training completed)

`model_data_path` is the path, **relative** to the S3 prefix we specified above (i.e. `model_data_prefix`), where our endpoint will source models for inference requests.

Since this is a **relative** path, we can simply pass the name of what we wish to call the model artifact at inference time (i.e. `Chicago_IL.tar.gz`)

### Dynamically deploying additional models

Note that we can always use the `.add_model()` method, as shown below, to dynamically deploy more models to the endpoint and serve inference requests as needed.

In [None]:
if TRAIN:
    for est in estimators:
        artifact_path = est.latest_training_job.describe()["ModelArtifacts"]["S3ModelArtifacts"]
        # Training output was written under .../model_artifacts/<location>/
        # (the estimator's output_path). Assuming SageMaker appends
        # <job-name>/output/model.tar.gz, the 4th component from the end is
        # the location.
        location_name = artifact_path.split("/")[-4]
        model_name = location_name + ".tar.gz"
        # Copy the artifact under model_data_prefix so the endpoint can load it by this name.
        mme.add_model(model_data_source=artifact_path, model_data_path=model_name)

## We have added the 4 model artifacts from our training jobs!

We can see that the S3 prefix we specified when setting up `MultiDataModel` now has 4 model artifacts. As such, the endpoint can now serve up inference requests for these models.

In [None]:
if TRAIN:
    # Should now include one "<location>.tar.gz" per training job.
    model_listing = list(mme.list_models())
    print("Models are", model_listing)

## Using Boto APIs to invoke the endpoint

We use the lower-level Boto3 SDK, since it is what you would use as part of a broader architecture.

Imagine an API Gateway frontend that uses a Lambda proxy to transform request payloads before they reach a SageMaker endpoint. In this example, Lambda does not have access to the SageMaker Python SDK, so Boto3 is what lets you interact with your endpoint and serve inference requests.


Additional documentation on `.invoke_endpoint()` can be found [here.](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker-runtime.html)

In [None]:
import json

def create_temp_tenant_session(access_role_arn, session_name, tenant_id, duration_sec):
    """
    Assume ``access_role_arn`` with a ``TenantID`` session tag and return a
    boto3 session that uses the resulting temporary credentials.

    :param access_role_arn: ARN of the role to assume
    :param session_name: RoleSessionName for the assumed-role session
    :param tenant_id: value placed in the ``TenantID`` session tag
    :param duration_sec: lifetime of the temporary credentials, in seconds
    :return: boto3.Session built from the temporary credentials; use it to
        create service clients and resources
    """
    sts_client = boto3.client('sts')
    print("Assuming the access role:", access_role_arn)
    print("Tenant ID:", tenant_id)

    # The session tag is what tenant-scoped IAM conditions can match against.
    tenant_tag = {'Key': 'TenantID', 'Value': tenant_id}
    credentials = sts_client.assume_role(
        RoleArn=access_role_arn,
        DurationSeconds=duration_sec,
        RoleSessionName=session_name,
        Tags=[tenant_tag],
    )['Credentials']

    return boto3.Session(
        aws_access_key_id=credentials['AccessKeyId'],
        aws_secret_access_key=credentials['SecretAccessKey'],
        aws_session_token=credentials['SessionToken'],
    )



def predict_one_house_value(features, model_name, runtime_sm_client):
    """Invoke one model on the multi-model endpoint for a single house.

    :param features: feature values (without the price), sent as one CSV line.
    :param model_name: artifact filename of the target model, relative to the
        endpoint's model prefix (e.g. "LosAngeles_CA.tar.gz").
    :param runtime_sm_client: ``sagemaker-runtime`` client. Its credentials
        determine whether IAM allows invoking ``model_name``.
    :return: the predicted value (previously only printed).
    """
    print(f"Using model {model_name}")

    # The endpoint is called with text/csv, so serialize the features as one CSV row.
    body = ",".join(map(str, features)) + "\n"

    start_time = time.time()

    response = runtime_sm_client.invoke_endpoint(
        EndpointName=ENDPOINT_NAME, ContentType="text/csv", TargetModel=model_name, Body=body
    )

    # NOTE(review): assumes the response body parses as a JSON array whose
    # first element is the prediction — confirm for this container version.
    predicted_value = json.loads(response["Body"].read())[0]

    duration = time.time() - start_time

    print("The predicted value was ${:,.2f}".format(predicted_value))
    print("{:,d} ms for prediction".format(int(duration * 1000)))
    return predicted_value

In [None]:
# Temporary credentials tagged with TENANT_ID, valid for 900 seconds.
temp_session = create_temp_tenant_session(tenant_role_to_assume, "scoped-session", TENANT_ID, 900)
runtime_sm_client = temp_session.client(service_name="sagemaker-runtime")

# Targeting the model whose filename equals the tenant ID should pass the IAM
# check. Set model_filename to e.g. "NewYork_NY.tar.gz" to see IAM block the
# invocation.
model_filename = TENANT_ID
house_features = gen_random_house()[1:]  # drop the price, keep only the features
predict_one_house_value(house_features, model_filename, runtime_sm_client)

## Clean up
Here we clean up so that we are not billed for endpoints we are no longer using.
To keep the endpoint for further experimentation, set `DELETE = False`.

In [None]:
# Set to False to keep the endpoint for further experimentation.
DELETE = True
if DELETE:
    client_del = boto3.client('sagemaker')

    # Look up the endpoint's actual config instead of assuming it shares the
    # endpoint's name.
    endpoint_config_name = client_del.describe_endpoint(EndpointName=ENDPOINT_NAME)["EndpointConfigName"]
    response = client_del.describe_endpoint_config(EndpointConfigName=endpoint_config_name)
    variants = response['ProductionVariants']

    # Delete every SageMaker model referenced by the endpoint's variants.
    for v in variants:
        model_name = v['ModelName']
        client_del.delete_model(ModelName=model_name)

    client_del.delete_endpoint(EndpointName=ENDPOINT_NAME)
    client_del.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
    # Note: artifacts and data uploaded to S3 are not removed by this cell.
    print("Done deleting")
else:
    print("Not deleting resources")