# Building Event-Driven Agentic RAG with Qdrant and LlamaIndex

Qdrant can serve as the vector database for agentic AI workflows built with LlamaIndex in several ways. In this mini-course, we will walk through building a simple agent workflow that decides between:
- querying a Qdrant collection, or
- writing new statements to it.

For this first step, we will use a local Qdrant collection that runs as a Docker container on our machine.

Finally, we will see how you can also use a managed Qdrant collection as a sink in LlamaCloud.

<img src="assets/workflow.png" alt="drawing" style="width:500px;"/>


In [None]:
%pip install qdrant-client fastembed llama-index-vector-stores-qdrant llama-index-embeddings-openai llama-index-llms-openai python-dotenv llama-index llama-index-utils-workflow llama-index-readers-web

Before starting, you should set up a `.env` file containing environment variables needed for the pipeline to run properly, such as the `OPENAI_API_KEY`.

In [None]:
from dotenv import load_dotenv

load_dotenv()

True
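Once `load_dotenv()` has run, a quick check along the following lines can confirm that the required variables are actually present before any API call is made. This is only a sketch: the helper name `missing_env_vars` is hypothetical, and the placeholder value stands in for a real key.

```python
import os


def missing_env_vars(required, env=None):
    """Return the names in `required` that are unset or empty in `env`."""
    # Fall back to the real process environment when no mapping is given.
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


# Demonstration with an explicit mapping instead of the real environment.
print(missing_env_vars(["OPENAI_API_KEY"], env={"OPENAI_API_KEY": "placeholder"}))
print(missing_env_vars(["OPENAI_API_KEY"], env={}))
```

Calling `missing_env_vars(["OPENAI_API_KEY"])` without the `env` argument checks the real environment instead.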

## Connect to a Local Qdrant Collection

To get started, pull the Qdrant image and start it as a container:
```bash
docker pull qdrant/qdrant
docker run -d --name qdrant -p 6333:6333 -p 6334:6334 qdrant/qdrant:latest
```

Qdrant supports hybrid retrieval that combines dense and sparse vectors. When setting up the `QdrantVectorStore` below, we therefore also specify a sparse embedding model.

In [None]:
from qdrant_client import QdrantClient
from llama_index.vector_stores.qdrant import QdrantVectorStore


client = QdrantClient("http://localhost:6333")

if client.collection_exists("my_collection"):
    client.delete_collection("my_collection")

vector_store = QdrantVectorStore(
    collection_name="my_collection",
    client=client,
    fastembed_sparse_model="Qdrant/minicoil-v1",
)
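To build some intuition for what "hybrid" means, here is a toy sketch of reciprocal rank fusion, one common way to merge a dense ranking and a sparse ranking into a single list. It is an illustration only: the function name and the constant `k=60` are assumptions, and nothing here is taken from Qdrant's or LlamaIndex's code, which may combine results differently depending on configuration.

```python
def reciprocal_rank_fusion(rankings, k=60):
    """Fuse several ranked lists of document ids into one ranking.

    Each document receives 1 / (k + rank) from every list it appears in;
    documents are then sorted by their summed score, highest first.
    """
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    # Ties are broken by document id so the result is deterministic.
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))


dense_ranking = ["a", "b", "c"]   # hypothetical ids ranked by dense similarity
sparse_ranking = ["b", "c", "a"]  # the same ids ranked by sparse similarity
print(reciprocal_rank_fusion([dense_ranking, sparse_ranking]))
```

A document that ranks reasonably well in both lists (here `"b"`) can end up ahead of one that is first in only one of them.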

In [None]:
from llama_index.core.node_parser import SimpleNodeParser
from llama_index.core import Settings

# Split documents into chunks of up to 512 tokens with a 32-token overlap,
# and register the parser as the default for this session.
node_parser = SimpleNodeParser.from_defaults(chunk_size=512, chunk_overlap=32)
Settings.node_parser = node_parser
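The effect of `chunk_size` and `chunk_overlap` can be pictured with a simplified sliding window over a list of tokens. This is a sketch under the assumption that tokens are already available as a list; it is not the node parser's actual algorithm, and the function name is hypothetical.

```python
def chunk_with_overlap(tokens, chunk_size, chunk_overlap):
    """Split `tokens` into windows of `chunk_size` sharing `chunk_overlap` items."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        # Stop once the current window has reached the end of the input.
        if start + chunk_size >= len(tokens):
            break
    return chunks


print(chunk_with_overlap(list(range(10)), chunk_size=4, chunk_overlap=1))
```

Each chunk starts with the last `chunk_overlap` tokens of the previous one, which helps keep context that would otherwise be cut at a chunk boundary.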


In [None]:
from llama_index.core.schema import Document

documents = [
    Document(
        text="LlamaIndex is a simple, flexible data framework for connecting custom data sources to large language models.",
        metadata={
            "library": "llama-index",
        },
    ),
    Document(
        text="Tuana is DevRel at LlamaIndex.",
        metadata={
            "library": "llama-index",
        },
    ),
    Document(
        text="Qdrant is a vector database & vector similarity search engine.",
        metadata={
            "library": "qdrant",
        },
    ),
]

In [None]:
from llama_index.core import VectorStoreIndex, StorageContext
from llama_index.embeddings.openai import OpenAIEmbedding

storage_context = StorageContext.from_defaults(vector_store=vector_store)

# The storage context already points at the Qdrant vector store,
# so the store does not need to be passed a second time.
index = VectorStoreIndex.from_documents(
    documents=documents,
    embed_model=OpenAIEmbedding(),
    storage_context=storage_context,
)

## RAG Over Qdrant

In [None]:
query_engine = index.as_query_engine()

response = query_engine.query("What is Qdrant?")
print(response)

Qdrant is a vector database and vector similarity search engine.


## Building an Agent Workflow 

In this example, we will build an agent workflow that acts both as an agentic RAG application and as a database management tool. Our aim is to use [LlamaIndex Workflows](https://docs.llamaindex.ai/en/stable/module_guides/workflow/) to create a workflow with a decision step that, based on the user's query, makes the agent either query the database or write to it.
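Before turning to the real thing, the event-driven idea can be sketched in plain Python: each handler receives one event type and returns the next event, and the loop runs until a stop event appears. Everything in this sketch is a hypothetical stand-in (the event classes, the `write:` prefix rule that replaces the LLM decision, and the in-memory list that replaces the collection); it is not the LlamaIndex Workflows API.

```python
from dataclasses import dataclass


@dataclass
class Start:
    query: str


@dataclass
class Write:
    statement: str


@dataclass
class Query:
    question: str


@dataclass
class Stop:
    result: str


def decide(ev):
    # Toy decision step: a "write:" prefix means store, anything else means ask.
    prefix = "write:"
    if ev.query.lower().startswith(prefix):
        return Write(statement=ev.query[len(prefix):].strip())
    return Query(question=ev.query)


def write(ev, store):
    store.append(ev.statement)
    return Stop(result=f"stored: {ev.statement}")


def query(ev, store):
    # Toy retrieval: return statements sharing at least one word with the question.
    words = set(ev.question.lower().split())
    hits = [s for s in store if words & set(s.lower().split())]
    return Stop(result="; ".join(hits) or "no match")


def run(query_text, store):
    handlers = {
        Start: decide,
        Write: lambda ev: write(ev, store),
        Query: lambda ev: query(ev, store),
    }
    event = Start(query=query_text)
    # Dispatch on the event type until a handler emits Stop.
    while not isinstance(event, Stop):
        event = handlers[type(event)](event)
    return event.result


store = []
print(run("write: Qdrant is a vector database", store))
print(run("what is qdrant", store))
```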

### 1. Function Calling Agent

In [None]:
async def write_statement(statement: str) -> str:
    """Useful for writing statements to a collection"""
    document = Document(text=statement)
    await index.ainsert_nodes([document])
    return f"Wrote the statement: {statement} to the collection"

# Must be `async def` because the body awaits the query engine.
async def query_collection(query: str) -> str:
    """Useful for querying the collection"""
    response = await query_engine.aquery(query)
    return response.response
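The docstrings on these functions matter: an agent needs a name and a description for each tool in order to choose between them. The stand-alone sketch below shows how such metadata can be read from a plain function with the standard `inspect` module. The helper `describe_tool` and the example function are hypothetical, and this is not LlamaIndex's own implementation.

```python
import inspect


def describe_tool(fn):
    """Collect name, docstring, parameter types and async-ness of a function."""
    signature = inspect.signature(fn)
    return {
        "name": fn.__name__,
        "description": inspect.getdoc(fn) or "",
        "parameters": {
            name: getattr(param.annotation, "__name__", str(param.annotation))
            for name, param in signature.parameters.items()
        },
        "is_async": inspect.iscoroutinefunction(fn),
    }


async def example_tool(query: str) -> str:
    """Useful for querying the collection"""
    return query


print(describe_tool(example_tool))
```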

In [None]:
from llama_index.core.agent.workflow import FunctionAgent
from llama_index.llms.openai import OpenAI

llm = OpenAI(model="gpt-4.1-mini")

agent = FunctionAgent(
    tools=[write_statement, query_collection],
    llm=llm,
    system_prompt="""You are a helpful assistant that can write statements to
    a collection or forward queries to it.""",
)

In [None]:
from llama_index.core.agent.workflow import (
    AgentInput,
    AgentOutput,
    ToolCall,
    ToolCallResult,
    AgentStream,
)

async def view_agent_steps(agent, user_input: str):
    handler = agent.run(user_input)
    current_agent = None
    async for event in handler.stream_events():
        if (
            hasattr(event, "current_agent_name")
            and event.current_agent_name != current_agent
        ):
            current_agent = event.current_agent_name
            print(f"\n{'='*50}")
            print(f"🤖 Agent: {current_agent}")
            print(f"{'='*50}\n")
        elif isinstance(event, AgentOutput):
            if event.response.content:
                print("📤 Output:", event.response.content)
            if event.tool_calls:
                print(
                    "🛠️  Planning to use tools:",
                    [call.tool_name for call in event.tool_calls],
                )
        elif isinstance(event, ToolCallResult):
            print(f"🔧 Tool Result ({event.tool_name}):")
            print(f"  Arguments: {event.tool_kwargs}")
            print(f"  Output: {event.tool_output}")
        elif isinstance(event, ToolCall):
            print(f"🔨 Calling Tool: {event.tool_name}")
            print(f"  With arguments: {event.tool_kwargs}")
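The pattern used in `view_agent_steps`, iterating over an asynchronous stream and branching on the event type, can be tried out without an agent. In the sketch below the event classes and the fake stream are hypothetical stand-ins for the real agent events.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToolCalled:
    tool_name: str
    kwargs: dict


@dataclass
class ToolReturned:
    tool_name: str
    output: str


async def fake_event_stream():
    # Stand-in for a handler's event stream: yields events as they "happen".
    yield ToolCalled("query_collection", {"query": "Who is Tuana?"})
    await asyncio.sleep(0)
    yield ToolReturned("query_collection", "Tuana is DevRel at LlamaIndex.")


async def collect_log_lines(stream):
    lines = []
    async for event in stream:
        if isinstance(event, ToolCalled):
            lines.append(f"calling {event.tool_name} with {event.kwargs}")
        elif isinstance(event, ToolReturned):
            lines.append(f"{event.tool_name} returned: {event.output}")
    return lines


if __name__ == "__main__":
    # In a notebook, use `await collect_log_lines(fake_event_stream())` instead.
    for line in asyncio.run(collect_log_lines(fake_event_stream())):
        print(line)
```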

In [None]:
await view_agent_steps(agent, "Who is Tuana?")


🤖 Agent: Agent

🛠️  Planning to use tools: ['query_collection']
🔨 Calling Tool: query_collection
  With arguments: {'query': 'Who is Tuana?'}
🔧 Tool Result (query_collection):
  Arguments: {'query': 'Who is Tuana?'}
  Output: Tuana is DevRel at LlamaIndex.
📤 Output: Tuana is a Developer Relations (DevRel) professional at LlamaIndex. If you would like to know more about Tuana or their work, feel free to ask!


### 2. Custom Workflow

In [None]:
from pydantic import BaseModel, Field
from typing import List, Union

from llama_index.core.workflow import (Workflow, Event, step, StartEvent, StopEvent)
from llama_index.llms.openai import OpenAIResponses
from llama_index.core.llms import ChatMessage

class SaveToDocs(BaseModel):
    """The statement to save into your collection."""

    statement: str = Field(default_factory=str)


class Ask(BaseModel):
    """The natural language questions that can be asked to a Q&A agent."""

    queries: List[str] = Field(default_factory=list)


class Actions(BaseModel):
    """Actions to take based on the latest user message."""

    actions: List[Union[SaveToDocs, Ask]] = Field(default_factory=list)


In [None]:
class WriteStatement(Event):
    statement: str


class QueryIndex(Event):
    queries: List[str]


class QdrantDocumentAgent(Workflow):
    def __init__(self, *args, **kwargs):
        # Structured LLM whose responses are parsed into an `Actions` object.
        self.sllm = OpenAIResponses(model="gpt-4.1-mini").as_structured_llm(Actions)
        self.system_prompt = (
            "You are a docs assistant. You evaluate incoming queries and break them down into subqueries when needed. "
            "You decide on the next best course of action. Overall, here are the options:\n"
            "- You can write documents to your collection.\n"
            "- You can answer questions based on the contents of your collection."
        )
        super().__init__(*args, **kwargs)

    @step
    async def start(self, ev: StartEvent) -> WriteStatement | QueryIndex | StopEvent:
        response = await self.sllm.achat(
            [
                ChatMessage(role="system", content=self.system_prompt),
                ChatMessage(role="user", content=ev.query),
            ]
        )
        actions = response.raw.actions
        print(actions)
        # Only the first recognized action is routed; any others are ignored.
        for action in actions:
            if isinstance(action, SaveToDocs):
                print("Got Save event")
                return WriteStatement(statement=action.statement)
            elif isinstance(action, Ask):
                print("Got Ask event")
                return QueryIndex(queries=action.queries)
        # The model produced no action: stop instead of returning None.
        return StopEvent()

    @step
    async def query_index(self, ev: QueryIndex) -> StopEvent:
        # Placeholder step: only logs the request for now.
        print(f"Request to query index with queries: {ev.queries}")
        return StopEvent()

    @step
    async def save_to_index(self, ev: WriteStatement) -> StopEvent:
        # Placeholder step: only logs the request for now.
        print(f"Request to write to index: {ev.statement}")
        return StopEvent()

In [None]:
workflow = QdrantDocumentAgent()

In [None]:
response = await workflow.run(start_event=StartEvent(query="Who is Kacper?"))

[Ask(queries=['Who is Kacper?'])]
Got Ask event
Request to query index with queries: ['Who is Kacper?']


In [None]:
class QdrantDocumentAgent(Workflow):
    def __init__(self, *args, **kwargs):
        # Structured LLM whose responses are parsed into an `Actions` object.
        self.sllm = OpenAIResponses(model="gpt-4.1-mini").as_structured_llm(Actions)
        self.system_prompt = (
            "You are a docs assistant. You evaluate incoming queries and break them down into subqueries when needed. "
            "You decide on the next best course of action. Overall, here are the options:\n"
            "- You can write documents to your collection.\n"
            "- You can answer questions based on the contents of your collection."
        )
        super().__init__(*args, **kwargs)

    @step
    async def start(self, ev: StartEvent) -> WriteStatement | QueryIndex | StopEvent:
        response = await self.sllm.achat(
            [
                ChatMessage(role="system", content=self.system_prompt),
                ChatMessage(role="user", content=ev.query),
            ]
        )
        actions = response.raw.actions
        print(actions)
        # Only the first recognized action is routed; any others are ignored.
        for action in actions:
            if isinstance(action, SaveToDocs):
                return WriteStatement(statement=action.statement)
            elif isinstance(action, Ask):
                return QueryIndex(queries=action.queries)
        # The model produced no action: stop instead of returning None.
        return StopEvent()

    @step
    async def query_index(self, ev: QueryIndex) -> StopEvent:
        # Use the async query method so the step does not block the event loop.
        for query in ev.queries:
            response = await query_engine.aquery(query)
            print(response)
        return StopEvent()

    @step
    async def save_to_index(self, ev: WriteStatement) -> StopEvent:
        document = Document(text=ev.statement)
        await index.ainsert_nodes([document])
        print(f"Wrote {document} to the index")
        return StopEvent()
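Note that the `start` step above returns as soon as it has handled the first action, so a request that mixes a write and a question triggers only one of them. One possible variation is to map every action to an event first. The sketch below shows just that mapping, with plain dataclasses standing in for the Pydantic models and workflow events; all names are hypothetical, and how the resulting events would be emitted and collected inside a real workflow is left out.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SaveAction:  # stand-in for SaveToDocs
    statement: str


@dataclass
class AskAction:  # stand-in for Ask
    queries: List[str] = field(default_factory=list)


@dataclass
class WriteEvent:  # stand-in for WriteStatement
    statement: str


@dataclass
class QueryEvent:  # stand-in for QueryIndex
    queries: List[str]


def actions_to_events(actions):
    """Map every action to an event instead of stopping at the first one."""
    events = []
    for action in actions:
        if isinstance(action, SaveAction):
            events.append(WriteEvent(statement=action.statement))
        elif isinstance(action, AskAction) and action.queries:
            # Skip Ask actions that carry no queries.
            events.append(QueryEvent(queries=list(action.queries)))
    return events


mixed = [SaveAction("Kacper is DevRel at Qdrant."), AskAction(["Who is Tuana?"])]
print(actions_to_events(mixed))
```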

In [None]:
workflow = QdrantDocumentAgent()
response = await workflow.run(start_event=StartEvent(query="Write this statement: Kacper is DevRel at Qdrant."))

[SaveToDocs(statement='Kacper is DevRel at Qdrant.')]
Wrote Doc ID: 4c009c95-eeb2-46ee-87a5-c2384ae51f1c
Text: Kacper is DevRel at Qdrant. to the index


## Bring Real-World Data with Readers and Data Loaders

In [None]:
from llama_index.readers.web import SimpleWebPageReader

async def write_webpages_to_qdrant(index, urls: List[str]):
    # `load_data` already returns a list of documents, so pass it through as-is.
    documents = SimpleWebPageReader(html_to_text=True).load_data(urls)
    await index.ainsert_nodes(documents)

In [None]:
await write_webpages_to_qdrant(index, urls=["https://docs.llamaindex.ai/en/stable/module_guides/workflow/"])
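To give a rough idea of what converting HTML to text involves before a page is chunked and embedded, here is a minimal sketch built on the standard library's `html.parser`. It is an illustration only and not the reader's implementation; the class and function names are hypothetical, and real pages need far more careful handling.

```python
from html.parser import HTMLParser


class TextExtractor(HTMLParser):
    """Collect visible text, ignoring the contents of <script> and <style>."""

    def __init__(self):
        super().__init__()
        self._skip_depth = 0
        self.parts = []

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        # Keep only non-empty text found outside of skipped elements.
        if self._skip_depth == 0:
            text = data.strip()
            if text:
                self.parts.append(text)


def html_to_text(html):
    parser = TextExtractor()
    parser.feed(html)
    parser.close()
    return " ".join(parser.parts)


print(html_to_text("<h1>Workflows</h1><script>var x = 1;</script><p>Steps emit events.</p>"))
```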

## Using LlamaCloud and Managed Qdrant Collections

So far, we have seen how to build agentic workflows over locally hosted Qdrant collections. You can also use managed Qdrant collections hosted in Qdrant Cloud.

By combining LlamaCloud with Qdrant as the sink, you also benefit from LlamaCloud's advanced parsing capabilities. The [LlamaCloud Documentation](https://docs.cloud.llamaindex.ai/llamacloud/integrations/data_sinks/qdrant?utm_source=demo&utm_medium=li_social&utm_campaign=cloud) describes how to set this up both programmatically and through the [UI](https://cloud.llamaindex.ai?utm_source=demo&utm_medium=li_social&utm_campaign=cloud).

Below is an example of an index hooked up to a separately managed Qdrant instance as the vector database and a Google Drive folder as the data source 👇

In [None]:
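from llama_index.indices.managed.llama_cloud import LlamaCloudIndex

# Assumes LLAMA_CLOUD_API_KEY is available in the environment (for example via the .env file).
# Note: this rebinds `index` and `query_engine`, replacing the local Qdrant-backed ones.
index = LlamaCloudIndex("coming-sole-2025-07-09", project_name="Default")
query_engine = index.as_query_engine()
answer = await query_engine.aquery("What are LlamaIndex Workflows?")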

In [None]:
print(answer)

LlamaIndex Workflows are event-driven abstractions used to chain together several events. They are composed of steps, where each step is responsible for handling specific event types and emitting new events. Workflows in LlamaIndex are created by decorating functions with a @step decorator, which helps infer the input and output types of each workflow for validation. These workflows ensure that each step only runs when an accepted event is ready. Additionally, Workflows in LlamaIndex are versatile and can be used to build agents, RAG flows, extraction flows, or any other desired functionality. They are also automatically instrumented for observability, allowing users to gain insights into each step using tools like Arize Phoenix.
