-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #64 from cpacker/autogen
Add Autogen MemGPT agent Co-authored-By: Charles Packer <contact@charlespacker.com>
- Loading branch information
Showing
3 changed files
with
251 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
import json | ||
import re | ||
|
||
from colorama import Fore, Style, init | ||
init(autoreset=True) | ||
|
||
|
||
# DEBUG = True # puts full message outputs in the terminal | ||
DEBUG = False # only dumps important messages in the terminal | ||
|
||
|
||
class DummyInterface(object): | ||
|
||
def set_message_list(self, message_list): | ||
pass | ||
|
||
async def internal_monologue(self, msg): | ||
pass | ||
|
||
async def assistant_message(self, msg): | ||
pass | ||
|
||
async def memory_message(self, msg): | ||
pass | ||
|
||
async def system_message(self, msg): | ||
pass | ||
|
||
async def user_message(self, msg, raw=False): | ||
pass | ||
|
||
async def function_message(self, msg): | ||
pass | ||
|
||
|
||
class AutoGenInterface(object): | ||
"""AutoGen expects a single action return in its step loop, but MemGPT may take many actions. | ||
To support AutoGen, we keep a buffer of all the steps that were taken using the interface abstraction, | ||
then we concatenate it all and package back as a single 'assistant' ChatCompletion response. | ||
The buffer needs to be wiped before each call to memgpt.agent.step() | ||
""" | ||
|
||
def __init__(self, message_list=None, show_user_message=False, fancy=True): | ||
self.message_list = message_list | ||
self.show_user_message = show_user_message | ||
self.fancy = fancy # set to false to disable colored outputs + emoji prefixes | ||
|
||
def reset_message_list(self): | ||
"""Clears the buffer. Call before every agent.step() when using MemGPT+AutoGen""" | ||
self.message_list = [] | ||
|
||
async def internal_monologue(self, msg): | ||
# ANSI escape code for italic is '\x1B[3m' | ||
message = f'\x1B[3m{Fore.LIGHTBLACK_EX}💭 {msg}{Style.RESET_ALL}' if self.fancy else f'[inner thoughts] {msg}' | ||
self.message_list.append(message) | ||
|
||
async def assistant_message(self, msg): | ||
message = f'{Fore.YELLOW}{Style.BRIGHT}🤖 {Fore.YELLOW}{msg}{Style.RESET_ALL}' if self.fancy else msg | ||
self.message_list.append(message) | ||
|
||
async def memory_message(self, msg): | ||
message = f'{Fore.LIGHTMAGENTA_EX}{Style.BRIGHT}🧠 {Fore.LIGHTMAGENTA_EX}{msg}{Style.RESET_ALL}' if self.fancy else f'[memory] {msg}' | ||
self.message_list.append(message) | ||
|
||
async def system_message(self, msg): | ||
message = f'{Fore.MAGENTA}{Style.BRIGHT}🖥️ [system] {Fore.MAGENTA}{msg}{Style.RESET_ALL}' if self.fancy else f'[system] {msg}' | ||
self.message_list.append(message) | ||
|
||
async def user_message(self, msg, raw=False): | ||
if not self.show_user_message: | ||
return | ||
|
||
if isinstance(msg, str): | ||
if raw: | ||
message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}' | ||
self.message_list.append(message) | ||
return | ||
else: | ||
try: | ||
msg_json = json.loads(msg) | ||
except: | ||
print(f"Warning: failed to parse user message into json") | ||
message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}' | ||
self.message_list.append(message) | ||
return | ||
|
||
if msg_json['type'] == 'user_message': | ||
msg_json.pop('type') | ||
message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}' | ||
elif msg_json['type'] == 'heartbeat': | ||
if True or DEBUG: | ||
msg_json.pop('type') | ||
message = f'{Fore.GREEN}{Style.BRIGHT}💓 {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[system heartbeat] {msg}' | ||
elif msg_json['type'] == 'system_message': | ||
msg_json.pop('type') | ||
message = f'{Fore.GREEN}{Style.BRIGHT}🖥️ {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[system] {msg}' | ||
else: | ||
message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}' | ||
|
||
self.message_list.append(message) | ||
|
||
async def function_message(self, msg): | ||
|
||
if isinstance(msg, dict): | ||
message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' | ||
self.message_list.append(message) | ||
return | ||
|
||
if msg.startswith('Success: '): | ||
message = f'{Fore.RED}{Style.BRIGHT}⚡🟢 [function] {Fore.RED}{msg}{Style.RESET_ALL}' if self.fancy else f'[function - OK] {msg}' | ||
elif msg.startswith('Error: '): | ||
message = f'{Fore.RED}{Style.BRIGHT}⚡🔴 [function] {Fore.RED}{msg}{Style.RESET_ALL}' if self.fancy else f'[function - error] {msg}' | ||
elif msg.startswith('Running '): | ||
if DEBUG: | ||
message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' if self.fancy else f'[function] {msg}' | ||
else: | ||
if 'memory' in msg: | ||
match = re.search(r'Running (\w+)\((.*)\)', msg) | ||
if match: | ||
function_name = match.group(1) | ||
function_args = match.group(2) | ||
message = f'{Fore.RED}{Style.BRIGHT}⚡🧠 [function] {Fore.RED}updating memory with {function_name}{Style.RESET_ALL}:' \ | ||
if self.fancy else f'[function] updating memory with {function_name}' | ||
try: | ||
msg_dict = eval(function_args) | ||
if function_name == 'archival_memory_search': | ||
message = f'{Fore.RED}\tquery: {msg_dict["query"]}, page: {msg_dict["page"]}' \ | ||
if self.fancy else f'[function] query: {msg_dict["query"]}, page: {msg_dict["page"]}' | ||
else: | ||
message = f'{Fore.RED}{Style.BRIGHT}\t{Fore.RED} {msg_dict["old_content"]}\n\t{Fore.GREEN}→ {msg_dict["new_content"]}' \ | ||
if self.fancy else f'[old -> new] {msg_dict["old_content"]} -> {msg_dict["new_content"]}' | ||
except Exception as e: | ||
print(e) | ||
message = msg_dict | ||
else: | ||
print(f"Warning: did not recognize function message") | ||
message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' \ | ||
if self.fancy else f'[function] {msg}' | ||
elif 'send_message' in msg: | ||
# ignore in debug mode | ||
message = None | ||
else: | ||
message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' \ | ||
if self.fancy else f'[function] {msg}' | ||
else: | ||
try: | ||
msg_dict = json.loads(msg) | ||
if "status" in msg_dict and msg_dict["status"] == "OK": | ||
message = f'{Fore.GREEN}{Style.BRIGHT}⚡ [function] {Fore.GREEN}{msg}{Style.RESET_ALL}' \ | ||
if self.fancy else f'[function] {msg}' | ||
except Exception: | ||
print(f"Warning: did not recognize function message {type(msg)} {msg}") | ||
message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' \ | ||
if self.fancy else f'[function] {msg}' | ||
|
||
if message: self.message_list.append(message) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
from autogen.agentchat import ConversableAgent, Agent | ||
from ..agent import AgentAsync | ||
|
||
from .. import system | ||
from .. import constants | ||
|
||
import asyncio | ||
from typing import Callable, Optional, List, Dict, Union, Any, Tuple | ||
|
||
|
||
def create_memgpt_autogen_agent_from_config( | ||
name: str, | ||
system_message: Optional[str] = "You are a helpful AI Assistant.", | ||
is_termination_msg: Optional[Callable[[Dict], bool]] = None, | ||
max_consecutive_auto_reply: Optional[int] = None, | ||
human_input_mode: Optional[str] = "TERMINATE", | ||
function_map: Optional[Dict[str, Callable]] = None, | ||
code_execution_config: Optional[Union[Dict, bool]] = None, | ||
llm_config: Optional[Union[Dict, bool]] = None, | ||
default_auto_reply: Optional[Union[str, Dict, None]] = "", | ||
): | ||
""" | ||
TODO support AutoGen config workflow in a clean way with constructors | ||
""" | ||
raise NotImplementedError | ||
|
||
|
||
class MemGPTAgent(ConversableAgent): | ||
|
||
def __init__( | ||
self, | ||
name: str, | ||
agent: AgentAsync, | ||
skip_verify=False | ||
): | ||
super().__init__(name) | ||
self.agent = agent | ||
self.skip_verify = skip_verify | ||
self.register_reply([Agent, None], MemGPTAgent._a_generate_reply_for_user_message) | ||
self.register_reply([Agent, None], MemGPTAgent._generate_reply_for_user_message) | ||
|
||
def _generate_reply_for_user_message( | ||
self, | ||
messages: Optional[List[Dict]] = None, | ||
sender: Optional[Agent] = None, | ||
config: Optional[Any] = None, | ||
) -> Tuple[bool, Union[str, Dict, None]]: | ||
return asyncio.run(self._a_generate_reply_for_user_message(messages=messages, sender=sender, config=config)) | ||
|
||
async def _a_generate_reply_for_user_message( | ||
self, | ||
messages: Optional[List[Dict]] = None, | ||
sender: Optional[Agent] = None, | ||
config: Optional[Any] = None, | ||
) -> Tuple[bool, Union[str, Dict, None]]: | ||
ret = [] | ||
# for the interface | ||
self.agent.interface.reset_message_list() | ||
|
||
for msg in messages: | ||
user_message = system.package_user_message(msg['content']) | ||
while True: | ||
new_messages, heartbeat_request, function_failed, token_warning = await self.agent.step(user_message, first_message=False, skip_verify=self.skip_verify) | ||
ret.extend(new_messages) | ||
# Skip user inputs if there's a memory warning, function execution failed, or the agent asked for control | ||
if token_warning: | ||
user_message = system.get_token_limit_warning() | ||
elif function_failed: | ||
user_message = system.get_heartbeat(constants.FUNC_FAILED_HEARTBEAT_MESSAGE) | ||
elif heartbeat_request: | ||
user_message = system.get_heartbeat(constants.REQ_HEARTBEAT_MESSAGE) | ||
else: | ||
break | ||
|
||
# Pass back to AutoGen the pretty-printed calls MemGPT made to the interface | ||
pretty_ret = MemGPTAgent.pretty_concat(self.agent.interface.message_list) | ||
return True, pretty_ret | ||
|
||
@staticmethod | ||
def pretty_concat(messages): | ||
"""AutoGen expects a single response, but MemGPT may take many steps. | ||
To accomadate AutoGen, concatenate all of MemGPT's steps into one and return as a single message. | ||
""" | ||
ret = { | ||
'role': 'assistant', | ||
'content': '' | ||
} | ||
lines = [] | ||
for m in messages: | ||
lines.append(f"{m}") | ||
ret['content'] = '\n'.join(lines) | ||
return ret |