# 6. Chapter: AI Agent Workflows

When we talk about LLM-powered AI agent workflows, we're referring to the integration of LLMs with other AI models, tools, and systems to create a seamless, end-to-end process for tackling complex tasks. These workflows enable AI agents to leverage the strengths of different technologies, ensuring they can handle a diverse range of tasks involving both unstructured and structured data. LLMs are exceptional at working with unstructured data like text, audio transcripts, and social media posts. However, effective AI workflows often also involve processing structured data, such as tables, spreadsheets, and databases. Integrations with tools like plain functions and data transformations (map, filter, reduce) cover this part of the work, as the workflow built in this chapter shows:

In [1]:
from __future__ import annotations

import sys
from typing import Any, Callable, Literal
from functools import reduce

from pydantic import BaseModel, Field
from loguru import logger

from language_models.agent import Agent, OutputType, PromptingStrategy
from language_models.models.llm import OpenAILanguageModel
from language_models.proxy_client import ProxyClient
from language_models.settings import settings

In [2]:
# Drop the handlers registered so far and log to stderr with only the message
# text (no timestamp/level prefix), at INFO level and above.
logger.remove()
logger.add(sys.stderr, format="{message}", level="INFO")

# Build the proxy client from the credentials and endpoints in the project settings.
proxy_client = ProxyClient(
    client_id=settings.CLIENT_ID,
    client_secret=settings.CLIENT_SECRET,
    auth_url=settings.AUTH_URL,
    api_base=settings.API_BASE,
)

In [3]:
# Language model handed to the agent below: model "gpt-4" reached through the
# proxy client, configured with max_tokens=250 and temperature=0.2.
llm = OpenAILanguageModel(
    proxy_client=proxy_client,
    model="gpt-4",
    max_tokens=250,
    temperature=0.2,
)

In [4]:
class WorkflowStepOutput(BaseModel):
    """Result produced by a single workflow step.

    Attributes:
        inputs: The input values the step was invoked with.
        output: The value the step produced, or None.
    """

    inputs: dict[str, Any]
    output: (
        str
        | int
        | float
        | dict
        | BaseModel
        | list[str]
        | list[int]
        | list[float]
        | list[dict]
        | list[BaseModel]
        | None
    )

class WorkflowFunctionStep(BaseModel):
    """Class that implements a function step.

    Attributes:
        name: The name of the step.
        inputs: The Pydantic model that represents the input arguments.
        function: The function that will be invoked when calling this step.
            It is called with the validated input fields as keyword arguments.
    """

    name: str
    inputs: type[BaseModel]
    # Called as `function(**inputs)`, so any keyword signature must be accepted.
    function: Callable[..., Any]

    def invoke(self, inputs: dict[str, Any], verbose: bool) -> WorkflowStepOutput:
        """Runs the function on the subset of `inputs` declared by the input model.

        Args:
            inputs: The values available to the step; keys that are not fields
                of the input model are ignored.
            verbose: Whether to log the function input and output.

        Returns:
            The validated keyword arguments and the function's return value.
        """
        declared = {key: value for key, value in inputs.items() if key in self.inputs.model_fields}
        arguments = self.inputs.model_validate(declared).model_dump()
        if verbose:
            # Values are passed as format arguments rather than interpolated into
            # the message, so "<...>" sequences in the data are not parsed as
            # colour markup.
            logger.opt(colors=True).info("<b><fg #EC9A3C>Function Input</fg #EC9A3C></b>: {}", arguments)

        output = self.function(**arguments)
        if verbose:
            logger.opt(colors=True).info("<b><fg #EC9A3C>Function Output</fg #EC9A3C></b>: {}", output)

        return WorkflowStepOutput(inputs=arguments, output=output)

class WorkflowAgentStep(BaseModel):
    """Class that implements an agent step.

    Attributes:
        name: The name of the step.
        agent: The agent that will be invoked when calling this step.
    """

    name: str
    agent: Agent

    def invoke(self, inputs: dict[str, Any], verbose: bool) -> WorkflowStepOutput:
        """Runs the agent on the values of its prompt variables.

        Args:
            inputs: The values available to the step. A prompt variable that is
                missing from `inputs` is passed to the agent as None.
            verbose: Whether to log the agent input and output.

        Returns:
            The values passed to the agent and the agent's final answer.
        """
        agent_inputs = {variable: inputs.get(variable) for variable in self.agent.prompt_variables}
        if verbose:
            # Prompts and answers are free text; passing them as format arguments
            # keeps "<...>" sequences in them from being parsed as colour markup.
            logger.opt(colors=True).info("<b><fg #EC9A3C>Agent Input</fg #EC9A3C></b>: {}", agent_inputs)

        final_answer = self.agent.invoke(agent_inputs).final_answer
        if verbose:
            logger.opt(colors=True).info("<b><fg #EC9A3C>Agent Output</fg #EC9A3C></b>: {}", final_answer)

        return WorkflowStepOutput(inputs=agent_inputs, output=final_answer)

class WorkflowTransformationStep(BaseModel):
    """Class that implements a transformation step.

    Attributes:
        name: The name of the step.
        input_field: The name of the field values to transform.
        transformation: The transformation to apply (can be map, filter, reduce).
        function: The function used for the transformation. For map and filter
            it receives one element (for a dict, one key); for reduce it
            receives the accumulated value and the next element.
    """

    name: str
    input_field: str
    transformation: Literal["map", "filter", "reduce"]
    # Unary for map/filter but binary for reduce, hence no fixed parameter list.
    function: Callable[..., Any]

    def invoke(self, inputs: dict[str, Any], verbose: bool) -> WorkflowStepOutput:
        """Applies the configured transformation to `inputs[input_field]`.

        - map: a list yields a list of results; for any other input the results
          are passed to `dict()`, so the function must return key/value pairs.
        - filter: a list yields a list of the kept elements; a dict yields a
          dict of the entries whose key satisfies the function.
        - reduce: folds the values with `functools.reduce` (no initial value).

        Args:
            inputs: The values available to the step.
            verbose: Whether to log the transformation input and output.

        Returns:
            The transformed field value together with the original one.

        Raises:
            KeyError: If `input_field` is not present in `inputs`.
            TypeError: If the transformation is reduce and the values are empty.
        """
        values = inputs[self.input_field]
        if verbose:
            # Values are passed as format arguments so that "<...>" sequences in
            # the data are not parsed as colour markup.
            logger.opt(colors=True).info("<b><fg #EC9A3C>Transformation Input</fg #EC9A3C></b>: {}", values)

        if self.transformation == "map":
            mapped = map(self.function, values)
            output = list(mapped) if isinstance(values, list) else dict(mapped)
        elif self.transformation == "filter":
            if isinstance(values, dict):
                # Iterating a dict yields only its keys, so rebuilding a dict
                # from the filtered keys would fail; keep the matching entries.
                output = {key: value for key, value in values.items() if self.function(key)}
            else:
                kept = filter(self.function, values)
                output = list(kept) if isinstance(values, list) else dict(kept)
        else:
            output = reduce(self.function, values)

        if verbose:
            logger.opt(colors=True).info("<b><fg #EC9A3C>Transformation Output</fg #EC9A3C></b>: {}", output)

        return WorkflowStepOutput(inputs={self.input_field: values}, output=output)

In [5]:
class WorkflowStateManager(BaseModel):
    """Holds the values shared between the steps of a workflow.

    Attributes:
        state: Mapping from a name to the value currently stored under it.
    """

    state: dict[str, Any]

    def update(self, name: str, step: WorkflowStepOutput) -> None:
        """Stores the output of a step under `name`, replacing any previous value."""
        self.state.update({name: step.output})

class WorkflowOutput(BaseModel):
    """Result of running a complete workflow.

    Attributes:
        inputs: The validated inputs the workflow was run with.
        output: The value selected as the workflow result, or None.
    """

    inputs: dict[str, Any]
    output: (
        str | int | float | dict[str, Any] | BaseModel
        | list[str] | list[int] | list[float] | list[dict[str, Any]] | list[BaseModel]
        | None
    )

class Workflow(BaseModel):
    """Class that implements a workflow.

    Attributes:
        name: The name of the workflow.
        description: The description of what the workflow does.
        steps: The steps of the workflow.
        inputs: The workflow inputs.
        output: The name of the step value to output.
        verbose: Whether to log the workflow and step inputs and outputs.
    """

    name: str
    description: str
    steps: list[WorkflowAgentStep | WorkflowFunctionStep | WorkflowTransformationStep]
    inputs: type[BaseModel]
    output: str
    verbose: bool

    def invoke(self, inputs: dict[str, Any]) -> WorkflowOutput:
        """Runs the workflow.

        The state starts as the validated inputs. Steps run in order; each one
        reads from the state and its output is stored under the step's name, so
        a step named like an input or an earlier step replaces that value.

        Args:
            inputs: The raw workflow inputs, validated against the input model.

        Returns:
            The validated inputs and the state value stored under `output`
            (None if no such value exists).
        """
        validated_inputs = self.inputs.model_validate(inputs).model_dump()
        if self.verbose:
            # Runtime values are passed as format arguments instead of being
            # interpolated into the message, so "<...>" sequences in user or
            # model text are not parsed as colour markup.
            logger.opt(colors=True).info("<b><fg #EC9A3C>Workflow Input</fg #EC9A3C></b>: {}", validated_inputs)

        state_manager = WorkflowStateManager(state=validated_inputs)
        for step in self.steps:
            if self.verbose:
                logger.opt(colors=True).info("<b><fg #2D72D2>Running Step</fg #2D72D2></b>: {}", step.name)

            step_output = step.invoke(state_manager.state, self.verbose)
            state_manager.update(step.name, step_output)

        output = state_manager.state.get(self.output)
        if self.verbose:
            logger.opt(colors=True).success("<b><fg #32A467>Workflow Output</fg #32A467></b>: {}", output)

        return WorkflowOutput(inputs=validated_inputs, output=output)

In [6]:
# Step 1: an agent that extracts the numbers from the user's text as a list of integers.
system_prompt = """You are an AI assistant designed to help users with a variety of tasks.

Extract all numbers from the user's input text."""

agent = Agent.create(
    llm=llm,
    system_prompt=system_prompt,
    prompt="{prompt}",
    prompt_variables=["prompt"],
    output_type=OutputType.ARRAY_INTEGER,
    prompting_strategy=PromptingStrategy.SINGLE_COMPLETION,
    verbose=True,
)

# The step name is the state key its output is stored under; it matches the
# `numbers` field of the function step's input model below.
agent_step = WorkflowAgentStep(name="numbers", agent=agent)

# Input model of the sort step: selects the `numbers` value from the workflow state.
class Function(BaseModel):
    numbers: list[int]

# Step 2: sort the numbers. The lambda is needed because the step calls its
# function with keyword arguments and `sorted` does not accept `numbers=` as a keyword.
function_step = WorkflowFunctionStep(name="sort", inputs=Function, function=lambda numbers: sorted(numbers))

# Step 3: keep only the values of the "sort" step that are strictly greater than 10.
filter_step = WorkflowTransformationStep(name="numbers_greater_10", input_field="sort", transformation="filter", function=lambda number: number > 10)

In [7]:
# Input model of the workflow: a single `prompt` string.
class Prompt(BaseModel):
    prompt: str = Field(description="The user prompt")

# Chain the three steps; the workflow result is the value stored under the
# name of the last step ("numbers_greater_10").
workflow = Workflow(
    name="Data Extraction",
    description="Extracts numbers from a given text",
    steps=[agent_step, function_step, filter_step],
    inputs=Prompt,
    output="numbers_greater_10",
    verbose=True,
)

In [8]:
# Sample text containing several numbers, some written as digits and one as a word ("six").
prompt = """Last weekend, six of us went on a 15-kilometer hike, starting at 7 AM.

By noon, we had covered 10 kilometers and reached Mount Elbert's 4,401-meter summit by 2 PM, with a temperature of 12°C.

We camped 5 kilometers away by 6 PM with 12 others and returned home by 5 PM the next day."""

# Run the workflow; the key must match the `prompt` field of the workflow's input model.
output = workflow.invoke({"prompt": prompt})

[1m[38;2;236;154;60mWorkflow Input[0m[1m[0m: {'prompt': "Last weekend, six of us went on a 15-kilometer hike, starting at 7 AM.\n\nBy noon, we had covered 10 kilometers and reached Mount Elbert's 4,401-meter summit by 2 PM, with a temperature of 12°C.\n\nWe camped 5 kilometers away by 6 PM with 12 others and returned home by 5 PM the next day."}
[1m[38;2;45;114;210mRunning Step[0m[1m[0m: numbers
[1m[38;2;236;154;60mAgent Input[0m[1m[0m: {'prompt': "Last weekend, six of us went on a 15-kilometer hike, starting at 7 AM.\n\nBy noon, we had covered 10 kilometers and reached Mount Elbert's 4,401-meter summit by 2 PM, with a temperature of 12°C.\n\nWe camped 5 kilometers away by 6 PM with 12 others and returned home by 5 PM the next day."}
Prompt: Last weekend, six of us went on a 15-kilometer hike, starting at 7 AM.

By noon, we had covered 10 kilometers and reached Mount Elbert's 4,401-meter summit by 2 PM, with a temperature of 12°C.

We camped 5 kilometers away by 6 PM wit

In [9]:
# Show the final workflow value, i.e. the output of the "numbers_greater_10" step.
print(output.output)

[12, 12, 15, 4401]
