[FugueBackend] Forecast with Exogenous variables fails using a Spark backend #371

Closed

jstammers opened this issue Dec 17, 2022 · 3 comments

jstammers commented Dec 17, 2022

What happened + What you expected to happen

When trying to use the FugueBackend class to distribute forecasts with exogenous variables, I encounter the following error:

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<command-2016733390302825> in <cell line: 13>()
     11 spark = SparkSession.builder.getOrCreate()
     12 backend = FugueBackend(spark, {"fugue.spark.use_pandas_udf":True})
---> 13 forecasts = forecast(
     14     spark.createDataFrame(df),
     15     models=[ETS()],

/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.9/site-packages/statsforecast/distributed/utils.py in forecast(df, models, freq, h, X_df, level, parallel)
     20 ):
     21     backend = parallel if parallel is not None else ParallelBackend()
---> 22     return backend.forecast(df, models, freq, h=h, X_df=X_df, level=level)
     23 
     24 # %% ../../nbs/distributed.utils.ipynb 6

/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.9/site-packages/statsforecast/distributed/fugue.py in forecast(self, df, models, freq, **kwargs)
     81         schema = schema + ",AutoARIMA_lo_99:float, AutoARIMA_hi_99:float"
     82         print(schema)
---> 83         return transform(
     84             df,
     85             self._forecast_series,

/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.9/site-packages/fugue/interfaceless.py in transform(df, using, schema, params, partition, callback, ignore_errors, engine, engine_conf, force_output_fugue_dataframe, persist, as_local, save_path, checkpoint)
    134     else:
    135         src = dag.df(df)
--> 136     tdf = src.transform(
    137         using=using,
    138         schema=schema,

/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.9/site-packages/fugue/workflow/workflow.py in transform(self, using, schema, params, pre_partition, ignore_errors, callback)
    539         if pre_partition is None:
    540             pre_partition = self.partition_spec
--> 541         df = self.workflow.transform(
    542             self,
    543             using=using,

/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.9/site-packages/fugue/workflow/workflow.py in transform(self, using, schema, params, pre_partition, ignore_errors, callback, *dfs)
   1954         tf._has_rpc_client = not isinstance(callback, EmptyRPCHandler)  # type: ignore
   1955         tf.validate_on_compile()
-> 1956         return self.process(
   1957             *dfs,
   1958             using=RunTransformer,

/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.9/site-packages/fugue/workflow/workflow.py in process(self, using, schema, params, pre_partition, *dfs)
   1615             using = _PROCESSOR_REGISTRY.get(using)
   1616         _dfs = self._to_dfs(*dfs)
-> 1617         task = Process(
   1618             len(_dfs),
   1619             processor=using,

/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.9/site-packages/fugue/workflow/_tasks.py in __init__(self, input_n, processor, schema, params, pre_partition, deterministic, lazy, input_names)
    314     ):
    315         self._processor = _to_processor(processor, schema)
--> 316         self._processor._params = ParamDict(params)
    317         self._processor._partition_spec = PartitionSpec(pre_partition)
    318         self._processor.validate_on_compile()

/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.9/site-packages/triad/collections/dict.py in __init__(self, data, deep)
    175     def __init__(self, data: Any = None, deep: bool = True):
    176         super().__init__()
--> 177         self.update(data, deep=deep)
    178 
    179     def __setitem__(  # type: ignore

/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.9/site-packages/triad/collections/dict.py in update(self, other, on_dup, deep)
    262         for k, v in to_kv_iterable(other):
    263             if on_dup == ParamDict.OVERWRITE or k not in self:
--> 264                 self[k] = copy.deepcopy(v) if deep else v
    265             elif on_dup == ParamDict.THROW:
    266                 raise KeyError(f"{k} exists in dict")

/usr/lib/python3.9/copy.py in deepcopy(x, memo, _nil)
    144     copier = _deepcopy_dispatch.get(cls)
    145     if copier is not None:
--> 146         y = copier(x, memo)
    147     else:
    148         if issubclass(cls, type):

/usr/lib/python3.9/copy.py in _deepcopy_dict(x, memo, deepcopy)
    228     memo[id(x)] = y
    229     for key, value in x.items():
--> 230         y[deepcopy(key, memo)] = deepcopy(value, memo)
    231     return y
    232 d[dict] = _deepcopy_dict

/usr/lib/python3.9/copy.py in deepcopy(x, memo, _nil)
    144     copier = _deepcopy_dispatch.get(cls)
    145     if copier is not None:
--> 146         y = copier(x, memo)
    147     else:
    148         if issubclass(cls, type):

/usr/lib/python3.9/copy.py in _deepcopy_dict(x, memo, deepcopy)
    228     memo[id(x)] = y
    229     for key, value in x.items():
--> 230         y[deepcopy(key, memo)] = deepcopy(value, memo)
    231     return y
    232 d[dict] = _deepcopy_dict

/usr/lib/python3.9/copy.py in deepcopy(x, memo, _nil)
    170                     y = x
    171                 else:
--> 172                     y = _reconstruct(x, memo, *rv)
    173 
    174     # If is its own copy, don't memoize.

/usr/lib/python3.9/copy.py in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
    268     if state is not None:
    269         if deep:
--> 270             state = deepcopy(state, memo)
    271         if hasattr(y, '__setstate__'):
    272             y.__setstate__(state)

/usr/lib/python3.9/copy.py in deepcopy(x, memo, _nil)
    144     copier = _deepcopy_dispatch.get(cls)
    145     if copier is not None:
--> 146         y = copier(x, memo)
    147     else:
    148         if issubclass(cls, type):

/usr/lib/python3.9/copy.py in _deepcopy_dict(x, memo, deepcopy)
    228     memo[id(x)] = y
    229     for key, value in x.items():
--> 230         y[deepcopy(key, memo)] = deepcopy(value, memo)
    231     return y
    232 d[dict] = _deepcopy_dict

/usr/lib/python3.9/copy.py in deepcopy(x, memo, _nil)
    170                     y = x
    171                 else:
--> 172                     y = _reconstruct(x, memo, *rv)
    173 
    174     # If is its own copy, don't memoize.

/usr/lib/python3.9/copy.py in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
    268     if state is not None:
    269         if deep:
--> 270             state = deepcopy(state, memo)
    271         if hasattr(y, '__setstate__'):
    272             y.__setstate__(state)

/usr/lib/python3.9/copy.py in deepcopy(x, memo, _nil)
    144     copier = _deepcopy_dispatch.get(cls)
    145     if copier is not None:
--> 146         y = copier(x, memo)
    147     else:
    148         if issubclass(cls, type):

/usr/lib/python3.9/copy.py in _deepcopy_dict(x, memo, deepcopy)
    228     memo[id(x)] = y
    229     for key, value in x.items():
--> 230         y[deepcopy(key, memo)] = deepcopy(value, memo)
    231     return y
    232 d[dict] = _deepcopy_dict

/usr/lib/python3.9/copy.py in deepcopy(x, memo, _nil)
    159                     reductor = getattr(x, "__reduce_ex__", None)
    160                     if reductor is not None:
--> 161                         rv = reductor(4)
    162                     else:
    163                         reductor = getattr(x, "__reduce__", None)

/databricks/spark/python/pyspark/context.py in __getnewargs__(self)
    493     def __getnewargs__(self) -> NoReturn:
    494         # This method is called when attempting to pickle SparkContext, which is always an error:
--> 495         raise RuntimeError(
    496             "It appears that you are attempting to reference SparkContext from a broadcast "
    497             "variable, action, or transformation. SparkContext can only be used on the driver, "

RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
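
The bottom of the traceback points at the likely root cause: the Spark DataFrame passed as X_df appears to end up in the params that triad's ParamDict deep-copies, and deep-copying eventually tries to pickle the SparkContext that the DataFrame references, which always raises (see SPARK-5063). A minimal sketch of that failure mode, independent of statsforecast and assuming only a running Spark session:

import copy

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sdf = spark.range(3)  # a Spark DataFrame keeps references back to the SparkContext

try:
    # mirrors ParamDict's `self[k] = copy.deepcopy(v)` over the params dict
    copy.deepcopy({"X_df": sdf})
except Exception as err:
    # on PySpark this surfaces as the RuntimeError quoted above
    print(type(err).__name__, err)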

Versions / Dependencies

statsforecast: 1.0.0
fugue: 0.7.3
python: 3.9
OS: Ubuntu

Reproduction script

import pandas as pd

from statsforecast.distributed.utils import forecast
from statsforecast.distributed.fugue import FugueBackend
from statsforecast.models import ETS

from pyspark.sql import SparkSession

df = pd.DataFrame({
    "ds": [1, 2, 3, 4, 5, 6, 7, 8, 9],
    "y": [1, 2, 3, 4, 5, 6, 7, 8, 9],
    "x": [1, 2, 3, 4, 5, 6, 7, 8, 9],
})
df["unique_id"] = 1

X_df = pd.DataFrame({"ds": [4], "x": [4], "unique_id": 1})
spark = SparkSession.builder.getOrCreate()
backend = FugueBackend(spark, {"fugue.spark.use_pandas_udf":True})
forecasts = forecast(
    spark.createDataFrame(df),
    models=[ETS()],
    X_df=spark.createDataFrame(X_df),
    h=1,
    freq="D",
    parallel=backend)

Issue Severity

High: It blocks me from completing my task.

@jstammers jstammers added the bug label Dec 17, 2022

AzulGarza commented Apr 6, 2023

Hey @jstammers! Thanks for using statsforecast.

As of statsforecast>=1.5.0 it is no longer necessary to create a backend; you can pass Spark DataFrames directly to the forecast method of StatsForecast. Here's an example (I've added AutoARIMA, since AutoETS doesn't use exogenous variables):

import pandas as pd
from pyspark.sql import SparkSession
from statsforecast import StatsForecast
from statsforecast.models import AutoETS, AutoARIMA

df = pd.DataFrame({
    "ds": [1, 2, 3, 4, 5, 6, 7, 8, 9],
    "y": [1, 2, 3, 4, 5, 6, 7, 8, 9],
    "x": [9, 8, 7, 6, 5, 4, 3, 2, 1],
})
df["unique_id"] = 1

# future values of the exogenous regressor for the forecast horizon
X_df = pd.DataFrame({"ds": [10], "x": [10], "unique_id": 1})

spark = SparkSession.builder.getOrCreate()
sf = StatsForecast(models=[AutoETS(), AutoARIMA()], freq="D")
# Spark DataFrames go straight into forecast; no FugueBackend needed
fcst = sf.forecast(df=spark.createDataFrame(df), h=1, X_df=spark.createDataFrame(X_df))
fcst.toPandas()

And the output would be,

[image: screenshot of the resulting forecast dataframe]
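
Note that because df and X_df go in as Spark DataFrames, forecast returns a Spark DataFrame as well, which is why the example ends with toPandas() to collect the (small) result to the driver for display.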

Let me know if this is of help. :)

@AzulGarza AzulGarza removed the bug label Jun 6, 2023

kvnkho commented Jun 17, 2023

Hi @jstammers, Kevin from Fugue here! Wondering if we can close this issue. I am looking into updating the Fugue backend of statsforecast to support more statsforecast features, but this one seems to be working.

@kvnkho kvnkho self-assigned this Jun 17, 2023

kvnkho commented Jul 14, 2023

Exogenous variables are now tested on Spark in the latest master branch, so this should be good. Will be adding it to the docs as well.

@kvnkho kvnkho closed this as completed Jul 14, 2023