Closed
Labels
area:Scheduler, area:core, kind:bug, needs-triage
Description
Apache Airflow version
main (development)
If "Other Airflow 2 version" selected, which one?
No response
What happened?
The scheduler crashes with the following stack trace when it tries to execute the DAG given under "How to reproduce".
[2025-03-15 15:30:21 +0530] [27911] [INFO] Worker exiting (pid: 27911)
return f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 52, in scheduler
run_command_with_daemon_option(
File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/daemon_utils.py", line 86, in run_command_with_daemon_option
callback()
File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 55, in <lambda>
callback=lambda: _run_scheduler_job(args),
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/cli/commands/local_commands/scheduler_command.py", line 43, in _run_scheduler_job
run_job(job=job_runner.job, execute_callable=job_runner._execute)
File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", line 101, in wrapper
return func(*args, session=session, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 342, in run_job
return execute_job(job, execute_callable=execute_callable)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/job.py", line 371, in execute_job
ret = execute_callable()
^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 937, in _execute
self._run_scheduler_loop()
File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1063, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1163, in _do_scheduling
callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/utils/retries.py", line 93, in wrapped_function
for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 443, in __iter__
do = self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
result = action(retry_state)
^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/.venv/lib/python3.11/site-packages/tenacity/__init__.py", line 398, in <lambda>
self._add_action_func(lambda rs: rs.outcome.result())
^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/home/karthikeyan/stuff/python/airflow/airflow/utils/retries.py", line 102, in wrapped_function
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1569, in _schedule_all_dag_runs
callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1569, in <listcomp>
callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1667, in _schedule_dag_run
schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", line 98, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/models/dagrun.py", line 943, in update_state
info = self.task_instance_scheduling_decisions(session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", line 98, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/models/dagrun.py", line 1123, in task_instance_scheduling_decisions
schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/models/dagrun.py", line 1222, in _get_ready_tis
if not schedulable.are_dependencies_met(session=session, dep_context=dep_context):
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/utils/session.py", line 98, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/karthikeyan/stuff/python/airflow/airflow/models/taskinstance.py", line 2335, in are_dependencies_met
for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, session=session):
File "/home/karthikeyan/stuff/python/airflow/airflow/models/taskinstance.py", line 2359, in get_failed_dep_statuses
for dep_status in dep.get_dep_statuses(self, session, dep_context):
File "/home/karthikeyan/stuff/python/airflow/airflow/ti_deps/deps/base_ti_dep.py", line 116, in get_dep_statuses
yield from self._get_dep_statuses(ti, session, cxt)
File "/home/karthikeyan/stuff/python/airflow/airflow/ti_deps/deps/not_previously_skipped_dep.py", line 56, in _get_dep_statuses
if parent.inherits_from_skipmixin:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'SerializedBaseOperator' object has no attribute 'inherits_from_skipmixin'
[2025-03-15 15:30:21 +0530] [27910] [INFO] Shutting down: Master
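To make the failure mode in the last frame easier to see, here is a minimal, self-contained sketch. It does not use Airflow at all: `SerializedOperatorStandIn`, its `_restored_fields` tuple, and `parent_may_skip` are hypothetical names invented for illustration. The assumption is only that a deserialized operator carries a limited set of attributes, so a plain attribute access on a flag that was not restored raises `AttributeError`, as in the trace above. The `getattr` variant shows one defensive way to read such a flag; whether the real fix belongs in serialization or in the dep check is not established by this report.

```python
class SerializedOperatorStandIn:
    """Hypothetical stand-in for a deserialized operator (not Airflow code).

    Only the fields listed in _restored_fields are set on the instance;
    anything else passed in is silently dropped.
    """

    _restored_fields = ("task_id", "retries")

    def __init__(self, **fields):
        for name in self._restored_fields:
            if name in fields:
                setattr(self, name, fields[name])


def parent_may_skip(parent) -> bool:
    """Read the flag defensively: a missing attribute is treated as False."""
    return bool(getattr(parent, "inherits_from_skipmixin", False))


# The flag is passed in but never restored, mimicking a field lost in serialization.
parent = SerializedOperatorStandIn(
    task_id="retry_more_than_10",
    retries=40,
    inherits_from_skipmixin=True,
)

# Direct attribute access fails the same way as the last frame of the trace.
try:
    parent.inherits_from_skipmixin
    raised = False
except AttributeError:
    raised = True

# The defensive read does not raise, but it also hides the missing field.
flag = parent_may_skip(parent)
```

Note that the defensive read only masks the symptom: the stand-in was created with the flag set to `True`, yet `parent_may_skip` reports `False` because the value never made it onto the object.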
What you think should happen instead?
No response
How to reproduce
The following DAG file crashes the scheduler:
DOC = """# Docs for the dag
* 1
* 2
*bold* _italics_
"""
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="retry_issue_dag",
    start_date=datetime(2023, 10, 10),
    catchup=False,
    schedule="@once",
    doc_md=DOC,
) as dag:

    # Task that always fails, with fewer than 10 retries.
    @task(retries=8, retry_delay=timedelta(seconds=1))
    def retry_less_than_10():
        raise Exception("fail")

    # Task that always fails, with more than 10 retries.
    @task(retries=40, retry_delay=timedelta(seconds=1))
    def retry_more_than_10():
        raise Exception("fail")

    retry_less_than_10()
    retry_more_than_10()
Operating System
Ubuntu 20.04
Versions of Apache Airflow Providers
No response
Deployment
Virtualenv installation
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct