# 04 — Interactive Chat with Cycles & InterruptNode

**Slide Reference**: Slides 26–27 — DAGs + Cycles Unified

This notebook wraps the RAG pipeline in a cycle and uses `InterruptNode` to pause for human input on every turn, giving real human-in-the-loop interaction.
Type a question to get a RAG-grounded answer, or type `exit` to quit.

In [1]:
from components import DOCUMENTS, Embedder, LLM, VectorStore

embedder = Embedder()
llm = LLM()
vector_store = VectorStore(DOCUMENTS, embedder)

## Build the RAG chat graph

The graph combines:
- **RAG nodes** — embed, retrieve, generate (same as before)
- **`InterruptNode`** — pauses execution to ask the user for input
- **`accumulate_history`** — appends each Q&A turn to the message list
- **`@route`** — decides whether to loop back or END
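
To make the cycle concrete, here is a minimal plain-Python sketch of the control flow these four pieces form. It does not use hypergraph at all: `END` is a stand-in string, `answer` is a hypothetical placeholder for the embed, retrieve, generate steps, and a scripted list of queries replaces the live `ask_user` pause. The ordering is simplified compared with the real graph.

```python
END = "END"  # stand-in sentinel for illustration; not hypergraph's END object


def answer(query: str, messages: list[dict]) -> str:
    """Hypothetical placeholder for the embed -> retrieve -> generate steps."""
    return f"(answer to {query!r}, given {len(messages)} prior messages)"


def should_continue(query: str) -> str:
    """Route: stop on 'exit' (any case, surrounding spaces ignored), else loop."""
    return END if query.strip().lower() == "exit" else "rag"


def chat(queries: list[str]) -> list[dict]:
    """Run the cycle over scripted queries instead of live user input."""
    messages: list[dict] = []
    for query in queries:  # each item plays the role of one ask_user pause
        if should_continue(query) == END:
            break
        response = answer(query, messages)
        # Same shape as accumulate_history: append one user/assistant pair.
        messages = [
            *messages,
            {"role": "user", "content": query},
            {"role": "assistant", "content": response},
        ]
    return messages


history = chat(["What is HyperGraph?", "How do I use .map()?", " Exit ", "ignored"])
print(len(history))  # prints 4: two completed turns, two messages each
```

Anything after the `exit` entry is never processed, which mirrors the route sending the graph to `END`.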

In [2]:
from hypergraph import END, AsyncRunner, Graph, InterruptNode, node, route


@node(output_name="query_embedding")
def embed_query(query: str, embedder: Embedder) -> list[float]:
    return embedder.embed(query)


@node(output_name="documents")
def retrieve(query_embedding: list[float], vector_store: VectorStore) -> list[str]:
    return [d["text"] for d in vector_store.search(query_embedding, top_k=3)]


@node(output_name="response")
async def generate(
    query: str, documents: list[str], messages: list[dict], llm: LLM
) -> str:
    context = "\n".join(f"- {d}" for d in documents)
    chat_messages = [
        {
            "role": "system",
            "content": "Answer based on the provided context. Be concise.",
        },
        *messages,
        {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"},
    ]
    return await llm.generate(chat_messages)


rag_graph = Graph([embed_query, retrieve, generate], name="rag").bind(
    embedder=embedder,
    llm=llm,
    vector_store=vector_store,
)
rag_graph.visualize()
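
The prompt that `generate` sends to the model can be inspected without calling an LLM. The sketch below repeats the same assembly logic in a standalone helper; the name `build_chat_messages`, the sample documents, and the prior turn are made up for illustration and are not part of the notebook's `components` module.

```python
def build_chat_messages(
    query: str, documents: list[str], messages: list[dict]
) -> list[dict]:
    """Assemble system prompt, prior turns, and the context-augmented question."""
    context = "\n".join(f"- {d}" for d in documents)
    return [
        {
            "role": "system",
            "content": "Answer based on the provided context. Be concise.",
        },
        *messages,  # earlier user/assistant turns, oldest first
        {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"},
    ]


# Illustrative inputs only.
prior_turns = [
    {"role": "user", "content": "What is HyperGraph?"},
    {"role": "assistant", "content": "A framework for graph-based workflows."},
]
sample_docs = ["Doc A text", "Doc B text"]

chat_messages = build_chat_messages("How do cycles work?", sample_docs, prior_turns)
for message in chat_messages:
    print(message["role"], "|", message["content"].splitlines()[0])
```

Note that only the latest question carries retrieved context; earlier turns are replayed as plain user and assistant messages.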

In [3]:
@node(output_name="messages")
def accumulate_history(messages: list[dict], query: str, response: str) -> list[dict]:
    return [
        *messages,
        {"role": "user", "content": query},
        {"role": "assistant", "content": response},
    ]

In [4]:
ask_user = InterruptNode(name="ask_user", input_param="messages", output_param="query")

In [5]:
@route(targets=["rag", END])
def should_continue(query: str) -> str:
    if query.strip().lower() == "exit":
        return END
    return "rag"

In [6]:
chat_graph = Graph(
    [rag_graph.as_node(name="rag"), accumulate_history, ask_user, should_continue],
    name="rag_chat",
)

In [7]:
# chat_graph.visualize()  # left disabled: visualization is not fully supported for this graph yet

## Interactive chat loop

The runner pauses at `ask_user`. We collect the user's input, then call `runner.run` again with that input and the accumulated history to resume.
Type `exit` to end the conversation.
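
The pause and resume protocol can be tried out without hypergraph or an LLM. The sketch below is a toy under stated assumptions: `Pause`, `Result`, and `run_once` are hypothetical stand-ins that mirror only the attributes this notebook uses (`paused`, `pause.response_key`, `pause.value`), and scripted replies take the place of `input()`. It is not how the library's runner is implemented.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Pause:
    response_key: str  # input name the caller must supply when resuming
    value: list        # data surfaced at the pause (here: the chat history)


@dataclass
class Result:
    values: dict
    pause: Optional[Pause] = None

    @property
    def paused(self) -> bool:
        return self.pause is not None


def run_once(inputs: dict) -> Result:
    """Toy stand-in for one run: answer, extend history, then pause or finish."""
    query, messages = inputs["query"], inputs["messages"]
    if query.strip().lower() == "exit":
        return Result(values={"messages": messages})  # finished, no pause
    response = f"echo: {query}"  # placeholder for the RAG answer
    messages = [
        *messages,
        {"role": "user", "content": query},
        {"role": "assistant", "content": response},
    ]
    return Result(
        values={"response": response, "messages": messages},
        pause=Pause(response_key="query", value=messages),
    )


scripted_replies = iter(["second question", "exit"])  # replaces input()
result = run_once({"query": "first question", "messages": []})
turns = [result.values["response"]]
while result.paused:
    user_input = next(scripted_replies)
    result = run_once(
        {result.pause.response_key: user_input, "messages": result.pause.value}
    )
    if result.paused:  # a finished run carries no new answer
        turns.append(result.values["response"])

print(turns)
```

The caller owns the loop: each resume is a fresh call that passes the reply under the key named by the pause, together with the state the pause handed back.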

In [8]:
runner = AsyncRunner()

In [9]:
# First run — provide initial query and empty history
result = await runner.run(
    chat_graph, {"query": "What is HyperGraph?", "messages": []}, max_iterations=50
)
print(f"A: {result['response']}\n")

A: HyperGraph is a Python framework designed for building AI/ML workflows using explicit graphs, supporting hierarchy and unifying directed acyclic graphs (DAGs) and cycles. This allows for the management of complex workflows, including both pipelines and agents.



In [10]:
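# Continue the conversation interactively
while result.paused:
    user_input = input("You: ")
    # Resume: pass the reply under the key the pause expects, plus the
    # history surfaced by the pause (the `messages` fed to `ask_user`).
    result = await runner.run(
        chat_graph,
        {result.pause.response_key: user_input, "messages": result.pause.value},
        max_iterations=50,
    )
    # After `exit` the graph reaches END without pausing again, so there
    # is no new answer to print.
    if not result.paused:
        break
    print(f"A: {result['response']}\n")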

print("\nGoodbye!")

A: You use the .map() feature in HyperGraph by writing code for a single item, which can then be automatically scaled to process multiple items. This is done by applying the function defined at the node level to each item in a collection, allowing for efficient parallel processing within the graph framework.


Goodbye!
