
Kubernetes Executor Task Leak #36998

Closed · 1 of 2 tasks
smhood opened this issue Jan 24, 2024 · 38 comments
Labels: area:performance, area:providers, kind:bug, provider:cncf-kubernetes


smhood commented Jan 24, 2024

Apache Airflow version

2.8.1

If "Other Airflow 2 version" selected, which one?

No response

What happened?

The scheduler stops processing DAGs and moving tasks to the queued status. When looking at the scheduler in debug mode, the following information is displayed.

[2024-01-24T13:40:15.828+0000] {scheduler_job_runner.py:1092} DEBUG - Executor full, skipping critical section
[2024-01-24T13:40:15.828+0000] {base_executor.py:217} DEBUG - 32 running task instances
[2024-01-24T13:40:15.829+0000] {base_executor.py:218} DEBUG - 0 in queue
[2024-01-24T13:40:15.829+0000] {base_executor.py:219} DEBUG - 0 open slots
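
For context, the open-slot count in these debug lines is essentially the configured parallelism minus the number of task keys the executor still believes are running or queued. A simplified sketch of that arithmetic (approximate names, not the actual Airflow BaseExecutor source):

# Simplified sketch of the slot accounting behind the debug lines above
# (approximate, not the actual Airflow BaseExecutor implementation).
from dataclasses import dataclass, field

@dataclass
class SlotAccounting:
    parallelism: int = 32
    running: set = field(default_factory=set)         # keys the executor thinks are still running
    queued_tasks: dict = field(default_factory=dict)  # keys waiting to be launched

    @property
    def open_slots(self) -> int:
        return self.parallelism - len(self.running) - len(self.queued_tasks)

acct = SlotAccounting()
acct.running.update(f"leaked-task-{i}" for i in range(32))  # keys never removed after pod completion
print(acct.open_slots)  # 0 -> "Executor full, skipping critical section"

If completed tasks are never removed from the running set, open slots eventually hit zero and the scheduler stops queuing anything new.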

We noticed that a fix was addressed in #36240; however, we are still seeing the same issue.

We are utilizing the Airflow Helm chart version 1.10, and we have the same issue happening in multiple environments.
Two environments have parallelism set to 32 with 1 scheduler running.
The other has 3 schedulers, each with parallelism 32.

What you think should happen instead?

When a task is complete it should release the slot.

How to reproduce

Currently it seems to just be a matter of time: after running for a certain period, the slots fill up with completed tasks.

Operating System

Debian GNU/Linux 12

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.16.0
apache-airflow-providers-celery==3.5.1
apache-airflow-providers-cncf-kubernetes==7.13.0
apache-airflow-providers-common-io==1.2.0
apache-airflow-providers-common-sql==1.10.0
apache-airflow-providers-docker==3.9.1
apache-airflow-providers-elasticsearch==5.3.1
apache-airflow-providers-ftp==3.7.0
apache-airflow-providers-google==10.13.1
apache-airflow-providers-grpc==3.4.1
apache-airflow-providers-hashicorp==3.6.1
apache-airflow-providers-http==4.8.0
apache-airflow-providers-imap==3.5.0
apache-airflow-providers-microsoft-azure==8.5.1
apache-airflow-providers-mysql==5.5.1
apache-airflow-providers-odbc==4.4.0
apache-airflow-providers-openlineage==1.4.0
apache-airflow-providers-postgres==5.10.0
apache-airflow-providers-redis==3.6.0
apache-airflow-providers-sendgrid==3.4.0
apache-airflow-providers-sftp==4.8.1
apache-airflow-providers-slack==8.5.1
apache-airflow-providers-snowflake==5.2.1
apache-airflow-providers-sqlite==3.7.0
apache-airflow-providers-ssh==3.10.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

Deployed via the Helm chart (1.10) to Azure AKS.
We deploy our own image with the required packages/DAGs copied in, built FROM apache/airflow:2.8.1-python3.11.
The process is synced with an ArgoCD deployment pipeline.

Anything else?

This problem occurs more or less daily. We have a test instance with only 5 DAGs that run once every hour, and we are still seeing the issue.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@smhood smhood added the area:core, kind:bug and needs-triage labels Jan 24, 2024

boring-cyborg bot commented Jan 24, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.


smhood commented Jan 25, 2024

Looking over the logs I get two different outcomes. When I restart the pods I get the following:

[2024-01-25T13:20:23.100+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_parquet_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T09:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.101+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T09:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.101+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_csv_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T09:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.101+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_parquet_feed', task_id='check_new_files', run_id='scheduled__2024-01-25T00:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.101+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__azure_blob__data_platform_meta__test_trading_partner', task_id='check_new_files', run_id='scheduled__2024-01-25T00:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.101+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T03:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.101+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T00:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.102+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__azure_blob__data_platform_meta__test_trading_partner', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T03:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.102+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T04:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.102+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_parquet_feed', task_id='check_new_files', run_id='scheduled__2024-01-24T21:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T13:20:23.102+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='get_execution_parameters', run_id='scheduled__2024-01-25T08:00:00+00:00', try_number=1, map_index=-1)

However after that I never get the success event:

[2024-01-25T14:00:24.228+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_zip_feed', task_id='check_new_files', run_id='scheduled__2024-01-25T13:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T14:00:24.228+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_csv_feed', task_id='check_new_files', run_id='scheduled__2024-01-25T13:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T14:00:24.228+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__azure_blob__superhero__blob_battles_parquet_feed', task_id='check_new_files', run_id='scheduled__2024-01-25T13:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T14:00:24.229+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='check_new_files', run_id='scheduled__2024-01-25T13:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T14:00:24.229+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='acq__azure_blob__data_platform_meta__test_trading_partner', task_id='check_new_files', run_id='scheduled__2024-01-25T13:00:00+00:00', try_number=1, map_index=-1)
[2024-01-25T14:00:24.236+0000] {scheduler_job_runner.py:723} INFO - Setting external_id for <TaskInstance: acq__azure_blob__superhero__blob_battles_zip_feed.check_new_files scheduled__2024-01-25T13:00:00+00:00 [queued]> to 4915
[2024-01-25T14:00:24.236+0000] {scheduler_job_runner.py:723} INFO - Setting external_id for <TaskInstance: acq__azure_blob__superhero__blob_battles_csv_feed.check_new_files scheduled__2024-01-25T13:00:00+00:00 [queued]> to 4915
[2024-01-25T14:00:24.236+0000] {scheduler_job_runner.py:723} INFO - Setting external_id for <TaskInstance: acq__azure_blob__superhero__blob_battles_parquet_feed.check_new_files scheduled__2024-01-25T13:00:00+00:00 [queued]> to 4915
[2024-01-25T14:00:24.237+0000] {scheduler_job_runner.py:723} INFO - Setting external_id for <TaskInstance: acq__sftp__superhero__sftp_battles_csv_feed.check_new_files scheduled__2024-01-25T13:00:00+00:00 [queued]> to 4915
[2024-01-25T14:00:24.237+0000] {scheduler_job_runner.py:723} INFO - Setting external_id for <TaskInstance: acq__azure_blob__data_platform_meta__test_trading_partner.check_new_files scheduled__2024-01-25T13:00:00+00:00 [queued]> to 4915
[2024-01-25T14:00:24.243+0000] {manager.py:258} DEBUG - Received message of type DagParsingStat
[2024-01-25T14:00:24.243+0000] {manager.py:258} DEBUG - Received message of type DagParsingStat
[2024-01-25T14:00:24.243+0000] {manager.py:258} DEBUG - Received message of type DagParsingStat
[2024-01-25T14:00:24.243+0000] {manager.py:258} DEBUG - Received message of type DagParsingStat
[2024-01-25T14:00:24.244+0000] {manager.py:258} DEBUG - Received message of type DagParsingStat

@dirrao dirrao added the provider:cncf-kubernetes label and removed the needs-triage label Jan 25, 2024

smhood commented Jan 25, 2024

It seems like whenever our executor checks the state of the task instance, it is not being updated from the database.

Looking at the database entry, it's clearly marked as success:
[screenshot: the task_instance row in the database marked as success]

However it's listed as running and stuck in the kubernetes_executor:

[2024-01-25T15:04:16.609+0000] {kubernetes_executor.py:387} DEBUG - self.running: {TaskInstanceKey(dag_id='acq__sftp__superhero__sftp_battles_csv_feed', task_id='check_new_files', run_id='scheduled__2024-01-25T14:00:00+00:00', try_number=1, map_index=-1)
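
As far as I understand it, a key only leaves the executor's running set when a completion event from the KubernetesJobWatcher is processed; a rough illustration of that lifecycle (hypothetical simplified names, not the provider source):

# Rough illustration of the suspected leak: a key enters `running` when the task is handed
# to the executor and only leaves it when a watcher event for the pod gets processed.
running: set[tuple] = set()
watcher_events: list[tuple] = []  # filled by the KubernetesJobWatcher process

def execute_async(key: tuple) -> None:
    running.add(key)            # slot occupied

def process_watcher_events() -> None:
    while watcher_events:
        running.discard(watcher_events.pop())  # slot released only here

key = ("acq__sftp__superhero__sftp_battles_csv_feed", "check_new_files",
       "scheduled__2024-01-25T14:00:00+00:00", 1, -1)
execute_async(key)
# The pod completes and the metadata DB is updated to "success", but if the watcher is dead
# or silent, no event is ever enqueued, so the key stays in `running`:
process_watcher_events()
print(len(running))  # 1 -> the finished task still holds a slot

So if the watcher stops delivering events, the database can say "success" while the executor keeps the task in self.running indefinitely.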


dirrao commented Jan 25, 2024

Are you seeing this issue when you run Airflow with a single scheduler? Can you share the details to reproduce it?

This requires triaging. Meanwhile, you can bump up the parallelism configuration to a higher number to beat the leak, or restart the scheduler after a certain number of iterations to reset these values.


smhood commented Jan 25, 2024

Are you seeing this issue when you run Airflow with a single scheduler? Can you share the details to reproduce it?

This requires triaging. Meanwhile, you can bump up the parallelism configuration to a higher number to beat the leak, or restart the scheduler after a certain number of iterations to reset these values.

Yes, this is when running on a single scheduler. We are utilizing the helm chart and only overriding the following values in values.yaml.

images:
  airflow:
    repository:  <redacted>
    tag: branch-70e99939
    pullPolicy: Always
labels:
  azure.workload.identity/use: 'true'
airflowPodAnnotations:
  azure.workload.identity/client-id:  <redacted>
env:
  - name: AIRFLOW__CORE__TEST_CONNECTION
    value: Enabled
  - name: AIRFLOW__LOGGING__REMOTE_LOGGING
    value: 'True'
  - name: AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID
    value: wasb_airlow_logs
  - name: AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER
    value: wasb-airflow-logging
  - name: AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE
    value: '4'
  - name: ENVIRONMENT
    value: dev
  - name: LOG_LEVEL
    value: DEBUG
  - name: AIRFLOW__SMTP__SMTP_MAIL_TO
    value:  <redacted>
  - name: AIRFLOW__SMTP__SMTP_HOST
    value:  <redacted>
  - name: AIRFLOW__SMTP__SMTP_PORT
    value: '25'
  - name: AIRFLOW__SMTP__SMTP_STARTTLS
    value: 'False'
  - name: AIRFLOW__SMTP__SMTP_SSL
    value: 'False'
  - name: AIRFLOW__SMTP__SMTP_MAIL_FROM
    value:  <redacted>
  - name: AIRFLOW__SCHEDULER__USE_ROW_LEVEL_LOCKING
    value: 'False'
ingress:
  web:
    enabled: true
    pathType: Prefix
    hosts:
      - name: dev-airflow.privatelink.eastus2.azmk8s.io
        tls:
          enabled: true
          secretName: dev-airflow-tls
    ingressClassName: nginx
    annotations:
      cert-manager.io/cluster-issuer: aks-ca-cluster-issuer
workers:
  serviceAccount:
    annotations:
      azure.workload.identity/client-id:  <redacted>
  resources:
    requests:
      cpu: '0.5'
      memory: 128Mi
    limits:
      cpu: '1'
      memory: 512Mi
executor: KubernetesExecutor
allowPodLaunching: true
webserver:
  replicas: 3
  podDisruptionBudget:
    enabled: true
    config:
      maxUnavailable: 1
  serviceAccount:
    annotations:
      azure.workload.identity/client-id:  <redacted>
  webserverConfigConfigMapName: webserverconfig
volumes:
  - name: secrets-store-inline
    csi:
      driver: secrets-store.csi.k8s.io
      readOnly: true
      volumeAttributes:
        secretProviderClass: airflow-aks-secrets
volumeMounts:
  - name: secrets-store-inline
    mountPath: /mnt/secrets-store
    readOnly: true
dags:
  persistence:
    enabled: false
createUserJob:
  useHelmHooks: false
  applyCustomEnv: false
extraEnvFrom: |-
  - secretRef:
      name: airflow-azure-oauth
migrateDatabaseJob:
  useHelmHooks: false
  applyCustomEnv: false
  jobAnnotations:
    argocd.argoproj.io/hook: Sync
postgresql:
  enabled: false
metadataConnection:
  sslmode: require
pgbouncer:
  enabled: true
data:
  metadataSecretName: postgresql-connection-url
webserverSecretKeySecretName: airflow-webserver-secret-key
fernetKeySecretName: airflow-fernet-key
triggerer:
  replicas: 1
  serviceAccount:
    annotations:
      azure.workload.identity/client-id: <redacted>
scheduler:
  replicas: 1
  serviceAccount:
    annotations:
      azure.workload.identity/client-id: <redacted>
  args: ["bash", "-c", "exec airflow scheduler --verbose"]
config:
  kubernetes_executor:
    namespace: orch-dataplatform
  webserver:
    base_url: https://dev-airflow.privatelink.eastus2.azmk8s.io


smhood commented Jan 25, 2024

We have been seeing this issue basically ever since we upgraded from 2.7.3 -> 2.8.0 (and now on 2.8.1)


smhood commented Jan 25, 2024

Also, we are building our own Airflow image off the official one and copying our DAGs in there.
Dockerfile:

FROM apache/airflow:2.8.1-python3.11
USER root
RUN apt-get update && apt-get install -y --no-install-recommends grep procps
COPY --chown=airflow:root ./dags/ $AIRFLOW_HOME/dags/
USER airflow
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

requirements.txt

apache-airflow-providers-cncf-kubernetes==7.13.0
azure-identity==1.15.0
azure-storage-blob==12.19.0
azure-servicebus==7.11.4
opencensus-ext-azure==1.1.11
pydantic==2.4.2
pyhumps==3.8.0


bixel commented Jan 31, 2024

I think we are running into the same problem with a similar setup (using the official Helm chart, upgraded from Airflow 2.7.3 to 2.8.1). After a while I stop seeing logs from the kubernetes_executor, and tasks are stuck in queued after a deferred trigger event was fired, i.e. they go queued -> running -> deferred -> queued (and get stuck there). Restarting the scheduler helps and the stuck tasks complete as expected.

I have not yet been able to identify the moment when the kubernetes_executor (presumably) stops working.


smhood commented Jan 31, 2024

So we are potentially starting to see things work now. We were utilizing an old version of the Helm chart, and after upgrading from 1.10 to 1.11 the executors just work. Keeping this open until confirmed, but that was the solution, at least for us.


smhood commented Feb 2, 2024

After a brief period of things working again, we are now seeing the issue again.

Single Scheduler running on the 1.11 Helm Chart, Airflow 2.8.1.


smhood commented Feb 2, 2024

Looking into the latest occurrence, what is weird is that we are seeing the following logged event:
[screenshot of the logged event]

However, we also see it in the current running slots:

[2024-02-02T16:51:45.480+0000] {kubernetes_executor.py:387} DEBUG - self.running: {TaskInstanceKey(dag_id='acq__sftp__ohsu_partners_miah__prebill_daily_charges', task_id='get_execution_parameters', run_id='scheduled__2024-02-02T03:00:00+00:00', try_number=1, map_index=-1)}

This would seem to indicate that the code here:
https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job_runner.py#L705

is not finding the event in our database, correct? We have a single scheduler and are setting row locking to false, which means the query should be completely unaffected by any additional arguments.

[screenshot of the referenced scheduler code]
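
One way to confirm the disagreement (a diagnostic sketch run in the scheduler's environment; the key values are taken from the example below and are otherwise illustrative):

# Diagnostic sketch: check what the metadata DB says about a key the executor still
# reports in self.running.
from airflow.models.taskinstance import TaskInstance
from airflow.utils.session import create_session

with create_session() as session:
    ti = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == "acq__sftp__ohsu_partners_miah__prebill_daily_charges",
            TaskInstance.task_id == "get_execution_parameters",
            TaskInstance.run_id == "scheduled__2024-02-02T03:00:00+00:00",
        )
        .one_or_none()
    )
    print(ti.state if ti else "not found")  # prints "success" while the executor still holds the slot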


smhood commented Feb 2, 2024

Additional logging showing what this looks like in terms of open slots:
[screenshot of the open-slots debug logging]


smhood commented Feb 2, 2024

@dirrao do I have to change labels in order to get follow-up?

@aki263

aki263 commented Feb 4, 2024

I am experiencing a similar problem in 2.7.3.

[2024-02-04T07:19:20.201+0000] {scheduler_job_runner.py:1081} DEBUG - Executor full, skipping critical section
[2024-02-04T07:19:20.203+0000] {base_executor.py:217} DEBUG - 32 running task instances
[2024-02-04T07:19:20.203+0000] {base_executor.py:218} DEBUG - 0 in queue
[2024-02-04T07:19:20.203+0000] {base_executor.py:219} DEBUG - 0 open slots

Running two scheduler replicas and getting a lot of messages like "scheduling was skipped, probably because the DAG record was locked".

I manually killed one of the scheduler pods and it helped alleviate the issue. My schedulers had not been restarted for the last 5 days.

airflow-scheduler-686459bbff-g68xj                                                 2/2     Running             0              19m
airflow-scheduler-686459bbff-gv9vc                                                 2/2     Running             0              5d11h

Airflow config

[core]
dags_folder = /opt/airflow/dags
hostname_callable = airflow.utils.net.getfqdn
might_contain_dag_callable = airflow.utils.file.might_contain_dag_via_default_heuristic
default_timezone = utc
executor = KubernetesExecutor
auth_manager = airflow.auth.managers.fab.fab_auth_manager.FabAuthManager
parallelism = 32
max_active_tasks_per_dag = 16
dags_are_paused_at_creation = True
max_active_runs_per_dag = 16
# mp_start_method =
load_examples = false
plugins_folder = /opt/airflow/plugins
execute_tasks_new_python_interpreter = False
fernet_key = xxxxxxx=
donot_pickle = True
dagbag_import_timeout = 30.0
dagbag_import_error_tracebacks = True
dagbag_import_error_traceback_depth = 2
dag_file_processor_timeout = 50
task_runner = StandardTaskRunner
default_impersonation =
security =
unit_test_mode = False
enable_xcom_pickling = False
allowed_deserialization_classes = airflow\..*
killed_task_cleanup_time = 60
dag_run_conf_overrides_params = True
dag_discovery_safe_mode = True
dag_ignore_file_syntax = regexp
default_task_retries = 0
default_task_retry_delay = 300
max_task_retry_delay = 86400
default_task_weight_rule = downstream
default_task_execution_timeout =
min_serialized_dag_update_interval = 30
compress_serialized_dags = False
min_serialized_dag_fetch_interval = 10
max_num_rendered_ti_fields_per_task = 30
check_slas = True
xcom_backend = airflow.models.xcom.BaseXCom
lazy_load_plugins = True
lazy_discover_providers = True
hide_sensitive_var_conn_fields = True
sensitive_var_conn_names =
default_pool_task_slot_count = 512
max_map_length = 1024
daemon_umask = 0o077
# dataset_manager_class =
# dataset_manager_kwargs =
database_access_isolation = False
# internal_api_url =
test_connection = Disabled
colored_console_log = True
remote_logging = True
[database]
alembic_ini_file_path = alembic.ini
sql_alchemy_conn = postgresql+psycopg2://.....
# sql_alchemy_engine_args =
sql_engine_encoding = utf-8
# sql_engine_collation_for_ids =
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = True
sql_alchemy_schema =
# sql_alchemy_connect_args =
load_default_connections = True
max_db_retries = 3
check_migrations = True
[logging]
base_log_folder = /opt/airflow/logs
remote_logging = True
remote_log_conn_id = aws_conn_s3
delete_local_logs = False
google_key_path =
remote_base_log_folder = s3://s3-paas/airflow-prod-logs/
remote_task_handler_kwargs =
encrypt_s3_logs = False
logging_level = DEBUG
celery_logging_level =
fab_logging_level = WARNING
logging_config_class =
colored_console_log = True
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
dag_processor_log_target = file
dag_processor_log_format = [%%(asctime)s] [SOURCE:DAG_PROCESSOR] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
log_formatter_class = airflow.utils.log.timezone_aware.TimezoneAware
secret_mask_adapter =
task_log_prefix_template =
log_filename_template = dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{%% if ti.map_index >= 0 %%}map_index={{ ti.map_index }}/{%% endif %%}attempt={{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
task_log_reader = task
extra_logger_names =
worker_log_server_port = 8793
trigger_log_server_port = 8794
# interleave_timestamp_parser =
file_task_handler_new_folder_permissions = 0o775
file_task_handler_new_file_permissions = 0o664
celery_stdout_stderr_separation = False
delete_worker_pods = False
[metrics]
metrics_allow_list =
metrics_block_list =
statsd_on = True
statsd_host = airflow-statsd
statsd_port = 9125
statsd_prefix = airflow
stat_name_handler =
statsd_datadog_enabled = False
statsd_datadog_tags =
statsd_datadog_metrics_tags = True
# statsd_custom_client_path =
statsd_disabled_tags = job_id,run_id
statsd_influxdb_enabled = False
otel_on = False
otel_host = localhost
otel_port = 8889
otel_prefix = airflow
otel_interval_milliseconds = 60000
otel_debugging_on = False
otel_ssl_active = False
[secrets]
backend =
backend_kwargs =
use_cache = False
cache_ttl_seconds = 900
[cli]
api_client = airflow.api.client.local_client
endpoint_url = http://localhost:8080
[debug]
fail_fast = False
[api]
enable_experimental_api = False
auth_backends = airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
maximum_page_limit = 100
fallback_page_limit = 100
google_oauth2_audience =
google_key_path =
access_control_allow_headers =
access_control_allow_methods =
access_control_allow_origins =
enable_xcom_deserialize_support = False
[lineage]
backend =
[operators]
default_owner = airflow
default_deferrable = false
default_cpus = 1
default_ram = 512
default_disk = 512
default_gpus = 0
default_queue = default
allow_illegal_arguments = False
[webserver]
access_denied_message = Access is Denied
config_file = /opt/airflow/webserver_config.py
base_url = http://localhost:8080
default_ui_timezone = UTC
web_server_host = 0.0.0.0
web_server_port = 8080
web_server_ssl_cert =
web_server_ssl_key =
session_backend = database
web_server_master_timeout = 120
web_server_worker_timeout = 120
worker_refresh_batch_size = 1
worker_refresh_interval = 6000
reload_on_plugin_change = False
secret_key =xxxxxxxx
workers = 4
worker_class = sync
access_logfile = -
error_logfile = -
access_logformat =
expose_config = False
expose_hostname = False
expose_stacktrace = False
dag_default_view = grid
dag_orientation = LR
grid_view_sorting_order = topological
log_fetch_timeout_sec = 5
log_fetch_delay_sec = 2
log_auto_tailing_offset = 30
log_animation_speed = 1000
hide_paused_dags_by_default = False
page_size = 100
navbar_color = #fff
default_dag_run_display_number = 25
enable_proxy_fix = True
proxy_fix_x_for = 1
proxy_fix_x_proto = 1
proxy_fix_x_host = 1
proxy_fix_x_port = 1
proxy_fix_x_prefix = 1
cookie_secure = False
cookie_samesite = Lax
default_wrap = False
x_frame_enabled = True
# analytics_tool =
# analytics_id =
show_recent_stats_for_completed_runs = True
update_fab_perms = True
session_lifetime_minutes = 43200
# instance_name =
instance_name_has_markup = False
auto_refresh_interval = 3
warn_deployment_exposure = True
audit_view_excluded_events = gantt,landing_times,tries,duration,calendar,graph,grid,tree,tree_data
# audit_view_included_events =
enable_swagger_ui = True
run_internal_api = False
auth_rate_limited = True
auth_rate_limit = 5 per 40 second
caching_hash_method = md5
show_trigger_form_if_no_params = False
rbac = True
[email]
email_backend = airflow.utils.email.send_email_smtp
email_conn_id = smtp_default
default_email_on_retry = True
default_email_on_failure = True
# subject_template =
# html_content_template =
# from_email =
ssl_context = default
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = true
smtp_ssl = true
smtp_user = platform-airflow-dev@xyz.com
smtp_password = platform-airflow-dev@xyz.com
smtp_port = 587
smtp_mail_from = platform-airflow-dev@xyz.com
smtp_timeout = 30
smtp_retry_limit = 5
[sentry]
sentry_on = false
sentry_dsn =
# before_send =
[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
local_task_job_heartbeat_sec = 0
num_runs = -1
scheduler_idle_sleep_time = 1
min_file_process_interval = 30
parsing_cleanup_interval = 60
stale_dag_threshold = 50
dag_dir_list_interval = 30
print_stats_interval = 30
pool_metrics_interval = 5.0
scheduler_health_check_threshold = 30
enable_health_check = False
scheduler_health_check_server_port = 8974
orphaned_tasks_check_interval = 300.0
child_process_log_directory = /opt/airflow/logs/scheduler
scheduler_zombie_task_threshold = 300
zombie_detection_interval = 10.0
catchup_by_default = True
ignore_first_depends_on_past_by_default = True
max_tis_per_query = 16
use_row_level_locking = True
max_dagruns_to_create_per_loop = 10
max_dagruns_per_loop_to_schedule = 20
schedule_after_task_execution = True
parsing_pre_import_modules = True
parsing_processes = 2
file_parsing_sort_mode = modified_time
standalone_dag_processor = False
max_callbacks_per_loop = 20
dag_stale_not_seen_duration = 600
use_job_schedule = True
allow_trigger_in_future = False
trigger_timeout_check_interval = 15
task_queued_timeout = 600.0
task_queued_timeout_check_interval = 120.0
allowed_run_id_pattern = ^[A-Za-z0-9_.~:+-]+$
logging_level = DEBUG
run_duration = 41460
statsd_host = airflow-statsd
statsd_on = True
statsd_port = 9125
statsd_prefix = airflow
[triggerer]
default_capacity = 1000
job_heartbeat_sec = 5
triggerer_health_check_threshold = 30
[sensors]
default_timeout = 604800
[aws]
# session_factory =
cloudwatch_task_handler_json_serializer = airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize_legacy
[aws_ecs_executor]
conn_id = aws_default
# region_name =
assign_public_ip = False
# cluster =
# container_name =
launch_type = FARGATE
platform_version = LATEST
# security_groups =
# subnets =
# task_definition =
max_run_task_attempts = 3
# run_task_kwargs =
[celery_kubernetes_executor]
kubernetes_queue = kubernetes
[celery]
celery_app_name = airflow.providers.celery.executors.celery_executor
worker_concurrency = 16
# worker_autoscale =
worker_prefetch_multiplier = 1
worker_enable_remote_control = true
broker_url = redis://redis:6379/0
# result_backend =
result_backend_sqlalchemy_engine_options =
flower_host = 0.0.0.0
flower_url_prefix = /
flower_port = 5555
flower_basic_auth =
sync_parallelism = 0
celery_config_options = airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =
pool = prefork
operation_timeout = 1.0
task_track_started = True
task_publish_max_retries = 3
worker_precheck = False
[celery_broker_transport_options]
# visibility_timeout =
# sentinel_kwargs =
[local_kubernetes_executor]
kubernetes_queue = kubernetes
[kubernetes_executor]
api_client_retry_configuration =
logs_task_metadata = False
pod_template_file = /opt/airflow/pod_templates/pod_template_file.yaml
worker_container_repository = xyz.com/airflow-prod
worker_container_tag = a6a136ee
namespace = airflow
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
multi_namespace_mode_namespace_list =
in_cluster = True
# cluster_context =
# config_file =
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = True
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
verify_ssl = True
worker_pods_queued_check_interval = 60
ssl_ca_cert =
[dask]
cluster_address = 127.0.0.1:8786
tls_ca =
tls_cert =
tls_key =

[azure_remote_logging]
remote_wasb_log_container = airflow-logs
[kubernetes]
airflow_configmap = airflow-airflow-config
airflow_local_settings_configmap = airflow-airflow-config
multi_namespace_mode = False
namespace = airflow
pod_template_file = /opt/airflow/pod_templates/pod_template_file.yaml
worker_container_repository = xyz.com/airflow-prod
worker_container_tag = a6a136ee


smhood commented Feb 6, 2024

[screenshot of the captured logs]

Able to capture a pretty good log of what we are seeing. Things were working fine yesterday between 09:00 and 17:00; then 18:00 came along and we started seeing the execution lock issue. No deployments happened at this time, the same DAGs were running, and there are no error logs as far as we can tell just yet.


smhood commented Feb 19, 2024

After a couple of weeks with no responses on this post, we decided to revert back to 2.7.2 and the issue is gone. Down the road we will investigate using 2.8.x on something like Astronomer, but we can definitely confirm that the 2.7 -> 2.8 change caused this issue for us.


bixel commented Feb 22, 2024

It looks like the scheduler or the kubernetes_executor cannot recover from communication issues with Kubernetes. I've collected a few hours of logging after a restart of the scheduler and the problems seem to occur after the following lines:

[2024-02-22T10:24:59.431+0000] {kubernetes_executor_utils.py:121} ERROR - Unknown error in KubernetesJobWatcher. Failing
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 710, in _error_catcher
    yield
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 1073, in read_chunked
    self._update_chunk_length()
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 1008, in _update_chunk_length
    raise InvalidChunkLength(self, line) from None
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 112, in run
    self.resource_version = self._run(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 168, in _run
    for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
  File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/watch/watch.py", line 165, in stream
    for line in iter_resp_lines(resp):
  File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for seg in resp.stream(amt=None, decode_content=False):
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 933, in stream
    yield from self.read_chunked(amt, decode_content=decode_content)
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 1061, in read_chunked
    with self._error_catcher():
  File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 727, in _error_catcher
    raise ProtocolError(f"Connection broken: {e!r}", e) from e
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
Process KubernetesJobWatcher-3:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 710, in _error_catcher
    yield
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 1073, in read_chunked
    self._update_chunk_length()
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 1008, in _update_chunk_length
    raise InvalidChunkLength(self, line) from None
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 112, in run
    self.resource_version = self._run(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 168, in _run
    for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
  File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/watch/watch.py", line 165, in stream
    for line in iter_resp_lines(resp):
  File "/home/airflow/.local/lib/python3.10/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for seg in resp.stream(amt=None, decode_content=False):
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 933, in stream
    yield from self.read_chunked(amt, decode_content=decode_content)
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 1061, in read_chunked
    with self._error_catcher():
  File "/usr/local/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/airflow/.local/lib/python3.10/site-packages/urllib3/response.py", line 727, in _error_catcher
    raise ProtocolError(f"Connection broken: {e!r}", e) from e
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
[2024-02-22T10:24:59.568+0000] {kubernetes_executor_utils.py:359} ERROR - Error while health checking kube watcher process for namespace airflow. Process died for unknown reasons
[2024-02-22T10:24:59.586+0000] {kubernetes_executor_utils.py:157} INFO - Event: and now my watch begins starting at resource_version: 0

After that, tasks are stuck in queued and I don't see any more lines of the kind

[2024-02-22T10:16:50.260+0000] {scheduler_job_runner.py:696} INFO - Received executor event with state success for task instance TaskInstanceKey

I can only recover from that state by clearing all scheduled and queued tasks and restarting the scheduler. I wasn't able to dig deeper into the kubernetes_executor yet, but there seem to be quite a few changes between 2.7.3 and 2.8.1. That would be my first guess for the origin of this.


crabio commented Apr 9, 2024

Hi! I had the same error with 2.7.3, described in #36478, and I tested it on 2.8.4 and the bug still exists.


crabio commented Apr 9, 2024

Update: I have the same error with 1 scheduler on Airflow 2.8.4.
But I think the error may also be in the Kubernetes provider.
Libs:

apache-airflow == 2.8.4
dbt-core == 1.7.11
dbt-snowflake == 1.7.3
apache-airflow[statsd]
facebook-business == 19.0.2
google-ads == 22.1.0
twitter-ads == 11.0.0
acryl-datahub-airflow-plugin == 0.10.2.3
acryl-datahub[dbt] == 0.10.2.3
checksumdir
filelock
openpyxl
cronsim
apache-airflow-providers-snowflake
apache-airflow-providers-cncf-kubernetes == 8.0.1
apache-airflow-providers-apache-kafka == 1.3.1
apache-airflow-providers-slack == 8.6.1
apache-airflow-providers-amazon
kubernetes
snowplow_analytics_sdk
elementary-data == 0.14.1


crabio commented Apr 10, 2024

@smhood I have downgraded the Airflow version to 2.7.2, but the issue still exists...

apache-airflow == 2.7.2
dbt-core == 1.7.11
dbt-snowflake == 1.7.3
apache-airflow[statsd]
facebook-business == 19.0.2
google-ads == 22.1.0
twitter-ads == 11.0.0
acryl-datahub-airflow-plugin == 0.10.2.3
acryl-datahub[dbt] == 0.10.2.3
checksumdir
filelock
openpyxl
cronsim
apache-airflow-providers-snowflake
apache-airflow-providers-cncf-kubernetes == 8.0.1
apache-airflow-providers-apache-kafka == 1.3.1
apache-airflow-providers-slack == 8.6.1
apache-airflow-providers-amazon
kubernetes
snowplow_analytics_sdk
elementary-data == 0.14.1

Maybe the issue is inside the provider packages?


crabio commented Apr 10, 2024

Found a mailing-list thread related to the same issue: https://www.mail-archive.com/commits@airflow.apache.org/msg309101.html


crabio commented Apr 10, 2024

I found a workaround and some insights:
If the scheduler parallelism is less than the sum of all slots across pools (e.g. parallelism 32 with 8 pools of 8 slots each) and all slots are used, the scheduler starts leaking slots after some time.
Workaround: set parallelism higher than the total number of pool slots you have.
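
To see where a deployment stands relative to this workaround, here is a small sketch (run where the Airflow metadata DB is reachable) that compares the configured parallelism against the total number of pool slots:

# Sketch: compare core.parallelism with the sum of slots across all pools.
from airflow.configuration import conf
from airflow.models.pool import Pool
from airflow.utils.session import create_session
from sqlalchemy import func

with create_session() as session:
    total_pool_slots = session.query(func.coalesce(func.sum(Pool.slots), 0)).scalar()

parallelism = conf.getint("core", "parallelism")
print(f"parallelism={parallelism}, total pool slots={total_pool_slots}")
if parallelism <= total_pool_slots:
    print("parallelism is not above the pool-slot total; per this workaround, raise it")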

@paramjeet01

@crabio, were you able to find a solution? We are also facing the task leak issue in v2.6.3.


crabio commented Apr 16, 2024

@paramjeet01 Not fully - we still have the slot leak:
[graph showing the leaking executor slots]

But we found a workaround:

  1. Set AIRFLOW__CORE__PARALLELISM to 512+ so the scheduler has free slots, or at least to something bigger than the sum of slots in all the pools you have.
  2. Try setting AIRFLOW__SCHEDULER__NUM_RUNS so the scheduler restarts itself periodically. We are going to try this option with a 4-hour scheduler lifetime.
  3. Check that you don't have zombie pods in any of the namespaces you are using for Airflow. We have some namespaces where, we think, the Airflow scheduler has no access to delete pods and we end up with zombie succeeded pods (see the sketch after this list).
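
For point 3, a small sketch using the standard kubernetes Python client to list Succeeded pods left behind in the namespaces Airflow launches workers into (the namespace names are placeholders):

# Sketch for point 3 above: list completed (Succeeded) pods per namespace.
from kubernetes import client, config

config.load_incluster_config()  # or config.load_kube_config() when running outside the cluster
v1 = client.CoreV1Api()

for namespace in ["airflow", "other-team-namespace"]:
    pods = v1.list_namespaced_pod(namespace, field_selector="status.phase=Succeeded")
    for pod in pods.items:
        print(namespace, pod.metadata.name, pod.status.phase)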

@paramjeet01

@crabio I have updated my comments here: #38968 (comment). I was able to improve the performance and the tasks no longer have long queue durations.

@aru-trackunit

I see similarities between the issue we are facing and the one you describe.

Airflow 2.8.4
We run one instance of the scheduler and we observe a list of completed tasks (usually cleared), like in #33402:
[screenshot of the completed-tasks list]

Config:

  • core parallelism = 32
  • default pool slots = 128

At 3 AM more tasks were supposed to run, but nothing like that happened.

In the morning we ended up with no running tasks and many in the queued state.

@aru-trackunit

@crabio Could you please post steps to reproduce the issue? Then I could spend a little bit more time understanding it.


crabio commented Apr 17, 2024

@aru-trackunit
Sure

  1. Run Airflow with the Kubernetes Executor and 1 scheduler.
  2. Run some tasks in your default namespace (this may not be required) and run tasks in another namespace where Airflow has access to create, watch and delete pods (a sketch of such a DAG follows below).
  3. Run Airflow with active tasks for some time; you need to have running tasks for a while to catch the slot leak.

Tasks in another namespace may not be required, because we faced this issue before we started using multiple namespaces.
I think @paramjeet01 and @smhood have 1 namespace.
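
A minimal DAG along the lines of step 2 could look like this (a sketch; the namespace, schedule and commands are placeholders, and launching into another namespace assumes the executor is configured for it and has pod permissions there):

# Sketch of a reproduction DAG for step 2: one task in the default namespace,
# one task launched into another namespace via pod_override.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from kubernetes.client import models as k8s

with DAG(
    dag_id="k8s_executor_leak_repro",
    start_date=datetime(2024, 1, 1),
    schedule="@hourly",
    catchup=False,
):
    default_ns_task = BashOperator(task_id="default_ns_task", bash_command="sleep 60")

    other_ns_task = BashOperator(
        task_id="other_ns_task",
        bash_command="sleep 60",
        executor_config={
            "pod_override": k8s.V1Pod(
                metadata=k8s.V1ObjectMeta(namespace="other-team-namespace")
            )
        },
    )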

@paramjeet01

@crabio, yes, we run in a single namespace.


karunpoudel-chr commented Apr 19, 2024

I am seeing the issue in a single namespace. The scheduler fails to remove pods in Completed state, and after the running slots reach 32, newly scheduled tasks aren't getting queued.
airflow==2.8.4
apache-airflow-providers-cncf-kubernetes==7.14.0
kubernetes==23.6.0
urllib3==2.0.7

The KubernetesJobWatcher failed a couple of times but was able to restart.
In the logs below, the watcher running as PID 2034 failed. On the next sync of the executor, it was able to start back up as PID 3740.

[2024-04-18T23:29:34.285+0000] [2034:139691425343296] {airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:121} ERROR - Unknown error in KubernetesJobWatcher. Failing
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 710, in _error_catcher
    yield
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 1073, in read_chunked
    self._update_chunk_length()
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 1008, in _update_chunk_length
    raise InvalidChunkLength(self, line) from None
urllib3.exceptions.InvalidChunkLength: InvalidChunkLength(got length b'', 0 bytes read)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 112, in run
    self.resource_version = self._run(
                            ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py", line 168, in _run
    for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 165, in stream
    for line in iter_resp_lines(resp):
  File "/usr/local/lib/python3.11/site-packages/kubernetes/watch/watch.py", line 56, in iter_resp_lines
    for seg in resp.stream(amt=None, decode_content=False):
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 933, in stream
    yield from self.read_chunked(amt, decode_content=decode_content)
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 1061, in read_chunked
    with self._error_catcher():
  File "/usr/local/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/local/lib/python3.11/site-packages/urllib3/response.py", line 727, in _error_catcher
    raise ProtocolError(f"Connection broken: {e!r}", e) from e
urllib3.exceptions.ProtocolError: ("Connection broken: InvalidChunkLength(got length b'', 0 bytes read)", InvalidChunkLength(got length b'', 0 bytes read))
[2024-04-18T23:29:35.067+0000] [ 7:139691425343296] {airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:449} DEBUG - Syncing KubernetesExecutor
[2024-04-18T23:29:35.067+0000] [ 7:139691425343296] {airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:359} ERROR - Error while health checking kube watcher process for namespace astronomer-contractual-po-prod. Process died for unknown reasons
[2024-04-18T23:29:35.078+0000] [3740:139691425343296] {airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:157} INFO - Event: and now my watch begins starting at resource_version: 0

It was working fine for a minute; it was reporting back the pod changes.

[2024-04-18T23:30:50.605+0000] [3740:139691425343296] {airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:170} DEBUG - Event: strategizer-monitor-check-strategizer-bid-statuses-lz2ww1cv had an event of type DELETED
[2024-04-18T23:30:50.605+0000] [3740:139691425343296] {airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:256} INFO - Skipping event for Succeeded pod strategizer-monitor-check-strategizer-bid-statuses-lz2ww1cv - event for this pod already sent to executor

After this the watcher went silent, with no more logs from PID 3740. The KubernetesExecutor.running set kept increasing:

[2024-04-18T23:40:01.059+0000] [ 7:139691425343296] {airflow/executors/base_executor.py:217} DEBUG - 1 running task instances
[2024-04-18T23:40:01.060+0000] [ 7:139691425343296] {airflow/executors/base_executor.py:218} DEBUG - 0 in queue
[2024-04-18T23:40:01.060+0000] [ 7:139691425343296] {airflow/executors/base_executor.py:219} DEBUG - 31 open slots
...
[2024-04-19T13:24:44.721+0000] [ 7:139691425343296] {airflow/executors/base_executor.py:217} DEBUG - 32 running task instances
[2024-04-19T13:24:44.721+0000] [ 7:139691425343296] {airflow/executors/base_executor.py:218} DEBUG - 0 in queue
[2024-04-19T13:24:44.721+0000] [ 7:139691425343296] {airflow/executors/base_executor.py:219} DEBUG - 0 open slots

I confirmed that PID 3740 is still running.

/usr/local/airflow$ ps -ef
UID   PID PPID  C STIME     TIME CMD
astro    1    0  0 Apr18 00:00:00 tini -- /entrypoint bash -c exec airflow scheduler
astro    7    1  4 Apr18 00:37:17 /usr/local/bin/python /usr/local/bin/airflow scheduler
astro   24    7  0 Apr18 00:01:06 /usr/local/bin/python /usr/local/bin/airflow scheduler
astro   33    7  0 Apr18 00:00:41 /usr/local/bin/python /usr/local/bin/airflow scheduler
astro   44    7  1 Apr18 00:12:28 airflow scheduler -- DagFileProcessorManager
astro 3740    7  0 Apr18 00:00:00 /usr/local/bin/python /usr/local/bin/airflow scheduler  <=== KubernetesJobWatcher

The KubernetesJobWatcher is stuck in the generator self._pod_events(kube_client=kube_client, query_kwargs=kwargs):

for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):

waiting for a stream from the kubernetes Watch.stream() and urllib3 HTTPResponse.stream():

https://github.com/kubernetes-client/python/blob/98cd2251152fcdbfa6de24c85384887b0999a94c/kubernetes/base/watch/watch.py#L56

https://github.com/urllib3/urllib3/blob/56f01e088dc006c03d4ee6ea9da4ab810f1ed700/src/urllib3/response.py#L914

@karunpoudel-chr

Also, those TaskInstances have been marked as completed and a new DagRun has already started (while the open slots were still > 0), but the KubernetesExecutor.running set was still keeping those TaskInstances.
Maybe we could log a warning when task instances in KubernetesExecutor.running have a completed status. That would have helped with the debugging.
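
Roughly what such a check could look like (a sketch only; `executor.running` here stands for the KubernetesExecutor's in-memory set of TaskInstanceKeys, which a real patch would access from inside the executor itself):

# Sketch of the suggested warning: flag keys the executor still holds although the
# metadata DB already shows the task instance as finished.
import logging

from airflow.models.taskinstance import TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import State

log = logging.getLogger(__name__)

def warn_on_finished_but_still_running(executor) -> None:
    with create_session() as session:
        for key in list(executor.running):
            ti = (
                session.query(TaskInstance)
                .filter(
                    TaskInstance.dag_id == key.dag_id,
                    TaskInstance.task_id == key.task_id,
                    TaskInstance.run_id == key.run_id,
                    TaskInstance.map_index == key.map_index,
                )
                .one_or_none()
            )
            if ti is not None and ti.state in State.finished:
                log.warning("Executor still holds a slot for finished task %s (state=%s)", key, ti.state)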

@karunpoudel-chr

Based on my finding above that the KubernetesJobWatcher was running but not getting back any pod changes, I have added a 5-minute timeout so that the watcher restarts itself. This has fixed the issue for me.

https://github.com/apache/airflow/compare/providers-cncf-kubernetes/7.14.0...karunpoudel-chr:airflow:watcher_idle_fix?expand=1
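
For reference, the general idea can be expressed with the kubernetes client's own watch API (a sketch of the approach, not the exact change in the branch above; the label selector and 5-minute value are illustrative):

# Sketch: bound every watch call so a silent connection cannot hang the job watcher forever.
from kubernetes import client, config, watch

config.load_incluster_config()
v1 = client.CoreV1Api()
resource_version = "0"

while True:
    w = watch.Watch()
    for event in w.stream(
        v1.list_namespaced_pod,
        namespace="airflow",
        label_selector="airflow-worker",
        resource_version=resource_version,
        timeout_seconds=300,   # server-side timeout: the stream ends after ~5 minutes
        _request_timeout=330,  # client-side timeout as a backstop for dead connections
    ):
        resource_version = event["object"].metadata.resource_version
        # ... hand the event to the executor (handling of 410 Gone omitted) ...
    # the loop restarts the watch from the last seen resource_version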

@alex-slynko

We are experiencing the problem with slots leaking. We have several schedulers running, and one of the root causes is that two different schedulers pick up the same task; after it succeeds, the task is removed from the running list on only one scheduler.


dirrao commented May 5, 2024

Meanwhile, restart the scheduler after a certain number of iterations and increase the number of executor slots to a high number.


RNHTTR commented May 7, 2024

Based on my finding above that the KubernetesJobWatcher was running but not getting back any pod changes, I have added a 5-minute timeout so that the watcher restarts itself. This has fixed the issue for me.

https://github.com/apache/airflow/compare/providers-cncf-kubernetes/7.14.0...karunpoudel-chr:airflow:watcher_idle_fix?expand=1

@jedcunningham @dstandish do you think it makes sense to implement this fix?


dirrao commented May 11, 2024

This issue is related to the watcher not being able to scale and process the events in time, which leads to many completed pods accumulating over time.
related: #22612

@dirrao dirrao self-assigned this May 11, 2024


RNHTTR commented May 17, 2024

Closed by: #39551

@RNHTTR RNHTTR closed this as completed May 17, 2024