In [1]:
# Make the local `mlapi` checkout importable from this notebook.
# The location can be overridden with the MLAPI_PATH environment variable;
# when it is unset the original hard-coded location is used.

import os
import sys

MLAPI_PATH = os.environ.get('MLAPI_PATH', '/home/jovyan/mlapi')

# Guarded so that re-running the cell does not append the path twice.
if MLAPI_PATH not in sys.path:
    sys.path.append(MLAPI_PATH)

In [2]:
import numpy as np

In [3]:
import requests

In [4]:
%load_ext autoreload
%autoreload 2

# JSON request

In [5]:
# Placeholder forecaster request used for manual experiments.
# NOTE(review): these keys ('name', 'dataset_name') differ from the fields
# of the `Forecaster` model declared further down — confirm it is still used.

forecaster_data = dict(
    name='sample',
    algorithm='seq2seq',
    forecast_horizon=10,
    perform_hpo=False,
    dataset_name='sample',
)

# ParquetLoader

In [6]:
from mlapi.celery_app.ml.datasources.parquet_loader import ParquetLoader, S3ParquetLoader
from mlapi.celery_app.client_args import ClientArgs
#from mlapi.main import Forecaster
#from minio import Minio

In [7]:
# Connection and authentication settings for the S3 endpoint.
# NOTE(review): the credentials are hard-coded in the notebook; consider
# reading them from the environment before sharing it.

client_args = ClientArgs(
    s3_endpoint='minio:9000',
    access_key='user',
    secret_key='password',
    secure=False,
)

In [8]:
# Loader for parquet datasets stored in S3-compatible (MinIO) buckets,
# configured with the `client_args` built in the previous cell.
s3_parquet_loader = S3ParquetLoader(client_args)

In [11]:
# Load from the 'datasets' bucket using the prefix parts in `args` and keep
# the entry stored under the 'target' key.
# NOTE(review): presumably `args` maps to <dataset group>/<dataset name>
# inside the bucket — confirm against `load_many`.
args = ['sample_group', 'X_train']
t = s3_parquet_loader.load_many('datasets', *args)['target']

# ParquetMerger

In [24]:
from mlapi.ml._parquet_mergers import TimeSeriesMerger

In [25]:
# NOTE(review): in the cells visible here `parquet_loader` is only assigned
# later (In [28], a plain `ParquetLoader()`), so this cell relies on hidden
# kernel state and is expected to fail on a fresh Restart & Run All.
# Confirm which loader is meant to provide `resolve_datasets`.
datasets = parquet_loader.resolve_datasets()
# The resolved datasets are passed to the merger as keyword arguments.
merger = TimeSeriesMerger(**datasets)

In [26]:
# Merge the datasets handed to `TimeSeriesMerger`; the result is kept as `X`.
X = merger.merge()

In [30]:
# Display the column names grouped by feature role (see the output below).
merger.get_names()

{'target': ['target'],
 'time_varying_known_reals': [],
 'time_varying_unknown_reals': [],
 'static_categoricals': []}

# PreprocessorCreator

In [13]:
from mlapi.ml._preprocessor import PreprocessorCreator

In [14]:
# Build the preprocessor from the merger's group ids and the timestamp name.
group_ids = merger.get_group_ids()
# Presumably the name of the timestamp column in the merged data — confirm.
timestamp = 'timestamp'
preprocessor_creator = PreprocessorCreator(group_ids, timestamp)
preprocessor = preprocessor_creator.create_preprocessor()

# EstimatorCreator

In [31]:
from mlapi.ml._estimator import EstimatorCreator

In [44]:
# NOTE(review): `predictor` is not defined in any cell visible here; this
# cell depends on hidden kernel state and needs that definition restored.
estimator_creator = EstimatorCreator(predictor)

In [45]:
# Feature names grouped by role; unpacked as keyword arguments for
# `create_estimator` in the next cell.
features_time_dependence = merger.get_names()

In [47]:
# Create the estimator from the group ids plus the role -> column names
# mapping, which is expanded into keyword arguments.
estimator = estimator_creator.create_estimator(group_ids, **features_time_dependence)

# Forecasting task

In [27]:
from mlapi.celery_app.ml.datasources.parquet_loader import S3ParquetLoader, ParquetLoader
from mlapi.celery_app.ml.estimator import EstimatorCreator
from mlapi.celery_app.ml.parquet_resolver import TimeSeriesResolver
from mlapi.celery_app.ml.preprocessor import PreprocessorCreator
from mlapi.celery_app.client_args import ClientArgs
from mlapi.celery_app.ml.utils.data import AttrDict
from mlapi.celery_app.ml.utils.pandas import duplicate_pandas_column
from sklearn.pipeline import Pipeline

In [8]:
from pydantic import BaseModel
from typing import Optional


class User(BaseModel):
    """User record with profile fields and optional S3 connection settings.

    Only ``username`` is required; every other field defaults to ``None``.
    """
    # Required identifier.
    username: str
    # Optional profile fields.
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None
    # Optional S3 connection settings; attributes with these names are read
    # from the user object in `CreateForecasterTask._resolve_dataset`.
    access_key: Optional[str] = None
    secret_key: Optional[str] = None
    s3_endpoint: Optional[str] = None
    

class Forecaster(BaseModel):
    """Parameters describing a forecaster training task.

    All fields are required.
    """
    task_name: str
    # `dataset_group_name` and `dataset_name` form the prefix used to load
    # the training data in `CreateForecasterTask._resolve_dataset`.
    dataset_group_name: str
    dataset_name: str
    algorithm: str
    forecast_horizon: int
    # Presumably toggles hyper-parameter optimisation — TODO confirm.
    perform_hpo: bool

In [17]:
class CreateForecasterTask:
    """Loads a time series dataset from S3, fits a preprocessing +
    estimator pipeline on it and logs metrics and the fitted model.

    Attributes
    ----------
    METRICS : list of str
        Names of the history metrics saved after fitting.

    DATASETS_BUCKET : str
        Bucket used both to load the training datasets and to build the
        inference artifacts location.
    """

    METRICS = ['train_loss']
    DATASETS_BUCKET = 'datasets'

    def run(self, forecaster_data, user_data):
        """Runs the whole task: load, fit, log.

        Parameters
        ----------
        forecaster_data : mapping
            Wrapped in an :class:`AttrDict`. The attributes
            ``dataset_group_name`` and ``dataset_name`` are read from it here;
            the whole object is also handed to :class:`EstimatorCreator`.

        user_data : mapping
            Wrapped in an :class:`AttrDict`. The attributes ``s3_endpoint``,
            ``access_key`` and ``secret_key`` are read from it.

        Returns
        -------
        None
        """
        forecaster = AttrDict(forecaster_data)
        user = AttrDict(user_data)

        # Load data.
        resolved = self._resolve_dataset(forecaster, user)
        X = resolved['X']
        group_ids = resolved['group_ids']
        timestamp = resolved['timestamp']
        target = 'target'

        # Create both preprocessor and estimator.
        preprocessor = self._create_preprocessor(group_ids, target, timestamp)
        estimator = self._create_estimator(forecaster, group_ids)

        # Put everything inside a sklearn Pipeline and fit.
        pipeline = self._fit_pipeline(X, preprocessor, estimator)

        # NOTE(review): `MlFlowLogger`, `get_history`, `wrap_pipeline` and
        # `infer_signature` are not imported in any cell visible in this
        # notebook; they must be in scope before calling `run`.

        # Save metrics. The fitted estimator step is the same for every
        # metric, so it is looked up once outside the loop.
        logger = MlFlowLogger()
        fitted_estimator = pipeline['estimator']
        for metric in self.METRICS:
            history = get_history(fitted_estimator, metric)
            logger.save_metric(name=metric, values=history)

        # Save the model with a signature that defines the schema of
        # the model's inputs and outputs. When the model is deployed, this
        # signature will be used to validate inputs.
        wrapped_pipeline = wrap_pipeline(pipeline)
        signature = infer_signature(X, wrapped_pipeline.predict(None, X))
        logger.save_python_model(
            name='pipeline', python_model=wrapped_pipeline,
            signature=signature,
            artifacts=self._create_inference_artifacts(forecaster))

        # Log all
        logger.log_all()

    def _fit_pipeline(self, X, preprocessor, estimator):
        """Collects both `preprocessor` and `estimator` into a single
        :class:`sklearn.pipeline.Pipeline` object and fits X.

        Returns
        -------
        pipeline : sklearn.pipeline.Pipeline
            Fitted pipeline with steps named 'preprocessor' and 'estimator'.
        """
        steps = [('preprocessor', preprocessor), ('estimator', estimator)]
        pipeline = Pipeline(steps)
        pipeline.fit(X)
        return pipeline

    def _create_estimator(self, forecaster, group_ids, callbacks=None):
        """Creates time series estimator.

        The target column is fixed to 'target' and is also the only time
        varying unknown real; no known reals or static categoricals are
        passed. The time index column is 'time_index'.
        """
        estimator_creator = EstimatorCreator(forecaster)
        target = 'target'
        time_varying_unknown_reals = [target]
        time_varying_known_reals = []
        static_categoricals = []

        estimator = estimator_creator.create_estimator(
            group_ids, target, time_varying_known_reals,
            time_varying_unknown_reals, static_categoricals,
            callbacks=callbacks, time_idx='time_index')
        return estimator

    def _create_preprocessor(self, group_ids, target, timestamp):
        """Creates sklearn preprocessor.
        """
        preprocessor_creator = PreprocessorCreator(
            group_ids, target, timestamp)
        return preprocessor_creator.create_preprocessor()

    def _resolve_dataset(self, forecaster, user):
        """Loads the parquet datasets from S3 and calls :meth:`resolve`
        from :class:`TimeSeriesResolver` on them.

        Returns
        -------
        dict : str -> obj
            :meth:`run` reads the keys 'X', 'group_ids' and 'timestamp'.
        """
        client_args = ClientArgs(
            s3_endpoint=user.s3_endpoint,
            access_key=user.access_key,
            secret_key=user.secret_key,
            secure=False)
        parquet_loader = S3ParquetLoader(client_args)

        # Parquet loading.
        prefix = [forecaster.dataset_group_name, forecaster.dataset_name]
        datasets = parquet_loader.load_many(self.DATASETS_BUCKET, *prefix)

        timeseries_resolver = TimeSeriesResolver(**datasets)
        return timeseries_resolver.resolve()

    def _create_inference_artifacts(self, forecaster):
        """Builds the artifacts mapping saved alongside the model.

        The bucket comes from ``DATASETS_BUCKET`` so that the inference
        location always matches the bucket the training data is loaded from.

        Returns
        -------
        dict : str -> str
            Single entry 'inference' pointing to
            ``s3://<bucket>/<dataset_group_name>/inference/``.
        """
        inference_path = "s3://{}/{}/inference/".format(
            self.DATASETS_BUCKET, forecaster.dataset_group_name)
        return {"inference": inference_path}

In [18]:
# Sample user with the S3 settings needed to reach the datasets bucket.
# NOTE(review): `hashed_password` is not a declared field of `User`;
# pydantic's default configuration presumably ignores it — confirm or drop.
user_kwargs = dict(
    username="johndoe",
    full_name="John Doe",
    email="johndoe@example.com",
    hashed_password="$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
    access_key="johndoe",
    secret_key="password",
    s3_endpoint="minio:9000",
)
user = User(**user_kwargs)

# Sample training request for the seq2seq algorithm.
forecaster_kwargs = dict(
    task_name='seq2seq_training',
    dataset_group_name='sample_group',
    dataset_name='X_train',
    algorithm='seq2seq',
    forecast_horizon=10,
    perform_hpo=False,
)
forecaster = Forecaster(**forecaster_kwargs)

In [28]:
# Loader created without S3 client arguments; the next cell uses it to read
# a parquet partition from a filesystem path.
parquet_loader = ParquetLoader()

In [31]:
import os

inference_dataset_name = 'X_test'
# NOTE(review): hard-coded temporary directory; it will not exist on a fresh
# run and has to be replaced with the current artifacts location.
abs_path = '/tmp/tmpzxx64kx2/artifacts/.'
# Build "<artifacts dir>/<dataset name>". `normpath` drops the trailing "/."
# so only that component is affected; the previous `str.replace('.', ...)`
# would also rewrite any other dot appearing in the path.
parquet_partition = os.path.join(
    os.path.normpath(abs_path), inference_dataset_name)
parquet_loader.load_many(parquet_partition)

{'target': <pyarrow.parquet._ParquetDatasetV2 at 0xffff2a029fd0>}