# Multi-Agent System Skeleton

This notebook demonstrates a **skeleton structure** of a multi-agent system in Python.  

The goals of this notebook are:
- Show the modular design of an agent that can handle queries.
- Explain how caching, retrieval, safe calculations, and policy lookups are organized.



## Imports & Setup

This section imports the required libraries and explains why each is needed:

- `json` → for logging and storing structured data.
- `time`, `datetime` → for timestamps and timing operations (the `datetime.UTC` alias used below requires Python 3.11+).
- `hashlib` → for generating cache keys and run IDs.
- `concurrent.futures` → for running tools with a timeout.
- `re`, `ast` → for regex parsing and safe expression evaluation.
- `sklearn.feature_extraction.text.TfidfVectorizer` → for converting text into numerical vectors (TF-IDF) to support retrieval.
- `numpy` → for numerical operations like sorting and matrix multiplication.



In [None]:
import json
import time
import hashlib
from datetime import datetime, UTC
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
import re
import ast
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np


## Simple In-Memory Cache

`SimpleCache` provides a lightweight in-memory key-value store:

- Avoids recomputing results for the same input.
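- Keys are SHA-256 hashes of the tool name plus the JSON-serialized input arguments; the `Agent` builds them, and the cache itself accepts any hashable key.
- A cache hit skips the tool call entirely, which saves time when the same query recurs. Entries are never evicted, so the cache grows for the lifetime of the process.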


In [None]:
class SimpleCache:
    def __init__(self):
        self._d = {}
    def get(self, key):
        return self._d.get(key)
    def set(self, key, value):
        self._d[key] = value
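
A minimal sketch of how a caller might build keys for a cache like this one, following the SHA-256 scheme described above. `TinyCache` and `make_cache_key` are hypothetical names used only for this illustration. The example also shows why a hit is best tested with `is not None`: a legitimately cached `0` or empty list is falsy.

```python
import hashlib
import json


class TinyCache:
    """Stand-in with the same get/set interface as SimpleCache (illustrative)."""

    def __init__(self):
        self._d = {}

    def get(self, key):
        return self._d.get(key)  # returns None on a miss

    def set(self, key, value):
        self._d[key] = value


def make_cache_key(tool_name, args):
    """Hash the tool name plus JSON-serialized arguments into a hex digest."""
    payload = tool_name + "|" + json.dumps(args, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()


cache = TinyCache()
key = make_cache_key("calculator", ["5 - 5"])
cache.set(key, 0)  # a valid result that happens to be falsy

hit = cache.get(key)
print(hit is not None)  # True: the entry exists
print(bool(hit))        # False: a truthiness check would treat this as a miss
```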


## Run Callable with Timeout

Some tools (like calculations or retrieval) might take too long.  

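- `run_with_timeout` submits the callable to a shared `ThreadPoolExecutor` and waits at most `timeout` seconds for the result.
- This keeps the agent from blocking indefinitely. `fut.cancel()` cannot stop a call that is already running, so a timed-out function keeps occupying its worker thread until it returns.
- Retries are not handled here. They live in `Agent.run_tool_with_retries`, which backs off by `0.2 * attempt` seconds after a failure (linear rather than exponential backoff).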


In [None]:
_executor = ThreadPoolExecutor(max_workers=6)

def run_with_timeout(fn, args=(), timeout=5):
    fut = _executor.submit(fn, *args)
    try:
        return fut.result(timeout=timeout)
    except FutureTimeout:
        fut.cancel()
        raise TimeoutError("timeout")
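
The helper above enforces only the time limit. One way to layer retries with exponential backoff on top of a timeout is sketched here. `call_with_retries`, `base_delay`, and the doubling schedule are assumptions made for illustration, not part of the notebook's code.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

_pool = ThreadPoolExecutor(max_workers=2)


def call_with_retries(fn, args=(), timeout=1.0, attempts=3, base_delay=0.01):
    """Run fn(*args) with a per-attempt timeout, doubling the delay after each failure."""
    last_exc = None
    for attempt in range(attempts):
        fut = _pool.submit(fn, *args)
        try:
            return fut.result(timeout=timeout)
        except FutureTimeout:
            fut.cancel()  # only prevents a not-yet-started task from running
            last_exc = TimeoutError("timeout")
        except Exception as exc:  # the tool itself raised
            last_exc = exc
        if attempt < attempts - 1:
            time.sleep(base_delay * (2 ** attempt))  # 1x, 2x, 4x, ...
    raise last_exc


calls = {"n": 0}


def flaky(x):
    """Fails on the first two calls, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return x * 2


result = call_with_retries(flaky, args=(21,))
print(result, calls["n"])
```

The delay is skipped after the final attempt, since there is nothing left to wait for.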


## Calculator: Safe Expression Evaluation

- Uses Python's `ast` module to safely evaluate arithmetic expressions.
- Only allows certain operators: `+ - * / % ** //`.
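- Avoids `eval`: only numeric literals and the whitelisted operators are evaluated, and any other node (names, calls, attribute access) raises `ValueError`.
- Supports both binary and unary operations (unary `+` and `-`).
- Does not bound the size of operands, so an input such as `9**9**9` can still consume a lot of CPU and memory.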



In [None]:
ALLOWED_BINOPS = (ast.Add, ast.Sub, ast.Mult, ast.Div, ast.Mod, ast.Pow, ast.FloorDiv)
ALLOWED_UNARYOPS = (ast.UAdd, ast.USub)

def safe_eval_expr(expr: str):
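    """
    Evaluate an arithmetic expression made of numeric literals, parentheses,
    and the operators + - * / % ** //. Raises ValueError for anything else
    that parses, and SyntaxError if the text is not a valid expression.
    """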
    expr = expr.strip()
    node = ast.parse(expr, mode='eval')

    def _eval(n):
        if isinstance(n, ast.Expression):
            return _eval(n.body)
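        if isinstance(n, ast.Constant):
            # ast.Num was removed in Python 3.12; accept numeric literals only.
            if isinstance(n.value, bool) or not isinstance(n.value, (int, float)):
                raise ValueError("Only numeric literals are allowed")
            return n.value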
        if isinstance(n, ast.BinOp):
            if not isinstance(n.op, ALLOWED_BINOPS):
                raise ValueError("Operator not allowed")
            left = _eval(n.left)
            right = _eval(n.right)
            if isinstance(n.op, ast.Add): return left + right
            if isinstance(n.op, ast.Sub): return left - right
            if isinstance(n.op, ast.Mult): return left * right
            if isinstance(n.op, ast.Div): return left / right
            if isinstance(n.op, ast.Mod): return left % right
            if isinstance(n.op, ast.Pow): return left ** right
            if isinstance(n.op, ast.FloorDiv): return left // right
        if isinstance(n, ast.UnaryOp):
            if not isinstance(n.op, ALLOWED_UNARYOPS):
                raise ValueError("Unary op not allowed")
            val = _eval(n.operand)
            if isinstance(n.op, ast.UAdd): return +val
            if isinstance(n.op, ast.USub): return -val
        raise ValueError(f"Expression node not allowed: {type(n)}")
    return _eval(node)
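
To see why walking the AST is safer than `eval`, it helps to check which node types an input produces before anything is computed. The sketch below uses a hypothetical helper, `is_arithmetic_only`, that accepts an expression only if every node is on an allow-list mirroring the operators above. A function call parses into `Call` and `Name` nodes, so it is rejected without ever being executed.

```python
import ast

# Node types an arithmetic-only expression may contain (illustrative allow-list).
ALLOWED_NODES = (
    ast.Expression, ast.BinOp, ast.UnaryOp, ast.Constant,
    ast.Add, ast.Sub, ast.Mult, ast.Div, ast.Mod, ast.Pow, ast.FloorDiv,
    ast.UAdd, ast.USub,
)


def is_arithmetic_only(expr):
    """Return True if expr parses to numeric constants and allowed operators only."""
    tree = ast.parse(expr.strip(), mode="eval")
    for node in ast.walk(tree):
        if not isinstance(node, ALLOWED_NODES):
            return False
        # String and boolean literals are also ast.Constant, so check the value type.
        if isinstance(node, ast.Constant) and (
            isinstance(node.value, bool) or not isinstance(node.value, (int, float))
        ):
            return False
    return True


print(is_arithmetic_only("2 * (3 + 4)"))                # True
print(is_arithmetic_only("__import__('os').getcwd()"))  # False
print(is_arithmetic_only("'a' * 3"))                    # False
```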


## Retriever: TF-IDF over KB

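- Fits a `TfidfVectorizer` on all passage texts and stores the resulting TF-IDF matrix.
- Scores a query by its dot product with each passage vector. `TfidfVectorizer` L2-normalizes rows by default, so this dot product equals cosine similarity.
- Returns up to `k` passages with a score above zero, highest first.
- Acts as the agent's fallback tool for open-ended queries against the knowledge base.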


In [None]:
class Retriever:
    def __init__(self, passages):
        self.passages = passages
        texts = [p['text'] for p in passages]
        self.vec = TfidfVectorizer().fit(texts)
        self.mat = self.vec.transform(texts)
    def retrieve(self, query, k=3):
        qv = self.vec.transform([query])
        scores = (self.mat @ qv.T).toarray()[:,0]
        idx = np.argsort(scores)[::-1][:k]
        return [{'score': float(scores[i]), **self.passages[i]} for i in idx if scores[i] > 0]
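
The scoring step can be illustrated without scikit-learn. The sketch below is a simplified, pure-Python stand-in: it uses whitespace tokenization and the smoothed IDF formula `ln((1 + n) / (1 + df)) + 1` followed by L2 normalization, which matches the default weighting that scikit-learn documents, but it does not replicate `TfidfVectorizer`'s tokenizer. With unit-length vectors, the dot product is the cosine similarity. The passages are made-up examples.

```python
import math
from collections import Counter


def normalize(vec):
    """Scale a sparse vector to unit length; leave an all-zero vector unchanged."""
    norm = math.sqrt(sum(w * w for w in vec.values()))
    return {t: w / norm for t, w in vec.items()} if norm else vec


def tfidf_vectors(texts):
    """Build L2-normalized TF-IDF vectors (as dicts) plus the IDF table."""
    docs = [Counter(t.lower().split()) for t in texts]
    n = len(docs)
    df = Counter(term for d in docs for term in d)  # document frequency per term
    idf = {term: math.log((1 + n) / (1 + c)) + 1 for term, c in df.items()}
    return [normalize({t: tf * idf[t] for t, tf in d.items()}) for d in docs], idf


def retrieve(query, texts, k=2):
    """Return (score, text) pairs for the top-k passages with a positive score."""
    vectors, idf = tfidf_vectors(texts)
    counts = Counter(query.lower().split())
    # Terms unseen during fitting are dropped from the query vector.
    q = normalize({t: tf * idf[t] for t, tf in counts.items() if t in idf})
    scores = [sum(w * q.get(t, 0.0) for t, w in v.items()) for v in vectors]
    ranked = sorted(zip(scores, texts), reverse=True)[:k]
    return [(s, t) for s, t in ranked if s > 0]


passages = [
    "employees work eight hours per day",
    "travel expenses are reimbursed monthly",
    "overtime hours are paid at a higher rate",
]
results = retrieve("overtime hours", passages)
for score, text in results:
    print(round(score, 3), text)
```

The passage that contains both query terms ranks first, the one that shares only "hours" ranks second, and the unrelated passage is filtered out because its score is zero.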


## PolicyLookup

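- A shortcut for policy or FAQ queries.
- Uses simple string matching in two passes: first, passages whose title starts with "company policy" or contains "working hours" are returned when the query mentions working hours or overtime; second, any passage whose title appears in the query, or whose title contains any query word as a substring, is appended.
- Useful for corporate-style queries like "working hours" or "reimbursements". Because the second pass matches substrings, short query words such as "a" or "is" can match many titles.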


In [None]:
class PolicyLookup:
    def __init__(self, passages):
        self.kb = passages
    def lookup(self, query):
        results = []
        q = query.lower()
        for p in self.kb:
            if p['title'].lower().startswith("company policy") or 'working hours' in p['title'].lower():
                if any(tok in q for tok in ['working hours','overtime','hours']):
                    results.append(p)
        for p in self.kb:
            if p['title'].lower() in q or any(word in p['title'].lower() for word in q.split()):
                if p not in results:
                    results.append(p)
        return results
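
Because the second loop tests each query word with `in`, a short word matches any title that merely contains it as a substring. The sketch below contrasts that behavior with whole-word matching. `substring_match`, `word_match`, and the stop-word list are hypothetical and not part of `PolicyLookup`.

```python
import re

STOP_WORDS = {"a", "an", "the", "is", "of", "what", "are"}  # illustrative only


def substring_match(query, title):
    """Mimics the `word in title` test: any query word found as a substring."""
    return any(word in title.lower() for word in query.lower().split())


def word_match(query, title):
    """Match only on whole words, ignoring common stop words."""
    title_words = set(re.findall(r"\w+", title.lower()))
    query_words = set(re.findall(r"\w+", query.lower())) - STOP_WORDS
    return bool(title_words & query_words)


title = "Travel Expenses"
print(substring_match("What is a refund", title))  # True: "a" occurs inside "travel"
print(word_match("What is a refund", title))       # False
print(word_match("travel rules", title))           # True
```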


## StringTools

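- Helper functions to extract numbers and percentages from text, returned as strings.
- Useful for parsing queries like "15% of 640" or "Compute 125 * 6". The `Agent` below does not call these helpers yet; it passes the raw query to the calculator.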


In [None]:
class StringTools:
    @staticmethod
    def extract_numbers(s):
        return re.findall(r'[-+]?\d*\.?\d+%?', s)
    @staticmethod
    def extract_percentages(s):
        # Non-capturing inner group so findall returns strings, not tuples.
        return re.findall(r'(\d+(?:\.\d+)?)\s*%', s)
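
As an illustration of how helpers like these could feed a calculation, the sketch below handles the "15% of 640" pattern mentioned above. The function name `percent_of` and the exact phrase pattern are assumptions; the notebook's `Agent` does not do this.

```python
import re

# One capturing group per number; the inner decimal group is non-capturing.
PERCENT_OF = re.compile(r"(\d+(?:\.\d+)?)\s*%\s*of\s*(\d+(?:\.\d+)?)", re.IGNORECASE)


def percent_of(query):
    """Return X% of Y for text like '15% of 640', or None if no match."""
    m = PERCENT_OF.search(query)
    if m is None:
        return None
    pct, base = float(m.group(1)), float(m.group(2))
    return pct * base / 100


print(percent_of("What is 15% of 640?"))  # 96.0
print(percent_of("Compute 125 * 6"))      # None
print(re.findall(r"(\d+(?:\.\d+)?)\s*%", "15% of 640 and a 2.5% fee"))  # ['15', '2.5']
```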


## Agent Core

The `Agent` class orchestrates the multi-agent system:

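- **Planner**: picks a tool from the query text. A digit plus one of `% + - * /` selects the calculator, a policy keyword selects the policy lookup, and anything else goes to the retriever.
- **Router**: calls the appropriate tool for each planned step.
- **Executor**: runs each tool with a timeout and retries on failure.
- **Cache**: avoids recomputation for repeated tool inputs.
- **Critic**: minimal check that an answer exists (`final_answer is not None`).
- **Logger**: records the plan, tool calls, critic result, and final output under a run ID. The run ID is the first 8 hex characters of the query's SHA-1 hash, so identical queries share an ID.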

This design is modular: you can later add new tools or improve planning logic.


In [None]:
class Agent:
    def __init__(self, retriever, policy_lookup, cache=None):
        self.retriever = retriever
        self.policy_lookup = policy_lookup
        self.cache = cache or SimpleCache()
        self.logs = []
    def plan(self, query):
        if re.search(r'\d', query) and re.search(r'[%\+\-\*\/]', query):
            return ['calculator']
        if any(word in query.lower() for word in ['policy','working hours','reimbursement','reimburse']):
            return ['policylookup']
        return ['retriever']
    def run_tool_with_retries(self, tool_name, func, *args, retries=2, timeout=4):
        cache_key = hashlib.sha256((tool_name + '|' + json.dumps(args, default=str)).encode()).hexdigest()
        cached = self.cache.get(cache_key)
        if cached is not None:  # a falsy result such as 0 or [] is still a valid hit
            return {'cached': True, 'result': cached, 'meta': {'retries': 0}}
        last_exc = None
        for attempt in range(1, retries+1):
            try:
                start = time.time()
                res = run_with_timeout(func, args=args, timeout=timeout)
                dur = (time.time()-start)*1000
                self.cache.set(cache_key, res)
                return {'cached': False, 'result': res, 'meta': {'retries': attempt-1, 'duration_ms': dur}}
            except Exception as e:
                last_exc = e
                if attempt < retries:
                    time.sleep(0.2 * attempt)  # linear backoff between attempts
        raise last_exc
    def handle(self, query):
        run_id = hashlib.sha1(query.encode()).hexdigest()[:8]
        ts = datetime.now(UTC).isoformat().replace("+00:00", "Z")
        plan_steps = self.plan(query)
        self.logs.append({'id': run_id, 'timestamp': ts, 'stage': 'plan', 'details': plan_steps})
        final_answer = None
        tool_calls = []
        for step in plan_steps:
            if step == 'calculator':
                def calc_fn(q): return safe_eval_expr(q)
                try:
                    res = self.run_tool_with_retries('calculator', calc_fn, query)
                    tool_calls.append({'tool': 'calculator', 'input': query, 'output': str(res['result'])})
                    final_answer = str(res['result'])
                except Exception as e:
                    tool_calls.append({'tool': 'calculator', 'input': query, 'error': str(e)})
            elif step == 'policylookup':
                def pl_fn(q): return self.policy_lookup.lookup(q)
                res = self.run_tool_with_retries('policylookup', pl_fn, query)
                tool_calls.append({'tool': 'policylookup', 'input': query, 'output': [p['title'] for p in res['result']]})
                if res['result']:
                    final_answer = res['result'][0]['text']
            elif step == 'retriever':
                def r_fn(q): return self.retriever.retrieve(q, k=3)
                res = self.run_tool_with_retries('retriever', r_fn, query)
                tool_calls.append({'tool': 'retriever', 'input': query, 'output_count': len(res['result'])})
                if res['result']:
                    final_answer = res['result'][0]['text']

        critic_ok = final_answer is not None
        self.logs.append({'id': run_id, 'stage': 'tool_calls', 'details': tool_calls})
        self.logs.append({'id': run_id, 'stage': 'critic', 'details': {'ok': critic_ok}})
        output_record = {'id': run_id, 'timestamp': ts, 'query': query, 'final_output': final_answer}
        self.logs.append({'id': run_id, 'stage': 'final', 'details': output_record})
        return output_record

    def dump_logs(self, path='logs.json'):
        with open(path,'w') as f:
            json.dump(self.logs, f, indent=2)
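
The planner's routing rules can be exercised on their own. The sketch below copies the three rules from `Agent.plan` into a standalone function and runs made-up queries through it. The last query shows a limitation of the rule as written: a hyphen next to a digit is enough to route a query to the calculator.

```python
import re


def plan(query):
    """Standalone copy of the routing rules: calculator, policy lookup, else retriever."""
    if re.search(r"\d", query) and re.search(r"[%\+\-\*\/]", query):
        return ["calculator"]
    policy_words = ["policy", "working hours", "reimbursement", "reimburse"]
    if any(word in query.lower() for word in policy_words):
        return ["policylookup"]
    return ["retriever"]


queries = [
    "125 * 6",
    "What are the working hours?",
    "What is the core value proposition?",
    "Is the 24-hour support policy still active?",  # digit + '-' => calculator
]
for q in queries:
    print(q, "->", plan(q))
```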


## Knowledge Base Loader

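- Parses `knowledgeBase.txt` into a list of documents.
- Each document contains an `id`, `title`, and `text`.
- This is a **naive parser** that depends on the KB format: each document starts with `[DOC-X]`, the first line after the marker may carry a `Title:` label, and the body follows a `Text:` marker. A `[DOC-` without a closing `]` raises `ValueError`.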


In [None]:
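def load_kb_from_file(path='knowledgeBase.txt'):
    """Parse a '[DOC-X]'-delimited knowledge base file into a list of dicts."""
    docs = []
    # Assumes the file is UTF-8 encoded; adjust if the KB uses another encoding.
    with open(path, 'r', encoding='utf-8') as f:
        text = f.read()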

    parts = text.split('[DOC-')
    for p in parts[1:]:
        header, body = p.split(']',1)
        id_num = header.strip()
        title_line = body.strip().splitlines()[0]
        title = title_line.replace('Title:','').strip() if 'Title:' in title_line else f'DOC-{id_num}'
        txt = body.split('Text:')[-1].strip()
        docs.append({'id': f'DOC-{id_num}', 'title': title, 'text': txt})
    return docs
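
The loader implies a file layout in which each document starts with `[DOC-X]`, followed by a `Title:` line and a `Text:` marker. The sample below is a made-up knowledge base in that assumed layout, parsed with the same splitting logic applied to a string instead of a file. `parse_kb_text` and `SAMPLE_KB` are hypothetical names.

```python
# Made-up sample in the assumed '[DOC-X] / Title: / Text:' layout.
SAMPLE_KB = """[DOC-1]
Title: Working Hours
Text: Employees work 9am to 5pm, Monday to Friday.

[DOC-2]
Title: Reimbursements
Text: Submit receipts within 30 days.
"""


def parse_kb_text(text):
    """Split '[DOC-X]'-delimited text into dicts with id, title, and text."""
    docs = []
    for part in text.split("[DOC-")[1:]:
        header, body = part.split("]", 1)
        doc_id = f"DOC-{header.strip()}"
        first_line = body.strip().splitlines()[0]
        # Fall back to the document ID when the first line has no 'Title:' label.
        title = first_line.replace("Title:", "").strip() if "Title:" in first_line else doc_id
        docs.append({"id": doc_id, "title": title, "text": body.split("Text:")[-1].strip()})
    return docs


docs = parse_kb_text(SAMPLE_KB)
for doc in docs:
    print(doc)
```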


## Example Usage

- Load KB
- Initialize Retriever, PolicyLookup, and Agent
- Run a sample query
- Dump logs to file


In [None]:
if __name__ == '__main__':
    kb = load_kb_from_file('knowledgeBase.txt')
    retr = Retriever(kb)
    pol = PolicyLookup(kb)
    agent = Agent(retr, pol)
    # Example queries:
    q = "What is LLUMO AI's core value proposition?"
    print(agent.handle(q))
    agent.dump_logs()


{'id': '5e732532', 'timestamp': '2025-09-21T06:36:37.462473Z', 'query': "What is LLUMO AI's core value proposition?", 'final_output': 'LLUMO AI is an evaluation-first reliability layer for LLM apps. It focuses on automated scoring, actionable insights, and cost savings via prompt compression—helping teams ship trustworthy AI.'}
