Skip to content

Commit

Permalink
Add support for Neuralforecast (georgia-tech-db#1115)
Browse files Browse the repository at this point in the history
Adding support for `neuralforecast`. Fixes georgia-tech-db#1112.

```sql
DROP TABLE IF EXISTS AirData;

CREATE TABLE AirData (
    unique_id TEXT(30),
    ds TEXT(30),
    y INTEGER);

LOAD CSV 'data/forecasting/air-passengers.csv' INTO AirData;

DROP FUNCTION IF EXISTS Forecast;

CREATE FUNCTION Forecast FROM
(SELECT unique_id, ds, y FROM AirData)
TYPE Forecasting
PREDICT 'y'
HORIZON 12
LIBRARY 'neuralforecast';

SELECT Forecast(12);
```
One open issue is that `neuralforecast`, unlike `statsforecast`, needs
`horizon` as a parameter at training time. A better way to call the UDF
would therefore be simply `SELECT Forecast();`, which is currently
unsupported. @xzdandy Please let me know your thoughts.

Task checklist:

- [x] Incorporate `neuralforecast`
- [x] Fix `HORIZON` redundancy (UPDATE: Being fixed in georgia-tech-db#1121)
- [x] Reuse an already-trained model when a lower (or equal) horizon is requested
- [x] Add support for ~multivariate forecasting~ exogenous variables
- [x] Add tests
- [x] Add docs

---------

Co-authored-by: xzdandy <xzdandy@gmail.com>
  • Loading branch information
2 people authored and a0x8o committed Nov 22, 2023
1 parent 2171555 commit d9f8fa7
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 0 deletions.
18 changes: 18 additions & 0 deletions docs/source/reference/ai/model-forecasting.rst
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ Below is an example query specifying the above parameters:
ID 'type'
Frequency 'W';
<<<<<<< HEAD
<<<<<<< HEAD

Below is an example query that uses `neuralforecast` with the `trend` column as an exogenous variable and with automatic hyperparameter optimization disabled:

Expand All @@ -232,6 +233,8 @@ Below is an example query with `neuralforecast` with `trend` column as exogenous
FREQUENCY 'M';
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
=======
=======
>>>>>>> ca239aea (Add support for Neuralforecast (#1115))
=======
SELECT Forecast(12) FROM AirData;

Expand Down Expand Up @@ -285,6 +288,8 @@ Below is an example query specifying the above parameters:
TIME 'saledate'
ID 'type'
Frequency 'W';
=======
>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115))

Below is an example query that uses `neuralforecast` with the `trend` column as an exogenous variable and with automatic hyperparameter optimization disabled:

Expand All @@ -297,6 +302,19 @@ Below is an example query with `neuralforecast` with `trend` column as exogenous
PREDICT 'y'
LIBRARY 'neuralforecast'
AUTO 'f'
<<<<<<< HEAD
FREQUENCY 'M';
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
>>>>>>> 6d6a14c8 (Bump v0.3.4+ dev)
=======
<<<<<<< HEAD
<<<<<<< HEAD
FREQUENCY 'M';
=======
FREQUENCY 'M';
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
>>>>>>> eva-master
=======
FREQUENCY 'M';
>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115))
>>>>>>> ca239aea (Add support for Neuralforecast (#1115))
89 changes: 89 additions & 0 deletions evadb/executor/create_function_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,17 @@
=======
>>>>>>> 6d6a14c8 (Bump v0.3.4+ dev)
string_comparison_case_insensitive,
<<<<<<< HEAD
try_to_import_flaml_automl,
try_to_import_ludwig,
try_to_import_neuralforecast,
try_to_import_statsforecast,
=======
try_to_import_ludwig,
try_to_import_neuralforecast,
try_to_import_sklearn,
<<<<<<< HEAD
>>>>>>> ca239aea (Add support for Neuralforecast (#1115))
=======
try_to_import_ludwig,
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
Expand All @@ -101,6 +108,7 @@
<<<<<<< HEAD
try_to_import_statsforecast,
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
<<<<<<< HEAD
=======
=======
try_to_import_ludwig,
Expand All @@ -110,6 +118,12 @@
try_to_import_statsforecast,
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
>>>>>>> 6d6a14c8 (Bump v0.3.4+ dev)
=======
>>>>>>> eva-master
=======
try_to_import_statsforecast,
>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115))
>>>>>>> ca239aea (Add support for Neuralforecast (#1115))
try_to_import_torch,
try_to_import_ultralytics,
)
Expand Down Expand Up @@ -528,6 +542,12 @@ def handle_ultralytics_function(self):
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
def handle_forecasting_function(self):
"""Handle forecasting functions"""
os.environ["CUDA_VISIBLE_DEVICES"] = ""
>>>>>>> ca239aea (Add support for Neuralforecast (#1115))
<<<<<<< HEAD
=======
>>>>>>> 53dafecf (feat: sync master staging (#1050))
=======
Expand All @@ -539,6 +559,12 @@ def handle_forecasting_function(self):
"""Handle forecasting functions"""
os.environ["CUDA_VISIBLE_DEVICES"] = ""
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
<<<<<<< HEAD
=======
>>>>>>> eva-master
=======
>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115))
>>>>>>> ca239aea (Add support for Neuralforecast (#1115))
aggregated_batch_list = []
child = self.children[0]
for batch in child.exec():
Expand Down Expand Up @@ -646,6 +672,7 @@ def handle_forecasting_function(self):
if library == "neuralforecast":
try_to_import_neuralforecast()
from neuralforecast import NeuralForecast
<<<<<<< HEAD
from neuralforecast.auto import (
AutoDeepAR,
AutoFEDformer,
Expand All @@ -669,12 +696,17 @@ def handle_forecasting_function(self):
)

# from neuralforecast.models import Autoformer as AFormer
=======
from neuralforecast.auto import AutoNBEATS, AutoNHITS
from neuralforecast.models import NBEATS, NHITS
>>>>>>> ca239aea (Add support for Neuralforecast (#1115))

model_dict = {
"AutoNBEATS": AutoNBEATS,
"AutoNHITS": AutoNHITS,
"NBEATS": NBEATS,
"NHITS": NHITS,
<<<<<<< HEAD
"PatchTST": PatchTST,
"AutoPatchTST": AutoPatchTST,
"DeepAR": DeepAR,
Expand All @@ -691,6 +723,12 @@ def handle_forecasting_function(self):

if "model" not in arg_map.keys():
arg_map["model"] = "TFT"
=======
}

if "model" not in arg_map.keys():
arg_map["model"] = "NBEATS"
>>>>>>> ca239aea (Add support for Neuralforecast (#1115))

if "auto" not in arg_map.keys() or (
arg_map["auto"].lower()[0] == "t"
Expand All @@ -710,7 +748,11 @@ def handle_forecasting_function(self):
model_args["input_size"] = 2 * horizon
model_args["early_stop_patience_steps"] = 20
else:
<<<<<<< HEAD
model_args_config = {
=======
model_args["config"] = {
>>>>>>> ca239aea (Add support for Neuralforecast (#1115))
"input_size": 2 * horizon,
"early_stop_patience_steps": 20,
}
Expand All @@ -722,6 +764,7 @@ def handle_forecasting_function(self):
if "auto" not in arg_map["model"].lower():
model_args["hist_exog_list"] = exogenous_columns
else:
<<<<<<< HEAD
model_args_config["hist_exog_list"] = exogenous_columns

if "auto" in arg_map["model"].lower():
Expand All @@ -734,6 +777,11 @@ def get_optuna_config(trial):

model_args["h"] = horizon
model_args["loss"] = MQLoss(level=[conf])
=======
model_args["config"]["hist_exog_list"] = exogenous_columns

model_args["h"] = horizon
>>>>>>> ca239aea (Add support for Neuralforecast (#1115))

model = NeuralForecast(
[model_here(**model_args)],
Expand Down Expand Up @@ -778,21 +826,30 @@ def get_optuna_config(trial):

data["ds"] = pd.to_datetime(data["ds"])

<<<<<<< HEAD
model_save_dir_name = (
library + "_" + arg_map["model"] + "_" + new_freq
if "statsforecast" in library
else library + "_" + str(conf) + "_" + arg_map["model"] + "_" + new_freq
)
=======
model_save_dir_name = library + "_" + arg_map["model"] + "_" + new_freq
>>>>>>> ca239aea (Add support for Neuralforecast (#1115))
if len(data.columns) >= 4 and library == "neuralforecast":
model_save_dir_name += "_exogenous_" + str(sorted(exogenous_columns))

model_dir = os.path.join(
<<<<<<< HEAD
self.db.catalog().get_configuration_catalog_value("model_dir"),
=======
self.db.config.get_value("storage", "model_dir"),
>>>>>>> ca239aea (Add support for Neuralforecast (#1115))
"tsforecasting",
model_save_dir_name,
str(hashlib.sha256(data.to_string().encode()).hexdigest()),
)
Path(model_dir).mkdir(parents=True, exist_ok=True)
<<<<<<< HEAD

model_save_name = "horizon" + str(horizon) + ".pkl"

Expand Down Expand Up @@ -850,6 +907,33 @@ def get_optuna_config(trial):
data = data._append(
[data[data["unique_id"] == col]], ignore_index=True
)
=======

model_save_name = "horizon" + str(horizon) + ".pkl"

model_path = os.path.join(model_dir, model_save_name)

existing_model_files = sorted(
os.listdir(model_dir),
key=lambda x: int(x.split("horizon")[1].split(".pkl")[0]),
)
existing_model_files = [
x
for x in existing_model_files
if int(x.split("horizon")[1].split(".pkl")[0]) >= horizon
]
if len(existing_model_files) == 0:
print("Training, please wait...")
if library == "neuralforecast":
model.fit(df=data, val_size=horizon)
else:
model.fit(df=data[["ds", "y", "unique_id"]])
f = open(model_path, "wb")
pickle.dump(model, f)
f.close()
elif not Path(model_path).exists():
model_path = os.path.join(model_dir, existing_model_files[-1])
>>>>>>> ca239aea (Add support for Neuralforecast (#1115))

model.fit(df=data[["ds", "y", "unique_id"]])
hypers = ""
Expand Down Expand Up @@ -900,9 +984,14 @@ def get_optuna_config(trial):
),
FunctionMetadataCatalogEntry("horizon", horizon),
FunctionMetadataCatalogEntry("library", library),
<<<<<<< HEAD
FunctionMetadataCatalogEntry("conf", conf),
=======
>>>>>>> ca239aea (Add support for Neuralforecast (#1115))
]

os.environ.pop("CUDA_VISIBLE_DEVICES", None)

return (
self.node.name,
impl_path,
Expand Down
47 changes: 47 additions & 0 deletions evadb/functions/forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ def setup(
time_column_rename: str,
id_column_rename: str,
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115))
horizon: int,
library: str,
<<<<<<< HEAD
Expand Down Expand Up @@ -232,6 +235,50 @@ def forward(self, data) -> pd.DataFrame:
forecast_df = self.model.predict(h=horizon)
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
=======
>>>>>>> eva-master
self.predict_column_rename = predict_column_rename
self.time_column_rename = time_column_rename
self.id_column_rename = id_column_rename
self.horizon = int(horizon)
self.library = library

def forward(self, data) -> pd.DataFrame:
=======
>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115))
if self.library == "statsforecast":
forecast_df = self.model.predict(h=self.horizon)
else:
forecast_df = self.model.predict()
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
>>>>>>> eva-master
=======
>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115))
forecast_df.reset_index(inplace=True)
forecast_df = forecast_df.rename(
columns={
"unique_id": self.id_column_rename,
"ds": self.time_column_rename,
self.model_name: self.predict_column_rename,
}
<<<<<<< HEAD
<<<<<<< HEAD
)[: self.horizon * forecast_df["unique_id"].nunique()]
return forecast_df
=======
<<<<<<< HEAD
)
=======
)[: self.horizon * forecast_df["unique_id"].nunique()]
>>>>>>> e8a181c5 (Add support for Neuralforecast (#1115))
return forecast_df
=======
>>>>>>> ca239aea (Add support for Neuralforecast (#1115))
forecast_df = forecast_df.rename(columns={self.model_name: "y"})
return pd.DataFrame(
forecast_df,
Expand Down
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def read(path, encoding="utf-8"):
xgboost_libs = ["flaml[automl]"]

forecasting_libs = [
<<<<<<< HEAD
"statsforecast", # MODEL TRAIN AND FINE TUNING
"neuralforecast", # MODEL TRAIN AND FINE TUNING
]
Expand All @@ -158,6 +159,8 @@ def read(path, encoding="utf-8"):
sklearn_libs = ["scikit-learn"]

forecasting_libs = [
=======
>>>>>>> ca239aea (Add support for Neuralforecast (#1115))
"statsforecast", # MODEL TRAIN AND FINE TUNING
"neuralforecast" # MODEL TRAIN AND FINE TUNING
]
Expand Down
Loading

0 comments on commit d9f8fa7

Please sign in to comment.