diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 20db8292caa..7073a20d1e0 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -89,6 +89,8 @@ from feast.saved_dataset import SavedDataset, SavedDatasetStorage, ValidationReference from feast.ssl_ca_trust_store_setup import configure_ca_trust_store_env_variables from feast.stream_feature_view import StreamFeatureView +from feast.transformation.pandas_transformation import PandasTransformation +from feast.transformation.python_transformation import PythonTransformation from feast.utils import _utc_now warnings.simplefilter("once", DeprecationWarning) @@ -1546,6 +1548,64 @@ def _get_feature_view_and_df_for_online_write( df = pd.DataFrame(df) except Exception as _: raise DataFrameSerializationError(df) + + # # Apply transformations if this is an OnDemandFeatureView with write_to_online_store=True + if ( + isinstance(feature_view, OnDemandFeatureView) + and feature_view.write_to_online_store + ): + if ( + feature_view.mode == "python" + and isinstance( + feature_view.feature_transformation, PythonTransformation + ) + and df is not None + ): + input_dict = ( + df.to_dict(orient="records")[0] + if feature_view.singleton + else df.to_dict(orient="list") + ) + transformed_data = feature_view.feature_transformation.udf(input_dict) + if feature_view.write_to_online_store: + entities = [ + self.get_entity(entity) + for entity in (feature_view.entities or []) + ] + join_keys = [entity.join_key for entity in entities if entity] + join_keys = [k for k in join_keys if k in input_dict.keys()] + transformed_df = pd.DataFrame(transformed_data) + input_df = pd.DataFrame(input_dict) + if input_df.shape[0] == transformed_df.shape[0]: + for k in input_dict: + if k not in transformed_data: + transformed_data[k] = input_dict[k] + transformed_df = pd.DataFrame(transformed_data) + else: + transformed_df = pd.merge( + transformed_df, + input_df, + how="left", + on=join_keys, + ) + else: + # overwrite any transformed features and update the dictionary + for k in input_dict: + if k not in transformed_data: + transformed_data[k] = input_dict[k] + df = pd.DataFrame(transformed_data) + elif feature_view.mode == "pandas" and isinstance( + feature_view.feature_transformation, PandasTransformation + ): + transformed_df = feature_view.feature_transformation.udf(df) + if df is not None: + for col in df.columns: + transformed_df[col] = df[col] + df = transformed_df + + else: + raise Exception("Unsupported OnDemandFeatureView mode") + return feature_view, df def write_to_online_store( @@ -1887,7 +1947,7 @@ def retrieve_online_documents_v2( ( available_feature_views, - _, + available_odfv_views, ) = utils._get_feature_views_to_use( registry=self._registry, project=self.project, @@ -1898,13 +1958,20 @@ def retrieve_online_documents_v2( feature_view_set = set() for feature in features: feature_view_name = feature.split(":")[0] - feature_view = self.get_feature_view(feature_view_name) + if feature_view_name in [fv.name for fv in available_odfv_views]: + feature_view: Union[OnDemandFeatureView, FeatureView] = ( + self.get_on_demand_feature_view(feature_view_name) + ) + else: + feature_view = self.get_feature_view(feature_view_name) feature_view_set.add(feature_view.name) if len(feature_view_set) > 1: raise ValueError("Document retrieval only supports a single feature view.") requested_features = [ f.split(":")[1] for f in features if isinstance(f, str) and ":" in f ] + if len(available_feature_views) == 0: + 
available_feature_views.extend(available_odfv_views) # type: ignore[arg-type] requested_feature_view = available_feature_views[0] if not requested_feature_view: diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index 5554026ad09..49b74893451 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -348,12 +348,11 @@ def to_proto(self) -> FeatureViewProto: if self.stream_source: stream_source_proto = self.stream_source.to_proto() stream_source_proto.data_source_class_type = f"{self.stream_source.__class__.__module__}.{self.stream_source.__class__.__name__}" - spec = FeatureViewSpecProto( name=self.name, entities=self.entities, entity_columns=[field.to_proto() for field in self.entity_columns], - features=[field.to_proto() for field in self.features], + features=[feature.to_proto() for feature in self.features], description=self.description, tags=self.tags, owner=self.owner, diff --git a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py index e9087094b57..91e432a74fa 100644 --- a/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py +++ b/sdk/python/feast/infra/online_stores/milvus_online_store/milvus.py @@ -197,10 +197,14 @@ def _get_or_create_collection( ) index_params = self.client.prepare_index_params() for vector_field in schema.fields: - if vector_field.dtype in [ - DataType.FLOAT_VECTOR, - DataType.BINARY_VECTOR, - ]: + if ( + vector_field.dtype + in [ + DataType.FLOAT_VECTOR, + DataType.BINARY_VECTOR, + ] + and vector_field.name in vector_field_dict + ): metric = vector_field_dict[ vector_field.name ].vector_search_metric diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 046666d5d82..15ef81188b0 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -167,7 +167,10 @@ def online_write_batch( table_name = _table_id(project, table) for feature_name, val in values.items(): if config.online_store.vector_enabled: - if feature_type_dict[feature_name] in FEAST_VECTOR_TYPES: + if ( + feature_type_dict.get(feature_name, None) + in FEAST_VECTOR_TYPES + ): val_bin = serialize_f32( val.float_list_val.val, config.online_store.vector_len ) # type: ignore @@ -226,22 +229,22 @@ def online_read( result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + serialized_entity_keys = [ + serialize_entity_key( + entity_key, + entity_key_serialization_version=config.entity_key_serialization_version, + ) + for entity_key in entity_keys + ] # Fetch all entities in one go cur.execute( f"SELECT entity_key, feature_name, value, event_ts " f"FROM {_table_id(config.project, table)} " f"WHERE entity_key IN ({','.join('?' 
* len(entity_keys))}) " f"ORDER BY entity_key", - [ - serialize_entity_key( - entity_key, - entity_key_serialization_version=config.entity_key_serialization_version, - ) - for entity_key in entity_keys - ], + serialized_entity_keys, ) rows = cur.fetchall() - rows = { k: list(group) for k, group in itertools.groupby(rows, key=lambda r: r[0]) } diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index c0c5412928b..4e504997d2a 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -449,7 +449,7 @@ def materialize_single_feature_view( def get_historical_features( self, config: RepoConfig, - feature_views: List[FeatureView], + feature_views: List[Union[FeatureView, OnDemandFeatureView]], feature_refs: List[str], entity_df: Union[pd.DataFrame, str], registry: BaseRegistry, diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 78039e1b873..18fbd051771 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -242,7 +242,7 @@ def materialize_single_feature_view( def get_historical_features( self, config: RepoConfig, - feature_views: List[FeatureView], + feature_views: List[Union[FeatureView, OnDemandFeatureView]], feature_refs: List[str], entity_df: Union[pd.DataFrame, str], registry: BaseRegistry, diff --git a/sdk/python/feast/nlp_test_data.py b/sdk/python/feast/nlp_test_data.py new file mode 100644 index 00000000000..5c0a6af4d61 --- /dev/null +++ b/sdk/python/feast/nlp_test_data.py @@ -0,0 +1,67 @@ +from datetime import datetime +from typing import Dict + +import numpy as np +import pandas as pd + + +def create_document_chunks_df( + documents: Dict[str, str], + start_date: datetime, + end_date: datetime, + embedding_size: int = 60, +) -> pd.DataFrame: + """ + Example df generated by this function: + + | event_timestamp | document_id | chunk_id | chunk_text | embedding | created | + |------------------+-------------+----------+------------------+-----------+------------------| + | 2021-03-17 19:31 | doc_1 | chunk-1 | Hello world | [0.1, ...]| 2021-03-24 19:34 | + | 2021-03-17 19:31 | doc_1 | chunk-2 | How are you? | [0.2, ...]| 2021-03-24 19:34 | + | 2021-03-17 19:31 | doc_2 | chunk-1 | This is a test | [0.3, ...]| 2021-03-24 19:34 | + | 2021-03-17 19:31 | doc_2 | chunk-2 | Document chunk | [0.4, ...]| 2021-03-24 19:34 | + """ + df_hourly = pd.DataFrame( + { + "event_timestamp": [ + pd.Timestamp(dt, unit="ms").round("ms") + for dt in pd.date_range( + start=start_date, + end=end_date, + freq="1h", + inclusive="left", + tz="UTC", + ) + ] + + [ + pd.Timestamp( + year=2021, month=4, day=12, hour=7, minute=0, second=0, tz="UTC" + ) + ] + } + ) + df_all_chunks = pd.DataFrame() + + for doc_id, doc_text in documents.items(): + chunks = doc_text.split(". 
") # Simple chunking by sentence + for chunk_id, chunk_text in enumerate(chunks, start=1): + df_hourly_copy = df_hourly.copy() + df_hourly_copy["document_id"] = doc_id + df_hourly_copy["chunk_id"] = f"chunk-{chunk_id}" + df_hourly_copy["chunk_text"] = chunk_text + df_all_chunks = pd.concat([df_hourly_copy, df_all_chunks]) + + df_all_chunks.reset_index(drop=True, inplace=True) + rows = df_all_chunks["event_timestamp"].count() + + # Generate random embeddings for each chunk + df_all_chunks["embedding"] = [ + np.random.rand(embedding_size).tolist() for _ in range(rows) + ] + df_all_chunks["created"] = pd.to_datetime(pd.Timestamp.now(tz=None).round("ms")) + + # Create duplicate rows that should be filtered by created timestamp + late_row = df_all_chunks[rows // 2 : rows // 2 + 1] + df_all_chunks = pd.concat([df_all_chunks, late_row, late_row], ignore_index=True) + + return df_all_chunks diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 7eb4bab26b7..f4ec0149184 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -339,7 +339,6 @@ def to_proto(self) -> OnDemandFeatureViewProto: write_to_online_store=self.write_to_online_store, singleton=self.singleton if self.singleton else False, ) - return OnDemandFeatureViewProto(spec=spec, meta=meta) @classmethod @@ -454,6 +453,8 @@ def from_proto( Field( name=feature.name, dtype=from_value_type(ValueType(feature.value_type)), + vector_index=feature.vector_index, + vector_search_metric=feature.vector_search_metric, ) for feature in on_demand_feature_view_proto.spec.features ], @@ -640,13 +641,25 @@ def transform_dict( def infer_features(self) -> None: random_input = self._construct_random_input(singleton=self.singleton) - inferred_features = self.feature_transformation.infer_features(random_input) + inferred_features = self.feature_transformation.infer_features( + random_input=random_input, singleton=self.singleton + ) if self.features: missing_features = [] for specified_feature in self.features: - if specified_feature not in inferred_features: + if ( + specified_feature not in inferred_features + and "Array" not in specified_feature.dtype.__str__() + ): missing_features.append(specified_feature) + elif "Array" in specified_feature.dtype.__str__(): + if specified_feature.name not in [ + f.name for f in inferred_features + ]: + missing_features.append(specified_feature) + else: + pass if missing_features: raise SpecifiedFeaturesNotPresentError( missing_features, inferred_features, self.name @@ -738,6 +751,7 @@ def on_demand_feature_view( owner: str = "", write_to_online_store: bool = False, singleton: bool = False, + explode: bool = False, ): """ Creates an OnDemandFeatureView object with the given user function as udf. @@ -759,6 +773,7 @@ def on_demand_feature_view( the online store for faster retrieval. singleton (optional): A boolean that indicates whether the transformation is executed on a singleton (only applicable when mode="python"). + explode (optional): A boolean that indicates whether the transformation explodes the input data into multiple rows. 
""" def mainify(obj) -> None: @@ -778,10 +793,6 @@ def decorator(user_function): ) transformation = PandasTransformation(user_function, udf_string) elif mode == "python": - if return_annotation not in (inspect._empty, dict[str, Any]): - raise TypeError( - f"return signature for {user_function} is {return_annotation} but should be dict[str, Any]" - ) transformation = PythonTransformation(user_function, udf_string) elif mode == "substrait": from ibis.expr.types.relations import Table diff --git a/sdk/python/feast/transformation/pandas_transformation.py b/sdk/python/feast/transformation/pandas_transformation.py index 35e786aac8f..66a5c65caf2 100644 --- a/sdk/python/feast/transformation/pandas_transformation.py +++ b/sdk/python/feast/transformation/pandas_transformation.py @@ -1,4 +1,4 @@ -from typing import Any, Callable +from typing import Any, Callable, Optional import dill import pandas as pd @@ -40,7 +40,9 @@ def transform_singleton(self, input_df: pd.DataFrame) -> pd.DataFrame: "PandasTransformation does not support singleton transformations." ) - def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]: + def infer_features( + self, random_input: dict[str, list[Any]], singleton: Optional[bool] + ) -> list[Field]: df = pd.DataFrame.from_dict(random_input) output_df: pd.DataFrame = self.transform(df) diff --git a/sdk/python/feast/transformation/python_transformation.py b/sdk/python/feast/transformation/python_transformation.py index ce2aaf2002d..20a9dd9ff6f 100644 --- a/sdk/python/feast/transformation/python_transformation.py +++ b/sdk/python/feast/transformation/python_transformation.py @@ -1,5 +1,5 @@ from types import FunctionType -from typing import Any +from typing import Any, Optional import dill import pyarrow @@ -45,7 +45,9 @@ def transform_singleton(self, input_dict: dict) -> dict: output_dict = self.udf.__call__(input_dict) return {**input_dict, **output_dict} - def infer_features(self, random_input: dict[str, Any]) -> list[Field]: + def infer_features( + self, random_input: dict[str, Any], singleton: Optional[bool] = False + ) -> list[Field]: output_dict: dict[str, Any] = self.transform(random_input) fields = [] @@ -58,6 +60,10 @@ def infer_features(self, random_input: dict[str, Any]) -> list[Field]: ) inferred_type = type(feature_value[0]) inferred_value = feature_value[0] + if singleton: + inferred_value = feature_value + inferred_type = None # type: ignore + else: inferred_type = type(feature_value) inferred_value = feature_value @@ -69,7 +75,7 @@ def infer_features(self, random_input: dict[str, Any]) -> list[Field]: python_type_to_feast_value_type( feature_name, value=inferred_value, - type_name=inferred_type.__name__, + type_name=inferred_type.__name__ if inferred_type else None, ) ), ) diff --git a/sdk/python/feast/transformation/substrait_transformation.py b/sdk/python/feast/transformation/substrait_transformation.py index 47e2ced9768..a6d9bfa18c0 100644 --- a/sdk/python/feast/transformation/substrait_transformation.py +++ b/sdk/python/feast/transformation/substrait_transformation.py @@ -1,5 +1,5 @@ from types import FunctionType -from typing import Any +from typing import Any, Optional import dill import pandas as pd @@ -61,7 +61,9 @@ def table_provider(names, schema: pyarrow.Schema): return table - def infer_features(self, random_input: dict[str, list[Any]]) -> list[Field]: + def infer_features( + self, random_input: dict[str, list[Any]], singleton: Optional[bool] + ) -> list[Field]: df = pd.DataFrame.from_dict(random_input) output_df: pd.DataFrame = 
self.transform(df) diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index e64e38b143a..4cca1379ed3 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -343,6 +343,22 @@ def _convert_arrow_odfv_to_proto( for column, value_type in columns if column in table.column_names } + + # Ensure join keys are included in proto_values_by_column, but check if they exist first + for join_key, value_type in join_keys.items(): + if join_key not in proto_values_by_column: + # Check if the join key exists in the table before trying to access it + if join_key in table.column_names: + proto_values_by_column[join_key] = python_values_to_proto_values( + table.column(join_key).to_numpy(zero_copy_only=False), value_type + ) + else: + # Create null/default values if the join key isn't in the table + null_column = [None] * table.num_rows + proto_values_by_column[join_key] = python_values_to_proto_values( + null_column, value_type + ) + # Adding On Demand Features for feature in feature_view.features: if ( @@ -357,7 +373,7 @@ def _convert_arrow_odfv_to_proto( updated_table = pyarrow.RecordBatch.from_arrays( table.columns + [null_column], schema=table.schema.append( - pyarrow.field(feature.name, null_column.type) + pyarrow.field(feature.name, null_column.type) # type: ignore[attr-defined] ), ) proto_values_by_column[feature.name] = python_values_to_proto_values( @@ -368,7 +384,11 @@ def _convert_arrow_odfv_to_proto( entity_keys = [ EntityKeyProto( join_keys=join_keys, - entity_values=[proto_values_by_column[k][idx] for k in join_keys], + entity_values=[ + proto_values_by_column[k][idx] + for k in join_keys + if k in proto_values_by_column + ], ) for idx in range(table.num_rows) ] @@ -378,6 +398,12 @@ def _convert_arrow_odfv_to_proto( feature.name: proto_values_by_column[feature.name] for feature in feature_view.features } + if feature_view.write_to_online_store: + table_columns = [col.name for col in table.schema] + for feature in feature_view.schema: + if feature.name not in feature_dict and feature.name in table_columns: + feature_dict[feature.name] = proto_values_by_column[feature.name] + features = [dict(zip(feature_dict, vars)) for vars in zip(*feature_dict.values())] # We need to artificially add event_timestamps and created_timestamps @@ -441,19 +467,24 @@ def _group_feature_refs( all_feature_views: List["FeatureView"], all_on_demand_feature_views: List["OnDemandFeatureView"], ) -> Tuple[ - List[Tuple["FeatureView", List[str]]], List[Tuple["OnDemandFeatureView", List[str]]] + List[Tuple[Union["FeatureView", "OnDemandFeatureView"], List[str]]], + List[Tuple["OnDemandFeatureView", List[str]]], ]: """Get list of feature views and corresponding feature names based on feature references""" # view name to view proto - view_index = {view.projection.name_to_use(): view for view in all_feature_views} + view_index: Dict[str, Union["FeatureView", "OnDemandFeatureView"]] = { + view.projection.name_to_use(): view for view in all_feature_views + } # on demand view to on demand view proto - on_demand_view_index = { - view.projection.name_to_use(): view - for view in all_on_demand_feature_views - if view.projection - } + on_demand_view_index: Dict[str, "OnDemandFeatureView"] = {} + for view in all_on_demand_feature_views: + if view.projection and not view.write_to_online_store: + on_demand_view_index[view.projection.name_to_use()] = view + elif view.projection and view.write_to_online_store: + # we insert the ODFV view to FVs for ones that are written to the online store + 
view_index[view.projection.name_to_use()] = view # view name to feature names views_features = defaultdict(set) @@ -464,7 +495,16 @@ def _group_feature_refs( for ref in features: view_name, feat_name = ref.split(":") if view_name in view_index: - view_index[view_name].projection.get_feature(feat_name) # For validation + if hasattr(view_index[view_name], "write_to_online_store"): + tmp_feat_name = [ + f for f in view_index[view_name].schema if f.name == feat_name + ] + if len(tmp_feat_name) > 0: + feat_name = tmp_feat_name[0].name + else: + view_index[view_name].projection.get_feature( + feat_name + ) # For validation views_features[view_name].add(feat_name) elif view_name in on_demand_view_index: on_demand_view_index[view_name].projection.get_feature( @@ -480,7 +520,7 @@ def _group_feature_refs( else: raise FeatureViewNotFoundException(view_name) - fvs_result: List[Tuple["FeatureView", List[str]]] = [] + fvs_result: List[Tuple[Union["FeatureView", "OnDemandFeatureView"], List[str]]] = [] odfvs_result: List[Tuple["OnDemandFeatureView", List[str]]] = [] for view_name, feature_names in views_features.items(): @@ -557,73 +597,74 @@ def _augment_response_with_on_demand_transforms( odfv_result_names = set() for odfv_name, _feature_refs in odfv_feature_refs.items(): odfv = requested_odfv_map[odfv_name] - if odfv.mode == "python": - if initial_response_dict is None: - initial_response_dict = initial_response.to_dict() - transformed_features_dict: Dict[str, List[Any]] = odfv.transform_dict( - initial_response_dict - ) - elif odfv.mode in {"pandas", "substrait"}: - if initial_response_arrow is None: - initial_response_arrow = initial_response.to_arrow() - transformed_features_arrow = odfv.transform_arrow( - initial_response_arrow, full_feature_names + if not odfv.write_to_online_store: + if odfv.mode == "python": + if initial_response_dict is None: + initial_response_dict = initial_response.to_dict() + transformed_features_dict: Dict[str, List[Any]] = odfv.transform_dict( + initial_response_dict + ) + elif odfv.mode in {"pandas", "substrait"}: + if initial_response_arrow is None: + initial_response_arrow = initial_response.to_arrow() + transformed_features_arrow = odfv.transform_arrow( + initial_response_arrow, full_feature_names + ) + else: + raise Exception( + f"Invalid OnDemandFeatureMode: {odfv.mode}. Expected one of 'pandas', 'python', or 'substrait'." + ) + + transformed_features = ( + transformed_features_dict + if odfv.mode == "python" + else transformed_features_arrow ) - else: - raise Exception( - f"Invalid OnDemandFeatureMode: {odfv.mode}. Expected one of 'pandas', 'python', or 'substrait'." 
+ transformed_columns = ( + transformed_features.column_names + if isinstance(transformed_features, pyarrow.Table) + else transformed_features ) - - transformed_features = ( - transformed_features_dict - if odfv.mode == "python" - else transformed_features_arrow - ) - transformed_columns = ( - transformed_features.column_names - if isinstance(transformed_features, pyarrow.Table) - else transformed_features - ) - selected_subset = [f for f in transformed_columns if f in _feature_refs] - - proto_values = [] - schema_dict = {k.name: k.dtype for k in odfv.schema} - for selected_feature in selected_subset: - feature_vector = transformed_features[selected_feature] - selected_feature_type = schema_dict.get(selected_feature, None) - feature_type: ValueType = ValueType.UNKNOWN - if selected_feature_type is not None: - if isinstance( - selected_feature_type, (ComplexFeastType, PrimitiveFeastType) - ): - feature_type = selected_feature_type.to_value_type() - elif not isinstance(selected_feature_type, ValueType): - raise TypeError( - f"Unexpected type for feature_type: {type(feature_type)}" + selected_subset = [f for f in transformed_columns if f in _feature_refs] + + proto_values = [] + schema_dict = {k.name: k.dtype for k in odfv.schema} + for selected_feature in selected_subset: + feature_vector = transformed_features[selected_feature] + selected_feature_type = schema_dict.get(selected_feature, None) + feature_type: ValueType = ValueType.UNKNOWN + if selected_feature_type is not None: + if isinstance( + selected_feature_type, (ComplexFeastType, PrimitiveFeastType) + ): + feature_type = selected_feature_type.to_value_type() + elif not isinstance(selected_feature_type, ValueType): + raise TypeError( + f"Unexpected type for feature_type: {type(feature_type)}" + ) + + proto_values.append( + python_values_to_proto_values( + feature_vector + if isinstance(feature_vector, list) + else [feature_vector] + if odfv.mode == "python" + else feature_vector.to_numpy(), + feature_type, ) - - proto_values.append( - python_values_to_proto_values( - feature_vector - if isinstance(feature_vector, list) - else [feature_vector] - if odfv.mode == "python" - else feature_vector.to_numpy(), - feature_type, ) - ) - odfv_result_names |= set(selected_subset) + odfv_result_names |= set(selected_subset) - online_features_response.metadata.feature_names.val.extend(selected_subset) - for feature_idx in range(len(selected_subset)): - online_features_response.results.append( - GetOnlineFeaturesResponse.FeatureVector( - values=proto_values[feature_idx], - statuses=[FieldStatus.PRESENT] * len(proto_values[feature_idx]), - event_timestamps=[Timestamp()] * len(proto_values[feature_idx]), + online_features_response.metadata.feature_names.val.extend(selected_subset) + for feature_idx in range(len(selected_subset)): + online_features_response.results.append( + GetOnlineFeaturesResponse.FeatureVector( + values=proto_values[feature_idx], + statuses=[FieldStatus.PRESENT] * len(proto_values[feature_idx]), + event_timestamps=[Timestamp()] * len(proto_values[feature_idx]), + ) ) - ) def _get_entity_maps( @@ -821,6 +862,7 @@ def get_needed_request_data( needed_request_data: Set[str] = set() for odfv, _ in grouped_odfv_refs: odfv_request_data_schema = odfv.get_request_data_schema() + # if odfv.write_to_online_store, we should not pass in the request data needed_request_data.update(odfv_request_data_schema.keys()) return needed_request_data @@ -1109,7 +1151,7 @@ def _get_online_request_context( entityless_case = DUMMY_ENTITY_NAME in [ entity_name 
for feature_view in feature_views - for entity_name in feature_view.entities + for entity_name in (feature_view.entities or []) ] return ( @@ -1172,7 +1214,13 @@ def _prepare_entities_to_read_from_online_store( odfv_entities: List[Entity] = [] request_source_keys: List[str] = [] for on_demand_feature_view in requested_on_demand_feature_views: - odfv_entities.append(*getattr(on_demand_feature_view, "entities", [])) + entities_for_odfv = getattr(on_demand_feature_view, "entities", []) + if len(entities_for_odfv) > 0 and isinstance(entities_for_odfv[0], str): + entities_for_odfv = [ + registry.get_entity(entity_name, project, allow_cache=True) + for entity_name in entities_for_odfv + ] + odfv_entities.extend(entities_for_odfv) for source in on_demand_feature_view.source_request_sources: source_schema = on_demand_feature_view.source_request_sources[source].schema for column in source_schema: diff --git a/sdk/python/tests/unit/infra/test_inference_unit_tests.py b/sdk/python/tests/unit/infra/test_inference_unit_tests.py index 54488d43212..951f7033d23 100644 --- a/sdk/python/tests/unit/infra/test_inference_unit_tests.py +++ b/sdk/python/tests/unit/infra/test_inference_unit_tests.py @@ -154,23 +154,6 @@ def python_native_test_invalid_pandas_view( } return output_dict - with pytest.raises(TypeError): - - @on_demand_feature_view( - sources=[date_request], - schema=[ - Field(name="output", dtype=UnixTimestamp), - Field(name="object_output", dtype=String), - ], - mode="python", - ) - def python_native_test_invalid_dict_view( - features_df: pd.DataFrame, - ) -> pd.DataFrame: - data = pd.DataFrame() - data["output"] = features_df["some_date"] - return data - def test_datasource_inference(): # Create Feature Views diff --git a/sdk/python/tests/unit/test_on_demand_python_transformation.py b/sdk/python/tests/unit/test_on_demand_python_transformation.py index a0c33fadfda..7ae9f1c70e6 100644 --- a/sdk/python/tests/unit/test_on_demand_python_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_python_transformation.py @@ -1,5 +1,7 @@ import os import re +import sqlite3 +import sys import tempfile import unittest from datetime import datetime, timedelta @@ -20,10 +22,12 @@ from feast.feature_view import DUMMY_ENTITY_FIELD from feast.field import Field from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig +from feast.nlp_test_data import create_document_chunks_df from feast.on_demand_feature_view import on_demand_feature_view from feast.types import ( Array, Bool, + Bytes, Float32, Float64, Int64, @@ -161,7 +165,11 @@ def python_demo_view(inputs: dict[str, Any]) -> dict[str, Any]: @on_demand_feature_view( sources=[driver_stats_fv[["conv_rate", "acc_rate"]]], schema=[ - Field(name="conv_rate_plus_acc_python_singleton", dtype=Float64) + Field(name="conv_rate_plus_acc_python_singleton", dtype=Float64), + Field( + name="conv_rate_plus_acc_python_singleton_array", + dtype=Array(Float64), + ), ], mode="python", singleton=True, @@ -171,6 +179,7 @@ def python_singleton_view(inputs: dict[str, Any]) -> dict[str, Any]: output["conv_rate_plus_acc_python_singleton"] = ( inputs["conv_rate"] + inputs["acc_rate"] ) + output["conv_rate_plus_acc_python_singleton_array"] = [0.1, 0.2, 0.3] return output @on_demand_feature_view( @@ -852,6 +861,9 @@ def test_stored_writes(self): assert driver_stats_fv.entities == [driver.name] assert driver_stats_fv.entity_columns == [] + ODFV_STRING_CONSTANT = "guaranteed constant" + ODFV_OTHER_STRING_CONSTANT = "somethign else" + @on_demand_feature_view( entities=[driver], 
sources=[ @@ -863,6 +875,7 @@ def test_stored_writes(self): Field(name="current_datetime", dtype=UnixTimestamp), Field(name="counter", dtype=Int64), Field(name="input_datetime", dtype=UnixTimestamp), + Field(name="string_constant", dtype=String), ], mode="python", write_to_online_store=True, @@ -880,6 +893,7 @@ def python_stored_writes_feature_view( "current_datetime": [datetime.now() for _ in inputs["conv_rate"]], "counter": [c + 1 for c in inputs["counter"]], "input_datetime": [d for d in inputs["input_datetime"]], + "string_constant": [ODFV_STRING_CONSTANT], } return output @@ -933,30 +947,13 @@ def python_stored_writes_feature_view( "created": current_datetime, } ] - odfv_entity_rows_to_write = [ - { - "driver_id": 1001, - "counter": 0, - "input_datetime": current_datetime, - } - ] fv_entity_rows_to_read = [ { "driver_id": 1001, } ] - # Note that here we shouldn't have to pass the request source features for reading - # because they should have already been written to the online store - odfv_entity_rows_to_read = [ - { - "driver_id": 1001, - "conv_rate": 0.25, - "acc_rate": 0.50, - "counter": 0, - "input_datetime": current_datetime, - } - ] - print("storing fv features") + print("") + print("storing FV features") self.store.write_to_online_store( feature_view_name="driver_hourly_stats", df=fv_entity_rows_to_write, @@ -978,11 +975,58 @@ def python_stored_writes_feature_view( "acc_rate": [0.25], } - print("storing odfv features") + # Note that here we shouldn't have to pass the request source features for reading + # because they should have already been written to the online store + odfv_entity_rows_to_write = [ + { + "driver_id": 1002, + "counter": 0, + "conv_rate": 0.25, + "acc_rate": 0.50, + "input_datetime": current_datetime, + "string_constant": ODFV_OTHER_STRING_CONSTANT, + } + ] + odfv_entity_rows_to_read = [ + { + "driver_id": 1002, + "conv_rate_plus_acc": 7, # note how this is not the correct value and would be calculate on demand + "conv_rate": 0.25, + "acc_rate": 0.50, + "counter": 0, + "input_datetime": current_datetime, + "string_constant": ODFV_STRING_CONSTANT, + } + ] + print("storing ODFV features") self.store.write_to_online_store( feature_view_name="python_stored_writes_feature_view", df=odfv_entity_rows_to_write, ) + _conn = sqlite3.connect(self.store.config.online_store.path) + _table_name = ( + self.store.project + + "_" + + self.store.get_on_demand_feature_view( + "python_stored_writes_feature_view" + ).name + ) + sample = pd.read_sql( + f""" + select + feature_name, + value + from {_table_name} + """, + _conn, + ) + assert ( + sample[sample["feature_name"] == "string_constant"]["value"] + .astype(str) + .str.contains("guaranteed constant") + .values[0] + ) + print("reading odfv features") online_odfv_python_response = self.store.get_online_features( entity_rows=odfv_entity_rows_to_read, @@ -991,6 +1035,7 @@ def python_stored_writes_feature_view( "python_stored_writes_feature_view:current_datetime", "python_stored_writes_feature_view:counter", "python_stored_writes_feature_view:input_datetime", + "python_stored_writes_feature_view:string_constant", ], ).to_dict() print(online_odfv_python_response) @@ -1001,5 +1046,248 @@ def python_stored_writes_feature_view( "counter", "current_datetime", "input_datetime", + "string_constant", + ] + ) + # This should be 1 because we write the value of 0 and during the write, the counter is incremented + assert online_odfv_python_response["counter"] == [1] + assert online_odfv_python_response["string_constant"] == [ + 
ODFV_STRING_CONSTANT + ] + assert online_odfv_python_response["string_constant"] != [ + ODFV_OTHER_STRING_CONSTANT + ] + + def test_stored_writes_with_explode(self): + with tempfile.TemporaryDirectory() as data_dir: + self.store = FeatureStore( + config=RepoConfig( + project="test_on_demand_python_transformation_explode", + registry=os.path.join(data_dir, "registry.db"), + provider="local", + entity_key_serialization_version=3, + online_store=SqliteOnlineStoreConfig( + path=os.path.join(data_dir, "online.db"), + vector_enabled=True, + vector_len=5, + ), + ) + ) + + documents = { + "doc_1": "Hello world. How are you?", + "doc_2": "This is a test. Document chunking example.", + } + start_date = datetime.now() - timedelta(days=15) + end_date = datetime.now() + + documents_df = create_document_chunks_df( + documents, + start_date, + end_date, + embedding_size=60, + ) + corpus_path = os.path.join(data_dir, "documents.parquet") + documents_df.to_parquet(path=corpus_path, allow_truncated_timestamps=True) + + chunk = Entity( + name="chunk", join_keys=["chunk_id"], value_type=ValueType.STRING + ) + document = Entity( + name="document", join_keys=["document_id"], value_type=ValueType.STRING + ) + + input_explode_request_source = RequestSource( + name="counter_source", + schema=[ + Field(name="document_id", dtype=String), + Field(name="document_text", dtype=String), + Field(name="document_bytes", dtype=Bytes), + ], + ) + + @on_demand_feature_view( + entities=[chunk, document], + sources=[ + input_explode_request_source, + ], + schema=[ + Field(name="document_id", dtype=String), + Field(name="chunk_id", dtype=String), + Field(name="chunk_text", dtype=String), + Field( + name="vector", + dtype=Array(Float32), + vector_index=True, + vector_search_metric="L2", + ), + ], + mode="python", + write_to_online_store=True, + ) + def python_stored_writes_feature_view_explode_singleton( + inputs: dict[str, Any], + ): + output: dict[str, Any] = { + "document_id": ["doc_1", "doc_1", "doc_2", "doc_2"], + "chunk_id": ["chunk-1", "chunk-2", "chunk-1", "chunk-2"], + "chunk_text": [ + "hello friends", + "how are you?", + "This is a test.", + "Document chunking example.", + ], + "vector": [ + [0.1] * 5, + [0.2] * 5, + [0.3] * 5, + [0.4] * 5, + ], + } + return output + + assert python_stored_writes_feature_view_explode_singleton.entities == [ + chunk.name, + document.name, + ] + assert ( + python_stored_writes_feature_view_explode_singleton.entity_columns[ + 0 + ].name + == document.join_key + ) + assert ( + python_stored_writes_feature_view_explode_singleton.entity_columns[ + 1 + ].name + == chunk.join_key + ) + + self.store.apply( + [ + chunk, + document, + input_explode_request_source, + python_stored_writes_feature_view_explode_singleton, + ] + ) + odfv_applied = self.store.get_on_demand_feature_view( + "python_stored_writes_feature_view_explode_singleton" + ) + + assert odfv_applied.features[1].vector_index + + assert odfv_applied.entities == [chunk.name, document.name] + + # Note here that after apply() is called, the entity_columns are populated with the join_key + assert odfv_applied.entity_columns[1].name == chunk.join_key + assert odfv_applied.entity_columns[0].name == document.join_key + + assert len(self.store.list_all_feature_views()) == 1 + assert len(self.store.list_feature_views()) == 0 + assert len(self.store.list_on_demand_feature_views()) == 1 + assert len(self.store.list_stream_feature_views()) == 0 + assert ( + python_stored_writes_feature_view_explode_singleton.entity_columns + == 
self.store.get_on_demand_feature_view( + "python_stored_writes_feature_view_explode_singleton" + ).entity_columns + ) + + odfv_entity_rows_to_write = [ + { + "document_id": "document_1", + "document_text": "Hello world. How are you?", + }, + { + "document_id": "document_2", + "document_text": "This is a test. Document chunking example.", + }, + ] + fv_entity_rows_to_read = [ + { + "document_id": "doc_1", + "chunk_id": "chunk-2", + }, + { + "document_id": "doc_2", + "chunk_id": "chunk-1", + }, + ] + + self.store.write_to_online_store( + feature_view_name="python_stored_writes_feature_view_explode_singleton", + df=odfv_entity_rows_to_write, + ) + _table_name = ( + self.store.project + + "_" + + self.store.get_on_demand_feature_view( + "python_stored_writes_feature_view_explode_singleton" + ).name + ) + _conn = sqlite3.connect(self.store.config.online_store.path) + sample = pd.read_sql( + f""" + select + entity_key, + feature_name, + value + from {_table_name} + """, + _conn, + ) + print(f"\nsample from {_table_name}:\n{sample}") + + # verifying we retrieve doc_1 chunk-2 + filt = (sample["feature_name"] == "chunk_text") & ( + sample["value"] + .apply(lambda x: x.decode("latin1")) + .str.contains("how are") + ) + assert ( + sample[filt]["entity_key"].astype(str).str.contains("doc_1") + & sample[filt]["entity_key"].astype(str).str.contains("chunk-2") + ).values[0] + + print("reading fv features") + online_python_response = self.store.get_online_features( + entity_rows=fv_entity_rows_to_read, + features=[ + "python_stored_writes_feature_view_explode_singleton:document_id", + "python_stored_writes_feature_view_explode_singleton:chunk_id", + "python_stored_writes_feature_view_explode_singleton:chunk_text", + ], + ).to_dict() + assert sorted(list(online_python_response.keys())) == sorted( + [ + "chunk_id", + "chunk_text", + "document_id", ] ) + assert online_python_response == { + "document_id": ["doc_1", "doc_2"], + "chunk_id": ["chunk-2", "chunk-1"], + "chunk_text": ["how are you?", "This is a test."], + } + + if sys.version_info[0:2] == (3, 10): + query_embedding = [0.05] * 5 + online_python_vec_response = self.store.retrieve_online_documents_v2( + features=[ + "python_stored_writes_feature_view_explode_singleton:document_id", + "python_stored_writes_feature_view_explode_singleton:chunk_id", + "python_stored_writes_feature_view_explode_singleton:chunk_text", + ], + query=query_embedding, + top_k=2, + ).to_dict() + + assert online_python_vec_response is not None + assert online_python_vec_response == { + "document_id": ["doc_1", "doc_1"], + "chunk_id": ["chunk-1", "chunk-2"], + "chunk_text": ["hello friends", "how are you?"], + "distance": [0.11180340498685837, 0.3354102075099945], + }
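
The tests above walk through the new stored-write path for on-demand feature views one assertion at a time. As a condensed, hedged illustration of the same flow, the sketch below defines a python-mode OnDemandFeatureView with write_to_online_store=True over a RequestSource, writes one row, and reads the transformed values back. The repo path, entity, source, and field names are illustrative assumptions modeled on the test fixture, not part of this change set.

# Usage sketch (assumptions noted inline), mirroring the stored-writes tests above.
from datetime import datetime

from feast import Entity, FeatureStore, RequestSource, ValueType
from feast.field import Field
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Int64, String, UnixTimestamp

driver = Entity(name="driver", join_keys=["driver_id"], value_type=ValueType.INT64)

request_source = RequestSource(
    name="write_time_inputs",
    schema=[
        Field(name="counter", dtype=Int64),
        Field(name="input_datetime", dtype=UnixTimestamp),
    ],
)

@on_demand_feature_view(
    entities=[driver],
    sources=[request_source],
    schema=[
        Field(name="counter", dtype=Int64),
        Field(name="string_constant", dtype=String),
    ],
    mode="python",
    write_to_online_store=True,
)
def stored_writes_view(inputs: dict) -> dict:
    # With write_to_online_store=True the transformation now runs inside
    # FeatureStore.write_to_online_store() and its outputs are persisted.
    return {
        "counter": [c + 1 for c in inputs["counter"]],
        "string_constant": ["guaranteed constant"],
    }

store = FeatureStore(repo_path=".")  # assumes a local repo (sqlite online store) like the test fixture
store.apply([driver, request_source, stored_writes_view])

store.write_to_online_store(
    feature_view_name="stored_writes_view",
    df=[{"driver_id": 1002, "counter": 0, "input_datetime": datetime.now()}],
)

# Reads return the stored (already transformed) values. In this
# request-source-only setup the read mirrors the explode test above and
# passes entity keys only; the first stored-writes test shows the variant
# that also carries request columns in the read rows.
response = store.get_online_features(
    entity_rows=[{"driver_id": 1002}],
    features=[
        "stored_writes_view:counter",
        "stored_writes_view:string_constant",
    ],
).to_dict()
print(response)  # counter == [1]; string_constant == ["guaranteed constant"]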