Skip to content

Commit 1ad2b2c

Browse files
authored
fix: retry provider stats on sqlite lock
Retry transient SQLite lock failures when persisting internal provider stats.
1 parent 85ec7a9 commit 1ad2b2c

2 files changed

Lines changed: 183 additions & 10 deletions

File tree

astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from collections.abc import AsyncGenerator
66
from dataclasses import replace
77

8+
from sqlalchemy.exc import OperationalError
9+
810
from astrbot.core import db_helper, logger
911
from astrbot.core.agent.message import (
1012
CheckpointData,
@@ -519,6 +521,15 @@ async def _save_to_history(
519521
BLOCKED = {"dGZid2h2d3IuY2xvdWQuc2VhbG9zLmlv", "a291cmljaGF0"}
520522
decoded_blocked = [base64.b64decode(b).decode("utf-8") for b in BLOCKED]
521523

524+
# Total number of insert attempts made for one provider-stat record (the
# first try plus retries) when SQLite reports the database as locked.
PROVIDER_STATS_SQLITE_LOCK_RETRY_ATTEMPTS = 3
525+
# Base back-off handed to asyncio.sleep() before a retry; the wait is this
# value multiplied by 2**attempt, so it doubles on every further retry.
PROVIDER_STATS_SQLITE_LOCK_RETRY_BASE_DELAY = 0.2
526+
527+
528+
def _is_sqlite_database_locked_error(exc: OperationalError) -> bool:
529+
raw = getattr(exc, "orig", exc)
530+
message = str(raw).lower()
531+
return "database" in message and "locked" in message
532+
522533

523534
async def _record_internal_agent_stats(
524535
event: AstrMessageEvent,
@@ -549,15 +560,35 @@ async def _record_internal_agent_stats(
549560
status = "error"
550561
else:
551562
status = "completed"
552-
553-
await db_helper.insert_provider_stat(
554-
umo=event.unified_msg_origin,
555-
conversation_id=conversation_id,
556-
provider_id=provider_config.get("id", "") or provider.meta().id,
557-
provider_model=provider.get_model(),
558-
status=status,
559-
stats=stats.to_dict(),
560-
agent_type="internal",
561-
)
563+
except asyncio.CancelledError:
564+
raise
562565
except Exception as e:
563566
logger.warning("Persist provider stats failed: %s", e, exc_info=True)
567+
return
568+
569+
for attempt in range(PROVIDER_STATS_SQLITE_LOCK_RETRY_ATTEMPTS):
570+
last_attempt = attempt == PROVIDER_STATS_SQLITE_LOCK_RETRY_ATTEMPTS - 1
571+
try:
572+
await db_helper.insert_provider_stat(
573+
umo=event.unified_msg_origin,
574+
conversation_id=conversation_id,
575+
provider_id=provider_config.get("id", "") or provider.meta().id,
576+
provider_model=provider.get_model(),
577+
status=status,
578+
stats=stats.to_dict(),
579+
agent_type="internal",
580+
)
581+
break
582+
except asyncio.CancelledError:
583+
raise
584+
except OperationalError as e:
585+
if _is_sqlite_database_locked_error(e) and not last_attempt:
586+
await asyncio.sleep(
587+
PROVIDER_STATS_SQLITE_LOCK_RETRY_BASE_DELAY * (2**attempt)
588+
)
589+
continue
590+
logger.warning("Persist provider stats failed: %s", e, exc_info=True)
591+
break
592+
except Exception as e:
593+
logger.warning("Persist provider stats failed: %s", e, exc_info=True)
594+
break

tests/unit/test_provider_stats.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import asyncio
12
from types import SimpleNamespace
23

34
import pytest
5+
from sqlalchemy.exc import OperationalError
46
from sqlmodel import select
57

68
from astrbot.core.agent.response import AgentStats
@@ -63,3 +65,143 @@ async def test_record_internal_agent_stats_persists_provider_stat(
6365
assert record.start_time == 100.0
6466
assert record.end_time == 108.5
6567
assert record.time_to_first_token == 0.6
68+
69+
70+
def _provider_stats_recording_args():
    """Build the positional arguments the tests unpack into
    ``internal._record_internal_agent_stats``.

    Returns a 4-tuple of lightweight stand-ins: a message event, a provider
    request bound to conversation ``conv-123``, an agent runner that was not
    aborted and carries fresh ``AgentStats``, and an assistant-role stub.
    """
    fake_provider = SimpleNamespace(
        provider_config={"id": "provider-1"},
        meta=lambda: SimpleNamespace(id="provider-1", type="openai"),
        get_model=lambda: "gpt-4.1",
    )
    fake_runner = SimpleNamespace(
        provider=fake_provider,
        stats=AgentStats(),
        was_aborted=lambda: False,
    )
    fake_event = SimpleNamespace(
        unified_msg_origin="webchat:FriendMessage:session-42",
    )
    request = ProviderRequest(conversation=SimpleNamespace(cid="conv-123"))
    assistant_stub = SimpleNamespace(role="assistant")
    return fake_event, request, fake_runner, assistant_stub
84+
85+
86+
def _provider_stats_operational_error(message: str) -> OperationalError:
    """Create an ``OperationalError`` for a fake ``provider_stats`` insert.

    ``message`` becomes the text of the wrapped driver-level exception, which
    is what the lock-detection helper inspects.
    """
    statement = "insert into provider_stats"
    driver_error = Exception(message)
    return OperationalError(statement, {}, driver_error)
88+
89+
90+
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "lock_message",
    ["database is locked", "database table is locked"],
)
async def test_record_internal_agent_stats_retries_transient_database_locks(
    monkeypatch: pytest.MonkeyPatch,
    lock_message: str,
):
    """A single lock failure is retried and the second insert goes through."""
    insert_calls = []

    class LockedOnceDb:
        async def insert_provider_stat(self, **kwargs):
            insert_calls.append(kwargs)
            # Only the very first call reports the lock.
            if len(insert_calls) == 1:
                raise _provider_stats_operational_error(lock_message)
            return SimpleNamespace(**kwargs)

    async def no_sleep(delay: float) -> None:
        return None

    monkeypatch.setattr(internal, "db_helper", LockedOnceDb())
    # Skip the real back-off so the test does not wait.
    monkeypatch.setattr(internal.asyncio, "sleep", no_sleep)

    recording_args = _provider_stats_recording_args()
    await internal._record_internal_agent_stats(*recording_args)

    assert len(insert_calls) == 2
121+
122+
123+
@pytest.mark.asyncio
async def test_record_internal_agent_stats_logs_after_exhausting_database_lock_retries(
    monkeypatch: pytest.MonkeyPatch,
):
    """A permanently locked database uses every attempt, backs off
    exponentially between attempts, and logs exactly one warning."""
    insert_calls = []
    observed_delays = []
    logged = []

    class AlwaysLockedDb:
        async def insert_provider_stat(self, **kwargs):
            insert_calls.append(kwargs)
            raise _provider_stats_operational_error("database is locked")

    async def record_sleep(delay: float) -> None:
        observed_delays.append(delay)

    def capture_warning(*args, **kwargs):
        logged.append((args, kwargs))

    monkeypatch.setattr(internal, "db_helper", AlwaysLockedDb())
    monkeypatch.setattr(internal.asyncio, "sleep", record_sleep)
    monkeypatch.setattr(internal.logger, "warning", capture_warning)

    await internal._record_internal_agent_stats(*_provider_stats_recording_args())

    max_attempts = internal.PROVIDER_STATS_SQLITE_LOCK_RETRY_ATTEMPTS
    base_delay = internal.PROVIDER_STATS_SQLITE_LOCK_RETRY_BASE_DELAY

    assert len(insert_calls) == max_attempts
    # No sleep follows the final attempt, hence one fewer delay than attempts.
    assert observed_delays == [
        base_delay * (2**step) for step in range(max_attempts - 1)
    ]
    assert len(logged) == 1
159+
160+
161+
@pytest.mark.asyncio
async def test_record_internal_agent_stats_does_not_retry_other_operational_errors(
    monkeypatch: pytest.MonkeyPatch,
):
    """An ``OperationalError`` that is not a lock is logged once, with no retry."""
    insert_calls = []
    logged = []

    class FailingDb:
        async def insert_provider_stat(self, **kwargs):
            insert_calls.append(kwargs)
            raise _provider_stats_operational_error("no such table: provider_stats")

    def capture_warning(*args, **kwargs):
        logged.append((args, kwargs))

    monkeypatch.setattr(internal, "db_helper", FailingDb())
    monkeypatch.setattr(internal.logger, "warning", capture_warning)

    await internal._record_internal_agent_stats(*_provider_stats_recording_args())

    assert len(insert_calls) == 1
    assert len(logged) == 1
185+
186+
187+
@pytest.mark.asyncio
async def test_record_internal_agent_stats_propagates_cancelled_error(
    monkeypatch: pytest.MonkeyPatch,
):
    """Cancellation raised by the insert reaches the caller without a warning."""
    logged = []

    class CancellingDb:
        async def insert_provider_stat(self, **kwargs):
            raise asyncio.CancelledError()

    def capture_warning(*args, **kwargs):
        logged.append((args, kwargs))

    monkeypatch.setattr(internal, "db_helper", CancellingDb())
    monkeypatch.setattr(internal.logger, "warning", capture_warning)

    recording_args = _provider_stats_recording_args()
    with pytest.raises(asyncio.CancelledError):
        await internal._record_internal_agent_stats(*recording_args)

    assert not logged

0 commit comments

Comments
 (0)