Skip to content

Commit

Permalink
feat: Add support for in_cluster config and additional labels for byt…
Browse files Browse the repository at this point in the history
…ewax materialization (#3754)

* SAASMLOPS-734 bytewax in-cluster config, custom labels, fix worker image deps

Signed-off-by: James Crabtree <james.crabtree@sailpoint.com>

* SAASMLOPS-769 make max parallelism configurable

Signed-off-by: James Crabtree <james.crabtree@sailpoint.com>

---------

Signed-off-by: James Crabtree <james.crabtree@sailpoint.com>
  • Loading branch information
james-crabtree-sp committed Sep 21, 2023
1 parent 04804a0 commit 2192e65
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ class BytewaxMaterializationEngineConfig(FeastConfigBaseModel):
include_security_context_capabilities: bool = True
""" (optional) Include security context capabilities in the init and job container spec """

labels: dict = {}
""" (optional) additional labels to append to kubernetes objects """

max_parallelism: int = 10
""" (optional) Maximum number of pods (default 10) allowed to run in parallel per job"""


class BytewaxMaterializationEngine(BatchMaterializationEngine):
def __init__(
Expand All @@ -82,7 +88,7 @@ def __init__(
self.online_store = online_store

# TODO: Configure k8s here
k8s_config.load_kube_config()
k8s_config.load_config()

self.k8s_client = client.api_client.ApiClient()
self.v1 = client.CoreV1Api(self.k8s_client)
Expand Down Expand Up @@ -196,14 +202,13 @@ def _create_configuration_map(self, job_id, paths, feature_view, namespace):
{"paths": paths, "feature_view": feature_view.name}
)

labels = {"feast-bytewax-materializer": "configmap"}
configmap_manifest = {
"kind": "ConfigMap",
"apiVersion": "v1",
"metadata": {
"name": f"feast-{job_id}",
"labels": {
"feast-bytewax-materializer": "configmap",
},
"labels": {**labels, **self.batch_engine_config.labels},
},
"data": {
"feature_store.yaml": feature_store_configuration,
Expand Down Expand Up @@ -260,27 +265,25 @@ def _create_job_definition(self, job_id, namespace, pods, env):
"drop": ["ALL"],
}

job_labels = {"feast-bytewax-materializer": "job"}
pod_labels = {"feast-bytewax-materializer": "pod"}
job_definition = {
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"name": f"dataflow-{job_id}",
"namespace": namespace,
"labels": {
"feast-bytewax-materializer": "job",
},
"labels": {**job_labels, **self.batch_engine_config.labels},
},
"spec": {
"ttlSecondsAfterFinished": 3600,
"completions": pods,
"parallelism": pods,
"parallelism": min(pods, self.batch_engine_config.max_parallelism),
"completionMode": "Indexed",
"template": {
"metadata": {
"annotations": self.batch_engine_config.annotations,
"labels": {
"feast-bytewax-materializer": "pod",
},
"labels": {**pod_labels, **self.batch_engine_config.labels},
},
"spec": {
"restartPolicy": "Never",
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
"hiredis>=2.0.0,<3",
]

AWS_REQUIRED = ["boto3>=1.17.0,<2", "docker>=5.0.2"]
AWS_REQUIRED = ["boto3>=1.17.0,<2", "docker>=5.0.2", "s3fs"]

BYTEWAX_REQUIRED = ["bytewax==0.15.1", "docker>=5.0.2", "kubernetes<=20.13.0"]

Expand Down

0 comments on commit 2192e65

Please sign in to comment.