In [11]:
import os
import re
import importlib
from langchain.chat_models import ChatOpenAI
from langchain.schema import (
    AIMessage,
    HumanMessage,
    SystemMessage
)
from Prompts.ReACT.prompts import zeroshot_react_agent_prompt
from typing import List, Dict, Any
from pandas import DataFrame


OPENAI_API_KEY = os.getenv('OPEN_AI_API')

In [12]:
class ReactAgent:
    def __init__(self,
                 react_llm_name = 'gpt-4o-mini',
                 planner_llm_name = 'gpt-4o-mini',
                 mode: str = 'zero_shot',
                 tools: List[str] = None,
                 ) -> None: 
        self.react_name = react_llm_name
        self.answer = ''
        self.json_log = []
        self.mode = mode
        self.planner_name = planner_llm_name
        self.notebook = []
        
        self.current_observation = ''
        self.current_data = None

        self.tools = self.load_tools(tools, planner_model_name=planner_llm_name)
        

        if self.mode == 'zero_shot':
            self.agent_prompt = zeroshot_react_agent_prompt

        if 'gpt-4o-mini' in react_llm_name:
            stop_list = ['\n']
            self.max_token_length = 15000
            self.llm = ChatOpenAI(temperature=1,
                     max_tokens=256,
                     model_name=react_llm_name,
                     openai_api_key=OPENAI_API_KEY,
                     model_kwargs={"stop": stop_list})

    def run(self,query, reset=True) -> None:
        self.query = query

        if reset:
            self.__reset_agent()

        while not self.is_finished():
            self.step()
        
        return self.answer, self.scratchpad, self.json_log

    
    def step(self) -> None:
        #record the log
        self.json_log.append({"step": self.step_n, "thought":"",
                              "action": "", "observation": "", "state":""})
        #thought
        self.scratchpad += f'\nThought {self.step_n}:'
        self.scratchpad += ' ' + self.prompt_agent() #
        print(self.scratchpad.split('\n')[-1])
        self.json_log[-1]['thought'] = self.scratchpad.split('\n')[-1].replace(f'\nThought {self.step_n}:',"")
        
        
        #act
        self.scratchpad += f'\nAction {self.step_n}:'
        action = self.prompt_agent()
        self.scratchpad += ' ' + action
        print(self.scratchpad.split('\n')[-1])
        self.json_log[-1]['action'] = self.scratchpad.split('\n')[-1].replace(f'\nAction {self.step_n}:',"")
        
        #observation
        self.scratchpad += f'\nObservation {self.step_n}: '
        action_type, action_arg = parse_action(action)
        #print(action_arg)
        if action_type == 'AccommodationSearch':
            self.scratchpad = self.scratchpad.replace(to_string(self.current_data).strip(),'Masked due to limited length. Make sure the data has been written in Notebook.')
            self.current_data = self.tools['accommodations'].run(action_arg.split(',')[0],[p.strip() for p in action_arg.split('[')[1].strip('[]').split(',')])
            self.current_observation = str(to_string(self.current_data))
            self.scratchpad += self.current_observation
            self.notebook.append({'Description': 'Accommodation Choice', 'Content': self.current_data})
            self.json_log[-1]['state'] = f'Successful'

        elif action_type == 'AttractionSearch':
            self.scratchpad = self.scratchpad.replace(to_string(self.current_data).strip(),'Masked due to limited length. Make sure the data has been written in Notebook.')
            self.current_data = self.tools['attractions'].run(action_arg.split(',')[0],[action_arg.split(',')[1].strip()[1:][:-1]])
            self.current_observation = str(to_string(self.current_data))
            self.scratchpad += self.current_observation
            self.notebook.append({'Description': 'Attraction Choice', 'Content': self.current_data})
            self.json_log[-1]['state'] = f'Successful'

        elif action_type == 'RestaurantSearch': #action_arg = 'Cheap Budget, Indian, [Good Flavor, Good Value]'
            self.scratchpad = self.scratchpad.replace(to_string(self.current_data).strip(),'Masked due to limited length. Make sure the data has been written in Notebook.')
            self.current_data = self.tools['restaurants'].run(action_arg.split('[')[0].split(',')[0].strip(),action_arg.split('[')[0].split(',')[1].strip(),[a.strip() for a in action_arg.split('[')[1].strip()[:-1].split(',')])
            self.current_observation = str(to_string(self.current_data))
            self.scratchpad += self.current_observation
            self.notebook.append({'Description': 'Restaurant Choice', 'Content': self.current_data})
            self.json_log[-1]['state'] = f'Successful'

        #elif action_type == 'NotebookWrite':
        #    self.scratchpad = self.scratchpad.replace(to_string(self.current_data).strip(),'Masked due to limited length. Make sure the data has been written in Notebook.')
        #    self.current_observation = str(self.tools['notebook'].write(self.current_data, action_arg))
        #    self.scratchpad  +=  self.current_observation
        #    self.json_log[-1]['state'] = f'Successful'

        elif action_type == 'Planner':
            self.current_observation = str(self.tools['planner'].run(str(self.notebook),action_arg))
            self.scratchpad  +=  self.current_observation
            self.answer = self.current_observation
            self.json_log[-1]['state'] = f'Successful'
            print(self.current_observation)
        else:
            print('invalid action type')
        
        self.step_n += 1


        if action_type  and action_type == 'Planner':
            self.finished = True

            #print(self.scratchpad)
            #print(self.json_log)
            #print(self.notebook)

            
        return
    
    
    
    
    def is_finished(self) -> bool:
        return self.finished
    
    #def is_halted(self) -> bool:
    #    return ((self.step_n > self.max_steps) or (
    #                len(self.enc.encode(self._build_agent_prompt())) > self.max_token_length)) and not self.finished
    
    def __reset_agent(self) -> None:
        self.step_n = 1
        self.finished = False
        self.answer = ''
        self.scratchpad: str = ''
        self.json_log = []

    def prompt_agent(self) -> str:
        while True:
            request = format_step(self.llm([HumanMessage(content=self._build_agent_prompt())]).content)
            return request  
        
        
    def _build_agent_prompt(self) -> str:
        if self.mode == "zero_shot":
            return self.agent_prompt.format(
                query=self.query,
                scratchpad=self.scratchpad)
        
    def load_tools(self, tools: List[str], planner_model_name=None) -> Dict[str, Any]:
        tools_map = {}
        for tool_name in tools:
            module = importlib.import_module("tools.{}.apis".format(tool_name)) #
            
            # Avoid instantiating the planner tool twice, need to finish planner module before uncomment this
            if tool_name == 'planner' and planner_model_name is not None:
                tools_map[tool_name] = getattr(module, tool_name[0].upper()+tool_name[1:])(model_name=planner_model_name)
            else:
                tools_map[tool_name] = getattr(module, tool_name[0].upper()+tool_name[1:])()
        #print(tools_map)
        return tools_map
        

def format_step(step: str) -> str:
    return step.strip('\n').strip().replace('\n', '')

def parse_action(string):
    pattern = r'^(\w+)\[(.+)\]$'
    match = re.match(pattern, string)
    action_type = match.group(1)
    action_arg = match.group(2)
    return action_type,action_arg

#def parse action arg

def to_string(data) -> str:
    if data is not None:
        if type(data) == DataFrame:
            return data.to_string(index=False)
        else:
            return str(data)
    else:
        return str(None)

In [13]:
tools_list = ["notebook","attractions","accommodations","restaurants","planner"]
agent = ReactAgent(tools=tools_list)
with open('../Task1/Prompts/prompts/Prompt_2.txt','r') as f:
    query = f.read()

planner_results, scratchpad, action_log  = agent.run(query)
# find out why notebook write is used that less, compare the code carefully hehe

Thought 1: I need to gather information relevant to a 4-day itinerary focusing on family-oriented attractions, Greek cuisine restaurants with a good environment, and quality accommodations in a prime location while keeping a moderate budget in mind.
Action 1: AccommodationSearch[Moderate Budget,[Good Location, Good Service]]
Thought 2: I have gathered a range of accommodations that meet the criteria for good location and quality service. Now I need to find restaurants that serve Greek cuisine and have a good environment while also being within a moderate budget.
Action 2: RestaurantSearch[Moderate Budget, Greek, [Good Environment]]


TypeError: can only concatenate str (not "list") to str

In [7]:
#create a check the completeness of the notebook, or check the log file later

In [8]:
arg = 'Cheap Budget, Indian, [Good Flavor, Good Value]'
arg.split('[')[0].split(',')[0].strip()

'Cheap Budget'

In [9]:
arg.split('[')[0].split(',')[1].strip()

'Indian'

In [10]:
[a.strip() for a in arg.split('[')[1].strip()[:-1].split(',')]

['Good Flavor', 'Good Value']