In [1]:
import asyncio
from dataclasses import dataclass
import json
import os
from typing import Any, Dict, List, Tuple, Union

import pandas as pd

from fixpoint.agents import BaseAgent
from fixpoint.agents.openai import OpenAIClients, OpenAIAgent
from fixpoint.completions.chat_completion import ChatCompletionMessageParam

from fixpoint_extras.workflows import structured
from fixpoint_extras.workflows.structured import WorkflowContext

In [2]:
# Fix logging for the notebook
import logging
import sys
from fixpoint.logging import logger
# Send log records to stdout so they are written to the cell's output stream
logging.basicConfig(stream=sys.stdout)
# Let INFO-level messages from the fixpoint logger through
logger.setLevel(logging.INFO)
# Smoke test: this line should appear in the cell output if the setup worked
logger.info("Info mode is on?")

INFO:fixpoint:Info mode is on?


In [3]:
from dotenv import load_dotenv

# Load environment variables (setup_agents reads OPENAI_API_KEY) from the .env
# file one directory up. Check the result explicitly instead of wrapping the
# call in `assert`: assert statements are removed when Python runs with -O,
# which would silently skip loading the file altogether.
if not load_dotenv('../.env'):
    raise RuntimeError("Failed to load environment variables from '../.env'")

# Define the workflow

## Workflow

The workflow takes a list of prompts and compares the responses of the GPT-3.5 and GPT-4 models to each of them.

In [4]:
@structured.workflow(id="example_workflow")
class CompareModels:
    """Compare the responses of GPT-3.5 and GPT-4 on the same prompts."""

    @structured.workflow_entrypoint()
    async def run(
        self, ctx: WorkflowContext, prompts: List[List[ChatCompletionMessageParam]]
    ) -> str:
        """Entrypoint for the workflow to compare agents.

        Schedules one ``run_all_prompts`` task per agent ("gpt3" and "gpt4") in
        a single task group, lines the two result lists up by position, and
        stores them as a JSON-serialized DataFrame on the workflow run.

        Args:
            ctx: The workflow context passed to every task call; its
                ``workflow_run.docs`` store receives the results.
            prompts: One list of chat messages per prompt. Every prompt is sent
                to both agents.

        Returns:
            The ID of the workflow run document that holds the results.
        """

        # The task group only exits once both tasks have finished, so calling
        # `.result()` on them below never raises InvalidStateError.
        async with asyncio.TaskGroup() as tg:
            gpt3_res = tg.create_task(
                structured.call_task(
                    ctx,
                    RunAllPrompts.run_all_prompts,
                    args=[RunAllPromptsArgs(agent_name="gpt3", prompts=prompts)],
                )
            )
            gpt4_res = tg.create_task(
                structured.call_task(
                    ctx,
                    RunAllPrompts.run_all_prompts,
                    args=[RunAllPromptsArgs(agent_name="gpt4", prompts=prompts)],
                )
            )

        # Use a distinct name for the output column instead of rebinding the
        # `prompts` parameter: each entry is the full message list of a prompt,
        # not a string.
        prompt_col: List[List[ChatCompletionMessageParam]] = []
        gpt3_resps: List[Union[str, None]] = []
        gpt4_resps: List[Union[str, None]] = []
        for gpt3_resp, gpt4_resp in zip(gpt3_res.result(), gpt4_res.result()):
            # Each result is a (prompt, response) pair. The prompt is the same
            # for the gpt3 and gpt4 results, so take it from the gpt3 one.
            prompt_col.append(gpt3_resp[0])
            gpt3_resps.append(gpt3_resp[1])
            gpt4_resps.append(gpt4_resp[1])

        df = pd.DataFrame({
            "prompt": prompt_col,
            "gpt3": gpt3_resps,
            "gpt4": gpt4_resps,
        })

        # TODO(dbmikus) this is not async, so it will block the async event loop
        doc_id = "inference-results.json"
        ctx.workflow_run.docs.store(id=doc_id, contents=df.to_json(path_or_buf=None))
        return doc_id

## The task

The task runs all inference requests for a given model. A task is checkpointed, so if the workflow fails partway through we can determine if either:

- we already ran the task, and thus can skip it when retrying the workflow
- we didn't run or finish the task, so we need to run it again

In [5]:
# We recommend using a single dataclass, Pydantic model, or dictionary argument
# for the task. This makes it easy to add or remove arguments in the future
# while preserving backwards compatibility.
@dataclass
class RunAllPromptsArgs:
    """Arguments for the "run_all_prompts" task"""

    # Name of the agent to run the prompts with; the run_prompt step uses it
    # as the key into ctx.agents
    agent_name: str
    # One list of chat messages per prompt
    prompts: List[List[ChatCompletionMessageParam]]


@structured.task(id="run_all_prompts")
class RunAllPrompts:
    """A task that runs all prompts for an agent"""

    @structured.task_entrypoint()
    async def run_all_prompts(
        self, ctx: WorkflowContext, args: RunAllPromptsArgs
    ) -> List[Tuple[List[ChatCompletionMessageParam], Union[str, None]]]:
        """Execute all prompt inferences for an agent.

        Schedules one ``run_prompt`` step per prompt inside a single task group
        and collects the step results in the same order as ``args.prompts``.

        Args:
            ctx: The workflow context, forwarded to every step call.
            args: The agent name and the prompts to run with that agent.

        Returns:
            A list of (prompt, response) pairs, one per prompt. The prompt is
            the list of chat messages that was sent; the response may be None.
        """
        # The task group only exits once every step has finished, so reading
        # `.result()` afterwards is safe. An empty prompt list yields [].
        async with asyncio.TaskGroup() as tg:
            step_tasks = [
                tg.create_task(
                    structured.call_step(
                        ctx,
                        run_prompt,
                        args=[RunPromptArgs(agent_name=args.agent_name, prompt=prompt)],
                    )
                )
                for prompt in args.prompts
            ]
        return [task.result() for task in step_tasks]

## The step

A step is the smallest unit of computation. Like a task, each step is checkpointed.

In [19]:
@dataclass
class RunPromptArgs:
    """Args for run_prompt"""

    # Name of the agent to run the prompt with; used as the key into ctx.agents
    agent_name: str
    # The chat messages that make up a single prompt
    prompt: List[ChatCompletionMessageParam]


@structured.step(id="run_prompt")
async def run_prompt(
    ctx: WorkflowContext, args: RunPromptArgs
) -> Tuple[List[ChatCompletionMessageParam], Union[str, None]]:
    """Run an LLM inference request with the given agent and prompt.

    Args:
        ctx: The workflow context; the agent is looked up in ``ctx.agents``
            under ``args.agent_name``.
        args: The agent name and the chat messages to send.

    Returns:
        A pair of (prompt, response). The prompt is ``args.prompt`` unchanged
        (the list of chat messages), and the response is the message content
        of the first completion choice, which may be None.
    """
    agent = ctx.agents[args.agent_name]
    # NOTE(review): create_completion is called without `await`, so it looks
    # like a synchronous call that blocks the event loop while the request is
    # in flight — confirm, and consider an async client if steps should overlap.
    completion = agent.create_completion(messages=args.prompt)
    return (args.prompt, completion.choices[0].message.content)

# Configure agents and run the workflow

We configure the agents for the workflow and the settings for running the workflow, then kick it off in the asyncio event loop. All workflow entrypoints, tasks, and steps run inside asyncio.

In [7]:
def setup_agents() -> Tuple[structured.RunConfig, List[BaseAgent]]:
    """Build the run configuration and the two OpenAI agents for the workflow.

    Both agents share one set of OpenAI clients (created from the
    OPENAI_API_KEY environment variable) and use the run config's agent cache.

    Returns:
        The run config and the list of agents, in the order [gpt3, gpt4].
    """
    run_config = structured.RunConfig.with_defaults()
    openaiclients = OpenAIClients.from_api_key(api_key=os.environ["OPENAI_API_KEY"])

    # Agent ID -> OpenAI model name. Dict order fixes the order of the result.
    model_by_agent_id = {
        "gpt3": "gpt-3.5-turbo",
        "gpt4": "gpt-4-turbo",
    }
    agents: List[BaseAgent] = [
        OpenAIAgent(
            agent_id=agent_id,
            model_name=model_name,
            openai_clients=openaiclients,
            cache=run_config.storage.agent_cache,
        )
        for agent_id, model_name in model_by_agent_id.items()
    ]
    return run_config, agents


async def main() -> Any:
    """Configure and run the workflow.

    Spawns ``CompareModels.run`` with two example prompts (the same user
    question under two different system prompts), prints the workflow ID and
    run ID, waits for the workflow to finish, and prints the ID of the doc the
    results were written to.

    Returns:
        The run handle returned by ``structured.spawn_workflow``, so callers
        can await its result again or inspect the finalized workflow run.
    """
    run_config, agents = setup_agents()

    # The same user question under two different system prompts
    prompts = [
        [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "What is the meaning of life?"},
        ],
        [
            {"role": "system", "content": "You are an evil AI."},
            {"role": "user", "content": "What is the meaning of life?"},
        ],
    ]

    run_handle = structured.spawn_workflow(
        CompareModels.run,
        run_config=run_config,
        agents=agents,
        # `args` holds the entrypoint's arguments after `ctx`; the prompt list
        # is the only one.
        args=[prompts],
    )
    print("Running workflow:", run_handle.workflow_id())
    print("Run ID:", run_handle.workflow_run_id())
    results_doc_id = await run_handle.result()
    print("finished workflow. Wrote results to workflow run doc:", results_doc_id)
    return run_handle

In [8]:
# Run the workflow and keep the returned run handle for the cells below
wfrun_handle = await main()

Running workflow: example_workflow
Run ID: wfrun-5ec045b8-07bc-4f94-9ceb-5ee1cb8623a6
finished workflow. Wrote results to workflow run doc: inference-results.json


In [9]:
# main() awaited this result but only returned the handle, so await it again
# here to get the results doc ID into the notebook namespace
results_doc_id = await wfrun_handle.result()

In [10]:
# Get the finalized workflow run; its docs store is read in the next cell
wrun = wfrun_handle.finalized_workflow_run()

In [11]:
from io import StringIO

# The workflow stored the DataFrame as a JSON string (via df.to_json), so wrap
# the doc contents in StringIO to hand pandas a file-like object
df = pd.read_json(StringIO(
    wrun.docs.get(results_doc_id).contents
))

In [18]:
# Display the comparison table: one row per prompt, one column per model
df

Unnamed: 0,prompt,gpt3,gpt4
0,"[{'role': 'system', 'content': 'You are a help...",The meaning of life is a deeply philosophical ...,"The question ""What is the meaning of life?"" is..."
1,"[{'role': 'system', 'content': 'You are an evi...",The meaning of life is a subjective concept th...,The question of the meaning of life has been p...


In [16]:
# Print the full GPT-3.5 response to the first prompt (the table view truncates it)
print(df.iloc[0]['gpt3'])

The meaning of life is a deeply philosophical and subjective question. It often depends on an individual's beliefs, values, and experiences. Some people find meaning in personal relationships, others in their work or creative pursuits, while some find meaning through spiritual or religious beliefs. Ultimately, the meaning of life is a question that each person must explore and define for themselves.


In [17]:
# Print the full GPT-4 response to the first prompt (the table view truncates it)
print(df.iloc[0]['gpt4'])

The question "What is the meaning of life?" is one of the most profound and enduring questions in human philosophy, theology, and contemplation. Different cultures, religions, and philosophical systems offer a variety of answers:

1. **Religious Perspectives**: Most religions provide an explanation about the purpose and significance of life. For instance, many Christian denominations believe that life's purpose is to serve and honor God, which ensures an eternal life in Heaven. In Buddhism, the focus might be on achieving enlightenment and escaping the cycle of rebirth and suffering.

2. **Philosophical Views**: Philosophers have also pondered this question extensively. Existentialists like Jean-Paul Sartre and Albert Camus suggest that life inherently has no predetermined meaning, and it is up to each individual to find their own purpose through their actions and choices.

3. **Scientific and Naturalistic Interpretations**: From a scientific viewpoint, life is often seen as a product 