# BeeAI Workflows

In the previous notebook you learned the basics of the BeeAI framework such as PromptTemplates, Messages, Memory, Model Backends and various forms of output generation (freeform & structured). In this notebook we will focus on Workflows.

Workflows allow you to combine what you have already learned to develop an AI agent. The behavior of the agent is defined via workflow steps, and transitions between those steps. You can think of the Workflow as a graph that describes the behavior of an agent. 

## Basics of Workflows

The main components of a BeeAI workflow is state, defined as a pydantic model, and steps, which are defined using python functions.

You can think of State as structured memory that the workflow can read and write during execution.

Steps are the the functional components of the Workflow that connect together to perform the actions of the agent.

The following simple workflow example exhibits the following key features: 

- The state definition contains a required message field.
- The step (my_first_step) is defined as a function parameterized with the state instance.
- The state can be modified in a step and state changes are persisted between steps and workflow executions.
- The step function returns a string `Workflow.END` which indicates the name of the next step (this is how step transitions are implemented).
- `Workflow.END` indicates the end of the workflow.

In [None]:
import traceback
import warnings

from pydantic import BaseModel, ValidationError

from beeai_framework.workflows.workflow import Workflow, WorkflowError

warnings.simplefilter("ignore", UserWarning)


# Define global state that is accessible to each step in the workflow graph
# The message field is required when instantiating the state object
class MessageState(BaseModel):
    message: str


# Each step in the workflow is defined as a python function
async def my_first_step(state: MessageState) -> None:
    state.message += " World"  # Modify the state
    print("Running first step!")
    return Workflow.END


try:
    # Define the structure of the workflow graph
    basic_workflow = Workflow(schema=MessageState, name="MyWorkflow")

    # Add a step, each step has a name and a function that implements the step
    basic_workflow.add_step("my_first_step", my_first_step)

    # Execute the workflow
    basic_response = await basic_workflow.run(MessageState(message="Hello"))

    print("State after workflow:", basic_response.state.message)

except WorkflowError:
    traceback.print_exc()
except ValidationError:
    traceback.print_exc()

## A Multi step workflow with tools

You now know the basic components of a Workflow. To explore the power of BeeAI Workflows we will now walk through the implementation of a simple web search agent built as a Workflow.

This agent devises a search query based on an input question, runs the query to get search results, and then generates an answer to the question based on the retrieved search results.

Lets start with some imports.

In [None]:
from langchain_community.utilities import SearxSearchWrapper
from pydantic import Field

from beeai_framework.backend.chat import ChatModel, ChatModelOutput, ChatModelStructureOutput
from beeai_framework.backend.message import UserMessage
from beeai_framework.utils.templates import PromptTemplate

Next we can define our workflow State.

In this case we have a `question` which is a required field when instantiating the State. The other fields `search_results` and `answer` are optional during construction (defaulting to None) but will be populated by the workflow steps during execution.

In [None]:
# Workflow State
class SearchAgentState(BaseModel):
    question: str
    search_results: str | None = None
    answer: str | None = None

Next we define the ChatModel instance that we use for interaction with our LLM. We will use IBM Granite 3.1 8B via ollama.

In [None]:
# Create a ChatModel to interface with granite3.1-dense:8b on a local ollama
model = ChatModel.from_name("ollama:granite3.1-dense:8b")

This is a web search agent, so we need a way to run web searches. We will use the `SearxSearchWrapper` from the langchain community tools project.

To use the `SearxSearchWrapper` you will need to setup a local SearXNG service. 

Follow the instructions at [searXNG.md](searXNG.md) to configure a local searXNG instance.

In [None]:
# Web search tool
search_tool = SearxSearchWrapper(searx_host="http://127.0.0.1:8888")

In the workflow we make extensive use of prompt templates and structured outputs.

Here we define the various templates, input schemas and structured output schemas that we will need to implement the agent. 

In [None]:
# PromptTemplate Input Schemas
class QuestionInput(BaseModel):
    question: str


class SearchRAGInput(BaseModel):
    question: str
    search_results: str


# Prompt Templates
search_query_template = PromptTemplate(
    schema=QuestionInput,
    template="""Convert the following question into a concise, effective web search query using keywords and operators for accuracy.
Question: {{question}}""",
)

search_rag_template = PromptTemplate(
    schema=SearchRAGInput,
    template="""Search results:
{{search_results}}

Question: {{question}}
Provide a concise answer based on the search results provided. If the results are irrelevant or insufficient, say 'I don't know.' Avoid phrases such as 'According to the results...'.""",
)


# Structured output Schemas
class WebSearchQuery(BaseModel):
    query: str = Field(description="The web search query.")

Now we can define the first step of the workflow named `web_search`. 

This step prompts the llm to generate an effective search query using the search_query_template.

The search query is then used to run a web search using the search tool. The search results are stored in the `search_results` field in the workflow state.

The step then returns `generate_answer` which passes control to the step names `generate_answer`.

In [None]:
async def web_search(state: SearchAgentState) -> str:
    print("Step: ", "web_search")
    # Generate a search query
    prompt = search_query_template.render(QuestionInput(question=state.question))
    response: ChatModelStructureOutput = await model.create_structure(
        {
            "schema": WebSearchQuery,
            "messages": [UserMessage(prompt)],
        }
    )
    # Run search and store results in state
    state.search_results = search_tool.run(response.object["query"])
    return "generate_answer"

The next step in the Workflow is `generate_answer`, this steps takes the `question` and the `search_results` from the workflow state and uses the search_rag_template to generate an answer.

The answer is stored in the state and the workflow is ended by returning `Workflow.END`

In [None]:
async def generate_answer(state: SearchAgentState) -> str:
    print("Step: ", "generate_answer")
    # Generate answer based on question and search results from previous step.
    prompt = search_rag_template.render(
        SearchRAGInput(question=state.question, search_results=state.search_results or "No results available.")
    )
    output: ChatModelOutput = await model.create({"messages": [UserMessage(prompt)]})

    # Store answer in state
    state.answer = output.get_text_content()
    return Workflow.END

FInally we define the overall workflow and add the steps we developed earlier. 

In [None]:
try:
    # Define the structure of the workflow graph
    search_agent_workflow = Workflow(schema=SearchAgentState, name="WebSearchAgent")
    search_agent_workflow.add_step("web_search", web_search)
    search_agent_workflow.add_step("generate_answer", generate_answer)

    # Execute the workflow
    search_response = await search_agent_workflow.run(
        SearchAgentState(question="What is the term for a baby hedgehog?")
    )

    print("*****")
    print("Question: ", search_response.state.question)
    print("Answer: ", search_response.state.answer)

except WorkflowError:
    traceback.print_exc()
except ValidationError:
    traceback.print_exc()

# Adding Memory to a Workflow Agent

The web search agent from the previous example can answer questions but is unable to converse because it does not maintain message history. 

In the next example we show you how to add memory to your agent, so you can chat interactively.

In [None]:
# Workflow State
from pydantic import InstanceOf

from beeai_framework.backend.message import AssistantMessage, SystemMessage
from beeai_framework.memory.unconstrained_memory import UnconstrainedMemory


class ChatState(BaseModel):
    memory: InstanceOf[UnconstrainedMemory]
    output: str | None = None


async def chat(state: ChatState) -> str:
    output: ChatModelOutput = await model.create({"messages": state.memory.messages})
    state.output = output.get_text_content()
    return Workflow.END


memory = UnconstrainedMemory()
await memory.add(SystemMessage(content="You are a helpful and friendly AI assistant."))

try:
    # Define the structure of the workflow graph
    chat_workflow = Workflow(ChatState)
    chat_workflow.add_step("chat", chat)
    chat_workflow.add_step("generate_answer", generate_answer)

    # Add user message to memory
    await memory.add(UserMessage(content=input("User: ")))
    # Run workflow with memory
    response = await chat_workflow.run(ChatState(memory=memory))
    # Add assistant response to memory
    await memory.add(AssistantMessage(content=response.state.output))

    print("\n".join(f"{m.role}: {m.text}" for m in memory.messages))

except WorkflowError:
    traceback.print_exc()
except ValidationError:
    traceback.print_exc()

## ReAct Agents

You are now familiar with Workflow based agents, next you can explore pre-canned ReAct agents. Move on to [agents.ipynb](agents.ipynb).