README.md

Tools to add:
- build
- docker

### Environment

The following variables can be set in the environment.

| Name | Description | Default |
|-------|------------|---------------|
| `FRACTALE_MCP_PORT` | Port to run MCP server on, if using http variant | 8089 |
| `FRACTALE_MCP_TOKEN` | Token to use for testing | unset |
| `FRACTALE_LLM_PROVIDER` | LLM backend to use (gemini, openai, llama) | gemini |
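For example, the agent might resolve these variables roughly like this (a sketch only; the actual resolution lives in `fractale/agent/agent.py` and `fractale.agent.defaults`):

```python
import os

# Resolve configuration from the environment, falling back to the
# defaults in the table above (sketch, not the library's actual code).
port = int(os.environ.get("FRACTALE_MCP_PORT", 8089))
provider = os.environ.get("FRACTALE_LLM_PROVIDER", "gemini").lower()
token = os.environ.get("FRACTALE_MCP_TOKEN")  # None means token auth is off

print(f"http://localhost:{port}/mcp via {provider}")
```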

### Testing

Start the server in one terminal. Export `FRACTALE_MCP_TOKEN` if you want to require simple token auth. Here is an example using the http transport.

```bash
export FRACTALE_MCP_TOKEN=dudewheresmycar
fractale start --transport http --port 8089
```

```bash
curl -s http://0.0.0.0:8089/health | jq
python3 examples/mcp/test_echo.py
```
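When a token is exported, the server should reject requests whose `Authorization` header does not match. A minimal sketch of that check (`authorized` is a hypothetical helper; the real logic lives in the server code):

```python
import os

def authorized(headers: dict) -> bool:
    """Allow the request if no token is configured, or if the
    Authorization header matches the configured token (sketch)."""
    token = os.environ.get("FRACTALE_MCP_TOKEN")
    if not token:
        return True
    return headers.get("Authorization") == token

# Example: with FRACTALE_MCP_TOKEN=dude set, only matching headers pass
os.environ["FRACTALE_MCP_TOKEN"] = "dude"
print(authorized({"Authorization": "dude"}))  # True
print(authorized({}))                         # False
```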

### Agents

The `fractale agent` command provides the means to run build, job generation, and deployment agents.
In our [first version](https://github.com/compspec/fractale), an agent corresponded to a kind of task (e.g., build). In this refactored version, the concept of an agent is represented by a prompt or persona, which can be deployed by a generic MCP agent with some model backend (e.g., Gemini, Llama, or OpenAI). Let's test doing a build:

```bash
# In both terminals
export FRACTALE_MCP_TOKEN=dude

# In one terminal (start MCP)
fractale start -t http --port 8089

# Define the model (provider and endpoint) to use.
export FRACTALE_LLM_PROVIDER=openai
export OPENAI_API_KEY=xxxxxxxxxxxxxxxx
export OPENAI_BASE_URL=https://my.custom.url/v1

# In the other terminal, run the plan
fractale agent ./examples/plans/docker-build-lammps.yaml
```

There are two kinds of agents:

- `manager` agents know how to orchestrate step agents and choose between them (they don't hold state, but could)
- `step` agents are experts at doing specific tasks. A step agent was originally an agent with specific functions to do something (e.g., a docker build), and is now a generic MCP agent with a prompt that gives it context and a goal.

The initial design of `helper` agents from the first fractale is subsumed by the idea of an MCP function. A helper agent _is_ an MCP tool.

The design is simple in that each agent responds to a state of error vs. success. In the [first version](https://github.com/compspec/fractale) of our library, agents formed a custom graph. In this variant, we refactor to use MCP server tools. It has the same top-level design with a manager, but each step agent is like a small state machine governed by an LLM with access to MCP tools and resources.
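That error-vs-success retry loop can be sketched in a few lines (`run_step` and `max_retries` are hypothetical names; the real loop is `MCPAgent.run_llm_loop` in `fractale/agent/agent.py`):

```python
def run_step(step, max_retries=3):
    """Retry a step until it succeeds or we give up (sketch).

    `step` is any callable returning (return_code, output); a zero
    return code means success, anything else means try again.
    """
    for attempt in range(1, max_retries + 1):
        code, output = step()
        if code == 0:
            return output
    raise RuntimeError(f"step failed after {max_retries} attempts")

# A fake step that fails once, then succeeds
attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    return (0, "ok") if attempts["n"] > 1 else (1, "error")

print(run_step(flaky))  # ok
```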

See [examples/agent](examples/agent) for an example, along with observations, research questions, ideas, and experiment brainstorming!

Here are a few design choices (subject to change, of course). I am starting with
- The backend of FastMCP is essentially starlette, so we define (and add) other routes to the server.


### Job Specifications

#### Simple

We provide a simple translation layer between job specifications. We make the assumption that although each workload manager has many options, the set a user actually relies on is much smaller, and that smaller set is relatively straightforward to translate (with better accuracy).

See [examples/transform](examples/transform) for an example.
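As an illustration, a translation over a small, common subset of options might look like the following (the option tables here are illustrative, not the library's actual mapping):

```python
# Map a small, common subset of Slurm directives to flux run flags
SLURM_TO_FLUX = {
    "--nodes": "--nodes",
    "--ntasks": "--ntasks",
    "--time": "--time-limit",
    "--job-name": "--job-name",
}

def translate(slurm_args: dict) -> list:
    """Translate {option: value} pairs, skipping unknown options."""
    flux_args = []
    for key, value in slurm_args.items():
        if key in SLURM_TO_FLUX:
            flux_args.append(f"{SLURM_TO_FLUX[key]}={value}")
    return flux_args

print(translate({"--nodes": 2, "--time": "01:00:00"}))
# ['--nodes=2', '--time-limit=01:00:00']
```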

#### Complex

We want to:

1. Generate software graphs for some cluster (fluxion JGF); this is done with [compspec](https://github.com/compspec/compspec)
2. Register N clusters to a tool (should be written as a python module)
3. The tool should be able to select clusters from the resources it knows about, and return them
4. We need a graph representation (JSON) of each cluster - this will be used with the LLM inference

See [examples/fractale](examples/fractale) for a detailed walk-through of the above.
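Steps 2 and 3 above amount to a small registry keyed by cluster name. A sketch, under the assumption that each cluster's graph can be summarized as resource totals (names and structure here are hypothetical):

```python
clusters = {}

def register(name: str, resources: dict):
    """Register a cluster with a (simplified) resource summary."""
    clusters[name] = resources

def select(needs: dict) -> list:
    """Return clusters that satisfy every requested resource count."""
    return [
        name
        for name, res in clusters.items()
        if all(res.get(k, 0) >= v for k, v in needs.items())
    ]

register("cluster-a", {"nodes": 64, "gpus": 0})
register("cluster-b", {"nodes": 128, "gpus": 256})
print(select({"nodes": 32, "gpus": 8}))  # ['cluster-b']
```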

For graph tool:

```bash
conda install -c conda-forge graph-tool
```

<!-- ⭐️ [Documentation](https://compspec.github.io/fractale) ⭐️ -->

## License

HPCIC DevTools is distributed under the terms of the MIT license.
examples/plans/docker-build-lammps.yaml
name: "LAMMPS Pipeline"

plan:
  - name: "build"
    prompt: "docker-build-persona"
    inputs:
      application: "LAMMPS (Large-scale Atomic/Molecular Massively Parallel Simulator)"
      container: "ghcr.io/hpc-lab/lammps:cpu-latest"
      environment: "Rocky Linux 9, CPU Only"

  # - name: "deploy"
  #   prompt: "k8s-deploy-persona"
  #   inputs:
  #     replicas: 4
fractale/agent/__init__.py (empty file added)

fractale/agent/agent.py
import asyncio
import os
import time

from rich import print

from fastmcp import Client
from fastmcp.client.transports import StreamableHttpTransport

import fractale.agent.backends as backends
import fractale.agent.defaults as defaults
import fractale.agent.logger as logger
from fractale.agent.base import Agent


class MCPAgent(Agent):
    """
    Backend-agnostic agent that uses MCP tools.
    """

    def init(self):
        # Setup the MCP client
        port = os.environ.get("FRACTALE_MCP_PORT", defaults.mcp_port)
        token = os.environ.get("FRACTALE_MCP_TOKEN")
        url = f"http://localhost:{port}/mcp"

        if token:
            transport = StreamableHttpTransport(url=url, headers={"Authorization": token})
            self.client = Client(transport)
        else:
            self.client = Client(url)

        # Initialize the provider. We will do this for each step.
        self.init_provider()

    def init_provider(self):
        """
        Initialize the provider.
        """
        # Select the backend based on config/env first, then the cached version
        provider = self._provider or os.environ.get("FRACTALE_LLM_PROVIDER", "gemini").lower()
        self._provider = provider

        # Other envars come from the provider backend
        if provider in backends.BACKENDS:
            self.backend = backends.BACKENDS[provider]()
        else:
            raise ValueError(f"Provider {provider} is not available. Did you install dependencies?")

    async def get_prompts_list(self):
        """
        Get the list of prompts. A prompt is technically a persona/role
        that was previously considered an entire agent. Now we pair a prompt
        with an MCP backend and get a full agent.
        """
        async with self.client:
            return await self.client.list_prompts_mcp()

    async def get_tools_list(self):
        """
        Get the list of tools.
        """
        async with self.client:
            return await self.client.list_tools()

    async def execute(self, context, step):
        """
        The async loop that starts with a prompt name, retrieves it,
        and then responds to it until the state is successful.
        """
        start_time = time.perf_counter()

        # We keep the client connection open for the duration of the step
        async with self.client:

            # These are the tools available to the agent
            # TODO: do these need to be filtered to be agent specific?
            mcp_tools = await self.client.list_tools()
            await self.backend.initialize(mcp_tools)

            # Get the prompt that gives the goal/task/personality to the agent
            args = getattr(context, "data", context)

            # This partitions inputs, adding inputs from the step and separating
            # those from extra
            args, extra = step.partition_inputs(args)
            instruction = await self.fetch_persona(step.prompt, args)
            logger.custom(instruction, title="[green]Instruction[/green]", border_style="green")

            # Run the loop up to some max attempts (internal state machine with MCP tools)
            max_loops = context.get("max_loops", 15)
            response_text = await self.run_llm_loop(instruction, max_loops)

        self.record_usage(time.perf_counter() - start_time)
        return response_text

    async def run_llm_loop(self, instruction: str, max_loops: int) -> str:
        """
        Process -> tool -> process loop.
        We need to return on some state of success or ultimate failure.
        """
        # Initial response to the first prompt
        response_text, reason, calls = await self.backend.generate_response(prompt=instruction)
        logger.custom(reason, title="[blue]Reason[/blue]", border_style="blue")

        loops = 0
        while loops < max_loops:
            loops += 1

            # If no tools were called, we are done
            if not calls:
                break

            # Execute all requested tools
            tool_outputs = []
            for call in calls:
                t_name = call["name"]
                t_args = call["args"]
                t_id = call.get("id")
                logger.info(f"🛠️ Calling: {t_name}")

                try:
                    # Get the result and unpack it (FastMCP format)
                    result = await self.client.call_tool(t_name, t_args)
                    if hasattr(result, "content") and isinstance(result.content, list):
                        content = result.content[0].text
                    else:
                        content = str(result)
                except Exception as e:
                    content = f"Error executing {t_name}: {str(e)}"

                # Record metadata about the step
                self.record_step(t_name, t_args, content)

                # Save outputs (name, id, and content)
                tool_outputs.append({"id": t_id, "name": t_name, "content": content})

            # Feed the results back to the backend with history.
            response_text, reason, calls = await self.backend.generate_response(tool_outputs=tool_outputs)
            if not calls:
                logger.info("🎢 Agent has not requested new calls, ending loop.")

        # When we get here, we either have no calls, or we reached max attempts.
        return response_text

    async def fetch_persona(self, prompt_name: str, arguments: dict) -> str:
        """
        Ask the MCP server to render the prompt template.

        This is akin to rendering or fetching the persona. E.g., "You are X and
        here are your instructions for a task."
        """
        logger.info(f"📥 Persona: {prompt_name}")
        prompt_result = await self.client.get_prompt(name=prompt_name, arguments=arguments)
        # MCP prompts return a list of messages (User/Assistant/Text).
        # We squash them into a single string for the instruction.
        msgs = []
        for m in prompt_result.messages:
            if hasattr(m.content, "text"):
                msgs.append(m.content.text)
            else:
                msgs.append(str(m.content))
        return "\n\n".join(msgs)

    def record_step(self, tool, args, output):
        """
        Record step metadata.
        TODO: refactor this into a metadata registry (decorator)
        """
        self.metadata["steps"].append(
            {
                "tool": tool,
                "args": args,
                "output_snippet": str(output)[:200],
                "timestamp": time.time(),
            }
        )

    def record_usage(self, duration):
        """
        Record token usage.
        TODO: refactor this into a metadata registry (decorator)
        """
        if hasattr(self.backend, "token_usage"):
            usage = self.backend.token_usage
            self.metadata["llm_usage"].append(
                {
                    "duration": duration,
                    "prompt": usage.get("prompt_tokens", 0),
                    "completion": usage.get("completion_tokens", 0),
                }
            )

    def run_step(self, context, step):
        """
        Run step is called from the Agent run (base class).
        It's here so we can asyncio.run the thing!
        """
        try:
            context.result = asyncio.run(self.execute(context, step))
        except Exception as e:
            context["error_message"] = str(e)
            logger.error(f"Agent failed: {e}")
            raise
        return context
fractale/agent/backends/__init__.py

BACKENDS = {}

# Attempt to import each backend, registering those whose
# dependencies are installed. This is ugly, but it works!
try:
    from .gemini import GeminiBackend

    BACKENDS["gemini"] = GeminiBackend
except ImportError:
    pass

try:
    from .openai import OpenAIBackend

    BACKENDS["openai"] = OpenAIBackend
except ImportError:
    pass

try:
    from .llama import LlamaBackend

    BACKENDS["llama"] = LlamaBackend
except ImportError:
    pass