dag stuck in "running" status when running in backfill job #34448

Closed
1 of 2 tasks
charleschang0531 opened this issue Sep 18, 2023 · 2 comments · Fixed by #36954
Labels
affected_version:2.7, area:backfill, area:core, kind:bug

Comments

@charleschang0531

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

We are running Airflow 2.6.2 with the KubernetesExecutor, and we have two DAGs:

  1. dag with dbt (dag_test.py)
  2. dag with backfill command to rerun dag_test.py (dag_rerun.py)

I'm trying to simulate a case where a task still fails when it is rerun by airflow dags backfill, and I've hit a problem I can't solve:
when I run the backfill, the dag_test Run stays in the running state forever while some of its tasks are in the failed state.
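
To make the symptom concrete: a Run looks stuck when it is still reported as running although none of its task instances can make further progress. The sketch below is a plain-Python model, not Airflow code. The state names mirror a subset of Airflow's task-instance states, while `find_stuck_runs`, the dictionary layout, and the sample run IDs and states are hypothetical and made up for illustration.

```python
# Hypothetical helper: flags runs reported as "running" whose tasks have all finished.
# State names mirror a subset of Airflow's; the input layout is an assumption.
FINISHED_TASK_STATES = {"success", "failed", "upstream_failed", "skipped"}


def find_stuck_runs(runs):
    """Return run_ids that are 'running' although every task is in a finished state."""
    stuck = []
    for run in runs:
        task_states = run["task_states"].values()
        all_finished = all(state in FINISHED_TASK_STATES for state in task_states)
        if run["state"] == "running" and all_finished:
            stuck.append(run["run_id"])
    return stuck


# Invented sample data: the first run has nothing left to execute, the second does.
runs = [
    {"run_id": "backfill__2023-09-15T08:06:00", "state": "running",
     "task_states": {"dag_start": "success", "random_failed": "failed",
                     "task_dbt_run": "upstream_failed", "dag_end": "upstream_failed"}},
    {"run_id": "backfill__2023-09-15T09:06:00", "state": "running",
     "task_states": {"dag_start": "success", "random_failed": "running",
                     "task_dbt_run": None, "dag_end": None}},
]
print(find_stuck_runs(runs))
```

Only the first sample run is flagged, because the second still has a task in progress.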

  1. dag_test.py
from datetime import datetime, timedelta
from airflow.configuration import conf
from airflow import DAG, AirflowException
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash import BashOperator
from textwrap import dedent
from airflow.decorators import task
from airflow.kubernetes.secret import Secret
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s
import os
import random

from common.airflow import create_dag_by_common
from common.kubernetes import (
    CONTAINER_RESOURCES,
    NODE_AFFINITY,
    get_config
)

def create_dag(dag_id: str) -> DAG:

    # Define DAG args
    default_args = {
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    }

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        description="basic dag",
        doc_md="",
        schedule_interval="06 * * * *",
        start_date=datetime(2023, 9, 16),
        catchup=True,
        max_active_runs=4,
        max_active_tasks=10
    )
    with dag:
        dag_start = DummyOperator(
            task_id="dag_start",
            dag=dag,
        )

        dag_end = DummyOperator(
            task_id="dag_end",
            dag=dag,
        )

        @task()
        def random_failed():            
            number = random.randrange(1, 3)
            print(number)
            if number == 2 or number == 3:
                # DagRun.set_state = "failed"?
                raise AirflowException("Unexpected number 2 or 3")

        task_dbt_run = KubernetesPodOperator(
            task_id="task_dbt_run",
            image=f"{image_url}",
            secrets=[secret_volume],
            cmds=["sh", "-c", "dbt run -s path:models/test --target pixel"],
            env_vars={
                "Airflow_Job_StartTime": "{{ ts }}"
            },
            name="task_dbt_run",
            is_delete_operator_pod=True,
            image_pull_policy="Always",
            in_cluster=True,
            get_logs=True,
            do_xcom_push=False,
            affinity=NODE_AFFINITY,
            dag=dag
        )

        dag_start >> random_failed() >> task_dbt_run >> dag_end

    return dag


dag_id = "dag_test"

create_dag_by_common(
    dag_id=dag_id, create_dag_func=create_dag
)
  2. dag_rerun.py
from datetime import datetime, timedelta
from airflow.configuration import conf
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash import BashOperator
from textwrap import dedent
from airflow.decorators import task
from airflow.kubernetes.secret import Secret
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s
import os

from common.airflow import create_dag_by_common
from common.kubernetes import (
    get_config
)


def create_dag(dag_id: str) -> DAG:

    # Define DAG args
    default_args = {
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    }

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        description="basic dag",
        doc_md="",
        schedule_interval="45 * * * *",
        start_date=datetime(2023, 9, 14),
        catchup=False,
        max_active_runs=2,
        max_active_tasks=10
    )
    with dag:
        dag_start = DummyOperator(
            task_id="dag_start",
            dag=dag,
        )

        dag_end = DummyOperator(
            task_id="dag_end",
            dag=dag,
        )

        templated_command = dedent(
        """        
        airflow dags backfill dag_test -s 2023-09-15T08:00:00 -e 2023-09-16T15:00:00 --rerun-failed-tasks --continue-on-failures --disable-retry
        """
        )
        
        rerun_task = BashOperator(
            task_id="rerun_task",
            depends_on_past=False,
            bash_command=templated_command
        )      
          

        dag_start >> rerun_task >> dag_end

    return dag


dag_id = "dag_rerun"

create_dag_by_common(
    dag_id=dag_id, create_dag_func=create_dag
)
  3. Result
    image (9)
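
A side note on the reproduction in dag_test.py: `random.randrange(1, 3)` excludes its upper bound, so `random_failed` only ever draws 1 or 2 and the `number == 3` branch never fires. The task therefore fails roughly half the time. A quick standard-library check (the sample size is arbitrary):

```python
import random
from collections import Counter

# randrange(start, stop) excludes stop, so only 1 and 2 can appear here.
draws = Counter(random.randrange(1, 3) for _ in range(10_000))
print(sorted(draws))  # the distinct values that were drawn

# Share of draws that would raise AirflowException in random_failed().
failure_rate = draws[2] / sum(draws.values())
print(f"failure rate ~ {failure_rate:.2f}")
```

If a 2-in-3 failure rate was intended, `random.randint(1, 3)` includes both bounds.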

What you think should happen instead

The backfill job should rerun the failed Runs. If a rerun fails again,
we would expect the Run to simply be marked as failed and the backfill to move on to the next Run.
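
The expected behaviour can be sketched as a small loop. This is a toy model in plain Python, not Airflow's backfill implementation; `rerun_failed`, `run_once`, and the sample run names and outcomes are assumptions made for illustration.

```python
from typing import Callable, Dict


def rerun_failed(previous_states: Dict[str, str],
                 run_once: Callable[[str], bool]) -> Dict[str, str]:
    """Rerun each failed run once and always record a terminal state for it."""
    final_states = dict(previous_states)
    for run_id, state in previous_states.items():
        if state != "failed":
            continue  # only failed runs are rerun
        # Record a terminal state either way, so a second failure cannot block later runs.
        final_states[run_id] = "success" if run_once(run_id) else "failed"
    return final_states


# Invented outcomes: run_2 fails again on the rerun, run_3 recovers.
previous = {"run_1": "success", "run_2": "failed", "run_3": "failed"}
outcomes = {"run_2": False, "run_3": True}
states = rerun_failed(previous, lambda run_id: outcomes[run_id])
print(states)
```

The point of the model is that no run is left in a non-terminal state: `run_2` ends as failed and the loop still reaches `run_3`.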

How to reproduce

No response

Operating System

Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

apache-airflow 2.6.2
apache-airflow-providers-amazon 8.1.0
apache-airflow-providers-celery 3.2.0
apache-airflow-providers-cncf-kubernetes 7.0.0
apache-airflow-providers-common-sql 1.5.1
apache-airflow-providers-docker 3.7.0
apache-airflow-providers-elasticsearch 4.5.0
apache-airflow-providers-ftp 3.4.1
apache-airflow-providers-google 10.1.1
apache-airflow-providers-grpc 3.2.0
apache-airflow-providers-hashicorp 3.4.0
apache-airflow-providers-http 4.4.1
apache-airflow-providers-imap 3.2.1
apache-airflow-providers-microsoft-azure 6.1.1
apache-airflow-providers-mysql 5.1.0
apache-airflow-providers-odbc 3.3.0
apache-airflow-providers-postgres 5.5.0
apache-airflow-providers-redis 3.2.0
apache-airflow-providers-sendgrid 3.2.0
apache-airflow-providers-sftp 4.3.0
apache-airflow-providers-slack 7.3.0
apache-airflow-providers-snowflake 4.1.0
apache-airflow-providers-sqlite 3.4.1
apache-airflow-providers-ssh 3.7.0
google-cloud-orchestration-airflow 1.9.0
pytest-airflow 0.0.3

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

charleschang0531 added the area:core, kind:bug, and needs-triage labels on Sep 18, 2023
@boring-cyborg

boring-cyborg bot commented Sep 18, 2023

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@RNHTTR
Contributor

RNHTTR commented Sep 21, 2023

I've been able to reproduce this. Interacting with a task via the database (e.g. clearing or marking the task as failed) during a backfill DAG run results in tasks no longer being scheduled.

RNHTTR added the area:backfill, affected_version:2.7, and affected_version:2.6 labels and removed the area:core and needs-triage labels on Sep 21, 2023
eladkal added the area:core label and removed the affected_version:2.6 label on Jan 23, 2024