# BabyAGI User Guide

This notebook demonstrates how to implement [BabyAGI](https://github.com/yoheinakajima/babyagi/tree/main) by [Yohei Nakajima](https://twitter.com/yoheinakajima). BabyAGI is an AI agent that can generate and pretend to execute tasks based on a given objective.

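This version extends the original example in two ways:

* Task execution can call external APIs (SerpAPI search) through an agent.
* The LLM is served by Amazon Bedrock instead of gpt-3.5.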

This guide walks through the components you need to build your own recursive agents.

Although BabyAGI uses specific vectorstores/model providers (Pinecone, OpenAI), one of the benefits of implementing it with LangChain is that you can easily swap those out for different options. In this implementation we use a FAISS vectorstore (because it runs locally and is free).

## Install and Import Required Modules

In [17]:
!pip install google-search-results --upgrade

Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com
Collecting google-search-results
  Downloading google_search_results-2.4.2.tar.gz (18 kB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: google-search-results
  Building wheel for google-search-results (setup.py) ... done
  Created wheel for google-search-results: filename=google_search_results-2.4.2-py3-none-any.whl size=32003 sha256=4d2f01ab80bbbc6256e92d4e23db11fe4169cd9dd0c7db1543e17b70269154a3
  Stored in directory: /tmp/pip-ephem-wheel-cache-umh2llv4/wheels/68/8e/73/744b7d9d7ac618849d93081a20e1c0deccd2aef90901c9f5a9
Successfully built google-search-results
Installing collected packages: google-search-results
Successfully installed google-search-results-2.4.2


In [10]:
import os
import openai
from collections import deque
from typing import Dict, List, Optional, Any

from langchain import LLMChain, OpenAI, PromptTemplate
from langchain.embeddings import OpenAIEmbeddings
from langchain.llms import BaseLLM
from langchain.vectorstores.base import VectorStore
from pydantic import BaseModel, Field
from langchain.chains.base import Chain
import faiss
import boto3
import json

## Connect to the Vector Store

Depending on what vectorstore you use, this step may look different.

In [38]:
from langchain.vectorstores import FAISS
from langchain.docstore import InMemoryDocstore

### Bedrock credentials

In [1]:
def parse_credentials(file_path):
    credentials = {}
    with open(file_path, 'r') as file:
        current_user = None
        for line in file:
            line = line.strip()
            if line.startswith('[') and line.endswith(']'):
                current_user = line[1:-1]
                credentials[current_user] = {}
            elif '=' in line and current_user is not None:
                # Strip whitespace so "key = value" lines parse the same as "key=value"
                key, value = line.split('=', 1)
                credentials[current_user][key.strip()] = value.strip()
    return credentials

def get_key_from_credential_file(user, key_name, credential_file_path):
    credentials = parse_credentials(credential_file_path)

    if user in credentials:
        user_credentials = credentials[user]
        if key_name in user_credentials:
            return user_credentials[key_name]
        else:
            raise KeyError(f"'{key_name}' not found for user '{user}'.")
    else:
        raise KeyError(f"User '{user}' not found in the credential file.")
        
aws_access_key_id = get_key_from_credential_file('bedrock-sdk-1', 'aws_access_key_id', '/home/alfred/.aws/credentials')
aws_secret_access_key = get_key_from_credential_file('bedrock-sdk-1', 'aws_secret_access_key', '/home/alfred/.aws/credentials')
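
The hand-rolled parser above could also be replaced with Python's standard `configparser` module, which reads the same INI-style layout and strips whitespace around `=`. The following is a minimal sketch, assuming the file uses the usual `[profile]` / `key = value` layout. The helper name `read_profile_key` and the placeholder values are hypothetical and only serve the illustration.

```python
import configparser
import os
import tempfile


def read_profile_key(path: str, profile: str, key_name: str) -> str:
    """Return one key from one profile of an INI-style credentials file."""
    # interpolation=None keeps '%' characters in values from being interpreted.
    parser = configparser.ConfigParser(interpolation=None)
    # read() silently skips missing files, so check its return value.
    if not parser.read(path):
        raise FileNotFoundError(path)
    if not parser.has_section(profile):
        raise KeyError(f"Profile '{profile}' not found in {path}.")
    if not parser.has_option(profile, key_name):
        raise KeyError(f"'{key_name}' not found for profile '{profile}'.")
    return parser.get(profile, key_name)


# Demonstration with a throwaway file and placeholder values (not real keys).
sample = (
    "[bedrock-sdk-1]\n"
    "aws_access_key_id = EXAMPLE_ID\n"
    "aws_secret_access_key = EXAMPLE_SECRET\n"
)
with tempfile.NamedTemporaryFile("w", suffix=".ini", delete=False) as handle:
    handle.write(sample)
    sample_path = handle.name

demo_key_id = read_profile_key(sample_path, "bedrock-sdk-1", "aws_access_key_id")
os.remove(sample_path)
print(demo_key_id)
```

Another option, if the profile lives in the default credentials location, is to let boto3 resolve it with `boto3.Session(profile_name=...)` rather than reading the keys by hand.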

In [7]:
bedrock_url = 'https://bedrock.us-east-1.amazonaws.com'
boto3_bedrock = boto3.client(
    service_name='bedrock',
    region_name='us-east-1',
    aws_access_key_id=aws_access_key_id, 
    aws_secret_access_key=aws_secret_access_key,
    endpoint_url=bedrock_url
)

In [39]:
# Define your embedding model using OpenAI
openai.api_key = os.environ.get('openai_api_token')
embeddings_model = OpenAIEmbeddings(openai_api_key=openai.api_key, model='text-embedding-ada-002')

# Initialize the vectorstore as empty
embedding_size = 1536  # output dimension of text-embedding-ada-002
index = faiss.IndexFlatL2(embedding_size)
vectorstore = FAISS(embeddings_model.embed_query, index, InMemoryDocstore({}), {})
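
To make the role of the index more concrete: `faiss.IndexFlatL2` performs an exact, brute-force search and ranks stored vectors by squared Euclidean distance to the query. The sketch below reproduces that idea in plain Python on toy two-dimensional vectors. The function names `squared_l2` and `flat_search` are hypothetical, and the code illustrates the technique rather than FAISS's actual implementation.

```python
from typing import List, Sequence, Tuple


def squared_l2(a: Sequence[float], b: Sequence[float]) -> float:
    """Squared Euclidean distance between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    return sum((x - y) ** 2 for x, y in zip(a, b))


def flat_search(
    stored: Sequence[Sequence[float]], query: Sequence[float], k: int
) -> List[Tuple[int, float]]:
    """Return (position, distance) for the k stored vectors closest to query."""
    scored = [(i, squared_l2(vec, query)) for i, vec in enumerate(stored)]
    scored.sort(key=lambda pair: pair[1])  # smallest distance first
    return scored[:k]


toy_vectors = [[0.0, 0.0], [1.0, 0.0], [3.0, 4.0]]
nearest = flat_search(toy_vectors, [0.9, 0.1], k=2)
print(nearest)
```

A smaller distance means a closer match, which matters later when search results are sorted by score.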

In [11]:
# (optional) Using Bedrock for embedding
embeddings = []
embedding_size = 1536
index = faiss.IndexFlatL2(embedding_size)

sentences = ["Tell me a joke.", "Please describe to me what is unique about SF Bay Area."]

# Request parameters shared by every call
modelId = 'amazon.titan-e1t-medium'
accept = 'application/json'
contentType = 'application/json'

def get_embedding(body, modelId, accept, contentType):
    """Invoke the Bedrock model and return the embedding from the JSON response."""
    response = boto3_bedrock.invoke_model(body=body, modelId=modelId, accept=accept, contentType=contentType)
    response_body = json.loads(response.get('body').read())
    return response_body.get('embedding')

for sentence in sentences:
    body = json.dumps({"inputText": sentence})
    embeddings.append(get_embedding(body, modelId, accept, contentType))

In [12]:
embeddings

[[0.7890625,
  0.34960938,
  0.1328125,
  -0.047607422,
  0.64453125,
  0.6015625,
  0.69140625,
  -0.46875,
  -0.55078125,
  -0.61328125,
  -0.07080078,
  0.16503906,
  0.375,
  0.58984375,
  -0.033935547,
  0.42578125,
  0.31054688,
  -0.7265625,
  -0.13378906,
  0.37109375,
  -0.68359375,
  0.37109375,
  0.76171875,
  -0.6015625,
  -0.34960938,
  -0.59765625,
  0.3984375,
  0.24121094,
  -0.53125,
  -0.33203125,
  -0.16113281,
  0.24804688,
  -0.8984375,
  -0.3828125,
  -1.0078125,
  0.19433594,
  -1.109375,
  0.09326172,
  0.484375,
  0.45703125,
  -0.25390625,
  0.30078125,
  -0.17382812,
  -0.140625,
  -0.7578125,
  -0.009765625,
  0.38476562,
  -0.22363281,
  0.359375,
  0.22949219,
  0.045654297,
  -0.30859375,
  0.390625,
  -0.056396484,
  -0.22460938,
  0.38867188,
  0.24023438,
  -0.12695312,
  0.029052734,
  -0.54296875,
  0.002746582,
  0.00039672852,
  0.37109375,
  -0.484375,
  0.140625,
  -0.29492188,
  0.2890625,
  0.59375,
  0.10986328,
  0.58203125,
  -0.609375,
  0.

## Define the Chains

BabyAGI relies on three LLM chains:
- Task creation chain to select new tasks to add to the list
- Task prioritization chain to re-prioritize tasks
- Execution Chain to execute the tasks
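
Each of these chains is a prompt template with named placeholders plus an LLM call. As a rough illustration of what the model receives, the sketch below fills the execution prompt used in this guide with plain `str.format`, standing in for `PromptTemplate`. The objective, context, and task values are hypothetical.

```python
# Wording copied from the execution prompt in this guide.
execution_template = (
    "You are an AI who performs one task based on the following objective: {objective}."
    " Take into account these previously completed tasks: {context}."
    " Your task: {task}."
    " Response:"
)

filled_prompt = execution_template.format(
    objective="Plan a picnic",               # hypothetical objective
    context=["Check the weather forecast"],  # hypothetical completed tasks
    task="Choose a park",                    # hypothetical current task
)
print(filled_prompt)
```

Note that a list passed as `context` is rendered with its Python brackets and quotes, so the model sees `['Check the weather forecast']` rather than a prose sentence.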

In [27]:
class TaskCreationChain(LLMChain):
    """Chain to generate tasks."""

    @classmethod
    def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
        """Build the task creation chain from an LLM."""
        task_creation_template = (
            "You are a task creation AI that uses the result of an execution agent"
            " to create new tasks with the following objective: {objective},"
            " The last completed task has the result: {result}."
            " This result was based on this task description: {task_description}."
            " These are incomplete tasks: {incomplete_tasks}."
            " Based on the result, create new tasks to be completed"
            " by the AI system that do not overlap with incomplete tasks."
            " Return the tasks as an array."
        )
        prompt = PromptTemplate(
            template=task_creation_template,
            input_variables=[
                "result",
                "task_description",
                "incomplete_tasks",
                "objective",
            ],
        )
        return cls(prompt=prompt, llm=llm, verbose=verbose)

In [29]:
class TaskPrioritizationChain(LLMChain):
    """Chain to prioritize tasks."""

    @classmethod
    def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
        """Build the task prioritization chain from an LLM."""
        task_prioritization_template = (
            "You are a task prioritization AI tasked with cleaning the formatting of and reprioritizing"
            " the following tasks: {task_names}."
            " Consider the ultimate objective of your team: {objective}."
            " Do not remove any tasks. Return the result as a numbered list, like:"
            " #. First task"
            " #. Second task"
            " Start the task list with number {next_task_id}."
        )
        prompt = PromptTemplate(
            template=task_prioritization_template,
            input_variables=["task_names", "next_task_id", "objective"],
        )
        return cls(prompt=prompt, llm=llm, verbose=verbose)

In [30]:
class ExecutionChain(LLMChain):
    """Chain to execute tasks."""

    @classmethod
    def from_llm(cls, llm: BaseLLM, verbose: bool = True) -> LLMChain:
        """Build the execution chain from an LLM."""
        execution_template = (
            "You are an AI who performs one task based on the following objective: {objective}."
            " Take into account these previously completed tasks: {context}."
            " Your task: {task}."
            " Response:"
        )
        prompt = PromptTemplate(
            template=execution_template,
            input_variables=["objective", "context", "task"],
        )
        return cls(prompt=prompt, llm=llm, verbose=verbose)

## Zero-Shot Agent and To-Do List Tool

In [19]:
from langchain.agents import ZeroShotAgent, Tool, AgentExecutor
from langchain import OpenAI, SerpAPIWrapper, LLMChain
from langchain.llms.bedrock import Bedrock

todo_prompt = PromptTemplate.from_template(
    "You are a planner who is an expert at coming up with a todo list for a given objective. Come up with a todo list for this objective: {objective}"
)
#todo_chain = LLMChain(llm=OpenAI(temperature=0, openai_api_key=openai.api_key), prompt=todo_prompt)
todo_chain = LLMChain(llm=Bedrock(model_id="anthropic.claude-v1", client=boto3_bedrock), prompt=todo_prompt)

search = SerpAPIWrapper(serpapi_api_key=os.environ.get('serp_api_token'))
tools = [
    Tool(
        name="Search",
        func=search.run,
        description="useful for when you need to answer questions about current events",
    ),
    Tool(
        name="TODO",
        func=todo_chain.run,
        description="useful for when you need to come up with todo lists. Input: an objective to create a todo list for. Output: a todo list for that objective. Please be very clear what the objective is!",
    ),
]


prefix = """You are an AI who performs one task based on the following objective: {objective}. Take into account these previously completed tasks: {context}."""
suffix = """Question: {task}
{agent_scratchpad}"""
prompt = ZeroShotAgent.create_prompt(
    tools,
    prefix=prefix,
    suffix=suffix,
    input_variables=["objective", "task", "context", "agent_scratchpad"],
)

### Define the BabyAGI Controller

BabyAGI composes the chains defined above in a (potentially-)infinite loop.
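
The shape of that loop can be sketched without any LLM by substituting plain functions for the three chains. This is a simplified illustration, not the controller defined in this guide: `run_loop` and the `stub_*` functions are hypothetical, and unlike the controller this sketch stops as soon as the task queue is empty.

```python
from collections import deque


def run_loop(objective, first_task, execute, create, prioritize, max_iterations):
    """Pull a task, execute it, create follow-up tasks, reprioritize, repeat."""
    tasks = deque([{"task_id": 1, "task_name": first_task}])
    counter = 1
    results = []
    for _ in range(max_iterations):
        if not tasks:
            break  # nothing left to do (simplification for this sketch)
        task = tasks.popleft()
        result = execute(objective, task["task_name"])
        results.append(result)
        pending = [t["task_name"] for t in tasks]
        for name in create(objective, result, pending):
            counter += 1
            tasks.append({"task_id": counter, "task_name": name})
        tasks = deque(prioritize(list(tasks)))
    return results


def stub_execute(objective, task_name):
    return f"done: {task_name}"


def stub_create(objective, result, pending):
    # Only the first result spawns follow-up tasks in this toy example.
    return ["research", "summarize"] if result == "done: plan" else []


def stub_prioritize(tasks):
    return sorted(tasks, key=lambda t: t["task_name"])


loop_results = run_loop(
    "demo objective", "plan", stub_execute, stub_create, stub_prioritize,
    max_iterations=5,
)
print(loop_results)
```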

In [20]:
def get_next_task(
    task_creation_chain: LLMChain,
    result: str,
    task_description: str,
    task_list: List[str],
    objective: str,
) -> List[Dict]:
    """Get the next task."""
    incomplete_tasks = ", ".join(task_list)
    response = task_creation_chain.run(
        result=result,
        task_description=task_description,
        incomplete_tasks=incomplete_tasks,
        objective=objective,
    )
    new_tasks = response.split("\n")
    return [{"task_name": task_name} for task_name in new_tasks if task_name.strip()]
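
The task creation step treats every non-blank line of the model's response as one task. The sketch below isolates that parsing on a hypothetical response string. `split_into_tasks` is an illustrative name, not part of the guide's code.

```python
from typing import Dict, List


def split_into_tasks(response: str) -> List[Dict[str, str]]:
    """Turn a newline-separated response into task dicts, skipping blank lines."""
    return [{"task_name": line} for line in response.split("\n") if line.strip()]


sample_response = "1. Check the forecast\n\n2. Compare with 60 degrees\n"
parsed_tasks = split_into_tasks(sample_response)
print(parsed_tasks)
```

Any numbering the model adds stays inside `task_name` at this stage. It is only separated out later, when the prioritization step parses its own numbered list.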

In [21]:
def prioritize_tasks(
    task_prioritization_chain: LLMChain,
    this_task_id: int,
    task_list: List[Dict],
    objective: str,
) -> List[Dict]:
    """Prioritize tasks."""
    task_names = [t["task_name"] for t in task_list]
    next_task_id = int(this_task_id) + 1
    response = task_prioritization_chain.run(
        task_names=task_names, next_task_id=next_task_id, objective=objective
    )
    new_tasks = response.split("\n")
    prioritized_task_list = []
    for task_string in new_tasks:
        if not task_string.strip():
            continue
        task_parts = task_string.strip().split(".", 1)
        if len(task_parts) == 2:
            task_id = task_parts[0].strip()
            task_name = task_parts[1].strip()
            prioritized_task_list.append({"task_id": task_id, "task_name": task_name})
    return prioritized_task_list
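
The prioritization step expects lines of the form `N. task name` and splits each one on the first period. The sketch below applies the same logic to a hypothetical response so the edge cases are visible. `parse_numbered_list` is an illustrative name.

```python
from typing import Dict, List


def parse_numbered_list(response: str) -> List[Dict[str, str]]:
    """Parse 'N. name' lines into task dicts; lines without a period are dropped."""
    parsed = []
    for line in response.split("\n"):
        if not line.strip():
            continue
        parts = line.strip().split(".", 1)
        if len(parts) == 2:
            parsed.append(
                {"task_id": parts[0].strip(), "task_name": parts[1].strip()}
            )
    return parsed


sample_list = "2. Look up the forecast\n3. Average the daily highs\nno number here"
reprioritized = parse_numbered_list(sample_list)
print(reprioritized)
```

Two details follow from this: a line that contains no period is silently discarded, and `task_id` comes back as a string, which is why the controller converts it with `int(...)` before use.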

In [22]:
def _get_top_tasks(vectorstore, query: str, k: int) -> List[str]:
    """Get the top k tasks based on the query."""
    results = vectorstore.similarity_search_with_score(query, k=k)
    if not results:
        return []
    sorted_results, _ = zip(*sorted(results, key=lambda x: x[1], reverse=True))
    return [str(item.metadata["task"]) for item in sorted_results]


def execute_task(
    vectorstore, execution_chain: LLMChain, objective: str, task: str, k: int = 5
) -> str:
    """Execute a task."""
    context = _get_top_tasks(vectorstore, query=objective, k=k)
    return execution_chain.run(objective=objective, context=context, task=task)
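
The ordering of the context depends on how the scores are interpreted. The sketch below uses hypothetical `(task, score)` pairs and assumes the score is a distance, where a smaller value means a closer match (as with an L2 index). Under that assumption, sorting with `reverse=True` places the least similar task first. `order_by_score` is an illustrative helper.

```python
from typing import List, Tuple


def order_by_score(results: List[Tuple[str, float]], descending: bool) -> List[str]:
    """Sort (item, score) pairs by score and return only the items."""
    if not results:
        return []
    ordered, _scores = zip(
        *sorted(results, key=lambda pair: pair[1], reverse=descending)
    )
    return list(ordered)


hits = [("task A", 0.12), ("task B", 0.95), ("task C", 0.40)]  # hypothetical scores
as_in_guide = order_by_score(hits, descending=True)
closest_first = order_by_score(hits, descending=False)
print(as_in_guide)
print(closest_first)
```

Since all `k` results are passed to the prompt either way, the choice affects only the order in which the model sees the earlier tasks.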

In [23]:
class BabyAGI(Chain, BaseModel):
    """Controller model for the BabyAGI agent."""

    task_list: deque = Field(default_factory=deque)
    task_creation_chain: TaskCreationChain = Field(...)
    task_prioritization_chain: TaskPrioritizationChain = Field(...)
    execution_chain: ExecutionChain = Field(...)
    task_id_counter: int = Field(1)
    vectorstore: VectorStore = Field(init=False)
    max_iterations: Optional[int] = None

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    def add_task(self, task: Dict):
        self.task_list.append(task)

    def print_task_list(self):
        print("\033[95m\033[1m" + "\n*****TASK LIST*****\n" + "\033[0m\033[0m")
        for t in self.task_list:
            print(str(t["task_id"]) + ": " + t["task_name"])

    def print_next_task(self, task: Dict):
        print("\033[92m\033[1m" + "\n*****NEXT TASK*****\n" + "\033[0m\033[0m")
        print(str(task["task_id"]) + ": " + task["task_name"])

    def print_task_result(self, result: str):
        print("\033[93m\033[1m" + "\n*****TASK RESULT*****\n" + "\033[0m\033[0m")
        print(result)

    @property
    def input_keys(self) -> List[str]:
        return ["objective"]

    @property
    def output_keys(self) -> List[str]:
        return []

    def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Run the agent."""
        objective = inputs["objective"]
        first_task = inputs.get("first_task", "Make a todo list")
        self.add_task({"task_id": 1, "task_name": first_task})
        num_iters = 0
        while True:
            if self.task_list:
                self.print_task_list()

                # Step 1: Pull the first task
                task = self.task_list.popleft()
                self.print_next_task(task)

                # Step 2: Execute the task
                result = execute_task(
                    self.vectorstore, self.execution_chain, objective, task["task_name"]
                )
                this_task_id = int(task["task_id"])
                self.print_task_result(result)

                # Step 3: Store the result in the vectorstore
                result_id = f"result_{task['task_id']}"
                self.vectorstore.add_texts(
                    texts=[result],
                    metadatas=[{"task": task["task_name"]}],
                    ids=[result_id],
                )

                # Step 4: Create new tasks and reprioritize task list
                new_tasks = get_next_task(
                    self.task_creation_chain,
                    result,
                    task["task_name"],
                    [t["task_name"] for t in self.task_list],
                    objective,
                )
                for new_task in new_tasks:
                    self.task_id_counter += 1
                    new_task.update({"task_id": self.task_id_counter})
                    self.add_task(new_task)
                self.task_list = deque(
                    prioritize_tasks(
                        self.task_prioritization_chain,
                        this_task_id,
                        list(self.task_list),
                        objective,
                    )
                )
            num_iters += 1
            if self.max_iterations is not None and num_iters == self.max_iterations:
                print(
                    "\033[91m\033[1m" + "\n*****TASK ENDING*****\n" + "\033[0m\033[0m"
                )
                break
        return {}

    @classmethod
    def from_llm(
        cls, llm: BaseLLM, vectorstore: VectorStore, verbose: bool = False, **kwargs
    ) -> "BabyAGI":
        """Initialize the BabyAGI Controller."""
        task_creation_chain = TaskCreationChain.from_llm(llm, verbose=verbose)
        task_prioritization_chain = TaskPrioritizationChain.from_llm(
            llm, verbose=verbose
        )
        execution_chain = ExecutionChain.from_llm(llm, verbose=verbose)
        return cls(
            task_creation_chain=task_creation_chain,
            task_prioritization_chain=task_prioritization_chain,
            execution_chain=execution_chain,
            vectorstore=vectorstore,
            **kwargs,
        )

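Note: this cell raises `NameError: name 'TaskCreationChain' is not defined` if it runs before the chain classes from the "Define the Chains" section have been defined.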

### With SerpAPI for External Calls

In [68]:
from langchain.memory import ConversationBufferMemory
from langchain.agents import Tool, initialize_agent, load_tools

memory = ConversationBufferMemory(memory_key="chat_history")

class BabyAGI_API(Chain, BaseModel):
    """Controller model for the BabyAGI agent."""

    task_list: deque = Field(default_factory=deque)
    task_creation_chain: TaskCreationChain = Field(...)
    task_prioritization_chain: TaskPrioritizationChain = Field(...)
    execution_chain: AgentExecutor = Field(...)
    task_id_counter: int = Field(1)
    vectorstore: VectorStore = Field(init=False)
    max_iterations: Optional[int] = None

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    def add_task(self, task: Dict):
        self.task_list.append(task)

    def print_task_list(self):
        print("\033[95m\033[1m" + "\n*****TASK LIST*****\n" + "\033[0m\033[0m")
        for t in self.task_list:
            print(str(t["task_id"]) + ": " + t["task_name"])

    def print_next_task(self, task: Dict):
        print("\033[92m\033[1m" + "\n*****NEXT TASK*****\n" + "\033[0m\033[0m")
        print(str(task["task_id"]) + ": " + task["task_name"])

    def print_task_result(self, result: str):
        print("\033[93m\033[1m" + "\n*****TASK RESULT*****\n" + "\033[0m\033[0m")
        print(result)

    @property
    def input_keys(self) -> List[str]:
        return ["objective"]

    @property
    def output_keys(self) -> List[str]:
        return []

    def _call(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Run the agent."""
        objective = inputs["objective"]
        first_task = inputs.get("first_task", "Make a todo list")
        self.add_task({"task_id": 1, "task_name": first_task})
        num_iters = 0
        my_dict = {}
        while True:
            if self.task_list:
                self.print_task_list()

                # Step 1: Pull the first task
                task = self.task_list.popleft()
                self.print_next_task(task)

                # Step 2: Execute the task
                result = execute_task(
                    self.vectorstore, self.execution_chain, objective, task["task_name"]
                )
                this_task_id = int(task["task_id"])
                self.print_task_result(result)
                my_dict.update({"output": result})
   

                # Step 3: Store the result in the vectorstore
                result_id = f"result_{task['task_id']}"
                self.vectorstore.add_texts(
                    texts=[result],
                    metadatas=[{"task": task["task_name"]}],
                    ids=[result_id],
                )

                # Step 4: Create new tasks and reprioritize task list
                new_tasks = get_next_task(
                    self.task_creation_chain,
                    result,
                    task["task_name"],
                    [t["task_name"] for t in self.task_list],
                    objective,
                )
                for new_task in new_tasks:
                    self.task_id_counter += 1
                    new_task.update({"task_id": self.task_id_counter})
                    self.add_task(new_task)
                self.task_list = deque(
                    prioritize_tasks(
                        self.task_prioritization_chain,
                        this_task_id,
                        list(self.task_list),
                        objective,
                    )
                )
            num_iters += 1
            if self.max_iterations is not None and num_iters == self.max_iterations:
                print(
                    "\033[91m\033[1m" + "\n*****TASK ENDING*****\n" + "\033[0m\033[0m"
                )
                break

        # TODO: returning my_dict does not work yet, possibly because
        # output_keys is declared empty; return an empty dict for now.
        return {}
        

    @classmethod
    def from_llm(
        cls, llm: BaseLLM, vectorstore: VectorStore, verbose: bool = False, **kwargs
    ) -> "BabyAGI_API":
        """Initialize the BabyAGI_API controller."""
        task_creation_chain = TaskCreationChain.from_llm(llm, verbose=verbose)
        task_prioritization_chain = TaskPrioritizationChain.from_llm(
            llm, verbose=verbose
        )
        llm_chain = LLMChain(llm=llm, prompt=prompt)
        tool_names = [tool.name for tool in tools]
        agent = ZeroShotAgent(llm_chain=llm_chain, allowed_tools=tool_names)
        '''
        agent = initialize_agent(
            agent="zero-shot-react-description", 
            tools=tool_names, 
            llm=llm,
            verbose=True,
            memory=memory,
            max_iterations=3,
        )
        '''
        agent_executor = AgentExecutor.from_agent_and_tools(
            agent=agent, tools=tools, verbose=True
        )
        return cls(
            task_creation_chain=task_creation_chain,
            task_prioritization_chain=task_prioritization_chain,
            execution_chain=agent_executor,
            vectorstore=vectorstore,
            **kwargs,
        )

### Run BabyAGI

Now it's time to create the BabyAGI controller and watch it try to accomplish your objective.

In [69]:
OBJECTIVE_1 = "Will the next 7-day average peak temperature in Seattle be higher than 60 degree?"
#OBJECTIVE_1 = "What happened to the First Republic Bank? Will the FED take the same action as it did on SVB's failure?"
#OBJECTIVE_1 = "Forecast Amazon earning numbers for the Q2, 2023 to be greater than or less than the Street's estimate which is $4.68 per share"

In [70]:
llm_openai = OpenAI(temperature=0.01, openai_api_key=openai.api_key)
llm_bedrock = Bedrock(model_id="anthropic.claude-v1", client=boto3_bedrock)

In [71]:
# Logging of LLMChains
verbose = False
# If None, will keep on going forever
max_iterations: Optional[int] = 3
baby_agi = BabyAGI_API.from_llm(
    llm=llm_bedrock, vectorstore=vectorstore, verbose=verbose, max_iterations=max_iterations
)

In [72]:
baby_agi({"objective": OBJECTIVE_1})

Error in on_chain_start callback: 'name'



*****TASK LIST*****

1: Make a todo list

*****NEXT TASK*****

1: Make a todo list
Thought: To make a todo list, I need to think of the steps
Action: TODO
Action Input: Will the next 7-day average peak temperature in Seattle be higher than 60 degree?
Observation:

Here is a suggested todo list for that objective:

1. Gather historical 7-day average peak temperature data for Seattle for the past several years. This will provide context on typical temperatures for this time of year. 

2.
Thought: To answer this question I will first gather current temperature data in Seattle and analyze the 7-day average peak temperature
Action: Search
Action Input: Current temperature in Seattle
Observation: Cloudy with showers. Low 48F. Winds SE at 5 to 10 mph. Chance of rain 40%. PRECIPITATION.
Thought:

ValueError: Error raised by bedrock service: An error occurred (ThrottlingException) when calling the InvokeModel operation (reached max retries: 4): Too many requests, please wait before trying again. You have sent too many requests.  Wait before trying again.
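
A throttling error like the one above is commonly handled by retrying with exponential backoff. The following is a minimal sketch of that pattern. `call_with_backoff` and `ThrottledError` are hypothetical names, and which exception to retry on in a real run is an assumption you would need to check against the error actually raised (here it surfaces as a `ValueError` wrapping the service error).

```python
import random
import time


class ThrottledError(Exception):
    """Hypothetical stand-in for a service throttling error."""


def call_with_backoff(func, retry_on=(ThrottledError,), max_attempts=5,
                      base_delay=1.0, sleep=time.sleep):
    """Call func(), retrying on retry_on errors with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # Wait base_delay * 2^attempt, plus random jitter to spread out retries.
            sleep(base_delay * (2 ** attempt) + random.uniform(0, base_delay))


calls = {"count": 0}
waits = []


def flaky_request():
    """Fail twice with a throttling error, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ThrottledError("Too many requests")
    return "ok"


# Record the delays instead of sleeping so the demonstration runs instantly.
outcome = call_with_backoff(flaky_request, base_delay=0.01, sleep=waits.append)
print(outcome, len(waits))
```

In the agent loop, such a wrapper would sit around the LLM call. Lowering `max_iterations` or pausing between tasks are other ways to reduce the request rate.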