Move cleanup_stale_conversations to main task group#3648
Conversation
There was a problem hiding this comment.
Code Review
This pull request refactors the cleanup_stale_conversations background task to be part of the main asyncio task group, which is a good improvement for task management and error handling. However, I've identified a potential race condition. The cleanup_stale_conversations function is not safe for concurrent execution, which can occur if a user has multiple active connections. My review includes a comment with a suggestion to add a distributed lock to prevent data inconsistencies.
| stream_transcript_task = asyncio.create_task(stream_transcript_process()) | ||
| record_usage_task = asyncio.create_task(_record_usage_periodically()) | ||
| lifecycle_manager_task = asyncio.create_task(conversation_lifecycle_manager()) | ||
| cleanup_task = asyncio.create_task(cleanup_stale_conversations()) |
There was a problem hiding this comment.
The cleanup_stale_conversations function, which is now being managed as a main task, appears to lack protection against concurrent execution. If a user opens multiple connections (e.g., multiple browser tabs), this task could run in parallel for the same user.
The function processes stale and orphaned conversations. This involves calling _create_conversation, which in turn calls process_conversation. The logic within process_conversation and its helpers (like _extract_memories and _save_action_items) is not idempotent. For instance, they often follow a delete-then-create pattern, which can lead to race conditions and data inconsistencies if executed concurrently on the same conversation.
To prevent this, I recommend implementing a distributed lock (e.g., using Redis) at the beginning of cleanup_stale_conversations to ensure that only one instance of this cleanup logic can run at a time for a specific user.
Example of locking:
async def cleanup_stale_conversations():
lock = redis_db.get_lock(f"lock:cleanup_stale_conversations:{uid}", timeout=60)
if not lock.acquire(blocking=False):
print(f"Cleanup for user {uid} is already in progress. Skipping.")
return
try:
# ... existing function body ...
finally:
lock.release()This would make the cleanup process robust against multiple connections.
* Revert "Move cleanup_stale_conversations to main task group (#3648)" This reverts commit c4399f5. * Revert "R83tq stable stt (#3647)" This reverts commit bd7f5f9. * Revert "Add timeout check for orphaned conversations, status check to create new conversation if not in_progress (#3646)" This reverts commit 6ad9f28. * Revert "Ensuring only the latest in-progress conversation is considered (#3645)" This reverts commit 15b4bb1. * Reduce the complexity on transcribing; faster cleanup with tasks-based
* Revert "Move cleanup_stale_conversations to main task group (BasedHardware#3648)" This reverts commit c4399f5. * Revert "R83tq stable stt (BasedHardware#3647)" This reverts commit bd7f5f9. * Revert "Add timeout check for orphaned conversations, status check to create new conversation if not in_progress (BasedHardware#3646)" This reverts commit 6ad9f28. * Revert "Ensuring only the latest in-progress conversation is considered (BasedHardware#3645)" This reverts commit 15b4bb1. * Reduce the complexity on transcribing; faster cleanup with tasks-based
No description provided.