Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Autogen MemGPT agent #64

Merged
merged 1 commit into from
Oct 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added memgpt/autogen/__init__.py
Empty file.
158 changes: 158 additions & 0 deletions memgpt/autogen/interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import json
import re

from colorama import Fore, Style, init
init(autoreset=True)


# DEBUG = True # puts full message outputs in the terminal
DEBUG = False # only dumps important messages in the terminal


class DummyInterface(object):

def set_message_list(self, message_list):
pass

async def internal_monologue(self, msg):
pass

async def assistant_message(self, msg):
pass

async def memory_message(self, msg):
pass

async def system_message(self, msg):
pass

async def user_message(self, msg, raw=False):
pass

async def function_message(self, msg):
pass


class AutoGenInterface(object):
"""AutoGen expects a single action return in its step loop, but MemGPT may take many actions.

To support AutoGen, we keep a buffer of all the steps that were taken using the interface abstraction,
then we concatenate it all and package back as a single 'assistant' ChatCompletion response.

The buffer needs to be wiped before each call to memgpt.agent.step()
"""

def __init__(self, message_list=None, show_user_message=False, fancy=True):
self.message_list = message_list
self.show_user_message = show_user_message
self.fancy = fancy # set to false to disable colored outputs + emoji prefixes

def reset_message_list(self):
"""Clears the buffer. Call before every agent.step() when using MemGPT+AutoGen"""
self.message_list = []

async def internal_monologue(self, msg):
# ANSI escape code for italic is '\x1B[3m'
message = f'\x1B[3m{Fore.LIGHTBLACK_EX}💭 {msg}{Style.RESET_ALL}' if self.fancy else f'[inner thoughts] {msg}'
self.message_list.append(message)

async def assistant_message(self, msg):
message = f'{Fore.YELLOW}{Style.BRIGHT}🤖 {Fore.YELLOW}{msg}{Style.RESET_ALL}' if self.fancy else msg
self.message_list.append(message)

async def memory_message(self, msg):
message = f'{Fore.LIGHTMAGENTA_EX}{Style.BRIGHT}🧠 {Fore.LIGHTMAGENTA_EX}{msg}{Style.RESET_ALL}' if self.fancy else f'[memory] {msg}'
self.message_list.append(message)

async def system_message(self, msg):
message = f'{Fore.MAGENTA}{Style.BRIGHT}🖥️ [system] {Fore.MAGENTA}{msg}{Style.RESET_ALL}' if self.fancy else f'[system] {msg}'
self.message_list.append(message)

async def user_message(self, msg, raw=False):
if not self.show_user_message:
return

if isinstance(msg, str):
if raw:
message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}'
self.message_list.append(message)
return
else:
try:
msg_json = json.loads(msg)
except:
print(f"Warning: failed to parse user message into json")
message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}'
self.message_list.append(message)
return

if msg_json['type'] == 'user_message':
msg_json.pop('type')
message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}'
elif msg_json['type'] == 'heartbeat':
if True or DEBUG:
msg_json.pop('type')
message = f'{Fore.GREEN}{Style.BRIGHT}💓 {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[system heartbeat] {msg}'
elif msg_json['type'] == 'system_message':
msg_json.pop('type')
message = f'{Fore.GREEN}{Style.BRIGHT}🖥️ {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[system] {msg}'
else:
message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}'

self.message_list.append(message)

async def function_message(self, msg):

if isinstance(msg, dict):
message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}'
self.message_list.append(message)
return

if msg.startswith('Success: '):
message = f'{Fore.RED}{Style.BRIGHT}⚡🟢 [function] {Fore.RED}{msg}{Style.RESET_ALL}' if self.fancy else f'[function - OK] {msg}'
elif msg.startswith('Error: '):
message = f'{Fore.RED}{Style.BRIGHT}⚡🔴 [function] {Fore.RED}{msg}{Style.RESET_ALL}' if self.fancy else f'[function - error] {msg}'
elif msg.startswith('Running '):
if DEBUG:
message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' if self.fancy else f'[function] {msg}'
else:
if 'memory' in msg:
match = re.search(r'Running (\w+)\((.*)\)', msg)
if match:
function_name = match.group(1)
function_args = match.group(2)
message = f'{Fore.RED}{Style.BRIGHT}⚡🧠 [function] {Fore.RED}updating memory with {function_name}{Style.RESET_ALL}:' \
if self.fancy else f'[function] updating memory with {function_name}'
try:
msg_dict = eval(function_args)
if function_name == 'archival_memory_search':
message = f'{Fore.RED}\tquery: {msg_dict["query"]}, page: {msg_dict["page"]}' \
if self.fancy else f'[function] query: {msg_dict["query"]}, page: {msg_dict["page"]}'
else:
message = f'{Fore.RED}{Style.BRIGHT}\t{Fore.RED} {msg_dict["old_content"]}\n\t{Fore.GREEN}→ {msg_dict["new_content"]}' \
if self.fancy else f'[old -> new] {msg_dict["old_content"]} -> {msg_dict["new_content"]}'
except Exception as e:
print(e)
message = msg_dict
else:
print(f"Warning: did not recognize function message")
message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' \
if self.fancy else f'[function] {msg}'
elif 'send_message' in msg:
# ignore in debug mode
message = None
else:
message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' \
if self.fancy else f'[function] {msg}'
else:
try:
msg_dict = json.loads(msg)
if "status" in msg_dict and msg_dict["status"] == "OK":
message = f'{Fore.GREEN}{Style.BRIGHT}⚡ [function] {Fore.GREEN}{msg}{Style.RESET_ALL}' \
if self.fancy else f'[function] {msg}'
except Exception:
print(f"Warning: did not recognize function message {type(msg)} {msg}")
message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' \
if self.fancy else f'[function] {msg}'

if message: self.message_list.append(message)
93 changes: 93 additions & 0 deletions memgpt/autogen/memgpt_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from autogen.agentchat import ConversableAgent, Agent
from ..agent import AgentAsync

from .. import system
from .. import constants

import asyncio
from typing import Callable, Optional, List, Dict, Union, Any, Tuple


def create_memgpt_autogen_agent_from_config(
name: str,
system_message: Optional[str] = "You are a helpful AI Assistant.",
is_termination_msg: Optional[Callable[[Dict], bool]] = None,
max_consecutive_auto_reply: Optional[int] = None,
human_input_mode: Optional[str] = "TERMINATE",
function_map: Optional[Dict[str, Callable]] = None,
code_execution_config: Optional[Union[Dict, bool]] = None,
llm_config: Optional[Union[Dict, bool]] = None,
default_auto_reply: Optional[Union[str, Dict, None]] = "",
):
"""
TODO support AutoGen config workflow in a clean way with constructors
"""
raise NotImplementedError


class MemGPTAgent(ConversableAgent):

def __init__(
self,
name: str,
agent: AgentAsync,
skip_verify=False
):
super().__init__(name)
self.agent = agent
self.skip_verify = skip_verify
self.register_reply([Agent, None], MemGPTAgent._a_generate_reply_for_user_message)
self.register_reply([Agent, None], MemGPTAgent._generate_reply_for_user_message)

def _generate_reply_for_user_message(
self,
messages: Optional[List[Dict]] = None,
sender: Optional[Agent] = None,
config: Optional[Any] = None,
) -> Tuple[bool, Union[str, Dict, None]]:
return asyncio.run(self._a_generate_reply_for_user_message(messages=messages, sender=sender, config=config))

async def _a_generate_reply_for_user_message(
self,
messages: Optional[List[Dict]] = None,
sender: Optional[Agent] = None,
config: Optional[Any] = None,
) -> Tuple[bool, Union[str, Dict, None]]:
ret = []
# for the interface
self.agent.interface.reset_message_list()

for msg in messages:
user_message = system.package_user_message(msg['content'])
while True:
new_messages, heartbeat_request, function_failed, token_warning = await self.agent.step(user_message, first_message=False, skip_verify=self.skip_verify)
ret.extend(new_messages)
# Skip user inputs if there's a memory warning, function execution failed, or the agent asked for control
if token_warning:
user_message = system.get_token_limit_warning()
elif function_failed:
user_message = system.get_heartbeat(constants.FUNC_FAILED_HEARTBEAT_MESSAGE)
elif heartbeat_request:
user_message = system.get_heartbeat(constants.REQ_HEARTBEAT_MESSAGE)
else:
break

# Pass back to AutoGen the pretty-printed calls MemGPT made to the interface
pretty_ret = MemGPTAgent.pretty_concat(self.agent.interface.message_list)
return True, pretty_ret

@staticmethod
def pretty_concat(messages):
"""AutoGen expects a single response, but MemGPT may take many steps.

To accomadate AutoGen, concatenate all of MemGPT's steps into one and return as a single message.
"""
ret = {
'role': 'assistant',
'content': ''
}
lines = []
for m in messages:
lines.append(f"{m}")
ret['content'] = '\n'.join(lines)
return ret