Skip to content

Commit

Permalink
feat: Remote offline Store (#4262)
Browse files Browse the repository at this point in the history
* feat: Added offline store remote deployment functionly using arrow flight server and client

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* Initial functional commit for remote get_historical_features

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* remote offline store example

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* removing unneeded test code and fixinf impotrts

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* call do_put only once, postpone the invocation of do_put and simplified _make_flight_info

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* added primitive parameters to the command descriptor

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* removed redundant param

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* Initial skeleton of unit test for offline server

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* added unit test for offline store remote client

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* testing all offlinestore APIs

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* integrated comments

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* Updated remote offline server readme with the capability to init with an environment variable

Signed-off-by: Theodor Mihalache <tmihalac@redhat.com>
Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* added    RemoteOfflineStoreDataSourceCreator,
use feature_view_names to transfer feature views and remove dummies

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* added missing CI requirement

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* fixed linter

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* fixed multiprocess CI requirement

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* feat: Added offline store remote deployment functionly using arrow flight server and client

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* fix test errors

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* managing feature view aliases and restored skipped tests

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* fixced linter issue

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* fixed broken test

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* added supported deployment modes using helm chart for  online (default), offline, ui and registry

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* updated the document for offline remote server

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* added the document for remote offline server

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* rebase and fix conflicts

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* feat: Added offline store remote deployment functionly using arrow flight server and client

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* added unit test for offline store remote client

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* added    RemoteOfflineStoreDataSourceCreator,
use feature_view_names to transfer feature views and remove dummies

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* feat: Added offline store remote deployment functionly using arrow flight server and client

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* Added missing remote offline store apis implementation

Signed-off-by: Theodor Mihalache <tmihalac@redhat.com>
Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* Fixed tests

Signed-off-by: Theodor Mihalache <tmihalac@redhat.com>
Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* Implemented PR change proposal

Signed-off-by: Theodor Mihalache <tmihalac@redhat.com>
Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* Implemented PR change proposal

Signed-off-by: Theodor Mihalache <tmihalac@redhat.com>
Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* updated example readme file

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* Implemented PR change proposal

Signed-off-by: Theodor Mihalache <tmihalac@redhat.com>
Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* fixing the integration tests

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* Fixed OfflineServer teardown

Signed-off-by: Theodor Mihalache <tmihalac@redhat.com>

* updated the document for remote offline feature server and client

Signed-off-by: Abdul Hameed <ahameed@redhat.com>

* Implemented PR change proposal

Signed-off-by: Theodor Mihalache <tmihalac@redhat.com>

---------

Signed-off-by: Abdul Hameed <ahameed@redhat.com>
Signed-off-by: Theodor Mihalache <tmihalac@redhat.com>
Co-authored-by: Daniele Martinoli <86618610+dmartinol@users.noreply.github.com>
Co-authored-by: Theodor Mihalache <tmihalac@redhat.com>
Co-authored-by: Theodor Mihalache <84387487+tmihalac@users.noreply.github.com>
  • Loading branch information
4 people committed Jun 13, 2024
1 parent b755fc4 commit 28a3d24
Show file tree
Hide file tree
Showing 36 changed files with 1,636 additions and 40 deletions.
3 changes: 3 additions & 0 deletions docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
* [PostgreSQL (contrib)](reference/offline-stores/postgres.md)
* [Trino (contrib)](reference/offline-stores/trino.md)
* [Azure Synapse + Azure SQL (contrib)](reference/offline-stores/mssql.md)
* [Remote Offline](reference/offline-stores/remote-offline-store.md)
* [Online stores](reference/online-stores/README.md)
* [Overview](reference/online-stores/overview.md)
* [SQLite](reference/online-stores/sqlite.md)
Expand Down Expand Up @@ -117,6 +118,8 @@
* [Python feature server](reference/feature-servers/python-feature-server.md)
* [\[Alpha\] Go feature server](reference/feature-servers/go-feature-server.md)
* [\[Alpha\] AWS Lambda feature server](reference/feature-servers/alpha-aws-lambda-feature-server.md)
* [Offline Feature Server](reference/feature-servers/offline-feature-server)

* [\[Beta\] Web UI](reference/alpha-web-ui.md)
* [\[Alpha\] On demand feature view](reference/alpha-on-demand-feature-view.md)
* [\[Alpha\] Data quality monitoring](reference/dqm.md)
Expand Down
4 changes: 4 additions & 0 deletions docs/reference/feature-servers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ Feast users can choose to retrieve features from a feature server, as opposed to

{% content-ref url="alpha-aws-lambda-feature-server.md" %}
[alpha-aws-lambda-feature-server.md](alpha-aws-lambda-feature-server.md)
{% endcontent-ref %}

{% content-ref url="offline-feature-server.md" %}
[offline-feature-server.md](offline-feature-server.md)
{% endcontent-ref %}
35 changes: 35 additions & 0 deletions docs/reference/feature-servers/offline-feature-server.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Offline feature server

## Description

The Offline feature server is an Apache Arrow Flight Server that uses the gRPC communication protocol to exchange data.
This server wraps calls to existing offline store implementations and exposes interfaces as Arrow Flight endpoints.

## How to configure the server

## CLI

There is a CLI command that starts the Offline feature server: `feast serve_offline`. By default, remote offline server uses port 8815, the port can be overridden with a `--port` flag.

## Deploying as a service on Kubernetes

The Offline feature server can be deployed using helm chart see this [helm chart](https://github.com/feast-dev/feast/blob/master/infra/charts/feast-feature-server).

User need to set `feast_mode=offline`, when installing Offline feature server as shown in the helm command below:

```
helm install feast-offline-server feast-charts/feast-feature-server --set feast_mode=offline --set feature_store_yaml_base64=$(base64 > feature_store.yaml)
```

## Server Example

The complete example can be find under [remote-offline-store-example](../../../examples/remote-offline-store)

## How to configure the client

Please see the detail how to configure offline store client [remote-offline-store.md](../offline-stores/remote-offline-store.md)

## Functionality Matrix

The set of functionalities supported by remote offline stores is the same as those supported by offline stores with the SDK, which are described in detail [here](../offline-stores/overview.md#functionality).

28 changes: 28 additions & 0 deletions docs/reference/offline-stores/remote-offline-store.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Remote Offline Store

## Description

The Remote Offline Store is an Arrow Flight client for the offline store that implements the `RemoteOfflineStore` class using the existing `OfflineStore` interface.
The client implements various methods, including `get_historical_features`, `pull_latest_from_table_or_query`, `write_logged_features`, and `offline_write_batch`.

## How to configure the client

User needs to create client side `feature_store.yaml` file and set the `offline_store` type `remote` and provide the server connection configuration
including adding the host and specifying the port (default is 8815) required by the Arrow Flight client to connect with the Arrow Flight server.

{% code title="feature_store.yaml" %}
```yaml
offline_store:
type: remote
host: localhost
port: 8815
```
{% endcode %}

## Client Example

The complete example can be find under [remote-offline-store-example](../../../examples/remote-offline-store)

## How to configure the server

Please see the detail how to configure offline feature server [offline-feature-server.md](../feature-servers/offline-feature-server.md)
98 changes: 98 additions & 0 deletions examples/remote-offline-store/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Feast Remote Offline Store Server

This example demonstrates the steps using an [Arrow Flight](https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/) server/client as the remote Feast offline store.

## Launch the offline server locally

1. **Create Feast Project**: Using the `feast init` command for example the [offline_server](./offline_server) folder contains a sample Feast repository.

2. **Start Remote Offline Server**: Use the `feast server_offline` command to start remote offline requests. This command will:
- Spin up an `Arrow Flight` server at the default port 8815.

3. **Initialize Offline Server**: The offline server can be initialized by providing the `feature_store.yml` file via an environment variable named `FEATURE_STORE_YAML_BASE64`. A temporary directory will be created with the provided YAML file named `feature_store.yml`.

Example

```console
cd offline_server
feast -c feature_repo apply
```

```console
feast -c feature_repo serve_offline
```

Sample output:
```console
Serving on grpc+tcp://127.0.0.1:8815
```

## Launch a remote offline client

The [offline_client](./offline_client) folder includes a test python function that uses an offline store of type `remote`, leveraging the remote server as the
actual data provider.


The test class is located under [offline_client](./offline_client/) and uses a remote configuration of the offline store to delegate the actual
implementation to the offline store server:
```yaml
offline_store:
type: remote
host: localhost
port: 8815
```

The test code in [test.py](./offline_client/test.py) initializes the store from the local configuration and then fetches the historical features
from the store like any other Feast client, but the actual implementation is delegated to the offline server
```py
store = FeatureStore(repo_path=".")
training_df = store.get_historical_features(entity_df, features).to_df()
```


Run client
`cd offline_client;
python test.py`

Sample output:

```console
config.offline_store is <class 'feast.infra.offline_stores.remote.RemoteOfflineStoreConfig'>
----- Feature schema -----

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 10 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 driver_id 3 non-null int64
1 event_timestamp 3 non-null datetime64[ns, UTC]
2 label_driver_reported_satisfaction 3 non-null int64
3 val_to_add 3 non-null int64
4 val_to_add_2 3 non-null int64
5 conv_rate 3 non-null float32
6 acc_rate 3 non-null float32
7 avg_daily_trips 3 non-null int32
8 conv_rate_plus_val1 3 non-null float64
9 conv_rate_plus_val2 3 non-null float64
dtypes: datetime64[ns, UTC](1), float32(2), float64(2), int32(1), int64(4)
memory usage: 332.0 bytes
None

----- Features -----

driver_id event_timestamp label_driver_reported_satisfaction ... avg_daily_trips conv_rate_plus_val1 conv_rate_plus_val2
0 1001 2021-04-12 10:59:42+00:00 1 ... 590 1.022378 10.022378
1 1002 2021-04-12 08:12:10+00:00 5 ... 974 2.762213 20.762213
2 1003 2021-04-12 16:40:26+00:00 3 ... 127 3.419828 30.419828

[3 rows x 10 columns]
------training_df----
driver_id event_timestamp label_driver_reported_satisfaction ... avg_daily_trips conv_rate_plus_val1 conv_rate_plus_val2
0 1001 2021-04-12 10:59:42+00:00 1 ... 590 1.022378 10.022378
1 1002 2021-04-12 08:12:10+00:00 5 ... 974 2.762213 20.762213
2 1003 2021-04-12 16:40:26+00:00 3 ... 127 3.419828 30.419828

[3 rows x 10 columns]
```

Empty file.
10 changes: 10 additions & 0 deletions examples/remote-offline-store/offline_client/feature_store.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
project: offline_server
# By default, the registry is a file (but can be turned into a more scalable SQL-backed registry)
registry: ../offline_server/feature_repo/data/registry.db
# The provider primarily specifies default offline / online stores & storing the registry in a given cloud
provider: local
offline_store:
type: remote
host: localhost
port: 8815
entity_key_serialization_version: 2
40 changes: 40 additions & 0 deletions examples/remote-offline-store/offline_client/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from datetime import datetime
from feast import FeatureStore
import pandas as pd

entity_df = pd.DataFrame.from_dict(
{
"driver_id": [1001, 1002, 1003],
"event_timestamp": [
datetime(2021, 4, 12, 10, 59, 42),
datetime(2021, 4, 12, 8, 12, 10),
datetime(2021, 4, 12, 16, 40, 26),
],
"label_driver_reported_satisfaction": [1, 5, 3],
"val_to_add": [1, 2, 3],
"val_to_add_2": [10, 20, 30],
}
)

features = [
"driver_hourly_stats:conv_rate",
"driver_hourly_stats:acc_rate",
"driver_hourly_stats:avg_daily_trips",
"transformed_conv_rate:conv_rate_plus_val1",
"transformed_conv_rate:conv_rate_plus_val2",
]

store = FeatureStore(repo_path=".")

training_df = store.get_historical_features(entity_df, features).to_df()

print("----- Feature schema -----\n")
print(training_df.info())

print()
print("----- Features -----\n")
print(training_df.head())

print("------training_df----")

print(training_df)
Empty file.
Empty file.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# This is an example feature definition file

from datetime import timedelta

import pandas as pd
import os

from feast import (
Entity,
FeatureService,
FeatureView,
Field,
FileSource,
PushSource,
RequestSource,
)
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64, Int64

# Define an entity for the driver. You can think of an entity as a primary key used to
# fetch features.
driver = Entity(name="driver", join_keys=["driver_id"])

# Read data from parquet files. Parquet is convenient for local development mode. For
# production, you can use your favorite DWH, such as BigQuery. See Feast documentation
# for more info.
driver_stats_source = FileSource(
name="driver_hourly_stats_source",
path=f"{os.path.dirname(os.path.abspath(__file__))}/data/driver_stats.parquet",
timestamp_field="event_timestamp",
created_timestamp_column="created",
)

# Our parquet files contain sample data that includes a driver_id column, timestamps and
# three feature column. Here we define a Feature View that will allow us to serve this
# data to our model online.
driver_stats_fv = FeatureView(
# The unique name of this feature view. Two feature views in a single
# project cannot have the same name
name="driver_hourly_stats",
entities=[driver],
ttl=timedelta(days=1),
# The list of features defined below act as a schema to both define features
# for both materialization of features into a store, and are used as references
# during retrieval for building a training dataset or serving features
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64, description="Average daily trips"),
],
online=True,
source=driver_stats_source,
# Tags are user defined key/value pairs that are attached to each
# feature view
tags={"team": "driver_performance"},
)

# Define a request data source which encodes features / information only
# available at request time (e.g. part of the user initiated HTTP request)
input_request = RequestSource(
name="vals_to_add",
schema=[
Field(name="val_to_add", dtype=Int64),
Field(name="val_to_add_2", dtype=Int64),
],
)


# Define an on demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
sources=[driver_stats_fv, input_request],
schema=[
Field(name="conv_rate_plus_val1", dtype=Float64),
Field(name="conv_rate_plus_val2", dtype=Float64),
],
)
def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
df = pd.DataFrame()
df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
return df


# This groups features into a model version
driver_activity_v1 = FeatureService(
name="driver_activity_v1",
features=[
driver_stats_fv[["conv_rate"]], # Sub-selects a feature from a feature view
transformed_conv_rate, # Selects all features from the feature view
],
)
driver_activity_v2 = FeatureService(
name="driver_activity_v2", features=[driver_stats_fv, transformed_conv_rate]
)

# Defines a way to push data (to be available offline, online or both) into Feast.
driver_stats_push_source = PushSource(
name="driver_stats_push_source",
batch_source=driver_stats_source,
)

# Defines a slightly modified version of the feature view from above, where the source
# has been changed to the push source. This allows fresh features to be directly pushed
# to the online store for this feature view.
driver_stats_fresh_fv = FeatureView(
name="driver_hourly_stats_fresh",
entities=[driver],
ttl=timedelta(days=1),
schema=[
Field(name="conv_rate", dtype=Float32),
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int64),
],
online=True,
source=driver_stats_push_source, # Changed from above
tags={"team": "driver_performance"},
)


# Define an on demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
sources=[driver_stats_fresh_fv, input_request], # relies on fresh version of FV
schema=[
Field(name="conv_rate_plus_val1", dtype=Float64),
Field(name="conv_rate_plus_val2", dtype=Float64),
],
)
def transformed_conv_rate_fresh(inputs: pd.DataFrame) -> pd.DataFrame:
df = pd.DataFrame()
df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
return df


driver_activity_v3 = FeatureService(
name="driver_activity_v3",
features=[driver_stats_fresh_fv, transformed_conv_rate_fresh],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
project: offline_server
# By default, the registry is a file (but can be turned into a more scalable SQL-backed registry)
registry: data/registry.db
# The provider primarily specifies default offline / online stores & storing the registry in a given cloud
provider: local
online_store:
type: sqlite
path: data/online_store.db
entity_key_serialization_version: 2
Loading

0 comments on commit 28a3d24

Please sign in to comment.