Use WeakKeyDictionary for WRITE_LOCKS to prevent file object leaks #807

Merged
hynek merged 3 commits into hynek:main from ashb:weakref-write-locks
May 2, 2026

@ashb ashb commented May 1, 2026

File objects registered in WRITE_LOCKS were never released, causing a
memory leak in long-running processes that open many log files (e.g.,
task executors creating a per-task BytesLogger or WriteLogger).

WeakKeyDictionary stores keys as weak references, so entries expire
automatically when the last strong reference to a file object is
dropped — no manual cleanup needed.
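
The expiry behaviour described above can be illustrated with a minimal standalone sketch. It is not part of the change itself: `Resource` and `registry` are hypothetical names standing in for a file object and for WRITE_LOCKS, and only the standard-library `weakref.WeakKeyDictionary` is used.

```python
import gc
import weakref


class Resource:
    """Hypothetical stand-in for a file object (plain instances are weak-referenceable)."""


# Hypothetical registry playing the role of WRITE_LOCKS.
registry = weakref.WeakKeyDictionary()

res = Resource()
registry[res] = "lock placeholder"
entries_while_alive = len(registry)  # the key is still strongly referenced by `res`

# Drop the last strong reference; the mapping itself only holds a weak one.
del res
gc.collect()  # not strictly needed on CPython, but harmless on other implementations
entries_after_drop = len(registry)

print(entries_while_alive, entries_after_drop)
```

With a plain `dict` in place of the `WeakKeyDictionary`, the entry would stay in the mapping after `del res`, which is the leak this PR addresses.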

Closes #806 (no need to expose it, we tidy it up correctly ourselves)

A test script to show that WeakKeyDictionary works:

#!/usr/bin/env python
from __future__ import annotations

import gc
import tempfile
import weakref
from pathlib import Path

import structlog._output as _output


def _open_task_log(log_path: Path) -> tuple[object, weakref.ref]:
    """Open a log file, create a BytesLogger (registering it in WRITE_LOCKS).

    Returns the logger and a weakref to the underlying file so we can
    track whether the file object is GC'd after cleanup.

    Mirrors what Airflow's supervisor._configure_logging does per task.
    """
    import structlog

    fd = log_path.open("ab")
    ref = weakref.ref(fd)
    # BytesLogger.__init__ calls _get_lock_for_file(fd), inserting fd into
    # WRITE_LOCKS as a key (a strong reference while WRITE_LOCKS is a plain dict).
    _logger = structlog.BytesLogger(fd)
    fd.close()
    del fd
    return _logger, ref


def demonstrate_leak(n_tasks: int = 5) -> None:
    print(f"{'='*60}")
    print("PART 1 — current behaviour (plain dict)")
    print(f"{'='*60}")

    baseline = len(_output.WRITE_LOCKS)
    file_refs: list[weakref.ref] = []
    tmp_files: list[Path] = []

    for i in range(n_tasks):
        with tempfile.NamedTemporaryFile(suffix=f"-task{i}.log", delete=False) as t:
            log_path = Path(t.name)
        tmp_files.append(log_path)
        logger, ref = _open_task_log(log_path)
        file_refs.append(ref)
        del logger  # drop the last application-side reference

    gc.collect()

    added = len(_output.WRITE_LOCKS) - baseline
    still_alive = sum(1 for r in file_refs if r() is not None)

    print(f"Ran {n_tasks} tasks.")
    print(f"WRITE_LOCKS grew by {added} entries (expected {n_tasks}).")
    print(f"File objects still alive after close+del+gc: {still_alive}/{n_tasks}")
    print()
    if still_alive == n_tasks:
        print("  ✗  LEAK CONFIRMED: closed file objects kept alive by WRITE_LOCKS.")
    else:
        print("  (unexpected: some files freed — check for lingering references)")

    for p in tmp_files:
        p.unlink(missing_ok=True)
    print()



def demonstrate_fix(n_tasks: int = 5) -> None:
    print(f"{'='*60}")
    print("PART 2 — fixed behaviour (WeakKeyDictionary)")
    print(f"{'='*60}")

    # Patch WRITE_LOCKS.  _get_lock_for_file resolves WRITE_LOCKS as a
    # module-level name at call time, so replacing the module attribute is
    # sufficient — no need to monkey-patch the function itself.
    original = _output.WRITE_LOCKS
    weak_locks: weakref.WeakKeyDictionary = weakref.WeakKeyDictionary()
    _output.WRITE_LOCKS = weak_locks  # type: ignore[assignment]

    try:
        file_refs: list[weakref.ref] = []
        tmp_files: list[Path] = []

        for i in range(n_tasks):
            with tempfile.NamedTemporaryFile(suffix=f"-task{i}.log", delete=False) as t:
                log_path = Path(t.name)
            tmp_files.append(log_path)
            logger, ref = _open_task_log(log_path)
            file_refs.append(ref)
            del logger

        gc.collect()

        locks_remaining = len(weak_locks)
        still_alive = sum(1 for r in file_refs if r() is not None)

        print(f"Ran {n_tasks} tasks.")
        print(f"WRITE_LOCKS entries remaining: {locks_remaining} (expected 0).")
        print(f"File objects still alive after close+del+gc: {still_alive}/{n_tasks}")
        print()
        if still_alive == 0 and locks_remaining == 0:
            print("  ✓  NO LEAK: file objects GC'd automatically, WRITE_LOCKS self-cleaned.")
        else:
            print("  (unexpected: some files survived — check for other references)")

        for p in tmp_files:
            p.unlink(missing_ok=True)

    finally:
        _output.WRITE_LOCKS = original

    print()

if __name__ == "__main__":
    demonstrate_leak()
    demonstrate_fix()
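
For readers who do not have the library source at hand, the lookup pattern the script relies on can be sketched roughly as follows. This is an assumption about the general shape of a per-file lock registry, not a copy of structlog's code: `_locks` and `get_lock_for` are hypothetical names, and `io.BytesIO` stands in for a real log file.

```python
import gc
import io
import threading
import weakref

# Hypothetical module-level registry: file object -> lock, keyed weakly.
_locks = weakref.WeakKeyDictionary()


def get_lock_for(file):
    """Return the lock associated with `file`, creating it on first use."""
    lock = _locks.get(file)
    if lock is None:
        lock = threading.Lock()
        _locks[file] = lock
    return lock


buf = io.BytesIO()
first = get_lock_for(buf)
second = get_lock_for(buf)
same_lock = first is second  # repeated lookups share one lock per file object

# The locks do not reference the file, so dropping `buf` frees the entry.
del buf
gc.collect()
remaining = len(_locks)

print(same_lock, remaining)
```

The stored values must not hold a reference back to their key; otherwise the key would stay alive through its own entry and never expire.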

Pull Request Check List

  • I acknowledge this project's AI policy.
  • This pull requests is not from my main branch.
  • There's tests for all new and changed code.
  • New APIs are added to our typing tests in api.py.
  • Updated documentation for changed code.
    • New functions/classes have to be added to docs/api.rst by hand.
    • Changed/added classes/methods/functions have appropriate versionadded, versionchanged, or deprecated directives.
      • The next version is the second number in the current release + 1. The first number represents the current year. So if the current version on PyPI is 26.1.0, the next version is gonna be 26.2.0. If the next version is the first in the new year, it'll be 27.1.0.
  • Documentation in .rst and .md files is written using semantic newlines.
  • Changes (and possible deprecations) are documented in the changelog.

@hynek hynek left a comment

Awesome, thank you! I knew there had to be some API but didn't get around to checking.

@hynek hynek merged commit 79032b3 into hynek:main May 2, 2026
17 checks passed

wjddn279 commented May 2, 2026

@hynek cool! Do you happen to know when a release with this change will be available?

hynek commented May 2, 2026

I want to release the very overdue 26.1.0 ASAP, but I need to work my way through the list of PRs, so I cannot make any promises on the timeline, sorry.

wjddn279 commented May 2, 2026

ok thanks!

@ashb ashb deleted the weakref-write-locks branch May 2, 2026 12:38