Fix Edge worker reporting crashed tasks as SUCCESS #65833

Open
dheerajturaga wants to merge 1 commit into apache:main from dheerajturaga:bugfix/edge-fix-fail-exception

dheerajturaga (Member) commented Apr 25, 2026

_run_job_via_supervisor returned 1 on a caught exception, but
multiprocessing.Process ignores the target's return value, so the exit
code was always 0. As a result, Job.is_success was True and crashed tasks
were reported to the central server as SUCCESS despite the "Task execution
failed" traceback in the log. Use sys.exit(1) so the subprocess exits
with code 1 and fetch_and_run_job takes the FAILED branch.
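
To illustrate the mechanism described above, here is a minimal standalone sketch (not the edge3 code). The names returns_one, exits_one and exit_code_of are hypothetical, and the "fork" start method is assumed so that the snippet behaves the same when run as a plain script on a POSIX system.

```python
import multiprocessing
import sys
from typing import Callable, Optional

# Assumption: a POSIX system where the "fork" start method is available.
_ctx = multiprocessing.get_context("fork")


def returns_one() -> int:
    """Hypothetical target that signals failure only through its return value."""
    return 1


def exits_one() -> None:
    """Hypothetical target that signals failure through the process exit status."""
    sys.exit(1)


def exit_code_of(target: Callable[[], object]) -> Optional[int]:
    """Run target in a child process and return the exit code the parent sees."""
    proc = _ctx.Process(target=target)
    proc.start()
    proc.join()
    return proc.exitcode


if __name__ == "__main__":
    # The return value of the target is discarded by multiprocessing.Process.
    print("return 1    ->", exit_code_of(returns_one))
    # sys.exit(1) raises SystemExit, which becomes the child's exit code.
    print("sys.exit(1) ->", exit_code_of(exits_one))
```

The returning target should yield exit code 0 and the sys.exit(1) target exit code 1, which is why only the latter lets the parent process tell a crash from a clean run.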

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)
    ClaudeCode Opus 4.7

@dheerajturaga dheerajturaga requested a review from jscheffl as a code owner April 25, 2026 14:36
@boring-cyborg boring-cyborg Bot added area:providers provider:edge Edge Executor / Worker (AIP-69) / edge3 labels Apr 25, 2026
jscheffl (Contributor) left a comment

Okay, signalling is better, but I do not see an effect. At least in my "integration test" Dag, a failed task ("plan_to_fail") is still reported as "success":

Image

I do see this as an improvement, but can you tell me where you see an effect on the reported exit status?

dheerajturaga (Member, Author) commented Apr 26, 2026

> Okay, signalling is better, but I do not see an effect. At least in my "integration test" Dag, a failed task ("plan_to_fail") is still reported as "success":
>
> I do see this as an improvement, but can you tell me where you see an effect on the reported exit status?

@jscheffl, what you are reporting seems to be a different bug altogether. When a user task raises, task_runner.main() catches it, sends TaskState(FAILED) to the supervisor, and returns normally. _fork_main then exits the task-runner subprocess with code 0, and supervise_task returns exit_code=0. edge3's _run_job_via_supervisor only looks at the return value, so Job.is_success is True and it calls jobs_set_state(..., SUCCESS).

I will raise a separate follow-up PR for the above.
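
The flow described above can be modelled with a small toy example. This is only a sketch under stated assumptions, not Airflow or edge3 code: toy_task_runner, run_toy_task and the two is_success_* helpers are hypothetical names, the state is passed over a plain pipe, and the "fork" start method is assumed (POSIX only). It shows a child that catches the task error, reports FAILED, and still exits with code 0.

```python
import multiprocessing
from typing import Optional, Tuple

# Assumption: a POSIX system where the "fork" start method is available.
_ctx = multiprocessing.get_context("fork")


def toy_task_runner(conn) -> None:
    """Hypothetical stand-in for a task runner.

    It catches the task error, reports FAILED, and returns normally,
    so the process itself exits with code 0.
    """
    try:
        raise ValueError("user task raised")
    except ValueError:
        conn.send("FAILED")
    finally:
        conn.close()


def run_toy_task() -> Tuple[Optional[int], Optional[str]]:
    """Run the toy task and return (exit_code, reported_state)."""
    recv_conn, send_conn = _ctx.Pipe(duplex=False)
    proc = _ctx.Process(target=toy_task_runner, args=(send_conn,))
    proc.start()
    send_conn.close()  # the parent keeps only the receiving end
    try:
        reported = recv_conn.recv()
    except EOFError:
        reported = None  # child ended without reporting a state
    proc.join()
    return proc.exitcode, reported


def is_success_by_exit_code(exit_code: Optional[int]) -> bool:
    """Decision based on the exit code alone."""
    return exit_code == 0


def is_success_by_state(exit_code: Optional[int], reported: Optional[str]) -> bool:
    """Decision that also requires an explicitly reported SUCCESS state."""
    return exit_code == 0 and reported == "SUCCESS"


if __name__ == "__main__":
    code, state = run_toy_task()
    print("exit code:", code, "reported state:", state)
    print("success by exit code only:", is_success_by_exit_code(code))
    print("success by exit code and state:", is_success_by_state(code, state))
```

In this toy model the exit code alone marks the run as successful, while a check that also considers the reported state does not. How the real follow-up fix obtains the task state may well differ.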

What I was trying to address in this PR is the following exception:

 [TIMESTAMP] {worker.py:226} ERROR - Task execution failed                                                                                          
  Traceback (most recent call last):                                                                                                                 
    File ".../airflow/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 101, in map_httpcore_exceptions                          
      yield                                                                                                                                          
    File ".../airflow/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 250, in handle_request                                   
      resp = self._pool.handle_request(req)                                                                                                          
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                          
    File ".../airflow/venv/lib/python3.11/site-packages/httpcore/_sync/connection_pool.py", line 256, in handle_request                              
      raise exc from None                                                                                                                            
    File ".../airflow/venv/lib/python3.11/site-packages/httpcore/_sync/connection_pool.py", line 236, in handle_request                              
      response = connection.handle_request(                                                                                                          
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                          
    File ".../airflow/venv/lib/python3.11/site-packages/httpcore/_sync/connection.py", line 101, in handle_request                                   
      raise exc                                                                                                                                      
    File ".../airflow/venv/lib/python3.11/site-packages/httpcore/_sync/connection.py", line 78, in handle_request                                    
      stream = self._connect(request)                                                                                                                
               ^^^^^^^^^^^^^^^^^^^^^^                                                                                                                
    File ".../airflow/venv/lib/python3.11/site-packages/httpcore/_sync/connection.py", line 124, in _connect                                         
      stream = self._network_backend.connect_tcp(**kwargs)                                                                                           
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                           
    File ".../airflow/venv/lib/python3.11/site-packages/httpcore/_backends/sync.py", line 207, in connect_tcp                                        
      with map_exceptions(exc_map):                                                                                                                  
    File ".../python3.11/contextlib.py", line 155, in __exit__                                                                                       
      self.gen.throw(typ, value, traceback)                                                                                                          
    File ".../airflow/venv/lib/python3.11/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions                                         
      raise to_exc(exc) from exc                                                                                                                     
  httpcore.ConnectError: [Errno 111] Connection refused                                                                                              
                                                                                                                                                     
  The above exception was the direct cause of the following exception:                                                                               
                                                                                                                                                     
  Traceback (most recent call last):                                                                                                                 
    File ".../airflow/venv/lib/python3.11/site-packages/airflow/providers/edge3/cli/worker.py", line 214, in _run_job_via_supervisor
      supervise(                                                                                                                                     
    File ".../airflow/venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 1993, in supervise                           
      exit_code = process.wait()                                                                                                                     
                  ^^^^^^^^^^^^^^                                                                                                                     
    File ".../airflow/venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 1004, in wait                                
      self.update_task_state_if_needed()                                                                                                             
    File ".../airflow/venv/lib/python3.11/site-packages/airflow/sdk/execution_time/supervisor.py", line 1018, in update_task_state_if_needed         
      self.client.task_instances.finish(                                                                                                             
    File ".../airflow/venv/lib/python3.11/site-packages/airflow/sdk/api/client.py", line 226, in finish                                              
      self.client.patch(f"task-instances/{id}/state", content=body.model_dump_json())                                                                
    File ".../airflow/venv/lib/python3.11/site-packages/httpx/_client.py", line 1218, in patch                                                       
      return self.request(                                                                                                                           
             ^^^^^^^^^^^^^                                                                                                                           
    File ".../airflow/venv/lib/python3.11/site-packages/tenacity/__init__.py", line 331, in wrapped_f                                                
      return copy(f, *args, **kw)                                                                                                                    
             ^^^^^^^^^^^^^^^^^^^^                                                                                                                    
    File ".../airflow/venv/lib/python3.11/site-packages/tenacity/__init__.py", line 470, in __call__                                                 
      do = self.iter(retry_state=retry_state)                                                                                                        
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                        
    File ".../airflow/venv/lib/python3.11/site-packages/tenacity/__init__.py", line 371, in iter                                                     
      result = action(retry_state)                                                                                                                   
               ^^^^^^^^^^^^^^^^^^^                                                                                                                   
    File ".../airflow/venv/lib/python3.11/site-packages/tenacity/__init__.py", line 413, in exc_check                                                
      raise retry_exc.reraise()                                                                                                                      
            ^^^^^^^^^^^^^^^^^^^                                                                                                                      
    File ".../airflow/venv/lib/python3.11/site-packages/tenacity/__init__.py", line 184, in reraise                                                  
      raise self.last_attempt.result()                                                                                                               
            ^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                               
    File ".../python3.11/concurrent/futures/_base.py", line 449, in result                                                                           
      return self.__get_result()                                                                                                                     
             ^^^^^^^^^^^^^^^^^^^                                                                                                                     
    File ".../python3.11/concurrent/futures/_base.py", line 401, in __get_result                                                                     
      raise self._exception                                                                                                                          
    File ".../airflow/venv/lib/python3.11/site-packages/tenacity/__init__.py", line 473, in __call__                                                 
      result = fn(*args, **kwargs)                                                                                                                   
               ^^^^^^^^^^^^^^^^^^^
    File ".../airflow/venv/lib/python3.11/site-packages/airflow/sdk/api/client.py", line 887, in request                                             
      return super().request(*args, **kwargs)                                                                                                        
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                        
    File ".../airflow/venv/lib/python3.11/site-packages/httpx/_client.py", line 825, in request                                                      
      return self.send(request, auth=auth, follow_redirects=follow_redirects)                                                                        
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                        
    File ".../airflow/venv/lib/python3.11/site-packages/httpx/_client.py", line 914, in send                                                         
      response = self._send_handling_auth(                                                                                                           
                 ^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                           
    File ".../airflow/venv/lib/python3.11/site-packages/httpx/_client.py", line 942, in _send_handling_auth                                          
      response = self._send_handling_redirects(                                                                                                      
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                      
    File ".../airflow/venv/lib/python3.11/site-packages/httpx/_client.py", line 979, in _send_handling_redirects                                     
      response = self._send_single_request(request)                                                                                                  
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                  
    File ".../airflow/venv/lib/python3.11/site-packages/httpx/_client.py", line 1014, in _send_single_request                                        
      response = transport.handle_request(request)                                                                                                   
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^                                                                                                   
    File ".../airflow/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 249, in handle_request                                   
      with map_httpcore_exceptions():                                                                                                                
    File ".../python3.11/contextlib.py", line 155, in __exit__                                                                                       
      self.gen.throw(typ, value, traceback)                                                                                                          
    File ".../airflow/venv/lib/python3.11/site-packages/httpx/_transports/default.py", line 118, in map_httpcore_exceptions                          
      raise mapped_exc(message) from exc                                                                                                             
  httpx.ConnectError: [Errno 111] Connection refused                                                                                                 
  [TIMESTAMP] {worker.py:166} INFO - SIGTERM received. Sending SIGTERM to all jobs and quit                                                          
  [TIMESTAMP] {worker.py:166} INFO - SIGTERM received. Sending SIGTERM to all jobs and quit                                                          
  [TIMESTAMP] {worker.py:166} INFO - SIGTERM received. Sending SIGTERM to all jobs and quit                                                          
  [TIMESTAMP] {worker.py:166} INFO - SIGTERM received. Sending SIGTERM to all jobs and quit                                                          
  [TIMESTAMP] {worker.py:166} INFO - SIGTERM received. Sending SIGTERM to all jobs and quit                                                          
  [TIMESTAMP] {worker.py:166} INFO - SIGTERM received. Sending SIGTERM to all jobs and quit                                                          
  [TIMESTAMP] {worker.py:166} INFO - SIGTERM received. Sending SIGTERM to all jobs and quit                                                          
  [TIMESTAMP] {worker.py:166} INFO - SIGTERM received. Sending SIGTERM to all jobs and quit                                                          
  [TIMESTAMP] {worker.py:166} INFO - SIGTERM received. Sending SIGTERM to all jobs and quit                                                          
  [TIMESTAMP] {worker.py:166} INFO - SIGTERM received. Sending SIGTERM to all jobs and quit                                                          
  [TIMESTAMP] {worker.py:372} INFO - Job completed: dag_id=<redacted_dag> task_id=<redacted_task> run_id=manual__<redacted_timestamp> map_index=-1 try_number=4

jscheffl (Contributor) commented

> what I was trying to address in this PR is the following exception

So in your case the task failed with an HTTP exception and the worker then assumed "all is good". But in such a failure, if the API server is down (even after retries), the worker cannot report back to the central instance anyway, can it? Or do you have different or longer retries configured for the worker than for the execution API?

Because if both are configured consistently, then either the worker and the task both retry until the API server is back, or both fail to communicate back to the origin anyway. Would handling the error then "just" change the logs cosmetically?
