In [None]:
%%capture
!pip install llama-index llama-index-embeddings-cohere qdrant-client llama-index-vector-stores-qdrant llama-index-llms-cohere

In [None]:
%%capture
!pip install llama-index-llms-ollama llama-index-embeddings-cohere

In [None]:
!pip show llama-index

In [None]:
import os
from dotenv import load_dotenv
from getpass import getpass

import nest_asyncio

# Later cells call asyncio.run() from inside the notebook; nest_asyncio is
# applied up front so that works while the notebook's own event loop is running.
# NOTE(review): relies on nest_asyncio patching asyncio — confirm on upgrades.
nest_asyncio.apply()
# Load variables from a local .env file (if present) into the environment
# before the credential cells read them.
load_dotenv()

In [None]:
# Read the Cohere key from the environment. .get() yields None when the
# variable is unset (indexing would raise KeyError), so the interactive
# getpass prompt is actually reached as a fallback.
CO_API_KEY = os.environ.get('CO_API_KEY') or getpass("Enter your Cohere API key: ")

In [None]:
# Remove OpenAI API key since we're not using OpenAI
# OPENAI_API_KEY = os.environ['OPENAI_API_KEY'] or getpass("Enter your OpenAI API key: ")

In [None]:
# Read the Qdrant URL from the environment. .get() yields None when the
# variable is unset (indexing would raise KeyError), so the interactive
# prompt is actually reached as a fallback.
QDRANT_URL = os.environ.get('QDRANT_URL') or getpass("Enter your Qdrant URL:")

In [None]:
# Read the Qdrant API key from the environment. .get() yields None when the
# variable is unset (indexing would raise KeyError), so the interactive
# prompt is actually reached as a fallback.
QDRANT_API_KEY = os.environ.get('QDRANT_API_KEY') or getpass("Enter your Qdrant API Key:")

# Query Workflows

<img src="https://docs.llamaindex.ai/en/stable/_static/query/pipeline_rag_example.png">

Source: [LlamaIndex Docs](https://docs.llamaindex.ai/en/stable/module_guides/querying/pipeline/)

LlamaIndex offers a workflow API for chaining modules to manage data workflows easily. It revolves around the Workflow, where you link various modules like LLMs, prompts, and retrievers in a sequence or DAG for end-to-end execution using events.

You can streamline workflows efficiently using Workflow, reducing code complexity and enhancing readability. Additionally, a declarative interface ensures easy serialization of workflow components for portability and deployment across systems in the future.

In [None]:
from llama_index.core.settings import Settings
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.cohere import CohereEmbedding

# Generation runs on an Ollama-served model; the timeout is in seconds.
Settings.llm = Ollama(context_window=8000, request_timeout=120.0, model="qwen2.5:7b")

# Embeddings come from Cohere, authenticated with the key collected earlier.
Settings.embed_model = CohereEmbedding(model_name="embed-english-v3.0", api_key=CO_API_KEY)

# Sanity check: show which embedding model the global Settings now hold
print(f"Embedding model type: {type(Settings.embed_model)}")
print(f"Embedding model: {Settings.embed_model}")

In [None]:
from qdrant_client import AsyncQdrantClient
from llama_index.core import VectorStoreIndex, StorageContext
from llama_index.vector_stores.qdrant import QdrantVectorStore

# Async Qdrant connection built from the URL and API key collected earlier
client = AsyncQdrantClient(api_key=QDRANT_API_KEY, url=QDRANT_URL)

# Vector store bound to the "it_can_be_done" collection through the async client
vector_store = QdrantVectorStore(collection_name="it_can_be_done", aclient=client)

# Index over that store, explicitly handed the globally configured embedder
index = VectorStoreIndex.from_vector_store(
    embed_model=Settings.embed_model,
    vector_store=vector_store,
)

# A RAG Workflow with PromptTemplate

I'm going to kick it off with a slightly complex workflow where the input is passed through one prompt before retrieval, and the retrieved context is passed through a second prompt before the final LLM call.

1. Turn the given topic into a retrieval query.

2. Rephrase the retrieved context into an answer.

Each prompt only takes in one input, so each step can hand its output to the next step as an event, and `Workflow` routes every event to the step that accepts it.

You'll see how to define event flows more explicitly in the next section.

In [None]:
from llama_index.core import PromptTemplate
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)

# Define events for the workflow
class TopicEvent(Event):
    """Returned by `generate_query`; `topic` holds the formatted retrieval query."""
    topic: str

class ContextEvent(Event):
    """Returned by `retrieve_context`; `context` holds the joined text of the retrieved nodes."""
    context: str

class ResponseEvent(Event):
    """Carries a response string; not returned by any step visible in this notebook."""
    response: str

# Prompt 1: wraps the user's topic in a retrieval instruction; the formatted
# string is what gets sent to the retriever.
prompt_str1 = "Retrieve context about the following topic: {topic}"
prompt_tmpl1 = PromptTemplate(prompt_str1)

# Prompt 2: asks the LLM to rewrite the retrieved context as the final answer.
prompt_str2 = """Synthesize the context provided into an answer using modern slang, while still quoting the sources.

Context:

{context}

Synthesized response:
"""

prompt_tmpl2 = PromptTemplate(prompt_str2)

# Retriever over the Qdrant-backed index, asked for the 5 most similar nodes per query.
retriever = index.as_retriever(similarity_top_k=5)

class RAGWorkflow(Workflow):
    """Three-step RAG flow: topic -> retrieval query -> joined context -> answer."""

    @step
    async def generate_query(self, ev: StartEvent) -> TopicEvent:
        """Wrap the topic passed to `run()` in the retrieval prompt."""
        return TopicEvent(topic=prompt_tmpl1.format(topic=ev.topic))

    @step
    async def retrieve_context(self, ev: TopicEvent) -> ContextEvent:
        """Retrieve nodes for the query and join their text with blank lines."""
        hits = await retriever.aretrieve(ev.topic)
        passages = (hit.text for hit in hits)
        return ContextEvent(context="\n\n".join(passages))

    @step
    async def synthesize_response(self, ev: ContextEvent) -> StopEvent:
        """Send the context through the synthesis prompt to the configured LLM."""
        completion = await Settings.llm.acomplete(
            prompt_tmpl2.format(context=ev.context)
        )
        return StopEvent(result=str(completion))


w = RAGWorkflow(verbose=True, timeout=60)

In [None]:
import asyncio

async def run_workflow(
    topic: str = "Working hard to achieve your goals even when you doubt yourself and your chances of success",
):
    """Run the RAG workflow `w` for one topic and return its result.

    Args:
        topic: Topic passed to the workflow as the `topic` start argument.
            Defaults to the notebook's example topic, so calling with no
            arguments behaves as before.

    Returns:
        Whatever awaiting `w.run(...)` yields.
    """
    return await w.run(topic=topic)

response = asyncio.run(run_workflow())

In [None]:
# Show the result of the workflow run from the previous cell
print(response)

You can debug the workflow by streaming its events while it runs and printing each one, then awaiting the final result.

In [None]:
async def run_workflow_with_debug(
    topic: str = "Working hard to achieve your goals even when you doubt yourself and your chances of success",
):
    """Run the RAG workflow `w`, printing every streamed event, then return the result.

    Args:
        topic: Topic passed to the workflow as the `topic` start argument.
            Defaults to the notebook's example topic, so calling with no
            arguments behaves as before.

    Returns:
        Whatever awaiting the run handler yields.
    """
    # Not awaited yet: the handler is needed first to consume the event stream.
    handler = w.run(topic=topic)
    async for event in handler.stream_events():
        print(f"Event: {event}")
    return await handler

result = asyncio.run(run_workflow_with_debug())

### Another RAG Workflow

Here we set up a RAG workflow without the query rewriting step.

Here we need a way to link the input query to both the retriever and summarizer. 

We can do this by defining events that can be consumed by multiple steps, allowing us to link the inputs to multiple downstream modules.

In [None]:
from llama_index.core.response_synthesizers import TreeSummarize

# Retriever over the index (top 5 matches) and a tree-summarize synthesizer
# that uses the globally configured LLM.
retriever = index.as_retriever(similarity_top_k=5)
tree_summarizer = TreeSummarize(llm=Settings.llm)


class QueryEvent(Event):
    """Carries a query string; not returned by any step of SimpleRAGWorkflow."""
    query: str


class NodesEvent(Event):
    """Retrieved nodes plus the query that produced them.

    `query` defaults to an empty string so code that builds
    `NodesEvent(nodes=...)` without a query keeps working.
    """
    nodes: list
    query: str = ""


class SimpleRAGWorkflow(Workflow):
    """Two-step RAG flow: retrieve nodes for the query, then summarize them.

    The query travels inside `NodesEvent`, so each run carries its own query
    instead of depending on an attribute set on the shared workflow instance.
    """

    @step
    async def retrieve_nodes(self, ev: StartEvent) -> NodesEvent:
        """Retrieve nodes for the `query` passed to `run()` and forward both."""
        query = ev.query
        nodes = await retriever.aretrieve(query)
        return NodesEvent(nodes=nodes, query=query)

    @step
    async def synthesize_response(self, ev: NodesEvent) -> StopEvent:
        """Summarize the retrieved nodes with respect to the query."""
        # Prefer the query carried by the event; fall back to the legacy
        # instance attribute only when the event has none.
        query = ev.query or self._get_original_query()
        response = await tree_summarizer.asynthesize(query, ev.nodes)
        return StopEvent(result=response)

    def _get_original_query(self):
        """Return the legacy `_original_query` attribute, or '' if it was never set."""
        return getattr(self, '_original_query', '')


w2 = SimpleRAGWorkflow(timeout=60, verbose=True)

In [None]:
# Nothing to wire up in this cell: the workflow class above already declares
# its steps and the event types each step accepts and returns, so there is no
# manual module addition and linking like in QueryPipeline.
print("Workflow configured with automatic event routing")

In [None]:
async def run_simple_workflow(
    query: str = "Working hard to achieve your goals even when you doubt yourself and your chances of success",
):
    """Run the simple RAG workflow `w2` for one query and return its result.

    Args:
        query: Query passed to the workflow as the `query` start argument.
            Defaults to the notebook's example query, so calling with no
            arguments behaves as before.

    Returns:
        Whatever awaiting `w2.run(...)` yields.
    """
    # The same string is stored on the workflow and passed to run(), so the
    # two copies can no longer drift apart.
    w2._original_query = query
    return await w2.run(query=query)

response = asyncio.run(run_simple_workflow())

In [None]:
# print() applies str() to its argument, so the response's text form is shown
print(response)

In [None]:
# Dump the response's attribute dict when it has one; otherwise show its type and value
if not hasattr(response, '__dict__'):
    print(f"Response type: {type(response)}")
    print(f"Response: {response}")
else:
    print(vars(response))