Skip to content

Commit

Permalink
fix: Update bytewax materialization (#3368)
Browse files · Browse the repository at this point in the history
* Updates Bytewax materialization engine.

- Since no data is exchanged between workers, remove k8s service
  definition.
- Update bytewax dataflow to only use one worker.

Signed-off-by: Dan Herrera <whoahbot@bytewax.io>

* Update Bytewax to latest version.

Signed-off-by: Dan Herrera <whoahbot@bytewax.io>

* Format imports.

Signed-off-by: Dan Herrera <whoahbot@bytewax.io>

Signed-off-by: Dan Herrera <whoahbot@bytewax.io>
  • Loading branch information
whoahbot committed Dec 2, 2022
1 parent eaf354c commit 4ebe00f
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 75 deletions.
Expand Up @@ -3,9 +3,10 @@
import pyarrow as pa
import pyarrow.parquet as pq
import s3fs
from bytewax import Dataflow, cluster_main # type: ignore
from bytewax.inputs import AdvanceTo, Emit, ManualInputConfig, distribute
from bytewax.parse import proc_env
from bytewax.dataflow import Dataflow # type: ignore
from bytewax.execution import cluster_main
from bytewax.inputs import ManualInputConfig, distribute
from bytewax.outputs import ManualOutputConfig
from tqdm import tqdm

from feast import FeatureStore, FeatureView, RepoConfig
Expand Down Expand Up @@ -37,20 +38,15 @@ def process_path(self, path):

return batches

def input_builder(self, worker_index, worker_count, resume_epoch):
def input_builder(self, worker_index, worker_count, _state):
worker_paths = distribute(self.paths, worker_index, worker_count)
epoch = 0
for path in worker_paths:
yield AdvanceTo(epoch)
yield Emit(path)
epoch += 1
yield None, path

return

def output_builder(self, worker_index, worker_count):
def output_fn(epoch_batch):
_, batch = epoch_batch

def output_fn(batch):
table = pa.Table.from_batches([batch])

if self.feature_view.batch_source.field_mapping is not None:
Expand Down Expand Up @@ -79,11 +75,7 @@ def output_fn(epoch_batch):

def _run_dataflow(self):
flow = Dataflow()
flow.input("inp", ManualInputConfig(self.input_builder))
flow.flat_map(self.process_path)
flow.capture()
cluster_main(
flow,
ManualInputConfig(self.input_builder),
self.output_builder,
**proc_env(),
)
flow.capture(ManualOutputConfig(self.output_builder))
cluster_main(flow, [], 0)
Expand Up @@ -23,7 +23,7 @@
from feast.infra.registry.base_registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _get_column_names
from feast.utils import _get_column_names, get_default_yaml_file_path

from .bytewax_materialization_job import BytewaxMaterializationJob

Expand Down Expand Up @@ -157,9 +157,6 @@ def _create_kubernetes_job(self, job_id, paths, feature_view):
# Create a k8s configmap with information needed by bytewax
self._create_configuration_map(job_id, paths, feature_view, self.namespace)

# Create the k8s service definition, used for bytewax communication
self._create_service_definition(job_id, self.namespace)

# Create the k8s job definition
self._create_job_definition(
job_id,
Expand All @@ -175,14 +172,10 @@ def _create_kubernetes_job(self, job_id, paths, feature_view):
def _create_configuration_map(self, job_id, paths, feature_view, namespace):
"""Create a Kubernetes configmap for this job"""

feature_store_configuration = yaml.dump(
yaml.safe_load(
self.repo_config.json(
exclude={"repo_path"},
exclude_unset=True,
)
)
)
repo_path = self.repo_config.repo_path
assert repo_path
feature_store_path = get_default_yaml_file_path(repo_path)
feature_store_configuration = feature_store_path.read_text()

materialization_config = yaml.dump(
{"paths": paths, "feature_view": feature_view.name}
Expand All @@ -204,41 +197,6 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace):
body=configmap_manifest,
)

def _create_service_definition(self, job_id, namespace):
    """Create a headless Kubernetes Service for this dataflow job.

    The Service exposes port 9999 on every pod whose ``job-name`` label
    matches ``dataflow-{job_id}``, which lets the Bytewax workers in the
    job find and talk to one another.

    Args:
        job_id: Identifier of the materialization job; used to derive the
            service name and the pod selector.
        namespace: Kubernetes namespace the Service is created in.
    """
    service_name = f"dataflow-{job_id}"
    # Single TCP port used for worker-to-worker communication.
    worker_port = {
        "name": "worker",
        "port": 9999,
        "protocol": "TCP",
        "targetPort": 9999,
    }
    manifest = {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {
            "name": service_name,
            "namespace": namespace,
        },
        "spec": {
            # clusterIP "None" makes this a headless service: DNS resolves
            # directly to the individual pod IPs rather than a virtual IP.
            "clusterIP": "None",
            "clusterIPs": ["None"],
            "internalTrafficPolicy": "Cluster",
            "ipFamilies": ["IPv4"],
            "ipFamilyPolicy": "SingleStack",
            "ports": [worker_port],
            "selector": {"job-name": service_name},
            "sessionAffinity": "None",
            "type": "ClusterIP",
        },
    }
    utils.create_from_dict(self.k8s_client, manifest)

def _create_job_definition(self, job_id, namespace, pods, env):
"""Create a kubernetes job definition."""
job_env = [
Expand Down Expand Up @@ -269,10 +227,6 @@ def _create_job_definition(self, job_id, namespace, pods, env):
"name": "BYTEWAX_KEEP_CONTAINER_ALIVE",
"value": "false",
},
{
"name": "BYTEWAX_HOSTFILE_PATH",
"value": "/etc/bytewax/hostfile.txt",
},
{
"name": "BYTEWAX_STATEFULSET_NAME",
"value": f"dataflow-{job_id}",
Expand All @@ -299,11 +253,6 @@ def _create_job_definition(self, job_id, namespace, pods, env):
"subdomain": f"dataflow-{job_id}",
"initContainers": [
{
"command": [
"sh",
"-c",
f'set -ex\n# Generate hostfile.txt.\necho "dataflow-{job_id}-0.dataflow-{job_id}.{namespace}.svc.cluster.local:9999" > /etc/bytewax/hostfile.txt\nreplicas=$(($BYTEWAX_REPLICAS-1))\nx=1\nwhile [ $x -le $replicas ]\ndo\n echo "dataflow-{job_id}-$x.dataflow-{job_id}.{namespace}.svc.cluster.local:9999" >> /etc/bytewax/hostfile.txt\n x=$(( $x + 1 ))\ndone',
],
"env": [
{
"name": "BYTEWAX_REPLICAS",
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -93,7 +93,7 @@

AWS_REQUIRED = ["boto3>=1.17.0,<=1.20.23", "docker>=5.0.2", "s3fs>=0.4.0,<=2022.01.0"]

BYTEWAX_REQUIRED = ["bytewax==0.10.0", "docker>=5.0.2", "kubernetes<=20.13.0"]
BYTEWAX_REQUIRED = ["bytewax==0.13.1", "docker>=5.0.2", "kubernetes<=20.13.0"]

SNOWFLAKE_REQUIRED = [
"snowflake-connector-python[pandas]>=2.7.3,<3",
Expand Down

0 comments on commit 4ebe00f

Please sign in to comment.