-
Notifications
You must be signed in to change notification settings - Fork 41
Adding background task to create next state #175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding background task to create next state #175
Conversation
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
…exospherehost into creating-next-state
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. 📝 WalkthroughSummary by CodeRabbit
WalkthroughThe changes introduce a new asynchronous task, Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant API (executed_state_route)
participant Controller (executed_state)
participant BackgroundTasks
participant Task (create_next_state)
participant DB
Client->>API (executed_state_route): POST /executed_state
API->>Controller (executed_state): Call with background_tasks
Controller (executed_state)->>DB: Update state to EXECUTED, set outputs
Controller (executed_state)->>BackgroundTasks: Schedule create_next_state(state)
BackgroundTasks->>Task (create_next_state): Run asynchronously
Task (create_next_state)->>DB: Wait for graph validation, fetch templates
Task (create_next_state)->>DB: Fetch parent states, resolve inputs
Task (create_next_state)->>DB: Create next states
Task (create_next_state)->>DB: Update current state status to SUCCESS or ERRORED
Estimated code review effort🎯 4 (Complex) | ⏱️ ~35 minutes Poem
✨ Finishing Touches
🧪 Generate unit tests
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Summary of Changes
Hello @NiveditJain, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
I've implemented a new background task that automates the creation of subsequent states in a workflow. This task is triggered asynchronously whenever a state is marked as 'executed', ensuring that the system can automatically progress through defined graph templates. I've also updated the State model to include parent state references, which is essential for correctly mapping inputs for the next states.
Highlights
- Automated State Progression: Introduced a new background task,
create_next_state, which is responsible for automatically generating and saving the next states in a workflow based on predefined graph templates and the outputs of the current state. - Asynchronous Task Execution: Integrated the
create_next_statetask into theexecuted_stateendpoint using FastAPI'sBackgroundTasks, allowing the system to respond quickly while processing the next state creation asynchronously. - State Model Enhancement: Modified the
Statedatabase model to include aparentsfield, a dictionary mapping identifiers toObjectIds, which is crucial for tracking the lineage of states and retrieving necessary input data from preceding states.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments or fill out our survey to provide feedback.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a background task to create subsequent states in a graph, which is a good architectural choice for improving API responsiveness. My review focuses on the new background task logic in create_next_state.py and its integration. I've identified several critical issues related to correctness, such as handling stale data and potential crashes from unsafe dictionary access or string parsing. I've also included suggestions to improve robustness through better error handling, and to enhance performance by addressing polling loops and redundant database queries.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 9
🔭 Outside diff range comments (1)
state-manager/app/tasks/create_next_state.py (1)
90-97: Persist final status on success; currently not saved.The state is saved on error but not on success. As written, status remains EXECUTED in DB.
Apply this diff:
- state.status = StateStatusEnum.SUCCESS + state.status = StateStatusEnum.SUCCESS + await state.save()
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (4)
state-manager/app/controller/executed_state.py(2 hunks)state-manager/app/models/db/state.py(2 hunks)state-manager/app/routes.py(2 hunks)state-manager/app/tasks/create_next_state.py(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
state-manager/app/routes.py (3)
state-manager/app/models/executed_models.py (1)
ExecutedRequestModel(5-6)state-manager/app/utils/check_secret.py (1)
check_api_key(15-19)state-manager/app/controller/executed_state.py (1)
executed_state(12-34)
🔇 Additional comments (3)
state-manager/app/models/db/state.py (1)
17-18: Parents field addition looks good; verify ObjectId serialization support.The field shape is appropriate for dependency tracking. Ensure your BaseDatabaseModel (or Pydantic config) allows arbitrary ObjectId types and serializes them correctly when included in API responses/logs. If not, consider using a custom Pydantic ObjectId type or serializer.
Would you like me to add a serializer/validator for ObjectId on this model?
state-manager/app/routes.py (1)
84-84: BackgroundTasks injection on the executed route: LGTM.Signature change is correct and consistent with FastAPI’s BackgroundTasks usage.
state-manager/app/controller/executed_state.py (1)
3-3: Import of BackgroundTasks: LGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (8)
state-manager/app/tasks/create_next_state.py (8)
43-51: Don’t silently continue on missing node templates or unregistered nodes. Fail fast with clear errors.Continuing hides graph/template issues and may leave the system in a partial state. Raise to mark the current state as ERRORED via the outer handler.
- next_node_template = graph_template.get_node_by_identifier(identifier) - if not next_node_template: - continue + next_node_template = graph_template.get_node_by_identifier(identifier) + if not next_node_template: + raise Exception(f"Node template '{identifier}' not found in graph '{state.graph_name}'") @@ - if not registered_node: - continue + if not registered_node: + raise Exception(f"Registered node '{next_node_template.node_name}' not found in namespace '{next_node_template.namespace}'")
63-92: Make placeholder interpolation robust: validate format, handle unclosed braces, support current-state refs, cast outputs, and use cache.The current split-based parsing is brittle and assumes strings; it also fails for unclosed placeholders and wraps ObjectIds unnecessarily.
- constructed_string = "" - for split in splits: - if "}}" in split: - placeholder_content = split.split("}}")[0] - parts = [p.strip() for p in placeholder_content.split('.')] - - if len(parts) != 3 or parts[1] != 'outputs': - raise Exception(f"Invalid input placeholder format: '{placeholder_content}' for field {field_name}") - - input_identifier = parts[0] - input_field = parts[2] - - parent_id = state.parents.get(input_identifier) - - if not parent_id: - raise Exception(f"Parent identifier '{input_identifier}' not found in state parents.") - - dependent_state = await State.get(ObjectId(parent_id)) - if not dependent_state: - raise Exception(f"Dependent state {input_identifier} not found") - - if input_field not in dependent_state.outputs: - raise Exception(f"Input field {input_field} not found in dependent state {input_identifier}") - - constructed_string += dependent_state.outputs[input_field] + split.split("}}")[1] - - else: - constructed_string += split - - next_node_input_data[field_name] = constructed_string + constructed = "" + remainder = raw_value + while True: + start = remainder.find("${{") + if start == -1: + constructed += remainder + break + constructed += remainder[:start] + remainder = remainder[start+3:] + end = remainder.find("}}") + if end == -1: + raise Exception(f"Unclosed placeholder in input '{field_name}' for node '{next_node_template.identifier}'") + expr = remainder[:end].strip() + remainder = remainder[end+2:] + parts = [p.strip() for p in expr.split(".")] + if len(parts) != 3 or parts[1] != "outputs": + raise Exception(f"Invalid input placeholder '{expr}' for field '{field_name}'") + ref_identifier, _, ref_field = parts + # Resolve dependent state id (allow referencing current state) + if ref_identifier == state.identifier: + dep_state_id = state.id + else: + dep_state_id = state.parents.get(ref_identifier) + if not dep_state_id: + raise Exception(f"Dependent state '{ref_identifier}' not found") + # Fetch dependent state with simple cache + dep_state = state_cache.get(ref_identifier) + if not dep_state: + dep_state = await State.get(dep_state_id) + if not dep_state: + raise Exception(f"Dependent state '{ref_identifier}' not found") + state_cache[ref_identifier] = dep_state + if not isinstance(dep_state.outputs, dict) or ref_field not in dep_state.outputs: + raise Exception(f"Output field '{ref_field}' not found on state '{ref_identifier}'") + constructed += str(dep_state.outputs[ref_field]) + next_node_input_data[field_name] = constructed
14-16: Avoid stale ORM instances: accept state_id and reload the state in the task.Background tasks should load the latest state to prevent stale reads and concurrency issues. Replace the signature and load by id.
-async def create_next_state(state: State): - graph_template = None +async def create_next_state(state_id: ObjectId): + state = await State.get(state_id) + if not state: + return + graph_template = NoneOutside this hunk, remember to update the call sites to pass
state.idinstead of the state object.
38-41: Treat terminal nodes as successful completion instead of error.If there are no next nodes, this branch is complete and should not error.
- if not next_node_identifier: - raise Exception(f"Node template {state.identifier} has no next nodes") + if not next_node_identifier: + state.status = StateStatusEnum.SUCCESS + await state.save() + return
52-54: Micro-optimization: cache dependent states during placeholder resolution.Avoid repeated DB fetches when multiple placeholders reference the same parent state.
next_node_input_model = create_model(registered_node.inputs_schema) next_node_input_data = {} + state_cache = {}
75-83: Remove redundant ObjectId cast when fetching parent states.
parentsalready stores ObjectIds. Casting again is unnecessary and can be error-prone.- dependent_state = await State.get(ObjectId(parent_id)) + dependent_state = await State.get(parent_id)
55-62: Guard against missing inputs, non-string values, and fix unreachable condition.
- KeyError if the template misses a required field.
- Inputs may be non-strings;
.splitwould crash.len(splits) == 0is unreachable;.splitreturns at least one element.- for field_name, _ in next_node_input_model.model_fields.items(): - temporary_input = next_node_template.inputs[field_name] - splits = temporary_input.split("${{") - - if len(splits) == 0: - next_node_input_data[field_name] = temporary_input - continue + for field_name, _ in next_node_input_model.model_fields.items(): + if field_name not in next_node_template.inputs: + raise Exception(f"Required input '{field_name}' missing in node template '{next_node_template.identifier}'") + raw_value = next_node_template.inputs[field_name] + if not isinstance(raw_value, str) or "${{" not in raw_value: + next_node_input_data[field_name] = raw_value + continue
94-107: Fix parents mapping (wrong key) and avoid unnecessary ObjectId casting; also validate inputs with the generated model.
- Parents should map the current state's identifier to its id, not the child’s identifier.
state.idandparentsalready hold ObjectIds; avoid wrapping.- Validate inputs via the Pydantic model to catch type mismatches early.
- new_state = State( + # Validate inputs against the generated model + validated_inputs = next_node_input_model(**next_node_input_data).model_dump() + new_state = State( node_name=next_node_template.node_name, namespace_name=next_node_template.namespace, identifier=next_node_template.identifier, graph_name=state.graph_name, - status=StateStatusEnum.CREATED, - inputs=next_node_input_data, + status=StateStatusEnum.CREATED, + inputs=validated_inputs, outputs={}, error=None, parents={ - **state.parents, - next_node_template.identifier: ObjectId(state.id) + **state.parents, + state.identifier: state.id } )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
state-manager/app/tasks/create_next_state.py(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
state-manager/app/tasks/create_next_state.py (5)
state-manager/app/models/db/state.py (1)
State(8-18)state-manager/app/models/db/graph_template_model.py (2)
GraphTemplate(13-81)get_node_by_identifier(30-35)state-manager/app/models/graph_template_validation_status.py (1)
GraphTemplateValidationStatus(4-8)state-manager/app/models/db/registered_node.py (1)
RegisteredNode(6-13)state-manager/app/models/state_status_enum.py (1)
StateStatusEnum(4-14)
🔇 Additional comments (2)
state-manager/app/tasks/create_next_state.py (2)
21-33: Good: bounded wait with timeout prevents infinite polling.The loop now fails fast on timeout and avoids indefinite hangs. LGTM.
110-112: Confirm parent status semantics after spawning children.Marking the current state as SUCCESS immediately after creating next states may be premature if SUCCESS implies end-of-branch completion. Consider EXECUTED (unchanged) or a transitional NEXT_CREATED on the parent if that aligns with your state machine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
♻️ Duplicate comments (6)
state-manager/app/controller/executed_state.py (1)
29-29: Schedule task with state_id, not the full State objectPass only the id and load fresh in the task to avoid stale fields and heavy serialization. This also makes retries idempotent and simplifies task signature.
- background_tasks.add_task(create_next_state, state) + background_tasks.add_task(create_next_state, state.id) @@ - background_tasks.add_task(create_next_state, state) + background_tasks.add_task(create_next_state, state.id) @@ - background_tasks.add_task(create_next_state, new_state) + background_tasks.add_task(create_next_state, new_state.id)Pair this with the task signature change suggested in create_next_state.py.
Also applies to: 35-35, 50-50
state-manager/app/tasks/create_next_state.py (5)
14-15: Accept state_id and load fresh state inside the taskAvoid passing full State objects from the controller. Load the latest state to prevent staleness and reduce coupling.
-async def create_next_state(state: State): - graph_template = None +async def create_next_state(state_id: ObjectId): + # Load latest state + state = await State.get(state_id) + if not state: + return + graph_template = NoneEnsure controller passes state.id, as suggested in the paired comment.
100-113: Fix parents mapping: use current state’s identifier and don’t re-wrap ObjectIdKey should be the current state’s identifier; value is state.id (already an ObjectId). Using the child’s identifier here breaks dependency resolution downstream.
new_state = State( @@ - parents={ - **state.parents, - next_node_template.identifier: ObjectId(state.id) - } + parents={ + **state.parents, + state.identifier: state.id, + } )
105-105: Optional: prefer NEXT_CREATED to distinguish follow-up statesIf your domain uses NEXT_CREATED, it’s clearer than CREATED for generated next states.
- status=StateStatusEnum.CREATED, + status=StateStatusEnum.NEXT_CREATED,
38-41: Terminal node should mark SUCCESS, not errorA node with no next_nodes is a terminal branch. Mark it SUCCESS and return instead of erroring.
- next_node_identifier = node_template.next_nodes - if not next_node_identifier: - raise Exception(f"Node template {state.identifier} has no next nodes") + next_node_identifier = node_template.next_nodes + if not next_node_identifier: + state.status = StateStatusEnum.SUCCESS + await state.save() + return
57-99: Placeholder resolution is brittle; handle non-strings, validate shape, and support current-state referencesIssues:
- Assumes string inputs; will crash for non-strings.
- len(splits) == 0 branch is unreachable.
- Fragile parsing; no validation.
- Cannot reference current state outputs (only parents).
- Concatenation may TypeError if outputs are non-strings.
Safer approach below, plus validate inputs using the generated model:
- for field_name, _ in next_node_input_model.model_fields.items(): - temporary_input = next_node_template.inputs[field_name] - splits = temporary_input.split("${{") - - if len(splits) == 0: - next_node_input_data[field_name] = temporary_input - continue - - constructed_string = "" - for split in splits: - if "}}" in split: - placeholder_content = split.split("}}")[0] - parts = [p.strip() for p in placeholder_content.split('.')] - - if len(parts) != 3 or parts[1] != 'outputs': - raise Exception(f"Invalid input placeholder format: '{placeholder_content}' for field {field_name}") - - input_identifier = parts[0] - input_field = parts[2] - - parent_id = state.parents.get(input_identifier) - - if not parent_id: - raise Exception(f"Parent identifier '{input_identifier}' not found in state parents.") - - if parent_id not in cache_states: - dependent_state = await State.get(ObjectId(parent_id)) - if not dependent_state: - raise Exception(f"Dependent state {input_identifier} not found") - cache_states[parent_id] = dependent_state - else: - dependent_state = cache_states[parent_id] - - if input_field not in dependent_state.outputs: - raise Exception(f"Input field {input_field} not found in dependent state {input_identifier}") - - constructed_string += dependent_state.outputs[input_field] + split.split("}}")[1] - - else: - constructed_string += split - - next_node_input_data[field_name] = constructed_string + for field_name, _ in next_node_input_model.model_fields.items(): + if field_name not in next_node_template.inputs: + raise Exception(f"Required input '{field_name}' missing in node template '{next_node_template.identifier}'") + raw_value = next_node_template.inputs[field_name] + # Only interpolate strings + if not isinstance(raw_value, str) or '${{' not in raw_value: + next_node_input_data[field_name] = raw_value + continue + constructed = "" + remainder = raw_value + while True: + start = remainder.find('${{') + if start == -1: + constructed += remainder + break + constructed += remainder[:start] + remainder = remainder[start+3:] + end = remainder.find('}}') + if end == -1: + raise Exception(f"Unclosed placeholder in input '{field_name}' for node '{next_node_template.identifier}'") + expr = remainder[:end].strip() + remainder = remainder[end+2:] + parts = [p.strip() for p in expr.split('.')] + if len(parts) != 3 or parts[1] != 'outputs': + raise Exception(f"Invalid input placeholder format: '{expr}' for field '{field_name}'") + ref_identifier, _, ref_field = parts + # Resolve dependent state (support current state) + if ref_identifier == state.identifier: + dep_state = state + else: + parent_id = state.parents.get(ref_identifier) + if not parent_id: + raise Exception(f"Parent identifier '{ref_identifier}' not found in state.parents") + if parent_id not in cache_states: + dep_state = await State.get(parent_id) + if not dep_state: + raise Exception(f"Dependent state '{ref_identifier}' not found") + cache_states[parent_id] = dep_state + else: + dep_state = cache_states[parent_id] + if not isinstance(dep_state.outputs, dict) or ref_field not in dep_state.outputs: + raise Exception(f"Output field '{ref_field}' not found on state '{ref_identifier}'") + constructed += str(dep_state.outputs[ref_field]) + next_node_input_data[field_name] = constructed + # Validate/coerce inputs + next_node_input_data = next_node_input_model(**next_node_input_data).model_dump()
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (2)
state-manager/app/controller/executed_state.py(2 hunks)state-manager/app/tasks/create_next_state.py(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
state-manager/app/controller/executed_state.py (4)
state-manager/app/models/db/state.py (1)
State(8-18)state-manager/app/models/state_status_enum.py (1)
StateStatusEnum(4-14)state-manager/app/tasks/create_next_state.py (1)
create_next_state(14-124)state-manager/app/models/executed_models.py (2)
ExecutedRequestModel(5-6)ExecutedResponseModel(9-10)
🔇 Additional comments (3)
state-manager/app/controller/executed_state.py (1)
12-12: BackgroundTasks integration: LGTMAdding BackgroundTasks to the controller signature is correct and aligns with the PR objective.
state-manager/app/tasks/create_next_state.py (2)
21-33: Bounded wait for template validity: LGTMYou added a 5-minute timeout with a 1s backoff; this mitigates infinite polling. Good.
117-123: Finalize state status on success/error: LGTMMarking SUCCESS at the end and ERRORED on exceptions with persisted error is appropriate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
♻️ Duplicate comments (1)
state-manager/app/controller/executed_state.py (1)
8-8: Import is fine; avoid passing full State objects to the task.See comments below: schedule with state_id (not the whole State) and load fresh in the task to prevent stale data and reduce coupling.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
state-manager/app/controller/executed_state.py(2 hunks)
🔇 Additional comments (2)
state-manager/app/controller/executed_state.py (2)
3-3: Importing BackgroundTasks is correct.This enables lightweight async scheduling via FastAPI’s background runner.
12-12: executed_state signature verified
Confirmed thatbackground_tasksis passed through the router to the controller:
- state-manager/app/routes.py:94:
return await executed_state(namespace_name, ObjectId(state_id), body, x_exosphere_request_id, background_tasks)LGTM.
No description provided.