diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 814058c77e5..eb70fbe98c3 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -1154,8 +1154,17 @@ def _initialize_dynamodb_resource( def _get_table_name( online_config: DynamoDBOnlineStoreConfig, config: RepoConfig, table: FeatureView ) -> str: + table_name = table.name + if config.registry.enable_online_feature_view_versioning: + # Prefer version_tag from the projection (set by version-qualified refs like @v2) + # over current_version_number (the FV's active version in metadata). + version = getattr(table.projection, "version_tag", None) + if version is None: + version = getattr(table, "current_version_number", None) + if version is not None and version > 0: + table_name = f"{table.name}_v{version}" return online_config.table_name_template.format( - project=config.project, table_name=table.name + project=config.project, table_name=table_name ) diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index 4913046470c..2c2f1c2a597 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -256,9 +256,10 @@ def get_online_features( def _check_versioned_read_support(self, grouped_refs): """Raise an error if versioned reads are attempted on unsupported stores.""" + from feast.infra.online_stores.dynamodb import DynamoDBOnlineStore from feast.infra.online_stores.sqlite import SqliteOnlineStore - if isinstance(self, SqliteOnlineStore): + if isinstance(self, (SqliteOnlineStore, DynamoDBOnlineStore)): return for table, _ in grouped_refs: version_tag = getattr(table.projection, "version_tag", None) diff --git a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py index 7e5558e19d7..c760e9675fc 100644 --- a/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py +++ b/sdk/python/tests/unit/infra/online_store/test_dynamodb_online_store.py @@ -12,6 +12,7 @@ from feast.infra.online_stores.dynamodb import ( DynamoDBOnlineStore, DynamoDBOnlineStoreConfig, + _get_table_name, _latest_data_to_write, ) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -1050,3 +1051,120 @@ def tracking_client(*args, **kwargs): f"Expected 1 shared client for thread-safety, " f"got {len(set(dynamodb_clients))} unique clients" ) + + +@dataclass +class MockProjection: + version_tag: Optional[int] = None + + +@dataclass +class MockFeatureViewWithProjection: + name: str + projection: MockProjection + current_version_number: Optional[int] = None + tags: Optional[dict] = None + + +def _make_repo_config(enable_versioning: bool = False) -> RepoConfig: + from feast.repo_config import RegistryConfig + + registry_cfg = RegistryConfig( + path="s3://test_registry/registry.db", + enable_online_feature_view_versioning=enable_versioning, + ) + return RepoConfig( + registry=registry_cfg, + project=PROJECT, + provider=PROVIDER, + online_store=DynamoDBOnlineStoreConfig(region=REGION), + offline_store=DaskOfflineStoreConfig(), + entity_key_serialization_version=3, + ) + + +def test_get_table_name_no_versioning(): + """Without versioning enabled, table name is always the plain template.""" + config = _make_repo_config(enable_versioning=False) + online_config = config.online_store + fv = MockFeatureViewWithProjection( + name="driver_stats", + projection=MockProjection(version_tag=2), + current_version_number=2, + ) + assert _get_table_name(online_config, config, fv) == f"{PROJECT}.driver_stats" + + +def test_get_table_name_versioning_with_projection_version_tag(): + """When versioning is enabled, projection.version_tag takes precedence.""" + config = _make_repo_config(enable_versioning=True) + online_config = config.online_store + fv = MockFeatureViewWithProjection( + name="driver_stats", + projection=MockProjection(version_tag=2), + current_version_number=5, # should be ignored in favour of projection tag + ) + assert _get_table_name(online_config, config, fv) == f"{PROJECT}.driver_stats_v2" + + +def test_get_table_name_versioning_falls_back_to_current_version_number(): + """When projection.version_tag is None, current_version_number is used.""" + config = _make_repo_config(enable_versioning=True) + online_config = config.online_store + fv = MockFeatureViewWithProjection( + name="driver_stats", + projection=MockProjection(version_tag=None), + current_version_number=3, + ) + assert _get_table_name(online_config, config, fv) == f"{PROJECT}.driver_stats_v3" + + +def test_get_table_name_versioning_version_zero_is_unversioned(): + """A version value of 0 should not add a suffix (sentinel for 'no version').""" + config = _make_repo_config(enable_versioning=True) + online_config = config.online_store + fv = MockFeatureViewWithProjection( + name="driver_stats", + projection=MockProjection(version_tag=0), + current_version_number=0, + ) + assert _get_table_name(online_config, config, fv) == f"{PROJECT}.driver_stats" + + +def test_get_table_name_versioning_no_version_info(): + """When versioning is enabled but no version is set, plain name is used.""" + config = _make_repo_config(enable_versioning=True) + online_config = config.online_store + fv = MockFeatureViewWithProjection( + name="driver_stats", + projection=MockProjection(version_tag=None), + current_version_number=None, + ) + assert _get_table_name(online_config, config, fv) == f"{PROJECT}.driver_stats" + + +def test_get_table_name_custom_template_with_versioning(): + """Custom table_name_template respects the versioned table_name fragment.""" + from feast.repo_config import RegistryConfig + + registry_cfg = RegistryConfig( + path="s3://test_registry/registry.db", + enable_online_feature_view_versioning=True, + ) + config = RepoConfig( + registry=registry_cfg, + project=PROJECT, + provider=PROVIDER, + online_store=DynamoDBOnlineStoreConfig( + region=REGION, + table_name_template="{project}__{table_name}", + ), + offline_store=DaskOfflineStoreConfig(), + entity_key_serialization_version=3, + ) + online_config = config.online_store + fv = MockFeatureViewWithProjection( + name="driver_stats", + projection=MockProjection(version_tag=4), + ) + assert _get_table_name(online_config, config, fv) == f"{PROJECT}__driver_stats_v4"