Skip to content

Commit

Permalink
fix: Fix on demand feature view output in feast plan + Web UI crash (#…
Browse files Browse the repository at this point in the history
…3057)

* fix: Fix on demand feature view output in feast plan + Web UI crash with ODFV

Signed-off-by: Danny Chiao <danny@tecton.ai>

* lint

Signed-off-by: Danny Chiao <danny@tecton.ai>

* fix tests

Signed-off-by: Danny Chiao <danny@tecton.ai>

Signed-off-by: Danny Chiao <danny@tecton.ai>
  • Loading branch information
adchia committed Aug 15, 2022
1 parent f06874a commit a44fe66
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 19 deletions.
3 changes: 3 additions & 0 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,7 @@ message UserDefinedFunction {

// The python-syntax function body (serialized by dill)
bytes body = 2;

// The string representation of the udf
string body_text = 3;
}
39 changes: 30 additions & 9 deletions sdk/python/feast/diff/registry_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
from feast.protos.feast.core.OnDemandFeatureView_pb2 import OnDemandFeatureViewSpec
from feast.protos.feast.core.RequestFeatureView_pb2 import (
RequestFeatureView as RequestFeatureViewProto,
)
Expand Down Expand Up @@ -137,19 +138,39 @@ def diff_registry_objects(
else:
current_spec = current_proto.spec
new_spec = new_proto.spec
if current_spec != new_spec:
if current != new:
for _field in current_spec.DESCRIPTOR.fields:
if _field.name in FIELDS_TO_IGNORE:
continue
if getattr(current_spec, _field.name) != getattr(new_spec, _field.name):
transition = TransitionType.UPDATE
property_diffs.append(
PropertyDiff(
_field.name,
getattr(current_spec, _field.name),
getattr(new_spec, _field.name),
elif getattr(current_spec, _field.name) != getattr(new_spec, _field.name):
if _field.name == "user_defined_function":
current_spec = cast(OnDemandFeatureViewSpec, current_proto)
new_spec = cast(OnDemandFeatureViewSpec, new_proto)
current_udf = current_spec.user_defined_function
new_udf = new_spec.user_defined_function
for _udf_field in current_udf.DESCRIPTOR.fields:
if _udf_field.name == "body":
continue
if getattr(current_udf, _udf_field.name) != getattr(
new_udf, _udf_field.name
):
transition = TransitionType.UPDATE
property_diffs.append(
PropertyDiff(
_field.name + "." + _udf_field.name,
getattr(current_udf, _udf_field.name),
getattr(new_udf, _udf_field.name),
)
)
else:
transition = TransitionType.UPDATE
property_diffs.append(
PropertyDiff(
_field.name,
getattr(current_spec, _field.name),
getattr(new_spec, _field.name),
)
)
)
return FeastObjectDiff(
name=new_spec.name,
feast_object_type=object_type,
Expand Down
20 changes: 14 additions & 6 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ class OnDemandFeatureView(BaseFeatureView):
maintainer.
"""

# TODO(adchia): remove inputs from proto and declaration
name: str
features: List[Field]
source_feature_view_projections: Dict[str, FeatureViewProjection]
source_request_sources: Dict[str, RequestSource]
udf: FunctionType
udf_string: str
description: str
tags: Dict[str, str]
owner: str
Expand All @@ -81,6 +81,7 @@ def __init__( # noqa: C901
List[Any]
] = None, # Typed as Any because @typechecked can't deal with the List[Union]
udf: Optional[FunctionType] = None,
udf_string: str = "",
inputs: Optional[
Dict[str, Union[FeatureView, FeatureViewProjection, RequestSource]]
] = None,
Expand All @@ -99,8 +100,9 @@ def __init__( # noqa: C901
sources (optional): A map from input source names to the actual input sources,
which may be feature views, or request data sources.
These sources serve as inputs to the udf, which will refer to them by name.
udf (optional): The user defined transformation function, which must take pandas
udf: The user defined transformation function, which must take pandas
dataframes as inputs.
udf_string: The source code version of the udf (for diffing and displaying in Web UI)
inputs (optional): (Deprecated) A map from input source names to the actual input sources,
which may be feature views, feature view projections, or request data sources.
These sources serve as inputs to the udf, which will refer to them by name.
Expand Down Expand Up @@ -233,9 +235,8 @@ def __init__( # noqa: C901
odfv_source.name
] = odfv_source.projection

if _udf is None:
raise ValueError("The `udf` parameter must be specified.")
self.udf = _udf # type: ignore
self.udf = udf # type: ignore
self.udf_string = udf_string

@property
def proto_class(self) -> Type[OnDemandFeatureViewProto]:
Expand All @@ -249,6 +250,7 @@ def __copy__(self):
sources=list(self.source_feature_view_projections.values())
+ list(self.source_request_sources.values()),
udf=self.udf,
udf_string=self.udf_string,
description=self.description,
tags=self.tags,
owner=self.owner,
Expand All @@ -269,6 +271,7 @@ def __eq__(self, other):
self.source_feature_view_projections
!= other.source_feature_view_projections
or self.source_request_sources != other.source_request_sources
or self.udf_string != other.udf_string
or self.udf.__code__.co_code != other.udf.__code__.co_code
):
return False
Expand Down Expand Up @@ -305,7 +308,9 @@ def to_proto(self) -> OnDemandFeatureViewProto:
features=[feature.to_proto() for feature in self.features],
sources=sources,
user_defined_function=UserDefinedFunctionProto(
name=self.udf.__name__, body=dill.dumps(self.udf, recurse=True),
name=self.udf.__name__,
body=dill.dumps(self.udf, recurse=True),
body_text=self.udf_string,
),
description=self.description,
tags=self.tags,
Expand Down Expand Up @@ -354,6 +359,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
udf=dill.loads(
on_demand_feature_view_proto.spec.user_defined_function.body
),
udf_string=on_demand_feature_view_proto.spec.user_defined_function.body_text,
description=on_demand_feature_view_proto.spec.description,
tags=dict(on_demand_feature_view_proto.spec.tags),
owner=on_demand_feature_view_proto.spec.owner,
Expand Down Expand Up @@ -641,6 +647,7 @@ def mainify(obj):
obj.__module__ = "__main__"

def decorator(user_function):
udf_string = dill.source.getsource(user_function)
mainify(user_function)
on_demand_feature_view_obj = OnDemandFeatureView(
name=user_function.__name__,
Expand All @@ -650,6 +657,7 @@ def decorator(user_function):
description=description,
tags=tags,
owner=owner,
udf_string=udf_string,
)
functools.update_wrapper(
wrapper=on_demand_feature_view_obj, wrapped=user_function
Expand Down
8 changes: 4 additions & 4 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import dill
from google.protobuf.internal.containers import RepeatedCompositeFieldContainer
from google.protobuf.json_format import MessageToJson
from proto import Message
Expand Down Expand Up @@ -729,9 +728,10 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]:
key=lambda on_demand_feature_view: on_demand_feature_view.name,
):
odfv_dict = self._message_to_sorted_dict(on_demand_feature_view.to_proto())
odfv_dict["spec"]["userDefinedFunction"]["body"] = dill.source.getsource(
on_demand_feature_view.udf
)

odfv_dict["spec"]["userDefinedFunction"][
"body"
] = on_demand_feature_view.udf_string
registry_dict["onDemandFeatureViews"].append(odfv_dict)
for request_feature_view in sorted(
self.list_request_feature_views(project=project),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def conv_rate_plus_100_feature_view(
schema=[] if infer_features else _features,
sources=sources,
udf=conv_rate_plus_100,
udf_string="raw udf source",
)


Expand Down Expand Up @@ -125,6 +126,7 @@ def similarity_feature_view(
sources=sources,
schema=[] if infer_features else _fields,
udf=similarity,
udf_string="similarity raw udf",
)


Expand Down
4 changes: 4 additions & 0 deletions sdk/python/tests/unit/test_on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def test_hash():
Field(name="output2", dtype=Float32),
],
udf=udf1,
udf_string="udf1 source code",
)
on_demand_feature_view_2 = OnDemandFeatureView(
name="my-on-demand-feature-view",
Expand All @@ -66,6 +67,7 @@ def test_hash():
Field(name="output2", dtype=Float32),
],
udf=udf1,
udf_string="udf1 source code",
)
on_demand_feature_view_3 = OnDemandFeatureView(
name="my-on-demand-feature-view",
Expand All @@ -75,6 +77,7 @@ def test_hash():
Field(name="output2", dtype=Float32),
],
udf=udf2,
udf_string="udf2 source code",
)
on_demand_feature_view_4 = OnDemandFeatureView(
name="my-on-demand-feature-view",
Expand All @@ -84,6 +87,7 @@ def test_hash():
Field(name="output2", dtype=Float32),
],
udf=udf2,
udf_string="udf2 source code",
description="test",
)

Expand Down

0 comments on commit a44fe66

Please sign in to comment.