Skip to content

Commit

Permalink
feat: Add tag kwarg to set Snowflake online store table path (#3176)
Browse files Browse the repository at this point in the history
Signed-off-by: Miles Adkins <miles.adkins@snowflake.com>

Signed-off-by: Miles Adkins <miles.adkins@snowflake.com>
  • Loading branch information
sfc-gh-madkins committed Sep 12, 2022
1 parent dfdd0ca commit 39aeea3
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 17 deletions.
16 changes: 14 additions & 2 deletions docs/reference/online-stores/snowflake.md
Expand Up @@ -17,7 +17,6 @@ The data model for using a Snowflake Transient Table as an online store follows
(This model may be subject to change when Snowflake Hybrid Tables are released)

## Example

{% code title="feature_store.yaml" %}
```yaml
project: my_feature_repo
Expand All @@ -34,14 +33,27 @@ online_store:
```
{% endcode %}

## Tags KWARGs Actions:

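"snowflake-online-store/online_path": Adding the "snowflake-online-store/online_path" key to a FeatureView's `tags` parameter lets you choose the path of the online serving table (ex. "{database}"."{schema}"). When the tag is not set, the database and schema from the online store configuration are used.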

{% code title="example_config.py" %}
```python
driver_stats_fv = FeatureView(
...
tags={"snowflake-online-store/online_path": '"FEAST"."ONLINE"'},
)
```
{% endcode %}

The full set of configuration options is available in [SnowflakeOnlineStoreConfig](https://rtd.feast.dev/en/latest/#feast.infra.online_stores.snowflake.SnowflakeOnlineStoreConfig).

## Functionality Matrix

The set of functionality supported by online stores is described in detail [here](overview.md#functionality).
Below is a matrix indicating which functionality is supported by the Snowflake online store.

| | Snowflake |
| | Snowflake |
| :-------------------------------------------------------- | :-- |
| write feature values to the online store | yes |
| read feature values from the online store | yes |
Expand Down
10 changes: 7 additions & 3 deletions sdk/python/feast/infra/materialization/snowflake_engine.py
Expand Up @@ -28,6 +28,7 @@
assert_snowflake_feature_names,
execute_snowflake_statement,
get_snowflake_conn,
get_snowflake_online_store_path,
package_snowpark_zip,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
Expand Down Expand Up @@ -370,8 +371,6 @@ def materialize_to_snowflake_online_store(
) -> None:
assert_snowflake_feature_names(feature_view)

online_table = f"""{repo_config .online_store.database}"."{repo_config.online_store.schema_}"."[online-transient] {project}_{feature_view.name}"""

feature_names_str = '", "'.join(
[feature.name for feature in feature_view.features]
)
Expand All @@ -381,8 +380,13 @@ def materialize_to_snowflake_online_store(
else:
fv_created_str = None

online_path = get_snowflake_online_store_path(repo_config, feature_view)
online_table = (
f'{online_path}."[online-transient] {project}_{feature_view.name}"'
)

query = f"""
MERGE INTO "{online_table}" online_table
MERGE INTO {online_table} online_table
USING (
SELECT
"entity_key" || TO_BINARY("feature_name", 'UTF-8') AS "entity_feature_key",
Expand Down
24 changes: 12 additions & 12 deletions sdk/python/feast/infra/online_stores/snowflake.py
Expand Up @@ -15,6 +15,7 @@
from feast.infra.utils.snowflake.snowflake_utils import (
execute_snowflake_statement,
get_snowflake_conn,
get_snowflake_online_store_path,
write_pandas_binary,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
Expand Down Expand Up @@ -112,9 +113,7 @@ def online_write_batch(
agg_df = pd.concat(dfs)

# This combines both the data upload plus the overwrite in the same transaction
table_path = (
f'"{config.online_store.database}"."{config.online_store.schema_}"'
)
online_path = get_snowflake_online_store_path(config, table)
with get_snowflake_conn(config.online_store, autocommit=False) as conn:
write_pandas_binary(
conn,
Expand All @@ -125,7 +124,7 @@ def online_write_batch(
) # special function for writing binary to snowflake

query = f"""
INSERT OVERWRITE INTO {table_path}."[online-transient] {config.project}_{table.name}"
INSERT OVERWRITE INTO {online_path}."[online-transient] {config.project}_{table.name}"
SELECT
"entity_feature_key",
"entity_key",
Expand All @@ -138,7 +137,7 @@ def online_write_batch(
*,
ROW_NUMBER() OVER(PARTITION BY "entity_key","feature_name" ORDER BY "event_ts" DESC, "created_ts" DESC) AS "_feast_row"
FROM
{table_path}."[online-transient] {config.project}_{table.name}")
{online_path}."[online-transient] {config.project}_{table.name}")
WHERE
"_feast_row" = 1;
"""
Expand Down Expand Up @@ -178,13 +177,13 @@ def online_read(
]
)

table_path = f'"{config.online_store.database}"."{config.online_store.schema_}"'
online_path = get_snowflake_online_store_path(config, table)
with get_snowflake_conn(config.online_store) as conn:
query = f"""
SELECT
"entity_key", "feature_name", "value", "event_ts"
FROM
{table_path}."[online-transient] {config.project}_{table.name}"
{online_path}."[online-transient] {config.project}_{table.name}"
WHERE
"entity_feature_key" IN ({entity_fetch_str})
"""
Expand Down Expand Up @@ -221,11 +220,11 @@ def update(
):
assert isinstance(config.online_store, SnowflakeOnlineStoreConfig)

table_path = f'"{config.online_store.database}"."{config.online_store.schema_}"'
with get_snowflake_conn(config.online_store) as conn:
for table in tables_to_keep:
online_path = get_snowflake_online_store_path(config, table)
query = f"""
CREATE TRANSIENT TABLE IF NOT EXISTS {table_path}."[online-transient] {config.project}_{table.name}" (
CREATE TRANSIENT TABLE IF NOT EXISTS {online_path}."[online-transient] {config.project}_{table.name}" (
"entity_feature_key" BINARY,
"entity_key" BINARY,
"feature_name" VARCHAR,
Expand All @@ -237,7 +236,8 @@ def update(
execute_snowflake_statement(conn, query)

for table in tables_to_delete:
query = f'DROP TABLE IF EXISTS {table_path}."[online-transient] {config.project}_{table.name}"'
online_path = get_snowflake_online_store_path(config, table)
query = f'DROP TABLE IF EXISTS {online_path}."[online-transient] {config.project}_{table.name}"'
execute_snowflake_statement(conn, query)

def teardown(
Expand All @@ -248,8 +248,8 @@ def teardown(
):
assert isinstance(config.online_store, SnowflakeOnlineStoreConfig)

table_path = f'"{config.online_store.database}"."{config.online_store.schema_}"'
with get_snowflake_conn(config.online_store) as conn:
for table in tables:
query = f'DROP TABLE IF EXISTS {table_path}."[online-transient] {config.project}_{table.name}"'
online_path = get_snowflake_online_store_path(config, table)
query = f'DROP TABLE IF EXISTS {online_path}."[online-transient] {config.project}_{table.name}"'
execute_snowflake_statement(conn, query)
16 changes: 16 additions & 0 deletions sdk/python/feast/infra/utils/snowflake/snowflake_utils.py
Expand Up @@ -22,6 +22,7 @@
import feast
from feast.errors import SnowflakeIncompleteConfig, SnowflakeQueryUnknownError
from feast.feature_view import FeatureView
from feast.repo_config import RepoConfig

try:
import snowflake.connector
Expand Down Expand Up @@ -104,6 +105,21 @@ def get_snowflake_conn(config, autocommit=True) -> SnowflakeConnection:
raise SnowflakeIncompleteConfig(e)


def get_snowflake_online_store_path(
    config: RepoConfig,
    feature_view: FeatureView,
) -> str:
    """Return the ``"database"."schema"`` prefix for a feature view's online table.

    The path is taken from the feature view's
    ``snowflake-online-store/online_path`` tag when that tag carries a
    value; otherwise it is built from the online store configuration.

    Args:
        config: Repo configuration; ``config.online_store.database`` and
            ``config.online_store.schema_`` supply the default path.
        feature_view: Feature view whose ``tags`` may override the path.

    Returns:
        The path string that callers place directly in front of
        ``."[online-transient] ..."`` when building SQL. A tag value is
        returned verbatim, so it must already be quoted the way it should
        appear in the query (e.g. ``'"FEAST"."ONLINE"'``).
    """
    path_tag = "snowflake-online-store/online_path"

    # An empty tag value would yield a table reference that starts with a
    # bare ".", so treat it the same as a missing tag and use the default.
    tagged_path = feature_view.tags.get(path_tag)
    if tagged_path:
        return tagged_path

    return f'"{config.online_store.database}"."{config.online_store.schema_}"'


def package_snowpark_zip(project_name) -> Tuple[str, str]:
path = os.path.dirname(feast.__file__)
copy_path = path + f"/snowflake_feast_{project_name}"
Expand Down

0 comments on commit 39aeea3

Please sign in to comment.