From 9d882ced781f6a88015e0f3bd09b90ad8df7e3bb Mon Sep 17 00:00:00 2001 From: Dan Herrera Date: Mon, 11 Jul 2022 13:07:19 -0700 Subject: [PATCH 1/4] feat: Initial Bytewax materialization engine Signed-off-by: Dan Herrera --- .../reference/batch-materialization/README.md | 5 + .../batch-materialization/bytewax.md | 59 +++ sdk/python/docs/index.rst | 10 +- ....infra.materialization.contrib.bytewax.rst | 29 ++ .../feast.infra.materialization.contrib.rst | 10 + sdk/python/docs/source/index.rst | 8 + .../contrib/bytewax/__init__.py | 15 + .../bytewax_materialization_dataflow.py | 90 ++++ .../bytewax/bytewax_materialization_engine.py | 397 ++++++++++++++++++ .../bytewax/bytewax_materialization_job.py | 55 +++ .../bytewax/bytewax_materialization_task.py | 10 + sdk/python/feast/repo_config.py | 1 + setup.py | 4 + 13 files changed, 692 insertions(+), 1 deletion(-) create mode 100644 docs/reference/batch-materialization/README.md create mode 100644 docs/reference/batch-materialization/bytewax.md create mode 100644 sdk/python/docs/source/feast.infra.materialization.contrib.bytewax.rst create mode 100644 sdk/python/docs/source/feast.infra.materialization.contrib.rst create mode 100644 sdk/python/feast/infra/materialization/contrib/bytewax/__init__.py create mode 100644 sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py create mode 100644 sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py create mode 100644 sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py create mode 100644 sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_task.py diff --git a/docs/reference/batch-materialization/README.md b/docs/reference/batch-materialization/README.md new file mode 100644 index 00000000000..6e8fd606110 --- /dev/null +++ b/docs/reference/batch-materialization/README.md @@ -0,0 +1,5 @@ +# Batch materialization + +Please see [Batch Materialization 
Engine](../../getting-started/architecture-and-components/batch-materialization-engine.md) for an explanation of batch materialization engines. + +{% page-ref page="bytewax.md" %} diff --git a/docs/reference/batch-materialization/bytewax.md b/docs/reference/batch-materialization/bytewax.md new file mode 100644 index 00000000000..2dc4de3c646 --- /dev/null +++ b/docs/reference/batch-materialization/bytewax.md @@ -0,0 +1,59 @@ +# Bytewax + +## Description + +The [Bytewax](https://bytewax.io) batch materialization engine provides an execution +engine for batch materializing operations (`materialize` and `materialize-incremental`). + +### Guide + +In order to use the Bytewax materialization engine, you will need a [Kubernetes](https://kubernetes.io/) cluster running version 1.22.10 or greater. + +#### Kubernetes Authentication + +The Bytewax materialization engine loads authentication and cluster information from the [kubeconfig file](https://kubernetes.io/docs/concepts/configuration/organize-cluster-access-kubeconfig/). By default, kubectl looks for a file named `config` in the `$HOME/.kube` directory. You can specify other kubeconfig files by setting the `KUBECONFIG` environment variable. + +#### Resource Authentication + +Bytewax jobs can be configured to access [Kubernetes secrets](https://kubernetes.io/docs/concepts/configuration/secret/) as environment variables to access online and offline stores during job runs. 
+ +To configure secrets, first create them using `kubectl`: + +``` shell +kubectl create secret generic -n bytewax aws-credentials --from-literal=aws-access-key-id='' --from-literal=aws-secret-access-key='' +``` + +Then configure them in the batch_engine section of `feature_store.yaml`: + +``` yaml +batch_engine: + type: bytewax + namespace: bytewax + pods: 3 + env: + - name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: aws-credentials + key: aws-access-key-id + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: aws-credentials + key: aws-secret-access-key +``` + +#### Configuration + +The Bytewax materialization engine is configured through the `feature_store.yaml` configuration file: + +``` yaml +batch_engine: + type: bytewax + namespace: bytewax + pods: 3 +``` + +The `namespace` configuration directive specifies which Kubernetes [namespace](https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/) jobs, services and configuration maps will be created in. + +The `pods` configuration directive is used to configure the number of Bytewax pods that will be started for each FeatureView that is being materialized. diff --git a/sdk/python/docs/index.rst b/sdk/python/docs/index.rst index 4d17f2b05be..b1e75332b0c 100644 --- a/sdk/python/docs/index.rst +++ b/sdk/python/docs/index.rst @@ -310,6 +310,14 @@ Local Engine (Alpha) Lambda Based Engine --------------------------- -.. autoclass:: feast.infra.materialization.lambda.lambda_engine +.. automodule:: feast.infra.materialization.lambda.lambda_engine + :members: + :noindex: + + +Bytewax Engine +--------------------------- + +.. 
automodule:: feast.infra.materialization.contrib.bytewax :members: :noindex: diff --git a/sdk/python/docs/source/feast.infra.materialization.contrib.bytewax.rst b/sdk/python/docs/source/feast.infra.materialization.contrib.bytewax.rst new file mode 100644 index 00000000000..86fbaa61515 --- /dev/null +++ b/sdk/python/docs/source/feast.infra.materialization.contrib.bytewax.rst @@ -0,0 +1,29 @@ +feast.infra.materialization.contrib.bytewax package +================================================================= + +Submodules +---------- + +feast.infra.materialization.contrib.bytewax.bytewax\_materialization\_engine +---------------------------------------------------------------------- + +.. automodule:: feast.infra.materialization.contrib.bytewax.bytewax_materialization_engine + :members: + :undoc-members: + :show-inheritance: + +feast.infra.materialization.contrib.bytewax.bytewax\_materialization\_job +---------------------------------------------------------------------- + +.. automodule:: feast.infra.materialization.contrib.bytewax.bytewax_materialization_job + :members: + :undoc-members: + :show-inheritance: + +Module contents +--------------- + +.. automodule:: feast.infra.materialization.contrib.bytewax + :members: + :undoc-members: + :show-inheritance: diff --git a/sdk/python/docs/source/feast.infra.materialization.contrib.rst b/sdk/python/docs/source/feast.infra.materialization.contrib.rst new file mode 100644 index 00000000000..f9d77006610 --- /dev/null +++ b/sdk/python/docs/source/feast.infra.materialization.contrib.rst @@ -0,0 +1,10 @@ +feast.infra.materialization.contrib package +========================================== + +Subpackages +----------- + +.. 
toctree:: + :maxdepth: 4 + + feast.infra.materialization.contrib.bytewax diff --git a/sdk/python/docs/source/index.rst b/sdk/python/docs/source/index.rst index 4d17f2b05be..acee151d3e3 100644 --- a/sdk/python/docs/source/index.rst +++ b/sdk/python/docs/source/index.rst @@ -313,3 +313,11 @@ Local Engine .. autoclass:: feast.infra.materialization.lambda.lambda_engine :members: :noindex: + + +Bytewax Engine +--------------------------- + +.. automodule:: feast.infra.materialization.contrib.bytewax + :members: + :noindex: diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/__init__.py b/sdk/python/feast/infra/materialization/contrib/bytewax/__init__.py new file mode 100644 index 00000000000..0838a4c0d59 --- /dev/null +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/__init__.py @@ -0,0 +1,15 @@ +from .bytewax_materialization_dataflow import BytewaxMaterializationDataflow +from .bytewax_materialization_engine import ( + BytewaxMaterializationEngine, + BytewaxMaterializationEngineConfig, +) +from .bytewax_materialization_job import BytewaxMaterializationJob +from .bytewax_materialization_task import BytewaxMaterializationTask + +__all__ = [ + "BytewaxMaterializationTask", + "BytewaxMaterializationJob", + "BytewaxMaterializationDataflow", + "BytewaxMaterializationEngine", + "BytewaxMaterializationEngineConfig", +] diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py new file mode 100644 index 00000000000..67916152572 --- /dev/null +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py @@ -0,0 +1,90 @@ +from typing import Any, Iterable, List + +import pyarrow as pa +import pyarrow.parquet as pq +import s3fs +from bytewax import Dataflow, cluster_main +from bytewax.inputs import AdvanceTo, Emit, ManualInputConfig, distribute +from bytewax.parse import proc_env 
+from tqdm import tqdm + +from feast import FeatureStore, FeatureView, RepoConfig +from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping + + +class BytewaxMaterializationDataflow: + def __init__( + self, + config: RepoConfig, + feature_view: FeatureView, + paths: List[str], + ): + self.config = config + self.feature_store = FeatureStore(config=config) + + self.feature_view = feature_view + self.offline_store = feature_view.source + self.paths = paths + + self._run_dataflow() + + def process_path(self, path): + fs = s3fs.S3FileSystem() + dataset = pq.ParquetDataset(path, filesystem=fs, use_legacy_dataset=False) + batches = [] + for fragment in dataset.fragments: + for batch in fragment.to_table().to_batches(): + batches.append(batch) + + return batches + + def input_builder(self, worker_index, worker_count, resume_epoch): + worker_paths = distribute(self.paths, worker_index, worker_count) + epoch = 0 + for path in worker_paths: + yield AdvanceTo(epoch) + yield Emit(path) + epoch += 1 + + return + + def output_builder(self, worker_index, worker_count): + def output_fn(epoch_batch): + _, batch = epoch_batch + + table = pa.Table.from_batches([batch]) + + if self.feature_view.batch_source.field_mapping is not None: + table = _run_pyarrow_field_mapping( + table, self.feature_view.batch_source.field_mapping + ) + + join_key_to_value_type = { + entity.name: entity.dtype.to_value_type() + for entity in self.feature_view.entity_columns + } + + rows_to_write = _convert_arrow_to_proto( + table, self.feature_view, join_key_to_value_type + ) + provider = self.feature_store._get_provider() + with tqdm(total=len(rows_to_write)) as progress: + provider.online_write_batch( + config=self.config, + table=self.feature_view, + data=rows_to_write, + progress=progress.update, + ) + + return output_fn + + def _run_dataflow(self): + flow = Dataflow() + flow.flat_map(self.process_path) + flow.capture() + cluster_main( + flow, + ManualInputConfig(self.input_builder), + 
self.output_builder, + **proc_env(), + ) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py new file mode 100644 index 00000000000..1ea7e9bddca --- /dev/null +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -0,0 +1,397 @@ +import uuid +from datetime import datetime +from typing import Callable, List, Literal, Sequence, Union + +import kubernetes +import kubernetes.client +import yaml +from kubernetes import client +from kubernetes import config as k8s_config +from kubernetes import utils +from kubernetes.utils import FailToCreateError +from pydantic import StrictStr +from tqdm import tqdm + +from feast import FeatureView, RepoConfig +from feast.batch_feature_view import BatchFeatureView +from feast.entity import Entity +from feast.infra.materialization import ( + BatchMaterializationEngine, + MaterializationJob, + MaterializationTask, +) +from feast.infra.offline_stores.offline_store import OfflineStore +from feast.infra.online_stores.online_store import OnlineStore +from feast.registry import BaseRegistry +from feast.repo_config import FeastConfigBaseModel +from feast.stream_feature_view import StreamFeatureView +from feast.utils import _get_column_names + +from .bytewax_materialization_job import BytewaxMaterializationJob + + +class BytewaxMaterializationEngineConfig(FeastConfigBaseModel): + """Batch Materialization Engine config for Bytewax""" + + type: Literal["bytewax"] = "bytewax" + """ Materialization type selector""" + + pods: int = 3 + """ (optional) The number of Kubernetes pods to create. + For each feature view to be materialized, Bytewax will create a job with the specified + number of pods and distribute the work among them. 
+ """ + + namespace: StrictStr = "default" + """ (optional) The namespace in Kubernetes to use when creating services, configuration maps and jobs. + """ + + env: List[dict] = [] + """ (optional) A list of environment variables to set in the created Kubernetes pods. + These environment variables can be used to reference Kubernetes secrets. + """ + + +class BytewaxMaterializationEngine(BatchMaterializationEngine): + def __init__( + self, + *, + repo_config: RepoConfig, + offline_store: OfflineStore, + online_store: OnlineStore, + **kwargs, + ): + super().__init__( + repo_config=repo_config, + offline_store=offline_store, + online_store=online_store, + **kwargs, + ) + self.repo_config = repo_config + self.offline_store = offline_store + self.online_store = online_store + + # TODO: Configure k8s here + k8s_config.load_kube_config() + + self.k8s_client = client.api_client.ApiClient() + self.v1 = client.CoreV1Api(self.k8s_client) + self.batch_v1 = client.BatchV1Api(self.k8s_client) + self.batch_engine_config = repo_config.batch_engine + self.namespace = self.batch_engine_config.namespace + + def update( + self, + project: str, + views_to_delete: Sequence[ + Union[BatchFeatureView, StreamFeatureView, FeatureView] + ], + views_to_keep: Sequence[ + Union[BatchFeatureView, StreamFeatureView, FeatureView] + ], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + ): + """This method ensures that any necessary infrastructure or resources needed by the + engine are set up ahead of materialization.""" + pass + + def teardown_infra( + self, + project: str, + fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]], + entities: Sequence[Entity], + ): + """This method ensures that any infrastructure or resources set up by ``update()``are torn down.""" + pass + + def materialize( + self, + registry: BaseRegistry, + tasks: List[MaterializationTask], + ) -> List[MaterializationJob]: + return [ + self._materialize_one( + registry, + 
task.feature_view, + task.start_time, + task.end_time, + task.project, + task.tqdm_builder, + ) + for task in tasks + ] + + def _materialize_one( + self, + registry: BaseRegistry, + feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView], + start_date: datetime, + end_date: datetime, + project: str, + tqdm_builder: Callable[[int], tqdm], + ): + entities = [] + for entity_name in feature_view.entities: + entities.append(registry.get_entity(entity_name, project)) + + ( + join_key_columns, + feature_name_columns, + timestamp_field, + created_timestamp_column, + ) = _get_column_names(feature_view, entities) + + offline_job = self.offline_store.pull_latest_from_table_or_query( + config=self.repo_config, + data_source=feature_view.batch_source, + join_key_columns=join_key_columns, + feature_name_columns=feature_name_columns, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + start_date=start_date, + end_date=end_date, + ) + + paths = offline_job.to_remote_storage() + job_id = str(uuid.uuid4()) + return self._create_kubernetes_job(job_id, paths, feature_view) + + def _create_kubernetes_job(self, job_id, paths, feature_view): + try: + # Create a k8s configmap with information needed by bytewax + self._create_configuration_map(job_id, paths, feature_view, self.namespace) + + # Create the k8s service definition, used for bytewax communication + self._create_service_definition(job_id, self.namespace) + + # Create the k8s job definition + self._create_job_definition( + job_id, + self.namespace, + self.batch_engine_config.pods, + self.batch_engine_config.env, + ) + except FailToCreateError as failures: + return BytewaxMaterializationJob(job_id, self.namespace, error=failures) + + return BytewaxMaterializationJob(job_id, self.namespace) + + def _create_configuration_map(self, job_id, paths, feature_view, namespace): + """Create a Kubernetes configmap for this job""" + + feature_store_configuration = yaml.dump( + 
yaml.safe_load( + self.repo_config.json( + exclude={"repo_path"}, + exclude_unset=True, + ) + ) + ) + + materialization_config = yaml.dump( + {"paths": paths, "feature_view": feature_view.name} + ) + + configmap_manifest = { + "kind": "ConfigMap", + "apiVersion": "v1", + "metadata": { + "name": f"feast-{job_id}", + }, + "data": { + "feature_store.yaml": feature_store_configuration, + "bytewax_materialization_config.yaml": materialization_config, + }, + } + self.v1.create_namespaced_config_map( + namespace=namespace, + body=configmap_manifest, + ) + + def _create_service_definition(self, job_id, namespace): + """Creates a kubernetes service definition. + + This service definition is created to allow bytewax workers + to communicate with each other. + """ + service_definition = { + "apiVersion": "v1", + "kind": "Service", + "metadata": { + "name": f"dataflow-{job_id}", + "namespace": namespace, + }, + "spec": { + "clusterIP": "None", + "clusterIPs": ["None"], + "internalTrafficPolicy": "Cluster", + "ipFamilies": ["IPv4"], + "ipFamilyPolicy": "SingleStack", + "ports": [ + { + "name": "worker", + "port": 9999, + "protocol": "TCP", + "targetPort": 9999, + } + ], + "selector": {"job-name": f"dataflow-{job_id}"}, + "sessionAffinity": "None", + "type": "ClusterIP", + }, + } + + utils.create_from_dict(self.k8s_client, service_definition) + + def _create_job_definition(self, job_id, namespace, pods, env): + """Create a kubernetes job definition.""" + job_env = [ + {"name": "RUST_BACKTRACE", "value": "full"}, + { + "name": "BYTEWAX_PYTHON_FILE_PATH", + "value": "/bytewax/dataflow.py", + }, + {"name": "BYTEWAX_WORKDIR", "value": "/bytewax"}, + { + "name": "BYTEWAX_WORKERS_PER_PROCESS", + "value": "1", + }, + { + "name": "BYTEWAX_POD_NAME", + "valueFrom": { + "fieldRef": { + "apiVersion": "v1", + "fieldPath": "metadata.annotations['batch.kubernetes.io/job-completion-index']", + } + }, + }, + { + "name": "BYTEWAX_REPLICAS", + "value": f"{pods}", + }, + { + "name": 
"BYTEWAX_KEEP_CONTAINER_ALIVE", + "value": "false", + }, + { + "name": "BYTEWAX_HOSTFILE_PATH", + "value": "/etc/bytewax/hostfile.txt", + }, + { + "name": "BYTEWAX_STATEFULSET_NAME", + "value": f"dataflow-{job_id}", + }, + ] + # Add any Feast configured environment variables + job_env.extend(env) + + job_definition = { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "name": f"dataflow-{job_id}", + "namespace": namespace, + }, + "spec": { + "ttlSecondsAfterFinished": 3600, + "completions": pods, + "parallelism": pods, + "completionMode": "Indexed", + "template": { + "spec": { + "restartPolicy": "Never", + "subdomain": f"dataflow-{job_id}", + "initContainers": [ + { + "command": [ + "sh", + "-c", + f'set -ex\n# Generate hostfile.txt.\necho "dataflow-{job_id}-0.dataflow-{job_id}.{namespace}.svc.cluster.local:9999" > /etc/bytewax/hostfile.txt\nreplicas=$(($BYTEWAX_REPLICAS-1))\nx=1\nwhile [ $x -le $replicas ]\ndo\n echo "dataflow-{job_id}-$x.dataflow-{job_id}.{namespace}.svc.cluster.local:9999" >> /etc/bytewax/hostfile.txt\n x=$(( $x + 1 ))\ndone', + ], + "env": [ + { + "name": "BYTEWAX_REPLICAS", + "value": f"{pods}", + } + ], + "image": "busybox", + "imagePullPolicy": "Always", + "name": "init-hostfile", + "resources": {}, + "securityContext": { + "allowPrivilegeEscalation": False, + "capabilities": { + "add": ["NET_BIND_SERVICE"], + "drop": ["ALL"], + }, + "readOnlyRootFilesystem": True, + }, + "terminationMessagePath": "/dev/termination-log", + "terminationMessagePolicy": "File", + "volumeMounts": [ + {"mountPath": "/etc/bytewax", "name": "hostfile"}, + { + "mountPath": "/tmp/bytewax/", + "name": "python-files", + }, + { + "mountPath": "/var/feast/", + "name": f"feast-{job_id}", + }, + ], + } + ], + "containers": [ + { + "command": ["sh", "-c", "sh ./entrypoint.sh"], + "env": job_env, + "image": "bytewax/bytewax-feast:latest", + "imagePullPolicy": "Always", + "name": "process", + "ports": [ + { + "containerPort": 9999, + "name": "process", + 
"protocol": "TCP", + } + ], + "resources": {}, + "securityContext": { + "allowPrivilegeEscalation": False, + "capabilities": { + "add": ["NET_BIND_SERVICE"], + "drop": ["ALL"], + }, + "readOnlyRootFilesystem": False, + }, + "terminationMessagePath": "/dev/termination-log", + "terminationMessagePolicy": "File", + "volumeMounts": [ + {"mountPath": "/etc/bytewax", "name": "hostfile"}, + { + "mountPath": "/var/feast/", + "name": f"feast-{job_id}", + }, + ], + } + ], + "volumes": [ + {"emptyDir": {}, "name": "hostfile"}, + { + "configMap": { + "defaultMode": 420, + "name": f"feast-{job_id}", + }, + "name": "python-files", + }, + { + "configMap": {"name": f"feast-{job_id}"}, + "name": f"feast-{job_id}", + }, + ], + } + }, + }, + } + utils.create_from_dict(self.k8s_client, job_definition) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py new file mode 100644 index 00000000000..f21ce574b9f --- /dev/null +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py @@ -0,0 +1,55 @@ +from typing import Optional + +import kubernetes +import kubernetes.client +from kubernetes import client +from kubernetes import config as k8s_config +from kubernetes import utils + +from feast.infra.materialization.batch_materialization_engine import ( + MaterializationJob, + MaterializationJobStatus, +) + + +class BytewaxMaterializationJob(MaterializationJob): + def __init__( + self, + job_id, + namespace, + error: Optional[BaseException] = None, + ): + super().__init__() + self._job_id = job_id + self.namespace = namespace + self._error: Optional[BaseException] = error + self.batch_v1 = client.BatchV1Api() + + def error(self): + return self._error + + def status(self): + if self._error is not None: + return MaterializationJobStatus.ERROR + else: + # TODO: Find a better way to parse status? 
+ job_status = self.batch_v1.read_namespaced_job_status( + self.job_id(), self.namespace + ).status + if job_status.active is not None: + if job_status.completion_time is None: + return MaterializationJobStatus.RUNNING + elif job_status.failed is not None: + return MaterializationJobStatus.ERROR + elif job_status.active is None and job_status.succeeded is not None: + if job_status.conditions[0].type == "Complete": + return MaterializationJobStatus.SUCCEEDED + + def should_be_retried(self): + return False + + def job_id(self): + return f"dataflow-{self._job_id}" + + def url(self): + return None diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_task.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_task.py new file mode 100644 index 00000000000..fb8690aac54 --- /dev/null +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_task.py @@ -0,0 +1,10 @@ +from feast.infra.materialization import MaterializationTask + + +class BytewaxMaterializationTask(MaterializationTask): + def __init__(self, project, feature_view, start_date, end_date, tqdm): + self.project = project + self.feature_view = feature_view + self.start_date = start_date + self.end_date = end_date + self.tqdm = tqdm diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 34df1a215f6..f09e63ff484 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -37,6 +37,7 @@ BATCH_ENGINE_CLASS_FOR_TYPE = { "local": "feast.infra.materialization.LocalMaterializationEngine", "lambda": "feast.infra.materialization.lambda.lambda_engine.LambdaMaterializationEngine", + "bytewax": "feast.infra.materialization.contrib.bytewax.bytewax_materialization_engine.BytewaxMaterializationEngine", } ONLINE_STORE_CLASS_FOR_TYPE = { diff --git a/setup.py b/setup.py index b52453d43aa..eb02397b44a 100644 --- a/setup.py +++ b/setup.py @@ -93,6 +93,8 @@ AWS_REQUIRED = 
["boto3>=1.17.0,<=1.20.23", "docker>=5.0.2", "s3fs>=0.4.0,<=2022.01.0"] +BYTEWAX_REQUIRED = ["bytewax==0.10.0", "docker>=5.0.2", "kubernetes<=20.13.0"] + SNOWFLAKE_REQUIRED = [ "snowflake-connector-python[pandas]>=2.7.3,<=2.7.8", ] @@ -174,6 +176,7 @@ + GCP_REQUIRED + REDIS_REQUIRED + AWS_REQUIRED + + BYTEWAX_REQUIRED + SNOWFLAKE_REQUIRED + SPARK_REQUIRED + POSTGRES_REQUIRED @@ -506,6 +509,7 @@ def copy_extensions_to_source(self): "ci": CI_REQUIRED, "gcp": GCP_REQUIRED, "aws": AWS_REQUIRED, + "bytewax": BYTEWAX_REQUIRED, "redis": REDIS_REQUIRED, "snowflake": SNOWFLAKE_REQUIRED, "spark": SPARK_REQUIRED, From 6272a19a6a86bc7837241ae37ece77827cf88220 Mon Sep 17 00:00:00 2001 From: Dan Herrera Date: Tue, 2 Aug 2022 12:34:08 -0700 Subject: [PATCH 2/4] Respond to PR feedback - Add integration test, by factoring out shared consistency test. - Make the number of Pods dynamic, based on the number of .parquet file paths. - Add instructions for creating a bytewax test cluster for integration testing. 
Signed-off-by: Dan Herrera --- .../batch-materialization/bytewax.md | 6 +- .../bytewax_materialization_dataflow.py | 4 +- .../bytewax/bytewax_materialization_engine.py | 13 ++-- .../bytewax/bytewax_materialization_job.py | 4 -- .../materialization/contrib/bytewax/README.md | 22 +++++++ .../contrib/bytewax/eks-config.yaml | 13 ++++ .../contrib/bytewax/test_bytewax.py | 66 +++++++++++++++++++ 7 files changed, 110 insertions(+), 18 deletions(-) create mode 100644 sdk/python/tests/integration/materialization/contrib/bytewax/README.md create mode 100644 sdk/python/tests/integration/materialization/contrib/bytewax/eks-config.yaml create mode 100644 sdk/python/tests/integration/materialization/contrib/bytewax/test_bytewax.py diff --git a/docs/reference/batch-materialization/bytewax.md b/docs/reference/batch-materialization/bytewax.md index 2dc4de3c646..db2d79ddbff 100644 --- a/docs/reference/batch-materialization/bytewax.md +++ b/docs/reference/batch-materialization/bytewax.md @@ -29,7 +29,6 @@ Then configure them in the batch_engine section of `feature_store.yaml`: batch_engine: type: bytewax namespace: bytewax - pods: 3 env: - name: AWS_ACCESS_KEY_ID valueFrom: @@ -51,9 +50,10 @@ The Bytewax materialization engine is configured through the The `feature_store. batch_engine: type: bytewax namespace: bytewax - pods: 3 + image: bytewax/bytewax-feast:latest ``` The `namespace` configuration directive specifies which Kubernetes [namespace](https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/) jobs, services and configuration maps will be created in. -The `pods` configuration directive is used to configure the number of Bytewax pods that will be started for each FeatureView that is being materialized. +The `image` parameter specifies which container image to use when running the materialization job. To create a custom image based on this container, please see the [GitHub repository](https://github.com/bytewax/bytewax-feast) for this image. 
+ diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py index 67916152572..e17d6335de1 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py @@ -1,9 +1,9 @@ -from typing import Any, Iterable, List +from typing import List import pyarrow as pa import pyarrow.parquet as pq import s3fs -from bytewax import Dataflow, cluster_main +from bytewax import Dataflow, cluster_main # type: ignore from bytewax.inputs import AdvanceTo, Emit, ManualInputConfig, distribute from bytewax.parse import proc_env from tqdm import tqdm diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py index 1ea7e9bddca..d7d8301a55a 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_engine.py @@ -2,8 +2,6 @@ from datetime import datetime from typing import Callable, List, Literal, Sequence, Union -import kubernetes -import kubernetes.client import yaml from kubernetes import client from kubernetes import config as k8s_config @@ -36,16 +34,13 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel): type: Literal["bytewax"] = "bytewax" """ Materialization type selector""" - pods: int = 3 - """ (optional) The number of Kubernetes pods to create. - For each feature view to be materialized, Bytewax will create a job with the specified - number of pods and distribute the work among them. - """ - namespace: StrictStr = "default" """ (optional) The namespace in Kubernetes to use when creating services, configuration maps and jobs. 
+ """ + + image: StrictStr = "bytewax/bytewax-feast:latest" + """ (optional) The container image to use when running the materialization job.""" + env: List[dict] = [] """ (optional) A list of environment variables to set in the created Kubernetes pods. These environment variables can be used to reference Kubernetes secrets. @@ -169,7 +164,7 @@ def _create_kubernetes_job(self, job_id, paths, feature_view): self._create_job_definition( job_id, self.namespace, - self.batch_engine_config.pods, + len(paths), # Create a pod for each parquet file self.batch_engine_config.env, ) except FailToCreateError as failures: diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py index f21ce574b9f..77d2149eb5a 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_job.py @@ -1,10 +1,6 @@ from typing import Optional -import kubernetes -import kubernetes.client from kubernetes import client -from kubernetes import config as k8s_config -from kubernetes import utils from feast.infra.materialization.batch_materialization_engine import ( MaterializationJob, MaterializationJobStatus, diff --git a/sdk/python/tests/integration/materialization/contrib/bytewax/README.md b/sdk/python/tests/integration/materialization/contrib/bytewax/README.md new file mode 100644 index 00000000000..4ed5d49a680 --- /dev/null +++ b/sdk/python/tests/integration/materialization/contrib/bytewax/README.md @@ -0,0 +1,22 @@ +# Running Bytewax integration tests + +To run the Bytewax integration tests, you'll need to provision a cluster using [eksctl](https://docs.aws.amazon.com/eks/latest/userguide/eksctl.html). 
+ +## Creating an EKS cluster + +In this directory is a configuration file for a single-node EKS cluster + +To create the EKS cluster needed for testing, issue the following command: + +``` shell +> eksctl create cluster -f ./eks-config.yaml +``` + +When the tests are complete, delete the created cluster with: + +``` shell +> eksctl delete cluster bytewax-feast-cluster +``` + + + diff --git a/sdk/python/tests/integration/materialization/contrib/bytewax/eks-config.yaml b/sdk/python/tests/integration/materialization/contrib/bytewax/eks-config.yaml new file mode 100644 index 00000000000..5f8d0655aac --- /dev/null +++ b/sdk/python/tests/integration/materialization/contrib/bytewax/eks-config.yaml @@ -0,0 +1,13 @@ +apiVersion: eksctl.io/v1alpha5 +kind: ClusterConfig + +metadata: + name: bytewax-feast-cluster + version: "1.22" + region: us-west-2 + +managedNodeGroups: +- name: ng-1 + instanceType: c6a.large + desiredCapacity: 1 + privateNetworking: true diff --git a/sdk/python/tests/integration/materialization/contrib/bytewax/test_bytewax.py b/sdk/python/tests/integration/materialization/contrib/bytewax/test_bytewax.py new file mode 100644 index 00000000000..4ddcc8937a6 --- /dev/null +++ b/sdk/python/tests/integration/materialization/contrib/bytewax/test_bytewax.py @@ -0,0 +1,66 @@ +from datetime import timedelta + +import pytest + +from feast import Entity, Feature, FeatureView, ValueType +from tests.data.data_creator import create_basic_driver_dataset +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, + RegistryLocation, +) +from tests.integration.feature_repos.repo_configuration import ( + construct_test_environment, +) +from tests.integration.feature_repos.universal.data_sources.redshift import ( + RedshiftDataSourceCreator, +) +from tests.utils.e2e_test_validation import validate_offline_online_store_consistency + + +@pytest.mark.integration +def test_bytewax_materialization(): + bytewax_config = 
IntegrationTestRepoConfig( + provider="aws", + online_store={"type": "dynamodb", "region": "us-west-2"}, + offline_store_creator=RedshiftDataSourceCreator, + batch_engine={ + "type": "bytewax", + }, + registry_location=RegistryLocation.S3, + ) + bytewax_environment = construct_test_environment(bytewax_config, None) + + df = create_basic_driver_dataset() + ds = bytewax_environment.data_source_creator.create_data_source( + df, + bytewax_environment.feature_store.project, + field_mapping={"ts_1": "ts"}, + ) + + fs = bytewax_environment.feature_store + driver = Entity( + name="driver_id", + join_key="driver_id", + value_type=ValueType.INT64, + ) + + driver_stats_fv = FeatureView( + name="driver_hourly_stats", + entities=["driver_id"], + ttl=timedelta(weeks=52), + features=[Feature(name="value", dtype=ValueType.FLOAT)], + batch_source=ds, + ) + + try: + fs.apply([driver, driver_stats_fv]) + + # materialization is run in two steps and + # we use timestamp from generated dataframe as a split point + split_dt = df["ts_1"][4].to_pydatetime() - timedelta(seconds=1) + + print(f"Split datetime: {split_dt}") + + validate_offline_online_store_consistency(fs, driver_stats_fv, split_dt) + finally: + fs.teardown() From 43986185669e4ab675bd4dbfd96897889ff28b22 Mon Sep 17 00:00:00 2001 From: Dan Herrera Date: Mon, 8 Aug 2022 08:40:41 -0700 Subject: [PATCH 3/4] Mark bytewax test to be skipped. 
Signed-off-by: Dan Herrera --- .../integration/materialization/contrib/bytewax/test_bytewax.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/tests/integration/materialization/contrib/bytewax/test_bytewax.py b/sdk/python/tests/integration/materialization/contrib/bytewax/test_bytewax.py index 4ddcc8937a6..0d2cecb2f14 100644 --- a/sdk/python/tests/integration/materialization/contrib/bytewax/test_bytewax.py +++ b/sdk/python/tests/integration/materialization/contrib/bytewax/test_bytewax.py @@ -18,6 +18,7 @@ @pytest.mark.integration +@pytest.mark.skip(reason="Run this test manually after creating an EKS cluster.") def test_bytewax_materialization(): bytewax_config = IntegrationTestRepoConfig( provider="aws", From 383e3cb3533c832eaace4352660a35d3ba7f1714 Mon Sep 17 00:00:00 2001 From: Dan Herrera Date: Thu, 11 Aug 2022 16:50:43 -0700 Subject: [PATCH 4/4] Remove unused offline store reference. Signed-off-by: Dan Herrera --- .../contrib/bytewax/bytewax_materialization_dataflow.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py index e17d6335de1..1fad2c909fa 100644 --- a/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py +++ b/sdk/python/feast/infra/materialization/contrib/bytewax/bytewax_materialization_dataflow.py @@ -23,7 +23,6 @@ def __init__( self.feature_store = FeatureStore(config=config) self.feature_view = feature_view - self.offline_store = feature_view.source self.paths = paths self._run_dataflow()