In [14]:
import nest_asyncio
nest_asyncio.apply()

In [15]:
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.settings import Settings
from llama_index.core.workflow import Event, Context, Workflow, StartEvent, StopEvent, step
from llama_index.core.schema import NodeWithScore
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.response_synthesizers import CompactAndRefine

In [16]:
class RetrieverEvent(Event):
    """Result of running retrieval"""

    nodes: list[NodeWithScore]
    query: str

In [17]:
class RAGWorkflow(Workflow):
    def __init__(self, model_name="granite4:350m", embedding_model="BAAI/bge-small-en-v1.5"):
        super().__init__()
        # Initialize LLM and embedding model
        self.llm = Ollama(model=model_name)
        self.embed_model = HuggingFaceEmbedding(model_name=embedding_model)

        # Configure global settings
        Settings.llm = self.llm
        Settings.embed_model = self.embed_model

        self.index = None

    @step
    async def ingest(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        """Entry point to ingest documents from a directory."""
        dirname = ev.get("dirname")
        if not dirname:
            return None

        documents = SimpleDirectoryReader(dirname).load_data()
        self.index = VectorStoreIndex.from_documents(documents=documents)
        return StopEvent(result=self.index)

    @step
    async def retrieve(self, ctx: Context, ev: StartEvent) -> RetrieverEvent | None:
        """Entry point for RAG retrieval."""
        query = ev.get("query")
        index = ev.get("index") or self.index

        if not query:
            return None

        if index is None:
            print("Index is empty, load some documents before querying!")
            return None

        retriever = index.as_retriever(similarity_top_k=2)
        nodes = await retriever.aretrieve(query)
        return RetrieverEvent(nodes=nodes, query=query)

    @step
    async def synthesize(self, ctx: Context, ev: RetrieverEvent) -> StopEvent:
        """Generate a response using retrieved nodes."""
        summarizer = CompactAndRefine(streaming=True, verbose=True)
        query = ev.query
        response = await summarizer.asynthesize(query, nodes=ev.nodes)
        return StopEvent(result=response)

    async def query(self, query_text: str):
        """Helper method to perform a complete RAG query."""
        if self.index is None:
            raise ValueError("No documents have been ingested. Call ingest_documents first.")

        result = await self.run(query=query_text, index=self.index)
        return result

    async def ingest_documents(self, directory: str):
        """Helper method to ingest documents."""
        result = await self.run(dirname=directory)
        self.index = result
        return result

The first entrypoint is ingestion

In [18]:
# Initialize the workflow
print("#####---> Started RAG WorkFlow..")
w = RAGWorkflow()

# Ingest documents
print("#####---> Started Ingesting Documents..")
await w.ingest_documents("data")

2025-12-17 22:43:05,418 - INFO - Load pretrained SentenceTransformer: BAAI/bge-small-en-v1.5


#####---> Started RAG WorkFlow..


2025-12-17 22:43:09,565 - INFO - 1 prompt is loaded, with the key: query


#####---> Started Ingesting Documents..


<llama_index.core.indices.vector_store.base.VectorStoreIndex at 0x17fabffcf50>

The second entry point is retrieval

In [None]:
# Perform a query
print("#####---> Started Querying..")
result = await w.query("How old was Sultan Mehmed when he became the leader of the Ottoman Empire? What was his age?")

# Print the response
print("#####---> Started Printing Response..")
async for chunk in result.async_response_gen():
    print(chunk, end="", flush=True)

#####---> Started Querying..


2025-12-17 22:43:16,167 - INFO - HTTP Request: POST http://localhost:11434/api/show "HTTP/1.1 200 OK"


#####---> Started Printing Response..


2025-12-17 22:43:16,958 - INFO - HTTP Request: POST http://localhost:11434/api/chat "HTTP/1.1 200 OK"


Based on the provided context information, Sultan Mehmed II became the leader of the Ottoman Empire in the year 1453.