Chinese Documentation | English
A distributed agent framework built on top of LangGraph that enables multiple AI agents to work together seamlessly using Redis as a message broker. This SDK provides a robust foundation for building scalable, multi-agent AI systems with real-time communication and state persistence.
- **Human approval for sensitive tool execution**: Built-in safety mechanisms ensure that critical operations, sensitive data access, and potentially impactful actions are reviewed and approved by a human before execution. Real-time monitoring and intervention capabilities provide complete control over agent behavior.
- **Horizontally scalable multi-agent systems**: Multiple agents run independently across different processes or machines, communicating through Redis streams. Each agent can be deployed, scaled, and managed separately while maintaining seamless coordination.
- **Intelligent workflow coordination**: Agents can be organized in hierarchical structures in which coordinator agents delegate tasks to specialized sub-agents. This enables complex workflow orchestration with clear responsibility chains and efficient task distribution.
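To illustrate the coordinator pattern in plain Python (a toy sketch independent of this SDK; the `Coordinator` and `SubAgent` names are hypothetical and not part of the library's API):

```python
# Toy sketch of coordinator-style delegation: the coordinator routes a task
# to the sub-agent whose declared specialty matches it, mirroring how a
# coordinator agent delegates to specialized sub-agents. Illustration only;
# these classes are not part of this SDK.
from dataclasses import dataclass
from typing import Callable


@dataclass
class SubAgent:
    name: str
    specialty: str                 # keyword this agent handles
    handle: Callable[[str], str]   # the agent's task handler


class Coordinator:
    def __init__(self):
        self.subagents: list[SubAgent] = []

    def add_subagent(self, agent: SubAgent) -> None:
        self.subagents.append(agent)

    def delegate(self, task: str) -> str:
        # Route the task to the first sub-agent whose specialty appears in it
        for agent in self.subagents:
            if agent.specialty in task.lower():
                return agent.handle(task)
        return "no agent available for this task"


coordinator = Coordinator()
coordinator.add_subagent(SubAgent("weather_agent", "weather", lambda t: "sunny"))
coordinator.add_subagent(SubAgent("economics_agent", "gdp", lambda t: "500 billion"))

print(coordinator.delegate("what is the weather in London?"))  # sunny
print(coordinator.delegate("what is the gdp of Shanghai?"))    # 500 billion
```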
(Demo video: agents-ui-demo.mp4)
- MCP Server Integration: Support for Model Context Protocol servers to extend agent capabilities
- Persistent State Management: MySQL/SQLite checkpoint storage for conversation history
- Scalable Design: Horizontal scaling with Redis streams and consumer groups
- Easy Integration: Simple client interface for interacting with the agent system
The system consists of several key components:
- Agent Workers: Individual agents that process tasks and communicate via Redis streams
- Agent Client: Interface for sending messages and receiving responses from agents
- Agent Runner: High-level wrapper for creating and managing agents
- Redis Streams: Message broker for inter-agent communication
- Checkpoint Storage: Persistent state management using MySQL or SQLite
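To illustrate why streams with consumer groups enable horizontal scaling, here is an in-memory stand-in (a simulation, not the redis-py API; real deployments use `XADD`/`XREADGROUP`):

```python
# In-memory sketch of the consumer-group idea behind Redis streams: each
# message appended to a stream is delivered to exactly one consumer within
# a group, which is what lets identical agent workers scale horizontally.
# This is a simulation, not the redis-py client.
from collections import defaultdict
from itertools import cycle


class FakeStream:
    def __init__(self):
        self.entries = []                # append-only log of messages
        self.groups = defaultdict(list)  # group name -> consumer names

    def xadd(self, payload: dict) -> int:
        self.entries.append(payload)
        return len(self.entries) - 1     # entry id

    def register(self, group: str, consumer: str) -> None:
        self.groups[group].append(consumer)

    def deliver(self, group: str) -> dict:
        # Round-robin each entry to exactly one consumer in the group
        assignments = defaultdict(list)
        consumers = cycle(self.groups[group])
        for entry in self.entries:
            assignments[next(consumers)].append(entry)
        return dict(assignments)


stream = FakeStream()
stream.register("demo_agent", "worker-1")
stream.register("demo_agent", "worker-2")
for i in range(4):
    stream.xadd({"task": f"msg-{i}"})

delivered = stream.deliver("demo_agent")
# Each message goes to exactly one worker; the load is split across both
print({c: len(msgs) for c, msgs in delivered.items()})  # {'worker-1': 2, 'worker-2': 2}
```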
```bash
pip install langgraph_distributed_agent
```

The package requires Python 3.10+ and the following key dependencies:
- `langgraph` - Core graph-based agent framework
- `redis` - Redis client for message streaming
- `langchain` - LLM integration
- `pydantic` - Data validation and settings management
Create a `.env` file with your configuration:

```
REDIS_URL=redis://:password@localhost:6379/0
CHECKPOINT_DB_URL=agent_checkpoints.db
OPENAI_BASE_URL=https://api.openai.com/v1
OPENAI_MODEL=gpt-4
OPENAI_API_KEY=sk-your-api-key
```

Create and start an agent:

```python
import asyncio
import os
from typing import Annotated

import dotenv
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool, InjectedToolCallId
from langgraph.runtime import get_runtime

from langgraph_distributed_agent.agent_runner import AgentRunner
from langgraph_distributed_agent.utils import human_approval_required

dotenv.load_dotenv()


@tool
def get_city_weather(city: str) -> str:
    """
    Get the weather for a specific city.

    Parameters:
        city (str): Name of the city, e.g., "London".

    Returns:
        str: Weather description for the given city.
    """
    print("current context", get_runtime().context)
    return f"It's always sunny in {city}!"


@tool
@human_approval_required
def get_city_gdp(city: str,
                 config: RunnableConfig,
                 injected_tool_call_id: Annotated[str, InjectedToolCallId]) -> str:
    """Get city GDP."""
    print(get_runtime())
    return f"The GDP of {city} is 500 billion yuan!"


async def main():
    runner = AgentRunner(
        agent_name="demo_agent",
        system_prompt="You are a helpful assistant.",
        redis_url=os.environ.get("REDIS_URL", ""),
        mysql_url=os.environ.get("CHECKPOINT_DB_URL", ""),
        openai_base_url=os.environ.get("OPENAI_BASE_URL", ""),
        openai_model=os.environ.get("OPENAI_MODEL", ""),
        openai_api_key=os.environ.get("OPENAI_API_KEY", "")
    )
    runner.add_tool(get_city_weather)
    runner.add_tool(get_city_gdp)
    await runner.start()


if __name__ == '__main__':
    asyncio.run(main())
```

You can also test with the web UI: https://github.com/SelfRefLab/agents-ui
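The `human_approval_required` decorator above pauses execution until a human accepts or rejects the call. The SDK's actual mechanism interrupts the graph run and resumes via the client, but the gating idea can be sketched with a plain decorator (hypothetical names, not the SDK's implementation):

```python
# Toy sketch of human-approval gating: the wrapped tool only runs when an
# approval callback says yes. Hypothetical illustration; the SDK instead
# interrupts the LangGraph run and resumes after client-side approval.
import functools
from typing import Callable


def approval_required(approve: Callable[[str], bool]):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            call_desc = f"{func.__name__}{args}"
            if not approve(call_desc):
                return "tool call rejected by human"
            return func(*args, **kwargs)
        return wrapper
    return decorator


# Stand-in for a human reviewer: approve only GDP-related calls
@approval_required(approve=lambda desc: "gdp" in desc)
def get_city_gdp(city: str) -> str:
    return f"The GDP of {city} is 500 billion yuan!"


print(get_city_gdp("Shanghai"))  # The GDP of Shanghai is 500 billion yuan!
```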
Chat with the agent using the CLI client:

```python
import asyncio
import os

import dotenv

from langgraph_distributed_agent.agent_cli import AgentCLI

dotenv.load_dotenv()


async def main():
    cli = AgentCLI(target_agent="demo_agent",
                   redis_url=os.environ.get("REDIS_URL", ""))
    await cli.run()


if __name__ == '__main__':
    asyncio.run(main())
```

The `examples/agent_demo/` directory contains a complete working example with:
- Main Agent (`main_agent.py`): Coordinator agent that delegates tasks
- Weather Agent (`weather_agent.py`): Specialized weather information agent
- Economics Agent (`economics_agent.py`): Specialized economic analysis agent
- MCP Server (`demo_mcp_server.py`): Example MCP server integration
- CLI Client (`cli.py`): Interactive command-line interface
- Start the MCP server:

```bash
python -m examples.agent_demo.demo_mcp_server
```

- Start the agents:

```bash
python -m examples.agent_demo.main_agent
python -m examples.agent_demo.weather_agent
python -m examples.agent_demo.economics_agent
```

- Run the CLI client:

```bash
python -m examples.agent_demo.cli
```

`AgentRunner` - Main class for creating and managing agents:
```python
class AgentRunner:
    def __init__(self, agent_name: str, system_prompt: str, ...)
    async def add_tool(self, tool)
    async def add_mcp_server(self, server_url: str)
    def add_subagent(self, agent_name: str, description: str)
    async def start(self)
```

`AgentClient` - Client interface for interacting with agents:
```python
import asyncio
import os
import uuid

import dotenv

from langgraph_distributed_agent.agent_client import AgentClient

dotenv.load_dotenv()


async def agent_client_test():
    client = AgentClient(
        target_agent="main_agent",
        redis_url=os.environ.get("REDIS_URL", "")
    )
    context_id = str(uuid.uuid4())
    await client.send_message("hi", context_id)

    # Stream the response
    async for event in client.progress_events(context_id):
        AgentClient.print_progress_event(event)

    last_event = await client.get_last_event(context_id)
    if last_event.data.type == 'interrupt':
        # Accept the pending tool invocation
        await client.accept_tool_invocation(context_id)
        # await client.reject_tool_invocation(context_id)

        # Stream the rest of the response
        async for event in client.progress_events(context_id):
            AgentClient.print_progress_event(event)

    # Get the chat history
    print("\n\n======= Get Chat History =======\n\n")
    his = await client.get_chat_history(context_id)
    for item in his:
        AgentClient.print_progress_event(item['data'])

# asyncio.run(agent_client_test())
await agent_client_test()  # in a Jupyter notebook
```
`DistributedAgentWorker` - Low-level worker for processing agent events:

```python
class DistributedAgentWorker:
    def __init__(self, agent: CompiledStateGraph, redis_url: str)
    async def start(self)
```

- Clone the repository:
```bash
git clone https://github.com/SelfRefLab/langgraph_distributed_agent.git
cd langgraph_distributed_agent
```

- Install dependencies:

```bash
pip install -e .
```

- Set up Redis:

```bash
# Using Docker
docker run -d -p 6379:6379 redis:latest

# Or install locally
# Follow the Redis installation guide for your OS
```

- Copy and configure environment:

```bash
cp .env.example .env
# Edit .env with your configuration
```

Project layout:

```
langgraph_distributed_agent/
├── langgraph_distributed_agent/      # Main package
│   ├── agent_client.py               # Client interface
│   ├── agent_runner.py               # High-level agent runner
│   ├── distributed_agent_worker.py   # Core worker implementation
│   ├── redis_lock.py                 # Redis-based locking
│   └── utils.py                      # Utility functions
└── examples/                         # Example implementations
    └── agent_demo/                   # Complete demo system
```
We welcome contributions! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add some amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
- Built on top of LangGraph
- Inspired by distributed systems patterns
- Developed by the Huya AIOps team
If you have any questions or need help, please:
- Check the examples directory
- Open an issue on GitHub
- Contact the maintainers
Authors: panjianning, lanxuanli
Organization: Huya AIOps Team