In [None]:
#|default_exp distributed.fugue

# Fugue

> Distributed fugue backend

In [None]:
#|export
import copy
from collections import namedtuple
from typing import Any, Callable, Iterable, Iterator, List, Optional

import cloudpickle
try:
    import dask.dataframe as dd
    DASK_INSTALLED = True
except ModuleNotFoundError:
    DASK_INSTALLED = False
import fugue
import fugue.api as fa
import pandas as pd
try:
    from pyspark.ml.feature import VectorAssembler
    from pyspark.sql import DataFrame as SparkDataFrame
    SPARK_INSTALLED = True
except ModuleNotFoundError:
    SPARK_INSTALLED = False
from sklearn.base import clone

from mlforecast.core import (
    DateFeature,
    Differences,
    Freq,
    LagTransforms,
    Lags,
    TimeSeries,
    _name_models,
)

In [None]:
#|exporti
# Describes a single cross-validation split: the total number of windows, the
# number of test periods per window, the step between windows and the
# zero-based index of the window being evaluated.
WindowInfo = namedtuple(
    typename='WindowInfo',
    field_names=('n_windows', 'window_size', 'step_size', 'i_window'),
)

In [None]:
#|export
class FugueMLForecast:
    """Multi backend distributed pipeline"""
    
    def __init__(
        self,
        models,
        freq: Optional[Freq] = None,
        lags: Optional[Lags] = None,
        lag_transforms: Optional[LagTransforms] = None,
        date_features: Optional[Iterable[DateFeature]] = None,
        differences: Optional[Differences] = None,
        num_threads: int = 1,
        engine = None,
    ):
        """Set up the models, the feature engineering template and the engine.

        Parameters
        ----------
        models : regressor, list of regressors or dict of str to regressor
            Models that will be trained and used to compute the forecasts.
            A single model or a list gets names derived from the class names;
            a dict is used as provided (name -> model).
        freq : str or int, optional (default=None)
            Pandas offset alias, e.g. 'D', 'W-THU' or integer denoting the frequency of the series.
        lags : list of int, optional (default=None)
            Lags of the target to use as features.
        lag_transforms : dict of int to list of functions, optional (default=None)
            Mapping of target lags to their transformations.
        date_features : list of str or callable, optional (default=None)
            Features computed from the dates. Can be pandas date attributes or functions that will take the dates as input.
        differences : list of int, optional (default=None)
            Differences to take of the target before computing the features. These are restored at the forecasting step.
        num_threads : int (default=1)
            Number of threads to use when computing the features.
        engine : fugue execution engine, optional (default=None)
            Dask Client, Spark Session, etc to use for the distributed computation.
            If None will use default depending on input type.
        """
        if isinstance(models, dict):
            # Already a name -> model mapping, keep it untouched.
            named_models = models
        else:
            # A lone model is treated as a one-element list.
            model_list = models if isinstance(models, list) else [models]
            class_names = [m.__class__.__name__ for m in model_list]
            named_models = dict(zip(_name_models(class_names), model_list))
        self.models = named_models
        # Template that every partition copies before computing its features.
        self._base_ts = TimeSeries(
            freq, lags, lag_transforms, date_features, differences, num_threads
        )
        self.engine = engine
        
    def __repr__(self) -> str:
        return (
            f'{self.__class__.__name__}(models=[{", ".join(self.models.keys())}], '
            f"freq={self._base_ts.freq}, "
            f"lag_features={list(self._base_ts.transforms.keys())}, "
            f"date_features={self._base_ts.date_features}, "
            f"num_threads={self._base_ts.num_threads}, "
            f"engine={self.engine})"
        )

    @staticmethod
    def _preprocess_partition(
        part: pd.DataFrame,
        base_ts: TimeSeries,
        id_col: str,
        time_col: str,
        target_col: str,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        window_info: Optional[WindowInfo] = None,
    ) -> List[List[Any]]:
        """Compute the features of a single partition and serialize the results.

        Parameters
        ----------
        part : pandas DataFrame
            Rows of the partition, in long format.
        base_ts : TimeSeries
            Template with the feature configuration. It is deep-copied, so the
            object passed in is not fitted.
        id_col : str
            Column that identifies each series.
        time_col : str
            Column that identifies each timestep.
        target_col : str
            Column that contains the target.
        static_features : list of str, optional (default=None)
            Names of the static features. When a validation split is produced
            these columns are excluded from it.
        dropna : bool (default=True)
            Forwarded to `TimeSeries.fit_transform`.
        keep_last_n : int, optional (default=None)
            Forwarded to `TimeSeries.fit_transform`.
        window_info : WindowInfo, optional (default=None)
            Cross-validation split to build. If None the whole partition is
            used for training and no validation set is produced.

        Returns
        -------
        result : list with a single row
            `[pickled fitted TimeSeries, pickled transformed train set, pickled validation set]`.
            The validation entry is a pickled `None` when `window_info` is None.
        """
        ts = copy.deepcopy(base_ts)
        if window_info is None:
            train = part
            valid = None
        else:
            n_windows, window_size, step_size, i_window = window_info
            if step_size is None:
                step_size = window_size
            # Number of trailing periods reserved for all windows combined.
            test_size = window_size + step_size * (n_windows - 1)
            # Periods between the end of this window's train set and the last
            # timestamp of each series.
            offset = test_size - i_window * step_size
            max_dates = part.groupby(id_col)[time_col].transform('max')
            train_ends = max_dates - offset * base_ts.freq
            valid_ends = train_ends + window_size * base_ts.freq
            train_mask = part[time_col].le(train_ends)
            valid_mask = part[time_col].gt(train_ends) & part[time_col].le(valid_ends)
            train = part[train_mask]
            valid_keep_cols = part.columns
            if static_features is not None:
                # Index.drop returns a new Index instead of mutating in place,
                # so the result has to be assigned back for the static
                # features to actually be left out of the validation set.
                valid_keep_cols = valid_keep_cols.drop(static_features)
            valid = part.loc[valid_mask, valid_keep_cols]
        transformed = ts.fit_transform(
            train,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            static_features=static_features,
            dropna=dropna,
            keep_last_n=keep_last_n,
        )
        return [[cloudpickle.dumps(ts), cloudpickle.dumps(transformed), cloudpickle.dumps(valid)]]

    @staticmethod
    def _retrieve_df(items: List[List[Any]]) -> Iterable[pd.DataFrame]:
        for _, serialized_train, _ in items:
            yield cloudpickle.loads(serialized_train)

    def _preprocess(
        self,
        data: fugue.AnyDataFrame,
        id_col: str,
        time_col: str,
        target_col: str,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        window_info: Optional[WindowInfo] = None,
    ) -> fugue.AnyDataFrame:
        """Run the per-partition feature engineering and return the train set.

        Stores `id_col`, `time_col` and `target_col` on the instance and keeps
        the serialized per-partition results in `self.partition_results`.
        The returned dataframe has the input columns followed by one double
        column per feature.
        """
        self.id_col = id_col
        self.time_col = time_col
        self.target_col = target_col
        partition_params = dict(
            base_ts=self._base_ts,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            static_features=static_features,
            dropna=dropna,
            keep_last_n=keep_last_n,
            window_info=window_info,
        )
        # One row per partition holding the pickled ts, train and valid objects.
        self.partition_results = fa.transform(
            data,
            FugueMLForecast._preprocess_partition,
            params=partition_params,
            schema='ts:binary,train:binary,valid:binary',
            engine=self.engine,
            as_fugue=True,
        )
        input_schema = str(fa.get_schema(data))
        feature_fields = [f'{feat}:double' for feat in self._base_ts.features]
        output_schema = input_schema + ',' + ','.join(feature_fields)
        train_df = fa.transform(
            self.partition_results,
            FugueMLForecast._retrieve_df,
            schema=output_schema,
            engine=self.engine,
        )
        return fa.get_native_as_df(train_df)
    
    def preprocess(
        self,
        data: fugue.AnyDataFrame,
        id_col: str,
        time_col: str,
        target_col: str,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
    ) -> fugue.AnyDataFrame:
        """Compute the features and append them to `data`.

        Parameters
        ----------
        data : dask or spark DataFrame.
            Series data in long format.
        id_col : str
            Column that identifies each series. If 'index' then the index is used.
        time_col : str
            Column that identifies each timestep, its values can be timestamps or integers.
        target_col : str
            Column that contains the target.
        static_features : list of str, optional (default=None)
            Names of the features that are static and will be repeated when forecasting.
        dropna : bool (default=True)
            Drop rows with missing values produced by the transformations.
        keep_last_n : int, optional (default=None)
            Keep only these many records from each series for the forecasting step. Can save time and memory if your features allow it.

        Returns
        -------
        result : same type as input
            data with added features.
        """
        # No window information: the full data is used as the train set.
        options = dict(
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            static_features=static_features,
            dropna=dropna,
            keep_last_n=keep_last_n,
        )
        return self._preprocess(data, **options)
    
    def _fit(
        self,
        data: fugue.AnyDataFrame,
        id_col: str,
        time_col: str,
        target_col: str,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        window_info: Optional[WindowInfo] = None,
    ) -> 'FugueMLForecast':
        """Compute the features and train every model on the distributed train set.

        The trained models are converted to / taken as their local counterparts
        and stored by name in `self.models_`.

        Parameters
        ----------
        data : dask or spark DataFrame
            Series data in long format.
        id_col : str
            Column that identifies each series.
        time_col : str
            Column that identifies each timestep.
        target_col : str
            Column that contains the target.
        static_features : list of str, optional (default=None)
            Forwarded to `_preprocess`.
        dropna : bool (default=True)
            Forwarded to `_preprocess`.
        keep_last_n : int, optional (default=None)
            Forwarded to `_preprocess`.
        window_info : WindowInfo, optional (default=None)
            Cross-validation split to train on. If None the full data is used.

        Returns
        -------
        self : FugueMLForecast
            The same object, with `models_` populated.

        Raises
        ------
        ValueError
            If a model is not one of the supported types for the backend.
        NotImplementedError
            If `data` is neither a spark nor a dask DataFrame.
        """
        prep = self._preprocess(
            data,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            static_features=static_features,
            dropna=dropna,
            keep_last_n=keep_last_n,
            window_info=window_info,
        )
        # Every column other than the id, time and target is used as a feature.
        features = [x for x in prep.columns if x not in {id_col, time_col, target_col}]
        self.models_ = {}
        if SPARK_INSTALLED and isinstance(data, SparkDataFrame):
            # The spark estimators are optional dependencies, so they're imported
            # lazily and a flag records whether each one is available.
            try:
                import lightgbm as lgb
                from synapse.ml.lightgbm import LightGBMRegressor as SynapseLGBMRegressor
                LGBM_INSTALLED = True
            except ModuleNotFoundError:
                LGBM_INSTALLED = False
            try:
                import xgboost as xgb
                from xgboost.spark import SparkXGBRegressor  # type: ignore
                XGB_INSTALLED = True
            except ModuleNotFoundError:
                XGB_INSTALLED = False

            # Pack the feature columns into a single "features" vector column.
            featurizer = VectorAssembler(inputCols=features, outputCol="features")
            train_data = featurizer.transform(prep)[target_col, "features"]
            for name, model in self.models.items():
                if LGBM_INSTALLED and isinstance(model, SynapseLGBMRegressor):
                    trained_model = model.setLabelCol(target_col).fit(train_data)
                    # Rebuild a local lightgbm Booster from the trained model's string.
                    model_str = trained_model.getNativeModel()
                    local_model = lgb.Booster(model_str=model_str)
                elif XGB_INSTALLED and isinstance(model, SparkXGBRegressor):
                    # NOTE: setParams modifies the model object passed by the user.
                    model.setParams(label_col=target_col)
                    trained_model = model.fit(train_data)
                    # Load the raw booster into a local XGBRegressor.
                    model_str = trained_model.get_booster().save_raw('ubj')
                    local_model = xgb.XGBRegressor()
                    local_model.load_model(model_str)
                else:
                    raise ValueError('Only LightGBMRegressor from SynapseML and SparkXGBRegressor are supported in spark.')
                self.models_[name] = local_model
        elif DASK_INSTALLED and isinstance(data, dd.DataFrame):
            # Same lazy-import pattern for the dask model wrappers.
            try:
                from mlforecast.distributed.models.lgb import LGBMForecast
                LGBM_INSTALLED = True
            except ModuleNotFoundError:
                LGBM_INSTALLED = False
            try:
                from mlforecast.distributed.models.xgb import XGBForecast
                XGB_INSTALLED = True
            except ModuleNotFoundError:
                XGB_INSTALLED = False
            X, y = prep[features], prep[target_col]
            for name, model in self.models.items():
                if not ((LGBM_INSTALLED and isinstance(model, LGBMForecast)) or (XGB_INSTALLED and isinstance(model, XGBForecast))):
                    raise ValueError('Models must be either LGBMForecast or XGBForecast with dask backend.')
                # Train a clone so the user's model stays unfitted; keep its `model_` attribute.
                self.models_[name] = clone(model).fit(X, y).model_
        else:
            raise NotImplementedError('Only spark and dask engines are supported.')
        return self
    
    def fit(
        self,
        data: fugue.AnyDataFrame,
        id_col: str,
        time_col: str,
        target_col: str,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
    ) -> 'FugueMLForecast':
        """Compute the features and train the models on the full data.

        Parameters
        ----------
        data : dask or spark DataFrame
            Series data in long format.
        id_col : str
            Column that identifies each series. If 'index' then the index is used.
        time_col : str
            Column that identifies each timestep, its values can be timestamps or integers.
        target_col : str
            Column that contains the target.
        static_features : list of str, optional (default=None)
            Names of the features that are static and will be repeated when forecasting.
        dropna : bool (default=True)
            Drop rows with missing values produced by the transformations.
        keep_last_n : int, optional (default=None)
            Keep only these many records from each series for the forecasting step. Can save time and memory if your features allow it.

        Returns
        -------
        self : FugueMLForecast
            Forecast object with series values and trained models.
        """
        # No window information: train on every available row.
        options = dict(
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            static_features=static_features,
            dropna=dropna,
            keep_last_n=keep_last_n,
        )
        return self._fit(data, **options)

    @staticmethod
    def _predict(
        items: List[List[Any]],
        models,
        horizon,
        dynamic_dfs,
        before_predict_callback,
        after_predict_callback,
    ) -> Iterable[pd.DataFrame]:
        """Yield the forecasts of each serialized partition.

        Each row of `items` holds the pickled fitted TimeSeries, the pickled
        train set (unused here) and the pickled validation set. When a
        validation set is present and has columns besides the id, time and
        target, it (without the target) replaces `dynamic_dfs`; the forecasts
        are then left-merged with the validation set.
        """
        for ts_blob, _, valid_blob in items:
            valid = cloudpickle.loads(valid_blob)
            ts = cloudpickle.loads(ts_blob)
            has_valid = valid is not None
            if has_valid:
                key_cols = [ts.id_col, ts.time_col, ts.target_col]
                extra_cols = valid.columns.drop(key_cols)
                if len(extra_cols) > 0:
                    # This rebinding also applies to the following rows.
                    dynamic_dfs = [valid.drop(columns=ts.target_col)]
            preds = ts.predict(
                models=models,
                horizon=horizon,
                dynamic_dfs=dynamic_dfs,
                before_predict_callback=before_predict_callback,
                after_predict_callback=after_predict_callback,
            )
            preds = preds.reset_index()
            if has_valid:
                preds = preds.merge(valid, how='left')
            yield preds

    def predict(
        self,
        horizon: int,
        dynamic_dfs: Optional[List[pd.DataFrame]] = None,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
    ) -> fugue.AnyDataFrame:
        """Forecast the next `horizon` steps of every series.

        The output schema types the id column as string and the time column as
        datetime, followed by one double column per model. The target column is
        appended only when the instance has a non-None `_n_windows` attribute.

        Parameters
        ----------
        horizon : int
            Number of periods to predict.
        dynamic_dfs : list of pandas DataFrame, optional (default=None)
            Future values of the dynamic features, e.g. prices.
        before_predict_callback : callable, optional (default=None)
            Function to call on the features before computing the predictions.
                This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure.
                The series identifier is on the index.
        after_predict_callback : callable, optional (default=None)
            Function to call on the predictions before updating the targets.
                This function will take a pandas Series with the predictions and should return another one with the same structure.
                The series identifier is on the index.

        Returns
        -------
        result : dask or spark DataFrame
            Predictions for each series and timestep, with one column per model.
        """
        model_fields = ','.join(f'{name}:double' for name in self.models.keys())
        schema_fields = [
            f'{self.id_col}:string',
            f'{self.time_col}:datetime',
            model_fields,
        ]
        if getattr(self, '_n_windows', None) is not None:
            schema_fields.append(f'{self.target_col}:double')
        predict_params = dict(
            models=self.models_,
            horizon=horizon,
            dynamic_dfs=dynamic_dfs,
            before_predict_callback=before_predict_callback,
            after_predict_callback=after_predict_callback,
        )
        forecasts = fa.transform(
            self.partition_results,
            FugueMLForecast._predict,
            params=predict_params,
            schema=','.join(schema_fields),
            engine=self.engine,
        )
        return fa.get_native_as_df(forecasts)

    def cross_validation(
        self,
        data: fugue.AnyDataFrame,
        n_windows: int,
        window_size: int,
        id_col: str,
        time_col: str,
        target_col: str,
        step_size: Optional[int] = None,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
    ) -> Iterator[fugue.AnyDataFrame]:
        """Perform time series cross validation.
        Creates `n_windows` splits where each window has `window_size` test periods,
        trains the models, computes the predictions and merges the actuals.

        This is a generator: each window is fitted and predicted only when the
        next result is requested. The models trained for each window are
        appended to `self.cv_models_`.

        Parameters
        ----------
        data : dask or spark DataFrame
            Series data in long format.
        n_windows : int
            Number of windows to evaluate.
        window_size : int
            Number of test periods in each window.
        id_col : str
            Column that identifies each series. If 'index' then the index is used.
        time_col : str
            Column that identifies each timestep, its values can be timestamps or integers.
        target_col : str
            Column that contains the target.
        step_size : int, optional (default=None)
            Step size between each cross validation window. If None it will be equal to `window_size`.
        static_features : list of str, optional (default=None)
            Names of the features that are static and will be repeated when forecasting.
        dropna : bool (default=True)
            Drop rows with missing values produced by the transformations.
        keep_last_n : int, optional (default=None)
            Keep only these many records from each series for the forecasting step. Can save time and memory if your features allow it.
        before_predict_callback : callable, optional (default=None)
            Function to call on the features before computing the predictions.
                This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure.
                The series identifier is on the index.
        after_predict_callback : callable, optional (default=None)
            Function to call on the predictions before updating the targets.
                This function will take a pandas Series with the predictions and should return another one with the same structure.
                The series identifier is on the index.

        Yields
        ------
        result : dask or spark DataFrame
            Predictions for one window with the series id, timestamp, predictions from each model and target value.
        """
        self.cv_models_ = []
        for i in range(n_windows):
            window_info = WindowInfo(n_windows, window_size, step_size, i)
            self._fit(
                data,
                id_col=id_col,
                time_col=time_col,
                target_col=target_col,
                static_features=static_features,
                dropna=dropna,
                keep_last_n=keep_last_n,
                window_info=window_info,
            )
            self.cv_models_.append(self.models_)
            # `predict` only includes the target column in its output schema
            # when `_n_windows` is set. Without it the merged actuals are
            # dropped from the result, so flag the call as a cross-validation
            # one and clear the flag afterwards so that a later plain
            # `fit` + `predict` doesn't expect a target column.
            self._n_windows = n_windows
            try:
                preds = self.predict(
                    window_size,
                    before_predict_callback=before_predict_callback,
                    after_predict_callback=after_predict_callback,
                )
            finally:
                self._n_windows = None
            yield preds

In [None]:
import warnings

from window_ops.expanding import expanding_mean
from mlforecast.utils import generate_daily_series, generate_prices_for_series

In [None]:
# Hide FutureWarning messages in the cells below.
warnings.simplefilter('ignore', FutureWarning)

In [None]:
# Sample data in long format: generated daily series with two static features,
# where `static_1` is renamed to `product_id` so prices can be joined on it.
series = (
    generate_daily_series(100, n_static_features=2, static_as_categorical=False, equal_ends=True)
    .reset_index()
    .rename(columns={'static_1': 'product_id'})
)
# Prices are generated with a horizon of 14 and joined on (product_id, ds).
prices = generate_prices_for_series(series, horizon=14)
series = series.merge(prices, on=['product_id', 'ds'], how='left')
# Store the series identifier as plain strings.
series['unique_id'] = series['unique_id'].astype(str)
series

Unnamed: 0,unique_id,ds,y,static_0,product_id,price
0,id_00,2000-10-05,3.981198,79,45,0.856289
1,id_00,2000-10-06,10.327401,79,45,0.710628
2,id_00,2000-10-07,17.657474,79,45,0.418277
3,id_00,2000-10-08,25.898790,79,45,0.363121
4,id_00,2000-10-09,34.494040,79,45,0.201515
...,...,...,...,...,...,...
26998,id_99,2001-05-10,45.340051,69,35,0.458545
26999,id_99,2001-05-11,3.022948,69,35,0.881891
27000,id_99,2001-05-12,10.131371,69,35,0.306065
27001,id_99,2001-05-13,14.572434,69,35,0.932251


## Spark

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Get or create a Spark session configured with the SynapseML package and the
# repository to resolve it from.
spark = (
    SparkSession.builder.appName("MyApp")
    .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.10.2")
    .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
    .getOrCreate()
)
# NOTE(review): imported after the session is created, presumably because the
# estimator relies on the jars configured above; confirm.
from synapse.ml.lightgbm import LightGBMRegressor as SynapseLGBMRegressor

In [None]:
# Range-partition by the series identifier into 4 partitions
# (presumably so that all the rows of a series end up in the same partition).
spark_series = spark.createDataFrame(series).repartitionByRange(4, 'unique_id')

In [None]:
from xgboost.spark import SparkXGBRegressor

In [None]:
# Two spark models; features are the first lag, the expanding mean of the first
# lag and the day of the week. No engine is given, so the default is used.
fcst = FugueMLForecast(
    [
        SynapseLGBMRegressor(),
        SparkXGBRegressor()
    ],
    freq='D',
    lags=[1],
    lag_transforms={
        1: [expanding_mean]
    },
    date_features=['dayofweek'],
)

In [None]:
# Train on the spark data, declaring which columns are static features.
fcst.fit(
    spark_series,
    id_col='unique_id',
    time_col='ds',
    target_col='y',
    static_features=['static_0', 'product_id'],
)
# Forecast 14 steps ahead, providing the prices as dynamic features, and
# collect the result as a pandas DataFrame.
fcst.predict(14, dynamic_dfs=[prices]).toPandas()

[21:02:53] task 0 got new rank 0                                    (0 + 1) / 1]
                                                                                

Unnamed: 0,unique_id,ds,LightGBMRegressor,SparkXGBRegressor
0,id_00,2001-05-15,42.227353,42.170174
1,id_00,2001-05-16,49.678239,49.665058
2,id_00,2001-05-17,1.425428,2.123648
3,id_00,2001-05-18,10.021719,10.150534
4,id_00,2001-05-19,18.167309,18.219870
...,...,...,...,...
1395,id_99,2001-05-24,43.879492,43.089096
1396,id_99,2001-05-25,1.592245,1.300191
1397,id_99,2001-05-26,8.765065,8.563540
1398,id_99,2001-05-27,15.720301,15.627042


In [None]:
# cross_validation is a generator: each `next` call fits and predicts one window.
cv_res = fcst.cross_validation(spark_series, n_windows=2, window_size=14, id_col='unique_id', time_col='ds', target_col='y')
res1 = next(cv_res)
res1.toPandas()

[21:03:12] task 0 got new rank 0                                    (0 + 1) / 1]
                                                                                

Unnamed: 0,unique_id,ds,LightGBMRegressor,SparkXGBRegressor
0,id_00,2001-04-17,41.397289,41.412704
1,id_00,2001-04-18,50.008758,49.848236
2,id_00,2001-04-19,1.868972,1.531837
3,id_00,2001-04-20,10.266771,9.750021
4,id_00,2001-04-21,18.296489,17.535915
...,...,...,...,...
1395,id_99,2001-04-26,43.910674,43.634216
1396,id_99,2001-04-27,1.955158,1.962174
1397,id_99,2001-04-28,8.864172,8.601549
1398,id_99,2001-04-29,15.983674,15.883894


In [None]:
# Second (and last) cross-validation window.
res2 = next(cv_res)
res2.toPandas()

[21:03:31] task 0 got new rank 0                                    (0 + 1) / 1]
                                                                                

Unnamed: 0,unique_id,ds,LightGBMRegressor,SparkXGBRegressor
0,id_00,2001-05-01,41.510874,41.991829
1,id_00,2001-05-02,49.580176,49.946548
2,id_00,2001-05-03,1.699062,1.733775
3,id_00,2001-05-04,10.143278,9.953071
4,id_00,2001-05-05,18.001024,17.789112
...,...,...,...,...
1395,id_99,2001-05-10,43.846853,43.703159
1396,id_99,2001-05-11,1.832246,1.999386
1397,id_99,2001-05-12,8.805302,8.618651
1398,id_99,2001-05-13,15.946444,15.728466


## Dask

In [None]:
from dask.distributed import Client

from mlforecast.distributed.models.lgb import LGBMForecast
from mlforecast.distributed.models.xgb import XGBForecast

In [None]:
# Dask client with two workers; passed below as the engine.
client = Client(n_workers=2)

In [None]:
# Partition on the series identifier (by making it the index), then move it
# back to a regular column inside every partition.
dask_series = (
    dd
    .from_pandas(series.set_index('unique_id'), npartitions=4)  # make sure we split by the series identifier
    .map_partitions(lambda df: df.reset_index())
)

In [None]:
# Same feature configuration as the spark example, using the dask model
# wrappers and the dask client as the engine.
fcst = FugueMLForecast(
    [LGBMForecast(), XGBForecast()],
    freq='D',
    lags=[1],
    lag_transforms={
        1: [expanding_mean]
    },
    date_features=['dayofweek'],
    engine=client,
)
# `fit` returns the forecast object itself; assign it to `_` to avoid displaying it.
_ = fcst.fit(dask_series, id_col='unique_id', time_col='ds', target_col='y', static_features=['static_0', 'product_id'])

In [None]:
# Forecast 14 steps ahead with the prices as dynamic features and compute the result.
fcst.predict(14, dynamic_dfs=[prices]).compute()

Unnamed: 0,unique_id,ds,LGBMForecast,XGBForecast
0,id_00,2001-05-15,42.328150,42.544678
1,id_00,2001-05-16,49.966854,50.006393
2,id_00,2001-05-17,1.614102,2.404574
3,id_00,2001-05-18,10.246828,10.144515
4,id_00,2001-05-19,18.183167,17.768204
...,...,...,...,...
345,id_99,2001-05-24,42.986614,43.913208
346,id_99,2001-05-25,1.536043,1.769517
347,id_99,2001-05-26,8.651628,8.689207
348,id_99,2001-05-27,15.467835,15.858318


In [None]:
# cross_validation is a generator: each `next` call fits and predicts one window.
cv_res = fcst.cross_validation(dask_series, n_windows=2, window_size=14, id_col='unique_id', time_col='ds', target_col='y')
res1 = next(cv_res)

In [None]:
# Compute the predictions of the first window.
res1.compute()

Unnamed: 0,unique_id,ds,LGBMForecast,XGBForecast
0,id_00,2001-04-17,41.101257,41.509098
1,id_00,2001-04-18,49.439778,49.994793
2,id_00,2001-04-19,2.209694,1.885389
3,id_00,2001-04-20,10.016894,9.791873
4,id_00,2001-04-21,18.006730,17.518440
...,...,...,...,...
345,id_99,2001-04-26,44.320700,42.248009
346,id_99,2001-04-27,2.230125,2.526252
347,id_99,2001-04-28,8.579381,8.485373
348,id_99,2001-04-29,15.496969,15.847349


In [None]:
# Second (and last) cross-validation window.
res2 = next(cv_res)

In [None]:
# Compute the predictions of the second window.
res2.compute()

Unnamed: 0,unique_id,ds,LGBMForecast,XGBForecast
0,id_00,2001-05-01,42.610164,41.107208
1,id_00,2001-05-02,50.179124,50.424072
2,id_00,2001-05-03,1.690276,1.991987
3,id_00,2001-05-04,10.159849,9.895548
4,id_00,2001-05-05,18.321141,17.538578
...,...,...,...,...
345,id_99,2001-05-10,42.920872,43.232620
346,id_99,2001-05-11,1.932801,1.821584
347,id_99,2001-05-12,8.724589,8.611847
348,id_99,2001-05-13,15.355793,15.651299
