Skip to content

Commit

Permalink
fix: Fix Feast Java inconsistency with int64 serialization vs python (#…
Browse files Browse the repository at this point in the history
…3031)

Signed-off-by: Danny Chiao <danny@tecton.ai>
  • Loading branch information
adchia committed Aug 10, 2022
1 parent c1d477a commit f340aeb
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 21 deletions.
1 change: 1 addition & 0 deletions examples/java-demo/feature_repo/application-override.yaml
Expand Up @@ -10,6 +10,7 @@ feature-server:
host: my-redis-master
port: 6379
password: [YOUR PASSWORD]
entityKeySerializationVersion: 2
global:
registry:
path: gs://[YOUR BUCKET]/demo-repo/registry.db
Expand Down
22 changes: 5 additions & 17 deletions examples/java-demo/feature_repo/driver_repo.py
@@ -1,24 +1,22 @@
import pandas as pd
from feast.data_source import RequestSource
from feast.field import Field
from feast.on_demand_feature_view import on_demand_feature_view
from feast.request_feature_view import RequestFeatureView
from feast.types import Float32, Float64, Int64, String
from google.protobuf.duration_pb2 import Duration
from feast.field import Field

from feast import Entity, Feature, BatchFeatureView, FileSource
from feast import Entity, FileSource, FeatureView

driver_hourly_stats = FileSource(
path="data/driver_stats_with_string.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)
driver = Entity(name="driver_id", description="driver id",)
driver_hourly_stats_view = BatchFeatureView(
driver_hourly_stats_view = FeatureView(
name="driver_hourly_stats",
entities=["driver_id"],
ttl=Duration(seconds=86400000),
entities=[driver],
ttl=timedelta(days=365),
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Expand All @@ -40,6 +38,7 @@
],
)


# Define an on demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
Expand All @@ -58,14 +57,3 @@ def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
return df


# Define request feature view
driver_age_request_fv = RequestFeatureView(
name="driver_age",
request_data_source=RequestSource(
name="driver_age",
schema=[
Field(name="driver_age", dtype=Int64),
],
),
)
1 change: 1 addition & 0 deletions examples/java-demo/feature_repo/feature_store.yaml
Expand Up @@ -6,3 +6,4 @@ online_store:
connection_string: localhost:6379,password=[YOUR PASSWORD]
offline_store:
type: file
entity_key_serialization_version: 2
26 changes: 26 additions & 0 deletions examples/java-demo/feature_repo/test_python_fetch.py
@@ -0,0 +1,26 @@
from feast import FeatureStore


def run_demo():
store = FeatureStore(repo_path=".")

print("\n--- Online features ---")
features = store.get_online_features(
features=[
"driver_hourly_stats:conv_rate",
],
entity_rows=[
{
"driver_id": 1001,
},
{
"driver_id": 1002,
}
],
).to_dict()
for key, value in sorted(features.items()):
print(key, " : ", value)


if __name__ == "__main__":
run_demo()
4 changes: 2 additions & 2 deletions java/serving/README.md
Expand Up @@ -41,7 +41,7 @@ From the Feast GitHub root, run:
java \
-Xms1g \
-Xmx4g \
-jar java/serving/target/feast-serving-0.17.1-SNAPSHOT-jar-with-dependencies.jar \
-jar java/serving/target/feast-serving-[YOUR VERSION]-jar-with-dependencies.jar \
classpath:/application.yml,file:./application-override.yaml
```
5. Now you have a Feast Serving gRPC service running on port 6566 locally!
Expand Down Expand Up @@ -124,7 +124,7 @@ You can debug this like any other Java executable. Swap the java command above w
-Xrunjdwp:transport=dt_socket,address=5005,server=y,suspend=y \
-Xms1g \
-Xmx4g \
-jar java/serving/target/feast-serving-0.17.1-SNAPSHOT-jar-with-dependencies.jar \
-jar java/serving/target/feast-serving-[YOUR VERSION]-jar-with-dependencies.jar \
classpath:/application.yml,file:./application-override.yaml
```
Now you can attach e.g. a Remote debugger in IntelliJ to port 5005 to debug / make breakpoints.
Expand Down
Expand Up @@ -87,14 +87,15 @@ public byte[] serialize(RedisProto.RedisKeyV2 entityKey) {
break;
case INT64_VAL:
buffer.addAll(encodeInteger(ValueProto.ValueType.Enum.INT64.getNumber()));
buffer.addAll(encodeInteger(Integer.BYTES));
/* This is super dumb - but in https://github.com/feast-dev/feast/blob/dcae1606f53028ce5413567fb8b66f92cfef0f8e/sdk/python/feast/infra/key_encoding_utils.py#L9
we use `struct.pack("<l", v.int64_val)` to get the bytes of an int64 val. This actually extracts only 4 bytes,
instead of 8 bytes as you'd expect from to serialize an int64 value.
*/
if (this.entityKeySerializationVersion <= 1) {
buffer.addAll(encodeInteger(Integer.BYTES));
buffer.addAll(encodeInteger(((Long) val.getInt64Val()).intValue()));
} else {
buffer.addAll(encodeInteger(Long.BYTES));
buffer.addAll(encodeLong(((Long) val.getInt64Val())));
}

Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/feature_store.py
Expand Up @@ -839,7 +839,7 @@ def apply(
)

for rfv in request_views_to_update:
data_sources_set_to_update.add(rfv.request_data_source)
data_sources_set_to_update.add(rfv.request_source)

for odfv in odfvs_to_update:
for v in odfv.source_request_sources.values():
Expand Down

0 comments on commit f340aeb

Please sign in to comment.