# Consumo do modelo de Advanced Analitics

`keywords.: serving, deploy`

Este notebook tem as instruções que simulam o consumo de um modelo de machine learning em ambiente do AWS SageMaker.


## Setup

Seção com a preparação das variáveis do processo de deploy.

`Ref. Tempo esperado de execução: 2,15s`


In [1]:
%%time

# Célula de configurações

import os
import boto3
import re
import json
from sagemaker import get_execution_role, session

region = boto3.Session().region_name

role = get_execution_role()
print("RoleArn: {}".format(role))

# Você pode usar um bucket, mas certifique-se de que a permissão que escolheu para este notebook
# contempla s3:PutObject . Este é o bucket no qual os dados são capturados
bucket = session.Session(boto3.Session()).default_bucket()
print("Bucket a ser usado: {}".format(bucket))
prefix = "sagemaker/DEMO-ModelMonitor"

data_capture_prefix = "{}/datacapture".format(prefix)
s3_capture_upload_path = "s3://{}/{}".format(bucket, data_capture_prefix)
reports_prefix = "{}/reports".format(prefix)
s3_report_path = "s3://{}/{}".format(bucket, reports_prefix)
code_prefix = "{}/code".format(prefix)
s3_code_preprocessor_uri = "s3://{}/{}/{}".format(bucket, code_prefix, "preprocessor.py")
s3_code_postprocessor_uri = "s3://{}/{}/{}".format(bucket, code_prefix, "postprocessor.py")

print("Path de captura dos dados: {}".format(s3_capture_upload_path))
print("Path dos relatórios: {}".format(s3_report_path))
print("Path para código de pré-proc: {}".format(s3_code_preprocessor_uri))
print("Path para código de pós-proc: {}".format(s3_code_postprocessor_uri))
print("\n")

RoleArn: arn:aws:iam::325011675573:role/service-role/AmazonSageMaker-ExecutionRole-20201008T200444
Bucket a ser usado: sagemaker-us-east-2-325011675573
Path de captura dos dados: s3://sagemaker-us-east-2-325011675573/sagemaker/DEMO-ModelMonitor/datacapture
Path dos relatórios: s3://sagemaker-us-east-2-325011675573/sagemaker/DEMO-ModelMonitor/reports
Path para código de pré-proc: s3://sagemaker-us-east-2-325011675573/sagemaker/DEMO-ModelMonitor/code/preprocessor.py
Path para código de pós-proc: s3://sagemaker-us-east-2-325011675573/sagemaker/DEMO-ModelMonitor/code/postprocessor.py


CPU times: user 750 ms, sys: 118 ms, total: 868 ms
Wall time: 980 ms


In [13]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
import time

In [14]:
endpoint_name = "DEMO-xgb-churn-pred-model-monitor-2021-09-16-16-37-14"

In [15]:
predictor = Predictor(endpoint_name=endpoint_name, serializer=CSVSerializer())


----

# Monitoramento do Modelo - Criação de baseline e monitoramento contínuo

Além de coletar os dados, o Amazon SageMaker oferece a capacidade de monitorar e avaliar os dados observados pelos terminais.

Para isso:
1. Vamos criar uma linha de base com a qual você compare o tráfego em tempo real.
1. Quando a baseline estiver pronta, configure um cronograma para avaliar e comparar continuamente com a linha de base.

--

O conjunto de dados de treinamento com o qual treinamos o modelo geralmente é um bom conjunto de dados de linha de base. Observe que o esquema de dados do conjunto de dados de treinamento e o esquema do conjunto de dados de inferência devem corresponder exatamente (ou seja, o número e a ordem dos recursos).

A partir do conjunto de dados de treinamento, você pode pedir ao Amazon SageMaker para sugerir um conjunto de `restrições` de linha de base e gerar` estatísticas` descritivas para explorar os dados. Para este exemplo, carregue o conjunto de dados de treinamento que foi usado para treinar o modelo pré-treinado incluído neste exemplo. Se você já o tem no Amazon S3, pode apontar diretamente para ele.

In [5]:
# copie o conjunto de dados de treinamento para o Amazon S3 (se já o tiver no Amazon S3, você pode reutilizá-lo)
baseline_prefix = prefix + "/baselining"
baseline_data_prefix = baseline_prefix + "/data"
baseline_results_prefix = baseline_prefix + "/results"

baseline_data_uri = "s3://{}/{}".format(bucket, baseline_data_prefix)
baseline_results_uri = "s3://{}/{}".format(bucket, baseline_results_prefix)
print("Baseline data uri: {}".format(baseline_data_uri))
print("Baseline results uri: {}".format(baseline_results_uri))

Baseline data uri: s3://sagemaker-us-east-2-325011675573/sagemaker/DEMO-ModelMonitor/baselining/data
Baseline results uri: s3://sagemaker-us-east-2-325011675573/sagemaker/DEMO-ModelMonitor/baselining/results


In [3]:
training_data_file = open("test_data/training-dataset-with-header.csv", "rb")
s3_key = os.path.join(baseline_prefix, "data", "training-dataset-with-header.csv")
boto3.Session().resource("s3").Bucket(bucket).Object(s3_key).upload_fileobj(training_data_file)

### Criação de um job de linha de base com o conjunto de dados de treinamento

Agora que  temos os dados de treinamento prontos no Amazon S3, começaremos um job para `sugerir` restrições. 

`DefaultModelMonitor.suggest_baseline (..)` inicia um `ProcessingJob` usando um contêiner do Model Monitor fornecido pelo Amazon SageMaker para gerar as restrições.

`Ref. Tempo esperado de execução:  6min 16s`

In [68]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

In [69]:
%%time

my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri + "/training-dataset-with-header.csv",
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True,
)


CPU times: user 2.22 s, sys: 338 ms, total: 2.56 s
Wall time: 6min 15s


<sagemaker.processing.ProcessingJob at 0x7f48369dc6d0>

### Exploração das restrições e estatísticas geradas

In [70]:
s3_client = boto3.Session().client("s3")
result = s3_client.list_objects(Bucket=bucket, Prefix=baseline_results_prefix)
report_files = [report_file.get("Key") for report_file in result.get("Contents")]
print("Found Files:")
print("\n ".join(report_files))

Found Files:
sagemaker/DEMO-ModelMonitor/baselining/results/constraints.json
 sagemaker/DEMO-ModelMonitor/baselining/results/statistics.json


In [71]:
import pandas as pd

baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)

Unnamed: 0,name,inferred_type,numerical_statistics.common.num_present,numerical_statistics.common.num_missing,numerical_statistics.mean,numerical_statistics.sum,numerical_statistics.std_dev,numerical_statistics.min,numerical_statistics.max,numerical_statistics.distribution.kll.buckets,numerical_statistics.distribution.kll.sketch.parameters.c,numerical_statistics.distribution.kll.sketch.parameters.k,numerical_statistics.distribution.kll.sketch.data
0,Churn,Integral,2333,0,0.139306,325.0,0.346265,0.0,1.0,"[{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou...",0.64,2048.0,"[[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0,..."
1,Account Length,Integral,2333,0,101.276897,236279.0,39.552442,1.0,243.0,"[{'lower_bound': 1.0, 'upper_bound': 25.2, 'co...",0.64,2048.0,"[[119.0, 100.0, 111.0, 181.0, 95.0, 104.0, 70...."
2,VMail Message,Integral,2333,0,8.214316,19164.0,13.776908,0.0,51.0,"[{'lower_bound': 0.0, 'upper_bound': 5.1, 'cou...",0.64,2048.0,"[[19.0, 0.0, 0.0, 40.0, 36.0, 0.0, 0.0, 24.0, ..."
3,Day Mins,Fractional,2333,0,180.226489,420468.4,53.987179,0.0,350.8,"[{'lower_bound': 0.0, 'upper_bound': 35.08, 'c...",0.64,2048.0,"[[178.1, 160.3, 197.1, 105.2, 283.1, 113.6, 23..."
4,Day Calls,Integral,2333,0,100.259323,233905.0,20.165008,0.0,165.0,"[{'lower_bound': 0.0, 'upper_bound': 16.5, 'co...",0.64,2048.0,"[[110.0, 138.0, 117.0, 61.0, 112.0, 87.0, 122...."
5,Eve Mins,Fractional,2333,0,200.050107,466716.9,50.015928,31.2,361.8,"[{'lower_bound': 31.2, 'upper_bound': 64.26, '...",0.64,2048.0,"[[212.8, 221.3, 227.8, 341.3, 286.2, 158.6, 29..."
6,Eve Calls,Integral,2333,0,99.573939,232306.0,19.675578,12.0,170.0,"[{'lower_bound': 12.0, 'upper_bound': 27.8, 'c...",0.64,2048.0,"[[100.0, 92.0, 128.0, 79.0, 86.0, 98.0, 112.0,..."
7,Night Mins,Fractional,2333,0,201.388598,469839.6,50.627961,23.2,395.0,"[{'lower_bound': 23.2, 'upper_bound': 60.37999...",0.64,2048.0,"[[226.3, 150.4, 214.0, 165.7, 261.7, 187.7, 20..."
8,Night Calls,Integral,2333,0,100.227175,233830.0,19.282029,42.0,175.0,"[{'lower_bound': 42.0, 'upper_bound': 55.3, 'c...",0.64,2048.0,"[[123.0, 120.0, 101.0, 97.0, 129.0, 87.0, 112...."
9,Intl Mins,Fractional,2333,0,10.253065,23920.4,2.778766,0.0,18.4,"[{'lower_bound': 0.0, 'upper_bound': 1.8399999...",0.64,2048.0,"[[10.0, 11.2, 9.3, 6.3, 11.3, 10.5, 0.0, 9.7, ..."


In [72]:
constraints_df = pd.json_normalize(
    baseline_job.suggested_constraints().body_dict["features"]
)
constraints_df.head(10)

Unnamed: 0,name,inferred_type,completeness,num_constraints.is_non_negative
0,Churn,Integral,1.0,True
1,Account Length,Integral,1.0,True
2,VMail Message,Integral,1.0,True
3,Day Mins,Fractional,1.0,True
4,Day Calls,Integral,1.0,True
5,Eve Mins,Fractional,1.0,True
6,Eve Calls,Integral,1.0,True
7,Night Mins,Fractional,1.0,True
8,Night Calls,Integral,1.0,True
9,Intl Mins,Fractional,1.0,True


## 2. Analisando os dados coletados para problemas de qualidade de dados

Depois de coletar os dados acima, analise e monitore os dados com cronogramas de monitoramento

### Criar uma cronograma

In [73]:
# Primeiro, copiamos alguns scripts de teste para o S3 para que eles possam ser usados para pré e pós-processamento
boto3.Session().resource("s3").Bucket(bucket).Object(code_prefix + "/preprocessor.py").upload_file(
    "preprocessor.py"
)
boto3.Session().resource("s3").Bucket(bucket).Object(code_prefix + "/postprocessor.py").upload_file(
    "postprocessor.py"
)

Você pode criar um cronograma de monitoramento de modelo para o terminal criado anteriormente. Use os recursos da linha de base (restrições e estatísticas) para comparar com o tráfego em tempo real.

In [74]:
%%time

from sagemaker.model_monitor import CronExpressionGenerator
from time import gmtime, strftime

mon_schedule_name = "DEMO-xgb-churn-pred-model-monitor-schedule-" + strftime(
    "%Y-%m-%d-%H-%M-%S", gmtime()
)
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=predictor.endpoint,
    # record_preprocessor_script=pre_processor_script,
    post_analytics_processor_script=s3_code_postprocessor_uri,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

The endpoint attribute has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


CPU times: user 66.7 ms, sys: 4.41 ms, total: 71.1 ms
Wall time: 478 ms


### Vamos agora a gerar tráfego artificial


A célula abaixo inicia um thread para enviar algum tráfego ao endpoint. Observe que você precisa parar o kernel para encerrar este thread. Se não houver tráfego, os trabalhos de monitoramento são marcados como `Falha`, pois não há dados para processar.

In [75]:
from threading import Thread
from time import sleep
import time

endpoint_name = predictor.endpoint
runtime_client = boto3.client("runtime.sagemaker")

# (just repeating code from above for convenience/ able to run this section independently)
def invoke_endpoint(ep_name, file_name, runtime_client):
    with open(file_name, "r") as f:
        for row in f:
            payload = row.rstrip("\n")
            response = runtime_client.invoke_endpoint(
                EndpointName=ep_name, ContentType="text/csv", Body=payload
            )
            response["Body"].read()
            time.sleep(1)


def invoke_endpoint_forever():
    while True:
        invoke_endpoint(endpoint_name, "test_data/test-dataset-input-cols.csv", runtime_client)


thread = Thread(target=invoke_endpoint_forever)
thread.start()

# Note que você precisa parar o kernel para parar as invocações

The endpoint attribute has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


### Avaliação do cronograma
Depois de iniciar, observe que MonitoringScheduleStatus muda para Scheduled.

In [76]:
desc_schedule_result = my_default_monitor.describe_schedule()
print("Schedule status: {}".format(desc_schedule_result["MonitoringScheduleStatus"]))

Schedule status: Pending


### Listar as execuções
A programação inicia os job nos intervalos especificados anteriormente. Aqui, você listaremos as cinco últimas execuções. Observe que, se iniciar isso depois de criar a programação horária, poderá encontrar as execuções vazias. Você pode ter que esperar até cruzar o limite das horas (em UTC) para ver o início das execuções. O código abaixo tem a lógica de espera.

Nota: Mesmo para uma programação de hora em hora, o Amazon SageMaker tem um período de buffer de 20 minutos para programar sua execução. Você pode ver sua execução começar em qualquer lugar de zero a cerca de 20 minutos a partir do limite da hora. Isso é esperado e feito para balanceamento de carga no back-end.

In [77]:
my_default_monitor.list_executions()

No executions found for schedule. monitoring_schedule_name: DEMO-xgb-churn-pred-model-monitor-schedule-2021-09-16-20-03-19


[]

In [None]:
mon_executions = my_default_monitor.list_executions()

while len(mon_executions) == 0:
    print("Esperando que a 1ª execução aconteça ...")
    time.sleep(60)
    mon_executions = my_default_monitor.list_executions()

No executions found for schedule. monitoring_schedule_name: DEMO-xgb-churn-pred-model-monitor-schedule-2021-09-16-20-03-19
Esperando que a 1ª execução aconteça ...
No executions found for schedule. monitoring_schedule_name: DEMO-xgb-churn-pred-model-monitor-schedule-2021-09-16-20-03-19
Esperando que a 1ª execução aconteça ...
No executions found for schedule. monitoring_schedule_name: DEMO-xgb-churn-pred-model-monitor-schedule-2021-09-16-20-03-19
Esperando que a 1ª execução aconteça ...
No executions found for schedule. monitoring_schedule_name: DEMO-xgb-churn-pred-model-monitor-schedule-2021-09-16-20-03-19
Esperando que a 1ª execução aconteça ...
No executions found for schedule. monitoring_schedule_name: DEMO-xgb-churn-pred-model-monitor-schedule-2021-09-16-20-03-19
Esperando que a 1ª execução aconteça ...
No executions found for schedule. monitoring_schedule_name: DEMO-xgb-churn-pred-model-monitor-schedule-2021-09-16-20-03-19
Esperando que a 1ª execução aconteça ...
No executions fo

### Avaliação de uma execução específica (execução mais recente)
Na célula anterior, escolhemos a última execução planejada concluída ou com falha. Aqui estão os possíveis estados terminais e o que cada um deles significa:

* Completed - Significa que a execução do monitoramento foi concluída e nenhum problema foi encontrado no relatório de violações.
* CompletedWithViolations - Isso significa que a execução foi concluída, mas foram detectadas violações de restrição.
* Failed - A execução do monitoramento falhou, talvez devido a um erro do cliente (talvez premissas de função incorretas) ou problemas de infraestrutura. Um exame mais aprofundado de FailureReason e ExitMessage é necessário para identificar o que exatamente aconteceu.
* Stopped - o trabalho excedeu o tempo de execução máximo ou foi interrompido manualmente.

In [None]:
latest_execution = mon_executions[
    -1
]  # latest execution's index is -1, second to last is -2 and so on..
time.sleep(60)
latest_execution.wait(logs=False)

print("Latest execution status: {}".format(latest_execution.describe()["ProcessingJobStatus"]))
print("Latest execution result: {}".format(latest_execution.describe()["ExitMessage"]))

latest_job = latest_execution.describe()
if latest_job["ProcessingJobStatus"] != "Completed":
    print(
        "====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures."
    )

In [43]:
report_uri = latest_execution.output.destination
print("Report Uri: {}".format(report_uri))

Report Uri: s3://sagemaker-us-east-2-325011675573/sagemaker/DEMO-ModelMonitor/reports/DEMO-xgb-churn-pred-model-monitor-2021-09-12-21-35-18/DEMO-xgb-churn-pred-model-monitor-schedule-2021-09-12-22-57-21/2021/09/12/23


### Listagem dos relatórios gerados

In [44]:
from urllib.parse import urlparse

s3uri = urlparse(report_uri)
report_bucket = s3uri.netloc
report_key = s3uri.path.lstrip("/")
print("Report bucket: {}".format(report_bucket))
print("Report key: {}".format(report_key))

s3_client = boto3.Session().client("s3")
result = s3_client.list_objects(Bucket=report_bucket, Prefix=report_key)
report_files = [report_file.get("Key") for report_file in result.get("Contents")]
print("Found Report Files:")
print("\n ".join(report_files))

Report bucket: sagemaker-us-east-2-325011675573
Report key: sagemaker/DEMO-ModelMonitor/reports/DEMO-xgb-churn-pred-model-monitor-2021-09-12-21-35-18/DEMO-xgb-churn-pred-model-monitor-schedule-2021-09-12-22-57-21/2021/09/12/23
Found Report Files:
sagemaker/DEMO-ModelMonitor/reports/DEMO-xgb-churn-pred-model-monitor-2021-09-12-21-35-18/DEMO-xgb-churn-pred-model-monitor-schedule-2021-09-12-22-57-21/2021/09/12/23/constraint_violations.json
 sagemaker/DEMO-ModelMonitor/reports/DEMO-xgb-churn-pred-model-monitor-2021-09-12-21-35-18/DEMO-xgb-churn-pred-model-monitor-schedule-2021-09-12-22-57-21/2021/09/12/23/constraints.json
 sagemaker/DEMO-ModelMonitor/reports/DEMO-xgb-churn-pred-model-monitor-2021-09-12-21-35-18/DEMO-xgb-churn-pred-model-monitor-schedule-2021-09-12-22-57-21/2021/09/12/23/statistics.json


### Relatório de violações

Se houver alguma violação em comparação com a linha de base, ela será listada aqui.

In [45]:
violations = my_default_monitor.latest_monitoring_constraint_violations()
pd.set_option("display.max_colwidth", -1)
constraints_df = pd.json_normalize(violations.body_dict["violations"])
constraints_df.head(10)

  
  This is separate from the ipykernel package so we can avoid doing imports until


Unnamed: 0,feature_name,constraint_check_type,description
0,State_NH,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.44444444444444% of data is Integral."
1,State_MT,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.44444444444444% of data is Integral."
2,State_IL,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.44444444444444% of data is Integral."
3,Churn,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 0.0% of data is Integral."
4,State_NE,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.44444444444444% of data is Integral."
5,State_DC,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.44444444444444% of data is Integral."
6,State_MD,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.44444444444444% of data is Integral."
7,State_MN,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.44444444444444% of data is Integral."
8,Int'l Plan_no,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.44444444444444% of data is Integral."
9,State_KY,data_type_check,"Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 99.44444444444444% of data is Integral."


### Outros comandos
Também podemos iniciar e interromper os cronogramas de monitoramento.

In [None]:
# my_default_monitor.stop_monitoring_schedule()
# my_default_monitor.start_monitoring_schedule()

## Exclusão dos recursos

Você pode manter seu endpoint em execução para continuar capturando dados. Se você não planeja coletar mais dados ou usar este endpoint posteriormente, você deve excluir o endpoint para evitar incorrer em cobranças adicionais. Observe que a exclusão de seu terminal não exclui os dados que foram capturados durante as chamadas do modelo. Esses dados persistem no Amazon S3 até que você os exclua.

Mas antes disso, você precisa primeiro excluir o cronograma.

In [None]:
my_default_monitor.delete_monitoring_schedule()
time.sleep(60)  # espera pela exclusão