In [1]:
import nest_asyncio

# Patch asyncio so event loops can be nested: the notebook kernel already runs
# a loop, and synchronous library helpers may try to start another inside it.
nest_asyncio.apply()

In [4]:
%pip install -qU llama-index llama-index-utils-workflow
%pip install -qU pinecone llama-index-vector-stores-pinecone
%pip install -qU llama-index-embeddings-mistralai llama-index-llms-text-generation-inference
%pip install -qU llama-index-core llama-parse llama-index-readers-file python-dotenv
%pip install -qU llama-index-readers-file
%pip install -qU "openinference-instrumentation-llama-index>=3.0.0" "opentelemetry-proto>=1.12.0" opentelemetry-exporter-otlp opentelemetry-sdk


Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [5]:
import os
from dotenv import load_dotenv

# Merge variables from a local .env file (if present) into the process environment.
load_dotenv()

# Validate every required variable up front so a misconfigured environment is
# reported in one message instead of a bare KeyError on the first missing key.
_REQUIRED_ENV_VARS = (
    "PINECONE_API",
    "PINECONE_INDEX_NAME",
    "OPENAI_API_KEY",
    "PHOENIX_API_KEY",
    "OTEL_EXPORTER_OTLP_HEADERS",
    "PHOENIX_CLIENT_HEADERS",
    "PHOENIX_COLLECTOR_ENDPOINT",
)
_missing_env_vars = [name for name in _REQUIRED_ENV_VARS if name not in os.environ]
if _missing_env_vars:
    # Same exception type a direct os.environ[...] lookup would raise.
    raise KeyError(
        "Missing required environment variables: " + ", ".join(_missing_env_vars)
    )

# Pinecone / OpenAI credentials and target index.
PINECONE_API = os.environ["PINECONE_API"]
PINECONE_INDEX_NAME = os.environ["PINECONE_INDEX_NAME"]
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]

# Phoenix / OpenTelemetry tracing configuration.
PHOENIX_API_KEY = os.environ["PHOENIX_API_KEY"]
OTEL_EXPORTER_OTLP_HEADERS = os.environ["OTEL_EXPORTER_OTLP_HEADERS"]
PHOENIX_CLIENT_HEADERS = os.environ["PHOENIX_CLIENT_HEADERS"]
PHOENIX_COLLECTOR_ENDPOINT = os.environ["PHOENIX_COLLECTOR_ENDPOINT"]

In [7]:
# Display the collector endpoint (a URL, not a credential) as a quick check
# that the environment variables were loaded.
PHOENIX_COLLECTOR_ENDPOINT

'https://app.phoenix.arize.com'

In [8]:
from pinecone import Pinecone, ServerlessSpec

# Reuse the values already loaded by the configuration cell instead of reading
# the environment a second time, so both cells always agree on the settings.
pc = Pinecone(api_key=PINECONE_API)
index_name = PINECONE_INDEX_NAME
# Handle to the existing Pinecone index named by PINECONE_INDEX_NAME.
pinecone_index = pc.Index(index_name)

  from tqdm.autonotebook import tqdm


In [None]:
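# Optional (currently disabled): explicitly select the embedding model instead
# of relying on LlamaIndex's default.
# NOTE(review): the model used to embed queries should match the one used to
# populate the Pinecone index — confirm before enabling.
# from llama_index.embeddings.openai import OpenAIEmbedding
# from llama_index.core import Settings

# embed_model = OpenAIEmbedding(model_name="text-embedding-3-small")
# Settings.embed_model = embed_model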

In [9]:
from llama_index.core import PromptTemplate

# Prompt used for the final answer: it is filled with the retrieved text
# ({context}) and the user's query ({question}). The literal pieces below
# concatenate to the exact template text, including its indentation.
DEFAULT_RAG_PROMPT = PromptTemplate(
    template=(
        "Use the provided context to answer the question. "
        "If you don't know the answer, say you don't know.\n"
        "\n"
        "    Context:\n"
        "    {context}\n"
        "\n"
        "    Question:\n"
        "    {question}\n"
        "    "
    )
)

In [10]:
from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore

class PrepEvent(Event):
    """Signal that preparation is done and retrieval can start.

    Carries no payload: ``WorkflowRAG.prepare_for_retrieval`` stores the query
    state in the workflow context before returning this event, and
    ``WorkflowRAG.retrieve`` reads it back from there.
    """

class RetrieveEvent(Event):
    """Retrieve event (gets retrieved nodes).

    Returned by ``WorkflowRAG.retrieve`` and accepted by
    ``WorkflowRAG.augment_and_generate``.
    """

    # Scored nodes the retriever returned for the current query.
    retrieved_nodes: list[NodeWithScore]

class AugmentGenerateEvent(Event):
    """Event carrying relevant text and search text for a generation step.

    NOTE(review): no ``WorkflowRAG`` step visible in this notebook returns or
    accepts this event — confirm whether it is still needed.
    """
    # presumably the text extracted from the retrieved nodes — TODO confirm
    relevant_text: str
    # presumably the query text to answer — TODO confirm
    search_text: str

In [36]:
from llama_index.core.workflow import (
    Workflow,
    step,
    Context,
    StartEvent,
    StopEvent,
)
from llama_index.core import StorageContext, VectorStoreIndex
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.llms.openai import OpenAI
from llama_index.core.base.base_retriever import BaseRetriever
from llama_index.vector_stores.pinecone import PineconeVectorStore

# First, we create a new class that subclasses Workflow.
# Each step is a method decorated with @step that takes a Context and an Event as input
# and returns the next Event (or None to emit nothing).
class WorkflowRAG(Workflow):
    """Retrieval-augmented generation workflow over an existing Pinecone index.

    Two steps accept the ``StartEvent``; the keyword arguments given to
    ``run()`` decide which one does the work:

    * ``run()`` without ``query_str``: ``initialize_index`` connects to
      Pinecone and the run result is the ``VectorStoreIndex``.
    * ``run(query_str=..., index=..., retriever_kwargs=...)``:
      ``prepare_for_retrieval`` -> ``retrieve`` -> ``augment_and_generate``;
      the run result is the generated answer text.
    """

    @step
    async def initialize_index(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        """Connect to the Pinecone index and return it wrapped in a ``VectorStoreIndex``.

        Reads ``PINECONE_API`` and ``PINECONE_INDEX_NAME`` from the environment.

        Returns:
            ``StopEvent`` whose result is the index, or ``None`` when the run
            was started with a ``query_str`` (a query run).
        """
        # The StartEvent of a query run is delivered to this step as well.
        # Without this guard every query would reconnect to Pinecone and emit
        # a second StopEvent (carrying the index) that competes with the answer.
        if ev.get("query_str") is not None:
            return None

        pc = Pinecone(api_key=os.environ["PINECONE_API"])
        index_name = os.environ["PINECONE_INDEX_NAME"]
        pinecone_index = pc.Index(index_name)

        vector_store = PineconeVectorStore(
            pinecone_index=pinecone_index,
            add_sparse_vector=True,
        )
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        # No documents are passed: the index object only wraps the vector store.
        index = VectorStoreIndex.from_documents(
            [], storage_context=storage_context
        )
        return StopEvent(result=index)

    @step
    async def prepare_for_retrieval(
        self, ctx: Context, ev: StartEvent
    ) -> PrepEvent | None:
        """Store the query state in the context ahead of retrieval.

        Reads ``query_str``, ``retriever_kwargs`` and ``index`` from the start
        event, builds the prompt -> LLM pipeline, and saves everything in
        ``ctx`` for the later steps.

        Returns:
            ``PrepEvent`` to trigger retrieval, or ``None`` when no
            ``query_str`` was supplied.
        """
        query_str: str | None = ev.get("query_str")
        # ``or {}`` also covers an explicit ``retriever_kwargs=None``, which
        # would otherwise fail later when unpacked with ``**``.
        retriever_kwargs: dict = ev.get("retriever_kwargs") or {}

        if query_str is None:
            return None

        index = ev.get("index")

        llm = OpenAI(model="gpt-4o-mini")
        await ctx.set(
            "rag_pipeline",
            QueryPipeline(chain=[DEFAULT_RAG_PROMPT, llm]),
        )

        await ctx.set("llm", llm)
        await ctx.set("index", index)

        await ctx.set("query_str", query_str)
        await ctx.set("retriever_kwargs", retriever_kwargs)

        return PrepEvent()

    @step
    async def retrieve(
        self, ctx: Context, ev: PrepEvent
    ) -> RetrieveEvent | None:
        """Retrieve the relevant nodes for the query.

        Returns:
            ``RetrieveEvent`` with the retrieved nodes, or ``None`` when the
            context holds no query.

        Raises:
            ValueError: if no index was stored in the context.
        """
        query_str = await ctx.get("query_str")
        retriever_kwargs = await ctx.get("retriever_kwargs")

        if query_str is None:
            return None

        index = await ctx.get("index", default=None)
        if not index:
            raise ValueError(
                "Index must be constructed. Call run() without 'query_str' first "
                "and pass the returned index via the 'index' param."
            )

        retriever: BaseRetriever = index.as_retriever(**retriever_kwargs)
        # Async variant so the step does not block the event loop.
        result = await retriever.aretrieve(query_str)
        return RetrieveEvent(retrieved_nodes=result)

    @step
    async def augment_and_generate(self, ctx: Context, ev: RetrieveEvent) -> StopEvent:
        """Answer the query using the retrieved nodes as context.

        Joins the content of the retrieved nodes with newlines, runs the
        stored prompt -> LLM pipeline, and finishes the workflow.

        Returns:
            ``StopEvent`` whose result is the content of the LLM's message.
        """
        relevant_text = "\n".join(
            node.get_content() for node in ev.retrieved_nodes
        )
        query_str = await ctx.get("query_str")

        rag_pipeline = await ctx.get("rag_pipeline")

        # Async variant so the LLM call does not block the event loop.
        response = await rag_pipeline.arun(
            context=relevant_text, question=query_str
        )

        return StopEvent(result=response.message.content)

In [37]:
from llama_index.utils.workflow import draw_all_possible_flows

# Write a diagram of every step/event path in WorkflowRAG to an HTML file.
draw_all_possible_flows(WorkflowRAG, filename="wf_rag_workflow.html")

wf_rag_workflow.html


huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


In [38]:
# Instantiate the workflow with its default settings.
rag_workflow = WorkflowRAG()
# Run without a query: the result is the index returned by `initialize_index`.
index = await rag_workflow.run()

In [39]:
from IPython.display import Markdown, display

response = await rag_workflow.run(
    query_str="How do I install oh my zsh?",
    index=index,
)
display(Markdown(str(response)))

To install Oh My Zsh, you first need to have Zsh installed on your machine. Here are the steps to install Oh My Zsh:

1. **Install Zsh**: If you don't have Zsh installed, you can do so using the following commands:
   - On Mac: `brew install zsh`
   - On Ubuntu: `sudo apt-get install zsh`

2. **Install Oh My Zsh**: Once Zsh is installed, you can install Oh My Zsh using one of the following commands:
   - Using `curl`: 
     ```bash
     sh -c "$(curl -fsSL https://raw.githubusercontent.com/ohmyzsh/ohmyzsh/master/tools/install.sh)"
     ```
   - Using `wget`: 
     ```bash
     sh -c "$(wget https://raw.githubusercontent.com/ohmyzsh/ohmyzsh/master/tools/install.sh -O -)"
     ```

3. **Follow the prompts**: After running the installation command, you may need to enter your password and follow any additional prompts.

Make sure to check the official Oh My Zsh website for the most up-to-date installation instructions and any additional options.