-
Notifications
You must be signed in to change notification settings - Fork 582
UN-2663 [FEAT] moving away from redis hitl file handling improvements #1550
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
UN-2663 [FEAT] moving away from redis hitl file handling improvements #1550
Conversation
…dling-improvements
…dling-improvements
Summary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings. WalkthroughAdds HITL support: destination propagates Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Client
participant Destination as endpoint_v2.destination
participant QueueUtils
participant HITLConnector as HITL_Connector
participant Queue as Queue_Instance
Client->>Destination: enqueue request
Destination->>Destination: resolve organization_id (UserContext)
Destination->>QueueUtils: get_queue_inst({use_hitl_backend: true, organization_id})
alt HITL path
QueueUtils->>QueueUtils: get_hitl_queue_inst(backend, settings)
alt backend == PostgreSQL or Hybrid
QueueUtils->>QueueUtils: _import_hitl_connector(name)
QueueUtils->>HITLConnector: instantiate(settings)
HITLConnector-->>QueueUtils: return Queue_Instance
else backend == Redis
QueueUtils->>QueueUtils: clear HITL flag and delegate to standard get_queue_inst
else unknown/missing
QueueUtils-->>Destination: raise UnstractQueueException
end
else non-HITL path
QueueUtils->>QueueUtils: standard get_queue_inst flow
end
QueueUtils-->>Destination: Queue_Instance
Destination->>Queue: enqueue(payload, ttl?, actor_id=None, organization_id)
Queue-->>Destination: enqueue result
Destination-->>Client: acknowledgement
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (4)
unstract/filesystem/src/unstract/filesystem/file_storage_config.py (1)
31-32: Confirm SHARED_TEMPORARY aligns with HITL data handling.If HITL artifacts may include PII, confirm SHARED_TEMPORARY meets isolation, retention, and encryption expectations; otherwise consider a dedicated storage tier/bucket/namespace.
backend/workflow_manager/endpoint_v2/queue_utils.py (3)
28-41: Avoid mutable default for connector_settingsUse None default and initialize inside to satisfy linters and prevent shared state.
- def get_queue_inst(connector_settings: dict[str, Any] = {}) -> UnstractQueue: + def get_queue_inst(connector_settings: dict[str, Any] | None = None) -> UnstractQueue: + if connector_settings is None: + connector_settings = {} """Get queue connector instance based on configuration.
53-56: Avoid mutable default for connector_settings (HITL path)Same fix for get_hitl_queue_inst.
- def get_hitl_queue_inst( - backend: str, connector_settings: dict[str, Any] = {} - ) -> UnstractQueue: + def get_hitl_queue_inst( + backend: str, connector_settings: dict[str, Any] | None = None + ) -> UnstractQueue: + if connector_settings is None: + connector_settings = {}
95-109: Prefer logger.exception and preserve traceback withfrom eImproves observability and complies with linter hints.
- except ImportError as e: - logger.error( - f"HITL queue backend '{backend}' not available: {e}. " - f"Make sure 'pluggable_apps.manual_review_v2' is installed and configured." - ) - raise UnstractQueueException( - detail=f"HITL queue backend '{backend}' not available. " - f"Please install the manual_review_v2 app or use 'redis' backend." - ) - except Exception as e: - logger.error(f"Failed to initialize HITL queue backend '{backend}': {e}") - raise UnstractQueueException( - detail=f"Failed to initialize HITL queue backend '{backend}': {str(e)}" - ) + except ImportError as e: + logger.exception( + "HITL queue backend '%s' not available: %s. Make sure 'pluggable_apps.manual_review_v2' is installed and configured.", + backend, e + ) + raise UnstractQueueException( + detail=f"HITL queue backend '{backend}' not available. " + f"Please install the manual_review_v2 app or use 'redis' backend." + ) from e + except Exception as e: + logger.exception("Failed to initialize HITL queue backend '%s': %s", backend, e) + raise UnstractQueueException( + detail=f"Failed to initialize HITL queue backend '{backend}': {str(e)}" + ) from e
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (4)
backend/workflow_manager/endpoint_v2/destination.py(2 hunks)backend/workflow_manager/endpoint_v2/queue_utils.py(4 hunks)unstract/filesystem/src/unstract/filesystem/file_storage_config.py(1 hunks)unstract/filesystem/src/unstract/filesystem/file_storage_types.py(1 hunks)
🧰 Additional context used
🪛 Ruff (0.13.1)
backend/workflow_manager/endpoint_v2/queue_utils.py
54-54: Do not use mutable data structures for argument defaults
Replace with None; initialize within function
(B006)
96-99: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
100-103: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
104-104: Do not catch blind exception: Exception
(BLE001)
105-105: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
106-108: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
107-107: Use explicit conversion flag
Replace with conversion flag
(RUF010)
141-143: Avoid specifying long messages outside the exception class
(TRY003)
147-149: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (2)
unstract/filesystem/src/unstract/filesystem/file_storage_types.py (1)
7-7: Add brief usage note or doc where this enum is documented.LGTM. Please ensure HITL_FILES is referenced in any docs/samples that enumerate FileStorageType values.
backend/workflow_manager/endpoint_v2/destination.py (1)
802-816: Verify HITL queue connector API supports enqueue_with_ttl; prefer self.organization_id
- UnstractQueue does not declare enqueue_with_ttl and the repo has no implementations — confirm pluggable HITL connectors (HybridQueue/PostgreSQLQueue) expose enqueue_with_ttl(queue_name, message, ttl_seconds=None, actor_id=None) or add the method/guard the call to avoid AttributeError.
- Replace re-fetch of org id with self.organization_id in backend/workflow_manager/endpoint_v2/destination.py (occurrences near lines ~803 and ~864):
- organization_id = UserContext.get_organization_identifier() + organization_id = self.organization_id
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
backend/workflow_manager/endpoint_v2/queue_utils.py (1)
54-78: Avoid mutable default for connector_settingsUsing
{}as a default argument means every call without an explicit settings dict shares the same object, which can lead to surprising cross-call mutations if a connector modifies the dict. Please default toNoneand instantiate inside the function.- backend: str, connector_settings: dict[str, Any] = {} + backend: str, connector_settings: dict[str, Any] | None = None ) -> UnstractQueue: @@ - if backend == "redis": + connector_settings = connector_settings or {} + if backend == "redis": # Strip HITL flag to force default (non-HITL) connector path non_hitl_settings = dict(connector_settings)Based on static analysis hints.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (2)
backend/sample.env(1 hunks)backend/workflow_manager/endpoint_v2/queue_utils.py(3 hunks)
🧰 Additional context used
🪛 Ruff (0.13.1)
backend/workflow_manager/endpoint_v2/queue_utils.py
25-25: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
28-28: Do not use mutable data structures for argument defaults
Replace with None; initialize within function
(B006)
54-54: Do not use mutable data structures for argument defaults
Replace with None; initialize within function
(B006)
99-102: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
103-106: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
107-107: Do not catch blind exception: Exception
(BLE001)
108-108: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
109-111: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
110-110: Use explicit conversion flag
Replace with conversion flag
(RUF010)
142-144: Avoid specifying long messages outside the exception class
(TRY003)
148-150: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (1)
backend/sample.env (1)
204-204: LGTM: HITL storage credentials sample entryThanks for wiring up the sample credentials; this keeps the sample env aligned with the new HITL_FILES storage configuration.
…dling-improvements
chandrasekharan-zipstack
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT comments can be resolved later
jaseemjaskp
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…dling-improvements Signed-off-by: vishnuszipstack <117254672+vishnuszipstack@users.noreply.github.com>
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (1)
backend/sample.env (1)
204-204: Consider adding a documentation comment.The new
HITL_FILES_FILE_STORAGE_CREDENTIALSenvironment variable lacks an explanatory comment. Adding a brief comment (similar to those for other sections like "X2Text Service" or "Prompt Studio") would improve clarity for developers.Example:
+# HITL (Human-In-The-Loop) Files Storage HITL_FILES_FILE_STORAGE_CREDENTIALS='{"provider": "minio", "credentials": {"endpoint_url": "http://unstract-minio:9000", "key": "minio", "secret": "minio123"}}'Operational note: While reusing the same MinIO instance and credentials is appropriate for this sample configuration, production deployments should consider isolating HITL file storage with separate credentials or dedicated storage to enhance security boundaries and enable independent scaling/lifecycle management.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (2)
backend/sample.env(1 hunks)backend/workflow_manager/endpoint_v2/destination.py(2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
|
…dling-improvements
…dling-improvements
|
…dling-improvements
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In @backend/workflow_manager/endpoint_v2/queue_utils.py:
- Line 54: The function parameter uses a mutable default (connector_settings:
dict[str, Any] = {}) which can cause shared-state bugs; change the signature to
accept connector_settings: Optional[dict[str, Any]] = None (add typing.Optional
import) and inside the function set connector_settings = {} if
connector_settings is None; apply the same change to the related get_queue_inst
function if it has the same pattern so both use None-check initialization.
- Line 28: The function get_queue_inst currently uses a mutable default
connector_settings: dict[str, Any] = {}, which can lead to shared-state bugs;
change the signature to accept connector_settings: dict[str, Any] | None = None
(or Optional[dict[str, Any]] = None) and inside get_queue_inst set
connector_settings = connector_settings or {} before using it so each call gets
a fresh dict; update any callers or tests if they relied on the old default
behavior.
- Around line 24-25: The class-level mutable registry _hitl_connectors should be
annotated as a typing.ClassVar[dict] and protected with a class-level
threading.Lock to avoid race conditions during lazy initialization; add a
ClassVar lock (e.g., _hitl_connectors_lock) and in _import_hitl_connector and
get_queue_inst wrap reads/writes to _hitl_connectors with that lock (acquire
before checking/initializing an entry and release after), ensuring only one
thread performs the import/assignment while others wait.
🧹 Nitpick comments (2)
backend/workflow_manager/endpoint_v2/queue_utils.py (2)
98-111: Improve exception handling patterns.The exception handling can be enhanced by following Python best practices:
- Use
logging.exceptioninstead oflogging.errorfor exceptions (provides automatic traceback)- Chain exceptions with
from eorfrom Noneto preserve exception context- The broad
Exceptioncatch is acceptable here but could be more specific♻️ Proposed improvements
except ImportError as e: - logger.error( + logger.exception( f"HITL queue backend '{backend}' not available: {e}. " f"Make sure 'pluggable_apps.manual_review_v2' is installed and configured." ) raise UnstractQueueException( detail=f"HITL queue backend '{backend}' not available. " f"Please install the manual_review_v2 app or use 'redis' backend." - ) + ) from e except Exception as e: - logger.error(f"Failed to initialize HITL queue backend '{backend}': {e}") + logger.exception(f"Failed to initialize HITL queue backend '{backend}'") raise UnstractQueueException( - detail=f"Failed to initialize HITL queue backend '{backend}': {str(e)}" - ) + detail=f"Failed to initialize HITL queue backend '{backend}': {e!s}" + ) from eBased on static analysis hints.
141-150: Consider extracting exception messages to constants (optional).The static analysis suggests avoiding long inline exception messages. While the current approach is acceptable and provides context-specific messages, you could optionally extract these to constants or the exception class for better maintainability.
♻️ Optional refactor
At the top of the file:
# HITL error messages _HITL_CONNECTORS_NOT_AVAILABLE = ( "HITL connectors not available. " "Make sure 'pluggable_apps.manual_review_v2' is installed." ) _HITL_CONNECTOR_UNKNOWN_FMT = ( "Unknown HITL connector: {connector}. Available: {available}" )Then use in the method:
if not cls._hitl_connectors: - raise ImportError( - "HITL connectors not available. Make sure 'pluggable_apps.manual_review_v2' is installed." - ) + raise ImportError(_HITL_CONNECTORS_NOT_AVAILABLE) if connector_name not in cls._hitl_connectors: available_connectors = list(cls._hitl_connectors.keys()) - raise ImportError( - f"Unknown HITL connector: {connector_name}. Available: {available_connectors}" - ) + raise ImportError( + _HITL_CONNECTOR_UNKNOWN_FMT.format( + connector=connector_name, + available=available_connectors + ) + )Based on static analysis hints, though the current inline approach is also acceptable.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (3)
backend/sample.envbackend/workflow_manager/endpoint_v2/destination.pybackend/workflow_manager/endpoint_v2/queue_utils.py
🚧 Files skipped from review as they are similar to previous changes (2)
- backend/workflow_manager/endpoint_v2/destination.py
- backend/sample.env
🧰 Additional context used
🧬 Code graph analysis (1)
backend/workflow_manager/endpoint_v2/queue_utils.py (2)
unstract/connectors/src/unstract/connectors/queues/unstract_queue.py (1)
UnstractQueue(9-136)backend/workflow_manager/endpoint_v2/exceptions.py (1)
UnstractQueueException(106-113)
🪛 Ruff (0.14.10)
backend/workflow_manager/endpoint_v2/queue_utils.py
25-25: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
28-28: Do not use mutable data structures for argument defaults
Replace with None; initialize within function
(B006)
54-54: Do not use mutable data structures for argument defaults
Replace with None; initialize within function
(B006)
99-102: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
103-106: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
107-107: Do not catch blind exception: Exception
(BLE001)
108-108: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
109-111: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
110-110: Use explicit conversion flag
Replace with conversion flag
(RUF010)
142-144: Avoid specifying long messages outside the exception class
(TRY003)
148-150: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (1)
backend/workflow_manager/endpoint_v2/queue_utils.py (1)
89-96: Verify unknown backend fallback behavior.When an unknown HITL backend is specified, the code logs a warning but silently falls back to
hybrid. This could mask configuration errors (e.g., typos like "postgersql"). Consider whether failing fast would be more appropriate, or if the graceful degradation is intentional for forward compatibility.You may want to verify this behavior aligns with your operational requirements. If strict validation is preferred, consider raising an exception instead of falling back.
…dling-improvements
…edis-hitl-file-handling-improvements
…itl-file-handling-improvements' into UN-2663-moving-away-from-redis-hitl-file-handling-improvements
…dling-improvements
Test ResultsSummary
Runner Tests - Full Report
SDK1 Tests - Full Report
|
|



What
Why
How
Can this PR break any existing features. If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)
Database Migrations
Env Config
Relevant Docs
Related Issues or PRs
Dependencies Versions
Notes on Testing
Screenshots
Checklist
I have read and understood the Contribution Guidelines.