In [1]:

from llama_index.core import Settings
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding

# Connection and model settings for the Ollama server, kept in one place so
# the LLM and the embedding client cannot drift apart.
OLLAMA_BASE_URL = "http://localhost:11434"
LLM_MODEL_NAME = "llama3.2:latest"
EMBED_MODEL_NAME = "nomic-embed-text:latest"

# Completion model used by the workflow steps.
ollama_llm = Ollama(model=LLM_MODEL_NAME, base_url=OLLAMA_BASE_URL, temperature=0.1)

# Embedding model; registered as the global default below.
ollama_embedding = OllamaEmbedding(
    model_name=EMBED_MODEL_NAME,
    base_url=OLLAMA_BASE_URL,
    ollama_additional_kwargs={"mirostat": 0},
)

# Register both models as the llama-index global defaults (LLM first, then embeddings).
Settings.llm, Settings.embed_model = ollama_llm, ollama_embedding

In [2]:
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Context,
    InputRequiredEvent,
    HumanResponseEvent,
)

from llama_index.core.workflow.retry_policy import ConstantDelayRetryPolicy

# Define custom events
class JokeEvent(Event):
    """Event carrying the generated joke text.

    Returned by `JokeFlow.generate_joke` and accepted by `JokeFlow.critique_joke`.
    """

    # The joke text as produced by the LLM.
    joke: str

class ProgressEvent(Event):
    """Status update that workflow steps write to the event stream.

    It is not returned by any step; the driver loop reading the stream prints
    `msg` for each instance it sees.
    """

    # Human-readable progress message.
    msg: str

class CritiqueEvent(Event):
    """Event pairing a joke with its critique.

    Returned by `JokeFlow.human_validation` once the user has approved, and
    accepted by `JokeFlow.finalize_result`.
    """

    # The joke text that was critiqued.
    joke: str
    # The LLM-generated critique of that joke.
    critique: str

class FinalResultEvent(Event):
    """Event carrying the formatted final text.

    Returned by `JokeFlow.finalize_result` and accepted by
    `JokeFlow.complete_workflow`, which wraps it in a `StopEvent`.
    """

    # Combined "Joke: ... / Critique: ..." text.
    result: str

# Define the workflow
class JokeFlow(Workflow):
    """Human-in-the-loop workflow: write a joke, critique it, ask for approval.

    Step chain:
        StartEvent -> generate_joke -> JokeEvent -> critique_joke
        -> InputRequiredEvent (shown to the user by the driver)
        -> HumanResponseEvent -> human_validation -> CritiqueEvent
        -> finalize_result -> FinalResultEvent -> complete_workflow -> StopEvent

    Steps report progress by writing `ProgressEvent`s to the event stream.
    The joke and critique are also stored in the workflow context under the
    keys "joke", "critique" and "final_result".
    """

    # LLM shared by all steps (the module-level Ollama client).
    llm = ollama_llm

    @step
    async def generate_joke(self, ctx: Context, ev: StartEvent) -> JokeEvent:
        """Ask the LLM for a joke about `ev.topic` and store it in the context.

        Returns:
            JokeEvent holding the joke text.
        """
        topic = ev.topic
        prompt = f"Write your best joke about {topic}."
        response = await self.llm.acomplete(prompt)

        joke = str(response)
        # Kept in the context so human_validation can retrieve it later.
        await ctx.set("joke", joke)

        ctx.write_event_to_stream(ProgressEvent(msg="Joke generated."))

        return JokeEvent(joke=joke)

    # Retried with a constant delay, up to 3 attempts
    # (delay=5 is presumably seconds -- confirm against the retry-policy docs).
    @step(retry_policy=ConstantDelayRetryPolicy(delay=5, maximum_attempts=3))
    async def critique_joke(self, ctx: Context, ev: JokeEvent) -> InputRequiredEvent:
        """Ask the LLM to critique the joke, then request human approval.

        Returns:
            InputRequiredEvent whose `prefix` contains the critique to review.

        Raises:
            Exception: whatever the LLM call raised, after the failure has
                been reported as a `ProgressEvent`.
        """
        joke = ev.joke
        prompt = f"Give a thorough analysis and critique of the following joke: {joke}"

        try:
            response = await self.llm.acomplete(prompt)
        except Exception as e:
            ctx.write_event_to_stream(ProgressEvent(msg=f"Critique failed: {e}"))
            # Bare re-raise propagates the original exception unchanged.
            raise

        critique = str(response)
        # Kept in the context so human_validation can retrieve it later.
        await ctx.set("critique", critique)

        ctx.write_event_to_stream(ProgressEvent(msg="Critique generated."))

        return InputRequiredEvent(prefix=f"Review the critique and approve: {critique}")

    @step
    async def human_validation(self, ctx: Context, ev: HumanResponseEvent) -> CritiqueEvent:
        """Check the user's answer and, if approved, forward joke and critique.

        The answer is accepted when, ignoring case and surrounding whitespace,
        it is "yes" or "approved".

        Returns:
            CritiqueEvent built from the joke and critique stored in the context.

        Raises:
            ValueError: if the answer is anything other than an approval.
        """
        approval = ev.response

        # Strip before comparing: text typed at a prompt often carries stray
        # spaces, and "yes " should not abort the whole run.
        decision = approval.strip().lower()
        if decision not in ("yes", "approved"):
            ctx.write_event_to_stream(ProgressEvent(msg="Critique rejected."))
            raise ValueError("Critique was rejected by the user.")

        joke = await ctx.get("joke")
        critique = await ctx.get("critique")

        ctx.write_event_to_stream(ProgressEvent(msg="Critique approved by user."))

        return CritiqueEvent(joke=joke, critique=critique)

    @step
    async def finalize_result(self, ctx: Context, ev: CritiqueEvent) -> FinalResultEvent:
        """Format joke and critique into one string and store it in the context.

        Returns:
            FinalResultEvent holding the formatted text.
        """
        joke = ev.joke
        critique = ev.critique

        result = f"Joke: {joke}\n\nCritique: {critique}"
        await ctx.set("final_result", result)

        ctx.write_event_to_stream(ProgressEvent(msg="Final result prepared."))

        return FinalResultEvent(result=result)

    @step
    async def complete_workflow(self, ctx: Context, ev: FinalResultEvent) -> StopEvent:
        """End the workflow, returning the formatted text as its result."""
        result = ev.result
        return StopEvent(result=result)

    # **Placeholder Steps for Visualization**
    @step
    async def visualize_input_required(self, ev: InputRequiredEvent) -> HumanResponseEvent:
        """
        Placeholder step to link InputRequiredEvent to HumanResponseEvent.

        NOTE(review): this returns an automatic "yes". The verbose log recorded
        in this notebook never shows the step running, but if the installed
        runtime ever dispatched InputRequiredEvent to steps it would approve
        without asking the user -- confirm against the llama-index version in use.
        """
        return HumanResponseEvent(response="yes")

    @step
    async def visualize_human_response(self, ev: HumanResponseEvent) -> None:
        """
        Placeholder step to ensure HumanResponseEvent is recognized.

        Accepts the same event as `human_validation` and produces no event.
        """
        pass

In [3]:
# Run the workflow
# One shared JokeFlow instance, reused by `main()` and by the flow-drawing cell.
# NOTE(review): timeout=120 is presumably seconds and, with the blocking
# `input()` in `main()`, appears to include the user's thinking time -- confirm.
# verbose=True presumably produces the "Running step ..." lines in the output.
w = JokeFlow(timeout=120, verbose=True)

async def main():
    """Run `w` once on the topic "pirates", driving the human-in-the-loop step.

    Streams events from the run: progress messages are printed, and each
    `InputRequiredEvent` is answered by prompting the user with `input()` and
    sending the reply back as a `HumanResponseEvent`. Once the stream ends,
    the run's final result is awaited and printed.
    """
    run_handle = w.run(topic="pirates")

    print("Streaming events in real-time:")
    async for streamed in run_handle.stream_events():
        if isinstance(streamed, ProgressEvent):
            print(f"[Progress]: {streamed.msg}")
            continue

        if isinstance(streamed, InputRequiredEvent):
            # Prompt with the critique text and feed the answer back into the run.
            answer = input(streamed.prefix)
            run_handle.ctx.send_event(HumanResponseEvent(response=answer))

    # The stream is exhausted; collect the workflow's result.
    outcome = await run_handle
    print("\nFinal Result:")
    print(str(outcome))

In [4]:
# Top-level `await`: this runs as a notebook cell, not inside a function.
# Prompts for approval via `input()` while the workflow is running.
await main()

Streaming events in real-time:
Running step generate_joke
Step generate_joke produced event JokeEvent
[Progress]: Joke generated.
Running step critique_joke
Step critique_joke produced event InputRequiredEvent
[Progress]: Critique generated.
Running step human_validation
Step human_validation produced event CritiqueEvent
Running step visualize_human_response
Step visualize_human_response produced no event
[Progress]: Critique approved by user.
Running step finalize_result
Step finalize_result produced event FinalResultEvent
[Progress]: Final result prepared.
Running step complete_workflow
Step complete_workflow produced event StopEvent

Final Result:
Joke: Why did the pirate quit his job?

Because he was sick of all the arrrr-guments! (get it?)

Critique: The joke in question is a play on words, relying on the multiple meanings of "arrrr" to create a pun. Here's a thorough analysis and critique:

**Strengths:**

1. **Wordplay**: The use of "arrrr" as both a pirate-themed sound effect a

In [5]:
# Prefer the dedicated utils package when it is installed; recent llama-index
# releases deprecate the core-package import in its favour.
# NOTE(review): the echoed source line in this cell's recorded output looks
# like the tail of that deprecation warning -- confirm against the installed
# version. The fallback keeps the cell working without the extra package.
try:
    from llama_index.utils.workflow import draw_all_possible_flows
except ImportError:
    from llama_index.core.workflow import draw_all_possible_flows

# Write the diagram of the workflow's steps and events to an HTML file.
draw_all_possible_flows(w, filename="joke_workflow_human_loop.html")

  draw_all_possible_flows(w, filename="joke_workflow_human_loop.html")


joke_workflow_human_loop.html
