In [None]:
#| default_exp distributed.fugue

In [None]:
#| hide
%load_ext autoreload
%autoreload 2

# FugueBackend

The computational efficiency of `StatsForecast` can be traced to its two core components:
1. Its `models`, written in Numba, which optimizes Python code to reach C speeds.
2. Its `core.StatsForecast` class that enables distributed computing.

Here we use [Fugue](https://github.com/fugue-project/fugue), which provides a unified interface for `Spark`, `Dask` and `Ray`.

In [None]:
#| hide
from fastcore.test import test_eq
from nbdev.showdoc import add_docs, show_doc

In [None]:
#| export
import inspect
from typing import Any, Dict, List

import fugue.api as fa
import numpy as np
import pandas as pd
from fugue import transform, DataFrame, FugueWorkflow, ExecutionEngine
from fugue.collections.yielded import Yielded
from fugue.constants import FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT
from statsforecast.core import _StatsForecast, ParallelBackend, make_backend
from triad import Schema

In [None]:
#| export
def _cotransform(
    df1: Any,
    df2: Any,
    using: Any,
    schema: Any = None,
    params: Any = None,
    partition: Any = None,
    engine: Any = None,
    engine_conf: Any = None,
    force_output_fugue_dataframe: bool = False,
    as_local: bool = False,
) -> Any:
    """Apply a two-input transformer to `df1` and `df2` zipped together.

    Both inputs are registered in one `FugueWorkflow`, zipped with
    `partition`, and handed to `using` with the same partition spec as
    `pre_partition`. The workflow is run on `engine` with `engine_conf`.

    Returns the yielded result unchanged when `force_output_fugue_dataframe`
    is set or `df1` is a Fugue `DataFrame`/`Yielded`; otherwise a pandas
    DataFrame when the result is local, else the result's native object.
    """
    workflow = FugueWorkflow(compile_conf={FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT: 0})

    left = workflow.create_data(df1)
    right = workflow.create_data(df2)
    zipped = left.zip(right, partition=partition)
    transformed = zipped.transform(
        using=using,
        schema=schema,
        params=params,
        pre_partition=partition,
    )
    transformed.yield_dataframe_as("result", as_local=as_local)
    workflow.run(engine, conf=engine_conf)

    output = workflow.yields["result"].result  # type:ignore
    keep_fugue_type = force_output_fugue_dataframe or isinstance(df1, (DataFrame, Yielded))
    if keep_fugue_type:
        return output
    if output.is_local:
        return output.as_pandas()  # type:ignore
    return output.native  # type:ignore

In [None]:
#| export
class FugueBackend(ParallelBackend):
    """FugueBackend for Distributed Computation.
    [Source code](https://github.com/Nixtla/statsforecast/blob/main/statsforecast/distributed/fugue.py).

    This class uses [Fugue](https://github.com/fugue-project/fugue) backend capable of distributing
    computation on Spark, Dask and Ray without any rewrites.

    **Parameters:**<br>
    `engine`: fugue.ExecutionEngine, a selection between Spark, Dask, and Ray.<br>
    `conf`: fugue.Config, engine configuration.<br>
    `**transform_kwargs`: additional kwargs for Fugue's transform method.<br>

    **Notes:**<br>
    A short introduction to Fugue, with examples on how to scale pandas code to Spark, Dask or Ray
     is available [here](https://fugue-tutorials.readthedocs.io/tutorials/quick_look/ten_minutes.html).
    """
    def __init__(
            self,
            engine: Any = None,
            conf: Any = None,
            **transform_kwargs: Any
        ):
        self._engine = engine
        self._conf = conf
        self._transform_kwargs = dict(transform_kwargs)

    def __getstate__(self) -> Dict[str, Any]:
        # Pickle an empty state: the engine, its configuration and the transform
        # kwargs are left out of serialized copies of the backend.
        # NOTE(review): presumably because the bound `_forecast_series*`/`_cv`
        # methods are shipped to workers and must not carry the engine - confirm.
        return {}

    def forecast(
            self,
            df,
            models,
            freq,
            fallback_model = None,
            X_df = None,
            **kwargs: Any,
        ) -> Any:
        """Memory Efficient core.StatsForecast predictions with FugueBackend.

        This method uses Fugue's transform function, in combination with
        `core.StatsForecast`'s forecast to efficiently fit a list of StatsForecast models.
        The work is partitioned by `unique_id`. When `X_df` is given, `df` and `X_df`
        are zipped on that same partition before forecasting.

        **Parameters:**<br>
        `df`: pandas.DataFrame, with columns [`unique_id`, `ds`, `y`] and exogenous.<br>
        `models`: List[typing.Any], list of instantiated objects `StatsForecast.models`.<br>
        `freq`: str, frequency of the data, [pandas available frequencies](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases).<br>
        `fallback_model`: Any, Model to be used if a model fails.<br>
        `X_df`: pandas.DataFrame, with [unique_id, ds] columns and df's future exogenous.<br>
        `**kwargs`: Additional `core.StatsForecast` parameters. Example forecast horizon `h`.<br>

        **Returns:**<br>
        `fcsts_df`: pandas.DataFrame, with `models` columns for point predictions and probabilistic
        predictions for all fitted `models`.<br>

        **References:**<br>
        For more information check the
        [Fugue's transform](https://fugue-tutorials.readthedocs.io/tutorials/beginner/transform.html)
        tutorial.<br>
        The [core.StatsForecast's forecast](https://nixtla.github.io/statsforecast/core.html#statsforecast.forecast)
        method documentation.<br>
        Or the list of available [StatsForecast's models](https://nixtla.github.io/statsforecast/src/core/models.html).
        """
        level = kwargs.get("level", [])
        schema = self._get_output_schema(df, models, level)
        # The same parameter bundle is handed to whichever worker function runs.
        worker_params = dict(models=models, freq=freq,
                             kwargs=kwargs, fallback_model=fallback_model)
        if X_df is None:
            return transform(
                df,
                self._forecast_series,
                params=worker_params,
                schema=schema,
                partition={"by": "unique_id"},
                engine=self._engine,
                engine_conf=self._conf,
                **self._transform_kwargs,
            )
        return _cotransform(
            df,
            X_df,
            self._forecast_series_X,
            params=worker_params,
            schema=schema,
            partition={"by": "unique_id"},
            engine=self._engine,
            engine_conf=self._conf,
            **self._transform_kwargs,
        )

    def cross_validation(
            self,
            df,
            models,
            freq,
            fallback_model=None,
            **kwargs: Any,
        ) -> Any:
        """Temporal Cross-Validation with core.StatsForecast and FugueBackend.

        This method uses Fugue's transform function, in combination with
        `core.StatsForecast`'s cross-validation to efficiently fit a list of StatsForecast
        models through multiple training windows, in either chained or rolled manner.

        `StatsForecast.models`' speed along with Fugue's distributed computation allow to
        overcome this evaluation technique high computational costs. Temporal cross-validation
        provides better model's generalization measurements by increasing the test's length
        and diversity.

        **Parameters:**<br>
        `df`: pandas.DataFrame, with columns [`unique_id`, `ds`, `y`] and exogenous.<br>
        `models`: List[typing.Any], list of instantiated objects `StatsForecast.models`.<br>
        `freq`: str, frequency of the data, [pandas available frequencies](https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases).<br>
        `fallback_model`: Any, Model to be used if a model fails.<br>
        `**kwargs`: Additional `core.StatsForecast` cross-validation parameters,
        for example `h`, `n_windows`, `step_size` or `level`.<br>

        **Returns:**<br>
        `fcsts_df`: pandas.DataFrame, with `models` columns for point predictions and probabilistic
        predictions for all fitted `models`.<br>

        **References:**<br>
        The [core.StatsForecast's cross validation](https://nixtla.github.io/statsforecast/core.html#statsforecast.cross_validation)
        method documentation.<br>
        [Rob J. Hyndman and George Athanasopoulos (2018). "Forecasting principles and practice, Temporal Cross-Validation"](https://otexts.com/fpp3/tscv.html).
        """
        level = kwargs.get("level", [])
        schema = self._get_output_schema(df, models, level, mode="cv")
        return transform(
            df,
            self._cv,
            params=dict(models=models, freq=freq,
                        kwargs=kwargs,
                        fallback_model=fallback_model),
            schema=schema,
            partition={"by": "unique_id"},
            engine=self._engine,
            engine_conf=self._conf,
            **self._transform_kwargs,
        )

    def _forecast_series(self, df: pd.DataFrame, models, freq, fallback_model, kwargs) -> pd.DataFrame:
        """Forecast one partition of `df` with a single-job `_StatsForecast`.

        `kwargs` is forwarded to `_StatsForecast.forecast`; the result's index
        is turned back into columns before it is returned.
        """
        model = _StatsForecast(df=df, models=models, freq=freq,
                               fallback_model=fallback_model, n_jobs=1)
        return model.forecast(**kwargs).reset_index()

    # schema: unique_id:str, ds:str, *
    def _forecast_series_X(self, df: pd.DataFrame, X_df: pd.DataFrame, models, freq, fallback_model, kwargs) -> pd.DataFrame:
        """Forecast one partition of `df` using the matching future exogenous `X_df`.

        Raises `ValueError` when the number of rows in `X_df` differs from the
        forecast horizon `kwargs['h']`.
        """
        # Validate first so no model is built for a partition that cannot be forecast.
        if len(X_df) != kwargs['h']:
            raise ValueError(
                'Please be sure that your exogenous variables `X_df` '
                'have the same length than your forecast horizon `h`'
            )
        model = _StatsForecast(df=df, models=models, freq=freq,
                               fallback_model=fallback_model, n_jobs=1)
        return model.forecast(X_df=X_df, **kwargs).reset_index()

    def _cv(self, df: pd.DataFrame, models, freq, fallback_model, kwargs) -> pd.DataFrame:
        """Cross-validate one partition of `df` with a single-job `_StatsForecast`.

        `kwargs` is forwarded to `_StatsForecast.cross_validation`; the result's
        index is turned back into columns before it is returned.
        """
        model = _StatsForecast(df=df, models=models, freq=freq,
                               fallback_model=fallback_model, n_jobs=1)
        return model.cross_validation(**kwargs).reset_index()

    def _get_output_schema(self, df, models, level=None, mode="forecast") -> Schema:
        """Build the output schema passed to Fugue for `df` and `models`.

        The schema keeps `unique_id` and `ds` from `df` and adds one float32
        column per model, named by `repr(model)`. For models whose `forecast`
        accepts a `level` argument, and only when `level` is non-empty, it also
        adds `-lo-<level>` columns (levels in reverse order) followed by
        `-hi-<level>` columns. With `mode="cv"`, a `cutoff` column (same type
        as `ds`) and a float32 `y` column are placed before the model columns.
        """
        keep_schema = fa.get_schema(df).extract(['unique_id', 'ds'])
        cols: List[Any] = []
        if level is None:
            level = []
        for model in models:
            has_levels = (
                "level" in inspect.signature(getattr(model, "forecast")).parameters
                and len(level) > 0
            )
            # The model's repr is the column prefix; compute it once per model.
            name = repr(model)
            cols.append((name, np.float32))
            if has_levels:
                cols.extend((f"{name}-lo-{lv}", np.float32) for lv in reversed(level))
                cols.extend((f"{name}-hi-{lv}", np.float32) for lv in level)
        if mode == "cv":
            cols = [("cutoff", keep_schema['ds'].type), ("y", np.float32)] + cols
        return Schema(keep_schema) + Schema(cols)


@make_backend.candidate(lambda obj, *args, **kwargs: isinstance(obj, ExecutionEngine))
def _make_fugue_backend(obj:ExecutionEngine, *args, **kwargs) -> ParallelBackend:
    """Wrap a Fugue `ExecutionEngine` in a `FugueBackend`.

    Registered through `make_backend.candidate` with a predicate that accepts
    `ExecutionEngine` instances. Positional `*args` are accepted but not
    forwarded; `**kwargs` are passed on to `FugueBackend`.
    """
    backend = FugueBackend(obj, **kwargs)
    return backend

In [None]:
from statsforecast.core import StatsForecast
from statsforecast.models import ( 
    AutoARIMA,
    AutoETS,
)
from statsforecast.utils import generate_series

In [None]:
# Size of the synthetic panel and the forecast horizon used by the demo cells.
n_series, horizon = 4, 7
series = generate_series(n_series)

# One AutoETS model on daily data.
sf = StatsForecast(models=[AutoETS(season_length=7)], freq='D')

# Cross-validation on the generated series; only the first rows are shown.
sf.cross_validation(
    df=series,
    h=horizon,
    step_size=24,
    n_windows=2,
    level=[90],
).head()

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.getOrCreate()

# Make unique_id a string column.
# The result gets its own name so `series` is not rebound to a transformed
# version of itself and re-running this cell starts from the same input.
series_for_spark = series.reset_index()
series_for_spark['unique_id'] = series_for_spark['unique_id'].astype(str)

# Convert to Spark
sdf = spark.createDataFrame(series_for_spark)

# Returns a Spark DataFrame
sf.cross_validation(
    df=sdf,
    h=horizon,
    step_size=24,
    n_windows=2,
    level=[90],
).show()

In [None]:
# Show the FugueBackend documentation under a level-3 title.
show_doc(FugueBackend, title_level=3)

## Dask Distributed Predictions

Here we provide an example for the distribution of the `StatsForecast` predictions using `Fugue` to execute the code in a Dask cluster.

To do it we instantiate the `FugueBackend` class with a `DaskExecutionEngine`.

In [None]:
#| eval: false
import dask.dataframe as dd
from dask.distributed import Client
from fugue_dask import DaskExecutionEngine
from statsforecast import StatsForecast
from statsforecast.models import Naive
from statsforecast.utils import generate_series

In [None]:
#| eval: false
# Not evaluated automatically, like the import cell above that provides
# `dd`, `Client` and `DaskExecutionEngine`.

# Generate Synthetic Panel Data
df = generate_series(10).reset_index()
df['unique_id'] = df['unique_id'].astype(str)
df = dd.from_pandas(df, npartitions=10)

# Create a Dask client and a DaskExecutionEngine bound to it
dask_client = Client()
engine = DaskExecutionEngine(dask_client=dask_client)

We simply instantiate the `StatsForecast` class as usual.

In [None]:
#| eval: false
# A single Naive model on daily data.
sf = StatsForecast(
    models=[Naive()],
    freq='D',
)

### Distributed Forecast

For extremely fast distributed predictions we use `FugueBackend` as the backend, which operates like the original [StatsForecast.forecast](https://nixtla.github.io/statsforecast/core.html#statsforecast.forecast) method.

It receives as input a pandas.DataFrame with columns [`unique_id`,`ds`,`y`] and exogenous, where the `ds` (datestamp) column should be of a format expected by Pandas. The `y` column must be numeric, and represents the measurement we wish to forecast. And the `unique_id` uniquely identifies the series in the panel data.

In [None]:
#| eval: false

# Distributed predictions with FugueBackend.
# `.compute()` materializes the distributed result for display.
sf.forecast(
    df=df,
    h=12,
).compute()

In [None]:
#| hide
#| eval: false
# fallback model
class FailNaive:
    """Model stub that reports itself as 'Naive' but whose `forecast` accepts no data arguments."""
    def forecast(self):
        pass
    def __repr__(self):
        return 'Naive'

# Use the stub as the only model so the forecasts must come from `fallback_model`,
# matching the cross-validation fallback tests further down.
sf = StatsForecast(models=[FailNaive()], freq='D', fallback_model=Naive())
dask_fcst = sf.forecast(df=df, h=12).compute()
fcst_stats = sf.forecast(df=df.compute(), h=12)
test_eq(dask_fcst.sort_values(by=['unique_id', 'ds']).reset_index(drop=True).astype({"unique_id": str}),
        fcst_stats.reset_index().astype({"unique_id": str}))

In [None]:
#| hide
#| eval: false

# Distributed exogenous regressors
class ReturnX:
    """Test model that echoes the exogenous values it is given."""

    def __init__(self):
        pass

    def fit(self, y, X):
        """Do nothing and return the model itself."""
        return self

    def predict(self, h, X):
        """Return `X` unchanged; `h` is ignored."""
        return X

    def __repr__(self):
        return 'ReturnX'

    def forecast(self, y, h, X=None, X_future=None, fitted=False):
        """Return the flattened `X_future` under the 'mean' key."""
        return {'mean': X_future.flatten()}

    def new(self):
        """Return a new instance of the same type sharing a copy of the attribute dict."""
        b = type(self).__new__(type(self))
        b.__dict__.update(self.__dict__)
        return b
    
# Two series (ids 0 and 1) with ten steps each; `x` is the exogenous regressor.
# The target comes from a seeded generator so the fixture is identical on every run.
df_w_ex = pd.DataFrame(
    {
        'ds': np.hstack([np.arange(10), np.arange(10)]),
        'y': np.random.default_rng(0).random(20),
        'x': np.arange(20, dtype=np.float32),
    },
    index=pd.Index([0] * 10 + [1] * 10, name='unique_id'),
).reset_index()

# Steps before 6 are used for training; the rest supply the future regressors.
train_mask = df_w_ex['ds'] < 6
train_df = dd.from_pandas(df_w_ex[train_mask], npartitions=10)
test_df = df_w_ex[~train_mask]
xreg = dd.from_pandas(test_df.drop(columns='y').reset_index(drop=True), npartitions=10)

In [None]:
#| hide
#| eval: false

# Distributed exogenous regressors
fcst_x = StatsForecast(models=[ReturnX()], freq='D')
res = fcst_x.forecast(df=train_df, X_df=xreg, h=4).compute()

# The expected frame is `xreg` with its `x` column named after the model.
expected_res = xreg.rename(columns={'x': 'ReturnX'}).compute()
# we expect strings for unique_id, and ds using exogenous
expected_res[['unique_id', 'ds']] = expected_res[['unique_id', 'ds']].astype(str)

pd.testing.assert_frame_equal(
    res.sort_values('unique_id').reset_index(drop=True),
    expected_res,
    check_dtype=False,
    check_index_type=False,
)

### Distributed Cross-Validation

For extremely fast distributed temporal cross-validation we use the `cross_validation` method, which operates like the original [StatsForecast.cross_validation](https://nixtla.github.io/statsforecast/core.html#statsforecast) method.

In [None]:
#| eval: false

# Distributed cross-validation with FugueBackend.
# `.compute()` materializes the distributed result for display.
sf.cross_validation(
    df=df,
    h=12,
    n_windows=2,
).compute()

In [None]:
#| hide
#| eval: false
from statsforecast.models import Naive
from statsforecast.utils import generate_series

In [None]:
#| hide
#| eval: false
# Generate Synthetic Panel Data.
df = generate_series(10).reset_index()
df['unique_id'] = df['unique_id'].astype(str)
df = dd.from_pandas(df, npartitions=10)

# Distribute predictions.
sf = StatsForecast(models=[Naive()], freq='D')
fcst_fugue = (
    sf.forecast(df=df, h=12)
    .compute()
    .sort_values(['unique_id', 'ds'])
    .reset_index(drop=True)
)
fcst_stats = (
    sf.forecast(df=df.compute(), h=12)
    .reset_index()
    .astype({"unique_id": str})
)
test_eq(fcst_fugue, fcst_stats)

# Distribute cross-validation predictions.
sf = StatsForecast(models=[Naive()], freq='D')
fcst_fugue = (
    sf.cross_validation(df=df, h=12)
    .compute()
    .sort_values(['unique_id', 'ds', 'cutoff'])
    .reset_index(drop=True)
)
fcst_stats = (
    sf.cross_validation(df=df.compute(), h=12)
    .reset_index()
    .astype({"unique_id": str})
)
test_eq(fcst_fugue, fcst_stats)

# fallback model
class FailNaive:
    """Model stub that reports itself as 'Naive' but whose `forecast` accepts no data arguments."""
    def forecast(self):
        pass
    def __repr__(self):
        return 'Naive'

#cross validation fallback model
# The distributed run uses the stub plus a Naive fallback; the reference run uses plain Naive (`sf`).
fcst = StatsForecast(models=[FailNaive()], freq='D', fallback_model=Naive())
fcst_fugue = (
    fcst.cross_validation(df=df, h=12)
    .compute()
    .sort_values(['unique_id', 'ds', 'cutoff'])
    .reset_index(drop=True)
)
fcst_stats = (
    sf.cross_validation(df=df.compute(), h=12)
    .reset_index()
    .astype({"unique_id": str})
)
test_eq(fcst_fugue, fcst_stats)

In [None]:
#| hide
#| eval: false
# test ray integration
import ray
from statsforecast.models import Naive
from statsforecast.utils import generate_series

In [None]:
#| hide
#| eval: false
# Generate Synthetic Panel Data.
df = generate_series(10).reset_index()
df['unique_id'] = df['unique_id'].astype(str)
df = ray.data.from_pandas(df).repartition(2)

# Distribute predictions.
sf = StatsForecast(models=[Naive()], freq='D')
fcst_fugue = (
    sf.forecast(df=df, h=12)
    .to_pandas()
    .sort_values(['unique_id', 'ds'])
    .reset_index(drop=True)
)
fcst_stats = (
    sf.forecast(df=df.to_pandas(), h=12)
    .reset_index()
    .astype({"unique_id": str})
)
test_eq(fcst_fugue, fcst_stats)

# Distribute cross-validation predictions.
fcst = StatsForecast(models=[Naive()], freq='D')
fcst_fugue = (
    fcst.cross_validation(df=df, h=12)
    .to_pandas()
    .sort_values(['unique_id', 'ds', 'cutoff'])
    .reset_index(drop=True)
)
fcst_stats = (
    sf.cross_validation(df=df.to_pandas(), h=12)
    .reset_index()
    .astype({"unique_id": str})
)
test_eq(fcst_fugue, fcst_stats)

# fallback model
class FailNaive:
    """Model stub that reports itself as 'Naive' but whose `forecast` accepts no data arguments."""
    def forecast(self):
        pass
    def __repr__(self):
        return 'Naive'

#cross validation fallback model
# Both the distributed and the reference run below use the stub with a Naive fallback.
sf = StatsForecast(models=[FailNaive()], freq='D', fallback_model=Naive())
fcst_fugue = (
    sf.cross_validation(df=df, h=12)
    .to_pandas()
    .sort_values(['unique_id', 'ds', 'cutoff'])
    .reset_index(drop=True)
)
fcst_stats = (
    sf.cross_validation(df=df.to_pandas(), h=12)
    .reset_index()
    .astype({"unique_id": str})
)
test_eq(fcst_fugue, fcst_stats)

In [None]:
#| hide
#| eval: false

# Distributed exogenous regressors
sf = StatsForecast(models=[ReturnX()], freq='D')
res = sf.forecast(df=train_df, X_df=xreg, h=4).compute()

# The expected frame is the materialized `xreg` with its `x` column named after the model.
expected_res = xreg.compute().rename(columns={'x': 'ReturnX'})
# we expect strings for unique_id, and ds using exogenous
expected_res[['unique_id', 'ds']] = expected_res[['unique_id', 'ds']].astype(str)

pd.testing.assert_frame_equal(
    res.sort_values('unique_id').reset_index(drop=True),
    expected_res,
    check_dtype=False,
    check_index_type=False,
)