## This is the tutorial from the llama Index official docs

#### The defualt Global Setting

In [4]:
from llama_index.core import Settings
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

Settings.llm = Ollama(model="llama3.2", request_timeout=60.0)
Settings.embed_model = HuggingFaceEmbedding(
    model_name="BAAI/bge-small-en-v1.5"
) # loads BAAI/bge-small-en-v1.5 as the default embedding model instead of Open-AI

  from .autonotebook import tqdm as notebook_tqdm


#### 1. Ingestion pipeline typically consists of three main stages:

1. Load the data
2. Transform the data
3. Index and store the data

### 1.1 Loading using SimpleDirectoryReader 

In [48]:
from llama_index.core import SimpleDirectoryReader

documents = SimpleDirectoryReader("./data").load_data()

### 1.2 Transform the data 

Transformations include chunking, extracting metadata, and embedding each chunk.

This is necessary to make sure that the data can be retrieved, and used optimally by the LLM.

#### High-Level Transformation API

In [49]:
from llama_index.core import VectorStoreIndex

vector_index = VectorStoreIndex.from_documents(documents)
vector_index.as_query_engine()  # uses BAAI/bge-small-en-v1.5 or whatever is set on the settings default

<llama_index.core.query_engine.retriever_query_engine.RetrieverQueryEngine at 0x7fcb309f37f0>

#### Lower-Level Transformation API

##### Splitting Your Documents into Nodes

In [50]:
from llama_index.core import SimpleDirectoryReader
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import TokenTextSplitter

documents = SimpleDirectoryReader("./data").load_data()

# https://docs.llamaindex.ai/en/stable/module_guides/loading/node_parsers/modules/#tokentextsplitter
pipeline = IngestionPipeline(transformations=[TokenTextSplitter()])

nodes = pipeline.run(documents=documents)

nodes  # If you see the text section of each node it is splitting it per page 

[TextNode(id_='02444a83-52e0-4492-a5cb-287003706867', embedding=None, metadata={'page_label': '1', 'file_name': '2023_canadian_budget.pdf', 'file_path': '/home/brooks/rag_llama_index/rag_llama_tutorial/data/2023_canadian_budget.pdf', 'file_type': 'application/pdf', 'file_size': 376126, 'creation_date': '2024-11-05', 'last_modified_date': '2024-11-05'}, excluded_embed_metadata_keys=['file_name', 'file_type', 'file_size', 'creation_date', 'last_modified_date', 'last_accessed_date'], excluded_llm_metadata_keys=['file_name', 'file_type', 'file_size', 'creation_date', 'last_modified_date', 'last_accessed_date'], relationships={<NodeRelationship.SOURCE: '1'>: RelatedNodeInfo(node_id='77e7197a-01a2-4d96-b6f0-553196f9a04b', node_type=<ObjectType.DOCUMENT: '4'>, metadata={'page_label': '1', 'file_name': '2023_canadian_budget.pdf', 'file_path': '/home/brooks/rag_llama_index/rag_llama_tutorial/data/2023_canadian_budget.pdf', 'file_type': 'application/pdf', 'file_size': 376126, 'creation_date': '2

##### Adding Metadata

In [51]:
nodes[0].metadata

### OUTPUTS 
metadata={
    'page_label': '1',
     'file_name': '2023_canadian_budget.pdf',
     'file_path': '/home/brooks/rag_llama_index/rag_llama_tutorial/data/2023_canadian_budget.pdf',
     'file_type': 'application/pdf',
     'file_size': 376126,
     'creation_date': '2024-11-05',
     'last_modified_date': '2024-11-05'
     }

In [None]:
import asyncio # This is an asyncio case

### These are the MetaData Extractors 
from llama_index.core.extractors import (
    TitleExtractor,
    QuestionsAnsweredExtractor,
)
from llama_index.core.node_parser import TokenTextSplitter

text_splitter = TokenTextSplitter(
    separator=" ", chunk_size=512, chunk_overlap=128
)
title_extractor = TitleExtractor(nodes=4)
qa_extractor = QuestionsAnsweredExtractor(questions=3)

from llama_index.core.ingestion import IngestionPipeline

pipeline = IngestionPipeline(
    transformations=[text_splitter, title_extractor, qa_extractor]
)

# Use 'await' to run the asynchronous 'arun' method
nodes = await pipeline.arun(
    documents=documents,
    in_place=True,
    show_progress=True,
)

nodes

Parsing nodes: 100%|██████████| 4/4 [00:00<00:00, 189.83it/s]
100%|██████████| 2/2 [00:07<00:00,  3.95s/it]
100%|██████████| 1/1 [00:02<00:00,  2.19s/it]


In [None]:
nodes[1].metadata  # Includes the document_title and question_this_excerpt_can_answer

{'page_label': '1',
 'file_name': '2023_canadian_budget.pdf',
 'file_path': '/home/brooks/rag_llama_index/rag_llama_tutorial/data/2023_canadian_budget.pdf',
 'file_type': 'application/pdf',
 'file_size': 376126,
 'creation_date': '2024-11-05',
 'last_modified_date': '2024-11-05',
 'document_title': 'Based on the provided context, I would suggest the following comprehensive title:\n\n"Key Components and Measures of the 2023 Canadian Federal Budget"\n\nThis title accurately captures the essence of the context, highlighting the unique entities (e.g., budget, government, Prime Minister), themes (e.g., policy objectives, investments in green growth, dental care program), and other relevant information. It is clear, concise, and informative, making it suitable for a document that aims to provide an overview of the 2023 Canadian Federal Budget.',
 'questions_this_excerpt_can_answer': 'Based on the provided context, here are three potential questions with specific answers that are unlikely to 

##### Adding Embeddings

In [None]:
embedding_test = Settings.embed_model.get_text_embedding(
    "It is raining cats and dogs here!"
)
# https://huggingface.co/BAAI/bge-small-en-v1.5   it is 384 dimensions here 
len(embedding_test)

384

##### 1.3 Embedding Stored in VectorStoreIndex using pipeline

##### Qdrant Vector Store Implementation

In [None]:
# Make sure all the dependencies have been installed

In [None]:
import asyncio # This is an asyncio case

### These are the MetaData Extractors 
from llama_index.core.extractors import (
    TitleExtractor,
    QuestionsAnsweredExtractor,
)
# Text splitter for the document 
from llama_index.core.node_parser import TokenTextSplitter
# The Ingestion pipeline 
from llama_index.core.ingestion import IngestionPipeline

# Vector store to store our low level created embeddings 
from llama_index.vector_stores.qdrant import QdrantVectorStore

import qdrant_client 

from llama_index.core import VectorStoreIndex

# you can use :memory: mode for fast and light-weight experiments, No deployment req.
client = qdrant_client.QdrantClient(location=":memory:")
vector_qdrant_store = QdrantVectorStore(client=client, collection_name="test_store")

text_splitter = TokenTextSplitter(
    separator=" ", chunk_size=512, chunk_overlap=128
)
title_extractor = TitleExtractor(nodes=4)
qa_extractor = QuestionsAnsweredExtractor(questions=3)



pipeline = IngestionPipeline(
    name="ETL and Qdrant Store pipeline",
    transformations=[text_splitter, title_extractor, qa_extractor],
    vector_store=vector_qdrant_store
)

# Use 'await' to run the asynchronous 'arun' method
low_level_nodes = await pipeline.arun(
    documents=documents,
    in_place=True,
    show_progress=True,
)

qdrant_stored_index = VectorStoreIndex.from_vector_store(vector_qdrant_store)

Parsing nodes: 100%|██████████| 4/4 [00:00<00:00, 189.20it/s]
100%|██████████| 2/2 [00:02<00:00,  1.15s/it]
100%|██████████| 1/1 [00:01<00:00,  1.46s/it]
100%|██████████| 3/3 [00:03<00:00,  1.07s/it]
100%|██████████| 2/2 [00:02<00:00,  1.49s/it]
100%|██████████| 8/8 [00:19<00:00,  2.42s/it]


##### View the Indices stored in Low-level API custom stored

In [None]:
# Print the Nodes Directly
for node in low_level_nodes:
    print(node.id_)

777fa0de-3357-4759-8a79-b2699ab789eb
334f722a-fa7f-44e9-b8ab-0cecf2f62166
b27e25df-bc1b-4de1-906b-16bc744cbf60
ed4dec14-e665-4f9b-af28-0d12904c006f
fa9ea975-db7a-49db-b011-d7d972eb46a1
aa18f65d-a700-4a33-a926-b320173000a1
0f4d7d4a-1450-4426-ac01-2401590d3504
6197071e-0d55-467c-921f-87e74e32f193


In [None]:
print(qdrant_stored_index._get_node_with_embedding(low_level_nodes)[0])  # Displaying only the first index (you can see the node id above as well)

Node ID: 777fa0de-3357-4759-8a79-b2699ab789eb
Text: ‹ 2022 2024›2023 budget of the Canadian federal government
Submitted 28 March 2023 Presented 28 March 2023 Parliament 44th Party
Liberal Finance ministerChrystia Freeland Total revenue$456.8 billion
(projected) Total expenditures$496.9 billion (projected) Deﬁcit $40.1
billion (projected)[ 1 ] GDP TBA Website 2023 Budget (http
s://www.budget.can ...


##### See what has been by the high level Transformation API   

In [None]:
# Print the Nodes Directly
for node in nodes:
    print(node.id_)

6e9143f6-288e-449b-99fc-bfed45757dae
6d57812b-c448-44ae-a6b7-a29d47483251
9bd1528a-a1bd-4a16-87f4-6b21e9f96abc
072dd4bb-f8fb-4c66-9c7b-ba4305e7cc0d
82210f6a-132d-47eb-b32d-bb3d3bd48c9c
b1d85874-f0e1-45be-93ef-cd389445cba6
13ad8f3d-d0ee-4dc9-946f-25749c9d470b
1eb9f8e6-e44a-4092-baee-53cdf46a4ceb


In [None]:
print(vector_index._get_node_with_embedding(nodes)[0])

Node ID: 6e9143f6-288e-449b-99fc-bfed45757dae
Text: ‹ 2022 2024›2023 budget of the Canadian federal government
Submitted 28 March 2023 Presented 28 March 2023 Parliament 44th Party
Liberal Finance ministerChrystia Freeland Total revenue$456.8 billion
(projected) Total expenditures$496.9 billion (projected) Deﬁcit $40.1
billion (projected)[ 1 ] GDP TBA Website 2023 Budget (http
s://www.budget.can ...


#### 1.3.1 Querying 

##### Create a Simple QueryEngine using the index

In [None]:
high_query_engine = vector_index.as_query_engine()
response = high_query_engine.query("What is the estimated total amount of net new spending in the 2023 Canadian Federal Budget over a six-year period?") 
print(response)

$43 billion.


#### 1.3.2 Store with Storage Context  

In [None]:
import chromadb
from llama_index.core import VectorStoreIndex , SimpleDirectoryReader
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core import StorageContext

# load some documents
documents = SimpleDirectoryReader("./data").load_data()

# initialize client, setting path to save data
db = chromadb.PersistentClient(path="./db/chroma_db_new")

# create collection
chroma_collection = db.get_or_create_collection("test_store")

# assign chroma as the vector_store to the context
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)

# create your index
index = VectorStoreIndex.from_documents(
    documents, storage_context=storage_context
)

# create a query engine and query
chroma_query_engine = index.as_query_engine()
response = chroma_query_engine.query("What is the 2023 Canadian Federal Budget?")
print(response)

#### 2. Building an agent 

##### Create a basic tool

In [None]:
from llama_index.core.tools import FunctionTool

def multiply(a: float, b: float) -> float:
    """Multiply two numbers and returns the product"""
    return a * b


multiply_tool = FunctionTool.from_defaults(fn=multiply)


def add(a: float, b: float) -> float:
    """Add two numbers and returns the sum"""
    return a + b


add_tool = FunctionTool.from_defaults(fn=add)

In [None]:
# Test 
multiply(4,3)

12

##### Initialize the LLM

In [None]:
llm = Settings.llm  # We are using the default which is llama3.2

##### Initialize an existing small yet powerful ReAct Agent 

In [None]:
from llama_index.core.agent import ReActAgent
agent = ReActAgent.from_tools([multiply_tool, add_tool], llm=llm, verbose=True)

##### Ask a Question for the Agent to respond 

In [None]:
response = agent.chat("What is 20+(2*4)? Use a tool to calculate every step.")

> Running step d6a9093f-0737-4cbc-a2ab-2a35501cf516. Step input: What is 20+(2*4)? Use a tool to calculate every step.
[1;3;38;5;200mThought: The current language of the user is: English. I need to use a tool to help me answer the question.
Action: multiply
Action Input: {'a': 2, 'b': 4}
[0m[1;3;34mObservation: 8
[0m> Running step ce248c4d-17a4-48b0-8e81-0c5273a20fd5. Step input: None
[1;3;38;5;200mThought: The current expression after multiplication is now: 20 + 8. I need to use another tool to add these numbers together.
Action: add
Action Input: {'input': 20, 'a': 8}
[0m[1;3;34mObservation: Error: add() got an unexpected keyword argument 'input'
[0m> Running step c37d670c-69f5-414f-be50-9c80957cdee9. Step input: None
[1;3;38;5;200mThought: The current language of the user is still English. It seems like the tool I used to add numbers together requires only two arguments, but I provided a third one named "input". I need to adjust my input to use the correct keywords.
Action:

#### Adding RAG to the Agent

##### Using the High level API to create the query Engine Tool

In [None]:
from llama_index.core.tools import QueryEngineTool
high_budget_tool = QueryEngineTool.from_defaults(
    high_query_engine,
    name="high_canadian_budget_2023",
    description="A RAG engine with some basic facts about the 2023 Canadian federal budget using High Level API",
)

##### Pass all the methods to the ReAct Agent

In [None]:
high_react_agent = ReActAgent.from_tools(
    [multiply_tool, add_tool, high_budget_tool], verbose=True
)

In [None]:
response = high_react_agent.chat(
    "What is the total amount of the 2023 Canadian federal budget multiplied by 3? Go step by step, using a tool to do any math."
)

print(response)

> Running step b6d0afea-6cd2-4231-a50f-88ef0f149349. Step input: What is the total amount of the 2023 Canadian federal budget multiplied by 3? Go step by step, using a tool to do any math.
[1;3;38;5;200mThought: The user wants me to perform some calculations on the 2023 Canadian federal budget, so I need to use a tool to get the details of the budget first.
Action: high_canadian_budget_2023
Action Input: {'input': '2023 Canadian federal budget', 'num_beams': 5}
[0m[1;3;34mObservation: The 2023 Canadian federal budget was presented by Finance Minister Chrystia Freeland on March 28, 2023, with a projected deficit of $40.1 billion. The budget aimed to make life more affordable for Canadians while reducing government expenditures, following Prime Minister Justin Trudeau's policy objective.
[0m> Running step be342bdb-5740-4ddc-ab36-096351704cd3. Step input: None
[1;3;38;5;200mThought: Now that I have the details of the Canadian federal budget, I need to find out what the total amount i

#### Using the low level API 


We need to create the query engine from the index first (note: all indexes can create query engine)
We will create from the in-memory qdrant for the low level nodes. 

In [None]:
low_qdrant_query_engine = qdrant_stored_index.as_query_engine()

In [None]:
# Create the Query engine Tool as part of the toolkit 
low_api_budget_tool = QueryEngineTool.from_defaults(
    low_qdrant_query_engine,
    name="low_api_canadian_budget_2023",
    description="A RAG engine with some basic facts about the 2023 Canadian federal budget using low Level API / custom MetaData Extractors",
)

In [None]:
low_react_agent = ReActAgent.from_tools(
    [multiply_tool, add_tool, low_api_budget_tool], verbose=True
)

In [None]:
response = low_react_agent.chat(
    "What is the total amount of the 2023 Canadian federal budget multiplied by 3? Go step by step, using a tool to do any math."
)

print(response)

> Running step 6e1c0423-0a25-4bc5-89f1-44d1a52ebd10. Step input: What is the total amount of the 2023 Canadian federal budget multiplied by 3? Go step by step, using a tool to do any math.
[1;3;38;5;200mThought: The user wants to perform an operation on the output of the low_api_canadian_budget_2023 tool and multiply it by 3.
Action: add
Action Input: {'a': 1.0, 'b': 1.0}
[0m[1;3;34mObservation: 2.0
[0m> Running step d0e7ab0f-5c03-44d7-8063-832b2ba10129. Step input: None
[1;3;38;5;200mThought: The user provided an observation but no further information was given. We need to get the output of low_api_canadian_budget_2023 first.
Action: low_api_canadian_budget_2023
Action Input: {'input': 'The total amount of the Canadian federal budget is $2.0 billion.'}
[0m[1;3;34mObservation: Error: Collection test_store not found
[0m> Running step 96c34414-1e4e-4b1d-ba63-63e75ef06580. Step input: None
[1;3;38;5;200mThought: The low_api_canadian_budget_2023 tool failed to execute correctly an

**The above fails to find the qdrant in-memory call to the test_store collection.**

##### Let us attempt using the ChromaDB

In [None]:
# use the previously created chroma_query_engine from the pipeline 
# Create the Query engine Tool as part of the toolkit 
chromadb_budget_tool = QueryEngineTool.from_defaults(
    chroma_query_engine,
    name="chromadb_canadian_budget_2023",
    description="A RAG engine with some basic facts about the 2023 Canadian federal budget using chromaDB",
)

In [None]:
chromadb_react_agent = ReActAgent.from_tools(
    [multiply_tool, add_tool, chromadb_budget_tool], verbose=True
)

##### Needed a couple more runs but managed to find out the correct results

In [None]:
response = chromadb_react_agent.chat(
    "What is the total amount of the 2023 Canadian federal budget multiplied by 3? Go step by step, using a tool to do any math."
)

print(response)

> Running step 67d31c0a-c56a-4ecb-9bba-deb0035d1b3b. Step input: What is the total amount of the 2023 Canadian federal budget multiplied by 3? Go step by step, using a tool to do any math.
[1;3;38;5;200mThought: The current language of the user is: English. I need to use a tool to help me answer the question.
Action: multiply
Action Input: {}
[0m[1;3;34mObservation: Error: multiply() missing 2 required positional arguments: 'a' and 'b'
[0m> Running step 4dd88a98-de82-4d06-b887-c022d66dd12f. Step input: None
[1;3;38;5;200mThought: The error indicates that the multiply tool requires two numbers as input. Since we don't have the actual budget amount, I'll need to use another tool to get that information.
Action: chromadb_canadian_budget_2023
Action Input: {'input': 'total cost of 2023 Canadian federal budget'}
[0m[1;3;34mObservation: The total expenditures in the 2023 Canadian federal budget were $496.9 billion (projected).
[0m> Running step 234ad51c-471e-4be5-9ac3-fe19b219c09d. S

##### Enhancing with LlamaParse

##### Parse and query the file using LlamaParse 

In [None]:
import os
import nest_asyncio
from dotenv import load_dotenv
from llama_parse import LlamaParse
from llama_index.core import VectorStoreIndex

# allow nested event loops 
nest_asyncio.apply()

load_dotenv()
llama_parse_api_key = os.getenv("LLAMA_CLOUD_API_KEY")

parsed_documents = LlamaParse(result_type="markdown", api_key=llama_parse_api_key , show_progress=True).load_data(
    "./data/2023_canadian_budget.pdf"
)
parsed_index = VectorStoreIndex.from_documents(parsed_documents)
parsed_query_engine = parsed_index.as_query_engine()

response = parsed_query_engine.query(
    "How much exactly was allocated to a tax credit to promote investment in green technologies in the 2023 Canadian federal budget?"
)
print(response)

Started parsing the file under job_id 4ed81201-9394-4f96-80d7-56285818c7d0
$20 billion was allocated over six years.


##### Comparing the difference with the ChromaDB it is not a significant improvement ? 

In [None]:
import chromadb
from llama_index.core import VectorStoreIndex
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core import StorageContext 

# Initialize client, setting path to save data
db = chromadb.PersistentClient(path="./db/chroma_db")
chroma_collection = db.get_collection("quickstart")

# Assign Chroma as the vector store to the context
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)

index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
chroma_query_engine = index.as_query_engine()
response = chroma_query_engine.query("How much exactly was allocated to a tax credit to promote investment in green technologies in the 2023 Canadian federal budget?")
print(response)

$20 billion over six years.


##### Using an existing tool from LlamaHub to fetch stock data

In [None]:
! pip install llama-index-tools-yahoo-finance

In [None]:
from llama_index.core.agent import ReActAgent  
from llama_index.tools.yahoo_finance import YahooFinanceToolSpec

# I can also build my own web scraper tool here
# returns a list of tool names from the imported package
finance_tools = YahooFinanceToolSpec().to_tool_list() # [stock_basic_info, ...]
finance_tools.extend([multiply_tool, add_tool])

agent = ReActAgent.from_tools(finance_tools, verbose=True)

response = agent.chat("What is the current price of NVDA?")

print(response)

> Running step 47951218-7828-43c8-a4d8-64452ab67580. Step input: What is the current price of NVDA?
[1;3;38;5;200mThought: The current language of the user is: English. I need to use a tool to help me answer the question.
Action: stock_basic_info
Action Input: {'ticker': 'NVDA'}
[0m[1;3;34mObservation: Info: 
{'address1': '2788 San Tomas Expressway', 'city': 'Santa Clara', 'state': 'CA', 'zip': '95051', 'country': 'United States', 'phone': '408 486 2000', 'website': 'https://www.nvidia.com', 'industry': 'Semiconductors', 'industryKey': 'semiconductors', 'industryDisp': 'Semiconductors', 'sector': 'Technology', 'sectorKey': 'technology', 'sectorDisp': 'Technology', 'longBusinessSummary': "NVIDIA Corporation provides graphics and compute and networking solutions in the United States, Taiwan, China, Hong Kong, and internationally. The Graphics segment offers GeForce GPUs for gaming and PCs, the GeForce NOW game streaming service and related infrastructure, and solutions for gaming plat

#### Building basic Workflow

##### Single-step workflow

In [3]:
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)
class MyWorkflow(Workflow):
    @step
    async def my_step(self, ev: StartEvent) -> StopEvent:
        # do something here
        return StopEvent(result="Hello, world!")


w = MyWorkflow(timeout=10, verbose=False)
result = await w.run()
print(result)

Hello, world!


##### Visualizing Workflow

In [None]:
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(MyWorkflow, filename="basic_workflow.html")

<class 'NoneType'>
<class 'llama_index.core.workflow.events.StopEvent'>
basic_workflow.html


##### Multistep workflow 

In [None]:
# define two custom events 
class FirstEvent(Event):
    first_output: str
class SecondEvent(Event):
    second_output: str

# There is going to be a chain being passed for CoT type of approach
class MyMultiStepWorkFlow(Workflow):
    @step
    async def step_one(self, ev: StartEvent) -> FirstEvent:
        print(ev.first_input) 
        return FirstEvent(first_output="First step complete.")
    @step
    async def step_two(self, ev: FirstEvent) -> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="Second output completed!")
    @step 
    async def stop(self, ev: SecondEvent) -> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Multistep Workflow completed.")

multi_workflow = MyMultiStepWorkFlow(timeout=10, verbose=False)
result = await multi_workflow.run(first_input= "Start the Workflow")
result

Start the Workflow
First step complete.
Second output completed!


'Multistep Workflow completed.'

In [None]:
draw_all_possible_flows(MyMultiStepWorkFlow, filename="multi_step_workflow.html")

<class 'NoneType'>
<class '__main__.FirstEvent'>
<class '__main__.SecondEvent'>
<class 'llama_index.core.workflow.events.StopEvent'>
multi_step_workflow.html


#### Loops in workflows

In [None]:
import random

class LoopEvent(Event): 
    loop_output: str

class MyLoopWorkFlow(Workflow):
    @step
    async def step_one(self, ev: StartEvent | LoopEvent ) -> FirstEvent | LoopEvent:
        if random.randint(0, 1) == 0:
            print(f"stuck in loop")
            return LoopEvent(loop_output="Back to step one.")
        else:
            print("Exited the loop")
            return FirstEvent(first_output="First step complete.")
    @step
    async def final_step(self, ev: FirstEvent) -> StopEvent:
        print(ev.first_output)
        return StopEvent(result="workflow completed.")

loop_workflow = MyLoopWorkFlow(timeout=10, verbose=False)
result = await loop_workflow.run(first_input= "Start the Workflow")
result

stuck in loop
Exited the loop
First step complete.


'workflow completed.'

In [None]:
##### Visualizing the above loop
draw_all_possible_flows(MyLoopWorkFlow, filename="loop_workflow.html")

<class 'NoneType'>
<class 'llama_index.core.workflow.events.StopEvent'>
<class '__main__.FirstEvent'>
<class '__main__.LoopEvent'>
loop_workflow.html


#### Maintaining state using  Context

In [None]:
class SetupEvent(Event):
    query: str


class StepTwoEvent(Event):
    query: str


class StatefulFlow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> SetupEvent | StepTwoEvent:
        db = await ctx.get("some_database", default=None)
        if db is None:
            print("Need to load data")
            return SetupEvent(query=ev.query)

        # do something with the query
        return StepTwoEvent(query=ev.query)

    @step
    async def setup(self, ctx: Context, ev: SetupEvent) -> StartEvent:
        # load data
        await ctx.set("some_database", [1, 2, 3]) # key-value pair storage set
        return StartEvent(query=ev.query)
    @step
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
        # do something with the data
        print("Data is ", await ctx.get("some_database"))
        data = await ctx.get("some_database") # retrieve using the key
        return StopEvent(result=data[1])


stateful_workflow = StatefulFlow(timeout=10, verbose=False)
result = await stateful_workflow.run(query="Some query")
print(result)

Need to load data
Data is  [1, 2, 3]
2


#### Streaming events

In [None]:
import asyncio

class FirstEvent(Event):
    first_output: str


class SecondEvent(Event):
    second_output: str
    response: str


class ProgressEvent(Event):
    msg: str

class StreamWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Saving user query in the context!"))
        await ctx.set("user_query_ctx", ev.user_query) 
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        llm = Settings.llm # using the llama 3.2 from global

        user_request:str = await ctx.get("user_query_ctx") # get the user query here 
        generator = await llm.astream_complete(user_request) # since prompt should be of type str

        ## Extracted from the lib implementation for better understanding 
        ##  generator Yields:
            # CompletionResponse:
                # An async generator of CompletionResponse objects, each containing a new token of the response.
                # Which will be written to the output response
        async for response in generator:
            # Allow the workflow to stream this piece of response
            ctx.write_event_to_stream(ProgressEvent(msg=response.delta))
        return SecondEvent(
            second_output="Second step complete, full response attached",
            response=str(response),
        )

    @step
    async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
        return StopEvent(result="Workflow complete.")

async def main():
    w = StreamWorkflow(timeout=30, verbose=True)
    handler = w.run(user_query="Please give me the first 3 paragraphs of Moby Dick, a book in the public domain.")

    async for ev in handler.stream_events():
        if isinstance(ev, ProgressEvent):
            print(ev.msg)

    final_result = await handler
    print("Final result", final_result)

    draw_all_possible_flows(StreamWorkflow, filename="streaming_workflow.html")


if __name__ == "__main__":
    asyncio.run(main())

Running step step_one
Step step_one produced event FirstEvent
Running step step_two
Saving user query in the context!
Here
's
 an
 excerpt
 from
 the
 beginning
 of
 M
oby
 Dick
 by
 Herman
 Mel
ville
:


Call
 me
 Ish
ma
el
.


Some
 years
 ago
—
never
 mind
 how
 long
 precisely
—
having
 little
 or
 no
 money
 in
 my
 purse
,
 and
 nothing
 particular
 to
 interest
 me
 on
 shore
,
 I
 thought
 I
 would
 sail
 about
 a
 little
 and
 see
 the
 wat
ery
 part
 of
 the
 world
.
 It
 was
 a
 year
 after
 I
 had
 returned
 from
 the
 sea
,
 when
 I
 began
 to
 feel
 restless
;
 there
 was
 nothing
 new
 to
 do
,
 nothing
 to
 hear
,
 nothing
 to
 read
,
 nothing
 to
 think
 of
.
 Suff
iced
,
 however
,
 to
 go
 to
 bed
 in
 the
 afternoon
 when
 I
 would
.


I
 did
 not
 choose
 that
 place
.
 There
 was
 nothing
 particular
ly
 appealing
 about
 it
.
 I
 had
 simply
 picked
 it
 at
 random
.
 But
 it
 had
 the
 advantage
 of
 being
 cheap
;
 and
 in
 this
 world
 there
 are
 many
 things

#### Concurrent execution of workflows

##### The `send_event` and `collect_events` are the important ones since we'll want to wait for all your slow operations to complete before moving on to another step in concurrency. 

In [None]:
class StepTwoEvent(Event):
    query: str

class ParallelFlow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))

    @step(num_workers=4)
    async def step_two(self, ctx: Context, ev: StepTwoEvent) -> StopEvent:
        print("Running slow query ", ev.query)
        await asyncio.sleep(random.randint(1, 5))

        return StopEvent(result= f"{ev.query} completed first and stopped the workflow.")

parallel_workflow = ParallelFlow()
response = await parallel_workflow.run()
print(response) 

Running slow query  Query 1
Running slow query  Query 2
Running slow query  Query 3
Query 3 completed first and stopped the workflow.


In [12]:
class StepAEvent(Event):
    query: str

class StepBEvent(Event):
    query: str

class StepCEvent(Event):
    query: str

class StepACompleteEvent(Event):
    result: str

class StepBCompleteEvent(Event):
    result: str

class StepCCompleteEvent(Event):
    result: str

class ConcurrentFlow(Workflow):
    @step
    async def start(
        self, ctx: Context, ev: StartEvent
    ) -> StepAEvent | StepBEvent | StepCEvent:
        ctx.send_event(StepAEvent(query="Query 1"))
        ctx.send_event(StepBEvent(query="Query 2"))
        ctx.send_event(StepCEvent(query="Query 3"))

    @step
    async def step_a(self, ctx: Context, ev: StepAEvent) -> StepACompleteEvent:
        print("Doing something A-ish")
        await asyncio.sleep(random.randint(1, 5))
        return StepACompleteEvent(result=ev.query)

    @step
    async def step_b(self, ctx: Context, ev: StepBEvent) -> StepBCompleteEvent:
        print("Doing something B-ish")
        await asyncio.sleep(random.randint(1, 5))
        return StepBCompleteEvent(result=ev.query)

    @step
    async def step_c(self, ctx: Context, ev: StepCEvent) -> StepCCompleteEvent:
        print("Doing something C-ish")
        await asyncio.sleep(random.randint(1, 5))
        return StepCCompleteEvent(result=ev.query)

    @step
    async def step_three(
        self,
        ctx: Context,
        ev: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent,
    ) -> StopEvent:
        print("Received event ", ev.result)

        # wait until we receive 3 events
        if (
            ctx.collect_events(
                ev,
                [StepACompleteEvent, StepBCompleteEvent, StepCCompleteEvent]
            )
            is None
        ):
            return None

        # do something with all 3 results together
        return StopEvent(result="Done")
    
concurrent_workflow = ConcurrentFlow(timeout=15, verbose=False)
result = await concurrent_workflow.run()
print(result)

Doing something A-ish
Doing something B-ish
Doing something C-ish
Received event  Query 2
Received event  Query 3
Received event  Query 1
Done
