From da3eba3da9fb287a92ca59ab8015464c244ab79e Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 25 Mar 2024 22:24:50 -0400 Subject: [PATCH 01/16] uploading current progress, not in a good state yet and have a lot of cleanup to do Signed-off-by: Francisco Javier Arceo --- .../embedded_go/online_features_service.py | 5 + sdk/python/feast/feature_store.py | 41 ++- .../infra/offline_stores/offline_store.py | 8 + sdk/python/feast/on_demand_feature_view.py | 247 +++++++++++++++++- sdk/python/feast/online_response.py | 43 +++ .../transformation/python_transformation.py | 55 ++++ sdk/python/feast/transformation_server.py | 5 + 7 files changed, 387 insertions(+), 17 deletions(-) create mode 100644 sdk/python/feast/transformation/python_transformation.py diff --git a/sdk/python/feast/embedded_go/online_features_service.py b/sdk/python/feast/embedded_go/online_features_service.py index bf82fab6a33..c6430b5f6d6 100644 --- a/sdk/python/feast/embedded_go/online_features_service.py +++ b/sdk/python/feast/embedded_go/online_features_service.py @@ -252,6 +252,11 @@ def transformation_callback( # the typeguard requirement. full_feature_names = bool(full_feature_names) + if odfv.mode != "pandas": + raise Exception( + f"OnDemandFeatureView mode '{odfv.mode} not supported by EmbeddedOnlineFeatureServer." + ) + output = odfv.get_transformed_features_df( input_record.to_pandas(), full_feature_names=full_feature_names ) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 44236248fe1..c6b2b89c3a6 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -2096,26 +2096,47 @@ def _augment_response_with_on_demand_transforms( ) initial_response = OnlineResponse(online_features_response) - initial_response_df = initial_response.to_df() + initial_response_df: Optional[pd.DataFrame] = None + initial_response_dict: Optional[Dict[str, List[Any]]] = None # Apply on demand transformations and augment the result rows odfv_result_names = set() for odfv_name, _feature_refs in odfv_feature_refs.items(): odfv = requested_odfv_map[odfv_name] - transformed_features_df = odfv.get_transformed_features_df( - initial_response_df, - full_feature_names, + if odfv.mode == "python": + if initial_response_dict is None: + initial_response_dict = initial_response.to_dict() + transformed_features = odfv.get_transformed_features( + initial_response_dict, + full_feature_names, + ) + elif odfv.mode == "pandas": + if initial_response_df is None: + initial_response_df = initial_response.to_df() + transformed_features_df = odfv.get_transformed_features( + initial_response_df, + full_feature_names, + ) + else: + raise Exception( + f"Invalid OnDemandFeatureMode: {odfv.mode}. Expected one of 'pandas' or 'python'." + ) + + transformed_columns = ( + transformed_features.columns + if isinstance(transformed_features, pd.DataFrame) + else transformed_features ) - selected_subset = [ - f for f in transformed_features_df.columns if f in _feature_refs - ] + selected_subset = [f for f in transformed_columns if f in _feature_refs] - proto_values = [ + proto_values = [] + for feature in selected_subset: + feature_vector = transformed_features[feature] + proto_values.append( + python_values_to_proto_values(feature_vector, ValueType.UNKNOWN) python_values_to_proto_values( transformed_features_df[feature].values, ValueType.UNKNOWN ) - for feature in selected_subset - ] odfv_result_names |= set(selected_subset) diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 30135feccb3..cb91a66d15a 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -81,6 +81,10 @@ def to_df( if self.on_demand_feature_views: # TODO(adchia): Fix requirement to specify dependent feature views in feature_refs for odfv in self.on_demand_feature_views: + if odfv.mode != "pandas": + raise Exception( + f'OnDemandFeatureView mode "{odfv.mode}" not supported for offline processing.' + ) features_df = features_df.join( odfv.get_transformed_features_df( features_df, @@ -124,6 +128,10 @@ def to_arrow( features_df = self._to_df_internal(timeout=timeout) if self.on_demand_feature_views: for odfv in self.on_demand_feature_views: + if odfv.mode != "pandas": + raise Exception( + f'OnDemandFeatureView mode "{odfv.mode}" not supported for offline processing.' + ) features_df = features_df.join( odfv.get_transformed_features_df( features_df, diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index ce416fff2aa..972ee6c556e 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -4,7 +4,7 @@ import warnings from datetime import datetime from types import FunctionType -from typing import Any, Dict, List, Optional, Type, Union +from typing import Any, Dict, List, Optional, Type, Union, cast import dill import pandas as pd @@ -32,9 +32,11 @@ UserDefinedFunctionV2 as UserDefinedFunctionProto, ) from feast.transformation.pandas_transformation import PandasTransformation +from feast.transformation.python_transformation import PythonTransformation from feast.transformation.substrait_transformation import SubstraitTransformation from feast.type_map import ( feast_value_type_to_pandas_type, + feast_value_type_to_python_type, python_type_to_feast_value_type, ) from feast.usage import log_exceptions @@ -69,6 +71,7 @@ class OnDemandFeatureView(BaseFeatureView): source_feature_view_projections: Dict[str, FeatureViewProjection] source_request_sources: Dict[str, RequestSource] feature_transformation: Union[PandasTransformation, SubstraitTransformation] + mode: str description: str tags: Dict[str, str] owner: str @@ -91,6 +94,7 @@ def __init__( # noqa: C901 feature_transformation: Optional[ Union[PandasTransformation, SubstraitTransformation] ] = None, + mode: str = "pandas", description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", @@ -109,6 +113,7 @@ def __init__( # noqa: C901 dataframes as inputs. udf_string (deprecated): The source code version of the udf (for diffing and displaying in Web UI) feature_transformation: The user defined transformation. + mode: Mode of execution (e.g., Pandas or Python native) description (optional): A human-readable description. tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the on demand feature view, typically the email @@ -122,16 +127,24 @@ def __init__( # noqa: C901 owner=owner, ) + if mode not in {"python", "pandas"}: + raise Exception( + f"Unknown mode {mode}. OnDemandFeatureView only supports python or pandas UDFs." + ) if not feature_transformation: if udf: warnings.warn( - "udf and udf_string parameters are deprecated. Please use transformation=OnDemandPandasTransformation(udf, udf_string) instead.", + "udf and udf_string parameters are deprecated. Please use transformation=PandasTransformation(udf, udf_string) instead.", DeprecationWarning, ) - feature_transformation = PandasTransformation(udf, udf_string) + if isinstance(inspect.signature(udf).return_annotation, pd.DataFrame): + feature_transformation = PandasTransformation(udf, udf_string) + else: + feature_transformation = PythonTransformation(udf, udf_string) + else: raise Exception( - "OnDemandFeatureView needs to be initialized with either transformation or udf arguments" + "OnDemandFeatureView needs to be initialized with either feauture_transformation or udf arguments" ) self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {} @@ -159,6 +172,7 @@ def __copy__(self): sources=list(self.source_feature_view_projections.values()) + list(self.source_request_sources.values()), feature_transformation=self.feature_transformation, + mode=self.mode, description=self.description, tags=self.tags, owner=self.owner, @@ -179,6 +193,7 @@ def __eq__(self, other): self.source_feature_view_projections != other.source_feature_view_projections or self.source_request_sources != other.source_request_sources + or self.mode != other.mode or self.feature_transformation != other.feature_transformation ): return False @@ -215,7 +230,10 @@ def to_proto(self) -> OnDemandFeatureViewProto: feature_transformation = FeatureTransformationProto( user_defined_function=self.feature_transformation.to_proto() - if isinstance(self.feature_transformation, PandasTransformation) + if isinstance( + self.feature_transformation, + (PandasTransformation, PythonTransformation), + ) else None, substrait_transformation=self.feature_transformation.to_proto() if isinstance(self.feature_transformation, SubstraitTransformation) @@ -226,6 +244,7 @@ def to_proto(self) -> OnDemandFeatureViewProto: features=[feature.to_proto() for feature in self.features], sources=sources, feature_transformation=feature_transformation, + mode=self.mode, description=self.description, tags=self.tags, owner=self.owner, @@ -234,12 +253,17 @@ def to_proto(self) -> OnDemandFeatureViewProto: return OnDemandFeatureViewProto(spec=spec, meta=meta) @classmethod - def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): + def from_proto( + cls, + on_demand_feature_view_proto: OnDemandFeatureViewProto, + skip_udf: bool = False, + ): """ Creates an on demand feature view from a protobuf representation. Args: on_demand_feature_view_proto: A protobuf representation of an on-demand feature view. + skip_udf: A boolean indicating whether to skip loading the udf Returns: A OnDemandFeatureView object based on the on-demand feature view protobuf. @@ -300,6 +324,14 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): else: raise Exception("At least one transformation type needs to be provided") + udf = ( + _empty_odfv_udf_fn + if skip_udf + else dill.loads( + on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body + ) + ) + on_demand_feature_view_obj = cls( name=on_demand_feature_view_proto.spec.name, schema=[ @@ -311,6 +343,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto): ], sources=sources, feature_transformation=transformation, + mode=on_demand_feature_view_proto.spec.mode, description=on_demand_feature_view_proto.spec.description, tags=dict(on_demand_feature_view_proto.spec.tags), owner=on_demand_feature_view_proto.spec.owner, @@ -349,6 +382,9 @@ def get_request_data_schema(self) -> Dict[str, ValueType]: ) return schema + def _get_projected_feature_name(self, feature: str) -> str: + return f"{self.projection.name_to_use()}__{feature}" + def get_transformed_features_df( self, df_with_features: pd.DataFrame, @@ -378,7 +414,7 @@ def get_transformed_features_df( rename_columns: Dict[str, str] = {} for feature in self.features: short_name = feature.name - long_name = f"{self.projection.name_to_use()}__{feature.name}" + long_name = self._get_projected_feature_name(feature.name) if ( short_name in df_with_transformed_features.columns and full_feature_names @@ -392,7 +428,128 @@ def get_transformed_features_df( df_with_features.drop(columns=columns_to_cleanup, inplace=True) return df_with_transformed_features.rename(columns=rename_columns) + def _get_transformed_features_dict( + self, + feature_dict: Dict[str, List[Any]], + full_feature_names: bool = False, + ) -> Dict[str, Any]: + # generates a mapping between feature names and fv__feature names (and vice versa) + name_map: Dict[str, str] = {} + for source_fv_projection in self.source_feature_view_projections.values(): + for feature in source_fv_projection.features: + full_feature_ref = f"{source_fv_projection.name}__{feature.name}" + if full_feature_ref in feature_dict: + name_map[full_feature_ref] = feature.name + elif feature.name in feature_dict: + name_map[feature.name] = name_map[full_feature_ref] + + rows = [] + # this doesn't actually require 2 x |key_space| space; k and name_map[k] point to the same object in memory + for values in zip(*feature_dict.values()): + rows.append( + { + **{k: v for k, v in zip(feature_dict.keys(), values)}, + **{name_map[k]: v for k, v in zip(feature_dict.keys(), values)}, + } + ) + + # construct output dictionary and mapping from expected feature names to alternative feature names + output_dict: Dict[str, List[Any]] = {} + correct_feature_name_to_alias: Dict[str, str] = {} + for feature in self.features: + long_name = self._get_projected_feature_name(feature.name) + correct_name = long_name if full_feature_names else feature.name + correct_feature_name_to_alias[correct_name] = ( + feature.name if full_feature_names else long_name + ) + output_dict[correct_name] = [None] * len(rows) + + # populate output dictionary per row + for i, row in enumerate(rows): + row_output = self.feature_transformation.transform(row) + for feature in output_dict: + output_dict[feature][i] = row_output.get( + feature, row_output[correct_feature_name_to_alias[feature]] + ) + return output_dict + def infer_features(self) -> None: + if self.mode == "pandas": + self._infer_features_df() + elif self.mode == "python": + self._infer_features_dict() + else: + raise Exception( + f'Invalid OnDemandFeatureMode: {self.mode}. Expected one of "pandas" or "python".' + ) + + def _infer_features_dict(self): + """ + Infers the set of features associated to this feature view from the input source. + Raises: + RegistryInferenceFailure: The set of features could not be inferred. + """ + rand_value: Dict[str, Any] = { + "float": 1.0, + "int": 1, + "str": "hello world", + "bytes": str.encode("hello world"), + "bool": True, + "datetime64[ns]": datetime.utcnow(), + } + + feature_dict = {} + # Populate feature dictionary with plausible random inputs + for feature_view_projection in self.source_feature_view_projections.values(): + for feature in feature_view_projection.features: + dtype = feast_value_type_to_python_type(feature.dtype.to_value_type()) + sample_val = rand_value[dtype] if dtype in rand_value else None + feature_key = f"{feature_view_projection.name}__{feature.name}" + feature_dict[feature_key] = sample_val + for request_data in self.source_request_sources.values(): + for field in request_data.schema: + dtype = feast_value_type_to_python_type(field.dtype.to_value_type()) + sample_val = rand_value[dtype] if dtype in rand_value else None + feature_dict[field.name] = sample_val + + # Call the UDF with the feature dictionary to get an output dictionary + output_dict: Dict[str, Any] = self.feature_transformation.transform( + feature_dict + ) + + inferred_features = [] + # Determine feature data types using the output dictionary + for f, val in output_dict.items(): + inferred_features.append( + # TODO: assumes that the UDF produces a dict of (f_name: f_value) pairs + # should instead use `value=val[0]` if the UDF produces (f_name: List(f_value)) + Field( + name=f, + dtype=from_value_type( + python_type_to_feast_value_type(f, value=val) + ), + ) + ) + + if self.features: + missing_features = [] + for specified_feature in self.features: + if specified_feature.name not in feature_dict: + missing_features.append(specified_feature) + if missing_features: + raise SpecifiedFeaturesNotPresentError( + missing_features, inferred_features, self.name + ) + else: + self.features = inferred_features + + if not self.features: + raise RegistryInferenceFailure( + "OnDemandFeatureView", + f"Could not infer Features for the feature view '{self.name}'.", + ) + + def _infer_features_df(self) -> None: """ Infers the set of features associated to this feature view from the input source. @@ -478,6 +635,7 @@ def on_demand_feature_view( FeatureViewProjection, ] ], + mode: str = "pandas", description: str = "", tags: Optional[Dict[str, str]] = None, owner: str = "", @@ -491,6 +649,7 @@ def on_demand_feature_view( sources: A map from input source names to the actual input sources, which may be feature views, or request data sources. These sources serve as inputs to the udf, which will refer to them by name. + mode: The mode of execution (e.g,. Pandas or Python Native) description (optional): A human-readable description. tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the on demand feature view, typically the email @@ -551,6 +710,7 @@ def decorator(user_function): sources=sources, schema=schema, feature_transformation=transformation, + mode=mode, description=description, tags=tags, owner=owner, @@ -562,6 +722,74 @@ def decorator(user_function): return decorator + def get_transformed_features_dict( + self, + feature_dict: Dict[str, List[Any]], + full_feature_names: bool = False, + ) -> Dict[str, Any]: + # generates a mapping between feature names and fv__feature names (and vice versa) + name_map: Dict[str, str] = {} + for source_fv_projection in self.source_feature_view_projections.values(): + for feature in source_fv_projection.features: + full_feature_ref = f"{source_fv_projection.name}__{feature.name}" + if full_feature_ref in feature_dict: + name_map[full_feature_ref] = feature.name + elif feature.name in feature_dict: + name_map[feature.name] = name_map[full_feature_ref] + + rows = [] + # this doesn't actually require 2 x |key_space| space; k and name_map[k] point to the same object in memory + for values in zip(*feature_dict.values()): + rows.append( + { + **{k: v for k, v in zip(feature_dict.keys(), values)}, + **{name_map[k]: v for k, v in zip(feature_dict.keys(), values)}, + } + ) + + # construct output dictionary and mapping from expected feature names to alternative feature names + output_dict: Dict[str, List[Any]] = {} + correct_feature_name_to_alias: Dict[str, str] = {} + for feature in self.features: + long_name = self._get_projected_feature_name(feature.name) + correct_name = long_name if full_feature_names else feature.name + correct_feature_name_to_alias[correct_name] = ( + feature.name if full_feature_names else long_name + ) + output_dict[correct_name] = [None] * len(rows) + + # populate output dictionary per row + for i, row in enumerate(rows): + row_output = self.udf.__call__(row) + for feature in output_dict: + output_dict[feature][i] = row_output.get( + feature, row_output[correct_feature_name_to_alias[feature]] + ) + return output_dict + + def get_transformed_features( + self, + features: Union[Dict[str, List[Any]], pd.DataFrame], + full_feature_names: bool = False, + ) -> Union[Dict[str, List[Any]], pd.DataFrame]: + # TODO: classic inheritance pattern....maybe fix this + if self.mode == "python": + assert isinstance(features, dict) + return self.get_transformed_features_dict( + feature_dict=cast(features, Dict[str, List[Any]]), + full_feature_names=full_feature_names, + ) + elif self.mode == "pandas": + assert isinstance(features, pd.DataFrame) + return self._get_transformed_features( + df_with_features=cast(features, pd.DataFrame), + full_feature_names=full_feature_names, + ) + else: + raise Exception( + f'Invalid OnDemandFeatureMode: {self.mode}. Expected one of "pandas" or "python".' + ) + def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView: bfv = BatchFeatureView( @@ -578,3 +806,8 @@ def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView: bfv.features = copy.copy(fv.features) bfv.entities = copy.copy(fv.entities) return bfv + + +def _empty_odfv_udf_fn(x: Any) -> Any: + # just an identity mapping, otherwise we risk tripping some downstream tests + return x diff --git a/sdk/python/feast/online_response.py b/sdk/python/feast/online_response.py index 48524359bf3..915e6bcbf35 100644 --- a/sdk/python/feast/online_response.py +++ b/sdk/python/feast/online_response.py @@ -77,3 +77,46 @@ def to_df(self, include_event_timestamps: bool = False) -> pd.DataFrame: """ return pd.DataFrame(self.to_dict(include_event_timestamps)) + + +class OnlineResponseRow(OnlineResponse): + def to_dict(self, include_event_timestamps: bool = False) -> Dict[str, Any]: + """ + Converts GetOnlineFeaturesResponse features into a dictionary form. + Args: + is_with_event_timestamps: bool Optionally include feature timestamps in the dictionary + """ + response: Dict[str, Any] = {} + + for feature_ref, feature_vector in zip( + self.proto.metadata.feature_names.val, self.proto.results + ): + + if ( + len(feature_vector.values) != 1 + or len(feature_vector.event_timestamps) != 1 + ): + raise ValueError( + f"Response contains more than one row: \n" + f"feature_ref: {feature_ref}" + f"feature_vector: {feature_vector.values}," + f"event_timestamps: {feature_vector.event_timestamps}" + ) + + response[feature_ref] = feast_value_type_to_python_type( + feature_vector.values[0] + ) + + if include_event_timestamps: + timestamp_ref = feature_ref + TIMESTAMP_POSTFIX + response[timestamp_ref] = feature_vector.event_timestamps[0].seconds + return response + + def to_df(self, include_event_timestamps: bool = False) -> pd.DataFrame: + """ + Converts GetOnlineFeaturesResponse features into Panda dataframe form. + Args: + is_with_event_timestamps: bool Optionally include feature timestamps in the dataframe + """ + + return pd.DataFrame([self.to_dict(include_event_timestamps)]) diff --git a/sdk/python/feast/transformation/python_transformation.py b/sdk/python/feast/transformation/python_transformation.py new file mode 100644 index 00000000000..54760f356ca --- /dev/null +++ b/sdk/python/feast/transformation/python_transformation.py @@ -0,0 +1,55 @@ +from types import FunctionType +from typing import Dict + +import dill + +from feast.protos.feast.core.Transformation_pb2 import ( + UserDefinedFunctionV2 as UserDefinedFunctionProto, +) + + +class PythonTransformation: + def __init__(self, udf: FunctionType, udf_string: str = ""): + """ + Creates an OnDemandPythonTransformation object. + Args: + udf: The user defined transformation function, which must take pandas + dataframes as inputs. + udf_string: The source code version of the udf (for diffing and displaying in Web UI) + """ + self.udf = udf + self.udf_string = udf_string + + def transform(self, df: Dict) -> Dict: + return self.udf.__call__(df) + + def __eq__(self, other): + if not isinstance(other, PythonTransformation): + raise TypeError( + "Comparisons should only involve OnDemandPythonTransformation class objects." + ) + + if not super().__eq__(other): + return False + + if ( + self.udf_string != other.udf_string + or self.udf.__code__.co_code != other.udf.__code__.co_code + ): + return False + + return True + + def to_proto(self) -> UserDefinedFunctionProto: + return UserDefinedFunctionProto( + name=self.udf.__name__, + body=dill.dumps(self.udf, recurse=True), + body_text=self.udf_string, + ) + + @classmethod + def from_proto(cls, user_defined_function_proto: UserDefinedFunctionProto): + return PythonTransformation( + udf=dill.loads(user_defined_function_proto.body), + udf_string=user_defined_function_proto.body_text, + ) diff --git a/sdk/python/feast/transformation_server.py b/sdk/python/feast/transformation_server.py index 83f4af749e3..34fe3eac766 100644 --- a/sdk/python/feast/transformation_server.py +++ b/sdk/python/feast/transformation_server.py @@ -47,6 +47,11 @@ def TransformFeatures(self, request, context): df = pa.ipc.open_file(request.transformation_input.arrow_value).read_pandas() + if odfv.mode != "pandas": + raise Exception( + f'OnDemandFeatureView mode "{odfv.mode}" not supported by TransformationServer.' + ) + result_df = odfv.get_transformed_features_df(df, True) result_arrow = pa.Table.from_pandas(result_df) sink = pa.BufferOutputStream() From f8fd94915556bd1cf6bd51987bf56e13c7337ed3 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 25 Mar 2024 22:25:06 -0400 Subject: [PATCH 02/16] bug fix Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index c6b2b89c3a6..361d09d8aac 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -2134,8 +2134,6 @@ def _augment_response_with_on_demand_transforms( feature_vector = transformed_features[feature] proto_values.append( python_values_to_proto_values(feature_vector, ValueType.UNKNOWN) - python_values_to_proto_values( - transformed_features_df[feature].values, ValueType.UNKNOWN ) odfv_result_names |= set(selected_subset) From bbc55dec5307b421bb9800926461fee242903ec1 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Mon, 25 Mar 2024 22:38:16 -0400 Subject: [PATCH 03/16] uploading current progress...calling it a night Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/on_demand_feature_view.py | 239 +++++++++------------ 1 file changed, 97 insertions(+), 142 deletions(-) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 972ee6c556e..f5b123cd2f7 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -70,7 +70,7 @@ class OnDemandFeatureView(BaseFeatureView): features: List[Field] source_feature_view_projections: Dict[str, FeatureViewProjection] source_request_sources: Dict[str, RequestSource] - feature_transformation: Union[PandasTransformation, SubstraitTransformation] + feature_transformation: Union[PandasTransformation, PythonTransformation, SubstraitTransformation] mode: str description: str tags: Dict[str, str] @@ -78,26 +78,24 @@ class OnDemandFeatureView(BaseFeatureView): @log_exceptions # noqa: C901 def __init__( # noqa: C901 - self, - *, - name: str, - schema: List[Field], - sources: List[ - Union[ - FeatureView, - RequestSource, - FeatureViewProjection, - ] - ], - udf: Optional[FunctionType] = None, - udf_string: str = "", - feature_transformation: Optional[ - Union[PandasTransformation, SubstraitTransformation] - ] = None, - mode: str = "pandas", - description: str = "", - tags: Optional[Dict[str, str]] = None, - owner: str = "", + self, + *, + name: str, + schema: List[Field], + sources: List[ + Union[ + FeatureView, + RequestSource, + FeatureViewProjection, + ] + ], + udf: Optional[FunctionType] = None, + udf_string: str = "", + feature_transformation: Union[PandasTransformation, PythonTransformation, SubstraitTransformation], + mode: str = "pandas", + description: str = "", + tags: Optional[Dict[str, str]] = None, + owner: str = "", ): """ Creates an OnDemandFeatureView object. @@ -139,8 +137,10 @@ def __init__( # noqa: C901 ) if isinstance(inspect.signature(udf).return_annotation, pd.DataFrame): feature_transformation = PandasTransformation(udf, udf_string) - else: + elif isinstance(inspect.signature(udf).return_annotation, Dict): feature_transformation = PythonTransformation(udf, udf_string) + else: + pass else: raise Exception( @@ -170,7 +170,7 @@ def __copy__(self): name=self.name, schema=self.features, sources=list(self.source_feature_view_projections.values()) - + list(self.source_request_sources.values()), + + list(self.source_request_sources.values()), feature_transformation=self.feature_transformation, mode=self.mode, description=self.description, @@ -190,11 +190,11 @@ def __eq__(self, other): return False if ( - self.source_feature_view_projections - != other.source_feature_view_projections - or self.source_request_sources != other.source_request_sources - or self.mode != other.mode - or self.feature_transformation != other.feature_transformation + self.source_feature_view_projections + != other.source_feature_view_projections + or self.source_request_sources != other.source_request_sources + or self.mode != other.mode + or self.feature_transformation != other.feature_transformation ): return False @@ -221,8 +221,8 @@ def to_proto(self) -> OnDemandFeatureViewProto: feature_view_projection=fv_projection.to_proto() ) for ( - source_name, - request_sources, + source_name, + request_sources, ) in self.source_request_sources.items(): sources[source_name] = OnDemandSource( request_data_source=request_sources.to_proto() @@ -254,9 +254,9 @@ def to_proto(self) -> OnDemandFeatureViewProto: @classmethod def from_proto( - cls, - on_demand_feature_view_proto: OnDemandFeatureViewProto, - skip_udf: bool = False, + cls, + on_demand_feature_view_proto: OnDemandFeatureViewProto, + skip_udf: bool = False, ): """ Creates an on demand feature view from a protobuf representation. @@ -270,8 +270,8 @@ def from_proto( """ sources = [] for ( - _, - on_demand_source, + _, + on_demand_source, ) in on_demand_feature_view_proto.spec.sources.items(): if on_demand_source.WhichOneof("source") == "feature_view": sources.append( @@ -289,29 +289,29 @@ def from_proto( ) if ( - on_demand_feature_view_proto.spec.feature_transformation.WhichOneof( - "transformation" - ) - == "user_defined_function" - and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text - != "" + on_demand_feature_view_proto.spec.feature_transformation.WhichOneof( + "transformation" + ) + == "user_defined_function" + and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text + != "" ): transformation = PandasTransformation.from_proto( on_demand_feature_view_proto.spec.feature_transformation.user_defined_function ) elif ( - on_demand_feature_view_proto.spec.feature_transformation.WhichOneof( - "transformation" - ) - == "substrait_transformation" + on_demand_feature_view_proto.spec.feature_transformation.WhichOneof( + "transformation" + ) + == "substrait_transformation" ): transformation = SubstraitTransformation.from_proto( on_demand_feature_view_proto.spec.feature_transformation.substrait_transformation ) elif ( - hasattr(on_demand_feature_view_proto.spec, "user_defined_function") - and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text - == "" + hasattr(on_demand_feature_view_proto.spec, "user_defined_function") + and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text + == "" ): backwards_compatible_udf = UserDefinedFunctionProto( name=on_demand_feature_view_proto.spec.user_defined_function.name, @@ -386,9 +386,9 @@ def _get_projected_feature_name(self, feature: str) -> str: return f"{self.projection.name_to_use()}__{feature}" def get_transformed_features_df( - self, - df_with_features: pd.DataFrame, - full_feature_names: bool = False, + self, + df_with_features: pd.DataFrame, + full_feature_names: bool = False, ) -> pd.DataFrame: # Apply on demand transformations columns_to_cleanup = [] @@ -416,8 +416,8 @@ def get_transformed_features_df( short_name = feature.name long_name = self._get_projected_feature_name(feature.name) if ( - short_name in df_with_transformed_features.columns - and full_feature_names + short_name in df_with_transformed_features.columns + and full_feature_names ): rename_columns[short_name] = long_name elif not full_feature_names: @@ -428,10 +428,10 @@ def get_transformed_features_df( df_with_features.drop(columns=columns_to_cleanup, inplace=True) return df_with_transformed_features.rename(columns=rename_columns) - def _get_transformed_features_dict( - self, - feature_dict: Dict[str, List[Any]], - full_feature_names: bool = False, + def get_transformed_features_dict( + self, + feature_dict: Dict[str, List[Any]], + full_feature_names: bool = False, ) -> Dict[str, Any]: # generates a mapping between feature names and fv__feature names (and vice versa) name_map: Dict[str, str] = {} @@ -473,6 +473,29 @@ def _get_transformed_features_dict( ) return output_dict + def get_transformed_features( + self, + features: Union[Dict[str, List[Any]], pd.DataFrame], + full_feature_names: bool = False, + ) -> Union[Dict[str, List[Any]], pd.DataFrame]: + # TODO: classic inheritance pattern....maybe fix this + if self.mode == "python": + assert isinstance(features, dict) + return self.get_transformed_features_dict( + feature_dict=cast(features, Dict[str, List[Any]]), + full_feature_names=full_feature_names, + ) + elif self.mode == "pandas": + assert isinstance(features, pd.DataFrame) + return self.get_transformed_features_df( + df_with_features=cast(features, pd.DataFrame), + full_feature_names=full_feature_names, + ) + else: + raise Exception( + f'Invalid OnDemandFeatureMode: {self.mode}. Expected one of "pandas" or "python".' + ) + def infer_features(self) -> None: if self.mode == "pandas": self._infer_features_df() @@ -611,7 +634,7 @@ def _infer_features_df(self) -> None: @staticmethod def get_requested_odfvs( - feature_refs, project, registry + feature_refs, project, registry ) -> List["OnDemandFeatureView"]: all_on_demand_feature_views = registry.list_on_demand_feature_views( project, allow_cache=True @@ -626,19 +649,19 @@ def get_requested_odfvs( def on_demand_feature_view( - *, - schema: List[Field], - sources: List[ - Union[ - FeatureView, - RequestSource, - FeatureViewProjection, - ] - ], - mode: str = "pandas", - description: str = "", - tags: Optional[Dict[str, str]] = None, - owner: str = "", + *, + schema: List[Field], + sources: List[ + Union[ + FeatureView, + RequestSource, + FeatureViewProjection, + ] + ], + mode: str = "pandas", + description: str = "", + tags: Optional[Dict[str, str]] = None, + owner: str = "", ): """ Creates an OnDemandFeatureView object with the given user function as udf. @@ -665,9 +688,9 @@ def mainify(obj) -> None: def decorator(user_function): return_annotation = inspect.signature(user_function).return_annotation if ( - return_annotation - and return_annotation.__module__ == "ibis.expr.types.relations" - and return_annotation.__name__ == "Table" + return_annotation + and return_annotation.__module__ == "ibis.expr.types.relations" + and return_annotation.__name__ == "Table" ): import ibis import ibis.expr.datatypes as dt @@ -722,74 +745,6 @@ def decorator(user_function): return decorator - def get_transformed_features_dict( - self, - feature_dict: Dict[str, List[Any]], - full_feature_names: bool = False, - ) -> Dict[str, Any]: - # generates a mapping between feature names and fv__feature names (and vice versa) - name_map: Dict[str, str] = {} - for source_fv_projection in self.source_feature_view_projections.values(): - for feature in source_fv_projection.features: - full_feature_ref = f"{source_fv_projection.name}__{feature.name}" - if full_feature_ref in feature_dict: - name_map[full_feature_ref] = feature.name - elif feature.name in feature_dict: - name_map[feature.name] = name_map[full_feature_ref] - - rows = [] - # this doesn't actually require 2 x |key_space| space; k and name_map[k] point to the same object in memory - for values in zip(*feature_dict.values()): - rows.append( - { - **{k: v for k, v in zip(feature_dict.keys(), values)}, - **{name_map[k]: v for k, v in zip(feature_dict.keys(), values)}, - } - ) - - # construct output dictionary and mapping from expected feature names to alternative feature names - output_dict: Dict[str, List[Any]] = {} - correct_feature_name_to_alias: Dict[str, str] = {} - for feature in self.features: - long_name = self._get_projected_feature_name(feature.name) - correct_name = long_name if full_feature_names else feature.name - correct_feature_name_to_alias[correct_name] = ( - feature.name if full_feature_names else long_name - ) - output_dict[correct_name] = [None] * len(rows) - - # populate output dictionary per row - for i, row in enumerate(rows): - row_output = self.udf.__call__(row) - for feature in output_dict: - output_dict[feature][i] = row_output.get( - feature, row_output[correct_feature_name_to_alias[feature]] - ) - return output_dict - - def get_transformed_features( - self, - features: Union[Dict[str, List[Any]], pd.DataFrame], - full_feature_names: bool = False, - ) -> Union[Dict[str, List[Any]], pd.DataFrame]: - # TODO: classic inheritance pattern....maybe fix this - if self.mode == "python": - assert isinstance(features, dict) - return self.get_transformed_features_dict( - feature_dict=cast(features, Dict[str, List[Any]]), - full_feature_names=full_feature_names, - ) - elif self.mode == "pandas": - assert isinstance(features, pd.DataFrame) - return self._get_transformed_features( - df_with_features=cast(features, pd.DataFrame), - full_feature_names=full_feature_names, - ) - else: - raise Exception( - f'Invalid OnDemandFeatureMode: {self.mode}. Expected one of "pandas" or "python".' - ) - def feature_view_to_batch_feature_view(fv: FeatureView) -> BatchFeatureView: bfv = BatchFeatureView( From 0ac27be00c1d1f54ab1d097c3c921e02d734dd9a Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Tue, 26 Mar 2024 23:11:22 -0400 Subject: [PATCH 04/16] uploading current progress Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 16 +-- sdk/python/feast/on_demand_feature_view.py | 97 +++---------------- .../transformation/python_transformation.py | 6 +- 3 files changed, 26 insertions(+), 93 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 361d09d8aac..c2f6e1458b1 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -2106,14 +2106,14 @@ def _augment_response_with_on_demand_transforms( if odfv.mode == "python": if initial_response_dict is None: initial_response_dict = initial_response.to_dict() - transformed_features = odfv.get_transformed_features( + transformed_features_dict: Dict[str, List[Any]] = odfv.get_transformed_features( initial_response_dict, full_feature_names, ) elif odfv.mode == "pandas": if initial_response_df is None: initial_response_df = initial_response.to_df() - transformed_features_df = odfv.get_transformed_features( + transformed_features_df: pd.DataFrame = odfv.get_transformed_features( initial_response_df, full_feature_names, ) @@ -2122,6 +2122,7 @@ def _augment_response_with_on_demand_transforms( f"Invalid OnDemandFeatureMode: {odfv.mode}. Expected one of 'pandas' or 'python'." ) + transformed_features = transformed_features_dict if odfv.mode == "python" else transformed_features_df transformed_columns = ( transformed_features.columns if isinstance(transformed_features, pd.DataFrame) @@ -2130,11 +2131,12 @@ def _augment_response_with_on_demand_transforms( selected_subset = [f for f in transformed_columns if f in _feature_refs] proto_values = [] - for feature in selected_subset: - feature_vector = transformed_features[feature] - proto_values.append( - python_values_to_proto_values(feature_vector, ValueType.UNKNOWN) - ) + for selected_feature in selected_subset: + if odfv.mode in ["python", "pandas"]: + feature_vector = transformed_features[selected_feature] + proto_values.append( + python_values_to_proto_values(feature_vector, ValueType.UNKNOWN) + ) odfv_result_names |= set(selected_subset) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index f5b123cd2f7..25847393698 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -36,7 +36,6 @@ from feast.transformation.substrait_transformation import SubstraitTransformation from feast.type_map import ( feast_value_type_to_pandas_type, - feast_value_type_to_python_type, python_type_to_feast_value_type, ) from feast.usage import log_exceptions @@ -135,9 +134,9 @@ def __init__( # noqa: C901 "udf and udf_string parameters are deprecated. Please use transformation=PandasTransformation(udf, udf_string) instead.", DeprecationWarning, ) - if isinstance(inspect.signature(udf).return_annotation, pd.DataFrame): + if isinstance(inspect.signature(udf).return_annotation, pd.DataFrame) and mode == "pandas": feature_transformation = PandasTransformation(udf, udf_string) - elif isinstance(inspect.signature(udf).return_annotation, Dict): + elif isinstance(inspect.signature(udf).return_annotation, Dict) and mode == "python": feature_transformation = PythonTransformation(udf, udf_string) else: pass @@ -169,8 +168,7 @@ def __copy__(self): fv = OnDemandFeatureView( name=self.name, schema=self.features, - sources=list(self.source_feature_view_projections.values()) - + list(self.source_request_sources.values()), + sources=list(self.source_feature_view_projections.values()) + list(self.source_request_sources.values()), feature_transformation=self.feature_transformation, mode=self.mode, description=self.description, @@ -324,14 +322,6 @@ def from_proto( else: raise Exception("At least one transformation type needs to be provided") - udf = ( - _empty_odfv_udf_fn - if skip_udf - else dill.loads( - on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body - ) - ) - on_demand_feature_view_obj = cls( name=on_demand_feature_view_proto.spec.name, schema=[ @@ -391,6 +381,8 @@ def get_transformed_features_df( full_feature_names: bool = False, ) -> pd.DataFrame: # Apply on demand transformations + if not isinstance(df_with_features, pd.DataFrame): + raise TypeError("get_transformed_features_df only accepts pd.DataFrame") columns_to_cleanup = [] for source_fv_projection in self.source_feature_view_projections.values(): for feature in source_fv_projection.features: @@ -405,8 +397,7 @@ def get_transformed_features_df( columns_to_cleanup.append(full_feature_ref) # Compute transformed values and apply to each result row - - df_with_transformed_features = self.feature_transformation.transform( + df_with_transformed_features: pd.DataFrame = self.feature_transformation.transform( df_with_features ) @@ -467,9 +458,9 @@ def get_transformed_features_dict( # populate output dictionary per row for i, row in enumerate(rows): row_output = self.feature_transformation.transform(row) - for feature in output_dict: - output_dict[feature][i] = row_output.get( - feature, row_output[correct_feature_name_to_alias[feature]] + for feature_name in output_dict: + output_dict[feature_name][i] = row_output.get( + feature_name, row_output[correct_feature_name_to_alias[feature_name]] ) return output_dict @@ -479,16 +470,14 @@ def get_transformed_features( full_feature_names: bool = False, ) -> Union[Dict[str, List[Any]], pd.DataFrame]: # TODO: classic inheritance pattern....maybe fix this - if self.mode == "python": - assert isinstance(features, dict) + if self.mode == "python" and isinstance(features, dict): return self.get_transformed_features_dict( - feature_dict=cast(features, Dict[str, List[Any]]), + feature_dict=cast(features, Dict[str, List[Any]]), # type: ignore full_feature_names=full_feature_names, ) - elif self.mode == "pandas": - assert isinstance(features, pd.DataFrame) + elif self.mode == "pandas" and isinstance(features, pd.DataFrame): return self.get_transformed_features_df( - df_with_features=cast(features, pd.DataFrame), + df_with_features=cast(features, pd.DataFrame), # type: ignore full_feature_names=full_feature_names, ) else: @@ -512,65 +501,7 @@ def _infer_features_dict(self): Raises: RegistryInferenceFailure: The set of features could not be inferred. """ - rand_value: Dict[str, Any] = { - "float": 1.0, - "int": 1, - "str": "hello world", - "bytes": str.encode("hello world"), - "bool": True, - "datetime64[ns]": datetime.utcnow(), - } - - feature_dict = {} - # Populate feature dictionary with plausible random inputs - for feature_view_projection in self.source_feature_view_projections.values(): - for feature in feature_view_projection.features: - dtype = feast_value_type_to_python_type(feature.dtype.to_value_type()) - sample_val = rand_value[dtype] if dtype in rand_value else None - feature_key = f"{feature_view_projection.name}__{feature.name}" - feature_dict[feature_key] = sample_val - for request_data in self.source_request_sources.values(): - for field in request_data.schema: - dtype = feast_value_type_to_python_type(field.dtype.to_value_type()) - sample_val = rand_value[dtype] if dtype in rand_value else None - feature_dict[field.name] = sample_val - - # Call the UDF with the feature dictionary to get an output dictionary - output_dict: Dict[str, Any] = self.feature_transformation.transform( - feature_dict - ) - - inferred_features = [] - # Determine feature data types using the output dictionary - for f, val in output_dict.items(): - inferred_features.append( - # TODO: assumes that the UDF produces a dict of (f_name: f_value) pairs - # should instead use `value=val[0]` if the UDF produces (f_name: List(f_value)) - Field( - name=f, - dtype=from_value_type( - python_type_to_feast_value_type(f, value=val) - ), - ) - ) - - if self.features: - missing_features = [] - for specified_feature in self.features: - if specified_feature.name not in feature_dict: - missing_features.append(specified_feature) - if missing_features: - raise SpecifiedFeaturesNotPresentError( - missing_features, inferred_features, self.name - ) - else: - self.features = inferred_features - - if not self.features: - raise RegistryInferenceFailure( - "OnDemandFeatureView", - f"Could not infer Features for the feature view '{self.name}'.", - ) + pass def _infer_features_df(self) -> None: """ diff --git a/sdk/python/feast/transformation/python_transformation.py b/sdk/python/feast/transformation/python_transformation.py index 54760f356ca..99d4b3351fb 100644 --- a/sdk/python/feast/transformation/python_transformation.py +++ b/sdk/python/feast/transformation/python_transformation.py @@ -20,13 +20,13 @@ def __init__(self, udf: FunctionType, udf_string: str = ""): self.udf = udf self.udf_string = udf_string - def transform(self, df: Dict) -> Dict: - return self.udf.__call__(df) + def transform(self, input_dict: Dict) -> Dict: + return self.udf.__call__(input_dict) def __eq__(self, other): if not isinstance(other, PythonTransformation): raise TypeError( - "Comparisons should only involve OnDemandPythonTransformation class objects." + "Comparisons should only involve PythonTransformation class objects." ) if not super().__eq__(other): From b028f8f42ee1960b5f3ec0ec0f51ba71c9ce99be Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 27 Mar 2024 07:47:22 -0400 Subject: [PATCH 05/16] updated types and refactored to adjust Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 10 +- sdk/python/feast/on_demand_feature_view.py | 170 ++++++++++-------- .../tests/unit/test_on_demand_feature_view.py | 5 +- 3 files changed, 103 insertions(+), 82 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index c2f6e1458b1..245ef0a415d 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -2106,7 +2106,9 @@ def _augment_response_with_on_demand_transforms( if odfv.mode == "python": if initial_response_dict is None: initial_response_dict = initial_response.to_dict() - transformed_features_dict: Dict[str, List[Any]] = odfv.get_transformed_features( + transformed_features_dict: Dict[ + str, List[Any] + ] = odfv.get_transformed_features( initial_response_dict, full_feature_names, ) @@ -2122,7 +2124,11 @@ def _augment_response_with_on_demand_transforms( f"Invalid OnDemandFeatureMode: {odfv.mode}. Expected one of 'pandas' or 'python'." ) - transformed_features = transformed_features_dict if odfv.mode == "python" else transformed_features_df + transformed_features = ( + transformed_features_dict + if odfv.mode == "python" + else transformed_features_df + ) transformed_columns = ( transformed_features.columns if isinstance(transformed_features, pd.DataFrame) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 25847393698..f480314f06d 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -69,7 +69,9 @@ class OnDemandFeatureView(BaseFeatureView): features: List[Field] source_feature_view_projections: Dict[str, FeatureViewProjection] source_request_sources: Dict[str, RequestSource] - feature_transformation: Union[PandasTransformation, PythonTransformation, SubstraitTransformation] + feature_transformation: Union[ + PandasTransformation, PythonTransformation, SubstraitTransformation + ] mode: str description: str tags: Dict[str, str] @@ -77,24 +79,26 @@ class OnDemandFeatureView(BaseFeatureView): @log_exceptions # noqa: C901 def __init__( # noqa: C901 - self, - *, - name: str, - schema: List[Field], - sources: List[ - Union[ - FeatureView, - RequestSource, - FeatureViewProjection, - ] - ], - udf: Optional[FunctionType] = None, - udf_string: str = "", - feature_transformation: Union[PandasTransformation, PythonTransformation, SubstraitTransformation], - mode: str = "pandas", - description: str = "", - tags: Optional[Dict[str, str]] = None, - owner: str = "", + self, + *, + name: str, + schema: List[Field], + sources: List[ + Union[ + FeatureView, + RequestSource, + FeatureViewProjection, + ] + ], + udf: Optional[FunctionType] = None, + udf_string: str = "", + feature_transformation: Union[ + PandasTransformation, PythonTransformation, SubstraitTransformation + ], + mode: str = "pandas", + description: str = "", + tags: Optional[Dict[str, str]] = None, + owner: str = "", ): """ Creates an OnDemandFeatureView object. @@ -128,15 +132,23 @@ def __init__( # noqa: C901 raise Exception( f"Unknown mode {mode}. OnDemandFeatureView only supports python or pandas UDFs." ) + else: + self.mode = mode if not feature_transformation: if udf: warnings.warn( "udf and udf_string parameters are deprecated. Please use transformation=PandasTransformation(udf, udf_string) instead.", DeprecationWarning, ) - if isinstance(inspect.signature(udf).return_annotation, pd.DataFrame) and mode == "pandas": + if ( + isinstance(inspect.signature(udf).return_annotation, pd.DataFrame) + and mode == "pandas" + ): feature_transformation = PandasTransformation(udf, udf_string) - elif isinstance(inspect.signature(udf).return_annotation, Dict) and mode == "python": + elif ( + isinstance(inspect.signature(udf).return_annotation, Dict) + and mode == "python" + ): feature_transformation = PythonTransformation(udf, udf_string) else: pass @@ -168,7 +180,8 @@ def __copy__(self): fv = OnDemandFeatureView( name=self.name, schema=self.features, - sources=list(self.source_feature_view_projections.values()) + list(self.source_request_sources.values()), + sources=list(self.source_feature_view_projections.values()) + + list(self.source_request_sources.values()), feature_transformation=self.feature_transformation, mode=self.mode, description=self.description, @@ -188,11 +201,11 @@ def __eq__(self, other): return False if ( - self.source_feature_view_projections - != other.source_feature_view_projections - or self.source_request_sources != other.source_request_sources - or self.mode != other.mode - or self.feature_transformation != other.feature_transformation + self.source_feature_view_projections + != other.source_feature_view_projections + or self.source_request_sources != other.source_request_sources + or self.mode != other.mode + or self.feature_transformation != other.feature_transformation ): return False @@ -219,8 +232,8 @@ def to_proto(self) -> OnDemandFeatureViewProto: feature_view_projection=fv_projection.to_proto() ) for ( - source_name, - request_sources, + source_name, + request_sources, ) in self.source_request_sources.items(): sources[source_name] = OnDemandSource( request_data_source=request_sources.to_proto() @@ -252,9 +265,9 @@ def to_proto(self) -> OnDemandFeatureViewProto: @classmethod def from_proto( - cls, - on_demand_feature_view_proto: OnDemandFeatureViewProto, - skip_udf: bool = False, + cls, + on_demand_feature_view_proto: OnDemandFeatureViewProto, + skip_udf: bool = False, ): """ Creates an on demand feature view from a protobuf representation. @@ -268,8 +281,8 @@ def from_proto( """ sources = [] for ( - _, - on_demand_source, + _, + on_demand_source, ) in on_demand_feature_view_proto.spec.sources.items(): if on_demand_source.WhichOneof("source") == "feature_view": sources.append( @@ -287,29 +300,29 @@ def from_proto( ) if ( - on_demand_feature_view_proto.spec.feature_transformation.WhichOneof( - "transformation" - ) - == "user_defined_function" - and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text - != "" + on_demand_feature_view_proto.spec.feature_transformation.WhichOneof( + "transformation" + ) + == "user_defined_function" + and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text + != "" ): transformation = PandasTransformation.from_proto( on_demand_feature_view_proto.spec.feature_transformation.user_defined_function ) elif ( - on_demand_feature_view_proto.spec.feature_transformation.WhichOneof( - "transformation" - ) - == "substrait_transformation" + on_demand_feature_view_proto.spec.feature_transformation.WhichOneof( + "transformation" + ) + == "substrait_transformation" ): transformation = SubstraitTransformation.from_proto( on_demand_feature_view_proto.spec.feature_transformation.substrait_transformation ) elif ( - hasattr(on_demand_feature_view_proto.spec, "user_defined_function") - and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text - == "" + hasattr(on_demand_feature_view_proto.spec, "user_defined_function") + and on_demand_feature_view_proto.spec.feature_transformation.user_defined_function.body_text + == "" ): backwards_compatible_udf = UserDefinedFunctionProto( name=on_demand_feature_view_proto.spec.user_defined_function.name, @@ -376,9 +389,9 @@ def _get_projected_feature_name(self, feature: str) -> str: return f"{self.projection.name_to_use()}__{feature}" def get_transformed_features_df( - self, - df_with_features: pd.DataFrame, - full_feature_names: bool = False, + self, + df_with_features: pd.DataFrame, + full_feature_names: bool = False, ) -> pd.DataFrame: # Apply on demand transformations if not isinstance(df_with_features, pd.DataFrame): @@ -397,8 +410,8 @@ def get_transformed_features_df( columns_to_cleanup.append(full_feature_ref) # Compute transformed values and apply to each result row - df_with_transformed_features: pd.DataFrame = self.feature_transformation.transform( - df_with_features + df_with_transformed_features: pd.DataFrame = ( + self.feature_transformation.transform(df_with_features) ) # Work out whether the correct columns names are used. @@ -407,8 +420,8 @@ def get_transformed_features_df( short_name = feature.name long_name = self._get_projected_feature_name(feature.name) if ( - short_name in df_with_transformed_features.columns - and full_feature_names + short_name in df_with_transformed_features.columns + and full_feature_names ): rename_columns[short_name] = long_name elif not full_feature_names: @@ -420,9 +433,9 @@ def get_transformed_features_df( return df_with_transformed_features.rename(columns=rename_columns) def get_transformed_features_dict( - self, - feature_dict: Dict[str, List[Any]], - full_feature_names: bool = False, + self, + feature_dict: Dict[str, List[Any]], + full_feature_names: bool = False, ) -> Dict[str, Any]: # generates a mapping between feature names and fv__feature names (and vice versa) name_map: Dict[str, str] = {} @@ -460,14 +473,15 @@ def get_transformed_features_dict( row_output = self.feature_transformation.transform(row) for feature_name in output_dict: output_dict[feature_name][i] = row_output.get( - feature_name, row_output[correct_feature_name_to_alias[feature_name]] + feature_name, + row_output[correct_feature_name_to_alias[feature_name]], ) return output_dict def get_transformed_features( - self, - features: Union[Dict[str, List[Any]], pd.DataFrame], - full_feature_names: bool = False, + self, + features: Union[Dict[str, List[Any]], pd.DataFrame], + full_feature_names: bool = False, ) -> Union[Dict[str, List[Any]], pd.DataFrame]: # TODO: classic inheritance pattern....maybe fix this if self.mode == "python" and isinstance(features, dict): @@ -565,7 +579,7 @@ def _infer_features_df(self) -> None: @staticmethod def get_requested_odfvs( - feature_refs, project, registry + feature_refs, project, registry ) -> List["OnDemandFeatureView"]: all_on_demand_feature_views = registry.list_on_demand_feature_views( project, allow_cache=True @@ -580,19 +594,19 @@ def get_requested_odfvs( def on_demand_feature_view( - *, - schema: List[Field], - sources: List[ - Union[ - FeatureView, - RequestSource, - FeatureViewProjection, - ] - ], - mode: str = "pandas", - description: str = "", - tags: Optional[Dict[str, str]] = None, - owner: str = "", + *, + schema: List[Field], + sources: List[ + Union[ + FeatureView, + RequestSource, + FeatureViewProjection, + ] + ], + mode: str = "pandas", + description: str = "", + tags: Optional[Dict[str, str]] = None, + owner: str = "", ): """ Creates an OnDemandFeatureView object with the given user function as udf. @@ -619,9 +633,9 @@ def mainify(obj) -> None: def decorator(user_function): return_annotation = inspect.signature(user_function).return_annotation if ( - return_annotation - and return_annotation.__module__ == "ibis.expr.types.relations" - and return_annotation.__name__ == "Table" + return_annotation + and return_annotation.__module__ == "ibis.expr.types.relations" + and return_annotation.__name__ == "Table" ): import ibis import ibis.expr.datatypes as dt diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index d561bd8e84d..c857596cddf 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -101,8 +101,9 @@ def test_hash(): Field(name="output1", dtype=Float32), Field(name="output2", dtype=Float32), ], - udf=udf2, - udf_string="udf2 source code", + feature_transformation=PandasTransformation( + udf=udf2, udf_string="udf2 source code" + ), description="test", ) From efe16d5695f5308dea379f2296ab0f8eafd80d29 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 27 Mar 2024 12:48:04 -0400 Subject: [PATCH 06/16] fixed types, substraits, and added tests Signed-off-by: Francisco Javier Arceo --- .../infra/offline_stores/offline_store.py | 2 +- sdk/python/feast/on_demand_feature_view.py | 102 +++++++++++++++--- .../transformation/pandas_transformation.py | 15 ++- .../transformation/python_transformation.py | 13 ++- .../unit/infra/test_inference_unit_tests.py | 71 +++++++++++- .../tests/unit/test_on_demand_feature_view.py | 69 +++++++++++- ...test_on_demand_substrait_transformation.py | 1 + 7 files changed, 249 insertions(+), 24 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index cb91a66d15a..aaed78dd459 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -81,7 +81,7 @@ def to_df( if self.on_demand_feature_views: # TODO(adchia): Fix requirement to specify dependent feature views in feature_refs for odfv in self.on_demand_feature_views: - if odfv.mode != "pandas": + if odfv.mode not in {"pandas", "substrait"}: raise Exception( f'OnDemandFeatureView mode "{odfv.mode}" not supported for offline processing.' ) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index f480314f06d..16a9ad25c95 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -128,9 +128,9 @@ def __init__( # noqa: C901 owner=owner, ) - if mode not in {"python", "pandas"}: + if mode not in {"python", "pandas", "substrait"}: raise Exception( - f"Unknown mode {mode}. OnDemandFeatureView only supports python or pandas UDFs." + f"Unknown mode {mode}. OnDemandFeatureView only supports python or pandas UDFs and substraits." ) else: self.mode = mode @@ -140,23 +140,37 @@ def __init__( # noqa: C901 "udf and udf_string parameters are deprecated. Please use transformation=PandasTransformation(udf, udf_string) instead.", DeprecationWarning, ) - if ( - isinstance(inspect.signature(udf).return_annotation, pd.DataFrame) - and mode == "pandas" - ): + # Note inspecting the return signature won't work with isinstance so this is the best alternative + if mode == "pandas": feature_transformation = PandasTransformation(udf, udf_string) - elif ( - isinstance(inspect.signature(udf).return_annotation, Dict) - and mode == "python" - ): + elif mode == "python": feature_transformation = PythonTransformation(udf, udf_string) + elif mode == "substrait": + feature_transformation = SubstraitTransformation( + substrait_plan=udf_string + ) else: pass - else: raise Exception( - "OnDemandFeatureView needs to be initialized with either feauture_transformation or udf arguments" + "OnDemandFeatureView needs to be initialized with either feature_transformation or udf arguments" + ) + else: + # Note inspecting the return signature won't work with isinstance so this is the best alternative + if mode == "pandas": + feature_transformation = PandasTransformation( + feature_transformation.udf, feature_transformation.udf_string ) + elif mode == "python": + feature_transformation = PythonTransformation( + feature_transformation.udf, feature_transformation.udf_string + ) + elif mode == "substrait": + feature_transformation = SubstraitTransformation( + substrait_plan=feature_transformation.substrait_plan + ) + else: + pass self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {} self.source_request_sources: Dict[str, RequestSource] = {} @@ -500,7 +514,7 @@ def get_transformed_features( ) def infer_features(self) -> None: - if self.mode == "pandas": + if self.mode in {"pandas", "substrait"}: self._infer_features_df() elif self.mode == "python": self._infer_features_dict() @@ -512,10 +526,69 @@ def infer_features(self) -> None: def _infer_features_dict(self): """ Infers the set of features associated to this feature view from the input source. + Raises: RegistryInferenceFailure: The set of features could not be inferred. """ - pass + rand_dict_value: Dict[str, Any] = { + "float": [1.0], + "int": [1], + "str": ["hello world"], + "bytes": [str.encode("hello world")], + "bool": [True], + "datetime64[ns]": [datetime.utcnow()], + } + + feature_dict = {} + for feature_view_projection in self.source_feature_view_projections.values(): + for feature in feature_view_projection.features: + dtype = feast_value_type_to_pandas_type(feature.dtype.to_value_type()) + feature_dict[f"{feature_view_projection.name}__{feature.name}"] = ( + rand_dict_value[dtype] if dtype in rand_dict_value else [None] + ) + feature_dict[f"{feature.name}"] = ( + rand_dict_value[dtype] if dtype in rand_dict_value else [None] + ) + for request_data in self.source_request_sources.values(): + for field in request_data.schema: + dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type()) + feature_dict[f"{field.name}"] = ( + rand_dict_value[dtype] if dtype in rand_dict_value else [None] + ) + + output_dict: Dict[str, List[Any]] = self.feature_transformation.transform( + feature_dict + ) + inferred_features = [] + for f, dt in output_dict.items(): + inferred_features.append( + Field( + name=f, + dtype=from_value_type( + python_type_to_feast_value_type( + f, type_name=type(dt[0]).__name__ + ) + ), + ) + ) + + if self.features: + missing_features = [] + for specified_features in self.features: + if specified_features not in inferred_features: + missing_features.append(specified_features) + if missing_features: + raise SpecifiedFeaturesNotPresentError( + missing_features, inferred_features, self.name + ) + else: + self.features = inferred_features + + if not self.features: + raise RegistryInferenceFailure( + "OnDemandFeatureView", + f"Could not infer Features for the feature view '{self.name}'.", + ) def _infer_features_df(self) -> None: """ @@ -547,6 +620,7 @@ def _infer_features_df(self) -> None: dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type()) sample_val = rand_df_value[dtype] if dtype in rand_df_value else None df[f"{field.name}"] = pd.Series(sample_val, dtype=dtype) + output_df: pd.DataFrame = self.feature_transformation.transform(df) inferred_features = [] for f, dt in zip(output_df.columns, output_df.dtypes): diff --git a/sdk/python/feast/transformation/pandas_transformation.py b/sdk/python/feast/transformation/pandas_transformation.py index 76f17e21065..1838a882f27 100644 --- a/sdk/python/feast/transformation/pandas_transformation.py +++ b/sdk/python/feast/transformation/pandas_transformation.py @@ -11,7 +11,7 @@ class PandasTransformation: def __init__(self, udf: FunctionType, udf_string: str = ""): """ - Creates an OnDemandPandasTransformation object. + Creates an PandasTransformation object. Args: udf: The user defined transformation function, which must take pandas @@ -21,8 +21,17 @@ def __init__(self, udf: FunctionType, udf_string: str = ""): self.udf = udf self.udf_string = udf_string - def transform(self, df: pd.DataFrame) -> pd.DataFrame: - return self.udf.__call__(df) + def transform(self, input_df: pd.DataFrame) -> pd.DataFrame: + if not isinstance(input_df, pd.DataFrame): + raise TypeError( + f"input_df should be type pd.DataFrame but got {type(input_df).__name__}" + ) + output_df = self.udf.__call__(input_df) + if not isinstance(output_df, pd.DataFrame): + raise TypeError( + f"output_df should be type pd.DataFrame but got {type(output_df).__name__}" + ) + return output_df def __eq__(self, other): if not isinstance(other, PandasTransformation): diff --git a/sdk/python/feast/transformation/python_transformation.py b/sdk/python/feast/transformation/python_transformation.py index 99d4b3351fb..3b67f81250f 100644 --- a/sdk/python/feast/transformation/python_transformation.py +++ b/sdk/python/feast/transformation/python_transformation.py @@ -11,7 +11,7 @@ class PythonTransformation: def __init__(self, udf: FunctionType, udf_string: str = ""): """ - Creates an OnDemandPythonTransformation object. + Creates an PythonTransformation object. Args: udf: The user defined transformation function, which must take pandas dataframes as inputs. @@ -21,7 +21,16 @@ def __init__(self, udf: FunctionType, udf_string: str = ""): self.udf_string = udf_string def transform(self, input_dict: Dict) -> Dict: - return self.udf.__call__(input_dict) + if not isinstance(input_dict, Dict): + raise TypeError( + f"input_dict should be type Dict[str, List[Any]] but got {type(input_dict).__name__}" + ) + output_dict = self.udf.__call__(input_dict) + if not isinstance(output_dict, Dict): + raise TypeError( + f"output_dict should be type Dict[str, List[Any]] but got {type(output_dict).__name__}" + ) + return output_dict def __eq__(self, other): if not isinstance(other, PythonTransformation): diff --git a/sdk/python/tests/unit/infra/test_inference_unit_tests.py b/sdk/python/tests/unit/infra/test_inference_unit_tests.py index a108d397bd9..d183fb5f158 100644 --- a/sdk/python/tests/unit/infra/test_inference_unit_tests.py +++ b/sdk/python/tests/unit/infra/test_inference_unit_tests.py @@ -1,3 +1,5 @@ +from typing import Any, Dict, List + import pandas as pd import pytest @@ -51,7 +53,7 @@ def test_infer_datasource_names_dwh(): data_source = dwh_class(query="test_query") -def test_on_demand_features_type_inference(): +def test_on_demand_features_valid_type_inference(): # Create Feature Views date_request = RequestSource( name="date_request", @@ -73,6 +75,33 @@ def test_view(features_df: pd.DataFrame) -> pd.DataFrame: test_view.infer_features() + @on_demand_feature_view( + sources=[date_request], + schema=[ + Field(name="output", dtype=UnixTimestamp), + Field(name="object_output", dtype=String), + ], + mode="python", + ) + def python_native_test_view( + input_dict: Dict[str, List[Any]] + ) -> Dict[str, List[Any]]: + output_dict: Dict[str, List[Any]] = { + "output": input_dict["some_date"], + "object_output": str(input_dict["some_date"]), + } + return output_dict + + python_native_test_view.infer_features() + + +def test_on_demand_features_invalid_type_inference(): + # Create Feature Views + date_request = RequestSource( + name="date_request", + schema=[Field(name="some_date", dtype=UnixTimestamp)], + ) + @on_demand_feature_view( sources=[date_request], schema=[ @@ -96,13 +125,49 @@ def invalid_test_view(features_df: pd.DataFrame) -> pd.DataFrame: ], sources=[date_request], ) - def test_view_with_missing_feature(features_df: pd.DataFrame) -> pd.DataFrame: + def view_with_missing_feature(features_df: pd.DataFrame) -> pd.DataFrame: data = pd.DataFrame() data["output"] = features_df["some_date"] return data with pytest.raises(SpecifiedFeaturesNotPresentError): - test_view_with_missing_feature.infer_features() + view_with_missing_feature.infer_features() + + @on_demand_feature_view( + sources=[date_request], + schema=[ + Field(name="output", dtype=UnixTimestamp), + Field(name="object_output", dtype=String), + ], + mode="pandas", + ) + def python_native_test_invalid_pandas_view( + input_dict: Dict[str, List[Any]] + ) -> Dict[str, List[Any]]: + output_dict: Dict[str, List[Any]] = { + "output": input_dict["some_date"], + "object_output": str(input_dict["some_date"]), + } + return output_dict + + with pytest.raises(TypeError): + python_native_test_invalid_pandas_view.infer_features() + + @on_demand_feature_view( + sources=[date_request], + schema=[ + Field(name="output", dtype=UnixTimestamp), + Field(name="object_output", dtype=String), + ], + mode="python", + ) + def python_native_test_invalid_dict_view(features_df: pd.DataFrame) -> pd.DataFrame: + data = pd.DataFrame() + data["output"] = features_df["some_date"] + return data + + with pytest.raises(TypeError): + python_native_test_invalid_dict_view.infer_features() def test_datasource_inference(): diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index c857596cddf..db2ab64dd88 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -12,13 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Any, Dict, List + import pandas as pd import pytest from feast.feature_view import FeatureView from feast.field import Field from feast.infra.offline_stores.file_source import FileSource -from feast.on_demand_feature_view import OnDemandFeatureView, PandasTransformation +from feast.on_demand_feature_view import ( + OnDemandFeatureView, + PandasTransformation, + PythonTransformation, +) from feast.types import Float32 @@ -36,6 +42,14 @@ def udf2(features_df: pd.DataFrame) -> pd.DataFrame: return df +def python_native_udf(features_dict: Dict[str, List[Any]]) -> Dict[str, List[Any]]: + output_dict: Dict[str, List[Any]] = { + "output1": [features_dict["feature1"] + 100], + "output2": [features_dict["feature2"] + 100], + } + return output_dict + + @pytest.mark.filterwarnings("ignore:udf and udf_string parameters are deprecated") def test_hash(): file_source = FileSource(name="my-file-source", path="test.parquet") @@ -129,6 +143,59 @@ def test_hash(): ) +def test_python_native_transformation_mode(): + file_source = FileSource(name="my-file-source", path="test.parquet") + feature_view = FeatureView( + name="my-feature-view", + entities=[], + schema=[ + Field(name="feature1", dtype=Float32), + Field(name="feature2", dtype=Float32), + ], + source=file_source, + ) + sources = [feature_view] + + on_demand_feature_view_python_native = OnDemandFeatureView( + name="my-on-demand-feature-view", + sources=sources, + schema=[ + Field(name="output1", dtype=Float32), + Field(name="output2", dtype=Float32), + ], + feature_transformation=PythonTransformation( + udf=python_native_udf, udf_string="python native udf source code" + ), + description="test", + mode="python", + ) + + on_demand_feature_view_python_native_err = OnDemandFeatureView( + name="my-on-demand-feature-view", + sources=sources, + schema=[ + Field(name="output1", dtype=Float32), + Field(name="output2", dtype=Float32), + ], + feature_transformation=PandasTransformation( + udf=python_native_udf, udf_string="python native udf source code" + ), + description="test", + mode="python", + ) + + assert ( + on_demand_feature_view_python_native.feature_transformation + == PythonTransformation(python_native_udf, "python native udf source code") + ) + + with pytest.raises(TypeError): + assert ( + on_demand_feature_view_python_native_err.feature_transformation + == PandasTransformation(python_native_udf, "python native udf source code") + ) + + @pytest.mark.filterwarnings("ignore:udf and udf_string parameters are deprecated") def test_from_proto_backwards_compatible_udf(): file_source = FileSource(name="my-file-source", path="test.parquet") diff --git a/sdk/python/tests/unit/test_on_demand_substrait_transformation.py b/sdk/python/tests/unit/test_on_demand_substrait_transformation.py index c9d30c5b7af..378aa7ce3bd 100644 --- a/sdk/python/tests/unit/test_on_demand_substrait_transformation.py +++ b/sdk/python/tests/unit/test_on_demand_substrait_transformation.py @@ -71,6 +71,7 @@ def pandas_view(inputs: pd.DataFrame) -> pd.DataFrame: @on_demand_feature_view( sources=[driver_stats_fv[["conv_rate", "acc_rate"]]], schema=[Field(name="conv_rate_plus_acc_substrait", dtype=Float64)], + mode="substrait", ) def substrait_view(inputs: Table) -> Table: return inputs.select( From 74b77e6de9ed2adcd983bc2ffcae62ebb994c8e7 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 27 Mar 2024 14:52:44 -0400 Subject: [PATCH 07/16] fixed the casting issue Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/on_demand_feature_view.py | 30 +++++-------------- .../tests/unit/test_on_demand_feature_view.py | 11 ++++++- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 16a9ad25c95..9fb8b221e87 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -130,7 +130,7 @@ def __init__( # noqa: C901 if mode not in {"python", "pandas", "substrait"}: raise Exception( - f"Unknown mode {mode}. OnDemandFeatureView only supports python or pandas UDFs and substraits." + f"Unknown mode {mode}. OnDemandFeatureView only supports python or pandas UDFs and substrait." ) else: self.mode = mode @@ -145,32 +145,12 @@ def __init__( # noqa: C901 feature_transformation = PandasTransformation(udf, udf_string) elif mode == "python": feature_transformation = PythonTransformation(udf, udf_string) - elif mode == "substrait": - feature_transformation = SubstraitTransformation( - substrait_plan=udf_string - ) else: pass else: raise Exception( "OnDemandFeatureView needs to be initialized with either feature_transformation or udf arguments" ) - else: - # Note inspecting the return signature won't work with isinstance so this is the best alternative - if mode == "pandas": - feature_transformation = PandasTransformation( - feature_transformation.udf, feature_transformation.udf_string - ) - elif mode == "python": - feature_transformation = PythonTransformation( - feature_transformation.udf, feature_transformation.udf_string - ) - elif mode == "substrait": - feature_transformation = SubstraitTransformation( - substrait_plan=feature_transformation.substrait_plan - ) - else: - pass self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {} self.source_request_sources: Dict[str, RequestSource] = {} @@ -705,6 +685,7 @@ def mainify(obj) -> None: obj.__module__ = "__main__" def decorator(user_function): + return_annotation = inspect.signature(user_function).return_annotation if ( return_annotation @@ -745,7 +726,12 @@ def decorator(user_function): else: udf_string = dill.source.getsource(user_function) mainify(user_function) - transformation = PandasTransformation(user_function, udf_string) + if mode == "pandas": + transformation = PandasTransformation(user_function, udf_string) + elif mode == "python": + transformation = PythonTransformation(user_function, udf_string) + elif mode == "substrait": + pass on_demand_feature_view_obj = OnDemandFeatureView( name=user_function.__name__, diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index db2ab64dd88..5610a394317 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -192,7 +192,16 @@ def test_python_native_transformation_mode(): with pytest.raises(TypeError): assert ( on_demand_feature_view_python_native_err.feature_transformation - == PandasTransformation(python_native_udf, "python native udf source code") + == PythonTransformation(python_native_udf, "python native udf source code") + ) + + with pytest.raises(TypeError): + # This should fail + on_demand_feature_view_python_native_err.feature_transformation.transform( + { + "feature1": 0, + "feature2": 1, + } ) From 16850e9571ff77d02d34de562f3d944d7c69228e Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Wed, 27 Mar 2024 15:54:05 -0400 Subject: [PATCH 08/16] updated mode check Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 245ef0a415d..cf575bd75db 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -2112,7 +2112,7 @@ def _augment_response_with_on_demand_transforms( initial_response_dict, full_feature_names, ) - elif odfv.mode == "pandas": + elif odfv.mode in {"pandas", "substrait"}: if initial_response_df is None: initial_response_df = initial_response.to_df() transformed_features_df: pd.DataFrame = odfv.get_transformed_features( @@ -2121,7 +2121,7 @@ def _augment_response_with_on_demand_transforms( ) else: raise Exception( - f"Invalid OnDemandFeatureMode: {odfv.mode}. Expected one of 'pandas' or 'python'." + f"Invalid OnDemandFeatureMode: {odfv.mode}. Expected one of 'pandas', 'python', or 'substrait'." ) transformed_features = ( From aa90fc1436c9c823624e8142a4c0c060b777c02a Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 28 Mar 2024 11:42:39 -0400 Subject: [PATCH 09/16] fixed lint and udf example...should be singleton --- sdk/python/feast/feature_store.py | 2 + sdk/python/feast/on_demand_feature_view.py | 79 ++++++++----------- .../transformation/python_transformation.py | 3 +- .../tests/unit/test_on_demand_feature_view.py | 15 +++- 4 files changed, 50 insertions(+), 49 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index cf575bd75db..795c1039e78 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -2115,10 +2115,12 @@ def _augment_response_with_on_demand_transforms( elif odfv.mode in {"pandas", "substrait"}: if initial_response_df is None: initial_response_df = initial_response.to_df() + print("about to happen\n", initial_response_df) transformed_features_df: pd.DataFrame = odfv.get_transformed_features( initial_response_df, full_feature_names, ) + print("it did not happen") else: raise Exception( f"Invalid OnDemandFeatureMode: {odfv.mode}. Expected one of 'pandas', 'python', or 'substrait'." diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 9fb8b221e87..9a4840e6a77 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -4,7 +4,7 @@ import warnings from datetime import datetime from types import FunctionType -from typing import Any, Dict, List, Optional, Type, Union, cast +from typing import Any, Dict, List, Optional, Type, Union import dill import pandas as pd @@ -387,6 +387,7 @@ def get_transformed_features_df( df_with_features: pd.DataFrame, full_feature_names: bool = False, ) -> pd.DataFrame: + print("-" * 40, "something happened") # Apply on demand transformations if not isinstance(df_with_features, pd.DataFrame): raise TypeError("get_transformed_features_df only accepts pd.DataFrame") @@ -428,64 +429,54 @@ def get_transformed_features_df( def get_transformed_features_dict( self, - feature_dict: Dict[str, List[Any]], - full_feature_names: bool = False, + feature_dict: Dict[str, Any], # type: ignore ) -> Dict[str, Any]: - # generates a mapping between feature names and fv__feature names (and vice versa) - name_map: Dict[str, str] = {} + + # we need a mapping from full feature name to short and back to do a renaming + # The simplest thing to do is to make the full reference, copy the columns with the short reference + # and rerun + columns_to_cleanup: List[str] = [] for source_fv_projection in self.source_feature_view_projections.values(): for feature in source_fv_projection.features: full_feature_ref = f"{source_fv_projection.name}__{feature.name}" - if full_feature_ref in feature_dict: - name_map[full_feature_ref] = feature.name - elif feature.name in feature_dict: - name_map[feature.name] = name_map[full_feature_ref] - - rows = [] - # this doesn't actually require 2 x |key_space| space; k and name_map[k] point to the same object in memory - for values in zip(*feature_dict.values()): - rows.append( - { - **{k: v for k, v in zip(feature_dict.keys(), values)}, - **{name_map[k]: v for k, v in zip(feature_dict.keys(), values)}, - } - ) + if full_feature_ref in feature_dict.keys(): + # Make sure the partial feature name is always present + feature_dict[feature.name] = feature_dict[full_feature_ref] + columns_to_cleanup.append(str(feature.name)) + elif feature.name in feature_dict.keys(): + # Make sure the full feature name is always present + feature_dict[full_feature_ref] = feature_dict[feature.name] + columns_to_cleanup.append(str(full_feature_ref)) - # construct output dictionary and mapping from expected feature names to alternative feature names - output_dict: Dict[str, List[Any]] = {} - correct_feature_name_to_alias: Dict[str, str] = {} - for feature in self.features: - long_name = self._get_projected_feature_name(feature.name) - correct_name = long_name if full_feature_names else feature.name - correct_feature_name_to_alias[correct_name] = ( - feature.name if full_feature_names else long_name - ) - output_dict[correct_name] = [None] * len(rows) - - # populate output dictionary per row - for i, row in enumerate(rows): - row_output = self.feature_transformation.transform(row) - for feature_name in output_dict: - output_dict[feature_name][i] = row_output.get( - feature_name, - row_output[correct_feature_name_to_alias[feature_name]], - ) + output_dict: Dict[str, Any] = self.feature_transformation.transform( + feature_dict + ) + for feature_name in columns_to_cleanup: + del output_dict[feature_name] return output_dict def get_transformed_features( self, - features: Union[Dict[str, List[Any]], pd.DataFrame], + features: Union[Dict[str, Any], pd.DataFrame], full_feature_names: bool = False, - ) -> Union[Dict[str, List[Any]], pd.DataFrame]: + ) -> Union[Dict[str, Any], pd.DataFrame]: # TODO: classic inheritance pattern....maybe fix this - if self.mode == "python" and isinstance(features, dict): + if self.mode == "python" and isinstance(features, Dict): + # note full_feature_names is not needed for the dictionary return self.get_transformed_features_dict( - feature_dict=cast(features, Dict[str, List[Any]]), # type: ignore - full_feature_names=full_feature_names, + feature_dict=features, ) elif self.mode == "pandas" and isinstance(features, pd.DataFrame): + print( + "*" * 30, + "\n", + type(features), + self.mode, + "\n", + "*" * 30, + ) return self.get_transformed_features_df( - df_with_features=cast(features, pd.DataFrame), # type: ignore + df_with_features=features, full_feature_names=full_feature_names, ) else: diff --git a/sdk/python/feast/transformation/python_transformation.py b/sdk/python/feast/transformation/python_transformation.py index 3b67f81250f..81532461a39 100644 --- a/sdk/python/feast/transformation/python_transformation.py +++ b/sdk/python/feast/transformation/python_transformation.py @@ -25,7 +25,8 @@ def transform(self, input_dict: Dict) -> Dict: raise TypeError( f"input_dict should be type Dict[str, List[Any]] but got {type(input_dict).__name__}" ) - output_dict = self.udf.__call__(input_dict) + # Ensuring that the inputs are included as well + output_dict = {**input_dict, **self.udf.__call__(input_dict)} if not isinstance(output_dict, Dict): raise TypeError( f"output_dict should be type Dict[str, List[Any]] but got {type(output_dict).__name__}" diff --git a/sdk/python/tests/unit/test_on_demand_feature_view.py b/sdk/python/tests/unit/test_on_demand_feature_view.py index 5610a394317..02e013e7753 100644 --- a/sdk/python/tests/unit/test_on_demand_feature_view.py +++ b/sdk/python/tests/unit/test_on_demand_feature_view.py @@ -38,14 +38,14 @@ def udf1(features_df: pd.DataFrame) -> pd.DataFrame: def udf2(features_df: pd.DataFrame) -> pd.DataFrame: df = pd.DataFrame() df["output1"] = features_df["feature1"] + 100 - df["output2"] = features_df["feature2"] + 100 + df["output2"] = features_df["feature2"] + 101 return df -def python_native_udf(features_dict: Dict[str, List[Any]]) -> Dict[str, List[Any]]: +def python_native_udf(features_dict: Dict[str, List[Any]]) -> Dict[str, Any]: output_dict: Dict[str, List[Any]] = { - "output1": [features_dict["feature1"] + 100], - "output2": [features_dict["feature2"] + 100], + "output1": features_dict["feature1"] + 100, + "output2": features_dict["feature2"] + 101, } return output_dict @@ -204,6 +204,13 @@ def test_python_native_transformation_mode(): } ) + assert on_demand_feature_view_python_native.get_transformed_features( + { + "feature1": 0, + "feature2": 1, + } + ) == {"feature1": 0, "feature2": 1, "output1": 100, "output2": 102} + @pytest.mark.filterwarnings("ignore:udf and udf_string parameters are deprecated") def test_from_proto_backwards_compatible_udf(): From 26c1170fd7fb6610a927e589be89ef0bcb81e81e Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 28 Mar 2024 11:49:10 -0400 Subject: [PATCH 10/16] removed print statement Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/on_demand_feature_view.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 9a4840e6a77..34c8ad559c4 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -387,7 +387,6 @@ def get_transformed_features_df( df_with_features: pd.DataFrame, full_feature_names: bool = False, ) -> pd.DataFrame: - print("-" * 40, "something happened") # Apply on demand transformations if not isinstance(df_with_features, pd.DataFrame): raise TypeError("get_transformed_features_df only accepts pd.DataFrame") @@ -467,14 +466,6 @@ def get_transformed_features( feature_dict=features, ) elif self.mode == "pandas" and isinstance(features, pd.DataFrame): - print( - "*" * 30, - "\n", - type(features), - self.mode, - "\n", - "*" * 30, - ) return self.get_transformed_features_df( df_with_features=features, full_feature_names=full_feature_names, From 231e2db55d51fa251d874ddffde6c97def176a91 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 28 Mar 2024 14:36:16 -0400 Subject: [PATCH 11/16] updated to fix integration test Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/on_demand_feature_view.py | 8 ++ .../unit/infra/test_inference_unit_tests.py | 133 +++++++++--------- 2 files changed, 74 insertions(+), 67 deletions(-) diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 34c8ad559c4..8d51edbe587 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -709,8 +709,16 @@ def decorator(user_function): udf_string = dill.source.getsource(user_function) mainify(user_function) if mode == "pandas": + if return_annotation not in (inspect._empty, pd.DataFrame): + raise TypeError( + f"return signature for {user_function} is {return_annotation} but should be pd.DataFrame" + ) transformation = PandasTransformation(user_function, udf_string) elif mode == "python": + if return_annotation not in (inspect._empty, Dict[str, Any]): + raise TypeError( + f"return signature for {user_function} is {return_annotation} but should be Dict[str, Any]" + ) transformation = PythonTransformation(user_function, udf_string) elif mode == "substrait": pass diff --git a/sdk/python/tests/unit/infra/test_inference_unit_tests.py b/sdk/python/tests/unit/infra/test_inference_unit_tests.py index d183fb5f158..bd4549c6a2e 100644 --- a/sdk/python/tests/unit/infra/test_inference_unit_tests.py +++ b/sdk/python/tests/unit/infra/test_inference_unit_tests.py @@ -83,10 +83,8 @@ def test_view(features_df: pd.DataFrame) -> pd.DataFrame: ], mode="python", ) - def python_native_test_view( - input_dict: Dict[str, List[Any]] - ) -> Dict[str, List[Any]]: - output_dict: Dict[str, List[Any]] = { + def python_native_test_view(input_dict: Dict[str, Any]) -> Dict[str, List[Any]]: + output_dict: Dict[str, Any] = { "output": input_dict["some_date"], "object_output": str(input_dict["some_date"]), } @@ -102,72 +100,73 @@ def test_on_demand_features_invalid_type_inference(): schema=[Field(name="some_date", dtype=UnixTimestamp)], ) - @on_demand_feature_view( - sources=[date_request], - schema=[ - Field(name="output", dtype=UnixTimestamp), - Field(name="object_output", dtype=String), - ], - ) - def invalid_test_view(features_df: pd.DataFrame) -> pd.DataFrame: - data = pd.DataFrame() - data["output"] = features_df["some_date"] - data["object_output"] = features_df["some_date"].astype(str) - return data - - with pytest.raises(ValueError, match="Value with native type object"): - invalid_test_view.infer_features() - - @on_demand_feature_view( - schema=[ - Field(name="output", dtype=UnixTimestamp), - Field(name="missing", dtype=String), - ], - sources=[date_request], - ) - def view_with_missing_feature(features_df: pd.DataFrame) -> pd.DataFrame: - data = pd.DataFrame() - data["output"] = features_df["some_date"] - return data - - with pytest.raises(SpecifiedFeaturesNotPresentError): - view_with_missing_feature.infer_features() - - @on_demand_feature_view( - sources=[date_request], - schema=[ - Field(name="output", dtype=UnixTimestamp), - Field(name="object_output", dtype=String), - ], - mode="pandas", - ) - def python_native_test_invalid_pandas_view( - input_dict: Dict[str, List[Any]] - ) -> Dict[str, List[Any]]: - output_dict: Dict[str, List[Any]] = { - "output": input_dict["some_date"], - "object_output": str(input_dict["some_date"]), - } - return output_dict + # @on_demand_feature_view( + # sources=[date_request], + # schema=[ + # Field(name="output", dtype=UnixTimestamp), + # Field(name="object_output", dtype=String), + # ], + # ) + # def invalid_test_view(features_df: pd.DataFrame) -> pd.DataFrame: + # data = pd.DataFrame() + # data["output"] = features_df["some_date"] + # data["object_output"] = features_df["some_date"].astype(str) + # return data + # + # with pytest.raises(ValueError, match="Value with native type object"): + # invalid_test_view.infer_features() + # + # @on_demand_feature_view( + # schema=[ + # Field(name="output", dtype=UnixTimestamp), + # Field(name="missing", dtype=String), + # ], + # sources=[date_request], + # ) + # def view_with_missing_feature(features_df: pd.DataFrame) -> pd.DataFrame: + # data = pd.DataFrame() + # data["output"] = features_df["some_date"] + # return data + # + # with pytest.raises(SpecifiedFeaturesNotPresentError): + # view_with_missing_feature.infer_features() + # + # @on_demand_feature_view( + # sources=[date_request], + # schema=[ + # Field(name="output", dtype=UnixTimestamp), + # Field(name="object_output", dtype=String), + # ], + # mode="pandas", + # ) + # def python_native_test_invalid_pandas_view( + # input_dict: Dict[str, Any] + # ) -> Dict[str, Any]: + # output_dict: Dict[str, Any] = { + # "output": input_dict["some_date"], + # "object_output": str(input_dict["some_date"]), + # } + # return output_dict + # + # with pytest.raises(TypeError): + # python_native_test_invalid_pandas_view.infer_features() with pytest.raises(TypeError): - python_native_test_invalid_pandas_view.infer_features() - - @on_demand_feature_view( - sources=[date_request], - schema=[ - Field(name="output", dtype=UnixTimestamp), - Field(name="object_output", dtype=String), - ], - mode="python", - ) - def python_native_test_invalid_dict_view(features_df: pd.DataFrame) -> pd.DataFrame: - data = pd.DataFrame() - data["output"] = features_df["some_date"] - return data - with pytest.raises(TypeError): - python_native_test_invalid_dict_view.infer_features() + @on_demand_feature_view( + sources=[date_request], + schema=[ + Field(name="output", dtype=UnixTimestamp), + Field(name="object_output", dtype=String), + ], + mode="python", + ) + def python_native_test_invalid_dict_view( + features_df: pd.DataFrame, + ) -> pd.DataFrame: + data = pd.DataFrame() + data["output"] = features_df["some_date"] + return data def test_datasource_inference(): From 8fedca7edf651244ef98d9089c30863110a4ae02 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 28 Mar 2024 14:53:30 -0400 Subject: [PATCH 12/16] fixed unit test Signed-off-by: Francisco Javier Arceo --- .../unit/infra/test_inference_unit_tests.py | 99 +++++++++---------- 1 file changed, 48 insertions(+), 51 deletions(-) diff --git a/sdk/python/tests/unit/infra/test_inference_unit_tests.py b/sdk/python/tests/unit/infra/test_inference_unit_tests.py index bd4549c6a2e..d14bab95bbf 100644 --- a/sdk/python/tests/unit/infra/test_inference_unit_tests.py +++ b/sdk/python/tests/unit/infra/test_inference_unit_tests.py @@ -83,7 +83,7 @@ def test_view(features_df: pd.DataFrame) -> pd.DataFrame: ], mode="python", ) - def python_native_test_view(input_dict: Dict[str, Any]) -> Dict[str, List[Any]]: + def python_native_test_view(input_dict: Dict[str, Any]) -> Dict[str, Any]: output_dict: Dict[str, Any] = { "output": input_dict["some_date"], "object_output": str(input_dict["some_date"]), @@ -100,59 +100,56 @@ def test_on_demand_features_invalid_type_inference(): schema=[Field(name="some_date", dtype=UnixTimestamp)], ) - # @on_demand_feature_view( - # sources=[date_request], - # schema=[ - # Field(name="output", dtype=UnixTimestamp), - # Field(name="object_output", dtype=String), - # ], - # ) - # def invalid_test_view(features_df: pd.DataFrame) -> pd.DataFrame: - # data = pd.DataFrame() - # data["output"] = features_df["some_date"] - # data["object_output"] = features_df["some_date"].astype(str) - # return data - # - # with pytest.raises(ValueError, match="Value with native type object"): - # invalid_test_view.infer_features() - # - # @on_demand_feature_view( - # schema=[ - # Field(name="output", dtype=UnixTimestamp), - # Field(name="missing", dtype=String), - # ], - # sources=[date_request], - # ) - # def view_with_missing_feature(features_df: pd.DataFrame) -> pd.DataFrame: - # data = pd.DataFrame() - # data["output"] = features_df["some_date"] - # return data - # - # with pytest.raises(SpecifiedFeaturesNotPresentError): - # view_with_missing_feature.infer_features() - # - # @on_demand_feature_view( - # sources=[date_request], - # schema=[ - # Field(name="output", dtype=UnixTimestamp), - # Field(name="object_output", dtype=String), - # ], - # mode="pandas", - # ) - # def python_native_test_invalid_pandas_view( - # input_dict: Dict[str, Any] - # ) -> Dict[str, Any]: - # output_dict: Dict[str, Any] = { - # "output": input_dict["some_date"], - # "object_output": str(input_dict["some_date"]), - # } - # return output_dict - # - # with pytest.raises(TypeError): - # python_native_test_invalid_pandas_view.infer_features() + @on_demand_feature_view( + sources=[date_request], + schema=[ + Field(name="output", dtype=UnixTimestamp), + Field(name="object_output", dtype=String), + ], + ) + def invalid_test_view(features_df: pd.DataFrame) -> pd.DataFrame: + data = pd.DataFrame() + data["output"] = features_df["some_date"] + data["object_output"] = features_df["some_date"].astype(str) + return data + + with pytest.raises(ValueError, match="Value with native type object"): + invalid_test_view.infer_features() + + @on_demand_feature_view( + schema=[ + Field(name="output", dtype=UnixTimestamp), + Field(name="missing", dtype=String), + ], + sources=[date_request], + ) + def view_with_missing_feature(features_df: pd.DataFrame) -> pd.DataFrame: + data = pd.DataFrame() + data["output"] = features_df["some_date"] + return data + + with pytest.raises(SpecifiedFeaturesNotPresentError): + view_with_missing_feature.infer_features() with pytest.raises(TypeError): + @on_demand_feature_view( + sources=[date_request], + schema=[ + Field(name="output", dtype=UnixTimestamp), + Field(name="object_output", dtype=String), + ], + mode="pandas", + ) + def python_native_test_invalid_pandas_view( + input_dict: Dict[str, Any] + ) -> Dict[str, Any]: + output_dict: Dict[str, Any] = { + "output": input_dict["some_date"], + "object_output": str(input_dict["some_date"]), + } + return output_dict + with pytest.raises(TypeError): @on_demand_feature_view( sources=[date_request], schema=[ From 5eb583d0de0daa1b7bfe278056bd67cff81d5a46 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 28 Mar 2024 15:17:15 -0400 Subject: [PATCH 13/16] removed import Signed-off-by: Francisco Javier Arceo --- sdk/python/tests/unit/infra/test_inference_unit_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/unit/infra/test_inference_unit_tests.py b/sdk/python/tests/unit/infra/test_inference_unit_tests.py index d14bab95bbf..058b69cc57d 100644 --- a/sdk/python/tests/unit/infra/test_inference_unit_tests.py +++ b/sdk/python/tests/unit/infra/test_inference_unit_tests.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List +from typing import Any, Dict import pandas as pd import pytest From b5b71e204eba1b42b97a2f6c8e0d28fff9cc18b1 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 28 Mar 2024 15:28:25 -0400 Subject: [PATCH 14/16] linter Signed-off-by: Francisco Javier Arceo --- sdk/python/tests/unit/infra/test_inference_unit_tests.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/python/tests/unit/infra/test_inference_unit_tests.py b/sdk/python/tests/unit/infra/test_inference_unit_tests.py index 058b69cc57d..be97a838bda 100644 --- a/sdk/python/tests/unit/infra/test_inference_unit_tests.py +++ b/sdk/python/tests/unit/infra/test_inference_unit_tests.py @@ -132,6 +132,7 @@ def view_with_missing_feature(features_df: pd.DataFrame) -> pd.DataFrame: view_with_missing_feature.infer_features() with pytest.raises(TypeError): + @on_demand_feature_view( sources=[date_request], schema=[ @@ -150,6 +151,7 @@ def python_native_test_invalid_pandas_view( return output_dict with pytest.raises(TypeError): + @on_demand_feature_view( sources=[date_request], schema=[ From 75a30231005843be5fe4c2c8e6cf6fee12bbb879 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 28 Mar 2024 16:02:19 -0400 Subject: [PATCH 15/16] removed print statement and OnlineResponseRow Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/feature_store.py | 2 -- sdk/python/feast/online_response.py | 42 ----------------------------- 2 files changed, 44 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 795c1039e78..cf575bd75db 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -2115,12 +2115,10 @@ def _augment_response_with_on_demand_transforms( elif odfv.mode in {"pandas", "substrait"}: if initial_response_df is None: initial_response_df = initial_response.to_df() - print("about to happen\n", initial_response_df) transformed_features_df: pd.DataFrame = odfv.get_transformed_features( initial_response_df, full_feature_names, ) - print("it did not happen") else: raise Exception( f"Invalid OnDemandFeatureMode: {odfv.mode}. Expected one of 'pandas', 'python', or 'substrait'." diff --git a/sdk/python/feast/online_response.py b/sdk/python/feast/online_response.py index 915e6bcbf35..6a650c50a29 100644 --- a/sdk/python/feast/online_response.py +++ b/sdk/python/feast/online_response.py @@ -78,45 +78,3 @@ def to_df(self, include_event_timestamps: bool = False) -> pd.DataFrame: return pd.DataFrame(self.to_dict(include_event_timestamps)) - -class OnlineResponseRow(OnlineResponse): - def to_dict(self, include_event_timestamps: bool = False) -> Dict[str, Any]: - """ - Converts GetOnlineFeaturesResponse features into a dictionary form. - Args: - is_with_event_timestamps: bool Optionally include feature timestamps in the dictionary - """ - response: Dict[str, Any] = {} - - for feature_ref, feature_vector in zip( - self.proto.metadata.feature_names.val, self.proto.results - ): - - if ( - len(feature_vector.values) != 1 - or len(feature_vector.event_timestamps) != 1 - ): - raise ValueError( - f"Response contains more than one row: \n" - f"feature_ref: {feature_ref}" - f"feature_vector: {feature_vector.values}," - f"event_timestamps: {feature_vector.event_timestamps}" - ) - - response[feature_ref] = feast_value_type_to_python_type( - feature_vector.values[0] - ) - - if include_event_timestamps: - timestamp_ref = feature_ref + TIMESTAMP_POSTFIX - response[timestamp_ref] = feature_vector.event_timestamps[0].seconds - return response - - def to_df(self, include_event_timestamps: bool = False) -> pd.DataFrame: - """ - Converts GetOnlineFeaturesResponse features into Panda dataframe form. - Args: - is_with_event_timestamps: bool Optionally include feature timestamps in the dataframe - """ - - return pd.DataFrame([self.to_dict(include_event_timestamps)]) From c5ecce6aaaa97f9c834fcb4501f593d244951382 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Thu, 28 Mar 2024 16:24:38 -0400 Subject: [PATCH 16/16] fixed type validation Signed-off-by: Francisco Javier Arceo --- sdk/python/feast/online_response.py | 1 - sdk/python/feast/transformation/python_transformation.py | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/online_response.py b/sdk/python/feast/online_response.py index 6a650c50a29..48524359bf3 100644 --- a/sdk/python/feast/online_response.py +++ b/sdk/python/feast/online_response.py @@ -77,4 +77,3 @@ def to_df(self, include_event_timestamps: bool = False) -> pd.DataFrame: """ return pd.DataFrame(self.to_dict(include_event_timestamps)) - diff --git a/sdk/python/feast/transformation/python_transformation.py b/sdk/python/feast/transformation/python_transformation.py index 81532461a39..9519f23c05c 100644 --- a/sdk/python/feast/transformation/python_transformation.py +++ b/sdk/python/feast/transformation/python_transformation.py @@ -23,15 +23,15 @@ def __init__(self, udf: FunctionType, udf_string: str = ""): def transform(self, input_dict: Dict) -> Dict: if not isinstance(input_dict, Dict): raise TypeError( - f"input_dict should be type Dict[str, List[Any]] but got {type(input_dict).__name__}" + f"input_dict should be type Dict[str, Any] but got {type(input_dict).__name__}" ) # Ensuring that the inputs are included as well - output_dict = {**input_dict, **self.udf.__call__(input_dict)} + output_dict = self.udf.__call__(input_dict) if not isinstance(output_dict, Dict): raise TypeError( - f"output_dict should be type Dict[str, List[Any]] but got {type(output_dict).__name__}" + f"output_dict should be type Dict[str, Any] but got {type(output_dict).__name__}" ) - return output_dict + return {**input_dict, **output_dict} def __eq__(self, other): if not isinstance(other, PythonTransformation):