Skip to content

fix: 清理 Loguru sink 消除退出 semaphore warning#8596

Open
Akokk0 wants to merge 2 commits into
AstrBotDevs:masterfrom
Akokk0:fix/loguru-shutdown-cleanup
Open

fix: 清理 Loguru sink 消除退出 semaphore warning#8596
Akokk0 wants to merge 2 commits into
AstrBotDevs:masterfrom
Akokk0:fix/loguru-shutdown-cleanup

Conversation

@Akokk0
Copy link
Copy Markdown

@Akokk0 Akokk0 commented Jun 5, 2026

变更说明

修复开启文件日志后,AstrBot 退出时出现的 multiprocessing.resource_tracker semaphore warning。

典型 warning:

resource_tracker: There appear to be 8 leaked semaphore objects to clean up at shutdown

如果同时开启普通文件日志和 trace 日志,可能出现:

resource_tracker: There appear to be 16 leaked semaphore objects to clean up at shutdown

问题原因

AstrBot 的文件日志 sink 使用了 Loguru 的 enqueue=True

enqueue=True 会创建 multiprocessing 队列相关资源。如果进程退出前没有显式 flush / remove sink,Python 的 multiprocessing.resource_tracker 会在进程关闭时检测到残留 semaphore,并输出 warning。

原先 AstrBot 只在日志重配置时移除旧 sink,但进程关闭路径没有统一清理 Loguru sink。

主要修改

  • 新增 LogManager.shutdown(),统一 complete() 并移除 trace/file/console sinks
  • AstrBotCoreLifecycle.stop()InitialLoader 的关闭路径调用日志清理
  • astrbot run 增加 SIGINT / SIGTERM graceful shutdown,确保收到退出信号时也能走清理流程
  • 补充日志 shutdown、CLI 信号关闭、初始化失败/取消路径测试

验证

已通过以下检查:

ruff check astrbot/cli/commands/cmd_run.py astrbot/core/log.py astrbot/core/core_lifecycle.py astrbot/core/initial_loader.py tests/unit/test_core_lifecycle.py tests/unit/test_log_manager_shutdown.py tests/unit/test_cmd_run_shutdown.py
ruff format --check astrbot/cli/commands/cmd_run.py astrbot/core/log.py astrbot/core/core_lifecycle.py astrbot/core/initial_loader.py tests/unit/test_core_lifecycle.py tests/unit/test_log_manager_shutdown.py tests/unit/test_cmd_run_shutdown.py
pytest tests/unit/test_cmd_run_shutdown.py tests/unit/test_log_manager_shutdown.py tests/unit/test_core_lifecycle.py -q

结果:

31 passed

另外跑过 dashboard 相关回归测试:

pytest tests/test_dashboard.py tests/test_api_key_open_api.py tests/test_kb_import.py -q

结果:

71 passed, 1 warning

其中 warning 是已有插件路径里的 deprecated decorator warning,与本次 resource_tracker 问题无关。

真实运行验证

在 macOS 本地真实启动 AstrBot,配置:

{
  "log_file_enable": true,
  "trace_log_enable": true,
  "log_file_path": "logs/astrbot.log",
  "trace_log_path": "logs/astrbot.trace.log"
}

启动到:

AstrBot started.

随后发送 SIGTERM。

修复后结果:

EXIT_CODE=0
WARNING_COUNT=0

关键日志:

AstrBot started.
Received SIGTERM; stopping AstrBot...
🌈 正在关闭 AstrBot...

未再出现:

resource_tracker
leaked semaphore objects
semaphore objects

SIGINT 场景也已验证,未再出现该 warning。

Fixes #8595

🤖 Generated with Claude Code

Summary by Sourcery

Add graceful shutdown and centralized log sink cleanup to eliminate Loguru semaphore warnings on AstrBot exit.

Bug Fixes:

  • Ensure Loguru sinks are flushed and removed during process shutdown to prevent leaked semaphore warnings from multiprocessing.resource_tracker.

Enhancements:

  • Introduce a LogManager.shutdown hook and invoke it from core lifecycle and initialization shutdown paths.
  • Add SIGINT/SIGTERM handling in the astrbot run command to cancel the main task and trigger a clean shutdown sequence.

Tests:

  • Add unit tests covering CLI signal-driven shutdown, log manager shutdown behavior, and log cleanup on initialization failure or cancellation.

Add LogManager.shutdown() and wire it into the AstrBot shutdown path.Also handle SIGINT/SIGTERM gracefully in astrbot run so file-log sinks are flushed and removed before exit.

This prevents multiprocessing.resource_tracker semaphore warnings when file logging is enabled.

Refs AstrBotDevs#8595
@dosubot dosubot Bot added size:L This PR changes 100-499 lines, ignoring generated files. area:core The bug / feature is about astrbot's core, backend labels Jun 5, 2026
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces graceful shutdown handling for AstrBot, registering handlers for SIGINT and SIGTERM signals, ensuring that running tasks are cancelled, and adding a shutdown method to LogManager to clean up loguru sinks. The review feedback highlights critical improvements for robustness: ensuring that the core lifecycle is stopped even when non-cancellation exceptions occur during execution, and adding exception handling for signal registration and cleanup to prevent crashes in non-main threads or when dealing with null signal handlers.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines 57 to +62
except asyncio.CancelledError:
logger.info("🌈 正在关闭 AstrBot...")
await core_lifecycle.stop()
if initialized:
await core_lifecycle.stop()
finally:
await LogManager.shutdown()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

如果 await task 在运行时抛出非 CancelledError 的异常(例如数据库连接断开、网络异常或插件崩溃等),该异常将不会被 except asyncio.CancelledError 捕获,而是直接进入 finally 块。

这会导致 core_lifecycle.stop() 无法被调用,从而无法优雅地关闭各个管理器、释放数据库连接和清理插件资源,可能导致资源泄露或锁文件残留。

建议将 core_lifecycle.stop() 的调用移动到 finally 块中,并结合 initialized 状态进行保护,以确保在任何异常退出路径下都能进行优雅清理。

Suggested change
except asyncio.CancelledError:
logger.info("🌈 正在关闭 AstrBot...")
await core_lifecycle.stop()
if initialized:
await core_lifecycle.stop()
finally:
await LogManager.shutdown()
except asyncio.CancelledError:
logger.info("🌈 正在关闭 AstrBot...")
finally:
if initialized:
await core_lifecycle.stop()
await LogManager.shutdown()

Comment on lines +27 to +36
for signum in handled_signals:
previous_handlers[signum] = signal.getsignal(signum)
try:
loop.add_signal_handler(signum, callback, signum)
installed.append(signum)
except (NotImplementedError, RuntimeError):
signal.signal(
signum, lambda _signum, _frame: callback(signal.Signals(_signum))
)
installed.append(signum)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

在非主线程(例如某些测试环境或嵌入式运行环境)中,signal.getsignalloop.add_signal_handlersignal.signal 可能会抛出 ValueError。此外,如果 signal.getsignal 返回 None(表示信号处理器由 C/C++ 层注册),直接将其传给 signal.signal 会导致 TypeError

建议对信号获取和注册进行更健壮的异常处理,并允许在非主线程中优雅降级。

Suggested change
for signum in handled_signals:
previous_handlers[signum] = signal.getsignal(signum)
try:
loop.add_signal_handler(signum, callback, signum)
installed.append(signum)
except (NotImplementedError, RuntimeError):
signal.signal(
signum, lambda _signum, _frame: callback(signal.Signals(_signum))
)
installed.append(signum)
for signum in handled_signals:
try:
previous_handlers[signum] = signal.getsignal(signum)
except ValueError:
previous_handlers[signum] = None
try:
loop.add_signal_handler(signum, callback, signum)
installed.append(signum)
except (NotImplementedError, RuntimeError, ValueError):
try:
signal.signal(
signum, lambda _signum, _frame: callback(signal.Signals(_signum))
)
installed.append(signum)
except ValueError:
pass

Comment on lines +38 to +44
def cleanup() -> None:
for signum in installed:
try:
loop.remove_signal_handler(signum)
except (NotImplementedError, RuntimeError):
pass
signal.signal(signum, previous_handlers[signum])
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

在清理信号处理器时,如果 previous_handlers 中对应信号的值为 None,直接调用 signal.signal(signum, None) 会抛出 TypeError。此外,如果在非主线程中执行清理,signal.signal 也会抛出 ValueError

建议在恢复信号处理器前进行 None 检查,并捕获可能的 ValueError

Suggested change
def cleanup() -> None:
for signum in installed:
try:
loop.remove_signal_handler(signum)
except (NotImplementedError, RuntimeError):
pass
signal.signal(signum, previous_handlers[signum])
def cleanup() -> None:
for signum in installed:
try:
loop.remove_signal_handler(signum)
except (NotImplementedError, RuntimeError, ValueError):
pass
handler = previous_handlers.get(signum)
if handler is not None:
try:
signal.signal(signum, handler)
except ValueError:
pass

Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 3 issues, and left some high level feedback:

  • LogManager.shutdown() is now invoked from multiple layers (AstrBotCoreLifecycle.stop, InitialLoader.start finally, and run_astrbot finally), which can lead to redundant shutdown calls; consider centralizing responsibility in a single top-level shutdown path or explicitly documenting/making shutdown idempotent to avoid surprises.
  • Coupling AstrBotCoreLifecycle.stop() to the global LogManager.shutdown() means stopping the core lifecycle also tears down process-wide logging; if callers might expect to reuse the logger after a stop/restart, consider decoupling these concerns or adding a dedicated higher-level shutdown that orchestrates both.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- LogManager.shutdown() is now invoked from multiple layers (AstrBotCoreLifecycle.stop, InitialLoader.start finally, and run_astrbot finally), which can lead to redundant shutdown calls; consider centralizing responsibility in a single top-level shutdown path or explicitly documenting/making shutdown idempotent to avoid surprises.
- Coupling AstrBotCoreLifecycle.stop() to the global LogManager.shutdown() means stopping the core lifecycle also tears down process-wide logging; if callers might expect to reuse the logger after a stop/restart, consider decoupling these concerns or adding a dedicated higher-level shutdown that orchestrates both.

## Individual Comments

### Comment 1
<location path="astrbot/cli/commands/cmd_run.py" line_range="78-86" />
<code_context>
+    )
+
+    try:
+        done, _ = await asyncio.wait(
+            {runner_task, shutdown_task},
+            return_when=asyncio.FIRST_COMPLETED,
+        )
+        if shutdown_task in done and not runner_task.done():
+            signal_name = shutdown_signal.name if shutdown_signal else "unknown"
+            logger.info(f"Received {signal_name}; stopping AstrBot...")
+            runner_task.cancel()
+        await runner_task
+    finally:
+        cleanup_signal_handlers()
</code_context>
<issue_to_address>
**issue (bug_risk):** Handle CancelledError and ensure shutdown_task is fully cleaned up to avoid noisy shutdowns.

When `runner_task` is cancelled, `await runner_task` will raise `asyncio.CancelledError` and currently bubble out of `run_astrbot`, causing noisy tracebacks on normal SIGINT/SIGTERM shutdowns. It would be better to catch `asyncio.CancelledError` around `await runner_task` and treat it as a normal shutdown.

Additionally, `shutdown_task.cancel()` is never awaited, which can leave a pending task warning when the loop closes. After cleaning up signal handlers, check `shutdown_task.done()`, and if not, cancel and await it (suppressing `CancelledError`) so the shutdown path is quiet and all tasks are fully cleaned up.
</issue_to_address>

### Comment 2
<location path="astrbot/cli/commands/cmd_run.py" line_range="18" />
<code_context>
+ShutdownCallback = Callable[[signal.Signals], None]
+
+
+def _install_shutdown_signal_handlers(
+    loop: asyncio.AbstractEventLoop,
+    callback: ShutdownCallback,
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the signal handling and shutdown flow in run_astrbot by inlining the logic, reducing task coordination, and centralizing LogManager shutdown.

You can reduce complexity without losing functionality by simplifying both the signal-handling abstraction and the task coordination.

### 1. Inline signal setup instead of `_install_shutdown_signal_handlers`

The helper tracks previous handlers and has two code paths, plus a cleanup closure. For this CLI entrypoint, you can keep behavior but make it local and linear:

```python
async def run_astrbot(astrbot_root: Path) -> None:
    ...
    loop = asyncio.get_running_loop()
    shutdown_requested = asyncio.Event()

    def request_shutdown(signum: int) -> None:
        logger.info(f"Received {signal.Signals(signum).name}; stopping AstrBot...")
        shutdown_requested.set()

    for signum in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(signum, request_shutdown, signum)
        except (NotImplementedError, RuntimeError):
            signal.signal(signum, lambda s, _f: request_shutdown(s))
```

This removes:

- `ShutdownCallback` and `_install_shutdown_signal_handlers`
- `previous_handlers`, `installed`, and the `cleanup` closure
- The need to thread `shutdown_signal` state around

If restoring previous handlers is really required, you can still keep that logic *inline* in `run_astrbot`, which is easier to reason about than a generic helper.

### 2. Simplify the two-task coordination

You can avoid `asyncio.wait` and a separate `shutdown_task` by directly awaiting the event and then cancelling the main task:

```python
async def run_astrbot(astrbot_root: Path) -> None:
    ...
    runner_task = asyncio.create_task(core_lifecycle.start(), name="astrbot")

    try:
        await shutdown_requested.wait()
        if not runner_task.done():
            runner_task.cancel()
        await runner_task
    finally:
        await LogManager.shutdown()
```

If you want to log only when a signal actually occurred, keep the logging in `request_shutdown` as above. This preserves:

- “run core_lifecycle until a signal”
- “cancel and await on signal”
- “always shut down logging”

but removes `shutdown_task`, `asyncio.wait`, and the `FIRST_COMPLETED` + `done` bookkeeping.

### 3. Centralise `LogManager.shutdown`

Right now `run_astrbot` calls `await LogManager.shutdown()` and `InitialLoader.start` also calls it in a `finally`. To avoid double-shutdown logic spread across layers, consider consolidating it in *one* place.

For example, keep it only in the CLI:

```python
# in InitialLoader.start()
try:
    ...
finally:
    # remove LogManager.shutdown() from here
    ...
```

And leave:

```python
try:
    ...
finally:
    await LogManager.shutdown()
```

in `run_astrbot`.

This reduces cross-layer coupling and simplifies reasoning about where the global logging system is terminated.
</issue_to_address>

### Comment 3
<location path="astrbot/core/initial_loader.py" line_range="30" />
<code_context>
+
+    for signum in handled_signals:
+        previous_handlers[signum] = signal.getsignal(signum)
+        try:
+            loop.add_signal_handler(signum, callback, signum)
+            installed.append(signum)
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring the start logic to move initialization and task orchestration into small helpers, removing the nested try and initialized flag while preserving current behavior.

The nested `try` + `initialized` flag can be removed without changing behaviour by separating initialization from the run/cancel flow and extracting a small helper for the core+dashboard orchestration.

You can keep all guarantees:

- Init failures are logged and abort early.
- `stop()` is only called if init succeeded.
- `LogManager.shutdown()` still always runs.

For example:

```python
class InitialLoader:
    ...

    async def _init_core(self, core_lifecycle: AstrBotCoreLifecycle) -> bool:
        try:
            await core_lifecycle.initialize()
            return True
        except Exception as e:
            logger.critical(traceback.format_exc())
            logger.critical(f"😭 初始化 AstrBot 失败:{e} !!!")
            return False

    def _build_task(self, core_lifecycle: AstrBotCoreLifecycle) -> asyncio.Future:
        core_task = core_lifecycle.start()
        webui_dir = self.webui_dir
        self.dashboard_server = AstrBotDashboard(
            core_lifecycle,
            self.db,
            core_lifecycle.dashboard_shutdown_event,
            webui_dir,
        )
        coro = self.dashboard_server.run()
        if coro:
            return asyncio.gather(core_task, coro)
        return core_task

    async def start(self) -> None:
        core_lifecycle = AstrBotCoreLifecycle(self.log_broker, self.db)

        if not await self._init_core(core_lifecycle):
            # init failed -> nothing to stop, just ensure logs shutdown
            await LogManager.shutdown()
            return

        task = self._build_task(core_lifecycle)

        try:
            await task  # 整个AstrBot在这里运行
        except asyncio.CancelledError:
            logger.info("🌈 正在关闭 AstrBot...")
            await core_lifecycle.stop()
        finally:
            await LogManager.shutdown()
```

Key effects:

- No nested `try` and no `initialized` flag; the init path is linear and clearly separated.
- The run orchestration (`_build_task`) is self-contained and easier to read.
- Behaviour on init failure and cancellation remains the same as in your current version.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment thread astrbot/cli/commands/cmd_run.py Outdated
ShutdownCallback = Callable[[signal.Signals], None]


def _install_shutdown_signal_handlers(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider simplifying the signal handling and shutdown flow in run_astrbot by inlining the logic, reducing task coordination, and centralizing LogManager shutdown.

You can reduce complexity without losing functionality by simplifying both the signal-handling abstraction and the task coordination.

1. Inline signal setup instead of _install_shutdown_signal_handlers

The helper tracks previous handlers and has two code paths, plus a cleanup closure. For this CLI entrypoint, you can keep behavior but make it local and linear:

async def run_astrbot(astrbot_root: Path) -> None:
    ...
    loop = asyncio.get_running_loop()
    shutdown_requested = asyncio.Event()

    def request_shutdown(signum: int) -> None:
        logger.info(f"Received {signal.Signals(signum).name}; stopping AstrBot...")
        shutdown_requested.set()

    for signum in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(signum, request_shutdown, signum)
        except (NotImplementedError, RuntimeError):
            signal.signal(signum, lambda s, _f: request_shutdown(s))

This removes:

  • ShutdownCallback and _install_shutdown_signal_handlers
  • previous_handlers, installed, and the cleanup closure
  • The need to thread shutdown_signal state around

If restoring previous handlers is really required, you can still keep that logic inline in run_astrbot, which is easier to reason about than a generic helper.

2. Simplify the two-task coordination

You can avoid asyncio.wait and a separate shutdown_task by directly awaiting the event and then cancelling the main task:

async def run_astrbot(astrbot_root: Path) -> None:
    ...
    runner_task = asyncio.create_task(core_lifecycle.start(), name="astrbot")

    try:
        await shutdown_requested.wait()
        if not runner_task.done():
            runner_task.cancel()
        await runner_task
    finally:
        await LogManager.shutdown()

If you want to log only when a signal actually occurred, keep the logging in request_shutdown as above. This preserves:

  • “run core_lifecycle until a signal”
  • “cancel and await on signal”
  • “always shut down logging”

but removes shutdown_task, asyncio.wait, and the FIRST_COMPLETED + done bookkeeping.

3. Centralise LogManager.shutdown

Right now run_astrbot calls await LogManager.shutdown() and InitialLoader.start also calls it in a finally. To avoid double-shutdown logic spread across layers, consider consolidating it in one place.

For example, keep it only in the CLI:

# in InitialLoader.start()
try:
    ...
finally:
    # remove LogManager.shutdown() from here
    ...

And leave:

try:
    ...
finally:
    await LogManager.shutdown()

in run_astrbot.

This reduces cross-layer coupling and simplifies reasoning about where the global logging system is terminated.

core_lifecycle = AstrBotCoreLifecycle(self.log_broker, self.db)
initialized = False

try:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider refactoring the start logic to move initialization and task orchestration into small helpers, removing the nested try and initialized flag while preserving current behavior.

The nested try + initialized flag can be removed without changing behaviour by separating initialization from the run/cancel flow and extracting a small helper for the core+dashboard orchestration.

You can keep all guarantees:

  • Init failures are logged and abort early.
  • stop() is only called if init succeeded.
  • LogManager.shutdown() still always runs.

For example:

class InitialLoader:
    ...

    async def _init_core(self, core_lifecycle: AstrBotCoreLifecycle) -> bool:
        try:
            await core_lifecycle.initialize()
            return True
        except Exception as e:
            logger.critical(traceback.format_exc())
            logger.critical(f"😭 初始化 AstrBot 失败:{e} !!!")
            return False

    def _build_task(self, core_lifecycle: AstrBotCoreLifecycle) -> asyncio.Future:
        core_task = core_lifecycle.start()
        webui_dir = self.webui_dir
        self.dashboard_server = AstrBotDashboard(
            core_lifecycle,
            self.db,
            core_lifecycle.dashboard_shutdown_event,
            webui_dir,
        )
        coro = self.dashboard_server.run()
        if coro:
            return asyncio.gather(core_task, coro)
        return core_task

    async def start(self) -> None:
        core_lifecycle = AstrBotCoreLifecycle(self.log_broker, self.db)

        if not await self._init_core(core_lifecycle):
            # init failed -> nothing to stop, just ensure logs shutdown
            await LogManager.shutdown()
            return

        task = self._build_task(core_lifecycle)

        try:
            await task  # 整个AstrBot在这里运行
        except asyncio.CancelledError:
            logger.info("🌈 正在关闭 AstrBot...")
            await core_lifecycle.stop()
        finally:
            await LogManager.shutdown()

Key effects:

  • No nested try and no initialized flag; the init path is linear and clearly separated.
  • The run orchestration (_build_task) is self-contained and easier to read.
  • Behaviour on init failure and cancellation remains the same as in your current version.

- await and clean up shutdown_task to avoid pending task noise
- treat SIGINT/SIGTERM-triggered CancelledError as normal shutdown
- stop the core on runtime errors while preserving the original exception
- add regression tests for the shutdown paths

Refs AstrBotDevs#8595
@Akokk0
Copy link
Copy Markdown
Author

Akokk0 commented Jun 5, 2026

已根据 review 做了一轮收敛修复:

  • astrbot run 的 shutdown 流程现在会 await 并清理 shutdown_task
  • SIGINT / SIGTERM 触发的 CancelledError 视为正常关闭,不再向外冒
  • 运行期异常时会先 stop() core,再保留原始异常继续抛出
  • 补充了对应回归测试

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:core The bug / feature is about astrbot's core, backend size:L This PR changes 100-499 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

启用文件日志后,AstrBot 退出时出现 resource_tracker leaked semaphore warning

1 participant