Agent Link

Agent Link is a distributed multi-agent orchestration platform. The Manager, a pure Python scheduler, parses structured input files, dispatches subtasks to multiple Hermes Workers in parallel, collects their outputs under output/{job_id}/, and triggers a Review Agent that audits quality and can send failed tasks back for retry.

Architecture

Input Files
     ↓
Manager (parse + dispatch)
     ↓
Redis Queue
     ↓
Worker 1 (hermes)   Worker 2 (hermes)   Worker N (hermes)
     ↓                    ↓                    ↓
output/{job_id}/task_0001.md  ...  task_000N.md
                     ↓
              Review Agent (hermes)
              ├─ audit quality
              └─ retry failed tasks ↩
                     ↓
            output/{job_id}/final_report.md
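
The output layout in the diagram can be expressed as a pair of path helpers. This is a minimal sketch, not the project's output_writer.py: the function names are hypothetical, and the 1-based, four-digit task numbering is an assumption read off the task_0001.md example above.

```python
from pathlib import Path


def task_output_path(job_id: str, task_index: int, root: str = "output") -> Path:
    """Return the per-task output path, e.g. output/<job_id>/task_0001.md.

    Assumption: task numbering is 1-based and zero-padded to four digits.
    """
    if task_index < 1:
        raise ValueError("task_index is 1-based")
    return Path(root) / job_id / f"task_{task_index:04d}.md"


def final_report_path(job_id: str, root: str = "output") -> Path:
    """Return the path of the Review Agent's report for a job."""
    return Path(root) / job_id / "final_report.md"
```

Keeping path construction in one place means the Workers and the Review Agent cannot disagree about where a task's result lives.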

Design principles:

  • Manager = pure scheduler: deterministic Python (file parsing, routing, result collection) with no LLM calls
  • Workers = Hermes agents: each handles one focused subtask via hermes chat -q
  • Review Agent: independent Hermes session that audits all outputs and can request retries
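
To illustrate the Worker principle above, here is a rough sketch of wrapping the hermes chat -q call in a subprocess. It is not the project's hermes_client.py. The function names and the timeout are hypothetical, and it assumes the prompt is passed as the value of -q and that the answer arrives on stdout.

```python
import subprocess


def build_hermes_command(prompt: str) -> list[str]:
    """Build the argv for one Hermes call.

    Assumption: the prompt is the value of the -q flag.
    """
    return ["hermes", "chat", "-q", prompt]


def run_hermes(prompt: str, timeout_s: float = 300.0) -> str:
    """Run one focused subtask and return whatever Hermes printed to stdout."""
    completed = subprocess.run(
        build_hermes_command(prompt),
        capture_output=True,  # collect stdout/stderr instead of inheriting them
        text=True,            # decode bytes to str
        timeout=timeout_s,    # raises subprocess.TimeoutExpired on a hung call
        check=True,           # raises CalledProcessError on a non-zero exit
    )
    return completed.stdout
```

Passing the command as a list rather than a shell string avoids quoting problems when a prompt contains quotes or newlines.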

Features

  • File Pipeline — Parse txt / md / json files into subtasks, dispatch to Workers in parallel, outputs written to output/{job_id}/
  • Review Agent — Audits all Worker outputs, flags low-quality results for retry, generates final_report.md
  • Dynamic Worker Spawning — Manager auto-spawns and recycles Workers based on load (MIN_WORKERS / MAX_WORKERS)
  • Task Orchestration — Single tasks, parallel batches, and multi-step DAG pipelines
  • Fault Tolerance — Worker heartbeat monitoring, automatic task requeue on failure
  • Shared Knowledge Base — Results stored in Qdrant for downstream context injection
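
The fault-tolerance feature can be pictured as two small steps: detect Workers whose heartbeat is too old, then put their in-flight tasks back on the queue. The sketch below uses plain in-memory dicts and lists in place of Redis and Postgres. All names and the 30-second timeout are hypothetical and are not taken from health.py.

```python
def find_stale_workers(
    last_heartbeat: dict[str, float], now: float, timeout_s: float = 30.0
) -> list[str]:
    """Return IDs of workers whose last heartbeat is older than timeout_s."""
    return sorted(
        worker_id
        for worker_id, seen_at in last_heartbeat.items()
        if now - seen_at > timeout_s
    )


def requeue_tasks(
    stale_workers: list[str], in_flight: dict[str, str], queue: list[str]
) -> None:
    """Move tasks owned by stale workers back onto the queue.

    in_flight maps task_id -> worker_id; both containers are mutated in place.
    """
    stale = set(stale_workers)
    for task_id, worker_id in list(in_flight.items()):
        if worker_id in stale:
            del in_flight[task_id]
            queue.append(task_id)
```

For example, with heartbeats {"w1": 100.0, "w2": 60.0} and now=100.0, only w2 exceeds the 30-second timeout, so only tasks assigned to w2 are requeued.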

Quick Start

# 0. Install dependencies
pip install -r requirements.txt

# 1. Start infrastructure
docker compose up -d

# 2. Start Manager (auto-spawns workers if MIN_WORKERS > 0)
python -m manager.main --port 8001

# 3. Start Worker(s) manually (or set MIN_WORKERS env var)
python -m worker.main --worker-id worker-1

# 4. Run unit tests
PYTHONPATH=. pytest tests/ -v

# 5. Run full e2e test (single task / parallel / pipeline / failover)
bash scripts/run_full_test.sh

# 6. Run file pipeline test (requires running Manager + Worker)
bash scripts/test_job_pipeline.sh

File Pipeline API

Submit a file or directory for batch processing:

# Process a txt file (split by "---" separator)
curl -X POST http://localhost:8001/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "input_path": "data/samples/questions.txt",
    "prompt_template": "Answer concisely:\n\n{content}",
    "split_strategy": "separator",
    "separator": "---",
    "review_enabled": true
  }'

# Process a JSON array file
curl -X POST http://localhost:8001/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "input_path": "data/tasks.json",
    "prompt_template": "Explain in plain English:\n\n{content}",
    "json_strategy": "array",
    "review_enabled": false
  }'

# Check job progress
curl http://localhost:8001/jobs/{job_id}

# List all jobs
curl http://localhost:8001/jobs
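
The same POST /jobs submission can be made from Python with the standard library. This sketch only builds the request; the helper name build_job_request is hypothetical, and the payload fields are the ones shown in the curl examples above. Sending it requires a running Manager.

```python
import json
import urllib.request


def build_job_request(
    base_url: str, input_path: str, prompt_template: str, **options
) -> urllib.request.Request:
    """Build a POST /jobs request mirroring the curl examples."""
    payload = {
        "input_path": input_path,
        "prompt_template": prompt_template,
        **options,  # e.g. split_strategy, separator, json_strategy, review_enabled
    }
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/jobs",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_job_request(
    "http://localhost:8001",
    "data/samples/questions.txt",
    "Answer concisely:\n\n{content}",
    split_strategy="separator",
    separator="---",
    review_enabled=True,
)
# To send it against a running Manager:
#   with urllib.request.urlopen(request) as response:
#       print(response.read().decode("utf-8"))
```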

Split strategies for txt/md:

Strategy    Description
lines       Split every N lines (default: 50)
separator   Split on a separator string (default: ---)
whole       Treat entire file as one chunk
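
A minimal sketch of the three text strategies follows. It is not the project's file_parser.py: the function names are hypothetical, and stripping whitespace and dropping empty chunks in the separator case is an assumption.

```python
def split_lines(text: str, lines_per_chunk: int = 50) -> list[str]:
    """'lines' strategy: one chunk per N consecutive lines."""
    lines = text.splitlines()
    return [
        "\n".join(lines[start:start + lines_per_chunk])
        for start in range(0, len(lines), lines_per_chunk)
    ]


def split_separator(text: str, separator: str = "---") -> list[str]:
    """'separator' strategy: split on the separator string.

    Assumption: surrounding whitespace is stripped and empty chunks are dropped.
    """
    chunks = (part.strip() for part in text.split(separator))
    return [chunk for chunk in chunks if chunk]


def split_whole(text: str) -> list[str]:
    """'whole' strategy: the entire file is a single chunk."""
    return [text]
```

Each returned chunk would then be substituted for {content} in the job's prompt_template.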

JSON strategies:

Strategy    Description
array       Root must be a JSON array; each element becomes one task
keys        Root must be a JSON object; each top-level key becomes one task
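
The two JSON strategies could be sketched as below. How each element is rendered into {content} is not specified here, so serialising it back to a JSON string is an assumption, as is the function name split_json.

```python
import json


def split_json(raw: str, strategy: str = "array") -> list[str]:
    """Split a JSON document into one chunk per array element or top-level key."""
    root = json.loads(raw)
    if strategy == "array":
        if not isinstance(root, list):
            raise ValueError("strategy 'array' requires a JSON array at the root")
        return [json.dumps(item, ensure_ascii=False) for item in root]
    if strategy == "keys":
        if not isinstance(root, dict):
            raise ValueError("strategy 'keys' requires a JSON object at the root")
        # Keep the key alongside its value so the Worker sees which entry it got.
        return [
            json.dumps({key: value}, ensure_ascii=False)
            for key, value in root.items()
        ]
    raise ValueError(f"unknown JSON strategy: {strategy!r}")
```

Rejecting a mismatched root type up front gives a clear error at submission time instead of a confusing failure inside a Worker.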

Worker Scaling API

# Scale worker pool to N workers
curl -X POST http://localhost:8001/workers/scale \
  -H "Content-Type: application/json" \
  -d '{"count": 3}'

# Get current spawner status
curl http://localhost:8001/workers/status

Or set env vars to enable auto-spawning on Manager startup:

MIN_WORKERS=2 MAX_WORKERS=8 python -m manager.main
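
One plausible way to turn load into a worker count is to clamp the demand between MIN_WORKERS and MAX_WORKERS. The sketch below is an assumption about the policy, not a description of spawner.py; target_worker_count and tasks_per_worker are hypothetical names.

```python
def target_worker_count(
    queue_depth: int,
    min_workers: int,
    max_workers: int,
    tasks_per_worker: int = 1,
) -> int:
    """Return how many workers to run for the current queue depth.

    Assumption: one worker per `tasks_per_worker` queued tasks, clamped
    to the [min_workers, max_workers] range.
    """
    if min_workers > max_workers:
        raise ValueError("min_workers must not exceed max_workers")
    if tasks_per_worker < 1:
        raise ValueError("tasks_per_worker must be at least 1")
    wanted = -(-queue_depth // tasks_per_worker)  # ceiling division
    return max(min_workers, min(max_workers, wanted))
```

With MIN_WORKERS=2 and MAX_WORKERS=8, an empty queue keeps 2 workers alive and a queue of 100 tasks is capped at 8.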

Parent CLI

# Submit a task and wait for result
python -m parent.main --task-type research --prompt "What are the latest LLM trends?"

# Submit without waiting
python -m parent.main --task-type research --prompt "..." --no-wait

# Check task status
python -m parent.main --status <task-id>

# Run a DAG pipeline from JSON
python -m parent.main --pipeline scripts/pipeline_example.json
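
A DAG pipeline runs in dependency order, with independent steps eligible to run in parallel. The sketch below shows that ordering with the standard library's graphlib (Python 3.9+). The step names and the dependency-map shape are hypothetical, since the format of scripts/pipeline_example.json is not shown here.

```python
from graphlib import TopologicalSorter


def execution_waves(dependencies: dict[str, set[str]]) -> list[list[str]]:
    """Group pipeline steps into waves that can run in parallel.

    `dependencies` maps each step to the set of steps it must wait for.
    Raises graphlib.CycleError if the graph is not a DAG.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every step whose dependencies are done
        waves.append(ready)
        sorter.done(*ready)
    return waves


# Hypothetical three-stage pipeline: two research steps feed a summary,
# which feeds the final report.
waves = execution_waves(
    {
        "summarize": {"research_a", "research_b"},
        "report": {"summarize"},
    }
)
```

Each inner list is a batch the Manager could dispatch to Workers at the same time before waiting on the next batch.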

HTTP API Reference

Method  Endpoint             Description
POST    /tasks               Submit a task, return immediately
POST    /tasks/wait          Submit a task and wait for result
GET     /status/tasks/{id}   Get task status and result
POST    /pipelines           Submit a DAG pipeline and wait
POST    /jobs                Submit a file batch job
GET     /jobs                List all jobs
GET     /jobs/{id}           Get job progress and output path
POST    /workers/scale       Scale worker pool to N workers
GET     /workers/status      Spawner status
GET     /health              Health check
GET     /status/workers      List registered workers

Project Structure

agent-link/
├── config/
│   └── settings.py              # Pydantic settings
├── shared/
│   ├── redis_queue.py           # Task queue + pub/sub + distributed lock
│   ├── postgres.py              # Task/worker state persistence
│   ├── knowledge_base.py        # Qdrant knowledge base
│   └── output_writer.py         # Writes Worker results to output/{job_id}/
├── manager/
│   ├── router.py                # Task dispatch to Redis
│   ├── scheduler.py             # Task submission + dependency wait
│   ├── pipeline.py              # DAG pipeline executor
│   ├── health.py                # Worker heartbeat + failover
│   ├── file_parser.py           # Parse files into TaskChunk lists
│   ├── job_manager.py           # Orchestrate file pipeline end-to-end
│   ├── reviewer.py              # Review Agent + retry feedback loop
│   ├── spawner.py               # Dynamic Worker process management
│   ├── api.py                   # FastAPI HTTP endpoints
│   └── main.py                  # Manager entry point
├── worker/
│   ├── executor.py              # Task claim, execute, write result
│   ├── hermes_client.py         # Hermes CLI subprocess wrapper
│   └── main.py                  # Worker entry point
├── parent/
│   ├── cli.py                   # Argument parsing
│   └── main.py                  # CLI entry point
├── data/
│   └── samples/                 # Example input files for testing
├── scripts/
│   ├── run_full_test.sh         # Full e2e test (4 scenarios)
│   ├── test_job_pipeline.sh     # File pipeline + Review Agent e2e test
│   ├── start_all.sh / stop_all.sh
│   └── e2e_test.py
└── tests/
    ├── test_redis_queue.py
    ├── test_postgres.py
    ├── test_knowledge_base.py
    └── test_file_pipeline.py    # FileParser + OutputWriter unit tests

Configuration

Copy .env.example to .env:

REDIS_HOST=localhost
REDIS_PORT=6379
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=hermes
POSTGRES_PASSWORD=yourpassword
POSTGRES_DB=hermes
QDRANT_HOST=localhost
QDRANT_PORT=6333

# Worker auto-spawn (optional)
MIN_WORKERS=0
MAX_WORKERS=8
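
For illustration, the variables above can be read into a typed settings object. The project structure lists config/settings.py as Pydantic settings; the sketch below uses a standard-library dataclass instead so that it runs without extra dependencies. The class and its defaults are hypothetical and only mirror a subset of the example values.

```python
import os
from dataclasses import dataclass, fields
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    # Defaults mirror the example .env values; the password is deliberately empty.
    redis_host: str = "localhost"
    redis_port: int = 6379
    postgres_host: str = "localhost"
    postgres_port: int = 5432
    postgres_user: str = "hermes"
    postgres_password: str = ""
    postgres_db: str = "hermes"
    qdrant_host: str = "localhost"
    qdrant_port: int = 6333
    min_workers: int = 0
    max_workers: int = 8

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "Settings":
        """Read UPPER_CASE variables, converting each to its default's type."""
        values = {}
        for field in fields(cls):
            raw = env.get(field.name.upper())
            if raw is not None:
                values[field.name] = type(field.default)(raw)
        return cls(**values)


settings = Settings.from_env({"REDIS_PORT": "6380", "MIN_WORKERS": "2"})
```

Passing the environment as a mapping keeps the loader easy to test without touching the real process environment.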

Testing

# Unit tests (no Docker required)
PYTHONPATH=. pytest tests/ -v
# 42 passed ✅

# Full e2e test (requires Docker + hermes)
bash scripts/run_full_test.sh

# File pipeline e2e test (requires running Manager + Worker)
bash scripts/test_job_pipeline.sh

License

MIT
