Skip to content

Commit

Permalink
feat: Decouple transformation types from ODFVs (#3949)
Browse files Browse the repository at this point in the history
* decouple transformation from odfvs

Signed-off-by: tokoko <togurg14@freeuni.edu.ge>

* OnDemandFeatureView: keep udf and udf_string parameters for backwards compatibility

Signed-off-by: tokoko <togurg14@freeuni.edu.ge>

* fix linting issues

Signed-off-by: tokoko <togurg14@freeuni.edu.ge>

* remove unused import in registry protos

Signed-off-by: tokoko <togurg14@freeuni.edu.ge>

---------

Signed-off-by: tokoko <togurg14@freeuni.edu.ge>
  • Loading branch information
tokoko committed Feb 24, 2024
1 parent 59639db commit 0a9fae8
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 46 deletions.
4 changes: 3 additions & 1 deletion protos/feast/core/OnDemandFeatureView.proto
Expand Up @@ -48,7 +48,9 @@ message OnDemandFeatureViewSpec {
// Map of sources for this feature view.
map<string, OnDemandSource> sources = 4;

UserDefinedFunction user_defined_function = 5;
oneof transformation {
UserDefinedFunction user_defined_function = 5;
}

// Description of the on demand feature view.
string description = 6;
Expand Down
1 change: 0 additions & 1 deletion protos/feast/registry/RegistryServer.proto
Expand Up @@ -2,7 +2,6 @@ syntax = "proto3";

package feast.registry;

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
import "feast/core/Registry.proto";
import "feast/core/Entity.proto";
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/registry/base_registry.py
Expand Up @@ -665,7 +665,7 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]:

odfv_dict["spec"]["userDefinedFunction"][
"body"
] = on_demand_feature_view.udf_string
] = on_demand_feature_view.transformation.udf_string
registry_dict["onDemandFeatureViews"].append(odfv_dict)
for request_feature_view in sorted(
self.list_request_feature_views(project=project),
Expand Down
81 changes: 49 additions & 32 deletions sdk/python/feast/on_demand_feature_view.py
Expand Up @@ -16,6 +16,7 @@
from feast.feature_view import FeatureView
from feast.feature_view_projection import FeatureViewProjection
from feast.field import Field, from_value_type
from feast.on_demand_pandas_transformation import OnDemandPandasTransformation
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
Expand All @@ -24,9 +25,6 @@
OnDemandFeatureViewSpec,
OnDemandSource,
)
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
UserDefinedFunction as UserDefinedFunctionProto,
)
from feast.type_map import (
feast_value_type_to_pandas_type,
python_type_to_feast_value_type,
Expand All @@ -51,8 +49,7 @@ class OnDemandFeatureView(BaseFeatureView):
sources with type FeatureViewProjection.
source_request_sources: A map from input source names to the actual input
sources with type RequestSource.
udf: The user defined transformation function, which must take pandas dataframes
as inputs.
transformation: The user defined transformation.
description: A human-readable description.
tags: A dictionary of key-value pairs to store arbitrary metadata.
owner: The owner of the on demand feature view, typically the email of the primary
Expand All @@ -63,8 +60,7 @@ class OnDemandFeatureView(BaseFeatureView):
features: List[Field]
source_feature_view_projections: Dict[str, FeatureViewProjection]
source_request_sources: Dict[str, RequestSource]
udf: FunctionType
udf_string: str
transformation: Union[OnDemandPandasTransformation]
description: str
tags: Dict[str, str]
owner: str
Expand All @@ -82,8 +78,9 @@ def __init__( # noqa: C901
FeatureViewProjection,
]
],
udf: FunctionType,
udf: Optional[FunctionType] = None,
udf_string: str = "",
transformation: Optional[Union[OnDemandPandasTransformation]] = None,
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
Expand All @@ -98,9 +95,10 @@ def __init__( # noqa: C901
sources: A map from input source names to the actual input sources, which may be
feature views, or request data sources. These sources serve as inputs to the udf,
which will refer to them by name.
udf: The user defined transformation function, which must take pandas
udf (deprecated): The user defined transformation function, which must take pandas
dataframes as inputs.
udf_string: The source code version of the udf (for diffing and displaying in Web UI)
udf_string (deprecated): The source code version of the udf (for diffing and displaying in Web UI)
transformation: The user defined transformation.
description (optional): A human-readable description.
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): The owner of the on demand feature view, typically the email
Expand All @@ -114,6 +112,18 @@ def __init__( # noqa: C901
owner=owner,
)

if not transformation:
if udf:
warnings.warn(
"udf and udf_string parameters are deprecated. Please use transformation=OnDemandPandasTransformation(udf, udf_string) instead.",
DeprecationWarning,
)
transformation = OnDemandPandasTransformation(udf, udf_string)
else:
raise Exception(
"OnDemandFeatureView needs to be initialized with either transformation or udf arguments"
)

self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {}
self.source_request_sources: Dict[str, RequestSource] = {}
for odfv_source in sources:
Expand All @@ -126,8 +136,7 @@ def __init__( # noqa: C901
odfv_source.name
] = odfv_source.projection

self.udf = udf # type: ignore
self.udf_string = udf_string
self.transformation = transformation

@property
def proto_class(self) -> Type[OnDemandFeatureViewProto]:
Expand All @@ -139,8 +148,7 @@ def __copy__(self):
schema=self.features,
sources=list(self.source_feature_view_projections.values())
+ list(self.source_request_sources.values()),
udf=self.udf,
udf_string=self.udf_string,
transformation=self.transformation,
description=self.description,
tags=self.tags,
owner=self.owner,
Expand All @@ -161,8 +169,7 @@ def __eq__(self, other):
self.source_feature_view_projections
!= other.source_feature_view_projections
or self.source_request_sources != other.source_request_sources
or self.udf_string != other.udf_string
or self.udf.__code__.co_code != other.udf.__code__.co_code
or self.transformation != other.transformation
):
return False

Expand Down Expand Up @@ -200,11 +207,9 @@ def to_proto(self) -> OnDemandFeatureViewProto:
name=self.name,
features=[feature.to_proto() for feature in self.features],
sources=sources,
user_defined_function=UserDefinedFunctionProto(
name=self.udf.__name__,
body=dill.dumps(self.udf, recurse=True),
body_text=self.udf_string,
),
user_defined_function=self.transformation.to_proto()
if type(self.transformation) == OnDemandPandasTransformation
else None,
description=self.description,
tags=self.tags,
owner=self.owner,
Expand Down Expand Up @@ -243,6 +248,16 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
RequestSource.from_proto(on_demand_source.request_data_source)
)

if (
on_demand_feature_view_proto.spec.WhichOneof("transformation")
== "user_defined_function"
):
transformation = OnDemandPandasTransformation.from_proto(
on_demand_feature_view_proto.spec.user_defined_function
)
else:
raise Exception("At least one transformation type needs to be provided")

on_demand_feature_view_obj = cls(
name=on_demand_feature_view_proto.spec.name,
schema=[
Expand All @@ -253,10 +268,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
for feature in on_demand_feature_view_proto.spec.features
],
sources=sources,
udf=dill.loads(
on_demand_feature_view_proto.spec.user_defined_function.body
),
udf_string=on_demand_feature_view_proto.spec.user_defined_function.body_text,
transformation=transformation,
description=on_demand_feature_view_proto.spec.description,
tags=dict(on_demand_feature_view_proto.spec.tags),
owner=on_demand_feature_view_proto.spec.owner,
Expand Down Expand Up @@ -315,7 +327,8 @@ def get_transformed_features_df(
columns_to_cleanup.append(full_feature_ref)

# Compute transformed values and apply to each result row
df_with_transformed_features = self.udf.__call__(df_with_features)

df_with_transformed_features = self.transformation.transform(df_with_features)

# Work out whether the correct columns names are used.
rename_columns: Dict[str, str] = {}
Expand All @@ -335,7 +348,7 @@ def get_transformed_features_df(
df_with_features.drop(columns=columns_to_cleanup, inplace=True)
return df_with_transformed_features.rename(columns=rename_columns)

def infer_features(self):
def infer_features(self) -> None:
"""
Infers the set of features associated to this feature view from the input source.
Expand Down Expand Up @@ -365,7 +378,7 @@ def infer_features(self):
dtype = feast_value_type_to_pandas_type(field.dtype.to_value_type())
sample_val = rand_df_value[dtype] if dtype in rand_df_value else None
df[f"{field.name}"] = pd.Series(sample_val, dtype=dtype)
output_df: pd.DataFrame = self.udf.__call__(df)
output_df: pd.DataFrame = self.transformation.transform(df)
inferred_features = []
for f, dt in zip(output_df.columns, output_df.dtypes):
inferred_features.append(
Expand Down Expand Up @@ -396,7 +409,9 @@ def infer_features(self):
)

@staticmethod
def get_requested_odfvs(feature_refs, project, registry):
def get_requested_odfvs(
feature_refs, project, registry
) -> List["OnDemandFeatureView"]:
all_on_demand_feature_views = registry.list_on_demand_feature_views(
project, allow_cache=True
)
Expand Down Expand Up @@ -438,7 +453,7 @@ def on_demand_feature_view(
of the primary maintainer.
"""

def mainify(obj):
def mainify(obj) -> None:
# Needed to allow dill to properly serialize the udf. Otherwise, clients will need to have a file with the same
# name as the original file defining the ODFV.
if obj.__module__ != "__main__":
Expand All @@ -447,15 +462,17 @@ def mainify(obj):
def decorator(user_function):
udf_string = dill.source.getsource(user_function)
mainify(user_function)

transformation = OnDemandPandasTransformation(user_function, udf_string)

on_demand_feature_view_obj = OnDemandFeatureView(
name=user_function.__name__,
sources=sources,
schema=schema,
udf=user_function,
transformation=transformation,
description=description,
tags=tags,
owner=owner,
udf_string=udf_string,
)
functools.update_wrapper(
wrapper=on_demand_feature_view_obj, wrapped=user_function
Expand Down
56 changes: 56 additions & 0 deletions sdk/python/feast/on_demand_pandas_transformation.py
@@ -0,0 +1,56 @@
from types import FunctionType

import dill
import pandas as pd

from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
UserDefinedFunction as UserDefinedFunctionProto,
)


class OnDemandPandasTransformation:
def __init__(self, udf: FunctionType, udf_string: str = ""):
"""
Creates an OnDemandPandasTransformation object.
Args:
udf: The user defined transformation function, which must take pandas
dataframes as inputs.
udf_string: The source code version of the udf (for diffing and displaying in Web UI)
"""
self.udf = udf
self.udf_string = udf_string

def transform(self, df: pd.DataFrame) -> pd.DataFrame:
return self.udf.__call__(df)

def __eq__(self, other):
if not isinstance(other, OnDemandPandasTransformation):
raise TypeError(
"Comparisons should only involve OnDemandPandasTransformation class objects."
)

if not super().__eq__(other):
return False

if (
self.udf_string != other.udf_string
or self.udf.__code__.co_code != other.udf.__code__.co_code
):
return False

return True

def to_proto(self) -> UserDefinedFunctionProto:
return UserDefinedFunctionProto(
name=self.udf.__name__,
body=dill.dumps(self.udf, recurse=True),
body_text=self.udf_string,
)

@classmethod
def from_proto(cls, user_defined_function_proto: UserDefinedFunctionProto):
return OnDemandPandasTransformation(
udf=dill.loads(user_defined_function_proto.body),
udf_string=user_defined_function_proto.body_text,
)
Expand Up @@ -15,6 +15,7 @@
)
from feast.data_source import DataSource, RequestSource
from feast.feature_view_projection import FeatureViewProjection
from feast.on_demand_feature_view import OnDemandPandasTransformation
from feast.types import Array, FeastType, Float32, Float64, Int32, Int64
from tests.integration.feature_repos.universal.entities import (
customer,
Expand Down Expand Up @@ -70,8 +71,9 @@ def conv_rate_plus_100_feature_view(
name=conv_rate_plus_100.__name__,
schema=[] if infer_features else _features,
sources=sources,
udf=conv_rate_plus_100,
udf_string="raw udf source",
transformation=OnDemandPandasTransformation(
udf=conv_rate_plus_100, udf_string="raw udf source"
),
)


Expand Down Expand Up @@ -108,8 +110,9 @@ def similarity_feature_view(
name=similarity.__name__,
sources=sources,
schema=[] if infer_features else _fields,
udf=similarity,
udf_string="similarity raw udf",
transformation=OnDemandPandasTransformation(
udf=similarity, udf_string="similarity raw udf"
),
)


Expand Down

0 comments on commit 0a9fae8

Please sign in to comment.