[Bug] spark_kubernetes sensor does not terminate SparkApplication when task is manually failed in Airflow #48643

@Neelesh2512

Description

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

apache-airflow-providers-cncf-kubernetes==3.0.0
The issue is still present in the latest provider version as well.

Apache Airflow version

2.3.3

Operating System

Amazon linux 2

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

What happened

The SparkKubernetesSensor in airflow.providers.cncf.kubernetes.sensors.spark_kubernetes does not implement an on_kill method, so when the task is manually failed from the Airflow UI, the associated SparkApplication keeps running in Kubernetes instead of being terminated.

This leads to orphaned Spark jobs consuming cluster resources unnecessarily.

Current Behavior

  1. Trigger a DAG using SparkKubernetesSensor to monitor a SparkApplication.
  2. Manually mark the sensor task as "Failed" from the Airflow UI.
  3. The Spark job continues running in the background and is not terminated.

What you think should happen instead

Expected Behavior

  • When a task using SparkKubernetesSensor is manually failed, it should terminate the running SparkApplication in Kubernetes to prevent orphaned workloads.

  • This can be achieved by implementing the on_kill method in SparkKubernetesSensor, which should delete the Spark application if the task is stopped.
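A minimal sketch of what the fix could look like. This is not the provider's actual code: the hook method name delete_custom_object and the CRD coordinates (group sparkoperator.k8s.io, version v1beta2, plural sparkapplications) are assumptions based on the Spark operator's usual API group, and the hook is injected here so the kill path can be shown in isolation.

```python
# Schematic sketch of an on_kill hook for the sensor (NOT the provider's
# actual code). The hook method name `delete_custom_object` and the CRD
# coordinates below are assumptions.

SPARK_CRD = dict(
    group="sparkoperator.k8s.io",
    version="v1beta2",
    plural="sparkapplications",
)


class SparkKubernetesSensorSketch:
    """Stand-in for SparkKubernetesSensor, showing only the kill path."""

    def __init__(self, application_name, namespace, hook):
        self.application_name = application_name
        self.namespace = namespace
        self.hook = hook  # stand-in for the provider's KubernetesHook

    def on_kill(self):
        # Airflow calls on_kill when the task is externally failed/killed.
        # Deleting the SparkApplication CRD object lets the Spark operator
        # tear down the driver and executor pods, so nothing is orphaned.
        self.hook.delete_custom_object(
            name=self.application_name,
            namespace=self.namespace,
            **SPARK_CRD,
        )
```

In the real provider the fix would instead override on_kill on SparkKubernetesSensor itself and issue the delete through the sensor's own Kubernetes hook/connection.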

How to reproduce

from airflow import DAG
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from datetime import datetime

with DAG(
    dag_id="test_spark_kubernetes_sensor",
    schedule_interval=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    spark_submit = SparkKubernetesOperator(
        task_id="submit_spark_app",
        application_file="path/to/spark-application.yaml",
        namespace="default",
    )

    spark_sensor = SparkKubernetesSensor(
        task_id="monitor_spark_app",
        namespace="default",
        # must match metadata.name in spark-application.yaml
        application_name="spark-app-name",
        poke_interval=30,
        timeout=600,
    )

    spark_submit >> spark_sensor

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
