Skip to content

Commit

Permalink
Add container_resources as KubernetesPodOperator templatable (#27457)
Browse files Browse the repository at this point in the history
closes #23529
  • Loading branch information
alanatlemba committed Nov 9, 2022
1 parent aefadb8 commit 47a2b9e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 1 deletion.
Expand Up @@ -137,7 +137,7 @@ class KubernetesPodOperator(BaseOperator):
:param annotations: non-identifying metadata you can attach to the Pod.
Can be a large range of data, and can include characters
that are not permitted by labels.
:param container_resources: resources for the launched pod.
:param container_resources: resources for the launched pod. (templated)
:param affinity: affinity scheduling rules for the launched pod.
:param config_file: The path to the Kubernetes config file. (templated)
If not specified, default value is ``~/.kube/config``
Expand Down Expand Up @@ -185,6 +185,7 @@ class KubernetesPodOperator(BaseOperator):
"config_file",
"pod_template_file",
"namespace",
"container_resources",
)
template_fields_renderers = {"env_vars": "py"}

Expand Down Expand Up @@ -320,6 +321,11 @@ def _render_nested_template_fields(
self._do_render_template_fields(content, ("value", "name"), context, jinja_env, seen_oids)
return

if id(content) not in seen_oids and isinstance(content, k8s.V1ResourceRequirements):
seen_oids.add(id(content))
self._do_render_template_fields(content, ("limits", "requests"), context, jinja_env, seen_oids)
return

super()._render_nested_template_fields(content, context, jinja_env, seen_oids)

@staticmethod
Expand Down
35 changes: 35 additions & 0 deletions tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
Expand Up @@ -101,6 +101,41 @@ def setup(self, dag_maker):

patch.stopall()

def test_templates(self, create_task_instance_of_operator):
dag_id = "TestKubernetesPodOperator"
ti = create_task_instance_of_operator(
KubernetesPodOperator,
dag_id=dag_id,
task_id="task-id",
namespace="{{ dag.dag_id }}",
container_resources=k8s.V1ResourceRequirements(
requests={"memory": "{{ dag.dag_id }}", "cpu": "{{ dag.dag_id }}"},
limits={"memory": "{{ dag.dag_id }}", "cpu": "{{ dag.dag_id }}"},
),
pod_template_file="{{ dag.dag_id }}",
config_file="{{ dag.dag_id }}",
labels="{{ dag.dag_id }}",
env_vars=["{{ dag.dag_id }}"],
arguments="{{ dag.dag_id }}",
cmds="{{ dag.dag_id }}",
image="{{ dag.dag_id }}",
)

rendered = ti.render_templates()

assert dag_id == rendered.container_resources.limits["memory"]
assert dag_id == rendered.container_resources.limits["cpu"]
assert dag_id == rendered.container_resources.requests["memory"]
assert dag_id == rendered.container_resources.requests["cpu"]
assert dag_id == ti.task.image
assert dag_id == ti.task.cmds
assert dag_id == ti.task.namespace
assert dag_id == ti.task.config_file
assert dag_id == ti.task.labels
assert dag_id == ti.task.pod_template_file
assert dag_id == ti.task.arguments
assert dag_id == ti.task.env_vars[0]

def run_pod(self, operator: KubernetesPodOperator, map_index: int = -1) -> k8s.V1Pod:
with self.dag_maker(dag_id="dag") as dag:
operator.dag = dag
Expand Down

0 comments on commit 47a2b9e

Please sign in to comment.