A distributed, multi-modal agent orchestration framework providing natural language processing, computer vision, and audio processing capabilities through a microservices architecture.
Mnemosyne Agents operates as a distributed system utilizing a message-passing architecture for inter-agent communication. The core orchestrator implements an asynchronous event loop managing concurrent agent execution through a custom scheduler.
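As a rough sketch of the message-passing model described above (the `Message` envelope, `MessageBus` class, and topic names are illustrative assumptions, not the framework's actual API — the real broker is Redis or RabbitMQ):

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Message:
    topic: str                        # e.g. "nlp.process", "vision.process"
    payload: Dict[str, Any] = field(default_factory=dict)


class MessageBus:
    """Minimal in-process stand-in for the Redis/RabbitMQ message queue."""

    def __init__(self):
        self.queues: Dict[str, asyncio.Queue] = {}

    def queue_for(self, topic: str) -> asyncio.Queue:
        # Lazily create one queue per topic
        return self.queues.setdefault(topic, asyncio.Queue())

    async def publish(self, message: Message) -> None:
        await self.queue_for(message.topic).put(message)

    async def consume(self, topic: str) -> Message:
        return await self.queue_for(topic).get()


async def demo() -> str:
    bus = MessageBus()
    await bus.publish(Message("nlp.process", {"text": "hello"}))
    received = await bus.consume("nlp.process")
    return received.payload["text"]


print(asyncio.run(demo()))  # prints "hello"
```

In production the per-topic `asyncio.Queue` would be replaced by a broker channel, but the publish/consume contract between agents stays the same.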
The system implements a layered architecture:

```text
Infrastructure Layer
├── Vector Store (FAISS/Pinecone)
├── Message Queue (Redis/RabbitMQ)
└── Metrics Store (Prometheus)

Core Services Layer
├── Orchestrator Service
│   ├── Task Scheduler
│   ├── Memory Manager
│   └── Agent Router
├── Security Service
└── Telemetry Service

Agent Layer
├── NLP Agent (Text Processing)
├── Computer Vision Agent
├── Audio Processing Agent
└── Video Analysis Agent
```
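The Agent Router's role in this hierarchy can be illustrated with a minimal sketch (the modality keys and handler functions here are hypothetical stand-ins for the real agents):

```python
from typing import Any, Callable, Dict


# Hypothetical handlers standing in for the NLP and Vision agents
def handle_text(data: Any) -> str:
    return f"nlp:{data}"


def handle_image(data: Any) -> str:
    return f"vision:{data}"


class AgentRouter:
    """Routes a task to the agent registered for its modality."""

    def __init__(self):
        self.routes: Dict[str, Callable[[Any], str]] = {}

    def register(self, modality: str, handler: Callable[[Any], str]) -> None:
        self.routes[modality] = handler

    def dispatch(self, modality: str, data: Any) -> str:
        if modality not in self.routes:
            raise KeyError(f"no agent registered for modality {modality!r}")
        return self.routes[modality](data)


router = AgentRouter()
router.register("text", handle_text)
router.register("image", handle_image)
print(router.dispatch("text", "hello"))  # prints "nlp:hello"
```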
The orchestrator implements a custom task scheduler using asyncio for concurrent execution:

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class OrchestratorConfig:
    max_concurrent_tasks: int = 10
    task_timeout: float = 30.0
    retry_attempts: int = 3
    memory_backend: str = "faiss"
    vector_dimensions: int = 1536


class Orchestrator:
    def __init__(self, config: OrchestratorConfig):
        self.config = config
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.active_tasks: Dict[str, asyncio.Task] = {}
        self._setup_memory_backend()

    async def schedule_task(self, task_data: Dict[str, Any]) -> str:
        task_id = self._generate_task_id()
        await self.task_queue.put((task_id, task_data))
        return task_id

    async def _process_queue(self):
        while True:
            # Back off while at capacity instead of busy-spinning
            while len(self.active_tasks) >= self.config.max_concurrent_tasks:
                await asyncio.sleep(0.05)
            task_id, task_data = await self.task_queue.get()
            # Run concurrently; awaiting the task here would serialize execution
            task = asyncio.create_task(self._execute_task(task_data))
            self.active_tasks[task_id] = task
            task.add_done_callback(
                lambda _, tid=task_id: self.active_tasks.pop(tid, None)
            )
```

The system implements memory management on top of a vector store for semantic search and retrieval:
```python
from typing import List, Optional

import numpy as np
from faiss import IndexFlatL2


class MemoryManager:
    def __init__(self, dimensions: int = 1536):
        self.index = IndexFlatL2(dimensions)
        self.metadata: List[dict] = []

    async def store_embedding(self,
                              embedding: np.ndarray,
                              metadata: Optional[dict] = None) -> int:
        if embedding.ndim == 1:
            embedding = embedding.reshape(1, -1)
        # FAISS requires float32 input
        self.index.add(embedding.astype(np.float32))
        self.metadata.append(metadata or {})
        return self.index.ntotal - 1

    async def semantic_search(self,
                              query_embedding: np.ndarray,
                              k: int = 5) -> List[dict]:
        query = query_embedding.reshape(1, -1).astype(np.float32)
        D, I = self.index.search(query, k)
        # FAISS pads results with index -1 when k exceeds the number of stored vectors
        return [
            {"id": int(i), "distance": float(d), "metadata": self.metadata[int(i)]}
            for d, i in zip(D[0], I[0]) if i != -1
        ]
```

Agents are implemented as autonomous services with their own processing pipelines:
```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict


class BaseAgent(ABC):
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.processing_queue: asyncio.Queue = asyncio.Queue()
        self._initialize_pipeline()

    @abstractmethod
    async def process(self, input_data: Any) -> Dict[str, Any]:
        ...

    async def _preprocess(self, data: Any) -> Any:
        # Identity by default; subclasses override to normalize input
        return data

    async def _postprocess(self, result: Any) -> Dict[str, Any]:
        # Identity by default; subclasses override to shape output
        return result


class NLPAgent(BaseAgent):
    async def process(self, input_data: str) -> Dict[str, Any]:
        embeddings = await self._generate_embeddings(input_data)
        intent = await self._classify_intent(input_data)
        entities = await self._extract_entities(input_data)
        return {
            "embeddings": embeddings,
            "intent": intent,
            "entities": entities,
            "raw_text": input_data,
        }
```

Pipelines compose agents into multi-step, optionally parallel workflows:

```python
from mnemosyne.pipeline import Pipeline, ProcessingStep
from mnemosyne.agents import NLPAgent, VisionAgent


async def create_multimodal_pipeline():
    pipeline = Pipeline()

    # Configure processing steps
    pipeline.add_step(
        ProcessingStep(
            agent=NLPAgent,
            config={"model": "gpt-4", "max_tokens": 1000}
        )
    )
    pipeline.add_step(
        ProcessingStep(
            agent=VisionAgent,
            config={"model": "clip", "batch_size": 32}
        ),
        parallel=True  # Execute in parallel with the NLP step
    )

    return pipeline


# Usage
pipeline = await create_multimodal_pipeline()
results = await pipeline.process({
    "text": "Analyze this image",
    "image": image_bytes
})
```

The system implements several optimization strategies:
- Batched Processing: Requests are automatically batched for optimal throughput
- Adaptive Rate Limiting: Dynamic rate limiting based on system load
- Caching Layer: Two-level cache implementation (in-memory and distributed)
- Connection Pooling: Database and API connection pooling for resource optimization
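The two-level cache strategy can be sketched as follows (a simplified illustration: the distributed tier would be Redis in practice, stubbed here with a plain dict):

```python
from typing import Any, Dict, Optional


class TwoLevelCache:
    """L1: process-local dict; L2: stand-in for a distributed store (e.g. Redis)."""

    def __init__(self):
        self.l1: Dict[str, Any] = {}
        self.l2: Dict[str, Any] = {}   # placeholder for the distributed backend

    def get(self, key: str) -> Optional[Any]:
        if key in self.l1:             # fast path: in-memory hit
            return self.l1[key]
        if key in self.l2:             # slower path: distributed hit
            self.l1[key] = self.l2[key]  # promote to L1 for subsequent reads
            return self.l1[key]
        return None

    def set(self, key: str, value: Any) -> None:
        # Write-through to both tiers
        self.l1[key] = value
        self.l2[key] = value


cache = TwoLevelCache()
cache.set("embedding:42", [0.1, 0.2])
cache.l1.clear()                       # simulate a process restart: L1 is lost
print(cache.get("embedding:42"))       # prints [0.1, 0.2] (restored from L2)
```

A real implementation would add TTLs and an eviction policy on L1; the promotion-on-read pattern shown here is what makes the in-memory tier converge on the hot working set.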
Security is implemented through a multi-layered approach:

```python
from typing import Optional

from cryptography.fernet import Fernet


class SecurityManager:
    def __init__(self, encryption_key: Optional[bytes] = None):
        # Fernet keys are URL-safe base64-encoded 32-byte values
        self.fernet = Fernet(encryption_key or Fernet.generate_key())
        self._setup_rate_limiter()

    def encrypt_payload(self, data: bytes) -> bytes:
        return self.fernet.encrypt(data)

    def decrypt_payload(self, encrypted_data: bytes) -> bytes:
        return self.fernet.decrypt(encrypted_data)
```

- Python 3.9+
- Redis/RabbitMQ
- Vector Store (FAISS/Pinecone)
- Prometheus/Grafana (optional)
```shell
python -m venv venv
source venv/bin/activate  # or venv\Scripts\activate on Windows
pip install -r requirements.txt
```

The system is configured through environment variables or a configuration file:
```yaml
orchestrator:
  max_concurrent_tasks: 10
  task_timeout: 30.0
  retry_attempts: 3

memory:
  backend: "faiss"
  vector_dimensions: 1536
  index_type: "L2"

security:
  encryption_key: "${ENCRYPTION_KEY}"
  rate_limit_requests: 100
  rate_limit_window: 60
```

This project is licensed under the MIT License. See the LICENSE file for details.