Description
Apache Airflow version
2.10.4
If "Other Airflow 2 version" selected, which one?
No response
What happened?
I am seeing unexpected failures when running tasks with the KubernetesPodOperator. My DAG has a mapped task that expands into hundreds of KubernetesPodOperator task instances, and some of them fail spontaneously: out of 500 mapped tasks, 10% fail. I don't think the cause is DAG complexity or DAG parsing time.
I would like to report this issue and ask for help resolving it.
[2025-02-03T13:32:19.910+0000] {settings.py:475} DEBUG - Setting up DB connection pool (PID 7)
[2025-02-03T13:32:19.984+0000] {settings.py:579} DEBUG - settings.prepare_engine_args(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=7
[2025-02-03T13:32:20.814+0000] {configuration.py:862} DEBUG - Could not retrieve value from section core, for key dataset_manager_kwargs. Skipping redaction of this conf.
[2025-02-03T13:32:20.815+0000] {configuration.py:862} DEBUG - Could not retrieve value from section smtp, for key smtp_password. Skipping redaction of this conf.
[2025-02-03T13:32:20.816+0000] {configuration.py:862} DEBUG - Could not retrieve value from section database, for key sql_alchemy_engine_args. Skipping redaction of this conf.
[2025-02-03T13:32:20.833+0000] {cli_action_loggers.py:51} DEBUG - Adding <function default_action_log at 0x7f81367aa7a0> to pre execution callback
/home/airflow/.local/lib/python3.11/site-packages/airflow/metrics/statsd_logger.py:184 RemovedInAirflow3Warning: The basic metric validator will be deprecated in the future in favor of pattern-matching. You can try this now by setting config option metrics_use_pattern_match to True.
[2025-02-03T13:32:39.119+0000] {serde.py:375} DEBUG - registering decimal.Decimal for serialization
[2025-02-03T13:32:39.121+0000] {serde.py:382} DEBUG - registering decimal.Decimal for deserialization
[2025-02-03T13:32:39.122+0000] {serde.py:375} DEBUG - registering builtins.frozenset for serialization
[2025-02-03T13:32:39.123+0000] {serde.py:375} DEBUG - registering builtins.set for serialization
[2025-02-03T13:32:39.123+0000] {serde.py:375} DEBUG - registering builtins.tuple for serialization
[2025-02-03T13:32:39.124+0000] {serde.py:382} DEBUG - registering builtins.frozenset for deserialization
[2025-02-03T13:32:39.124+0000] {serde.py:382} DEBUG - registering builtins.set for deserialization
[2025-02-03T13:32:39.125+0000] {serde.py:382} DEBUG - registering builtins.tuple for deserialization
[2025-02-03T13:32:39.125+0000] {serde.py:390} DEBUG - registering builtins.frozenset for stringifying
[2025-02-03T13:32:39.126+0000] {serde.py:390} DEBUG - registering builtins.set for stringifying
[2025-02-03T13:32:39.126+0000] {serde.py:390} DEBUG - registering builtins.tuple for stringifying
[2025-02-03T13:32:39.128+0000] {serde.py:375} DEBUG - registering datetime.date for serialization
[2025-02-03T13:32:39.128+0000] {serde.py:375} DEBUG - registering datetime.datetime for serialization
[2025-02-03T13:32:39.128+0000] {serde.py:375} DEBUG - registering datetime.timedelta for serialization
[2025-02-03T13:32:39.129+0000] {serde.py:375} DEBUG - registering pendulum.datetime.DateTime for serialization
[2025-02-03T13:32:39.129+0000] {serde.py:382} DEBUG - registering datetime.date for deserialization
[2025-02-03T13:32:39.130+0000] {serde.py:382} DEBUG - registering datetime.datetime for deserialization
[2025-02-03T13:32:39.130+0000] {serde.py:382} DEBUG - registering datetime.timedelta for deserialization
[2025-02-03T13:32:39.130+0000] {serde.py:382} DEBUG - registering pendulum.datetime.DateTime for deserialization
[2025-02-03T13:32:39.131+0000] {serde.py:375} DEBUG - registering deltalake.table.DeltaTable for serialization
[2025-02-03T13:32:39.133+0000] {serde.py:382} DEBUG - registering deltalake.table.DeltaTable for deserialization
[2025-02-03T13:32:39.133+0000] {serde.py:390} DEBUG - registering deltalake.table.DeltaTable for stringifying
[2025-02-03T13:32:39.134+0000] {serde.py:375} DEBUG - registering pyiceberg.table.Table for serialization
[2025-02-03T13:32:39.134+0000] {serde.py:382} DEBUG - registering pyiceberg.table.Table for deserialization
[2025-02-03T13:32:39.135+0000] {serde.py:390} DEBUG - registering pyiceberg.table.Table for stringifying
[2025-02-03T13:32:39.136+0000] {serde.py:375} DEBUG - registering kubernetes.client.models.v1_resource_requirements.V1ResourceRequirements for serialization
[2025-02-03T13:32:39.136+0000] {serde.py:375} DEBUG - registering kubernetes.client.models.v1_pod.V1Pod for serialization
[2025-02-03T13:32:39.137+0000] {serde.py:375} DEBUG - registering numpy.int8 for serialization
[2025-02-03T13:32:39.137+0000] {serde.py:375} DEBUG - registering numpy.int16 for serialization
[2025-02-03T13:32:39.138+0000] {serde.py:375} DEBUG - registering numpy.int32 for serialization
[2025-02-03T13:32:39.138+0000] {serde.py:375} DEBUG - registering numpy.int64 for serialization
[2025-02-03T13:32:39.139+0000] {serde.py:375} DEBUG - registering numpy.uint8 for serialization
[2025-02-03T13:32:39.139+0000] {serde.py:375} DEBUG - registering numpy.uint16 for serialization
[2025-02-03T13:32:39.139+0000] {serde.py:375} DEBUG - registering numpy.uint32 for serialization
[2025-02-03T13:32:39.140+0000] {serde.py:375} DEBUG - registering numpy.uint64 for serialization
[2025-02-03T13:32:39.140+0000] {serde.py:375} DEBUG - registering numpy.bool_ for serialization
[2025-02-03T13:32:39.141+0000] {serde.py:375} DEBUG - registering numpy.float64 for serialization
[2025-02-03T13:32:39.141+0000] {serde.py:375} DEBUG - registering numpy.float16 for serialization
[2025-02-03T13:32:39.142+0000] {serde.py:375} DEBUG - registering numpy.complex128 for serialization
[2025-02-03T13:32:39.142+0000] {serde.py:375} DEBUG - registering numpy.complex64 for serialization
[2025-02-03T13:32:39.142+0000] {serde.py:382} DEBUG - registering numpy.int8 for deserialization
[2025-02-03T13:32:39.143+0000] {serde.py:382} DEBUG - registering numpy.int16 for deserialization
[2025-02-03T13:32:39.143+0000] {serde.py:382} DEBUG - registering numpy.int32 for deserialization
[2025-02-03T13:32:39.144+0000] {serde.py:382} DEBUG - registering numpy.int64 for deserialization
[2025-02-03T13:32:39.144+0000] {serde.py:382} DEBUG - registering numpy.uint8 for deserialization
[2025-02-03T13:32:39.145+0000] {serde.py:382} DEBUG - registering numpy.uint16 for deserialization
[2025-02-03T13:32:39.145+0000] {serde.py:382} DEBUG - registering numpy.uint32 for deserialization
[2025-02-03T13:32:39.145+0000] {serde.py:382} DEBUG - registering numpy.uint64 for deserialization
[2025-02-03T13:32:39.146+0000] {serde.py:382} DEBUG - registering numpy.bool_ for deserialization
[2025-02-03T13:32:39.146+0000] {serde.py:382} DEBUG - registering numpy.float64 for deserialization
[2025-02-03T13:32:39.147+0000] {serde.py:382} DEBUG - registering numpy.float16 for deserialization
[2025-02-03T13:32:39.147+0000] {serde.py:382} DEBUG - registering numpy.complex128 for deserialization
[2025-02-03T13:32:39.147+0000] {serde.py:382} DEBUG - registering numpy.complex64 for deserialization
[2025-02-03T13:32:39.148+0000] {serde.py:375} DEBUG - registering pandas.core.frame.DataFrame for serialization
[2025-02-03T13:32:39.149+0000] {serde.py:382} DEBUG - registering pandas.core.frame.DataFrame for deserialization
[2025-02-03T13:32:39.149+0000] {serde.py:375} DEBUG - registering pendulum.tz.timezone.FixedTimezone for serialization
[2025-02-03T13:32:39.153+0000] {serde.py:375} DEBUG - registering pendulum.tz.timezone.Timezone for serialization
[2025-02-03T13:32:39.154+0000] {serde.py:375} DEBUG - registering zoneinfo.ZoneInfo for serialization
[2025-02-03T13:32:39.155+0000] {serde.py:382} DEBUG - registering pendulum.tz.timezone.FixedTimezone for deserialization
[2025-02-03T13:32:39.156+0000] {serde.py:382} DEBUG - registering pendulum.tz.timezone.Timezone for deserialization
[2025-02-03T13:32:39.156+0000] {serde.py:382} DEBUG - registering zoneinfo.ZoneInfo for deserialization
[2025-02-03T13:32:39.157+0000] {serde.py:393} DEBUG - loading serializers took 0.039 seconds
[2025-02-03T13:32:42.366+0000] {cli_action_loggers.py:79} DEBUG - Calling callbacks: [<function default_action_log at 0x7f81367aa7a0>]
[2025-02-03T13:32:42.829+0000] {plugins_manager.py:357} DEBUG - Loading plugins
[2025-02-03T13:32:42.829+0000] {plugins_manager.py:273} DEBUG - Loading plugins from directory: /opt/airflow/plugins
[2025-02-03T13:32:42.830+0000] {plugins_manager.py:253} DEBUG - Loading plugins from entrypoints
[2025-02-03T13:32:42.911+0000] {plugins_manager.py:256} DEBUG - Importing entry_point plugin openlineage
[2025-02-03T13:32:50.467+0000] {plugins_manager.py:375} DEBUG - Loading 1 plugin(s) took 7.64 seconds
[2025-02-03T13:32:50.468+0000] {dagbag.py:588} INFO - Filling up the DagBag from /opt/airflow/dags/repo/dags/csr-vss-XXX/XXX-airflow/dags/scan_XXX.py
[2025-02-03T13:32:50.470+0000] {dagbag.py:369} DEBUG - Importing /opt/airflow/dags/repo/dags/csr-vss-XXX/XXX-airflow/dags/scan_XXX.py
[2025-02-03T13:33:26.444+0000] {timeout.py:68} ERROR - Process timed out, PID: 7
[2025-02-03T13:33:26.445+0000] {dagbag.py:387} ERROR - Failed to import: /opt/airflow/dags/repo/dags/csr-vss-XXX/XXX-airflow/dags/scan_XXX.py
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dagbag.py", line 383, in parse
loader.exec_module(new_module)
File "<frozen importlib._bootstrap_external>", line 940, in exec_module
File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
File "/home/airflow/.local/lib/python3.11/site-packages/sds_provider/operators/kubernetes/pod.py", line 1, in <module>
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 62, in <module>
from airflow.providers.cncf.kubernetes.callbacks import ExecutionMode, KubernetesPodOperatorCallback
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/callbacks.py", line 23, in <module>
import kubernetes_asyncio.client as async_k8s
File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes_asyncio/__init__.py", line 19, in <module>
import kubernetes_asyncio.client
File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes_asyncio/client/__init__.py", line 20, in <module>
from kubernetes_asyncio.client.api.well_known_api import WellKnownApi
File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes_asyncio/client/api/__init__.py", line 6, in <module>
from kubernetes_asyncio.client.api.well_known_api import WellKnownApi
File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes_asyncio/client/api/well_known_api.py", line 20, in <module>
from kubernetes_asyncio.client.api_client import ApiClient
File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes_asyncio/client/api_client.py", line 28, in <module>
import kubernetes_asyncio.client.models
File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes_asyncio/client/models/__init__.py", line 577, in <module>
from kubernetes_asyncio.client.models.v1beta1_validating_admission_policy_list import V1beta1ValidatingAdmissionPolicyList
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout
raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /opt/airflow/dags/repo/dags/csr-vss-XXX/XXX-airflow/dags/scan_XXX.py after 30.0s.
Please take a look at these docs to improve your DAG import time:
* https://airflow.apache.org/docs/apache-airflow/2.10.4/best-practices.html#top-level-python-code
* https://airflow.apache.org/docs/apache-airflow/2.10.4/best-practices.html#reducing-dag-complexity, PID: 7
[2025-02-03T13:33:26.939+0000] {cli.py:251} WARNING - Dag 'XXX_scan_XXX' not found in path /opt/airflow/dags/repo/dags/csr-vss-XXX/XXX-airflow/dags/scan_XXX.py; trying path /opt/airflow/dags/repo/dags/csr-vss-XXX/XXX-airflow/dags/scan_XXX.py
[2025-02-03T13:33:26.952+0000] {dagbag.py:588} INFO - Filling up the DagBag from /opt/airflow/dags/repo/dags/csr-vss-XXX/XXX-airflow/dags/scan_XXX.py
[2025-02-03T13:33:26.953+0000] {dagbag.py:369} DEBUG - Importing /opt/airflow/dags/repo/dags/csr-vss-XXX/XXX-airflow/dags/scan_XXX.py
[2025-02-03T13:33:57.450+0000] {timeout.py:68} ERROR - Process timed out, PID: 7
[2025-02-03T13:33:57.451+0000] {dagbag.py:387} ERROR - Failed to import: /opt/airflow/dags/repo/dags/csr-vss-XXX/XXX-airflow/dags/scan_XXX.py
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dagbag.py", line 383, in parse
loader.exec_module(new_module)
File "<frozen importlib._bootstrap_external>", line 940, in exec_module
File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
File "/home/airflow/.local/lib/python3.11/site-packages/sds_provider/operators/kubernetes/pod.py", line 1, in <module>
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 62, in <module>
from airflow.providers.cncf.kubernetes.callbacks import ExecutionMode, KubernetesPodOperatorCallback
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/callbacks.py", line 23, in <module>
import kubernetes_asyncio.client as async_k8s
File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes_asyncio/__init__.py", line 19, in <module>
import kubernetes_asyncio.client
File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes_asyncio/client/__init__.py", line 20, in <module>
from kubernetes_asyncio.client.api.well_known_api import WellKnownApi
File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes_asyncio/client/api/__init__.py", line 31, in <module>
from kubernetes_asyncio.client.api.certificates_v1alpha1_api import CertificatesV1alpha1Api
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout
raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /opt/airflow/dags/repo/dags/csr-vss-XXX/XXX-airflow/dags/scan_XXX.py after 30.0s.
Please take a look at these docs to improve your DAG import time:
* https://airflow.apache.org/docs/apache-airflow/2.10.4/best-practices.html#top-level-python-code
* https://airflow.apache.org/docs/apache-airflow/2.10.4/best-practices.html#reducing-dag-complexity, PID: 7
[2025-02-03T13:34:15.728+0000] {dagbag.py:369} DEBUG - Importing /opt/airflow/dags/repo/dags/csr-vss-XXX/XXX-airflow/dags/scan_XXX.py
[2025-02-03T13:34:47.089+0000] {timeout.py:68} ERROR - Process timed out, PID: 7
[2025-02-03T13:34:47.089+0000] {dagbag.py:387} ERROR - Failed to import: /opt/airflow/dags/repo/dags/csr-vss-XXX/XXX-airflow/dags/scan_XXX.py
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/dagbag.py", line 383, in parse
loader.exec_module(new_module)
File "<frozen importlib._bootstrap_external>", line 940, in exec_module
File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
File "/home/airflow/.local/lib/python3.11/site-packages/sds_provider/operators/kubernetes/pod.py", line 1, in <module>
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 62, in <module>
from airflow.providers.cncf.kubernetes.callbacks import ExecutionMode, KubernetesPodOperatorCallback
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/callbacks.py", line 23, in <module>
import kubernetes_asyncio.client as async_k8s
File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes_asyncio/__init__.py", line 19, in <module>
import kubernetes_asyncio.client
File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes_asyncio/client/__init__.py", line 45, in <module>
from kubernetes_asyncio.client.api.certificates_v1alpha1_api import CertificatesV1alpha1Api
File "/home/airflow/.local/lib/python3.11/site-packages/kubernetes_asyncio/client/api/__init__.py", line 49, in <module>
from kubernetes_asyncio.client.api.networking_v1alpha1_api import NetworkingV1alpha1Api
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout
raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /opt/airflow/dags/repo/dags/csr-vss-XXX/XXX-airflow/dags/scan_XXX.py after 30.0s.
Please take a look at these docs to improve your DAG import time:
* https://airflow.apache.org/docs/apache-airflow/2.10.4/best-practices.html#top-level-python-code
* https://airflow.apache.org/docs/apache-airflow/2.10.4/best-practices.html#reducing-dag-complexity, PID: 7
[2025-02-03T13:34:47.282+0000] {cli_action_loggers.py:98} DEBUG - Calling callbacks: []
Traceback (most recent call last):
File "/home/airflow/.local/bin/airflow", line 8, in <module>
sys.exit(main())
^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/__main__.py", line 62, in main
args.func(args)
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/cli.py", line 116, in wrapper
return f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/cli/commands/task_command.py", line 458, in task_run
_dag = get_dag(args.subdir, args.dag_id, args.read_from_db)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.11/site-packages/airflow/utils/cli.py", line 255, in get_dag
raise AirflowException(
airflow.exceptions.AirflowException: Dag 'XXX_scan_XXX' could not be found; either it does not exist or it failed to parse.
[2025-02-03T13:34:47.588+0000] {settings.py:612} DEBUG - Disposing DB connection pool (PID 7)
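In all three tracebacks above, the 30-second DagBag timeout fires while `kubernetes_asyncio` is still being imported, each time at a different line. One way to check whether that import is slow by itself on the worker is to time it in isolation. The following is a minimal sketch, not part of Airflow; `time_import` is a hypothetical helper name, and the assumption is that it runs in a fresh Python process inside the same image as the failing task.

```python
import importlib
import time


def time_import(module_name: str) -> float:
    """Return the wall-clock seconds spent importing module_name.

    Only the first import in a process is meaningful: later imports are
    served from the sys.modules cache and return almost immediately.
    """
    start = time.perf_counter()
    importlib.import_module(module_name)
    return time.perf_counter() - start
```

For example, calling `time_import("kubernetes_asyncio.client")` and then `time_import("airflow.providers.cncf.kubernetes.operators.pod")` in a fresh interpreter would show how much of the 30 seconds these imports consume. CPython's `python -X importtime -c "import kubernetes_asyncio.client"` gives a per-module breakdown of the same measurement.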
What you think should happen instead?
No response
How to reproduce
Trigger a DAG that contains a dynamically mapped KubernetesPodOperator task (500 mapped task instances in my case).
Operating System
Debian GNU/Linux 12 (bookworm)
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==9.1.0
apache-airflow-providers-celery==3.8.5
apache-airflow-providers-cncf-kubernetes==10.0.1
apache-airflow-providers-common-compat==1.2.2
apache-airflow-providers-common-io==1.4.2
apache-airflow-providers-common-sql==1.20.0
apache-airflow-providers-docker==3.14.1
apache-airflow-providers-elasticsearch==5.5.3
apache-airflow-providers-fab==1.5.1
apache-airflow-providers-ftp==3.11.1
apache-airflow-providers-google==11.0.0
apache-airflow-providers-grpc==3.6.0
apache-airflow-providers-hashicorp==3.8.0
apache-airflow-providers-http==4.13.3
apache-airflow-providers-imap==3.7.0
apache-airflow-providers-microsoft-azure==11.1.0
apache-airflow-providers-mysql==5.7.4
apache-airflow-providers-odbc==4.8.1
apache-airflow-providers-openlineage==1.14.0
apache-airflow-providers-postgres==5.14.0
apache-airflow-providers-redis==3.8.0
apache-airflow-providers-sendgrid==3.6.0
apache-airflow-providers-sftp==4.11.1
apache-airflow-providers-slack==8.9.2
apache-airflow-providers-smtp==1.8.1
apache-airflow-providers-snowflake==5.8.1
apache-airflow-providers-sqlite==3.9.1
apache-airflow-providers-ssh==3.14.0
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct