# MCP demo – Orchestratore multi-sorgente (arXiv + OpenAlex) + fusione + LLM (opzionale)

Questo notebook è l’orchestratore “vero”:
- decide quali sorgenti chiamare (routing)
- chiama i server MCP (arXiv, OpenAlex)
- fonde i risultati
- opzionalmente usa un LLM locale (Ollama) per produrre una risposta con citazioni inline verificabili

Nota: in Jupyter si usa `await` (no `asyncio.run()`).


## 2. Cell 2 — Import e utilità (Python)

In [50]:
import json
import re
import time
from typing import Any, Dict, List, Tuple, Optional

import httpx
from fastmcp import Client
from fastmcp.client.transports import StdioTransport


## 3. Cell 3 — Unpack robusto delle risposte MCP (Python)

Questo evita problemi tipo TextContent e formati diversi.

In [51]:
def unpack(resp):
    """
    Estrae payload utile da CallToolResult / TextContent.
    Caso chiave: content = [TextContent(json_obj_1), TextContent(json_obj_2), ...]
    -> ritorna [dict, dict, ...]
    """
    # 1) prendi il contenuto "vero"
    content = None
    for key in ("data", "result", "content", "value", "structuredContent"):
        if hasattr(resp, key):
            v = getattr(resp, key)
            if v is not None:
                content = v
                break
    if content is None:
        content = resp

    # 2) lista di TextContent: parse elemento-per-elemento
    if isinstance(content, list) and content and hasattr(content[0], "text"):
        items = []
        for c in content:
            if not hasattr(c, "text"):
                continue
            t = (c.text or "").strip()
            if not t:
                continue
            try:
                items.append(json.loads(t))   # <-- qui risolviamo il tuo caso
            except Exception:
                items.append(t)
        return items

    # 3) singolo TextContent
    if hasattr(content, "text"):
        t = (content.text or "").strip()
        try:
            return json.loads(t)
        except Exception:
            return t

    # 4) già list/dict
    if isinstance(content, (list, dict)):
        return content

    return content


## 4. Cell 4 — Routing: domain detection (Python)

In [52]:
def detect_domain(question: str) -> str:
    """
    Ritorna:
      - "ai" per domande su RAG/LLM/agentic
      - "general" per tutto il resto (es: product management)
    """
    q = question.lower()
    ai_markers = (
        "rag", "retrieval", "llm", "agent",
        "transformer", "embedding", "rerank",
        "vector", "prompt", "hallucination",
        "evaluation", "benchmark"
    )
    return "ai" if any(k in q for k in ai_markers) else "general"


## 5. Cell 5 — Policy sorgenti (Python)


In [53]:
def pick_sources(question: str, domain: str) -> Tuple[bool, bool]:
    """
    Decide quali sorgenti chiamare:
      - ai     -> arXiv + OpenAlex (mode ai)
      - general-> OpenAlex (mode general), arXiv spesso inutile
    """
    if domain == "general":
        return False, True
    return True, True


## 6. Cell 6 — Client MCP helper (Python) 

In [54]:
def make_arxiv_client() -> Client:
    return Client(StdioTransport(command="python", args=["servers/arxiv_server.py"]))

def make_openalex_client() -> Client:
    return Client(StdioTransport(command="python", args=["servers/openalex_server.py"]))

## 7. Cell 7 — Fetch robusto con retry + fallback (Python)

Qui gestiamo il 429: se arXiv fallisce, continuiamo con OpenAlex.

In [55]:
import asyncio

async def asyncio_sleep(s: float):
    await asyncio.sleep(s)

async def call_arxiv_safe(arxiv: Client, question: str, k: int) -> List[Dict]:
    backoffs = [1.5, 3.0, 6.0]
    for attempt, wait_s in enumerate([0.0] + backoffs):
        if wait_s:
            await asyncio_sleep(wait_s)
        try:
            ax = await arxiv.call_tool("arxiv_search", {"topic": question, "max_results": k})
            data = unpack(ax)
            return data if isinstance(data, list) else []
        except Exception as e:
            msg = str(e)
            if ("429" in msg) or ("rate" in msg.lower()):
                if attempt < len(backoffs):
                    continue
            return []
    return []

async def call_openalex(openalex: Client, question: str, k: int, mode: str) -> List[Dict]:
    ox = await openalex.call_tool(
        "openalex_search_works",
        {"query": question, "per_page": k, "mode": mode}
    )
    data = unpack(ox)
    return data if isinstance(data, list) else []

async def fetch(
    question: str,
    k: int = 3,
    use_arxiv: Optional[bool] = None,
    use_openalex: Optional[bool] = None
) -> Dict[str, Any]:
    domain = detect_domain(question)

    # default (se l'utente non decide)
    default_arxiv, default_openalex = pick_sources(question, domain)
    if use_arxiv is None:
        use_arxiv = default_arxiv
    if use_openalex is None:
        use_openalex = default_openalex

    arxiv = make_arxiv_client()
    openalex = make_openalex_client()

    ax_data: List[Dict] = []
    ox_data: List[Dict] = []

    ox_mode = "ai" if domain == "ai" else "general"

    # Se non attivi nulla
    if not use_arxiv and not use_openalex:
        return {"domain": domain, "arxiv": [], "openalex": []}

    # Solo arXiv
    if use_arxiv and not use_openalex:
        async with arxiv:
            ax_data = await call_arxiv_safe(arxiv, question, k)
        return {"domain": domain, "arxiv": ax_data, "openalex": []}

    # Solo OpenAlex
    if use_openalex and not use_arxiv:
        async with openalex:
            ox_data = await call_openalex(openalex, question, k, mode=ox_mode)
        return {"domain": domain, "arxiv": [], "openalex": ox_data}

    # Entrambi
    async with arxiv, openalex:
        ax_data = await call_arxiv_safe(arxiv, question, k)
        ox_data = await call_openalex(openalex, question, k, mode=ox_mode)

    return {"domain": domain, "arxiv": ax_data, "openalex": ox_data}


## 8. Cell 8 — Fusione “umana” (senza LLM) con citazioni inline (Python)

In [56]:
def format_sources_block(ax: List[Dict], ox: List[Dict]) -> str:
    lines = []
    if ax:
        lines.append("## Fonti arXiv")
        for i, p in enumerate(ax, 1):
            lines.append(f"- [arXiv-{i}] {p.get('title')}")
            lines.append(f"  PDF: {p.get('pdf_url')}")
        lines.append("")
    if ox:
        lines.append("## Fonti OpenAlex")
        for i, w in enumerate(ox, 1):
            doi = w.get("doi")
            doi_link = f"https://doi.org/{doi}" if doi else None
            lines.append(f"- [OpenAlex-{i}] {w.get('title')}")
            lines.append(f"  Year: {w.get('publication_year')} | Cited by: {w.get('cited_by_count')}")
            if doi_link:
                lines.append(f"  DOI: {doi_link}")
            else:
                lines.append(f"  OpenAlex: {w.get('openalex_url')}")
        lines.append("")
    return "\n".join(lines)

def fuse_answer_no_llm(question: str, domain: str, ax: List[Dict], ox: List[Dict]) -> str:
    """
    Risposta 'chatbot style' senza LLM, ma con citazioni inline.
    """
    lines = []
    lines.append(f"Domanda: {question}")
    lines.append(f"DOMAIN: {domain}\n")

    if domain == "ai":
        if ax:
            lines.append(f"Ho trovato preprint rilevanti su arXiv (es. [arXiv-1]).")
        if ox:
            lines.append(f"Ho trovato anche risultati in OpenAlex con segnali (citazioni/venue) (es. [OpenAlex-1]).")
        if not ax and not ox:
            lines.append("Non emergono evidenze dalle fonti fornite.")
    else:
        if ox:
            lines.append(f"Per una domanda generalista, uso OpenAlex in modalità 'general' (es. [OpenAlex-1]).")
        else:
            lines.append("Non emergono evidenze dalle fonti fornite.")

    lines.append("\n" + format_sources_block(ax, ox))
    return "\n".join(lines)


## 9. Cell 9 — LLM (Ollama) opzionale con citazioni inline + check (Python)

In [57]:
OLLAMA_URL = "http://localhost:11434/api/generate"

def allowed_citations(ax: List[Dict], ox: List[Dict]) -> List[str]:
    allowed = []
    allowed += [f"[arXiv-{i}]" for i in range(1, len(ax) + 1)]
    allowed += [f"[OpenAlex-{i}]" for i in range(1, len(ox) + 1)]
    return allowed

def build_prompt(question: str, ax: List[Dict], ox: List[Dict]) -> str:
    allowed = allowed_citations(ax, ox)
    allowed_str = ", ".join(allowed) if allowed else "(nessuna)"

    return f"""
Sei un assistente di ricerca. Devi rispondere SOLO usando le fonti fornite qui sotto.

DOMANDA:
{question}

FONTI arXiv (ordine -> [arXiv-1], [arXiv-2], ...):
{json.dumps(ax, ensure_ascii=False, indent=2)}

FONTI OpenAlex (ordine -> [OpenAlex-1], [OpenAlex-2], ...):
{json.dumps(ox, ensure_ascii=False, indent=2)}

REGOLE OBBLIGATORIE (vincolanti):
1) Non usare conoscenza generale. Usa SOLO quello che è nelle fonti.
2) Ogni affermazione fattuale deve avere una citazione inline.
3) PUOI USARE SOLO QUESTE CITAZIONI: {allowed_str}
   - Se usi una citazione diversa, la risposta è invalida.
4) Se le fonti non supportano una cosa, scrivi: "Non emergono evidenze dalle fonti fornite."
5) Stile: italiano chiaro, niente parole inventate. Se non sai, dillo.

OUTPUT:
- Un paragrafo di sintesi con citazioni inline.
- Poi "Fonti:" con elenco puntato, link PDF (arXiv) e DOI/OpenAlex (OpenAlex).
""".strip()

async def ollama_generate(prompt: str, model: str = "llama3.2") -> str:
    async with httpx.AsyncClient(timeout=120.0) as client:
        r = await client.post(
            OLLAMA_URL,
            json={"model": model, "prompt": prompt, "stream": False}
        )
        r.raise_for_status()
        data = r.json()
        return (data.get("response") or "").strip()

def has_inline_citations(text: str) -> bool:
    return bool(re.search(r"\[(arXiv|OpenAlex)-\d+\]", text))

def validate_citations(text: str, ax: List[Dict], ox: List[Dict]) -> List[str]:
    allowed = set(allowed_citations(ax, ox))
    found = re.findall(r"\[(arXiv|OpenAlex)-(\d+)\]", text)
    bad = []
    for src, idx in found:
        tag = f"[{src}-{idx}]"
        if tag not in allowed:
            bad.append(tag)
    return sorted(set(bad))

async def answer_with_llm(question: str, ax: List[Dict], ox: List[Dict], model: str = "llama3.2") -> str:
    prompt = build_prompt(question, ax, ox)

    try:
        out = await ollama_generate(prompt, model=model)
    except Exception as e:
        return f"[ERRORE LLM] Non riesco a chiamare Ollama. Avvia `ollama serve`.\nDettagli: {e}"

    # 1) check citazioni presenti
    if not has_inline_citations(out):
        out += "\n\n[WARNING] La risposta non contiene citazioni inline: potrebbe non essere ancorata alle fonti."

    # 2) check citazioni valide (non inventate)
    bad = validate_citations(out, ax, ox)
    if bad:
        out += f"\n\n[WARNING] Citazioni NON valide (non presenti tra le fonti passate): {bad}"

    return out


## 10- Cell 10 — Funzione “answer” orchestratore (Python)

Questa è la funzione “unica” che userai sempre.

In [58]:
async def answer(
    question: str,
    k: int = 3,
    use_llm: bool = False,
    model: str = "llama3.2",
    use_arxiv: Optional[bool] = None,
    use_openalex: Optional[bool] = None
) -> str:
    bundle = await fetch(
        question,
        k=k,
        use_arxiv=use_arxiv,
        use_openalex=use_openalex
    )

    domain = bundle["domain"]
    ax = bundle["arxiv"]
    ox = bundle["openalex"]

    if not use_llm:
        return fuse_answer_no_llm(question, domain, ax, ox)

    return await answer_with_llm(question, ax, ox, model=model)



## 11. Cell 11 — Esempi di test (Python)

⚠️ Questa cella va eseguita con await (Jupyter ok).

In [59]:
out1 = await answer("agentic RAG evaluation", k=3, use_llm=False)
print(out1)

print("\n" + "="*80 + "\n")

out2 = await answer("product management", k=3, use_llm=False)
print(out2)


Domanda: agentic RAG evaluation
DOMAIN: ai

Ho trovato preprint rilevanti su arXiv (es. [arXiv-1]).
Ho trovato anche risultati in OpenAlex con segnali (citazioni/venue) (es. [OpenAlex-1]).

## Fonti arXiv
- [arXiv-1] Retrieval Augmented Generation (RAG) for Fintech: Agentic Design and Evaluation
  PDF: https://arxiv.org/pdf/2510.25518v1
- [arXiv-2] RAG-Gym: Systematic Optimization of Language Agents for Retrieval-Augmented Generation
  PDF: https://arxiv.org/pdf/2502.13957v2
- [arXiv-3] MultiHop-RAG: Benchmarking Retrieval-Augmented Generation for Multi-Hop Queries
  PDF: https://arxiv.org/pdf/2401.15391v1

## Fonti OpenAlex
- [OpenAlex-1] Development and evaluation of an agentic LLM based RAG framework for evidence-based patient education
  Year: 2025 | Cited by: 1
  DOI: https://doi.org/10.1136/bmjhci-2025-101570
- [OpenAlex-2] AI Agents vs. Agentic AI: A Conceptual Taxonomy, Applications and Challenges
  Year: 2025 | Cited by: 35
  DOI: https://doi.org/10.70777/si.v2i3.15161
- [Open

Process group termination failed for PID 47081: [Errno 3] No such process, falling back to simple terminate


## 12. Cell 12 — Test LLM (Python, opzionale)

Richiede Ollama attivo:

In [60]:
# Mini "casella" testuale: scrivi domanda e premi Invio
q = input("Domanda: ").strip()

src = input("Sorgenti (a=arxiv, o=openalex, b=both) [b]: ").strip().lower() or "b"

use_arxiv = (src in ["a", "b"])
use_openalex = (src in ["o", "b"])

use_llm = True
k = 3
model = "llama3.2"

out = await answer(q, k=k, use_llm=use_llm, model=model, use_arxiv=use_arxiv, use_openalex=use_openalex)
print(out)



Ecco il mio risposta:

Rag extentions sono sistemi che migliorano le performance dei modeli di generazione linguaggio utilizzando la ricerca e l'apprendimento automatico. Secondo un recente articolo pubblicato su arXiv ([arXiv-1]), i sistemi RAG-Gym e RAG-Star presentano risultati promettenti nella risoluzione di compiti di ragionamento e generazione linguaggio.

In particolare, l'articolo [arXiv-1] introduce il sistema RAG-Gym, che offre una piattaforma completa per la ricerca e l'ottimizzazione dei sistemi RAG. Il sistema propone tre dimensioni di ottimizzazione: prompt engineering, attuator tuning e critic training.

Un altro articolo pubblicato su arXiv ([arXiv-2]) si concentra sulla risoluzione di multi-hop queries utilizzando sistemi RAG. L'autore sostiene che gli sistemas esistenti sono insufficienti per risolvere queste complessità.

Inoltre, l'articolo [arXiv-3] presenta un sistema chiamato RAG-Star, che integra la ricerca e l'apprendimento automatico per migliorare le perform

## STEP DI PROVA

In [61]:

bundle = await fetch("agentic RAG evaluation", k=3)
bundle.keys(), bundle["domain"], len(bundle["arxiv"]), len(bundle["openalex"])


(dict_keys(['domain', 'arxiv', 'openalex']), 'ai', 3, 3)