
AIP-44 Internal API not activated in LocalExecutor task supervisor processes, causing NullPool to be used despite database_access_isolation=True #61964


Description

@mcarter-twosigma

Apache Airflow version

2.11.X

If "Other Airflow 3 version" selected, which one?

No response

What happened?

On Apache Airflow 2.10.5 (a version not selectable in this reporting tool), when using LocalExecutor with AIP-44 (Internal API) enabled via AIRFLOW_ENABLE_AIP_44=true and database_access_isolation=True, the Internal API is not properly activated in task supervisor processes. This causes InternalApiConfig.get_use_internal_api() to return False, which triggers reconfiguration of SQLAlchemy to use NullPool instead of QueuePool at airflow/cli/commands/task_command.py:476.

This defeats the purpose of AIP-44 and has serious consequences:

  1. Task supervisors use direct database access (not via Internal API)
  2. SQLAlchemy uses NullPool, which causes memory leaks with certain database drivers (notably PostgreSQL with libpq 16)
  3. The behavior is inconsistent with other Airflow components that properly activate the Internal API

When database_access_isolation=True and AIRFLOW_ENABLE_AIP_44=true:

  • InternalApiConfig.set_use_internal_api() is never called in task supervisor processes
  • InternalApiConfig.get_use_internal_api() returns False at task_command.py:469
  • The ORM is reconfigured to use NullPool at task_command.py:476
  • Task supervisors use direct database access with NullPool (worst of both worlds)
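For context, the check driving this behavior has roughly the following shape (a paraphrase reconstructed from the line references above, not a verbatim copy of task_command.py):

# Approximate shape of airflow/cli/commands/task_command.py:469-476
if not InternalApiConfig.get_use_internal_api():
    # Without the Internal API, the ORM is rebuilt with NullPool so that a
    # "run" command does not hold pooled connections open while heartbeating.
    settings.reconfigure_orm(disable_connection_pool=True)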

Root Cause

The issue occurs in the execution flow when LocalExecutor forks to create task supervisor processes:

Normal execution path (e.g., airflow tasks run from CLI):

  1. __main__.py:main() is called
  2. configure_internal_api() is called at line 60
  3. If database_access_isolation=True, calls InternalApiConfig.set_use_internal_api() at line 75
  4. task_run() is called
  5. Check at task_command.py:469 correctly sees get_use_internal_api() returns True
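Paraphrased, the relevant logic in __main__.py looks like this (reconstructed from the steps above and the mirrored logic in the proposed fix below; not a verbatim copy):

# Approximate shape of airflow/__main__.py (Airflow 2.10.x)
def main():
    args = get_parser().parse_args()
    configure_internal_api(args, conf)  # line 60
    args.func(args)  # eventually reaches task_run()

def configure_internal_api(args, conf):
    if conf.getboolean("core", "database_access_isolation", fallback=False):
        # Untrusted subcommands are switched over to the Internal API
        InternalApiConfig.set_use_internal_api(args.subcommand)  # line 75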

LocalExecutor fork path (the problematic path):

  1. LocalExecutor worker forks at airflow/executors/local_executor.py:117
  2. Child process directly parses args and calls args.func(args) at line 142
  3. Skips __main__.py:configure_internal_api() entirely
  4. task_run() is called with InternalApiConfig._use_internal_api still at default False
  5. Check at task_command.py:469 sees get_use_internal_api() returns False
  6. Triggers settings.reconfigure_orm(disable_connection_pool=True) at line 476

Note: There is a call to InternalApiConfig.set_use_internal_api() in StandardTaskRunner._start_by_fork() at airflow/task/task_runner/standard_task_runner.py:79, but this happens in the task runner child process, not the task supervisor process where the check at line 469 occurs.
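To make the process boundary concrete, the hierarchy looks like this (reconstructed from the paths above):

LocalExecutor worker
└─ fork() at local_executor.py:117 → task SUPERVISOR process
   │    runs task_run(); the get_use_internal_api() check at
   │    task_command.py:469 executes in THIS process
   └─ fork() in StandardTaskRunner._start_by_fork() → task RUNNER process
        set_use_internal_api() (standard_task_runner.py:79) only runs HERE,
        after the supervisor's check has already happened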

Impact

  1. Memory leak: With NullPool and PostgreSQL libpq 16, task supervisor processes leak ~22 MB/hour
  2. Scalability issues: With 96 parallel tasks, memory leaks quickly exhaust container limits, causing OOM kills
  3. AIP-44 broken for LocalExecutor: The feature doesn't work as designed for one of the most common executors
  4. Security concern: Untrusted components (task supervisors running user code) have direct database access when they shouldn't

Proposed Fix

In airflow/executors/local_executor.py, the _execute_work_in_fork() method should check database_access_isolation and call InternalApiConfig.set_use_internal_api() before invoking the task command:

def _execute_work_in_fork(self, command: CommandType) -> TaskInstanceState:
    pid = os.fork()
    if pid:
        # In parent, wait for the child
        pid, ret = os.waitpid(pid, 0)
        return TaskInstanceState.SUCCESS if ret == 0 else TaskInstanceState.FAILED

    from airflow.sentry import Sentry

    ret = 1
    try:
        import signal

        from airflow.cli.cli_parser import get_parser
        from airflow.configuration import conf  # Add this import

        signal.signal(signal.SIGINT, signal.SIG_DFL)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGUSR2, signal.SIG_DFL)

        # Add this block to configure Internal API if needed
        if conf.getboolean("core", "database_access_isolation", fallback=False):
            from airflow.api_internal.internal_api_call import InternalApiConfig

            # Set SQL connection to none:// to prevent direct DB access
            if "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN" in os.environ:
                os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = "none://"
                conf.set("database", "sql_alchemy_conn", "none://")

            InternalApiConfig.set_use_internal_api("LocalExecutor task supervisor")

        parser = get_parser()
        # [1:] - remove "airflow" from the start of the command
        args = parser.parse_args(command[1:])
        args.shut_down_logging = False

        setproctitle(f"airflow task supervisor: {command}")

        args.func(args)
        ret = 0
        return TaskInstanceState.SUCCESS
    except Exception as e:
        self.log.exception("Failed to execute task %s.", e)
        return TaskInstanceState.FAILED
    finally:
        Sentry.flush()
        logging.shutdown()
        os._exit(ret)

This mirrors the logic in __main__.py:configure_internal_api() but applies it to the forked task supervisor process.

Workaround

As a temporary workaround, we've patched task_command.py:476-478 to skip the NullPool reconfiguration:

if not InternalApiConfig.get_use_internal_api():
    # PATCH: Skip NullPool to avoid memory leak
    # Original: settings.reconfigure_orm(disable_connection_pool=True)
    pass  # Keep existing QueuePool from initial ORM configuration

and configured SQLAlchemy connection pooling in airflow.cfg:

[database]
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 0
sql_alchemy_pool_recycle = 3600
sql_alchemy_pool_pre_ping = True

This eliminates the memory leak but doesn't properly implement AIP-44.
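A quick way to confirm the patched supervisor keeps a pooled engine (a sketch; run it under the same Airflow configuration the supervisor sees):

# Sanity check: the ORM engine should report QueuePool, not NullPool.
from airflow import settings

settings.configure_orm()
print(type(settings.engine.pool).__name__)  # expect "QueuePool"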

What you think should happen instead?

When database_access_isolation=True and AIRFLOW_ENABLE_AIP_44=true:

  • Task supervisor processes should call InternalApiConfig.set_use_internal_api() during initialization
  • InternalApiConfig.get_use_internal_api() should return True at task_command.py:469
  • The ORM should NOT be reconfigured to use NullPool
  • Task supervisors should either:
    • Use the Internal API for database access, OR
    • Use QueuePool if direct database access is needed

How to reproduce

  1. Set up Airflow 2.10.5 with a PostgreSQL database
  2. Configure in airflow.cfg:
[core]
executor = LocalExecutor
database_access_isolation = True
  3. Set the environment variable:
export AIRFLOW_ENABLE_AIP_44=true
  4. Start the Airflow scheduler
  5. Trigger a DAG with a simple task
  6. Add debug logging at task_command.py:469:
print(f"PID {os.getpid()}: get_use_internal_api() = {InternalApiConfig.get_use_internal_api()}")
  7. Observe that task supervisor processes print False
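To observe the leak itself rather than just the flag, a small loop along these lines can track supervisor RSS over time (a sketch using psutil, which Airflow already depends on; it matches processes by the title the supervisor sets via setproctitle):

# Print the RSS of every "airflow task supervisor" process once a minute.
import time

import psutil

while True:
    for proc in psutil.process_iter(attrs=["pid", "cmdline", "memory_info"], ad_value=None):
        cmdline = " ".join(proc.info["cmdline"] or [])
        if "task supervisor" in cmdline and proc.info["memory_info"]:
            rss_mb = proc.info["memory_info"].rss / (1024 * 1024)
            print(f"{proc.info['pid']}: {rss_mb:.1f} MiB")
    time.sleep(60)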

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

N/A

Deployment

Other

Deployment details

Environment:

  • Kubernetes
  • Executor: LocalExecutor
  • Python: 3.11+
  • Database: PostgreSQL with libpq 16

Anything else?

No response
Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

Labels

area:Executors-core, area:core, kind:bug, needs-triage
