In [1]:
from dotenv import load_dotenv
load_dotenv()

True

In [None]:
pip install llama-index-utils-workflow

In [2]:
from llama_index.core.workflow import Workflow, step, StartEvent, StopEvent

class MyWorkflow(Workflow):
    
    @step
    async def my_step(self, event: StartEvent) -> StopEvent:
        return StopEvent(result="Hello World!")
    

workflow = MyWorkflow(timeout=10, verbose=True)

result = await workflow.run()
result

'Hello World!'

In [3]:
from llama_index.utils.workflow import draw_all_possible_flows

draw_all_possible_flows(MyWorkflow, filename="basic_workflow.html")

basic_workflow.html


In [6]:
from llama_index.core.workflow import Event

class FirstEvent(Event):
    first_output: str
    
class SecondEvent(Event):
    second_output: str

In [7]:
class MyWorkflow(Workflow):
    
    @step
    async def step_one(self, event: StartEvent) -> FirstEvent:
        print(event.first_input)
        return FirstEvent(first_output="First step complete")
    
    @step
    async def step_two(self, event: FirstEvent) -> SecondEvent:
        print(event.first_output)
        return SecondEvent(second_output="Second step complete")
    
    @step
    async def step_three(self, event: SecondEvent) -> StopEvent:
        print(event.second_output)
        return StopEvent(result="Workflow complete")
    

w = MyWorkflow(timeout=10, verbose=True)
result = await w.run(first_input="start the workflow")
print(result)

draw_all_possible_flows(MyWorkflow, filename="basic_workflow.html")

start the workflow
First step complete
Second step complete
Workflow complete
basic_workflow.html


Branch Workflow (IF ELSE)

In [7]:
class BranchA1Event(Event):
    payload: str
    
class BranchA2Event(Event):
    payload: str
    
class BranchB1Event(Event):
    payload: str
    
class BranchB2Event(Event):
    payload: str

In [10]:
import random

class BranchWorkflow(Workflow):
    
    @step
    async def start(self, event: StartEvent) -> BranchA1Event | BranchB1Event:
        if random.randint(0, 1) == 0:
            print("Go to Branch A")
            return BranchA1Event(payload="Branch A")
        else:
            print("Go to Branch B")
            return BranchB1Event(payload="Branch B")
        
    @step
    async def step_a1(self, event: BranchA1Event) -> BranchA2Event:
        print(event.payload)
        return BranchA2Event(payload=event.payload)
    
    @step
    async def step_a2(self, event: BranchA2Event) -> StopEvent:
        print(event.payload)
        return StopEvent(payload="Branch A Complete")
    
    @step
    async def step_b1(self, event: BranchB1Event) -> BranchB2Event:
        print(event.payload)
        return BranchB2Event(payload=event.payload)
    
    @step
    async def step_b2(self, event: BranchB2Event) -> StopEvent:
        print(event.payload)
        return StopEvent(payload="Branch B Complete")
    
draw_all_possible_flows(BranchWorkflow, filename="branch_workflow.html")

branch_workflow.html


Loop Workflow

In [2]:
from llama_index.core.workflow import Event

class TextEvent(Event):
    text: str
    iteration: int = 0
    
class EvaluationEvent(Event):
    text: str
    feedback: str
    score: float
    iteration: int
    
class FinalTextEvent(Event):
    text: str
    iterations_required: int

In [4]:
from llama_index.llms.openai import OpenAI
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step
from llama_index.utils.workflow import draw_all_possible_flows

class TextRefinementWorkflow(Workflow):
    
    def __init__(self, timeout=60, verbose=False, max_iterations=5):
        super().__init__(timeout=timeout, verbose=verbose)
        self.llm = OpenAI(model="gpt-4o-mini")
        self.max_iterations = max_iterations
        
    
    @step
    async def initialize_text(self, event: StartEvent) -> TextEvent:
        initial_text = event.initial_text
        print(f"Initial Text Received (Iteration 0): {initial_text}")
        return TextEvent(text=initial_text, iteration=0)
    
    @step
    async def grade_text(self, event: TextEvent) -> EvaluationEvent | FinalTextEvent:
        if event.iteration >= self.max_iterations:
            print(f"Reached Maximum Iterations (Iteration {self.max_iterations})")
            return FinalTextEvent(text=event.text, iterations_required=event.iteration)
        
        prompt = f"""
        Evaluate the following text for clarity, coherence and conciseness
        
        Text: 
        {event.text}
        
        Provide:
        1. A score from 0-10(where 0 is least score and 10 is highest score)
        2. Specific feedback on how to improve
        
        Format your response exactly as:
        SCORE: [number]
        FEEDBACK: [your feedback]
        """
        
        response = await self.llm.acomplete(prompt)
        result = response.text.strip()
        
        score_line = [line for line in result.split("\n") if line.startswith("SCORE:")][0]
        score = float(score_line.split("SCORE:")[1].strip())
        
        feedback_line = [line for line in result.split("\n") if line.startswith("FEEDBACK:")][0]
        feedback = feedback_line.split("FEEDBACK:")[1].strip()
        
        print(f"Evaluation Iteration (Iteration {event.iteration})")
        print(f"Score: {score}")
        print(f"Feedback: {feedback}")
        
        if score > 8:
            print(f"Quality threshold met after {event.iteration} iterations!")
            return FinalTextEvent(text=feedback, iterations_required=event.iteration)
        
        return EvaluationEvent(text=event.text, feedback=feedback,
                               score=score, iteration=event.iteration)
        
    
    @step
    async def refine_text(self, event: EvaluationEvent) -> TextEvent:
        prompt = f"""
        Please improve the following text based on this feedback.
        
        Original Text:
        {event.text}
        
        Feedback:
        {event.feedback}
        
        Current Score: {event.score}
        
        Provide an improved version that addresses all the feeback points.
        Only return the improved text with no additional commentary
        """
        
        response = await self.llm.acomplete(prompt)
        result = response.text.strip()
        iteration = event.iteration + 1
    
        print(f"Refined Text (Iteration {iteration}: {result})")    
        return TextEvent(text=result, iteration=iteration)
    
    
    @step
    async def finalize_text(self, event: FinalTextEvent) -> StopEvent:
        result = {
            "text": event.text,
            "iterations": event.iterations_required
        }
        
        return StopEvent(result=result)
    
    

draw_all_possible_flows(TextRefinementWorkflow, filename="Text_refinement_workflow.html")        
    

Text_refinement_workflow.html


In [5]:
workflow = TextRefinementWorkflow()

initial_text = """
    Machine Learning is a process where computer systems can learn from data without being explicitly programmed.
    It uses algorithms to identify patterns in data and make predictions. Its used in many applications today.
"""

result = await workflow.run(initial_text=initial_text)

Initial Text Received (Iteration 0): 
    Machine Learning is a process where computer systems can learn from data without being explicitly programmed.
    It uses algorithms to identify patterns in data and make predictions. Its used in many applications today.

Evaluation Iteration (Iteration 0)
Score: 7.0
Feedback: The text is generally clear and conveys the main idea of machine learning effectively. However, it could be improved in terms of coherence and conciseness. Here are some suggestions:
Refined Text (Iteration 1: Machine learning is a process that enables computer systems to learn from data without explicit programming. By utilizing algorithms, it identifies patterns and makes predictions. Today, it is applied across various fields and industries.)
Evaluation Iteration (Iteration 1)
Score: 8.0
Feedback: The text is clear and coherent, effectively conveying the concept of machine learning and its applications. However, it could be more concise by eliminating some redundancy. 

In [6]:
result

{'text': 'Machine learning is a process that enables computer systems to learn from data without explicit programming. By utilizing algorithms, it identifies patterns and makes predictions. For example, in healthcare, machine learning can analyze medical images to detect diseases like cancer at an early stage. It is now applied in various fields, including finance and marketing.',
 'iterations': 5}

Maintaining Context

In [13]:
from llama_index.core.workflow import Context

class SetupEvent(Event):
    query: str
    
class QueryEvent(Event):
    query: str
    

class StatefulWorkFlow(Workflow):
    
    @step
    async def init(self, ctx: Context, event: StartEvent) -> SetupEvent | QueryEvent:
        db_config = await ctx.store.get("database_config", default=None)
        if db_config is None:
            print("Load DB Config")
            return SetupEvent(query=event.query)
        
        return QueryEvent(query=event.query)
    
    
    @step
    async def setup(self, ctx: Context, event:SetupEvent) ->StartEvent:
        await ctx.store.set("database_config", {"url": "jdbc://mock-url"})
        return StartEvent(query=event.query)
    
    @step
    async def query(self, ctx: Context, event: QueryEvent) -> StopEvent:
        print("Querying DB")
        return StopEvent(result = await ctx.store.get("database_config"))


workflow = StatefulWorkFlow()
result = await workflow.run(query="Load Customer Data")
result


Load DB Config
Querying DB


{'url': 'jdbc://mock-url'}

Using Cache

In [3]:
from llama_index.core.workflow import Event

class DocumentEvent(Event):
    document_text: str
    
class SummarizedEvent(Event):
    summary: str

In [8]:
from llama_index.llms.openai import OpenAI
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step, Context
import hashlib

class DocumentWorkflow(Workflow):
    
    def __init__(self, timeout=30, verbose=False):
        super().__init__(timeout=timeout, verbose=verbose)
        
        self.llm = OpenAI(model="gpt-4o-mini")
        self.summaries_cache = {}
        self.processed_count = 0
        
    def get_document_hash(self, text):
        return hashlib.md5(text.encode()).hexdigest()
    
    @step
    async def start(self, ctx: Context, event: StartEvent) -> DocumentEvent:
        document_text = event.text
        document_hash = self.get_document_hash(document_text)
        
        using_cache = document_hash in self.summaries_cache
        
        await ctx.store.set("document_hash", document_hash)
        await ctx.store.set("using_cache", using_cache)
        
        return DocumentEvent(document_text=document_text)
    
    @step
    async def summarize_document(self, ctx: Context, event: DocumentEvent) -> SummarizedEvent:
        using_cache = await ctx.store.get("using_cache")
        document_hash = await ctx.store.get("document_hash")
        
        # print(f"Cache is {using_cache}")
        
        if using_cache:
            print("Retrieved cache summary")
            summary = self.summaries_cache[document_hash]
            return SummarizedEvent(summary=summary)
        
        print("Generating New Summary")
        prompt = f"Please summarize this text in one paragraph:\n\n {event.document_text}"
        response = await self.llm.acomplete(prompt)
        summary = response.text.strip()
        
        self.summaries_cache[document_hash] = summary
        return SummarizedEvent(summary=summary)
    
    @step
    async def final_step(self, ctx: Context, event: SummarizedEvent) -> StopEvent:
        self.processed_count += 1
        using_cache = await ctx.store.get("using_cache")
        
        result = {
            "summary": event.summary,
            "from_cache": using_cache,
            "total_processed": self.processed_count
        }
        return StopEvent(result = result)
        
        
    

In [9]:
workflow = DocumentWorkflow()

text = "Machine Learning is a process where computer systems can learn from data without being explicitly programmed.It uses algorithms to identify patterns in data and make predictions. Its used in many applications today."
result = await workflow.run(text=text)
result


Generating New Summary


{'summary': 'Machine Learning is a method that enables computer systems to learn from data autonomously, utilizing algorithms to detect patterns and make predictions. This technology is widely applied across various fields today.',
 'from_cache': False,
 'total_processed': 1}

In [10]:
result2 = await workflow.run(text=text)
result2

Retrieved cache summary


{'summary': 'Machine Learning is a method that enables computer systems to learn from data autonomously, utilizing algorithms to detect patterns and make predictions. This technology is widely applied across various fields today.',
 'from_cache': True,
 'total_processed': 2}

### Streaming Events

In [11]:
class FirstEvent(Event):
    first_output: str
    
class SecondEvent(Event):
    second_output: str
    response: str
    
class ProgressEvent(Event):
    msg: str

In [21]:
class MyWorkflow(Workflow):
    
    @step
    async def step_one(self, ctx: Context, event: StartEvent) -> FirstEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Event :: Step one is happening"))
        return FirstEvent(first_output="First step complete")
    
    @step
    async def step_two(self, ctx: Context, event: FirstEvent) -> SecondEvent:
        llm = OpenAI(model="gpt-4o-mini")
        generator = await llm.astream_complete("Tell me about Elon Musk in 5 sentences")
        
        async for token in generator:
            ctx.write_event_to_stream(ProgressEvent(msg=token.delta))
            
        return SecondEvent(second_output="Second step complete", response=str(token))
    
    @step
    async def step_three(self, ctx: Context, event: SecondEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Event :: Step three is happening"))
        return StopEvent(result="Workflow Complete")

In [22]:
workflow = MyWorkflow()
handler = workflow.run()

async for event in handler.stream_events():
    if isinstance(event, ProgressEvent):
        print(event.msg, sep="", end="")
        
print("Streaming completed")
final_result = await handler
print(final_result)

Event :: Step one is happeningElon Musk is a billionaire entrepreneur and CEO known for his role in revolutionizing multiple industries, including electric vehicles, space travel, and renewable energy. He co-founded Zip2 and X.com, which later became PayPal, before founding SpaceX in 2002 with the goal of reducing space transportation costs and enabling the colonization of Mars. Musk is also the CEO and product architect of Tesla, Inc., which has played a significant role in popularizing electric cars and sustainable energy solutions. In addition to these ventures, he has initiated projects like Neuralink, aimed at developing brain-computer interfaces, and The Boring Company, focused on tunnel construction and infrastructure. Musk is known for his ambitious vision for the future, often pushing the boundaries of technology and innovation.Event :: Step three is happeningStreaming completed
Workflow Complete


### Concurrency

In [23]:
class StepTwoEvent(Event):
    query: str
    
class StepThreeEvent(Event):
    result: str

send_event() will trigger other steps in workflow  - internal to workflow
write_event_to_stream() will be for streaming logs, llm tokens, progress - external to workflow

In [35]:
import asyncio
import random

class ConcurrentFlow(Workflow):
    
    @step
    async def start(self, ctx: Context, event: StartEvent) -> StepTwoEvent:
        ctx.send_event(StepTwoEvent(query="Query 1"))
        ctx.send_event(StepTwoEvent(query="Query 2"))
        ctx.send_event(StepTwoEvent(query="Query 3"))
        
    # Here num_workers is the concurrency limit, only 4 will run at same time 
    # default is unlimited
    @step(num_workers=4)
    async def step_two(self, ctx: Context, event: StepTwoEvent) -> StepThreeEvent:
        print("Running Query ", event.query)
        await asyncio.sleep(random.randint(1, 5))
        return StepThreeEvent(result=event.query)
    
    @step
    async def step_three(self, ctx: Context, event: StepThreeEvent) -> StopEvent:
        result = ctx.collect_events(event, [StepThreeEvent] * 3)

        # If collect_events didn't all 3 StepThreeEvent then it will return None
        # so if the result is None we will return None to wait to for all 3 events to receive
        
        if result is None:
            return None
        
        print(result)
        return StopEvent(result=result)

In [36]:
workflow = ConcurrentFlow()
result = await workflow.run()
result

Running Query  Query 1
Running Query  Query 2
Running Query  Query 3
[StepThreeEvent(result='Query 2'), StepThreeEvent(result='Query 1'), StepThreeEvent(result='Query 3')]


[StepThreeEvent(result='Query 2'),
 StepThreeEvent(result='Query 1'),
 StepThreeEvent(result='Query 3')]

In [2]:
from llama_index.core.workflow import Event, StartEvent, StopEvent

class StepAEvent(Event):
    query: str
    
class StepBEvent(Event):
    query: str
    
class StepCEvent(Event):
    query: str
    
class StepACompleteEvent(Event):
    result: str
    
class StepBCompleteEvent(Event):
    result: str
    
class StepCCompleteEvent(Event):
    result: str

In [3]:
from llama_index.core.workflow import Workflow, Context, step

class ConcurrentWorkflow(Workflow):
    
    @step
    async def start(self, ctx: Context, event: StartEvent) -> StepAEvent | StepBEvent | StepCEvent:
        ctx.send_event(StepAEvent(query="Query1"))
        ctx.send_event(StepBEvent(query="Query2"))
        ctx.send_event(StepCEvent(query="Query3"))
        
    @step
    async def step_a(self, ctx: Context, event: StepAEvent) -> StepACompleteEvent:
        print("Doing something A-ish")
        return StepACompleteEvent(result=event.query)
    
    @step
    async def step_b(self, ctx: Context, event: StepBEvent) -> StepBCompleteEvent:
        print("Doing something B-ish")
        return StepBCompleteEvent(result=event.query)
    
    @step
    async def step_c(self, ctx: Context, event: StepCEvent) -> StepCCompleteEvent:
        print("Doing something C-ish")
        return StepCCompleteEvent(result=event.query)
    
    @step
    async def step_three(self, ctx: Context,
            event: StepACompleteEvent | StepBCompleteEvent | StepCCompleteEvent) -> StopEvent:
        
        print("Received Event", event.result)
        result = ctx.collect_events(event, [StepACompleteEvent, StepBCompleteEvent, StepCCompleteEvent])
        
        if result is None:
            return None
        
        print("Final Result", result)
        return StopEvent(result="Done")
        

In [5]:
w = ConcurrentWorkflow()
await w.run()

Doing something A-ish
Doing something B-ish
Doing something C-ish
Received Event Query1
Received Event Query2
Received Event Query3
Received Event Query2
Received Event Query3
Received Event Query3
Final Result [StepACompleteEvent(result='Query1'), StepBCompleteEvent(result='Query2'), StepCCompleteEvent(result='Query3')]


'Done'

In [5]:
from llama_index.core.workflow import Event

class Step2Event(Event):
    query: str
    
class Step3Event(Event):
    query: str

In [6]:
class MainWorkflow(Workflow):
    @step
    async def start(self, event: StartEvent) -> Step2Event:
        print("starting up")
        return Step2Event(query=event.query)
    
    @step
    async def step_two(self, event: Step2Event) -> Step3Event:
        print("Sending an email")
        return Step3Event(query=event.query)
    
    @step
    async def step_three(self, event: Step3Event) -> StopEvent:
        print("Finishing up")
        return StopEvent(result=event.query)
    

w = MainWorkflow()
result = await w.run(query="Initial Query")
result

starting up
Sending an email
Finishing up


'Initial Query'

In [7]:
class Step2BEvent(Event):
    query: str
    
class CustomWorkflow(MainWorkflow):
    @step
    async def step_two(self, event: Step2Event) -> Step2BEvent:
        print("Sending an email")
        return Step2BEvent(query=event.query)
    
    @step
    async def step_two_b(self, event: Step2BEvent) -> Step3Event:
        print("Sending text message")
        return Step3Event(query=event.query)
    
w= CustomWorkflow()
result = await w.run(query="Initial Query")

starting up
Sending an email
Sending text message
Finishing up
