
Metrics dagrun.duration.failed.<dag_id> not updated when the dag run failed due to timeout #29013

Closed · 1 of 2 tasks
yangguoaws opened this issue Jan 18, 2023 · 2 comments · Fixed by #29076

yangguoaws commented Jan 18, 2023

Apache Airflow version

2.5.0

What happened

When a DAG is configured with the dagrun_timeout parameter and the DAG run fails because of that timeout, the metric dagrun.duration.failed.<dag_id> is not emitted.

What you think should happen instead

According to the docs, the metric dagrun.duration.failed.<dag_id> should capture the milliseconds taken for a DagRun to reach the failed state. It should therefore capture every kind of DAG run failure, including failures caused by the DAG-level timeout.

How to reproduce

Set the dagrun_timeout parameter (e.g. dagrun_timeout=timedelta(seconds=5)), then add a BashOperator task that runs longer than dagrun_timeout (e.g. bash_command='sleep 120'), as in the sketch below.
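
A minimal sketch that reproduces it (the dag_id, task_id, start_date and schedule are illustrative, not taken from the report):

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="dagrun_timeout_metric_repro",  # illustrative name
        start_date=datetime(2023, 1, 1),
        schedule="@once",
        catchup=False,
        # fail the whole DAG run 5 seconds after it starts
        dagrun_timeout=timedelta(seconds=5),
    ) as dag:
        # sleeps far longer than dagrun_timeout, so the scheduler
        # marks the run as failed via the timeout path
        BashOperator(task_id="sleep_longer_than_timeout", bash_command="sleep 120")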

Then check the metrics: dagrun.duration.failed.<dag_id> does not capture this DAG run that failed because of the timeout.

Operating System

Ubuntu 22.04.1 LTS

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==7.1.0
apache-airflow-providers-common-sql==1.3.3
apache-airflow-providers-ftp==3.3.0
apache-airflow-providers-http==4.1.1
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-sqlite==3.3.1

Deployment

Virtualenv installation

Deployment details

No response

Anything else

According to the docs, the metric dagrun.duration.failed.<dag_id> should capture the milliseconds taken for a DagRun to reach the failed state. However, if the DAG run failed because of the DAG-run-level timeout, the metric does not capture the failed run.

I dug into the Airflow code and figured out the reason.

The timer dagrun.duration.failed.{self.dag_id} is emitted in the method _emit_duration_stats_for_finished_state (code):

    def _emit_duration_stats_for_finished_state(self):
        if self.state == State.RUNNING:
            return
        if self.start_date is None:
            self.log.warning("Failed to record duration of %s: start_date is not set.", self)
            return
        if self.end_date is None:
            self.log.warning("Failed to record duration of %s: end_date is not set.", self)
            return

        duration = self.end_date - self.start_date
        if self.state == State.SUCCESS:
            Stats.timing(f"dagrun.duration.success.{self.dag_id}", duration)
        elif self.state == State.FAILED:
            Stats.timing(f"dagrun.duration.failed.{self.dag_id}", duration)

The function _emit_duration_stats_for_finished_state is only called from the update_state() method of the DagRun class (code). If update_state() is not called, _emit_duration_stats_for_finished_state never runs:

        if self._state == DagRunState.FAILED or self._state == DagRunState.SUCCESS:
            msg = (
                "DagRun Finished: dag_id=%s, execution_date=%s, run_id=%s, "
                "run_start_date=%s, run_end_date=%s, run_duration=%s, "
                "state=%s, external_trigger=%s, run_type=%s, "
                "data_interval_start=%s, data_interval_end=%s, dag_hash=%s"
            )
            self.log.info(
                msg,
                self.dag_id,
                self.execution_date,
                self.run_id,
                self.start_date,
                self.end_date,
                (self.end_date - self.start_date).total_seconds()
                if self.start_date and self.end_date
                else None,
                self._state,
                self.external_trigger,
                self.run_type,
                self.data_interval_start,
                self.data_interval_end,
                self.dag_hash,
            )
            session.flush()

        self._emit_true_scheduling_delay_stats_for_finished_state(finished_tis)
        self._emit_duration_stats_for_finished_state()

When a DAG run times out, the scheduler job only calls set_state() (code):

        if (
            dag_run.start_date
            and dag.dagrun_timeout
            and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
        ):
            dag_run.set_state(DagRunState.FAILED)
            unfinished_task_instances = (
                session.query(TI)
                .filter(TI.dag_id == dag_run.dag_id)
                .filter(TI.run_id == dag_run.run_id)
                .filter(TI.state.in_(State.unfinished))
            )
            for task_instance in unfinished_task_instances:
                task_instance.state = TaskInstanceState.SKIPPED
                session.merge(task_instance)
            session.flush()
            self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
            active_runs = dag.get_num_active_runs(only_running=False, session=session)
            # Work out if we should allow creating a new DagRun now?
            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):
                dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))

            callback_to_execute = DagCallbackRequest(
                full_filepath=dag.fileloc,
                dag_id=dag.dag_id,
                run_id=dag_run.run_id,
                is_failure_callback=True,
                processor_subdir=dag_model.processor_subdir,
                msg="timed_out",
            )

            dag_run.notify_dagrun_state_changed()
            return callback_to_execute

From the code above we can see that when the DAG run times out, only set_state() is called. update_state() is never invoked, which is why the metric dagrun.duration.failed.{self.dag_id} is not emitted.

Please fix this bug so that the timer dagrun.duration.failed.<dag_id> also captures DAG runs that failed because of the DAG-level timeout.
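
For reference, a sketch of one possible direction (this is only an illustration, not necessarily what the linked fix implements), reusing the names from the scheduler snippet above: emit the duration stats right after the run is marked failed in the timeout branch, since update_state() is never reached on that path.

    # Sketch only: inside the scheduler's timeout branch quoted above,
    # immediately after the run is marked FAILED.
    dag_run.set_state(DagRunState.FAILED)
    # Calling the existing (private) helper here would emit
    # dagrun.duration.failed.<dag_id> even though update_state() is never
    # invoked on this code path; the helper already guards against a
    # missing start_date/end_date.
    dag_run._emit_duration_stats_for_finished_state()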

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@yangguoaws yangguoaws added area:core kind:bug This is a clearly a bug labels Jan 18, 2023

boring-cyborg bot commented Jan 18, 2023

Thanks for opening your first issue here! Be sure to follow the issue template!

yangguoaws (Author) commented

@o-nikolas Could you take a look?
