# Amazon SageMaker Multi-Model Endpoints using Linear Learner

[Amazon SageMaker multi-model endpoints](https://docs.aws.amazon.com/sagemaker/latest/dg/multi-model-endpoints.html)를 이용해서, 고객들은 수천개의 모델에 대한 endpoint 을 생성할 수 있습니다. 

이러한 endpoint 는 공통된 추론 컨테이너에서 제공할 수 있는 많은 모델 중 하나가 요청시 호출되어야 하고 드물게 호출되는 모델이 추가 대기 시간을 발생시키는 것이 허용되는 사용 사례에 매우 적합합니다. 

지속적으로 낮은 추론 지연 시간이 필요한 애플리케이션의 경우 전통적인  엔드포인트가 여전히 최선의 선택입니다.

Amazon SageMaker는 필요에 따라 다중 모델 엔드포인트에 대한 모델 로드 및 언로드를 관리합니다. 

특정 모델에 대한 호출 요청이 이루어지면 Amazon SageMaker는 해당 모델에 할당된 인스턴스로 요청을 라우팅하고 S3에서 해당 인스턴스로 모델 아티팩트를 다운로드하고 컨테이너의 메모리로 모델 로드를 시작합니다. 

로드가 완료되는 즉시 Amazon SageMaker는 요청된 호출을 수행하고 결과를 반환합니다. 모델이 선택한 인스턴스의 메모리에 이미 로드된 경우 다운로드 및 로드 단계를 건너뛰고 호출이 즉시 수행됩니다.

Amazon SageMaker 추론 파이프라인 모델은 사전 처리, 예측 및 사후 처리 데이터 과학 작업을 결합하여 추론 요청을 제공하는 일련의 컨테이너로 구성됩니다. 

추론 파이프라인을 사용하면 모델 훈련 중에 사용된 것과 동일한 전처리 코드를 적용하여 예측에 사용된 추론 요청 데이터를 처리할 수 있습니다.

다중 모델 엔드포인트가 생성되고 추론 파이프라인과 함께 사용되는 방법을 보여주기 위해 이 노트북은 각각 단일 위치의 주택 가격을 예측하는 일련의 linear-learner 모델을 사용하는 예를 제공합니다. 

이 도메인은 다중 모델 endpoint 를 쉽게 테스트하기 위한 간단한 예제로 사용됩니다.

이 노트북은 세 가지 MME 기능을 보여줍니다.
* Amazon SageMaker Linear Learner 알고리즘을 통한 기본 MME 지원. 기본적으로 지원됨으로인해 사용자 정의 컨테이너를 생성할 필요가 없습니다.
* Amazon SageMaker 추론 파이프라인을 통한 기본 MME 지원.
* IAM 조건 키를 사용하여 MME에서 호스팅되는 여러 모델에 대한 세분화된 InvokeModel 액세스.

이러한 기능을 보여주기 위해 노트북에서는 선형 회귀를 사용하여 여러 도시의 주택 가격을 예측하는 사용 사례에 대해 설명합니다. 

주택 가격은 침실 수, 차고 수, 평방 피트수 등과 같은 특성을 기반으로 예측됩니다. 

도시에 따라 특성이 집 가격에 다르게 영향을 미칩니다. 

예를 들어, 평방 피트의 작은 변화는 휴스턴의 가격 변화와 비교할 때 뉴욕의 집 가격에서 더 급격한 변화를 일으킵니다. 

정확한 주택 가격 예측을 위해 도시별로 고유한 위치별 모델인 다중 선형 회귀 모델을 학습합니다.


### Contents

1. [집값예측모델을 위한 합성데이터 생성 Generate synthetic data for housing models](#Generate-synthetic-data-for-housing-models)
1. [Scikit Learn 모델을 사용하여 집값 데이터 전처리 Preprocess the raw housing data using Scikit Learn model](#Preprocess-synthetic-housing-data-using-scikit-learn)
1. [여러 도시에 대한 다중 집값 예측 모델 훈련 Train multiple house value prediction models for multiple cities](#Train-multiple-house-value-prediction-models)
1. [다중 모델 지원 모델 엔티티 생성 Create model entity with multi model support](#Create-sagemaker-multi-model-support)
1. [Sklearn 모델과 MME linear learner 모델로 추론 파이프라인 생성 Create an inference pipeline with sklearn model and MME linear learner model](#Create-inference-pipeline)
2. [추론 파이프라인 테스트 - 다른 linear learner 모델로부터 예측 결과 얻기](#Exercise-inference-pipeline)
3. [신규 모델로 다중 모델 endpoint 갱신](#update-models)
4. [MME의 대상 모델에 대한 세분화된 액세스 탐색](#Finegrain-control-invoke-models)
5. [Endpoint 의 CloudWatch 지표 분석](#CW-metric-analysis)
6. [Clean up](#CleanUp)


## Section 1 - 합성데이터 생성 <a id='Generate-synthetic-data-for-housing-models'></a>

이 섹션에서는 linear leaner 모델을 훈련하는데 사용될 합성 데이터를 생성할 것입니다. 

생성된 데이터는 6개의 숫자형 특성으로 구성됩니다. 
- 집의 건축년도
- 집의 면적 (평방 피트)
- 침실의 수
- 욕실의 수
- lot 크기와 
- 차고 수
- 데크와 현

In [None]:
import numpy as np
import pandas as pd
import json
import datetime
import time
import boto3
import sagemaker
import os

from time import gmtime, strftime
from random import choice

from sagemaker import get_execution_role

from sagemaker.multidatamodel import MULTI_MODEL_CONTAINER_MODE
from sagemaker.multidatamodel import MultiDataModel

from sklearn.model_selection import train_test_split

In [None]:
NUM_HOUSES_PER_LOCATION = 1000
LOCATIONS  = ['NewYork_NY',    'LosAngeles_CA',   'Chicago_IL',    'Houston_TX',   'Dallas_TX',
              'Phoenix_AZ',    'Philadelphia_PA', 'SanAntonio_TX', 'SanDiego_CA',  'SanFrancisco_CA']
MAX_YEAR = 2019

In [None]:
def gen_price(house):
    """Generate price based on features of the house"""
    
    if house['FRONT_PORCH'] == 'y':
        garage = 1
    else:
        garage = 0
        
    if house['FRONT_PORCH'] == 'y':
        front_porch = 1
    else:
        front_porch = 0
        
    price = int(150 * house['SQUARE_FEET'] + \
                10000 * house['NUM_BEDROOMS'] + \
                15000 * house['NUM_BATHROOMS'] + \
                15000 * house['LOT_ACRES'] + \
                10000 * garage + \
                10000 * front_porch + \
                15000 * house['GARAGE_SPACES'] - \
                5000 * (MAX_YEAR - house['YEAR_BUILT']))
    return price

In [None]:
def gen_yes_no():
    """Generate values (y/n) for categorical features"""
    answer = choice(['y', 'n'])
    return answer

In [None]:
def gen_random_house():
    """Generate a row of data (single house information)"""
    house = {'SQUARE_FEET':    np.random.normal(3000, 750),
             'NUM_BEDROOMS':  np.random.randint(2, 7),
             'NUM_BATHROOMS': np.random.randint(2, 7) / 2,
             'LOT_ACRES':     round(np.random.normal(1.0, 0.25), 2),
             'GARAGE_SPACES': np.random.randint(0, 4),
             'YEAR_BUILT':    min(MAX_YEAR, int(np.random.normal(1995, 10))),
             'FRONT_PORCH':   gen_yes_no(),
             'DECK':          gen_yes_no()
            }
    
    price = gen_price(house)
    
    return [house['YEAR_BUILT'],   
            house['SQUARE_FEET'], 
            house['NUM_BEDROOMS'], 
            house['NUM_BATHROOMS'], 
            house['LOT_ACRES'],    
            house['GARAGE_SPACES'],
            house['FRONT_PORCH'],    
            house['DECK'], 
            price]

In [None]:
def gen_houses(num_houses):
    """Generate housing dataset"""
    house_list = []
    
    for _ in range(num_houses):
        house_list.append(gen_random_house())
        
    df = pd.DataFrame(
        house_list, 
        columns=[
            'YEAR_BUILT',    
            'SQUARE_FEET',  
            'NUM_BEDROOMS',            
            'NUM_BATHROOMS',
            'LOT_ACRES',
            'GARAGE_SPACES',
            'FRONT_PORCH',
            'DECK', 
            'PRICE']
    )
    return df

In [None]:
def save_data_locally(location, train, test): 
    """Save the housing data locally"""
    os.makedirs('data/{0}/train'.format(location), exist_ok=True)
    train.to_csv('data/{0}/train/train.csv'.format(location), sep=',', header=False, index=False)
       
    os.makedirs('data/{0}/test'.format(location), exist_ok=True)
    test.to_csv('data/{0}/test/test.csv'.format(location), sep=',', header=False, index=False) 

In [None]:
#Generate housing data for multiple locations.
#Change "PARALLEL_TRAINING_JOBS " to a lower number to limit the number of training jobs and models. Or to a higher value to experiment with more models.

PARALLEL_TRAINING_JOBS = 4

for loc in LOCATIONS[:PARALLEL_TRAINING_JOBS]:
    houses = gen_houses(NUM_HOUSES_PER_LOCATION)
    
    #Spliting data into train and test in 90:10 ratio
    #Not splitting the train data into train and val because its not preprocessed yet
    train, test = train_test_split(houses, test_size=0.1)
    save_data_locally(loc, train, test)


In [None]:
#Shows the first few lines of data.
houses.head()

## Section 2 - Scikit Learn 을 이용하여 집 로데이터의 전처리 <a id='Preprocess-synthetic-housing-data-using-scikit-learn'></a>
 
이 섹션에서 데이터의 분류 특성 (데크와 현관)은 one hot encoding 으로 변환하기 위해 sklearn 을 이용해서 전처리됩니다. 

In [None]:
sm_client = boto3.client(service_name='sagemaker')
runtime_sm_client = boto3.client(service_name='sagemaker-runtime')
sagemaker_session = sagemaker.Session()

s3 = boto3.resource('s3')
s3_client = boto3.client('s3')

BUCKET  = sagemaker_session.default_bucket()
print("BUCKET : ", BUCKET)

role = get_execution_role()
print("ROLE : ", role)

ACCOUNT_ID = boto3.client('sts').get_caller_identity()['Account']
REGION = boto3.Session().region_name

DATA_PREFIX = 'DEMO_MME_LINEAR_LEARNER'
HOUSING_MODEL_NAME = 'housing'
MULTI_MODEL_ARTIFACTS = 'multi_model_artifacts'

In [None]:
#Create the SKLearn estimator with the sklearn_preprocessor.py as the script
from sagemaker.sklearn.estimator import SKLearn

script_path = 'sklearn_preprocessor.py'

sklearn_preprocessor = SKLearn(
    entry_point=script_path,
    role=role,
    instance_type="ml.c4.xlarge",
    framework_version="0.20.0",
    sagemaker_session=sagemaker_session)

In [None]:
#Upload the raw training data to S3 bucket, to be accessed by SKLearn
train_inputs = []

for loc in LOCATIONS[:PARALLEL_TRAINING_JOBS]:

    train_input = sagemaker_session.upload_data(
        path='data/{}/train/train.csv'.format(loc),
        bucket=BUCKET,
        key_prefix='housing-data/{}/train'.format(loc)
    )
    
    train_inputs.append(train_input)
    print("Raw training data uploaded to : ", train_input)

In [None]:
##Launch multiple scikit learn training to process the raw synthetic data generated for multiple locations.
##Before executing this, take the training instance limits in your account and cost into consideration.

sklearn_preprocessors = []
sklearn_preprocessors_preprocessor_jobs = []

for index, loc in enumerate(LOCATIONS[:PARALLEL_TRAINING_JOBS]):
    print("preprocessing fit input data at ", index , " for loc ", loc)
     
    job_name='scikit-learn-preprocessor-{}'.format(strftime('%Y-%m-%d-%H-%M-%S', gmtime()))
    
    sklearn_preprocessor.fit({'train': train_inputs[index]}, job_name=job_name, wait=False)

    sklearn_preprocessors.append(sklearn_preprocessor)
    sklearn_preprocessors_preprocessor_jobs.append(job_name)
    
    time.sleep(1)

In [None]:
def wait_for_training_job_to_complete(job_name):
    """ Wait for the training job to complete """
    print('Waiting for job {} to complete...'.format(job_name))
    
    waiter = sm_client.get_waiter('training_job_completed_or_stopped')
    waiter.wait(TrainingJobName=job_name)

In [None]:
def wait_for_batch_transform_job_to_complete(job_name):
    """Wait for the batch transform job to complete"""
    print('Waiting for job {} to complete...'.format(job_name))
    
    waiter = sm_client.get_waiter('transform_job_completed_or_stopped')
    waiter.wait(TransformJobName=job_name)

In [None]:
#Wait for the preprocessor jobs to finish
for job_name in sklearn_preprocessors_preprocessor_jobs:
    wait_for_training_job_to_complete(job_name)

In [None]:
##Once the preprocessor is fit, use tranformer to preprocess the raw training data and store the transformed data right back into s3.
##Before executing this, take the training instance limits in your account and cost into consideration.

preprocessor_transformers = []

for index, loc in enumerate(LOCATIONS[:PARALLEL_TRAINING_JOBS]):
    print("Transform the raw data at ", index , " for loc ", loc)
       
    sklearn_preprocessor = sklearn_preprocessors[index]
    
    transformer = sklearn_preprocessor.transformer(
        instance_count=1,
        instance_type='ml.m4.xlarge',
        assemble_with='Line',
        accept='text/csv'
    )
    
    preprocessor_transformers.append(transformer)

In [None]:
# Preprocess training input
preprocessed_train_data_path = []

for index, transformer in enumerate(preprocessor_transformers):
    transformer.transform(train_inputs[index], content_type='text/csv')
    print('Launching batch transform job: {}'.format(transformer.latest_transform_job.job_name))
    preprocessed_train_data_path.append(transformer.output_path)

In [None]:
#Wait for all the batch transform jobs to finish
for transformer in preprocessor_transformers: 
    job_name=transformer.latest_transform_job.job_name
    wait_for_batch_transform_job_to_complete(job_name)

In [None]:
##Download the preprocessed data, split into train and val, upload back to S3 in the same directory as tranformer output path
for index, transformer in enumerate(preprocessor_transformers): 
    transformer_output_key='{}/{}'.format(transformer.latest_transform_job.job_name, 'train.csv.out') 
    
    preprocessed_data_download_dir = '{}/'.format("preprocessed-data/"+LOCATIONS[index])
    
    sagemaker_session.download_data(
        path=preprocessed_data_download_dir, 
        bucket=BUCKET,
        key_prefix=transformer_output_key
    )
    
    print('transformer_output_key: {}'.format(transformer_output_key ))
    print('Download directory: {}'.format(preprocessed_data_download_dir ))
    
    train_df = pd.read_csv('{}/{}'.format(preprocessed_data_download_dir,"train.csv.out"))
    
    #Spliting data into train and test in 70:30 ratio
    train, val = train_test_split(train_df, test_size=0.3)
    
    train.to_csv('{}{}'.format(preprocessed_data_download_dir,"train.csv"), sep=',', header=False, index=False)
    val.to_csv('{}{}'.format(preprocessed_data_download_dir,"val.csv"), sep=',', header=False, index=False)
    
    
    train_input = sagemaker_session.upload_data(
        path='{}/{}'.format(preprocessed_data_download_dir, 'train.csv'), 
        bucket=BUCKET,
        key_prefix='{}'.format(transformer.latest_transform_job.job_name, 'train.csv'))
    
    val_input = sagemaker_session.upload_data(
        path='{}/{}'.format(preprocessed_data_download_dir, 'val.csv'), 
        bucket=BUCKET,
        key_prefix='{}'.format(transformer.latest_transform_job.job_name, 'val.csv'))

In [None]:
##S3 location of the preprocessed data
for preprocessed_train_data in preprocessed_train_data_path: 
    print(preprocessed_train_data)

In [None]:
for index, loc in enumerate(LOCATIONS[:PARALLEL_TRAINING_JOBS]):
    preprocessed_data_download_dir = '{}/'.format("preprocessed-data/"+LOCATIONS[index])
    path='{}/{}'.format(preprocessed_data_download_dir, 'train.csv')

## Section 3 : 여러 도시를 위한 집갑 예측 모델 훈련<a id='Train-multiple-house-value-prediction-models'></a>

이 섹션에서 다중 linear leaner 모델을 훈련하기 위해 전처리된 집 데이터를 사용할 것입니다. 

In [None]:
container = sagemaker.image_uris.retrieve(region=boto3.Session().region_name, framework='linear-learner')

### 주어진 집 위치에 대한 단일한 training job 실행

호스트할 모델과 관련하여 다중 모델 endpoint 에 특정한 것은 없습니다. 

다른 모든 SageMaker 모델과 동일한 방식으로 훈련됩니다. 

여기서는 Linear Learner estimator를 사용하고 작업이 완료될 때까지 기다리지 않습니다.

In [None]:
def launch_training_job(location, transformer):
    """Launch a linear learner traing job"""
    
    train_inputs = '{}/{}'.format(transformer.output_path, "train.csv")
    val_inputs = '{}/{}'.format(transformer.output_path, "val.csv")
    
    print("train_inputs:", train_inputs)
    print("val_inputs:", val_inputs)
     
    full_output_prefix = '{}/model_artifacts/{}'.format(DATA_PREFIX, location)
    s3_output_path = 's3://{}/{}'.format(BUCKET, full_output_prefix)
    
    print("s3_output_path ", s3_output_path)
    
    s3_output_path = 's3://{}/{}/model_artifacts/{}'.format(BUCKET, DATA_PREFIX, location)
    
    linear_estimator = sagemaker.estimator.Estimator(
                            container,
                            role, 
                            instance_count=1,
                            instance_type='ml.c4.xlarge',
                            output_path=s3_output_path,
                            sagemaker_session=sagemaker_session)
    
    linear_estimator.set_hyperparameters(
                           feature_dim=10,
                           mini_batch_size=100,
                           predictor_type='regressor',
                           epochs=10,
                           num_models=32,
                           loss='absolute_loss')
    
    DISTRIBUTION_MODE = 'FullyReplicated'
    train_input = sagemaker.inputs.TrainingInput(s3_data=train_inputs,
                                     distribution=DISTRIBUTION_MODE, content_type='text/csv;label_size=1')
    val_input   = sagemaker.inputs.TrainingInput(s3_data=val_inputs,
                                     distribution=DISTRIBUTION_MODE, content_type='text/csv;label_size=1')
    
    remote_inputs = {'train': train_input, 'validation': val_input}
     
    linear_estimator.fit(remote_inputs, wait=False)
   
    return linear_estimator.latest_training_job.name


### 각 주택 위치에 대한 모델 training job 시작

In [None]:
training_jobs = []
    
for transformer, loc in zip(preprocessor_transformers, LOCATIONS[:PARALLEL_TRAINING_JOBS]): 
    job = launch_training_job(loc, transformer)
    training_jobs.append(job)
    
print('{} training jobs launched: {}'.format(len(training_jobs), training_jobs))


### 모든 training job 이 완료될 때까지 대기 

In [None]:
#Wait for the jobs to finish
for job_name in training_jobs:
    wait_for_training_job_to_complete(job_name)


## Section 4 - 다중 모델 지원 Sagemaker model 생성 <a id='Create-sagemaker-multi-model-support'></a>

In [None]:
import re
def parse_model_artifacts(model_data_url):
    # extract the s3 key from the full url to the model artifacts
    s3_key = model_data_url.split('s3://{}/'.format(BUCKET))[1]
    # get the part of the key that identifies the model within the model artifacts folder
    model_name_plus = s3_key[s3_key.find('model_artifacts') + len('model_artifacts') + 1:]
    # finally, get the unique model name (e.g., "NewYork_NY")
    model_name = re.findall('^(.*?)/', model_name_plus)[0]
    return s3_key, model_name 

In [None]:
# make a copy of the model artifacts from the original output of the training job to the place in
# s3 where the multi model endpoint will dynamically load individual models
def deploy_artifacts_to_mme(job_name):
    print("job_name :", job_name)
    response = sm_client.describe_training_job(TrainingJobName=job_name)
    source_s3_key, model_name = parse_model_artifacts(response['ModelArtifacts']['S3ModelArtifacts'])
    copy_source = {'Bucket': BUCKET, 'Key': source_s3_key}
    key = '{}/{}/{}/{}.tar.gz'.format(DATA_PREFIX, MULTI_MODEL_ARTIFACTS, model_name, model_name)
    
    print('Copying {} model\n   from: {}\n     to: {}...'.format(model_name, source_s3_key, key))
    s3_client.copy_object(Bucket=BUCKET, CopySource=copy_source, Key=key)


In [None]:
# First, clear out old versions of the model artifacts from previous runs of this notebook
s3_bucket = s3.Bucket(BUCKET)
full_input_prefix = '{}/multi_model_artifacts'.format(DATA_PREFIX)
print('Removing old model artifacts from {}'.format(full_input_prefix))
s3_bucket.objects.filter(Prefix=full_input_prefix + '/').delete()

In [None]:
## Deploy all but the last model trained to MME
## We will use the last model to show how to update an existing MME in Section 7
for job_name in training_jobs[:-1]:
    deploy_artifacts_to_mme(job_name)

In [None]:
MODEL_NAME = '{}-{}'.format(HOUSING_MODEL_NAME, strftime('%Y-%m-%d-%H-%M-%S', gmtime()))

_model_url  = 's3://{}/{}/{}/'.format(BUCKET, DATA_PREFIX, MULTI_MODEL_ARTIFACTS)

ll_multi_model = MultiDataModel(
        name=MODEL_NAME,
        model_data_prefix=_model_url,
        image_uri=container,
        role=role,
        sagemaker_session=sagemaker_session
    )


## Section 5 : sklearn 모델과 MME learner leaner 모델을 이용하여 추론 파이프라인 생성 <a id='Create-inference-pipeline'></a>


파이프라인 모델 API를 사용하여 추론 파이프라인을 설정합니다. 

이것은 단일 endpoint 에서 모델 목록을 설정합니다. 

이 예제에서는 피팅된 Scikit-learn 추론 모델과 피팅된 Linear Learner 모델로 파이프라인 모델을 구성합니다.

In [None]:
from sagemaker.model import Model
from sagemaker.pipeline import PipelineModel
import boto3
from time import gmtime, strftime

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

scikit_learn_inference_model = sklearn_preprocessor.create_model()

model_name = '{}-{}'.format('inference-pipeline', timestamp_prefix)
endpoint_name = '{}-{}'.format('inference-pipeline-ep', timestamp_prefix)

sm_model = PipelineModel(
    name=model_name, 
    role=role, 
    sagemaker_session=sagemaker_session,
    models=[
        scikit_learn_inference_model, 
        ll_multi_model])

sm_model.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge', endpoint_name=endpoint_name)


## Section 6 :  추론 파이프파인 테스트 - 다른 linear learner 모델로부터 예측 생성<a id='Exercise-inference-pipeline'></a>

In [None]:
#Create Predictor
from sagemaker.predictor import Predictor

csv_serializer = sagemaker.serializers.CSVSerializer()

predictor = Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=csv_serializer)

In [None]:
def predict_one_house_value(features, model_name, predictor_to_use):
    print('Using model {} to predict price of this house: {}'.format(model_name,
                                                                     features))
    body = ','.join(map(str, features)) + '\n'
    start_time = time.time()
     
    response = predictor_to_use.predict(features, target_model=model_name)
    
    response_json = json.loads(response)
        
    predicted_value = response_json['predictions'][0]['score']    
    
    duration = time.time() - start_time
    
    print('${:,.2f}, took {:,d} ms\n'.format(predicted_value, int(duration * 1000)))

In [None]:
for _ in range(10):
    model_name = LOCATIONS[np.random.randint(1, PARALLEL_TRAINING_JOBS - 1)]
    full_model_name = '{}/{}.tar.gz'.format(model_name,model_name)
    predict_one_house_value(gen_random_house()[:-1], full_model_name, predictor)


## Section 7 - endpoint 에 신규 모델 추가, 단순히 모델 artifact 를 S3 로 복사합니다<a id='update-models'></a>

In [None]:
## Copy the last model
last_training_job=training_jobs[PARALLEL_TRAINING_JOBS-1]
deploy_artifacts_to_mme(last_training_job)

In [None]:
model_name = LOCATIONS[PARALLEL_TRAINING_JOBS-1]
full_model_name = '{}/{}.tar.gz'.format(model_name,model_name)
predict_one_house_value(gen_random_house()[:-1], full_model_name, predictor)


## Section 8 - Endpoint CloudWatch 지표 분석<a id='CW-metric-analysis'></a>


MME를 사용하면 모델이 호출될 때 엔드포인트를 호스팅하는 인스턴스의 컨테이너 메모리에 동적으로 로드됩니다. 따라서 모델을 처음 호출할 때 모델 호출 시간이 더 오래 걸릴 수 있습니다. 

그리고 모델이 이미 컨테이너의 메모리에 있으면 후속 호출이 더 빨라집니다. 

인스턴스 메모리 사용률이 높고 새 모델을 로드해야 하는 경우 사용되지 않은 모델이 언로드됩니다. 

언로드된 모델은 인스턴스의 스토리지 볼륨에 남아 있으며 나중에 S3 버킷에서 다시 다운로드하지 않고도 컨테이너의 메모리에 로드할 수 있습니다. 

인스턴스의 스토리지 볼륨이 가득 찬 경우 사용하지 않은 모델은 스토리지 볼륨에서 삭제됩니다.

모델 로드/언로드 관리는 사용자가 특정 조치를 취하지 않고도 Amazon SageMaker에 의해 완전히 처리됩니다. 

그러나 이 동작은 모델 호출 대기 시간에 영향을 미치므로 이해하는 것이 중요합니다.

Amazon SageMaker는 다중 모델 엔드포인트에 대한 CloudWatch 지표를 제공하므로 엔드포인트 사용량과 캐시 적중률을 확인하고 엔드포인트를 최적화할 수 있습니다. 
엔드포인트 및 컨테이너 동작을 분석하기 위해 아래 순서로 여러 모델을 호출합니다.

    a. 원래 모델에 대해 200개의 복제본을 생성하고 다른 이름으로 저장합니다.
    b. 컨테이너 안에 아무런 모델로 로드하지 않은채로 시작합니다. 처음 100개 모델을 호출합니다.
    c. 같은 100개 모델을 다시 호출합니다. 
    d. 모든 200개 모델을 호출합니다. 

이 호출 순서를 사용하여 CloudWatch 지표(LoadedModelCount, MemoryUtilization 및 ModelCacheHit)의 동작을 관찰합니다. 

CloudWatch 차트를 사용하여 다양한 수의 모델을 로드하는 실험을 통해 주어진 엔드포인트가 호스팅해야 하는 최적의 인스턴스 유형, 인스턴스 수 및 모델 수에 대한 지속적인 결정을 내리는 것이 좋습니다.

In [None]:
# Make a copy of the model artifacts in S3 bucket with new names so we have multiple models to understand the latency behavior.
def copy_additional_artifacts_to_mme(num_copies):
    
    source_s3_model_key = '{}/{}/{}/{}.tar.gz'.format(DATA_PREFIX, MULTI_MODEL_ARTIFACTS, model_name, model_name)
    _copy_source = {'Bucket': BUCKET, 'Key': source_s3_model_key}
    for i in range(num_copies):
        new_model_name="{}_{}".format(i, model_name)
        dest_s3_model_key = '{}/{}/{}/{}.tar.gz'.format(DATA_PREFIX, MULTI_MODEL_ARTIFACTS, model_name, new_model_name)
        print('Copying {} model\n   from: {}\n     to: {}...'.format(model_name, source_s3_model_key, dest_s3_model_key))
        s3_client.copy_object(Bucket=BUCKET, CopySource=_copy_source, Key=dest_s3_model_key)

In [None]:
##Create 200 copies of the original model and save with different names.
copy_additional_artifacts_to_mme(200)

In [None]:
##Invoke multiple models in a loop
def invoke_multiple_models_mme(model_range_low, model_range_high):
    for i in range(model_range_low, model_range_high):
        new_model_name="{}_{}".format(i, model_name)
        full_model_name = '{}/{}.tar.gz'.format(model_name, new_model_name)
        predict_one_house_value(gen_random_house()[:-1], full_model_name, predictor)


In [None]:
##Starting with no models loaded into the container
##Invoke the first 100 models
invoke_multiple_models_mme(0, 100)

In [None]:
##Invoke the same 100 models again
invoke_multiple_models_mme(0, 100)

In [None]:
##This time invoke all 200 models to observe behavior
invoke_multiple_models_mme(0, 200)

#### LoadedModelCount,MemoryUtilization and ModelCacheHit 지표는 아래 차트와 유사할 것입니다.

![](cw_charts/ModelCountMemUtilization.png)

"LoadedModelCount"는 더 많은 모델이 호출됨에 따라 121까지 계속 증가합니다. 

컨테이너의 "MemoryUtilization"도 이에 따라 약 79%로 증가했습니다. 

이는 엔드포인트를 호스팅하도록 선택된 인스턴스가 200개의 모델 호출이 수행될 때 실제로는 메모리에 121개의 모델만 유지할 수 있음을 보여줍니다.

![](cw_charts/ModelCountMemUtilizationCacheHit.png)

컨테이너 메모리에 로드되는 모델 수가 증가할수록 ModelCacheHit이 향상됩니다. 

동일한 100개 모델이 두 번째로 호출되면 ModelCacheHit은 1에 도달합니다. 

아직 로드되지 않은 새 모델이 호출되면 ModelCacheHit이 다시 감소합니다.


## Section 9 - MME의 대상 모델에 대한 세분화된 액세스 탐색 <a id='Finegrain-control-invoke-models'></a>

If the role attached to this notebook instance allows invoking SageMaker endpoints, it is able to invoke all models hosted on the MME.  Using IAM conditional keys, you can restrict this model invocation access to specific models.  To explore this, you will create a new IAM role and IAM policy with conditional key to restrict access to a single model.  Assume this new role and verify that only a single target model can be invoked.

이 노트북 인스턴스에 연결된 역할이 SageMaker endpoint 호출을 허용하는 경우 MME에서 호스팅되는 모든 모델을 호출할 수 있습니다. 

IAM conditional key 를 사용하여 이 모델 호출 액세스를 특정 모델로 제한할 수 있습니다. 

이를 탐색하기 위해 단일 모델에 대한 액세스를 제한하는 조건부 키를 사용하여 새 IAM 역할 및 IAM 정책을 생성합니다. 

이 새 역할을 assume 하고 단일 대상 모델만 호출할 수 있는지 확인합니다.

이 섹션을 실행하려면 노트북 인스턴스에 연결된 역할이 다음 작업을 허용해야 합니다.
    "iam:CreateRole",
    "iam:CreatePolicy",
    "iam:AttachRolePolicy",
    "iam:UpdateAssumeRolePolicy"

In [None]:
iam_client = boto3.client('iam')

In [None]:
#Create a new role that can be assumed by this notebook.  The roles should allow access to only a single model.

path='/'

role_name="{}{}".format('allow_invoke_ny_model_role', strftime('%Y-%m-%d-%H-%M-%S', gmtime()))
description='Role that allows invoking a single model'

action_string = "sts:AssumeRole"
    
trust_policy={
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "statement1",
      "Effect": "Allow",
      "Principal": {
        "AWS": role
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

In [None]:
response = iam_client.create_role(
    Path=path,
    RoleName=role_name,
    AssumeRolePolicyDocument=json.dumps(trust_policy),
    Description=description,
    MaxSessionDuration=3600
)

In [None]:
role_arn=response['Role']['Arn']
print("Role arn is :", role_arn)

In [None]:
endpoint_resource_arn = "arn:aws:sagemaker:{}:{}:endpoint/{}".format(REGION, ACCOUNT_ID, endpoint_name)
print("Endpoint arn is :", endpoint_resource_arn)

In [None]:
##Create the IAM policy with the IAM condition key
policy_name = "{}{}".format('allow_invoke_ny_model_policy', strftime('%Y-%m-%d-%H-%M-%S', gmtime()))
managed_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "SageMakerAccess",
            "Action": "sagemaker:InvokeEndpoint",
            "Effect": "Allow",
            "Resource":endpoint_resource_arn,
            "Condition": {
                "StringLike": {
                    "sagemaker:TargetModel": ["NewYork_NY/*"]
                }
            }
        }
    ]
}

response = iam_client.create_policy(
  PolicyName=policy_name,
  PolicyDocument=json.dumps(managed_policy)
)

In [None]:
policy_arn=response['Policy']['Arn']

In [None]:
##Attach policy to role
iam_client.attach_role_policy(
    PolicyArn=policy_arn,
    RoleName=role_name
)

In [None]:
## Invoke with the role that has access to only NY model
sts_connection = boto3.client('sts')
assumed_role_limited_access = sts_connection.assume_role(
    RoleArn=role_arn,
    RoleSessionName="MME_Invoke_NY_Model"
)
assumed_role_limited_access['AssumedRoleUser']['Arn']


In [None]:
trust_policy={
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "statement1",
      "Effect": "Allow",
      "Principal": {
        "AWS": role
      },
      "Action": "sts:AssumeRole"
    },
    {
      "Sid": "statement2",
      "Effect": "Allow",
      "Principal": {
          "AWS": assumed_role_limited_access['AssumedRoleUser']['Arn']
      },
      "Action": "sts:AssumeRole"
    }  
  ]
}

In [None]:
iam_client.update_assume_role_policy(
    RoleName=role_name,
    PolicyDocument=json.dumps(trust_policy)
)

In [None]:
ACCESS_KEY = assumed_role_limited_access['Credentials']['AccessKeyId']
SECRET_KEY = assumed_role_limited_access['Credentials']['SecretAccessKey']
SESSION_TOKEN = assumed_role_limited_access['Credentials']['SessionToken']

runtime_sm_client_with_assumed_role = boto3.client(
    service_name='sagemaker-runtime', 
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
    aws_session_token=SESSION_TOKEN,
)

In [None]:
 sagemakerSessionAssumedRole = sagemaker.Session(sagemaker_runtime_client=runtime_sm_client_with_assumed_role)

In [None]:
predictorAssumedRole = Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sagemakerSessionAssumedRole,
    serializer=csv_serializer)

In [None]:
full_model_name = 'NewYork_NY/NewYork_NY.tar.gz'
predict_one_house_value(gen_random_house()[:-1], full_model_name,predictorAssumedRole)

In [None]:
##This should fail with "AccessDeniedException" since the assumed role does not have access to Chicago model
full_model_name = 'Chicago_IL/Chicago_IL.tar.gz'
predict_one_house_value(gen_random_house()[:-1], full_model_name,predictorAssumedRole)

## 정리<a id='CleanUp'></a>

불필요한 비용을 방지하기 위해 endpoint 를 삭제합니다. 

In [None]:
#Delete the endpoint and underlying model
predictor.delete_model() 
predictor.delete_endpoint()
for t in preprocessor_transformers:
    t.delete_model()

In [None]:
#Delete the IAM Role
iam_client.detach_role_policy(
    PolicyArn=policy_arn,
    RoleName=role_name
)
iam_client.delete_role(RoleName=role_name)

In [None]:
#Delete the IAM Policy
iam_client.delete_policy(PolicyArn=policy_arn)