Skip to content

Commit

Permalink
Fix rendering SparkKubernetesOperator.template_body (#37271)
Browse files Browse the repository at this point in the history
  • Loading branch information
Taragolis committed Feb 9, 2024
1 parent 9b99763 commit f691adf
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 1 deletion.
Expand Up @@ -107,7 +107,6 @@ def __init__(
self.get_logs = get_logs
self.log_events_on_failure = log_events_on_failure
self.success_run_history_limit = success_run_history_limit
self.template_body = self.manage_template_specs()

def _render_nested_template_fields(
self,
Expand Down Expand Up @@ -193,6 +192,11 @@ def pod_manager(self) -> PodManager:
def _try_numbers_match(context, pod) -> bool:
return pod.metadata.labels["try_number"] == context["ti"].try_number

@property
def template_body(self):
"""Templated body for CustomObjectLauncher."""
return self.manage_template_specs()

def find_spark_job(self, context):
labels = self.create_labels_for_pod(context, include_try_number=False)
label_selector = self._get_pod_identifying_label_string(labels) + ",spark-role=driver"
Expand Down
16 changes: 16 additions & 0 deletions tests/providers/cncf/kubernetes/operators/test_spark_kubernetes.py
Expand Up @@ -25,6 +25,7 @@
from unittest.mock import patch

import pendulum
import pytest
import yaml
from kubernetes.client import models as k8s

Expand Down Expand Up @@ -488,3 +489,18 @@ def test_toleration(

assert op.launcher.body["spec"]["driver"]["tolerations"] == [toleration]
assert op.launcher.body["spec"]["executor"]["tolerations"] == [toleration]


@pytest.mark.db_test
def test_template_body_templating(create_task_instance_of_operator):
ti = create_task_instance_of_operator(
SparkKubernetesOperator,
template_spec={"foo": "{{ ds }}", "bar": "{{ dag_run.dag_id }}"},
kubernetes_conn_id="kubernetes_default_kube_config",
dag_id="test_template_body_templating_dag",
task_id="test_template_body_templating_task",
execution_date=timezone.datetime(2024, 2, 1, tzinfo=timezone.utc),
)
ti.render_templates()
task: SparkKubernetesOperator = ti.task
assert task.template_body == {"spark": {"foo": "2024-02-01", "bar": "test_template_body_templating_dag"}}

0 comments on commit f691adf

Please sign in to comment.