From 595698105637aaeb952fddc2957c83e501964d2a Mon Sep 17 00:00:00 2001 From: Youngkyu OH Date: Mon, 7 Nov 2022 06:42:22 +0900 Subject: [PATCH] fix: Updated AWS Athena template (#3322) * Update the template on how to use AWS Athena Signed-off-by: Youngkyu OH * Remove unnecessary imports Signed-off-by: Youngkyu OH * lint & format Signed-off-by: Youngkyu OH Signed-off-by: Youngkyu OH --- .../athena/feature_repo/feature_store.yaml | 7 ++-- .../athena/feature_repo/test_workflow.py | 35 ++++++++++++------- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/sdk/python/feast/templates/athena/feature_repo/feature_store.yaml b/sdk/python/feast/templates/athena/feature_repo/feature_store.yaml index 13e7898e86..bd12e906d1 100644 --- a/sdk/python/feast/templates/athena/feature_repo/feature_store.yaml +++ b/sdk/python/feast/templates/athena/feature_repo/feature_store.yaml @@ -6,8 +6,9 @@ online_store: path: online_store.db offline_store: type: athena - region: ap-northeast-2 - database: sampledb + region: {AWS region} + database: {The database in the data catalog to be used by Athena} data_source: AwsDataCatalog - s3_staging_location: s3://sagemaker-yelo-test + s3_staging_location: s3://{S3 bucket to be used by Feast} + workgroup: {Workgroup for Athena} entity_key_serialization_version: 2 \ No newline at end of file diff --git a/sdk/python/feast/templates/athena/feature_repo/test_workflow.py b/sdk/python/feast/templates/athena/feature_repo/test_workflow.py index 7d7daff865..bf69a4bff0 100644 --- a/sdk/python/feast/templates/athena/feature_repo/test_workflow.py +++ b/sdk/python/feast/templates/athena/feature_repo/test_workflow.py @@ -3,16 +3,28 @@ import pandas as pd -from feast import Entity, Feature, FeatureStore, FeatureView, ValueType +from feast import Entity, FeatureStore, FeatureView, Field from feast.infra.offline_stores.contrib.athena_offline_store.athena_source import ( AthenaSource, ) +from feast.types import Float64, Int64 def test_end_to_end(): try: - fs 
= FeatureStore(".") + + # Before running this test method: + # 1. Upload the driver_stats.parquet file to your S3 bucket. + # (https://github.com/feast-dev/feast-custom-offline-store-demo/tree/main/feature_repo/data) + # 2. Using AWS Glue Crawler, create a table in the data catalog. The generated table can be queried through Athena. + # 3. Specify the S3 bucket name, data source (AwsDataCatalog), database name, Athena's workgroup, etc. in feature_store.yaml. + + fs = FeatureStore("./feature_repo") + + # Partition pruning has a significant impact on Athena's query performance and cost. + # If the offline feature dataset is large, it is highly recommended to create partitions using date columns such as 'created' or 'event_timestamp'. + # The date_partition_column must be a string in YYYY-MM-DD form, matching the leading date portion of the date column. driver_hourly_stats = AthenaSource( timestamp_field="event_timestamp", @@ -21,31 +33,29 @@ def test_end_to_end(): database="sampledb", data_source="AwsDataCatalog", created_timestamp_column="created", - # date_partition_column="std_date" + # date_partition_column="std_date" #YYYY-MM-DD ) driver = Entity( name="driver_id", - value_type=ValueType.INT64, description="driver id", ) driver_hourly_stats_view = FeatureView( name="driver_hourly_stats", - entities=["driver_id"], - ttl=timedelta(days=365), - features=[ - Feature(name="conv_rate", dtype=ValueType.FLOAT), - Feature(name="acc_rate", dtype=ValueType.FLOAT), - Feature(name="avg_daily_trips", dtype=ValueType.INT64), + entities=[driver], + ttl=timedelta(days=500), + schema=[ + Field(name="conv_rate", dtype=Float64), + Field(name="acc_rate", dtype=Float64), + Field(name="avg_daily_trips", dtype=Int64), ], online=True, - batch_source=driver_hourly_stats, + source=driver_hourly_stats, ) # apply repository fs.apply([driver_hourly_stats, driver, driver_hourly_stats_view]) - print(fs.list_data_sources()) print(fs.list_feature_views()) @@ -54,7 +64,6 @@ def test_end_to_end(): ) # Read features 
from offline store - feature_vector = ( fs.get_historical_features( features=["driver_hourly_stats:conv_rate"], entity_df=entity_df