diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index aaccff938c..bf20e51df9 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -96,9 +96,9 @@ def get_online_features(body=Depends(get_body)): full_feature_names = body.get("full_feature_names", False) - response_proto = store._get_online_features( + response_proto = store.get_online_features( features=features, - entity_values=body["entities"], + entity_rows=body["entities"], full_feature_names=full_feature_names, ).proto diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 716e706ebe..24d7988f17 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -11,12 +11,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import copy import itertools import logging import os import warnings -from collections import Counter, defaultdict from datetime import datetime, timedelta from pathlib import Path from typing import ( @@ -28,7 +26,6 @@ Mapping, Optional, Sequence, - Set, Tuple, Union, cast, @@ -57,20 +54,15 @@ from feast.errors import ( DataFrameSerializationError, DataSourceRepeatNamesException, - EntityNotFoundException, - FeatureNameCollisionError, FeatureViewNotFoundException, PushSourceNotFoundException, RequestDataNotFoundInEntityDfException, - RequestDataNotFoundInEntityRowsException, ) from feast.feast_object import FeastObject from feast.feature_service import FeatureService from feast.feature_view import ( DUMMY_ENTITY, - DUMMY_ENTITY_ID, DUMMY_ENTITY_NAME, - DUMMY_ENTITY_VAL, FeatureView, ) from feast.inference import ( @@ -89,14 +81,11 @@ FieldStatus, GetOnlineFeaturesResponse, ) -from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value from 
feast.repo_config import RepoConfig, load_repo_config from feast.repo_contents import RepoContents from feast.saved_dataset import SavedDataset, SavedDatasetStorage, ValidationReference from feast.stream_feature_view import StreamFeatureView -from feast.type_map import python_values_to_proto_values -from feast.value_type import ValueType from feast.version import get_version warnings.simplefilter("once", DeprecationWarning) @@ -276,42 +265,19 @@ def list_feature_views(self, allow_cache: bool = False) -> List[FeatureView]: "list_feature_views will make breaking changes. Please use list_batch_feature_views instead. " "list_feature_views will behave like list_all_feature_views in the future." ) - return self._list_feature_views(allow_cache) + return utils._list_feature_views(self._registry, self.project, allow_cache) def _list_all_feature_views( self, allow_cache: bool = False, ) -> List[Union[FeatureView, StreamFeatureView, OnDemandFeatureView]]: all_feature_views = ( - self._list_feature_views(allow_cache) + utils._list_feature_views(self._registry, self.project, allow_cache) + self._list_stream_feature_views(allow_cache) + self.list_on_demand_feature_views(allow_cache) ) return all_feature_views - def _list_feature_views( - self, - allow_cache: bool = False, - hide_dummy_entity: bool = True, - ) -> List[FeatureView]: - logging.warning( - "_list_feature_views will make breaking changes. Please use _list_batch_feature_views instead. " - "_list_feature_views will behave like _list_all_feature_views in the future." 
- ) - feature_views = [] - for fv in self._registry.list_feature_views( - self.project, allow_cache=allow_cache - ): - if ( - hide_dummy_entity - and fv.entities - and fv.entities[0] == DUMMY_ENTITY_NAME - ): - fv.entities = [] - fv.entity_columns = [] - feature_views.append(fv) - return feature_views - def _list_batch_feature_views( self, allow_cache: bool = False, @@ -536,39 +502,6 @@ def delete_feature_service(self, name: str): """ return self._registry.delete_feature_service(name, self.project) - def _get_features( - self, - features: Union[List[str], FeatureService], - allow_cache: bool = False, - ) -> List[str]: - _features = features - - if not _features: - raise ValueError("No features specified for retrieval") - - _feature_refs = [] - if isinstance(_features, FeatureService): - feature_service_from_registry = self.get_feature_service( - _features.name, allow_cache - ) - if feature_service_from_registry != _features: - warnings.warn( - "The FeatureService object that has been passed in as an argument is " - "inconsistent with the version from the registry. Potentially a newer version " - "of the FeatureService has been applied to the registry." - ) - for projection in feature_service_from_registry.feature_view_projections: - _feature_refs.extend( - [ - f"{projection.name_to_use()}:{f.name}" - for f in projection.features - ] - ) - else: - assert isinstance(_features, list) - _feature_refs = _features - return _feature_refs - def _should_use_plan(self): """Returns True if plan and _apply_diffs should be used, False otherwise.""" # Currently only the local provider with sqlite online store supports plan and _apply_diffs. 
@@ -662,8 +595,8 @@ def _get_feature_views_to_materialize( feature_views_to_materialize: List[FeatureView] = [] if feature_views is None: - feature_views_to_materialize = self._list_feature_views( - hide_dummy_entity=False + feature_views_to_materialize = utils._list_feature_views( + self._registry, self.project, hide_dummy_entity=False ) feature_views_to_materialize = [ fv for fv in feature_views_to_materialize if fv.online @@ -1073,16 +1006,16 @@ def get_historical_features( ... ) >>> feature_data = retrieval_job.to_df() """ - _feature_refs = self._get_features(features) + _feature_refs = utils._get_features(self._registry, self.project, features) ( all_feature_views, all_on_demand_feature_views, - ) = self._get_feature_views_to_use(features) + ) = utils._get_feature_views_to_use(self._registry, self.project, features) # TODO(achal): _group_feature_refs returns the on demand feature views, but it's not passed into the provider. # This is a weird interface quirk - we should revisit the `get_historical_features` to # pass in the on demand feature views as well. - fvs, odfvs = _group_feature_refs( + fvs, odfvs = utils._group_feature_refs( _feature_refs, all_feature_views, all_on_demand_feature_views, @@ -1103,7 +1036,7 @@ def get_historical_features( feature_view_name=odfv.name, ) - _validate_feature_refs(_feature_refs, full_feature_names) + utils._validate_feature_refs(_feature_refs, full_feature_names) provider = self._get_provider() job = provider.get_historical_features( @@ -1510,7 +1443,10 @@ def write_to_offline_store( def get_online_features( self, features: Union[List[str], FeatureService], - entity_rows: List[Dict[str, Any]], + entity_rows: Union[ + List[Dict[str, Any]], + Mapping[str, Union[Sequence[Any], Sequence[Value], RepeatedValue]], + ], full_feature_names: bool = False, ) -> OnlineResponse: """ @@ -1554,230 +1490,19 @@ def get_online_features( ... 
) >>> online_response_dict = online_response.to_dict() """ - columnar: Dict[str, List[Any]] = {k: [] for k in entity_rows[0].keys()} - for entity_row in entity_rows: - for key, value in entity_row.items(): - try: - columnar[key].append(value) - except KeyError as e: - raise ValueError("All entity_rows must have the same keys.") from e - - return self._get_online_features( - features=features, - entity_values=columnar, - full_feature_names=full_feature_names, - native_entity_values=True, - ) - - async def get_online_features_async( - self, - features: Union[List[str], FeatureService], - entity_rows: List[Dict[str, Any]], - full_feature_names: bool = False, - ) -> OnlineResponse: - """ - [Alpha] Retrieves the latest online feature data asynchronously. - - Note: This method will download the full feature registry the first time it is run. If you are using a - remote registry like GCS or S3 then that may take a few seconds. The registry remains cached up to a TTL - duration (which can be set to infinity). If the cached registry is stale (more time than the TTL has - passed), then a new registry will be downloaded synchronously by this method. This download may - introduce latency to online feature retrieval. In order to avoid synchronous downloads, please call - refresh_registry() prior to the TTL being reached. Remember it is possible to set the cache TTL to - infinity (cache forever). - - Args: - features: The list of features that should be retrieved from the online store. These features can be - specified either as a list of string feature references or as a feature service. String feature - references must have format "feature_view:feature", e.g. "customer_fv:daily_transactions". - entity_rows: A list of dictionaries where each key-value is an entity-name, entity-value pair. - full_feature_names: If True, feature names will be prefixed with the corresponding feature view name, - changing them from the format "feature" to "feature_view__feature" (e.g. 
"daily_transactions" - changes to "customer_fv__daily_transactions"). - - Returns: - OnlineResponse containing the feature data in records. - - Raises: - Exception: No entity with the specified name exists. - """ - columnar: Dict[str, List[Any]] = {k: [] for k in entity_rows[0].keys()} - for entity_row in entity_rows: - for key, value in entity_row.items(): - try: - columnar[key].append(value) - except KeyError as e: - raise ValueError("All entity_rows must have the same keys.") from e - - return await self._get_online_features_async( - features=features, - entity_values=columnar, - full_feature_names=full_feature_names, - native_entity_values=True, - ) - - def _get_online_request_context( - self, features: Union[List[str], FeatureService], full_feature_names: bool - ): - _feature_refs = self._get_features(features, allow_cache=True) - - ( - requested_feature_views, - requested_on_demand_feature_views, - ) = self._get_feature_views_to_use( - features=features, allow_cache=True, hide_dummy_entity=False - ) - - ( - entity_name_to_join_key_map, - entity_type_map, - join_keys_set, - ) = self._get_entity_maps(requested_feature_views) - - _validate_feature_refs(_feature_refs, full_feature_names) - ( - grouped_refs, - grouped_odfv_refs, - ) = _group_feature_refs( - _feature_refs, - requested_feature_views, - requested_on_demand_feature_views, - ) - - requested_result_row_names = { - feat_ref.replace(":", "__") for feat_ref in _feature_refs - } - if not full_feature_names: - requested_result_row_names = { - name.rpartition("__")[-1] for name in requested_result_row_names - } - - feature_views = list(view for view, _ in grouped_refs) - - needed_request_data = self.get_needed_request_data(grouped_odfv_refs) - - entityless_case = DUMMY_ENTITY_NAME in [ - entity_name - for feature_view in feature_views - for entity_name in feature_view.entities - ] - - return ( - _feature_refs, - requested_on_demand_feature_views, - entity_name_to_join_key_map, - entity_type_map, - 
join_keys_set, - grouped_refs, - requested_result_row_names, - needed_request_data, - entityless_case, - ) - - def _prepare_entities_to_read_from_online_store( - self, - features: Union[List[str], FeatureService], - entity_values: Mapping[ - str, Union[Sequence[Any], Sequence[Value], RepeatedValue] - ], - full_feature_names: bool = False, - native_entity_values: bool = True, - ): - ( - feature_refs, - requested_on_demand_feature_views, - entity_name_to_join_key_map, - entity_type_map, - join_keys_set, - grouped_refs, - requested_result_row_names, - needed_request_data, - entityless_case, - ) = self._get_online_request_context(features, full_feature_names) - - # Extract Sequence from RepeatedValue Protobuf. - entity_value_lists: Dict[str, Union[List[Any], List[Value]]] = { - k: list(v) if isinstance(v, Sequence) else list(v.val) - for k, v in entity_values.items() - } - - entity_proto_values: Dict[str, List[Value]] - if native_entity_values: - # Convert values to Protobuf once. - entity_proto_values = { - k: python_values_to_proto_values( - v, entity_type_map.get(k, ValueType.UNKNOWN) - ) - for k, v in entity_value_lists.items() - } - else: - entity_proto_values = entity_value_lists - - num_rows = _validate_entity_values(entity_proto_values) - - join_key_values: Dict[str, List[Value]] = {} - request_data_features: Dict[str, List[Value]] = {} - # Entity rows may be either entities or request data. 
- for join_key_or_entity_name, values in entity_proto_values.items(): - # Found request data - if join_key_or_entity_name in needed_request_data: - request_data_features[join_key_or_entity_name] = values - else: - if join_key_or_entity_name in join_keys_set: - join_key = join_key_or_entity_name - else: + if isinstance(entity_rows, list): + columnar: Dict[str, List[Any]] = {k: [] for k in entity_rows[0].keys()} + for entity_row in entity_rows: + for key, value in entity_row.items(): try: - join_key = entity_name_to_join_key_map[join_key_or_entity_name] - except KeyError: - raise EntityNotFoundException( - join_key_or_entity_name, self.project - ) - else: - warnings.warn( - "Using entity name is deprecated. Use join_key instead." - ) - - # All join keys should be returned in the result. - requested_result_row_names.add(join_key) - join_key_values[join_key] = values + columnar[key].append(value) + except KeyError as e: + raise ValueError( + "All entity_rows must have the same keys." + ) from e - self.ensure_request_data_values_exist( - needed_request_data, request_data_features - ) + entity_rows = columnar - # Populate online features response proto with join keys and request data features - online_features_response = GetOnlineFeaturesResponse(results=[]) - self._populate_result_rows_from_columnar( - online_features_response=online_features_response, - data=dict(**join_key_values, **request_data_features), - ) - - # Add the Entityless case after populating result rows to avoid having to remove - # it later. 
- if entityless_case: - join_key_values[DUMMY_ENTITY_ID] = python_values_to_proto_values( - [DUMMY_ENTITY_VAL] * num_rows, DUMMY_ENTITY.value_type - ) - - return ( - join_key_values, - grouped_refs, - entity_name_to_join_key_map, - requested_on_demand_feature_views, - feature_refs, - requested_result_row_names, - online_features_response, - ) - - def _get_online_features( - self, - features: Union[List[str], FeatureService], - entity_values: Mapping[ - str, Union[Sequence[Any], Sequence[Value], RepeatedValue] - ], - full_feature_names: bool = False, - native_entity_values: bool = True, - ): ( join_key_values, grouped_refs, @@ -1786,17 +1511,19 @@ def _get_online_features( feature_refs, requested_result_row_names, online_features_response, - ) = self._prepare_entities_to_read_from_online_store( + ) = utils._prepare_entities_to_read_from_online_store( + registry=self._registry, + project=self.project, features=features, - entity_values=entity_values, + entity_values=entity_rows, full_feature_names=full_feature_names, - native_entity_values=native_entity_values, + native_entity_values=True, ) provider = self._get_provider() for table, requested_features in grouped_refs: # Get the correct set of entity values with the correct join keys. - table_entity_values, idxs = self._get_unique_entities( + table_entity_values, idxs = utils._get_unique_entities( table, join_key_values, entity_name_to_join_key_map, @@ -1811,7 +1538,7 @@ def _get_online_features( ) # Populate the result_rows with the Features from the OnlineStore inplace. 
- self._populate_response_from_feature_data( + utils._populate_response_from_feature_data( feature_data, idxs, online_features_response, @@ -1821,27 +1548,66 @@ def _get_online_features( ) if requested_on_demand_feature_views: - self._augment_response_with_on_demand_transforms( + utils._augment_response_with_on_demand_transforms( online_features_response, feature_refs, requested_on_demand_feature_views, full_feature_names, ) - self._drop_unneeded_columns( + utils._drop_unneeded_columns( online_features_response, requested_result_row_names ) return OnlineResponse(online_features_response) - async def _get_online_features_async( + async def get_online_features_async( self, features: Union[List[str], FeatureService], - entity_values: Mapping[ - str, Union[Sequence[Any], Sequence[Value], RepeatedValue] + entity_rows: Union[ + List[Dict[str, Any]], + Mapping[str, Union[Sequence[Any], Sequence[Value], RepeatedValue]], ], full_feature_names: bool = False, - native_entity_values: bool = True, - ): + ) -> OnlineResponse: + """ + [Alpha] Retrieves the latest online feature data asynchronously. + + Note: This method will download the full feature registry the first time it is run. If you are using a + remote registry like GCS or S3 then that may take a few seconds. The registry remains cached up to a TTL + duration (which can be set to infinity). If the cached registry is stale (more time than the TTL has + passed), then a new registry will be downloaded synchronously by this method. This download may + introduce latency to online feature retrieval. In order to avoid synchronous downloads, please call + refresh_registry() prior to the TTL being reached. Remember it is possible to set the cache TTL to + infinity (cache forever). + + Args: + features: The list of features that should be retrieved from the online store. These features can be + specified either as a list of string feature references or as a feature service. 
String feature + references must have format "feature_view:feature", e.g. "customer_fv:daily_transactions". + entity_rows: A list of dictionaries where each key-value is an entity-name, entity-value pair. + full_feature_names: If True, feature names will be prefixed with the corresponding feature view name, + changing them from the format "feature" to "feature_view__feature" (e.g. "daily_transactions" + changes to "customer_fv__daily_transactions"). + + Returns: + OnlineResponse containing the feature data in records. + + Raises: + Exception: No entity with the specified name exists. + """ + if isinstance(entity_rows, list): + columnar: Dict[str, List[Any]] = {k: [] for k in entity_rows[0].keys()} + for entity_row in entity_rows: + for key, value in entity_row.items(): + try: + columnar[key].append(value) + except KeyError as e: + raise ValueError( + "All entity_rows must have the same keys." + ) from e + + entity_rows = columnar + ( join_key_values, grouped_refs, @@ -1850,17 +1616,19 @@ async def _get_online_features_async( feature_refs, requested_result_row_names, online_features_response, - ) = self._prepare_entities_to_read_from_online_store( + ) = utils._prepare_entities_to_read_from_online_store( + registry=self._registry, + project=self.project, features=features, - entity_values=entity_values, + entity_values=entity_rows, full_feature_names=full_feature_names, - native_entity_values=native_entity_values, + native_entity_values=True, ) provider = self._get_provider() for table, requested_features in grouped_refs: # Get the correct set of entity values with the correct join keys. - table_entity_values, idxs = self._get_unique_entities( + table_entity_values, idxs = utils._get_unique_entities( table, join_key_values, entity_name_to_join_key_map, @@ -1875,7 +1643,7 @@ async def _get_online_features_async( ) # Populate the result_rows with the Features from the OnlineStore inplace. 
- self._populate_response_from_feature_data( + utils._populate_response_from_feature_data( feature_data, idxs, online_features_response, @@ -1885,14 +1653,14 @@ async def _get_online_features_async( ) if requested_on_demand_feature_views: - self._augment_response_with_on_demand_transforms( + utils._augment_response_with_on_demand_transforms( online_features_response, feature_refs, requested_on_demand_feature_views, full_feature_names, ) - self._drop_unneeded_columns( + utils._drop_unneeded_columns( online_features_response, requested_result_row_names ) return OnlineResponse(online_features_response) @@ -1915,20 +1683,6 @@ def retrieve_online_documents( top_k: The number of closest document features to retrieve. distance_metric: The distance metric to use for retrieval. """ - return self._retrieve_online_documents( - feature=feature, - query=query, - top_k=top_k, - distance_metric=distance_metric, - ) - - def _retrieve_online_documents( - self, - feature: str, - query: Union[str, List[float]], - top_k: int, - distance_metric: Optional[str] = None, - ): if isinstance(query, str): raise ValueError( "Using embedding functionality is not supported for document retrieval. Please embed the query before calling retrieve_online_documents." 
@@ -1936,8 +1690,12 @@ def _retrieve_online_documents( ( available_feature_views, _, - ) = self._get_feature_views_to_use( - features=[feature], allow_cache=True, hide_dummy_entity=False + ) = utils._get_feature_views_to_use( + registry=self._registry, + project=self.project, + features=[feature], + allow_cache=True, + hide_dummy_entity=False, ) requested_feature_view_name = ( feature.split(":")[0] if isinstance(feature, str) else feature @@ -1969,218 +1727,16 @@ def _retrieve_online_documents( document_feature_vals = [feature[2] for feature in document_features] document_feature_distance_vals = [feature[4] for feature in document_features] online_features_response = GetOnlineFeaturesResponse(results=[]) - self._populate_result_rows_from_columnar( + utils._populate_result_rows_from_columnar( online_features_response=online_features_response, data={requested_feature: document_feature_vals}, ) - self._populate_result_rows_from_columnar( + utils._populate_result_rows_from_columnar( online_features_response=online_features_response, data={"distance": document_feature_distance_vals}, ) return OnlineResponse(online_features_response) - @staticmethod - def _get_columnar_entity_values( - rowise: Optional[List[Dict[str, Any]]], columnar: Optional[Dict[str, List[Any]]] - ) -> Dict[str, List[Any]]: - if (rowise is None and columnar is None) or ( - rowise is not None and columnar is not None - ): - raise ValueError( - "Exactly one of `columnar_entity_values` and `rowise_entity_values` must be set." - ) - - if rowise is not None: - # Convert entity_rows from rowise to columnar. - res = defaultdict(list) - for entity_row in rowise: - for key, value in entity_row.items(): - res[key].append(value) - return res - return cast(Dict[str, List[Any]], columnar) - - def _get_entity_maps( - self, feature_views - ) -> Tuple[Dict[str, str], Dict[str, ValueType], Set[str]]: - # TODO(felixwang9817): Support entities that have different types for different feature views. 
- entities = self._list_entities(allow_cache=True, hide_dummy_entity=False) - entity_name_to_join_key_map: Dict[str, str] = {} - entity_type_map: Dict[str, ValueType] = {} - for entity in entities: - entity_name_to_join_key_map[entity.name] = entity.join_key - for feature_view in feature_views: - for entity_name in feature_view.entities: - entity = self._registry.get_entity( - entity_name, self.project, allow_cache=True - ) - # User directly uses join_key as the entity reference in the entity_rows for the - # entity mapping case. - entity_name = feature_view.projection.join_key_map.get( - entity.join_key, entity.name - ) - join_key = feature_view.projection.join_key_map.get( - entity.join_key, entity.join_key - ) - entity_name_to_join_key_map[entity_name] = join_key - for entity_column in feature_view.entity_columns: - entity_type_map[entity_column.name] = ( - entity_column.dtype.to_value_type() - ) - - return ( - entity_name_to_join_key_map, - entity_type_map, - set(entity_name_to_join_key_map.values()), - ) - - @staticmethod - def _get_table_entity_values( - table: FeatureView, - entity_name_to_join_key_map: Dict[str, str], - join_key_proto_values: Dict[str, List[Value]], - ) -> Dict[str, List[Value]]: - # The correct join_keys expected by the OnlineStore for this Feature View. - table_join_keys = [ - entity_name_to_join_key_map[entity_name] for entity_name in table.entities - ] - - # If the FeatureView has a Projection then the join keys may be aliased. - alias_to_join_key_map = {v: k for k, v in table.projection.join_key_map.items()} - - # Subset to columns which are relevant to this FeatureView and - # give them the correct names. 
- entity_values = { - alias_to_join_key_map.get(k, k): v - for k, v in join_key_proto_values.items() - if alias_to_join_key_map.get(k, k) in table_join_keys - } - return entity_values - - @staticmethod - def _populate_result_rows_from_columnar( - online_features_response: GetOnlineFeaturesResponse, - data: Dict[str, List[Value]], - ): - timestamp = Timestamp() # Only initialize this timestamp once. - # Add more values to the existing result rows - for feature_name, feature_values in data.items(): - online_features_response.metadata.feature_names.val.append(feature_name) - online_features_response.results.append( - GetOnlineFeaturesResponse.FeatureVector( - values=feature_values, - statuses=[FieldStatus.PRESENT] * len(feature_values), - event_timestamps=[timestamp] * len(feature_values), - ) - ) - - @staticmethod - def get_needed_request_data( - grouped_odfv_refs: List[Tuple[OnDemandFeatureView, List[str]]], - ) -> Set[str]: - needed_request_data: Set[str] = set() - for odfv, _ in grouped_odfv_refs: - odfv_request_data_schema = odfv.get_request_data_schema() - needed_request_data.update(odfv_request_data_schema.keys()) - return needed_request_data - - @staticmethod - def ensure_request_data_values_exist( - needed_request_data: Set[str], - request_data_features: Dict[str, List[Any]], - ): - if len(needed_request_data) != len(request_data_features.keys()): - missing_features = [ - x for x in needed_request_data if x not in request_data_features - ] - raise RequestDataNotFoundInEntityRowsException( - feature_names=missing_features - ) - - def _get_unique_entities( - self, - table: FeatureView, - join_key_values: Dict[str, List[Value]], - entity_name_to_join_key_map: Dict[str, str], - ) -> Tuple[Tuple[Dict[str, Value], ...], Tuple[List[int], ...]]: - """Return the set of unique composite Entities for a Feature View and the indexes at which they appear. 
- - This method allows us to query the OnlineStore for data we need only once - rather than requesting and processing data for the same combination of - Entities multiple times. - """ - # Get the correct set of entity values with the correct join keys. - table_entity_values = self._get_table_entity_values( - table, - entity_name_to_join_key_map, - join_key_values, - ) - - # Convert back to rowise. - keys = table_entity_values.keys() - # Sort the rowise data to allow for grouping but keep original index. This lambda is - # sufficient as Entity types cannot be complex (ie. lists). - rowise = list(enumerate(zip(*table_entity_values.values()))) - rowise.sort( - key=lambda row: tuple(getattr(x, x.WhichOneof("val")) for x in row[1]) - ) - - # Identify unique entities and the indexes at which they occur. - unique_entities: Tuple[Dict[str, Value], ...] - indexes: Tuple[List[int], ...] - unique_entities, indexes = tuple( - zip( - *[ - (dict(zip(keys, k)), [_[0] for _ in g]) - for k, g in itertools.groupby(rowise, key=lambda x: x[1]) - ] - ) - ) - return unique_entities, indexes - - def _get_entity_key_protos( - self, - entity_rows: Iterable[Mapping[str, Value]], - ) -> List[EntityKeyProto]: - # Instantiate one EntityKeyProto per Entity. - entity_key_protos = [ - EntityKeyProto(join_keys=row.keys(), entity_values=row.values()) - for row in entity_rows - ] - return entity_key_protos - - def _convert_rows_to_protobuf( - self, - requested_features: List[str], - read_rows: List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]], - ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]: - # Each row is a set of features for a given entity key. - # We only need to convert the data to Protobuf once. - null_value = Value() - read_row_protos = [] - for read_row in read_rows: - row_ts_proto = Timestamp() - row_ts, feature_data = read_row - # TODO (Ly): reuse whatever timestamp if row_ts is None? 
- if row_ts is not None: - row_ts_proto.FromDatetime(row_ts) - event_timestamps = [row_ts_proto] * len(requested_features) - if feature_data is None: - statuses = [FieldStatus.NOT_FOUND] * len(requested_features) - values = [null_value] * len(requested_features) - else: - statuses = [] - values = [] - for feature_name in requested_features: - # Make sure order of data is the same as requested_features. - if feature_name not in feature_data: - statuses.append(FieldStatus.NOT_FOUND) - values.append(null_value) - else: - statuses.append(FieldStatus.PRESENT) - values.append(feature_data[feature_name]) - read_row_protos.append((event_timestamps, statuses, values)) - return read_row_protos - def _read_from_online_store( self, entity_rows: Iterable[Mapping[str, Value]], @@ -2197,7 +1753,7 @@ def _read_from_online_store( combination of Entities in `entity_rows` in the same order as they are provided. """ - entity_key_protos = self._get_entity_key_protos(entity_rows) + entity_key_protos = utils._get_entity_key_protos(entity_rows) # Fetch data for Entities. read_rows = provider.online_read( @@ -2207,7 +1763,7 @@ def _read_from_online_store( requested_features=requested_features, ) - return self._convert_rows_to_protobuf(requested_features, read_rows) + return utils._convert_rows_to_protobuf(requested_features, read_rows) async def _read_from_online_store_async( self, @@ -2216,7 +1772,7 @@ async def _read_from_online_store_async( requested_features: List[str], table: FeatureView, ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]: - entity_key_protos = self._get_entity_key_protos(entity_rows) + entity_key_protos = utils._get_entity_key_protos(entity_rows) # Fetch data for Entities. 
read_rows = await provider.online_read_async( @@ -2226,7 +1782,7 @@ async def _read_from_online_store_async( requested_features=requested_features, ) - return self._convert_rows_to_protobuf(requested_features, read_rows) + return utils._convert_rows_to_protobuf(requested_features, read_rows) def _retrieve_from_online_store( self, @@ -2270,243 +1826,6 @@ def _retrieve_from_online_store( ) return read_row_protos - @staticmethod - def _populate_response_from_feature_data( - feature_data: Iterable[ - Tuple[ - Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[Value] - ] - ], - indexes: Iterable[List[int]], - online_features_response: GetOnlineFeaturesResponse, - full_feature_names: bool, - requested_features: Iterable[str], - table: FeatureView, - ): - """Populate the GetOnlineFeaturesResponse with feature data. - - This method assumes that `_read_from_online_store` returns data for each - combination of Entities in `entity_rows` in the same order as they - are provided. - - Args: - feature_data: A list of data in Protobuf form which was retrieved from the OnlineStore. - indexes: A list of indexes which should be the same length as `feature_data`. Each list - of indexes corresponds to a set of result rows in `online_features_response`. - online_features_response: The object to populate. - full_feature_names: A boolean that provides the option to add the feature view prefixes to the feature names, - changing them from the format "feature" to "feature_view__feature" (e.g., "daily_transactions" changes to - "customer_fv__daily_transactions"). - requested_features: The names of the features in `feature_data`. This should be ordered in the same way as the - data in `feature_data`. - table: The FeatureView that `feature_data` was retrieved from. - """ - # Add the feature names to the response. 
- requested_feature_refs = [ - f"{table.projection.name_to_use()}__{feature_name}" - if full_feature_names - else feature_name - for feature_name in requested_features - ] - online_features_response.metadata.feature_names.val.extend( - requested_feature_refs - ) - - timestamps, statuses, values = zip(*feature_data) - - # Populate the result with data fetched from the OnlineStore - # which is guaranteed to be aligned with `requested_features`. - for ( - feature_idx, - (timestamp_vector, statuses_vector, values_vector), - ) in enumerate(zip(zip(*timestamps), zip(*statuses), zip(*values))): - online_features_response.results.append( - GetOnlineFeaturesResponse.FeatureVector( - values=apply_list_mapping(values_vector, indexes), - statuses=apply_list_mapping(statuses_vector, indexes), - event_timestamps=apply_list_mapping(timestamp_vector, indexes), - ) - ) - - @staticmethod - def _augment_response_with_on_demand_transforms( - online_features_response: GetOnlineFeaturesResponse, - feature_refs: List[str], - requested_on_demand_feature_views: List[OnDemandFeatureView], - full_feature_names: bool, - ): - """Computes on demand feature values and adds them to the result rows. - - Assumes that 'online_features_response' already contains the necessary request data and input feature - views for the on demand feature views. Unneeded feature values such as request data and - unrequested input feature views will be removed from 'online_features_response'. - - Args: - online_features_response: Protobuf object to populate - feature_refs: List of all feature references to be returned. - requested_on_demand_feature_views: List of all odfvs that have been requested. - full_feature_names: A boolean that provides the option to add the feature view prefixes to the feature names, - changing them from the format "feature" to "feature_view__feature" (e.g., "daily_transactions" changes to - "customer_fv__daily_transactions"). 
- """ - requested_odfv_map = { - odfv.name: odfv for odfv in requested_on_demand_feature_views - } - requested_odfv_feature_names = requested_odfv_map.keys() - - odfv_feature_refs = defaultdict(list) - for feature_ref in feature_refs: - view_name, feature_name = feature_ref.split(":") - if view_name in requested_odfv_feature_names: - odfv_feature_refs[view_name].append( - f"{requested_odfv_map[view_name].projection.name_to_use()}__{feature_name}" - if full_feature_names - else feature_name - ) - - initial_response = OnlineResponse(online_features_response) - initial_response_arrow: Optional[pa.Table] = None - initial_response_dict: Optional[Dict[str, List[Any]]] = None - - # Apply on demand transformations and augment the result rows - odfv_result_names = set() - for odfv_name, _feature_refs in odfv_feature_refs.items(): - odfv = requested_odfv_map[odfv_name] - if odfv.mode == "python": - if initial_response_dict is None: - initial_response_dict = initial_response.to_dict() - transformed_features_dict: Dict[str, List[Any]] = odfv.transform_dict( - initial_response_dict - ) - elif odfv.mode in {"pandas", "substrait"}: - if initial_response_arrow is None: - initial_response_arrow = initial_response.to_arrow() - transformed_features_arrow = odfv.transform_arrow( - initial_response_arrow, full_feature_names - ) - else: - raise Exception( - f"Invalid OnDemandFeatureMode: {odfv.mode}. Expected one of 'pandas', 'python', or 'substrait'." 
- ) - - transformed_features = ( - transformed_features_dict - if odfv.mode == "python" - else transformed_features_arrow - ) - transformed_columns = ( - transformed_features.column_names - if isinstance(transformed_features, pa.Table) - else transformed_features - ) - selected_subset = [f for f in transformed_columns if f in _feature_refs] - - proto_values = [] - for selected_feature in selected_subset: - feature_vector = transformed_features[selected_feature] - proto_values.append( - python_values_to_proto_values(feature_vector, ValueType.UNKNOWN) - if odfv.mode == "python" - else python_values_to_proto_values( - feature_vector.to_numpy(), ValueType.UNKNOWN - ) - ) - - odfv_result_names |= set(selected_subset) - - online_features_response.metadata.feature_names.val.extend(selected_subset) - for feature_idx in range(len(selected_subset)): - online_features_response.results.append( - GetOnlineFeaturesResponse.FeatureVector( - values=proto_values[feature_idx], - statuses=[FieldStatus.PRESENT] * len(proto_values[feature_idx]), - event_timestamps=[Timestamp()] * len(proto_values[feature_idx]), - ) - ) - - @staticmethod - def _drop_unneeded_columns( - online_features_response: GetOnlineFeaturesResponse, - requested_result_row_names: Set[str], - ): - """ - Unneeded feature values such as request data and unrequested input feature views will - be removed from 'online_features_response'. - - Args: - online_features_response: Protobuf object to populate - requested_result_row_names: Fields from 'result_rows' that have been requested, and - therefore should not be dropped. 
- """ - # Drop values that aren't needed - unneeded_feature_indices = [ - idx - for idx, val in enumerate( - online_features_response.metadata.feature_names.val - ) - if val not in requested_result_row_names - ] - - for idx in reversed(unneeded_feature_indices): - del online_features_response.metadata.feature_names.val[idx] - del online_features_response.results[idx] - - def _get_feature_views_to_use( - self, - features: Optional[Union[List[str], FeatureService]], - allow_cache=False, - hide_dummy_entity: bool = True, - ) -> Tuple[List[FeatureView], List[OnDemandFeatureView]]: - fvs = { - fv.name: fv - for fv in [ - *self._list_feature_views(allow_cache, hide_dummy_entity), - *self._registry.list_stream_feature_views( - project=self.project, allow_cache=allow_cache - ), - ] - } - - od_fvs = { - fv.name: fv - for fv in self._registry.list_on_demand_feature_views( - project=self.project, allow_cache=allow_cache - ) - } - - if isinstance(features, FeatureService): - fvs_to_use, od_fvs_to_use = [], [] - for fv_name, projection in [ - (projection.name, projection) - for projection in features.feature_view_projections - ]: - if fv_name in fvs: - fvs_to_use.append( - fvs[fv_name].with_projection(copy.copy(projection)) - ) - elif fv_name in od_fvs: - odfv = od_fvs[fv_name].with_projection(copy.copy(projection)) - od_fvs_to_use.append(odfv) - # Let's make sure to include an FVs which the ODFV requires Features from. - for projection in odfv.source_feature_view_projections.values(): - fv = fvs[projection.name].with_projection(copy.copy(projection)) - if fv not in fvs_to_use: - fvs_to_use.append(fv) - else: - raise ValueError( - f"The provided feature service {features.name} contains a reference to a feature view" - f"{fv_name} which doesn't exist. Please make sure that you have created the feature view" - f'{fv_name} and that you have registered it by running "apply".' 
- ) - views_to_use = (fvs_to_use, od_fvs_to_use) - else: - views_to_use = ( - [*fvs.values()], - [*od_fvs.values()], - ) - - return views_to_use - def serve( self, host: str, @@ -2690,101 +2009,6 @@ def get_validation_reference( return ref -def _validate_entity_values(join_key_values: Dict[str, List[Value]]): - set_of_row_lengths = {len(v) for v in join_key_values.values()} - if len(set_of_row_lengths) > 1: - raise ValueError("All entity rows must have the same columns.") - return set_of_row_lengths.pop() - - -def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = False): - """ - Validates that there are no collisions among the feature references. - - Args: - feature_refs: List of feature references to validate. Feature references must have format - "feature_view:feature", e.g. "customer_fv:daily_transactions". - full_feature_names: If True, the full feature references are compared for collisions; if False, - only the feature names are compared. - - Raises: - FeatureNameCollisionError: There is a collision among the feature references. 
- """ - collided_feature_refs = [] - - if full_feature_names: - collided_feature_refs = [ - ref for ref, occurrences in Counter(feature_refs).items() if occurrences > 1 - ] - else: - feature_names = [ref.split(":")[1] for ref in feature_refs] - collided_feature_names = [ - ref - for ref, occurrences in Counter(feature_names).items() - if occurrences > 1 - ] - - for feature_name in collided_feature_names: - collided_feature_refs.extend( - [ref for ref in feature_refs if ref.endswith(":" + feature_name)] - ) - - if len(collided_feature_refs) > 0: - raise FeatureNameCollisionError(collided_feature_refs, full_feature_names) - - -def _group_feature_refs( - features: List[str], - all_feature_views: List[FeatureView], - all_on_demand_feature_views: List[OnDemandFeatureView], -) -> Tuple[ - List[Tuple[FeatureView, List[str]]], List[Tuple[OnDemandFeatureView, List[str]]] -]: - """Get list of feature views and corresponding feature names based on feature references""" - - # view name to view proto - view_index = {view.projection.name_to_use(): view for view in all_feature_views} - - # on demand view to on demand view proto - on_demand_view_index = { - view.projection.name_to_use(): view for view in all_on_demand_feature_views - } - - # view name to feature names - views_features = defaultdict(set) - - # on demand view name to feature names - on_demand_view_features = defaultdict(set) - - for ref in features: - view_name, feat_name = ref.split(":") - if view_name in view_index: - view_index[view_name].projection.get_feature(feat_name) # For validation - views_features[view_name].add(feat_name) - elif view_name in on_demand_view_index: - on_demand_view_index[view_name].projection.get_feature( - feat_name - ) # For validation - on_demand_view_features[view_name].add(feat_name) - # Let's also add in any FV Feature dependencies here. 
- for input_fv_projection in on_demand_view_index[ - view_name - ].source_feature_view_projections.values(): - for input_feat in input_fv_projection.features: - views_features[input_fv_projection.name].add(input_feat.name) - else: - raise FeatureViewNotFoundException(view_name) - - fvs_result: List[Tuple[FeatureView, List[str]]] = [] - odfvs_result: List[Tuple[OnDemandFeatureView, List[str]]] = [] - - for view_name, feature_names in views_features.items(): - fvs_result.append((view_index[view_name], list(feature_names))) - for view_name, feature_names in on_demand_view_features.items(): - odfvs_result.append((on_demand_view_index[view_name], list(feature_names))) - return fvs_result, odfvs_result - - def _print_materialization_log( start_date, end_date, num_feature_views: int, online_store: str ): @@ -2827,15 +2051,3 @@ def _validate_data_sources(data_sources: List[DataSource]): raise DataSourceRepeatNamesException(case_insensitive_ds_name) else: ds_names.add(case_insensitive_ds_name) - - -def apply_list_mapping( - lst: Iterable[Any], mapping_indexes: Iterable[List[int]] -) -> Iterable[Any]: - output_len = sum(len(item) for item in mapping_indexes) - output = [None] * output_len - for elem, destinations in zip(lst, mapping_indexes): - for idx in destinations: - output[idx] = elem - - return output diff --git a/sdk/python/feast/infra/contrib/grpc_server.py b/sdk/python/feast/infra/contrib/grpc_server.py index 2bd1b27755..b6ed6cb25d 100644 --- a/sdk/python/feast/infra/contrib/grpc_server.py +++ b/sdk/python/feast/infra/contrib/grpc_server.py @@ -114,7 +114,7 @@ def GetOnlineFeatures(self, request: GetOnlineFeaturesRequest, context): else: features = list(request.features.val) - result = self.fs._get_online_features( + result = self.fs.get_online_features( features, request.entities, request.full_feature_names, diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 47faa7d8c4..b811b03776 100644 --- a/sdk/python/feast/utils.py +++ 
b/sdk/python/feast/utils.py @@ -1,24 +1,52 @@ +import copy +import itertools +import logging import os import typing -from collections import defaultdict +import warnings +from collections import Counter, defaultdict from datetime import datetime from pathlib import Path -from typing import Dict, List, Optional, Tuple, Union +from typing import ( + Any, + Dict, + Iterable, + List, + Mapping, + Optional, + Sequence, + Set, + Tuple, + Union, +) import pandas as pd import pyarrow from dateutil.tz import tzlocal +from google.protobuf.timestamp_pb2 import Timestamp from pytz import utc from feast.constants import FEAST_FS_YAML_FILE_PATH_ENV_NAME from feast.entity import Entity +from feast.errors import ( + EntityNotFoundException, + FeatureNameCollisionError, + FeatureViewNotFoundException, + RequestDataNotFoundInEntityRowsException, +) +from feast.protos.feast.serving.ServingService_pb2 import ( + FieldStatus, + GetOnlineFeaturesResponse, +) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import RepeatedValue as RepeatedValueProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.type_map import python_values_to_proto_values from feast.value_type import ValueType from feast.version import get_version if typing.TYPE_CHECKING: + from feast.feature_service import FeatureService from feast.feature_view import FeatureView from feast.on_demand_feature_view import OnDemandFeatureView @@ -256,3 +284,740 @@ def _convert_arrow_to_proto( created_timestamps = [None] * table.num_rows return list(zip(entity_keys, features, event_timestamps, created_timestamps)) + + +def _validate_entity_values(join_key_values: Dict[str, List[ValueProto]]): + set_of_row_lengths = {len(v) for v in join_key_values.values()} + if len(set_of_row_lengths) > 1: + raise ValueError("All entity rows must have the same columns.") + return set_of_row_lengths.pop() + + +def _validate_feature_refs(feature_refs: 
List[str], full_feature_names: bool = False): + """ + Validates that there are no collisions among the feature references. + + Args: + feature_refs: List of feature references to validate. Feature references must have format + "feature_view:feature", e.g. "customer_fv:daily_transactions". + full_feature_names: If True, the full feature references are compared for collisions; if False, + only the feature names are compared. + + Raises: + FeatureNameCollisionError: There is a collision among the feature references. + """ + collided_feature_refs = [] + + if full_feature_names: + collided_feature_refs = [ + ref for ref, occurrences in Counter(feature_refs).items() if occurrences > 1 + ] + else: + feature_names = [ref.split(":")[1] for ref in feature_refs] + collided_feature_names = [ + ref + for ref, occurrences in Counter(feature_names).items() + if occurrences > 1 + ] + + for feature_name in collided_feature_names: + collided_feature_refs.extend( + [ref for ref in feature_refs if ref.endswith(":" + feature_name)] + ) + + if len(collided_feature_refs) > 0: + raise FeatureNameCollisionError(collided_feature_refs, full_feature_names) + + +def _group_feature_refs( + features: List[str], + all_feature_views: List["FeatureView"], + all_on_demand_feature_views: List["OnDemandFeatureView"], +) -> Tuple[ + List[Tuple["FeatureView", List[str]]], List[Tuple["OnDemandFeatureView", List[str]]] +]: + """Get list of feature views and corresponding feature names based on feature references""" + + # view name to view proto + view_index = {view.projection.name_to_use(): view for view in all_feature_views} + + # on demand view to on demand view proto + on_demand_view_index = { + view.projection.name_to_use(): view for view in all_on_demand_feature_views + } + + # view name to feature names + views_features = defaultdict(set) + + # on demand view name to feature names + on_demand_view_features = defaultdict(set) + + for ref in features: + view_name, feat_name = ref.split(":") + if 
view_name in view_index: + view_index[view_name].projection.get_feature(feat_name) # For validation + views_features[view_name].add(feat_name) + elif view_name in on_demand_view_index: + on_demand_view_index[view_name].projection.get_feature( + feat_name + ) # For validation + on_demand_view_features[view_name].add(feat_name) + # Let's also add in any FV Feature dependencies here. + for input_fv_projection in on_demand_view_index[ + view_name + ].source_feature_view_projections.values(): + for input_feat in input_fv_projection.features: + views_features[input_fv_projection.name].add(input_feat.name) + else: + raise FeatureViewNotFoundException(view_name) + + fvs_result: List[Tuple["FeatureView", List[str]]] = [] + odfvs_result: List[Tuple["OnDemandFeatureView", List[str]]] = [] + + for view_name, feature_names in views_features.items(): + fvs_result.append((view_index[view_name], list(feature_names))) + for view_name, feature_names in on_demand_view_features.items(): + odfvs_result.append((on_demand_view_index[view_name], list(feature_names))) + return fvs_result, odfvs_result + + +def apply_list_mapping( + lst: Iterable[Any], mapping_indexes: Iterable[List[int]] +) -> Iterable[Any]: + output_len = sum(len(item) for item in mapping_indexes) + output = [None] * output_len + for elem, destinations in zip(lst, mapping_indexes): + for idx in destinations: + output[idx] = elem + + return output + + +def _augment_response_with_on_demand_transforms( + online_features_response: GetOnlineFeaturesResponse, + feature_refs: List[str], + requested_on_demand_feature_views: List["OnDemandFeatureView"], + full_feature_names: bool, +): + """Computes on demand feature values and adds them to the result rows. + + Assumes that 'online_features_response' already contains the necessary request data and input feature + views for the on demand feature views. Unneeded feature values such as request data and + unrequested input feature views will be removed from 'online_features_response'. 
+ + Args: + online_features_response: Protobuf object to populate + feature_refs: List of all feature references to be returned. + requested_on_demand_feature_views: List of all odfvs that have been requested. + full_feature_names: A boolean that provides the option to add the feature view prefixes to the feature names, + changing them from the format "feature" to "feature_view__feature" (e.g., "daily_transactions" changes to + "customer_fv__daily_transactions"). + """ + from feast.online_response import OnlineResponse + + requested_odfv_map = {odfv.name: odfv for odfv in requested_on_demand_feature_views} + requested_odfv_feature_names = requested_odfv_map.keys() + + odfv_feature_refs = defaultdict(list) + for feature_ref in feature_refs: + view_name, feature_name = feature_ref.split(":") + if view_name in requested_odfv_feature_names: + odfv_feature_refs[view_name].append( + f"{requested_odfv_map[view_name].projection.name_to_use()}__{feature_name}" + if full_feature_names + else feature_name + ) + + initial_response = OnlineResponse(online_features_response) + initial_response_arrow: Optional[pyarrow.Table] = None + initial_response_dict: Optional[Dict[str, List[Any]]] = None + + # Apply on demand transformations and augment the result rows + odfv_result_names = set() + for odfv_name, _feature_refs in odfv_feature_refs.items(): + odfv = requested_odfv_map[odfv_name] + if odfv.mode == "python": + if initial_response_dict is None: + initial_response_dict = initial_response.to_dict() + transformed_features_dict: Dict[str, List[Any]] = odfv.transform_dict( + initial_response_dict + ) + elif odfv.mode in {"pandas", "substrait"}: + if initial_response_arrow is None: + initial_response_arrow = initial_response.to_arrow() + transformed_features_arrow = odfv.transform_arrow( + initial_response_arrow, full_feature_names + ) + else: + raise Exception( + f"Invalid OnDemandFeatureMode: {odfv.mode}. Expected one of 'pandas', 'python', or 'substrait'." 
+ ) + + transformed_features = ( + transformed_features_dict + if odfv.mode == "python" + else transformed_features_arrow + ) + transformed_columns = ( + transformed_features.column_names + if isinstance(transformed_features, pyarrow.Table) + else transformed_features + ) + selected_subset = [f for f in transformed_columns if f in _feature_refs] + + proto_values = [] + for selected_feature in selected_subset: + feature_vector = transformed_features[selected_feature] + proto_values.append( + python_values_to_proto_values(feature_vector, ValueType.UNKNOWN) + if odfv.mode == "python" + else python_values_to_proto_values( + feature_vector.to_numpy(), ValueType.UNKNOWN + ) + ) + + odfv_result_names |= set(selected_subset) + + online_features_response.metadata.feature_names.val.extend(selected_subset) + for feature_idx in range(len(selected_subset)): + online_features_response.results.append( + GetOnlineFeaturesResponse.FeatureVector( + values=proto_values[feature_idx], + statuses=[FieldStatus.PRESENT] * len(proto_values[feature_idx]), + event_timestamps=[Timestamp()] * len(proto_values[feature_idx]), + ) + ) + + +def _get_entity_maps( + registry, + project, + feature_views, +) -> Tuple[Dict[str, str], Dict[str, ValueType], Set[str]]: + # TODO(felixwang9817): Support entities that have different types for different feature views. + entities = registry.list_entities(project, allow_cache=True) + entity_name_to_join_key_map: Dict[str, str] = {} + entity_type_map: Dict[str, ValueType] = {} + for entity in entities: + entity_name_to_join_key_map[entity.name] = entity.join_key + for feature_view in feature_views: + for entity_name in feature_view.entities: + entity = registry.get_entity(entity_name, project, allow_cache=True) + # User directly uses join_key as the entity reference in the entity_rows for the + # entity mapping case. 
+ entity_name = feature_view.projection.join_key_map.get( + entity.join_key, entity.name + ) + join_key = feature_view.projection.join_key_map.get( + entity.join_key, entity.join_key + ) + entity_name_to_join_key_map[entity_name] = join_key + for entity_column in feature_view.entity_columns: + entity_type_map[entity_column.name] = entity_column.dtype.to_value_type() + + return ( + entity_name_to_join_key_map, + entity_type_map, + set(entity_name_to_join_key_map.values()), + ) + + +def _get_table_entity_values( + table: "FeatureView", + entity_name_to_join_key_map: Dict[str, str], + join_key_proto_values: Dict[str, List[ValueProto]], +) -> Dict[str, List[ValueProto]]: + # The correct join_keys expected by the OnlineStore for this Feature View. + table_join_keys = [ + entity_name_to_join_key_map[entity_name] for entity_name in table.entities + ] + + # If the FeatureView has a Projection then the join keys may be aliased. + alias_to_join_key_map = {v: k for k, v in table.projection.join_key_map.items()} + + # Subset to columns which are relevant to this FeatureView and + # give them the correct names. + entity_values = { + alias_to_join_key_map.get(k, k): v + for k, v in join_key_proto_values.items() + if alias_to_join_key_map.get(k, k) in table_join_keys + } + return entity_values + + +def _get_unique_entities( + table: "FeatureView", + join_key_values: Dict[str, List[ValueProto]], + entity_name_to_join_key_map: Dict[str, str], +) -> Tuple[Tuple[Dict[str, ValueProto], ...], Tuple[List[int], ...]]: + """Return the set of unique composite Entities for a Feature View and the indexes at which they appear. + + This method allows us to query the OnlineStore for data we need only once + rather than requesting and processing data for the same combination of + Entities multiple times. + """ + # Get the correct set of entity values with the correct join keys. 
+ table_entity_values = _get_table_entity_values( + table, + entity_name_to_join_key_map, + join_key_values, + ) + + # Convert back to rowise. + keys = table_entity_values.keys() + # Sort the rowise data to allow for grouping but keep original index. This lambda is + # sufficient as Entity types cannot be complex (ie. lists). + rowise = list(enumerate(zip(*table_entity_values.values()))) + rowise.sort(key=lambda row: tuple(getattr(x, x.WhichOneof("val")) for x in row[1])) + + # Identify unique entities and the indexes at which they occur. + unique_entities: Tuple[Dict[str, ValueProto], ...] + indexes: Tuple[List[int], ...] + unique_entities, indexes = tuple( + zip( + *[ + (dict(zip(keys, k)), [_[0] for _ in g]) + for k, g in itertools.groupby(rowise, key=lambda x: x[1]) + ] + ) + ) + return unique_entities, indexes + + +def _drop_unneeded_columns( + online_features_response: GetOnlineFeaturesResponse, + requested_result_row_names: Set[str], +): + """ + Unneeded feature values such as request data and unrequested input feature views will + be removed from 'online_features_response'. + + Args: + online_features_response: Protobuf object to populate + requested_result_row_names: Fields from 'result_rows' that have been requested, and + therefore should not be dropped. + """ + # Drop values that aren't needed + unneeded_feature_indices = [ + idx + for idx, val in enumerate(online_features_response.metadata.feature_names.val) + if val not in requested_result_row_names + ] + + for idx in reversed(unneeded_feature_indices): + del online_features_response.metadata.feature_names.val[idx] + del online_features_response.results[idx] + + +def _populate_result_rows_from_columnar( + online_features_response: GetOnlineFeaturesResponse, + data: Dict[str, List[ValueProto]], +): + timestamp = Timestamp() # Only initialize this timestamp once. 
+ # Add more values to the existing result rows + for feature_name, feature_values in data.items(): + online_features_response.metadata.feature_names.val.append(feature_name) + online_features_response.results.append( + GetOnlineFeaturesResponse.FeatureVector( + values=feature_values, + statuses=[FieldStatus.PRESENT] * len(feature_values), + event_timestamps=[timestamp] * len(feature_values), + ) + ) + + +def get_needed_request_data( + grouped_odfv_refs: List[Tuple["OnDemandFeatureView", List[str]]], +) -> Set[str]: + needed_request_data: Set[str] = set() + for odfv, _ in grouped_odfv_refs: + odfv_request_data_schema = odfv.get_request_data_schema() + needed_request_data.update(odfv_request_data_schema.keys()) + return needed_request_data + + +def ensure_request_data_values_exist( + needed_request_data: Set[str], + request_data_features: Dict[str, List[Any]], +): + if len(needed_request_data) != len(request_data_features.keys()): + missing_features = [ + x for x in needed_request_data if x not in request_data_features + ] + raise RequestDataNotFoundInEntityRowsException(feature_names=missing_features) + + +def _populate_response_from_feature_data( + feature_data: Iterable[ + Tuple[ + Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[ValueProto] + ] + ], + indexes: Iterable[List[int]], + online_features_response: GetOnlineFeaturesResponse, + full_feature_names: bool, + requested_features: Iterable[str], + table: "FeatureView", +): + """Populate the GetOnlineFeaturesResponse with feature data. + + This method assumes that `_read_from_online_store` returns data for each + combination of Entities in `entity_rows` in the same order as they + are provided. + + Args: + feature_data: A list of data in Protobuf form which was retrieved from the OnlineStore. + indexes: A list of indexes which should be the same length as `feature_data`. Each list + of indexes corresponds to a set of result rows in `online_features_response`. 
+ online_features_response: The object to populate. + full_feature_names: A boolean that provides the option to add the feature view prefixes to the feature names, + changing them from the format "feature" to "feature_view__feature" (e.g., "daily_transactions" changes to + "customer_fv__daily_transactions"). + requested_features: The names of the features in `feature_data`. This should be ordered in the same way as the + data in `feature_data`. + table: The FeatureView that `feature_data` was retrieved from. + """ + # Add the feature names to the response. + requested_feature_refs = [ + f"{table.projection.name_to_use()}__{feature_name}" + if full_feature_names + else feature_name + for feature_name in requested_features + ] + online_features_response.metadata.feature_names.val.extend(requested_feature_refs) + + timestamps, statuses, values = zip(*feature_data) + + # Populate the result with data fetched from the OnlineStore + # which is guaranteed to be aligned with `requested_features`. 
+ for ( + feature_idx, + (timestamp_vector, statuses_vector, values_vector), + ) in enumerate(zip(zip(*timestamps), zip(*statuses), zip(*values))): + online_features_response.results.append( + GetOnlineFeaturesResponse.FeatureVector( + values=apply_list_mapping(values_vector, indexes), + statuses=apply_list_mapping(statuses_vector, indexes), + event_timestamps=apply_list_mapping(timestamp_vector, indexes), + ) + ) + + +def _get_features( + registry, + project, + features: Union[List[str], "FeatureService"], + allow_cache: bool = False, +) -> List[str]: + from feast.feature_service import FeatureService + + _features = features + + if not _features: + raise ValueError("No features specified for retrieval") + + _feature_refs = [] + if isinstance(_features, FeatureService): + feature_service_from_registry = registry.get_feature_service( + _features.name, project, allow_cache + ) + if feature_service_from_registry != _features: + warnings.warn( + "The FeatureService object that has been passed in as an argument is " + "inconsistent with the version from the registry. Potentially a newer version " + "of the FeatureService has been applied to the registry." + ) + for projection in feature_service_from_registry.feature_view_projections: + _feature_refs.extend( + [f"{projection.name_to_use()}:{f.name}" for f in projection.features] + ) + else: + assert isinstance(_features, list) + _feature_refs = _features + return _feature_refs + + +def _list_feature_views( + registry, + project, + allow_cache: bool = False, + hide_dummy_entity: bool = True, +) -> List["FeatureView"]: + from feast.feature_view import DUMMY_ENTITY_NAME + + logging.warning( + "_list_feature_views will make breaking changes. Please use _list_batch_feature_views instead. " + "_list_feature_views will behave like _list_all_feature_views in the future." 
+ )
+ feature_views = []
+ for fv in registry.list_feature_views(project, allow_cache=allow_cache):
+ if hide_dummy_entity and fv.entities and fv.entities[0] == DUMMY_ENTITY_NAME:
+ fv.entities = []
+ fv.entity_columns = []
+ feature_views.append(fv)
+ return feature_views
+
+
+def _get_feature_views_to_use(
+ registry,
+ project,
+ features: Optional[Union[List[str], "FeatureService"]],
+ allow_cache=False,
+ hide_dummy_entity: bool = True,
+) -> Tuple[List["FeatureView"], List["OnDemandFeatureView"]]:
+ from feast.feature_service import FeatureService
+
+ fvs = {
+ fv.name: fv
+ for fv in [
+ *_list_feature_views(registry, project, allow_cache, hide_dummy_entity),
+ *registry.list_stream_feature_views(
+ project=project, allow_cache=allow_cache
+ ),
+ ]
+ }
+
+ od_fvs = {
+ fv.name: fv
+ for fv in registry.list_on_demand_feature_views(
+ project=project, allow_cache=allow_cache
+ )
+ }
+
+ if isinstance(features, FeatureService):
+ fvs_to_use, od_fvs_to_use = [], []
+ for fv_name, projection in [
+ (projection.name, projection)
+ for projection in features.feature_view_projections
+ ]:
+ if fv_name in fvs:
+ fvs_to_use.append(fvs[fv_name].with_projection(copy.copy(projection)))
+ elif fv_name in od_fvs:
+ odfv = od_fvs[fv_name].with_projection(copy.copy(projection))
+ od_fvs_to_use.append(odfv)
+ # Let's make sure to include any FVs which the ODFV requires Features from.
+ for projection in odfv.source_feature_view_projections.values():
+ fv = fvs[projection.name].with_projection(copy.copy(projection))
+ if fv not in fvs_to_use:
+ fvs_to_use.append(fv)
+ else:
+ raise ValueError(
+ f"The provided feature service {features.name} contains a reference to a feature view"
+ f"{fv_name} which doesn't exist. Please make sure that you have created the feature view"
+ f'{fv_name} and that you have registered it by running "apply".'
+ ) + views_to_use = (fvs_to_use, od_fvs_to_use) + else: + views_to_use = ( + [*fvs.values()], + [*od_fvs.values()], + ) + + return views_to_use + + +def _get_online_request_context( + registry, + project, + features: Union[List[str], "FeatureService"], + full_feature_names: bool, +): + from feast.feature_view import DUMMY_ENTITY_NAME + + _feature_refs = _get_features(registry, project, features, allow_cache=True) + + ( + requested_feature_views, + requested_on_demand_feature_views, + ) = _get_feature_views_to_use( + registry=registry, + project=project, + features=features, + allow_cache=True, + hide_dummy_entity=False, + ) + + ( + entity_name_to_join_key_map, + entity_type_map, + join_keys_set, + ) = _get_entity_maps(registry, project, requested_feature_views) + + _validate_feature_refs(_feature_refs, full_feature_names) + ( + grouped_refs, + grouped_odfv_refs, + ) = _group_feature_refs( + _feature_refs, + requested_feature_views, + requested_on_demand_feature_views, + ) + + requested_result_row_names = { + feat_ref.replace(":", "__") for feat_ref in _feature_refs + } + if not full_feature_names: + requested_result_row_names = { + name.rpartition("__")[-1] for name in requested_result_row_names + } + + feature_views = list(view for view, _ in grouped_refs) + + needed_request_data = get_needed_request_data(grouped_odfv_refs) + + entityless_case = DUMMY_ENTITY_NAME in [ + entity_name + for feature_view in feature_views + for entity_name in feature_view.entities + ] + + return ( + _feature_refs, + requested_on_demand_feature_views, + entity_name_to_join_key_map, + entity_type_map, + join_keys_set, + grouped_refs, + requested_result_row_names, + needed_request_data, + entityless_case, + ) + + +def _prepare_entities_to_read_from_online_store( + registry, + project, + features: Union[List[str], "FeatureService"], + entity_values: Mapping[ + str, Union[Sequence[Any], Sequence[ValueProto], RepeatedValueProto] + ], + full_feature_names: bool = False, + 
native_entity_values: bool = True, +): + from feast.feature_view import DUMMY_ENTITY, DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL + + ( + feature_refs, + requested_on_demand_feature_views, + entity_name_to_join_key_map, + entity_type_map, + join_keys_set, + grouped_refs, + requested_result_row_names, + needed_request_data, + entityless_case, + ) = _get_online_request_context(registry, project, features, full_feature_names) + + # Extract Sequence from RepeatedValue Protobuf. + entity_value_lists: Dict[str, Union[List[Any], List[ValueProto]]] = { + k: list(v) if isinstance(v, Sequence) else list(v.val) + for k, v in entity_values.items() + } + + entity_proto_values: Dict[str, List[ValueProto]] + if native_entity_values: + # Convert values to Protobuf once. + entity_proto_values = { + k: python_values_to_proto_values( + v, entity_type_map.get(k, ValueType.UNKNOWN) + ) + for k, v in entity_value_lists.items() + } + else: + entity_proto_values = entity_value_lists + + num_rows = _validate_entity_values(entity_proto_values) + + join_key_values: Dict[str, List[ValueProto]] = {} + request_data_features: Dict[str, List[ValueProto]] = {} + # Entity rows may be either entities or request data. + for join_key_or_entity_name, values in entity_proto_values.items(): + # Found request data + if join_key_or_entity_name in needed_request_data: + request_data_features[join_key_or_entity_name] = values + else: + if join_key_or_entity_name in join_keys_set: + join_key = join_key_or_entity_name + else: + try: + join_key = entity_name_to_join_key_map[join_key_or_entity_name] + except KeyError: + raise EntityNotFoundException(join_key_or_entity_name, project) + else: + warnings.warn( + "Using entity name is deprecated. Use join_key instead." + ) + + # All join keys should be returned in the result. 
+ requested_result_row_names.add(join_key) + join_key_values[join_key] = values + + ensure_request_data_values_exist(needed_request_data, request_data_features) + + # Populate online features response proto with join keys and request data features + online_features_response = GetOnlineFeaturesResponse(results=[]) + _populate_result_rows_from_columnar( + online_features_response=online_features_response, + data=dict(**join_key_values, **request_data_features), + ) + + # Add the Entityless case after populating result rows to avoid having to remove + # it later. + if entityless_case: + join_key_values[DUMMY_ENTITY_ID] = python_values_to_proto_values( + [DUMMY_ENTITY_VAL] * num_rows, DUMMY_ENTITY.value_type + ) + + return ( + join_key_values, + grouped_refs, + entity_name_to_join_key_map, + requested_on_demand_feature_views, + feature_refs, + requested_result_row_names, + online_features_response, + ) + + +def _get_entity_key_protos( + entity_rows: Iterable[Mapping[str, ValueProto]], +) -> List[EntityKeyProto]: + # Instantiate one EntityKeyProto per Entity. + entity_key_protos = [ + EntityKeyProto(join_keys=row.keys(), entity_values=row.values()) + for row in entity_rows + ] + return entity_key_protos + + +def _convert_rows_to_protobuf( + requested_features: List[str], + read_rows: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]], +) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[ValueProto]]]: + # Each row is a set of features for a given entity key. + # We only need to convert the data to Protobuf once. + null_value = ValueProto() + read_row_protos = [] + for read_row in read_rows: + row_ts_proto = Timestamp() + row_ts, feature_data = read_row + # TODO (Ly): reuse whatever timestamp if row_ts is None? 
+ if row_ts is not None: + row_ts_proto.FromDatetime(row_ts) + event_timestamps = [row_ts_proto] * len(requested_features) + if feature_data is None: + statuses = [FieldStatus.NOT_FOUND] * len(requested_features) + values = [null_value] * len(requested_features) + else: + statuses = [] + values = [] + for feature_name in requested_features: + # Make sure order of data is the same as requested_features. + if feature_name not in feature_data: + statuses.append(FieldStatus.NOT_FOUND) + values.append(null_value) + else: + statuses.append(FieldStatus.PRESENT) + values.append(feature_data[feature_name]) + read_row_protos.append((event_timestamps, statuses, values)) + return read_row_protos diff --git a/sdk/python/tests/unit/test_feature_validation.py b/sdk/python/tests/unit/test_feature_validation.py index b349eb8ea0..5e8e11ab91 100644 --- a/sdk/python/tests/unit/test_feature_validation.py +++ b/sdk/python/tests/unit/test_feature_validation.py @@ -1,7 +1,7 @@ import pytest from feast.errors import FeatureNameCollisionError -from feast.feature_store import _validate_feature_refs +from feast.utils import _validate_feature_refs def test_feature_name_collision_on_historical_retrieval(): diff --git a/sdk/python/tests/unit/test_unit_feature_store.py b/sdk/python/tests/unit/test_unit_feature_store.py index 0c13dffa62..19a133564f 100644 --- a/sdk/python/tests/unit/test_unit_feature_store.py +++ b/sdk/python/tests/unit/test_unit_feature_store.py @@ -1,7 +1,7 @@ from dataclasses import dataclass from typing import Dict, List -from feast import FeatureStore +from feast import utils from feast.protos.feast.types.Value_pb2 import Value @@ -36,8 +36,7 @@ def test_get_unique_entities(): projection=MockFeatureViewProjection(join_key_map={}), ) - unique_entities, indexes = FeatureStore._get_unique_entities( - FeatureStore, + unique_entities, indexes = utils._get_unique_entities( table=fv, join_key_values=entity_values, entity_name_to_join_key_map=entity_name_to_join_key_map,