From d9f8fa7ebeb5f39856f01c48893666852f835d16 Mon Sep 17 00:00:00 2001 From: Sayan Sinha Date: Sat, 30 Sep 2023 10:54:48 -0400 Subject: [PATCH] Add support for Neuralforecast (#1115) Adding support for `neuralforecast`. Fixes #1112. ```sql DROP TABLE IF EXISTS AirData; CREATE TABLE AirData ( unique_id TEXT(30), ds TEXT(30), y INTEGER); LOAD CSV 'data/forecasting/air-passengers.csv' INTO AirData; DROP FUNCTION IF EXISTS Forecast; CREATE FUNCTION Forecast FROM (SELECT unique_id, ds, y FROM AirData) TYPE Forecasting PREDICT 'y' HORIZON 12 LIBRARY 'neuralforecast'; SELECT Forecast(12); ``` One quick issue here is that `neuralforecast` needs `horizon` as a parameter while training, unlike `statsforecast`. Thus, a better way to call the UDF would be simply `SELECT Forecast();`, which is currently unsupported. @xzdandy Please let me know your thoughts. List of stuff yet to be done: - [x] Incorporate `neuralforecast` - [x] Fix `HORIZON` redundancy (UPDATE: Being fixed in #1121) - [x] Reuse model with lower horizon no - [x] Add support for ~multivariate forecasting~ exogenous variables - [x] Add tests - [x] Add docs --------- Co-authored-by: xzdandy --- .../source/reference/ai/model-forecasting.rst | 18 ++++ evadb/executor/create_function_executor.py | 89 +++++++++++++++++++ evadb/functions/forecast.py | 47 ++++++++++ setup.py | 3 + .../long/test_model_forecasting.py | 57 ++++++++++++ .../binder/test_statement_binder.py | 7 ++ 6 files changed, 221 insertions(+) diff --git a/docs/source/reference/ai/model-forecasting.rst b/docs/source/reference/ai/model-forecasting.rst index 188bc2652a..39de66cb68 100644 --- a/docs/source/reference/ai/model-forecasting.rst +++ b/docs/source/reference/ai/model-forecasting.rst @@ -217,6 +217,7 @@ Below is an example query specifying the above parameters: ID 'type' Frequency 'W'; <<<<<<< HEAD +<<<<<<< HEAD Below is an example query with `neuralforecast` with `trend` column as exogenous and without automatic hyperparameter optimization: @@ -232,6 +233,8 @@ Below is an example query with `neuralforecast` with `trend` column as exogenous FREQUENCY 'M'; >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) ======= +======= +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) ======= SELECT Forecast(12) FROM AirData; @@ -285,6 +288,8 @@ Below is an example query specifying the above parameters: TIME 'saledate' ID 'type' Frequency 'W'; +======= +>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115)) Below is an example query with `neuralforecast` with `trend` column as exogenous and without automatic hyperparameter optimization: @@ -297,6 +302,19 @@ Below is an example query with `neuralforecast` with `trend` column as exogenous PREDICT 'y' LIBRARY 'neuralforecast' AUTO 'f' +<<<<<<< HEAD FREQUENCY 'M'; >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) >>>>>>> 6d6a14c8 (Bump v0.3.4+ dev) +======= +<<<<<<< HEAD +<<<<<<< HEAD + FREQUENCY 'M'; +======= + FREQUENCY 'M'; +>>>>>>> 40a10ce1 (Bump v0.3.4+ dev) +>>>>>>> eva-master +======= + FREQUENCY 'M'; +>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115)) +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) diff --git a/evadb/executor/create_function_executor.py b/evadb/executor/create_function_executor.py index 2671ed8280..d8ff08fd1b 100644 --- a/evadb/executor/create_function_executor.py +++ b/evadb/executor/create_function_executor.py @@ -85,10 +85,17 @@ ======= >>>>>>> 6d6a14c8 (Bump v0.3.4+ dev) string_comparison_case_insensitive, +<<<<<<< HEAD try_to_import_flaml_automl, try_to_import_ludwig, try_to_import_neuralforecast, try_to_import_statsforecast, +======= + try_to_import_ludwig, + try_to_import_neuralforecast, + try_to_import_sklearn, +<<<<<<< HEAD +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) ======= try_to_import_ludwig, >>>>>>> 2dacff69 (feat: sync master staging (#1050)) @@ -101,6 +108,7 @@ <<<<<<< HEAD try_to_import_statsforecast, >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) +<<<<<<< HEAD ======= ======= try_to_import_ludwig, @@ -110,6 +118,12 @@ try_to_import_statsforecast, >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) >>>>>>> 6d6a14c8 (Bump v0.3.4+ dev) +======= +>>>>>>> eva-master +======= + try_to_import_statsforecast, +>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115)) +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) try_to_import_torch, try_to_import_ultralytics, ) @@ -528,6 +542,12 @@ def handle_ultralytics_function(self): <<<<<<< HEAD <<<<<<< HEAD <<<<<<< HEAD +======= + def handle_forecasting_function(self): + """Handle forecasting functions""" + os.environ["CUDA_VISIBLE_DEVICES"] = "" +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) +<<<<<<< HEAD ======= >>>>>>> 53dafecf (feat: sync master staging (#1050)) ======= @@ -539,6 +559,12 @@ def handle_forecasting_function(self): """Handle forecasting functions""" os.environ["CUDA_VISIBLE_DEVICES"] = "" >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) +<<<<<<< HEAD +======= +>>>>>>> eva-master +======= +>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115)) +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) aggregated_batch_list = [] child = self.children[0] for batch in child.exec(): @@ -646,6 +672,7 @@ def handle_forecasting_function(self): if library == "neuralforecast": try_to_import_neuralforecast() from neuralforecast import NeuralForecast +<<<<<<< HEAD from neuralforecast.auto import ( AutoDeepAR, AutoFEDformer, @@ -669,12 +696,17 @@ def handle_forecasting_function(self): ) # from neuralforecast.models import Autoformer as AFormer +======= + from neuralforecast.auto import AutoNBEATS, AutoNHITS + from neuralforecast.models import NBEATS, NHITS +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) model_dict = { "AutoNBEATS": AutoNBEATS, "AutoNHITS": AutoNHITS, "NBEATS": NBEATS, "NHITS": NHITS, +<<<<<<< HEAD "PatchTST": PatchTST, "AutoPatchTST": AutoPatchTST, "DeepAR": DeepAR, @@ -691,6 +723,12 @@ def handle_forecasting_function(self): if "model" not in arg_map.keys(): arg_map["model"] = "TFT" +======= + } + + if "model" not in arg_map.keys(): + arg_map["model"] = "NBEATS" +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) if "auto" not in arg_map.keys() or ( arg_map["auto"].lower()[0] == "t" @@ -710,7 +748,11 @@ def handle_forecasting_function(self): model_args["input_size"] = 2 * horizon model_args["early_stop_patience_steps"] = 20 else: +<<<<<<< HEAD model_args_config = { +======= + model_args["config"] = { +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) "input_size": 2 * horizon, "early_stop_patience_steps": 20, } @@ -722,6 +764,7 @@ def handle_forecasting_function(self): if "auto" not in arg_map["model"].lower(): model_args["hist_exog_list"] = exogenous_columns else: +<<<<<<< HEAD model_args_config["hist_exog_list"] = exogenous_columns if "auto" in arg_map["model"].lower(): @@ -734,6 +777,11 @@ def get_optuna_config(trial): model_args["h"] = horizon model_args["loss"] = MQLoss(level=[conf]) +======= + model_args["config"]["hist_exog_list"] = exogenous_columns + + model_args["h"] = horizon +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) model = NeuralForecast( [model_here(**model_args)], @@ -778,21 +826,30 @@ def get_optuna_config(trial): data["ds"] = pd.to_datetime(data["ds"]) +<<<<<<< HEAD model_save_dir_name = ( library + "_" + arg_map["model"] + "_" + new_freq if "statsforecast" in library else library + "_" + str(conf) + "_" + arg_map["model"] + "_" + new_freq ) +======= + model_save_dir_name = library + "_" + arg_map["model"] + "_" + new_freq +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) if len(data.columns) >= 4 and library == "neuralforecast": model_save_dir_name += "_exogenous_" + str(sorted(exogenous_columns)) model_dir = os.path.join( +<<<<<<< HEAD self.db.catalog().get_configuration_catalog_value("model_dir"), +======= + self.db.config.get_value("storage", "model_dir"), +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) "tsforecasting", model_save_dir_name, str(hashlib.sha256(data.to_string().encode()).hexdigest()), ) Path(model_dir).mkdir(parents=True, exist_ok=True) +<<<<<<< HEAD model_save_name = "horizon" + str(horizon) + ".pkl" @@ -850,6 +907,33 @@ def get_optuna_config(trial): data = data._append( [data[data["unique_id"] == col]], ignore_index=True ) +======= + + model_save_name = "horizon" + str(horizon) + ".pkl" + + model_path = os.path.join(model_dir, model_save_name) + + existing_model_files = sorted( + os.listdir(model_dir), + key=lambda x: int(x.split("horizon")[1].split(".pkl")[0]), + ) + existing_model_files = [ + x + for x in existing_model_files + if int(x.split("horizon")[1].split(".pkl")[0]) >= horizon + ] + if len(existing_model_files) == 0: + print("Training, please wait...") + if library == "neuralforecast": + model.fit(df=data, val_size=horizon) + else: + model.fit(df=data[["ds", "y", "unique_id"]]) + f = open(model_path, "wb") + pickle.dump(model, f) + f.close() + elif not Path(model_path).exists(): + model_path = os.path.join(model_dir, existing_model_files[-1]) +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) model.fit(df=data[["ds", "y", "unique_id"]]) hypers = "" @@ -900,9 +984,14 @@ def get_optuna_config(trial): ), FunctionMetadataCatalogEntry("horizon", horizon), FunctionMetadataCatalogEntry("library", library), +<<<<<<< HEAD FunctionMetadataCatalogEntry("conf", conf), +======= +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) ] + os.environ.pop("CUDA_VISIBLE_DEVICES", None) + return ( self.node.name, impl_path, diff --git a/evadb/functions/forecast.py b/evadb/functions/forecast.py index 251d47076c..563ce24b1a 100644 --- a/evadb/functions/forecast.py +++ b/evadb/functions/forecast.py @@ -85,6 +85,9 @@ def setup( time_column_rename: str, id_column_rename: str, <<<<<<< HEAD +<<<<<<< HEAD +======= +>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115)) horizon: int, library: str, <<<<<<< HEAD @@ -232,6 +235,50 @@ def forward(self, data) -> pd.DataFrame: forecast_df = self.model.predict(h=horizon) <<<<<<< HEAD <<<<<<< HEAD +<<<<<<< HEAD +======= +======= +>>>>>>> eva-master + self.predict_column_rename = predict_column_rename + self.time_column_rename = time_column_rename + self.id_column_rename = id_column_rename + self.horizon = int(horizon) + self.library = library + + def forward(self, data) -> pd.DataFrame: +======= +>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115)) + if self.library == "statsforecast": + forecast_df = self.model.predict(h=self.horizon) + else: + forecast_df = self.model.predict() +<<<<<<< HEAD +<<<<<<< HEAD +======= +>>>>>>> 40a10ce1 (Bump v0.3.4+ dev) +>>>>>>> eva-master +======= +>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115)) + forecast_df.reset_index(inplace=True) + forecast_df = forecast_df.rename( + columns={ + "unique_id": self.id_column_rename, + "ds": self.time_column_rename, + self.model_name: self.predict_column_rename, + } +<<<<<<< HEAD +<<<<<<< HEAD + )[: self.horizon * forecast_df["unique_id"].nunique()] + return forecast_df +======= +<<<<<<< HEAD + ) +======= + )[: self.horizon * forecast_df["unique_id"].nunique()] +>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115)) + return forecast_df +======= +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) forecast_df = forecast_df.rename(columns={self.model_name: "y"}) return pd.DataFrame( forecast_df, diff --git a/setup.py b/setup.py index bc695e35d3..43f72972ff 100644 --- a/setup.py +++ b/setup.py @@ -143,6 +143,7 @@ def read(path, encoding="utf-8"): xgboost_libs = ["flaml[automl]"] forecasting_libs = [ +<<<<<<< HEAD "statsforecast", # MODEL TRAIN AND FINE TUNING "neuralforecast", # MODEL TRAIN AND FINE TUNING ] @@ -158,6 +159,8 @@ def read(path, encoding="utf-8"): sklearn_libs = ["scikit-learn"] forecasting_libs = [ +======= +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) "statsforecast", # MODEL TRAIN AND FINE TUNING "neuralforecast" # MODEL TRAIN AND FINE TUNING ] diff --git a/test/integration_tests/long/test_model_forecasting.py b/test/integration_tests/long/test_model_forecasting.py index 1fcf84fe2a..879000363d 100644 --- a/test/integration_tests/long/test_model_forecasting.py +++ b/test/integration_tests/long/test_model_forecasting.py @@ -61,6 +61,15 @@ def setUpClass(cls): ======= >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) >>>>>>> 6d6a14c8 (Bump v0.3.4+ dev) + create_table_query = """ + CREATE TABLE AirDataPanel (\ + unique_id TEXT(30),\ + ds TEXT(30),\ + y INTEGER,\ + trend INTEGER,\ + ylagged INTEGER);""" + execute_query_fetch_all(cls.evadb, create_table_query) + create_table_query = """ CREATE TABLE HomeData (\ saledate TEXT(30),\ @@ -93,21 +102,38 @@ def setUpClass(cls): <<<<<<< HEAD <<<<<<< HEAD <<<<<<< HEAD +<<<<<<< HEAD ======= >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) ======= ======= >>>>>>> 6d6a14c8 (Bump v0.3.4+ dev) +======= +======= +<<<<<<< HEAD +<<<<<<< HEAD +======= +>>>>>>> eva-master +======= +>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115)) +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) path = f"{EvaDB_ROOT_DIR}/data/forecasting/AirPassengersPanel.csv" load_query = f"LOAD CSV '{path}' INTO AirDataPanel;" execute_query_fetch_all(cls.evadb, load_query) +<<<<<<< HEAD <<<<<<< HEAD ======= >>>>>>> 53dafecf (feat: sync master staging (#1050)) ======= >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) +<<<<<<< HEAD >>>>>>> 6d6a14c8 (Bump v0.3.4+ dev) +======= +>>>>>>> eva-master +======= +>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115)) +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) path = f"{EvaDB_ROOT_DIR}/data/forecasting/home_sales.csv" load_query = f"LOAD CSV '{path}' INTO HomeData;" execute_query_fetch_all(cls.evadb, load_query) @@ -212,13 +238,22 @@ def test_forecast(self): <<<<<<< HEAD <<<<<<< HEAD <<<<<<< HEAD +<<<<<<< HEAD <<<<<<< HEAD SELECT AirForecast() order by y; ======= +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) +======= ======= >>>>>>> 6d6a14c8 (Bump v0.3.4+ dev) SELECT AirForecast(12) order by y; +<<<<<<< HEAD >>>>>>> 53dafecf (feat: sync master staging (#1050)) +======= +======= + SELECT AirForecast() order by y; +>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115)) +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) """ result = execute_query_fetch_all(self.evadb, predict_query) self.assertEqual(len(result), 12) @@ -266,6 +301,28 @@ def test_forecast_neuralforecast(self): ], ) + create_predict_udf = """ + CREATE FUNCTION AirPanelForecast FROM + (SELECT unique_id, ds, y, trend FROM AirDataPanel) + TYPE Forecasting + HORIZON 12 + PREDICT 'y' + LIBRARY 'neuralforecast' + AUTO 'false' + FREQUENCY 'M'; + """ + execute_query_fetch_all(self.evadb, create_predict_udf) + + predict_query = """ + SELECT AirPanelForecast() order by y; + """ + result = execute_query_fetch_all(self.evadb, predict_query) + self.assertEqual(len(result), 24) + self.assertEqual( + result.columns, + ["airpanelforecast.unique_id", "airpanelforecast.ds", "airpanelforecast.y"], + ) + @forecast_skip_marker def test_forecast_with_column_rename(self): create_predict_udf = """ diff --git a/test/unit_tests/binder/test_statement_binder.py b/test/unit_tests/binder/test_statement_binder.py index 22373b14b8..33b8d016bf 100644 --- a/test/unit_tests/binder/test_statement_binder.py +++ b/test/unit_tests/binder/test_statement_binder.py @@ -738,6 +738,7 @@ def test_bind_create_function_should_bind_forecast_with_renaming_columns(self): self.assertEqual(create_function_statement.inputs, expected_inputs) self.assertEqual(create_function_statement.outputs, expected_outputs) +<<<<<<< HEAD <<<<<<< HEAD ======= <<<<<<< HEAD @@ -788,7 +789,13 @@ def test_bind_create_function_should_raise_forecast_with_unexpected_columns(self ======= >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) +<<<<<<< HEAD >>>>>>> 6d6a14c8 (Bump v0.3.4+ dev) +======= +>>>>>>> eva-master +======= +>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115)) +>>>>>>> ca239aea (Add support for Neuralforecast (#1115)) def test_bind_create_function_should_raise_forecast_missing_required_columns(self): with patch.object(StatementBinder, "bind"): create_function_statement = MagicMock()