From 544a1313c3727ccb05329ea0cad7274cfc54f21f Mon Sep 17 00:00:00 2001 From: shrividyahegde <5312302+Shrividya@users.noreply.github.com> Date: Fri, 20 Mar 2026 15:24:27 -0400 Subject: [PATCH 1/8] add application name as env var to driver and executor pods for SparkKubernetesOperator - added spark application name to both driver and executor components in template body - custom object launcher already had a value for the env under spec["item"]. The application name is added along with the k8s_spec env vars. - tests added accordingly - changelog.rst updated. --- providers/cncf/kubernetes/docs/changelog.rst | 8 +++ .../operators/custom_object_launcher.py | 3 +- .../kubernetes/operators/spark_kubernetes.py | 15 ++++ .../operators/test_spark_kubernetes.py | 70 +++++++++++++++++-- 4 files changed, 91 insertions(+), 5 deletions(-) diff --git a/providers/cncf/kubernetes/docs/changelog.rst b/providers/cncf/kubernetes/docs/changelog.rst index 03353a1915577..98b132592984b 100644 --- a/providers/cncf/kubernetes/docs/changelog.rst +++ b/providers/cncf/kubernetes/docs/changelog.rst @@ -32,6 +32,14 @@ Changelog Previously this would create a job that would never complete and always fail the task. Executing a task with ``parallelism = 0`` and ``wait_until_job_complete=True`` will now raise a validation error. +10.15.0 +....... + +Features +~~~~~~~~ + +* ``Add SPARK_APPLICATION_NAME env variable to driver and executor pods in SparkKubernetesOperator (#43801)`` + 10.14.0 ....... 
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py index bbe507716f959..39c9269f1544a 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py @@ -248,7 +248,8 @@ def get_body(self): self.body.spec["imagePullSecrets"] = k8s_spec.image_pull_secrets for item in ["driver", "executor"]: # Env List - self.body.spec[item]["env"] = k8s_spec.env_vars + existing_env = self.body.spec[item].get("env") or [] + self.body.spec[item]["env"] = existing_env + k8s_spec.env_vars self.body.spec[item]["envFrom"] = k8s_spec.env_from # Volumes self.body.spec[item]["volumeMounts"] = k8s_spec.volume_mounts diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py index dfaac1ecd042d..57110031612de 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py @@ -396,6 +396,21 @@ def _setup_spark_configuration(self, context: Context): spec_dict[component]["labels"].update(task_context_labels) + if "spark" not in template_body: + template_body["spark"] = {} + if "spec" not in template_body["spark"]: + template_body["spark"]["spec"] = {} + + spec_dict = template_body["spark"]["spec"] + app_name_env = {"name": "SPARK_APPLICATION_NAME", "value": self.name} + + for component in ["driver", "executor"]: + if component not in spec_dict: + spec_dict[component] = {} + env_list = spec_dict[component].setdefault("env", []) + if not any(e.get("name") == "SPARK_APPLICATION_NAME" for e in env_list): + 
env_list.append(app_name_env) + self.log.info("Creating sparkApplication.") self.launcher = CustomObjectLauncher( name=self.name, diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py index 56f3132d7d0be..616149b088f7c 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py @@ -166,11 +166,12 @@ def _get_expected_application_dict_with_labels(task_name="default_yaml"): } -def _get_expected_application_dict_without_task_context_labels(task_name="default_yaml"): +def _get_expected_application_dict_without_task_context_labels(task_name="default_yaml", app_name=None): """Create expected application dict without task context labels (only original file labels).""" original_file_labels = { "version": "2.4.5", } + app_name = app_name or task_name return { "apiVersion": "sparkoperator.k8s.io/v1beta2", @@ -193,6 +194,7 @@ def _get_expected_application_dict_without_task_context_labels(task_name="defaul "labels": original_file_labels.copy(), "serviceAccount": "spark", "volumeMounts": [{"name": "test-volume", "mountPath": "/tmp"}], + "env": [{"name": "SPARK_APPLICATION_NAME", "value": app_name}], }, "executor": { "cores": 1, @@ -200,6 +202,7 @@ def _get_expected_application_dict_without_task_context_labels(task_name="defaul "memory": "512m", "labels": original_file_labels.copy(), "volumeMounts": [{"name": "test-volume", "mountPath": "/tmp"}], + "env": [{"name": "SPARK_APPLICATION_NAME", "value": app_name}] }, }, } @@ -378,7 +381,7 @@ def test_create_application( assert isinstance(done_op.name, str) assert done_op.name != "" - expected_dict = _get_expected_application_dict_without_task_context_labels(task_name) + expected_dict = 
_get_expected_application_dict_without_task_context_labels(task_name,app_name=done_op.name) expected_dict["metadata"]["name"] = done_op.name mock_create_namespaced_crd.assert_called_with( body=expected_dict, @@ -424,7 +427,7 @@ def test_create_application_and_use_name_from_operator_args( else: assert done_op.name == name_normalized - expected_dict = _get_expected_application_dict_without_task_context_labels(task_name) + expected_dict = _get_expected_application_dict_without_task_context_labels(task_name,app_name=done_op.name) expected_dict["metadata"]["name"] = done_op.name mock_create_namespaced_crd.assert_called_with( body=expected_dict, @@ -467,7 +470,7 @@ def test_create_application_and_use_name_task_id( else: assert done_op.name == name_normalized - expected_dict = _get_expected_application_dict_without_task_context_labels(task_name) + expected_dict = _get_expected_application_dict_without_task_context_labels(task_name,app_name=done_op.name) expected_dict["metadata"]["name"] = done_op.name mock_create_namespaced_crd.assert_called_with( body=expected_dict, @@ -504,6 +507,8 @@ def test_new_template_from_yaml( expected_dict = _get_expected_k8s_dict() expected_dict["metadata"]["name"] = done_op.name + expected_dict["spec"]["driver"]["env"] = [{"name": "SPARK_APPLICATION_NAME", "value": done_op.name}] + expected_dict["spec"]["executor"]["env"] = [{"name": "SPARK_APPLICATION_NAME", "value": done_op.name}] mock_create_namespaced_crd.assert_called_with( body=expected_dict, **self.call_commons, @@ -540,6 +545,8 @@ def test_template_spec( expected_dict = _get_expected_k8s_dict() expected_dict["metadata"]["name"] = done_op.name + expected_dict["spec"]["driver"]["env"] = [{"name": "SPARK_APPLICATION_NAME", "value": done_op.name}] + expected_dict["spec"]["executor"]["env"] = [{"name": "SPARK_APPLICATION_NAME", "value": done_op.name}] mock_create_namespaced_crd.assert_called_with( body=expected_dict, **self.call_commons, @@ -625,9 +632,11 @@ def test_env( task_name, 
mock_create_job_name, job_spec=job_spec, mock_get_kube_client=mock_get_kube_client ) assert op.launcher.body["spec"]["driver"]["env"] == [ + {"name": "SPARK_APPLICATION_NAME", "value": "default_env"}, k8s.V1EnvVar(name="TEST_ENV_1", value="VALUE1"), ] assert op.launcher.body["spec"]["executor"]["env"] == [ + {"name": "SPARK_APPLICATION_NAME", "value": "default_env"}, k8s.V1EnvVar(name="TEST_ENV_1", value="VALUE1"), ] @@ -1520,3 +1529,56 @@ def test_reattach_skips_launcher_creation_in_execute( # And verify delete works op.on_kill() mock_launcher_cls.return_value.delete_spark_job.assert_called() + + def test_spark_application_name_env_injected(self): + op = SparkKubernetesOperator( + task_id="test_task", + namespace="default", + template_spec={ + "apiVersion": "sparkoperator.k8s.io/v1beta2", + "kind": "SparkApplication", + "spec": { + "driver": {}, + "executor": {}, + }, + }, + reattach_on_restart=False, + ) + op.name = "my-spark-app-abc123" + + with mock.patch.object(op, "get_or_create_spark_crd", return_value=mock.MagicMock()): + op._setup_spark_configuration(mock.MagicMock()) + + body = op.launcher.body + for component in ["driver", "executor"]: + env = body["spec"][component].get("env", []) + names = [e["name"] for e in env] + assert "SPARK_APPLICATION_NAME" in names + value = next(e["value"] for e in env if e["name"] == "SPARK_APPLICATION_NAME") + assert value == "my-spark-app-abc123" + + def test_spark_application_name_env_not_duplicated(self): + op = SparkKubernetesOperator( + task_id="test_task", + namespace="default", + template_spec={ + "apiVersion": "sparkoperator.k8s.io/v1beta2", + "kind": "SparkApplication", + "spec": { + "driver": {"env": [{"name": "SPARK_APPLICATION_NAME", "value": "user-defined"}]}, + "executor": {"env": [{"name": "SPARK_APPLICATION_NAME", "value": "user-defined"}]}, + }, + }, + reattach_on_restart=False, + ) + op.name = "my-spark-app-abc123" + + with mock.patch.object(op, "get_or_create_spark_crd", return_value=mock.MagicMock()): + 
op._setup_spark_configuration(mock.MagicMock()) + + body = op.launcher.body + for component in ["driver", "executor"]: + env = body["spec"][component].get("env", []) + app_name_envs = [e for e in env if e["name"] == "SPARK_APPLICATION_NAME"] + assert len(app_name_envs) == 1 # not duplicated + assert app_name_envs[0]["value"] == "user-defined" From d3e176e85c3ba6a46d8f4bd502d1caba0f94e3c0 Mon Sep 17 00:00:00 2001 From: shrividyahegde <5312302+Shrividya@users.noreply.github.com> Date: Fri, 20 Mar 2026 15:56:15 -0400 Subject: [PATCH 2/8] reverting the changelog --- providers/cncf/kubernetes/docs/changelog.rst | 8 -------- 1 file changed, 8 deletions(-) diff --git a/providers/cncf/kubernetes/docs/changelog.rst b/providers/cncf/kubernetes/docs/changelog.rst index 98b132592984b..03353a1915577 100644 --- a/providers/cncf/kubernetes/docs/changelog.rst +++ b/providers/cncf/kubernetes/docs/changelog.rst @@ -32,14 +32,6 @@ Changelog Previously this would create a job that would never complete and always fail the task. Executing a task with ``parallelism = 0`` and ``wait_until_job_complete=True`` will now raise a validation error. -10.15.0 -....... - -Features -~~~~~~~~ - -* ``Add SPARK_APPLICATION_NAME env variable to driver and executor pods in SparkKubernetesOperator (#43801)`` - 10.14.0 ....... 
From 43d4ca891add2e09ca431ec854e52ffc87753de6 Mon Sep 17 00:00:00 2001 From: shrividyahegde <5312302+Shrividya@users.noreply.github.com> Date: Fri, 20 Mar 2026 17:50:08 -0400 Subject: [PATCH 3/8] simplification of logic --- .../cncf/kubernetes/operators/spark_kubernetes.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py index 57110031612de..0d77361199d15 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py @@ -396,20 +396,11 @@ def _setup_spark_configuration(self, context: Context): spec_dict[component]["labels"].update(task_context_labels) - if "spark" not in template_body: - template_body["spark"] = {} - if "spec" not in template_body["spark"]: - template_body["spark"]["spec"] = {} - - spec_dict = template_body["spark"]["spec"] - app_name_env = {"name": "SPARK_APPLICATION_NAME", "value": self.name} - + spec_dict = template_body.setdefault("spark", {}).setdefault("spec", {}) for component in ["driver", "executor"]: - if component not in spec_dict: - spec_dict[component] = {} - env_list = spec_dict[component].setdefault("env", []) + env_list = spec_dict.setdefault(component, {}).setdefault("env", []) if not any(e.get("name") == "SPARK_APPLICATION_NAME" for e in env_list): - env_list.append(app_name_env) + env_list.append({"name": "SPARK_APPLICATION_NAME", "value": self.name}) self.log.info("Creating sparkApplication.") self.launcher = CustomObjectLauncher( From 4e0dde321954ea5cc7fbab1f4dd3fbb17e816066 Mon Sep 17 00:00:00 2001 From: shrividyahegde <5312302+Shrividya@users.noreply.github.com> Date: Fri, 20 Mar 2026 15:24:27 -0400 Subject: [PATCH 4/8] add application name as env var to driver and 
executor pods for SparkKubernetesOperator - added spark application name to both driver and executor components in template body - custom object launcher already had a value for the env under spec["item"]. The application name is added along with the k8s_spec env vars. - tests added accordingly - changelog.rst updated. --- providers/cncf/kubernetes/docs/changelog.rst | 8 +++ .../operators/custom_object_launcher.py | 3 +- .../kubernetes/operators/spark_kubernetes.py | 15 ++++ .../operators/test_spark_kubernetes.py | 70 +++++++++++++++++-- 4 files changed, 91 insertions(+), 5 deletions(-) diff --git a/providers/cncf/kubernetes/docs/changelog.rst b/providers/cncf/kubernetes/docs/changelog.rst index 03353a1915577..98b132592984b 100644 --- a/providers/cncf/kubernetes/docs/changelog.rst +++ b/providers/cncf/kubernetes/docs/changelog.rst @@ -32,6 +32,14 @@ Changelog Previously this would create a job that would never complete and always fail the task. Executing a task with ``parallelism = 0`` and ``wait_until_job_complete=True`` will now raise a validation error. +10.15.0 +....... + +Features +~~~~~~~~ + +* ``Add SPARK_APPLICATION_NAME env variable to driver and executor pods in SparkKubernetesOperator (#43801)`` + 10.14.0 ....... 
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py index bbe507716f959..39c9269f1544a 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py @@ -248,7 +248,8 @@ def get_body(self): self.body.spec["imagePullSecrets"] = k8s_spec.image_pull_secrets for item in ["driver", "executor"]: # Env List - self.body.spec[item]["env"] = k8s_spec.env_vars + existing_env = self.body.spec[item].get("env") or [] + self.body.spec[item]["env"] = existing_env + k8s_spec.env_vars self.body.spec[item]["envFrom"] = k8s_spec.env_from # Volumes self.body.spec[item]["volumeMounts"] = k8s_spec.volume_mounts diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py index dfaac1ecd042d..57110031612de 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py @@ -396,6 +396,21 @@ def _setup_spark_configuration(self, context: Context): spec_dict[component]["labels"].update(task_context_labels) + if "spark" not in template_body: + template_body["spark"] = {} + if "spec" not in template_body["spark"]: + template_body["spark"]["spec"] = {} + + spec_dict = template_body["spark"]["spec"] + app_name_env = {"name": "SPARK_APPLICATION_NAME", "value": self.name} + + for component in ["driver", "executor"]: + if component not in spec_dict: + spec_dict[component] = {} + env_list = spec_dict[component].setdefault("env", []) + if not any(e.get("name") == "SPARK_APPLICATION_NAME" for e in env_list): + 
env_list.append(app_name_env) + self.log.info("Creating sparkApplication.") self.launcher = CustomObjectLauncher( name=self.name, diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py index 56f3132d7d0be..616149b088f7c 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py @@ -166,11 +166,12 @@ def _get_expected_application_dict_with_labels(task_name="default_yaml"): } -def _get_expected_application_dict_without_task_context_labels(task_name="default_yaml"): +def _get_expected_application_dict_without_task_context_labels(task_name="default_yaml", app_name=None): """Create expected application dict without task context labels (only original file labels).""" original_file_labels = { "version": "2.4.5", } + app_name = app_name or task_name return { "apiVersion": "sparkoperator.k8s.io/v1beta2", @@ -193,6 +194,7 @@ def _get_expected_application_dict_without_task_context_labels(task_name="defaul "labels": original_file_labels.copy(), "serviceAccount": "spark", "volumeMounts": [{"name": "test-volume", "mountPath": "/tmp"}], + "env": [{"name": "SPARK_APPLICATION_NAME", "value": app_name}], }, "executor": { "cores": 1, @@ -200,6 +202,7 @@ def _get_expected_application_dict_without_task_context_labels(task_name="defaul "memory": "512m", "labels": original_file_labels.copy(), "volumeMounts": [{"name": "test-volume", "mountPath": "/tmp"}], + "env": [{"name": "SPARK_APPLICATION_NAME", "value": app_name}] }, }, } @@ -378,7 +381,7 @@ def test_create_application( assert isinstance(done_op.name, str) assert done_op.name != "" - expected_dict = _get_expected_application_dict_without_task_context_labels(task_name) + expected_dict = 
_get_expected_application_dict_without_task_context_labels(task_name,app_name=done_op.name) expected_dict["metadata"]["name"] = done_op.name mock_create_namespaced_crd.assert_called_with( body=expected_dict, @@ -424,7 +427,7 @@ def test_create_application_and_use_name_from_operator_args( else: assert done_op.name == name_normalized - expected_dict = _get_expected_application_dict_without_task_context_labels(task_name) + expected_dict = _get_expected_application_dict_without_task_context_labels(task_name,app_name=done_op.name) expected_dict["metadata"]["name"] = done_op.name mock_create_namespaced_crd.assert_called_with( body=expected_dict, @@ -467,7 +470,7 @@ def test_create_application_and_use_name_task_id( else: assert done_op.name == name_normalized - expected_dict = _get_expected_application_dict_without_task_context_labels(task_name) + expected_dict = _get_expected_application_dict_without_task_context_labels(task_name,app_name=done_op.name) expected_dict["metadata"]["name"] = done_op.name mock_create_namespaced_crd.assert_called_with( body=expected_dict, @@ -504,6 +507,8 @@ def test_new_template_from_yaml( expected_dict = _get_expected_k8s_dict() expected_dict["metadata"]["name"] = done_op.name + expected_dict["spec"]["driver"]["env"] = [{"name": "SPARK_APPLICATION_NAME", "value": done_op.name}] + expected_dict["spec"]["executor"]["env"] = [{"name": "SPARK_APPLICATION_NAME", "value": done_op.name}] mock_create_namespaced_crd.assert_called_with( body=expected_dict, **self.call_commons, @@ -540,6 +545,8 @@ def test_template_spec( expected_dict = _get_expected_k8s_dict() expected_dict["metadata"]["name"] = done_op.name + expected_dict["spec"]["driver"]["env"] = [{"name": "SPARK_APPLICATION_NAME", "value": done_op.name}] + expected_dict["spec"]["executor"]["env"] = [{"name": "SPARK_APPLICATION_NAME", "value": done_op.name}] mock_create_namespaced_crd.assert_called_with( body=expected_dict, **self.call_commons, @@ -625,9 +632,11 @@ def test_env( task_name, 
mock_create_job_name, job_spec=job_spec, mock_get_kube_client=mock_get_kube_client ) assert op.launcher.body["spec"]["driver"]["env"] == [ + {"name": "SPARK_APPLICATION_NAME", "value": "default_env"}, k8s.V1EnvVar(name="TEST_ENV_1", value="VALUE1"), ] assert op.launcher.body["spec"]["executor"]["env"] == [ + {"name": "SPARK_APPLICATION_NAME", "value": "default_env"}, k8s.V1EnvVar(name="TEST_ENV_1", value="VALUE1"), ] @@ -1520,3 +1529,56 @@ def test_reattach_skips_launcher_creation_in_execute( # And verify delete works op.on_kill() mock_launcher_cls.return_value.delete_spark_job.assert_called() + + def test_spark_application_name_env_injected(self): + op = SparkKubernetesOperator( + task_id="test_task", + namespace="default", + template_spec={ + "apiVersion": "sparkoperator.k8s.io/v1beta2", + "kind": "SparkApplication", + "spec": { + "driver": {}, + "executor": {}, + }, + }, + reattach_on_restart=False, + ) + op.name = "my-spark-app-abc123" + + with mock.patch.object(op, "get_or_create_spark_crd", return_value=mock.MagicMock()): + op._setup_spark_configuration(mock.MagicMock()) + + body = op.launcher.body + for component in ["driver", "executor"]: + env = body["spec"][component].get("env", []) + names = [e["name"] for e in env] + assert "SPARK_APPLICATION_NAME" in names + value = next(e["value"] for e in env if e["name"] == "SPARK_APPLICATION_NAME") + assert value == "my-spark-app-abc123" + + def test_spark_application_name_env_not_duplicated(self): + op = SparkKubernetesOperator( + task_id="test_task", + namespace="default", + template_spec={ + "apiVersion": "sparkoperator.k8s.io/v1beta2", + "kind": "SparkApplication", + "spec": { + "driver": {"env": [{"name": "SPARK_APPLICATION_NAME", "value": "user-defined"}]}, + "executor": {"env": [{"name": "SPARK_APPLICATION_NAME", "value": "user-defined"}]}, + }, + }, + reattach_on_restart=False, + ) + op.name = "my-spark-app-abc123" + + with mock.patch.object(op, "get_or_create_spark_crd", return_value=mock.MagicMock()): + 
op._setup_spark_configuration(mock.MagicMock()) + + body = op.launcher.body + for component in ["driver", "executor"]: + env = body["spec"][component].get("env", []) + app_name_envs = [e for e in env if e["name"] == "SPARK_APPLICATION_NAME"] + assert len(app_name_envs) == 1 # not duplicated + assert app_name_envs[0]["value"] == "user-defined" From bb1f97b85af78c5f0d3a640448b6d8037a78aca0 Mon Sep 17 00:00:00 2001 From: shrividyahegde <5312302+Shrividya@users.noreply.github.com> Date: Fri, 20 Mar 2026 15:56:15 -0400 Subject: [PATCH 5/8] reverting the changelog --- providers/cncf/kubernetes/docs/changelog.rst | 8 -------- 1 file changed, 8 deletions(-) diff --git a/providers/cncf/kubernetes/docs/changelog.rst b/providers/cncf/kubernetes/docs/changelog.rst index 98b132592984b..03353a1915577 100644 --- a/providers/cncf/kubernetes/docs/changelog.rst +++ b/providers/cncf/kubernetes/docs/changelog.rst @@ -32,14 +32,6 @@ Changelog Previously this would create a job that would never complete and always fail the task. Executing a task with ``parallelism = 0`` and ``wait_until_job_complete=True`` will now raise a validation error. -10.15.0 -....... - -Features -~~~~~~~~ - -* ``Add SPARK_APPLICATION_NAME env variable to driver and executor pods in SparkKubernetesOperator (#43801)`` - 10.14.0 ....... 
From 449d3e106052abb817b0839b5e95aca71545c9ae Mon Sep 17 00:00:00 2001 From: shrividyahegde <5312302+Shrividya@users.noreply.github.com> Date: Fri, 20 Mar 2026 17:50:08 -0400 Subject: [PATCH 6/8] simplification of logic --- .../cncf/kubernetes/operators/spark_kubernetes.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py index 57110031612de..0d77361199d15 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py @@ -396,20 +396,11 @@ def _setup_spark_configuration(self, context: Context): spec_dict[component]["labels"].update(task_context_labels) - if "spark" not in template_body: - template_body["spark"] = {} - if "spec" not in template_body["spark"]: - template_body["spark"]["spec"] = {} - - spec_dict = template_body["spark"]["spec"] - app_name_env = {"name": "SPARK_APPLICATION_NAME", "value": self.name} - + spec_dict = template_body.setdefault("spark", {}).setdefault("spec", {}) for component in ["driver", "executor"]: - if component not in spec_dict: - spec_dict[component] = {} - env_list = spec_dict[component].setdefault("env", []) + env_list = spec_dict.setdefault(component, {}).setdefault("env", []) if not any(e.get("name") == "SPARK_APPLICATION_NAME" for e in env_list): - env_list.append(app_name_env) + env_list.append({"name": "SPARK_APPLICATION_NAME", "value": self.name}) self.log.info("Creating sparkApplication.") self.launcher = CustomObjectLauncher( From 1f793a3f23424c7fde197f8615538fe7f99c0272 Mon Sep 17 00:00:00 2001 From: Shrividya Hegde <5312302+Shrividya@users.noreply.github.com> Date: Sun, 22 Mar 2026 14:19:29 -0400 Subject: [PATCH 7/8] Refactor expected_dict assignment for 
readability --- .../kubernetes/operators/test_spark_kubernetes.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py index 616149b088f7c..5d63658ed6f41 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py @@ -202,7 +202,7 @@ def _get_expected_application_dict_without_task_context_labels(task_name="defaul "memory": "512m", "labels": original_file_labels.copy(), "volumeMounts": [{"name": "test-volume", "mountPath": "/tmp"}], - "env": [{"name": "SPARK_APPLICATION_NAME", "value": app_name}] + "env": [{"name": "SPARK_APPLICATION_NAME", "value": app_name}], }, }, } @@ -381,7 +381,9 @@ def test_create_application( assert isinstance(done_op.name, str) assert done_op.name != "" - expected_dict = _get_expected_application_dict_without_task_context_labels(task_name,app_name=done_op.name) + expected_dict = _get_expected_application_dict_without_task_context_labels( + task_name, app_name=done_op.name + ) expected_dict["metadata"]["name"] = done_op.name mock_create_namespaced_crd.assert_called_with( body=expected_dict, @@ -427,7 +429,9 @@ def test_create_application_and_use_name_from_operator_args( else: assert done_op.name == name_normalized - expected_dict = _get_expected_application_dict_without_task_context_labels(task_name,app_name=done_op.name) + expected_dict = _get_expected_application_dict_without_task_context_labels( + task_name, app_name=done_op.name + ) expected_dict["metadata"]["name"] = done_op.name mock_create_namespaced_crd.assert_called_with( body=expected_dict, @@ -470,7 +474,9 @@ def test_create_application_and_use_name_task_id( else: assert done_op.name == name_normalized - expected_dict = 
_get_expected_application_dict_without_task_context_labels(task_name,app_name=done_op.name) + expected_dict = _get_expected_application_dict_without_task_context_labels( + task_name, app_name=done_op.name + ) expected_dict["metadata"]["name"] = done_op.name mock_create_namespaced_crd.assert_called_with( body=expected_dict, From 945b5000a6c4e0cf962e3938420c8cfd4dc79cee Mon Sep 17 00:00:00 2001 From: Shrividya Hegde <5312302+Shrividya@users.noreply.github.com> Date: Sun, 22 Mar 2026 16:20:22 -0400 Subject: [PATCH 8/8] Add mock patching to spark application name tests --- .../unit/cncf/kubernetes/operators/test_spark_kubernetes.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py index 5d63658ed6f41..b5f278ad49d31 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_spark_kubernetes.py @@ -1536,7 +1536,8 @@ def test_reattach_skips_launcher_creation_in_execute( op.on_kill() mock_launcher_cls.return_value.delete_spark_job.assert_called() - def test_spark_application_name_env_injected(self): + @patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator.client") + def test_spark_application_name_env_injected(self, mock_client): op = SparkKubernetesOperator( task_id="test_task", namespace="default", @@ -1563,7 +1564,8 @@ def test_spark_application_name_env_injected(self): value = next(e["value"] for e in env if e["name"] == "SPARK_APPLICATION_NAME") assert value == "my-spark-app-abc123" - def test_spark_application_name_env_not_duplicated(self): + @patch("airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator.client") + def test_spark_application_name_env_not_duplicated(self, mock_client): op = 
SparkKubernetesOperator( task_id="test_task", namespace="default",