diff --git a/docs/reference/batch-materialization/spark.md b/docs/reference/batch-materialization/spark.md
index ce58c0ad0fd..27a1388c48e 100644
--- a/docs/reference/batch-materialization/spark.md
+++ b/docs/reference/batch-materialization/spark.md
@@ -19,3 +19,36 @@ batch_engine:
   partitions: [optional num partitions to use to write to online store]
 ```
 {% endcode %}
+
+## Example in Python
+
+{% code title="feature_store.py" %}
+```python
+from feast import FeatureStore, RepoConfig
+from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig
+from feast.infra.offline_stores.contrib.spark_offline_store.spark import SparkOfflineStoreConfig
+
+repo_config = RepoConfig(
+    registry="s3://[YOUR_BUCKET]/feast-registry.db",
+    project="feast_repo",
+    provider="aws",
+    offline_store=SparkOfflineStoreConfig(
+        spark_conf={
+            "spark.ui.enabled": "false",
+            "spark.eventLog.enabled": "false",
+            "spark.sql.catalogImplementation": "hive",
+            "spark.sql.parser.quotedRegexColumnNames": "true",
+            "spark.sql.session.timeZone": "UTC"
+        }
+    ),
+    batch_engine={
+        "type": "spark.engine",
+        "partitions": 10
+    },
+    online_store=DynamoDBOnlineStoreConfig(region="us-west-1"),
+    entity_key_serialization_version=2
+)
+
+store = FeatureStore(config=repo_config)
+```
+{% endcode %}
\ No newline at end of file
diff --git a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
index 66eb97bca78..00f2c950a28 100644
--- a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
+++ b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py
@@ -1,4 +1,3 @@
-import tempfile
 from dataclasses import dataclass
 from datetime import datetime
 from typing import Callable, List, Literal, Optional, Sequence, Union
@@ -196,7 +195,7 @@ class _SparkSerializedArtifacts:
     """Class to assist with serializing unpicklable artifacts to the spark workers"""
 
-    feature_view_proto: str
-    repo_config_file: str
+    feature_view_proto: bytes
+    repo_config_byte: bytes
 
     @classmethod
     def serialize(cls, feature_view, repo_config):
@@ -205,12 +204,10 @@ def serialize(cls, feature_view, repo_config):
         feature_view_proto = feature_view.to_proto().SerializeToString()
 
-        # serialize repo_config to disk. Will be used to instantiate the online store
-        repo_config_file = tempfile.NamedTemporaryFile(delete=False).name
-        with open(repo_config_file, "wb") as f:
-            dill.dump(repo_config, f)
+        # serialize repo_config to bytes. Will be used to instantiate the online store
+        repo_config_byte = dill.dumps(repo_config)
 
         return _SparkSerializedArtifacts(
-            feature_view_proto=feature_view_proto, repo_config_file=repo_config_file
+            feature_view_proto=feature_view_proto, repo_config_byte=repo_config_byte
         )
 
     def unserialize(self):
@@ -220,8 +217,7 @@ def unserialize(self):
         feature_view = FeatureView.from_proto(proto)
 
         # load
-        with open(self.repo_config_file, "rb") as f:
-            repo_config = dill.load(f)
+        repo_config = dill.loads(self.repo_config_byte)
 
         provider = PassthroughProvider(repo_config)
         online_store = provider.online_store
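
Reviewer note: the second file's change replaces a driver-local temp file with an in-memory `dill` payload. Below is a minimal, self-contained sketch of that round-trip, not Feast's actual code: `ToyConfig` and `Artifacts` are hypothetical stand-ins for `RepoConfig` and `_SparkSerializedArtifacts`, and `dill` is assumed to be installed.

```python
from dataclasses import dataclass

import dill


@dataclass
class ToyConfig:
    """Hypothetical stand-in for feast.RepoConfig."""

    project: str
    provider: str


@dataclass
class Artifacts:
    """Hypothetical stand-in for _SparkSerializedArtifacts."""

    # Bytes travel inside the pickled task closure Spark ships to executors;
    # a tempfile path would exist only on the driver's local disk.
    config_bytes: bytes

    @classmethod
    def serialize(cls, config: ToyConfig) -> "Artifacts":
        # Keep the payload in memory instead of writing it to a file.
        return cls(config_bytes=dill.dumps(config))

    def unserialize(self) -> ToyConfig:
        # Rebuild the object from the byte payload alone.
        return dill.loads(self.config_bytes)


artifacts = Artifacts.serialize(ToyConfig(project="feast_repo", provider="aws"))
assert artifacts.unserialize().project == "feast_repo"
```

Because `dill.loads` reconstructs the object with no filesystem dependency, the artifact can be deserialized wherever it is shipped; that appears to be the motivation for dropping `tempfile` here, since a `NamedTemporaryFile` created on the driver is generally not readable from remote Spark executors.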