fix: Prevent SQLite corruption in parallel mode with atomic operations #108
cabana8471-arch wants to merge 11 commits into AutoForgeAI:master
Conversation
Fixes AutoForgeAI#106

## Problem

When running multiple agents in parallel (up to 5), database corruption occurs:

- `sqlite3.DatabaseError: database disk image is malformed`
- `PRAGMA integrity_check` shows btreeInitPage errors

## Root Causes

1. `threading.Lock()` in feature_mcp.py only works per-process, not cross-process
2. Read-modify-write patterns in 6+ MCP tools cause race conditions
3. Orchestrator engine never disposed - connection pool not cleaned up
4. No WAL checkpoint on agent exit causes stale cache issues
5. assistant.db engine cache has no locking

## Solution

- Replace read-modify-write patterns with atomic `UPDATE ... WHERE` clauses (see the sketch below)
- Use SQLite IMMEDIATE transactions for consistent reads during cycle detection
- Add WAL checkpoint before engine dispose on shutdown
- Add thread-safe locking to assistant.db engine cache
- Add proper connection timeout settings (30s) throughout

## Changes

- api/database.py: Add atomic_transaction(), atomic_claim_feature(), etc.
- mcp_server/feature_mcp.py: Fix all high-risk tools with atomic SQL
- parallel_orchestrator.py: Add cleanup() with WAL checkpoint, signal handlers
- progress.py: Add _get_connection() with timeout settings
- server/services/assistant_database.py: Add thread-safe cache locking

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
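To make the atomic pattern concrete, here is a minimal sketch of the compare-and-swap idea in plain `sqlite3` (the PR itself uses SQLAlchemy; the `features.in_progress` column follows the description above, and the rest is illustrative):

```python
import sqlite3

def claim_feature(db_path: str, feature_id: int) -> bool:
    """Atomically claim a feature; True if this process won the claim."""
    with sqlite3.connect(db_path, timeout=30) as conn:
        # Single conditional UPDATE instead of SELECT-then-UPDATE: the WHERE
        # clause is the "compare", the SET is the "swap", so there is no
        # window in which two agents can both observe in_progress=0.
        cur = conn.execute(
            "UPDATE features SET in_progress = 1 "
            "WHERE id = ? AND in_progress = 0",
            (feature_id,),
        )
        return cur.rowcount == 1  # 0 rows means another agent claimed it first
```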
📝 Walkthrough: Adds SQLAlchemy 2.0-style declarative base and an explicit …
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant MCP as mcp_server.feature_mcp
    participant API as api.database
    participant DB as SQLite
    Client->>MCP: request to claim feature (id)
    MCP->>API: atomic_claim_feature(session_maker, id)
    API->>DB: BEGIN IMMEDIATE
    DB-->>API: lock acquired
    API->>DB: UPDATE features SET in_progress=1 WHERE id=? AND conditions...
    DB-->>API: affected_rows (1 or 0)
    API->>DB: COMMIT
    API-->>MCP: return success/failure with feature payload
    MCP-->>Client: respond with claim result
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ Passed checks (5 passed)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
progress.py (1)
65-74: Ensure SQLite connections close on error paths.
If `cursor.execute(...)` raises, `conn.close()` is skipped, which can leak file descriptors and keep locks longer than needed. Consider wrapping connections with `contextlib.closing` (or explicit `try/finally`) in all three call sites.

♻️ Suggested fix (apply the same pattern to the other functions):

```diff
+from contextlib import closing
 ...
-    try:
-        conn = _get_connection(db_file)
-        cursor = conn.cursor()
-        cursor.execute("SELECT COUNT(*) FROM features")
-        count = cursor.fetchone()[0]
-        conn.close()
-        return count > 0
-    except Exception:
-        return False
+    try:
+        with closing(_get_connection(db_file)) as conn:
+            cursor = conn.cursor()
+            cursor.execute("SELECT COUNT(*) FROM features")
+            count = cursor.fetchone()[0]
+            return count > 0
+    except Exception:
+        return False
```

Also applies to: 93-126, 145-158
🤖 Fix all issues with AI agents
In `@mcp_server/feature_mcp.py`:
- Around line 37-45: The import block is unsorted causing ruff errors; reorder
the names imported from api.database so they are alphabetically sorted (e.g.,
adjust the import that currently lists Feature, create_database,
atomic_transaction to list them in alphabetical order: atomic_transaction,
create_database, Feature) and ensure the overall import group ordering follows
standard (stdlib, third-party, local) so FastMCP, BaseModel, Field, text,
sys.path insertion, then the sorted api.database import and subsequent
dependency_resolver imports.
- Around line 417-426: In the result.rowcount == 0 handling block, remove the
unnecessary f-string prefix on the final error message; change the return
json.dumps({"error": f"Failed to mark feature in-progress for unknown reason"})
to a plain string literal (no f) so it reads return json.dumps({"error": "Failed
to mark feature in-progress for unknown reason"}); keep the other messages that
interpolate feature_id (e.g., the returns for Feature not found, already
passing, already in-progress) as f-strings and leave session.query(Feature) and
feature_id usage unchanged.
- Sort imports alphabetically in feature_mcp.py (I001)
- Remove unnecessary f-string prefix (F541)
- Use contextlib.closing to ensure SQLite connections close on error paths

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

Ruff requires classes (uppercase) to come before functions (lowercase): Feature, atomic_transaction, create_database

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

Combined:
- Stuck features clearing at orchestrator startup (my-features)
- Signal handlers and atexit cleanup (PR AutoForgeAI#108)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

…orgeAI#100, AutoForgeAI#108, AutoForgeAI#109, AutoForgeAI#110

PR AutoForgeAI#110 (Quality Gates):
- Move quality checks before DB session to avoid holding locks
- Return error instead of None for missing configured custom script
- Use contextlib.closing for SQLite connections in progress.py

PR AutoForgeAI#109 (Rate Limit):
- Extract rate limit logic to shared rate_limit_utils.py module
- Remove duplicated code from agent.py and test_agent.py

PR AutoForgeAI#108 (SQLite Parallel):
- Sort imports alphabetically in feature_mcp.py

PR AutoForgeAI#100 (Config Diagnostics):
- Add logger.warning for pkill_processes validation failures

PR AutoForgeAI#95 (Infrastructure Mock):
- Add language tags to fenced code blocks in initializer template

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Hey @cabana8471-arch! Thanks for tackling this - SQLite corruption in parallel mode is a serious issue, and the core approach here (replacing `threading.Lock()` with atomic SQL) is sound. I did a thorough review and found a few things worth addressing before merging, splitting them into must-fix and should-fix:

Critical (Must Fix)

1. Signal handler performs unsafe database operations

The signal handler performs database work (the `cleanup()` path) directly from signal context. This is a documented real-world issue. Suggested fix: use a flag-based approach instead:

```python
def signal_handler(signum, frame):
    orchestrator._shutdown_requested = True
    orchestrator.is_running = False
```

Then let the main loop detect the flags and run the actual cleanup in normal execution context.
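A sketch of the flag-based flow being suggested (the loop body and any names beyond `_shutdown_requested`, `is_running`, and `cleanup()` are illustrative):

```python
import signal

def install_signal_handlers(orchestrator):
    def signal_handler(signum, frame):
        # Async-signal-safe: only set flags here - no logging, no DB I/O.
        orchestrator._shutdown_requested = True
        orchestrator.is_running = False
    signal.signal(signal.SIGTERM, signal_handler)

def run(orchestrator):
    try:
        while orchestrator.is_running and not orchestrator._shutdown_requested:
            orchestrator.run_one_cycle()  # hypothetical work step
    finally:
        # Cleanup (WAL checkpoint, engine.dispose()) runs in normal execution
        # context, never inside the signal handler itself.
        orchestrator.cleanup()
```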
…afety

Changes per leonvanzyl and CodeRabbit feedback:

Signal handling (parallel_orchestrator.py):
- Make signal handler async-safe (only sets flags, no I/O)
- Add _shutdown_requested flag for graceful shutdown
- Remove SIGINT handler to let KeyboardInterrupt work naturally
- Make cleanup() idempotent by clearing _engine first
- Fix Optional type hints for mypy compliance

SQLite transactions (api/database.py):
- Add SQLAlchemy event hooks for BEGIN IMMEDIATE transactions
- This replaces fragile pysqlite implicit transaction handling
- Future-proof for Python 3.16 when legacy mode is removed
- Fix rollback exception swallowing in atomic_transaction()
- Remove ~150 lines of dead code (unused atomic_* functions)

Type safety improvements:
- Migrate to SQLAlchemy 2.0 DeclarativeBase (database.py, registry.py)
- Fix all mypy errors in modified files
- Add explicit type annotations for Optional parameters

Testing:
- Add test_atomic_operations.py with 7 concurrency tests
- Tests verify atomic claims, priority updates, cleanup idempotency

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
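For reference, a minimal sketch of the event-hook pattern this commit describes - the standard SQLAlchemy recipe for controlling SQLite transactions (the commit refers to the dialect methods `do_connect`/`do_begin`; the listener names below are the documented public events, and the engine URL and timeout are illustrative):

```python
from sqlalchemy import create_engine, event

engine = create_engine("sqlite:///features.db", connect_args={"timeout": 30})

@event.listens_for(engine, "connect")
def _disable_pysqlite_begin(dbapi_connection, connection_record):
    # Stop pysqlite from emitting its own implicit BEGIN so SQLAlchemy
    # fully controls when the transaction starts.
    dbapi_connection.isolation_level = None

@event.listens_for(engine, "begin")
def _begin_immediate(conn):
    # Acquire SQLite's write lock at transaction start, so concurrent
    # writers queue up instead of failing later with "database is locked".
    conn.exec_driver_sql("BEGIN IMMEDIATE")
```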
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@api/database.py`:
- Around line 496-513: The code currently attempts session.execute(text("BEGIN
EXCLUSIVE")) when isolation_level == "EXCLUSIVE", but the SQLAlchemy "begin"
event hook already issues "BEGIN IMMEDIATE", causing a nested-transaction error;
fix by removing the explicit BEGIN EXCLUSIVE and do not support starting
EXCLUSIVE at the session level—either (preferred) map the "EXCLUSIVE" argument
to "IMMEDIATE" (treat isolation_level == "EXCLUSIVE" as "IMMEDIATE") in the
atomic_transaction/session creation logic, or immediately raise a clear
ValueError when callers pass "EXCLUSIVE"; update references to isolation_level
handling and remove session.execute(text("BEGIN EXCLUSIVE")) so the existing
begin event hook (which issues BEGIN IMMEDIATE) is the single source of BEGIN
semantics.
🧹 Nitpick comments (1)
test_atomic_operations.py (1)
218-280: Clarify: Test validates serialization, not parallel reads.

The test comment says "IMMEDIATE prevents stale reads", but the expected behavior (`initial + 200`) actually validates that transactions serialize properly - the second transaction waits for the first to complete, then reads the updated value.

This is correct behavior for `BEGIN IMMEDIATE`, but the test name/comments could be clearer. With IMMEDIATE, the second transaction blocks until the first commits, so each thread reads the previously committed value (not a stale value). This is serialization via write-lock contention, not snapshot isolation.

Suggested clarification in docstring:

```diff
 class TestAtomicTransactionIsolation:
-    """Test that atomic_transaction with IMMEDIATE prevents stale reads."""
+    """Test that atomic_transaction with IMMEDIATE serializes concurrent writes."""

     def test_read_modify_write_isolation(self, temp_db):
-        """Verify IMMEDIATE transaction prevents stale read in read-modify-write."""
+        """Verify IMMEDIATE transactions serialize, preventing lost updates."""
         engine, session_maker = temp_db

-        # This test verifies that two concurrent read-modify-write operations
-        # don't both read the same value and create a conflict
+        # This test verifies that IMMEDIATE transactions serialize correctly.
+        # The second transaction blocks until the first commits, then reads
+        # the updated value. This prevents lost updates.
```
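To illustrate the distinction the reviewer is drawing, a standalone sketch in plain `sqlite3` (the `counters` table and file path are hypothetical): with `BEGIN IMMEDIATE`, the second writer blocks at BEGIN until the first commits, so each transaction reads the latest committed value and no update is lost.

```python
import sqlite3
import time
from concurrent.futures import ThreadPoolExecutor

DB = "counters.db"

def add(amount: int) -> None:
    conn = sqlite3.connect(DB, timeout=30, isolation_level=None)
    try:
        conn.execute("BEGIN IMMEDIATE")  # write lock acquired here, not at UPDATE
        (value,) = conn.execute("SELECT value FROM counters WHERE id = 1").fetchone()
        time.sleep(0.1)  # widen the would-be race window
        conn.execute("UPDATE counters SET value = ? WHERE id = 1", (value + amount,))
        conn.execute("COMMIT")
    finally:
        conn.close()

# Setup plus two concurrent writers: the final value is initial + 200 because
# the second BEGIN IMMEDIATE blocks until the first transaction commits.
setup = sqlite3.connect(DB)
setup.execute("CREATE TABLE IF NOT EXISTS counters (id INTEGER PRIMARY KEY, value INTEGER)")
setup.execute("INSERT OR REPLACE INTO counters (id, value) VALUES (1, 0)")
setup.commit()
setup.close()

with ThreadPoolExecutor(max_workers=2) as pool:
    for future in [pool.submit(add, 100), pool.submit(add, 100)]:
        future.result()
```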
```python
session = session_maker()
try:
    # For EXCLUSIVE mode, override the default IMMEDIATE from event hooks
    # For IMMEDIATE mode, the event hooks handle BEGIN IMMEDIATE automatically
    if isolation_level == "EXCLUSIVE":
        session.execute(text("BEGIN EXCLUSIVE"))
    # Note: For IMMEDIATE, we don't issue BEGIN here - the event hook handles it
    # This prevents the fragile "BEGIN on already-begun transaction" issue
    yield session
    session.commit()
except Exception:
    try:
        session.rollback()
    except Exception:
        pass  # Don't let rollback failure mask original error
    raise
finally:
    session.close()
```
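A hedged usage sketch of this context manager, mirroring the `feature_mcp.py` call site discussed below (the UPDATE statement and feature id are illustrative):

```python
from sqlalchemy import text

# The engine's "begin" event hook issues BEGIN IMMEDIATE when the first
# statement runs, so the whole block executes under SQLite's write lock.
with atomic_transaction(_session_maker, "IMMEDIATE") as session:
    session.execute(
        text("UPDATE features SET priority = priority + 1 WHERE id = :id"),
        {"id": 42},
    )
# Commit on success and rollback on error are handled by the context manager.
```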
🧩 Analysis chain

🏁 Script executed:

```bash
# Check if EXCLUSIVE mode is actually used anywhere in the codebase
echo "=== Searching for EXCLUSIVE mode usage ==="
rg -n "EXCLUSIVE" --type=py -B 2 -A 2
echo -e "\n=== Searching for isolation_level assignments ==="
rg -n 'isolation_level' --type=py -B 2 -A 2
echo -e "\n=== Looking for atomic_transaction context manager ==="
rg -n "atomic_transaction|@atomic_transaction" --type=py -B 2 -A 2 | head -50
```

🏁 Script executed:

```bash
# Find the event hook implementation
echo "=== Looking for do_begin event hook ==="
rg -n "do_begin|event.listen|before_cursor_execute" --type=py -B 3 -A 5
```

🏁 Script executed:

```bash
# Check the database.py file structure to find event hook setup
echo "=== Examining api/database.py for event hooks and session setup ==="
head -100 api/database.py
```

🏁 Script executed:

```bash
echo "=== Looking for _configure_sqlite_immediate_transactions function ==="
rg -n "_configure_sqlite_immediate_transactions" --type=py -A 20
echo -e "\n=== Searching for event.listen calls ==="
rg -n "event\.listen" --type=py -B 2 -A 5
```

🏁 Script executed:

```bash
echo "=== Searching for BEGIN IMMEDIATE in database.py ==="
rg -n "BEGIN IMMEDIATE" api/database.py -B 3 -A 3
echo -e "\n=== Full context around lines 350-420 in api/database.py ==="
sed -n '350,420p' api/database.py
```

🏁 Script executed:

```bash
echo "=== Check lines around the atomic_transaction function and event setup ==="
sed -n '460,520p' api/database.py
```
EXCLUSIVE mode causes "cannot start a transaction within a transaction" error.
When isolation_level == "EXCLUSIVE", the code executes session.execute(text("BEGIN EXCLUSIVE")) at line 501. However, SQLAlchemy's "begin" event (lines 367-370) fires and executes BEGIN IMMEDIATE before this statement runs. Since autocommit=False on the sessionmaker, attempting BEGIN EXCLUSIVE within an already-started transaction causes SQLite to error.
This affects the actual usage in feature_mcp.py line 564, which calls atomic_transaction(_session_maker, "EXCLUSIVE").
Proposed fix

Either:

- Disable EXCLUSIVE mode support and only allow IMMEDIATE:

```diff
 session = session_maker()
 try:
-    # For EXCLUSIVE mode, override the default IMMEDIATE from event hooks
-    # For IMMEDIATE mode, the event hooks handle BEGIN IMMEDIATE automatically
-    if isolation_level == "EXCLUSIVE":
-        session.execute(text("BEGIN EXCLUSIVE"))
-    # Note: For IMMEDIATE, we don't issue BEGIN here - the event hook handles it
-    # This prevents the fragile "BEGIN on already-begun transaction" issue
+    if isolation_level != "IMMEDIATE":
+        raise ValueError(f"Only IMMEDIATE isolation level is supported, got: {isolation_level}")
     yield session
```

- Or conditionally disable the event hook for EXCLUSIVE mode connections (requires connection-level isolation, which adds complexity).
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
session = session_maker()
try:
    if isolation_level != "IMMEDIATE":
        raise ValueError(f"Only IMMEDIATE isolation level is supported, got: {isolation_level}")
    yield session
    session.commit()
except Exception:
    try:
        session.rollback()
    except Exception:
        pass  # Don't let rollback failure mask original error
    raise
finally:
    session.close()
```
These imports are needed for the default_concurrency column and migration. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add explicit encoding="utf-8" and errors="replace" to popen_kwargs dict in parallel_orchestrator.py (3 locations) to fix Windows CP1252 encoding issues. Closes leonvanzyl#138 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
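A sketch of the kind of change this commit describes (the surrounding `Popen` call and the other keyword arguments are illustrative):

```python
import subprocess

popen_kwargs = {
    "stdout": subprocess.PIPE,
    "stderr": subprocess.STDOUT,
    # Decode subprocess output as UTF-8 regardless of the Windows console
    # code page (CP1252), replacing undecodable bytes instead of raising
    # UnicodeDecodeError.
    "encoding": "utf-8",
    "errors": "replace",
}
proc = subprocess.Popen(["python", "--version"], **popen_kwargs)
print(proc.communicate()[0])
```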
Hey @cabana8471-arch — thanks for this PR and the thorough work on diagnosing the SQLite corruption issue in parallel mode. Your root cause analysis was spot-on.

We've gone ahead and implemented the core improvements from this PR directly on master, incorporating the best parts while addressing a few issues we found during review. Here's what we took and what we adjusted:

What we implemented (inspired by this PR)
What we fixed vs this PR
What we left out
Thanks again for identifying these issues and proposing solid solutions. The atomic SQL pattern and event hook approach were exactly right, and this PR was instrumental in getting these fixes onto master. We're going to close this PR since the changes have been incorporated, but your contribution is very much appreciated! 🙏
Closing this PR as the core improvements have been implemented directly on master (commit d8a8c83). See the detailed comment above for what was incorporated. Thanks again for the excellent work on this!
Replace ineffective threading.Lock() with atomic SQL operations for cross-process safety.

Key changes:
- Add SQLAlchemy event hooks (do_connect/do_begin) for BEGIN IMMEDIATE transactions in api/database.py
- Add atomic_transaction() context manager for multi-statement ops
- Convert all feature MCP write operations to atomic UPDATE...WHERE with compare-and-swap patterns (feature_claim, mark_passing, etc.)
- Add WHERE passes=0 state guard to feature_mark_passing
- Add WAL checkpoint on shutdown and idempotent cleanup() in parallel_orchestrator.py with async-safe signal handling
- Wrap SQLite connections with contextlib.closing() in progress.py
- Add thread-safe engine cache with double-checked locking in assistant_database.py
- Migrate to SQLAlchemy 2.0 DeclarativeBase across all modules

Inspired by PR #108 (cabana8471-arch), with fixes for nested BEGIN EXCLUSIVE bug and missing state guards.

Closes #106

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
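A sketch of the idempotent cleanup this commit describes (assuming an `_engine` attribute; the exact structure on master may differ):

```python
from sqlalchemy import text

def cleanup(self) -> None:
    # Idempotent: swap the engine reference out first, so a second call
    # (e.g. atexit firing after a signal-triggered shutdown) is a no-op.
    engine, self._engine = self._engine, None
    if engine is None:
        return
    with engine.connect() as conn:
        # Fold the WAL back into the main database file before disposing,
        # so other processes don't open against a stale -wal file.
        conn.execute(text("PRAGMA wal_checkpoint(FULL)"))
    engine.dispose()
```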
Summary
Fixes database corruption (`sqlite3.DatabaseError: database disk image is malformed`) when running multiple agents in parallel.

Fixes #106
Root Causes Addressed
- `threading.Lock()` useless cross-process - removed and replaced with atomic SQL `UPDATE ... WHERE` clauses
- No engine disposal on shutdown - added `cleanup()` method with signal handlers
- Stale WAL cache on agent exit - `PRAGMA wal_checkpoint(FULL)` before dispose

Changes
api/database.py
- `atomic_transaction()` context manager with `BEGIN IMMEDIATE`/`EXCLUSIVE`
- `atomic_claim_feature()`, `atomic_mark_passing()`, `atomic_update_priority_to_end()`

mcp_server/feature_mcp.py
- Removed `_priority_lock = threading.Lock()`
- `feature_skip` - atomic priority update with subquery
- `feature_mark_in_progress` - atomic `UPDATE WHERE in_progress=0`
- `feature_claim_and_get` - atomic claim
- `feature_create_bulk` - EXCLUSIVE transaction for priority range
- `feature_add_dependency` - IMMEDIATE transaction for cycle detection

parallel_orchestrator.py
- `cleanup()` method with WAL checkpoint before dispose
- `atexit` and signal handlers (SIGTERM, SIGINT)
- Cleanup in `finally` block

progress.py
- `_get_connection()` helper with `timeout=30` and `PRAGMA busy_timeout=30000` (a sketch follows this list)

server/services/assistant_database.py
- `threading.Lock()` for engine cache
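The progress.py helper plausibly looks like this - a sketch assembled from the settings listed above, not the PR's exact code:

```python
import sqlite3

def _get_connection(db_file: str) -> sqlite3.Connection:
    # timeout=30 makes the Python driver retry for up to 30s on a locked
    # file; busy_timeout makes SQLite itself wait instead of erroring.
    conn = sqlite3.connect(db_file, timeout=30)
    conn.execute("PRAGMA busy_timeout = 30000")
    return conn
```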
Test plan

- `PRAGMA integrity_check` returns "ok"
- `SELECT priority, COUNT(*) FROM features GROUP BY priority HAVING COUNT(*) > 1` returns no rows (no duplicate priorities) - a verification sketch follows this list
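A quick verification sketch covering both checks (the `features.db` path is an assumption):

```python
import sqlite3

conn = sqlite3.connect("features.db")

# Structural integrity: SQLite returns a single row containing "ok" when clean.
(result,) = conn.execute("PRAGMA integrity_check").fetchone()
assert result == "ok", f"integrity check failed: {result}"

# Priority uniqueness: HAVING COUNT(*) > 1 surfaces any duplicated priorities.
dupes = conn.execute(
    "SELECT priority, COUNT(*) FROM features "
    "GROUP BY priority HAVING COUNT(*) > 1"
).fetchall()
assert not dupes, f"duplicate priorities: {dupes}"
conn.close()
```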
Migration

🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Bug Fixes
Refactor
Tests