From 8bdce98643fb5f7c0531b2ba349d7cc4728133d6 Mon Sep 17 00:00:00 2001 From: Vivian Fang Date: Fri, 20 Oct 2023 19:59:33 -0700 Subject: [PATCH] Add Autogen MemGPT agent Co-authored-By: Charles Packer --- memgpt/autogen/__init__.py | 0 memgpt/autogen/interface.py | 158 +++++++++++++++++++++++++++++++++ memgpt/autogen/memgpt_agent.py | 93 +++++++++++++++++++ 3 files changed, 251 insertions(+) create mode 100644 memgpt/autogen/__init__.py create mode 100644 memgpt/autogen/interface.py create mode 100644 memgpt/autogen/memgpt_agent.py diff --git a/memgpt/autogen/__init__.py b/memgpt/autogen/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/memgpt/autogen/interface.py b/memgpt/autogen/interface.py new file mode 100644 index 0000000000..15d9cc270e --- /dev/null +++ b/memgpt/autogen/interface.py @@ -0,0 +1,158 @@ +import json +import re + +from colorama import Fore, Style, init +init(autoreset=True) + + +# DEBUG = True # puts full message outputs in the terminal +DEBUG = False # only dumps important messages in the terminal + + +class DummyInterface(object): + + def set_message_list(self, message_list): + pass + + async def internal_monologue(self, msg): + pass + + async def assistant_message(self, msg): + pass + + async def memory_message(self, msg): + pass + + async def system_message(self, msg): + pass + + async def user_message(self, msg, raw=False): + pass + + async def function_message(self, msg): + pass + + +class AutoGenInterface(object): + """AutoGen expects a single action return in its step loop, but MemGPT may take many actions. + + To support AutoGen, we keep a buffer of all the steps that were taken using the interface abstraction, + then we concatenate it all and package back as a single 'assistant' ChatCompletion response. + + The buffer needs to be wiped before each call to memgpt.agent.step() + """ + + def __init__(self, message_list=None, show_user_message=False, fancy=True): + self.message_list = message_list + self.show_user_message = show_user_message + self.fancy = fancy # set to false to disable colored outputs + emoji prefixes + + def reset_message_list(self): + """Clears the buffer. Call before every agent.step() when using MemGPT+AutoGen""" + self.message_list = [] + + async def internal_monologue(self, msg): + # ANSI escape code for italic is '\x1B[3m' + message = f'\x1B[3m{Fore.LIGHTBLACK_EX}💭 {msg}{Style.RESET_ALL}' if self.fancy else f'[inner thoughts] {msg}' + self.message_list.append(message) + + async def assistant_message(self, msg): + message = f'{Fore.YELLOW}{Style.BRIGHT}🤖 {Fore.YELLOW}{msg}{Style.RESET_ALL}' if self.fancy else msg + self.message_list.append(message) + + async def memory_message(self, msg): + message = f'{Fore.LIGHTMAGENTA_EX}{Style.BRIGHT}🧠 {Fore.LIGHTMAGENTA_EX}{msg}{Style.RESET_ALL}' if self.fancy else f'[memory] {msg}' + self.message_list.append(message) + + async def system_message(self, msg): + message = f'{Fore.MAGENTA}{Style.BRIGHT}🖥️ [system] {Fore.MAGENTA}{msg}{Style.RESET_ALL}' if self.fancy else f'[system] {msg}' + self.message_list.append(message) + + async def user_message(self, msg, raw=False): + if not self.show_user_message: + return + + if isinstance(msg, str): + if raw: + message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}' + self.message_list.append(message) + return + else: + try: + msg_json = json.loads(msg) + except: + print(f"Warning: failed to parse user message into json") + message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}' + self.message_list.append(message) + return + + if msg_json['type'] == 'user_message': + msg_json.pop('type') + message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}' + elif msg_json['type'] == 'heartbeat': + if True or DEBUG: + msg_json.pop('type') + message = f'{Fore.GREEN}{Style.BRIGHT}💓 {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[system heartbeat] {msg}' + elif msg_json['type'] == 'system_message': + msg_json.pop('type') + message = f'{Fore.GREEN}{Style.BRIGHT}🖥️ {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[system] {msg}' + else: + message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}' + + self.message_list.append(message) + + async def function_message(self, msg): + + if isinstance(msg, dict): + message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' + self.message_list.append(message) + return + + if msg.startswith('Success: '): + message = f'{Fore.RED}{Style.BRIGHT}⚡🟢 [function] {Fore.RED}{msg}{Style.RESET_ALL}' if self.fancy else f'[function - OK] {msg}' + elif msg.startswith('Error: '): + message = f'{Fore.RED}{Style.BRIGHT}⚡🔴 [function] {Fore.RED}{msg}{Style.RESET_ALL}' if self.fancy else f'[function - error] {msg}' + elif msg.startswith('Running '): + if DEBUG: + message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' if self.fancy else f'[function] {msg}' + else: + if 'memory' in msg: + match = re.search(r'Running (\w+)\((.*)\)', msg) + if match: + function_name = match.group(1) + function_args = match.group(2) + message = f'{Fore.RED}{Style.BRIGHT}⚡🧠 [function] {Fore.RED}updating memory with {function_name}{Style.RESET_ALL}:' \ + if self.fancy else f'[function] updating memory with {function_name}' + try: + msg_dict = eval(function_args) + if function_name == 'archival_memory_search': + message = f'{Fore.RED}\tquery: {msg_dict["query"]}, page: {msg_dict["page"]}' \ + if self.fancy else f'[function] query: {msg_dict["query"]}, page: {msg_dict["page"]}' + else: + message = f'{Fore.RED}{Style.BRIGHT}\t{Fore.RED} {msg_dict["old_content"]}\n\t{Fore.GREEN}→ {msg_dict["new_content"]}' \ + if self.fancy else f'[old -> new] {msg_dict["old_content"]} -> {msg_dict["new_content"]}' + except Exception as e: + print(e) + message = msg_dict + else: + print(f"Warning: did not recognize function message") + message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' \ + if self.fancy else f'[function] {msg}' + elif 'send_message' in msg: + # ignore in debug mode + message = None + else: + message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' \ + if self.fancy else f'[function] {msg}' + else: + try: + msg_dict = json.loads(msg) + if "status" in msg_dict and msg_dict["status"] == "OK": + message = f'{Fore.GREEN}{Style.BRIGHT}⚡ [function] {Fore.GREEN}{msg}{Style.RESET_ALL}' \ + if self.fancy else f'[function] {msg}' + except Exception: + print(f"Warning: did not recognize function message {type(msg)} {msg}") + message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' \ + if self.fancy else f'[function] {msg}' + + if message: self.message_list.append(message) diff --git a/memgpt/autogen/memgpt_agent.py b/memgpt/autogen/memgpt_agent.py new file mode 100644 index 0000000000..a1dce92f89 --- /dev/null +++ b/memgpt/autogen/memgpt_agent.py @@ -0,0 +1,93 @@ +from autogen.agentchat import ConversableAgent, Agent +from ..agent import AgentAsync + +from .. import system +from .. import constants + +import asyncio +from typing import Callable, Optional, List, Dict, Union, Any, Tuple + + +def create_memgpt_autogen_agent_from_config( + name: str, + system_message: Optional[str] = "You are a helpful AI Assistant.", + is_termination_msg: Optional[Callable[[Dict], bool]] = None, + max_consecutive_auto_reply: Optional[int] = None, + human_input_mode: Optional[str] = "TERMINATE", + function_map: Optional[Dict[str, Callable]] = None, + code_execution_config: Optional[Union[Dict, bool]] = None, + llm_config: Optional[Union[Dict, bool]] = None, + default_auto_reply: Optional[Union[str, Dict, None]] = "", +): + """ + TODO support AutoGen config workflow in a clean way with constructors + """ + raise NotImplementedError + + +class MemGPTAgent(ConversableAgent): + + def __init__( + self, + name: str, + agent: AgentAsync, + skip_verify=False + ): + super().__init__(name) + self.agent = agent + self.skip_verify = skip_verify + self.register_reply([Agent, None], MemGPTAgent._a_generate_reply_for_user_message) + self.register_reply([Agent, None], MemGPTAgent._generate_reply_for_user_message) + + def _generate_reply_for_user_message( + self, + messages: Optional[List[Dict]] = None, + sender: Optional[Agent] = None, + config: Optional[Any] = None, + ) -> Tuple[bool, Union[str, Dict, None]]: + return asyncio.run(self._a_generate_reply_for_user_message(messages=messages, sender=sender, config=config)) + + async def _a_generate_reply_for_user_message( + self, + messages: Optional[List[Dict]] = None, + sender: Optional[Agent] = None, + config: Optional[Any] = None, + ) -> Tuple[bool, Union[str, Dict, None]]: + ret = [] + # for the interface + self.agent.interface.reset_message_list() + + for msg in messages: + user_message = system.package_user_message(msg['content']) + while True: + new_messages, heartbeat_request, function_failed, token_warning = await self.agent.step(user_message, first_message=False, skip_verify=self.skip_verify) + ret.extend(new_messages) + # Skip user inputs if there's a memory warning, function execution failed, or the agent asked for control + if token_warning: + user_message = system.get_token_limit_warning() + elif function_failed: + user_message = system.get_heartbeat(constants.FUNC_FAILED_HEARTBEAT_MESSAGE) + elif heartbeat_request: + user_message = system.get_heartbeat(constants.REQ_HEARTBEAT_MESSAGE) + else: + break + + # Pass back to AutoGen the pretty-printed calls MemGPT made to the interface + pretty_ret = MemGPTAgent.pretty_concat(self.agent.interface.message_list) + return True, pretty_ret + + @staticmethod + def pretty_concat(messages): + """AutoGen expects a single response, but MemGPT may take many steps. + + To accomadate AutoGen, concatenate all of MemGPT's steps into one and return as a single message. + """ + ret = { + 'role': 'assistant', + 'content': '' + } + lines = [] + for m in messages: + lines.append(f"{m}") + ret['content'] = '\n'.join(lines) + return ret