
Conversation

Contributor

@ArindamRoy23 ArindamRoy23 commented Jul 21, 2025

  • Added logic to renew the Redis lock
  • Added logic to finish the conversation when the data coming from Directus is incorrect
  • Figured out a god-level error in the Directus SDK

Summary by CodeRabbit

  • New Features

    • Added robust Redis lock management with automatic renewal and proper release to ensure safe and exclusive conversation processing.
    • Enhanced error handling and validation for external API responses, providing clearer logging and user feedback on failures.
    • Introduced explicit marking of conversations as finished after audio processing.
  • Bug Fixes

    • Prevented redundant or broad data fetches when no conversation IDs are provided, reducing unnecessary processing.
  • Improvements

    • Optimized retrieval of contextual transcripts using efficient batched queries.
    • Improved error and warning messages for better traceability during processing.
    • Refined code style and formatting for consistency and readability.

roy added 2 commits July 18, 2025 19:15
…ement

- Introduced `finish_conversation` and `renew_redis_lock` utility functions for better conversation state management and Redis lock handling.
- Enhanced `DirectusETLPipeline` with improved error logging and response validation.
- Updated `AudioETLPipeline` and `ContextualChunkETLPipeline` to utilize the new utility functions, ensuring Redis locks are maintained during processing.
- Added specific error handling for Directus ETL failures to improve robustness and logging clarity.
Contributor

coderabbitai bot commented Jul 21, 2025

Walkthrough

This update introduces robust Redis lock management and Directus API response validation into the audio LightRAG ETL pipeline. New utility functions for finishing conversations and renewing Redis locks are added, with corresponding integration into pipeline steps. Exception handling and logging are enhanced for Directus failures, and previous transcript retrieval is optimized.

Changes

  • echo/server/dembrane/audio_lightrag/main/run_etl.py: Replaces the direct Directus update with release_redis_lock; adds DirectusException handling; updates imports and error flow.
  • echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py: Adds a renew_redis_lock call before processing each conversation in the ETL transform step.
  • echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py: Adds renew_redis_lock at multiple points; refactors transcript retrieval to a batched query; improves error handling and logging.
  • echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py: Adds DirectusException, response validation, and a failure handler; enhances logging and error handling.
  • echo/server/dembrane/audio_lightrag/utils/echo_utils.py: New utility module with finish_conversation, renew_redis_lock, and release_redis_lock functions for Directus and Redis lock management.
  • echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py: Code style and formatting improvements; query limits changed from a fixed large number to no limit (-1); no logic changes.

Sequence Diagram(s)

sequenceDiagram
    participant ETL_Main as run_etl.py
    participant DirectusETL as DirectusETLPipeline
    participant AudioETL as AudioETLPipeline
    participant ContextualETL as ContextualChunkETLPipeline
    participant Utils as echo_utils.py
    participant Redis
    participant Directus

    ETL_Main->>DirectusETL: Run Directus ETL
    DirectusETL->>Directus: Fetch data
    DirectusETL->>DirectusETL: validate_directus_response
    DirectusETL-->>ETL_Main: On DirectusException, log and return False
    ETL_Main->>AudioETL: Run Audio ETL
    loop For each conversation
        AudioETL->>Utils: renew_redis_lock(conversation_id)
        Utils->>Redis: Check/renew lock
        Redis-->>Utils: Lock status
        Utils-->>AudioETL: Lock status
        AudioETL->>AudioETL: Process audio chunks
    end
    ETL_Main->>ContextualETL: Run Contextual Chunk ETL
    loop For each segment
        ContextualETL->>Utils: renew_redis_lock(conversation_id)
        Utils->>Redis: Check/renew lock
        Redis-->>Utils: Lock status
        Utils-->>ContextualETL: Lock status
        ContextualETL->>Directus: Batch fetch previous transcripts
        Directus-->>ContextualETL: Transcript data
        ContextualETL->>ContextualETL: Process segment
    end
    ETL_Main->>Utils: release_redis_lock(conv_id) (for each conversation)
    Utils->>Redis: Delete lock key
    Utils->>Directus: Update conversation as finished (via finish_conversation in failure cases)

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • ussaama
  • spashii

LGTM.



📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d2f9529 and 02fc5ed.

📒 Files selected for processing (1)
  • echo/server/dembrane/audio_lightrag/main/run_etl.py (5 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
echo/server/dembrane/audio_lightrag/main/run_etl.py (2)
echo/server/dembrane/audio_lightrag/utils/echo_utils.py (2)
  • release_redis_lock (70-82)
  • finish_conversation (24-34)
echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (2)
  • DirectusException (15-16)
  • DirectusETLPipeline (19-192)
🔇 Additional comments (2)
echo/server/dembrane/audio_lightrag/main/run_etl.py (2)

11-11: LGTM! Clean utility imports and exception handling setup.

Solid refactoring to centralize Redis lock management and conversation finishing through dedicated utility functions. The DirectusException import enables more granular error handling in the pipeline.

Also applies to: 13-16


88-88: Redis lock cleanup pattern looks solid.

Based on the learning that release_redis_lock returns bool, the list comprehension approach is totally valid here. Clean centralization of lock management through the utility function.

Also applies to: 92-92, 102-102, 112-112, 117-117, 126-126


@github-actions

Hi!

Thank you for contributing to Dembrane ECHO! Before we consider your Pull Request, we ask that you sign our Contributor License Agreement (CLA). This is only required for your first Pull Request.

Please review the CLA, and sign it by adding your GitHub username to the contributors.yml file. Thanks!

@coderabbitai coderabbitai bot requested review from spashii and ussaama July 21, 2025 15:21
@coderabbitai coderabbitai bot added bug Something isn't working improvement labels Jul 21, 2025
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 32c5510 and 033b8c3.

📒 Files selected for processing (5)
  • echo/server/dembrane/audio_lightrag/main/run_etl.py (3 hunks)
  • echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (2 hunks)
  • echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (4 hunks)
  • echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (1 hunks)
  • echo/server/dembrane/audio_lightrag/utils/echo_utils.py (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (1)

Learnt from: ArindamRoy23
PR: #113
File: echo/server/dembrane/audio_lightrag/utils/audio_utils.py:50-60
Timestamp: 2025-04-18T08:24:01.975Z
Learning: Audio files processed by the process_audio_files function in echo/server/dembrane/audio_lightrag/utils/audio_utils.py are always expected to be in mp3 format, which is why the format parameter is hardcoded.

🧬 Code Graph Analysis (3)
echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (1)
echo/server/dembrane/audio_lightrag/utils/echo_utils.py (1)
  • renew_redis_lock (23-53)
echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (1)
echo/server/dembrane/audio_lightrag/utils/echo_utils.py (1)
  • renew_redis_lock (23-53)
echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (2)
echo/server/dembrane/audio_lightrag/utils/echo_utils.py (1)
  • finish_conversation (15-20)
echo/server/dembrane/audio_lightrag/utils/process_tracker.py (1)
  • ProcessTracker (4-35)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Cursor BugBot
  • GitHub Check: ci-check-server
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (10)
echo/server/dembrane/audio_lightrag/pipelines/audio_etl_pipeline.py (1)

5-5: LGTM! Smart move adding lock renewal.

Adding renew_redis_lock at the start of each conversation processing loop prevents lock expiration during lengthy audio processing operations. This is exactly what you want in a distributed ETL pipeline.

Also applies to: 61-61
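
As a rough sketch of the placement being described here; the method, tracker accessor, and helper names below are illustrative, not the repository's actual code:

from dembrane.audio_lightrag.utils.echo_utils import renew_redis_lock

def transform(self) -> None:
    # One lock renewal per conversation, right before the long-running work starts.
    for conversation_id in self.process_tracker.get_conversation_ids():  # hypothetical accessor
        if not renew_redis_lock(conversation_id):
            logger.warning(f"Could not renew Redis lock for {conversation_id}; skipping")
            continue
        self.process_audio_chunks(conversation_id)  # hypothetical per-conversation step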

echo/server/dembrane/audio_lightrag/main/run_etl.py (2)

11-16: LGTM! Clean exception handling for Directus failures.

Catching DirectusException separately and returning False instead of raising allows the system to handle Directus-specific failures gracefully without crashing the entire pipeline. This is the right approach for a robust ETL system.

Also applies to: 85-87


113-113: LGTM! Good abstraction with finish_conversation.

Replacing direct Directus API calls with the centralized finish_conversation utility improves maintainability and ensures consistent conversation state updates across the codebase.

echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py (2)

19-19: LGTM! Strategic lock renewal placement.

Adding renew_redis_lock calls before processing audio and non-audio segments ensures the lock stays alive during potentially long transcription and contextualization operations. This prevents race conditions in your distributed system.

Also applies to: 67-67, 173-173


126-126: LGTM! Better error context with segment ID.

Adding the segment ID to error messages improves debuggability when things go sideways.

echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (5)

14-16: LGTM! Clean custom exception.

Simple and effective. Having a specific exception type for Directus failures enables precise error handling upstream.


23-35: LGTM! Comprehensive validation logic.

This validation covers all the bases - null responses, empty responses, error keys, and missing chunks. This is exactly the kind of defensive programming that prevents those "god level errors" mentioned in the PR.


69-82: LGTM! Safe extraction with early return.

Returning empty lists when no conversation IDs are provided prevents accidental full table scans. This is a critical safety improvement.
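
A sketch of the guard being described; the signature and return shape are assumptions made for illustration:

from typing import Any, Dict, List, Optional, Tuple

def extract(
    self, conversation_id_list: Optional[List[str]] = None
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    # Without an explicit list of IDs, return empty results instead of falling
    # through to a query that would pull every conversation from Directus.
    if not conversation_id_list:
        logger.warning("No conversation ids provided to the Directus ETL; skipping fetch")
        return [], []
    return self.fetch_conversations(conversation_id_list)  # hypothetical fetch helper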


99-106: LGTM! Proper error flow with validation.

The validation check followed by failure handling and specific exception raising creates a clean error flow. The DirectusException propagates up to run_etl.py where it's handled gracefully.


183-199: LGTM! Excellent failure tracking and cleanup.

The directus_failure method does three things right:

  1. Logs the failure with full context to processing_status
  2. Marks all affected conversations as finished to prevent stuck states
  3. Provides detailed error information for debugging

This is production-grade error handling.

cursor[bot]

This comment was marked as outdated.

- Introduced a private `_get_redis_client` function to manage Redis client instantiation.
- Improved error handling in `finish_conversation` to log failures when updating conversation status.
- Updated `renew_redis_lock` to utilize the new Redis client management function for better consistency.
cursor[bot]

This comment was marked as outdated.

…ogging

- Replaced direct call to create processing status item with a utility function `add_processing_status` for improved code clarity and maintainability.
- Updated conversation limit in the Directus query to -1 for better handling of conversation retrieval.
cursor[bot]

This comment was marked as outdated.

cursor[bot]

This comment was marked as outdated.

Copy link
Member

@spashii spashii left a comment


left comments, looks good to me.

  • when an exception is raised and processing is stopped, it makes sense to release the lock.
  • use the conversation_id and project_id parameters for add_processing_status; linking the processing log to the relevant object helps debugging. A sketch follows below.
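
A hypothetical call shape, assuming project_id is available on the conversation record at this point; the field access is illustrative:

add_processing_status(
    conversation_id=conversation_id,
    project_id=conversation.get("project_id"),  # assumed field on the Directus item
    event="directus_etl_pipeline.failed",
    message=f"Directus ETL pipeline failed for conversation: {conversation_id}",
)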

- Updated `directus_failure` method to improve error handling by logging specific conversation failures and releasing Redis locks.
- Introduced `release_redis_lock` function in `echo_utils.py` to manage Redis lock release more effectively.
- Enhanced error messages for better clarity during ETL pipeline failures.
cursor[bot]

This comment was marked as outdated.


@cursor cursor bot left a comment


Bug: Error Handling Skips Valid Segments

The continue statement at line 91 causes the current segment to be entirely skipped if an error occurs while retrieving previous segments for context. Although previous_contextual_transcript_li is set to [] (line 90) to allow processing with empty context, the continue negates this, unnecessarily skipping segments that could otherwise be processed. This leads to incomplete data and changes the previous behavior where the current segment would still be processed.

echo/server/dembrane/audio_lightrag/pipelines/contextual_chunk_etl_pipeline.py#L87-L91

    previous_contextual_transcript_li = []
except Exception as e:
    logger.warning(f"Warning: Error in getting previous segments : {e}")
    previous_contextual_transcript_li = []
    continue
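
One possible shape of the fix, dropping only the continue so the current segment still runs with an empty previous context (a sketch, not the committed change):

except Exception as e:
    logger.warning(f"Warning: Error in getting previous segments : {e}")
    # Fall back to an empty context and let the current segment proceed
    # through transcription and contextualization.
    previous_contextual_transcript_li = []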



Bug: Directus Failure Method Fails on Empty Data

The directus_failure method has two issues:

  1. It can raise a TypeError if called with conversations=None (e.g., when the Directus API returns no data), as it attempts to iterate over None.
  2. Unhandled exceptions from finish_conversation or release_redis_lock during iteration can cause the loop to terminate prematurely or skip subsequent cleanup steps for the current conversation, leading to unreleased Redis locks and unupdated processing statuses.

echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py#L183-L193

def directus_failure(self, conversations: List[Dict[str, Any]]) -> None:
    for conversation in conversations:
        conversation_id = conversation["id"]
        finish_conversation(conversation_id)
        add_processing_status(
            conversation_id=conversation_id,
            event="directus_etl_pipeline.failed",
            message=f"Directus ETL pipeline failed for conversation due to directus error: {conversation_id}",
        )
        release_redis_lock(conversation_id)




Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (3)
echo/server/dembrane/audio_lightrag/utils/echo_utils.py (1)

24-33: Past review comment properly addressed.

The error handling for finish_conversation is now robust with proper logging and exception propagation. Good stuff.
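
A rough sketch of that pattern, with the Directus update represented by a placeholder call and field name (the real module uses the project's Directus client) and a bool return as the later commits describe:

def finish_conversation(conversation_id: str) -> bool:
    """Mark a conversation as finished in Directus, logging failures instead of crashing the run."""
    try:
        # Placeholder call and field name, for illustration only.
        directus.update_item("conversation", conversation_id, {"is_finished": True})
        return True
    except Exception as e:
        logger.error(f"Failed to mark conversation {conversation_id} as finished: {e}")
        return False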

echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (2)

51-51: Past review addressed - using limit -1.

Good to see the unlimited fetch implemented as requested in the previous review.


184-192: Clean failure handling with proper status tracking.

The directus_failure method properly marks conversations as finished and logs the failure with the conversation ID as requested in the previous review. This ensures proper audit trail and prevents stuck conversations.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3bba69a and d071887.

📒 Files selected for processing (3)
  • echo/server/dembrane/audio_lightrag/main/run_etl.py (4 hunks)
  • echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (1 hunks)
  • echo/server/dembrane/audio_lightrag/utils/echo_utils.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
echo/server/dembrane/audio_lightrag/main/run_etl.py (2)
echo/server/dembrane/audio_lightrag/utils/echo_utils.py (2)
  • release_redis_lock (69-81)
  • finish_conversation (24-33)
echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (2)
  • DirectusException (15-16)
  • DirectusETLPipeline (19-192)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Cursor Bugbot
  • GitHub Check: ci-check-server
🔇 Additional comments (7)
echo/server/dembrane/audio_lightrag/main/run_etl.py (2)

11-16: LGTM! Clean abstraction.

The centralization of Redis lock management and conversation finishing into utility functions is the way. This reduces code duplication and makes the codebase more maintainable.


85-88: Lock release confirmed necessary

The directus_failure method in echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py only calls finish_conversation and does not invoke release_redis_lock. The explicit lock release in the exception handler (lines 85–88 of run_etl.py) is therefore required and non-duplicative. LGTM.

echo/server/dembrane/audio_lightrag/utils/echo_utils.py (2)

14-22: LGTM! Proper Redis client caching.

The module-level caching with lazy initialization is exactly what we need. This prevents creating new connections on every call. Ship it.


36-81: Rock solid lock management utilities.

Both renew_redis_lock and release_redis_lock are production-ready with proper race condition handling (nx=True), existence checks, and comprehensive error handling. The boolean return values allow callers to handle failures gracefully.
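
Putting the two comments above together, a minimal sketch of what such a module could look like with redis-py; the config import, key naming, and TTL are assumptions rather than the actual echo_utils implementation:

import logging
from typing import Optional

import redis

from dembrane.config import REDIS_URL  # assumed config import

logger = logging.getLogger("dembrane.audio_lightrag.echo_utils")

LOCK_TTL_SECONDS = 30 * 60  # illustrative TTL; the real value may differ

_redis_client: Optional[redis.Redis] = None  # cached at module level, created lazily


def _get_redis_client() -> redis.Redis:
    global _redis_client
    if _redis_client is None:
        _redis_client = redis.Redis.from_url(REDIS_URL)
    return _redis_client


def _lock_key(conversation_id: str) -> str:
    return f"audio_lightrag_lock_{conversation_id}"  # assumed key naming


def renew_redis_lock(conversation_id: str) -> bool:
    """Extend the conversation lock, or re-acquire it with nx=True if it has expired."""
    try:
        rd = _get_redis_client()
        key = _lock_key(conversation_id)
        if rd.exists(key):
            return bool(rd.expire(key, LOCK_TTL_SECONDS))
        # nx=True sets the key only if it is absent, so two workers cannot both acquire it.
        return bool(rd.set(key, "locked", nx=True, ex=LOCK_TTL_SECONDS))
    except Exception as e:
        logger.error(f"Failed to renew Redis lock for {conversation_id}: {e}")
        return False


def release_redis_lock(conversation_id: str) -> bool:
    """Delete the lock key; True means a key was actually removed."""
    try:
        return _get_redis_client().delete(_lock_key(conversation_id)) > 0
    except Exception as e:
        logger.error(f"Failed to release Redis lock for {conversation_id}: {e}")
        return False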

echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (3)

15-35: LGTM! Comprehensive validation logic.

The DirectusException provides proper error categorization, and validate_directus_response covers all the edge cases - null responses, API errors, and missing chunks. This is how you build resilient ETL pipelines.


78-82: Smart defensive programming.

Preventing accidental full conversation fetches by requiring an explicit list is the right call. This protects both Directus and your ETL pipeline from unintended load.


100-106: Fail fast with proper cleanup - this is the way.

Validating early in the transform and calling directus_failure for cleanup before raising the exception ensures no half-processed data remains. Clean error handling.
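
A compact sketch of that flow inside the transform step; structure and helper names are assumed for illustration:

def transform(self, conversations: List[Dict[str, Any]]) -> None:
    # Validate the Directus payload before any per-conversation work begins.
    if not self.validate_directus_response(conversations):
        # Record the failure and mark conversations finished, then fail fast.
        self.directus_failure(conversations)
        raise DirectusException("Invalid or empty response from Directus")
    for conversation in conversations:
        self.process_conversation(conversation)  # hypothetical per-item step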

except Exception as e:
    logger.warning(f"Warning: Error in getting previous segments : {e}")
    previous_contextual_transcript_li = []
    continue


Bug: Error Handling Skips Entire Segment

The continue statement on line 91 causes the entire current segment to be skipped if an error occurs while retrieving previous segments for context. This differs from the original behavior, which only skipped individual problematic previous segments. The current implementation prevents the segment from being processed (e.g., transcribed, contextualized, inserted into LightRAG), despite previous_contextual_transcript_li being correctly set to an empty list in the exception handler. The continue should be removed to allow processing to proceed with an empty previous context.


except Exception as e:
    logger.warning(f"Warning: Error in getting previous segments : {e}")
    previous_contextual_transcript_li = []
    continue


Bug: Context Retrieval Bug Affects Segment Processing

The continue statement in the previous segment retrieval exception handler incorrectly skips processing the current segment entirely when previous context cannot be fetched. The current segment should still be processed with an empty context, as previous_contextual_transcript_li is already set as a fallback. Furthermore, the batch get_items query for previous segments uses sort: ["id"], which can disrupt the chronological order of contextual transcripts, potentially degrading AI model prompt quality.
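
If the ordering concern holds, the batch query could sort on a time field rather than id; a sketch assuming the wrapper accepts a Directus-style query dict and that the segment collection carries a timestamp column:

previous_segments = directus.get_items(
    "conversation_segment",  # assumed collection name
    {
        "query": {
            "filter": {"id": {"_in": previous_segment_ids}},
            "sort": ["timestamp"],  # assumed time field; sorting by "id" need not be chronological
            "limit": -1,
        }
    },
)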


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (1)
echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (1)

184-193: Add resilient error handling for conversation cleanup

LGTM on the cleanup flow, but let's make it bulletproof.

Wrap the operations to ensure all conversations are attempted:

 def directus_failure(self, conversations: List[Dict[str, Any]]) -> None:
     for conversation in conversations:
         conversation_id = conversation["id"]
-        finish_conversation(conversation_id)
-        add_processing_status(
-            conversation_id=conversation_id,
-            event="directus_etl_pipeline.failed",
-            message=f"Directus ETL pipeline failed for conversation due to directus error: {conversation_id}",
-        )
+        try:
+            finish_conversation(conversation_id)
+            add_processing_status(
+                conversation_id=conversation_id,
+                event="directus_etl_pipeline.failed",
+                message=f"Directus ETL pipeline failed for conversation due to directus error: {conversation_id}",
+            )
+        except Exception as e:
+            logger.error(f"Failed to process failure for conversation {conversation_id}: {e}")
📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ea2e3cf and d2f9529.

📒 Files selected for processing (3)
  • echo/server/dembrane/audio_lightrag/main/run_etl.py (5 hunks)
  • echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (1 hunks)
  • echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py (8 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py (2)
echo/frontend/src/lib/directus.ts (1)
  • directus (6-14)
echo/server/dembrane/utils.py (1)
  • get (67-79)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: ci-check-server
🔇 Additional comments (10)
echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py (1)

78-83: Solid defensive programming against full DB fetches

LGTM! This prevents the "god level error" of accidentally fetching all conversations. 🚀

echo/server/dembrane/audio_lightrag/utils/lightrag_utils.py (9)

140-141: LGTM! Infinite limit makes sense for comprehensive data fetching.

The change from a fixed large number (100000) to -1 for unlimited results is solid engineering. This ensures we don't artificially cap data retrieval in the ETL pipeline.
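
Illustratively (the exact query wrapper is not shown in this excerpt), the change amounts to replacing an arbitrary cap with Directus's no-limit sentinel:

query = {
    "filter": {"conversation_id": {"_eq": conversation_id}},
    "fields": ["id", "transcript"],
    "limit": -1,  # was 100000; -1 asks Directus for all matching rows
}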


166-167: LGTM! Consistent infinite limit across the codebase.

Same excellent change here - removing arbitrary limits. This aligns perfectly with the similar changes in directus_etl_pipeline.py mentioned in the AI summary.


22-22: LGTM! Proper logger naming convention.

Clean double-quote formatting for the logger name. Consistency FTW.


65-66: LGTM! Clean query formatting and consistent quoting.

The reformatted Directus query with proper double quotes and spacing looks crisp. Good consistency with the rest of the codebase.


187-187: LGTM! Optional type annotation is properly formatted.

The Optional[Callable[[], Any]] type hint is clean and correctly formatted with double quotes throughout.


241-243: LGTM! Excellent logging clarity for distributed lock debugging.

The multi-line formatting of the lock retry logging makes it way more readable. Critical for debugging distributed systems - you'll thank yourself when troubleshooting lock contention at 3 AM.


354-355: LGTM! Directus query cleanup looks solid.

The formatted query structure is much cleaner and easier to read. Good refactoring without changing the core logic.


376-386: LGTM! Bulk conversation metadata fetching is beautifully formatted.

This dictionary comprehension with the reformatted Directus query is clean AF. The multi-line structure makes the query parameters crystal clear.


425-432: LGTM! SQL template formatting is on point.

The UPSERT_TRANSCRIPT template with consistent indentation and trailing comma looks professional. Easy to read and maintain.

Comment on lines +86 to 93
except DirectusException as e:
    logger.error(f"Directus ETL pipeline failed: {str(e)}")
    [release_redis_lock(conv_id) for conv_id in filtered_conv_ids]
    return False
except Exception as e:
    logger.error(f"Directus ETL pipeline failed: {str(e)}")
    [release_redis_lock(conv_id) for conv_id in filtered_conv_ids]
    raise
Contributor


⚠️ Potential issue

Inconsistent error handling between DirectusException and generic exceptions

LGTM on the lock cleanup, but the error handling is inconsistent. DirectusException returns False while other exceptions re-raise. This could mask critical failures.

Consider unified error handling:

 except DirectusException as e:
     logger.error(f"Directus ETL pipeline failed: {str(e)}")
     [release_redis_lock(conv_id) for conv_id in filtered_conv_ids]
-    return False
+    raise

Or if DirectusException is truly recoverable, document why it's treated differently.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

except DirectusException as e:
    logger.error(f"Directus ETL pipeline failed: {str(e)}")
    [release_redis_lock(conv_id) for conv_id in filtered_conv_ids]
    raise
except Exception as e:
    logger.error(f"Directus ETL pipeline failed: {str(e)}")
    [release_redis_lock(conv_id) for conv_id in filtered_conv_ids]
    raise
🤖 Prompt for AI Agents
In echo/server/dembrane/audio_lightrag/main/run_etl.py around lines 86 to 93,
the error handling for DirectusException returns False while other exceptions
are re-raised, causing inconsistency and potential masking of critical errors.
To fix this, unify the error handling by either re-raising both exceptions after
logging and releasing locks or clearly document why DirectusException is
recoverable and handled differently. Adjust the code accordingly to ensure
consistent and clear error management.

Comment on lines +24 to +35
def validate_directus_response(self, response_list: List[Dict[str, Any]]) -> bool:
    if response_list is None or len(response_list) == 0:
        logger.warning("No response from Directus")
        return False
    for response in response_list:
        if "error" in response.keys():
            logger.warning(f"Directus Error: {response['error']}")
            return False
        if "chunks" in response.keys() and len(response["chunks"]) == 0:
            logger.warning(f"No chunks found for conversation: {response['id']}")
            return False
    return True
Contributor


⚠️ Potential issue

Potential AttributeError on None response items

LGTM on the validation approach, but watch out for None items in the response list.

Add None check for individual responses:

 for response in response_list:
+    if response is None:
+        logger.warning("None response item in Directus response")
+        return False
     if "error" in response.keys():
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

def validate_directus_response(self, response_list: List[Dict[str, Any]]) -> bool:
    if response_list is None or len(response_list) == 0:
        logger.warning("No response from Directus")
        return False
    for response in response_list:
        if response is None:
            logger.warning("None response item in Directus response")
            return False
        if "error" in response.keys():
            logger.warning(f"Directus Error: {response['error']}")
            return False
        if "chunks" in response.keys() and len(response["chunks"]) == 0:
            logger.warning(f"No chunks found for conversation: {response['id']}")
            return False
    return True
🤖 Prompt for AI Agents
In echo/server/dembrane/audio_lightrag/pipelines/directus_etl_pipeline.py around
lines 24 to 35, the method validate_directus_response does not check if
individual items in response_list are None, which can cause AttributeError when
accessing keys. Add a check inside the loop to skip or handle None response
items before accessing their keys to prevent errors.

@spashii spashii merged commit 6f3d7b5 into main Jul 25, 2025
12 checks passed
@spashii spashii deleted the fix/hot-fix-ETL-pipelin branch October 30, 2025 12:02
spashii added a commit that referenced this pull request Nov 18, 2025
* Added logic to renew the Redis lock
* Added logic to finish the conversation when the data coming from Directus is incorrect

---
* Refactor ETL pipelines to improve error handling and Redis lock management

- Introduced `finish_conversation` and `renew_redis_lock` utility functions for better conversation state management and Redis lock handling.
- Enhanced `DirectusETLPipeline` with improved error logging and response validation.
- Updated `AudioETLPipeline` and `ContextualChunkETLPipeline` to utilize the new utility functions, ensuring Redis locks are maintained during processing.
- Added specific error handling for Directus ETL failures to improve robustness and logging clarity.

* Enhance error handling and Redis client management in echo_utils.py

- Introduced a private `_get_redis_client` function to manage Redis client instantiation.
- Improved error handling in `finish_conversation` to log failures when updating conversation status.
- Updated `renew_redis_lock` to utilize the new Redis client management function for better consistency.

* Refactor Directus ETL pipeline error handling and processing status logging

- Replaced direct call to create processing status item with a utility function `add_processing_status` for improved code clarity and maintainability.
- Updated conversation limit in the Directus query to -1 for better handling of conversation retrieval.

* Refactor Directus ETL pipeline error handling and Redis lock management

- Updated `directus_failure` method to improve error handling by logging specific conversation failures and releasing Redis locks.
- Introduced `release_redis_lock` function in `echo_utils.py` to manage Redis lock release more effectively.
- Enhanced error messages for better clarity during ETL pipeline failures.

* Brought all lock handling in one file

* changed finish_conversation to bool for stopping interrupt in case of directus fail

* push updates, ready for release

* add finish conversation call

---------

Co-authored-by: roy <aroy@gmail.com>
Co-authored-by: Sameer Pashikanti <sameer@dembrane.com>
