Skip to content

Commit a7f87c5

Browse files
feat(dspy): implement multi-stage reasoning program for GEPA (#786)
* feat(dspy): implement multi-stage reasoning program for GEPA Implements Phase 1.3 of GEPA optimization with structured reasoning program: - 5 signature classes: TaskAnalysis, Planning, Execution, Monitoring, Recovery - GptmeReasoningProgram module with multi-stage reasoning flow - execute_with_recovery() for robust error handling with retry logic - Integration with PromptOptimizer via use_reasoning_program flag - Maintains backward compatibility with existing prompt optimization Part of implement-gepa-optimization task (Phase 1.3 complete). Co-authored-by: Bob <bob@superuserlabs.org> * fix(dspy): address PR #786 review feedback - Add base_prompt parameter to GptmeReasoningProgram for consistency with GptmeModule - Use logger.exception instead of logger.error for full stack traces - Include eval_spec field in error case Prediction - Improve fallback strings for execution steps - Update prompt_optimizer.py to pass base_prompt to reasoning program - Add docstring note about architecture-only implementation Addresses review comments from ellipsis-dev and greptile-apps bots. Co-authored-by: Bob <bob@superuserlabs.org> * docs(dspy): add inline comment explaining module selection Addresses greptile-apps suggestion to clarify why there are two module creation paths and what use_reasoning_program enables. Co-authored-by: Bob <bob@superuserlabs.org>
1 parent cf741f8 commit a7f87c5

File tree

2 files changed

+329
-1
lines changed

2 files changed

+329
-1
lines changed

gptme/eval/dspy/prompt_optimizer.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
create_composite_metric,
2424
create_trajectory_feedback_metric,
2525
)
26+
from .reasoning_program import GptmeReasoningProgram
2627
from .signatures import GptmeTaskSignature, PromptImprovementSignature
2728

2829
logger = logging.getLogger(__name__)
@@ -182,6 +183,7 @@ def __init__(
182183
max_metric_calls: int | None = None,
183184
reflection_minibatch_size: int = 3,
184185
num_threads: int = 4,
186+
use_reasoning_program: bool = False,
185187
):
186188
self.model = model
187189
self.optimizer_type = optimizer_type
@@ -193,6 +195,7 @@ def __init__(
193195
self.max_metric_calls = max_metric_calls
194196
self.reflection_minibatch_size = reflection_minibatch_size
195197
self.num_threads = num_threads
198+
self.use_reasoning_program = use_reasoning_program
196199
self._setup_dspy()
197200

198201
def _setup_dspy(self):
@@ -224,7 +227,12 @@ def optimize_prompt(
224227
val_data = PromptDataset(eval_specs[train_size : train_size + val_size])
225228

226229
# Create module and optimizer
227-
module = GptmeModule(base_prompt, self.model)
230+
# Create module based on configuration
231+
if self.use_reasoning_program:
232+
# Multi-stage reasoning for GEPA optimization (Phase 1.3)
233+
module = GptmeReasoningProgram(base_prompt)
234+
else:
235+
module = GptmeModule(base_prompt, self.model)
228236
optimizer = self._create_optimizer(eval_specs)
229237

230238
try:
Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
"""
2+
Multi-stage reasoning program for GEPA optimization.
3+
4+
This module implements a structured reasoning program that breaks down
5+
task execution into distinct stages: analyze, plan, execute, monitor, and recover.
6+
"""
7+
8+
import logging
9+
from typing import Any
10+
11+
import dspy
12+
13+
from gptme.eval.types import EvalSpec
14+
15+
logger = logging.getLogger(__name__)
16+
17+
18+
# Stage 1: Task Analysis
19+
class TaskAnalysisSignature(dspy.Signature):
    """
    Analyze a task to understand requirements, constraints, and approach.

    This is the first stage that breaks down what needs to be done.
    """

    # NOTE(review): DSPy is understood to use the docstring above and each
    # field's `desc` as prompt text for the model — confirm against the DSPy
    # docs before rewording any of them, since that would change behavior.
    #
    # This signature is wrapped in dspy.ChainOfThought and stored as
    # GptmeReasoningProgram.analyze.

    # --- Inputs -----------------------------------------------------------
    task_description = dspy.InputField(desc="The task to be accomplished")
    context = dspy.InputField(desc="Available context: files, environment, constraints")
    # GptmeReasoningProgram.forward fills this with its `available_tools` string.
    system_capabilities = dspy.InputField(
        desc="Available tools and capabilities for task execution"
    )

    # --- Outputs ----------------------------------------------------------
    # task_type and approach_strategy are echoed into the final response text
    # by GptmeReasoningProgram.forward; the whole prediction is also
    # stringified and passed to the planning stage.
    task_type = dspy.OutputField(
        desc="Classification of task type (debugging, implementation, research, etc.)"
    )
    key_requirements = dspy.OutputField(
        desc="Essential requirements that must be satisfied"
    )
    constraints = dspy.OutputField(desc="Constraints and limitations to consider")
    approach_strategy = dspy.OutputField(
        desc="High-level strategy for approaching this task"
    )
42+
43+
44+
# Stage 2: Planning
45+
class PlanningSignature(dspy.Signature):
    """
    Create a step-by-step execution plan based on task analysis.

    This stage translates analysis into actionable steps.
    """

    # NOTE(review): DSPy is understood to use the docstring above and each
    # field's `desc` as prompt text for the model — confirm against the DSPy
    # docs before rewording any of them, since that would change behavior.
    #
    # This signature is wrapped in dspy.ChainOfThought and stored as
    # GptmeReasoningProgram.plan.

    # --- Inputs -----------------------------------------------------------
    # GptmeReasoningProgram.forward passes str(<analysis prediction>) here,
    # i.e. the stringified output of the analysis stage.
    task_analysis = dspy.InputField(
        desc="Results from task analysis stage including requirements and strategy"
    )
    available_tools = dspy.InputField(
        desc="List of available tools and their capabilities"
    )

    # --- Outputs ----------------------------------------------------------
    # forward() currently hands the entire execution_steps value to the
    # execution stage as one step instead of iterating over individual steps.
    execution_steps = dspy.OutputField(
        desc="Ordered list of steps to execute, with tool requirements for each"
    )
    # Not read by GptmeReasoningProgram itself; it is only available on the
    # returned plan prediction.
    dependencies = dspy.OutputField(
        desc="Dependencies between steps and required ordering"
    )
    # Forwarded to the monitoring stage as its success_criteria input.
    success_criteria = dspy.OutputField(
        desc="How to verify each step and overall success"
    )
68+
69+
70+
# Stage 3: Execution
71+
class ExecutionSignature(dspy.Signature):
    """
    Execute a single step from the plan using appropriate tools.

    This stage translates a step into specific tool actions.
    """

    # NOTE(review): DSPy is understood to use the docstring above and each
    # field's `desc` as prompt text for the model — confirm against the DSPy
    # docs before rewording any of them, since that would change behavior.
    #
    # This signature is wrapped in dspy.ChainOfThought and stored as
    # GptmeReasoningProgram.execute. It only asks the model to describe the
    # tool usage; nothing in this file runs the resulting commands.

    # --- Inputs -----------------------------------------------------------
    step_description = dspy.InputField(
        desc="The specific step to execute from the plan"
    )
    # GptmeReasoningProgram.forward always passes the literal "Initial state".
    current_state = dspy.InputField(
        desc="Current state of execution including previous step results"
    )
    available_tools = dspy.InputField(desc="Tools available for this step")

    # --- Outputs ----------------------------------------------------------
    # tool_selection and tool_invocation are echoed into the final response
    # text; expected_outcome is forwarded to the monitoring stage.
    tool_selection = dspy.OutputField(desc="Which tool(s) to use and why")
    tool_invocation = dspy.OutputField(desc="Specific tool commands or code to execute")
    expected_outcome = dspy.OutputField(
        desc="What result to expect from this execution"
    )
91+
92+
93+
# Stage 4: Monitoring
94+
class MonitoringSignature(dspy.Signature):
    """
    Monitor execution results and assess progress toward goals.

    This stage evaluates whether steps are succeeding.
    """

    # NOTE(review): DSPy is understood to use the docstring above and each
    # field's `desc` as prompt text for the model — confirm against the DSPy
    # docs before rewording any of them, since that would change behavior.
    #
    # This signature is wrapped in dspy.ChainOfThought and stored as
    # GptmeReasoningProgram.monitor.

    # --- Inputs -----------------------------------------------------------
    step_description = dspy.InputField(desc="The step that was executed")
    # GptmeReasoningProgram passes str(<execution prediction>) here, i.e. the
    # model's described tool usage rather than output from a real tool run.
    execution_result = dspy.InputField(desc="The actual result from tool execution")
    expected_outcome = dspy.InputField(desc="What was expected from this step")
    success_criteria = dspy.InputField(desc="Criteria for determining success")

    # --- Outputs ----------------------------------------------------------
    # execute_with_recovery() treats "success" and "partial_success" as a
    # completed step; any other status leads to a recovery attempt or failure.
    status = dspy.OutputField(
        desc="Status assessment: success, partial_success, failure, or needs_recovery"
    )
    progress_assessment = dspy.OutputField(
        desc="How much progress was made toward the overall goal"
    )
    # Passed to the recovery stage as its error_description.
    issues_detected = dspy.OutputField(desc="Any issues or problems that were detected")
    # Not read by GptmeReasoningProgram itself; it is only available on the
    # returned monitoring prediction.
    next_action = dspy.OutputField(
        desc="Recommended next action: continue, retry, recover, or abort"
    )
116+
117+
118+
# Stage 5: Recovery
119+
class RecoverySignature(dspy.Signature):
    """
    Develop recovery strategies when errors or failures occur.

    This stage handles error cases and develops recovery plans.
    """

    # NOTE(review): DSPy is understood to use the docstring above and each
    # field's `desc` as prompt text for the model — confirm against the DSPy
    # docs before rewording any of them, since that would change behavior.
    #
    # This signature is wrapped in dspy.ChainOfThought and stored as
    # GptmeReasoningProgram.recover; it is only invoked from
    # GptmeReasoningProgram.execute_with_recovery().

    # --- Inputs -----------------------------------------------------------
    # Filled from the monitoring stage's issues_detected output.
    error_description = dspy.InputField(desc="Description of what went wrong")
    execution_context = dspy.InputField(desc="Context of execution when error occurred")
    # Receives str() of a list of {"attempt", "error", "recovery"} dicts.
    previous_attempts = dspy.InputField(
        desc="Any previous recovery attempts and their outcomes"
    )

    # --- Outputs ----------------------------------------------------------
    error_analysis = dspy.OutputField(desc="Analysis of root cause and error type")
    # The only output execute_with_recovery() reads directly: it is appended
    # to the step description used for the next attempt.
    recovery_strategy = dspy.OutputField(desc="Strategy for recovering from this error")
    alternative_approach = dspy.OutputField(
        desc="Alternative approach if recovery strategy fails"
    )
    preventive_measures = dspy.OutputField(
        desc="How to prevent similar errors in future"
    )
140+
141+
142+
class GptmeReasoningProgram(dspy.Module):
    """
    Multi-stage reasoning program for task execution.

    This program structures task execution into five distinct stages:
    1. Analyze: Understand the task requirements and constraints
    2. Plan: Create a step-by-step execution plan
    3. Execute: Execute individual steps using tools
    4. Monitor: Assess execution progress and detect issues
    5. Recover: Develop recovery strategies when needed

    The program is designed to be optimized by GEPA, which can learn
    from trajectories across all these stages.

    Note: This currently only runs DSPy reasoning chains without actual
    gptme evaluation. Future phases will integrate with GptmeModule-style
    execution for complete end-to-end optimization.
    """

    def __init__(self, base_prompt: str = "You are a helpful AI assistant."):
        """
        Build one ChainOfThought predictor per reasoning stage.

        Args:
            base_prompt: System prompt under optimization. It is stored on the
                instance but no stage in this class reads it yet.
        """
        super().__init__()
        self.base_prompt = base_prompt
        self.analyze = dspy.ChainOfThought(TaskAnalysisSignature)
        self.plan = dspy.ChainOfThought(PlanningSignature)
        self.execute = dspy.ChainOfThought(ExecutionSignature)
        self.monitor = dspy.ChainOfThought(MonitoringSignature)
        self.recover = dspy.ChainOfThought(RecoverySignature)

    @staticmethod
    def _is_success_status(status: Any) -> bool:
        """
        Return True when a monitor status means the step succeeded.

        The status is free-form model output, so surrounding whitespace and
        letter case are ignored before comparing against the accepted values.
        """
        normalized = str(status or "").strip().lower()
        return normalized in ("success", "partial_success")

    def forward(
        self,
        task_description: str,
        context: str,
        eval_spec: EvalSpec,
        available_tools: str = "shell, python, read, save, patch, browser",
    ) -> dspy.Prediction:
        """
        Execute a task through the multi-stage reasoning process.

        Runs analyze -> plan -> execute -> monitor once each. The recovery
        stage is not used here; see execute_with_recovery().

        Args:
            task_description: The task to accomplish
            context: Context including files, environment, etc.
            eval_spec: Original evaluation specification
            available_tools: Tools available for execution

        Returns:
            Prediction containing the full execution trajectory (response,
            analysis, plan, execution, monitoring, eval_spec). If any stage
            raises, the exception is logged and a Prediction carrying
            response, error and eval_spec is returned instead.
        """
        try:
            # Stage 1: Analyze the task
            analysis = self.analyze(
                task_description=task_description,
                context=context,
                system_capabilities=available_tools,
            )

            # Stage 2: Create execution plan
            plan = self.plan(
                task_analysis=str(analysis),
                available_tools=available_tools,
            )

            # Stage 3-5: Execute, monitor, and recover as needed
            # For now, we'll execute a simplified version
            # Full implementation would iterate through plan steps
            execution_steps = (
                getattr(plan, "execution_steps", "") or "No execution plan generated"
            )
            execution = self.execute(
                step_description=execution_steps,
                current_state="Initial state",
                available_tools=available_tools,
            )

            monitoring = self.monitor(
                step_description=execution_steps,
                execution_result=str(execution),
                expected_outcome=getattr(execution, "expected_outcome", ""),
                success_criteria=getattr(plan, "success_criteria", ""),
            )

            # Build comprehensive response
            response_parts = [
                "# Task Analysis",
                f"Task Type: {getattr(analysis, 'task_type', 'N/A')}",
                f"Strategy: {getattr(analysis, 'approach_strategy', 'N/A')}",
                "",
                "# Execution Plan",
                str(getattr(plan, "execution_steps", "N/A")),
                "",
                "# Execution",
                f"Tool Selection: {getattr(execution, 'tool_selection', 'N/A')}",
                f"Actions: {getattr(execution, 'tool_invocation', 'N/A')}",
                "",
                "# Monitoring",
                f"Status: {getattr(monitoring, 'status', 'N/A')}",
                f"Progress: {getattr(monitoring, 'progress_assessment', 'N/A')}",
            ]

            return dspy.Prediction(
                response="\n".join(response_parts),
                analysis=analysis,
                plan=plan,
                execution=execution,
                monitoring=monitoring,
                eval_spec=eval_spec,
            )

        except Exception as e:
            # Deliberately broad: a failing stage must not abort the caller.
            # The error is surfaced on the returned Prediction instead.
            logger.exception("Error in GptmeReasoningProgram: %s", e)
            return dspy.Prediction(
                response=f"Error in reasoning program: {str(e)}",
                error=str(e),
                eval_spec=eval_spec,
            )

    def execute_with_recovery(
        self,
        step_description: str,
        current_state: str,
        available_tools: str,
        max_retries: int = 3,
    ) -> tuple[dspy.Prediction, bool]:
        """
        Execute a step with automatic recovery on failure.

        Each attempt runs the execute and monitor stages. When the monitor
        does not report success and attempts remain, the recover stage is
        consulted and its strategy is appended to the step description used
        for the next attempt.

        Args:
            step_description: The step to execute
            current_state: Execution state handed to the execute and recover stages
            available_tools: Tools available for this step
            max_retries: Total number of attempts (not additional retries);
                must be at least 1

        Returns:
            Tuple of (execution_result, success_flag), where execution_result
            is the prediction from the last attempt made

        Raises:
            ValueError: If max_retries is less than 1
        """
        # Without this guard the loop never runs and the final return would
        # fail with UnboundLocalError on `execution`.
        if max_retries < 1:
            raise ValueError(f"max_retries must be at least 1, got {max_retries}")

        previous_attempts: list[dict[str, Any]] = []

        for attempt in range(1, max_retries + 1):
            # Execute step
            execution = self.execute(
                step_description=step_description,
                current_state=current_state,
                available_tools=available_tools,
            )

            # Monitor execution
            monitoring = self.monitor(
                step_description=step_description,
                execution_result=str(execution),
                expected_outcome=getattr(execution, "expected_outcome", ""),
                success_criteria="Step completes without errors",
            )

            if self._is_success_status(getattr(monitoring, "status", "failure")):
                return execution, True

            # The last attempt has no follow-up, so a recovery plan would go unused.
            if attempt == max_retries:
                break

            # Attempt recovery
            issues = getattr(monitoring, "issues_detected", "")
            recovery = self.recover(
                error_description=issues,
                execution_context=current_state,
                previous_attempts=str(previous_attempts),
            )
            previous_attempts.append(
                {
                    "attempt": attempt,
                    "error": issues,
                    "recovery": str(recovery),
                }
            )

            # Update approach based on recovery strategy
            strategy = getattr(recovery, "recovery_strategy", "")
            step_description = f"{step_description}\n\nRecovery Strategy: {strategy}"

        return execution, False
312+
313+
314+
def create_reasoning_program() -> GptmeReasoningProgram:
    """
    Build a GptmeReasoningProgram using its default configuration.

    Serves as the single construction point, so other configurations or
    program variants can be introduced here later.
    """
    program = GptmeReasoningProgram()
    return program

0 commit comments

Comments
 (0)