Object of type datetime is not JSON serializable after detecting zombie jobs with CeleryExecutor and separated Scheduler and DAG-Processor #25343

@nicolamarangoni

Description

Apache Airflow version

2.3.3 (latest released)

What happened

After running for a certain period (a few minutes to several hours, depending on the number of active DAGs in the environment), the scheduler crashes with the following error message:

[2022-07-26 15:07:24,362] {executor_loader.py:105} INFO - Loaded executor: CeleryExecutor
[2022-07-26 15:07:24,363] {scheduler_job.py:1252} INFO - Resetting orphaned tasks for active dag runs
[2022-07-26 15:07:25,585] {celery_executor.py:532} INFO - Adopted the following 1 tasks from a dead executor
    <TaskInstance: freewheel_uafl_data_scala.freewheel.delivery_data scheduled__2022-07-25T04:15:00+00:00 [running]> in state STARTED
[2022-07-26 15:07:35,881] {scheduler_job.py:1381} WARNING - Failing (1) jobs without heartbeat after 2022-07-26 12:37:35.868798+00:00
[2022-07-26 15:07:35,881] {scheduler_job.py:1389} ERROR - Detected zombie job: {'full_filepath': '/data/dags/09_scala_apps/freewheel_uafl_data_scala.py', 'msg': 'Detected <TaskInstance: freewheel_uafl_data_scala.freewheel.delivery_data scheduled__2022-07-25T04:15:00+00:00 [running]> as zombie', 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7fb4a1105690>, 'is_failure_callback': True}
[2022-07-26 15:07:35,883] {scheduler_job.py:769} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 752, in _execute
    self._run_scheduler_loop()
  File "/usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 873, in _run_scheduler_loop
    next_event = timers.run(blocking=False)
  File "/usr/lib/python3.10/sched.py", line 151, in run
    action(*argument, **kwargs)
  File "/usr/lib/python3.10/site-packages/airflow/utils/event_scheduler.py", line 36, in repeat
    action(*args, **kwargs)
  File "/usr/lib/python3.10/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1390, in _find_zombies
    self.executor.send_callback(request)
  File "/usr/lib/python3.10/site-packages/airflow/executors/base_executor.py", line 363, in send_callback
    self.callback_sink.send(request)
  File "/usr/lib/python3.10/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/lib/python3.10/site-packages/airflow/callbacks/database_callback_sink.py", line 34, in send
    db_callback = DbCallbackRequest(callback=callback, priority_weight=10)
  File "<string>", line 4, in __init__
  File "/usr/lib/python3.10/site-packages/sqlalchemy/orm/state.py", line 481, in _initialize_instance
    with util.safe_reraise():
  File "/usr/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/usr/lib/python3.10/site-packages/sqlalchemy/orm/state.py", line 479, in _initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
  File "/usr/lib/python3.10/site-packages/airflow/models/db_callback_request.py", line 44, in __init__
    self.callback_data = callback.to_json()
  File "/usr/lib/python3.10/site-packages/airflow/callbacks/callback_requests.py", line 79, in to_json
    return json.dumps(dict_obj)
  File "/usr/lib/python3.10/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/lib/python3.10/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/lib/python3.10/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/lib/python3.10/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type datetime is not JSON serializable
[2022-07-26 15:07:36,100] {scheduler_job.py:781} INFO - Exited execute loop
Traceback (most recent call last):
  File "/usr/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/lib/python3.10/site-packages/airflow/__main__.py", line 38, in main
    args.func(args)
  File "/usr/lib/python3.10/site-packages/airflow/cli/cli_parser.py", line 51, in command
    return func(*args, **kwargs)
  File "/usr/lib/python3.10/site-packages/airflow/utils/cli.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/usr/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
    _run_scheduler_job(args=args)
  File "/usr/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
    job.run()
  File "/usr/lib/python3.10/site-packages/airflow/jobs/base_job.py", line 244, in run
    self._execute()
  File "/usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 752, in _execute
    self._run_scheduler_loop()
  File "/usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 873, in _run_scheduler_loop
    next_event = timers.run(blocking=False)
  File "/usr/lib/python3.10/sched.py", line 151, in run
    action(*argument, **kwargs)
  File "/usr/lib/python3.10/site-packages/airflow/utils/event_scheduler.py", line 36, in repeat
    action(*args, **kwargs)
  File "/usr/lib/python3.10/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1390, in _find_zombies
    self.executor.send_callback(request)
  File "/usr/lib/python3.10/site-packages/airflow/executors/base_executor.py", line 363, in send_callback
    self.callback_sink.send(request)
  File "/usr/lib/python3.10/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/lib/python3.10/site-packages/airflow/callbacks/database_callback_sink.py", line 34, in send
    db_callback = DbCallbackRequest(callback=callback, priority_weight=10)
  File "<string>", line 4, in __init__
  File "/usr/lib/python3.10/site-packages/sqlalchemy/orm/state.py", line 481, in _initialize_instance
    with util.safe_reraise():
  File "/usr/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/usr/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/usr/lib/python3.10/site-packages/sqlalchemy/orm/state.py", line 479, in _initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
  File "/usr/lib/python3.10/site-packages/airflow/models/db_callback_request.py", line 44, in __init__
    self.callback_data = callback.to_json()
  File "/usr/lib/python3.10/site-packages/airflow/callbacks/callback_requests.py", line 79, in to_json
    return json.dumps(dict_obj)
  File "/usr/lib/python3.10/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/usr/lib/python3.10/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/lib/python3.10/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/lib/python3.10/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type datetime is not JSON serializable
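
The root cause is easy to reproduce in isolation: Python's stdlib `json` encoder has no default handling for `datetime`, so any callback payload carrying one raises this exact `TypeError`. A minimal stand-alone sketch (the payload fields here are illustrative stand-ins, not the actual `SimpleTaskInstance` attributes):

```python
import json
from datetime import datetime, timezone

# Hypothetical stand-in for the zombie-callback payload; the real
# SimpleTaskInstance carries datetime fields that end up in dict_obj.
payload = {
    "msg": "Detected zombie task instance",
    "start_date": datetime(2022, 7, 26, 15, 7, 35, tzinfo=timezone.utc),
}

try:
    json.dumps(payload)  # same call as CallbackRequest.to_json()
except TypeError as exc:
    error = str(exc)  # "Object of type datetime is not JSON serializable"
```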

What you think should happen instead

The scheduler should handle zombie jobs without crashing.

How to reproduce

The following conditions are necessary:

  • The dag-processor and scheduler run in separate containers
  • Airflow uses the CeleryExecutor
  • There are zombie jobs

Operating System

Alpine Linux 3.16.1

Versions of Apache Airflow Providers

apache-airflow-providers-apache-hdfs==3.0.1
apache-airflow-providers-celery==3.0.0
apache-airflow-providers-cncf-kubernetes==4.2.0
apache-airflow-providers-common-sql==1.0.0
apache-airflow-providers-datadog==3.0.0
apache-airflow-providers-exasol==2.1.3
apache-airflow-providers-ftp==3.1.0
apache-airflow-providers-http==4.0.0
apache-airflow-providers-imap==3.0.0
apache-airflow-providers-jenkins==3.0.0
apache-airflow-providers-microsoft-mssql==3.1.0
apache-airflow-providers-odbc==3.1.0
apache-airflow-providers-oracle==3.1.0
apache-airflow-providers-postgres==5.1.0
apache-airflow-providers-redis==3.0.0
apache-airflow-providers-slack==5.1.0
apache-airflow-providers-sqlite==3.1.0
apache-airflow-providers-ssh==3.1.0

Deployment

Other 3rd-party Helm chart

Deployment details

One Pod on Kubernetes containing the following containers

  • 1 Container for the webserver service
  • 1 Container for the scheduler service
  • 1 Container for the dag-processor service
  • 1 Container for the flower service
  • 1 Container for the redis service
  • 2 or 3 containers for the Celery worker services

Due to a previous issue that crashed the scheduler with the message UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS, we substituted scheduler_job.py with the file https://raw.githubusercontent.com/tanelk/airflow/a4b22932e5ac9c2b6f37c8c58345eee0f63cae09/airflow/jobs/scheduler_job.py.
Sadly I don't remember which issue or PR exactly, but it was related to the scheduler and dag-processor running in separate containers.

Anything else

It appears that only the combination of the CeleryExecutor with separate scheduler and dag-processor services crashes the scheduler when handling zombie jobs.
The KubernetesExecutor with a separate scheduler and dag-processor does not crash the scheduler.
Likewise, the CeleryExecutor with the scheduler and dag-processor in the same container does not appear to crash the scheduler.
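
One possible direction for a fix (a sketch only, under my own assumptions, not the patch eventually merged upstream): have the callback serialization pass a `default` hook to `json.dumps` that renders datetimes as ISO-8601 strings instead of raising:

```python
import json
from datetime import datetime, timezone

def _json_default(obj):
    # Fall back to ISO-8601 strings for datetimes; re-raise for anything else.
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

# Illustrative payload mimicking the zombie-callback dict from the traceback.
payload = {
    "msg": "Detected zombie task instance",
    "start_date": datetime(2022, 7, 26, 15, 7, 35, tzinfo=timezone.utc),
}

encoded = json.dumps(payload, default=_json_default)
```

The receiving side would of course need to parse those strings back into datetimes, so round-tripping would have to be handled wherever the callback is deserialized.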

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
