Skip to content

Commit

Permalink
Fix broken scheduled batch ingestion jobs (#72)
Browse files Browse the repository at this point in the history
* Allow SDK to overwrite job schedule defined by job template

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Fix schedule job update failure

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Add scheduled job id to the label

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
  • Loading branch information
khorshuheng and khorshuheng committed May 19, 2021
1 parent 78f33e1 commit 740f5a9
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 10 deletions.
3 changes: 3 additions & 0 deletions python/feast_spark/pyspark/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,9 @@ def get_name(self) -> str:
def get_job_type(self) -> SparkJobType:
return SparkJobType.SCHEDULED_BATCH_INGESTION

def get_job_schedule(self) -> str:
    """Return the cron schedule string configured for this scheduled batch
    ingestion job (e.g. ``"0 0 * * *"``).

    Exposes ``self._cron_schedule`` so the launcher can propagate the
    SDK-provided schedule into the ScheduledSparkApplication resource
    instead of relying on the schedule baked into the job template.
    """
    return self._cron_schedule

def get_arguments(self) -> List[str]:
return super().get_arguments() + [
"--mode",
Expand Down
1 change: 1 addition & 0 deletions python/feast_spark/pyspark/launchers/k8s/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ def schedule_offline_to_online_ingestion(
resource = _prepare_scheduled_job_resource(
scheduled_job_template=self._scheduled_resource_template,
scheduled_job_id=schedule_job_id,
job_schedule=ingestion_job_params.get_job_schedule(),
job_template=self._resource_template,
job_type=OFFLINE_TO_ONLINE_JOB_TYPE,
main_application_file=jar_s3_path,
Expand Down
12 changes: 8 additions & 4 deletions python/feast_spark/pyspark/launchers/k8s/k8s_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def _prepare_job_resource(
def _prepare_scheduled_job_resource(
scheduled_job_template: Dict[str, Any],
scheduled_job_id: str,
job_schedule: str,
job_template: Dict[str, Any],
job_type: str,
main_application_file: str,
Expand All @@ -170,8 +171,9 @@ def _prepare_scheduled_job_resource(
) -> Dict[str, Any]:
""" Prepare ScheduledSparkApplication custom resource configs """
scheduled_job = deepcopy(scheduled_job_template)
_add_keys(scheduled_job, ("spec",), dict(schedule=job_schedule))

labels = {LABEL_JOBTYPE: job_type}
labels = {LABEL_JOBID: scheduled_job_id, LABEL_JOBTYPE: job_type}
if extra_labels:
labels = {**labels, **extra_labels}

Expand Down Expand Up @@ -296,10 +298,12 @@ def _submit_scheduled_job(
api.create_namespaced_custom_object(
**_scheduled_crd_args(namespace), body=resource
)
return
else:
api.replace_namespaced_custom_object(
**_scheduled_crd_args(namespace), name=name, body=resource,
)
raise e
api.patch_namespaced_custom_object(
**_scheduled_crd_args(namespace), name=name, body=resource,
)


def _list_jobs(
Expand Down
23 changes: 17 additions & 6 deletions tests/e2e/test_job_scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,22 @@ def test_schedule_batch_ingestion_jobs(
)
config.load_incluster_config()
k8s_api = client.CustomObjectsApi()
k8s_api.get_namespaced_custom_object(
group="sparkoperator.k8s.io",
version="v1beta2",
namespace=pytestconfig.getoption("k8s_namespace"),
plural="scheduledsparkapplications",
name=f"feast-{feast_client.project}-{feature_table.name}".replace("_", "-"),

def get_scheduled_spark_application():
return k8s_api.get_namespaced_custom_object(
group="sparkoperator.k8s.io",
version="v1beta2",
namespace=pytestconfig.getoption("k8s_namespace"),
plural="scheduledsparkapplications",
name=f"feast-{feast_client.project}-{feature_table.name}".replace("_", "-"),
)

response = get_scheduled_spark_application()
assert response["spec"]["schedule"] == "0 0 * * *"
feast_spark_client.schedule_offline_to_online_ingestion(
feature_table, 1, "1 0 * * *"
)
response = get_scheduled_spark_application()
assert response["spec"]["schedule"] == "1 0 * * *"

feast_spark_client.unschedule_offline_to_online_ingestion(feature_table)

0 comments on commit 740f5a9

Please sign in to comment.