fix(conversations): bounded ThreadPoolExecutor for background work#4827
Each conversation completion was spawning 7+ raw `threading.Thread()` calls (`save_structured_vector`, `_extract_memories`, `_extract_trends`, `_save_action_items`, `_update_goal_progress`, `conversation_created_webhook`, `update_personas_async`). No pooling, no rate limiting. Under sustained load this creates hundreds of concurrent threads, each holding full `Conversation` objects in memory.

Replaced with a bounded `ThreadPoolExecutor(max_workers=32)` that queues work when all workers are busy instead of spawning unlimited threads.

Found during a deep memory-leak audit (follow-up to PR #4784).
Code Review
This pull request is a solid improvement for the application's stability and resource management. Replacing unbounded threading.Thread creation with a bounded ThreadPoolExecutor is the correct approach to handle background tasks under sustained load. My review focuses on ensuring this new pattern is implemented robustly, with particular attention to exception handling and resource lifecycle management to prevent silent failures and resource leaks. All original comments are valid and have been kept.
```python
if not is_reprocess:
    _conversation_bg_executor.submit(save_structured_vector, uid, conversation)
    _conversation_bg_executor.submit(_extract_memories, uid, conversation)
    _conversation_bg_executor.submit(_extract_trends, uid, conversation)
    _conversation_bg_executor.submit(_save_action_items, uid, conversation)
    _conversation_bg_executor.submit(_update_goal_progress, uid, conversation)
```
By switching to ThreadPoolExecutor.submit, any exceptions that occur within the submitted tasks will be swallowed silently if the returned Future object is not checked. This is a critical issue as it can lead to silent failures of background processing, causing data inconsistencies or incomplete conversation processing without any logs.
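This behavior is easy to demonstrate in isolation. In the minimal standalone sketch below (not project code), the exception is stored on the `Future` and nothing reaches stderr unless the `Future` is inspected:

```python
from concurrent.futures import ThreadPoolExecutor

def boom():
    raise RuntimeError("background task failed")

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(boom)
executor.shutdown(wait=True)  # task has finished; no traceback was printed

# The error is only visible if the Future is checked explicitly.
print(type(future.exception()).__name__)  # RuntimeError
```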
Several of the submitted functions do not have top-level exception handling, including `save_structured_vector`, `_extract_memories`, and `_save_action_items`.
To fix this, add a `try...except` block within each of these functions to catch all exceptions and log them properly. For example, in `save_structured_vector`:

```python
def save_structured_vector(uid: str, conversation: Conversation, update_only: bool = False):
    try:
        # ... function body ...
    except Exception as e:
        logging.error(f"Error in background task save_structured_vector for conv {conversation.id}: {e}", exc_info=True)
```

Please apply this pattern to all background tasks submitted to the executor that currently lack robust exception handling.
Fixed in 9f21832. Added `try/except Exception` with `logging.exception()` to all executor-submitted functions: `save_structured_vector`, `_extract_memories`, `_extract_trends`, and `_save_action_items` (refactored to `_save_action_items_inner`).
All exceptions are now logged with full traceback and conversation ID for debugging.
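One way to apply this pattern uniformly is a small wrapper, sketched below. The helper name `_log_exceptions` and the demo function are illustrative, not the project's actual code:

```python
import functools
import logging

def _log_exceptions(fn):
    """Wrap a background task so failures are logged with a traceback
    instead of sitting silently on an unchecked Future."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            logging.exception("Background task %s failed", fn.__name__)
    return wrapper

@_log_exceptions
def save_structured_vector_demo(uid, conversation):
    raise ValueError("simulated failure")

save_structured_vector_demo("u1", None)  # logs the traceback, does not raise
```

The wrapped function swallows the exception after logging it, so a pool worker is never killed by a single bad task.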
```python
# Bounded thread pool for post-conversation background work.
# Prevents thread explosion under sustained load (was spawning 7+ raw threads per conversation).
_conversation_bg_executor = ThreadPoolExecutor(max_workers=32, thread_name_prefix="conv-bg")
```
The ThreadPoolExecutor is created as a global instance, but there's no corresponding call to shutdown() when the application exits. This can lead to tasks in the queue being lost during application restarts or deployments, causing incomplete background work. It's important to ensure a graceful shutdown.
You can register a shutdown function using atexit to ensure the executor waits for running tasks to complete before the process exits. You will also need to add import atexit at the top of the file.
```python
_conversation_bg_executor = ThreadPoolExecutor(max_workers=32, thread_name_prefix="conv-bg")
atexit.register(_conversation_bg_executor.shutdown, wait=True)
```
Fixed in 9f21832. Added import atexit and atexit.register(_conversation_bg_executor.shutdown, wait=True) immediately after executor creation.
```diff
  # Update persona prompts with new conversation
- threading.Thread(target=update_personas_async, args=(uid,)).start()
+ _conversation_bg_executor.submit(conversation_created_webhook, uid, conversation)
+ _conversation_bg_executor.submit(update_personas_async, uid)
```
The function update_personas_async is being submitted to the thread pool, but its implementation in utils/apps.py creates and manages its own threads using threading.Thread and join(). This is an anti-pattern when using a thread pool.
A worker from _conversation_bg_executor will be blocked waiting for the new threads inside update_personas_async to complete. This negates the benefits of the thread pool for this task and can still lead to an uncontrolled number of threads if many conversations are processed concurrently.
It's recommended to refactor update_personas_async to submit its individual persona update tasks (sync_update_persona_prompt) directly to the shared _conversation_bg_executor instead of creating new threads internally.
Fixed in 9f21832. Replaced the call to `update_personas_async` (which spawns inner threads) with a new `_update_personas_via_pool` function that:

- Checks rate limiting via `can_update_persona(uid)`
- Sets the update timestamp via `set_persona_update_timestamp(uid)`
- Submits each individual persona update to the shared `_conversation_bg_executor` pool
This eliminates the nested thread anti-pattern — no more raw threading.Thread creation inside a pool worker.
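The refactor described above can be sketched as follows. This is a self-contained approximation, not the merged code; `can_update_persona`, `set_persona_update_timestamp`, `get_personas`, and `sync_update_persona_prompt` are stand-ins for the project's helpers:

```python
from concurrent.futures import ThreadPoolExecutor

_conversation_bg_executor = ThreadPoolExecutor(max_workers=32, thread_name_prefix="conv-bg")

# Stand-ins for the project's helpers.
def can_update_persona(uid):
    return True

def set_persona_update_timestamp(uid):
    pass

def get_personas(uid):
    return [{"id": "p1"}, {"id": "p2"}]

submitted = []
def sync_update_persona_prompt(persona):
    submitted.append(persona["id"])

def _update_personas_via_pool(uid):
    """Fan persona updates out to the shared pool instead of spawning
    raw threads inside a pool worker (no nested threading)."""
    if not can_update_persona(uid):
        return
    set_persona_update_timestamp(uid)
    for persona in get_personas(uid):
        _conversation_bg_executor.submit(sync_update_persona_prompt, persona)

_update_personas_via_pool("u1")
_conversation_bg_executor.shutdown(wait=True)
```

Each persona update becomes an independently queued task, so the pool worker that ran `_update_personas_via_pool` returns immediately instead of blocking on joins.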
Chaos Engineering Test Results — Thread Explosion

Test: 50 rapid conversation completions, each spawning 7 background tasks with 2-5s sleep (simulating slow LLM/DB work).

Verdict: PASS — the vulnerable version explodes to 350 threads, the fixed version caps at 32.

Reproducer:

```shell
cd backend/testing/chaos-threadpool/
./run_chaos_test.sh
```

Test harness at `backend/testing/chaos-threadpool/`.
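The core of such a test can be sketched as a scaled-down, self-contained approximation (not the actual harness): flood a bounded pool with far more tasks than workers and record the peak number of pool threads by their name prefix.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 8  # scaled down from 32 for a fast local run
peak = 0
lock = threading.Lock()

def slow_task():
    global peak
    # Count live pool threads by the thread_name_prefix given below.
    n = sum(1 for t in threading.enumerate() if t.name.startswith("conv-bg"))
    with lock:
        peak = max(peak, n)
    time.sleep(0.02)  # simulate slow LLM/DB work

executor = ThreadPoolExecutor(max_workers=MAX_WORKERS, thread_name_prefix="conv-bg")
for _ in range(100):  # 100 queued tasks, but never more than MAX_WORKERS pool threads
    executor.submit(slow_task)
executor.shutdown(wait=True)

print("peak conv-bg threads:", peak)  # never exceeds MAX_WORKERS
```

With raw `threading.Thread` per task, the same loop would briefly hold up to 100 live threads; the pool keeps the count flat regardless of submission rate.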
- Add try/except to all executor-submitted functions to prevent silent failures: `save_structured_vector`, `_extract_memories`, `_extract_trends`, `_save_action_items`
- Register `atexit.register(_conversation_bg_executor.shutdown, wait=True)` for graceful executor cleanup
- Replace `update_personas_async` (spawns raw threads inside a pool worker) with `_update_personas_via_pool`, which submits individual persona updates to the shared executor, eliminating the nested-thread anti-pattern

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Wraps asyncio.run(auto_sync_action_items_batch) with try/except to prevent silent swallowing of exceptions when submitted via _conversation_bg_executor.submit(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
12 tests covering:

- Executor setup: max_workers cap, atexit registration, submit returns a Future
- Exception handling: logged vs swallowed, wrapped functions don't propagate
- Persona pool: rate limiting, empty list, per-persona submission, fault isolation
- Boundary: concurrent tasks capped at max_workers, contrast with raw threads

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Closing for now — will revisit and review later.
Hey @beastoin 👋 Thank you so much for taking the time to contribute to Omi! We truly appreciate you putting in the effort to submit this pull request. After careful review, we've decided not to merge this particular PR. Please don't take this personally — we genuinely try to merge as many contributions as possible, but sometimes we have to make tough calls.

Your contribution is still valuable to us, and we'd love to see you contribute again in the future! If you'd like feedback on how to improve this PR or want to discuss alternative approaches, please don't hesitate to reach out. Thank you for being part of the Omi community! 💜
Summary
- Replaces 7+ raw `threading.Thread().start()` calls per conversation completion with a bounded `ThreadPoolExecutor(max_workers=32)`
- Routes `save_structured_vector`, `_extract_memories`, `_extract_trends`, `_save_action_items`, `_update_goal_progress`, `conversation_created_webhook`, `update_personas_async`, and `_run_auto_sync` through the shared pool

Part of #4825 (Fix 2/3). Follow-up to PR #4784.
Test plan
🤖 Generated with Claude Code