## Preparation

As always, we have some work to do before we can jump straight into the workflows.

Let's set up some boilerplate, add some dependencies, and get ready to rock!

### Async Boilerplate:

Since "workflows make async a first-class citizen" and we're running these examples in a Jupyter Notebook (which already has an active event loop!), we'll use the `nest_asyncio` library so that the async workflows we build can run inside that loop.

In [None]:
import nest_asyncio

nest_asyncio.apply()
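To see why this is needed, here is a minimal sketch using only the standard library. It simulates a notebook cell by calling `asyncio.run` from inside a loop that is already running; the names `inner` and `notebook_cell` are illustrative and not part of this notebook.

```python
import asyncio


async def inner():
    return "done"


async def notebook_cell():
    # We are already inside a running loop here, much like code in a Jupyter cell.
    coro = inner()
    try:
        asyncio.run(coro)  # starting a second loop from inside a running one
    except RuntimeError as exc:
        coro.close()  # the coroutine never ran, so close it explicitly
        return str(exc)
    return "no error"


message = asyncio.run(notebook_cell())
print(message)
```

`nest_asyncio.apply()` patches asyncio so that this kind of nested call is allowed.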

### Installing Dependencies:

Next, we're going to install our dependencies!

We'll want to grab our `llama-index-utils-workflow` package which will let us draw all possible paths through the resultant workflow.

We'll also grab the rest of our dependencies here as well!

In [None]:
%pip install -qU llama-index llama-index-utils-workflow

In [None]:
%pip install -qU pinecone llama-index-vector-stores-pinecone

In [None]:
%pip install -qU llama-index-embeddings-mistralai llama-index-llms-text-generation-inference

In [None]:
%pip install -qU llama-index-core llama-parse llama-index-readers-file python-dotenv

In [None]:
%pip install -qU llama-index-readers-file

Next, we'll grab all of our API keys (there are quite a few)!

In [None]:
import os
import getpass

os.environ["PINECONE_API_KEY"] = getpass.getpass("Pinecone API Key:")

Pinecone API Key:··········


In [None]:
os.environ["LLAMA_CLOUD_API_KEY"] = getpass.getpass("Llama Cloud API Key")

Llama Cloud API Key··········


In [None]:
os.environ["HF_TOKEN"] = getpass.getpass("Huggingface Token:")

Huggingface Token:··········


In [None]:
os.environ["MISTRAL_API_KEY"] = getpass.getpass("Mistral API Key:")

Mistral API Key:··········


In [None]:
!git clone https://github.com/AI-Maker-Space/DataRepository.git

Cloning into 'DataRepository'...
remote: Enumerating objects: 110, done.
remote: Counting objects: 100% (102/102), done.
remote: Compressing objects: 100% (88/88), done.
remote: Total 110 (delta 34), reused 35 (delta 9), pack-reused 8 (from 1)
Receiving objects: 100% (110/110), 71.41 MiB | 22.89 MiB/s, done.
Resolving deltas: 100% (34/34), done.


We'll use [LlamaParse](https://docs.llamaindex.ai/en/stable/llama_cloud/llama_parse/) to ingest the legal complaint that Elon Musk brought against OpenAI - which we can find [here](https://github.com/AI-Maker-Space/DataRepository/blob/main/RAGATHON/musk_v_openai.pdf).

In [None]:
from llama_parse import LlamaParse
from llama_index.core import SimpleDirectoryReader

parser = LlamaParse(
    result_type="markdown"  # "markdown" and "text" are available
)

file_extractor = {".pdf": parser}
pdf_documents = SimpleDirectoryReader(input_files=['./DataRepository/RAGATHON/musk_v_openai.pdf'], file_extractor=file_extractor).load_data()
print(len(pdf_documents))

Started parsing the file under job_id a683348b-4f40-45f0-91fb-82eefeca7dc0
86


In [None]:
all_documents = pdf_documents

Next, we'll use our [mistral-embed](https://docs.mistral.ai/capabilities/embeddings/) model as our default Embedding model!

In [None]:
from llama_index.embeddings.mistralai import MistralAIEmbedding

model_name="mistral-embed"

embed_model = MistralAIEmbedding(model_name=model_name)

Let's test this out - and grab our embedding dimension by checking the length of the returned response.

In [None]:
embeddings = embed_model.get_text_embedding("Welcome to the RAGATHON!")
print(len(embeddings))
embedding_dimension = len(embeddings)

1024


Let's save this model in our settings!

In [None]:
from llama_index.core import Settings

Settings.embed_model = embed_model

Now, we can set up [Pinecone's integration with LlamaIndex](https://www.pinecone.io/pricing/)!

We'll need to ensure we have our Pinecone API key!

![image](https://i.imgur.com/xiJXVpC.png)

> NOTE: Your organization and project names will be different.

In [None]:
from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])

Next, we'll create our Pinecone Index through the Pinecone client!

In [None]:
index_name = "llamaindex-ragathon-demo-index-v1"

pc.create_index(
    name=index_name,
    dimension=embedding_dimension,
    metric="cosine",
    spec=ServerlessSpec(
        cloud="aws",
        region="us-east-1"
    )
)
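Re-running the cell above after the index already exists will typically fail, so a guarded version can be handy. The sketch below is an assumption-laden helper: `ensure_index` is a hypothetical name, and it assumes the client exposes `list_indexes().names()` as recent Pinecone clients do.

```python
def ensure_index(pc, name, dimension, spec, metric="cosine"):
    """Create the index only if it is missing; return True when it was created.

    `pc` is assumed to be a Pinecone client and `spec` a ServerlessSpec.
    """
    if name in pc.list_indexes().names():
        return False  # already there, nothing to do
    pc.create_index(name=name, dimension=dimension, metric=metric, spec=spec)
    return True


# Hypothetical usage with the objects defined in this notebook:
# ensure_index(
#     pc, index_name, embedding_dimension,
#     spec=ServerlessSpec(cloud="aws", region="us-east-1"),
# )
```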

In [None]:
pinecone_index = pc.Index(index_name)
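The index above is created with `metric="cosine"`. As a reminder of what that metric measures, here is a plain-Python sketch of cosine similarity; it is for illustration only and is not how Pinecone computes it internally.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for zero vectors")
    return dot / (norm_a * norm_b)


same_direction = cosine_similarity([1.0, 2.0], [2.0, 4.0])
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 1.0])
```

Vectors pointing the same way score close to 1 regardless of length, which is why the query and document embeddings must share the same dimension (`embedding_dimension` above).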

## LLM

We can point at our inference endpoint (that we set up already) by including our URL below!

In [None]:
import os

from llama_index.llms.text_generation_inference import (
    TextGenerationInference,
)

URL = "<< YOUR URL HERE>>"
hf_llm = TextGenerationInference(
    model_url=URL, token=os.environ["HF_TOKEN"]
)

completion_response = hf_llm.complete("To infinity, and")
print(completion_response)



...beyond!


## RAG Prompt

We'll set up and provide a classic RAG Prompt, of course!

In [None]:
from llama_index.core import PromptTemplate

DEFAULT_RAG_PROMPT = PromptTemplate(
    template="""Use the provided context to answer the question. If you don't know the answer, say you don't know.

    Context:
    {context}

    Question:
    {question}
    """
)
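To make the two placeholders concrete, here is a plain-Python sketch of how such a template gets filled in before it reaches the LLM. It uses `str.format` rather than `PromptTemplate`, and `RAG_TEMPLATE` and `build_prompt` are hypothetical names used only for illustration.

```python
RAG_TEMPLATE = (
    "Use the provided context to answer the question. "
    "If you don't know the answer, say you don't know.\n\n"
    "Context:\n{context}\n\n"
    "Question:\n{question}\n"
)


def build_prompt(context_chunks, question):
    """Join retrieved chunks and substitute them into the template."""
    context = "\n".join(context_chunks)
    return RAG_TEMPLATE.format(context=context, question=question)


prompt = build_prompt(
    ["chunk one", "chunk two"],
    "In what state was this complaint filed?",
)
print(prompt)
```

If the `context` slot ends up empty, the model has nothing to ground its answer on and will tend to fall back to "I don't know".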

## Steps & Events: LlamaIndex Workflow Workshop Introduction.

`Steps` and `Events` comprise the core building-blocks of LlamaIndex Workflows.

In the simplest terms:

`Steps`:
- `Steps` are units of work, or tasks, in a Workflow. They are typically Python functions decorated by `@step`, marking them as part of the Workflow.
- Each `Step` is associated with `Events` as input, and `Events` as outputs.
  - A `Step` must take, as input, one or more `Events`
  - A `Step` must emit, as output, an `Event`.
- `Steps` can be extended to have multiple workers in Workflows where that would be an advantage.
- `Steps` can modify shared global context (can be thought of as state) as required.

`Events`:
- `Events` are data structures that pass information between `Steps`.
- `Events` are based on Pydantic models, granting the usual benefits such as type validation.
- There are two special `Events` worth listing immediately:
  - `StartEvent` - the entry point into the Workflow.
  - `StopEvent` - this event stops the execution of the current Workflow.
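Before using the real classes, the idea can be sketched as a toy in plain Python. This is not the LlamaIndex API: the dataclasses, the `HANDLERS` table and `run_toy_workflow` are hypothetical stand-ins that only show steps consuming one event and emitting the next until a stop event appears.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StartEvent:
    query: str


@dataclass
class RetrieveEvent:
    nodes: list


@dataclass
class StopEvent:
    result: str


async def retrieve(ev: StartEvent) -> RetrieveEvent:
    # Toy "retrieval": keep documents that contain the query string.
    corpus = ["Workflows are event-driven.", "Steps emit events."]
    hits = [doc for doc in corpus if ev.query.lower() in doc.lower()]
    return RetrieveEvent(nodes=hits)


async def answer(ev: RetrieveEvent) -> StopEvent:
    return StopEvent(result=" ".join(ev.nodes) or "I don't know.")


# Each event type triggers exactly one step in this toy.
HANDLERS = {StartEvent: retrieve, RetrieveEvent: answer}


async def run_toy_workflow(event):
    # Keep dispatching until a step emits the stop event.
    while not isinstance(event, StopEvent):
        event = await HANDLERS[type(event)](event)
    return event.result


result = asyncio.run(run_toy_workflow(StartEvent(query="steps")))
print(result)
```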

That's a lot of information - so let's see how we could set these events up ourselves.

We'll need a sample application to do so - so let's define the events for a simple RAG pipeline!

In [None]:
from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore

class PrepEvent(Event):
    """Prep event (prepares for retrieval)."""
    pass

class RetrieveEvent(Event):
    """Retrieve event (gets retrieved nodes)."""

    retrieved_nodes: list[NodeWithScore]

class AugmentGenerateEvent(Event):
    """Query event. Queries given relevant text and search text."""
    relevant_text: str
    search_text: str

## Setting up Steps

Next, we'll define our `Steps`!

Remember: A `Step` must be triggered by one or more `Events`, and it must emit an `Event`.

To get started with our Workflow, we'll need to define a Workflow class.

Let's do that!

#### An Aside on Context:

`Context`, in workflows, is analogous to `State` in frameworks like LangGraph.

It's a way to provide information to multiple `Steps`, without needing to constantly carry forward information in each `Event`.
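As a rough mental model, the context behaves like an async key-value store that every step can read and write. The sketch below uses a hypothetical `ToyContext` class, not the real `Context`, to show one step storing a value and a later step reading it back.

```python
import asyncio


class ToyContext:
    """Hypothetical stand-in for a workflow context: an async key-value store."""

    def __init__(self):
        self._data = {}

    async def set(self, key, value):
        self._data[key] = value

    async def get(self, key, default=None):
        return self._data.get(key, default)


async def prepare(ctx):
    # An early step stores the query once...
    await ctx.set("query_str", "Why did Elon Musk sue OpenAI?")


async def retrieve(ctx):
    # ...and a later step reads it without it travelling inside an event.
    query = await ctx.get("query_str")
    index = await ctx.get("index", default=None)
    return query, index


async def main():
    ctx = ToyContext()
    await prepare(ctx)
    return await retrieve(ctx)


query, index = asyncio.run(main())
```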

In [None]:
from llama_index.core.workflow import (
    Workflow,
    step,
    Context,
    StartEvent,
    StopEvent,
)
from llama_index.core import (
    Document,
    StorageContext,
    VectorStoreIndex,
)
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.vector_stores.pinecone import PineconeVectorStore
from llama_index.core.base.base_retriever import BaseRetriever

# First things first, we need to create a new class that subclasses Workflow.
# Each step, now, is a method (decorated by the @step decorator) which will take an Event and Context as input.
class OpenSourceRAG(Workflow):
    @step
    async def ingest(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        """Ingest step (for ingesting docs and initializing index)."""
        documents: list[Document] | None = ev.get("documents")

        if documents is None:
            return None

        vector_store = PineconeVectorStore(pinecone_index=pinecone_index)
        storage_context = StorageContext.from_defaults(vector_store=vector_store)
        index = VectorStoreIndex.from_documents(
            documents, storage_context=storage_context
        )

        return StopEvent(result=index)

    @step
    async def prepare_for_retrieval(
        self, ctx: Context, ev: StartEvent
    ) -> PrepEvent | None:
        """Prepare for retrieval."""

        model_url = URL  # reuse the endpoint configured in the LLM section

        query_str: str | None = ev.get("query_str")
        retriever_kwargs: dict | None = ev.get("retriever_kwargs", {})

        if query_str is None:
            return None

        index = ev.get("index")

        llm = TextGenerationInference(
            model_url=model_url,
            token=os.environ["HF_TOKEN"],
            model_name="hugging-quants/Meta-Llama-3.1-8B-Instruct-AWQ-INT4"
        )
        await ctx.set("rag_pipeline", QueryPipeline(
            chain=[DEFAULT_RAG_PROMPT, llm]
        ))

        await ctx.set("llm", llm)
        await ctx.set("index", index)

        await ctx.set("query_str", query_str)
        await ctx.set("retriever_kwargs", retriever_kwargs)

        return PrepEvent()

    @step
    async def retrieve(
        self, ctx: Context, ev: PrepEvent
    ) -> RetrieveEvent | None:
        """Retrieve the relevant nodes for the query."""
        query_str = await ctx.get("query_str")
        retriever_kwargs = await ctx.get("retriever_kwargs")

        if query_str is None:
            return None

        index = await ctx.get("index", default=None)
        if index is None:
            raise ValueError(
                "Index must be constructed. Run the workflow with the 'documents' param first, then pass the resulting index."
            )

        retriever: BaseRetriever = index.as_retriever(
            **retriever_kwargs
        )
        result = retriever.retrieve(query_str)
        await ctx.set("query_str", query_str)
        return RetrieveEvent(retrieved_nodes=result)

    @step
    async def augment_and_generate(self, ctx: Context, ev: RetrieveEvent) -> StopEvent:
        """Get result with relevant text."""
        relevant_nodes = ev.retrieved_nodes
        relevant_text = "\n".join([node.get_content() for node in relevant_nodes])
        query_str = await ctx.get("query_str")

        relevancy_pipeline = await ctx.get("rag_pipeline")

        relevancy = relevancy_pipeline.run(
                context=relevant_text, question=query_str
        )

        return StopEvent(result=relevancy.message.content)

## "Graphing" our Workflow

Since we have `Steps` that take `Events` and return `Events` - we can trace through all possible paths and wind up with a graph!

In [None]:
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(
    OpenSourceRAG, filename="os_rag_workflow.html"
)

os_rag_workflow.html
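The graph can be derived because each step's type hints say which event it accepts and which it may emit. The sketch below shows that idea with the standard library only; it is not how `draw_all_possible_flows` is implemented, and the empty event classes and step functions are simplified stand-ins for the ones in `OpenSourceRAG`.

```python
from typing import Optional, get_args, get_origin, get_type_hints


class StartEvent: ...
class PrepEvent: ...
class RetrieveEvent: ...
class StopEvent: ...


def prepare_for_retrieval(ev: StartEvent) -> Optional[PrepEvent]: ...
def retrieve(ev: PrepEvent) -> Optional[RetrieveEvent]: ...
def augment_and_generate(ev: RetrieveEvent) -> StopEvent: ...


def event_types(annotation):
    """Flatten Optional[A] or a union into [A, ...]; a bare class becomes [cls]."""
    if get_origin(annotation) is not None:
        members = get_args(annotation)
    else:
        members = (annotation,)
    return [m for m in members if m is not type(None)]


def edges(steps):
    """Return (source, target) name pairs linking events and steps."""
    result = []
    for fn in steps:
        hints = get_type_hints(fn)
        outputs = event_types(hints.pop("return"))
        for hint in hints.values():
            for src in event_types(hint):
                result.append((src.__name__, fn.__name__))
        for dst in outputs:
            result.append((fn.__name__, dst.__name__))
    return result


graph = edges([prepare_for_retrieval, retrieve, augment_and_generate])
for src, dst in graph:
    print(f"{src} -> {dst}")
```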


## Using our Workflow

First, we need to set up our documents, then initialize our Index!

In [None]:
from llama_index.core import SimpleDirectoryReader

rag_workflow = OpenSourceRAG()
index = await rag_workflow.run(documents=all_documents)

Upserted vectors:   0%|          | 0/87 [00:00<?, ?it/s]

Now we're ready to query our Workflow!

In [None]:
from IPython.display import Markdown, display

response = await rag_workflow.run(
    query_str="Why did Elon Musk sue OpenAI?",
    index=index,
)
display(Markdown(str(response)))



I don't know.

In [None]:
from IPython.display import Markdown, display

response = await rag_workflow.run(
    query_str="In what state was this complaint levied?",
    index=index,
)
display(Markdown(str(response)))



I don't have enough information to answer your question. You haven't provided any context about the complaint, so I'm unable to determine the state in which it was levied.

---