From c33cd0d4206ce1f8a81c9c75f23e5e686bd54a3c Mon Sep 17 00:00:00 2001 From: vsoch Date: Mon, 24 Nov 2025 22:44:14 -0800 Subject: [PATCH 01/27] wip: adding agents This adds agents (a combination of MCP client and an LLM backend and a prompt that is also served by the MCP server!) I need a token (ideally Gemini, soon) to continue development and testing. it is looking good! The manager design is similar to before with recovery, and each agent step (an LLM that is given MCP functions and a specific goal) is like a tiny state machine! Signed-off-by: vsoch --- README.md | 54 +++- examples/plans/docker-build-lammps.yaml | 14 ++ fractale/agent/__init__.py | 0 fractale/agent/backends/__init__.py | 24 ++ fractale/agent/backends/gemini.py | 87 +++++++ fractale/agent/backends/llama.py | 109 +++++++++ fractale/agent/backends/llm.py | 32 +++ fractale/agent/backends/openai.py | 77 ++++++ fractale/agent/base.py | 108 ++++++++ fractale/agent/context.py | 98 ++++++++ fractale/agent/defaults.py | 17 ++ fractale/agent/logger.py | 64 +++++ fractale/agent/manager/__init__.py | 1 + fractale/agent/manager/agent.py | 313 ++++++++++++++++++++++++ fractale/agent/manager/plan.py | 172 +++++++++++++ fractale/agent/manager/prompts.py | 48 ++++ fractale/agent/prompts.py | 51 ++++ fractale/cli/__init__.py | 21 +- fractale/cli/agent.py | 22 +- fractale/cli/start.py | 5 +- fractale/tools/build/docker/prompts.py | 77 ++++++ fractale/tools/build/docker/tool.py | 68 ++++- fractale/tools/decorator.py | 46 +--- fractale/tools/manager.py | 58 ++++- fractale/tools/prompts.py | 2 + fractale/version.py | 8 +- setup.py | 4 + 27 files changed, 1474 insertions(+), 106 deletions(-) create mode 100644 examples/plans/docker-build-lammps.yaml create mode 100644 fractale/agent/__init__.py create mode 100644 fractale/agent/backends/__init__.py create mode 100644 fractale/agent/backends/gemini.py create mode 100644 fractale/agent/backends/llama.py create mode 100644 fractale/agent/backends/llm.py create mode 100644 fractale/agent/backends/openai.py create mode 100644 fractale/agent/base.py create mode 100644 fractale/agent/context.py create mode 100644 fractale/agent/defaults.py create mode 100644 fractale/agent/logger.py create mode 100644 fractale/agent/manager/__init__.py create mode 100644 fractale/agent/manager/agent.py create mode 100644 fractale/agent/manager/plan.py create mode 100644 fractale/agent/manager/prompts.py create mode 100644 fractale/agent/prompts.py create mode 100644 fractale/tools/build/docker/prompts.py create mode 100644 fractale/tools/prompts.py diff --git a/README.md b/README.md index 19b7178..88c6e75 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,7 @@ Tools to add: Start the server in one terminal. Export `FRACTALE_MCP_TOKEN` if you want to require simple token auth. Here is for http. ```bash +export FRACTALE_TOKEN_AUTH=dudewheresmycar fractale start --transport http --port 8089 ``` @@ -64,22 +65,30 @@ curl -s http://0.0.0.0:8089/health | jq python3 examples/mcp/test_echo.py ``` -TODO: +### Agents - - we will want to keep track of state (retries, etc.) for agents somewhere. +The `fractale agent` command provides means to run build, job generation, and deployment agents. +In our [first version](https://github.com/compspec/fractale), an agent corresponded to a kind of task (e.g., build). For this refactored version, the concept of an agent is represented in a prompt or persona, which can be deployed by a generic MCP agent with some model backend (e.g., Gemini, Llama, or OpenAI). Let's test +doing a build: -### Agents +```bash +# In both terminals +export FRACTALE_MCP_TOKEN=dude -**Not written yet** +# In one terminal (start MCP) +fractale start -t http --port 8089 + +# In the other, run the plan +fractale agent ./examples/plans/docker-build-lammps.yaml +``` -The `fractale agent` command provides means to run build, job generation, and deployment agents. -This part of the library is under development. There are three kinds of agents: - - `step` agents are experts on doing specific tasks (do hold state) - `manager` agents know how to orchestrate step agents and choose between them (don't hold state, but could) - - `helper` agents are used by step agents to do small tasks (e.g., suggest a fix for an error) + - `step` agents are experts on doing specific tasks. This originally was an agent with specific functions to do something (e.g., docker build) and now is a generic MCP agent with a prompt that gives it context and a goal. + +The initial design of `helper` agents from the first fractale is subsumed by the idea of an MCP function. A helper agent _is_ an MCP tool. -The design is simple in that each agent is responding to state of error vs. success. In the [first version]() of our library, agents formed a custom graph. In this variant, we refactor to use MCP server tools. In the case of a step agent, the return code determines to continue or try again. In the case of a helper, the input is typically an erroneous response (or something that needs changing) with respect to a goal. For a manager, we are making a choice based on a previous erroneous step. +The design is simple in that each agent is responding to state of error vs. success. In the [first version](https://github.com/compspec/fractale) of our library, agents formed a custom graph. In this variant, we refactor to use MCP server tools. It has the same top level design with a manager, but each step agent is like a small state machine governmed by an LLM with access to MCP tools and resources. See [examples/agent](examples/agent) for an example, along with observations, research questions, ideas, and experiment brainstorming! @@ -96,6 +105,33 @@ Here are a few design choices (subject to change, of course). I am starting with - The backend of FastMCP is essentially starlette, so we define (and add) other routes to the server. +### Job Specifications + +#### Simple + +We provide a simple translation layer between job specifications. We take the assumption that although each manager has many options, the actual options a user would use is a much smaller set, and it's relatively straight forward to translate (and have better accuracy). + +See [examples/transform](examples/transform) for an example. + +#### Complex + +We want to: + +1. Generate software graphs for some cluster (fluxion JGF) (this is done with [compspec](https://github.com/compspec/compspec) +2. Register N clusters to a tool (should be written as a python module) +3. Tool would have ability to select clusters from resources known, return +4. Need graphical representation (json) of each cluster - this will be used with the LLM inference + +See [examples/fractale](examples/fractale) for a detailed walk-through of the above. + +For graph tool: + +```bash +conda install -c conda-forge graph-tool +``` + + + ## License HPCIC DevTools is distributed under the terms of the MIT license. diff --git a/examples/plans/docker-build-lammps.yaml b/examples/plans/docker-build-lammps.yaml new file mode 100644 index 0000000..990b591 --- /dev/null +++ b/examples/plans/docker-build-lammps.yaml @@ -0,0 +1,14 @@ +name: "LAMMPS Pipeline" + +plan: + - name: "build" + prompt: "docker-build-persona" + inputs: + description: "LAMMPS (Large-scale Atomic/Molecular Massively Parallel Simulator)" + container: "ghcr.io/hpc-lab/lammps:cpu-latest" + environment: "Rocky Linux 9, CPU Only" + +# - name: "deploy" +# prompt: "k8s-deploy-persona" +# inputs: +# replicas: 4 \ No newline at end of file diff --git a/fractale/agent/__init__.py b/fractale/agent/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fractale/agent/backends/__init__.py b/fractale/agent/backends/__init__.py new file mode 100644 index 0000000..231037d --- /dev/null +++ b/fractale/agent/backends/__init__.py @@ -0,0 +1,24 @@ +BACKENDS = {} + +# Attempt import of each +# This is ugly, but it works! +try: + from .gemini import GeminiBackend + + BACKENDS["gemini"] = GeminiBackend +except ImportError: + pass + +try: + from .openai import OpenAIBackend + + BACKENDS["openai"] = OpenAIBackend +except ImportError: + pass + +try: + from .llama import LlamaBackend + + BACKENDS["llama"] = LlamaBackend +except ImportError: + pass diff --git a/fractale/agent/backends/gemini.py b/fractale/agent/backends/gemini.py new file mode 100644 index 0000000..2055574 --- /dev/null +++ b/fractale/agent/backends/gemini.py @@ -0,0 +1,87 @@ +import os +from typing import Any, Dict, List + +import google.generativeai as genai + +from .llm import LLMBackend + + +class GeminiBackend(LLMBackend): + def __init__(self, model_name="gemini-1.5-pro"): + """ + Init Gemini! We can try the newer one (3.0) when we test. + """ + try: + genai.configure(api_key=os.environ["GEMINI_API_KEY"]) + # I'm allowing this for now because I don't have a working one... + except KeyError: + print("❌ GEMINI_API_KEY missing.") + self.model_name = model_name + self.chat = None + self._usage = {} + + async def initialize(self, mcp_tools: List[Any]): + """ + Convert MCP tools to Gemini Format + """ + gemini_tools = [] + for tool in mcp_tools: + gemini_tools.append( + { + "name": tool.name.replace("-", "_"), # Gemini hates dashes + "description": tool.description, + "parameters": tool.inputSchema, + } + ) + + model = genai.GenerativeModel(self.model_name, tools=gemini_tools) + self.chat = model.start_chat(enable_automatic_function_calling=False) + + async def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None): + """ + Generate Gemini response. + + This is currently setup as a chat - we need to make sure we can do a one-off + message (no memory or bias). + """ + response = None + + # Sending tool outputs (previous turn was a function call) + if tool_outputs: + parts = [] + for output in tool_outputs: + parts.append( + genai.protos.Part( + function_response=genai.protos.FunctionResponse( + name=output["name"].replace("-", "_"), + response={"result": output["content"]}, + ) + ) + ) + response = await self.chat.send_message_async(genai.protos.Content(parts=parts)) + + # Sending new text + elif prompt: + response = await self.chat.send_message_async(prompt) + + # Extract Logic + self._usage = { + "prompt_tokens": response.usage_metadata.prompt_token_count, + "completion_tokens": response.usage_metadata.candidates_token_count, + } + + part = response.candidates[0].content.parts[0] + text_content = response.text if not part.function_call else "" + + tool_calls = [] + if part.function_call: + fc = part.function_call + tool_calls.append( + {"name": fc.name.replace("_", "-"), "args": dict(fc.args)} # Map back to MCP + ) + + return text_content, tool_calls + + @property + def token_usage(self): + return self._usage diff --git a/fractale/agent/backends/llama.py b/fractale/agent/backends/llama.py new file mode 100644 index 0000000..3b060e5 --- /dev/null +++ b/fractale/agent/backends/llama.py @@ -0,0 +1,109 @@ +import json +import os +from typing import Any, Dict, List + +from openai import AsyncOpenAI + +from .llm import LLMBackend + + +class LlamaBackend(LLMBackend): + """ + Backend for Meta Llama 3.1+ models. + """ + + def __init__(self, model_name=None): + # This should be provided by LLAMAME but I haven't tested. + # Why is a llama trying to call me that's not OK. Not sure if I need ollama + base_url = os.environ.get("LLAMA_BASE_URL", "http://localhost:11434/v1") + api_key = os.environ.get("LLAMA_API_KEY", "ollama") + + self.client = AsyncOpenAI(base_url=base_url, api_key=api_key) + + # Default to Llama 3.1 8B if not specified + self.model_name = model_name or os.environ.get("LLAMA_MODEL", "llama3.1") + + self.history = [] + self.tools_schema = [] + self._usage = {} + + async def initialize(self, mcp_tools: List[Any]): + """ + Llama 3.1 follows the OpenAI Tool Schema standard. + + TODO: vsoch see if we can consolidate with OpenAI base when testing. + """ + self.tools_schema = [] + for tool in mcp_tools: + self.tools_schema.append( + { + "type": "function", + "function": { + "name": tool.name, # Llama handles dashes fine + "description": tool.description, + "parameters": tool.inputSchema, + }, + } + ) + + async def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None): + """ + Manage history and call Llama. + """ + if prompt: + # Check if we have a System Prompt set (usually the first message). + # If not, Llama often behaves better with one. + if not self.history: + self.history.append( + { + "role": "system", + "content": "You are a helpful assistant with access to tools. You must use them to answer questions.", + } + ) + self.history.append({"role": "user", "content": prompt}) + + # Handle tool outputs (Function results) + if tool_outputs: + for out in tool_outputs: + self.history.append( + { + "role": "tool", + # Required for the conversation graph + "tool_call_id": out["id"], + "content": str(out["content"]), + } + ) + + try: + response = await self.client.chat.completions.create( + model=self.model_name, + messages=self.history, + tools=self.tools_schema or None, + tool_choice="auto" if self.tools_schema else None, + ) + except Exception as e: + return f"LLAMA API ERROR: {str(e)}", [] + + msg = response.choices[0].message + if response.usage: + self._usage = dict(response.usage) + # Store history and get text content + self.history.append(msg) + text_content = msg.content or "" + + tool_calls = [] + if msg.tool_calls: + for tc in msg.tool_calls: + tool_calls.append( + { + "id": tc.id, + "name": tc.function.name, + "args": json.loads(tc.function.arguments), + } + ) + + return text_content, tool_calls + + @property + def token_usage(self): + return self._usage diff --git a/fractale/agent/backends/llm.py b/fractale/agent/backends/llm.py new file mode 100644 index 0000000..4bd7e14 --- /dev/null +++ b/fractale/agent/backends/llm.py @@ -0,0 +1,32 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict, List + + +class LLMBackend(ABC): + """ + Abstract interface for any LLM provider (Gemini, OpenAI, Llama, Local). + """ + + @abstractmethod + async def initialize(self, mcp_tools: List[Any]): + """ + Convert MCP tools to provider-specific format and setup session. + """ + pass + + @abstractmethod + async def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None): + """ + Returns a text_response, tool_calls + """ + pass + + @property + @abstractmethod + def token_usage(self) -> Dict: + """ + Return token stats for metadata stuffs. + + Note from V: we need a more robust provenance tracking thing. + """ + pass diff --git a/fractale/agent/backends/openai.py b/fractale/agent/backends/openai.py new file mode 100644 index 0000000..bf56bfd --- /dev/null +++ b/fractale/agent/backends/openai.py @@ -0,0 +1,77 @@ +from typing import Any, Dict, List + +from openai import AsyncOpenAI + +from .llm import LLMBackend + + +class OpenAIBackend(LLMBackend): + """ + Backend to use OpenAI (not tested yet) + """ + + def __init__(self, model_name="gpt-4o"): + self.client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY")) + self.model_name = model_name + self.history = [] + self.tools_schema = [] + self._usage = {} + + async def initialize(self, mcp_tools: List[Any]): + """ + Tell this jerk about all the MCP tools. + """ + self.tools_schema = [] + for tool in mcp_tools: + self.tools_schema.append( + { + "type": "function", + "function": { + # OpenAI is fine with dashes + "name": tool.name, + "description": tool.description, + "parameters": tool.inputSchema, + }, + } + ) + + async def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None): + """ + Generate the response and update history. + """ + if prompt: + self.history.append({"role": "user", "content": prompt}) + if tool_outputs: + for out in tool_outputs: + self.history.append( + {"role": "tool", "tool_call_id": out["id"], "content": str(out["content"])} + ) + + # Call API + response = await self.client.chat.completions.create( + model=self.model_name, messages=self.history, tools=self.tools_schema or None + ) + + msg = response.choices[0].message + + # Save assistant reply to history + self.history.append(msg) + self._usage = dict(response.usage) + + tool_calls = [] + if msg.tool_calls: + for tc in msg.tool_calls: + tool_calls.append( + { + # OpenAI needs IDs + "id": tc.id, + "name": tc.function.name, + "args": json.loads(tc.function.arguments), + } + ) + + return msg.content, tool_calls + + @property + def token_usage(self): + return self._usage diff --git a/fractale/agent/base.py b/fractale/agent/base.py new file mode 100644 index 0000000..1a92995 --- /dev/null +++ b/fractale/agent/base.py @@ -0,0 +1,108 @@ +import copy +import os +import time +from typing import Any, Dict + +from fractale.logger import logger + + +class Agent: + """ + Base Agent infrastructure. + """ + + def __init__( + self, + name: str = "agent", + results_dir: str = None, + save_incremental: bool = False, + max_attempts: int = None, + ): + self.name = name + self.attempts = 0 + self.max_attempts = max_attempts + + self.results_dir = results_dir or os.getcwd() + self.save_incremental = save_incremental + self.init_metadata() + + # Called by subclass for its specific setup + self.init() + + def init(self): + """ + Init operations, intended to override in subclass. + """ + pass + + def init_metadata(self): + self.metadata = { + "name": self.name, + "times": {}, + "assets": {}, + "failures": [], + # TODO: likely we want to replace this with the metadata registry. + "counts": {"retries": 0, "return_to_manager": 0, "return_to_human": 0}, + "llm_usage": [], + } + + def run(self, context: Dict[str, Any]) -> Dict[str, Any]: + """ + Main execution wrapper + """ + # Ensure max_attempts is set + context["max_attempts"] = self.max_attempts or context.get("max_attempts") + + # 3. RUN STEP + logger.info(f"▶️ Running {self.name}...") + start_time = time.time() + + try: + # Call abstract method + context = self.run_step(context) + + finally: + duration = time.time() - start_time + self.metadata["times"]["execution"] = duration + + return context + + def run_step(self, context): + """ + Abstract: Implemented by MCPAgent + """ + raise NotImplementedError(f"Agent {self.name} missing run_step") + + def reset_context(self, context): + """ + Clears output variables to prepare for a retry. + """ + # Convert to dict if it's a Context object + is_obj = hasattr(context, "data") + data = context.data if is_obj else context + + # Clear state variables + for key in self.state_variables: + if key in data: + del data[key] + + # Archive current metadata into failures list + if "failures" not in self.metadata: + self.metadata["failures"] = [] + + # Snapshot current metadata + self.metadata["failures"].append(copy.deepcopy(self.metadata)) + + # Reset current counters (keep retries count consistent) + current_retries = self.metadata["counts"]["retries"] + self.init_metadata() + self.metadata["counts"]["retries"] = current_retries + return context + + def reached_max_attempts(self): + """ + Return true if we have reached maximum number of attempts. + """ + if not self.max_attempts: + return False + return self.attempts >= self.max_attempts diff --git a/fractale/agent/context.py b/fractale/agent/context.py new file mode 100644 index 0000000..f11cebb --- /dev/null +++ b/fractale/agent/context.py @@ -0,0 +1,98 @@ +import collections + +import fractale.agent.logger as logger + + +def get_context(context): + """ + Get or create the context. + """ + if isinstance(context, Context): + return context + return Context(context) + + +class Context(collections.UserDict): + """ + A custom dictionary that allows attribute-style access to keys, + and extends the 'get' method with a 'required' argument. + + The context for an agent should be populated with metadata that + needs to move between agents. The manager decides what from the + context to pass to agents for an updated context. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def reset(self): + """ + Reset the return code and result. + """ + for key in ["return_code", "result", "error_message"]: + self.data[key] = None + + def is_managed(self): + """ + Is the context being managed? + """ + return self.get("managed") is True + + def __getattribute__(self, name): + """ + Intercepts all attribute lookups (including methods/functions) + """ + try: + # Step 1: this would be a normal attribute + attr = object.__getattribute__(self, name) + except AttributeError: + # Then handle lookup of dict key by attribute + return super().__getattribute__(name) + + # Step 2: We allow "get" to be called with defaults / required. + if name == "get": + original_get = attr + + def custom_get(key, default=None, required=False): + """ + Wrapper for the standard dict.get() method. + Accepts the custom 'required' argument. + """ + if required: + if key not in self.data: + raise ValueError(f"Key `{key}` is required but missing") + logger.exit(f"Key `{key}` is required but missing", title="Context Status") + + # If required and found, just return the value + return self.data[key] + else: + # If not required, use the original dict.get behavior + return original_get(key, default) + + # Return the wrapper function instead of the original method + return custom_get + + # 4. For any other attribute (like keys(), items(), update(), or custom methods) + # return the attribute we found earlier + return attr + + # 5. Override __getattr__ to handle attribute-style access to dictionary keys + def __getattr__(self, name): + """ + Allows access to dictionary keys as attributes. + """ + if name in self.data: + return self.data[name] + raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") + + def __setattr__(self, name, value): + """ + Allows setting keys via attribute assignment. + """ + # If the attribute name is a reserved name (like 'data'), set it normally + if name in ("data", "_data"): + super().__setattr__(name, value) + + # Otherwise, treat it as a dictionary key + else: + self.data[name] = value diff --git a/fractale/agent/defaults.py b/fractale/agent/defaults.py new file mode 100644 index 0000000..214a5b1 --- /dev/null +++ b/fractale/agent/defaults.py @@ -0,0 +1,17 @@ +environment = "generic cloud environment" +gemini_model = "gemini-2.5-pro" + +# These are common / default args we don't need to give in any prompt. +shared_args = { + "command", + "config_dir", + "debug", + "error_message", + "incremental", + "outfile", + "plan", + "quiet", + "result", + "results", + "version", +} diff --git a/fractale/agent/logger.py b/fractale/agent/logger.py new file mode 100644 index 0000000..a560136 --- /dev/null +++ b/fractale/agent/logger.py @@ -0,0 +1,64 @@ +import sys + +from rich import print +from rich.panel import Panel + + +def success(message, title="Success", border_style="green", expand=True): + """ + Helper function to print successful message. + """ + print( + Panel( + f"[bold green]✅ {message}[/bold green]", + title=title, + border_style=border_style, + expand=expand, + ) + ) + + +def error(message, title="Error", border_style="red", expand=True): + """ + Helper function to print error "beep boop" message. + """ + print( + Panel( + f"[bold red]❌ {message}[/bold red]", + title=title, + border_style=border_style, + expand=expand, + ) + ) + + +def exit(message, title="Error", border_style="red", expand=True): + error(message, title, border_style, expand) + sys.exit(-1) + + +def warning(message, title="Warning", border_style="yellow"): + """ + Helper function to print a warning + """ + print( + Panel( + message, + title=f"[yellow]{title}[/yellow]", + border_style=border_style, + ) + ) + + +def custom(message, title, border_style=None, expand=True): + """ + Custom message / title Panel. + """ + if not border_style: + print(Panel(message, title=title, expand=expand)) + else: + print(Panel(message, title=title, border_style=border_style, expand=expand)) + + +def info(message): + print(f"\n[bold cyan] {message}[/bold cyan]") diff --git a/fractale/agent/manager/__init__.py b/fractale/agent/manager/__init__.py new file mode 100644 index 0000000..cf32844 --- /dev/null +++ b/fractale/agent/manager/__init__.py @@ -0,0 +1 @@ +from .agent import ManagerAgent diff --git a/fractale/agent/manager/agent.py b/fractale/agent/manager/agent.py new file mode 100644 index 0000000..2ac5d02 --- /dev/null +++ b/fractale/agent/manager/agent.py @@ -0,0 +1,313 @@ +import asyncio +import json +import os +from datetime import datetime + +from rich import print + +import fractale.agent.logger as logger +import fractale.agent.manager.prompts as prompts +import fractale.utils as utils +from fractale.agent.agent import MCPAgent +from fractale.agent.context import get_context +from fractale.agent.manager.plan import Plan +from fractale.utils.timer import Timer + +# The manager IS an agent itself since it needs to decide how to recover. + + +class ManagerAgent(MCPAgent): + """ + An LLM-powered orchestrator that executes a plan. + It can perform error recovery by asking the backend LLM. + """ + + def init(self): + """ + Initialize the MCPAgent infrastructure (MCP Client + Backend). + """ + # This sets up the MCP connection and LLM Backend + super().init() + + def get_recovery_step(self, context, failed_step, plan): + """ + Uses the LLM Backend to decide which agent to call to fix an error. + """ + # We describe each step (akin to a function) for the manager to choose + descriptions = "" + for step in plan.agents: + descriptions += f"- {step.agent}: {step.description}\n" + if step.agent == failed_step.agent: + break + + # Build the Prompt to recover from some failure. + prompt_text = prompts.recovery_prompt % ( + descriptions, + failed_step.agent, + context.error_message, + ) + logger.warning( + f"🤔 Consulting Manager for recovery from {failed_step.agent} failure...", + title="Error Triage", + ) + + step_json = None + attempts = 0 + + while not step_json and attempts < 3: + attempts += 1 + try: + # Use the Backend directly - get back tuple (text, calls) + # We ask the backend to generate a response based on this prompt. + response_text = asyncio.run(self.backend.generate_response(prompt=prompt_text))[0] + step_json = json.loads(utils.get_code_block(response_text, "json")) + + # Validate + if "agent_name" not in step_json or "task_description" not in step_json: + raise ValueError("Missing keys") + if step_json["agent_name"] not in plan.agent_names: + raise ValueError(f"Unknown agent {step_json['agent_name']}") + + except Exception as e: + step_json = None + + # Tell agent what it did wrong :) + prompt_text = prompts.recovery_error_prompt % ( + descriptions, + failed_step.agent, + context.error_message, + str(e), + ) + logger.warning(f"Manager failed to parse recovery plan, retrying: {e}") + + return step_json + + def run(self, context): + """ + Executes a plan-driven workflow with intelligent error recovery. + """ + # Ensure context is wrapped + context = get_context(context) + + # Init metadata if needed + if "recovery" not in self.metadata["assets"]: + self.metadata["assets"]["recovery"] = {} + + context.managed = True + self.max_attempts = self.max_attempts or 10 + + # Plan parses the list of agent configs (prompts) + plan_path = context.get("plan", required=True) + plan = Plan(plan_path, save_incremental=self.save_incremental) + + # STOPPED HERE - need working token to run! + import IPython + + IPython.embed() + + # Ensure that persons in plan exist at API. + print(f"🔎 Validating personas. Available: {len(valid_personas)}") + + # These are personas / roles available for agents + # 1. Connect & Discover Tools + prompts = asyncio.run(self.get_prompts_list()) + + # 2. Initialize Backend with these tools + + # await self.backend.initialize(mcp_tools) + + # 3. Initial Prompt + # 'response_text' is what the LLM says to the user + # 'calls' is a list of tools it wants to run + # response_text, calls = await self.backend.generate_response(prompt=prompt_text) + + # for step in plan.agents: + # if step.prompt not in valid_persons: + + # if prompt_name not in valid_personas: + # raise ValueError( + # f"❌ Plan Error: Unknown Persona '{prompt_name}' in step '{step['name']}'.\n" + # f"Available Personas: {list(valid_personas)}" + # ) + # except ImportError: + # Fallback if registry module isn't ready or circular import issues persist + # print("⚠️ Warning: Skipping persona validation (registry not available)") + + # import IPython + # IPython.embed() + + # logger.custom( + # f"Manager Initialized with Agents: [bold cyan]{plan.agent_names}[/bold cyan]", + # title="[green]Manager Status[/green]", + # ) + + # try: + # tracker = self.run_tasks(context, plan) + # logger.custom( + # f"Tasks complete: [bold magenta]{len(tracker)} steps[/bold magenta]", + # title="[green]Manager Status[/green]", + # ) + # self.save_results(tracker, plan) + # return tracker # Return tracker as result + + # except Exception as e: + # logger.error(f"Orchestration failed:\n{str(e)}", title="Orchestration Failed", expand=False) + # raise e + + # NOTE from vsoch: stuff below here is old (from v1) and I'll refactor it when I can keep testing above. + + def run_tasks(self, context, plan): + """ + Run agent tasks until stopping condition. + """ + tracker = [] + timer = Timer() + current_step_index = 0 + + # Global Manager Loop + while current_step_index < len(plan): + step_agent = plan[ + current_step_index + ] # This is an instance of MCPAgent (e.g. UniversalAgent) + + logger.custom( + f"Executing step {current_step_index + 1}/{len(plan)}: [bold cyan]{step_agent.name}[/bold cyan]", + title=f"[blue]Attempt {self.attempts}[/blue]", + ) + + # --- EXECUTE AGENT --- + # Using the new .run() interface from BaseAgent + with timer: + # The agent updates the context in place and returns it + context = step_agent.run(context) + + # Record metrics + # Note: step_agent.metadata is populated by BaseAgent + tracker.append( + { + "agent": step_agent.name, + "total_seconds": timer.elapsed_time, + "result": context.get("result"), + "error": context.get("error_message"), + "attempts": step_agent.attempts + 1, + "metadata": step_agent.metadata, # Detailed logs + } + ) + + # --- CHECK SUCCESS --- + # If we have a result and no error message, success. + if context.get("result") and not context.get("error_message"): + current_step_index += 1 + context.reset() # Clear temp results for next step + continue # Move to next step + + # --- FAILURE & RECOVERY --- + else: + logger.error(f"Step {step_agent.name} Failed: {context.get('error_message')}") + + # Check global manager limits + if self.reached_max_attempts(): + logger.error("Manager reached max attempts. Aborting.") + break + + self.attempts += 1 + + # If first step fails, just hard reset + if current_step_index == 0: + context = self.reset_context(context, plan=plan) + continue + + # RECOVERY LOGIC + # Ask the Manager (Self) to pick a previous step to retry from + recovery_step = self.get_recovery_step(context, step_agent, plan) + + if not recovery_step: + logger.error("Manager could not determine a recovery plan. Aborting.") + break + + if step_agent.name not in self.metadata["assets"]["recovery"]: + self.metadata["assets"]["recovery"][step_agent.name] = [] + self.metadata["assets"]["recovery"][step_agent.name].append(recovery_step) + + target_agent_name = recovery_step["agent_name"] + logger.warning(f"Rolling back to agent: [bold cyan]{target_agent_name}[/bold cyan]") + + # Find index of target agent + # (Assuming plan object allows finding index by name) + # Simple linear search logic: + found_index = -1 + for idx, ag in enumerate(plan.agents): + if ag.name == target_agent_name: + found_index = idx + break + + if found_index == -1: + logger.error(f"Recovery agent {target_agent_name} not found in plan!") + break + + current_step_index = found_index + + # Reset context up to that point + # (We rely on the step.reset_context implementation from BaseAgent) + context = self.reset_context(context, plan, plan[current_step_index]) + + # Inject advice so it doesn't repeat the mistake + issues = self.assemble_issues(step_agent.name) + # Update context with a hint for the next run + context["error_message"] = prompts.get_retry_prompt(context, issues) + + continue + + # Final Status + if current_step_index == len(plan): + self.metadata["status"] = "Succeeded" + else: + self.metadata["status"] = "Failed" + + return tracker + + def save_results(self, tracker, plan): + """Save results to file based on timestamp.""" + if not os.path.exists(self.results_dir): + os.makedirs(self.results_dir) + now = datetime.now() + timestamp = now.strftime("%Y-%m-%d_%H-%M-%S") + results_file = os.path.join(self.results_dir, f"results-{timestamp}.json") + + # We assume plan has a .plan attribute or similar to get raw dict + manager_meta = getattr(plan, "plan", {}) + + if self.metadata["times"]: + manager_meta["times"] = self.metadata["times"] + if self.metadata["assets"]["recovery"]: + manager_meta["recovery"] = self.metadata["assets"]["recovery"] + + result = {"steps": tracker, "manager": manager_meta, "status": self.metadata["status"]} + utils.write_json(result, results_file) + + def reset_context(self, context, plan, failed_step=None): + """ + Reset context state variables up to the failed step. + """ + # We iterate through agents and call their reset logic + for step in plan.agents: + context = step.reset_context(context) + + # If we reached the step we are rolling back to, stop clearing. + # (We want to KEEP the state generated BY this step's previous successful run? + # Actually, usually if we roll back TO a step, we want to clear THAT step's output + # so it runs fresh.) + if failed_step is not None and step.name == failed_step.name: + break + return context + + def assemble_issues(self, agent_name): + """ + Get list of previous issues for context injection. + """ + if agent_name not in self.metadata["assets"]["recovery"]: + return [] + issues = [] + for issue in self.metadata["assets"]["recovery"][agent_name]: + issues.append(issue["task_description"]) + return issues diff --git a/fractale/agent/manager/plan.py b/fractale/agent/manager/plan.py new file mode 100644 index 0000000..7c8d07a --- /dev/null +++ b/fractale/agent/manager/plan.py @@ -0,0 +1,172 @@ +import jsonschema +from jsonschema import validators +from rich import print + +import fractale.utils as utils + + +def set_defaults(validator, properties, instance, schema): + """ + Fill in default values for properties that are missing. + """ + for prop, sub_schema in properties.items(): + if "default" in sub_schema: + instance.setdefault(prop, sub_schema["default"]) + + +# Extend validator to apply defaults +plan_validator = validators.extend( + jsonschema.Draft7Validator, + {"properties": set_defaults}, +) + +plan_schema = { + "type": "object", + "properties": { + "name": {"type": "string"}, + "description": {"type": "string"}, + "inputs": {"type": "object", "default": {}}, + "plan": { + "type": "array", + "minItems": 1, + "items": { + "type": "object", + "properties": { + "name": {"type": "string"}, + # This defines the (previous) agent or persona/role + "prompt": {"type": "string"}, + "description": {"type": "string"}, + "inputs": { + "type": "object", + "additionalProperties": True, + }, + }, + "required": ["name", "prompt"], + }, + }, + }, + "required": ["name", "plan"], +} + + +class Plan: + """ + A plan for a manager includes one or more steps, each defined by a Prompt Persona. + """ + + def __init__(self, plan, save_incremental=False): + if isinstance(plan, dict): + self.plan = plan + self.plan_path = "memory" + else: + self.plan_path = plan + self.plan = utils.read_yaml(self.plan_path) + + self.step_names = set() + self.save_incremental = save_incremental + + # Validate structure... + self.validate_schema() + + # And create steps. Personas validated outside of here with manager. + self.load() + + def validate_schema(self): + """ + Validate against JSON schema. + """ + validator = plan_validator(plan_schema) + try: + validator.validate(self.plan) + print("✅ Plan schema is valid.") + except Exception as e: + raise ValueError(f"❌ Plan YAML invalid: {e}!") + + def load(self): + """ + Initialize the Step objects. + """ + print(f"Loading plan from [bold magenta]{self.plan_path}[/bold magenta]...") + self.agents = [] # We refer to steps as agents (v1 of this library) + + for spec in self.plan.get("plan", []): + step_name = spec["name"] + + if step_name in self.step_names: + raise ValueError(f"Duplicate step name: '{step_name}'") + self.step_names.add(step_name) + + step = Step(spec, save_incremental=self.save_incremental) + self.agents.append(step) + + def __len__(self): + return len(self.agents) + + def __getitem__(self, key): + """ + Allows indexing plan[0] + """ + return self.agents[key] + + @property + def agent_names(self): + """ + Used by Manager for recovery lookup. + """ + return [step.name for step in self.agents] + + +class Step: + """ + Wraps a specific execution step. + """ + + def __init__(self, step_spec, save_incremental=False): + self.step_spec = step_spec + self.save_incremental = save_incremental + + def execute(self, context): + """ + Run this step. + """ + # TODO vsoch: need to think about if this is necessary. + # I don't think so, I think a step is just a metadata holder. + # The "run step" will be a call to the MCP function. + # I'm leaving for now so I don't forget the previous design. + context = self.update_context(context) + if not hasattr(context, "agent_config"): + context.agent_config = {} + + # Map prompt from YAML to the config the Agent expects + context.agent_config = {"source_prompt": self.step_spec["prompt"], "step_name": self.name} + print("RUN STEP") + import IPython + + IPython.embed() + + def update_context(self, context): + """ + Merge step-specific inputs into the context. + """ + overrides = ["max_attempts"] + inputs = self.step_spec.get("inputs", {}) + + for k, v in inputs.items(): + if k not in overrides: + context[k] = v + return context + + @property + def name(self): + return self.step_spec["name"] + + @property + def prompt(self): + return self.step_spec["prompt"] + + @property + def description(self): + return self.step_spec.get("description", f"Executes persona: {self.prompt}") + + def get(self, name, default=None): + return self.step_spec.get(name, default) diff --git a/fractale/agent/manager/prompts.py b/fractale/agent/manager/prompts.py new file mode 100644 index 0000000..d647bbd --- /dev/null +++ b/fractale/agent/manager/prompts.py @@ -0,0 +1,48 @@ +from fractale.agent.prompts import Prompt + +recovery_prompt = f"""You are an expert AI workflow troubleshooter. A step in a workflow has failed and reached a maximum number of retries. THis could mean that we need to go back in the workflow and redo work. Your job is to analyze the error and recommend a single, corrective step. The steps, each associated with an agent, you can choose from are as follows: + +Available Agents: +%s + +The above is in the correct order, and ends on the agent that ran last with the failure (%s). The error message of the last step is the following: + +%s + +Your job is to analyze the error message to determine the root cause, and decide which agent is best suited to fix this specific error. +- You MUST formulate a JSON object for the corrective step with two keys: "agent_name" and "task_description". +- The new "task_description" MUST be a clear instruction for the agent to correct the specific error. +- You MUST only provide a single JSON object for the corrective step in your response. +- You MUST format your `task_description` to be a "You MUST" statement to the agent. +""" + +# Same three inputs as above plus the unsuccessful attempt +recovery_error_prompt = recovery_prompt.strip() + " Your last attempt was not successful:\n%s" + +retry_task = """You have had previous attempts, and here are summaries of previous issues: + +{% for issue in issues %} + - {{ issue }} +{% endfor %} +""" + +retry_instructions = ["You MUST avoid making these errors again."] + +persona = "You are manager of an agentic AI team." +retry_prompt = { + "persona": persona, + "context": "A step in your workflow has determined it cannot continue and returned to you.", + "instructions": retry_instructions, + "task": retry_task, +} + + +def get_retry_prompt(context, issues): + """ + In testing, this should JUST be the error message. + """ + # This is an impossible case - we would have appended the task descriptions if this is getting called + # but you never know... + if not issues: + return "" + return Prompt(retry_prompt, context).render({"issues": issues}) diff --git a/fractale/agent/prompts.py b/fractale/agent/prompts.py new file mode 100644 index 0000000..afe1eb4 --- /dev/null +++ b/fractale/agent/prompts.py @@ -0,0 +1,51 @@ +import copy + +from jinja2 import Template + +template = """ + Persona: + {{persona}} + + {% if context %}Context: {{context|trim}}{% endif %} + + Task: {{task|trim}} + + {% if instructions %}Instructions & Constraints: + {% for line in instructions %}{{line}} + {% endfor %}{% endif %} +""" + + +class Prompt: + """ + A prompt is a structured instruction for an LLM. + We attempt to define the persona, context, task, audience, instructions, and constraints. + Data sections should use words MUST, MUST NOT, AVOID, ENSURE + """ + + def __init__(self, data, context): + """ + This currently assumes setting a consistent context for one generation. + If that isn't the case, context should be provided in the function below. + """ + self.data = data + self.context = context + + def render(self, kwargs): + """ + Render the final user task, and then the full prompt. + """ + # The kwargs are rendered into task + render = copy.deepcopy(self.data) + + # Do we have additional details for instrucitons? + try: + render["instructions"] += (self.context.get("details") or "").split("\n") + except: + print("ISSUE WITH RENDER IN GENERIC PROMPTS") + import IPython + + IPython.embed() + render["task"] = Template(self.data["task"]).render(**kwargs) + prompt = Template(template).render(**render) + return prompt diff --git a/fractale/cli/__init__.py b/fractale/cli/__init__.py index fb06e8a..11042d6 100644 --- a/fractale/cli/__init__.py +++ b/fractale/cli/__init__.py @@ -10,8 +10,6 @@ install() import fractale -import fractale.agent.parser as parsers -import fractale.defaults as defaults from fractale.logger import setup_logger @@ -94,24 +92,10 @@ def get_parser(): formatter_class=argparse.RawTextHelpFormatter, description="run an agent", ) - agents = agent.add_subparsers( - title="agent", - description="Run an agent", - ) agent.add_argument( - "--plan", - "-p", - dest="plan", + "plan", help="provide a plan to a manager", ) - # If exists, we will attempt to load and use. - agent.add_argument( - "--use-cache", - dest="use_cache", - help="Use (load and save) local cache in pwd/.fractale/", - action="store_true", - default=False, - ) agent.add_argument( "--max-attempts", help="Maximum attempts for a manager or individual agent", @@ -128,9 +112,6 @@ def get_parser(): action="store_true", default=False, ) - - # Add agent parsers - parsers.register(agents) return parser diff --git a/fractale/cli/agent.py b/fractale/cli/agent.py index 8fbbf3f..8f099b8 100644 --- a/fractale/cli/agent.py +++ b/fractale/cli/agent.py @@ -1,37 +1,19 @@ -import sys - -from fractale.agent import get_agents +from fractale.agent.manager import ManagerAgent def main(args, extra, **kwargs): """ Run an agent (do with caution!) """ - agents = get_agents() - - # If we have a plan, we run the manager. - if args.plan is not None: - args.agent_name = "manager" - - # Right now we only have a build agent :) - if args.agent_name not in agents: - sys.exit(f"{args.agent_name} is not a recognized agent.") - # Get the agent and run! # - results determines if we want to save state to an output directory # - save_incremental will add a metadata section # - max_attempts is for the manager agent (defaults to 10) - agent = agents[args.agent_name]( - use_cache=args.use_cache, + agent = ManagerAgent( results_dir=args.results, save_incremental=args.incremental, max_attempts=args.max_attempts, ) - # This is the context - we can remove variables not needed context = vars(args) - del context["use_cache"] - - # This is built and tested! We can do something with it :) - # Note that vars converts the argparse arguments into a dictionary agent.run(context) diff --git a/fractale/cli/start.py b/fractale/cli/start.py index 77dbfad..3bc6f51 100644 --- a/fractale/cli/start.py +++ b/fractale/cli/start.py @@ -31,8 +31,7 @@ def main(args, extra, **kwargs): print(f"🔌 Loading tools... ") # Load into the manager (tools, resources, prompts) - for tool in manager.load_tools(args.tools): - mcp.add_tool(tool) + for tool in manager.load_tools(mcp, args.tools): print(f" ✅ Registered: {tool.name}") # Mount the MCP server. Note from V: we can use mount with antother FastMCP @@ -50,4 +49,4 @@ def main(args, extra, **kwargs): # For testing we usually control+C, let's not make it ugly except KeyboardInterrupt: - print("🖥️ Shutting down...") + print("🖥️ Shutting down...") diff --git a/fractale/tools/build/docker/prompts.py b/fractale/tools/build/docker/prompts.py new file mode 100644 index 0000000..477396a --- /dev/null +++ b/fractale/tools/build/docker/prompts.py @@ -0,0 +1,77 @@ +from fractale.tools.prompts import format_rules + +PERSONA = "You are a Dockerfile build expert." + +CONTEXT = """ +We are running experiments that deploy containerized HPC applications. +You are the agent responsible for the build step in that pipeline. +""" + +REQUIRES = [ + "You MUST NOT change the name of the application container image provided.", + "Don't worry about users/permissions - just be root.", + "DO NOT forget to install certificates and you MUST NOT apt-get purge.", + "Assume a default of CPU if GPU or CPU is not stated.", + "Do NOT do a multi-stage build, and do NOT COPY or ADD anything.", + "You MUST COPY executables to a system location to be on the PATH. Do NOT symlink", + "You are only scoped to edit a Dockerfile to build the image.", +] + +COMMON_INSTRUCTIONS = [ + "If the application involves MPI, configure it for compatibility for the containerized environment.", + 'Do NOT add your narration unless it has a "#" prefix to indicate a comment.', +] + REQUIRES + + +def get_build_text(application, environment, build_rules): + """ + Get prompt text for an initial build. + """ + return f""" +### PERSONA +{PERSONA} + +### CONTEXT +{CONTEXT} + +### GOAL +I need to create a Dockerfile for an application '{application}'. +The target environment is '{environment}'. +Please generate a robust, production-ready Dockerfile. + +### REQUIREMENTS & CONSTRAINTS +You must adhere to these rules strictly: +{format_rules(build_rules)} + +### INSTRUCTIONS +1. Analyze the requirements and generate the Dockerfile content. +2. Use provided tools to build the image. +3. Respond appropriately to errors. +""" + + +def get_retry_prompt(fix_rules, error_message): + return f""" +### PERSONA +{PERSONA} + +### CONTEXT +{CONTEXT} + +### STATUS: BUILD FAILED +Your previous Dockerfile build has failed. Here is the instruction for how to fix it: + +```text +{error_message} +``` + +### REQUIREMENTS + +Please analyze the error and your previous work, and provide a corrected version. +{format_rules(fix_rules)} + +### INSTRUCTIONS +1. Read the error log above carefully. +2. Modify the Dockerfile using your file tools. +3. Use a provided tool to retry the build. +""" diff --git a/fractale/tools/build/docker/tool.py b/fractale/tools/build/docker/tool.py index d642875..25d68f4 100644 --- a/fractale/tools/build/docker/tool.py +++ b/fractale/tools/build/docker/tool.py @@ -1,24 +1,21 @@ from fractale.tools.base import BaseTool from fractale.tools.decorator import mcp +import fractale.tools.build.docker.prompts as prompts import fractale.agent.logger as logger import fractale.utils as utils import shutil import re import os -import sys import shutil import tempfile import subprocess -import textwrap +import shlex from rich import print from rich.syntax import Syntax -name = "docker-build" - class DockerBuildTool(BaseTool): - def setup(self): """ Setup ensures we have docker or podman installed. @@ -29,6 +26,29 @@ def setup(self): if not self.docker: raise ValueError("docker and podman are not present on the system.") + # @mcp.tool(name="docker-run") + def run_container(self, uri: str, command: str): + """ + Run a docker container. Accepts an optional unique resource identifier (URI). + + uri: the unique resource identifier. + command: string to run (will be shlex split) + """ + # Prepare command to push (docker or podman) + command = [self.docker, "run", "-it", uri] + shlex.split(command) + logger.info(f"Running {command}...") + p = subprocess.run( + command, + capture_output=True, + text=True, + check=False, + ) + if p.returncode != 0: + output = "ERROR: " + p.stdout + p.stderr + logger.warning(f"Issue with docker run: {output}") + return logger.failure(output) + return logger.success(output) + @mcp.tool(name="docker-push") def push_container(self, uri: str, all_tags: bool = False): """ @@ -59,7 +79,7 @@ def push_container(self, uri: str, all_tags: bool = False): return logger.failure(output) return logger.success(output) - @mcp.tool(name=name) + @mcp.tool(name="docker-build") def build_container(self, dockerfile: str, uri: str, platforms: str = None): """ Build a docker container. Accepts an optional unique resource identifier (URI). @@ -134,7 +154,7 @@ def filter_output(self, output): # Try to match lines that start with # return "\n".join([x for x in output.split("\n") if not re.search(r"^#(\d)+ ", x)]) - @mcp.tool(name="kind-docker-load") + # @mcp.tool(name="kind-docker-load") def load_kind(self, uri: str): """ Load a Docker URI into Kind (Kubernetes in Docker) @@ -166,3 +186,37 @@ def print_result(self, dockerfile): logger.custom( highlighted_syntax, title="Final Dockerfile", border_style="green", expand=True ) + + @mcp.prompt(name="docker-build-persona", description="Instructions for a fresh build") + def build_persona_prompt(self, application: str, environment: str = "CPU") -> dict: + """ + Generates agent instructions for creating a NEW Dockerfile. + """ + # Specific rules for a fresh build + build_rules = [ + "The Dockerfile content you generate must be complete and robust.", + "The response should ONLY contain the complete Dockerfile.", + "Use the available tools (files-write) to save the Dockerfile to disk.", + ] + prompts.COMMON_INSTRUCTIONS + + # Construct the text from our template + prompt_text = prompts.get_build_prompt(application, environment, build_rules) + + # Return MCP format + return {"messages": [{"role": "user", "content": {"type": "text", "text": prompt_text}}]} + + @mcp.prompt( + name="docker-fix-persona", description="Instructions for fixing or retrying a build" + ) + def fix_persona_prompt(self, error_message: str) -> dict: + """ + Generates system instructions for retrying a failed build. + """ + # 1. Define specific rules for fixing + fix_rules = [ + "The response should only contain the complete, corrected Dockerfile content.", + "Use succinct comments in the Dockerfile to explain build logic and changes.", + ] + prompts.COMMON_INSTRUCTIONS + + prompt_text = prompts.get_retry_prompt(fix_rules, error_message) + return {"messages": [{"role": "user", "content": {"type": "text", "text": prompt_text}}]} diff --git a/fractale/tools/decorator.py b/fractale/tools/decorator.py index 4b17bba..727b22c 100644 --- a/fractale/tools/decorator.py +++ b/fractale/tools/decorator.py @@ -1,9 +1,5 @@ -import functools -import time from typing import List -from fractale.metrics import DurationMetric, metrics - class McpProxy: """ @@ -18,44 +14,12 @@ def tool(self, name: str = None, description: str = None, tags: List[str] = None """ def decorator(func): - - def record_timing(start_time, error=None): - """ - Wrapper to record timing of tool. - """ - end_time = time.perf_counter() - tool_id = name or func.__name__ - - # Create the specific Metric object - metric = DurationMetric( - name=tool_id, - start_time=start_time, - end_time=end_time, - duration=end_time - start_time, - success=(error is None), - metadata={"error": str(error)} if error else {}, - ) - - # Push to generic registry - metrics.record(metric) - return metric.duration - - @functools.wraps(func) - def sync_wrapper(*args, **kwargs): - start = time.perf_counter() - result = func(*args, **kwargs) - dur = record_timing(start) - # Add the duration to the result for the LLM - result += f"\n\n[⏱️ {dur:.2f}s]" - return result - - wrapper = sync_wrapper default_name = (func.__module__.lower() + "-" + func.__name__.lower()).replace(".", "-") - wrapper._mcp_name = name or default_name - wrapper._mcp_desc = description - wrapper._mcp_tags = tags - wrapper._is_mcp_tool = True - return wrapper + func._mcp_name = name or default_name + func._mcp_desc = description + func._mcp_tags = tags + func._is_mcp_tool = True + return func return decorator diff --git a/fractale/tools/manager.py b/fractale/tools/manager.py index b0706dc..03b2a01 100644 --- a/fractale/tools/manager.py +++ b/fractale/tools/manager.py @@ -61,7 +61,7 @@ def discover_tools(self, root_path: str, module_path: str) -> Dict[str, Path]: discovered[tool_id] = {"path": file_path, "module": import_path, "root": root_path} return discovered - def load_tools(self, names=None): + def load_tools(self, mcp, names=None): """ Load a set of named tools, or default to all those discovered. """ @@ -95,7 +95,22 @@ def load_tools(self, names=None): # Get the decorated functions for func in getfunc(): - yield ToolClass.from_function(func, name=func._mcp_name) + + # This is how we handle dynamic loading + endpoint = ToolClass.from_function(func, name=func._mcp_name) + + # @mcp.tool + if ToolClass == Tool: + mcp.add_tool(endpoint) + + # @mcp.prompt + elif ToolClass == Prompt: + mcp.add_prompt(endpoint) + + # @mcp.resource + else: + mcp.add_resource(endpoint) + yield endpoint def load_tool(self, tool_id: str) -> BaseTool: """ @@ -122,3 +137,42 @@ def load_tool(self, tool_id: str) -> BaseTool: except ImportError as e: print(f"❌ Error importing {tool_id}: {e}") return None + + def get_available_prompts(self): + """ + Scans all discoverable tools for functions decorated with @mcp.prompt. + Returns a set of prompt names (personas). We need this to validate a plan. + A plan is not valid if it names a persona (prompt) that is not known. + """ + print("GET AVAILABLE PROMPTS") + import IPython + + IPython.embed() + + prompts = set() + + # 2. Load them (to execute decorators) + for tool_id, path in self.tools.items(): + mod = self.load_tool_module(tool_id, path) + if not mod: + continue + + # 3. Inspect the classes/functions in the module + for name, obj in inspect.getmembers(mod): + # We usually look for classes inheriting from BaseTool + # But we can also just scan the class attributes + if inspect.isclass(obj): + for attr_name in dir(obj): + try: + func = getattr(obj, attr_name) + except: + continue + + # CHECK FOR PROXY TAG + if callable(func) and getattr(func, "_is_mcp_prompt", False): + # Get the name from the decorator + p_name = getattr(func, "_mcp_name", None) + if p_name: + prompts.add(p_name) + + return prompts diff --git a/fractale/tools/prompts.py b/fractale/tools/prompts.py new file mode 100644 index 0000000..a5bca61 --- /dev/null +++ b/fractale/tools/prompts.py @@ -0,0 +1,2 @@ +def format_rules(rules): + return "\n".join([f"- {r}" for r in rules]) diff --git a/fractale/version.py b/fractale/version.py index d07e16c..a4107f5 100644 --- a/fractale/version.py +++ b/fractale/version.py @@ -9,20 +9,20 @@ ################################################################################ -# Global requirements +# TODO vsoch: refactor this to use newer pyproject stuff. -# Note that the spack / environment modules plugins are installed automatically. -# This doesn't need to be the case. INSTALL_REQUIRES = ( ("jsonschema", {"min_version": None}), ("Jinja2", {"min_version": None}), ("uvicorn", {"min_version": None}), ("mcp", {"min_version": None}), ("fastmcp", {"min_version": None}), + ("fastapi", {"min_version": None}), # Yeah, probably overkill, just being used for printing the scripts ("rich", {"min_version": None}), ) +OPENAI_REQUIRES = (("openai", {"min_version": None}),) GOOGLE_REQUIRES = (("google-generativeai", {"min_version": None}),) TESTS_REQUIRES = (("pytest", {"min_version": "4.6.2"}),) -INSTALL_REQUIRES_ALL = INSTALL_REQUIRES + TESTS_REQUIRES + GOOGLE_REQUIRES +INSTALL_REQUIRES_ALL = INSTALL_REQUIRES + TESTS_REQUIRES + GOOGLE_REQUIRES + OPENAI_REQUIRES diff --git a/setup.py b/setup.py index 3a4a3e4..f29fd6e 100644 --- a/setup.py +++ b/setup.py @@ -67,6 +67,8 @@ def get_reqs(lookup=None, key="INSTALL_REQUIRES"): TESTS_REQUIRES = get_reqs(lookup, "TESTS_REQUIRES") INSTALL_REQUIRES_ALL = get_reqs(lookup, "INSTALL_REQUIRES_ALL") GOOGLE_REQUIRES = get_reqs(lookup, "GOOGLE_REQUIRES") + OPENAI_REQUIRES = get_reqs(lookup, "OPENAI_REQUIRES") + LLAMA_REQUIRES = get_reqs(lookup, "OPENAI_REQUIRES") setup( name=NAME, @@ -89,6 +91,8 @@ def get_reqs(lookup=None, key="INSTALL_REQUIRES"): extras_require={ "all": [INSTALL_REQUIRES_ALL], "google": [GOOGLE_REQUIRES], + "openai": [OPENAI_REQUIRES], + "llama": [LLAMA_REQUIRES], }, classifiers=[ "Intended Audience :: Science/Research", From 4c7f7a66601a59c159758f8c4ffbb05b4affe8ec Mon Sep 17 00:00:00 2001 From: vsoch Date: Tue, 25 Nov 2025 14:29:30 -0800 Subject: [PATCH 02/27] dev: working on agents on tuo Signed-off-by: vsoch --- fractale/agent/agent.py | 166 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 166 insertions(+) create mode 100644 fractale/agent/agent.py diff --git a/fractale/agent/agent.py b/fractale/agent/agent.py new file mode 100644 index 0000000..18d9560 --- /dev/null +++ b/fractale/agent/agent.py @@ -0,0 +1,166 @@ +import asyncio +import os +import time + +from fastmcp import Client +from fastmcp.client.transports import StreamableHttpTransport + +import fractale.agent.backends as backends +import fractale.agent.logger as logger +from fractale.agent.base import Agent + + +class MCPAgent(Agent): + """ + Backend-Agnostic Agent that uses MCP Tools. + """ + + def init(self): + # 1. Setup MCP Client + port = os.environ.get("FRACTALE_MCP_PORT", "8089") + token = os.environ.get("FRACTALE_MCP_TOKEN") + url = f"http://localhost:{port}/mcp" + + if token: + transport = StreamableHttpTransport(url=url, headers={"Authorization": token}) + self.client = Client(transport) + else: + self.client = Client(url) + + # 2. Select Backend based on Config/Env + provider = os.environ.get("FRACTALE_LLM_PROVIDER", "gemini").lower() + if provider in backends.BACKENDS: + self.backend = backends.BACKENDS[provider]() + else: + raise ValueError(f"Provider {provider} is not available. Did you install dependencies?") + + async def get_prompts_list(self): + """ + Get list of prompts. A prompt is technically a persona/role + that was previously considered an entire agent. Now we pair a prompt + with an MCP backend and get a full agent. + """ + async with self.client: + prompts = await self.client.list_prompts_mcp() + return prompts + + async def get_tools_list(self): + """ + Get list of tools. + """ + async with self.client: + tools = await self.client.list_tools() + return tools + + async def execute_mission_async(self, prompt_text: str): + """ + The Async Loop: Think -> Act -> Observe -> Think + """ + start_time = time.perf_counter() + + # 1. Connect & Discover Tools + async with self.client: + mcp_tools = await self.client.list_tools() + + # 2. Initialize Backend with these tools + await self.backend.initialize(mcp_tools) + + # 3. Initial Prompt + # 'response_text' is what the LLM says to the user + # 'calls' is a list of tools it wants to run + response_text, calls = await self.backend.generate_response(prompt=prompt_text) + + max_loops = 15 + loops = 0 + + while loops < max_loops: + loops += 1 + + # If there are tool calls, we MUST execute them and feed back results + if calls: + tool_outputs = [] + + for call in calls: + t_name = call["name"] + t_args = call["args"] + t_id = call.get("id") # Needed for OpenAI + + logger.info(f"🛠️ Tool Call: {t_name} {t_args}") + + # --- EXECUTE TOOL --- + try: + result = await self.client.call_tool(t_name, t_args) + # Handle FastMCP result object + output_str = ( + result.content[0].text + if hasattr(result, "content") + else str(result) + ) + except Exception as e: + output_str = f"Error: {str(e)}" + + # Record Metadata (Your Requirement) + self._record_step(t_name, t_args, output_str) + + tool_outputs.append({"name": t_name, "content": output_str, "id": t_id}) + + # --- FEEDBACK LOOP --- + # We pass the outputs back to the backend. + # It returns the NEXT thought. + response_text, calls = await self.backend.generate_response( + tool_outputs=tool_outputs + ) + + else: + # No tool calls? The LLM is done thinking. + break + + end_time = time.perf_counter() + + # Save Summary Metadata + self.save_mcp_metadata(end_time - start_time) + + return response_text + + def _record_step(self, tool, args, output): + if "steps" not in self.metadata: + self.metadata["steps"] = [] + self.metadata["steps"].append( + { + "tool": tool, + "args": args, + "output_snippet": str(output)[:200], + "timestamp": time.time(), + } + ) + + def save_mcp_metadata(self, duration): + """Save token usage from backend.""" + usage = self.backend.token_usage + if "llm_usage" not in self.metadata: + self.metadata["llm_usage"] = [] + + self.metadata["llm_usage"].append( + { + "duration": duration, + "prompt_tokens": usage.get("prompt_tokens", 0), + "completion_tokens": usage.get("completion_tokens", 0), + } + ) + + def run_step(self, context): + """ + Bridge the sync Base Class to the async implementation. + """ + prompt_text = self.get_prompt(context) + + try: + # Run the loop + final_result = asyncio.run(self.execute_mission_async(prompt_text)) + context["result"] = final_result + except Exception as e: + context["error_message"] = str(e) + logger.error(f"Agent failed: {e}") + raise # Or handle gracefully depending on policy + + return context From 41f45cc02afb4b244ec3c3576cd3b39a9c911889 Mon Sep 17 00:00:00 2001 From: vsoch Date: Wed, 26 Nov 2025 19:27:55 -0800 Subject: [PATCH 03/27] agent: structure to test Signed-off-by: vsoch --- README.md | 18 +- examples/plans/docker-build-lammps.yaml | 4 +- fractale/agent/agent.py | 210 ++++++++------ fractale/agent/backends/openai.py | 10 +- fractale/agent/base.py | 9 +- fractale/agent/defaults.py | 1 + fractale/agent/manager/agent.py | 351 +++++++++++++----------- fractale/agent/manager/plan.py | 93 +++++-- fractale/agent/manager/prompts.py | 20 +- 9 files changed, 433 insertions(+), 283 deletions(-) diff --git a/README.md b/README.md index 88c6e75..f49b1d3 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,16 @@ Tools to add: - build - docker +### Environment + +The following variables can be set in the environment. + +| Name | Description | Default | +|-------|------------|---------------| +| `FRACTALE_MCP_PORT` | Port to run MCP server on, if using http variant | 8089 | +| `FRACTALE_MCP_TOKEN` | Token to use for testing | unset | +| `FRACTALE_LLM_PROVIDER` | LLM Backend to use (gemini, openai, llama) | gemini | + ### Testing Start the server in one terminal. Export `FRACTALE_MCP_TOKEN` if you want to require simple token auth. Here is for http. @@ -78,17 +88,21 @@ export FRACTALE_MCP_TOKEN=dude # In one terminal (start MCP) fractale start -t http --port 8089 +# Define the model (provider and endpoints) to use. +export FRACTALE_LLM_PROVIDER=openai +export OPENAI_API_KEY=xxxxxxxxxxxxxxxx +export OPENAI_BASE_URL=https://my.custom.url/v1 + # In the other, run the plan fractale agent ./examples/plans/docker-build-lammps.yaml ``` - - `manager` agents know how to orchestrate step agents and choose between them (don't hold state, but could) - `step` agents are experts on doing specific tasks. This originally was an agent with specific functions to do something (e.g., docker build) and now is a generic MCP agent with a prompt that gives it context and a goal. The initial design of `helper` agents from the first fractale is subsumed by the idea of an MCP function. A helper agent _is_ an MCP tool. -The design is simple in that each agent is responding to state of error vs. success. In the [first version](https://github.com/compspec/fractale) of our library, agents formed a custom graph. In this variant, we refactor to use MCP server tools. It has the same top level design with a manager, but each step agent is like a small state machine governmed by an LLM with access to MCP tools and resources. +The design is simple in that each agent is responding to state of error vs. success. In the [first version](https://github.com/compspec/fractale) of our library, agents formed a custom graph. In this variant, we refactor to use MCP server tools. It has the same top level design with a manager, but each step agent is like a small state machine governed by an LLM with access to MCP tools and resources. See [examples/agent](examples/agent) for an example, along with observations, research questions, ideas, and experiment brainstorming! diff --git a/examples/plans/docker-build-lammps.yaml b/examples/plans/docker-build-lammps.yaml index 990b591..445ebbd 100644 --- a/examples/plans/docker-build-lammps.yaml +++ b/examples/plans/docker-build-lammps.yaml @@ -4,11 +4,11 @@ plan: - name: "build" prompt: "docker-build-persona" inputs: - description: "LAMMPS (Large-scale Atomic/Molecular Massively Parallel Simulator)" + application: "LAMMPS (Large-scale Atomic/Molecular Massively Parallel Simulator)" container: "ghcr.io/hpc-lab/lammps:cpu-latest" environment: "Rocky Linux 9, CPU Only" # - name: "deploy" # prompt: "k8s-deploy-persona" # inputs: -# replicas: 4 \ No newline at end of file +# replicas: 4 diff --git a/fractale/agent/agent.py b/fractale/agent/agent.py index 18d9560..261f4a6 100644 --- a/fractale/agent/agent.py +++ b/fractale/agent/agent.py @@ -6,6 +6,7 @@ from fastmcp.client.transports import StreamableHttpTransport import fractale.agent.backends as backends +import fractale.agent.defaults as defaults import fractale.agent.logger as logger from fractale.agent.base import Agent @@ -17,7 +18,7 @@ class MCPAgent(Agent): def init(self): # 1. Setup MCP Client - port = os.environ.get("FRACTALE_MCP_PORT", "8089") + port = os.environ.get("FRACTALE_MCP_PORT", defaults.mcp_port) token = os.environ.get("FRACTALE_MCP_TOKEN") url = f"http://localhost:{port}/mcp" @@ -27,8 +28,18 @@ def init(self): else: self.client = Client(url) - # 2. Select Backend based on Config/Env - provider = os.environ.get("FRACTALE_LLM_PROVIDER", "gemini").lower() + # Initialize the provider. We will do this for each step. + self.init_provider() + + def init_provider(self): + """ + Initialize the provider. + """ + # select Backend based on Config/Env first, then cached version + provider = self._provider or os.environ.get("FRACTALE_LLM_PROVIDER", "gemini").lower() + self._provider = provider + + # Other envars come from provider backend if provider in backends.BACKENDS: self.backend = backends.BACKENDS[provider]() else: @@ -52,79 +63,118 @@ async def get_tools_list(self): tools = await self.client.list_tools() return tools - async def execute_mission_async(self, prompt_text: str): + async def execute(self, context, step): """ - The Async Loop: Think -> Act -> Observe -> Think + The Async Loop that will start with a prompt name, retrieve it, + and then respond to it until the state is successful. """ start_time = time.perf_counter() - # 1. Connect & Discover Tools + # We keep the client connection open for the duration of the step async with self.client: - mcp_tools = await self.client.list_tools() - # 2. Initialize Backend with these tools + # These are tools available to agent + # TODO need to filter these to be agent specific? + mcp_tools = await self.client.list_tools() await self.backend.initialize(mcp_tools) - # 3. Initial Prompt - # 'response_text' is what the LLM says to the user - # 'calls' is a list of tools it wants to run - response_text, calls = await self.backend.generate_response(prompt=prompt_text) - - max_loops = 15 - loops = 0 - - while loops < max_loops: - loops += 1 + # Get prompt to give goal/task/personality to agent + args = getattr(context, "data", context) - # If there are tool calls, we MUST execute them and feed back results - if calls: - tool_outputs = [] + # This partitions inputs, adding inputs from the step and separating + # those from extra + args, extra = step.partition_inputs(args) + instruction = await self.fetch_persona(step.prompt, args) + # TODO STOPPED HERE should we add "extra" to context? + print("INSTRUCTION") + print(instruction) + print("EXTRA") + print(extra) - for call in calls: - t_name = call["name"] - t_args = call["args"] - t_id = call.get("id") # Needed for OpenAI + # Run the loop up to some max attempts (internal state machine with MCP tools) + max_loops = context.get("max_loops", 15) + response_text = await self.run_llm_loop(instruction, max_loops) - logger.info(f"🛠️ Tool Call: {t_name} {t_args}") - - # --- EXECUTE TOOL --- - try: - result = await self.client.call_tool(t_name, t_args) - # Handle FastMCP result object - output_str = ( - result.content[0].text - if hasattr(result, "content") - else str(result) - ) - except Exception as e: - output_str = f"Error: {str(e)}" - - # Record Metadata (Your Requirement) - self._record_step(t_name, t_args, output_str) + self.record_usage(time.perf_counter() - start_time) + return response_text - tool_outputs.append({"name": t_name, "content": output_str, "id": t_id}) + async def run_llm_loop(self, instruction: str, max_loops: int) -> str: + """ + Process -> Tool -> Process loop. + We need to return on some state of success or ultimate failure. + """ + # Initial response to first prompt + response_text, calls = await self.backend.generate_response(prompt=instruction) + + loops = 0 + while loops < max_loops: + loops += 1 + + # If no tools called, we are done + if not calls: + break + + # Execute all requested tools + tool_outputs = [] + for call in calls: + t_name = call["name"] + t_args = call["args"] + t_id = call.get("id") + logger.info(f"🛠️ Calling: {t_name}") + + try: + # Get result and unpack (FastMCP format) + result = await self.client.call_tool(t_name, t_args) + if hasattr(result, "content") and isinstance(result.content, list): + content = result.content[0].text + else: + content = str(result) + except Exception as e: + content = f"Error executing {t_name}: {str(e)}" + + # Record metadata about the step + self.record_step(t_name, t_args, content) + + # Save outputs (name, id, and content) + tool_outputs.append({"id": t_id, "name": t_name, "content": content}) + + # Feed results back to backend with history. + response_text, calls = await self.backend.generate_response(tool_outputs=tool_outputs) + if not calls: + logger.info("🎢 Agent has not requested new calls, ending loop.") + + # When we get here, we either have no calls, or we reached max attempts. + return response_text - # --- FEEDBACK LOOP --- - # We pass the outputs back to the backend. - # It returns the NEXT thought. - response_text, calls = await self.backend.generate_response( - tool_outputs=tool_outputs - ) + async def fetch_persona(self, prompt_name: str, arguments: dict) -> str: + """ + Asks the MCP Server to render the prompt template. + This is akin to rendering or fetching the person. E.g., "You are X and + here are your instructions for a task." + """ + logger.info(f"📥 Bootstrapping Persona: {prompt_name}") + try: + prompt_result = await self.client.get_prompt(name=prompt_name, arguments=arguments) + # MCP Prompts return a list of messages (User/Assistant/Text). + # We squash them into a single string for the instruction. + msgs = [] + for m in prompt_result.messages: + if hasattr(m.content, "text"): + msgs.append(m.content.text) else: - # No tool calls? The LLM is done thinking. - break - - end_time = time.perf_counter() + msgs.append(str(m.content)) - # Save Summary Metadata - self.save_mcp_metadata(end_time - start_time) + return "\n\n".join(msgs) - return response_text + except Exception as e: + raise RuntimeError(f"Failed to load persona '{prompt_name}': {e}") - def _record_step(self, tool, args, output): - if "steps" not in self.metadata: - self.metadata["steps"] = [] + def record_step(self, tool, args, output): + """ + Record step metadata. + TODO: refactor this into metadata registry (decorator) + """ self.metadata["steps"].append( { "tool": tool, @@ -134,33 +184,31 @@ def _record_step(self, tool, args, output): } ) - def save_mcp_metadata(self, duration): - """Save token usage from backend.""" - usage = self.backend.token_usage - if "llm_usage" not in self.metadata: - self.metadata["llm_usage"] = [] - - self.metadata["llm_usage"].append( - { - "duration": duration, - "prompt_tokens": usage.get("prompt_tokens", 0), - "completion_tokens": usage.get("completion_tokens", 0), - } - ) - - def run_step(self, context): + def record_usage(self, duration): """ - Bridge the sync Base Class to the async implementation. + Record token usage. + TODO: refactor this into metadata registry (decorator) + """ + if hasattr(self.backend, "token_usage"): + usage = self.backend.token_usage + self.metadata["llm_usage"].append( + { + "duration": duration, + "prompt": usage.get("prompt_tokens", 0), + "completion": usage.get("completion_tokens", 0), + } + ) + + def run_step(self, context, step): + """ + Run step is called from the Agent run (base class) + It's here so we can asyncio.run the thing! """ - prompt_text = self.get_prompt(context) - try: - # Run the loop - final_result = asyncio.run(self.execute_mission_async(prompt_text)) - context["result"] = final_result + final_result = asyncio.run(self.execute(context, step)) + context.result = final_result except Exception as e: context["error_message"] = str(e) logger.error(f"Agent failed: {e}") - raise # Or handle gracefully depending on policy - + raise e return context diff --git a/fractale/agent/backends/openai.py b/fractale/agent/backends/openai.py index bf56bfd..247d930 100644 --- a/fractale/agent/backends/openai.py +++ b/fractale/agent/backends/openai.py @@ -1,6 +1,7 @@ +import os from typing import Any, Dict, List -from openai import AsyncOpenAI +from openai import AsyncOpenAI, OpenAI from .llm import LLMBackend @@ -11,7 +12,12 @@ class OpenAIBackend(LLMBackend): """ def __init__(self, model_name="gpt-4o"): - self.client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY")) + # Needs to be tested if base url is None. + # Switch to async if/when needed. Annoying for development + # self.client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY"), base_url=os.environ.get("OPENAI_BASE_URL")) + self.client = OpenAI( + api_key=os.environ.get("OPENAI_API_KEY"), base_url=os.environ.get("OPENAI_BASE_URL") + ) self.model_name = model_name self.history = [] self.tools_schema = [] diff --git a/fractale/agent/base.py b/fractale/agent/base.py index 1a92995..b9acc86 100644 --- a/fractale/agent/base.py +++ b/fractale/agent/base.py @@ -21,6 +21,7 @@ def __init__( self.name = name self.attempts = 0 self.max_attempts = max_attempts + self._provider = None self.results_dir = results_dir or os.getcwd() self.save_incremental = save_incremental @@ -46,20 +47,18 @@ def init_metadata(self): "llm_usage": [], } - def run(self, context: Dict[str, Any]) -> Dict[str, Any]: + def run(self, context, step): """ Main execution wrapper """ # Ensure max_attempts is set context["max_attempts"] = self.max_attempts or context.get("max_attempts") - - # 3. RUN STEP logger.info(f"▶️ Running {self.name}...") start_time = time.time() try: # Call abstract method - context = self.run_step(context) + context = self.run_step(context, step) finally: duration = time.time() - start_time @@ -67,7 +66,7 @@ def run(self, context: Dict[str, Any]) -> Dict[str, Any]: return context - def run_step(self, context): + def run_step(self, context, step): """ Abstract: Implemented by MCPAgent """ diff --git a/fractale/agent/defaults.py b/fractale/agent/defaults.py index 214a5b1..d774c90 100644 --- a/fractale/agent/defaults.py +++ b/fractale/agent/defaults.py @@ -1,5 +1,6 @@ environment = "generic cloud environment" gemini_model = "gemini-2.5-pro" +mcp_port = "8089" # These are common / default args we don't need to give in any prompt. shared_args = { diff --git a/fractale/agent/manager/agent.py b/fractale/agent/manager/agent.py index 2ac5d02..95629c7 100644 --- a/fractale/agent/manager/agent.py +++ b/fractale/agent/manager/agent.py @@ -4,6 +4,7 @@ from datetime import datetime from rich import print +from rich.prompt import Prompt as RichPrompt import fractale.agent.logger as logger import fractale.agent.manager.prompts as prompts @@ -19,7 +20,10 @@ class ManagerAgent(MCPAgent): """ An LLM-powered orchestrator that executes a plan. - It can perform error recovery by asking the backend LLM. + It acts as a supervisor: + 1. Validates the plan against the server. + 2. Dispatches steps to UniversalAgents. + 3. Handles failure via LLM reasoning OR Human intervention. """ def init(self): @@ -29,6 +33,49 @@ def init(self): # This sets up the MCP connection and LLM Backend super().init() + async def validate_personas(self, plan): + """ + Queries the MCP server to ensure all personas in the plan exist. + A persona == a prompt. Each step has an associated prompt (persona). + """ + try: + # Attempt to list prompts from server + server_prompts_page = await self.client.list_prompts() + available_names = set() + schema_map = {} + + # FastMCP (bottom) vs Standard MCP (top) return types + if hasattr(server_prompts, "prompts"): + prompts_list = server_prompts.prompts + else: + prompts_list = server_prompts + + # We do this once so later we can call prompt functions and know args vs. context + for p in prompts_list: + available_names.add(p.name) + # Store set of valid argument names + args = {arg.name for arg in p.arguments} if p.arguments else set() + schema_map[p.name] = args + + print(f"🔎 Validating {len(plan)} steps...") + for step in plan.agents: + if step.prompt not in available_names: + raise ValueError( + f"❌ Plan Validation Failed: Step '{step.name}' requests persona '{step.prompt}', " + f"but server only has: {available_names}" + ) + # Ensure we separate arguments from extra + # This does not check to see if we have required, since they + # might come from a previous step. + step.set_schema(schema_map[step.prompt]) + + # store in the manager metadata so steps can access it + self.metadata["assets"]["prompt_schemas"] = schema_map + print("✅ Personas validated and schemas cached") + + except Exception as e: + logger.warning(f"⚠️ Could not validate personas against server (proceeding anyway): {e}") + def get_recovery_step(self, context, failed_step, plan): """ Uses the LLM Backend to decide which agent to call to fix an error. @@ -40,7 +87,7 @@ def get_recovery_step(self, context, failed_step, plan): if step.agent == failed_step.agent: break - # Build the Prompt to recover from some failure. + # Build the prompt to recover from some failure. prompt_text = prompts.recovery_prompt % ( descriptions, failed_step.agent, @@ -51,36 +98,53 @@ def get_recovery_step(self, context, failed_step, plan): title="Error Triage", ) - step_json = None - attempts = 0 - - while not step_json and attempts < 3: - attempts += 1 - try: - # Use the Backend directly - get back tuple (text, calls) - # We ask the backend to generate a response based on this prompt. - response_text = asyncio.run(self.backend.generate_response(prompt=prompt_text))[0] - step_json = json.loads(utils.get_code_block(response_text, "json")) - - # Validate - if "agent_name" not in step_json or "task_description" not in step_json: - raise ValueError("Missing keys") - if step_json["agent_name"] not in plan.agent_names: - raise ValueError(f"Unknown agent {step_json['agent_name']}") - - except Exception as e: - step_json = None - - # Tell agent what it did wrong :) - prompt_text = prompts.recovery_error_prompt % ( - descriptions, - failed_step.agent, - context.error_message, - str(e), - ) - logger.warning(f"Manager failed to parse recovery plan, retrying: {e}") + # TODO: test and make more resilient if needed + next_step = None + while not next_step: + + # Use the backend directly - get back tuple (text, calls) + response_text = asyncio.run(self.backend.generate_response(prompt=prompt_text))[0] + next_step = json.loads(utils.get_code_block(response_text, "json")) + + # Validate - we require the agent name and description + if ( + "agent_name" not in step_json + or "task_description" not in next_step + or "reason" not in next_step + ): + raise ValueError("Missing keys") + if step_json["agent_name"] not in plan.agent_names: + raise ValueError(f"Unknown agent {step_json['agent_name']}") + + agent_name = recovery_step["agent_name"] + logger.warning(f"Recovering to agent: [bold cyan]{agent_name}[/bold cyan]") + + # Find index of target agent + found_index = -1 + for idx, ag in enumerate(plan.agents): + if ag.name == agent_name: + found_index = idx + break + + next_step["index"] = found_index + + # Update recovery metadata with choice. + if failed_step.name not in self.metadata["assets"]["recovery"]: + self.metadata["assets"]["recovery"][failed_step.name] = [] + self.metadata["assets"]["recovery"][failed_step.name].append(next_step) - return step_json + return next_step + + def check_personas(self, plan, personas): + """ + Ensure that the prompt (persona) requested by each step is one + known to the MCP server. + """ + for step in plan.agents: + if step.prompt not in persons: + raise ValueError( + f"Unknown persona {step.prompt} in step {step.name}. Available: {personas}" + ) def run(self, context): """ @@ -100,61 +164,36 @@ def run(self, context): plan_path = context.get("plan", required=True) plan = Plan(plan_path, save_incremental=self.save_incremental) - # STOPPED HERE - need working token to run! - import IPython - - IPython.embed() - - # Ensure that persons in plan exist at API. - print(f"🔎 Validating personas. Available: {len(valid_personas)}") + # Connect and validate (don't allow connect without validate) + asyncio.run(self.connect_and_validate(plan)) - # These are personas / roles available for agents - # 1. Connect & Discover Tools - prompts = asyncio.run(self.get_prompts_list()) - - # 2. Initialize Backend with these tools - - # await self.backend.initialize(mcp_tools) - - # 3. Initial Prompt - # 'response_text' is what the LLM says to the user - # 'calls' is a list of tools it wants to run - # response_text, calls = await self.backend.generate_response(prompt=prompt_text) - - # for step in plan.agents: - # if step.prompt not in valid_persons: - - # if prompt_name not in valid_personas: - # raise ValueError( - # f"❌ Plan Error: Unknown Persona '{prompt_name}' in step '{step['name']}'.\n" - # f"Available Personas: {list(valid_personas)}" - # ) - # except ImportError: - # Fallback if registry module isn't ready or circular import issues persist - # print("⚠️ Warning: Skipping persona validation (registry not available)") - - # import IPython - # IPython.embed() - - # logger.custom( - # f"Manager Initialized with Agents: [bold cyan]{plan.agent_names}[/bold cyan]", - # title="[green]Manager Status[/green]", - # ) + # Still pass the shared context to all tasks + try: + tracker = self.run_tasks(context, plan) + self.metadata["status"] = "Succeeded" + self.save_results(tracker, plan) + logger.custom( + f"Workflow Complete. {len(tracker)} steps executed.", + title="[bold green]Success[/bold green]", + ) + return tracker - # try: - # tracker = self.run_tasks(context, plan) - # logger.custom( - # f"Tasks complete: [bold magenta]{len(tracker)} steps[/bold magenta]", - # title="[green]Manager Status[/green]", - # ) - # self.save_results(tracker, plan) - # return tracker # Return tracker as result + except Exception as e: + self.metadata["status"] = "Failed" + logger.error(f"Orchestration failed: {e}", title="Failure") + raise e - # except Exception as e: - # logger.error(f"Orchestration failed:\n{str(e)}", title="Orchestration Failed", expand=False) - # raise e + async def connect_and_validate(self, plan): + """ + Setup client and check prompts. + """ + async with self.client: + # Check if server has the prompts we need + await self.validate_personas(plan) - # NOTE from vsoch: stuff below here is old (from v1) and I'll refactor it when I can keep testing above. + # Initialize our backend LLM with the available tools + mcp_tools = await self.client.list_tools() + await self.backend.initialize(mcp_tools) def run_tasks(self, context, plan): """ @@ -164,98 +203,112 @@ def run_tasks(self, context, plan): timer = Timer() current_step_index = 0 + # Initialize recovery history + if "recovery" not in self.metadata["assets"]: + self.metadata["assets"]["recovery"] = {} + # Global Manager Loop while current_step_index < len(plan): - step_agent = plan[ - current_step_index - ] # This is an instance of MCPAgent (e.g. UniversalAgent) + # This is an instance of MCPAgent + step = plan[current_step_index] logger.custom( - f"Executing step {current_step_index + 1}/{len(plan)}: [bold cyan]{step_agent.name}[/bold cyan]", + f"Executing step {current_step_index + 1}/{len(plan)}: [bold cyan]{step.name}[/bold cyan]", title=f"[blue]Attempt {self.attempts}[/blue]", ) - # --- EXECUTE AGENT --- - # Using the new .run() interface from BaseAgent + # instantiate the agent here. If we need/want, we can cache the + # initial envars (credentials) to not need to discover them again. + agent = MCPAgent( + name=step.name, + save_incremental=plan.save_incremental, + max_attempts=step.max_attempts, + ) + # Update step context + context = step.update_context(context) + + # Execute the step. This is akin to a tiny state machine + # The agent (persona prompt + LLM) is making calls to MCP tools + # Agent -> run is a wrapper to agent.run_step. with timer: - # The agent updates the context in place and returns it - context = step_agent.run(context) + context = agent.run(context, step) # Record metrics - # Note: step_agent.metadata is populated by BaseAgent + # Note: step_agent.metadata is populated by the agent tracker.append( { - "agent": step_agent.name, - "total_seconds": timer.elapsed_time, + "agent": step.name, + "duration": timer.elapsed_time, "result": context.get("result"), "error": context.get("error_message"), - "attempts": step_agent.attempts + 1, - "metadata": step_agent.metadata, # Detailed logs + "attempts": step.attempts + 1, + "metadata": step.metadata, } ) - # --- CHECK SUCCESS --- # If we have a result and no error message, success. if context.get("result") and not context.get("error_message"): current_step_index += 1 - context.reset() # Clear temp results for next step - continue # Move to next step + # Clear temp results for next step + context.reset() + continue - # --- FAILURE & RECOVERY --- - else: - logger.error(f"Step {step_agent.name} Failed: {context.get('error_message')}") + logger.error(f"Step {agent.name} Failed: {context.get('error_message')}") - # Check global manager limits - if self.reached_max_attempts(): - logger.error("Manager reached max attempts. Aborting.") - break + # Check global manager limits + if self.reached_max_attempts(): + logger.error("Manager reached max attempts. Aborting.") + break - self.attempts += 1 + self.attempts += 1 - # If first step fails, just hard reset - if current_step_index == 0: - context = self.reset_context(context, plan=plan) - continue + # Always ask user if an entire step fails. + print("\n[bold red]⚠️ Step Failed.[/bold red]") + action = RichPrompt.ask( + "Choose Action:", choices=["retry", "assist", "auto", "quit"], default="auto" + ) - # RECOVERY LOGIC - # Ask the Manager (Self) to pick a previous step to retry from - recovery_step = self.get_recovery_step(context, step_agent, plan) + # If first step fails, just hard reset + if action == "quit": + break - if not recovery_step: - logger.error("Manager could not determine a recovery plan. Aborting.") - break + elif action == "retry": + context = self.reset_context(context, plan, step) + continue - if step_agent.name not in self.metadata["assets"]["recovery"]: - self.metadata["assets"]["recovery"][step_agent.name] = [] - self.metadata["assets"]["recovery"][step_agent.name].append(recovery_step) + # Human in the loop! Ask for a hint and add to error message + elif action == "assist": + hint = RichPrompt.ask("Enter instructions for the agent") + context["error_message"] = ( + f"Previous Error: {context.get('error_message')}\nUSER ADVICE: {hint}" + ) + context = self.reset_context(context, plan, step) + continue - target_agent_name = recovery_step["agent_name"] - logger.warning(f"Rolling back to agent: [bold cyan]{target_agent_name}[/bold cyan]") + # If we get down here (auto) we ask the manager for a recovery step. + elif action == "auto": - # Find index of target agent - # (Assuming plan object allows finding index by name) - # Simple linear search logic: - found_index = -1 - for idx, ag in enumerate(plan.agents): - if ag.name == target_agent_name: - found_index = idx - break + # If we failed the first step, just try again. + if current_step_index == 0: + context = self.reset_context(context, plan) + continue - if found_index == -1: - logger.error(f"Recovery agent {target_agent_name} not found in plan!") + # Otherwise ask the manager to choose. + recovery_step = self.get_recovery_step(context, agent, plan) + if not recovery_step: + logger.error("Manager could not determine a recovery plan. Aborting.") break - current_step_index = found_index + if recovery_step["index"] == -1: + logger.error(f"Recovery agent {recovery_step['agent_name']} not found in plan!") + break # Reset context up to that point - # (We rely on the step.reset_context implementation from BaseAgent) + current_step_index = recovery_step["index"] context = self.reset_context(context, plan, plan[current_step_index]) - - # Inject advice so it doesn't repeat the mistake - issues = self.assemble_issues(step_agent.name) - # Update context with a hint for the next run - context["error_message"] = prompts.get_retry_prompt(context, issues) - + context["error_message"] = prompts.get_retry_prompt( + context, recovery_step["reason"] + ) continue # Final Status @@ -263,11 +316,12 @@ def run_tasks(self, context, plan): self.metadata["status"] = "Succeeded" else: self.metadata["status"] = "Failed" - return tracker def save_results(self, tracker, plan): - """Save results to file based on timestamp.""" + """ + Save results to file based on timestamp. + """ if not os.path.exists(self.results_dir): os.makedirs(self.results_dir) now = datetime.now() @@ -294,20 +348,9 @@ def reset_context(self, context, plan, failed_step=None): context = step.reset_context(context) # If we reached the step we are rolling back to, stop clearing. - # (We want to KEEP the state generated BY this step's previous successful run? - # Actually, usually if we roll back TO a step, we want to clear THAT step's output - # so it runs fresh.) if failed_step is not None and step.name == failed_step.name: break return context - def assemble_issues(self, agent_name): - """ - Get list of previous issues for context injection. - """ - if agent_name not in self.metadata["assets"]["recovery"]: - return [] - issues = [] - for issue in self.metadata["assets"]["recovery"][agent_name]: - issues.append(issue["task_description"]) - return issues + def reached_max_attempts(self): + return self.attempts >= self.max_attempts diff --git a/fractale/agent/manager/plan.py b/fractale/agent/manager/plan.py index 7c8d07a..7626bfa 100644 --- a/fractale/agent/manager/plan.py +++ b/fractale/agent/manager/plan.py @@ -84,7 +84,7 @@ def validate_schema(self): def load(self): """ - Initialize the Step objects. + Initialize the steps in the plan. """ print(f"Loading plan from [bold magenta]{self.plan_path}[/bold magenta]...") self.agents = [] # We refer to steps as agents (v1 of this library) @@ -119,37 +119,60 @@ def agent_names(self): class Step: """ Wraps a specific execution step. + Holds configuration + schema. """ - def __init__(self, step_spec, save_incremental=False): - self.step_spec = step_spec + def __init__(self, spec, save_incremental=False): + self.spec = spec self.save_incremental = save_incremental + self._prompt_args = set() - def execute(self, context): + def set_schema(self, valid_args: set): """ - Run this step. + Called by Manager to define which arguments the Server Prompt accepts. """ - # TODO vsoch: need to think about if this is necessary. - # I don't think so, I think a step is just a metadata holder. - # The "run step" will be a call to the MCP function. - # I'm leaving for now so I don't forget the previous design. - context = self.update_context(context) - if not hasattr(context, "agent_config"): - context.agent_config = {} + self._prompt_args = valid_args - # Map prompt from YAML to the config the Agent expects - context.agent_config = {"source_prompt": self.step_spec["prompt"], "step_name": self.name} - print("RUN STEP") - import IPython - - IPython.embed() + def partition_inputs(self, full_context: dict) -> tuple[dict, dict]: + """ + Splits context into: + 1. Direct Arguments for the MCP Prompt function. + 2. Supplemental Context for the LLM. + """ + # Only update inputs that aren't already defined for the context + full_context = self.update_context(full_context) + print(dict(full_context)) + + # Fallback if schema missing + if self._prompt_args is None: + return full_context, {} + + prompt_args = {} + background_info = {} + + ignored = { + "agent_config", + "managed", + "max_loops", + "max_attempts", + "result", + "error_message", + } + + for key, value in full_context.items(): + if key in self._prompt_args: + prompt_args[key] = value + elif key not in ignored: + background_info[key] = value + + return prompt_args, background_info def update_context(self, context): """ Merge step-specific inputs into the context. """ - overrides = ["max_attempts"] - inputs = self.step_spec.get("inputs", {}) + overrides = ["max_attempts", "llm_provider", "llm_model"] + inputs = self.spec.get("inputs", {}) for k, v in inputs.items(): if k not in overrides: @@ -158,15 +181,37 @@ def update_context(self, context): @property def name(self): - return self.step_spec["name"] + return self.spec["name"] + + @property + def max_attempts(self): + return self.spec.get("max_attempts") + + @property + def agent(self): + return self.name @property def prompt(self): - return self.step_spec["prompt"] + return self.spec["prompt"] @property def description(self): - return self.step_spec.get("description", f"Executes persona: {self.prompt}") + return self.spec.get("description", f"Executes persona: {self.prompt}") + + @property + def attempts(self): + return self.get_agent().attempts + + @property + def metadata(self): + return self.get_agent().metadata + + def logs(self): + return self.get_agent().metadata + + def reset_context(self, context): + return self.get_agent().reset_context(context) def get(self, name, default=None): - return self.step_spec.get(name, default) + return self.spec.get(name, default) diff --git a/fractale/agent/manager/prompts.py b/fractale/agent/manager/prompts.py index d647bbd..321cd02 100644 --- a/fractale/agent/manager/prompts.py +++ b/fractale/agent/manager/prompts.py @@ -1,6 +1,6 @@ from fractale.agent.prompts import Prompt -recovery_prompt = f"""You are an expert AI workflow troubleshooter. A step in a workflow has failed and reached a maximum number of retries. THis could mean that we need to go back in the workflow and redo work. Your job is to analyze the error and recommend a single, corrective step. The steps, each associated with an agent, you can choose from are as follows: +recovery_prompt = f"""You are an expert AI workflow troubleshooter. A step in a workflow has failed and reached a maximum number of retries. This could mean that we need to go back in the workflow and redo work. Your job is to analyze the error and recommend a single, corrective step. The steps, each associated with an agent, you can choose from are as follows: Available Agents: %s @@ -10,8 +10,9 @@ %s Your job is to analyze the error message to determine the root cause, and decide which agent is best suited to fix this specific error. -- You MUST formulate a JSON object for the corrective step with two keys: "agent_name" and "task_description". +- You MUST formulate a JSON object for the corrective step with two keys: "agent_name" "reason" and "task_description". - The new "task_description" MUST be a clear instruction for the agent to correct the specific error. +- The "reason" must explain why you chose the step and what needs to be done differently. - You MUST only provide a single JSON object for the corrective step in your response. - You MUST format your `task_description` to be a "You MUST" statement to the agent. """ @@ -19,11 +20,8 @@ # Same three inputs as above plus the unsuccessful attempt recovery_error_prompt = recovery_prompt.strip() + " Your last attempt was not successful:\n%s" -retry_task = """You have had previous attempts, and here are summaries of previous issues: - -{% for issue in issues %} - - {{ issue }} -{% endfor %} +retry_task = """You have had previous attempts, and here is the reason we are retrying this step: +{{ issue }} """ retry_instructions = ["You MUST avoid making these errors again."] @@ -37,12 +35,8 @@ } -def get_retry_prompt(context, issues): +def get_retry_prompt(context, issue): """ In testing, this should JUST be the error message. """ - # This is an impossible case - we would have appended the task descriptions if this is getting called - # but you never know... - if not issues: - return "" - return Prompt(retry_prompt, context).render({"issues": issues}) + return Prompt(retry_prompt, context).render({"issue": issue}) From 2a8ef2c4df6581c537575b7a4b27d8e21d5623e8 Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 17:53:17 -0800 Subject: [PATCH 04/27] debug Signed-off-by: vsoch --- fractale/agent/manager/plan.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/fractale/agent/manager/plan.py b/fractale/agent/manager/plan.py index 7626bfa..7c9765d 100644 --- a/fractale/agent/manager/plan.py +++ b/fractale/agent/manager/plan.py @@ -141,9 +141,12 @@ def partition_inputs(self, full_context: dict) -> tuple[dict, dict]: """ # Only update inputs that aren't already defined for the context full_context = self.update_context(full_context) - print(dict(full_context)) + print('FULL CONTEXT') + print(full_context) # Fallback if schema missing + print('PROMPT ARGS') + self._prompt_args if self._prompt_args is None: return full_context, {} @@ -151,7 +154,6 @@ def partition_inputs(self, full_context: dict) -> tuple[dict, dict]: background_info = {} ignored = { - "agent_config", "managed", "max_loops", "max_attempts", @@ -164,7 +166,10 @@ def partition_inputs(self, full_context: dict) -> tuple[dict, dict]: prompt_args[key] = value elif key not in ignored: background_info[key] = value - + print('args') + print(prompt_args) + print('bg') + print(background_info) return prompt_args, background_info def update_context(self, context): From c17922f4322169b3ad852d6fa030bb9995690b2a Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 19:16:17 -0800 Subject: [PATCH 05/27] debug Signed-off-by: vsoch --- fractale/agent/manager/agent.py | 4 ++++ fractale/agent/manager/plan.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/fractale/agent/manager/agent.py b/fractale/agent/manager/agent.py index 95629c7..b99ccbb 100644 --- a/fractale/agent/manager/agent.py +++ b/fractale/agent/manager/agent.py @@ -67,11 +67,13 @@ async def validate_personas(self, plan): # Ensure we separate arguments from extra # This does not check to see if we have required, since they # might come from a previous step. + # TODO this not being poulated (emptyset) step.set_schema(schema_map[step.prompt]) # store in the manager metadata so steps can access it self.metadata["assets"]["prompt_schemas"] = schema_map print("✅ Personas validated and schemas cached") + return schema_map except Exception as e: logger.warning(f"⚠️ Could not validate personas against server (proceeding anyway): {e}") @@ -194,6 +196,8 @@ async def connect_and_validate(self, plan): # Initialize our backend LLM with the available tools mcp_tools = await self.client.list_tools() await self.backend.initialize(mcp_tools) + import IPython + IPython.embed() def run_tasks(self, context, plan): """ diff --git a/fractale/agent/manager/plan.py b/fractale/agent/manager/plan.py index 7c9765d..aa9c1c6 100644 --- a/fractale/agent/manager/plan.py +++ b/fractale/agent/manager/plan.py @@ -146,7 +146,7 @@ def partition_inputs(self, full_context: dict) -> tuple[dict, dict]: # Fallback if schema missing print('PROMPT ARGS') - self._prompt_args + print(self._prompt_args) if self._prompt_args is None: return full_context, {} From bd9f392fc146e20569de3f5f909d91681a66b14e Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 19:17:27 -0800 Subject: [PATCH 06/27] debug Signed-off-by: vsoch --- fractale/agent/manager/agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fractale/agent/manager/agent.py b/fractale/agent/manager/agent.py index b99ccbb..6da9322 100644 --- a/fractale/agent/manager/agent.py +++ b/fractale/agent/manager/agent.py @@ -40,7 +40,7 @@ async def validate_personas(self, plan): """ try: # Attempt to list prompts from server - server_prompts_page = await self.client.list_prompts() + server_prompts = await self.client.list_prompts() available_names = set() schema_map = {} From 59ea2b0e11e885532b20fcc0adb4c7c72d108fc6 Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 19:19:32 -0800 Subject: [PATCH 07/27] debug Signed-off-by: vsoch --- fractale/agent/manager/agent.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/fractale/agent/manager/agent.py b/fractale/agent/manager/agent.py index 6da9322..bebeda5 100644 --- a/fractale/agent/manager/agent.py +++ b/fractale/agent/manager/agent.py @@ -43,6 +43,7 @@ async def validate_personas(self, plan): server_prompts = await self.client.list_prompts() available_names = set() schema_map = {} + return server_prompts # FastMCP (bottom) vs Standard MCP (top) return types if hasattr(server_prompts, "prompts"): @@ -167,7 +168,9 @@ def run(self, context): plan = Plan(plan_path, save_incremental=self.save_incremental) # Connect and validate (don't allow connect without validate) - asyncio.run(self.connect_and_validate(plan)) + res = asyncio.run(self.connect_and_validate(plan)) + import IPython + IPython.embed() # Still pass the shared context to all tasks try: @@ -191,13 +194,11 @@ async def connect_and_validate(self, plan): """ async with self.client: # Check if server has the prompts we need - await self.validate_personas(plan) + return await self.validate_personas(plan) # Initialize our backend LLM with the available tools mcp_tools = await self.client.list_tools() await self.backend.initialize(mcp_tools) - import IPython - IPython.embed() def run_tasks(self, context, plan): """ From 56702829e9525a968416e6776d2c8ea778bd0b5f Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 19:25:41 -0800 Subject: [PATCH 08/27] debug Signed-off-by: vsoch --- fractale/agent/agent.py | 4 ++ fractale/agent/manager/agent.py | 69 ++++++++++++++------------------- 2 files changed, 33 insertions(+), 40 deletions(-) diff --git a/fractale/agent/agent.py b/fractale/agent/agent.py index 261f4a6..36307f1 100644 --- a/fractale/agent/agent.py +++ b/fractale/agent/agent.py @@ -204,6 +204,10 @@ def run_step(self, context, step): Run step is called from the Agent run (base class) It's here so we can asyncio.run the thing! """ + print('RUN STEP') + import IPython + IPython.embed() + try: final_result = asyncio.run(self.execute(context, step)) context.result = final_result diff --git a/fractale/agent/manager/agent.py b/fractale/agent/manager/agent.py index bebeda5..f85eed0 100644 --- a/fractale/agent/manager/agent.py +++ b/fractale/agent/manager/agent.py @@ -38,46 +38,37 @@ async def validate_personas(self, plan): Queries the MCP server to ensure all personas in the plan exist. A persona == a prompt. Each step has an associated prompt (persona). """ - try: - # Attempt to list prompts from server - server_prompts = await self.client.list_prompts() - available_names = set() - schema_map = {} - return server_prompts - - # FastMCP (bottom) vs Standard MCP (top) return types - if hasattr(server_prompts, "prompts"): - prompts_list = server_prompts.prompts - else: - prompts_list = server_prompts - - # We do this once so later we can call prompt functions and know args vs. context - for p in prompts_list: - available_names.add(p.name) - # Store set of valid argument names - args = {arg.name for arg in p.arguments} if p.arguments else set() - schema_map[p.name] = args - - print(f"🔎 Validating {len(plan)} steps...") - for step in plan.agents: - if step.prompt not in available_names: - raise ValueError( - f"❌ Plan Validation Failed: Step '{step.name}' requests persona '{step.prompt}', " - f"but server only has: {available_names}" - ) - # Ensure we separate arguments from extra - # This does not check to see if we have required, since they - # might come from a previous step. - # TODO this not being poulated (emptyset) - step.set_schema(schema_map[step.prompt]) + # Attempt to list prompts from server + server_prompts = await self.client.list_prompts() + schema_map = {} + + # FastMCP (bottom) vs Standard MCP (top) return types + if hasattr(server_prompts, "prompts"): + prompts_list = server_prompts.prompts + else: + prompts_list = server_prompts + + # We do this once so later we can call prompt functions and know args vs. context + for p in prompts_list: + # Store set of valid argument names + args = {arg.name for arg in p.arguments} if p.arguments else set() + schema_map[p.name] = args + + print(f"🔎 Validating {len(plan)} steps...") + for step in plan.agents: + if step.prompt not in schema_map: + raise ValueError( + f"❌ Plan Validation Failed: Step '{step.name}' requests persona '{step.prompt}', " + f"but server only has: {available_names}" + ) + # Ensure we separate arguments from extra + # This does not check to see if we have required, since they + # might come from a previous step. + step.set_schema(schema_map[step.prompt]) # store in the manager metadata so steps can access it self.metadata["assets"]["prompt_schemas"] = schema_map print("✅ Personas validated and schemas cached") - return schema_map - - except Exception as e: - logger.warning(f"⚠️ Could not validate personas against server (proceeding anyway): {e}") def get_recovery_step(self, context, failed_step, plan): """ @@ -168,9 +159,7 @@ def run(self, context): plan = Plan(plan_path, save_incremental=self.save_incremental) # Connect and validate (don't allow connect without validate) - res = asyncio.run(self.connect_and_validate(plan)) - import IPython - IPython.embed() + asyncio.run(self.connect_and_validate(plan)) # Still pass the shared context to all tasks try: @@ -194,7 +183,7 @@ async def connect_and_validate(self, plan): """ async with self.client: # Check if server has the prompts we need - return await self.validate_personas(plan) + await self.validate_personas(plan) # Initialize our backend LLM with the available tools mcp_tools = await self.client.list_tools() From 36d9311ff65d39a513262e278c5d55e4045d9148 Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 19:39:19 -0800 Subject: [PATCH 09/27] debug Signed-off-by: vsoch --- fractale/agent/agent.py | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/fractale/agent/agent.py b/fractale/agent/agent.py index 36307f1..da121a5 100644 --- a/fractale/agent/agent.py +++ b/fractale/agent/agent.py @@ -85,6 +85,7 @@ async def execute(self, context, step): # those from extra args, extra = step.partition_inputs(args) instruction = await self.fetch_persona(step.prompt, args) + # TODO STOPPED HERE should we add "extra" to context? print("INSTRUCTION") print(instruction) @@ -154,21 +155,16 @@ async def fetch_persona(self, prompt_name: str, arguments: dict) -> str: here are your instructions for a task." """ logger.info(f"📥 Bootstrapping Persona: {prompt_name}") - try: - prompt_result = await self.client.get_prompt(name=prompt_name, arguments=arguments) - # MCP Prompts return a list of messages (User/Assistant/Text). - # We squash them into a single string for the instruction. - msgs = [] - for m in prompt_result.messages: + prompt_result = await self.client.get_prompt(name=prompt_name, arguments=arguments) + # MCP Prompts return a list of messages (User/Assistant/Text). + # We squash them into a single string for the instruction. + msgs = [] + for m in prompt_result.messages: if hasattr(m.content, "text"): msgs.append(m.content.text) else: msgs.append(str(m.content)) - - return "\n\n".join(msgs) - - except Exception as e: - raise RuntimeError(f"Failed to load persona '{prompt_name}': {e}") + return "\n\n".join(msgs) def record_step(self, tool, args, output): """ @@ -204,10 +200,6 @@ def run_step(self, context, step): Run step is called from the Agent run (base class) It's here so we can asyncio.run the thing! """ - print('RUN STEP') - import IPython - IPython.embed() - try: final_result = asyncio.run(self.execute(context, step)) context.result = final_result From bf394fb8f01a95936126e6c199600393358688ed Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 19:42:10 -0800 Subject: [PATCH 10/27] debug Signed-off-by: vsoch --- fractale/tools/build/docker/tool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fractale/tools/build/docker/tool.py b/fractale/tools/build/docker/tool.py index 25d68f4..f70f1a4 100644 --- a/fractale/tools/build/docker/tool.py +++ b/fractale/tools/build/docker/tool.py @@ -200,7 +200,7 @@ def build_persona_prompt(self, application: str, environment: str = "CPU") -> di ] + prompts.COMMON_INSTRUCTIONS # Construct the text from our template - prompt_text = prompts.get_build_prompt(application, environment, build_rules) + prompt_text = prompts.get_build_text(application, environment, build_rules) # Return MCP format return {"messages": [{"role": "user", "content": {"type": "text", "text": prompt_text}}]} From c9d49df1441e82e2045f0aef6e095bdd7a4f561e Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 19:43:48 -0800 Subject: [PATCH 11/27] debug Signed-off-by: vsoch --- fractale/agent/backends/openai.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fractale/agent/backends/openai.py b/fractale/agent/backends/openai.py index 247d930..ce418ec 100644 --- a/fractale/agent/backends/openai.py +++ b/fractale/agent/backends/openai.py @@ -11,7 +11,7 @@ class OpenAIBackend(LLMBackend): Backend to use OpenAI (not tested yet) """ - def __init__(self, model_name="gpt-4o"): + def __init__(self, model_name="gpt-oss-120b"): # Needs to be tested if base url is None. # Switch to async if/when needed. Annoying for development # self.client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY"), base_url=os.environ.get("OPENAI_BASE_URL")) From e5975ebe23724be5c385fbd127fee86bafa2b337 Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 21:33:27 -0800 Subject: [PATCH 12/27] debug Signed-off-by: vsoch --- fractale/agent/agent.py | 2 +- fractale/agent/backends/gemini.py | 6 +++--- fractale/agent/backends/llama.py | 7 ++++--- fractale/agent/backends/llm.py | 2 +- fractale/agent/backends/openai.py | 9 ++++----- fractale/agent/manager/agent.py | 5 ++++- 6 files changed, 17 insertions(+), 14 deletions(-) diff --git a/fractale/agent/agent.py b/fractale/agent/agent.py index da121a5..c05b0a0 100644 --- a/fractale/agent/agent.py +++ b/fractale/agent/agent.py @@ -105,7 +105,7 @@ async def run_llm_loop(self, instruction: str, max_loops: int) -> str: We need to return on some state of success or ultimate failure. """ # Initial response to first prompt - response_text, calls = await self.backend.generate_response(prompt=instruction) + response_text, calls = self.backend.generate_response(prompt=instruction) loops = 0 while loops < max_loops: diff --git a/fractale/agent/backends/gemini.py b/fractale/agent/backends/gemini.py index 2055574..5d72cc4 100644 --- a/fractale/agent/backends/gemini.py +++ b/fractale/agent/backends/gemini.py @@ -37,7 +37,7 @@ async def initialize(self, mcp_tools: List[Any]): model = genai.GenerativeModel(self.model_name, tools=gemini_tools) self.chat = model.start_chat(enable_automatic_function_calling=False) - async def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None): + def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None): """ Generate Gemini response. @@ -58,11 +58,11 @@ async def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = ) ) ) - response = await self.chat.send_message_async(genai.protos.Content(parts=parts)) + response = self.chat.send_message(genai.protos.Content(parts=parts)) # Sending new text elif prompt: - response = await self.chat.send_message_async(prompt) + response = self.chat.send_message(prompt) # Extract Logic self._usage = { diff --git a/fractale/agent/backends/llama.py b/fractale/agent/backends/llama.py index 3b060e5..3295700 100644 --- a/fractale/agent/backends/llama.py +++ b/fractale/agent/backends/llama.py @@ -2,7 +2,7 @@ import os from typing import Any, Dict, List -from openai import AsyncOpenAI +from openai import AsyncOpenAI, OpenAI from .llm import LLMBackend @@ -18,7 +18,8 @@ def __init__(self, model_name=None): base_url = os.environ.get("LLAMA_BASE_URL", "http://localhost:11434/v1") api_key = os.environ.get("LLAMA_API_KEY", "ollama") - self.client = AsyncOpenAI(base_url=base_url, api_key=api_key) + # self.client = AsyncOpenAI(base_url=base_url, api_key=api_key) + self.client = OpenAI(base_url=base_url, api_key=api_key) # Default to Llama 3.1 8B if not specified self.model_name = model_name or os.environ.get("LLAMA_MODEL", "llama3.1") @@ -46,7 +47,7 @@ async def initialize(self, mcp_tools: List[Any]): } ) - async def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None): + def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None): """ Manage history and call Llama. """ diff --git a/fractale/agent/backends/llm.py b/fractale/agent/backends/llm.py index 4bd7e14..b35d6bd 100644 --- a/fractale/agent/backends/llm.py +++ b/fractale/agent/backends/llm.py @@ -15,7 +15,7 @@ async def initialize(self, mcp_tools: List[Any]): pass @abstractmethod - async def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None): + def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None): """ Returns a text_response, tool_calls """ diff --git a/fractale/agent/backends/openai.py b/fractale/agent/backends/openai.py index ce418ec..83c1001 100644 --- a/fractale/agent/backends/openai.py +++ b/fractale/agent/backends/openai.py @@ -11,7 +11,7 @@ class OpenAIBackend(LLMBackend): Backend to use OpenAI (not tested yet) """ - def __init__(self, model_name="gpt-oss-120b"): + def __init__(self, model_name="openai/gpt-oss-120b"): # Needs to be tested if base url is None. # Switch to async if/when needed. Annoying for development # self.client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY"), base_url=os.environ.get("OPENAI_BASE_URL")) @@ -41,7 +41,7 @@ async def initialize(self, mcp_tools: List[Any]): } ) - async def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None): + def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None): """ Generate the response and update history. """ @@ -53,11 +53,10 @@ async def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = {"role": "tool", "tool_call_id": out["id"], "content": str(out["content"])} ) - # Call API - response = await self.client.chat.completions.create( + # Call API (this doesn't need async I don't think, unless we create an async client) + response = self.client.chat.completions.create( model=self.model_name, messages=self.history, tools=self.tools_schema or None ) - msg = response.choices[0].message # Save assistant reply to history diff --git a/fractale/agent/manager/agent.py b/fractale/agent/manager/agent.py index f85eed0..b4dc213 100644 --- a/fractale/agent/manager/agent.py +++ b/fractale/agent/manager/agent.py @@ -97,7 +97,10 @@ def get_recovery_step(self, context, failed_step, plan): while not next_step: # Use the backend directly - get back tuple (text, calls) - response_text = asyncio.run(self.backend.generate_response(prompt=prompt_text))[0] + # Note I'm trying to do these NOT async because it's easier to debug + response_text = self.backend.generate_response(prompt=prompt_text)[0] + import IPython + IPython.embed() next_step = json.loads(utils.get_code_block(response_text, "json")) # Validate - we require the agent name and description From 10bd44134bc48c83dfe05c311f5652637f622fc9 Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 21:46:41 -0800 Subject: [PATCH 13/27] debug Signed-off-by: vsoch --- fractale/agent/backends/llama.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fractale/agent/backends/llama.py b/fractale/agent/backends/llama.py index 3295700..64f4d70 100644 --- a/fractale/agent/backends/llama.py +++ b/fractale/agent/backends/llama.py @@ -76,7 +76,7 @@ def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None) ) try: - response = await self.client.chat.completions.create( + response = self.client.chat.completions.create( model=self.model_name, messages=self.history, tools=self.tools_schema or None, From 3cd2c2feba2fa8e27a02a3bd4621c97ac359a210 Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 22:06:18 -0800 Subject: [PATCH 14/27] debug Signed-off-by: vsoch --- fractale/agent/manager/agent.py | 2 +- fractale/agent/manager/plan.py | 24 +++--------------------- 2 files changed, 4 insertions(+), 22 deletions(-) diff --git a/fractale/agent/manager/agent.py b/fractale/agent/manager/agent.py index b4dc213..38077aa 100644 --- a/fractale/agent/manager/agent.py +++ b/fractale/agent/manager/agent.py @@ -238,7 +238,7 @@ def run_tasks(self, context, plan): "duration": timer.elapsed_time, "result": context.get("result"), "error": context.get("error_message"), - "attempts": step.attempts + 1, + "attempts": self.attempts, "metadata": step.metadata, } ) diff --git a/fractale/agent/manager/plan.py b/fractale/agent/manager/plan.py index aa9c1c6..89c0c34 100644 --- a/fractale/agent/manager/plan.py +++ b/fractale/agent/manager/plan.py @@ -126,6 +126,7 @@ def __init__(self, spec, save_incremental=False): self.spec = spec self.save_incremental = save_incremental self._prompt_args = set() + self.attempts = 0 def set_schema(self, valid_args: set): """ @@ -141,12 +142,8 @@ def partition_inputs(self, full_context: dict) -> tuple[dict, dict]: """ # Only update inputs that aren't already defined for the context full_context = self.update_context(full_context) - print('FULL CONTEXT') - print(full_context) # Fallback if schema missing - print('PROMPT ARGS') - print(self._prompt_args) if self._prompt_args is None: return full_context, {} @@ -166,10 +163,9 @@ def partition_inputs(self, full_context: dict) -> tuple[dict, dict]: prompt_args[key] = value elif key not in ignored: background_info[key] = value - print('args') + + # TODO: we may want to supplement prompt with other "background info" print(prompt_args) - print('bg') - print(background_info) return prompt_args, background_info def update_context(self, context): @@ -204,19 +200,5 @@ def prompt(self): def description(self): return self.spec.get("description", f"Executes persona: {self.prompt}") - @property - def attempts(self): - return self.get_agent().attempts - - @property - def metadata(self): - return self.get_agent().metadata - - def logs(self): - return self.get_agent().metadata - - def reset_context(self, context): - return self.get_agent().reset_context(context) - def get(self, name, default=None): return self.spec.get(name, default) From 2da583a57618165459b561e47ce021218983cff2 Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 22:09:59 -0800 Subject: [PATCH 15/27] debug Signed-off-by: vsoch --- fractale/agent/agent.py | 7 +------ fractale/agent/manager/agent.py | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/fractale/agent/agent.py b/fractale/agent/agent.py index c05b0a0..6d64569 100644 --- a/fractale/agent/agent.py +++ b/fractale/agent/agent.py @@ -85,12 +85,7 @@ async def execute(self, context, step): # those from extra args, extra = step.partition_inputs(args) instruction = await self.fetch_persona(step.prompt, args) - - # TODO STOPPED HERE should we add "extra" to context? - print("INSTRUCTION") - print(instruction) - print("EXTRA") - print(extra) + logger.custom(instructoin, title="[green]Instruction[/green]", border_style="green") # Run the loop up to some max attempts (internal state machine with MCP tools) max_loops = context.get("max_loops", 15) diff --git a/fractale/agent/manager/agent.py b/fractale/agent/manager/agent.py index 38077aa..8f8b17c 100644 --- a/fractale/agent/manager/agent.py +++ b/fractale/agent/manager/agent.py @@ -239,7 +239,7 @@ def run_tasks(self, context, plan): "result": context.get("result"), "error": context.get("error_message"), "attempts": self.attempts, - "metadata": step.metadata, + "metadata": agent.metadata, } ) From 3b2571c939dce97245abb225f5fc47fdb119ba2b Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 22:10:51 -0800 Subject: [PATCH 16/27] debug Signed-off-by: vsoch --- fractale/agent/agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fractale/agent/agent.py b/fractale/agent/agent.py index 6d64569..217086c 100644 --- a/fractale/agent/agent.py +++ b/fractale/agent/agent.py @@ -85,7 +85,7 @@ async def execute(self, context, step): # those from extra args, extra = step.partition_inputs(args) instruction = await self.fetch_persona(step.prompt, args) - logger.custom(instructoin, title="[green]Instruction[/green]", border_style="green") + logger.custom(instruction, title="[green]Instruction[/green]", border_style="green") # Run the loop up to some max attempts (internal state machine with MCP tools) max_loops = context.get("max_loops", 15) From 6c73fba33cb03deaf985c5d717401f10534bb883 Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 22:13:16 -0800 Subject: [PATCH 17/27] debug Signed-off-by: vsoch --- fractale/agent/agent.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/fractale/agent/agent.py b/fractale/agent/agent.py index 217086c..eb5bd42 100644 --- a/fractale/agent/agent.py +++ b/fractale/agent/agent.py @@ -85,6 +85,7 @@ async def execute(self, context, step): # those from extra args, extra = step.partition_inputs(args) instruction = await self.fetch_persona(step.prompt, args) + message = instruction['messages'][0]['content']['text'] logger.custom(instruction, title="[green]Instruction[/green]", border_style="green") # Run the loop up to some max attempts (internal state machine with MCP tools) @@ -101,6 +102,10 @@ async def run_llm_loop(self, instruction: str, max_loops: int) -> str: """ # Initial response to first prompt response_text, calls = self.backend.generate_response(prompt=instruction) + print('response text') + print(response_text) + print('calls') + print(calls) loops = 0 while loops < max_loops: @@ -113,6 +118,7 @@ async def run_llm_loop(self, instruction: str, max_loops: int) -> str: # Execute all requested tools tool_outputs = [] for call in calls: + print(call) t_name = call["name"] t_args = call["args"] t_id = call.get("id") From d831a443d67aad37900a2ed9b2ee0f6a358ebcba Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 22:15:48 -0800 Subject: [PATCH 18/27] debug Signed-off-by: vsoch --- fractale/agent/agent.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/fractale/agent/agent.py b/fractale/agent/agent.py index eb5bd42..c735920 100644 --- a/fractale/agent/agent.py +++ b/fractale/agent/agent.py @@ -85,7 +85,9 @@ async def execute(self, context, step): # those from extra args, extra = step.partition_inputs(args) instruction = await self.fetch_persona(step.prompt, args) - message = instruction['messages'][0]['content']['text'] + print(dir(instruction)) + print(instruction.__dict__) + message = instruction.messages[0]['content']['text'] logger.custom(instruction, title="[green]Instruction[/green]", border_style="green") # Run the loop up to some max attempts (internal state machine with MCP tools) @@ -155,7 +157,7 @@ async def fetch_persona(self, prompt_name: str, arguments: dict) -> str: This is akin to rendering or fetching the person. E.g., "You are X and here are your instructions for a task." """ - logger.info(f"📥 Bootstrapping Persona: {prompt_name}") + logger.info(f"📥 Preparing Persona: {prompt_name}") prompt_result = await self.client.get_prompt(name=prompt_name, arguments=arguments) # MCP Prompts return a list of messages (User/Assistant/Text). # We squash them into a single string for the instruction. From 851eaa3662a1ccfe16c27e794a0b0ce13556d87f Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 22:17:08 -0800 Subject: [PATCH 19/27] debug Signed-off-by: vsoch --- fractale/agent/agent.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/fractale/agent/agent.py b/fractale/agent/agent.py index c735920..6858e05 100644 --- a/fractale/agent/agent.py +++ b/fractale/agent/agent.py @@ -85,9 +85,7 @@ async def execute(self, context, step): # those from extra args, extra = step.partition_inputs(args) instruction = await self.fetch_persona(step.prompt, args) - print(dir(instruction)) - print(instruction.__dict__) - message = instruction.messages[0]['content']['text'] + message = json.loads(instruction)['messages'][0]['content']['text'] logger.custom(instruction, title="[green]Instruction[/green]", border_style="green") # Run the loop up to some max attempts (internal state machine with MCP tools) From 0fc598ada19430944508ae3efb0e84d0a7e2c801 Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 22:17:48 -0800 Subject: [PATCH 20/27] debug Signed-off-by: vsoch --- fractale/agent/agent.py | 1 + 1 file changed, 1 insertion(+) diff --git a/fractale/agent/agent.py b/fractale/agent/agent.py index 6858e05..c0baf36 100644 --- a/fractale/agent/agent.py +++ b/fractale/agent/agent.py @@ -1,6 +1,7 @@ import asyncio import os import time +import json from fastmcp import Client from fastmcp.client.transports import StreamableHttpTransport From cd12e85b4e5fa4997af6f468641a61c5419bb8cb Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 22:18:19 -0800 Subject: [PATCH 21/27] debug Signed-off-by: vsoch --- fractale/agent/agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fractale/agent/agent.py b/fractale/agent/agent.py index c0baf36..23d022b 100644 --- a/fractale/agent/agent.py +++ b/fractale/agent/agent.py @@ -87,7 +87,7 @@ async def execute(self, context, step): args, extra = step.partition_inputs(args) instruction = await self.fetch_persona(step.prompt, args) message = json.loads(instruction)['messages'][0]['content']['text'] - logger.custom(instruction, title="[green]Instruction[/green]", border_style="green") + logger.custom(message, title="[green]Instruction[/green]", border_style="green") # Run the loop up to some max attempts (internal state machine with MCP tools) max_loops = context.get("max_loops", 15) From 8ffaab0046db727141a46399a550cfbc0edb6118 Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 22:19:12 -0800 Subject: [PATCH 22/27] debug Signed-off-by: vsoch --- fractale/agent/agent.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fractale/agent/agent.py b/fractale/agent/agent.py index 23d022b..1338ba3 100644 --- a/fractale/agent/agent.py +++ b/fractale/agent/agent.py @@ -156,7 +156,7 @@ async def fetch_persona(self, prompt_name: str, arguments: dict) -> str: This is akin to rendering or fetching the person. E.g., "You are X and here are your instructions for a task." """ - logger.info(f"📥 Preparing Persona: {prompt_name}") + logger.info(f"👩‍💻 Persona: {prompt_name}") prompt_result = await self.client.get_prompt(name=prompt_name, arguments=arguments) # MCP Prompts return a list of messages (User/Assistant/Text). # We squash them into a single string for the instruction. From d40fa827283384308c229e4b62329f4effac3273 Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 22:23:54 -0800 Subject: [PATCH 23/27] debug Signed-off-by: vsoch --- fractale/agent/agent.py | 2 +- fractale/agent/backends/llama.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/fractale/agent/agent.py b/fractale/agent/agent.py index 1338ba3..f26b9c1 100644 --- a/fractale/agent/agent.py +++ b/fractale/agent/agent.py @@ -156,7 +156,7 @@ async def fetch_persona(self, prompt_name: str, arguments: dict) -> str: This is akin to rendering or fetching the person. E.g., "You are X and here are your instructions for a task." """ - logger.info(f"👩‍💻 Persona: {prompt_name}") + logger.info(f"📥 Persona: {prompt_name}") prompt_result = await self.client.get_prompt(name=prompt_name, arguments=arguments) # MCP Prompts return a list of messages (User/Assistant/Text). # We squash them into a single string for the instruction. diff --git a/fractale/agent/backends/llama.py b/fractale/agent/backends/llama.py index 64f4d70..a913ef6 100644 --- a/fractale/agent/backends/llama.py +++ b/fractale/agent/backends/llama.py @@ -51,6 +51,9 @@ def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None) """ Manage history and call Llama. """ + print('TEST GENERATE RESPONSE') + import IPython + IPYthon.embed() if prompt: # Check if we have a System Prompt set (usually the first message). # If not, Llama often behaves better with one. @@ -88,6 +91,7 @@ def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None) msg = response.choices[0].message if response.usage: self._usage = dict(response.usage) + # Store history and get text content self.history.append(msg) text_content = msg.content or "" From 24156738621324ac24788236b299d0dac640cabd Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 22:37:53 -0800 Subject: [PATCH 24/27] debug Signed-off-by: vsoch --- fractale/agent/backends/openai.py | 4 ++++ fractale/tools/build/docker/prompts.py | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/fractale/agent/backends/openai.py b/fractale/agent/backends/openai.py index 83c1001..e3795ea 100644 --- a/fractale/agent/backends/openai.py +++ b/fractale/agent/backends/openai.py @@ -45,6 +45,10 @@ def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None) """ Generate the response and update history. """ + print('TEST GENERATE RESPONSE') + import IPython + IPYthon.embed() + if prompt: self.history.append({"role": "user", "content": prompt}) if tool_outputs: diff --git a/fractale/tools/build/docker/prompts.py b/fractale/tools/build/docker/prompts.py index 477396a..90f6528 100644 --- a/fractale/tools/build/docker/prompts.py +++ b/fractale/tools/build/docker/prompts.py @@ -37,7 +37,7 @@ def get_build_text(application, environment, build_rules): ### GOAL I need to create a Dockerfile for an application '{application}'. The target environment is '{environment}'. -Please generate a robust, production-ready Dockerfile. +Please generate AND build a robust, production-ready Dockerfile using the tools available to you. ### REQUIREMENTS & CONSTRAINTS You must adhere to these rules strictly: @@ -47,6 +47,7 @@ def get_build_text(application, environment, build_rules): 1. Analyze the requirements and generate the Dockerfile content. 2. Use provided tools to build the image. 3. Respond appropriately to errors. +4. Return only when the tool returns a successful build. """ From eaee491b9131459eb3c4d61822bf6c653e54e70c Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 22:38:38 -0800 Subject: [PATCH 25/27] debug Signed-off-by: vsoch --- fractale/agent/backends/openai.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fractale/agent/backends/openai.py b/fractale/agent/backends/openai.py index e3795ea..5f91fb0 100644 --- a/fractale/agent/backends/openai.py +++ b/fractale/agent/backends/openai.py @@ -47,7 +47,7 @@ def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None) """ print('TEST GENERATE RESPONSE') import IPython - IPYthon.embed() + IPython.embed() if prompt: self.history.append({"role": "user", "content": prompt}) From 578bfa064d09134c6ee444278215938622a6a0cf Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 22:39:39 -0800 Subject: [PATCH 26/27] debug Signed-off-by: vsoch --- fractale/agent/backends/openai.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/fractale/agent/backends/openai.py b/fractale/agent/backends/openai.py index 5f91fb0..6d23ae2 100644 --- a/fractale/agent/backends/openai.py +++ b/fractale/agent/backends/openai.py @@ -45,10 +45,6 @@ def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None) """ Generate the response and update history. """ - print('TEST GENERATE RESPONSE') - import IPython - IPython.embed() - if prompt: self.history.append({"role": "user", "content": prompt}) if tool_outputs: @@ -62,6 +58,7 @@ def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None) model=self.model_name, messages=self.history, tools=self.tools_schema or None ) msg = response.choices[0].message + print(response) # Save assistant reply to history self.history.append(msg) From fc6ddd0e0c10335570569e29460ae2b8cf16e795 Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 27 Nov 2025 22:46:36 -0800 Subject: [PATCH 27/27] debug Signed-off-by: vsoch --- fractale/agent/agent.py | 5 +++-- fractale/agent/backends/gemini.py | 2 +- fractale/agent/backends/llama.py | 2 +- fractale/agent/backends/openai.py | 4 ++-- fractale/tools/build/docker/prompts.py | 9 ++++----- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/fractale/agent/agent.py b/fractale/agent/agent.py index f26b9c1..454a5a0 100644 --- a/fractale/agent/agent.py +++ b/fractale/agent/agent.py @@ -2,6 +2,7 @@ import os import time import json +from rich import print from fastmcp import Client from fastmcp.client.transports import StreamableHttpTransport @@ -102,9 +103,9 @@ async def run_llm_loop(self, instruction: str, max_loops: int) -> str: We need to return on some state of success or ultimate failure. """ # Initial response to first prompt - response_text, calls = self.backend.generate_response(prompt=instruction) - print('response text') + response_text, reason, calls = self.backend.generate_response(prompt=instruction) print(response_text) + logger.custom(reason, title="[blue]Reason[/blue]", border_style="blue") print('calls') print(calls) diff --git a/fractale/agent/backends/gemini.py b/fractale/agent/backends/gemini.py index 5d72cc4..1042bfe 100644 --- a/fractale/agent/backends/gemini.py +++ b/fractale/agent/backends/gemini.py @@ -80,7 +80,7 @@ def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None) {"name": fc.name.replace("_", "-"), "args": dict(fc.args)} # Map back to MCP ) - return text_content, tool_calls + return text_content, msg.reasoning_content, tool_calls @property def token_usage(self): diff --git a/fractale/agent/backends/llama.py b/fractale/agent/backends/llama.py index a913ef6..a5e1634 100644 --- a/fractale/agent/backends/llama.py +++ b/fractale/agent/backends/llama.py @@ -107,7 +107,7 @@ def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None) } ) - return text_content, tool_calls + return text_content, msg.reasoning_content, tool_calls @property def token_usage(self): diff --git a/fractale/agent/backends/openai.py b/fractale/agent/backends/openai.py index 6d23ae2..54f3ffc 100644 --- a/fractale/agent/backends/openai.py +++ b/fractale/agent/backends/openai.py @@ -1,5 +1,6 @@ import os from typing import Any, Dict, List +from rich import print from openai import AsyncOpenAI, OpenAI @@ -58,7 +59,6 @@ def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None) model=self.model_name, messages=self.history, tools=self.tools_schema or None ) msg = response.choices[0].message - print(response) # Save assistant reply to history self.history.append(msg) @@ -76,7 +76,7 @@ def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None) } ) - return msg.content, tool_calls + return msg.content, msg.reasoning_content, tool_calls @property def token_usage(self): diff --git a/fractale/tools/build/docker/prompts.py b/fractale/tools/build/docker/prompts.py index 90f6528..e575662 100644 --- a/fractale/tools/build/docker/prompts.py +++ b/fractale/tools/build/docker/prompts.py @@ -2,10 +2,8 @@ PERSONA = "You are a Dockerfile build expert." -CONTEXT = """ -We are running experiments that deploy containerized HPC applications. -You are the agent responsible for the build step in that pipeline. -""" +CONTEXT = """We are running experiments that deploy containerized HPC applications. +You are the agent responsible for the build step in that pipeline.""" REQUIRES = [ "You MUST NOT change the name of the application container image provided.", @@ -37,7 +35,8 @@ def get_build_text(application, environment, build_rules): ### GOAL I need to create a Dockerfile for an application '{application}'. The target environment is '{environment}'. -Please generate AND build a robust, production-ready Dockerfile using the tools available to you. +Please generate the text for AND build a robust, production-ready Dockerfile using the tools available to you. +You do NOT need to write the Dockerfile to disk, but rather provide to the build tool to handle. ### REQUIREMENTS & CONSTRAINTS You must adhere to these rules strictly: