In [None]:
import json
import os
from typing import Optional
from openai import OpenAI
from dotenv import load_dotenv
from abc import ABC, abstractmethod
from colorama import Fore
from exec_code import execute_code, extract_code_from_string
from prompt import CODE_DEVELOPER_SYSTEMPROMPT, COORDINATOR_SYSTEMPROMPT, CRITIC_ROLE_PROMPT, EXECUTE_UIDESIGN_SYSTEMPROMPT, OPTIMIZER_SYSTEMPROMPT, TESTER_SYSTEMPROMPT
load_dotenv()

client = OpenAI()

class Agent(ABC):
    
    def __init__(self, 
        model: Optional[str] = None, 
        system_prompt: Optional[str] = None
    ) -> None:
        
        model = model if model else os.getenv("OPENAI_DEFAULT_MODEL")
        
        self.memory = []
        self.model = model
        
        if system_prompt:
            self.memory.append({"role": "system", "content": system_prompt})
    
    def add_to_memory(self, role, message):
        self.memory.append({"role": role, "content": message})

    def get_schema(self):
        return self.function
        
    @abstractmethod
    def run(self,prompt):
        """User must define this method. Run the agent"""
    
class ConversationAgent(Agent):
    
    def __init__(self, 
        model: Optional[str] = None, 
        system_prompt: Optional[str] = None,
        name_agent: Optional[str] = None,
        description: Optional[str] = None
    ) -> None:
        super().__init__(model,system_prompt)

        self.name_agent = name_agent
        self.description = description

        self.function={
            "name": self.name_agent,
            "description": self.description,
            "parameters": {
                "type": "object",
                "properties": {
                    "user_prompt": {
                        "type": "string",
                        "description": "User prompt to the agent"
                    }
                },
                "required": ["user_query"]
            }
        }
    
    def run(self,prompt):

        self.add_to_memory("user", prompt)
        completion=''
        stream = client.chat.completions.create(
            messages=self.memory,
            model=self.model,
            stream=True
        )

        for chunk in stream:
            text_chunk=chunk.choices[0].delta.content
            if text_chunk:
                completion+=text_chunk
                print(text_chunk, end='', flush=True)
        
        self.add_to_memory("assistant", completion)
        
        return completion

class CodeExecuterAgent(Agent):

    def __init__(self, 
        model: Optional[str] = None, 
        system_prompt: Optional[str] = None,
        name_agent: Optional[str] = None,
        description: Optional[str] = None
    ) -> None:
        super().__init__(model,system_prompt)

        self.name_agent = name_agent
        self.description = description

        self.function={
            "name": self.name_agent,
            "description": self.description,
            "parameters": {
                "type": "object",
                "properties": {
                    "user_prompt": {
                        "type": "string",
                        "description": "User prompt to the agent"
                    }
                },
                "required": ["user_query"]
            }
        }
    
    def run(self,prompt):

        completion=''
        code_exec_result=''
        code_to_execute = extract_code_from_string(prompt)
        if code_to_execute:
            try:
                print("Executing code...")
                exec_response = execute_code(code_to_execute,use_docker=False)
                if exec_response[0] == 0:
                    print(f"Code executed successfully")
                    code_exec_result=f'{prompt}\n\nCode executed successfully'
                else:
                    print(f"Error executing code: {exec_response[1]}")
                    code_exec_result=f'{prompt}\n\nError executing code: {exec_response[1]}'
            except Exception as e:
                print(f"Error executing code: {e}")
                code_exec_result=f'{prompt}\n\nError executing code: {e}'
        else:
            print(f"No code found to execute.")
            prompt+=f'{prompt}\n\nError executing code: bad format'
        
        prompt+=code_exec_result
        
        self.add_to_memory("user", prompt)
        
        stream = client.chat.completions.create(
            messages=self.memory,
            model=self.model,
            stream=True
        )

        for chunk in stream:
            text_chunk=chunk.choices[0].delta.content
            if text_chunk:
                completion+=text_chunk
                print(text_chunk, end='', flush=True)
        
        self.add_to_memory("assistant", completion)
        
        return completion

In [None]:

class AgentManager(Agent):
    
    def __init__(self, 
        model: Optional[str] = None, 
        system_prompt: Optional[str] = None,
        list_agents: Optional[list] = None,
        max_iteration: Optional[int] = 10
    ) -> None:
        super().__init__(model,system_prompt)
        
        self.list_agents=list_agents
        self.max_iteration=max_iteration
    
    def get_agent_list_schemas(self):
        return [agent.get_schema() for agent in self.list_agents]   
    
    def run(self,prompt):
        
        print(Fore.MAGENTA,f"----------------- RUN MANAGER -----------------")
        print(f"Manager: I'm thinking the best agent to solve the user query -> {prompt}")
        
        # Choose what agent i have to execute to resolve the problem
        messages = self.memory + [{"role": "user", "content": f"[PROMPT]\n{prompt}\n\n[AGENTS]\n{self.get_agent_list_schemas()}"}]
        chat_completion = client.chat.completions.create(
            messages=messages,
            model=self.model,
            functions=self.get_agent_list_schemas()
        )
        
        # Get the agent to execute
        solver_agent_name=chat_completion.choices[0].message.function_call.name
        solver_agent_prompt=json.loads(chat_completion.choices[0].message.function_call.arguments)['user_prompt']
        
        print(f"Manager: the best agent to solve this is -> {solver_agent_name}")
        self.add_to_memory("user", solver_agent_prompt)
        
        #get the class agent to execute
        solver_agent_class = next((agent for agent in self.list_agents if agent.name_agent == solver_agent_name), None)
        
        # Se crea una nueva lista para alamacenar los agentes que actuarán como revisores
        reviewers_agents = self.list_agents.copy()
        
        # Se elimina el agente solver de la lista de revisores
        reviewers_agents.remove(solver_agent_class)      
        
        main_proposal = solver_agent_prompt
        
        feedback=''

        base_colors = [Fore.CYAN, Fore.YELLOW, Fore.BLUE]
        
        # Steps to execute
        for i in range(self.max_iteration):
            
            print(Fore.MAGENTA,f"\n\n-----------------> ITERATION: {i+1}")
            
            # press enter to continue
            #input("Press Enter to continue...")
            
            agree_agents = 0
            for index, reviewer_agent in enumerate(reviewers_agents):
                
                # If we didn't get agreement from all agents, solver gets a chance to update the proposal
                if "Code is fine" in feedback:
                    agree_agents += 1
                else:
                    print(Fore.GREEN, f"\n\nAgent Solver -> {solver_agent_class.name_agent} is proposing a solution")
                    # El solver_agent comienza con una propuesta inicial
                    main_proposal = solver_agent_class.run(f"{main_proposal}")
                    agree_agents = 0
                    
                # Colores base de Colorama
                color = base_colors[index % len(base_colors)]
                
                print(color,f"\n\nFeedback from: {reviewer_agent.name_agent}")

                # Cada agente revisor proporciona su retroalimentación
                feedback = reviewer_agent.run(f'{main_proposal}. {CRITIC_ROLE_PROMPT}')

                # Añade la retroalimentación a la memoria del solver_agent
                solver_agent_class.add_to_memory("user", f"{reviewer_agent.name_agent}: {feedback}")
                
                print(f'len(reviewers_agents): {len(reviewers_agents)}')
                print(f'agree_agents: {agree_agents}')
            
            if agree_agents >= len(reviewers_agents):
                break
                    
        print(Fore.MAGENTA, f"Manager: Consensus reached with all agents!!!")
        print(f"Manager: the final solution is -> \n\n{main_proposal}")
        
        return main_proposal

In [None]:
CodeDeveloper=ConversationAgent(
    model="gpt-4-1106-preview",
    name_agent="CodeDeveloper",
    description="Agent to create code in python",
    system_prompt=CODE_DEVELOPER_SYSTEMPROMPT
)

UIDesignDeveloper=ConversationAgent(
    model="gpt-4-1106-preview",
    name_agent="UIDesignDeveloper",
    description="Agent to resolve UI design problems in python",
    system_prompt=EXECUTE_UIDESIGN_SYSTEMPROMPT
)

TesterDeveloper=CodeExecuterAgent(
    model="gpt-4-1106-preview",
    name_agent="TesterDeveloper",
    description="Agent to execute and test code",
    system_prompt=TESTER_SYSTEMPROMPT
)

OptimizationAgent=ConversationAgent(
    model="gpt-4-1106-preview",
    name_agent="OptimizationAgent",
    description="Agent to optimize code",
    system_prompt=OPTIMIZER_SYSTEMPROMPT
)

# Manager Agents
manager=AgentManager(
    model="gpt-4-1106-preview",
    list_agents=[
        CodeDeveloper,         
        UIDesignDeveloper,     
        OptimizationAgent,     
        TesterDeveloper        
    ],
    system_prompt=COORDINATOR_SYSTEMPROMPT
)

In [None]:
manager.run("Create a snake game")