fix: retry provider stats on sqlite lock#8574
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces retry logic with exponential backoff when persisting provider stats fails due to a transient SQLite database lock error. It also adds a unit test to verify this retry behavior. The review feedback suggests making the SQLite lock detection more robust by checking for both 'locked' and 'database' in the error message to capture other lock-related errors like 'database table is locked'.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| def _is_sqlite_database_locked_error(exc: Exception) -> bool: | ||
| return ( | ||
| isinstance(exc, OperationalError) and "database is locked" in str(exc).lower() | ||
| ) |
There was a problem hiding this comment.
The current check for SQLite lock errors specifically looks for the substring 'database is locked'. However, SQLite can also raise other lock-related errors such as 'database table is locked' (corresponding to SQLITE_LOCKED). To make the detection more robust and cover all transient SQLite lock scenarios, we should check for the presence of both 'locked' and 'database' in the error message.
| def _is_sqlite_database_locked_error(exc: Exception) -> bool: | |
| return ( | |
| isinstance(exc, OperationalError) and "database is locked" in str(exc).lower() | |
| ) | |
| def _is_sqlite_database_locked_error(exc: Exception) -> bool: | |
| if not isinstance(exc, OperationalError): | |
| return False | |
| exc_str = str(exc).lower() | |
| return "locked" in exc_str and "database" in exc_str |
There was a problem hiding this comment.
Addressed in 4841c57. The detector now inspects SQLAlchemy/nested exception messages and treats messages containing both database and locked as SQLite lock errors, covering database table is locked; added a regression test for that case.
There was a problem hiding this comment.
Hey - I've found 2 issues, and left some high level feedback:
- Consider extracting the retry count and backoff durations into named constants or configuration so they can be tuned without changing the function logic and to keep the magic numbers
3and0.2self-documenting. - The
_is_sqlite_database_locked_errorhelper currently checksstr(exc)for'database is locked'; to reduce false positives or missed cases, it may be safer to inspectexc.args, nested exceptions, or the driver-specific attributes rather than relying solely on string matching. - The retry loop catches
Exceptionbroadly; if feasible, narrow this to database-related exception types so that non-database errors (e.g., coding errors) are not silently retried and logged only as warnings.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider extracting the retry count and backoff durations into named constants or configuration so they can be tuned without changing the function logic and to keep the magic numbers `3` and `0.2` self-documenting.
- The `_is_sqlite_database_locked_error` helper currently checks `str(exc)` for `'database is locked'`; to reduce false positives or missed cases, it may be safer to inspect `exc.args`, nested exceptions, or the driver-specific attributes rather than relying solely on string matching.
- The retry loop catches `Exception` broadly; if feasible, narrow this to database-related exception types so that non-database errors (e.g., coding errors) are not silently retried and logged only as warnings.
## Individual Comments
### Comment 1
<location path="astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py" line_range="562" />
<code_context>
- stats=stats.to_dict(),
- agent_type="internal",
- )
- except Exception as e:
- logger.warning("Persist provider stats failed: %s", e, exc_info=True)
+ provider_config = getattr(provider, "provider_config", {}) or {}
</code_context>
<issue_to_address>
**issue (bug_risk):** Catching bare Exception here may unintentionally swallow cancellation and other critical errors.
In this retry loop, `Exception` will also catch `asyncio.CancelledError` and similar control-flow exceptions, turning them into warnings instead of allowing proper cancellation. Please either explicitly re-raise `CancelledError` inside the handler, or narrow the `except` to only the expected DB-related exceptions (e.g., `OperationalError`) so the retry logic only covers transient DB failures and doesn’t mask other serious issues.
</issue_to_address>
### Comment 2
<location path="tests/unit/test_provider_stats.py" line_range="70-79" />
<code_context>
+async def test_record_internal_agent_stats_retries_transient_database_lock(
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test to verify non-`database is locked` exceptions are not retried
Right now we only assert the retry behavior for the lock case. Please also add a test where `insert_provider_stat` raises a different `OperationalError` (or generic `Exception`) on the first call, and verify that it is not retried (e.g., `attempts == 1`) and that `logger.warning` is called. This will ensure only lock-related errors are retried.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
|
@sourcery-ai review |
There was a problem hiding this comment.
Hey - I've found 1 issue, and left some high level feedback:
- The new sqlite lock retry tests for
database is lockedanddatabase table is lockedare nearly identical; consider parametrizing them or sharing theLockedOnceDbsetup to reduce duplication and make future behavior changes easier to keep consistent. - If you expect multiple concurrent processes to hit the sqlite lock path, consider adding a small random jitter to the exponential backoff in
_record_internal_agent_statsto reduce synchronized retries and lock contention.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The new sqlite lock retry tests for `database is locked` and `database table is locked` are nearly identical; consider parametrizing them or sharing the `LockedOnceDb` setup to reduce duplication and make future behavior changes easier to keep consistent.
- If you expect multiple concurrent processes to hit the sqlite lock path, consider adding a small random jitter to the exponential backoff in `_record_internal_agent_stats` to reduce synchronized retries and lock contention.
## Individual Comments
### Comment 1
<location path="tests/unit/test_provider_stats.py" line_range="69-78" />
<code_context>
+@pytest.mark.asyncio
</code_context>
<issue_to_address>
**issue (testing):** Add a test that covers the case where all retry attempts are exhausted for a SQLite lock and a warning is emitted
Current tests cover the single-retry success case and the non-lock `OperationalError` case, but not the scenario where `_is_sqlite_database_locked_error` is true on every attempt and the loop fully exhausts `PROVIDER_STATS_SQLITE_LOCK_RETRY_ATTEMPTS` and emits a warning.
Please add a test (e.g., modeled on `test_record_internal_agent_stats_retries_transient_database_lock`) that:
- Uses a fake DB whose `insert_provider_stat` always raises an `OperationalError` with a "database is locked" message.
- Monkeypatches `internal.asyncio.sleep` to a no-op (optionally asserting the backoff sequence).
- Monkeypatches `internal.logger.warning` to capture calls.
- Asserts the number of attempts equals `PROVIDER_STATS_SQLITE_LOCK_RETRY_ATTEMPTS` and that a single warning is logged.
This will fully cover the permanent lock path and verify we stop at the configured max attempts and log the failure.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
|
@sourcery-ai review |
There was a problem hiding this comment.
Hey - I've found 5 issues
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py" line_range="537-543" />
<code_context>
+ yield from (str(arg) for arg in getattr(exc, "args", ()) if arg)
+
+
+def _is_sqlite_database_locked_error(exc: Exception) -> bool:
+ if not isinstance(exc, OperationalError):
+ return False
+ return any(
+ "database" in message.lower() and "locked" in message.lower()
+ for message in _iter_exception_messages(exc)
+ )
+
</code_context>
<issue_to_address>
**suggestion:** Consider broadening the error type accepted by `_is_sqlite_database_locked_error` or removing the internal `OperationalError` check.
The function is annotated to take a generic `Exception`, but immediately returns `False` unless the input is an `OperationalError`. At the current call site (inside `except OperationalError`), this check adds no value. Either narrow the signature to `OperationalError` and remove the guard, or keep the broader type and traverse wrappers (e.g. `DBAPIError.orig` or the exception cause chain) instead of short‑circuiting on `isinstance(exc, OperationalError)` so it’s reusable in more contexts.
```suggestion
def _is_sqlite_database_locked_error(exc: Exception) -> bool:
"""
Detect whether the given exception (or any wrapped/underlying exception)
corresponds to a SQLite "database is locked" error.
This inspects:
- The exception itself
- Any `orig` attribute commonly used by DBAPI/SQLAlchemy
- The exception `__cause__` and `__context__` chains
and looks for an `OperationalError` whose messages contain both
"database" and "locked" (case-insensitive).
"""
visited: set[int] = set()
current: BaseException | None = exc
while current is not None and id(current) not in visited:
visited.add(id(current))
if isinstance(current, OperationalError) and any(
"database" in message.lower() and "locked" in message.lower()
for message in _iter_exception_messages(current)
):
return True
# Prefer `orig` when present (SQLAlchemy/DBAPI), otherwise walk
# the standard exception chaining attributes.
next_exc = getattr(current, "orig", None)
if next_exc is None or next_exc is current:
next_exc = current.__cause__ or current.__context__
current = next_exc
return False
```
</issue_to_address>
### Comment 2
<location path="astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py" line_range="539-547" />
<code_context>
return
- try:
- provider_config = getattr(provider, "provider_config", {}) or {}
- conversation_id = (
- req.conversation.cid
- if req is not None and req.conversation is not None
- else None
- )
-
- if agent_runner.was_aborted():
- status = "aborted"
- elif final_resp is not None and final_resp.role == "err":
- status = "error"
- else:
- status = "completed"
-
</code_context>
<issue_to_address>
**question (bug_risk):** The removal of the outer try/except means failures before the DB call now propagate instead of being logged and swallowed.
Previously, this function treated any error in `_record_internal_agent_stats` as best-effort: everything from resolving `provider.meta().id` / `provider.get_model()` to `stats.to_dict()` was caught and only logged. Now only errors inside `insert_provider_stat` are handled; anything in the setup (building `provider_config`, `conversation_id`, `status`, or the call args) will propagate. Please either reintroduce a narrow try/except around the pre-DB setup or confirm that callers are okay with these errors surfacing.
</issue_to_address>
### Comment 3
<location path="tests/unit/test_provider_stats.py" line_range="89-98" />
<code_context>
+ assert len(warnings) == 1
+
+
+@pytest.mark.asyncio
+async def test_record_internal_agent_stats_does_not_retry_other_operational_errors(
+ monkeypatch: pytest.MonkeyPatch,
+):
+ attempts = 0
+ warnings = []
+
+ class FailingDb:
+ async def insert_provider_stat(self, **kwargs):
+ nonlocal attempts
+ attempts += 1
+ raise _provider_stats_operational_error("no such table: provider_stats")
+
+ monkeypatch.setattr(internal, "db_helper", FailingDb())
+ monkeypatch.setattr(
+ internal.logger,
+ "warning",
+ lambda *args, **kwargs: warnings.append((args, kwargs)),
+ )
+
+ await internal._record_internal_agent_stats(*_provider_stats_recording_args())
+
+ assert attempts == 1
+ assert len(warnings) == 1
</code_context>
<issue_to_address>
**issue (testing):** Add a test to verify that `asyncio.CancelledError` raised during provider stat persistence is propagated rather than swallowed or logged.
There’s now a dedicated `except asyncio.CancelledError: raise` branch, but no test covering it. Please add a test where `db_helper.insert_provider_stat` raises `asyncio.CancelledError` and assert that `_record_internal_agent_stats` re-raises it (e.g., with `pytest.raises(asyncio.CancelledError)`) and does not emit a warning. This guards against regressions where cancellations are mistakenly handled like other exceptions.
</issue_to_address>
### Comment 4
<location path="tests/unit/test_provider_stats.py" line_range="148-152" />
<code_context>
+ lambda *args, **kwargs: warnings.append((args, kwargs)),
+ )
+
+ await internal._record_internal_agent_stats(*_provider_stats_recording_args())
+
+ assert attempts == internal.PROVIDER_STATS_SQLITE_LOCK_RETRY_ATTEMPTS
+ assert sleep_delays == [0.2, 0.4]
+ assert len(warnings) == 1
+
</code_context>
<issue_to_address>
**suggestion (testing):** Avoid hard-coding retry delays in the test; derive them from the retry constant to keep tests resilient to configuration changes.
In `test_record_internal_agent_stats_logs_after_exhausting_database_lock_retries`, the expected delays `[0.2, 0.4]` directly mirror the current value of `PROVIDER_STATS_SQLITE_LOCK_RETRY_BASE_DELAY`, so the test will break if that constant changes while behavior remains correct. Instead, compute the expected delays from `internal.PROVIDER_STATS_SQLITE_LOCK_RETRY_BASE_DELAY` (e.g. `[base, base * 2]`) so the test still verifies exponential backoff without being brittle.
```suggestion
await internal._record_internal_agent_stats(*_provider_stats_recording_args())
assert attempts == internal.PROVIDER_STATS_SQLITE_LOCK_RETRY_ATTEMPTS
base_delay = internal.PROVIDER_STATS_SQLITE_LOCK_RETRY_BASE_DELAY
expected_sleep_delays = [
base_delay * (2**i)
for i in range(internal.PROVIDER_STATS_SQLITE_LOCK_RETRY_ATTEMPTS - 1)
]
assert sleep_delays == expected_sleep_delays
assert len(warnings) == 1
```
</issue_to_address>
### Comment 5
<location path="astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py" line_range="528" />
<code_context>
+PROVIDER_STATS_SQLITE_LOCK_RETRY_BASE_DELAY = 0.2
+
+
+def _iter_exception_messages(exc: BaseException):
+ yield str(exc)
+ orig = getattr(exc, "orig", None)
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the SQLite lock detection helper and flattening the retry loop control flow to make the error-handling logic easier to read while preserving behavior.
You can keep the new robustness while reducing cognitive load by:
1. **Simplifying the SQLite lock detection**
`_iter_exception_messages` is quite generic but only used once with a very specific predicate. You can inline this logic and just inspect `str(exc)` (and optionally `exc.orig`) directly:
```python
def _is_sqlite_database_locked_error(exc: Exception) -> bool:
if not isinstance(exc, OperationalError):
return False
# Prefer the DB-driver error if present
raw = getattr(exc, "orig", exc)
msg = str(raw).lower()
# Keep behavior but make the intent obvious
return "database" in msg and "locked" in msg
```
This removes an extra iterator abstraction and makes it immediately clear what you’re checking.
2. **Flattening the retry loop control flow**
You can keep the same retry/backoff behavior but separate “is lock error” from “is last attempt” and use `break` instead of multiple `return`s inside the loop:
```python
for attempt in range(PROVIDER_STATS_SQLITE_LOCK_RETRY_ATTEMPTS):
last_attempt = attempt == PROVIDER_STATS_SQLITE_LOCK_RETRY_ATTEMPTS - 1
try:
await db_helper.insert_provider_stat(
umo=event.unified_msg_origin,
conversation_id=conversation_id,
provider_id=provider_config.get("id", "") or provider.meta().id,
provider_model=provider.get_model(),
status=status,
stats=stats.to_dict(),
agent_type="internal",
)
break
except asyncio.CancelledError:
raise
except OperationalError as e:
if _is_sqlite_database_locked_error(e) and not last_attempt:
await asyncio.sleep(
PROVIDER_STATS_SQLITE_LOCK_RETRY_BASE_DELAY * (2**attempt)
)
continue
logger.warning("Persist provider stats failed: %s", e, exc_info=True)
break
except Exception as e:
logger.warning("Persist provider stats failed: %s", e, exc_info=True)
break
```
Functionally this is equivalent to your current logic (same retry count, same backoff, same logging), but the flow is easier to follow:
- One obvious lock check.
- A clear `last_attempt` flag instead of embedding the attempt bound in the condition.
- A single implicit exit after the loop instead of multiple `return`s scattered inside.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
There was a problem hiding this comment.
Hey - I've found 1 issue
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="tests/unit/test_provider_stats.py" line_range="156-152" />
<code_context>
+
+
+@pytest.mark.asyncio
+async def test_record_internal_agent_stats_does_not_retry_other_operational_errors(
+ monkeypatch: pytest.MonkeyPatch,
+):
+ attempts = 0
+ warnings = []
+
+ class FailingDb:
+ async def insert_provider_stat(self, **kwargs):
+ nonlocal attempts
+ attempts += 1
+ raise _provider_stats_operational_error("no such table: provider_stats")
+
+ monkeypatch.setattr(internal, "db_helper", FailingDb())
+ monkeypatch.setattr(
+ internal.logger,
+ "warning",
+ lambda *args, **kwargs: warnings.append((args, kwargs)),
+ )
+
+ await internal._record_internal_agent_stats(*_provider_stats_recording_args())
+
+ assert attempts == 1
+ assert len(warnings) == 1
</code_context>
<issue_to_address>
**issue (testing):** Add a test to ensure asyncio.CancelledError from the DB layer is propagated and not swallowed by the retry logic.
None of the existing tests verify this behavior. Please add a test (e.g. `test_record_internal_agent_stats_propagates_cancelled_error`) where `db_helper.insert_provider_stat` raises `asyncio.CancelledError`, and assert with `pytest.raises(asyncio.CancelledError)` around `_record_internal_agent_stats`. This will help ensure future changes don’t accidentally catch and swallow cancellations in the retry loop.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
|
@sourcery-ai review |
There was a problem hiding this comment.
Hey - I've left some high level feedback:
- The retry loop in
_record_internal_agent_statsrepeats the samelogger.warningand exception handling logic in multiple branches; consider extracting the retry-with-logging behavior into a small helper or collapsing the duplicatedexcept Exception/OperationalErrorbranches to keep the function shorter and easier to follow. _is_sqlite_database_locked_errorrelies on substring matching in the error message; if possible, prefer checking SQLite-specific error attributes or codes (e.g.,exc.origtype/fields) to reduce the chance of false positives from unrelated messages that contain the words 'database' and 'locked'.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The retry loop in `_record_internal_agent_stats` repeats the same `logger.warning` and exception handling logic in multiple branches; consider extracting the retry-with-logging behavior into a small helper or collapsing the duplicated `except Exception/OperationalError` branches to keep the function shorter and easier to follow.
- `_is_sqlite_database_locked_error` relies on substring matching in the error message; if possible, prefer checking SQLite-specific error attributes or codes (e.g., `exc.orig` type/fields) to reduce the chance of false positives from unrelated messages that contain the words 'database' and 'locked'.Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
Summary
database is lockederrors when persisting internal provider stats.Refs #8570
Test
uv run pytest tests/unit/test_provider_stats.pyuv run ruff check astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py tests/unit/test_provider_stats.pySummary by Sourcery
Add retry handling and logging for transient SQLite lock errors when persisting internal provider stats, and extend tests to cover the new behavior.
Bug Fixes:
Enhancements:
Tests: