feat: Feast AWS Athena offline store (again) #3044

Merged
merged 11 commits on Aug 10, 2022
27 changes: 27 additions & 0 deletions Makefile
@@ -139,6 +139,33 @@ test-python-universal-trino:
not test_universal_types" \
sdk/python/tests

# To use Athena as an offline store, create an Athena database and an S3 bucket on AWS: https://docs.aws.amazon.com/athena/latest/ug/getting-started.html
# Modify the ATHENA_DATA_SOURCE, ATHENA_DATABASE, and ATHENA_S3_BUCKET_NAME variables below to change the Athena data source, database, or S3 bucket the tests run against.
# If tests fail with the pytest -n 8 option, reduce the number of workers to 1.
test-python-universal-athena:
PYTHONPATH='.' \
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.athena_repo_configuration \
PYTEST_PLUGINS=feast.infra.offline_stores.contrib.athena_offline_store.tests \
FEAST_USAGE=False IS_TEST=True \
ATHENA_DATA_SOURCE=AwsDataCatalog \
ATHENA_DATABASE=default \
ATHENA_S3_BUCKET_NAME=feast-integration-tests \
python -m pytest -n 8 --integration \
-k "not test_go_feature_server and \
not test_logged_features_validation and \
not test_lambda and \
not test_feature_logging and \
not test_offline_write and \
not test_push_offline and \
not test_historical_retrieval_with_validation and \
not test_historical_features_persisting and \
not test_historical_retrieval_fails_on_validation and \
not gcs_registry and \
not s3_registry" \
sdk/python/tests
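Once the Athena database and S3 bucket exist, the suite is run through the new target:

make test-python-universal-athena

Worker count (pytest -n) and the excluded test set are controlled inside the recipe above.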



test-python-universal-postgres:
PYTHONPATH='.' \
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.postgres_repo_configuration \
18 changes: 18 additions & 0 deletions protos/feast/core/DataSource.proto
@@ -49,6 +49,7 @@ message DataSource {
PUSH_SOURCE = 9;
BATCH_TRINO = 10;
BATCH_SPARK = 11;
BATCH_ATHENA = 12;
}

// Unique name of data source within the project
@@ -171,6 +172,22 @@ message DataSource {
string database = 4;
}

// Defines options for DataSource that sources features from an Athena query
message AthenaOptions {
// Athena table name
string table = 1;

// SQL query that returns a table containing feature data. Must contain an event_timestamp
// column and the relevant entity columns
string query = 2;

// Athena database name
string database = 3;

// Athena data source (catalog) name, e.g. AwsDataCatalog
string data_source = 4;
}
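On the Python side these options surface through the new AthenaSource (re-exported in sdk/python/feast/__init__.py below). A minimal sketch of a query-based source, assuming the constructor parameters mirror the proto fields here; the query, database, and column names are placeholders:

from feast import AthenaSource

# Hypothetical query-based source; the query must return the timestamp
# and entity columns used for point-in-time joins.
driver_hourly_stats = AthenaSource(
    name="driver_hourly_stats",
    query="SELECT event_timestamp, driver_id, conv_rate FROM driver_stats",
    database="default",
    data_source="AwsDataCatalog",
    timestamp_field="event_timestamp",
)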

// Defines options for DataSource that sources features from a Snowflake Query
message SnowflakeOptions {
// Snowflake table name
@@ -242,5 +259,6 @@ message DataSource {
PushOptions push_options = 22;
SparkOptions spark_options = 27;
TrinoOptions trino_options = 30;
AthenaOptions athena_options = 35;
}
}
6 changes: 6 additions & 0 deletions protos/feast/core/FeatureService.proto
@@ -60,6 +60,7 @@ message LoggingConfig {
RedshiftDestination redshift_destination = 5;
SnowflakeDestination snowflake_destination = 6;
CustomDestination custom_destination = 7;
AthenaDestination athena_destination = 8;
}

message FileDestination {
@@ -80,6 +81,11 @@ message LoggingConfig {
string table_name = 1;
}

message AthenaDestination {
// Destination table name. The data_source and database will be taken from the offline store config
string table_name = 1;
}
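In the SDK this would be wired through a FeatureService's logging config; a sketch assuming the contrib package exposes an AthenaLoggingDestination analogous to the Redshift and Snowflake ones (that class name and import path are assumptions, not confirmed by this diff):

from feast import FeatureService
from feast.feature_logging import LoggingConfig
from feast.infra.offline_stores.contrib.athena_offline_store.athena_source import (
    AthenaLoggingDestination,  # assumed companion class; not shown in this diff
)

# Served features get logged to an Athena table; data_source and database
# come from the offline store config, per the comment above.
driver_service = FeatureService(
    name="driver_service",
    features=[driver_hourly_stats_view],  # a FeatureView defined elsewhere
    logging_config=LoggingConfig(
        destination=AthenaLoggingDestination(table_name="served_features_log")
    ),
)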

message SnowflakeDestination {
// Destination table name. Schema and database will be taken from an offline store config
string table_name = 1;
1 change: 1 addition & 0 deletions protos/feast/core/SavedDataset.proto
@@ -59,6 +59,7 @@ message SavedDatasetStorage {
DataSource.TrinoOptions trino_storage = 8;
DataSource.SparkOptions spark_storage = 9;
DataSource.CustomSourceOptions custom_storage = 10;
DataSource.AthenaOptions athena_storage = 11;
}
}
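This lets historical retrieval results be persisted back to Athena as saved datasets; a sketch assuming a SavedDatasetAthenaStorage wrapper mirroring the other storage classes (the class name and table_ref parameter are assumptions):

from feast.infra.offline_stores.contrib.athena_offline_store.athena_source import (
    SavedDatasetAthenaStorage,  # assumed companion class; not shown in this diff
)

# store is a FeatureStore; job is a RetrievalJob from get_historical_features().
dataset = store.create_saved_dataset(
    from_=job,
    name="driver_training_set",
    storage=SavedDatasetAthenaStorage(table_ref="driver_training_set"),
)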

4 changes: 4 additions & 0 deletions sdk/python/feast/__init__.py
@@ -5,6 +5,9 @@
from importlib_metadata import PackageNotFoundError, version as _version # type: ignore

from feast.infra.offline_stores.bigquery_source import BigQuerySource
from feast.infra.offline_stores.contrib.athena_offline_store.athena_source import (
AthenaSource,
)
from feast.infra.offline_stores.file_source import FileSource
from feast.infra.offline_stores.redshift_source import RedshiftSource
from feast.infra.offline_stores.snowflake_source import SnowflakeSource
@@ -50,4 +53,5 @@
"SnowflakeSource",
"PushSource",
"RequestSource",
"AthenaSource",
]
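With the contrib import re-exported here, user code can pull the source from the package root:

from feast import AthenaSource

rather than spelling out the full feast.infra.offline_stores.contrib path.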
1 change: 1 addition & 0 deletions sdk/python/feast/batch_feature_view.py
@@ -14,6 +14,7 @@
"SnowflakeSource",
"SparkSource",
"TrinoSource",
"AthenaSource",
}
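Adding "AthenaSource" to this set allows batch feature views to be backed by Athena; a minimal sketch, assuming BatchFeatureView is importable from the package root and accepts the same core arguments as FeatureView (entity name and schema are placeholders):

from datetime import timedelta

from feast import BatchFeatureView, Field
from feast.types import Float32

driver_stats_bfv = BatchFeatureView(
    name="driver_hourly_stats",
    entities=["driver"],  # assumes a "driver" entity registered elsewhere
    ttl=timedelta(days=1),
    schema=[Field(name="conv_rate", dtype=Float32)],
    source=driver_hourly_stats,  # the AthenaSource sketched earlier
)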


9 changes: 9 additions & 0 deletions sdk/python/feast/data_source.py
@@ -156,6 +156,7 @@ def to_proto(self) -> DataSourceProto.KinesisOptions:
DataSourceProto.SourceType.BATCH_SNOWFLAKE: "feast.infra.offline_stores.snowflake_source.SnowflakeSource",
DataSourceProto.SourceType.BATCH_TRINO: "feast.infra.offline_stores.contrib.trino_offline_store.trino_source.TrinoSource",
DataSourceProto.SourceType.BATCH_SPARK: "feast.infra.offline_stores.contrib.spark_offline_store.spark_source.SparkSource",
DataSourceProto.SourceType.BATCH_ATHENA: "feast.infra.offline_stores.contrib.athena_offline_store.athena_source.AthenaSource",
DataSourceProto.SourceType.STREAM_KAFKA: "feast.data_source.KafkaSource",
DataSourceProto.SourceType.STREAM_KINESIS: "feast.data_source.KinesisSource",
DataSourceProto.SourceType.REQUEST_SOURCE: "feast.data_source.RequestSource",
@@ -183,6 +184,7 @@ class DataSource(ABC):
maintainer.
timestamp_field (optional): Event timestamp field used for point in time
joins of feature values.
date_partition_column (optional): Timestamp column used for partitioning. Not supported by all offline stores.
"""

name: str
@@ -192,6 +194,7 @@ class DataSource(ABC):
description: str
tags: Dict[str, str]
owner: str
date_partition_column: str

def __init__(
self,
@@ -203,6 +206,7 @@ def __init__(
description: Optional[str] = "",
tags: Optional[Dict[str, str]] = None,
owner: Optional[str] = "",
date_partition_column: Optional[str] = None,
):
"""
Creates a DataSource object.
@@ -220,6 +224,7 @@ def __init__(
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): The owner of the data source, typically the email of the primary
maintainer.
date_partition_column (optional): Timestamp column used for partitioning. Not supported by all offline stores.
"""
self.name = name
self.timestamp_field = timestamp_field or ""
@@ -237,6 +242,9 @@ def __init__(
self.description = description or ""
self.tags = tags or {}
self.owner = owner or ""
self.date_partition_column = date_partition_column or ""

def __hash__(self):
return hash((self.name, self.timestamp_field))
@@ -256,6 +264,7 @@ def __eq__(self, other):
or self.timestamp_field != other.timestamp_field
or self.created_timestamp_column != other.created_timestamp_column
or self.field_mapping != other.field_mapping
or self.date_partition_column != other.date_partition_column
or self.description != other.description
or self.tags != other.tags
or self.owner != other.owner
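Since date_partition_column now participates in DataSource equality and serialization, a partitioned source can declare its partition column so an offline store that honors it can prune scans. A hypothetical table-backed Athena source with an assumed Hive-style "ds" partition:

from feast import AthenaSource

# "ds" is a hypothetical partition column, e.g. ds=2022-08-10 prefixes in S3.
driver_stats_table = AthenaSource(
    timestamp_field="event_timestamp",
    table="driver_stats",
    database="default",
    data_source="AwsDataCatalog",
    date_partition_column="ds",
)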