LocalExecutor causes scheduler crash when API server returns error response #47873

@tirkarthi

Description

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

I am not able to reproduce this consistently, but I noticed that when the API server returns a 4xx/5xx error response, a ServerResponseError (which inherits from httpx.HTTPStatusError) is raised, apparently from the raise_on_4xx_5xx hook. httpx.HTTPStatusError is not pickleable, and there is an open httpx PR about this (linked below). Because request and response are keyword-only arguments to its __init__, the scheduler crashes with the stacktrace below when it tries to unpickle the exception from the executor's result queue.

encode/httpx#3108
https://docs.python.org/3/library/pickle.html#object.__getnewargs_ex__

You should implement this method if the __new__() method of your class requires keyword-only arguments. Otherwise, it is recommended for compatibility to implement __getnewargs__().
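
To illustrate the failure mode (a minimal sketch with a toy exception, not Airflow or httpx code): exceptions are pickled via BaseException.__reduce__, which records only self.args and re-invokes the class with those positional arguments on load, so keyword-only parameters are never passed back in and unpickling fails.

```python
import pickle

class KwOnlyError(Exception):
    """Toy stand-in for httpx.HTTPStatusError's keyword-only signature."""

    def __init__(self, message, *, request, response):
        super().__init__(message)  # only `message` lands in self.args
        self.request = request
        self.response = response

err = KwOnlyError("boom", request="<req>", response="<resp>")
data = pickle.dumps(err)  # dumping succeeds
pickle.loads(data)        # TypeError: KwOnlyError.__init__() missing 2 required
                          # keyword-only arguments: 'request' and 'response'
```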

Traceback


[2025-03-11T18:20:15.560+0530] {scheduler_job_runner.py:939} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 935, in _execute
    self._run_scheduler_loop()
  File "/home/karthikeyan/stuff/python/airflow/airflow/jobs/scheduler_job_runner.py", line 1067, in _run_scheduler_loop
    executor.heartbeat()
  File "/home/karthikeyan/stuff/python/airflow/airflow/traces/tracer.py", line 54, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/karthikeyan/stuff/python/airflow/airflow/executors/base_executor.py", line 254, in heartbeat
    self.sync()
  File "/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", line 210, in sync
    self._read_results()
  File "/home/karthikeyan/stuff/python/airflow/airflow/executors/local_executor.py", line 215, in _read_results
    key, state, exc = self.result_queue.get()
                      ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/multiprocessing/queues.py", line 367, in get
    return _ForkingPickler.loads(res)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: ServerResponseError.__init__() missing 2 required keyword-only arguments: 'request' and 'response'
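
One possible mitigation, sketched below, assumes ServerResponseError keeps httpx.HTTPStatusError's (message, *, request, response) signature; this is not the actual Airflow fix. The idea is to override __reduce__ so the keyword-only arguments are re-supplied on unpickling. Note it only helps if the attached request/response objects are themselves pickleable.

```python
import httpx

def _rebuild_server_response_error(message, request, response):
    # Module-level helper so pickle can locate it by qualified name.
    return ServerResponseError(message, request=request, response=response)

class ServerResponseError(httpx.HTTPStatusError):
    """Hypothetical pickle-safe variant of the task-sdk exception."""

    def __reduce__(self):
        # BaseException.__reduce__ would replay only self.args positionally;
        # route through the helper to restore the keyword-only arguments.
        return (
            _rebuild_server_response_error,
            (self.args[0], self.request, self.response),
        )
```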

What you think should happen instead?

No response

How to reproduce

Error responses from the API server to the Task SDK trigger this, but I don't have a concrete scenario that reproduces it consistently. The unpickling step itself can be reproduced without an API server, as sketched below.
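
A minimal sketch (using the toy exception again; the traceback suggests LocalExecutor's result queue is a multiprocessing.SimpleQueue, whose get() unpickles with _ForkingPickler.loads):

```python
import multiprocessing as mp

class KwOnlyError(Exception):
    # Mirrors httpx.HTTPStatusError's keyword-only signature.
    def __init__(self, message, *, request, response):
        super().__init__(message)

def worker(result_queue):
    # Pickling on put() succeeds; the crash happens on the reader's get().
    exc = KwOnlyError("boom", request="<req>", response="<resp>")
    result_queue.put(("task-key", "failed", exc))

if __name__ == "__main__":
    queue = mp.SimpleQueue()
    proc = mp.Process(target=worker, args=(queue,))
    proc.start()
    proc.join()
    queue.get()  # TypeError: KwOnlyError.__init__() missing 2 required
                 # keyword-only arguments: 'request' and 'response'
```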

Operating System

Ubuntu 20.04

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

Labels

area:Executors-core (LocalExecutor & SequentialExecutor), area:core, area:task-execution-interface-aip72 (AIP-72: Task Execution Interface aka Task SDK), kind:bug, priority:high
