Conversation
Pull request overview
This pull request adds support for multi-step geospatial processing workflows by implementing a recursive sub-graph architecture. The implementation allows users to specify complex queries that involve multiple sequential operations (e.g., "buffer layer X by 50m AND calculate statistics from that buffer").
Changes:
- Added task decomposition, dependency analysis, and error handling for multi-step workflows
- Introduced new Pydantic schemas for task definitions, decomposition, and error analysis
- Extended ProcessingState with fields for task queue, results tracking, and workflow metadata
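To make the new schema surface concrete, here is a rough sketch of what a task definition and decomposition might look like. The field names are assumptions based on this overview; the PR defines these as Pydantic models, but plain dataclasses are used here to keep the sketch self-contained:

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class TaskDefinition:
    # Hypothetical fields: one step in the decomposed workflow.
    task_id: int
    description: str
    depends_on: Optional[int] = None  # task_id of a prerequisite step, if any

@dataclass
class TaskDecomposition:
    # Hypothetical container for the full task queue of a multi-step query.
    is_multi_step: bool
    tasks: list = field(default_factory=list)

# "buffer layer X by 50m AND calculate statistics from that buffer"
plan = TaskDecomposition(
    is_multi_step=True,
    tasks=[
        TaskDefinition(1, "Buffer layer X by 50m"),
        TaskDefinition(2, "Calculate statistics from the buffer", depends_on=1),
    ],
)
```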
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| prompts/system.py | Added prompts for task decomposition, dependency analysis, and multi-step error handling; improved formatting of parameter gathering prompt |
| agents/schemas.py | Added TaskDefinition, TaskDecomposition, DependencyInjection, TaskResult, and ErrorAnalysis schemas; minor formatting improvements to existing schemas |
| agents/states.py | Extended ProcessingState with multi-task workflow fields (task_queue, task_results, current_task_index, completed_tasks, is_multi_step) |
| agents/multi_step_processing.py | New file implementing multi-step processing graph with nodes for decomposition, dependency analysis, task execution, error handling, and workflow finalization |
| agents/graph.py | Added multi_step parameter to build_unified_graph to toggle between single-step and multi-step processing modes |
| agents/__init__.py | Exported new schemas and build_multi_step_processing_graph function |
```python
state["_current_task_id"] = task_id
state["_previous_outputs"] = previous_outputs

_logger.info(
    f"NODE: analyze_dependencies_node END -> {len(previous_outputs)} outputs available"
)
return state
```
Direct state mutation: The code directly mutates the input state dictionary by assigning to state["_current_task_id"] and state["_previous_outputs"]. Node functions in LangGraph should not mutate the input state but instead return a new state dictionary. This should be changed to return {**state, "_current_task_id": task_id, "_previous_outputs": previous_outputs} at the end of the function (line 235).
Suggested change:
```diff
-state["_current_task_id"] = task_id
-state["_previous_outputs"] = previous_outputs
 _logger.info(
     f"NODE: analyze_dependencies_node END -> {len(previous_outputs)} outputs available"
 )
-return state
+return {
+    **state,
+    "_current_task_id": task_id,
+    "_previous_outputs": previous_outputs,
+}
```
```python
for layer in output_layers:
    previous_outputs[f"task_{prev_task_id}_output"] = layer
    _logger.debug(f"  Available from task {prev_task_id}: {layer}")
```
Logic error in output layer tracking: When a task produces multiple output layers, this loop overwrites the same dictionary key f"task_{prev_task_id}_output" for each layer, resulting in only the last layer being preserved. If multiple outputs need to be tracked, consider using indexed keys like f"task_{prev_task_id}_output_{idx}" or storing all layers in a list. Alternatively, if only one output per task is expected, use a break statement or take only the first layer.
Suggested change:
```diff
-for layer in output_layers:
-    previous_outputs[f"task_{prev_task_id}_output"] = layer
-    _logger.debug(f"  Available from task {prev_task_id}: {layer}")
+for idx, layer in enumerate(output_layers):
+    # Preserve original key for the first output to maintain compatibility,
+    # and use indexed keys for any additional outputs from the same task.
+    if idx == 0:
+        key = f"task_{prev_task_id}_output"
+    else:
+        key = f"task_{prev_task_id}_output_{idx}"
+    previous_outputs[key] = layer
+    _logger.debug(f"  Available from task {prev_task_id}: {layer} (as {key})")
```
```python
graph.set_entry_point("route")
graph.add_edge("route", "decompose")
graph.add_edge("decompose", "analyze_deps")
graph.add_edge("analyze_deps", "discover")
```
Missing conditional routing after route node: The graph unconditionally proceeds from route to decompose and through the processing pipeline, even for non-processing tasks. While the nodes check is_processing_task and skip processing, they still execute unnecessary logic. Consider adding a conditional edge after the route node to bypass the processing pipeline entirely for non-processing tasks, going directly to the llm node for general queries. This would match the pattern used in the single-step processing graph.
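A minimal sketch of the conditional edge this comment suggests. The node names ("decompose", "llm") and the is_processing_task flag follow the review text, but the wiring shown is illustrative rather than the project's actual graph:

```python
def route_after_entry(state: dict) -> str:
    # Pick the next node after "route": enter the multi-step pipeline only
    # for processing tasks, otherwise go straight to the general LLM node.
    if state.get("is_processing_task"):
        return "decompose"
    return "llm"

# With LangGraph this would replace the unconditional edge, e.g.:
# graph.add_conditional_edges(
#     "route", route_after_entry, {"decompose": "decompose", "llm": "llm"}
# )
```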
```python
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import BaseMessage, AIMessage, ToolMessage

from agents.multi_step_processing import build_multi_step_processing_graph
```
Inconsistent import style: this line uses an absolute import while the surrounding imports use relative imports (lines 14, 16, 17). For consistency with the codebase conventions, this should be changed to a relative import: from .multi_step_processing import build_multi_step_processing_graph
```python
    f"LLM extracted: {parameters}"
)
_logger.error(error_msg)
return {"error_message": error_msg}
```
Inconsistent state handling: this return statement doesn't preserve the existing state (missing **state), unlike the error handling at lines 512-515. When validation fails for missing required parameters, the function should return {**state, "error_message": error_msg} to preserve all existing state fields.
Suggested change:
```diff
-return {"error_message": error_msg}
+return {**state, "error_message": error_msg}
```
```python
task_results = state.get("task_results", {})
task_results[current_task_id] = {
    "task_id": current_task_id,
    "success": True,
    "output_layers": output_layers,
    "execution_result": result,
    "error": None,
}
```
Potential state mutation issue: The code gets task_results from state and directly mutates it by assigning to task_results[current_task_id]. In LangGraph's state management, this could lead to unintended side effects if the state dictionary is reused. Instead, create a new dictionary: task_results = {**state.get("task_results", {}), current_task_id: {...}} or create a copy before mutation: task_results = dict(state.get("task_results", {})).
```python
task_results = state.get("task_results", {})
task_results[current_task_id] = {
    "task_id": current_task_id,
    "success": False,
    "output_layers": [],
    "execution_result": None,
    "error": str(e),
}
```
Same state mutation issue as above: task_results is retrieved from state and mutated in place. Create a copy first, e.g. task_results = dict(state.get("task_results", {})), and return the new dictionary.
```python
    3. Finalize: Summarize all tasks and results
    """
    import json
    from typing import List, Dict, Any, Optional
```
Imports of 'List', 'Dict', and 'Optional' are not used.
Suggested change:
```diff
-from typing import List, Dict, Any, Optional
+from typing import Any
```
```python
from langchain_core.messages import (
    BaseMessage,
    AIMessage,
    HumanMessage,
    SystemMessage,
    ToolMessage,
)
```
Import of 'BaseMessage' is not used.
```python
    ParameterGathering,
    TaskDecomposition,
    DependencyInjection,
    TaskResult,
```
Import of 'TaskResult' is not used.
Suggested change:
```diff
-    TaskResult,
```
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 8 comments.
```python
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import BaseMessage, AIMessage, ToolMessage

from agents.multi_step_processing import build_multi_step_processing_graph
```
Import style is inconsistent with the rest of the codebase. This line uses an absolute import from agents.multi_step_processing while other imports in this file and throughout the codebase use relative imports (e.g., from .states, from ..tools). This should be changed to from .multi_step_processing import build_multi_step_processing_graph to maintain consistency with the established pattern.
Suggested change:
```diff
-from agents.multi_step_processing import build_multi_step_processing_graph
+from .multi_step_processing import build_multi_step_processing_graph
```
```python
if current_idx < len(tasks):
    current_task = tasks[current_idx]
    current_task_id = current_task.get("task_id", current_idx + 1)
```
Variable current_task_id is not used.
Suggested change:
```diff
-current_task_id = current_task.get("task_id", current_idx + 1)
```
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Refactor multi-step processing logic to enhance clarity and functionality. Update routing and error handling for better task management.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 10 comments.
```python
task_results = state.get("task_results", {})
task_results[current_task_id] = {
    "task_id": current_task_id,
    "success": True,
    "output_layers": output_layers,
    "execution_result": result,
    "error": None,
}
```
Similar state mutation issue: task_results is retrieved from state and then modified in-place. This should create a new dictionary instead.
Use the same pattern:
```python
task_results = dict(state.get("task_results", {}))
task_results[current_task_id] = {...}
```
@copilot open a new pull request to apply changes based on this feedback
```python
global llm
llm = llm_instance
```
This module uses a global llm variable pattern: node functions defined at module level access a global llm that is only initialized when build_multi_step_processing_graph is called (lines 1031-1032). This differs from the pattern in agents/processing.py, which uses nested functions and closures to access llm safely.
While this pattern works, it has potential issues:
- Functions will fail if called before the graph is built
- The dependency on global state is not obvious from function signatures
- It could lead to issues in multi-threaded or test scenarios
Consider refactoring to match the closure-based pattern from processing.py, or at minimum add a module-level docstring explaining this dependency.
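A sketch of the closure-based alternative the comment describes, modeled loosely on the pattern attributed to processing.py. The function name and node body here are illustrative, not the project's actual code:

```python
def build_graph_with_closures(llm_instance):
    # The LLM is captured by the node closures below, so no module-level
    # global is needed and nodes cannot run before the LLM exists.
    llm = llm_instance

    def decompose_node(state: dict) -> dict:
        # Node sees `llm` through its enclosing scope.
        response = llm.invoke(state["query"])
        return {**state, "decomposition": response}

    # In the real module these nodes would be registered on a StateGraph;
    # returning them directly keeps the sketch self-contained.
    return {"decompose": decompose_node}
```

This also makes the dependency explicit in the builder's signature, which helps in tests where a stub LLM can be passed in.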
@copilot open a new pull request to apply changes based on this feedback
```python
for key, value in output_obj.items():
    if isinstance(value, str) and value not in ["TEMPORARY_OUTPUT"]:
```
The output layer extraction logic iterates through all values in the result dictionary and treats any string value (except "TEMPORARY_OUTPUT") as an output layer. This could incorrectly extract non-layer strings from the result.
For example, if the result contains {"OUTPUT": "layer1", "status": "completed", "method": "buffer"}, all three strings would be extracted as output layers.
Consider filtering more carefully by checking specific known output keys, or verifying that the string value looks like a layer name or file path:
```python
# Known output parameter names
OUTPUT_KEYS = ["OUTPUT", "output", "OUTPUT_LAYER", "output_layer", "RESULT", "result"]
for key, value in output_obj.items():
    if key in OUTPUT_KEYS and isinstance(value, str) and value not in ["TEMPORARY_OUTPUT"]:
        output_layers.append(value)
```

Suggested change:
```diff
-for key, value in output_obj.items():
-    if isinstance(value, str) and value not in ["TEMPORARY_OUTPUT"]:
+# Only consider known output-related keys to avoid treating
+# unrelated string fields (e.g. status, method) as layers.
+output_keys = ["OUTPUT", "output", "OUTPUT_LAYER", "output_layer", "RESULT", "result"]
+for key, value in output_obj.items():
+    if (
+        key in output_keys
+        and isinstance(value, str)
+        and value not in ["TEMPORARY_OUTPUT"]
+    ):
```
```python
task_results = state.get("task_results", {})
task_results[current_task_id] = {
    "task_id": current_task_id,
    "success": False,
    "output_layers": [],
    "execution_result": None,
    "error": str(e),
}
```
The task_results dictionary is being mutated in-place by retrieving it from state and then modifying it. This can cause issues with state management in LangGraph. The pattern should be to create a new dictionary for task_results.
Change to:
```python
task_results = dict(state.get("task_results", {}))
task_results[current_task_id] = {...}
return {
    **state,
    "task_results": task_results,
    ...
}
```

This same issue exists in execute_node_multi (lines 601, 627), execute_llm_task_node (lines 957, 980), and other places where task_results is modified.
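Since the same copy-then-return pattern is requested in several places, it could be factored into a small helper. The function name and result fields below mirror the snippets in these comments but are illustrative:

```python
def record_task_failure(state: dict, current_task_id, error) -> dict:
    # Copy first so the state dict held by the checkpointer is never
    # mutated in place; only the returned dict carries the new entry.
    task_results = dict(state.get("task_results", {}))
    task_results[current_task_id] = {
        "task_id": current_task_id,
        "success": False,
        "output_layers": [],
        "execution_result": None,
        "error": str(error),
    }
    return {**state, "task_results": task_results}
```

Each call site would then become a single return statement, which also makes the no-mutation rule harder to violate accidentally.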
```python
task_results = state.get("task_results", {})
task_results[current_task_id] = {
    "task_id": current_task_id,
    "success": True,
    "output_layers": [],
    "execution_result": response_content,
    "error": None,
}
```
Similar state mutation issue: task_results is being modified in-place. Create a new dictionary copy before modifying.
```python
task_results = dict(state.get("task_results", {}))
task_results[current_task_id] = {...}
```

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@iamtekson I've opened a new pull request, #48, to work on those changes. Once the pull request is ready, I'll request review from you.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@iamtekson I've opened a new pull request, #49, to work on those changes. Once the pull request is ready, I'll request review from you.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: iamtekson <39838116+iamtekson@users.noreply.github.com>
Fix state mutation in task_results handling
