Skip to content

Commit

Permalink
fix: parallel custom cols (#790)
Browse files Browse the repository at this point in the history
  • Loading branch information
AzulGarza committed Mar 8, 2024
1 parent 4772d7b commit 3630408
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 52 deletions.
89 changes: 71 additions & 18 deletions action_files/test_fit_predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,74 @@
from statsforecast import StatsForecast
from statsforecast.utils import AirPassengersDF as df
from statsforecast.models import (
AutoARIMA, AutoETS, AutoCES, AutoTheta,
MSTL, GARCH, ARCH, HistoricAverage,
Naive, RandomWalkWithDrift,
SeasonalNaive, WindowAverage, SeasonalWindowAverage,
ADIDA, CrostonClassic,
CrostonOptimized, CrostonSBA, IMAPA,
TSB
AutoARIMA,
AutoETS,
AutoCES,
AutoTheta,
MSTL,
GARCH,
ARCH,
HistoricAverage,
Naive,
RandomWalkWithDrift,
SeasonalNaive,
WindowAverage,
SeasonalWindowAverage,
ADIDA,
CrostonClassic,
CrostonOptimized,
CrostonSBA,
IMAPA,
TSB,
)

def get_data():

def get_data(rename_cols: bool = False):
df2 = df.copy(deep=True)
df2['unique_id'] = 2.0
df2['y'] *= 2
return pd.concat([df, df2])
df2["unique_id"] = 2.0
df2["y"] *= 2
panel_df = pd.concat([df, df2])[["unique_id", "ds", "y"]]
if rename_cols:
renamer = {
"unique_id": "item_id",
"ds": "timestamp",
"y": "target",
}
panel_df.rename(columns=renamer, inplace=True)
return panel_df


@pytest.mark.parametrize('n_jobs', [-1, 1])
@pytest.mark.parametrize("n_jobs", [-1, 1])
def test_custom_cols(n_jobs):
    """Check forecasting when the id, time and target columns are renamed.

    The panel is built with ``item_id`` / ``timestamp`` / ``target`` column
    names and those names are passed to ``forecast`` and ``fit``. The test
    verifies that:

    * the forecast frame carries the custom id and time columns,
    * the fitted-values frame carries all three custom columns,
    * ``fit`` followed by ``predict`` yields the same frame as ``forecast``.

    Runs once per parametrized ``n_jobs`` value (``-1`` and ``1``).
    """
    panel = get_data(rename_cols=True)
    # Keyword names expected by StatsForecast mapped to the renamed columns.
    # Insertion order is kept so the column checks run id -> time -> target.
    col_kwargs = dict(
        id_col="item_id",
        time_col="timestamp",
        target_col="target",
    )
    forecaster = StatsForecast(
        models=[
            HistoricAverage(),
            Naive(),
            RandomWalkWithDrift(),
            SeasonalNaive(season_length=12),
        ],
        freq="D",
        n_jobs=n_jobs,
    )

    direct_fcst = forecaster.forecast(df=panel, h=7, fitted=True, **col_kwargs)

    # The target column is deliberately not checked on the forecast output.
    for col_name in col_kwargs.values():
        if col_name == "target":
            continue
        assert col_name in direct_fcst.columns

    # Fitted values must expose every custom column, target included.
    for col_name in col_kwargs.values():
        assert col_name in forecaster.forecast_fitted_values().columns

    forecaster.fit(df=panel, **col_kwargs)
    refit_fcst = forecaster.predict(h=7)
    pd.testing.assert_frame_equal(direct_fcst, refit_fcst)


@pytest.mark.parametrize("n_jobs", [-1, 1])
def test_fit_predict(n_jobs):
AirPassengersPanel = get_data()
models = [
Expand All @@ -29,7 +81,7 @@ def test_fit_predict(n_jobs):
AutoTheta(),
MSTL(season_length=12),
GARCH(),
ARCH(),
ARCH(),
HistoricAverage(),
Naive(),
RandomWalkWithDrift(),
Expand All @@ -39,12 +91,13 @@ def test_fit_predict(n_jobs):
ADIDA(),
CrostonClassic(),
CrostonOptimized(),
CrostonSBA(),
IMAPA(),
TSB(0.5, 0.5)
TSB(0.5, 0.5),
]
sf = StatsForecast(models=models, freq='D', n_jobs=n_jobs)
df_fcst = sf.forecast(df=AirPassengersPanel[['unique_id', 'ds', 'y']], h=7)
sf.fit(df=AirPassengersPanel[['unique_id', 'ds', 'y']])

sf = StatsForecast(models=models, freq="D", n_jobs=n_jobs)
df_fcst = sf.forecast(df=AirPassengersPanel, h=7)
sf.fit(df=AirPassengersPanel)
df_predict = sf.predict(h=7)
pd.testing.assert_frame_equal(df_fcst, df_predict)
42 changes: 27 additions & 15 deletions nbs/src/core/core.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -1625,8 +1625,18 @@
" futures = []\n",
" for ga, X_ in zip(gas, Xs):\n",
" future = executor.apply_async(\n",
" ga.forecast, \n",
" (self.models, h, self.fallback_model, fitted, X_, level, target_col)\n",
" ga.forecast,\n",
" tuple(),\n",
" dict(\n",
" models=self.models,\n",
" h=h,\n",
" fallback_model=self.fallback_model,\n",
" fitted=fitted,\n",
" X=X_,\n",
" level=level,\n",
" verbose=self.verbose,\n",
" target_col=target_col,\n",
" ),\n",
" )\n",
" futures.append(future)\n",
" out = [f.get() for f in futures]\n",
Expand All @@ -1652,19 +1662,21 @@
" futures = []\n",
" for ga in gas:\n",
" future = executor.apply_async(\n",
" ga.cross_validation, \n",
" (\n",
" self.models,\n",
" h,\n",
" test_size,\n",
" self.fallback_model,\n",
" step_size,\n",
" input_size,\n",
" fitted,\n",
" level,\n",
" refit,\n",
" target_col,\n",
" )\n",
" ga.cross_validation,\n",
" tuple(),\n",
" dict(\n",
" models=self.models,\n",
" h=h,\n",
" test_size=test_size,\n",
" fallback_model=self.fallback_model,\n",
" step_size=step_size,\n",
" input_size=input_size,\n",
" fitted=fitted,\n",
" level=level,\n",
" refit=refit,\n",
" verbose=self.verbose,\n",
" target_col=target_col,\n",
" ),\n",
" )\n",
" futures.append(future)\n",
" out = [f.get() for f in futures]\n",
Expand Down
42 changes: 23 additions & 19 deletions statsforecast/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1194,14 +1194,16 @@ def _forecast_parallel(self, h, fitted, X, level, target_col):
for ga, X_ in zip(gas, Xs):
future = executor.apply_async(
ga.forecast,
(
self.models,
h,
self.fallback_model,
fitted,
X_,
level,
target_col,
tuple(),
dict(
models=self.models,
h=h,
fallback_model=self.fallback_model,
fitted=fitted,
X=X_,
level=level,
verbose=self.verbose,
target_col=target_col,
),
)
futures.append(future)
Expand Down Expand Up @@ -1231,17 +1233,19 @@ def _cross_validation_parallel(
for ga in gas:
future = executor.apply_async(
ga.cross_validation,
(
self.models,
h,
test_size,
self.fallback_model,
step_size,
input_size,
fitted,
level,
refit,
target_col,
tuple(),
dict(
models=self.models,
h=h,
test_size=test_size,
fallback_model=self.fallback_model,
step_size=step_size,
input_size=input_size,
fitted=fitted,
level=level,
refit=refit,
verbose=self.verbose,
target_col=target_col,
),
)
futures.append(future)
Expand Down

0 comments on commit 3630408

Please sign in to comment.