Skip to content

Commit

Permalink
fix: Register BatchFeatureView in feature repos correctly (#3092)
Browse files Browse the repository at this point in the history
* fix: Registry  BatchFeatureView in feature repos correctly

Signed-off-by: Achal Shah <achals@gmail.com>

* tests

Signed-off-by: Achal Shah <achals@gmail.com>

Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Aug 16, 2022
1 parent c93b4cc commit b8e39ea
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 2 deletions.
9 changes: 9 additions & 0 deletions sdk/python/feast/repo_operations.py
Expand Up @@ -172,6 +172,15 @@ def parse_repo(repo_root: Path) -> RepoContents:
assert stream_source
if not any((stream_source is ds) for ds in res.data_sources):
res.data_sources.append(stream_source)
elif isinstance(obj, BatchFeatureView) and not any(
(obj is bfv) for bfv in res.feature_views
):
res.feature_views.append(obj)

# Handle batch sources defined with feature views.
batch_source = obj.batch_source
if not any((batch_source is ds) for ds in res.data_sources):
res.data_sources.append(batch_source)
elif isinstance(obj, Entity) and not any(
(obj is entity) for entity in res.entities
):
Expand Down
52 changes: 52 additions & 0 deletions sdk/python/tests/example_repos/example_feature_repo_with_bfvs.py
@@ -0,0 +1,52 @@
from datetime import timedelta

from feast import BatchFeatureView, Entity, Field, FileSource
from feast.types import Float32, Int32, Int64

driver_hourly_stats = FileSource(
path="%PARQUET_PATH%", # placeholder to be replaced by the test
timestamp_field="event_timestamp",
created_timestamp_column="created",
)

driver = Entity(
name="driver_id",
description="driver id",
)


driver_hourly_stats_view = BatchFeatureView(
name="driver_hourly_stats",
entities=[driver],
ttl=timedelta(days=1),
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64),
Field(name="driver_id", dtype=Int32),
],
online=True,
source=driver_hourly_stats,
tags={},
)


global_daily_stats = FileSource(
path="%PARQUET_PATH_GLOBAL%", # placeholder to be replaced by the test
timestamp_field="event_timestamp",
created_timestamp_column="created",
)


global_stats_feature_view = BatchFeatureView(
name="global_daily_stats",
entities=None,
ttl=timedelta(days=1),
schema=[
Field(name="num_rides", dtype=Int32),
Field(name="avg_ride_length", dtype=Float32),
],
online=True,
source=global_daily_stats,
tags={},
)
10 changes: 10 additions & 0 deletions sdk/python/tests/unit/local_feast_tests/test_e2e_local.py
Expand Up @@ -51,6 +51,16 @@ def test_e2e_local() -> None:
runner, store, start_date, end_date, driver_df
)

with runner.local_repo(
get_example_repo("example_feature_repo_with_bfvs.py")
.replace("%PARQUET_PATH%", driver_stats_path)
.replace("%PARQUET_PATH_GLOBAL%", global_stats_path),
"file",
) as store:
_test_materialize_and_online_retrieval(
runner, store, start_date, end_date, driver_df
)

with runner.local_repo(
get_example_repo("example_feature_repo_with_ttl_0.py")
.replace("%PARQUET_PATH%", driver_stats_path)
Expand Down
8 changes: 6 additions & 2 deletions sdk/python/tests/utils/cli_repo_creator.py
Expand Up @@ -88,7 +88,9 @@ def local_repo(self, example_repo_py: str, offline_store: str):
stderr = result.stderr.decode("utf-8")
print(f"Apply stdout:\n{stdout}")
print(f"Apply stderr:\n{stderr}")
assert result.returncode == 0
assert (
result.returncode == 0
), f"stdout: {result.stdout}\nstderr: {result.stderr}"

yield FeatureStore(repo_path=str(repo_path), config=None)

Expand All @@ -97,4 +99,6 @@ def local_repo(self, example_repo_py: str, offline_store: str):
stderr = result.stderr.decode("utf-8")
print(f"Apply stdout:\n{stdout}")
print(f"Apply stderr:\n{stderr}")
assert result.returncode == 0
assert (
result.returncode == 0
), f"stdout: {result.stdout}\nstderr: {result.stderr}"

0 comments on commit b8e39ea

Please sign in to comment.