Description
Apache Airflow version
Other Airflow 2 version (please specify below)
What happened
We have a task with a hard requirement that it must not run more than once. However, when the Airflow scheduler crashes for whatever reason while the task is running, the task appears to be retried once the scheduler recovers, even though the first attempt succeeded:
** SCHEDULER LOGS **
2023-11-24T04:20:05.554652890Z {"asctime": "2023-11-24T05:20:05.554+0100", "filename": "scheduler_job_runner.py", "lineno": 248, "levelname": "INFO", "message": "Exiting gracefully upon receiving signal 15"}
2023-11-24T04:20:06.801855078Z {"asctime": "2023-11-24T05:20:06.795+0100", "filename": "scheduler_job_runner.py", "lineno": 862, "levelname": "ERROR", "message": "Exception when executing SchedulerJob._run_scheduler_loop"}
2023-11-24T04:20:06.801866656Z Traceback (most recent call last):
2023-11-24T04:20:06.801869902Z File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 385, in sync
2023-11-24T04:20:06.801872272Z self.kube_scheduler.run_next(task)
2023-11-24T04:20:06.801875214Z File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 406, in run_next
2023-11-24T04:20:06.801877950Z self.run_pod_async(pod, **self.kube_config.kube_client_request_args)
2023-11-24T04:20:06.801880748Z File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 311, in run_pod_async
2023-11-24T04:20:06.801883294Z resp = self.kube_client.create_namespaced_pod(
2023-11-24T04:20:06.801885724Z ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-11-24T04:20:06.801888109Z File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py", line 7356, in create_namespaced_pod
2023-11-24T04:20:06.801890595Z return self.create_namespaced_pod_with_http_info(namespace, body, **kwargs) # noqa: E501
2023-11-24T04:20:06.801892953Z ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-11-24T04:20:06.801895349Z File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py", line 7455, in create_namespaced_pod_with_http_info
2023-11-24T04:20:06.801897690Z return self.api_client.call_api(
2023-11-24T04:20:06.801900014Z ^^^^^^^^^^^^^^^^^^^^^^^^^
2023-11-24T04:20:06.801902925Z File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 348, in call_api
2023-11-24T04:20:06.801905310Z return self.__call_api(resource_path, method,
2023-11-24T04:20:06.801907565Z ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-11-24T04:20:06.801910120Z File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
2023-11-24T04:20:06.801912995Z response_data = self.request(
2023-11-24T04:20:06.801916095Z ^^^^^^^^^^^^^
2023-11-24T04:20:06.801919748Z File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 391, in request
2023-11-24T04:20:06.801923433Z return self.rest_client.POST(url,
2023-11-24T04:20:06.801926644Z ^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-11-24T04:20:06.801930019Z File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 275, in POST
2023-11-24T04:20:06.801933241Z return self.request("POST", url,
2023-11-24T04:20:06.801936265Z ^^^^^^^^^^^^^^^^^^^^^^^^^
2023-11-24T04:20:06.801939589Z File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 168, in request
2023-11-24T04:20:06.801955466Z r = self.pool_manager.request(
2023-11-24T04:20:06.801958232Z ^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-11-24T04:20:06.801960514Z File "/home/airflow/.local/lib/python3.11/site-packages/urllib3/request.py", line 81, in request
2023-11-24T04:20:06.801962900Z return self.request_encode_body(
2023-11-24T04:20:06.801965253Z ^^^^^^^^^^^^^^^^^^^^^^^^^
2023-11-24T04:20:06.801967510Z File "/home/airflow/.local/lib/python3.11/site-packages/urllib3/request.py", line 173, in request_encode_body
2023-11-24T04:20:06.801969921Z return self.urlopen(method, url, **extra_kw)
2023-11-24T04:20:06.801972159Z ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-11-24T04:20:06.801974864Z File "/home/airflow/.local/lib/python3.11/site-packages/urllib3/poolmanager.py", line 376, in urlopen
2023-11-24T04:20:06.801977228Z response = conn.urlopen(method, u.request_uri, **kw)
2023-11-24T04:20:06.801979519Z ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-11-24T04:20:06.801981756Z File "/home/airflow/.local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 715, in urlopen
2023-11-24T04:20:06.801984109Z httplib_response = self._make_request(
2023-11-24T04:20:06.801986431Z ^^^^^^^^^^^^^^^^^^^
2023-11-24T04:20:06.801988730Z File "/home/airflow/.local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 467, in _make_request
2023-11-24T04:20:06.801991136Z six.raise_from(e, None)
2023-11-24T04:20:06.801993395Z File "<string>", line 3, in raise_from
2023-11-24T04:20:06.801996129Z File "/home/airflow/.local/lib/python3.11/site-packages/urllib3/connectionpool.py", line 462, in _make_request
2023-11-24T04:20:06.801998489Z httplib_response = conn.getresponse()
2023-11-24T04:20:06.802000811Z ^^^^^^^^^^^^^^^^^^
2023-11-24T04:20:06.802003046Z File "/usr/local/lib/python3.11/http/client.py", line 1378, in getresponse
2023-11-24T04:20:06.802005565Z response.begin()
2023-11-24T04:20:06.802008018Z File "/usr/local/lib/python3.11/http/client.py", line 318, in begin
2023-11-24T04:20:06.802010362Z version, status, reason = self._read_status()
2023-11-24T04:20:06.802012949Z ^^^^^^^^^^^^^^^^^^^
2023-11-24T04:20:06.802015208Z File "/usr/local/lib/python3.11/http/client.py", line 279, in _read_status
2023-11-24T04:20:06.802017388Z line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
2023-11-24T04:20:06.802019561Z ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-11-24T04:20:06.802021792Z File "/usr/local/lib/python3.11/socket.py", line 706, in readinto
2023-11-24T04:20:06.802032537Z return self._sock.recv_into(b)
2023-11-24T04:20:06.802034849Z ^^^^^^^^^^^^^^^^^^^^^^^
2023-11-24T04:20:06.802037202Z File "/usr/local/lib/python3.11/ssl.py", line 1311, in recv_into
2023-11-24T04:20:06.802039465Z return self.read(nbytes, buffer)
2023-11-24T04:20:06.802041676Z ^^^^^^^^^^^^^^^^^^^^^^^^^
2023-11-24T04:20:06.802043835Z File "/usr/local/lib/python3.11/ssl.py", line 1167, in read
2023-11-24T04:20:06.802045996Z return self._sslobj.read(len, buffer)
2023-11-24T04:20:06.802048210Z ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2023-11-24T04:20:06.802050459Z File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 251, in _exit_gracefully
2023-11-24T04:20:06.802052641Z sys.exit(os.EX_OK)
2023-11-24T04:20:06.802054965Z SystemExit: 0
2023-11-24T04:20:06.802057156Z
2023-11-24T04:20:06.802059495Z During handling of the above exception, another exception occurred:
2023-11-24T04:20:06.802061585Z
2023-11-24T04:20:06.802066599Z Traceback (most recent call last):
2023-11-24T04:20:06.802068869Z File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 845, in _execute
2023-11-24T04:20:06.802071210Z self._run_scheduler_loop()
2023-11-24T04:20:06.802073476Z File "/home/airflow/.local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 981, in _run_scheduler_loop
2023-11-24T04:20:06.802075636Z self.job.executor.heartbeat()
2023-11-24T04:20:06.802077933Z File "/home/airflow/.local/lib/python3.11/site-packages/airflow/executors/base_executor.py", line 237, in heartbeat
2023-11-24T04:20:06.802080126Z self.sync()
2023-11-24T04:20:06.802082757Z File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 416, in sync
2023-11-24T04:20:06.802084960Z self.task_queue.task_done()
2023-11-24T04:20:06.802087253Z File "<string>", line 2, in task_done
2023-11-24T04:20:06.802089463Z File "/usr/local/lib/python3.11/multiprocessing/managers.py", line 821, in _callmethod
2023-11-24T04:20:06.802091658Z conn.send((self._id, methodname, args, kwds))
2023-11-24T04:20:06.802093901Z File "/usr/local/lib/python3.11/multiprocessing/connection.py", line 206, in send
2023-11-24T04:20:06.802096154Z self._send_bytes(_ForkingPickler.dumps(obj))
2023-11-24T04:20:06.802098712Z File "/usr/local/lib/python3.11/multiprocessing/connection.py", line 427, in _send_bytes
2023-11-24T04:20:06.802100955Z self._send(header + buf)
2023-11-24T04:20:06.802103273Z File "/usr/local/lib/python3.11/multiprocessing/connection.py", line 384, in _send
2023-11-24T04:20:06.802105612Z n = write(self._handle, buf)
2023-11-24T04:20:06.802107767Z ^^^^^^^^^^^^^^^^^^^^^^^^
2023-11-24T04:20:06.802112375Z BrokenPipeError: [Errno 32] Broken pipe
2023-11-24T04:20:06.802120372Z {"asctime": "2023-11-24T05:20:06.801+0100", "filename": "kubernetes_executor.py", "lineno": 695, "levelname": "INFO", "message": "Shutting down Kubernetes executor"}
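The first traceback above ends in `SystemExit: 0` raised from `_exit_gracefully`, which suggests the SIGTERM handler fired while the main thread was blocked inside `create_namespaced_pod`, so the exit surfaced in the middle of that request. The following is a minimal, POSIX-only sketch of that mechanism using only the standard library. The names `exit_gracefully`, `blocking_call`, and `run_once` are hypothetical stand-ins, not Airflow code, and the sleep merely imitates a blocking network read.

```python
import os
import signal
import sys
import threading
import time


def exit_gracefully(signum, frame):
    # Hypothetical stand-in for a SIGTERM handler that exits the process.
    # SystemExit is raised in whatever frame the main thread is executing.
    sys.exit(os.EX_OK)


def blocking_call():
    # Hypothetical stand-in for a long network read, such as a
    # pod-creation request waiting on the API server.
    time.sleep(2)


def run_once():
    # Must run in the main thread: signal.signal() is only allowed there.
    previous = signal.signal(signal.SIGTERM, exit_gracefully)
    # Deliver SIGTERM to the main thread shortly after the call starts.
    timer = threading.Timer(
        0.2,
        signal.pthread_kill,
        args=(threading.main_thread().ident, signal.SIGTERM),
    )
    timer.start()
    try:
        blocking_call()
        return "completed"
    except SystemExit as exc:
        # The exit request surfaced inside the blocking call.
        return f"interrupted with SystemExit({exc.code})"
    finally:
        timer.cancel()
        signal.signal(signal.SIGTERM, previous)


if __name__ == "__main__":
    print(run_once())
```

Under these assumptions, `run_once()` should report the interruption rather than a completed call, which mirrors how the pod-creation request in the log was cut off before the executor could record its outcome.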
TASK ATTEMPT 1
{"asctime": "2023-11-24, 05:17:30 CET", "filename": "taskinstance.py", "lineno": 1359, "levelname": "INFO", "message": "Starting attempt 1 of 1"}
...
{"asctime": "2023-11-24, 05:18:02 CET", "filename": "local_task_job_runner.py", "lineno": 228, "levelname": "INFO", "message": "Task exited with return code 0"}
TASK ATTEMPT 2
{"asctime": "2023-11-24, 05:27:26 CET", "filename": "taskinstance.py", "lineno": 1157, "levelname": "INFO", "message": "Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: _redacted_ scheduled__2023-11-23T00:00:00+00:00 [queued]>"}
{"asctime": "2023-11-24, 05:27:26 CET", "filename": "taskinstance.py", "lineno": 1359, "levelname": "INFO", "message": "Starting attempt **2 of 1**"}
{"asctime": "2023-11-24, 05:27:26 CET", "filename": "taskinstance.py", "lineno": 1380, "levelname": "INFO", "message": "Executing <Task(AzureDataFactoryRunPipelineOperator): _redacted_> on 2023-11-23 00:00:00+00:00"}
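To find other occurrences of this in task logs, one option is to scan the JSON log lines for a "Starting attempt N of M" message where N exceeds M. The sketch below assumes the log format shown above (one JSON object per line with `asctime` and `message` keys); the function name `find_excess_attempts` is hypothetical, and the sample lines are copied from this report.

```python
import json
import re

# Tolerates optional "**" emphasis markers around the numbers.
ATTEMPT_RE = re.compile(r"Starting attempt \**(\d+) of (\d+)\**")


def find_excess_attempts(log_lines):
    """Return (asctime, attempt, max_tries) for attempts beyond the maximum."""
    hits = []
    for line in log_lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip elided or non-JSON lines such as "..."
        if not isinstance(record, dict):
            continue
        match = ATTEMPT_RE.search(record.get("message", ""))
        if match is None:
            continue
        attempt, max_tries = int(match.group(1)), int(match.group(2))
        if attempt > max_tries:
            hits.append((record.get("asctime"), attempt, max_tries))
    return hits


SAMPLE_LINES = [
    '{"asctime": "2023-11-24, 05:17:30 CET", "filename": "taskinstance.py", '
    '"lineno": 1359, "levelname": "INFO", "message": "Starting attempt 1 of 1"}',
    "...",
    '{"asctime": "2023-11-24, 05:27:26 CET", "filename": "taskinstance.py", '
    '"lineno": 1359, "levelname": "INFO", "message": "Starting attempt **2 of 1**"}',
]

if __name__ == "__main__":
    for asctime, attempt, max_tries in find_excess_attempts(SAMPLE_LINES):
        print(f"{asctime}: attempt {attempt} exceeds maximum of {max_tries}")
```

With the sample lines, only the second attempt is flagged, since the first is within the maximum of one try that `retries=0` allows.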
What you think should happen instead
Tasks should not be retried when retries=0, regardless of whether the scheduler restarts.
How to reproduce
Not entirely sure. This happens every once in a while during our nightly loads. My assumption is that the health checks for the Airflow scheduler fail, the scheduler crashes, and it then loses track of the tasks that were in the queue.
Operating System
Debian GNU/Linux 11 (bullseye)
Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes==7.8.0
apache-airflow-providers-common-sql==1.8.0
apache-airflow-providers-databricks==4.7.0
apache-airflow-providers-docker==3.8.0
apache-airflow-providers-elasticsearch==5.0.1
apache-airflow-providers-ftp==3.6.0
apache-airflow-providers-http==4.6.0
apache-airflow-providers-imap==3.4.0
apache-airflow-providers-microsoft-azure==8.1.0
apache-airflow-providers-microsoft-mssql==3.5.0
apache-airflow-providers-odbc==4.1.0
apache-airflow-providers-postgres==5.7.1
apache-airflow-providers-sqlite==3.5.0
Deployment
Official Apache Airflow Helm Chart
Deployment details
Deployed with the KubernetesExecutor, using the following configuration for the scheduler:
scheduler:
  replicas: 3
  resources:
    limits:
      cpu: 3
    requests:
      cpu: 1
  livenessProbe:
    timeoutSeconds: 120
    failureThreshold: 8
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
