### Library Installation

In [None]:
# NOTE(review): -U installs the newest releases without pinning. The workflow
# cells below rely on `@step(pass_context=True)`, `ctx.data`, and drawing
# helpers imported from `llama_index.core.workflow` -- confirm the installed
# versions still provide these, or pin the versions this notebook was run with.
!pip install -U --quiet llama-index llama-index-llms-openai llama-index-readers-web pyvis

### Setting the OpenAI API Key

In [None]:
from google.colab import userdata
import os

# Copy the "OPENAI_API_KEY" value from Colab's `userdata` store into the
# process environment (presumably where the OpenAI LLM and embedding clients
# used later look for it -- confirm). This cell only works inside Google Colab.
os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_API_KEY")

### RAG Event

In [None]:
from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore

class RetrieverEvent(Event):
    """Result of running retrieval.

    Emitted by the workflow's ``retrieve`` step and consumed by its
    ``synthesize`` step.
    """

    # Scored nodes returned by the retriever for the current query.
    nodes: list[NodeWithScore]

### RAG Workflow



In [None]:
from llama_index.core import VectorStoreIndex
from llama_index.readers.web import SimpleWebPageReader
from llama_index.core.response_synthesizers import CompactAndRefine
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)

from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding


class RAGWorkflow(Workflow):
    """Minimal retrieval-augmented generation (RAG) workflow.

    Two steps listen for the ``StartEvent`` and decide from its fields whether
    they apply:

    * ``ingest`` runs when the start event carries ``dirname`` and builds the
      vector index.
    * ``retrieve`` runs when the start event carries ``query``; the nodes it
      finds are handed to ``synthesize``, which produces the final answer.

    The steps share state (the index and the current query) through
    ``ctx.data``.
    """

    # Pages indexed when the caller does not supply its own ``urls``.
    DEFAULT_URLS = ("https://jhmhp.amegroups.org/article/view/8842/html",)

    @step(pass_context=True)
    async def ingest(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        """Build the vector index from web pages.

        Triggered by a StartEvent carrying a truthy ``dirname``. The value of
        ``dirname`` is used only as the trigger; no directory is read. The
        pages to index come from the optional ``urls`` field of the start
        event (a URL string or a list of them) and fall back to
        ``DEFAULT_URLS``.

        Returns:
            A StopEvent whose result reports how many documents were indexed,
            or None when the start event is not an ingestion request.
        """
        dirname = ev.get("dirname")
        if not dirname:
            return None

        urls = ev.get("urls") or list(self.DEFAULT_URLS)
        if isinstance(urls, str):
            # Accept a single URL as a convenience.
            urls = [urls]

        documents = SimpleWebPageReader(html_to_text=True).load_data(list(urls))
        # Keep the index in the shared context so `retrieve` can find it.
        ctx.data["index"] = VectorStoreIndex.from_documents(
            documents=documents,
            embed_model=OpenAIEmbedding(model_name="text-embedding-3-small"),
        )
        return StopEvent(result=f"Indexed {len(documents)} documents.")

    @step(pass_context=True)
    async def retrieve(
        self, ctx: Context, ev: StartEvent
    ) -> RetrieverEvent | None:
        """Retrieve the nodes most similar to the query.

        Triggered by a StartEvent carrying a truthy ``query``. The query is
        stored in the shared context for ``synthesize``.

        Returns:
            A RetrieverEvent with the top-2 nodes, or None when the start
            event has no query or no index has been built yet.
        """
        query = ev.get("query")
        if not query:
            return None

        print(f"Query the database with: {query}")

        # store the query in the global context
        ctx.data["query"] = query

        # get the index from the global context
        index = ctx.data.get("index")
        if index is None:
            # NOTE(review): returning None emits no further event, so this run
            # presumably ends without a result (e.g. by timeout) -- confirm.
            print("Index is empty, load some documents before querying!")
            return None

        retriever = index.as_retriever(similarity_top_k=2)
        # Use the async retriever API so the event loop is not blocked while
        # the query is embedded and the index is searched.
        nodes = await retriever.aretrieve(query)
        print(f"Retrieved {len(nodes)} nodes.")
        return RetrieverEvent(nodes=nodes)

    @step(pass_context=True)
    async def synthesize(self, ctx: Context, ev: RetrieverEvent) -> StopEvent:
        """Answer the stored query from the retrieved nodes.

        Reads the query saved by ``retrieve`` from the shared context and
        synthesizes a response with ``CompactAndRefine`` in streaming mode.

        Returns:
            A StopEvent whose result is the synthesizer's response object.
        """
        llm = OpenAI(model="gpt-4o-mini")
        summarizer = CompactAndRefine(llm=llm, streaming=True, verbose=True)
        query = ctx.data.get("query")

        response = await summarizer.asynthesize(query, nodes=ev.nodes)
        return StopEvent(result=response)

### Executing the workflow and drawing the flow graph

In [None]:
from llama_index.core.workflow import (
    draw_all_possible_flows,
    draw_most_recent_execution,
)

draw_all_possible_flows(RAGWorkflow, filename="RAG_flow.html")

w = RAGWorkflow()
await w.run(dirname="data")
result = await w.run(query="What is the data about?")
async for chunk in result.async_response_gen():
    print(chunk, end="", flush=True)
draw_most_recent_execution(w, filename="rag_flow_recent.html")

RAG_flow.html
Query the database with: What is the data about?
Retrieved 2 nodes.
The data pertains to a narrative review focused on advancing the democratization of generative artificial intelligence in healthcare. It includes references to various studies and articles related to machine learning and risk prediction in healthcare settings. The review is published in the Journal of Hospital Management and Health Policy.rag_flow_recent.html
