In [70]:
import pandas as pd
import math
import json
import numpy as np
import random
from datetime import timedelta

import sagemaker
from sagemaker.sklearn.model import SKLearnModel
from sagemaker.inputs import TrainingInput
import boto3

#### Configuracion

In [37]:
np.random.seed(42)
random.seed(42)

sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
image_name = sagemaker.image_uris.retrieve("forecasting-deepar", region)
role='LabRole'
s3_bucket = 'mia-electiva3-dollar-predictor'
s3_data_path = "s3://{}/{}/data".format(s3_bucket, 'deepar-model')
s3_output_path = "s3://{}/{}/output".format(s3_bucket, 'deepar-model')

INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: 1.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


### Cargar dataset

In [88]:
# Cargar el dataset de precios por hora del dolar
df = pd.read_csv('dollar_15min_price.csv', parse_dates=['fecha'])

In [89]:
df.head()

Unnamed: 0,fecha,precio
0,2023-02-17 13:15:00,4942.824392
1,2023-02-17 13:30:00,4936.291246
2,2023-02-17 13:45:00,4931.374835
3,2023-02-17 14:00:00,4931.28081
4,2023-02-17 14:15:00,4932.116995


### Entrenamiento - DeepAR
El modelo por defecto de SageMaker para series de tiempo es DeepAR, y teniendo en cuenta los malos resultados obtenidos con otros modelos, se va a realizar el ejercicio de entrenamiento y despligue con este modelo

In [123]:
# Dividir el dataset en datos de entrenamiento y test
train_size = math.ceil(len(df) * 0.7)
train_data = df[:train_size]
test_data = df[train_size:]

##### Convierto el dataset al formato json que acepta DeepAR

In [144]:
freq = '15min' # La serie tiene registros cada 15 min
prediction_length = 1 # se predice para el siguiete periodo
context_length = 12 # 

In [145]:
start_dataset = pd.Timestamp("2023-02-17 13:15:00", freq=freq)
end_training = pd.Timestamp("2023-03-22 17:30:00", freq=freq)

  start_dataset = pd.Timestamp("2023-02-17 13:15:00", freq=freq)
  end_training = pd.Timestamp("2023-03-22 17:30:00", freq=freq)


In [146]:
df_daily_groups_train_data = train_data.groupby(pd.Grouper(key='fecha', freq='D'))
df_daily_groups_test_data = test_data.groupby(pd.Grouper(key='fecha', freq='D'))

In [147]:
training_data = [
    {
        "start": str(group.iloc[0]['fecha']),
        "target": group['precio'].values.tolist(),
    }
    for name, group in df_daily_groups_train_data if len(group) > 0
]
print(training_data[:5])

[{'start': '2023-02-17 13:15:00', 'target': [4942.8243923076925, 4936.291245535715, 4931.37483539823, 4931.280809999999, 4932.116994776119, 4934.248586627908, 4935.192755, 4934.541332307693, 4931.933092647059, 4929.72935, 4926.492103278689, 4924.257728571428, 4923.086633333333, 4922.34411724138, 4921.625366666667, 4921.032043902439]}, {'start': '2023-02-20 13:15:00', 'target': [4891.17114375, 4896.2127, 4902.821188709677, 4908.80661875, 4910.851070588235, 4912.062606666667, 4912.2477, 4912.29805, 4912.352779999999, 4913.6919, 4913.7546, 4913.925657142858, 4914.092010526316, 4914.341209090909, 4914.4217, 4914.4267, 4914.4365, 4914.4387]}, {'start': '2023-02-21 13:00:00', 'target': [4920.99255, 4918.223595789474, 4923.14296, 4926.560226582279, 4929.801058108108, 4931.065254761905, 4931.688609459459, 4932.567686086957, 4934.754388709677, 4936.6460875, 4938.0391, 4939.871590769231, 4941.231171212121, 4943.047677777778, 4944.349084444444, 4945.869313793104, 4947.432927884615, 4948.429970422

In [148]:
testing_data = [
    {
        "start": str(group.iloc[0]['fecha']),
        "target": group['precio'].values.tolist(),
    }
    for name, group in df_daily_groups_test_data if len(group) > 0
]
print(testing_data[5:])

[{'start': '2023-03-30 13:00:00', 'target': [4596.001227777778, 4590.100217808219, 4588.54894074074, 4590.902668852459, 4598.702035365854, 4603.595745569621, 4609.294361764705, 4615.057370967742, 4619.613381481481, 4621.8191109375, 4622.793826530612, 4623.508959016393, 4624.295570526316, 4624.431183098592, 4624.452813043478, 4624.570473913043, 4624.841015384615, 4625.167286842106, 4625.739255, 4626.457698113208, 4626.919]}, {'start': '2023-03-31 13:00:00', 'target': [4621.521809375, 4631.085134883721, 4638.0944, 4641.4163475000005, 4642.24272962963, 4643.2688875, 4644.150120454546, 4645.2608635593215, 4647.431914285715, 4647.8581125, 4646.368723611111, 4645.338863492064, 4644.270390384616, 4643.658, 4643.436547058824, 4643.3061638888885, 4643.8560959459455, 4644.972011111111, 4645.480949999999, 4645.946115000001, 4646.3534]}, {'start': '2023-04-03 13:00:00', 'target': [4620.9185, 4614.185533333333, 4609.4488403225805, 4606.998575, 4605.45095, 4604.862125000001, 4604.704816393442, 4604.

In [149]:
# Guardo el archivo json para ser usado luego en el entranamiento
def write_dicts_to_file(path, data):
    with open(path, "wb") as fp:
        for d in data:
            fp.write(json.dumps(d).encode("utf-8"))
            fp.write("\n".encode("utf-8"))

In [150]:
%%time
write_dicts_to_file("train.json", training_data)
write_dicts_to_file("test.json", testing_data)

CPU times: user 2.46 ms, sys: 180 µs, total: 2.64 ms
Wall time: 2.39 ms


In [151]:
s3 = boto3.resource("s3")

def copy_to_s3(local_file, s3_path, override=True):
    assert s3_path.startswith("s3://")
    split = s3_path.split("/")
    bucket = split[2]
    path = "/".join(split[3:])
    buk = s3.Bucket(bucket)

    if len(list(buk.objects.filter(Prefix=path))) > 0:
        if not override:
            print(
                "File s3://{}/{} already exists.\nSet override to upload anyway.\n".format(
                    s3_bucket, s3_path
                )
            )
            return
        else:
            print("Overwriting existing file")
    with open(local_file, "rb") as data:
        print("Uploading file to {}".format(s3_path))
        buk.put_object(Key=path, Body=data)

In [152]:
%%time
copy_to_s3("train.json", s3_data_path + "/train/train.json")
copy_to_s3("test.json", s3_data_path + "/test/test.json")

Overwriting existing file
Uploading file to s3://mia-electiva3-dollar-predictor/deepar-model/data/train/train.json
Overwriting existing file
Uploading file to s3://mia-electiva3-dollar-predictor/deepar-model/data/test/test.json
CPU times: user 28.2 ms, sys: 4.35 ms, total: 32.6 ms
Wall time: 169 ms


##### Entrenamiento del Modelo

In [153]:
estimator = sagemaker.estimator.Estimator(
    image_uri=image_name,
    sagemaker_session=sagemaker_session,
    role=role,
    instance_count=1,
    instance_type="ml.m4.xlarge",
    base_job_name="dollar-predictor",
    output_path=s3_output_path,
)

In [154]:
hyperparameters = {
    "time_freq": freq,
    "epochs": "400",
    "early_stopping_patience": "40",
    "mini_batch_size": "64",
    "learning_rate": "5E-4",
    "context_length": str(context_length),
    "prediction_length": str(prediction_length),
}

In [155]:
estimator.set_hyperparameters(**hyperparameters)

In [156]:
%%time
data_channels = {"train": "{}/train/".format(s3_data_path), "test": "{}/test/".format(s3_data_path)}

CPU times: user 5 µs, sys: 1 µs, total: 6 µs
Wall time: 7.87 µs


In [157]:
estimator.fit(inputs=data_channels, wait=True)

INFO:sagemaker:Creating training-job with name: dollar-predictor-2023-04-08-01-02-47-418


2023-04-08 01:02:47 Starting - Starting the training job......
2023-04-08 01:03:31 Starting - Preparing the instances for training.........
2023-04-08 01:05:00 Downloading - Downloading input data...
2023-04-08 01:05:25 Training - Downloading the training image.........
2023-04-08 01:07:06 Training - Training image download completed. Training in progress..[34mArguments: train[0m
  if num_device is 1 and 'dist' not in kvstore:[0m
  from collections import Mapping, MutableMapping, Sequence[0m
[34m[04/08/2023 01:07:24 INFO 140448693614400] Reading default configuration from /opt/amazon/lib/python3.8/site-packages/algorithm/resources/default-input.json: {'_kvstore': 'auto', '_num_gpus': 'auto', '_num_kv_servers': 'auto', '_tuning_objective_metric': '', 'cardinality': 'auto', 'dropout_rate': '0.10', 'early_stopping_patience': '', 'embedding_dimension': '10', 'learning_rate': '0.001', 'likelihood': 'student-t', 'mini_batch_size': '128', 'num_cells': '40', 'num_dynamic_feat': 'auto', 'n

#### Crear endpoint y predictor

In [159]:
from sagemaker.serializers import IdentitySerializer

In [160]:
# Esta clase permiti recibir 
class DeepARPredictor(sagemaker.predictor.Predictor):
    def __init__(self, *args, **kwargs):
        super().__init__(
            *args,
            # serializer=JSONSerializer(),
            serializer=IdentitySerializer(content_type="application/json"),
            **kwargs,
        )

    def predict(
        self,
        ts,
        cat=None,
        dynamic_feat=None,
        num_samples=100,
        return_samples=False,
        quantiles=["0.1", "0.5", "0.9"],
    ):
        """Requests the prediction of for the time series listed in `ts`, each with the (optional)
        corresponding category listed in `cat`.

        ts -- `pandas.Series` object, the time series to predict
        cat -- integer, the group associated to the time series (default: None)
        num_samples -- integer, number of samples to compute at prediction time (default: 100)
        return_samples -- boolean indicating whether to include samples in the response (default: False)
        quantiles -- list of strings specifying the quantiles to compute (default: ["0.1", "0.5", "0.9"])

        Return value: list of `pandas.DataFrame` objects, each containing the predictions
        """
        prediction_time = ts.index[-1] + ts.index.freq
        quantiles = [str(q) for q in quantiles]
        req = self.__encode_request(ts, cat, dynamic_feat, num_samples, return_samples, quantiles)
        res = super(DeepARPredictor, self).predict(req)
        return self.__decode_response(res, ts.index.freq, prediction_time, return_samples)

    def __encode_request(self, ts, cat, dynamic_feat, num_samples, return_samples, quantiles):
        instance = series_to_dict(
            ts, cat if cat is not None else None, dynamic_feat if dynamic_feat else None
        )

        configuration = {
            "num_samples": num_samples,
            "output_types": ["quantiles", "samples"] if return_samples else ["quantiles"],
            "quantiles": quantiles,
        }

        http_request_data = {"instances": [instance], "configuration": configuration}

        return json.dumps(http_request_data).encode("utf-8")

    def __decode_response(self, response, freq, prediction_time, return_samples):
        # we only sent one time series so we only receive one in return
        # however, if possible one will pass multiple time series as predictions will then be faster
        predictions = json.loads(response.decode("utf-8"))["predictions"][0]
        prediction_length = len(next(iter(predictions["quantiles"].values())))
        prediction_index = pd.date_range(
            start=prediction_time, freq=freq, periods=prediction_length
        )
        if return_samples:
            dict_of_samples = {"sample_" + str(i): s for i, s in enumerate(predictions["samples"])}
        else:
            dict_of_samples = {}
        return pd.DataFrame(
            data={**predictions["quantiles"], **dict_of_samples}, index=prediction_index
        )

    def set_frequency(self, freq):
        self.freq = freq


def encode_target(ts):
    return [x if np.isfinite(x) else "NaN" for x in ts]


def series_to_dict(ts, cat=None, dynamic_feat=None):
    """Given a pandas.Series object, returns a dictionary encoding the time series.

    ts -- a pands.Series object with the target time series
    cat -- an integer indicating the time series category

    Return value: a dictionary
    """
    obj = {"start": str(ts.index[0]), "target": encode_target(ts)}
    if cat is not None:
        obj["cat"] = cat
    if dynamic_feat is not None:
        obj["dynamic_feat"] = dynamic_feat
    return obj

In [162]:
predictor = estimator.deploy(
    initial_instance_count=1, instance_type="ml.m4.xlarge", predictor_cls=DeepARPredictor
)

INFO:sagemaker:Creating model with name: dollar-predictor-2023-04-08-01-11-50-254
INFO:sagemaker:Creating endpoint-config with name dollar-predictor-2023-04-08-01-11-50-254
INFO:sagemaker:Creating endpoint with name dollar-predictor-2023-04-08-01-11-50-254


--------!

In [None]:
predictor.predict(ts=df.iloc[0]).head()

### Eliminar Endpoint y Model

In [None]:
predictor.delete_model()
predictor.delete_endpoint()