In [None]:
#| default_exp distributed.fugue

In [None]:
#| hide
%load_ext autoreload
%autoreload 2

# Fugue Backend

> Distribute time series forecasting using Fugue

[Fugue](https://github.com/fugue-project/fugue) is a unified interface for distributed computing. This backend lets `StatsForecast` distribute forecasting and cross validation of time series across `Dask` and `Spark`.

In [None]:
#| hide
from fastcore.test import test_eq
from nbdev.showdoc import add_docs, show_doc

In [None]:
#| export
from typing import Any, Dict

import numpy as np
import pandas as pd
# fugue is required by this module: re-raise a missing-module error with an
# installation hint appended to the original message.
try:
    from fugue import transform
except ModuleNotFoundError as e:
    msg = (
        f'{e}. To use fugue you have to install it. '
        'Please run `pip install fugue`.'
    )
    raise ModuleNotFoundError(msg) from e
from statsforecast.core import StatsForecast
from statsforecast.distributed.core import ParallelBackend
from triad import Schema

In [None]:
#| export
class FugueBackend(ParallelBackend):
    """Backend that runs `StatsForecast` per `unique_id` partition through fugue's `transform`.

    The engine, its configuration and any extra keyword arguments given at
    construction time are forwarded to every `transform` call.
    """

    def __init__(
        self,
        engine: Any = None,  # Fugue engine
        conf: Any = None,  # Engine configuration
        **transform_kwargs: Any  # Additional kwargs to pass to fugue's `transform`
    ):
        self._engine = engine
        self._conf = conf
        # Keep a shallow copy of the extra keyword arguments.
        self._transform_kwargs = {**transform_kwargs}

    def __getstate__(self) -> Dict[str, Any]:
        """Return an empty state so none of the instance attributes are pickled."""
        # NOTE(review): presumably this keeps the engine out of the payload when the
        # bound worker methods are serialized — confirm against fugue's behaviour.
        return dict()

    def forecast(
        self,
        df,  # DataFrame with columns `unique_id`, `ds`, `y`, and exogenous variables
        models,  # List of instantiated models (`statsforecast.models`)
        freq,  # Frequency of the data
        **kwargs: Any,
    ) -> Any:
        """Forecast by applying `_forecast_series` to each `unique_id` partition of `df`.

        Extra keyword arguments are passed on to `StatsForecast.forecast`.
        """
        model_fields = self._get_output_schema(models)
        call_params = {"models": models, "freq": freq, "kwargs": kwargs}
        return transform(
            df,
            self._forecast_series,
            params=call_params,
            # Schema expression: "*-y+" followed by one float32 field per model.
            schema=f"*-y+{model_fields!s}",
            partition=dict(by="unique_id"),
            engine=self._engine,
            engine_conf=self._conf,
            **self._transform_kwargs,
        )

    def cross_validation(
        self,
        df,  # DataFrame with columns `unique_id`, `ds`, `y`, and exogenous variables
        models,  # List of instantiated models (`statsforecast.models`)
        freq,  # Frequency of the data
        **kwargs: Any,
    ) -> Any:
        """Cross-validate by applying `_cv` to each `unique_id` partition of `df`.

        Extra keyword arguments are passed on to `StatsForecast.cross_validation`.
        """
        cv_fields = self._get_output_schema(models, mode="cv")
        call_params = {"models": models, "freq": freq, "kwargs": kwargs}
        return transform(
            df,
            self._cv,
            params=call_params,
            # Schema expression: "*-y+" followed by `cutoff`, `y` and the model fields.
            schema=f"*-y+{cv_fields!s}",
            partition=dict(by="unique_id"),
            engine=self._engine,
            engine_conf=self._conf,
            **self._transform_kwargs,
        )

    def _forecast_series(self, df: pd.DataFrame, models, freq, kwargs) -> pd.DataFrame:
        """Forecast the series in one partition with a single-job `StatsForecast`."""
        indexed = df.set_index("unique_id")
        sf = StatsForecast(df=indexed, models=models, freq=freq, n_jobs=1)
        predictions = sf.forecast(**kwargs)
        # Move the index back into a regular column before returning.
        return predictions.reset_index()

    def _cv(self, df: pd.DataFrame, models, freq, kwargs) -> pd.DataFrame:
        """Cross-validate the series in one partition with a single-job `StatsForecast`."""
        indexed = df.set_index("unique_id")
        sf = StatsForecast(df=indexed, models=models, freq=freq, n_jobs=1)
        cv_results = sf.cross_validation(**kwargs)
        # Move the index back into a regular column before returning.
        return cv_results.reset_index()

    def _get_output_schema(self, models, mode="forecast") -> Schema:
        """Build the schema of the added output columns.

        Every model contributes one float32 field named `repr(model)`; in "cv"
        mode the fields `cutoff` (datetime) and `y` (float32) come first.
        """
        fields = []
        if mode == "cv":
            fields.extend([("cutoff", "datetime"), ("y", np.float32)])
        for model in models:
            fields.append((repr(model), np.float32))
        return Schema(fields)

In [None]:
#| hide
# Register the class-level and per-method documentation strings for `FugueBackend`
# (`forecast` and `cross_validation`) before the `show_doc` cells below.
add_docs(
    FugueBackend, 'Fugue Backend',
    forecast='Forecast using fugue as backend',
    cross_validation='Perform cross validation using fugue as backend'
)

In [None]:
show_doc(FugueBackend)

In [None]:
show_doc(FugueBackend.forecast)

In [None]:
show_doc(FugueBackend.cross_validation)

## Dask

In [None]:
#| eval: false
from dask.distributed import Client
from fugue_dask import DaskExecutionEngine
from statsforecast.models import Naive
from statsforecast.utils import generate_series

# Example data from `generate_series`, with `unique_id` as a regular string column.
df = (
    generate_series(10)
    .reset_index()
    .assign(unique_id=lambda frame: frame['unique_id'].astype(str))
)

# Dask client created with default arguments, wrapped in a fugue execution engine.
dask_client = Client()
engine = DaskExecutionEngine(dask_client=dask_client)
# `as_local=True` is not a `FugueBackend` option: it is forwarded to fugue's `transform`.
fcst = FugueBackend(engine=engine, as_local=True)

### Forecast

In [None]:
#| eval: false
fcst.forecast(df, models=[Naive()], freq='D', h=12)

### Cross Validation

In [None]:
#| eval: false
fcst.cross_validation(df, models=[Naive()], freq='D', h=12, n_windows=2)

In [None]:
#| hide
#| eval: false
from statsforecast.models import Naive
from statsforecast.utils import generate_series

# Example data from `generate_series`, with `unique_id` as a regular string column.
df = generate_series(10).reset_index()
df['unique_id'] = df['unique_id'].astype(str)

# No engine or configuration given: `transform` receives `engine=None`.
# NOTE(review): presumably fugue then falls back to its default local engine — confirm.
backend = FugueBackend()
# Forecast: the fugue result must equal the plain `StatsForecast` result.
fcst_fugue = backend.forecast(df, models=[Naive()], freq='D', h=12)
fcst_stats = StatsForecast(models=[Naive()], freq='D').forecast(df=df, h=12)
test_eq(fcst_fugue, fcst_stats.reset_index())

# Cross validation: same comparison for `cross_validation`.
fcst_fugue = backend.cross_validation(df, models=[Naive()], freq='D', h=12)
fcst_stats = StatsForecast(models=[Naive()], freq='D').cross_validation(df=df, h=12)
test_eq(fcst_fugue, fcst_stats.reset_index())