
SerializedDagNotFound: DAG not found in serialized_dag table #18843

Closed
2 tasks done
KulykDmytro opened this issue Oct 8, 2021 · 12 comments · Fixed by #19113
Assignees
Labels
affected_version:2.1 Issues Reported for 2.1 area:core area:serialization kind:bug This is a clearly a bug
Milestone

Comments

@KulykDmytro
Contributor

KulykDmytro commented Oct 8, 2021

Apache Airflow version

2.1.4 (latest released)

Operating System

Linux 5.4.149-73.259.amzn2.x86_64

Versions of Apache Airflow Providers

No response

Deployment

Other 3rd-party Helm chart

Deployment details

AWS EKS over own helm chart

What happened

We have an issue that dates back to 2.0.x: #13504
Each time the scheduler is restarted, it deletes all DAGs from the serialized_dag table and tries to serialize them again from scratch. Afterwards the scheduler pod fails with this error:

[2021-10-08 20:19:40,683] {kubernetes_executor.py:761} INFO - Shutting down Kubernetes executor                                                                                            
[2021-10-08 20:19:41,705] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 32                                                                                                 
[2021-10-08 20:19:42,207] {process_utils.py:207} INFO - Waiting up to 5 seconds for processes to exit...                                                                                   
[2021-10-08 20:19:42,223] {process_utils.py:66} INFO - Process psutil.Process(pid=32, status='terminated', exitcode=0, started='20:19:40') (32) terminated with exit code 0                
[2021-10-08 20:19:42,225] {process_utils.py:66} INFO - Process psutil.Process(pid=40, status='terminated', started='20:19:40') (40) terminated with exit code None                         
[2021-10-08 20:19:42,226] {process_utils.py:66} INFO - Process psutil.Process(pid=36, status='terminated', started='20:19:40') (36) terminated with exit code None                         
[2021-10-08 20:19:42,226] {scheduler_job.py:722} INFO - Exited execute loop                                                                                                                
Traceback (most recent call last):                                                                                                                                                         
  File "/home/airflow/.local/bin/airflow", line 8, in <module>                                                                                                                             
    sys.exit(main())                                                                                                                                                                       
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/__main__.py", line 40, in main                                                                                            
    args.func(args)                                                                                                                                                                        
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command                                                                                   
    return func(*args, **kwargs)                                                                                                                                                           
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper                                                                                        
    return f(*args, **kwargs)                                                                                                                                                              
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 70, in scheduler                                                                 
    job.run()                                                                                                                                                                              
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 245, in run                                                                                       
    self._execute()                                                                                                                                                                        
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 695, in _execute                                                                             
    self._run_scheduler_loop()                                                                                                                                                             
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 788, in _run_scheduler_loop                                                                  
    num_queued_tis = self._do_scheduling(session)                                                                                                                                          
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 927, in _do_scheduling                                                                       
    num_queued_tis = self._critical_section_execute_task_instances(session=session)                                                                                                        
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 551, in _critical_section_execute_task_instances                                             
    queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)                                                                                                       
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 67, in wrapper                                                                                    
    return func(*args, **kwargs)                                                                                                                                                           
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 431, in _executable_task_instances_to_queued                                                 
    serialized_dag = self.dagbag.get_dag(dag_id, session=session)                                                                                                                          
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 67, in wrapper                                                                                    
    return func(*args, **kwargs)                                                                                                                                                           
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 186, in get_dag                                                                                   
    self._add_dag_from_db(dag_id=dag_id, session=session)                                                                                                                                  
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 258, in _add_dag_from_db                                                                          
    raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")                                                                                                       
airflow.exceptions.SerializedDagNotFound: DAG 'aws_transforms_player_hourly' not found in serialized_dag table                                                                             

This leaves all DAGs absent from the serialized_dag table, and the webserver then fails with the same error:

Python version: 3.9.7
Airflow version: 2.1.4
Node: airflow-webserver-7b45758f99-rk8dg
-------------------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/auth.py", line 49, in decorated
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/decorators.py", line 97, in view_func
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/decorators.py", line 60, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/www/views.py", line 2027, in tree
    dag = current_app.dag_bag.get_dag(dag_id)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 186, in get_dag
    self._add_dag_from_db(dag_id=dag_id, session=session)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 258, in _add_dag_from_db
    raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'canary_dag' not found in serialized_dag table

What you expected to happen

The scheduler shouldn't fail.

How to reproduce

1. Restart the scheduler pod
2. Observe its failure
3. Open a DAG in the webserver
4. Observe the error

Anything else

The issue temporarily goes away, until the next scheduler reboot, when I run this "serialize" script from the webserver pod:

from airflow.models import DagBag
from airflow.models.serialized_dag import SerializedDagModel

dag_bag = DagBag()

# Check DB for missing serialized DAGs, and add them if missing
for dag_id in dag_bag.dag_ids:
    if not SerializedDagModel.get(dag_id):
        dag = dag_bag.get_dag(dag_id)
        SerializedDagModel.write_dag(dag)

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@ephraimbuddy
Contributor

We removed the SerializedDagNotFound error in #18554

@KulykDmytro
Contributor Author

Great news!
However, this issue is about the scheduler crashing while parsing the DagBag, which leads to the mentioned exception in the UI.

@easontm
Contributor

easontm commented Oct 10, 2021

I'm experiencing a similar issue. If a task is in the scheduled state and the DAG code is temporarily removed (e.g. part of DAG CICD), the DAG processor will delete the associated information in serialized_dag while the task still exists. Then if the scheduler tries to transition the task from scheduled to queued before the new code is serialized, it will crash the scheduler. Upon reboot, one of the first things the scheduler tries to do is adopt orphan tasks, and adoption is attempted before DAG serialization, resulting in a crashloop.
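The race described above can be illustrated with a small self-contained sketch (plain Python, no Airflow dependency; all names such as `serialized_dags` and `scheduler_queues` are made up for illustration, not real Airflow internals):

```python
# Toy model of the race: the DAG processor deletes a serialized DAG while a
# task instance referencing it is still in the 'scheduled' state.

class SerializedDagNotFound(Exception):
    pass

serialized_dags = {"dag_a": "<serialized blob>"}   # stands in for the serialized_dag table
scheduled_tis = [("dag_a", "task_1")]              # task instances waiting to be queued

def dag_processor_deletes_missing_files():
    # The DAG file was removed from disk (e.g. mid-CICD), so its row is deleted.
    serialized_dags.pop("dag_a", None)

def scheduler_queues(dag_id):
    if dag_id not in serialized_dags:
        # Pre-fix behavior: raising here kills the whole scheduler loop.
        raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
    return f"queued {dag_id}"

dag_processor_deletes_missing_files()
crashed = False
for dag_id, task_id in scheduled_tis:
    try:
        scheduler_queues(dag_id)
    except SerializedDagNotFound:
        crashed = True
print(crashed)  # True: the lookup fails and the scheduler dies instead of skipping the TI
```

On restart the same lookup happens again before the DAG is re-serialized, which is what produces the crashloop.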

@ephraimbuddy
Contributor

cc @kaxil

@eladkal eladkal added this to the Airflow 2.2.1 milestone Oct 12, 2021
@ashb ashb modified the milestones: Airflow 2.2.1, Airflow 2.2.2 Oct 14, 2021
@jurovee

jurovee commented Oct 15, 2021

I'm experiencing a similar issue. If a task is in the scheduled state and the DAG code is temporarily removed (e.g. part of DAG CICD), the DAG processor will delete the associated information in serialized_dag while the task still exists. Then if the scheduler tries to transition the task from scheduled to queued before the new code is serialized, it will crash the scheduler. Upon reboot, one of the first things the scheduler tries to do is adopt orphan tasks, and adoption is attempted before DAG serialization, resulting in a crashloop.

Can confirm this behavior on our deployment as well (Airflow 2.1.4, Python 3.9). The scheduler went down after a DAG file was deleted from the DAGs folder. Error messages went from:

airflow.exceptions.SerializedDagNotFound: DAG 'some_dag' not found in serialized_dag table

to

AttributeError: 'NoneType' object has no attribute 'dag_id'.

We are also noticing ~10x higher CPU usage from a single scheduler on 2.1.4 compared with 2.1.2 (same number of DAGs, same settings), but this seems unrelated and we will probably create a new issue for it if one doesn't already exist. Posting it just in case someone experiences the same.

@kaxil kaxil modified the milestones: Airflow 2.2.2, Airflow 2.2.1 Oct 15, 2021
@kaxil
Member

kaxil commented Oct 15, 2021

Yes, we should fix it in 2.2.2 - Scheduler should not crash and instead handle the SerializedDagNotFound error.

@easontm @jurovee @KulykDmytro - can you confirm whether this happens in Airflow 2.2.0 too, please?
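The "handle the error instead of crashing" idea can be sketched as a catch-and-skip pattern (a self-contained toy, not the actual fix in #18554; `executable_tis_to_queued` and the dict-backed store are hypothetical names):

```python
# Sketch: skip task instances whose DAG has no serialized row, instead of
# letting the exception kill the scheduler loop.

class SerializedDagNotFound(Exception):
    pass

serialized_dags = {"dag_b": "<blob>"}  # 'dag_a' is missing (file deleted mid-CICD)

def get_dag(dag_id):
    if dag_id not in serialized_dags:
        raise SerializedDagNotFound(dag_id)
    return serialized_dags[dag_id]

def executable_tis_to_queued(tis):
    queued = []
    for dag_id, task_id in tis:
        try:
            get_dag(dag_id)
        except SerializedDagNotFound:
            # Log and move on; the row may reappear on the next parse cycle.
            continue
        queued.append((dag_id, task_id))
    return queued

print(executable_tis_to_queued([("dag_a", "t1"), ("dag_b", "t2")]))
# -> [('dag_b', 't2')]
```

The key property is that a single missing DAG only delays its own task instances rather than taking down scheduling for every DAG.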

@uranusjr
Member

I believe we already fixed this in #18554 (therefore in 2.2.0). cc @ephraimbuddy

@anmol

anmol commented Oct 20, 2021

Not sure if this qualifies as a separate bug, but I think it might be related - we are facing this after upgrading from 2.1.2 -> 2.2.0.
Airflow version: 2.2.0

Deployment details

  • AWS EKS over own helm chart
  • KubernetesExecutor

How to reproduce: restart (rollout) the scheduler while a task is running for dag_id = dag_1, dag_run_1

Error: the scheduler goes into CrashLoopBackOff with this error -

{scheduler_job.py:952} ERROR - Couldn't find dag dag_1 in DagBag/DB!
...
airflow.exceptions.AirflowException: The DAG (.dag) for <DagRun dag_run_1> needs to be set

Full Stacktrace for an actual Dag:

[2021-10-20 07:30:42,955] {retries.py:82} DEBUG - Running SchedulerJob._create_dagruns_for_dags with retries. Try 1 of 3
[2021-10-20 07:30:42,972] {retries.py:82} DEBUG - Running SchedulerJob._get_next_dagruns_to_examine with retries. Try 1 of 3
[2021-10-20 07:30:42,974] {serialized_dag.py:205} DEBUG - Deleting Serialized DAGs (for which DAG files are deleted) from serialized_dag table
[2021-10-20 07:30:42,982] {dag.py:2850} DEBUG - Deactivating DAGs (for which DAG files are deleted) from dag table
[2021-10-20 07:30:42,990] {retries.py:82} DEBUG - Running SchedulerJob._get_next_dagruns_to_examine with retries. Try 1 of 3
[2021-10-20 07:30:43,000] {scheduler_job.py:952} ERROR - Couldn't find dag ecicjc_query_table_16 in DagBag/DB!
[2021-10-20 07:30:43,014] {scheduler_job.py:1007} DEBUG - DAG smart_sensor_group_shard_0 not changed structure, skipping dagrun.verify_integrity
[2021-10-20 07:30:43,019] {dagrun.py:543} DEBUG - number of tis tasks for <DagRun smart_sensor_group_shard_0 @ 2021-10-06 06:23:28.320990+00:00: scheduled__2021-10-06T06:23:28.320990+00:00, externally triggered: False>: 1 task(s)
[2021-10-20 07:30:43,019] {dagrun.py:558} DEBUG - number of scheduleable tasks for <DagRun smart_sensor_group_shard_0 @ 2021-10-06 06:23:28.320990+00:00: scheduled__2021-10-06T06:23:28.320990+00:00, externally triggered: False>: 0 task(s)
[2021-10-20 07:30:43,020] {taskinstance.py:1050} DEBUG - <TaskInstance: smart_sensor_group_shard_0.smart_sensor_task scheduled__2021-10-06T06:23:28.320990+00:00 [scheduled]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2021-10-20 07:30:43,020] {taskinstance.py:1050} DEBUG - <TaskInstance: smart_sensor_group_shard_0.smart_sensor_task scheduled__2021-10-06T06:23:28.320990+00:00 [scheduled]> dependency 'Not In Retry Period' PASSED: True, The context specified that being in a retry period was permitted.
[2021-10-20 07:30:43,020] {taskinstance.py:1050} DEBUG - <TaskInstance: smart_sensor_group_shard_0.smart_sensor_task scheduled__2021-10-06T06:23:28.320990+00:00 [scheduled]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
[2021-10-20 07:30:43,020] {taskinstance.py:1035} DEBUG - Dependencies all met for <TaskInstance: smart_sensor_group_shard_0.smart_sensor_task scheduled__2021-10-06T06:23:28.320990+00:00 [scheduled]>
[2021-10-20 07:30:43,025] {scheduler_job.py:603} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 587, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 668, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 758, in _do_scheduling
    self._send_dag_callbacks_to_processor(dag_run, callback_to_run)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1024, in _send_dag_callbacks_to_processor
    dag = dag_run.get_dag()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 397, in get_dag
    raise AirflowException(f"The DAG (.dag) for {self} needs to be set")
airflow.exceptions.AirflowException: The DAG (.dag) for <DagRun ecicjc_query_table_16 @ 2020-07-24 07:45:00+00:00: scheduled__2020-07-24T07:45:00+00:00, externally triggered: False> needs to be set
[2021-10-20 07:30:43,026] {kubernetes_executor.py:787} INFO - Shutting down Kubernetes executor
[2021-10-20 07:30:43,026] {kubernetes_executor.py:788} DEBUG - Flushing task_queue...
[2021-10-20 07:30:43,027] {kubernetes_executor.py:742} DEBUG - Executor shutting down, task_queue approximate size=0
[2021-10-20 07:30:43,027] {kubernetes_executor.py:790} DEBUG - Flushing result_queue...
[2021-10-20 07:30:43,027] {kubernetes_executor.py:755} DEBUG - Executor shutting down, result_queue approximate size=0
[2021-10-20 07:30:43,028] {kubernetes_executor.py:395} DEBUG - Terminating kube_watcher...
[2021-10-20 07:30:43,033] {kubernetes_executor.py:398} DEBUG - kube_watcher=<KubernetesJobWatcher(KubernetesJobWatcher-3, stopped)>
[2021-10-20 07:30:43,033] {kubernetes_executor.py:399} DEBUG - Flushing watcher_queue...
[2021-10-20 07:30:43,034] {kubernetes_executor.py:383} DEBUG - Executor shutting down, watcher_queue approx. size=0
[2021-10-20 07:30:43,034] {kubernetes_executor.py:403} DEBUG - Shutting down manager...
[2021-10-20 07:30:43,069] {dagcode.py:132} DEBUG - Deleting code from dag_code table
[2021-10-20 07:30:44,045] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 39
[2021-10-20 07:30:45,074] {settings.py:331} DEBUG - Disposing DB connection pool (PID 52)
[2021-10-20 07:30:45,087] {process_utils.py:66} INFO - Process psutil.Process(pid=52, status='terminated', started='07:30:42') (52) terminated with exit code None
[2021-10-20 07:30:45,491] {settings.py:331} DEBUG - Disposing DB connection pool (PID 53)
[2021-10-20 07:30:45,514] {process_utils.py:66} INFO - Process psutil.Process(pid=53, status='terminated', started='07:30:42') (53) terminated with exit code None
[2021-10-20 07:30:45,883] {settings.py:331} DEBUG - Disposing DB connection pool (PID 54)
[2021-10-20 07:30:45,890] {process_utils.py:212} INFO - Waiting up to 5 seconds for processes to exit...
[2021-10-20 07:30:45,899] {process_utils.py:66} INFO - Process psutil.Process(pid=54, status='terminated', started='07:30:42') (54) terminated with exit code None
[2021-10-20 07:30:45,899] {process_utils.py:66} INFO - Process psutil.Process(pid=39, status='terminated', exitcode=0, started='07:30:42') (39) terminated with exit code 0
[2021-10-20 07:30:45,900] {scheduler_job.py:614} INFO - Exited execute loop
[2021-10-20 07:30:45,910] {cli_action_loggers.py:84} DEBUG - Calling callbacks: []
[2021-10-20 07:30:45,913] {settings.py:331} DEBUG - Disposing DB connection pool (PID 8)
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
    _run_scheduler_job(args=args)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
    job.run()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 245, in run
    self._execute()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 587, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 668, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 758, in _do_scheduling
    self._send_dag_callbacks_to_processor(dag_run, callback_to_run)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1024, in _send_dag_callbacks_to_processor
    dag = dag_run.get_dag()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 397, in get_dag
    raise AirflowException(f"The DAG (.dag) for {self} needs to be set")
airflow.exceptions.AirflowException: The DAG (.dag) for <DagRun ecicjc_query_table_16 @ 2020-07-24 07:45:00+00:00: scheduled__2020-07-24T07:45:00+00:00, externally triggered: False> needs to be set

At this stage, every dag_run that is in the running state will trigger this issue.

The scheduler can be resurrected by deleting the dag_runs that are in the running state:

delete from dag_run where state='running' and dag_id='ecicjc_query_table_16';
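Since that DELETE is destructive, it may be worth inspecting which rows it would remove first. A self-contained sketch of the inspect-then-delete pattern, using an in-memory sqlite3 table as a stand-in for the real metadata database (the schema here is simplified and illustrative only):

```python
# Inspect-then-delete sketch against a toy dag_run table.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (dag_id TEXT, run_id TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?, ?)",
    [
        ("ecicjc_query_table_16", "scheduled__2020-07-24", "running"),
        ("other_dag", "scheduled__2021-10-01", "success"),
    ],
)

# Step 1: see exactly which rows the DELETE would touch.
rows = conn.execute(
    "SELECT dag_id, run_id FROM dag_run WHERE state='running' AND dag_id=?",
    ("ecicjc_query_table_16",),
).fetchall()
print(rows)

# Step 2: only then run the destructive statement.
conn.execute(
    "DELETE FROM dag_run WHERE state='running' AND dag_id=?",
    ("ecicjc_query_table_16",),
)
remaining = conn.execute("SELECT COUNT(*) FROM dag_run").fetchone()[0]
print(remaining)  # 1
```

On a production metadata database the same two-step approach applies: run the SELECT with the identical WHERE clause before the DELETE.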

@ephraimbuddy ephraimbuddy self-assigned this Oct 20, 2021
@uranusjr
Member

The error is definitely different and should be reported separately, IMO. The SerializedDagNotFound: DAG not found in serialized_dag table error specifically should already have been fixed.

@easontm
Contributor

easontm commented Oct 21, 2021

@kaxil I'm doing the 2.2.0 upgrade next week, I will let you know if behavior changes.

@jurovee

jurovee commented Nov 5, 2021

@kaxil Hi, yesterday we upgraded to 2.2.1, so I tested this by deleting a DAG file at runtime:

...
[2021-11-05 14:15:12,459] {manager.py:1051} INFO - Finding 'running' jobs without a recent heartbeat
[2021-11-05 14:15:12,459] {manager.py:1055} INFO - Failing jobs without heartbeat after 2021-11-05 14:05:12.459285+00:00
Process ForkProcess-1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 277, in _run_processor_manager
    processor_manager.start()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 510, in start
    return self._run_parsing_loop()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 583, in _run_parsing_loop
    self.prepare_file_path_queue()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 978, in prepare_file_path_queue
    files_with_mtime[file_path] = os.path.getmtime(file_path)
  File "/usr/local/lib/python3.9/genericpath.py", line 55, in getmtime
    return os.stat(filename).st_mtime
FileNotFoundError: [Errno 2] No such file or directory: 'dagfile.py'
[2021-11-05 14:15:13,543] {manager.py:308} WARNING - DagFileProcessorManager (PID=57) exited with exit code 1 - re-launching
[2021-11-05 14:15:13,550] {manager.py:163} INFO - Launched DagFileProcessorManager with pid: 24817
[2021-11-05 14:15:13,561] {settings.py:52} INFO - Configured default timezone Timezone('America/Chicago')
...

The scheduler survived - so it seems we are good here. Thank you!

@kaxil
Member

kaxil commented Nov 5, 2021

Awesome, glad to hear that. Thanks for the update
