<a href="https://colab.research.google.com/github/ernanhughes/local-rag/blob/main/LlamaIndex_Workflows_AI_Makerspace.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# LlamaIndex Workflows - AI Makerspace Event

In the following notebook, we'll be looking at LlamaIndex's Workflows!

We'll cover:

1. Events
2. Steps
3. Tying them together in a Workflow!

The example application we'll be using is a Corrective RAG Workflow, based on the research paper [Corrective Retrieval Augmented Generation](https://arxiv.org/pdf/2401.15884).

## Preparation

As always, we have some work to do before we can jump straight into the workflows.

Let's set up some boilerplate, add some dependencies, and get ready to rock!

### Async Boilerplate:

Since "workflows make async a first-class citizen", and we're running these examples in a Jupyter Notebook (which already has an active event loop!), we'll need the `nest_asyncio` library so we can take advantage of the async capabilities of the workflows we're making!

In [None]:
import nest_asyncio

nest_asyncio.apply()
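
To see the problem that `nest_asyncio` works around, here is a small sketch that uses only the standard library. It is meant to be run as a plain Python script rather than inside the notebook, and the function names `inner` and `outer` are made up for illustration. It shows that, without patching, asyncio refuses to start a second event loop from inside one that is already running, which is the situation every notebook cell is in.

```python
import asyncio


async def inner() -> str:
    return "done"


async def outer() -> str:
    # We are already inside a running event loop here, just like code
    # executing in a Jupyter cell.
    coro = inner()
    try:
        asyncio.run(coro)  # tries to start a second, nested loop
    except RuntimeError as exc:
        coro.close()  # tidy up the coroutine that never got to run
        return f"RuntimeError: {exc}"
    return "no error"


message = asyncio.run(outer())
print(message)
```

Calling `nest_asyncio.apply()` patches asyncio so that this kind of nested call is permitted.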

### Installing Dependencies:

Next, we're going to install our dependencies!

We'll want to get the [Tavily Research Tool](https://llamahub.ai/l/tools/llama-index-tools-tavily-research?from=) which will allow us to do open research as part of our Corrective RAG Implementation (more details on that later).

We'll also want to grab our `llama-index-utils-workflow` package which will let us draw all possible paths through the resultant workflow.

In [None]:
%pip install -qU llama-index llama-index-tools-tavily-research llama-index-utils-workflow

### Setting Up API Keys

Since we'll be using OpenAI's models to power our workflows today, we'll need to provide our OpenAI API Key.

> NOTE: Look [here](https://help.openai.com/en/articles/4936850-where-do-i-find-my-openai-api-key) to find your API key.

In [None]:
import os
import getpass

os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")

OpenAI API Key:··········


We'll also be using a Tavily API Key - which you can obtain [here!](https://app.tavily.com/home)

In [None]:
os.environ["TAVILY_API_KEY"] = getpass.getpass("Tavily API Key:")

Tavily API Key:··········


### Preparing Data

We're going to be using the HTML file version of the new [EU AI Act](https://eur-lex.europa.eu/legal-content/EN/TXT/?uri=CELEX%3A32024R1689) so we can ask some questions about it!

> NOTE: We've provided this in our `DataRepository`, but you could just as easily replace this data with whatever data you desire!

We'll get started by cloning the repository.

In [None]:
!git clone https://github.com/AI-Maker-Space/DataRepository

Cloning into 'DataRepository'...
remote: Enumerating objects: 90, done.
remote: Counting objects: 100% (82/82), done.
remote: Compressing objects: 100% (69/69), done.
remote: Total 90 (delta 24), reused 29 (delta 8), pack-reused 8 (from 1)
Receiving objects: 100% (90/90), 70.26 MiB | 33.70 MiB/s, done.
Resolving deltas: 100% (24/24), done.


Then we'll move the desired file into our `data` folder for later!

In [None]:
!mkdir -p data
!cp DataRepository/eu_ai_act.html data/eu_ai_act.html

## Steps & Events: LlamaIndex Workflow Event Introduction.

`Steps` and `Events` comprise the core building-blocks of LlamaIndex Workflows.

In the simplest terms:

`Steps`:
- `Steps` are units of work, or tasks, in a Workflow. They are typically Python functions decorated by `@step`, marking them as part of the Workflow.
- Each `Step` is associated with `Events` as input, and `Events` as outputs.
  - A `Step` must take, as input, one or more `Events`
  - A `Step` must emit, as output, an `Event`.
- `Steps` can be extended to have multiple workers in Workflows where that would be an advantage.
- `Steps` can modify shared global context (can be thought of as state) as required.

`Events`:
- `Events` are data structures that pass information between `Steps`.
- `Events` are based on Pydantic models, which brings the usual benefits such as type validation.
- There are two special `Events` worth listing immediately:
  - `StartEvent` - the entry point into the Workflow.
  - `StopEvent` - this event stops the execution of the current Workflow.
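
As a rough mental model of how `Events` drive `Steps`, here is a toy dispatcher written with only the standard library. It is not how LlamaIndex implements Workflows; the `Toy*` classes, the `TOY_STEPS` table, and `run_toy_workflow` are all made up for illustration. Each step is looked up by the type of event it accepts, and the loop ends when a stop event is emitted.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToyStartEvent:
    text: str


@dataclass
class ToyUpperEvent:
    text: str


@dataclass
class ToyStopEvent:
    result: str


async def to_upper(ev: ToyStartEvent) -> ToyUpperEvent:
    # A step: triggered by one event type, emits another.
    return ToyUpperEvent(text=ev.text.upper())


async def finish(ev: ToyUpperEvent) -> ToyStopEvent:
    return ToyStopEvent(result=f"{ev.text}!")


# Map each event type to the step that consumes it.
TOY_STEPS = {ToyStartEvent: to_upper, ToyUpperEvent: finish}


async def run_toy_workflow(start: ToyStartEvent) -> str:
    event = start
    # Keep dispatching until some step emits the stop event.
    while not isinstance(event, ToyStopEvent):
        event = await TOY_STEPS[type(event)](event)
    return event.result


result = asyncio.run(run_toy_workflow(ToyStartEvent(text="hello")))
print(result)  # HELLO!
```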

That's a lot of information - so let's see how we could set these events up ourselves.

We'll need a sample application to do so - so let's cover the core ideas behind Corrective RAG!

#### Corrective RAG Overview:

Let's take a look at the image provided from the [Corrective Retrieval Augmented Generation](https://i.imgur.com/14rfVyT.png) paper.


![image](https://i.imgur.com/14rfVyT.png)

To lay out the idea in basic terms, we have a few steps:

1. Obtain a user query, and retrieve `k` relevant documents.
2. Use a Retrieval Evaluation process to determine if the Retrieved documents are correct/relevant.
  - If the retrieved document(s) are correct, this process also refactors the documents to ensure only the most relevant information is retained.
  - If the retrieved documents are ambiguous, they go through the above process, but there are also external documents (obtained through search) added to the final context.
  - If the retrieved documents are incorrect, only the external search context is used.
3. The appropriate contexts are provided as context for the generation, as is typical in RAG.
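
The three cases above can be sketched as a small routing function. This is only an illustration: the names `triage` and `context_sources` are hypothetical, and mapping per-document `'yes'`/`'no'` grades onto the three verdicts (all relevant, mixed, none relevant) is an assumption made for this sketch.

```python
def triage(grades: list[str]) -> str:
    """Classify a batch of per-document grades ('yes' or 'no')."""
    if grades and all(g == "yes" for g in grades):
        return "correct"
    if any(g == "yes" for g in grades):
        return "ambiguous"
    # Assumption: an empty batch is treated the same as "nothing relevant".
    return "incorrect"


def context_sources(verdict: str) -> list[str]:
    """Which context pools feed generation for each verdict."""
    return {
        "correct": ["retrieved"],
        "ambiguous": ["retrieved", "search"],
        "incorrect": ["search"],
    }[verdict]


for grades in (["yes", "yes"], ["yes", "no"], ["no", "no"]):
    verdict = triage(grades)
    print(grades, "->", verdict, "->", context_sources(verdict))
```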

Let's think about this process in the Event-Driven framework that is LlamaIndex workflows:

1. Step 1: Ingest and Process Data
  - FUNCTION:
    - Ingests, processes, and creates index to be used in following steps.
  - INPUT EVENT(S): `StartEvent`
  - OUTPUT EVENT(S): `StopEvent`
    - signifies the completion of this step
  - RETURNS: Created Index
2. Step 2: Prepare the Pipeline for Use
  - FUNCTION:
    - Modifies Workflow context to include all necessary pipelines for the remainder of the Workflow
  - INPUT EVENT(S): `StartEvent`
  - OUTPUT EVENT(S): `PrepEvent`
    - signifying that the pipeline is prepared and ready for Retrieval
    - STATE: None required.
3. Step 3: Retrieve Context
  - FUNCTION:
    - Retrieves context relevant to the user's query from the index.
  - INPUT EVENT(S): `PrepEvent`
  - OUTPUT EVENT(S): `RetrieveEvent`
    - signifies that context has been retrieved
    - STATE: List of Contexts Retrieved
4. Step 4: Evaluate Context
  - FUNCTION:
    - Evaluates the current context as compared to the query to determine if the context is relevant.
  - INPUT EVENT(S): `RetrieveEvent`
  - OUTPUT EVENT(S): `RelevanceEvalEvent`
    - signifies that the context has been evaluated and is ready to have the related contexts extracted
    - STATE: List of contexts, and results of evaluation for each context.
5. Step 5: Extract Relevant Context
  - FUNCTION:
    - Extracts relevant context based on the evaluation step.
  - INPUT EVENT(S): `RelevanceEvalEvent`
  - OUTPUT EVENT(S): `TextExtractEvent`
    - signifies that the relevant context has been extracted based on the evaluation
    - STATE: `str` containing the relevant contexts separated by a `\n`.
6. Step 6: Transform the Query
  - FUNCTION:
    - Transforms the initial user query to be compatible with the external search tool, and searches if any of the previous contexts were evaluated to be not relevant
  - INPUT EVENT(S): `TextExtractEvent`
  - OUTPUT EVENT(S): `QueryEvent`
    - signifies that the query has been transformed, and the external search tool has been used, and the new context pool is ready
    - STATE: `str`: relevant texts, `str`: external searched documents
7. Step 7: Final Query
  - FUNCTION:
    - Uses the retrieved context (that was relevant) and the externally obtained context (if any of the retrieved context was irrelevant) to answer the user's query
  - INPUT EVENT(S): `QueryEvent`
  - OUTPUT EVENT(S): `StopEvent`
    - we're all done!
    - STATE: `str` result!

### Initialize Events:

Now that we have a plan - let's set this up in code - we'll start by defining our `Events` that will trigger and be emitted by our `Steps`.

First up is the `PrepEvent`!

We noted above that it didn't need any state, so the class body is simply `pass`!
> NOTE: Notice that each event subclasses `Event`!

In [None]:
from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore

class PrepEvent(Event):
    """Prep event (prepares for retrieval)."""
    pass

Next up is our `RetrieveEvent`. We noted that it should carry a list of contexts, so we declare that field on the class.

In [None]:
class RetrieveEvent(Event):
    """Retrieve event (gets retrieved nodes)."""

    retrieved_nodes: list[NodeWithScore]

We'll quickly initialize the rest of the `Events`, as their construction is (and should be) relatively simple!

In [None]:
class RelevanceEvalEvent(Event):
    """Relevance evaluation event (gets results of relevance evaluation)."""
    relevant_results: list[str]

class TextExtractEvent(Event):
    """Text extract event. Extracts relevant text and concatenates."""
    relevant_text: str

class QueryEvent(Event):
    """Query event. Queries given relevant text and search text."""
    relevant_text: str
    search_text: str

### Prompt Templates:

As always, when building an LLM application, we'll want to create some prompt templates that achieve our tasks.

We'll need two for this application:

- `DEFAULT_RELEVANCY_PROMPT_TEMPLATE` - this will be used to evaluate our context to ensure it's relevant to the user's query.
- `DEFAULT_TRANSFORM_QUERY_TEMPLATE` - this prompt will transform the user's query to be compatible with our external search tool

In [None]:
from llama_index.core import PromptTemplate

DEFAULT_RELEVANCY_PROMPT_TEMPLATE = PromptTemplate(
    template="""As a grader, your task is to evaluate the relevance of a document retrieved in response to a user's question.

    Retrieved Document:
    -------------------
    {context_str}

    User Question:
    --------------
    {query_str}

    Evaluation Criteria:
    - Consider whether the document contains keywords or topics related to the user's question.
    - The evaluation should not be overly stringent; the primary objective is to identify and filter out clearly irrelevant retrievals.

    Decision:
    - Assign a binary score to indicate the document's relevance.
    - Use 'yes' if the document is relevant to the question, or 'no' if it is not.

    Please provide your binary score ('yes' or 'no') below to indicate the document's relevance to the user question."""
)

In [None]:
DEFAULT_TRANSFORM_QUERY_TEMPLATE = PromptTemplate(
    template="""Your task is to refine a query to ensure it is highly effective for retrieving relevant search results. \n
    Analyze the given input to grasp the core semantic intent or meaning. \n
    Original Query:
    \n ------- \n
    {query_str}
    \n ------- \n
    Your goal is to rephrase or enhance this query to improve its search performance. Ensure the revised query is concise and directly aligned with the intended search objective. \n
    Respond with the optimized query only:"""
)

## Setting up Steps

Next, we'll define our `Steps`!

Remember: A `Step` must be triggered by one or more `Events`, and it must emit an `Event`.

To get started with our Workflow, we'll need to define a Workflow class.

Let's do that!

### Workflow Class

First things first, we need to create a new class that subclasses `Workflow`.

```python
class CorrectiveRAGWorkflow(Workflow):
```

Each step, now, is a method (decorated by the `@step` decorator) which will take an `Event` and `Context` as input.

#### An Aside on Context:

`Context`, in workflows, is analogous to `State` in frameworks like LangGraph.

It's a way to provide information to multiple `Steps`, without needing to constantly carry forward information in each `Event`.
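
To make the idea concrete, here is a toy stand-in for a shared context, written with only the standard library. `ToyContext`, `step_one`, and `step_two` are hypothetical names; this is a sketch of the `await ctx.set(...)` / `await ctx.get(...)` pattern used in the steps of this notebook, not LlamaIndex's `Context` class.

```python
import asyncio
from typing import Any

_MISSING = object()


class ToyContext:
    """A minimal per-run key/value store shared between steps."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    async def set(self, key: str, value: Any) -> None:
        self._data[key] = value

    async def get(self, key: str, default: Any = _MISSING) -> Any:
        if key in self._data:
            return self._data[key]
        if default is _MISSING:
            # Toy behaviour: a missing key without a default is an error.
            raise KeyError(key)
        return default


async def step_one(ctx: ToyContext) -> None:
    # An early step stores the query once...
    await ctx.set("query_str", "What is an AI system?")


async def step_two(ctx: ToyContext) -> tuple[str, Any]:
    # ...and a later step reads it back without it travelling in an event.
    query = await ctx.get("query_str")
    missing_index = await ctx.get("index", default=None)
    return query, missing_index


async def main() -> tuple[str, Any]:
    ctx = ToyContext()
    await step_one(ctx)
    return await step_two(ctx)


query, missing_index = asyncio.run(main())
print(query, missing_index)
```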



### Step 1: Ingest Pipeline

The first `Step` we're going to create is the ingestion step:

```python
@step
async def ingest(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
    """Ingest step (for ingesting docs and initializing index)."""
    documents: list[Document] | None = ev.get("documents")

    if documents is None:
        return None

    index = VectorStoreIndex.from_documents(documents)

    return StopEvent(result=index)
```

This `Step` simply creates and returns a `VectorStoreIndex` from our provided documents.

Notice how we're addressing our `Event` through `ev.get` - this is how we collect results from our `Events` (and `Context`).

### Step 2: Prepare the Pipeline

Next, we need to create a number of pipelines that can be leveraged by our downstream steps.

We're going to store a number of these in our `Context` so we don't need to track them through our `Events`.

```python
@step
async def prepare_for_retrieval(
    self, ctx: Context, ev: StartEvent
) -> PrepEvent | None:
    """Prepare for retrieval."""

    query_str: str | None = ev.get("query_str")
    retriever_kwargs: dict | None = ev.get("retriever_kwargs", {})

    if query_str is None:
        return None

    tavily_ai_apikey: str | None = ev.get("tavily_ai_apikey")
    index = ev.get("index")

    llm = OpenAI(model="gpt-4o-mini")
    await ctx.set("relevancy_pipeline", QueryPipeline(
        chain=[DEFAULT_RELEVANCY_PROMPT_TEMPLATE, llm]
    ))
    await ctx.set("transform_query_pipeline", QueryPipeline(
        chain=[DEFAULT_TRANSFORM_QUERY_TEMPLATE, llm]
    ))

    await ctx.set("llm", llm)
    await ctx.set("index", index)
    await ctx.set("tavily_tool", TavilyToolSpec(api_key=tavily_ai_apikey))

    await ctx.set("query_str", query_str)
    await ctx.set("retriever_kwargs", retriever_kwargs)

    return PrepEvent()
```

As you can see, we're basically using the `Step` to fill up our `Context` with all the relevant tools needed to complete the pipeline.
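
Both `ingest` and `prepare_for_retrieval` accept a `StartEvent`, and each returns `None` when the keyword arguments it needs are absent. The toy sketch below uses plain dictionaries in place of events (the `toy_*` names are hypothetical and no LlamaIndex code is involved) to show how that pattern lets a single entry point behave either as "build the index" or as "start a query".

```python
from typing import Optional


def toy_ingest(start: dict) -> Optional[str]:
    documents = start.get("documents")
    if documents is None:
        return None  # not an ingestion run, so this step stays silent
    return f"index over {len(documents)} document(s)"


def toy_prepare(start: dict) -> Optional[str]:
    query_str = start.get("query_str")
    if query_str is None:
        return None  # not a query run, so this step stays silent
    return f"prepared to answer: {query_str}"


def toy_run(**kwargs) -> list[str]:
    # Every entry step sees the same start payload; only the steps whose
    # inputs are present produce an output.
    outputs = [entry_step(kwargs) for entry_step in (toy_ingest, toy_prepare)]
    return [out for out in outputs if out is not None]


ingest_run = toy_run(documents=["doc a", "doc b"])
query_run = toy_run(query_str="Why regulate AI?", index="<index>")
print(ingest_run)
print(query_run)
```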

### Step 3: Retrieving Context

Next, we build a `Step` that retrieves context from our `VectorStoreIndex`.

```python
@step
async def retrieve(
    self, ctx: Context, ev: PrepEvent
) -> RetrieveEvent | None:
    """Retrieve the relevant nodes for the query."""
    query_str = await ctx.get("query_str")
    retriever_kwargs = await ctx.get("retriever_kwargs")

    if query_str is None:
        return None

    index = await ctx.get("index", default=None)
    tavily_tool = await ctx.get("tavily_tool", default=None)
    if index is None or tavily_tool is None:
        raise ValueError(
            "Index and tavily tool must be constructed. Run with 'documents' and 'tavily_ai_apikey' params first."
        )

    retriever: BaseRetriever = index.as_retriever(
        **retriever_kwargs
    )
    result = retriever.retrieve(query_str)
    await ctx.set("retrieved_nodes", result)
    await ctx.set("query_str", query_str)
    return RetrieveEvent(retrieved_nodes=result)
```



### Step 4: Evaluate Context

Next, we'll create a `Step` that we can use to evaluate our retrieved documents.

```python
@step
async def eval_relevance(
    self, ctx: Context, ev: RetrieveEvent
) -> RelevanceEvalEvent:
    """Evaluate relevancy of retrieved documents with the query."""
    retrieved_nodes = ev.retrieved_nodes
    query_str = await ctx.get("query_str")

    relevancy_results = []
    for node in retrieved_nodes:
        relevancy_pipeline = await ctx.get("relevancy_pipeline")
        relevancy = relevancy_pipeline.run(
            context_str=node.text, query_str=query_str
        )
        relevancy_results.append(relevancy.message.content.lower().strip())

    await ctx.set("relevancy_results", relevancy_results)
    return RelevanceEvalEvent(relevant_results=relevancy_results)
```

### Step 5: Extract Relevant Texts

Next, we'll extract the contexts that we scored as relevant!

```python
@step
async def extract_relevant_texts(
    self, ctx: Context, ev: RelevanceEvalEvent
) -> TextExtractEvent:
    """Extract relevant texts from retrieved documents."""
    retrieved_nodes = await ctx.get("retrieved_nodes")
    relevancy_results = ev.relevant_results

    relevant_texts = [
        retrieved_nodes[i].text
        for i, result in enumerate(relevancy_results)
        if result == "yes"
    ]

    result = "\n".join(relevant_texts)
    return TextExtractEvent(relevant_text=result)
```
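
Because the comparison above is an exact match on `"yes"`, a grader reply such as `"Yes."` would be treated as not relevant. One possible way to make the parsing more tolerant is sketched below with only the standard library. The helper names `normalize_grade` and `select_relevant` are hypothetical, and treating an unparseable reply as `"no"` is an assumption of this sketch rather than part of the workflow.

```python
import re


def normalize_grade(reply: str) -> str:
    """Map a free-form grader reply to 'yes' or 'no'."""
    match = re.search(r"\b(yes|no)\b", reply.lower())
    # Assumption: anything we cannot parse counts as not relevant.
    return match.group(1) if match else "no"


def select_relevant(texts: list[str], replies: list[str]) -> str:
    """Keep the texts graded 'yes' and join them with newlines."""
    kept = [
        text
        for text, reply in zip(texts, replies)
        if normalize_grade(reply) == "yes"
    ]
    return "\n".join(kept)


texts = ["Article 3 defines 'AI system'.", "A cookie recipe.", "Recital 1."]
replies = ["Yes.", "no", " YES\n"]
relevant_text = select_relevant(texts, replies)
print(relevant_text)
```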

### Step 6: Transform Query

Next, if any retrieved context was judged not relevant, we'll transform the user's query and send it to our external search tool.

```python
@step
async def transform_query_pipeline(
    self, ctx: Context, ev: TextExtractEvent
) -> QueryEvent:
    """Search the transformed query with Tavily API."""
    relevant_text = ev.relevant_text
    relevancy_results = await ctx.get("relevancy_results")
    query_str = await ctx.get("query_str")

    # If any document is found irrelevant, transform the query string for better search results.
    if "no" in relevancy_results:
        qp = await ctx.get("transform_query_pipeline")
        transformed_query_str = (qp.run(query_str=query_str).message.content)
        # Conduct a search with the transformed query string and collect the results.
        search_tool = await ctx.get("tavily_tool")
        search_results = search_tool.search(
            transformed_query_str, max_results=5
        )
        search_text = "\n".join([result.text for result in search_results])
    else:
        search_text = ""

    return QueryEvent(relevant_text=relevant_text, search_text=search_text)
```

### Step 7: Final Query

All that's left to do is fire off the final query + context package to our LLM for a response!

```python
@step
async def query_result(self, ctx: Context, ev: QueryEvent) -> StopEvent:
    """Get result with relevant text."""
    relevant_text = ev.relevant_text
    search_text = ev.search_text
    query_str = await ctx.get("query_str")

    documents = [Document(text=relevant_text + "\n" + search_text)]
    index = SummaryIndex.from_documents(documents)
    query_engine = index.as_query_engine()
    result = query_engine.query(query_str)
    return StopEvent(result=result)
```

#### CODE FOR WORKFLOW:

Execute this cell to construct the overall `CorrectiveRAGWorkflow`!

In [None]:
from llama_index.core.workflow import (
    Workflow,
    step,
    Context,
    StartEvent,
    StopEvent,
)
from llama_index.core import (
    VectorStoreIndex,
    Document,
    SummaryIndex,
)
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.llms.openai import OpenAI
from llama_index.tools.tavily_research.base import TavilyToolSpec
from llama_index.core.base.base_retriever import BaseRetriever

class CorrectiveRAGWorkflow(Workflow):
    @step
    async def ingest(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        """Ingest step (for ingesting docs and initializing index)."""
        documents: list[Document] | None = ev.get("documents")

        if documents is None:
            return None

        index = VectorStoreIndex.from_documents(documents)

        return StopEvent(result=index)

    @step
    async def prepare_for_retrieval(
        self, ctx: Context, ev: StartEvent
    ) -> PrepEvent | None:
        """Prepare for retrieval."""

        query_str: str | None = ev.get("query_str")
        retriever_kwargs: dict | None = ev.get("retriever_kwargs", {})

        if query_str is None:
            return None

        tavily_ai_apikey: str | None = ev.get("tavily_ai_apikey")
        index = ev.get("index")

        llm = OpenAI(model="gpt-4o-mini")
        await ctx.set("relevancy_pipeline", QueryPipeline(
            chain=[DEFAULT_RELEVANCY_PROMPT_TEMPLATE, llm]
        ))
        await ctx.set("transform_query_pipeline", QueryPipeline(
            chain=[DEFAULT_TRANSFORM_QUERY_TEMPLATE, llm]
        ))

        await ctx.set("llm", llm)
        await ctx.set("index", index)
        await ctx.set("tavily_tool", TavilyToolSpec(api_key=tavily_ai_apikey))

        await ctx.set("query_str", query_str)
        await ctx.set("retriever_kwargs", retriever_kwargs)

        return PrepEvent()

    @step
    async def retrieve(
        self, ctx: Context, ev: PrepEvent
    ) -> RetrieveEvent | None:
        """Retrieve the relevant nodes for the query."""
        query_str = await ctx.get("query_str")
        retriever_kwargs = await ctx.get("retriever_kwargs")

        if query_str is None:
            return None

        index = await ctx.get("index", default=None)
        tavily_tool = await ctx.get("tavily_tool", default=None)
        if index is None or tavily_tool is None:
            raise ValueError(
                "Index and tavily tool must be constructed. Run with 'documents' and 'tavily_ai_apikey' params first."
            )

        retriever: BaseRetriever = index.as_retriever(
            **retriever_kwargs
        )
        result = retriever.retrieve(query_str)
        await ctx.set("retrieved_nodes", result)
        await ctx.set("query_str", query_str)
        return RetrieveEvent(retrieved_nodes=result)

    @step
    async def eval_relevance(
        self, ctx: Context, ev: RetrieveEvent
    ) -> RelevanceEvalEvent:
        """Evaluate relevancy of retrieved documents with the query."""
        retrieved_nodes = ev.retrieved_nodes
        query_str = await ctx.get("query_str")

        relevancy_results = []
        for node in retrieved_nodes:
            relevancy_pipeline = await ctx.get("relevancy_pipeline")
            relevancy = relevancy_pipeline.run(
                context_str=node.text, query_str=query_str
            )
            relevancy_results.append(relevancy.message.content.lower().strip())

        await ctx.set("relevancy_results", relevancy_results)
        return RelevanceEvalEvent(relevant_results=relevancy_results)

    @step
    async def extract_relevant_texts(
        self, ctx: Context, ev: RelevanceEvalEvent
    ) -> TextExtractEvent:
        """Extract relevant texts from retrieved documents."""
        retrieved_nodes = await ctx.get("retrieved_nodes")
        relevancy_results = ev.relevant_results

        relevant_texts = [
            retrieved_nodes[i].text
            for i, result in enumerate(relevancy_results)
            if result == "yes"
        ]

        result = "\n".join(relevant_texts)
        return TextExtractEvent(relevant_text=result)

    @step
    async def transform_query_pipeline(
        self, ctx: Context, ev: TextExtractEvent
    ) -> QueryEvent:
        """Search the transformed query with Tavily API."""
        relevant_text = ev.relevant_text
        relevancy_results = await ctx.get("relevancy_results")
        query_str = await ctx.get("query_str")

        # If any document is found irrelevant, transform the query string for better search results.
        if "no" in relevancy_results:
            qp = await ctx.get("transform_query_pipeline")
            transformed_query_str = (qp.run(query_str=query_str).message.content)
            # Conduct a search with the transformed query string and collect the results.
            search_tool = await ctx.get("tavily_tool")
            search_results = search_tool.search(
                transformed_query_str, max_results=5
            )
            search_text = "\n".join([result.text for result in search_results])
        else:
            search_text = ""

        return QueryEvent(relevant_text=relevant_text, search_text=search_text)

    @step
    async def query_result(self, ctx: Context, ev: QueryEvent) -> StopEvent:
        """Get result with relevant text."""
        relevant_text = ev.relevant_text
        search_text = ev.search_text
        query_str = await ctx.get("query_str")

        documents = [Document(text=relevant_text + "\n" + search_text)]
        index = SummaryIndex.from_documents(documents)
        query_engine = index.as_query_engine()
        result = query_engine.query(query_str)
        return StopEvent(result=result)

## "Graphing" our Workflow

Since we have `Steps` that take `Events` and return `Events` - we can trace through all possible paths and wind up with a graph!

In [None]:
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(
    CorrectiveRAGWorkflow, filename="crag_workflow.html"
)

crag_workflow.html
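
To get a feel for what "all possible paths" means, the sketch below enumerates them by hand from a table of step signatures. The `STEP_SIGNATURES` table is copied manually from the type hints in `CorrectiveRAGWorkflow`, and `all_paths` is a hypothetical helper; this is not a description of how `draw_all_possible_flows` works internally.

```python
# Each step is described by the event it accepts and the event it emits.
STEP_SIGNATURES = {
    "prepare_for_retrieval": ("StartEvent", "PrepEvent"),
    "retrieve": ("PrepEvent", "RetrieveEvent"),
    "eval_relevance": ("RetrieveEvent", "RelevanceEvalEvent"),
    "extract_relevant_texts": ("RelevanceEvalEvent", "TextExtractEvent"),
    "transform_query_pipeline": ("TextExtractEvent", "QueryEvent"),
    "query_result": ("QueryEvent", "StopEvent"),
    "ingest": ("StartEvent", "StopEvent"),
}


def all_paths(event: str = "StartEvent") -> list[list[str]]:
    """List every chain of steps leading from `event` to StopEvent."""
    if event == "StopEvent":
        return [[]]
    paths = []
    for name, (accepts, emits) in STEP_SIGNATURES.items():
        if accepts == event:
            # Follow the emitted event and prepend this step to each result.
            for rest in all_paths(emits):
                paths.append([name, *rest])
    return paths


paths = all_paths()
for path in paths:
    print(" -> ".join(path))
```

The two chains correspond to the two ways the workflow is run in the next section: once with `documents` to build the index, and once with `query_str` to answer a question.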


## Using our Workflow

First, we need to set up our documents, then initialize our Index!

In [None]:
from llama_index.core import SimpleDirectoryReader

documents = SimpleDirectoryReader("./data").load_data()
workflow = CorrectiveRAGWorkflow()
index = await workflow.run(documents=documents)

Now we're ready to query our Workflow!

In [None]:
from IPython.display import Markdown, display

response = await workflow.run(
    query_str="How many parameters make a 'Large Language Model'?",
    index=index,
    tavily_ai_apikey=os.environ["TAVILY_API_KEY"],
)
display(Markdown(str(response)))

A Large Language Model typically has at least a billion parameters.

In [None]:
response = await workflow.run(
    query_str="Why does the EU want to regulate AI?",
    index=index,
    tavily_ai_apikey=os.environ["TAVILY_API_KEY"],
)
display(Markdown(str(response)))

The EU wants to regulate AI to improve the functioning of the internal market, promote the uptake of human-centric and trustworthy artificial intelligence, ensure a high level of protection of health, safety, fundamental rights, democracy, the rule of law, and environmental protection, protect against the harmful effects of AI systems, support innovation, prevent fragmentation of the internal market, ensure legal certainty for operators, and facilitate the free circulation, innovation, deployment, and uptake of AI systems within the internal market.

In [None]:
response = await workflow.run(
    query_str="Who owns the IP when it comes to data used to train models?",
    index=index,
    tavily_ai_apikey=os.environ["TAVILY_API_KEY"],
)
display(Markdown(str(response)))

Content creators are recognized as the owners of the intellectual property when it comes to the data used to train models.

In [None]:
response = await workflow.run(
    query_str="What are the points of contention between the EU and the US?",
    index=index,
    tavily_ai_apikey=os.environ["TAVILY_API_KEY"],
)
display(Markdown(str(response)))

The points of contention between the EU and the US include differing views on European integration, with the US under the current president being seen as undermining rather than encouraging it. Additionally, there is disagreement on the perception of the European Union as a threat versus an ally, as well as injecting conditionality and uncertainty into NATO. Domestic debates in both the US and the EU regarding their respective roles in the world also highlight the fragility of unity between the transatlantic partners.

In [None]:
response = await workflow.run(
    query_str="What AI positions do the EU and US agree on?",
    index=index,
    tavily_ai_apikey=os.environ["TAVILY_API_KEY"],
)
display(Markdown(str(response)))

The EU and US agree on a risk-based approach to artificial intelligence (AI), prioritizing human safety, ensuring transparency, and supporting safe and trustworthy AI. They also share common fair competition principles on foundational AI models.

> NOTE: This example is adapted from the LlamaIndex [Corrective RAG Workflow Example](https://github.com/run-llama/llama_index/blob/main/docs/docs/examples/workflow/corrective_rag_pack.ipynb).