Skip to content

db connection is not closed when container is restarted #33618

@antonio-antuan

Description

@antonio-antuan

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

checked on 2.6.1, 2.7.0

I use LocalExecutor with airflow running in docker container.

Dag's code looks like this:

def get_engine(conn_id = None):
    return PostgresHook(postgres_conn_id="postgres_default").get_sqlalchemy_engine()

@dag(
    "foo",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    max_active_runs=1,
)
def foo():
    @task
    def run():
        with get_engine().begin() as tr:
            tr.execute('select pg_sleep(6000)')
    run()


foo()

During the task run I restart docker container with something like docker-compose restart scheduler (which is actually a worker).
After that I can see two queries running in pg_stat_activity. If I restart one more time - there are three queries and so on.

Here are logs (always are the same for restarts):

[2023-08-22 15:55:48 +0000] [21] [INFO] Handling signal: term
[2023-08-22 15:55:48 +0000] [22] [INFO] Worker exiting (pid: 22)
[2023-08-22 15:55:48 +0000] [24] [INFO] Worker exiting (pid: 24)
[2023-08-22T15:55:48.159+0000] {scheduler_job_runner.py:247} INFO - Exiting gracefully upon receiving signal 15
[2023-08-22 15:55:48 +0000] [21] [INFO] Shutting down: Master
[2023-08-22T15:55:49.162+0000] {process_utils.py:131} INFO - Sending Signals.SIGTERM to group 159. PIDs of all processes in the group: [159]
[2023-08-22T15:55:49.162+0000] {process_utils.py:86} INFO - Sending the signal Signals.SIGTERM to group 159
[2023-08-22T15:55:49.295+0000] {process_utils.py:79} INFO - Process psutil.Process(pid=159, status='terminated', exitcode=0, started='15:55:37') (159) terminated with exit code 0
[2023-08-22T15:55:49.295+0000] {local_executor.py:400} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you don't want to wait.
[2023-08-22T15:55:49.295+0000] {scheduler_job_runner.py:865} ERROR - Exception when executing Executor.end
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 842, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 999, in _run_scheduler_loop
    time.sleep(min(self._scheduler_idle_sleep_time, next_event if next_event else 0))
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 250, in _exit_gracefully
    sys.exit(os.EX_OK)
SystemExit: 0


During handling of the above exception, another exception occurred:


Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job_runner.py", line 863, in _execute
    self.job.executor.end()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/local_executor.py", line 404, in end
    self.impl.end()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/local_executor.py", line 345, in end
    self.queue.put((None, None))
  File "<string>", line 2, in put
  File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 411, in _send_bytes
    self._send(header + buf)
  File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
[2023-08-22T15:55:49.298+0000] {process_utils.py:131} INFO - Sending Signals.SIGTERM to group 159. PIDs of all processes in the group: []
[2023-08-22T15:55:49.298+0000] {process_utils.py:86} INFO - Sending the signal Signals.SIGTERM to group 159
[2023-08-22T15:55:49.298+0000] {process_utils.py:100} INFO - Sending the signal Signals.SIGTERM to process 159 as process group is missing.
[2023-08-22T15:55:49.298+0000] {scheduler_job_runner.py:871} INFO - Exited execute loop

What you think should happen instead

the db connection that is opened when task is running, must be closed.

How to reproduce

  1. put the code
  2. restart container
  3. run in db select * from pg_stat_activity where state != 'idle'

Operating System

Arch Linux

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.0.0
apache-airflow-providers-common-sql==1.4.0
apache-airflow-providers-google==10.0.0
apache-airflow-providers-http==4.3.0
apache-airflow-providers-postgres==5.4.0

Deployment

Docker-Compose

Deployment details

almost the same installation is used in our AWS ECS.

docker-compose.yaml.

---
version: '3.4'

x-airflow-common:
  &airflow-common
  build:
    dockerfile: local.dockerfile
  ulimits:
    nofile:
      soft: 65536
      hard: 65536
  user: "${AIRFLOW_UID:-0}:0"
  env_file:
    - "${ENV_FILE:-.env}"
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./shared:/mnt/shared

x-depends-on:
  &depends-on
  depends_on:
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    ports:
      - "5432:5432"
    command: ["postgres", "-c", "log_statement=all"]
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: unless-stopped
    env_file:
      - "${ENV_FILE:-.env}"
    logging:
      options:
        # in case of "log_statement=all" logs may have a very large size. so limit it (unlimited by default).
        max-size: "100mb"

  scheduler:
    <<: [*airflow-common, *depends-on]
    command: ["airflow", "scheduler"]
    restart: unless-stopped
    ports:
      - "8793:8793"
    extra_hosts:
      - "host.docker.internal:host-gateway"

  webserver:
    <<: [*airflow-common, *depends-on]
    command: ["airflow", "webserver"]
    restart: unless-stopped
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 5s
      timeout: 30s
      retries: 10

local.dockerfile:

FROM apache/airflow:slim-2.7.0-python3.10

USER airflow
COPY requirements.txt /requirements.txt
RUN pip install -r /requirements.txt

requirements are installed with -c https://raw.githubusercontent.com/apache/airflow/constraints-2.6.1/constraints-3.10.txt

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Labels

    area:corekind:bugThis is a clearly a bugneeds-triagelabel for new issues that we didn't triage yet

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions