UN-2807 [FEAT] Add packet processing support for HITL workflows #1541
Conversation
- Added packet_id parameter to API execution flow
- Integrated packet-based routing in destination connector
- Enhanced queue management to support packet-based review alongside traditional HITL queues
- Updated DTOs and serializers to include packet_id field

This enables workflows to be grouped into packets for batch review in HITL scenarios.

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
Summary by CodeRabbit: Release Notes
Walkthrough

Adds an optional hitl_packet_id field and constant, validates and gates it at the API layer, and propagates it through DeploymentHelper → WorkflowHelper → DestinationConnector into worker flows and queueing logic to enable packet-based HITL routing while preserving existing behavior when absent.
Sequence Diagram(s)

```mermaid
sequenceDiagram
autonumber
participant Client
participant API_View
participant DeploymentHelper
participant WorkflowHelper
participant DestinationConnector
participant PacketQueue
participant HITL_Queue
participant Std_Queue
Client->>API_View: POST /execute (files, hitl_packet_id?)
API_View->>DeploymentHelper: execute_workflow(..., hitl_packet_id)
DeploymentHelper->>WorkflowHelper: execute_workflow_async(..., hitl_packet_id)
WorkflowHelper->>DestinationConnector: init(..., packet_id=hitl_packet_id)
Note right of DestinationConnector: Decision: packet_id takes precedence
alt packet_id present
DestinationConnector->>PacketQueue: enqueue_to_packet(packet_id, payload)
PacketQueue-->>DestinationConnector: ack / err
else hitl_queue present
DestinationConnector->>HITL_Queue: enqueue_manual_review(payload)
HITL_Queue-->>DestinationConnector: ack / err
else
DestinationConnector->>Std_Queue: enqueue_standard(payload)
Std_Queue-->>DestinationConnector: ack / err
end
```
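As an illustration of the client-facing change, here is a minimal sketch of an execute request that opts into packet routing; the URL, auth header, and file field name are placeholders, and only `hitl_packet_id` comes from this PR:

```python
import requests

# Hypothetical endpoint/credentials; substitute your actual API deployment URL and key.
with open("invoice.pdf", "rb") as f:
    response = requests.post(
        "https://unstract.example.com/deployment/api/my-org/my-api/",
        headers={"Authorization": "Bearer <API_KEY>"},
        files={"files": f},
        # Groups this document into a review packet instead of a plain HITL queue
        data={"hitl_packet_id": "packet-2024-10-batch-01"},
    )
print(response.status_code, response.text)
```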
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Actionable comments posted: 4
🧹 Nitpick comments (4)
backend/workflow_manager/endpoint_v2/dto.py (1)
102-109: Avoid serializing packet_id when None. Prevents emitting `"packet_id": null` where consumers might treat null differently.
Apply:
```diff
-        return {
+        payload = {
             "workflow_id": str(self.workflow_id),
             "execution_id": str(self.execution_id),
             "use_file_history": self.use_file_history,
             "file_execution_id": file_execution_id,
             "hitl_queue_name": self.hitl_queue_name,
-            "packet_id": self.packet_id,
         }
+        if self.packet_id is not None:
+            payload["packet_id"] = self.packet_id
+        return payload
```
backend/api_v2/deployment_helper.py (2)
146-160: Signature extension — OK; avoid mutable default for tag_names. Use a None sentinel to prevent shared-list bugs.
```diff
-        tag_names: list[str] = [],
+        tag_names: list[str] | None = None,
```
And initialize before use:
```diff
-        tags = Tag.bulk_get_or_create(tag_names=tag_names)
+        if tag_names is None:
+            tag_names = []
+        tags = Tag.bulk_get_or_create(tag_names=tag_names)
```
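For context, the shared-list bug the None sentinel avoids, in a standalone example:

```python
def add_tag(tag, tags=[]):  # the default list is created once and shared across calls
    tags.append(tag)
    return tags

print(add_tag("a"))  # ['a']
print(add_tag("b"))  # ['a', 'b'] because state leaked from the first call
```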
166-173: Docstring nit: pluralize file_obj.
```diff
-            file_obj (UploadedFile): input file
+            file_objs (list[UploadedFile]): input files
```
backend/workflow_manager/endpoint_v2/destination.py (1)
783-786: Include packet_id in from_config log. Improves traceability when reconstructing connectors.
```diff
-        logger.info(
-            f"Creating DestinationConnector from config: hitl_queue_name={config.hitl_queue_name}"
-        )
+        logger.info(
+            f"Creating DestinationConnector from config: hitl_queue_name={config.hitl_queue_name}, packet_id={config.packet_id}"
+        )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (5)
- backend/api_v2/api_deployment_views.py (2 hunks)
- backend/api_v2/constants.py (1 hunks)
- backend/api_v2/deployment_helper.py (4 hunks)
- backend/workflow_manager/endpoint_v2/destination.py (8 hunks)
- backend/workflow_manager/endpoint_v2/dto.py (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (6)
backend/api_v2/api_deployment_views.py (2)
89-91: Plumbing to DeploymentHelper — LGTM. Parameter is forwarded correctly.
73-74: Define precedence when both hitl_queue_name and packet_id are supplied. Current flow accepts both; downstream routing prefers packet, which can make logs/messaging misleading — make precedence explicit at the API edge.
Apply (packet_id wins, HITL ignored):
```diff
         hitl_queue_name = serializer.validated_data.get(ApiExecution.HITL_QUEUE_NAME)
         packet_id = serializer.validated_data.get(ApiExecution.PACKET_ID)
+
+        # Disallow ambiguous routing; prefer packet routing when both provided
+        if hitl_queue_name and packet_id:
+            logger.warning(
+                "Both hitl_queue_name and packet_id supplied; routing to packet queue and ignoring hitl_queue_name."
+            )
+            hitl_queue_name = None
```
Confirm intended precedence per UN-2807 or prefer returning 400 on conflict.
backend/workflow_manager/endpoint_v2/destination.py (3)
68-69: packet_id plumbed into instance — LGTM.
Also applies to: 87-88
9-9: Missing/enterprise-only import — guard or provide implementation. backend/workflow_manager/endpoint_v2/destination.py imports `PacketQueueUtils` (line 9) and calls `PacketQueueUtils.enqueue_to_packet` (lines 892, 954), but pluggable_apps/manual_review_v2/packet_queue_utils.py is not present in this repo; add the OSS implementation or wrap the import/usage with try/except (lazy-load) or a feature flag to avoid breaking OSS builds.
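A guarded-import sketch along the lines suggested (the module path is from the comment above; the wrapper and its fallback behavior are assumptions, not the shipped implementation):

```python
try:
    from pluggable_apps.manual_review_v2.packet_queue_utils import PacketQueueUtils
except ImportError:  # OSS build without the enterprise plugin
    PacketQueueUtils = None


def enqueue_to_packet_safe(packet_id: str, queue_result: dict) -> bool:
    """Enqueue to a packet queue only when the enterprise plugin is installed."""
    if PacketQueueUtils is None:
        raise NotImplementedError(
            "Packet-based HITL routing requires the enterprise plugin."
        )
    return PacketQueueUtils.enqueue_to_packet(
        packet_id=packet_id, queue_result=queue_result
    )
```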
889-900: Verify PacketQueueUtils.enqueue_to_packet parameter type
- backend/workflow_manager/endpoint_v2/destination.py:892 passes queue_result (dict via .to_dict()); line 954 passes queue_result_obj (QueueResult instance). pluggable_apps/manual_review_v2/packet_queue_utils.py is not present in the repo — confirm enqueue_to_packet's expected type and either update it to accept QueueResult or make the call at line 954 use queue_result_obj.to_dict() for consistency.
backend/workflow_manager/endpoint_v2/dto.py (1)
95-96: DestinationConfig.packet_id added — confirm external consumers expect JSON null vs absent key. to_json now emits `"packet_id": self.packet_id` (None → JSON null); internal code treats None as falsy (see backend/workflow_manager/endpoint_v2/dto.py, backend/workflow_manager/endpoint_v2/destination.py, backend/workflow_manager/workflow_v2/workflow_helper.py, backend/workflow_manager/workflow_v2/file_execution_tasks.py). Verify external integrations / API clients expect null; if not, omit the key when None.
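The distinction consumers see, in miniature:

```python
import json

print(json.dumps({"packet_id": None}))  # {"packet_id": null}
print(json.dumps({}))                   # {} (key absent)

# A client checking `"packet_id" in payload` treats these two cases differently,
# even though both mean "no packet" to the producer.
```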
Left some minor comments; apart from that, I have these concerns:
- What's the difference between this `packet_id` and `hitl_queue_name`? If it's only to group API deployment calls, can't we use existing params like `tag` or simply send it to a given `hitl_queue_name`?
- Check the CodeRabbit comments to see if they're valid
- Make sure to raise a PR and keep it ready in the docs repo; it should be merged when this feature goes in
Signed-off-by: jagadeeswaran-zipstack <jagadeeswaran@zipstack.com>
Actionable comments posted: 0
♻️ Duplicate comments (2)
backend/workflow_manager/endpoint_v2/destination.py (2)
178-186: Verify packet_id takes precedence over HITL queue routing. A previous review comment suggested that packet routing should short-circuit before HITL to avoid misleading logs. Currently, `hitl_queue_name` is checked first (line 167), then `packet_id` (line 178). If `packet_id` should take precedence, swap these blocks. Based on past review comments, the intended precedence order should be verified. If packet-based routing is more specific than HITL queue routing, consider reordering:
```diff
-        if self.hitl_queue_name:
-            logger.info(f"API HITL override: pushing to queue for file {file_name}")
-            self._push_data_to_queue(
-                file_name=file_name,
-                workflow=workflow,
-                input_file_path=input_file_path,
-                file_execution_id=file_execution_id,
-            )
-            logger.info(f"Successfully pushed {file_name} to HITL queue")
-            return True
-
         if self.packet_id:
             self._push_data_to_queue(
                 file_name=file_name,
                 workflow=workflow,
                 input_file_path=input_file_path,
                 file_execution_id=file_execution_id,
             )
             logger.info(f"Successfully pushed {file_name} to packet queue")
             return True
+
+        if self.hitl_queue_name:
+            logger.info(f"API HITL override: pushing to queue for file {file_name}")
+            self._push_data_to_queue(
+                file_name=file_name,
+                workflow=workflow,
+                input_file_path=input_file_path,
+                file_execution_id=file_execution_id,
+            )
+            logger.info(f"Successfully pushed {file_name} to HITL queue")
+            return True
```
952-962: Bug: Type mismatch in non-API packet enqueue. Line 955 passes `queue_result_obj` (QueueResult object) instead of `queue_result` (dict from line 941). This is inconsistent with the API path (line 893), which passes the dict. `PacketQueueUtils.enqueue_to_packet` likely expects a dict for serialization. Apply this diff to fix the type mismatch:
```diff
         # Check if this is a packet-based execution
         if self.packet_id:
             # Route to packet queue instead of regular HITL queue
             success = PacketQueueUtils.enqueue_to_packet(
-                packet_id=self.packet_id, queue_result=queue_result_obj
+                packet_id=self.packet_id, queue_result=queue_result
             )
             if not success:
                 error_msg = f"Failed to push {file_name} to packet {self.packet_id}"
                 logger.error(error_msg)
                 raise RuntimeError(error_msg)
             return
```
🧹 Nitpick comments (1)
backend/api_v2/serializers.py (1)
227-227: Add hitl_packet_id format validation. No existing format checks for `hitl_packet_id` in PacketQueueUtils or serializers. Mirror `validate_hitl_queue_name` by implementing `validate_hitl_packet_id` in `ExecutionRequestSerializer` to enforce the expected pattern (e.g., alphanumeric, length limits).
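A sketch of what that mirror could look like as a method on ExecutionRequestSerializer (the pattern and length limit are assumptions; align them with what PacketQueueUtils actually accepts):

```python
import re

from rest_framework import serializers

# Hypothetical format: letters, digits, hyphen, underscore, up to 64 chars.
PACKET_ID_PATTERN = re.compile(r"^[A-Za-z0-9_-]{1,64}$")


def validate_hitl_packet_id(self, value: str | None) -> str | None:
    """Reject packet IDs that don't match the expected format."""
    if not value:  # None/blank: packet routing was not requested
        return value
    if not PACKET_ID_PATTERN.match(value):
        raise serializers.ValidationError(
            "hitl_packet_id must be 1-64 characters: letters, digits, '-' or '_'."
        )
    return value
```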
📜 Review details
📒 Files selected for processing (6)
- backend/api_v2/api_deployment_views.py (2 hunks)
- backend/api_v2/constants.py (1 hunks)
- backend/api_v2/deployment_helper.py (3 hunks)
- backend/api_v2/serializers.py (1 hunks)
- backend/workflow_manager/endpoint_v2/destination.py (9 hunks)
- backend/workflow_manager/workflow_v2/workflow_helper.py (8 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- backend/api_v2/constants.py
- backend/api_v2/deployment_helper.py
🔇 Additional comments (11)
backend/api_v2/api_deployment_views.py (2)
73-73: LGTM! Consistent parameter extraction. The extraction of `hitl_packet_id` follows the same pattern as other optional parameters like `hitl_queue_name` and `llm_profile_id`.
89-89: LGTM! Parameter propagation is consistent. The `hitl_packet_id` is correctly passed through to `DeploymentHelper.execute_workflow`, maintaining consistency with the parameter naming convention.
backend/workflow_manager/endpoint_v2/destination.py (5)
9-9: LGTM! Import added for packet routing. The `PacketQueueUtils` import is necessary to support packet-based queue routing.
68-68: LGTM! Constructor parameter added correctly. The `packet_id` parameter is properly added to the constructor signature and stored as an instance variable, following the same pattern as `hitl_queue_name`.
Also applies to: 87-87
765-765: LGTM! Configuration serialization includes packet_id. The `packet_id` is correctly included in `get_config()` and `from_config()`, ensuring proper serialization and deserialization of the connector configuration.
Also applies to: 794-794
854-857: Handling of packet processing without a result looks correct. When `packet_id` is present but no result is available, the code uses a placeholder result instead of returning early. This ensures packet-based executions can proceed even without initial results.
890-900: LGTM! Packet routing for API path uses correct dict type. Line 893 passes `queue_result` (dict) to `PacketQueueUtils.enqueue_to_packet`, which is consistent with how it's constructed on lines 876-885 using `QueueResult(...).to_dict()`.
backend/workflow_manager/workflow_v2/workflow_helper.py (4)
67-67: LGTM! Excluded params updated correctly. Adding `"hitl_packet_id"` to `EXECUTION_EXCLUDED_PARAMS` prevents it from being passed to `create_workflow_execution` (line 671), which is correct since packet_id is a routing parameter, not an execution attribute.
271-271: LGTM! Parameter added to run_workflow and propagated to DestinationConnector. The `packet_id` parameter is correctly added to the `run_workflow` signature and passed to the `DestinationConnector` constructor, completing the propagation chain.
Also applies to: 300-300
442-442: LGTM! Async execution path updated correctly. The `hitl_packet_id` parameter is properly added to `execute_workflow_async`, documented in the docstring, and passed through kwargs to the celery task.
Also applies to: 456-456, 483-483
687-687: LGTM! Parameter extraction and propagation in execute_workflow. The `hitl_packet_id` is correctly extracted from kwargs, logged for debugging, and passed to `run_workflow` as `packet_id`, maintaining naming consistency.
Also applies to: 689-689, 702-702
Signed-off-by: jagadeeswaran-zipstack <jagadeeswaran@zipstack.com>
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/workflow_manager/endpoint_v2/destination.py (1)
173-192: Critical: Packet routing precedence is still incorrect. Despite the past review comment being marked as "Addressed", the precedence order remains incorrect. The `packet_id` check (lines 184-192) still comes AFTER the `hitl_queue_name` check (lines 173-182). Per the previous review, packet-based routing should take precedence and short-circuit before HITL queue routing to ensure correct behavior when both are specified. Apply this diff to fix the precedence:
```diff
-        if self.hitl_queue_name:
-            logger.info(f"API HITL override: pushing to queue for file {file_name}")
-            self._push_data_to_queue(
-                file_name=file_name,
-                workflow=workflow,
-                input_file_path=input_file_path,
-                file_execution_id=file_execution_id,
-            )
-            logger.info(f"Successfully pushed {file_name} to HITL queue")
-            return True
-
         if self.packet_id:
             self._push_data_to_queue(
                 file_name=file_name,
                 workflow=workflow,
                 input_file_path=input_file_path,
                 file_execution_id=file_execution_id,
             )
             logger.info(f"Successfully pushed {file_name} to packet queue")
             return True
+
+        if self.hitl_queue_name:
+            logger.info(f"API HITL override: pushing to queue for file {file_name}")
+            self._push_data_to_queue(
+                file_name=file_name,
+                workflow=workflow,
+                input_file_path=input_file_path,
+                file_execution_id=file_execution_id,
+            )
+            logger.info(f"Successfully pushed {file_name} to HITL queue")
+            return True
```
♻️ Duplicate comments (1)
backend/workflow_manager/endpoint_v2/destination.py (1)
957-967: Critical: Inconsistent payload type in non-API packet enqueue path. Line 961 passes `queue_result_obj` (a QueueResult object) to `PacketQueueUtils.enqueue_to_packet`, while the API path (line 897) correctly passes `queue_result` (a dict). This inconsistency was flagged in a previous review and remains unresolved. The non-API path should pass the dict representation for consistency and to avoid serialization/type errors. Apply this diff to fix the payload type:
```diff
         # Check if this is a packet-based execution
         if self.packet_id:
             # Route to packet queue instead of regular HITL queue
             success = PacketQueueUtils.enqueue_to_packet(
-                packet_id=self.packet_id, queue_result=queue_result_obj
+                packet_id=self.packet_id, queue_result=queue_result
             )
             if not success:
                 error_msg = f"Failed to push {file_name} to packet {self.packet_id}"
                 logger.error(error_msg)
                 raise RuntimeError(error_msg)
             return
```
📜 Review details
📒 Files selected for processing (4)
- backend/api_v2/api_deployment_views.py (2 hunks)
- backend/api_v2/serializers.py (1 hunks)
- backend/workflow_manager/endpoint_v2/destination.py (9 hunks)
- backend/workflow_manager/workflow_v2/workflow_helper.py (8 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/api_v2/api_deployment_views.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (10)
backend/workflow_manager/endpoint_v2/destination.py (5)
66-94: LGTM: packet_id parameter added consistently. The addition of the `packet_id` parameter to the constructor follows the same pattern as `hitl_queue_name`, maintaining consistency in the codebase.
892-903: LGTM: API path packet enqueue uses correct payload type. The API path correctly passes `queue_result` (a dict from `QueueResult.to_dict()`) to `PacketQueueUtils.enqueue_to_packet`, which is the expected payload type.
758-770: LGTM: Configuration includes packet_id. The `get_config` method correctly includes `packet_id` in the `DestinationConfig`, maintaining consistency with the `hitl_queue_name` field.
772-801: LGTM: from_config correctly reconstructs packet_id. The `from_config` method properly passes `config.packet_id` to the constructor, maintaining consistency with the configuration round-trip.
857-861: Verify placeholder result payload compatibility. When `packet_id` is set and `result` is missing, we fall back to `json.dumps({"status": "pending", "message": "Awaiting processing"})`. Confirm that all downstream consumers of `queue_result` (e.g. PacketQueueUtils.enqueue_to_packet, the manual-review UI) accept a JSON-encoded string here, or adjust to pass a dict if needed.
backend/workflow_manager/workflow_v2/workflow_helper.py (4)
64-69: LGTM: hitl_packet_id correctly excluded from execution parameters. Adding `"hitl_packet_id"` to `EXECUTION_EXCLUDED_PARAMS` is correct, as it's a routing parameter (like `hitl_queue_name`) that shouldn't be persisted in the `WorkflowExecution` model.
260-303: LGTM: run_workflow correctly propagates packet_id. The `run_workflow` function properly accepts `packet_id` as a parameter and forwards it to the `DestinationConnector` constructor. The internal naming (`packet_id`) vs. external naming (`hitl_packet_id`) provides clarity about the parameter's scope.
434-490: LGTM: execute_workflow_async correctly propagates hitl_packet_id. The function properly accepts `hitl_packet_id` as a parameter, documents it (line 460), and includes it in the kwargs passed to the Celery task (line 487), ensuring packet-based routing is preserved in async execution.
688-703: LGTM: execute_workflow correctly extracts and propagates hitl_packet_id. The function properly extracts `hitl_packet_id` from kwargs (line 688), logs it for traceability (line 690), and converts it to the internal `packet_id` parameter when calling `run_workflow` (line 703). The explicit naming conversion improves code clarity.
backend/api_v2/serializers.py (1)
230-230: Add or document validation for hitl_packet_id. There's no `validate_hitl_packet_id` alongside `validate_hitl_queue_name`. If packet IDs must follow a specific format or be enterprise-gated, implement `validate_hitl_packet_id` in backend/api_v2/serializers.py; otherwise, add a comment explaining why no validation is needed.
…m:Zipstack/unstract into UN-2807-packet-processing-support-in-hitl
Actionable comments posted: 1
🧹 Nitpick comments (2)
backend/workflow_manager/endpoint_v2/destination.py (2)
895-914: LGTM with optional refactor. The packet routing logic correctly checks feature availability, conditionally imports enterprise utilities, and passes the correct payload type (dict) to `PacketQueueUtils.enqueue_to_packet`. The static analysis tool flags the inline error message at lines 898-901. Consider extracting repeated enterprise feature messages to a constant if this pattern appears frequently across the codebase.
968-987: LGTM with refactor suggestion. The packet routing logic is correct and addresses the past review comment about payload type (it now correctly passes a dict instead of a QueueResult object). However, this code block is nearly identical to lines 895-914. Consider extracting the packet routing logic into a private helper method to reduce duplication:
```python
def _route_to_packet_queue(
    self, packet_id: str, queue_result: dict[str, Any], file_name: str
) -> None:
    """Route a queue result to packet-based processing."""
    if not FeatureRegistry.is_hitl_available():
        raise ValueError(
            "Packet-based HITL processing requires Unstract Enterprise. "
            "This feature is not available in the OSS version."
        )

    from pluggable_apps.manual_review_v2.packet_queue_utils import (
        PacketQueueUtils,
    )

    success = PacketQueueUtils.enqueue_to_packet(
        packet_id=packet_id, queue_result=queue_result
    )
    if not success:
        error_msg = f"Failed to push {file_name} to packet {packet_id}"
        logger.error(error_msg)
        raise RuntimeError(error_msg)
```
Then both paths (API and non-API) can call:
```python
if self.packet_id:
    self._route_to_packet_queue(self.packet_id, queue_result, file_name)
    return
```
This would eliminate ~40 lines of duplication and make the packet routing logic easier to maintain.
📜 Review details
📒 Files selected for processing (2)
- backend/api_v2/serializers.py (3 hunks)
- backend/workflow_manager/endpoint_v2/destination.py (9 hunks)
🧰 Additional context used
🪛 Ruff (0.13.2)
backend/api_v2/serializers.py
260-265: Avoid specifying long messages outside the exception class
(TRY003)
backend/workflow_manager/endpoint_v2/destination.py
898-901: Avoid specifying long messages outside the exception class
(TRY003)
971-974: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (7)
backend/api_v2/serializers.py (2)
9-9: LGTM! The `FeatureRegistry` import follows the recommended pattern for enterprise feature checks.
231-231: LGTM! The `hitl_packet_id` field declaration is consistent with other optional fields and correctly allows null/blank values.
backend/workflow_manager/endpoint_v2/destination.py (5)
74-74: LGTM! The `packet_id` parameter addition to the constructor and field assignment are correct and consistent with the PR objectives.
Also applies to: 93-93
771-771: LGTM! The `packet_id` is correctly propagated through config serialization and deserialization, following the same pattern as `hitl_queue_name`.
Also applies to: 800-800
172-183: LGTM! The packet precedence issue from the previous review has been correctly addressed. Packet routing now takes precedence over HITL queue routing, and the log message clearly indicates packet-based processing.
9-9: Bare import of FeatureRegistry is correct. FeatureRegistry lives in the OSS pluggable_apps package and is already unguarded in multiple modules; no try/except wrapper is needed.
859-863: Confirm placeholder JSON structure compatibility with packet queue. Ensure `enqueue_to_packet(packet_id, queue_result)` accepts the placeholder `{"status": "pending", "message": "Awaiting processing"}` (string or dict) and that downstream packet processing handles it correctly.
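If both representations must be tolerated at the queue boundary, one defensive option is a small normalizer (a sketch, not the shipped code):

```python
import json
from typing import Any


def normalize_queue_result(result: Any) -> dict:
    """Accept either a JSON-encoded string or a dict for queue payloads."""
    if isinstance(result, str):
        return json.loads(result)
    if isinstance(result, dict):
        return result
    raise TypeError(f"Unsupported queue result type: {type(result).__name__}")
```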
Actionable comments posted: 0
🧹 Nitpick comments (2)
backend/api_v2/serializers.py (1)
253-267: Update docstring to reflect actual validation behavior. The docstring states "Validate packet ID format" but the method only enforces enterprise feature availability, not the format of the packet_id itself.
Apply this diff to clarify the docstring:
```diff
-    def validate_hitl_packet_id(self, value: str | None) -> str | None:
-        """Validate packet ID format using enterprise validation if available."""
+    def validate_hitl_packet_id(self, value: str | None) -> str | None:
+        """Validate packet ID availability in enterprise features."""
```
Optional: Consider extracting the enterprise error message to a constant.
The error message on lines 260-264 is similar to the message in `validate_hitl_queue_name` (lines 246-249) and appears again in destination.py (lines 899-902). Consider extracting it to a module-level constant for maintainability. For example:
```python
ENTERPRISE_HITL_ERROR = (
    "Packet-based HITL processing requires Unstract Enterprise. "
    "This advanced workflow feature is available in our enterprise version. "
    "Learn more at https://docs.unstract.com/unstract/unstract_platform/features/workflows/hqr_deployment_workflows/ or "
    "contact our sales team at https://unstract.com/contact/"
)
```
backend/workflow_manager/endpoint_v2/destination.py (1)
880-926: LGTM: Well-designed routing with proper OSS/Enterprise gating. The `_enqueue_to_packet_or_regular_queue` helper cleanly separates packet-based and regular queue routing. The runtime check for enterprise features (line 898) and dynamic import of `PacketQueueUtils` (lines 904-906) ensure OSS compatibility while enabling enterprise functionality when available. The RuntimeError raised on enqueue failure (lines 911-914) provides clear feedback. Optional: Consider extracting the error message to a constant as mentioned in the serializers.py review, since it appears in multiple places (serializers.py lines 260-264, here lines 899-902).
📜 Review details
📒 Files selected for processing (2)
- backend/api_v2/serializers.py (3 hunks)
- backend/workflow_manager/endpoint_v2/destination.py (7 hunks)
🧰 Additional context used
🪛 Ruff (0.13.3)
backend/api_v2/serializers.py
260-265: Avoid specifying long messages outside the exception class
(TRY003)
backend/workflow_manager/endpoint_v2/destination.py
899-902: Avoid specifying long messages outside the exception class
(TRY003)
1047-1047: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (8)
backend/api_v2/serializers.py (1)
231-231: LGTM: Field definition is correct. The `hitl_packet_id` field is properly configured with `required=False, allow_null=True, allow_blank=True` to support optional packet-based HITL processing.
backend/workflow_manager/endpoint_v2/destination.py (7)
74-74: LGTM: Constructor updated correctly. The `packet_id` parameter is properly added to the constructor signature and stored on the instance for use throughout the destination routing flow.
Also applies to: 93-93
172-183: LGTM: Correct precedence for packet-based routing. The packet_id check is correctly placed before hitl_queue_name, ensuring packet-based processing takes precedence when both are provided. This aligns with the PR objectives and addresses the past review feedback about precedence ordering.
859-878: Verify the placeholder result approach for packet processing. When `packet_id` is set but `result` is missing (lines 861-863), the code creates a placeholder result: `{"status": "pending", "message": "Awaiting processing"}`. This ensures the file can be enqueued to the packet queue even without a processing result. Is this the intended behavior? Confirm that:
- Downstream packet processing expects and handles this placeholder format
- The actual result will be populated later in the packet workflow
- Users are aware that packets may contain pending entries
If this placeholder is temporary or for a specific use case, consider adding a comment explaining the rationale:
```diff
         # Handle missing result for packet processing
         if not result:
             if not self.packet_id:
                 return
+            # Placeholder result for packet queue - actual result populated during packet review
             result = json.dumps({"status": "pending", "message": "Awaiting processing"})
```
927-963: LGTM: Centralized queue result creation reduces duplication. The `_create_queue_result` helper consolidates QueueResult construction logic, ensuring consistency between API deployment and connector-based queue pushes. Converting to dict via `.to_dict()` provides a clean serialization boundary.
965-998: LGTM: Clean separation of API deployment queue logic. The `_push_to_queue_for_api_deployment` handler encapsulates all API-specific queue operations, making the code more maintainable and testable.
1000-1051: LGTM: Connector-based queue handling with proper validation. The `_push_to_queue_for_connector` handler correctly:
- Reads file content from the source connector filesystem
- Applies TTL for connector-based workflows (line 1031)
- Validates against empty JSON payloads before enqueuing (lines 1045-1047)
The validation at line 1045 prevents subtle bugs where empty messages could be enqueued with TTL, leading to orphaned queue entries.
771-771: LGTM: Configuration serialization updated correctly. The `packet_id` field is properly propagated through `get_config()` (line 771) and `from_config()` (line 800), ensuring packet-based routing is preserved across serialization boundaries.
Also applies to: 800-800
Actionable comments posted: 1
📜 Review details
📒 Files selected for processing (1)
- backend/api_v2/serializers.py (3 hunks)
🧰 Additional context used
🪛 Ruff (0.13.3)
backend/api_v2/serializers.py
260-265: Avoid specifying long messages outside the exception class
(TRY003)
270-270: Avoid specifying long messages outside the exception class
(TRY003)
274-276: Avoid specifying long messages outside the exception class
(TRY003)
279-279: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (3)
backend/api_v2/serializers.py (3)
7-7: LGTM! The import is necessary for the enterprise feature check in the `validate_hitl_packet_id` method.
231-231: LGTM! The field declaration is correct and consistent with the existing `hitl_queue_name` field pattern.
253-281: Update `validate_hitl_packet_id` docstring and align validation patterns:
- Correct the docstring to state it enforces enterprise availability via `apps.is_installed` and applies basic local format checks (no external delegation exists).
- Optionally unify the feature-availability check to use the import-based pattern from `validate_hitl_queue_name`.
…et-processing-support-in-hitl
…et-processing-support-in-hitl
for more information, see https://pre-commit.ci
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
unstract/core/src/unstract/core/data_models.py (2)
804-832: FileHashData.from_dict misses the new hitl_packet_id. We added `hitl_packet_id` to `FileHashData`, but `from_dict()` still omits it. Any packet ID coming from serialized payloads will be dropped, so downstream workers never learn about the packet. Please populate `hitl_packet_id` in the constructor call (and the fallback path) so round-trips preserve the field.
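The shape of the fix, on a trimmed-down version of the dataclass (only the fields named here; the real FileHashData has many more):

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FileHashData:
    file_name: str
    hitl_packet_id: str | None = None

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "FileHashData":
        return cls(
            file_name=data["file_name"],
            # Without this line, packet IDs are silently dropped on round-trip
            hitl_packet_id=data.get("hitl_packet_id"),
        )
```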
1296-1366: DestinationConfig must also expose hitl_packet_id. `WorkerWorkflowExecutionService` now injects `hitl_packet_id` into `destination_config`, but this dataclass still has no corresponding field (and `to_dict`/`from_dict` ignore it). As a result the packet ID is lost before it reaches the connector. Please add `hitl_packet_id: str | None = None` to `DestinationConfig`, include it in `to_dict`, and pass it through `from_dict`.
workers/api-deployment/tasks.py (1)
845-861: Fix pipeline_id coercion to avoid `"None"`
- Update pipeline_id conversion to only stringify when present:
```diff
-            pipeline_id=str(pipeline_id),
+            pipeline_id=str(pipeline_id) if pipeline_id else None,
```
- Optional: add `hitl_packet_id` to your task-start log context for full traceability.
🧹 Nitpick comments (1)
workers/shared/workflow/destination_connector.py (1)
1269-1337: Tidy helper: mark unused ttl_seconds and fix error message spacing
- Ruff: ttl_seconds is unused here; mark it intentionally unused.
- Minor: error message joins without a space.
Apply:
```diff
-def _enqueue_to_packet_or_regular_queue(
+def _enqueue_to_packet_or_regular_queue(
     self,
     file_name: str,
     queue_result: dict[str, Any],
     queue_name: str,
     workflow_util: Any,
-    ttl_seconds: int | None = None,
+    _ttl_seconds: int | None = None,
 ) -> None:
@@
-            error_msg = (
-                f"Packet queues are not available"
-                f"Cannot enqueue file '{file_name}' to packet '{self.hitl_packet_id}'. "
-                f"This is an Enterprise-only feature."
-            )
+            error_msg = (
+                "Packet queues are not available. "
+                f"Cannot enqueue file '{file_name}' to packet '{self.hitl_packet_id}'. "
+                "This is an Enterprise-only feature."
+            )
```
If regular-queue TTL is supported in the worker path, consider plumbing it instead of marking it unused. Otherwise, keep as-is.
📜 Review details
📒 Files selected for processing (7)
- backend/workflow_manager/endpoint_v2/destination.py (6 hunks)
- unstract/core/src/unstract/core/data_models.py (2 hunks)
- workers/api-deployment/tasks.py (2 hunks)
- workers/file_processing/tasks.py (1 hunks)
- workers/general/tasks.py (2 hunks)
- workers/shared/workflow/destination_connector.py (12 hunks)
- workers/shared/workflow/execution/service.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.13.3)
workers/shared/workflow/destination_connector.py
1275-1275: Unused method argument: ttl_seconds
(ARG002)
backend/workflow_manager/endpoint_v2/destination.py
1039-1039: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (8)
workers/shared/workflow/destination_connector.py (3)
114-121: hitl_packet_id config threading is consistent. DestinationConfig/from_dict and connector init store hitl_packet_id; matches routing needs.
Also applies to: 156-173, 200-205
1224-1230: Packet precedence before HITL is correct. Short-circuiting on hitl_packet_id avoids double-routing and matches desired behavior.
1422-1439: Unified enqueue and accurate queue label in logs. Delegating to _enqueue_to_packet_or_regular_queue and logging packet_id vs queue_name is clean and unambiguous.
backend/workflow_manager/endpoint_v2/destination.py (5)
65-75: packet_id plumbing in ctor is correct. New parameter and assignment look good.
Also applies to: 91-93
171-183: Packet override precedence is correct. Routing to the packet queue first prevents conflicting HITL routing and matches intent.
759-771: Include packet_id in config (get_config/from_config). Serialization and reconstruction now preserve packet_id; good.
Also applies to: 792-800
858-867: Graceful default result for packet flow. Setting a pending payload when result is absent ensures packet items are enqueued consistently.
Please confirm downstream consumers accept a JSON string in QueueResult.result for packet entries (vs dict), or consider passing a dict and letting QueueResult handle serialization for consistency.
879-918: Centralized enqueue with packet routing and TTL. Clean separation: packet via QueueUtils.enqueue_to_packet; else regular queue with TTL. Logging is clear.
Test Results Summary
- Runner Tests - Full Report
- SDK1 Tests - Full Report
Actionable comments posted: 0
♻️ Duplicate comments (2)
workers/shared/workflow/execution/service.py (1)
1330-1348: Make packet vs. queue mutually exclusive in config (packet wins). Avoid adding both keys. Prefer the packet id when present; else fall back to the queue name. Keeps logs and routing unambiguous.
```diff
-        if hitl_queue_name:
-            destination_config["hitl_queue_name"] = hitl_queue_name
-            logger.info(
-                f"Added HITL queue name to destination config: {hitl_queue_name}"
-            )
-        if hitl_packet_id:
-            destination_config["hitl_packet_id"] = hitl_packet_id
-            logger.info(
-                f"Added HITL packet ID to destination config: {hitl_packet_id}"
-            )
-        if not hitl_queue_name and not hitl_packet_id:
+        if hitl_packet_id:
+            destination_config["hitl_packet_id"] = hitl_packet_id
+            logger.info(f"Added HITL packet ID to destination config: {hitl_packet_id}")
+        elif hitl_queue_name:
+            destination_config["hitl_queue_name"] = hitl_queue_name
+            logger.info(f"Added HITL queue name to destination config: {hitl_queue_name}")
+        else:
             logger.info(
                 "No hitl_queue_name or hitl_packet_id found in file_data, proceeding with normal processing"
             )
```
The earlier "DestinationConfig drops the packet ID" concern appears resolved by adding hitl_packet_id to DestinationConfig/from_dict in destination_connector.py. Nice.
workers/shared/workflow/destination_connector.py (1)
123-125: DestinationConfig now carries hitl_packet_id — resolves earlier propagation gap. Field addition aligns config with execution service and worker expectations. LGTM.
🧹 Nitpick comments (2)
workers/file_processing/tasks.py (1)
1457-1465: Enforce packet-over-queue precedence here to avoid double application/logging. If both hitl_packet_id and hitl_queue_name are present, this will apply both and log twice. Make packet take precedence and short-circuit; only one MR override/log should occur.
Apply:
```diff
-        if file_data and file_data.hitl_queue_name:
-            file_hash.hitl_queue_name = file_data.hitl_queue_name
-            file_hash.is_manualreview_required = True  # Override manual review flag for HITL
-            logger.info(
-                f"Applied HITL queue name '{file_data.hitl_queue_name}' to file {file_name}"
-            )
-
-        if file_data and file_data.hitl_packet_id:
+        if file_data and file_data.hitl_packet_id:
             file_hash.hitl_packet_id = file_data.hitl_packet_id
             file_hash.is_manualreview_required = (
                 True  # Override manual review flag for packet processing
             )
             logger.info(
                 f"Applied HITL packet ID '{file_data.hitl_packet_id}' to file {file_name}"
             )
+        elif file_data and file_data.hitl_queue_name:
+            file_hash.hitl_queue_name = file_data.hitl_queue_name
+            file_hash.is_manualreview_required = True  # Override manual review flag for HITL
+            logger.info(
+                f"Applied HITL queue name '{file_data.hitl_queue_name}' to file {file_name}"
+            )
```
Please confirm WorkerFileData/FileHashData define hitl_packet_id so this assignment remains type-safe.
workers/shared/workflow/destination_connector.py (1)
1436-1504: Use TTL for regular enqueue; fix error text; consider graceful OSS fallback
- ttl_seconds is unused; pass it to enqueue_manual_review if supported, else remove the arg to satisfy linting.
- Minor: missing space in the NotImplemented message.
- Optional: when enterprise packet enqueue is unavailable, consider falling back to regular queue instead of raising to preserve baseline HITL behavior.
```diff
-        if self.hitl_packet_id:
+        if self.hitl_packet_id:
             # Route to packet queue via backend API (enterprise only)
             logger.info(f"Routing {file_name} to packet queue {self.hitl_packet_id}")
@@
-            if not manual_review_client or not hasattr(
+            if not manual_review_client or not hasattr(
                 manual_review_client, "enqueue_to_packet"
             ):
-                error_msg = (
-                    f"Packet queues are not available"
-                    f"Cannot enqueue file '{file_name}' to packet '{self.hitl_packet_id}'. "
-                    f"This is an Enterprise-only feature."
-                )
-                logger.error(error_msg)
-                raise NotImplementedError(error_msg)
+                error_msg = (
+                    "Packet queues are not available. "
+                    f"Cannot enqueue file '{file_name}' to packet '{self.hitl_packet_id}'. "
+                    "This is an Enterprise-only feature."
+                )
+                logger.error(error_msg)
+                # Optional graceful fallback: route to the regular queue instead of raising
+                queue_name = self._get_review_queue_name()
+                workflow_util.enqueue_manual_review(
+                    queue_name=queue_name,
+                    message=queue_result,
+                    organization_id=self.organization_id,
+                    ttl_seconds=ttl_seconds,  # if supported
+                )
+                logger.info(
+                    f"✅ MANUAL REVIEW: (fallback) File '{file_name}' sent to manual review queue '{queue_name}'"
+                )
+                return
@@
-            workflow_util.enqueue_manual_review(
+            workflow_util.enqueue_manual_review(
                 queue_name=queue_name,
                 message=queue_result,
                 organization_id=self.organization_id,
+                ttl_seconds=ttl_seconds,  # if supported by implementation
             )
```
Please confirm workflow_util.enqueue_manual_review accepts ttl_seconds. If not, drop the parameter and rely on queue_result.ttl_seconds for downstream expiry semantics.
📜 Review details
📒 Files selected for processing (5)
- unstract/core/src/unstract/core/data_models.py (2 hunks)
- workers/file_processing/tasks.py (1 hunks)
- workers/general/tasks.py (2 hunks)
- workers/shared/workflow/destination_connector.py (12 hunks)
- workers/shared/workflow/execution/service.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- workers/general/tasks.py
- unstract/core/src/unstract/core/data_models.py
🧰 Additional context used
🪛 Ruff (0.14.1)
workers/shared/workflow/destination_connector.py
1442-1442: Unused method argument: ttl_seconds
(ARG002)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (4)
workers/shared/workflow/destination_connector.py (4)
165-181: from_dict mapping includes hitl_packet_id. Good addition; ensures the packet id survives config deserialization.
210-213: WorkerDestinationConnector copies hitl_packet_id. Propagation into the connector instance looks correct.
1391-1397: Packet precedence in HITL decision. Checking hitl_packet_id first matches intended routing priority. LGTM.
1589-1606: Queue display name reflects packet vs. regular — good UX. Using the packet id as the display name when present is helpful. LGTM.
* UN-2807 [FEAT] Add packet processing support for HITL workflows
  - Added packet_id parameter to API execution flow
  - Integrated packet-based routing in destination connector
  - Enhanced queue management to support packet-based review alongside traditional HITL queues
  - Updated DTOs and serializers to include packet_id field
  This enables workflows to be grouped into packets for batch review in HITL scenarios.
  🤖 Generated with [Claude Code](https://claude.ai/code)
  Co-Authored-By: Claude <noreply@anthropic.com>
* added missing references
* added hitl prefix to query params
* added try-catch on enterprise import
* used app registry for plugin check
* code refactor
* used registry to find installed apps
* sonar issue fix
* changes for new worker implementation
* [pre-commit.ci] auto fixes from pre-commit.com hooks (for more information, see https://pre-commit.ci)

Signed-off-by: jagadeeswaran-zipstack <jagadeeswaran@zipstack.com>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Athul <89829560+athul-rs@users.noreply.github.com>
What
Added packet-based processing support for HITL (Human-in-the-Loop) workflows. This feature allows API consumers to group documents into packets for batch review, providing a more organized way to handle manual review processes.
Why
How
- Added `packet_id` parameter to the API deployment execution flow through serializers and views
- Updated `DestinationConnector` class to detect packet_id and route documents accordingly
- Used `PacketQueueUtils` for packet-based queue management instead of regular HITL queues

Relevant Docs
Related Issues/PRs
Dependencies Versions
Notes on Testing
Test Scenarios:
Screenshots
N/A - Backend-only changes, no UI modifications