Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Feature/adding remote online store #4226

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
* [Datastore](reference/online-stores/datastore.md)
* [DynamoDB](reference/online-stores/dynamodb.md)
* [Bigtable](reference/online-stores/bigtable.md)
* [Remote](reference/online-stores/remote.md)
* [PostgreSQL (contrib)](reference/online-stores/postgres.md)
* [Cassandra + Astra DB (contrib)](reference/online-stores/cassandra.md)
* [MySQL (contrib)](reference/online-stores/mysql.md)
Expand Down
4 changes: 4 additions & 0 deletions docs/reference/online-stores/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,7 @@ Please see [Online Store](../../getting-started/architecture-and-components/onli
{% content-ref url="scylladb.md" %}
[scylladb.md](scylladb.md)
{% endcontent-ref %}

{% content-ref url="remote.md" %}
[remote.md](remote.md)
{% endcontent-ref %}
21 changes: 21 additions & 0 deletions docs/reference/online-stores/remote.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Remote online store

## Description

The remote online store lets you interact with a remote feature server. At the moment it supports only the read operation: you can use it to retrieve online features from the remote feature server with `store.get_online_features`.

## Examples

The registry points to the registry of the remote feature store. If that registry is not directly accessible, the client should be configured to use a remote registry instead.

{% code title="feature_store.yaml" %}
```yaml
project: my-local-project
registry: /remote/data/registry.db
provider: local
online_store:
  path: http://localhost:6566
lokeshrangineni marked this conversation as resolved.
Show resolved Hide resolved
  type: remote
entity_key_serialization_version: 2
```
{% endcode %}
167 changes: 167 additions & 0 deletions sdk/python/feast/infra/online_stores/remote.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# Copyright 2021 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple

import requests
from pydantic import StrictStr

from feast import Entity, FeatureView, RepoConfig
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel
from feast.type_map import python_values_to_proto_values
from feast.value_type import ValueType

logger = logging.getLogger(__name__)


class RemoteOnlineStoreConfig(FeastConfigBaseModel):
    """Configuration for the remote online store.

    Used when the ``online_store`` section of ``feature_store.yaml`` sets
    ``type: remote``.
    """

    type: Literal["remote"] = "remote"
    """Online store type selector"""

    path: StrictStr = "http://localhost:6566"
    """ str: Base URL of the remote feature server.
    Online reads are sent as HTTP POST requests to ``{path}/get-online-features``. """


class RemoteOnlineStore(OnlineStore):
    """
    Online store implementation that reads features from a remote Feast feature server.

    Only the read path is implemented: ``online_read`` sends an HTTP POST to
    ``{path}/get-online-features`` on the configured server. ``online_write_batch``
    raises ``NotImplementedError``; ``update`` and ``teardown`` are no-ops.
    """

    def online_write_batch(
        self,
        config: RepoConfig,
        table: FeatureView,
        data: List[
            Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
        ],
        progress: Optional[Callable[[int], Any]],
    ) -> None:
        """Writing is not supported by the remote online store.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError

    def online_read(
        self,
        config: RepoConfig,
        table: FeatureView,
        entity_keys: List[EntityKeyProto],
        requested_features: Optional[List[str]] = None,
    ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
        """Read feature values for ``entity_keys`` from the remote feature server.

        Args:
            config: Repo config whose ``online_store`` is a ``RemoteOnlineStoreConfig``.
            table: Feature view the requested features belong to.
            entity_keys: Entity keys to look up; one result tuple is returned per key.
            requested_features: Names of the features to return. When ``None``,
                every returned feature dict is empty.

        Returns:
            One ``(event_timestamp, {feature_name: value})`` tuple per entity key.
            The same event timestamp (or ``None`` when the response carries none)
            is used for every row.

        Raises:
            RuntimeError: if the feature server answers with a non-200 status code.
        """
        # The assert only narrows the config type; the caller's config object
        # is deliberately left untouched.
        assert isinstance(config.online_store, RemoteOnlineStoreConfig)

        req_body = self._construct_online_read_api_json_request(
            entity_keys, table, requested_features
        )
        response = requests.post(
            f"{config.online_store.path}/get-online-features", data=req_body
        )
        if response.status_code != 200:
            error_msg = f"Unable to retrieve the online store data using feature server API. Error_code={response.status_code}, error_message={response.reason}"
            logger.error(error_msg)
            raise RuntimeError(error_msg)

        logger.debug("Able to retrieve the online features from feature server.")
        response_json = json.loads(response.text)
        event_ts = self._get_event_ts(response_json)
        feature_names = response_json["metadata"]["feature_names"]
        results = response_json["results"]
        # Build the membership set once instead of scanning the list per cell.
        wanted = set(requested_features) if requested_features is not None else set()

        # The API returns one entry per feature (column format); convert it to
        # one dict per entity key (row format).
        result_tuples: List[
            Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]
        ] = []
        for row_index in range(len(entity_keys)):
            feature_values_dict: Dict[str, ValueProto] = {}
            for column_index, feature_name in enumerate(feature_names):
                if feature_name not in wanted:
                    continue
                column = results[column_index]
                if column["statuses"][row_index] == "PRESENT":
                    proto_values = python_values_to_proto_values(
                        [column["values"][row_index]],
                        ValueType.UNKNOWN,
                    )
                    feature_values_dict[feature_name] = proto_values[0]
                else:
                    # Any status other than PRESENT yields an empty value proto.
                    feature_values_dict[feature_name] = ValueProto()
            result_tuples.append((event_ts, feature_values_dict))
        return result_tuples

    def _construct_online_read_api_json_request(
        self,
        entity_keys: List[EntityKeyProto],
        table: FeatureView,
        requested_features: Optional[List[str]] = None,
    ) -> str:
        """Build the JSON body for the ``get-online-features`` request.

        Features are referenced as ``<feature view name>:<feature name>``.

        NOTE(review): only the first join key and first entity value of each
        entity key are sent, so composite entity keys are not represented in
        the request.
        """
        api_requested_features = [
            f"{table.name}:{feature}" for feature in (requested_features or [])
        ]

        entity_values = []
        entity_key = ""
        for row in entity_keys:
            entity_key = row.join_keys[0]
            first_value = row.entity_values[0]
            # Extract whichever field of the ``val`` oneof is populated.
            entity_values.append(getattr(first_value, first_value.WhichOneof("val")))

        return json.dumps(
            {
                "features": api_requested_features,
                "entities": {entity_key: entity_values},
            }
        )

    def _get_event_ts(self, response_json) -> Optional[datetime]:
        """Return the first event timestamp of the second result column.

        Returns ``None`` instead of failing on an empty ISO string when the
        response has fewer than two result columns or carries no timestamps.
        """
        results = response_json["results"]
        # presumably results[0] is the entity-key column, hence index 1 — TODO confirm
        if len(results) <= 1:
            return None
        timestamps = results[1].get("event_timestamps")
        if not timestamps:
            return None
        # ``fromisoformat`` on older Python versions does not accept a trailing "Z".
        return datetime.fromisoformat(timestamps[0].replace("Z", "+00:00"))

    def update(
        self,
        config: RepoConfig,
        tables_to_delete: Sequence[FeatureView],
        tables_to_keep: Sequence[FeatureView],
        entities_to_delete: Sequence[Entity],
        entities_to_keep: Sequence[Entity],
        partial: bool,
    ):
        """No-op: this store performs no local infrastructure changes."""
        pass

    def teardown(
        self,
        config: RepoConfig,
        tables: Sequence[FeatureView],
        entities: Sequence[Entity],
    ):
        """No-op: this store has nothing local to tear down."""
        pass
1 change: 1 addition & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
"hazelcast": "feast.infra.online_stores.contrib.hazelcast_online_store.hazelcast_online_store.HazelcastOnlineStore",
"ikv": "feast.infra.online_stores.contrib.ikv_online_store.ikv.IKVOnlineStore",
"elasticsearch": "feast.infra.online_stores.contrib.elasticsearch.ElasticSearchOnlineStore",
"remote": "feast.infra.online_stores.remote.RemoteOnlineStore",
}

OFFLINE_STORE_CLASS_FOR_TYPE = {
Expand Down
10 changes: 5 additions & 5 deletions sdk/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
create_basic_driver_dataset,
create_document_dataset,
)
from tests.integration.feature_repos.integration_test_repo_config import ( # noqa: E402
IntegrationTestRepoConfig,
from tests.integration.feature_repos.integration_test_repo_config import (
IntegrationTestRepoConfig, # noqa: E402
)
from tests.integration.feature_repos.repo_configuration import ( # noqa: E402
AVAILABLE_OFFLINE_STORES,
Expand All @@ -45,8 +45,8 @@
construct_universal_feature_views,
construct_universal_test_data,
)
from tests.integration.feature_repos.universal.data_sources.file import ( # noqa: E402
FileDataSourceCreator,
from tests.integration.feature_repos.universal.data_sources.file import (
FileDataSourceCreator, # noqa: E402
)
from tests.integration.feature_repos.universal.entities import ( # noqa: E402
customer,
Expand Down Expand Up @@ -173,7 +173,7 @@ def simple_dataset_2() -> pd.DataFrame:

def start_test_local_server(repo_path: str, port: int):
fs = FeatureStore(repo_path)
fs.serve("localhost", port, no_access_log=True)
fs.serve(host="localhost", port=port)


@pytest.fixture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,7 @@

AVAILABLE_ONLINE_STORES: Dict[
str, Tuple[Union[str, Dict[Any, Any]], Optional[Type[OnlineStoreCreator]]]
] = {
"sqlite": ({"type": "sqlite"}, None),
}
] = {"sqlite": ({"type": "sqlite"}, None)}

# Only configure Cloud DWH if running full integration tests
if os.getenv("FEAST_IS_LOCAL_TEST", "False") != "True":
Expand All @@ -155,7 +153,6 @@
AVAILABLE_ONLINE_STORES["datastore"] = ("datastore", None)
AVAILABLE_ONLINE_STORES["snowflake"] = (SNOWFLAKE_CONFIG, None)
AVAILABLE_ONLINE_STORES["bigtable"] = (BIGTABLE_CONFIG, None)

# Uncomment to test using private Rockset account. Currently not enabled as
# there is no dedicated Rockset instance for CI testing and there is no
# containerized version of Rockset.
Expand Down Expand Up @@ -489,7 +486,6 @@ def construct_test_environment(
"arn:aws:iam::402087665549:role/lambda_execution_role",
),
)

else:
feature_server = LocalFeatureServerConfig(
feature_logging=FeatureLoggingConfig(enabled=True)
Expand Down
Loading
Loading