#### Pip and installs

In [None]:
!git clone https://github.com/gpapageorgiouedu/Hybrid-Multi-Agent-GraphRAG-for-E-Government.git

In [None]:
!pip install -q \
neo4j-haystack==2.2.1 \
openai==1.72.0 \
sentence-transformers==3.4.1 \
yfiles_jupyter_graphs==1.10.2 \
trafilatura==2.0.0 \
fastapi==0.115.12 \
uvicorn==0.34.1

In [None]:
# core libs imports for data handling, file management, and type annotations
import json
import openai
import os
import re
import ast
import time
from collections import defaultdict
from pathlib import Path
from typing import List, Dict, Any
import pandas as pd
import numpy as np

# google colab utils for output and secure data storage
from google.colab import output, userdata
from google.colab.output import eval_js
from google.colab import files

# neo4j database integration
from neo4j import GraphDatabase

# haystack/ neo4j integration components
from neo4j_haystack import Neo4jDocumentStore, Neo4jEmbeddingRetriever

# graph visualization in notebooks (explanatory)
from yfiles_jupyter_graphs import GraphWidget

# haystack components for pipeline construction and data processing
from haystack import Pipeline, component, Document
from haystack.core.component import component, Component
from haystack.components.agents import Agent
from haystack.components.builders import PromptBuilder
from haystack.components.converters import HTMLToDocument
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.fetchers import LinkContentFetcher
from haystack.components.generators import OpenAIGenerator
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.preprocessors import DocumentCleaner
from haystack.components.rankers import TransformersSimilarityRanker
from haystack.components.websearch import SerperDevWebSearch
from haystack.components.evaluators import FaithfulnessEvaluator, ContextRelevanceEvaluator
from haystack.core import SuperComponent
from haystack.dataclasses import ChatMessage, ToolCall, ToolCallResult, TextContent
from haystack.tools.component_tool import ComponentTool
from haystack.utils import Secret

# fastapi framework for building backend APIs
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

# server and threading utilities
from threading import Thread
import uvicorn

In [None]:
# set environment variables from secure colab userdata and read environment variables into local constants
os.environ["NEO4J_URI"] = userdata.get("NEO4J_URI")
os.environ["NEO4J_USERNAME"] = userdata.get("NEO4J_USERNAME")
os.environ["NEO4J_PASSWORD"] = userdata.get("NEO4J_PASSWORD")
os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI")
os.environ["SERPERDEV_API_KEY"] = userdata.get("SERPER")

NEO4J_URI = os.environ["NEO4J_URI"]
NEO4J_USER = os.environ["NEO4J_USERNAME"]
NEO4J_PASS = os.environ["NEO4J_PASSWORD"]
SERPERDEV_API_KEY = os.environ["SERPERDEV_API_KEY"]

# init and test the neo4j connection.
# The driver is deliberately left open on success: later cells (label counts, deletion,
# triple insertion) reuse this module-level `driver`, and closing it here would make
# those cells run against a closed driver.
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASS))
try:
    with driver.session() as session:
        info = session.run("RETURN 1 AS result").single()
        print("Neo4j connected, test query result:", info["result"])
except Exception:
    # release the connection pool only when the connectivity check fails, then surface the error
    driver.close()
    raise

#### Delete Docs (Optional)

In [None]:
def delete_all(tx):
    """
    Wipe the graph: remove every node together with all of its relationships.

    Args:
        tx: Open Neo4j transaction in which the statement is executed.
    """
    wipe_everything = "MATCH (n) DETACH DELETE n"
    tx.run(wipe_everything)


def count_remaining(tx):
    """
    Report how many nodes are still present in the graph.

    Args:
        tx: Open Neo4j transaction in which the count query is executed.

    Returns:
        int: Total number of nodes currently in the database.
    """
    record = tx.run("MATCH (n) RETURN count(n) AS node_count").single()
    node_total = record["node_count"]
    return node_total


def list_node_labels(tx):
    """
    Collect the distinct node labels known to the database.

    Args:
        tx: Open Neo4j transaction in which `db.labels()` is called.

    Returns:
        List[str]: Label names, in the order the procedure yields them.
    """
    label_names = []
    for row in tx.run("CALL db.labels()"):
        label_names.append(row["label"])
    return label_names


def count_by_label(tx, label):
    """
    Count the number of nodes for a specific label.

    Labels cannot be passed as Cypher query parameters, so the label is interpolated as a
    backtick-quoted identifier. Any backtick inside the label is escaped by doubling it, so
    labels containing backticks neither break the query nor inject extra Cypher.

    Args:
        tx: Neo4j transaction object.
        label (str): The label to count.

    Returns:
        int: Number of nodes with the given label.
    """
    escaped_label = label.replace("`", "``")
    result = tx.run(f"MATCH (n:`{escaped_label}`) RETURN count(n) AS count")
    return result.single()["count"]


# display node counts grouped by label.
# `Session.execute_read` replaces `read_transaction`, which is deprecated in the neo4j 5.x
# driver; both run the function in a managed read transaction with retries.
with driver.session() as session:
    labels = session.execute_read(list_node_labels)
    for label in labels:
        count = session.execute_read(count_by_label, label)
        print(f"Label: {label}, Count: {count}")


def force_delete_by_labels(tx, labels):
    """
    Forcefully delete nodes by specified labels.

    Each label is interpolated as a backtick-quoted identifier, because labels cannot be
    query parameters. Embedded backticks are escaped by doubling them so unusual labels
    cannot break or extend the statement.

    Args:
        tx: Neo4j transaction object.
        labels (List[str]): A list of node labels to delete.
    """
    for label in labels:
        escaped_label = label.replace("`", "``")
        tx.run(f"MATCH (n:`{escaped_label}`) DETACH DELETE n")


# delete all nodes by label, sweep anything left over, and verify complete deletion.
# `execute_read` / `execute_write` replace the deprecated `read_transaction` /
# `write_transaction` session methods of the neo4j 5.x driver.
with driver.session() as session:
    labels = session.execute_read(list_node_labels)
    session.execute_write(force_delete_by_labels, labels)

with driver.session() as session:
    # catches nodes without any label, which the per-label pass above cannot reach
    session.execute_write(delete_all)
    remaining = session.execute_read(count_remaining)

    if remaining == 0:
        print("All nodes and relationships successfully deleted.")
    else:
        print(f"Deletion incomplete: {remaining} node(s) still exist.")

#### Index Docs (Optional)

In [None]:
def _standardize_metadata(data):
    """Rename source-specific metadata keys in a raw JSON record to the canonical names used downstream."""
    # alias -> canonical key; add any mapping that fits your data use case
    key_aliases = {"url": "source_url"}
    for alias, canonical in key_aliases.items():
        if alias in data:
            value = data.pop(alias)
            # an explicit canonical key already present in the file wins over its alias
            data.setdefault(canonical, value)
    return data


def load_json_documents(json_folder_path):
    """
    Load JSON files from a folder and convert them into Haystack Document objects.

    Each file is expected to contain a single JSON object. Its "content" value becomes the
    document text. Every other key, after aliases such as "url" are renamed to
    "source_url", becomes document metadata. Files whose top-level value is not an object
    are skipped with a message.

    Args:
        json_folder_path (str or Path): Path to the folder containing .json files.

    Returns:
        List[Document]: A list of Haystack Document objects with content and metadata,
        ordered by file name.
    """
    documents = []
    # sorted() makes the load (and therefore indexing) order reproducible across runs
    for json_file in sorted(Path(json_folder_path).glob("*.json")):
        with open(json_file, "r", encoding="utf-8") as f:
            data = json.load(f)

        if not isinstance(data, dict):
            print(f"Skipping {json_file}: expected a JSON object at the top level.")
            continue

        data = _standardize_metadata(data)
        content = data.get("content", "")
        metadata = {k: v for k, v in data.items() if k != "content"}

        documents.append(Document(content=content, meta=metadata))
    return documents


# read every JSON record from the local "json_docs" folder
raw_docs = load_json_documents("json_docs")

# normalize the text before it is embedded: drop blank lines, collapse repeated
# whitespace and strip literal "..." substrings
cleaner = DocumentCleaner(
    remove_empty_lines=True,
    remove_extra_whitespaces=True,
    remove_substrings=["..."],
)
cleaned_docs = cleaner.run(documents=raw_docs)["documents"]

# neo4j-backed store: Document nodes plus a 1536-dimensional vector index
document_store = Neo4jDocumentStore(
    url=NEO4J_URI,
    username=NEO4J_USER,
    password=NEO4J_PASS,
    database="neo4j",
    index="document-embeddings",
    embedding_field="embedding",
    embedding_dim=1536,
    node_label="Document",
)

# attach an embedding to every cleaned document (text-embedding-ada-002)
embedder = OpenAIDocumentEmbedder(model="text-embedding-ada-002")
documents_with_emb = embedder.run(documents=cleaned_docs)["documents"]

# persist the embedded documents and report the store's size
document_store.write_documents(documents_with_emb)
print(f"Indexed {document_store.count_documents()} documents in Neo4j.")

In [None]:
def _strip_code_fence(text):
    """Remove a surrounding Markdown code fence (``` or ```json) from model output."""
    text = text.strip()
    text = re.sub(r"^```(?:json)?[ \t]*\n?", "", text, flags=re.IGNORECASE)
    text = re.sub(r"\n?```$", "", text)
    return text.strip()


def _parse_triples(content):
    """Parse model output into a list of triple dicts; return [] when it is not usable JSON."""
    try:
        parsed = json.loads(content)
    except json.JSONDecodeError as json_err:
        print("JSON decoding error:", json_err)
        print("Raw output was:\n", content[:500])
        return []

    # a lone object is treated as a single triple
    if isinstance(parsed, dict):
        parsed = [parsed]
    if not isinstance(parsed, list):
        print("Unexpected JSON structure, expected a list of objects.")
        return []
    # keep only objects so callers can safely call .get() on every item
    return [item for item in parsed if isinstance(item, dict)]


def extract_structured_triples(text_chunk):
    """
    Extract structured knowledge triples from a given text using an OpenAI language model.

    The output is a list of dictionaries, where each dictionary represents a triple with:
    - 'head': subject of the triple
    - 'head_type': type/classification of the head
    - 'relation': relationship between head and tail (in UPPER_SNAKE_CASE)
    - 'tail': object of the triple
    - 'tail_type': type/classification of the tail

    Args:
        text_chunk (str): A passage of text from which to extract triples.

    Returns:
        List[Dict[str, str]]: A list of structured triples (non-object items are dropped),
        or an empty list when the model reply is empty or not valid JSON.
    """
    # adjacent literals are concatenated, so every sentence needs its own trailing separator
    system_prompt = (
        "You are an information extraction assistant. "
        "You are an expert in European Union's news, policies, laws, and actions. "
        "Extract all factual knowledge triples from the text in a structured format. "
        "Return the results as a JSON list where each item is an object with the keys: "
        "'head', 'head_type', 'relation', 'tail', 'tail_type'.\n\n"
        "Guidelines:\n"
        "Resolve vague pronouns (like 'I', 'we', 'they', 'he/she') to actual entities based on context.\n"
        "Use the standard full format, even when abbreviations are used in the text. For example, when 'EU' is used, write it as 'European Union'.\n"
        "Use the standard full format for names, even if the full name is not used entirely in a specific sentence.\n"
        "Include the full context in the extracted triples to ensure they are informative and comprehensive.\n"
        "Maintain consistency: refer to entities by their full and most complete identifiers.\n"
        "Use concise relation phrases written in UPPER_SNAKE_CASE.\n"
        "Avoid vague, incomplete, or uninformative triples. Use full context to provide informative and comprehensive triples.\n"
        "Return only the JSON list of objects. Do not include any explanations, additional knowledge, or markdown.\n"
        "If an entity type is unclear, make a reasonable guess or use a general type like 'Entity'."
    )

    user_prompt = f"Text: ```{text_chunk}```"

    response = openai.chat.completions.create(
        model="gpt-4.1-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        temperature=0
    )

    # message content can be None (e.g. refusals); treat that as an empty reply
    raw_content = response.choices[0].message.content or ""
    return _parse_triples(_strip_code_fence(raw_content))

In [None]:
def sanitize_label(label):
    """
    Sanitize a string to be a valid Neo4j label.

    Non-alphanumeric characters are converted to underscores and the first letter is
    uppercased. A label that would start with a digit gets the prefix "Entity_", because
    unquoted Cypher identifiers cannot begin with a number. "Entity" is returned when the
    input is None, empty, or has no alphanumeric characters left after sanitization.
    Non-string inputs are converted with str() first.

    Args:
        label (str): The label string to sanitize.

    Returns:
        str: A sanitized, Neo4j-safe label.
    """
    if label is None:
        return "Entity"
    label = re.sub(r"[^a-zA-Z0-9]", "_", str(label).strip())
    # empty or underscore-only labels carry no type information
    if not label.strip("_"):
        return "Entity"
    if label[0].isdigit():
        label = "Entity_" + label
    return label[0].upper() + label[1:]


# ensure a full-text index exists for entity IDs (best effort).
# NOTE(review): Neo4j's full-text index syntax requires at least one label, e.g.
# `FOR (n:Person|Organization) ON EACH [n.id]`; the label-less `FOR (n)` form below is
# likely rejected by the server — confirm. KnowledgeGraphRetriever matches with
# `CONTAINS` and does not query this index.
with driver.session() as session:
    try:
        # consume() forces any server-side error to surface inside this try block
        session.run(
            "CREATE FULLTEXT INDEX entity_index IF NOT EXISTS FOR (n) ON EACH [n.id]"
        ).consume()
    except Exception as exc:
        # keep the best-effort behaviour, but report the failure instead of hiding it
        print(f"Could not create full-text index 'entity_index': {exc}")


# extract triples and insert them into neo4j with typed nodes and relationships
with driver.session() as session:
    for doc in documents_with_emb:
        text = doc.content
        triples = extract_structured_triples(text)
        print(len(triples))
        doc_id = doc.id

        for triple in triples:
            # the model may emit strings/nulls instead of objects; skip anything malformed
            if not isinstance(triple, dict):
                continue

            subj = triple.get("head")
            pred = triple.get("relation")
            obj = triple.get("tail")
            if not subj or not pred or not obj:
                continue

            # ids are stored as strings because the retriever applies toLower() to n.id
            subj = str(subj)
            obj = str(obj)
            # `or` also covers explicit null / empty types, which sanitize_label may not accept
            subj_type = sanitize_label(str(triple.get("head_type") or "Entity"))
            obj_type = sanitize_label(str(triple.get("tail_type") or "Entity"))

            # relationship types cannot be parameterised: normalise to UPPER_SNAKE_CASE over [A-Z0-9_]
            rel_type = "_".join(str(pred).split()).upper()
            rel_type = re.sub(r"[^A-Z0-9_]", "_", rel_type)
            if not rel_type.strip("_"):
                # a blank or symbol-only relation would yield an empty/meaningless type
                continue

            # labels and type are backtick-quoted so digit-leading identifiers remain valid;
            # they contain only [A-Za-z0-9_], so no backtick escaping is required
            cypher = f"""
            MERGE (s:`{subj_type}` {{id: $subj}})
            MERGE (o:`{obj_type}` {{id: $obj}})
            MERGE (s)-[r:`{rel_type}`]->(o)
            MERGE (d:Document {{id: $doc_id}})
            MERGE (d)-[:MENTIONS]->(s)
            MERGE (d)-[:MENTIONS]->(o)
            """

            session.run(cypher, {
                "subj": subj,
                "obj": obj,
                "doc_id": doc_id
            })

#### Pipelines

In [None]:
@component
class KnowledgeGraphRetriever():
    """
    A custom Haystack component for retrieving context-rich documents from a Neo4j knowledge graph
    based on search terms extracted by an OpenAI model.

    For each extracted term, nodes whose ``id`` contains the term (case-insensitive) are matched
    together with their direct neighbours. The resulting triples are grouped by the ``Document``
    node that mentions them. Each group becomes one Haystack ``Document`` whose content lists the
    triples followed, where available, by the source document's title, date, URL and text.
    """

    def __init__(self, neo4j_uri: str, neo4j_user: str, neo4j_pass: str, openai_model="gpt-4.1-mini"):
        """
        Args:
            neo4j_uri: URI of the Neo4j server.
            neo4j_user: Neo4j username.
            neo4j_pass: Neo4j password.
            openai_model: OpenAI chat model used to extract search terms from the query.
        """
        self._driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_pass))
        self._model = openai_model

    @staticmethod
    def _parse_terms(raw_content, query):
        """
        Turn the model reply into non-empty, de-duplicated search terms.

        The reply is parsed as a JSON list first. Otherwise, any double-quoted strings in it are
        used. Empty terms are dropped because ``CONTAINS ""`` would match every node in the graph.
        If nothing usable remains, the stripped query is used, or no term at all when the query
        is blank.
        """
        try:
            parsed = json.loads(raw_content)
        except json.JSONDecodeError:
            parsed = None

        if isinstance(parsed, list):
            candidates = [item for item in parsed if isinstance(item, str)]
        else:
            # tolerate replies wrapped in prose or markdown fences
            candidates = re.findall(r'"(.*?)"', raw_content)

        terms = []
        seen = set()
        for candidate in candidates:
            term = candidate.strip()
            if term and term.lower() not in seen:
                seen.add(term.lower())
                terms.append(term)

        if terms:
            return terms
        fallback = query.strip()
        return [fallback] if fallback else []

    @staticmethod
    def _collect_record(record, grouped_output, doc_text_lookup):
        """Format one Cypher result row as a triple line and file it under its source document."""
        n = record["n"]
        r = record["r"]
        connected = record["connected"]

        is_connected_doc = "Document" in connected.labels
        # when the neighbour is itself a Document, the triple belongs to that document, not to
        # whichever mentioning document coalesce(d, d2) happened to produce for this row
        doc_node = connected if is_connected_doc else record.get("doc")

        n_label = next(iter(n.labels), "Entity")
        n_id = n.get("id", "[no-id]")

        # `is not None`: a Node's truthiness depends on its property count, not on its presence
        if doc_node is not None:
            doc_id = doc_node.get("id", "unknown")
            doc_content = doc_node.get("content", "[No content]")
            doc_title = doc_node.get("title", "[No Title]")
            doc_url = doc_node.get("source_url", "[No URL]")
            doc_date = doc_node.get("date", "[No Date]")
            full_doc_text = f"Title: {doc_title}\nDate: {doc_date}\nURL: {doc_url}\n\n{doc_content}"
            doc_text_lookup[doc_id] = full_doc_text
        else:
            doc_id = "no_doc"

        # the MATCH is undirected, so render the arrow according to the stored direction of r
        start = r.start_node
        outgoing = start is None or start.element_id == n.element_id
        left, right = ("-", "->") if outgoing else ("<-", "-")

        # dict keys act as an insertion-ordered set, removing duplicates caused by
        # OPTIONAL MATCH row multiplication and by several terms hitting the same node
        if is_connected_doc:
            triple_line = f"({n_label}: {n_id}) {left}[{r.type}]{right} In Document below:"
            grouped_output[doc_id]["to_doc"][triple_line] = None
        else:
            connected_label = next(iter(connected.labels), "Entity")
            connected_value = connected.get("id", "[no-id]")
            triple_line = f"({n_label}: {n_id}) {left}[{r.type}]{right} ({connected_label}: {connected_value})"
            grouped_output[doc_id]["other"][triple_line] = None

    @staticmethod
    def _build_document(doc_id, groups, doc_text_lookup):
        """Assemble the triples (and source text, if any) collected for one document id."""
        content_parts = []

        if groups["to_doc"]:
            content_parts.extend(groups["to_doc"])
            doc_text = doc_text_lookup.get(doc_id, "[No document content]")
            content_parts.append("\nDocument:\n" + doc_text)

        if groups["other"]:
            content_parts.append("")
            content_parts.extend(groups["other"])

        final_content = "\n".join(content_parts).strip()
        meta = {"source_doc_id": doc_id} if doc_id != "no_doc" else {}
        return Document(content=final_content, meta=meta)

    @component.output_types(documents=List[Document])
    def run(self, query: str) -> Dict[str, List[Document]]:
        """
        Run retrieval based on an input query.

        Uses a language model to extract search terms, runs Cypher queries against Neo4j,
        and formats the results into Haystack Document objects.

        Args:
            query (str): The natural language query from the user.

        Returns:
            Dict[str, List[Document]]: A dictionary with a single key "documents" containing the
            result set, or a single "(No results found)" Document when nothing matched.
        """
        system_prompt = (
            "You are a search term extractor. Based on the user's question, return a list of 1–3 keywords or named entities "
            "that should be used to search a knowledge graph. Use lowercase, and return only a clean list in JSON like [\"term1\", \"term2\"]"
        )

        response = openai.chat.completions.create(
            model=self._model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": query}
            ],
            temperature=0.2
        )

        # message content can be None (e.g. refusals); normalise to a string first
        raw_content = (response.choices[0].message.content or "").strip()
        terms = self._parse_terms(raw_content, query)

        cypher = """
            MATCH (n)-[r]-(connected)
            WHERE toLower(n.id) CONTAINS toLower($query)
            OPTIONAL MATCH (n)<-[:MENTIONS]-(d:Document)
            OPTIONAL MATCH (connected)<-[:MENTIONS]-(d2:Document)
            RETURN n, r, connected, coalesce(d, d2) AS doc
        """

        # shared across terms so a document reached by several terms is emitted once
        grouped_output = defaultdict(lambda: {"to_doc": {}, "other": {}})
        doc_text_lookup = {}

        with self._driver.session() as session:
            for term in terms:
                for record in session.run(cypher, {"query": term}):
                    self._collect_record(record, grouped_output, doc_text_lookup)

        documents = [
            self._build_document(doc_id, groups, doc_text_lookup)
            for doc_id, groups in grouped_output.items()
        ]

        if not documents:
            documents.append(Document(content="(No results found)"))

        return {"documents": documents}

@component
class DocumentPassthrough:
    """
    Identity component that re-emits the documents it receives.

    It is wired after the rankers so the ranked documents are available as this
    component's own ``documents`` output, next to the generator's replies.
    """

    # declaring the output socket is what lets Haystack expose (and connect) `documents`
    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]) -> Dict[str, List[Document]]:
        """Return the input documents unchanged under the ``documents`` key."""
        return {"documents": documents}

In [None]:
# config the neo4j document store for retrieval
document_store = Neo4jDocumentStore(
    url=NEO4J_URI, username=NEO4J_USER, password=NEO4J_PASS,
    database="neo4j",
    index="document-embeddings",
    embedding_field="embedding",
    embedding_dim=1536,
    node_label="Document"
)

# init components
embedder_emb = OpenAITextEmbedder(model="text-embedding-ada-002")

retriever_emb = Neo4jEmbeddingRetriever(document_store=document_store)

ranker_emb = TransformersSimilarityRanker(
    model="intfloat/simlm-msmarco-reranker", top_k=5
)

# each document header gives the model a well-formed link for the URL plus the plain
# title and date, so it can build the inline references the instructions ask for
prompt_template_emb = """
You are an AI Assistant with access to official documents about the European Union's news, policies, laws, and actions.
Your task is to answer user questions strictly based on the documents provided below.

Guidelines:
Use only the content from the provided Documents.
Do NOT rely on prior or external knowledge.

Do NOT ask the user for additional information.

Include inline HTML links for referencing URL sources in the answer, using the URLs provided in the Documents.
Use the document’s title as the anchor text. If the title is missing, use the domain name of the document’s URL as the anchor text.
Each fact you refer to should be followed by the corresponding reference.

Output the answer in a structured markdown format.
Use bullet lists whenever it makes sense.
Do not add a references section at the end of the answer, just use references within the body of text.

If a definitive answer cannot be found in the Documents, respond with:
Final Answer: inconclusive

Always end your answer with this disclaimer:
Disclaimer: This is AI generated content — please use it with caution.

Documents:
{% for doc in documents %}
Source: <a href="{{ doc.meta.source_url }}">{{ doc.meta.source_url }}</a><br>
Title: {{ doc.meta.title }}<br>
Date: {{ doc.meta.date }}<br>

{{ doc.content }}
{% endfor %}

Question: {{ query }}

Answer:
"""

prompt_builder_emb = PromptBuilder(
    template=prompt_template_emb,
    required_variables=["documents", "query"]
)

generator_emb = OpenAIGenerator(
    model="gpt-4.1-mini", api_key=Secret.from_env_var("OPENAI_API_KEY")
)

# create the pipeline, register components and connect pipeline components
emb_pipeline = Pipeline()
emb_pipeline.add_component("embedder", embedder_emb)
emb_pipeline.add_component("retriever", retriever_emb)
emb_pipeline.add_component("reranker", ranker_emb)
emb_pipeline.add_component("output_docs", DocumentPassthrough())
emb_pipeline.add_component("prompt_builder", prompt_builder_emb)
emb_pipeline.add_component("generator", generator_emb)

emb_pipeline.connect("embedder.embedding", "retriever.query_embedding")
emb_pipeline.connect("retriever.documents", "reranker.documents")
# the reranked documents fan out to the passthrough (exposed output) and the prompt builder
emb_pipeline.connect("reranker.documents", "output_docs.documents")
emb_pipeline.connect("reranker.documents", "prompt_builder.documents")
emb_pipeline.connect("prompt_builder", "generator")

# prepare pipeline for use (loads the reranker model)
emb_pipeline.warm_up()

In [None]:
# web retrieval chain: Serper search -> fetch result pages -> HTML to Documents -> rerank
web_search = SerperDevWebSearch(top_k=5, api_key=Secret.from_env_var("SERPERDEV_API_KEY"))
fetcher = LinkContentFetcher()
converter = HTMLToDocument()
ranker_web = TransformersSimilarityRanker(model="intfloat/simlm-msmarco-reranker", top_k=5)

# generation prompt; both "documents" and "query" must be supplied when the pipeline runs
prompt_template_web = """
You are a helpful AI assistant. Use only the web search documents below to answer the user’s question.

Instructions:

Use only the content from the provided documents.
Do NOT use prior knowledge or make assumptions.

When referencing a source, include an inline HTML link using the document’s URL.
Each fact you refer to should be followed by the corresponding reference.
If a clear title can be inferred from the document content or URL, you may use it as the anchor text.
If no clear title can be inferred, use the domain name of the URL as the anchor text.

Output the answer in a structured markdown format.
Use bullet lists whenever it makes sense.
Do not add a references section at the end of the answer, just use references within the body of text.

If no relevant information is found in the documents, respond with:
Final Answer: inconclusive

Always end your answer with this disclaimer:
Disclaimer: This is AI generated content — please use it with caution.

Documents:
{% for doc in documents %}
- <b>Source:</b> <a href="{{ doc.meta.url }}">{{ doc.meta.url }}</a><br>
<p>{{ doc.content }}</p><br>
{% endfor %}

Question: {{ query }}

Answer:
"""

prompt_builder_web = PromptBuilder(template=prompt_template_web, required_variables=["documents", "query"])
generator_web = OpenAIGenerator(model="gpt-4.1-mini", api_key=Secret.from_env_var("OPENAI_API_KEY"))

# register the components in data-flow order
web_pipeline = Pipeline()
for _name, _instance in (
    ("search", web_search),
    ("fetcher", fetcher),
    ("converter", converter),
    ("ranker", ranker_web),
    ("output_docs", DocumentPassthrough()),
    ("prompt_builder", prompt_builder_web),
    ("generator", generator_web),
):
    web_pipeline.add_component(_name, _instance)

# wire sender sockets to receiver sockets; ranked docs feed both the passthrough and the prompt
for _sender, _receiver in (
    ("search.links", "fetcher.urls"),
    ("fetcher.streams", "converter.sources"),
    ("converter.documents", "ranker.documents"),
    ("ranker.documents", "output_docs.documents"),
    ("ranker.documents", "prompt_builder.documents"),
    ("prompt_builder", "generator"),
):
    web_pipeline.connect(_sender, _receiver)

# load model weights (reranker) before the first run
web_pipeline.warm_up()

In [None]:
# graph retrieval: LLM-extracted terms -> Neo4j triples grouped by source document
kg_retriever = KnowledgeGraphRetriever(neo4j_uri=NEO4J_URI, neo4j_user=NEO4J_USER, neo4j_pass=NEO4J_PASS)

# keep only the five graph documents most similar to the query
ranker_graph = TransformersSimilarityRanker(model="intfloat/simlm-msmarco-reranker", top_k=5)

# prompt that explains the triple + source-text layout of the graph documents to the model
prompt_template_graph = """
You are a helpful AI assistant working with structured information derived from a knowledge graph.

Use only the provided documents below to answer the user's question.
Each document contains factual relationships (triples) extracted from a graph, along with the original source content from which the relationships were derived.

Focus specifically on topics related to the European Union's news, policies, laws, and actions.

Instructions:
Base your answer strictly on the information in the documents.

Do NOT use external knowledge or assumptions.

When referencing a source, include an inline HTML link using the document’s title as the anchor text.
If a title cannot be inferred, use the domain name of the document’s URL as the anchor text.
Each fact you refer to should be followed by the corresponding reference.

Output the answer in a structured markdown format.
Use bullet lists whenever it makes sense.
Do not add a references section at the end of the answer, just use references within the body of text.

If no relevant information is found in the documents, respond with:
Final Answer: inconclusive

Always end your answer with this disclaimer:
Disclaimer: This is AI generated content — please use it with caution.

Documents:
{% for doc in documents %}
- <b>Source:</b><br>
{{ doc.content }}<br><br>
{% endfor %}

Question: {{ query }}

Answer (with references using HTML links):
"""

prompt_builder_graph = PromptBuilder(template=prompt_template_graph, required_variables=["documents", "query"])
generator_graph = OpenAIGenerator(model="gpt-4.1-mini", api_key=Secret.from_env_var("OPENAI_API_KEY"))

# assemble the graph pipeline in data-flow order
graph_pipeline = Pipeline()
for _name, _instance in (
    ("kg_retriever", kg_retriever),
    ("ranker", ranker_graph),
    ("output_docs", DocumentPassthrough()),
    ("prompt_builder", prompt_builder_graph),
    ("generator", generator_graph),
):
    graph_pipeline.add_component(_name, _instance)

# ranked documents go both to the passthrough output and into the prompt
for _sender, _receiver in (
    ("kg_retriever.documents", "ranker.documents"),
    ("ranker.documents", "output_docs.documents"),
    ("ranker.documents", "prompt_builder.documents"),
    ("prompt_builder", "generator"),
):
    graph_pipeline.connect(_sender, _receiver)

# load the reranker weights up front
graph_pipeline.warm_up()

#### Agents

In [None]:
# init the chat generator for multi-turn interaction
chat_generator = OpenAIChatGenerator(
    model="gpt-4.1",
    api_key=Secret.from_env_var("OPENAI_API_KEY")
)

# wrap the pipelines with input/output mappings: the single "query" input is fanned out
# to every socket that needs it, and only the generator replies are exposed
embedding_super = SuperComponent(
    pipeline=emb_pipeline,
    input_mapping={
        "query": ["embedder.text", "reranker.query", "prompt_builder.query"]
    },
    output_mapping={"generator.replies": "replies"}
)

graph_super = SuperComponent(
    pipeline=graph_pipeline,
    input_mapping={
        "query": ["kg_retriever.query", "ranker.query", "prompt_builder.query"]
    },
    output_mapping={"generator.replies": "replies"}
)

web_super = SuperComponent(
    pipeline=web_pipeline,
    input_mapping={
        "query": ["search.query", "ranker.query", "prompt_builder.query"]
    },
    output_mapping={"generator.replies": "replies"}
)

# define tools based on the wrapped pipelines
embedding_tool = ComponentTool(
    name="embedding_search",
    component=embedding_super,
    description=(
        "Answer questions using information retrieved from an internal document store containing content "
        "about the European Union’s news, policies, laws, and actions. Answers are based strictly on "
        "retrieved documents using semantic similarity, with no assumptions. References are included as HTML links."
    )
)

graph_tool = ComponentTool(
    name="graph_search",
    component=graph_super,
    description=(
        "Answer questions using structured information from a knowledge graph containing factual relationships "
        "about the European Union’s news, policies, laws, and actions. The graph includes relationships (triples) "
        "and the original source documents. Answers are grounded in these facts, with references provided as HTML links."
    )
)

web_tool = ComponentTool(
    name="web_search",
    component=web_super,
    description=(
        "Retrieve potentially relevant information from the web. Results are based on live internet content and may "
        "include a variety of sources. The retrieved information is not guaranteed to be factual. References are provided "
        "as HTML links using either inferred titles or domain names."
    )
)

# define the agent's system behavior and reasoning instructions
# (task steps are numbered sequentially; typos in the instructions have been corrected)
system_prompt = """
You are a highly intelligent assistant with access to 3 specialized tools for answering questions
about the European Union’s news, policies, laws, and actions.

You have access to:

- embedding_search: Retrieves semantically relevant information from an internal document store.
  Answers must be based strictly on the retrieved documents, using inline HTML links for references.

- graph_search: Uses a knowledge graph containing factual relationships (triples) and their source documents.
  Answers should be grounded in these structured relationships, using HTML links for citations.

- web_search: Retrieves the most recent and relevant information from the web.
  Answers should reflect real-time sources, with references using HTML links. If no title is available,
  use the domain name of the URL as the anchor text.

Your task:
1. Use all three tools to answer the user's query.
2. Combine insights from the embedding_search and graph_search tools to create a complete and informative response in the Internal Search Answer section.
3. Separately, provide insights from the web_search tool to complete the informative response in the Web Search Insights section.
4. In each sentence of your answer, add the references it is based on.
5. Ensure all references are included as inline HTML anchor tags, using titles or domain names as specified.
6. If there is a conflict between the information retrieved from web_search and the other tools, highlight the discrepancy separately in the Conflicts for Internal and Web Search section.
7. For any part of the answer generated from the web_search tool, always clearly indicate that the information comes from the web.
8. Output the answer in a structured markdown format.
9. Use bullet lists whenever it makes sense.
10. Do not add a references section at the end of the answer, just use references within the body of text.

Your output should have three sections if there are no conflicts or four sections if there are conflicts:

Thought Process:
- Describe step-by-step how each tool contributed to your reasoning and answer.

Internal Search Answer:
- Provide a clear, concise answer supported by insights from the embedding_search and graph_search tools, indicating which tool each part of the answer is based on.

Web Search Insights:
- Any content derived from a web search must be explicitly identified as such in the response here.

Conflicts for Internal and Web Search:
- Any conflict between information derived from the internal search and the web search must be explicitly identified as such in the response here.

Always include this disclaimer at the end of the final answer:
Disclaimer: This is AI generated content — please use it with caution.
"""

# create the agent with the toolset and system prompt
agent = Agent(
    chat_generator=chat_generator,
    tools=[embedding_tool, graph_tool, web_tool],
    system_prompt=system_prompt
)

# prepare the agent for interaction
agent.warm_up()

In [None]:
def _stringify_tool_result(tool_name, raw):
    """
    Convert a tool's raw result string into readable text.

    The string is parsed with ``ast.literal_eval``. When it is a dict with a
    ``replies`` entry, a list of replies is joined with newlines (any other
    ``replies`` value is stringified). Any other literal is stringified.
    Results that are not Python literals are returned unchanged after the
    failure is printed.

    Args:
        tool_name (str): Name of the tool, used only in the log message.
        raw (str): The result stored on the ToolCallResult.

    Returns:
        str: Readable text for the tool output.
    """
    try:
        parsed = ast.literal_eval(raw)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as e:
        print(f"Failed to parse tool result for {tool_name}: {e}")
        return raw  # fallback

    if isinstance(parsed, dict) and "replies" in parsed:
        replies = parsed["replies"]
        if isinstance(replies, list):
            return "\n".join(str(reply) for reply in replies)
        return str(replies)
    return str(parsed)


def run_qa_turn(agent, messages, user_input, max_steps: int = 10):
    """
    Run a single Q&A turn with the agent.

    Appends the user's input to the message history and executes the agent run.
    Only the messages produced in *this* turn (those after the most recent
    user message) are parsed into tool calls, tool outputs and the final answer.
    Earlier turns in the history therefore never leak into the response.

    Args:
        agent (Agent): The configured multi-tool agent.
        messages (List[ChatMessage]): Conversation history. It is extended in
            place with the new user message.
        user_input (str): The current user input.
        max_steps (int): Maximum number of agent steps for this turn (default 10).

    Returns:
        Tuple[List[ChatMessage], Dict]: A tuple containing the updated messages list,
        and a dictionary with:
            - user_input: the last user message as a string
            - tool_calls: list of ToolCall objects issued in this turn
            - tool_results: mapping of tool names to stringified output from this
              turn; outputs of repeated calls to one tool are joined by a blank line
            - final_answer: the assistant's concluding response, or None if none was produced
    """
    # add user input to message history and execute the agent pipeline
    messages.append(ChatMessage.from_user(user_input))

    result = agent.run(messages=messages, max_steps=max_steps)
    messages = result["messages"]

    # locate the most recent user message: everything after it belongs to this turn
    turn_start = 0
    last_user_input = None
    for idx, msg in enumerate(messages):
        if msg.role.value.lower() == "user":
            turn_start = idx + 1
            last_user_input = msg.text

    # init output containers
    tool_calls = []
    tool_results = {}
    final_answer = None

    # parse only this turn's messages, via the public ChatMessage API
    for msg in messages[turn_start:]:
        role = msg.role.value.lower()

        if role == "assistant":
            if msg.tool_calls:
                tool_calls.extend(msg.tool_calls)
            elif msg.text is not None:
                final_answer = msg.text

        elif role == "tool":
            for tool_result in msg.tool_call_results:
                tool_name = tool_result.origin.tool_name
                reply_text = _stringify_tool_result(tool_name, tool_result.result)

                # keep every call's output instead of overwriting earlier ones
                if tool_name in tool_results:
                    tool_results[tool_name] += "\n\n" + reply_text
                else:
                    tool_results[tool_name] = reply_text

    response = {
        "user_input": last_user_input,
        "tool_calls": tool_calls,
        "tool_results": tool_results,
        "final_answer": final_answer
    }

    return messages, response

#### FastAPI and UI

In [None]:
# fastapi application backing the chat, graphrag and visual-graph pages
app = FastAPI()

# serve the cloned repository's static/ folder under the /static url prefix
static_assets = StaticFiles(
    directory="/content/Hybrid-Multi-Agent-GraphRAG-for-E-Government/static"
)
app.mount("/static", static_assets, name="static")


class ChatRequest(BaseModel):
    """Request model for pipeline-based Q&A (JSON body of POST /chat)."""
    # the user's question; the /chat endpoint strips surrounding whitespace
    question: str
    # pipeline selector; /chat lower-cases it and accepts "graph", "web" or "rag"
    pipeline: str


class AgentChatRequest(BaseModel):
    """Request model for agent-based Q&A (JSON body of POST /chat_agent)."""
    # the user's question, passed to the multi-tool agent after whitespace stripping
    question: str


@app.get("/", response_class=HTMLResponse)
async def root_chat():
    """
    Serve the main chat tab HTML page.

    The template is decoded as UTF-8 explicitly, so rendering does not depend
    on the host's locale encoding.

    Returns:
        HTMLResponse: Rendered HTML or 404 error.
    """
    try:
        with open("/content/Hybrid-Multi-Agent-GraphRAG-for-E-Government/templates/chat_tab.html", "r", encoding="utf-8") as file:
            return HTMLResponse(file.read())
    except FileNotFoundError:
        return HTMLResponse("Chat tab not found", status_code=404)


@app.get("/graphrag", response_class=HTMLResponse)
async def graphrag_page():
    """
    Serve the GraphRAG tab HTML page.

    The template is decoded as UTF-8 explicitly, so rendering does not depend
    on the host's locale encoding.

    Returns:
        HTMLResponse: Rendered HTML or 404 error.
    """
    try:
        with open("/content/Hybrid-Multi-Agent-GraphRAG-for-E-Government/templates/graphrag_tab.html", "r", encoding="utf-8") as file:
            return HTMLResponse(file.read())
    except FileNotFoundError:
        return HTMLResponse("GraphRAG tab not found", status_code=404)


@app.get("/visualgraph", response_class=HTMLResponse)
async def visualgraph_page():
    """
    Serve the visual graph tab HTML page.

    The template is decoded as UTF-8 explicitly, so rendering does not depend
    on the host's locale encoding.

    Returns:
        HTMLResponse: Rendered HTML or 404 error.
    """
    try:
        with open("/content/Hybrid-Multi-Agent-GraphRAG-for-E-Government/templates/visualgraph_tab.html", "r", encoding="utf-8") as file:
            return HTMLResponse(file.read())
    except FileNotFoundError:
        return HTMLResponse("Visual GraphRAG tab not found", status_code=404)


@app.post("/chat")
async def chat_with_pipeline(data: ChatRequest) -> Dict[str, str]:
    """
    Handle a user query using the selected pipeline.

    Args:
        data (ChatRequest): Contains the user question and the selected pipeline
            ("graph", "web" or "rag", compared case-insensitively).

    Returns:
        Dict[str, str]: ``{"response": <first generator reply>}``.

    Raises:
        HTTPException: 400 for an unknown pipeline, 500 if running the pipeline fails.
    """
    question = data.question.strip()
    pipeline = data.pipeline.lower()

    # validate before the try block: an HTTPException raised inside it would be
    # caught by the generic handler below and reported as a 500
    if pipeline not in ("graph", "web", "rag"):
        raise HTTPException(status_code=400, detail="Invalid pipeline specified.")

    try:
        if pipeline == "graph":
            result = graph_pipeline.run({
                "kg_retriever": {"query": question},
                "ranker": {"query": question},
                "prompt_builder": {"query": question}
            })
        elif pipeline == "web":
            result = web_pipeline.run({
                "search": {"query": question},
                "ranker": {"query": question},
                "prompt_builder": {"query": question}
            })
        else:  # "rag"
            result = emb_pipeline.run({
                "embedder": {"text": question},
                "retriever": {"top_k": 5},
                "reranker": {"query": question},
                "prompt_builder": {"query": question}
            })

        reply = result["generator"]["replies"][0]
        return {"response": reply}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Processing error: {repr(e)}") from e


# per-user agent chat history (message lists keyed by user id; only "default" is used here)
conversation_state: Dict[str, list] = {}


@app.post("/chat_agent")
async def chat_with_agent(data: AgentChatRequest) -> Dict[str, Any]:
    """
    Handle an agent-based Q&A turn.

    The stored history for the (mock) default user is continued. It is only
    replaced once the turn succeeds, so a failed turn leaves it untouched.

    Args:
        data (AgentChatRequest): Contains user question.

    Returns:
        Dict[str, Any]: ``{"response": final answer or "No answer generated.",
        "tool_results": {tool name: output}}``.

    Raises:
        HTTPException: 500 if the agent run fails.
    """
    question = data.question.strip()

    try:
        user_id = "default"  # default mock user
        # copy: run_qa_turn appends to the list it receives, and a failed turn
        # must not leave a dangling user message in the stored history
        messages = list(conversation_state.get(user_id, []))

        messages, result = run_qa_turn(agent, messages, question)
        conversation_state[user_id] = messages

        return {
            # "final_answer" is always present but may be None, so a .get()
            # default would never apply
            "response": result.get("final_answer") or "No answer generated.",
            "tool_results": dict(result.get("tool_results", {})),
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Agent error: {repr(e)}") from e


@app.post("/graphrag_explain")
async def graphrag_explain(request: Request):
    """
    Query Neo4j and return matching triples for a given keyword.

    Every node whose ``id`` contains the keyword (case-insensitive) is returned
    with each relationship touching it and the node on the other side.

    Args:
        request (Request): Incoming JSON object with a non-empty string 'query'.

    Returns:
        Dict[str, Any]: ``{"results": [...]}``. Each entry holds the ``node``
        properties, the ``relation`` properties, the ``relation_type`` and the
        ``connected`` node properties.

    Raises:
        HTTPException: 400 for a malformed body or missing query, 500 on database errors.
    """
    try:
        data = await request.json()
    except ValueError as e:  # json.JSONDecodeError / UnicodeDecodeError
        raise HTTPException(status_code=400, detail="Request body must be valid JSON.") from e

    # tolerate non-object bodies and null / non-string queries instead of crashing
    raw_query = data.get("query") if isinstance(data, dict) else None
    query = raw_query.strip() if isinstance(raw_query, str) else ""

    if not query:
        raise HTTPException(status_code=400, detail="Query is required.")

    try:
        with driver.session() as session:
            results = session.run("""
                MATCH (n)-[r]-(connected)
                WHERE toLower(n.id) CONTAINS toLower($query)
                RETURN n, r, connected
            """, {"query": query})

            output = [
                {
                    "node": dict(record["n"]),
                    "relation": dict(record["r"]),
                    # dict(relationship) only contains its properties; the type
                    # is the predicate of the triple
                    "relation_type": record["r"].type,
                    "connected": dict(record["connected"]),
                }
                for record in results
            ]

        return {"results": output}

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"GraphRAG error: {repr(e)}") from e


@app.post("/graphrag_graph_data")
async def graphrag_graph_data(request: Request):
    """
    Return nodes and edges for graph visualization.

    With a non-blank 'query', returns the neighbourhood of every node whose ``id``
    contains it (case-insensitive). Otherwise returns all relationships except
    MENTIONS.

    Args:
        request (Request): JSON object body with optional string 'query'.

    Returns:
        Dict[str, Any]: ``{"nodes": [...], "edges": [...]}``. Nodes carry ``id``,
        ``label`` and ``type``. Edges carry ``from``/``to`` following the stored
        relationship direction, plus ``label`` (the relationship type). Each
        relationship appears at most once.

    Raises:
        HTTPException: 400 for a malformed body, 500 on database errors.
    """
    try:
        data = await request.json()
    except ValueError as e:  # json.JSONDecodeError / UnicodeDecodeError
        raise HTTPException(status_code=400, detail="Request body must be valid JSON.") from e
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object.")

    raw_query = data.get("query")
    if raw_query is not None and not isinstance(raw_query, str):
        raise HTTPException(status_code=400, detail="'query' must be a string.")
    query = (raw_query or "").strip()

    def build_node(node):
        """Convert a Neo4j node into the ``{id, label, type}`` payload used by the UI."""
        # sorted so a multi-label node always reports the same type
        labels = sorted(node.labels)
        return {
            "id": node["id"],
            "label": node.get("label", node["id"]),
            "type": labels[0] if labels else "default"
        }

    try:
        with driver.session() as session:
            nodes = {}
            edges = []
            seen_relationships = set()

            if query:
                cypher = """
                    MATCH (n)-[r]-(connected)
                    WHERE toLower(n.id) CONTAINS toLower($query)
                    RETURN n, r, connected
                """
                results = session.run(cypher, {"query": query})
                first_key, second_key = "n", "connected"
            else:
                cypher = """
                    MATCH (s)-[r]->(t)
                    WHERE type(r) <> 'MENTIONS'
                    RETURN s, r, t
                """
                results = session.run(cypher)
                first_key, second_key = "s", "t"

            for record in results:
                s_node = record[first_key]
                t_node = record[second_key]
                r = record["r"]

                # the undirected pattern yields a relationship once per endpoint
                # that matches the query; emit each relationship only once
                if r.element_id in seen_relationships:
                    continue
                seen_relationships.add(r.element_id)

                # orient the edge by the stored relationship direction, not by
                # which side of the undirected pattern matched the query
                start = r.start_node
                if start is not None and start.element_id == t_node.element_id:
                    s_node, t_node = t_node, s_node

                source = build_node(s_node)
                target = build_node(t_node)

                nodes[source["id"]] = source
                nodes[target["id"]] = target

                edges.append({
                    "from": source["id"],
                    "to": target["id"],
                    "label": r.type
                })

        return {
            "nodes": list(nodes.values()),
            "edges": edges
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Graph fetch error: {repr(e)}") from e


@app.get("/{file_path:path}", response_class=HTMLResponse)
async def get_html(file_path: str):
    """
    Fallback route to serve any HTML file from /content/Hybrid-Multi-Agent-GraphRAG-for-E-Government/templates.

    The requested path is resolved and must lie inside the templates
    directory. Anything else (``..`` traversal, absolute paths, symlinks out
    of the directory, directories, missing files) gets a 404, so arbitrary
    files on the host cannot be read through this route.

    Args:
        file_path (str): Relative path to HTML file.

    Returns:
        HTMLResponse: Rendered HTML or 404 error.
    """
    templates_dir = Path("/content/Hybrid-Multi-Agent-GraphRAG-for-E-Government/templates").resolve()
    target = (templates_dir / file_path).resolve()

    # file_path comes straight from the URL: refuse anything outside templates_dir
    if templates_dir not in target.parents or not target.is_file():
        return HTMLResponse("File not found", status_code=404)

    try:
        return HTMLResponse(target.read_text(encoding="utf-8"))
    except FileNotFoundError:  # removed between the check and the read
        return HTMLResponse("File not found", status_code=404)


def run_app():
    """
    Serve the FastAPI ``app`` with uvicorn on 0.0.0.0:8000.

    Keep-alive connections time out after 600 seconds. The call blocks while
    the server is running.
    """
    uvicorn_settings = dict(host="0.0.0.0", port=8000, timeout_keep_alive=600)
    uvicorn.run(app, **uvicorn_settings)


# launch uvicorn on a separate thread so this notebook cell does not block
thread = Thread(target=run_app)
thread.start()

# output the server url exposed through the colab proxy for port 8000
proxy_url = eval_js("google.colab.kernel.proxyPort(8000)")
print(proxy_url)

#### Faithfulness Evaluation

In [None]:
def find_json_block(text: str) -> str:
    """
    Return the first complete JSON object embedded in ``text``.

    Each ``{`` is tried in turn as the start of an object and decoded with
    ``json.JSONDecoder.raw_decode``. Surrounding prose, code fences, or extra
    braces after the object therefore do not prevent extraction.

    Args:
        text: Raw model output.

    Returns:
        The text of the first JSON object that decodes. If none decodes, the
        text from the first ``{`` onward (so a truncated object can still be
        repaired), or the full text when it contains no ``{``.
    """
    decoder = json.JSONDecoder()
    first_brace = text.find("{")
    start = first_brace
    while start != -1:
        try:
            _, end = decoder.raw_decode(text, start)
            return text[start:end]
        except json.JSONDecodeError:
            start = text.find("{", start + 1)
    return text[first_brace:] if first_brace != -1 else text

def try_fix_json(json_str: str) -> str:
    """
    Best-effort repair of slightly malformed or truncated JSON text.

    The repair:
    - removes commas that directly precede ``}``/``]``;
    - drops a dangling trailing comma and closes an unterminated string;
    - appends the missing closing brackets in correct nesting order.

    Brackets inside string literals are ignored when working out which
    closers are missing.

    Args:
        json_str: Candidate JSON text.

    Returns:
        The repaired text (not guaranteed to be valid JSON).
    """
    # drop trailing commas before a closing bracket, e.g. '[1, 2,]' -> '[1, 2]'
    fixed = re.sub(r',\s*([}\]])', r'\1', json_str)

    # track unclosed brackets outside string literals, innermost last
    closers = []
    in_string = False
    escaped = False
    for ch in fixed:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "{":
            closers.append("}")
        elif ch == "[":
            closers.append("]")
        elif ch in "}]" and closers and closers[-1] == ch:
            closers.pop()

    if in_string:
        fixed += '"'
    elif closers:
        # a truncated list/object may end on a separator comma: '[1, 2,' -> '[1, 2'
        stripped = fixed.rstrip()
        if stripped.endswith(","):
            fixed = stripped[:-1]

    # close innermost brackets first
    return fixed + "".join(reversed(closers))

In [None]:
class CustomFaithfulnessEvaluator(Component):
    """
    LLM-as-judge faithfulness evaluator for single-context pipeline answers.

    For every (question, context, answer) triple, the judge model splits the
    answer into statements and scores each one (1, 0, -1 or -2 per the
    instructions). The answer's score is the mean statement score.
    """

    def __init__(self, model="gpt-4.1", instructions: str = None):
        """
        Args:
            model: OpenAI chat model used as the judge.
            instructions: Custom judge instructions; defaults to the built-in rubric.
        """
        super().__init__()
        self.chat_generator = OpenAIChatGenerator(model=model)
        self.instructions = instructions or (
            "You are evaluating the faithfulness of a predicted answer based on a provided context.\n\n"
            "You will receive:\n"
            "- a question\n"
            "- a context: a set of retrieved documents or passages used to generate the answer\n"
            "- a predicted answer generated based on this context\n\n"
            "TASK:\n"
            "1. Break the predicted answer into factual statements. Produce the factual statements solely based on the Predicted Answer.\n"
            "2. For each statement:\n"
            "   a. If the statement is clearly supported by the context → score = 1\n"
            "      justification: describe how the statement is explicitly supported in the context\n\n"
            "   b. If the statement is not supported or the context is silent → score = 0\n"
            "      justification: explain the lack of evidence in the context\n\n"
            "   c. If the statement includes or is equivalent to 'Final Answer: inconclusive' AND this is justified by the lack of support in context → score = -1\n"
            "      justification: explain that the answer is inconclusive and no factual claims were made\n\n"
            "   d. If the statement includes or is equivalent to 'Final Answer: inconclusive' BUT the context does contain sufficient information to answer the question → score = -2\n"
            "      justification: explain that the model incorrectly concluded inconclusive despite having supporting context\n\n"
            "Format your response as JSON:\n"
            "{\n"
            '  "statements": [...],\n'
            '  "statement_scores": [...],\n'
            '  "justifications": [\n'
            '    "supported: <details>",\n'
            '    "unsupported: <details>",\n'
            '    "inconclusive (correct): <details>",\n'
            '    "inconclusive (incorrect): <details>"\n'
            '  ]\n'
            "}"
        )

    def _request_judgement(self, prompt: str, retries: int = 3):
        """
        Ask the judge model and parse its JSON verdict, retrying on failure.

        A reply is accepted only if it decodes to a JSON object whose
        ``statement_scores`` is a list of numbers. Anything else is retried, so
        malformed verdicts cannot crash the scoring in ``run``.

        Returns:
            The parsed verdict dict, or None if every attempt failed.
        """
        for attempt in range(retries):
            try:
                response = self.chat_generator.run([ChatMessage.from_user(prompt)])
                reply_text = response["replies"][0].text

                json_candidate = find_json_block(reply_text)

                try:
                    parsed = json.loads(json_candidate)
                except json.JSONDecodeError:
                    print(f"[Attempt {attempt+1}] JSON parsing failed — trying to fix...")
                    parsed = json.loads(try_fix_json(json_candidate))

                if not isinstance(parsed, dict):
                    raise ValueError("judge reply is not a JSON object")
                scores = parsed.get("statement_scores")
                if not isinstance(scores, list):
                    raise ValueError("judge reply has no 'statement_scores' list")
                if not all(isinstance(s, (int, float)) for s in scores):
                    raise ValueError("'statement_scores' must contain only numbers")
                return parsed
            except Exception as e:  # API errors and malformed replies are both retried
                print(f"[Attempt {attempt+1}] Failed: {str(e)}")
                if attempt < retries - 1:
                    time.sleep(1)  # brief back-off before the next attempt
        return None

    @component.output_types(
        individual_scores=List[float],
        score=float,
        results=List[Dict[str, Any]]
    )
    def run(self, questions: List[str], contexts: List[List[str]], predicted_answers: List[str]) -> Dict[str, Any]:
        """
        Score the faithfulness of each predicted answer against its context.

        Args:
            questions: The questions asked.
            contexts: Retrieved passages per question (joined with spaces).
            predicted_answers: The answers to evaluate.

        Returns:
            Dict with:
            - ``results``: each verdict plus its ``score``;
            - ``individual_scores``: mean statement score per answer, 0.0 when
              nothing was scored;
            - ``score``: mean of ``individual_scores``, 0.0 when empty.
        """
        results = []
        individual_scores = []

        for question, context_list, predicted_answer in zip(questions, contexts, predicted_answers):
            full_context = " ".join(context_list)

            prompt = (
                f"{self.instructions}\n\n"
                f"Question: {question}\n\n"
                f"Context:\n{full_context}\n\n"
                f"Predicted Answer:\n{predicted_answer}\n\n"
                f"Respond with a JSON object like this:\n"
                f'{{"statements": [...], "statement_scores": [...]}}'
            )

            parsed = self._request_judgement(prompt)

            if parsed is None:
                print("Failed to get a valid response after retries.")
                parsed = {"statements": [], "statement_scores": []}

            statement_scores = parsed["statement_scores"]
            score = float(np.mean(statement_scores)) if statement_scores else 0.0
            results.append({**parsed, "score": score})
            individual_scores.append(score)

        final_score = float(np.mean(individual_scores)) if individual_scores else 0.0

        return {
            "results": results,
            "individual_scores": individual_scores,
            "score": final_score
        }

standard_evaluator = CustomFaithfulnessEvaluator(model="gpt-4.1")

In [None]:
class CustomBasicAgentFaithfulnessEvaluator(Component):
    """
    LLM-as-judge faithfulness evaluator for multi-tool agent answers.

    The judge receives the embedding_search, graph_search and web_search
    contexts separately, and returns per-statement scores for the agent's
    answer.
    """

    def __init__(self, model="gpt-4.1", instructions: str = None):
        """
        Args:
            model: OpenAI chat model used as the judge.
            instructions: Custom judge instructions; defaults to the built-in rubric.
        """
        super().__init__()
        self.chat_generator = OpenAIChatGenerator(model=model)
        self.instructions = instructions or (
            "You are evaluating the faithfulness and attribution of a predicted answer.\n\n"
            "You will receive:\n"
            "- a question\n"
            "- embedding_search_context: information from internal knowledge from embedding_search tool\n"
            "- graph_search_context: information from internal knowledge from graph_search tool\n"
            "- web_search_context: information retrieved from web_search tools\n"
            "- a predicted answer from an agent\n\n"
            "   The predicted answer you will receive should include a Internal Search Answer section produced with embedding_search and graph_search tools results\n"
            "   The predicted answer you will receive should include a Web Search Insights section produced with web_search tool results\n"
            "TASK:\n"
            "1. Break the predicted answer into factual statements. Produce the factual statements solely based on the Predicted Answer.\n"
            "2. For each statement:\n"
            "   a. If the statement is clearly supported by the context → score = 1\n"
            "      justification: describe how the statement is explicitly supported in the context\n\n"
            "   b. If the statement is not supported or the context is silent → score = 0\n"
            "      justification: explain the lack of evidence in the context\n\n"
            "   c. If the statement includes or is equivalent to 'Final Answer: inconclusive' AND this is justified by the lack of support in context → score = -1\n"
            "      justification: explain that the answer is inconclusive and no factual claims were made\n\n"
            "   d. If the statement includes or is equivalent to 'Final Answer: inconclusive' BUT the context does contain sufficient information to answer the question → score = -2\n"
            "      justification: explain that the model incorrectly concluded inconclusive despite having supporting context\n\n"
            "Format your response as JSON:\n"
            "{\n"
            '  "statements": [...],\n'
            '  "statement_scores": [...],\n'
            '  "justifications": [\n'
            '    "supported: <details>",\n'
            '    "unsupported: <details>",\n'
            '    "inconclusive (correct): <details>",\n'
            '    "inconclusive (incorrect): <details>"\n'
            '  ]\n'
            "}"
        )

    def _request_judgement(self, prompt: str, retries: int = 3):
        """
        Ask the judge model and parse its JSON verdict, retrying on failure.

        A reply is accepted only if it decodes to a JSON object whose
        ``statement_scores`` is a list of numbers. Anything else is retried.

        Returns:
            The parsed verdict dict, or None if every attempt failed.
        """
        for attempt in range(retries):
            try:
                response = self.chat_generator.run([ChatMessage.from_user(prompt)])
                reply_text = response["replies"][0].text

                json_candidate = find_json_block(reply_text)

                try:
                    parsed = json.loads(json_candidate)
                except json.JSONDecodeError:
                    print(f"Attempt {attempt+1}: JSON parsing failed — trying to fix...")
                    parsed = json.loads(try_fix_json(json_candidate))

                if not isinstance(parsed, dict):
                    raise ValueError("judge reply is not a JSON object")
                scores = parsed.get("statement_scores")
                if not isinstance(scores, list):
                    raise ValueError("judge reply has no 'statement_scores' list")
                if not all(isinstance(s, (int, float)) for s in scores):
                    raise ValueError("'statement_scores' must contain only numbers")
                return parsed
            except Exception as e:  # API errors and malformed replies are both retried
                print(f"Attempt {attempt+1} failed: {str(e)}")
                if attempt < retries - 1:
                    time.sleep(1)  # brief back-off before the next attempt
        return None

    @component.output_types(results=List[Dict[str, Any]])
    def run(
        self,
        questions: List[str],
        emb_contexts: List[List[str]],
        graph_contexts: List[List[str]],
        web_search_contexts: List[List[str]],
        predicted_answers: List[str]
    ) -> Dict[str, Any]:
        """
        Judge each agent answer against its three tool contexts.

        Answers containing "final answer: inconclusive" (case-insensitive) are
        not sent to the judge; they get an empty verdict.

        Args:
            questions: The questions asked.
            emb_contexts: embedding_search outputs per question.
            graph_contexts: graph_search outputs per question.
            web_search_contexts: web_search outputs per question.
            predicted_answers: The agent answers; None is treated as "".

        Returns:
            ``{"results": [verdict, ...]}``. Each verdict has ``statements``,
            ``statement_scores`` and, when the judge provides them, ``justifications``.
        """
        results = []

        for question, emb_retrieved, graph_retrieved, web_search, answer in zip(
            questions, emb_contexts, graph_contexts, web_search_contexts, predicted_answers
        ):
            full_emb = " ".join(emb_retrieved)
            full_graph = " ".join(graph_retrieved)
            full_web = " ".join(web_search)

            # a missing answer must not crash .lower() below
            answer = "" if answer is None else str(answer)

            if "final answer: inconclusive" in answer.lower():
                results.append({
                    "statements": [],
                    "statement_scores": [],
                    "justifications": []
                })
                continue

            prompt = (
                f"{self.instructions}\n\n"
                f"Question:\n{question}\n\n"
                f"Contexts:\n"
                f"embedding_search_context:\n{full_emb}\n\n"
                f"graph_search_context:\n{full_graph}\n\n"
                f"web_search_context:\n{full_web}\n\n"
                f"Predicted Answer:\n{answer}"
            )

            parsed = self._request_judgement(prompt)

            if parsed is None:
                print("Failed to get valid response after retries.")
                parsed = {
                    "statements": [],
                    "statement_scores": [],
                    "justifications": []
                }

            results.append(parsed)

        return {
            "results": results
        }

# instantiate the evaluator
agent_evaluator = CustomBasicAgentFaithfulnessEvaluator(model="gpt-4.1")

In [None]:
# agentic pipeline evaluation
# every question is answered by the agent in a fresh conversation and scored with
# agent_evaluator; the results file is checkpointed after each question

results_file = "agent_basic_faithfulness_evaluation_results.json"

# resume from a previous run when a results file already exists
if os.path.exists(results_file):
    with open(results_file, "r") as f:
        results = json.load(f)
else:
    results = []

with open("eval/eval_questions.json", "r") as f:  # add the questions in the repository (json format)
    data = json.load(f)
    questions = data.get("questions", [])

# only successful entries count as processed: questions whose earlier run failed
# (entry has "error") are retried, as in the pipeline evaluation cells
processed_questions = {
    entry["question"] for entry in results if "question" in entry and "error" not in entry
}


def _as_context_list(value):
    """Wrap a single tool output in a list; lists are returned unchanged."""
    return value if isinstance(value, list) else [value]


for question in questions:
    if question in processed_questions:
        print(f"Skipping already processed question: {question}")
        continue

    # drop an earlier failed entry for this question so the retry replaces it
    results = [entry for entry in results if entry.get("question") != question]

    print(f"Processing: {question}")
    # each question starts from an empty history, kept local so the global
    # conversation_state used by the /chat_agent endpoint is not reset or polluted
    messages = []

    try:
        start_time = time.perf_counter()
        messages, result = run_qa_turn(agent, messages, question)
        latency = time.perf_counter() - start_time

        tool_results = result.get("tool_results", {})
        # "final_answer" is always present but may be None, so fall back explicitly
        answer = result.get("final_answer") or "No answer generated."
        if isinstance(answer, tuple):
            answer = answer[0]

        emb_context = _as_context_list(tool_results.get("embedding_search", []))
        graph_context = _as_context_list(tool_results.get("graph_search", []))
        web_context = _as_context_list(tool_results.get("web_search", []))

        faithfulness_result = agent_evaluator.run(
            questions=[question],
            emb_contexts=[emb_context],
            graph_contexts=[graph_context],
            web_search_contexts=[web_context],
            predicted_answers=[answer]
        )

        results.append({
            "question": question,
            "final_answer": answer,
            "emb_based_context": emb_context,
            "graph_based_context": graph_context,
            "web_based_context": web_context,
            "evaluation": faithfulness_result,
            "latency_seconds": latency
        })

    except Exception as e:
        print(f"Error processing question: {question}")
        print(str(e))
        results.append({
            "question": question,
            "error": str(e)
        })

    # write to a temp file and atomically swap it in, so an interrupted write
    # cannot corrupt the checkpoint that resuming relies on
    tmp_file = results_file + ".tmp"
    with open(tmp_file, "w") as f:
        json.dump(results, f, indent=4)
    os.replace(tmp_file, results_file)

In [None]:
# embeddings pipeline evaluation
# answers every question with emb_pipeline, scores faithfulness with
# standard_evaluator, and checkpoints results index-aligned with the questions
results_file = "qa_emb_pipeline_evaluation_results.json"

# load the questions
with open("eval/eval_questions.json", "r") as f:  # add the questions in the repository (json format)
    data = json.load(f)
    questions = data.get("questions", [])

# init or load previous results
if os.path.exists(results_file):
    with open(results_file, "r") as f:
        saved_results = json.load(f)
else:
    saved_results = [{} for _ in questions]

# pad so every question has a slot
while len(saved_results) < len(questions):
    saved_results.append({})

for idx, question in enumerate(questions):
    existing_entry = saved_results[idx]

    # only in rerun: skip if already successfully processed (no error and has matching question)
    if existing_entry.get("question") == question and "error" not in existing_entry:
        print(f"Skipping already processed question: {question}")
        continue

    print(f"Processing question: {question}")

    try:
        start_time = time.perf_counter()
        result = emb_pipeline.run({
            "embedder": {"text": question},
            "retriever": {"top_k": 5},
            "reranker": {"query": question},
            "prompt_builder": {"query": question}
        })
        end_time = time.perf_counter()
        latency = end_time - start_time

        generated_answer = result["generator"]["replies"][0]
        # documents without text content contribute an empty string
        contexts = [doc.content or "" for doc in result["output_docs"]["documents"]]
        contexts_for_eval = [contexts]

        faithfulness_result = standard_evaluator.run(
            questions=[question],
            contexts=contexts_for_eval,
            predicted_answers=[generated_answer]
        )

        entry = {
            "question": question,
            "generated_answer": generated_answer,
            "contexts": contexts,
            "results": faithfulness_result,
            "latency_seconds": latency,
        }

    except Exception as e:
        entry = {
            "question": question,
            "error": str(e)
        }
        print(f"Error processing question: {question}")
        print(str(e))

    saved_results[idx] = entry

    # write to a temp file and atomically swap it in, so an interrupted write
    # cannot corrupt the checkpoint that resuming relies on
    tmp_file = results_file + ".tmp"
    with open(tmp_file, "w") as f:
        json.dump(saved_results, f, indent=4)
    os.replace(tmp_file, results_file)

In [None]:
# graphrag pipeline evaluation
# answers every question with graph_pipeline, scores faithfulness with
# standard_evaluator, and checkpoints results index-aligned with the questions

results_file = "qa_graph_pipeline_evaluation_results.json"

# load the questions
with open("eval/eval_questions.json", "r") as f:  # add the questions in the repository (json format)
    data = json.load(f)
    questions = data.get("questions", [])

# init or load previous results
if os.path.exists(results_file):
    with open(results_file, "r") as f:
        saved_results = json.load(f)
else:
    saved_results = [{} for _ in questions]

# pad so every question has a slot
while len(saved_results) < len(questions):
    saved_results.append({})

for idx, question in enumerate(questions):
    existing_entry = saved_results[idx]

    # only in rerun: skip if already successfully processed (no error and has matching question)
    if existing_entry.get("question") == question and "error" not in existing_entry:
        print(f"Skipping already processed question: {question}")
        continue

    print(f"Processing question: {question}")

    try:
        start_time = time.perf_counter()
        result = graph_pipeline.run({
            "kg_retriever": {"query": question},
            "ranker": {"query": question},
            "prompt_builder": {"query": question}
        })
        end_time = time.perf_counter()
        latency = end_time - start_time
        generated_answer = result["generator"]["replies"][0]
        # documents without text content contribute an empty string
        contexts = [doc.content or "" for doc in result["output_docs"]["documents"]]
        contexts_for_eval = [contexts]

        faithfulness_result = standard_evaluator.run(
            questions=[question],
            contexts=contexts_for_eval,
            predicted_answers=[generated_answer]
        )

        entry = {
            "question": question,
            "generated_answer": generated_answer,
            "contexts": contexts,
            "results": faithfulness_result,
            "latency_seconds": latency,
        }

    except Exception as e:
        entry = {
            "question": question,
            "error": str(e)
        }
        print(f"Error processing question: {question}")
        print(str(e))

    saved_results[idx] = entry

    # write to a temp file and atomically swap it in, so an interrupted write
    # cannot corrupt the checkpoint that resuming relies on
    tmp_file = results_file + ".tmp"
    with open(tmp_file, "w") as f:
        json.dump(saved_results, f, indent=4)
    os.replace(tmp_file, results_file)

In [None]:
# web search pipeline evaluation
# answers every question with web_pipeline, scores faithfulness with
# standard_evaluator, and checkpoints results index-aligned with the questions

results_file = "qa_web_pipeline_evaluation_results.json"

# load the questions
with open("eval/eval_questions.json", "r") as f:  # add the questions in the repository (json format)
    data = json.load(f)
    questions = data.get("questions", [])

# init or load previous results
if os.path.exists(results_file):
    with open(results_file, "r") as f:
        saved_results = json.load(f)
else:
    saved_results = [{} for _ in questions]

# pad so every question has a slot
while len(saved_results) < len(questions):
    saved_results.append({})

for idx, question in enumerate(questions):
    existing_entry = saved_results[idx]

    # only in rerun: skip if already successfully processed (no error and has matching question)
    if existing_entry.get("question") == question and "error" not in existing_entry:
        print(f"Skipping already processed question: {question}")
        continue

    print(f"Processing question: {question}")

    try:
        start_time = time.perf_counter()
        result = web_pipeline.run({
            "search": {"query": question},
            "ranker": {"query": question},
            "prompt_builder": {"query": question}
        })
        end_time = time.perf_counter()
        latency = end_time - start_time
        generated_answer = result["generator"]["replies"][0]
        # fetched pages may have no text content; use an empty string for those
        contexts = [doc.content if doc.content is not None else "" for doc in result["output_docs"]["documents"]]
        contexts_for_eval = [contexts]

        faithfulness_result = standard_evaluator.run(
            questions=[question],
            contexts=contexts_for_eval,
            predicted_answers=[generated_answer]
        )

        entry = {
            "question": question,
            "generated_answer": generated_answer,
            "contexts": contexts,
            "results": faithfulness_result,
            "latency_seconds": latency,
        }

    except Exception as e:
        entry = {
            "question": question,
            "error": str(e)
        }
        print(f"Error processing question: {question}")
        print(str(e))

    saved_results[idx] = entry

    # write to a temp file and atomically swap it in, so an interrupted write
    # cannot corrupt the checkpoint that resuming relies on
    tmp_file = results_file + ".tmp"
    with open(tmp_file, "w") as f:
        json.dump(saved_results, f, indent=4)
    os.replace(tmp_file, results_file)

#### Explanatory

In [None]:
def print_relationship_schema(record):
    """
    Print the relationship between two nodes in a readable schema format.

    The arrow follows the relationship's stored direction. The record may come
    from an undirected ``(n)-[r]-(connected)`` match, where ``n`` can be the
    end node of ``r``; in that case the arrow points back towards ``n``.

    Args:
        record (neo4j.Record): A Neo4j record containing 'n', 'r', and 'connected' keys.
    """
    n = record["n"]
    r = record["r"]
    connected = record["connected"]

    # sorted so multi-label nodes print a deterministic label
    n_labels = sorted(n.labels)
    connected_labels = sorted(connected.labels)
    n_label = n_labels[0] if n_labels else "Entity"
    connected_label = connected_labels[0] if connected_labels else "Entity"

    left = f"({n_label}: {n['id']})"
    right = f"({connected_label}: {connected['id']})"

    start = r.start_node
    if start is not None and start.element_id == connected.element_id:
        print(f"{left} <-[{r.type}]- {right}")
    else:
        print(f"{left} -[{r.type}]-> {right}")


# print every relationship touching a node whose id contains the search term
search_term = "clean industrial deal"

relationship_query = """
        MATCH (n)-[r]-(connected)
        WHERE toLower(n.id) CONTAINS toLower($query)
        RETURN n, r, connected
    """

with driver.session() as session:
    results = session.run(relationship_query, {"query": search_term})
    for record in results:
        print_relationship_schema(record)

In [None]:
# enable interactive graph widget rendering in colab
output.enable_custom_widget_manager()


def visualize_graph_from_search(query: str = None):
    """
    Visualize a subgraph from the Neo4j database.

    If a non-blank query is provided, shows every node whose ``id`` contains it
    (case-insensitive substring match) together with its relationships and
    neighbours. Otherwise shows up to 1000 relationships, excluding MENTIONS.

    A dedicated driver is opened from the NEO4J_URI / NEO4J_USERNAME /
    NEO4J_PASSWORD environment variables. It is closed before the widget is
    displayed.

    Args:
        query (str, optional): A partial or full node ID to filter the graph. Defaults to None.
    """
    # the context manager closes the driver's connection pool instead of
    # leaking a new driver on every call
    with GraphDatabase.driver(
        uri=os.environ["NEO4J_URI"],
        auth=(os.environ["NEO4J_USERNAME"], os.environ["NEO4J_PASSWORD"])
    ) as graph_driver:
        with graph_driver.session() as session:
            if query and query.strip():
                cypher = """
                    MATCH (n)-[r]-(connected)
                    WHERE toLower(n.id) CONTAINS toLower($query)
                    RETURN n, r, connected
                """
                result = session.run(cypher, {"query": query})
            else:
                cypher = """
                    MATCH (s)-[r]->(t)
                    WHERE type(r) <> 'MENTIONS'
                    RETURN s, r, t LIMIT 1000
                """
                result = session.run(cypher)

            # materialise the whole result as an in-memory graph before the session closes
            graph = result.graph()

    widget = GraphWidget(graph=graph)
    widget.node_label_mapping = 'id'
    display(widget)


# run graph visualization (default)
visualize_graph_from_search("")

##### Test Default pipelines

In [None]:
# test custom knowledge graph retriever
kg_retriever = KnowledgeGraphRetriever(
    neo4j_uri=NEO4J_URI,
    neo4j_user=NEO4J_USER,
    neo4j_pass=NEO4J_PASS
)

pipe = Pipeline()
pipe.add_component("kg_search", kg_retriever)

# keep the entity name spelled correctly ("Industrial"): if the retriever
# matches on entity names/ids (assumption — check KnowledgeGraphRetriever),
# a misspelling can return no documents at all
result = pipe.run({"kg_search": {"query": "Tell me about the Clean Industrial Deal"}})
kg_documents = result["kg_search"]["documents"]
if kg_documents:
    print(kg_documents[0].content)
else:
    # avoid an IndexError when nothing is retrieved
    print("No documents returned by the knowledge graph retriever.")

In [None]:
# test embeddings pipeline
# (query spelling fixed: previously "Indrustrial")
question = "Tell me about the Clean Industrial Deal"

# the same question feeds the embedder, the reranker and the prompt builder
result = emb_pipeline.run({
    "embedder": {"text": question},
    "retriever": {"top_k": 5},
    "reranker": {"query": question},
    "prompt_builder": {"query": question}
})

print(result["generator"]["replies"][0])

In [None]:
# test web search pipeline
# (query spelling fixed: previously "Indrustrial")
query = "Tell me about the Clean Industrial Deal"

# the same query feeds the web search, the ranker and the prompt builder
result = web_pipeline.run({
    "search": {"query": query},
    "ranker": {"query": query},
    "prompt_builder": {"query": query}
})

print(result["generator"]["replies"][0])

In [None]:
# test graph search
# (query spelling fixed: previously "Indrustrial"; graph lookups may be
# sensitive to the exact entity name — TODO confirm against the KG retriever)
question = "Tell me about the Clean Industrial Deal"

result = graph_pipeline.run({
    "kg_retriever": {"query": question},
    "ranker": {"query": question},
    "prompt_builder": {"query": question}
})

print(result["generator"]["replies"][0])

In [None]:
# test the agents
# start the conversation with an initial question
# (query spelling fixed: previously "Indrustrial")
messages = []
messages, result = run_qa_turn(agent, messages, "Tell me about the Clean Industrial Deal")

print("User Input:", result["user_input"])
print("-----------------------------------------------------------------------------------------------")
print("Tool Results:")
for tool, out in result["tool_results"].items():
    print(f"{tool}: {out}")
print("-----------------------------------------------------------------------------------------------")
print("Final Answer:", result["final_answer"])

In [None]:
# follow-up turn: pass the `messages` returned by the previous turn back in,
# so this question is asked within the same conversation
messages, followup = run_qa_turn(
    agent,
    messages,
    "Ok, can you provide a shorter answer in less than 100 words?",
)

print("User Input:", followup["user_input"])
print("-----------------------------------------------------------------------------------------------")
print("Tool Results:")
for tool, out in followup["tool_results"].items():
    print(tool, out, sep=": ")
print("-----------------------------------------------------------------------------------------------")
print("Final Answer:", followup["final_answer"])