Skip to content

Commit

Permalink
Add online read API to FeatureStore class (#1399)
Browse files Browse the repository at this point in the history
online read API

Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
  • Loading branch information
oavdeev committed Mar 23, 2021
1 parent 4317dd6 commit ed1145a
Show file tree
Hide file tree
Showing 12 changed files with 345 additions and 90 deletions.
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,9 @@ package-protos:
cp -r ${ROOT_DIR}/protos ${ROOT_DIR}/sdk/python/feast/protos

compile-protos-python:
printenv
pip install --ignore-installed mypy-protobuf || echo "Mypy could not be installed"
@$(foreach dir,$(PROTO_TYPE_SUBDIRS),cd ${ROOT_DIR}/protos; python -m grpc_tools.protoc -I. --grpc_python_out=../sdk/python/feast/protos/ --python_out=../sdk/python/feast/protos/ --mypy_out=../sdk/python/feast/protos/ feast/$(dir)/*.proto;)
@$(foreach dir,$(PROTO_TYPE_SUBDIRS),grep -rli 'from feast.$(dir)' sdk/python/feast/protos | xargs -i@ sed -i 's/from feast.$(dir)/from feast.protos.feast.$(dir)/g' @;)
@$(foreach dir,$(PROTO_TYPE_SUBDIRS),grep -rli 'from feast.$(dir)' sdk/python/feast/protos | xargs -I@ sed -i.bak 's/from feast.$(dir)/from feast.protos.feast.$(dir)/g' @;)
cd ${ROOT_DIR}/protos; python -m grpc_tools.protoc -I. --python_out=../sdk/python/ --mypy_out=../sdk/python/ tensorflow_metadata/proto/v0/*.proto

install-python:
Expand Down
147 changes: 129 additions & 18 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Union

import pandas as pd
import pyarrow
Expand All @@ -27,6 +28,11 @@
get_offline_store,
get_offline_store_for_retrieval,
)
from feast.online_response import OnlineResponse, _infer_online_entity_rows
from feast.protos.feast.serving.ServingService_pb2 import (
GetOnlineFeaturesRequestV2,
GetOnlineFeaturesResponse,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
Expand Down Expand Up @@ -245,29 +251,127 @@ def materialize(
self.config.project, feature_view, rows_to_write
)

def get_online_features(
    self, feature_refs: List[str], entity_rows: List[Dict[str, Any]],
) -> OnlineResponse:
    """
    Retrieves the latest online feature data.

    The lookup always runs against the project configured on this store
    (``self.config.project``); there is no per-call project override.

    Args:
        feature_refs: List of feature references that will be returned for each entity.
            Each feature reference should have the following format:
            "feature_view:feature" where "feature_view" & "feature" refer to
            the feature view and feature names respectively.
            Both parts are required.
        entity_rows: A list of dictionaries where each key-value is an entity-name, entity-value pair.

    Returns:
        OnlineResponse containing the feature data in records.

    Examples:
        >>> from feast import FeatureStore
        >>>
        >>> store = FeatureStore(repo_path="...")
        >>> feature_refs = ["sales:daily_transactions"]
        >>> entity_rows = [{"customer_id": 0}, {"customer_id": 1}]
        >>>
        >>> online_response = store.get_online_features(feature_refs, entity_rows)
        >>> online_response_dict = online_response.to_dict()
        >>> print(online_response_dict)
        {'sales:daily_transactions': [1.1,1.2], 'customer_id': [0,1]}
    """
    # Plain dict rows are converted to typed EntityRow protos before the read.
    response = self._get_online_features(
        feature_refs=feature_refs,
        entity_rows=_infer_online_entity_rows(entity_rows),
        project=self.config.project,
    )
    return OnlineResponse(response)

def _get_requested_feature_views(
def _get_online_features(
    self,
    entity_rows: List[GetOnlineFeaturesRequestV2.EntityRow],
    feature_refs: List[str],
    project: str,
) -> GetOnlineFeaturesResponse:
    """
    Read the requested features for every entity row from the online store.

    Args:
        entity_rows: Entity rows to look up; one response row is produced per entity row.
        feature_refs: Feature references in "feature_view:feature" format.
        project: Project used both to list feature views and to read from the provider.

    Returns:
        GetOnlineFeaturesResponse with one FieldValues entry per entity row. Each entry
        holds the entity fields plus, for every requested feature, a status of PRESENT
        (with the value copied into ``fields``) or NOT_FOUND.

    Raises:
        ValueError: if a feature reference cannot be resolved to a feature view.
    """
    provider = self._get_provider()

    entity_keys = [_entity_row_to_key(row) for row in entity_rows]
    # Each response row starts out populated with the entity fields themselves.
    result_rows: List[GetOnlineFeaturesResponse.FieldValues] = [
        _entity_row_to_field_values(row) for row in entity_rows
    ]

    # Use the `project` argument for the registry lookup as well as the read so
    # that both always refer to the same project.
    all_feature_views = self._get_registry().list_feature_views(project=project)

    not_found = GetOnlineFeaturesResponse.FieldStatus.NOT_FOUND
    present = GetOnlineFeaturesResponse.FieldStatus.PRESENT

    for table, requested_features in _group_refs(feature_refs, all_feature_views):
        # One provider call per feature view, covering all entity keys at once.
        read_rows = provider.online_read(
            project=project, table=table, entity_keys=entity_keys,
        )
        # Rows are matched to entity rows by position.
        for result_row, (_row_ts, feature_data) in zip(result_rows, read_rows):
            for feature_name in requested_features:
                feature_ref = f"{table.name}:{feature_name}"
                if feature_data is not None and feature_name in feature_data:
                    result_row.fields[feature_ref].CopyFrom(
                        feature_data[feature_name]
                    )
                    result_row.statuses[feature_ref] = present
                else:
                    # Covers both "no row for this entity" and "row exists but
                    # lacks this feature", so every requested feature always
                    # ends up with a status.
                    result_row.statuses[feature_ref] = not_found

    return GetOnlineFeaturesResponse(field_values=result_rows)


def _entity_row_to_key(row: GetOnlineFeaturesRequestV2.EntityRow) -> EntityKeyProto:
names, values = zip(*row.fields.items())
return EntityKeyProto(entity_names=names, entity_values=values) # type: ignore


def _entity_row_to_field_values(
    row: GetOnlineFeaturesRequestV2.EntityRow,
) -> GetOnlineFeaturesResponse.FieldValues:
    """
    Create the response row for an entity row.

    Every field of the entity row is copied into the returned FieldValues under
    the same key and is given the PRESENT status.
    """
    field_values = GetOnlineFeaturesResponse.FieldValues()
    present = GetOnlineFeaturesResponse.FieldStatus.PRESENT
    for entity_name, entity_value in row.fields.items():
        field_values.fields[entity_name].CopyFrom(entity_value)
        field_values.statuses[entity_name] = present
    return field_values


def _group_refs(
feature_refs: List[str], all_feature_views: List[FeatureView]
) -> List[FeatureView]:
"""Get list of feature views based on feature references"""
) -> List[Tuple[FeatureView, List[str]]]:
""" Get list of feature views and corresponding feature names based on feature references"""

# view name to view proto
view_index = {view.name: view for view in all_feature_views}

# view name to feature names
views_features = defaultdict(list)

feature_views_dict = {}
for ref in feature_refs:
ref_parts = ref.split(":")
found = False
for feature_view in all_feature_views:
if feature_view.name == ref_parts[0]:
found = True
feature_views_dict[feature_view.name] = feature_view
continue

if not found:
view_name, feat_name = ref.split(":")
if view_name not in view_index:
raise ValueError(f"Could not find feature view from reference {ref}")
feature_views_list = []
for view in feature_views_dict.values():
feature_views_list.append(view)
views_features[view_name].append(feat_name)

return feature_views_list
result = []
for view_name, feature_names in views_features.items():
result.append((view_index[view_name], feature_names))
return result


def _run_reverse_field_mapping(
Expand Down Expand Up @@ -367,3 +471,10 @@ def _convert_arrow_to_proto(
(entity_key, feature_dict, event_timestamp, created_timestamp)
)
return rows_to_write


def _get_requested_feature_views(
    feature_refs: List[str], all_feature_views: List[FeatureView]
) -> List[FeatureView]:
    """Return the feature views that the given feature references point at."""
    grouped = _group_refs(feature_refs, all_feature_views)
    return [feature_view for feature_view, _feature_names in grouped]
33 changes: 19 additions & 14 deletions sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,24 @@ def online_read(
self,
project: str,
table: Union[FeatureTable, FeatureView],
entity_key: EntityKeyProto,
) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]:
entity_keys: List[EntityKeyProto],
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
client = self._initialize_client()

document_id = compute_datastore_entity_id(entity_key)
key = client.key("Project", project, "Table", table.name, "Row", document_id)
value = client.get(key)
if value is not None:
res = {}
for feature_name, value_bin in value["values"].items():
val = ValueProto()
val.ParseFromString(value_bin)
res[feature_name] = val
return value["event_ts"], res
else:
return None, None
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
for entity_key in entity_keys:
document_id = compute_datastore_entity_id(entity_key)
key = client.key(
"Project", project, "Table", table.name, "Row", document_id
)
value = client.get(key)
if value is not None:
res = {}
for feature_name, value_bin in value["values"].items():
val = ValueProto()
val.ParseFromString(value_bin)
res[feature_name] = val
result.append((value["event_ts"], res))
else:
result.append((None, None))
return result
44 changes: 25 additions & 19 deletions sdk/python/feast/infra/local_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,26 +103,32 @@ def online_read(
self,
project: str,
table: Union[FeatureTable, FeatureView],
entity_key: EntityKeyProto,
) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]:
entity_key_bin = serialize_entity_key(entity_key)
entity_keys: List[EntityKeyProto],
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:

conn = self._get_conn()
cur = conn.cursor()
cur.execute(
f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = ?",
(entity_key_bin,),
)

res = {}
res_ts = None
for feature_name, val_bin, ts in cur.fetchall():
val = ValueProto()
val.ParseFromString(val_bin)
res[feature_name] = val
res_ts = ts

if not res:
return None, None
else:
return res_ts, res
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []

for entity_key in entity_keys:
entity_key_bin = serialize_entity_key(entity_key)

cur.execute(
f"SELECT feature_name, value, event_ts FROM {_table_id(project, table)} WHERE entity_key = ?",
(entity_key_bin,),
)

res = {}
res_ts = None
for feature_name, val_bin, ts in cur.fetchall():
val = ValueProto()
val.ParseFromString(val_bin)
res[feature_name] = val
res_ts = ts

if not res:
result.append((None, None))
else:
result.append((res_ts, res))
return result
9 changes: 5 additions & 4 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,16 @@ def online_read(
self,
project: str,
table: Union[FeatureTable, FeatureView],
entity_key: EntityKeyProto,
) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]:
entity_keys: List[EntityKeyProto],
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
"""
Read feature values given an Entity Key. This is a low level interface, not
expected to be used by the users directly.
Returns:
A tuple of event_ts for the row, and the feature data as a dict from feature names
to values. Values are returned as Value proto message.
Data is returned as a list, one item per entity key. Each item in the list is a tuple
of event_ts for the row, and the feature data as a dict from feature names to values.
Values are returned as Value proto message.
"""
...

Expand Down
14 changes: 14 additions & 0 deletions sdk/python/tests/cli/example_feature_repo_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,17 @@
input=driver_locations_source,
tags={},
)

# Feature view for the "driver" entity, reading from driver_locations_source,
# with a one-day TTL.
driver_locations_2 = FeatureView(
    name="driver_locations_2",
    entities=["driver"],
    ttl=Duration(seconds=86400),
    features=[
        Feature(name="lat", dtype=ValueType.FLOAT),
        Feature(name="lon", dtype=ValueType.STRING),
        Feature(name="name", dtype=ValueType.STRING),
    ],
    online=True,
    input=driver_locations_source,
    tags={},
)
6 changes: 4 additions & 2 deletions sdk/python/tests/cli/online_read_write_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ def _driver_rw_test(event_ts, created_ts, write, expect_read):
],
)

_, val = provider.online_read(
project=project_name, table=table, entity_key=entity_key
read_rows = provider.online_read(
project=project_name, table=table, entity_keys=[entity_key]
)
assert len(read_rows) == 1
_, val = read_rows[0]
assert val["lon"].string_val == expect_lon
assert abs(val["lat"].double_val - expect_lat) < 1e-6

Expand Down
17 changes: 2 additions & 15 deletions sdk/python/tests/cli/test_cli_local.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,17 @@
import subprocess
import sys
import tempfile
from pathlib import Path
from textwrap import dedent
from typing import List

from feast import cli
from tests.cli.online_read_write_test import basic_rw_test


class CliRunner:
"""
NB. We can't use test runner helper from click here, since it doesn't start a new Python
interpreter. And we need a new interpreter for each test since we dynamically import
modules from the feature repo, and it is hard to clean up that state otherwise.
"""

def run(self, args: List[str], cwd: Path) -> subprocess.CompletedProcess:
return subprocess.run([sys.executable, cli.__file__] + args, cwd=cwd)
from tests.cli.utils import CliRunner


class TestCliLocal:
def test_basic(self) -> None:
runner = CliRunner()
with tempfile.TemporaryDirectory() as repo_dir_name, tempfile.TemporaryDirectory() as data_dir_name:

# Construct an example repo in a temporary dir
repo_path = Path(repo_dir_name)
data_path = Path(data_dir_name)

Expand Down
16 changes: 1 addition & 15 deletions sdk/python/tests/cli/test_datastore.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,13 @@
import random
import string
import subprocess
import sys
import tempfile
from pathlib import Path
from textwrap import dedent
from typing import List

import pytest

from feast import cli
from tests.cli.online_read_write_test import basic_rw_test


class CliRunner:
"""
NB. We can't use test runner helper from click here, since it doesn't start a new Python
interpreter. And we need a new interpreter for each test since we dynamically import
modules from the feature repo, and it is hard to clean up that state otherwise.
"""

def run(self, args: List[str], cwd: Path) -> subprocess.CompletedProcess:
return subprocess.run([sys.executable, cli.__file__] + args, cwd=cwd)
from tests.cli.utils import CliRunner


@pytest.mark.integration
Expand Down

0 comments on commit ed1145a

Please sign in to comment.