#### Cargue de librerías

In [1]:
from datetime import datetime, timedelta, timezone
import json
import os
import re
import boto3
from time import sleep
from threading import Thread

import pandas as pd
import numpy as np
import sagemaker
from sagemaker import get_execution_role, session, Session, image_uris
from sagemaker.s3 import S3Downloader, S3Uploader
from sagemaker.processing import ProcessingJob
from sagemaker.serializers import CSVSerializer

from sagemaker.model import Model
from sagemaker.model_monitor import DataCaptureConfig

from sagemaker.predictor import Predictor

# NOTE: this rebinds the name `session` (imported above as a module from `sagemaker`)
# to a Session instance; later cells pass this object as `sagemaker_session=`.
session = Session()
# Region used when resolving container image URIs.
region = "us-east-1"
# Role identifier passed as `role=` to the Model and to the model-quality monitor.
role = "Labrole"

#### Configuración del bucket

In [2]:
s3_bucket = "pronostico-dolar-04-2023"  # replace with an existing bucket if needed

# Section 1: S3 layout used by the model-quality monitor.
# Any bucket can be used as long as the role chosen for this notebook has
# s3:PutObject on it; this is the bucket the captured data is written into.

bucket = s3_bucket
print("Bucket:", bucket)
prefix = "deepar-dolar-notebook/ModelQualityMonitor-dolar"

# Where the endpoint's captured traffic is stored
data_capture_prefix = "/".join([prefix, "datacapture"])
s3_capture_upload_path = "s3://{}/{}".format(bucket, data_capture_prefix)

# Ground-truth location, suffixed with the local time at which this cell ran
ground_truth_upload_path = "s3://{}/{}/ground_truth_data/{}".format(
    bucket, prefix, datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
)

# Where monitoring reports are stored
reports_prefix = "/".join([prefix, "reports"])
s3_report_path = "s3://{}/{}".format(bucket, reports_prefix)


# Container image of the model monitor for the configured region
monitor_image_uri = image_uris.retrieve(framework="model-monitor", region=region)

print("Image URI:", monitor_image_uri)
print("Capture path: " + s3_capture_upload_path)
print("Ground truth path: " + ground_truth_upload_path)
print("Report path: " + s3_report_path)

Bucket: pronostico-dolar-04-2023
Image URI: 156813124566.dkr.ecr.us-east-1.amazonaws.com/sagemaker-model-monitor-analyzer
Capture path: s3://pronostico-dolar-04-2023/deepar-dolar-notebook/ModelQualityMonitor-dolar/datacapture
Ground truth path: s3://pronostico-dolar-04-2023/deepar-dolar-notebook/ModelQualityMonitor-dolar/ground_truth_data/2023-04-10-19-34-20
Report path: s3://pronostico-dolar-04-2023/deepar-dolar-notebook/ModelQualityMonitor-dolar/reports


In [7]:
## Location in S3 of the pre-trained model artifact
model_url = "s3://pronostico-dolar-04-2023/deepar-dolar-notebook/output/deepar-dolar-2023-04-09-23-05-44-553/output/model.tar.gz"
model_url

's3://pronostico-dolar-04-2023/deepar-dolar-notebook/output/deepar-dolar-2023-04-09-23-05-44-553/output/model.tar.gz'

#### Crear entidad de modelo de SageMaker

In [8]:
# Timestamped (UTC) model name so repeated runs do not collide.
# datetime.utcnow() is deprecated since Python 3.12; an aware UTC timestamp
# formats to exactly the same string.
model_name = f"Dolar-pred-model-monitor-{datetime.now(timezone.utc):%Y-%m-%d-%H%M}"

# Container image of the built-in DeepAR forecasting algorithm for this region.
image_name = image_uris.retrieve("forecasting-deepar", region=region)

# `name=model_name` is passed explicitly: the name used to be computed and then
# ignored, which left SageMaker to auto-generate a different one.
model = Model(
    image_uri=image_name,
    model_data=model_url,
    role=role,
    name=model_name,
    sagemaker_session=session,
)

#### Despliegue del modelo con la captura de datos habilitada

In [24]:
# Timestamped (UTC) endpoint name. datetime.utcnow() is deprecated since
# Python 3.12; an aware UTC timestamp formats to exactly the same string.
endpoint_name = f"Dolar-model-quality-monitor-{datetime.now(timezone.utc):%Y-%m-%d-%H%M}"
print("EndpointName =", endpoint_name)

# Capture 100% of the endpoint's traffic into the data-capture S3 location.
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=s3_capture_upload_path,
)

# Deploy the model behind a single-instance endpoint with data capture enabled.
model.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.large",
    endpoint_name=endpoint_name,
    data_capture_config=data_capture_config,
)

EndpointName = Dolar-model-quality-monitor-2023-04-10-1627
-----!

#### Creación del objeto SageMaker Predictor

In [3]:
from sagemaker.serializers import IdentitySerializer

Esta clase (tomada del ejemplo de AWS para la implementación del algoritmo DeepAR) permite realizar solicitudes usando objetos `pandas.Series` en lugar de cadenas JSON sin procesar.

In [4]:
# we use a frequency of 1 day for the time series
freq = "1D"

class DeepARPredictor(Predictor):
    """Predictor that takes a `pandas.Series` and returns a `pandas.DataFrame`.

    The series is encoded into the JSON request body by this class, so the
    underlying serializer only has to pass the already-encoded bytes through.
    """

    def __init__(self, *args, **kwargs):
        # `setdefault` keeps the JSON pass-through serializer as the default while
        # still letting a caller supply its own `serializer=` (passing one used to
        # fail with a duplicate-keyword TypeError).
        kwargs.setdefault("serializer", IdentitySerializer(content_type="application/json"))
        super().__init__(*args, **kwargs)

    def predict(
        self,
        ts,
        cat=None,
        dynamic_feat=None,
        num_samples=100,
        return_samples=False,
        quantiles=("0.1", "0.5", "0.9"),
    ):
        """Request the prediction for the time series `ts`, with the (optional)
        category given in `cat`.

        ts -- `pandas.Series` object, the time series to predict
        cat -- the group associated to the time series (default: None)
        dynamic_feat -- forwarded as the instance's "dynamic_feat" field; left out of
            the request when None or empty (default: None)
        num_samples -- integer, number of samples to compute at prediction time (default: 100)
        return_samples -- boolean indicating whether to include samples in the response (default: False)
        quantiles -- quantiles to compute; each entry is converted with `str`
            (default: "0.1", "0.5", "0.9")

        Return value: a single `pandas.DataFrame` with one column per quantile (plus
        one per sample when `return_samples` is true), indexed by the forecast dates.

        Raises ValueError if `ts` is empty or its index does not conform to the
        frequency in use (daily unless `set_frequency` was called).
        """
        if len(ts) == 0:
            raise ValueError("`ts` must contain at least one observation")

        # Build a frequency-annotated copy of the index instead of assigning
        # `ts.index.freq`, which silently modified the caller's series (and any
        # DataFrame sharing that index). The constructor still validates that the
        # index conforms to the frequency.
        freq = getattr(self, "freq", None) or "D"
        index = pd.DatetimeIndex(ts.index, freq=freq)
        prediction_time = index[-1] + index.freq

        quantiles = [str(q) for q in quantiles]
        req = self.__encode_request(ts, cat, dynamic_feat, num_samples, return_samples, quantiles)
        res = super().predict(req)
        return self.__decode_response(res, index.freq, prediction_time, return_samples)

    def __encode_request(self, ts, cat, dynamic_feat, num_samples, return_samples, quantiles):
        """Build the UTF-8 encoded JSON request body for a single time series."""
        # Explicit None/length check: plain truthiness is ambiguous for array-likes.
        has_dynamic_feat = dynamic_feat is not None and len(dynamic_feat) > 0
        instance = series_to_dict(ts, cat, dynamic_feat if has_dynamic_feat else None)

        configuration = {
            "num_samples": num_samples,
            "output_types": ["quantiles", "samples"] if return_samples else ["quantiles"],
            "quantiles": quantiles,
        }

        http_request_data = {"instances": [instance], "configuration": configuration}

        return json.dumps(http_request_data).encode("utf-8")

    def __decode_response(self, response, freq, prediction_time, return_samples):
        """Turn the raw response bytes into a DataFrame indexed by forecast date."""
        # we only sent one time series so we only receive one in return
        # however, if possible one will pass multiple time series as predictions will then be faster
        predictions = json.loads(response.decode("utf-8"))["predictions"][0]
        # Every quantile holds one value per forecast step; use the first to get the horizon.
        prediction_length = len(next(iter(predictions["quantiles"].values())))
        prediction_index = pd.date_range(
            start=prediction_time, freq=freq, periods=prediction_length
        )
        if return_samples:
            dict_of_samples = {"sample_" + str(i): s for i, s in enumerate(predictions["samples"])}
        else:
            dict_of_samples = {}
        return pd.DataFrame(
            data={**predictions["quantiles"], **dict_of_samples}, index=prediction_index
        )

    def set_frequency(self, freq):
        """Set the frequency `predict` uses for validating the index and dating forecasts."""
        self.freq = freq


def encode_target(ts):
    """Encode the values of a time series as a JSON-serialisable list.

    ts -- iterable of numeric values (e.g. a pandas.Series)

    Return value: a list in which every finite value is a built-in `float` and
    every non-finite value (NaN, +/-inf) is the string "NaN".
    """
    # float(...) matters: NumPy scalars that are not `float` subclasses (for example
    # the int64 values of an integer-typed series) cannot be handled by json.dumps.
    return [float(x) if np.isfinite(x) else "NaN" for x in ts]


def series_to_dict(ts, cat=None, dynamic_feat=None):
    """Encode a pandas.Series as the dictionary used for one request instance.

    ts -- a pandas.Series object with the target time series
    cat -- the time series category; the "cat" key is left out when None
    dynamic_feat -- stored under "dynamic_feat"; the key is left out when None

    Return value: a dictionary with "start" (string form of the first index
    entry), "target" (the encoded values) and the optional keys above
    """
    optional_fields = {"cat": cat, "dynamic_feat": dynamic_feat}
    encoded = {"start": str(ts.index[0]), "target": encode_target(ts)}
    # Only carry over the optional fields that were actually provided.
    encoded.update(
        {key: value for key, value in optional_fields.items() if value is not None}
    )
    return encoded

In [10]:
# Attach to the endpoint named in the deployment cell instead of a hard-coded,
# timestamped name that goes stale as soon as the notebook is re-run.
predictor = DeepARPredictor(endpoint_name=endpoint_name)

#### Prueba de predicción con monitoreo del modelo activado

In [51]:
# Load the time-series data

FILE_NAME = "Serie_historica_dolar_Deep_AR.csv"

# The first column becomes the index and is parsed as day-first dates.
data = pd.read_csv(FILE_NAME, index_col=0, parse_dates=True, dayfirst=True)

In [52]:
# Target series: the first column of the loaded frame.
timeseries = data.iloc[:, 0]

In [60]:
# Smoke-test the endpoint: request the 0.1 / 0.5 / 0.9 quantiles and show the first rows.
predictor.predict(ts=timeseries, quantiles=[0.10, 0.5, 0.90]).head()

Unnamed: 0,0.1,0.5,0.9
2023-03-28,4707.32959,4785.177246,4850.517578


#### Ejecutar predicciones utilizando el conjunto de datos de validación

In [22]:
# Local CSV file that receives the baseline rows (date, prediction, label).
validate_dataset = "validation_with_predictions.csv"

In [36]:
# Number of one-step-ahead backtests written to the baseline file.
# NOTE: the previous comment on this cell stated that at least 200 samples are
# needed to compute standard deviations; with this few rows the baseline report
# shown further down leaves every `standard_deviation` field empty. Raise this
# value if those figures are required.
n_backtests = 14

with open(validate_dataset, "w") as baseline_file:
    baseline_file.write("date,prediction,label\n")  # header
    for step in range(1, n_backtests + 1):
        # Hold out the last `step` observations and forecast the first held-out day.
        # A dedicated name is used so the full `timeseries` from the earlier cell
        # is not overwritten.
        history = data.iloc[:-step, 0]
        y_hat = predictor.predict(ts=history, quantiles=[0.5])
        forecast_time = y_hat.index[0]
        prediction = y_hat.iloc[0, 0]
        # Look the observed value up by timestamp rather than through a formatted
        # date string; a missing date still raises KeyError.
        label = data.loc[forecast_time].iloc[0]
        baseline_file.write(f"{forecast_time:%Y-%m-%d},{prediction},{label}\n")
        print(".", end="", flush=True)
        sleep(0.5)
print()
print("listo!")

.............
listo!


In [37]:
!head validation_with_predictions.csv

date,prediction,label
2023-04-10,4618.9892578125,4570.91
2023-04-09,4629.6708984375,4570.91
2023-04-08,4642.58984375,4570.91
2023-04-07,4647.0,4570.91
2023-04-06,4657.9838867188,4570.91
2023-04-05,4677.1215820312,4587.31
2023-04-04,4683.0004882812,4603.0
2023-04-03,4691.2099609375,4646.08
2023-04-02,4724.9819335938,4646.08


#### Cargue las predicciones como un conjunto de datos de referencia

In [38]:
# S3 layout for the baselining step: input data and job results under one prefix.
baseline_prefix = f"{prefix}/baselining"
baseline_data_prefix = f"{baseline_prefix}/data"
baseline_results_prefix = f"{baseline_prefix}/results"

baseline_data_uri = "s3://" + bucket + "/" + baseline_data_prefix
baseline_results_uri = "s3://" + bucket + "/" + baseline_results_prefix
print("Baseline data uri: " + baseline_data_uri)
print("Baseline results uri: " + baseline_results_uri)

Baseline data uri: s3://pronostico-dolar-04-2023/deepar-dolar-notebook/ModelQualityMonitor-dolar/baselining/data
Baseline results uri: s3://pronostico-dolar-04-2023/deepar-dolar-notebook/ModelQualityMonitor-dolar/baselining/results


In [40]:
# Upload the local baseline CSV to the baselining data location and keep the
# value returned by the uploader for the baselining job.
baseline_dataset_uri = S3Uploader.upload(
    local_path=validate_dataset,
    desired_s3_uri=baseline_data_uri,
)
baseline_dataset_uri

's3://pronostico-dolar-04-2023/deepar-dolar-notebook/ModelQualityMonitor-dolar/baselining/data/validation_with_predictions.csv'

#### Baselining job con predicciones sobre datos de validación

In [41]:
from sagemaker.model_monitor import ModelQualityMonitor
from sagemaker.model_monitor import EndpointInput
from sagemaker.model_monitor.dataset_format import DatasetFormat

In [44]:
# Create the model quality monitoring object.
# It is configured with one ml.m4.xlarge instance, a 10 GB volume and a
# 1800-second runtime cap, and reuses the notebook's role and session.
Dolar_model_quality_monitor = ModelQualityMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m4.xlarge",
    volume_size_in_gb=10,
    max_runtime_in_seconds=1800,
    sagemaker_session=session,
)

In [45]:
# Name of the model quality baseline job, timestamped in UTC.
# datetime.utcnow() is deprecated since Python 3.12; an aware UTC timestamp
# formats to exactly the same string.
baseline_job_name = f"Dolar-model-baseline-job-{datetime.now(timezone.utc):%Y-%m-%d-%H%M}"

In [46]:
# Execute the baseline suggestion job.
# The uploaded CSV (with header) is treated as a regression problem: the
# "prediction" column holds the model output and the "label" column the ground truth.
# Results are written under the baselining results location.
job = Dolar_model_quality_monitor.suggest_baseline(
    job_name=baseline_job_name,
    baseline_dataset=baseline_dataset_uri,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    problem_type="Regression",
    inference_attribute="prediction",
    ground_truth_attribute="label",
)
# Wait for the job without streaming its logs into the notebook.
job.wait(logs=False)

INFO:sagemaker:Creating processing-job with name Dolar-model-baseline-job-2023-04-10-2045


....................................................................................!

#### Visualización del resultado del baseline_job

In [48]:
# Handle to the most recent baselining job run by the monitor.
baseline_job = Dolar_model_quality_monitor.latest_baselining_job

In [49]:
# Regression metrics from the baseline statistics, flattened and transposed so that
# each "<metric>.<field>" appears as one row.
Regression_metrics = baseline_job.baseline_statistics().body_dict["regression_metrics"]
pd.json_normalize(Regression_metrics).T


Unnamed: 0,0
mae.value,80.027492
mae.standard_deviation,
mse.value,6858.235258
mse.standard_deviation,
rmse.value,82.814463
rmse.standard_deviation,
r2.value,-3.601574
r2.standard_deviation,


In [50]:
# Constraints suggested by the baselining job, transposed to one row per metric.
pd.DataFrame(baseline_job.suggested_constraints().body_dict["regression_constraints"]).T

Unnamed: 0,threshold,comparison_operator
mae,80.027492,GreaterThanThreshold
mse,6858.235258,GreaterThanThreshold
rmse,82.814463,GreaterThanThreshold
r2,-3.601574,LessThanThreshold
