diff --git a/README.md b/README.md index 19b7178..034061f 100644 --- a/README.md +++ b/README.md @@ -46,11 +46,22 @@ Tools to add: - build - docker +### Environment + +The following variables can be set in the environment. + +| Name | Description | Default | +|-------|------------|---------------| +| `FRACTALE_MCP_PORT` | Port to run MCP server on, if using http variant | 8089 | +| `FRACTALE_MCP_TOKEN` | Token to use for testing | unset | +| `FRACTALE_LLM_PROVIDER` | LLM Backend to use (gemini, openai, llama) | gemini | + ### Testing Start the server in one terminal. Export `FRACTALE_MCP_TOKEN` if you want to require simple token auth. Here is for http. ```bash +export FRACTALE_TOKEN_AUTH=dudewheresmycar fractale start --transport http --port 8089 ``` @@ -64,26 +75,41 @@ curl -s http://0.0.0.0:8089/health | jq python3 examples/mcp/test_echo.py ``` -TODO: +### Agents - - we will want to keep track of state (retries, etc.) for agents somewhere. +The `fractale agent` command provides means to run build, job generation, and deployment agents. +In our [first version](https://github.com/compspec/fractale), an agent corresponded to a kind of task (e.g., build). For this refactored version, the concept of an agent is represented in a prompt or persona, which can be deployed by a generic MCP agent with some model backend (e.g., Gemini, Llama, or OpenAI). Let's test +doing a build: -### Agents +```bash +# In both terminals +export FRACTALE_MCP_TOKEN=dude -**Not written yet** +# In one terminal (start MCP) +fractale start -t http --port 8089 -The `fractale agent` command provides means to run build, job generation, and deployment agents. -This part of the library is under development. There are three kinds of agents: +# Define the model (provider and endpoints) to use. +export FRACTALE_LLM_PROVIDER=openai +export OPENAI_API_KEY=xxxxxxxxxxxxxxxx +export OPENAI_BASE_URL=https://my.custom.url/v1 + +# In the other, run the plan +fractale agent ./examples/plans/docker-build-lammps.yaml +``` - - `step` agents are experts on doing specific tasks (do hold state) - `manager` agents know how to orchestrate step agents and choose between them (don't hold state, but could) - - `helper` agents are used by step agents to do small tasks (e.g., suggest a fix for an error) + - `step` agents are experts on doing specific tasks. This originally was an agent with specific functions to do something (e.g., docker build) and now is a generic MCP agent with a prompt that gives it context and a goal. -The design is simple in that each agent is responding to state of error vs. success. In the [first version]() of our library, agents formed a custom graph. In this variant, we refactor to use MCP server tools. In the case of a step agent, the return code determines to continue or try again. In the case of a helper, the input is typically an erroneous response (or something that needs changing) with respect to a goal. For a manager, we are making a choice based on a previous erroneous step. +The initial design of `helper` agents from the first fractale is subsumed by the idea of an MCP function. A helper agent _is_ an MCP tool. + +The design is simple in that each agent is responding to state of error vs. success. In the [first version](https://github.com/compspec/fractale) of our library, agents formed a custom graph. In this variant, we refactor to use MCP server tools. It has the same top level design with a manager, but each step agent is like a small state machine governed by an LLM with access to MCP tools and resources. 
See [examples/agent](examples/agent) for an example, along with observations, research questions, ideas, and experiment brainstorming! -TODO refactor examples. +#### TODO + +- refactor examples +- debug why the startup is so slow. ### Design Choices @@ -96,6 +122,33 @@ Here are a few design choices (subject to change, of course). I am starting with - The backend of FastMCP is essentially starlette, so we define (and add) other routes to the server. +### Job Specifications + +#### Simple + +We provide a simple translation layer between job specifications. We take the assumption that although each manager has many options, the actual options a user would use is a much smaller set, and it's relatively straight forward to translate (and have better accuracy). + +See [examples/transform](examples/transform) for an example. + +#### Complex + +We want to: + +1. Generate software graphs for some cluster (fluxion JGF) (this is done with [compspec](https://github.com/compspec/compspec) +2. Register N clusters to a tool (should be written as a python module) +3. Tool would have ability to select clusters from resources known, return +4. Need graphical representation (json) of each cluster - this will be used with the LLM inference + +See [examples/fractale](examples/fractale) for a detailed walk-through of the above. + +For graph tool: + +```bash +conda install -c conda-forge graph-tool +``` + + + ## License HPCIC DevTools is distributed under the terms of the MIT license. diff --git a/examples/plans/docker-build-lammps.yaml b/examples/plans/docker-build-lammps.yaml new file mode 100644 index 0000000..445ebbd --- /dev/null +++ b/examples/plans/docker-build-lammps.yaml @@ -0,0 +1,14 @@ +name: "LAMMPS Pipeline" + +plan: + - name: "build" + prompt: "docker-build-persona" + inputs: + application: "LAMMPS (Large-scale Atomic/Molecular Massively Parallel Simulator)" + container: "ghcr.io/hpc-lab/lammps:cpu-latest" + environment: "Rocky Linux 9, CPU Only" + +# - name: "deploy" +# prompt: "k8s-deploy-persona" +# inputs: +# replicas: 4 diff --git a/fractale/agent/__init__.py b/fractale/agent/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fractale/agent/agent.py b/fractale/agent/agent.py new file mode 100644 index 0000000..03dcb72 --- /dev/null +++ b/fractale/agent/agent.py @@ -0,0 +1,216 @@ +import asyncio +import json +import os +import time + +from fastmcp import Client +from fastmcp.client.transports import StreamableHttpTransport +from rich import print + +import fractale.agent.backends as backends +import fractale.agent.defaults as defaults +import fractale.agent.logger as logger +from fractale.agent.base import Agent + + +class MCPAgent(Agent): + """ + Backend-Agnostic Agent that uses MCP Tools. + """ + + def init(self): + # 1. Setup MCP Client + port = os.environ.get("FRACTALE_MCP_PORT", defaults.mcp_port) + token = os.environ.get("FRACTALE_MCP_TOKEN") + url = f"http://localhost:{port}/mcp" + + if token: + transport = StreamableHttpTransport(url=url, headers={"Authorization": token}) + self.client = Client(transport) + else: + self.client = Client(url) + + # Initialize the provider. We will do this for each step. + self.init_provider() + + def init_provider(self): + """ + Initialize the provider. 
+ """ + # select Backend based on Config/Env first, then cached version + provider = self._provider or os.environ.get("FRACTALE_LLM_PROVIDER", "gemini").lower() + self._provider = provider + + # Other envars come from provider backend + if provider in backends.BACKENDS: + self.backend = backends.BACKENDS[provider]() + else: + raise ValueError(f"Provider {provider} is not available. Did you install dependencies?") + + async def get_prompts_list(self): + """ + Get list of prompts. A prompt is technically a persona/role + that was previously considered an entire agent. Now we pair a prompt + with an MCP backend and get a full agent. + """ + async with self.client: + prompts = await self.client.list_prompts_mcp() + return prompts + + async def get_tools_list(self): + """ + Get list of tools. + """ + async with self.client: + tools = await self.client.list_tools() + return tools + + async def execute(self, context, step): + """ + The Async Loop that will start with a prompt name, retrieve it, + and then respond to it until the state is successful. + """ + start_time = time.perf_counter() + + # We keep the client connection open for the duration of the step + async with self.client: + + # These are tools available to agent + # TODO need to filter these to be agent specific? + mcp_tools = await self.client.list_tools() + await self.backend.initialize(mcp_tools) + + # Get prompt to give goal/task/personality to agent + args = getattr(context, "data", context) + + # This partitions inputs, adding inputs from the step and separating + # those from extra + args, extra = step.partition_inputs(args) + instruction = await self.fetch_persona(step.prompt, args) + message = json.loads(instruction)["messages"][0]["content"]["text"] + self.ui.log(message) + + # Run the loop up to some max attempts (internal state machine with MCP tools) + max_loops = context.get("max_loops", 15) + response_text = self.run_llm_loop(instruction, max_loops) + + self.record_usage(time.perf_counter() - start_time) + return response_text + + def run_llm_loop(self, instruction: str, max_loops: int) -> str: + """ + Process -> Tool -> Process loop. + We need to return on some state of success or ultimate failure. + """ + # Initial response to first prompt + response_text, reason, calls = self.backend.generate_response(prompt=instruction) + print(response_text) + self.ui.log(reason) + print("calls") + print(calls) + import IPython + + IPython.embed() + + loops = 0 + while loops < max_loops: + loops += 1 + + # If no tools called, we are done + if not calls: + self.ui.log("šŸŽ¢ Agent has not requested tool calls, ending loop.") + break + + # Execute all requested tools + tool_outputs = [] + for call in calls: + print(call) + t_name = call["name"] + t_args = call["args"] + t_id = call.get("id") + self.ui.log(f"šŸ› ļø Calling: {t_name}") + try: + # Get result and unpack (FastMCP format) + result = self.client.call_tool(t_name, t_args) + if hasattr(result, "content") and isinstance(result.content, list): + content = result.content[0].text + else: + content = str(result) + except Exception as e: + content = f"Error executing {t_name}: {str(e)}" + + # Record metadata about the step + self.record_step(t_name, t_args, content) + + # Save outputs (name, id, and content) + tool_outputs.append({"id": t_id, "name": t_name, "content": content}) + + # Feed results back to backend with history. 
+ response_text, reason, calls = self.backend.generate_response(tool_outputs=tool_outputs) + if not calls: + self.ui.log("šŸŽ¢ Agent has not requested new calls, ending loop.") + + # When we get here, we either have no calls, or we reached max attempts. + return response_text + + async def fetch_persona(self, prompt_name: str, arguments: dict) -> str: + """ + Asks the MCP Server to render the prompt template. + + This is akin to rendering or fetching the person. E.g., "You are X and + here are your instructions for a task." + """ + self.ui.log(f"šŸ“„ Persona: {prompt_name}") + prompt_result = await self.client.get_prompt(name=prompt_name, arguments=arguments) + # MCP Prompts return a list of messages (User/Assistant/Text). + # We squash them into a single string for the instruction. + msgs = [] + for m in prompt_result.messages: + if hasattr(m.content, "text"): + msgs.append(m.content.text) + else: + msgs.append(str(m.content)) + return "\n\n".join(msgs) + + def record_step(self, tool, args, output): + """ + Record step metadata. + TODO: refactor this into metadata registry (decorator) + """ + self.metadata["steps"].append( + { + "tool": tool, + "args": args, + "output_snippet": str(output)[:200], + "timestamp": time.time(), + } + ) + + def record_usage(self, duration): + """ + Record token usage. + TODO: refactor this into metadata registry (decorator) + """ + if hasattr(self.backend, "token_usage"): + usage = self.backend.token_usage + self.metadata["llm_usage"].append( + { + "duration": duration, + "prompt": usage.get("prompt_tokens", 0), + "completion": usage.get("completion_tokens", 0), + } + ) + + def run_step(self, context, step): + """ + Run step is called from the Agent run (base class) + It's here so we can asyncio.run the thing! + """ + try: + final_result = asyncio.run(self.execute(context, step)) + context.result = final_result + except Exception as e: + context["error_message"] = str(e) + logger.error(f"Agent failed: {e}") + raise e + return context diff --git a/fractale/agent/backends/__init__.py b/fractale/agent/backends/__init__.py new file mode 100644 index 0000000..231037d --- /dev/null +++ b/fractale/agent/backends/__init__.py @@ -0,0 +1,24 @@ +BACKENDS = {} + +# Attempt import of each +# This is ugly, but it works! +try: + from .gemini import GeminiBackend + + BACKENDS["gemini"] = GeminiBackend +except ImportError: + pass + +try: + from .openai import OpenAIBackend + + BACKENDS["openai"] = OpenAIBackend +except ImportError: + pass + +try: + from .llama import LlamaBackend + + BACKENDS["llama"] = LlamaBackend +except ImportError: + pass diff --git a/fractale/agent/backends/gemini.py b/fractale/agent/backends/gemini.py new file mode 100644 index 0000000..0ec3e6b --- /dev/null +++ b/fractale/agent/backends/gemini.py @@ -0,0 +1,87 @@ +import os +from typing import Any, Dict, List +from .llm import LLMBackend + + +class GeminiBackend(LLMBackend): + def __init__(self, model_name="gemini-1.5-pro"): + """ + Init Gemini! We can try the newer one (3.0) when we test. + """ + # Don't import unless we are actually using. + import google.generativeai as genai + self.genai = genai + try: + genai.configure(api_key=os.environ["GEMINI_API_KEY"]) + # I'm allowing this for now because I don't have a working one... 
+ except KeyError: + print("āŒ GEMINI_API_KEY missing.") + self.model_name = model_name + self.chat = None + self._usage = {} + + async def initialize(self, mcp_tools: List[Any]): + """ + Convert MCP tools to Gemini Format + """ + gemini_tools = [] + for tool in mcp_tools: + gemini_tools.append( + { + "name": tool.name.replace("-", "_"), # Gemini hates dashes + "description": tool.description, + "parameters": tool.inputSchema, + } + ) + + model = self.genai.GenerativeModel(self.model_name, tools=gemini_tools) + self.chat = model.start_chat(enable_automatic_function_calling=False) + + def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None): + """ + Generate Gemini response. + + This is currently setup as a chat - we need to make sure we can do a one-off + message (no memory or bias). + """ + response = None + + # Sending tool outputs (previous turn was a function call) + if tool_outputs: + parts = [] + for output in tool_outputs: + parts.append( + self.genai.protos.Part( + function_response=genai.protos.FunctionResponse( + name=output["name"].replace("-", "_"), + response={"result": output["content"]}, + ) + ) + ) + response = self.chat.send_message(self.genai.protos.Content(parts=parts)) + + # Sending new text + elif prompt: + response = self.chat.send_message(prompt) + + # Extract Logic + self._usage = { + "prompt_tokens": response.usage_metadata.prompt_token_count, + "completion_tokens": response.usage_metadata.candidates_token_count, + } + + part = response.candidates[0].content.parts[0] + text_content = response.text if not part.function_call else "" + + tool_calls = [] + if part.function_call: + fc = part.function_call + tool_calls.append( + {"name": fc.name.replace("_", "-"), "args": dict(fc.args)} # Map back to MCP + ) + + return text_content, msg.reasoning_content, tool_calls + + @property + def token_usage(self): + return self._usage diff --git a/fractale/agent/backends/llama.py b/fractale/agent/backends/llama.py new file mode 100644 index 0000000..901ea48 --- /dev/null +++ b/fractale/agent/backends/llama.py @@ -0,0 +1,113 @@ +import json +import os +from typing import Any, Dict, List +from .llm import LLMBackend + + +class LlamaBackend(LLMBackend): + """ + Backend for Meta Llama 3.1+ models. + """ + + def __init__(self, model_name=None): + # This should be provided by LLAMAME but I haven't tested. + # Why is a llama trying to call me that's not OK. Not sure if I need ollama + base_url = os.environ.get("LLAMA_BASE_URL", "http://localhost:11434/v1") + api_key = os.environ.get("LLAMA_API_KEY", "ollama") + + # self.client = AsyncOpenAI(base_url=base_url, api_key=api_key) + import openai + self.client = openai.OpenAI(base_url=base_url, api_key=api_key) + + # Default to Llama 3.1 8B if not specified + self.model_name = model_name or os.environ.get("LLAMA_MODEL", "llama3.1") + + self.history = [] + self.tools_schema = [] + self._usage = {} + + async def initialize(self, mcp_tools: List[Any]): + """ + Llama 3.1 follows the OpenAI Tool Schema standard. + + TODO: vsoch see if we can consolidate with OpenAI base when testing. + """ + self.tools_schema = [] + for tool in mcp_tools: + self.tools_schema.append( + { + "type": "function", + "function": { + "name": tool.name, # Llama handles dashes fine + "description": tool.description, + "parameters": tool.inputSchema, + }, + } + ) + + def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None): + """ + Manage history and call Llama. 
+ """ + print("TEST GENERATE RESPONSE") + import IPython + + IPYthon.embed() + if prompt: + # Check if we have a System Prompt set (usually the first message). + # If not, Llama often behaves better with one. + if not self.history: + self.history.append( + { + "role": "system", + "content": "You are a helpful assistant with access to tools. You must use them to answer questions.", + } + ) + self.history.append({"role": "user", "content": prompt}) + + # Handle tool outputs (Function results) + if tool_outputs: + for out in tool_outputs: + self.history.append( + { + "role": "tool", + # Required for the conversation graph + "tool_call_id": out["id"], + "content": str(out["content"]), + } + ) + + try: + response = self.client.chat.completions.create( + model=self.model_name, + messages=self.history, + tools=self.tools_schema or None, + tool_choice="auto" if self.tools_schema else None, + ) + except Exception as e: + return f"LLAMA API ERROR: {str(e)}", [] + + msg = response.choices[0].message + if response.usage: + self._usage = dict(response.usage) + + # Store history and get text content + self.history.append(msg) + text_content = msg.content or "" + + tool_calls = [] + if msg.tool_calls: + for tc in msg.tool_calls: + tool_calls.append( + { + "id": tc.id, + "name": tc.function.name, + "args": json.loads(tc.function.arguments), + } + ) + + return text_content, msg.reasoning_content, tool_calls + + @property + def token_usage(self): + return self._usage diff --git a/fractale/agent/backends/llm.py b/fractale/agent/backends/llm.py new file mode 100644 index 0000000..b35d6bd --- /dev/null +++ b/fractale/agent/backends/llm.py @@ -0,0 +1,32 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict, List + + +class LLMBackend(ABC): + """ + Abstract interface for any LLM provider (Gemini, OpenAI, Llama, Local). + """ + + @abstractmethod + async def initialize(self, mcp_tools: List[Any]): + """ + Convert MCP tools to provider-specific format and setup session. + """ + pass + + @abstractmethod + def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None): + """ + Returns a text_response, tool_calls + """ + pass + + @property + @abstractmethod + def token_usage(self) -> Dict: + """ + Return token stats for metadata stuffs. + + Note from V: we need a more robust provenance tracking thing. + """ + pass diff --git a/fractale/agent/backends/openai.py b/fractale/agent/backends/openai.py new file mode 100644 index 0000000..8065fa5 --- /dev/null +++ b/fractale/agent/backends/openai.py @@ -0,0 +1,82 @@ +import os +from typing import Any, Dict, List +from rich import print + +from .llm import LLMBackend + + +class OpenAIBackend(LLMBackend): + """ + Backend to use OpenAI (not tested yet) + """ + + def __init__(self, model_name="openai/gpt-oss-120b"): + # Needs to be tested if base url is None. + # Switch to async if/when needed. Annoying for development + # self.client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY"), base_url=os.environ.get("OPENAI_BASE_URL")) + import openai + self.client = openai.OpenAI( + api_key=os.environ.get("OPENAI_API_KEY"), base_url=os.environ.get("OPENAI_BASE_URL") + ) + self.model_name = model_name + self.history = [] + self.tools_schema = [] + self._usage = {} + + async def initialize(self, mcp_tools: List[Any]): + """ + Tell this jerk about all the MCP tools. 
+ """ + self.tools_schema = [] + for tool in mcp_tools: + self.tools_schema.append( + { + "type": "function", + "function": { + # OpenAI is fine with dashes + "name": tool.name, + "description": tool.description, + "parameters": tool.inputSchema, + }, + } + ) + + def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None): + """ + Generate the response and update history. + """ + if prompt: + self.history.append({"role": "user", "content": prompt}) + if tool_outputs: + for out in tool_outputs: + self.history.append( + {"role": "tool", "tool_call_id": out["id"], "content": str(out["content"])} + ) + + # Call API (this doesn't need async I don't think, unless we create an async client) + response = self.client.chat.completions.create( + model=self.model_name, messages=self.history, tools=self.tools_schema or None + ) + msg = response.choices[0].message + + # Save assistant reply to history + self.history.append(msg) + self._usage = dict(response.usage) + + tool_calls = [] + if msg.tool_calls: + for tc in msg.tool_calls: + tool_calls.append( + { + # OpenAI needs IDs + "id": tc.id, + "name": tc.function.name, + "args": json.loads(tc.function.arguments), + } + ) + + return msg.content, msg.reasoning_content, tool_calls + + @property + def token_usage(self): + return self._usage diff --git a/fractale/agent/base.py b/fractale/agent/base.py new file mode 100644 index 0000000..0deb7e3 --- /dev/null +++ b/fractale/agent/base.py @@ -0,0 +1,112 @@ +import copy +import os +import time +from typing import Any, Dict + +from fractale.logger import logger + + +class Agent: + """ + Base Agent Infrastructure. + Handles caching, retry counters, metadata logging, and error wrappers. + Does NOT know about MCP or LLMs. + """ + + # Variables to clear on retry + state_variables = ["result", "error_message"] + + def __init__( + self, + name: str = "agent", + use_cache: bool = False, + results_dir: str = None, + save_incremental: bool = False, + max_attempts: int = None, + ): + self.name = name + self.attempts = 0 + self.max_attempts = max_attempts + + self.results_dir = results_dir or os.getcwd() + self.save_incremental = save_incremental + + # Initialize Metadata tracking + self.init_metadata() + self.init() + + def init(self): + """ + Init operations, intended to override in subclass. + """ + pass + + def init_metadata(self): + self.metadata = { + "name": self.name, + "times": {}, + "assets": {}, + "failures": [], + # TODO: likely we want to replace this with the metadata registry. + "counts": {"retries": 0, "return_to_manager": 0, "return_to_human": 0}, + "llm_usage": [], + } + + def run(self, context, step): + """ + Main execution wrapper + """ + # Ensure max_attempts is set + context["max_attempts"] = self.max_attempts or context.get("max_attempts") + logger.info(f"ā–¶ļø Running {self.name}...") + start_time = time.time() + + try: + # Call abstract method + context = self.run_step(context, step) + + finally: + duration = time.time() - start_time + self.metadata["times"]["execution"] = duration + + return context + + def run_step(self, context, step): + """ + Abstract: Implemented by MCPAgent + """ + raise NotImplementedError(f"Agent {self.name} missing run_step") + + def reset_context(self, context): + """ + Clears output variables to prepare for a retry. 
+ """ + # Convert to dict if it's a Context object + is_obj = hasattr(context, "data") + data = context.data if is_obj else context + + # Clear state variables + for key in self.state_variables: + if key in data: + del data[key] + + # Archive current metadata into failures list + if "failures" not in self.metadata: + self.metadata["failures"] = [] + + # Snapshot current metadata + self.metadata["failures"].append(copy.deepcopy(self.metadata)) + + # Reset current counters (keep retries count consistent) + current_retries = self.metadata["counts"]["retries"] + self.init_metadata() + self.metadata["counts"]["retries"] = current_retries + return context + + def reached_max_attempts(self): + """ + Return true if we have reached maximum number of attempts. + """ + if not self.max_attempts: + return False + return self.attempts >= self.max_attempts diff --git a/fractale/agent/context.py b/fractale/agent/context.py new file mode 100644 index 0000000..f11cebb --- /dev/null +++ b/fractale/agent/context.py @@ -0,0 +1,98 @@ +import collections + +import fractale.agent.logger as logger + + +def get_context(context): + """ + Get or create the context. + """ + if isinstance(context, Context): + return context + return Context(context) + + +class Context(collections.UserDict): + """ + A custom dictionary that allows attribute-style access to keys, + and extends the 'get' method with a 'required' argument. + + The context for an agent should be populated with metadata that + needs to move between agents. The manager decides what from the + context to pass to agents for an updated context. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def reset(self): + """ + Reset the return code and result. + """ + for key in ["return_code", "result", "error_message"]: + self.data[key] = None + + def is_managed(self): + """ + Is the context being managed? + """ + return self.get("managed") is True + + def __getattribute__(self, name): + """ + Intercepts all attribute lookups (including methods/functions) + """ + try: + # Step 1: this would be a normal attribute + attr = object.__getattribute__(self, name) + except AttributeError: + # Then handle lookup of dict key by attribute + return super().__getattribute__(name) + + # Step 2: We allow "get" to be called with defaults / required. + if name == "get": + original_get = attr + + def custom_get(key, default=None, required=False): + """ + Wrapper for the standard dict.get() method. + Accepts the custom 'required' argument. + """ + if required: + if key not in self.data: + raise ValueError(f"Key `{key}` is required but missing") + logger.exit(f"Key `{key}` is required but missing", title="Context Status") + + # If required and found, just return the value + return self.data[key] + else: + # If not required, use the original dict.get behavior + return original_get(key, default) + + # Return the wrapper function instead of the original method + return custom_get + + # 4. For any other attribute (like keys(), items(), update(), or custom methods) + # return the attribute we found earlier + return attr + + # 5. Override __getattr__ to handle attribute-style access to dictionary keys + def __getattr__(self, name): + """ + Allows access to dictionary keys as attributes. + """ + if name in self.data: + return self.data[name] + raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") + + def __setattr__(self, name, value): + """ + Allows setting keys via attribute assignment. 
+ """ + # If the attribute name is a reserved name (like 'data'), set it normally + if name in ("data", "_data"): + super().__setattr__(name, value) + + # Otherwise, treat it as a dictionary key + else: + self.data[name] = value diff --git a/fractale/agent/defaults.py b/fractale/agent/defaults.py new file mode 100644 index 0000000..d774c90 --- /dev/null +++ b/fractale/agent/defaults.py @@ -0,0 +1,18 @@ +environment = "generic cloud environment" +gemini_model = "gemini-2.5-pro" +mcp_port = "8089" + +# These are common / default args we don't need to give in any prompt. +shared_args = { + "command", + "config_dir", + "debug", + "error_message", + "incremental", + "outfile", + "plan", + "quiet", + "result", + "results", + "version", +} diff --git a/fractale/agent/logger.py b/fractale/agent/logger.py new file mode 100644 index 0000000..a560136 --- /dev/null +++ b/fractale/agent/logger.py @@ -0,0 +1,64 @@ +import sys + +from rich import print +from rich.panel import Panel + + +def success(message, title="Success", border_style="green", expand=True): + """ + Helper function to print successful message. + """ + print( + Panel( + f"[bold green]āœ… {message}[/bold green]", + title=title, + border_style=border_style, + expand=expand, + ) + ) + + +def error(message, title="Error", border_style="red", expand=True): + """ + Helper function to print error "beep boop" message. + """ + print( + Panel( + f"[bold red]āŒ {message}[/bold red]", + title=title, + border_style=border_style, + expand=expand, + ) + ) + + +def exit(message, title="Error", border_style="red", expand=True): + error(message, title, border_style, expand) + sys.exit(-1) + + +def warning(message, title="Warning", border_style="yellow"): + """ + Helper function to print a warning + """ + print( + Panel( + message, + title=f"[yellow]{title}[/yellow]", + border_style=border_style, + ) + ) + + +def custom(message, title, border_style=None, expand=True): + """ + Custom message / title Panel. + """ + if not border_style: + print(Panel(message, title=title, expand=expand)) + else: + print(Panel(message, title=title, border_style=border_style, expand=expand)) + + +def info(message): + print(f"\n[bold cyan] {message}[/bold cyan]") diff --git a/fractale/agent/manager/__init__.py b/fractale/agent/manager/__init__.py new file mode 100644 index 0000000..cf32844 --- /dev/null +++ b/fractale/agent/manager/__init__.py @@ -0,0 +1 @@ +from .agent import ManagerAgent diff --git a/fractale/agent/manager/agent.py b/fractale/agent/manager/agent.py new file mode 100644 index 0000000..65aa36d --- /dev/null +++ b/fractale/agent/manager/agent.py @@ -0,0 +1,356 @@ +import asyncio +import json +import os +from datetime import datetime + +from rich import print +from rich.prompt import Prompt as RichPrompt + +import fractale.agent.logger as logger +import fractale.agent.manager.prompts as prompts +import fractale.utils as utils +from fractale.agent.agent import MCPAgent +from fractale.agent.context import get_context +from fractale.agent.manager.plan import Plan +from fractale.utils.timer import Timer + +# The manager IS an agent itself since it needs to decide how to recover. + + +class ManagerAgent(MCPAgent): + """ + An LLM-powered orchestrator that executes a plan. + It acts as a supervisor: + 1. Validates the plan against the server. + 2. Dispatches steps to UniversalAgents. + 3. Handles failure via LLM reasoning OR Human intervention. + """ + + def init(self): + """ + Initialize the MCPAgent infrastructure (MCP Client + Backend). 
+ """ + # This sets up the MCP connection and LLM Backend + super().init() + + async def validate_personas(self, plan): + """ + Queries the MCP server to ensure all personas in the plan exist. + A persona == a prompt. Each step has an associated prompt (persona). + """ + # Attempt to list prompts from server + server_prompts = await self.client.list_prompts() + schema_map = {} + + # FastMCP (bottom) vs Standard MCP (top) return types + if hasattr(server_prompts, "prompts"): + prompts_list = server_prompts.prompts + else: + prompts_list = server_prompts + + # We do this once so later we can call prompt functions and know args vs. context + for p in prompts_list: + # Store set of valid argument names + args = {arg.name for arg in p.arguments} if p.arguments else set() + schema_map[p.name] = args + + print(f"šŸ”Ž Validating {len(plan)} steps...") + for step in plan.agents: + if step.prompt not in schema_map: + raise ValueError( + f"āŒ Plan Validation Failed: Step '{step.name}' requests persona '{step.prompt}', " + f"but server only has: {available_names}" + ) + # Ensure we separate arguments from extra + # This does not check to see if we have required, since they + # might come from a previous step. + step.set_schema(schema_map[step.prompt]) + + # store in the manager metadata so steps can access it + self.metadata["assets"]["prompt_schemas"] = schema_map + print("āœ… Personas validated and schemas cached") + + def get_recovery_step(self, context, failed_step, plan): + """ + Uses the LLM Backend to decide which agent to call to fix an error. + """ + # We describe each step (akin to a function) for the manager to choose + descriptions = "" + for step in plan.agents: + descriptions += f"- {step.agent}: {step.description}\n" + if step.agent == failed_step.agent: + break + + # Build the prompt to recover from some failure. + prompt_text = prompts.recovery_prompt % ( + descriptions, + failed_step.agent, + context.error_message, + ) + logger.warning( + f"šŸ¤” Consulting Manager for recovery from {failed_step.agent} failure...", + title="Error Triage", + ) + + # TODO: test and make more resilient if needed + next_step = None + while not next_step: + + # Use the backend directly - get back tuple (text, calls) + # Note I'm trying to do these NOT async because it's easier to debug + response_text = self.backend.generate_response(prompt=prompt_text)[0] + import IPython + + IPython.embed() + next_step = json.loads(utils.get_code_block(response_text, "json")) + + # Validate - we require the agent name and description + if ( + "agent_name" not in step_json + or "task_description" not in next_step + or "reason" not in next_step + ): + raise ValueError("Missing keys") + if step_json["agent_name"] not in plan.agent_names: + raise ValueError(f"Unknown agent {step_json['agent_name']}") + + agent_name = recovery_step["agent_name"] + logger.warning(f"Recovering to agent: [bold cyan]{agent_name}[/bold cyan]") + + # Find index of target agent + found_index = -1 + for idx, ag in enumerate(plan.agents): + if ag.name == agent_name: + found_index = idx + break + + next_step["index"] = found_index + + # Update recovery metadata with choice. + if failed_step.name not in self.metadata["assets"]["recovery"]: + self.metadata["assets"]["recovery"][failed_step.name] = [] + self.metadata["assets"]["recovery"][failed_step.name].append(next_step) + + return next_step + + def check_personas(self, plan, personas): + """ + Ensure that the prompt (persona) requested by each step is one + known to the MCP server. 
+ """ + for step in plan.agents: + if step.prompt not in persons: + raise ValueError( + f"Unknown persona {step.prompt} in step {step.name}. Available: {personas}" + ) + + def run(self, context): + """ + Executes a plan-driven workflow with intelligent error recovery. + """ + # Ensure context is wrapped + context = get_context(context) + + # Init metadata if needed + if "recovery" not in self.metadata["assets"]: + self.metadata["assets"]["recovery"] = {} + + context.managed = True + self.max_attempts = self.max_attempts or 10 + + # Plan parses the list of agent configs (prompts) + plan_path = context.get("plan", required=True) + plan = Plan(plan_path, save_incremental=self.save_incremental) + + # Connect and validate (don't allow connect without validate) + asyncio.run(self.connect_and_validate(plan)) + + # Still pass the shared context to all tasks + try: + tracker = self.run_tasks(context, plan) + self.metadata["status"] = "Succeeded" + self.save_results(tracker, plan) + logger.custom( + f"Workflow Complete. {len(tracker)} steps executed.", + title="[bold green]Success[/bold green]", + ) + return tracker + + except Exception as e: + self.metadata["status"] = "Failed" + logger.error(f"Orchestration failed: {e}", title="Failure") + raise e + + async def connect_and_validate(self, plan): + """ + Setup client and check prompts. + """ + async with self.client: + # Check if server has the prompts we need + await self.validate_personas(plan) + + # Initialize our backend LLM with the available tools + mcp_tools = await self.client.list_tools() + await self.backend.initialize(mcp_tools) + + def run_tasks(self, context, plan): + """ + Run agent tasks until stopping condition. + """ + tracker = [] + timer = Timer() + current_step_index = 0 + + # Initialize recovery history + if "recovery" not in self.metadata["assets"]: + self.metadata["assets"]["recovery"] = {} + + # Global Manager Loop + while current_step_index < len(plan): + + # This is an instance of MCPAgent + step = plan[current_step_index] + self.ui.on_step_start(step.name, step.description, inputs) + + # instantiate the agent here. If we need/want, we can cache the + # initial envars (credentials) to not need to discover them again. + agent = MCPAgent( + name=step.name, + save_incremental=plan.save_incremental, + max_attempts=step.max_attempts, + # Pass the UI down so the agent uses same interface + ui=self.ui, + ) + # Update step context + context = step.update_context(context) + + # Execute the step. This is akin to a tiny state machine + # The agent (persona prompt + LLM) is making calls to MCP tools + # Agent -> run is a wrapper to agent.run_step. + with timer: + context = agent.run(context, step) + + # Results, error, and metadata + result = context.get("result") + error = context.get("error_message") + metadata = agent.metadata + + # update the accordion header color and shows the result/error box + self.ui.on_step_finish(step.name, result, error, metadata) + + # Record metrics + # Note: step_agent.metadata is populated by the agent + tracker.append( + { + "agent": step.name, + "duration": timer.elapsed_time, + "result": result, + "error": error, + "attempts": self.attempts, + "metadata": metadata, + } + ) + + # If we have a result and no error message, success. + if result and not error: + current_step_index += 1 + context.reset() + continue + + # Check global manager limits + if self.reached_max_attempts(): + self.ui.on_log("Manager reached max attempts. 
Aborting.", level="error") + break + self.attempts += 1 + + # Always ask user if an entire step fails. + action = self.ui.ask_user( + f"Step Failed: {err}\nAction?", options=["retry", "assist", "auto", "quit"] + ) + action = action.lower().strip() or "auto" + + # Respond to the action + if action == "quit": + break + elif action == "retry": + context = self.reset_context(context, plan, step) + continue + + # Human in the loop! Ask for a hint and add to error message + elif action == "assist": + hint = self.ui.ask_user("Enter instructions for the agent") + context["error_message"] = f"Previous Error: {error}\nUser Advice: {hint}" + context = self.reset_context(context, plan, step) + continue + + # If we get down here (auto) we ask the manager for a recovery step. + elif action == "auto": + + # If we failed the first step, just try again. + if current_step_index == 0: + context = self.reset_context(context, plan) + continue + + # Otherwise ask the manager to choose. + recovery_step = self.get_recovery_step(context, agent, plan) + index = recovery_step["index"] + if not recovery_step: + self.ui.on_log("Manager could not determine recovery.", level="error") + break + + if index == -1: + self.ui.on_log(f"Recovery agent {target_name} not found!", level="error") + break + + # Reset context up to that point + current_step_index = index + context = self.reset_context(context, plan, plan[current_step_index]) + context["error_message"] = prompts.get_retry_prompt( + context, recovery_step["reason"] + ) + continue + + if current_step_index == len(plan): + self.metadata["status"] = "Succeeded" + self.ui.on_workflow_complete("Success") + else: + self.metadata["status"] = "Failed" + self.ui.on_workflow_complete("Failed") + return tracker + + def save_results(self, tracker, plan): + """ + Save results to file based on timestamp. + """ + if not os.path.exists(self.results_dir): + os.makedirs(self.results_dir) + now = datetime.now() + timestamp = now.strftime("%Y-%m-%d_%H-%M-%S") + results_file = os.path.join(self.results_dir, f"results-{timestamp}.json") + + # We assume plan has a .plan attribute or similar to get raw dict + manager_meta = getattr(plan, "plan", {}) + + if self.metadata["times"]: + manager_meta["times"] = self.metadata["times"] + if self.metadata["assets"]["recovery"]: + manager_meta["recovery"] = self.metadata["assets"]["recovery"] + + result = {"steps": tracker, "manager": manager_meta, "status": self.metadata["status"]} + utils.write_json(result, results_file) + + def reset_context(self, context, plan, failed_step=None): + """ + Reset context state variables up to the failed step. + """ + # We iterate through agents and call their reset logic + for step in plan.agents: + context = step.reset_context(context) + + # If we reached the step we are rolling back to, stop clearing. + if failed_step is not None and step.name == failed_step.name: + break + return context + + def reached_max_attempts(self): + return self.attempts >= self.max_attempts diff --git a/fractale/agent/manager/plan.py b/fractale/agent/manager/plan.py new file mode 100644 index 0000000..89c0c34 --- /dev/null +++ b/fractale/agent/manager/plan.py @@ -0,0 +1,204 @@ +import jsonschema +from jsonschema import validators +from rich import print + +import fractale.utils as utils + + +def set_defaults(validator, properties, instance, schema): + """ + Fill in default values for properties that are missing. 
+ """ + for prop, sub_schema in properties.items(): + if "default" in sub_schema: + instance.setdefault(prop, sub_schema["default"]) + + +# Extend validator to apply defaults +plan_validator = validators.extend( + jsonschema.Draft7Validator, + {"properties": set_defaults}, +) + +plan_schema = { + "type": "object", + "properties": { + "name": {"type": "string"}, + "description": {"type": "string"}, + "inputs": {"type": "object", "default": {}}, + "plan": { + "type": "array", + "minItems": 1, + "items": { + "type": "object", + "properties": { + "name": {"type": "string"}, + # This defines the (previous) agent or persona/role + "prompt": {"type": "string"}, + "description": {"type": "string"}, + "inputs": { + "type": "object", + "additionalProperties": True, + }, + }, + "required": ["name", "prompt"], + }, + }, + }, + "required": ["name", "plan"], +} + + +class Plan: + """ + A plan for a manager includes one or more steps, each defined by a Prompt Persona. + """ + + def __init__(self, plan, save_incremental=False): + if isinstance(plan, dict): + self.plan = plan + self.plan_path = "memory" + else: + self.plan_path = plan + self.plan = utils.read_yaml(self.plan_path) + + self.step_names = set() + self.save_incremental = save_incremental + + # Validate structure... + self.validate_schema() + + # And create steps. Personas validated outside of here with manager. + self.load() + + def validate_schema(self): + """ + Validate against JSON schema. + """ + validator = plan_validator(plan_schema) + try: + validator.validate(self.plan) + print("āœ… Plan schema is valid.") + except Exception as e: + raise ValueError(f"āŒ Plan YAML invalid: {e}!") + + def load(self): + """ + Initialize the steps in the plan. + """ + print(f"Loading plan from [bold magenta]{self.plan_path}[/bold magenta]...") + self.agents = [] # We refer to steps as agents (v1 of this library) + + for spec in self.plan.get("plan", []): + step_name = spec["name"] + + if step_name in self.step_names: + raise ValueError(f"Duplicate step name: '{step_name}'") + self.step_names.add(step_name) + + step = Step(spec, save_incremental=self.save_incremental) + self.agents.append(step) + + def __len__(self): + return len(self.agents) + + def __getitem__(self, key): + """ + Allows indexing plan[0] + """ + return self.agents[key] + + @property + def agent_names(self): + """ + Used by Manager for recovery lookup. + """ + return [step.name for step in self.agents] + + +class Step: + """ + Wraps a specific execution step. + Holds configuration + schema. + """ + + def __init__(self, spec, save_incremental=False): + self.spec = spec + self.save_incremental = save_incremental + self._prompt_args = set() + self.attempts = 0 + + def set_schema(self, valid_args: set): + """ + Called by Manager to define which arguments the Server Prompt accepts. + """ + self._prompt_args = valid_args + + def partition_inputs(self, full_context: dict) -> tuple[dict, dict]: + """ + Splits context into: + 1. Direct Arguments for the MCP Prompt function. + 2. Supplemental Context for the LLM. 
+ """ + # Only update inputs that aren't already defined for the context + full_context = self.update_context(full_context) + + # Fallback if schema missing + if self._prompt_args is None: + return full_context, {} + + prompt_args = {} + background_info = {} + + ignored = { + "managed", + "max_loops", + "max_attempts", + "result", + "error_message", + } + + for key, value in full_context.items(): + if key in self._prompt_args: + prompt_args[key] = value + elif key not in ignored: + background_info[key] = value + + # TODO: we may want to supplement prompt with other "background info" + print(prompt_args) + return prompt_args, background_info + + def update_context(self, context): + """ + Merge step-specific inputs into the context. + """ + overrides = ["max_attempts", "llm_provider", "llm_model"] + inputs = self.spec.get("inputs", {}) + + for k, v in inputs.items(): + if k not in overrides: + context[k] = v + return context + + @property + def name(self): + return self.spec["name"] + + @property + def max_attempts(self): + return self.spec.get("max_attempts") + + @property + def agent(self): + return self.name + + @property + def prompt(self): + return self.spec["prompt"] + + @property + def description(self): + return self.spec.get("description", f"Executes persona: {self.prompt}") + + def get(self, name, default=None): + return self.spec.get(name, default) diff --git a/fractale/agent/manager/prompts.py b/fractale/agent/manager/prompts.py new file mode 100644 index 0000000..321cd02 --- /dev/null +++ b/fractale/agent/manager/prompts.py @@ -0,0 +1,42 @@ +from fractale.agent.prompts import Prompt + +recovery_prompt = f"""You are an expert AI workflow troubleshooter. A step in a workflow has failed and reached a maximum number of retries. This could mean that we need to go back in the workflow and redo work. Your job is to analyze the error and recommend a single, corrective step. The steps, each associated with an agent, you can choose from are as follows: + +Available Agents: +%s + +The above is in the correct order, and ends on the agent that ran last with the failure (%s). The error message of the last step is the following: + +%s + +Your job is to analyze the error message to determine the root cause, and decide which agent is best suited to fix this specific error. +- You MUST formulate a JSON object for the corrective step with two keys: "agent_name" "reason" and "task_description". +- The new "task_description" MUST be a clear instruction for the agent to correct the specific error. +- The "reason" must explain why you chose the step and what needs to be done differently. +- You MUST only provide a single JSON object for the corrective step in your response. +- You MUST format your `task_description` to be a "You MUST" statement to the agent. +""" + +# Same three inputs as above plus the unsuccessful attempt +recovery_error_prompt = recovery_prompt.strip() + " Your last attempt was not successful:\n%s" + +retry_task = """You have had previous attempts, and here is the reason we are retrying this step: +{{ issue }} +""" + +retry_instructions = ["You MUST avoid making these errors again."] + +persona = "You are manager of an agentic AI team." +retry_prompt = { + "persona": persona, + "context": "A step in your workflow has determined it cannot continue and returned to you.", + "instructions": retry_instructions, + "task": retry_task, +} + + +def get_retry_prompt(context, issue): + """ + In testing, this should JUST be the error message. 
+ """ + return Prompt(retry_prompt, context).render({"issue": issue}) diff --git a/fractale/agent/prompts.py b/fractale/agent/prompts.py new file mode 100644 index 0000000..afe1eb4 --- /dev/null +++ b/fractale/agent/prompts.py @@ -0,0 +1,51 @@ +import copy + +from jinja2 import Template + +template = """ + Persona: + {{persona}} + + {% if context %}Context: {{context|trim}}{% endif %} + + Task: {{task|trim}} + + {% if instructions %}Instructions & Constraints: + {% for line in instructions %}{{line}} + {% endfor %}{% endif %} +""" + + +class Prompt: + """ + A prompt is a structured instruction for an LLM. + We attempt to define the persona, context, task, audience, instructions, and constraints. + Data sections should use words MUST, MUST NOT, AVOID, ENSURE + """ + + def __init__(self, data, context): + """ + This currently assumes setting a consistent context for one generation. + If that isn't the case, context should be provided in the function below. + """ + self.data = data + self.context = context + + def render(self, kwargs): + """ + Render the final user task, and then the full prompt. + """ + # The kwargs are rendered into task + render = copy.deepcopy(self.data) + + # Do we have additional details for instrucitons? + try: + render["instructions"] += (self.context.get("details") or "").split("\n") + except: + print("ISSUE WITH RENDER IN GENERIC PROMPTS") + import IPython + + IPython.embed() + render["task"] = Template(self.data["task"]).render(**kwargs) + prompt = Template(template).render(**render) + return prompt diff --git a/fractale/cli/__init__.py b/fractale/cli/__init__.py index fb06e8a..976cac2 100644 --- a/fractale/cli/__init__.py +++ b/fractale/cli/__init__.py @@ -10,8 +10,6 @@ install() import fractale -import fractale.agent.parser as parsers -import fractale.defaults as defaults from fractale.logger import setup_logger @@ -94,24 +92,11 @@ def get_parser(): formatter_class=argparse.RawTextHelpFormatter, description="run an agent", ) - agents = agent.add_subparsers( - title="agent", - description="Run an agent", - ) agent.add_argument( - "--plan", - "-p", - dest="plan", + "plan", help="provide a plan to a manager", ) - # If exists, we will attempt to load and use. - agent.add_argument( - "--use-cache", - dest="use_cache", - help="Use (load and save) local cache in pwd/.fractale/", - action="store_true", - default=False, - ) + agent.add_argument("--mode", choices=["cli", "tui", "web"], default="tui") agent.add_argument( "--max-attempts", help="Maximum attempts for a manager or individual agent", @@ -128,9 +113,6 @@ def get_parser(): action="store_true", default=False, ) - - # Add agent parsers - parsers.register(agents) return parser diff --git a/fractale/cli/agent.py b/fractale/cli/agent.py index 8fbbf3f..8774dc3 100644 --- a/fractale/cli/agent.py +++ b/fractale/cli/agent.py @@ -1,37 +1,48 @@ -import sys - -from fractale.agent import get_agents +from fractale.agent.manager import ManagerAgent def main(args, extra, **kwargs): """ Run an agent (do with caution!) """ - agents = get_agents() + import IPython + + IPython.embed() + if args.mode == "tui": + from fractale.app import FractaleApp + from fractale.ui.adapters.tui import TextualAdapter + + # TUI requires the App object to exist first + # We create a "shell" app that will receive the manager later? + # Or better: We instantiate the app, and the App's on_mount creates the adapter. + # Let's stick to the previous pattern: App runs manager. 
+ app = FractaleApp(plan_path="lammps.yaml", inputs={}) + + # We don't create the adapter here, the App creates it internally + # OR we pass the class to the app. + app.run() + return - # If we have a plan, we run the manager. - if args.plan is not None: - args.agent_name = "manager" + elif args.mode == "web": + from fractale.ui.adapters.web import WebAdapter - # Right now we only have a build agent :) - if args.agent_name not in agents: - sys.exit(f"{args.agent_name} is not a recognized agent.") + ui = WebAdapter("http://localhost:3000") + + else: + from fractale.ui.adapters.cli import CLIAdapter + + ui = CLIAdapter() # Get the agent and run! # - results determines if we want to save state to an output directory # - save_incremental will add a metadata section # - max_attempts is for the manager agent (defaults to 10) - agent = agents[args.agent_name]( - use_cache=args.use_cache, + agent = ManagerAgent( results_dir=args.results, save_incremental=args.incremental, max_attempts=args.max_attempts, + ui=ui, ) - # This is the context - we can remove variables not needed context = vars(args) - del context["use_cache"] - - # This is built and tested! We can do something with it :) - # Note that vars converts the argparse arguments into a dictionary agent.run(context) diff --git a/fractale/cli/start.py b/fractale/cli/start.py index 77dbfad..3bc6f51 100644 --- a/fractale/cli/start.py +++ b/fractale/cli/start.py @@ -31,8 +31,7 @@ def main(args, extra, **kwargs): print(f"šŸ”Œ Loading tools... ") # Load into the manager (tools, resources, prompts) - for tool in manager.load_tools(args.tools): - mcp.add_tool(tool) + for tool in manager.load_tools(mcp, args.tools): print(f" āœ… Registered: {tool.name}") # Mount the MCP server. Note from V: we can use mount with antother FastMCP @@ -50,4 +49,4 @@ def main(args, extra, **kwargs): # For testing we usually control+C, let's not make it ugly except KeyboardInterrupt: - print("šŸ–„ļø Shutting down...") + print("šŸ–„ļø Shutting down...") diff --git a/fractale/tools/build/docker/prompts.py b/fractale/tools/build/docker/prompts.py new file mode 100644 index 0000000..e0eeb9d --- /dev/null +++ b/fractale/tools/build/docker/prompts.py @@ -0,0 +1,77 @@ +from fractale.tools.prompts import format_rules + +PERSONA = "You are a Dockerfile build expert." + +CONTEXT = """We are running experiments that deploy containerized HPC applications. +You are the agent responsible for the build step in that pipeline.""" + +REQUIRES = [ + "You MUST NOT change the name of the application container image provided.", + "Don't worry about users/permissions - just be root.", + "DO NOT forget to install certificates and you MUST NOT apt-get purge.", + "Assume a default of CPU if GPU or CPU is not stated.", + "Do NOT do a multi-stage build, and do NOT COPY or ADD anything from the host.", + "You MUST copy executables to a system location to be on the PATH. Do NOT symlink", + "You are only scoped to edit a Dockerfile to build the image.", +] + +COMMON_INSTRUCTIONS = [ + "If the application involves MPI, configure it for compatibility for the containerized environment.", + 'Do NOT add your narration unless it has a "#" prefix to indicate a comment.', +] + REQUIRES + + +def get_build_text(application, environment, build_rules): + """ + Get prompt text for an initial build. + """ + return f""" +### PERSONA +{PERSONA} + +### CONTEXT +{CONTEXT} + +### GOAL +I need to create a Dockerfile for an application '{application}'. +The target environment is '{environment}'. 
+Please generate the text for AND build a robust, production-ready Dockerfile using the tools available to you. +You do NOT need to write the Dockerfile to disk, but rather provide to the build tool to handle. + +### REQUIREMENTS & CONSTRAINTS +You must adhere to these rules strictly: +{format_rules(build_rules)} + +### INSTRUCTIONS +1. Analyze the requirements and generate the Dockerfile content. +2. Use provided tools to build the image. +3. Respond appropriately to errors. +4. Return only when the tool returns a successful build. +""" + + +def get_retry_prompt(fix_rules, error_message): + return f""" +### PERSONA +{PERSONA} + +### CONTEXT +{CONTEXT} + +### STATUS: BUILD FAILED +Your previous Dockerfile build has failed. Here is the instruction for how to fix it: + +```text +{error_message} +``` + +### REQUIREMENTS + +Please analyze the error and your previous work, and provide a corrected version. +{format_rules(fix_rules)} + +### INSTRUCTIONS +1. Read the error log above carefully. +2. Modify the Dockerfile using your file tools. +3. Use a provided tool to retry the build. +""" diff --git a/fractale/tools/build/docker/tool.py b/fractale/tools/build/docker/tool.py index d642875..f70f1a4 100644 --- a/fractale/tools/build/docker/tool.py +++ b/fractale/tools/build/docker/tool.py @@ -1,24 +1,21 @@ from fractale.tools.base import BaseTool from fractale.tools.decorator import mcp +import fractale.tools.build.docker.prompts as prompts import fractale.agent.logger as logger import fractale.utils as utils import shutil import re import os -import sys import shutil import tempfile import subprocess -import textwrap +import shlex from rich import print from rich.syntax import Syntax -name = "docker-build" - class DockerBuildTool(BaseTool): - def setup(self): """ Setup ensures we have docker or podman installed. @@ -29,6 +26,29 @@ def setup(self): if not self.docker: raise ValueError("docker and podman are not present on the system.") + # @mcp.tool(name="docker-run") + def run_container(self, uri: str, command: str): + """ + Run a docker container. Accepts an optional unique resource identifier (URI). + + uri: the unique resource identifier. + command: string to run (will be shlex split) + """ + # Prepare command to push (docker or podman) + command = [self.docker, "run", "-it", uri] + shlex.split(command) + logger.info(f"Running {command}...") + p = subprocess.run( + command, + capture_output=True, + text=True, + check=False, + ) + if p.returncode != 0: + output = "ERROR: " + p.stdout + p.stderr + logger.warning(f"Issue with docker run: {output}") + return logger.failure(output) + return logger.success(output) + @mcp.tool(name="docker-push") def push_container(self, uri: str, all_tags: bool = False): """ @@ -59,7 +79,7 @@ def push_container(self, uri: str, all_tags: bool = False): return logger.failure(output) return logger.success(output) - @mcp.tool(name=name) + @mcp.tool(name="docker-build") def build_container(self, dockerfile: str, uri: str, platforms: str = None): """ Build a docker container. Accepts an optional unique resource identifier (URI). 
@@ -134,7 +154,7 @@ def filter_output(self, output): # Try to match lines that start with # return "\n".join([x for x in output.split("\n") if not re.search(r"^#(\d)+ ", x)]) - @mcp.tool(name="kind-docker-load") + # @mcp.tool(name="kind-docker-load") def load_kind(self, uri: str): """ Load a Docker URI into Kind (Kubernetes in Docker) @@ -166,3 +186,37 @@ def print_result(self, dockerfile): logger.custom( highlighted_syntax, title="Final Dockerfile", border_style="green", expand=True ) + + @mcp.prompt(name="docker-build-persona", description="Instructions for a fresh build") + def build_persona_prompt(self, application: str, environment: str = "CPU") -> dict: + """ + Generates agent instructions for creating a NEW Dockerfile. + """ + # Specific rules for a fresh build + build_rules = [ + "The Dockerfile content you generate must be complete and robust.", + "The response should ONLY contain the complete Dockerfile.", + "Use the available tools (files-write) to save the Dockerfile to disk.", + ] + prompts.COMMON_INSTRUCTIONS + + # Construct the text from our template + prompt_text = prompts.get_build_text(application, environment, build_rules) + + # Return MCP format + return {"messages": [{"role": "user", "content": {"type": "text", "text": prompt_text}}]} + + @mcp.prompt( + name="docker-fix-persona", description="Instructions for fixing or retrying a build" + ) + def fix_persona_prompt(self, error_message: str) -> dict: + """ + Generates system instructions for retrying a failed build. + """ + # 1. Define specific rules for fixing + fix_rules = [ + "The response should only contain the complete, corrected Dockerfile content.", + "Use succinct comments in the Dockerfile to explain build logic and changes.", + ] + prompts.COMMON_INSTRUCTIONS + + prompt_text = prompts.get_retry_prompt(fix_rules, error_message) + return {"messages": [{"role": "user", "content": {"type": "text", "text": prompt_text}}]} diff --git a/fractale/tools/decorator.py b/fractale/tools/decorator.py index 4b17bba..727b22c 100644 --- a/fractale/tools/decorator.py +++ b/fractale/tools/decorator.py @@ -1,9 +1,5 @@ -import functools -import time from typing import List -from fractale.metrics import DurationMetric, metrics - class McpProxy: """ @@ -18,44 +14,12 @@ def tool(self, name: str = None, description: str = None, tags: List[str] = None """ def decorator(func): - - def record_timing(start_time, error=None): - """ - Wrapper to record timing of tool. 
-                """
-                end_time = time.perf_counter()
-                tool_id = name or func.__name__
-
-                # Create the specific Metric object
-                metric = DurationMetric(
-                    name=tool_id,
-                    start_time=start_time,
-                    end_time=end_time,
-                    duration=end_time - start_time,
-                    success=(error is None),
-                    metadata={"error": str(error)} if error else {},
-                )
-
-                # Push to generic registry
-                metrics.record(metric)
-                return metric.duration
-
-            @functools.wraps(func)
-            def sync_wrapper(*args, **kwargs):
-                start = time.perf_counter()
-                result = func(*args, **kwargs)
-                dur = record_timing(start)
-                # Add the duration to the result for the LLM
-                result += f"\n\n[ā±ļø {dur:.2f}s]"
-                return result
-
-            wrapper = sync_wrapper
             default_name = (func.__module__.lower() + "-" + func.__name__.lower()).replace(".", "-")
-            wrapper._mcp_name = name or default_name
-            wrapper._mcp_desc = description
-            wrapper._mcp_tags = tags
-            wrapper._is_mcp_tool = True
-            return wrapper
+            func._mcp_name = name or default_name
+            func._mcp_desc = description
+            func._mcp_tags = tags
+            func._is_mcp_tool = True
+            return func
 
         return decorator
diff --git a/fractale/tools/manager.py b/fractale/tools/manager.py
index b0706dc..03b2a01 100644
--- a/fractale/tools/manager.py
+++ b/fractale/tools/manager.py
@@ -61,7 +61,7 @@ def discover_tools(self, root_path: str, module_path: str) -> Dict[str, Path]:
             discovered[tool_id] = {"path": file_path, "module": import_path, "root": root_path}
         return discovered
 
-    def load_tools(self, names=None):
+    def load_tools(self, mcp, names=None):
         """
         Load a set of named tools, or default to all those discovered.
         """
@@ -95,7 +95,22 @@ def load_tools(self, names=None):
 
         # Get the decorated functions
         for func in getfunc():
+
+            # This is how we handle dynamic loading
+            endpoint = ToolClass.from_function(func, name=func._mcp_name)
+
+            # @mcp.tool
+            if ToolClass == Tool:
+                mcp.add_tool(endpoint)
+
+            # @mcp.prompt
+            elif ToolClass == Prompt:
+                mcp.add_prompt(endpoint)
+
+            # @mcp.resource
+            else:
+                mcp.add_resource(endpoint)
+            yield endpoint
-            yield ToolClass.from_function(func, name=func._mcp_name)
 
     def load_tool(self, tool_id: str) -> BaseTool:
         """
@@ -122,3 +137,37 @@ def load_tool(self, tool_id: str) -> BaseTool:
         except ImportError as e:
             print(f"āŒ Error importing {tool_id}: {e}")
         return None
+
+    def get_available_prompts(self):
+        """
+        Scans all discoverable tools for functions decorated with @mcp.prompt.
+        Returns a set of prompt names (personas). We need this to validate a plan.
+        A plan is not valid if it names a persona (prompt) that is not known.
+        """
+        prompts = set()
+
+        # 1. Load the tools (to execute decorators)
+        for tool_id, path in self.tools.items():
+            mod = self.load_tool_module(tool_id, path)
+            if not mod:
+                continue
+
+            # 2. 
Inspect the classes/functions in the module + for name, obj in inspect.getmembers(mod): + # We usually look for classes inheriting from BaseTool + # But we can also just scan the class attributes + if inspect.isclass(obj): + for attr_name in dir(obj): + try: + func = getattr(obj, attr_name) + except: + continue + + # CHECK FOR PROXY TAG + if callable(func) and getattr(func, "_is_mcp_prompt", False): + # Get the name from the decorator + p_name = getattr(func, "_mcp_name", None) + if p_name: + prompts.add(p_name) + + return prompts diff --git a/fractale/tools/prompts.py b/fractale/tools/prompts.py new file mode 100644 index 0000000..a5bca61 --- /dev/null +++ b/fractale/tools/prompts.py @@ -0,0 +1,2 @@ +def format_rules(rules): + return "\n".join([f"- {r}" for r in rules]) diff --git a/fractale/ui/__init__.py b/fractale/ui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fractale/ui/adapters/__init__.py b/fractale/ui/adapters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fractale/ui/adapters/cli.py b/fractale/ui/adapters/cli.py new file mode 100644 index 0000000..b702de8 --- /dev/null +++ b/fractale/ui/adapters/cli.py @@ -0,0 +1,27 @@ +import sys + +from fractale.ui.base import UserInterface + + +class CLIAdapter(UserInterface): + def on_step_start(self, name, description, inputs): + print(f"\nšŸš€ STEP: {name}") + print(f" Goal: {description}") + + def on_log(self, message, level="info"): + # Simple print + print(f" {message}") + + def on_step_finish(self, name, result, error, metadata): + if error: + print(f"āŒ {name} Failed: {error}") + else: + print(f"āœ… {name} Complete.") + + def on_workflow_complete(self, status): + print(f"\nšŸ Workflow: {status}") + + def ask_user(self, question, options=None) -> str: + # Standard Python input + opt_str = f"[{'/'.join(options)}]" if options else "" + return input(f"ā“ {question} {opt_str}: ").strip() diff --git a/fractale/ui/adapters/tui.py b/fractale/ui/adapters/tui.py new file mode 100644 index 0000000..3b6997b --- /dev/null +++ b/fractale/ui/adapters/tui.py @@ -0,0 +1,177 @@ +import queue + +from textual.app import App, ComposeResult +from textual.containers import VerticalScroll +from textual.widgets import Collapsible, Footer, Header, Label, Log, Markdown + +from fractale.ui.base import UserInterface +from fractale.ui.screens import UserInputScreen + +# STOPPED HERE - go back to client and ensure this is created correctly, then need to test? + +# ============================================================================= +# 1. THE ADAPTER +# Translates abstract UI calls (from Manager) into Thread-Safe App actions. 
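+# Each hook below marshals its update onto the app's main thread via call_from_thread,
+# since the manager runs in a background worker thread.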
+# ============================================================================= + + +class TextualAdapter(UserInterface): + def __init__(self, app: "FractaleApp"): + self.app = app + + def on_step_start(self, name: str, description: str, inputs: dict): + self.app.call_from_thread(self.app.action_add_step, name, description) + + def on_log(self, message: str, level: str = "info"): + self.app.call_from_thread(self.app.action_log, message) + + def on_step_finish(self, name: str, result: str, error: str, metadata: dict): + if error: + self.app.call_from_thread(self.app.action_log, f"[bold red]ERROR:[/bold red] {error}") + self.app.call_from_thread(self.app.action_status, "Step Failed", "red") + else: + self.app.call_from_thread(self.app.action_set_result, str(result)) + self.app.call_from_thread(self.app.action_status, "Step Complete", "green") + + def on_workflow_complete(self, status: str): + self.app.call_from_thread(self.app.action_status, f"Done: {status}", "blue") + + def ask_user(self, question: str, options: list[str] = None) -> str: + """ + Blocking call. + 1. Creates a Queue. + 2. Signals App to show Modal. + 3. Waits for Modal to put result in Queue. + """ + reply_queue = queue.Queue() + self.app.call_from_thread(self.app.action_prompt_user, question, reply_queue) + return reply_queue.get() + + +# ============================================================================= +# 2. THE WIDGETS +# ============================================================================= + + +class StepDisplay(Collapsible): + """ + A widget representing one Step in the plan. + """ + + def __init__(self, title: str, description: str, **kwargs): + super().__init__(title=title, **kwargs) + self.description = description + self.log_widget = Log(highlight=True, classes="step-log") + self.result_widget = Markdown("", classes="step-result") + + def compose(self) -> ComposeResult: + yield Label(f"Goal: {self.description}", classes="step-desc") + + yield Label("🧠 Reasoning & Tools:", classes="section-header") + yield self.log_widget + + yield Label("šŸ“ Output:", classes="section-header") + yield self.result_widget + + +# ============================================================================= +# 3. THE APP (The TUI Host) +# ============================================================================= + + +class FractaleApp(App): + CSS = """ + .step-log { + height: 20; + border: solid $accent; + background: $surface; + margin-bottom: 1; + overflow-y: scroll; + } + .step-result { + height: auto; + min-height: 5; + border: solid green; + background: $surface-darken-1; + padding: 1; + } + .step-desc { + text-style: italic; + color: $text-muted; + margin-bottom: 1; + } + .section-header { + text-style: bold; + color: $secondary; + margin-top: 1; + } + """ + + BINDINGS = [ + ("q", "quit", "Quit"), + ] + + def __init__(self, manager_instance, execution_context: dict): + super().__init__() + self.manager = manager_instance + self.context = execution_context + self.current_step_widget = None + + def compose(self) -> ComposeResult: + yield Header(show_clock=True) + yield VerticalScroll(id="steps-container") + yield Footer() + + def on_mount(self) -> None: + self.title = "Fractale Workflow" + # Run the manager in a background worker thread + self.run_worker(self.run_process, exclusive=True, thread=True) + + def run_process(self): + """ + The Background Worker. + """ + # 1. Create the Adapter (Connecting THIS app instance to the Logic) + adapter = TextualAdapter(app=self) + + # 2. 
Inject Dependency into Manager + self.manager.ui = adapter + + try: + # 3. Run Logic (Blocking Call) + self.manager.run(self.context) + + except Exception as e: + self.call_from_thread(self.action_log, f"CRITICAL ERROR: {e}") + self.call_from_thread(self.action_status, "Failed", "red") + + # --- UI ACTIONS (Main Thread) --- + + def action_add_step(self, name: str, desc: str): + container = self.query_one("#steps-container") + + if self.current_step_widget: + self.current_step_widget.collapsed = True + + step = StepDisplay(title=f"ā–¶ļø {name}", description=desc) + container.mount(step) + step.scroll_visible() + self.current_step_widget = step + + def action_log(self, message: str): + if self.current_step_widget: + self.current_step_widget.log_widget.write_line(message) + + def action_set_result(self, content: str): + if self.current_step_widget: + self.current_step_widget.title = self.current_step_widget.title.replace("ā–¶ļø", "āœ…") + self.current_step_widget.result_widget.update(content) + + def action_status(self, msg: str, style: str): + self.sub_title = msg + + def action_prompt_user(self, question: str, reply_queue): + def on_input_done(result: str): + reply_queue.put(result) + + self.push_screen(UserInputScreen(question), on_input_done) diff --git a/fractale/ui/adapters/web.py b/fractale/ui/adapters/web.py new file mode 100644 index 0000000..08e3ac3 --- /dev/null +++ b/fractale/ui/adapters/web.py @@ -0,0 +1,34 @@ +import time + +import requests + +from fractale.ui.base import UserInterface + + +class WebAdapter(UserInterface): + def __init__(self, api_url): + self.api_url = api_url # e.g. http://localhost:3000/api/events + + def _post(self, event_type, payload): + requests.post(self.api_url, json={"type": event_type, "data": payload}) + + def on_step_start(self, name, description, inputs): + self._post("step_start", {"name": name, "desc": description}) + + def on_log(self, message, level="info"): + self._post("log", {"msg": message}) + + # ... other outputs ... + + def ask_user(self, question, options=None) -> str: + # 1. Post the question to the UI + req_id = f"req_{time.time()}" + self._post("ask_user", {"question": question, "id": req_id}) + + # 2. POLL for an answer (or use a Redis queue / Websocket listener) + # This blocks the Manager thread until the frontend user clicks a button. + while True: + resp = requests.get(f"{self.api_url}/answers/{req_id}") + if resp.status_code == 200: + return resp.json()["answer"] + time.sleep(1) diff --git a/fractale/ui/base.py b/fractale/ui/base.py new file mode 100644 index 0000000..34834f2 --- /dev/null +++ b/fractale/ui/base.py @@ -0,0 +1,39 @@ +from typing import Any, Optional, Protocol + + +class UserInterface(Protocol): + """ + The strict contract that ManagerAgent relies on. + Any implementation (Web, TUI, CLI) must provide these methods. + """ + + def on_step_start(self, name: str, description: str, inputs: dict): + """ + Onset of a new step. + """ + pass + + def on_log(self, message: str, level: str = "info"): + """ + Let there be logs. + """ + pass + + def on_step_finish(self, name: str, result: str, error: Optional[str], metadata: dict): + """ + A step completes (success or failure). + """ + pass + + def on_workflow_complete(self, status: str): + """ + The whole plan finishes. 
+        """
+        pass
+
+    # --- INPUT (Blocking) ---
+    def ask_user(self, question: str, options: list[str] = None) -> str:
+        """
+        The Manager pauses until the user answers (blocking).
+        """
+        pass
diff --git a/fractale/version.py b/fractale/version.py
index d07e16c..a4107f5 100644
--- a/fractale/version.py
+++ b/fractale/version.py
@@ -9,20 +9,20 @@
 
 ################################################################################
 
-# Global requirements
+# TODO vsoch: refactor this to use newer pyproject stuff.
 
-# Note that the spack / environment modules plugins are installed automatically.
-# This doesn't need to be the case.
 INSTALL_REQUIRES = (
     ("jsonschema", {"min_version": None}),
     ("Jinja2", {"min_version": None}),
     ("uvicorn", {"min_version": None}),
     ("mcp", {"min_version": None}),
     ("fastmcp", {"min_version": None}),
+    ("fastapi", {"min_version": None}),
     # Yeah, probably overkill, just being used for printing the scripts
     ("rich", {"min_version": None}),
 )
 
+OPENAI_REQUIRES = (("openai", {"min_version": None}),)
 GOOGLE_REQUIRES = (("google-generativeai", {"min_version": None}),)
 TESTS_REQUIRES = (("pytest", {"min_version": "4.6.2"}),)
 
-INSTALL_REQUIRES_ALL = INSTALL_REQUIRES + TESTS_REQUIRES + GOOGLE_REQUIRES
+INSTALL_REQUIRES_ALL = INSTALL_REQUIRES + TESTS_REQUIRES + GOOGLE_REQUIRES + OPENAI_REQUIRES
diff --git a/setup.py b/setup.py
index 3a4a3e4..f29fd6e 100644
--- a/setup.py
+++ b/setup.py
@@ -67,6 +67,8 @@ def get_reqs(lookup=None, key="INSTALL_REQUIRES"):
     TESTS_REQUIRES = get_reqs(lookup, "TESTS_REQUIRES")
     INSTALL_REQUIRES_ALL = get_reqs(lookup, "INSTALL_REQUIRES_ALL")
     GOOGLE_REQUIRES = get_reqs(lookup, "GOOGLE_REQUIRES")
+    OPENAI_REQUIRES = get_reqs(lookup, "OPENAI_REQUIRES")
+    LLAMA_REQUIRES = get_reqs(lookup, "OPENAI_REQUIRES")  # llama reuses the OpenAI client requirements
 
     setup(
         name=NAME,
@@ -89,6 +91,8 @@ def get_reqs(lookup=None, key="INSTALL_REQUIRES"):
         extras_require={
             "all": [INSTALL_REQUIRES_ALL],
             "google": [GOOGLE_REQUIRES],
+            "openai": [OPENAI_REQUIRES],
+            "llama": [LLAMA_REQUIRES],
         },
         classifiers=[
             "Intended Audience :: Science/Research",