feat(rnd): add FastAPI support to existing project outline #7165

Merged · 19 commits · Jun 3, 2024
20 changes: 20 additions & 0 deletions rnd/autogpt_server/README.md
@@ -0,0 +1,20 @@
# Next Gen AutoGPT

This is a research project into creating the next generation of AutoGPT: an AutoGPT agent server.

The agent server will enable the creation of composite multi-agent systems that use the AutoGPT Agent as their default agent.


## Project Outline

Currently, the project mainly consists of these components:

*agent_api*
A component that will expose API endpoints for the creation & execution of agents.
It will connect to the database to persist and read agents,
and trigger agent execution by pushing execution requests onto the ExecutionQueue.

*agent_executor*
A component that will execute the agents.
It will be a pool of processes/threads that consume the ExecutionQueue and execute the agents accordingly.
The result and progress of each execution will be persisted in the database.
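
The hand-off between these two components can be pictured with the `ExecutionQueue` introduced later in this diff. A minimal sketch using only names from the PR (`agent-123` is a placeholder id):

```python
import time

from autogpt_server.data import ExecutionQueue

queue = ExecutionQueue()

# agent_api side: enqueue an execution request, get back its id
execution_id = queue.add("agent-123")

# agent_executor side: poll until the request arrives, then "run" it
execution = queue.get()
while execution is None:  # the multiprocessing queue flushes asynchronously
    time.sleep(0.1)
    execution = queue.get()

print(f"executing {execution.execution_id} with data: {execution.data}")
```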
1 change: 1 addition & 0 deletions rnd/autogpt_server/autogpt_server/agent_api/__init__.py
@@ -0,0 +1 @@
from .server import start_server # noqa
39 changes: 39 additions & 0 deletions rnd/autogpt_server/autogpt_server/agent_api/server.py
@@ -0,0 +1,39 @@
import uvicorn
from fastapi import APIRouter, FastAPI

from autogpt_server.data import ExecutionQueue


class AgentServer:

    def __init__(self, queue: ExecutionQueue):
        self.app = FastAPI(
            title="AutoGPT Agent Server",
            description=(
                "This server is used to execute agents that are created by the "
                "AutoGPT system."
            ),
            summary="AutoGPT Agent Server",
            version="0.1",
        )
        self.execution_queue = queue

        # Define the API routes
        self.router = APIRouter()
        self.router.add_api_route(
            path="/agents/{agent_id}/execute",
            endpoint=self.execute_agent,
            methods=["POST"],
        )
        self.app.include_router(self.router)

    def execute_agent(self, agent_id: str):
        # Enqueue the execution request; the agent_executor pool picks it up.
        execution_id = self.execution_queue.add(agent_id)
        return {"execution_id": execution_id, "agent_id": agent_id}


def start_server(queue: ExecutionQueue, use_uvicorn: bool = True):
    app = AgentServer(queue).app
    if use_uvicorn:
        uvicorn.run(app)
    return app
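
Since `start_server` returns the FastAPI app when `use_uvicorn=False`, the endpoint can be exercised in-process with FastAPI's `TestClient`. A hedged sketch, not part of the PR (`agent-123` is a placeholder id):

```python
from fastapi.testclient import TestClient

from autogpt_server.agent_api import start_server
from autogpt_server.data import ExecutionQueue

queue = ExecutionQueue()
app = start_server(queue, use_uvicorn=False)  # build the app without serving it
client = TestClient(app)

response = client.post("/agents/agent-123/execute")
assert response.status_code == 200
print(response.json())  # {"execution_id": "<uuid>", "agent_id": "agent-123"}
```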
1 change: 1 addition & 0 deletions rnd/autogpt_server/autogpt_server/agent_executor/__init__.py
@@ -0,0 +1 @@
from .executor import start_executors # noqa
42 changes: 42 additions & 0 deletions rnd/autogpt_server/autogpt_server/agent_executor/executor.py
@@ -0,0 +1,42 @@
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process

from autogpt_server.data import ExecutionQueue

logger = logging.getLogger(__name__)


class AgentExecutor:
    # TODO: Replace this by an actual Agent Execution.
    @staticmethod
    def __execute(id: str, data: str) -> None:
        logger.warning(f"Executor processing started, execution_id: {id}, data: {data}")
        for i in range(5):
            logger.warning(
                f"Executor processing step {i}, execution_id: {id}, data: {data}"
            )
            time.sleep(1)
        logger.warning(
            f"Executor processing completed, execution_id: {id}, data: {data}"
        )

    @staticmethod
    def start_executor(pool_size: int, queue: ExecutionQueue) -> None:
        with ThreadPoolExecutor(max_workers=pool_size) as executor:
            while True:
                execution = queue.get()
                if not execution:
                    # Queue is empty; back off before polling again.
                    time.sleep(1)
                    continue
                executor.submit(
                    AgentExecutor.__execute,
                    execution.execution_id,
                    execution.data,
                )


def start_executors(pool_size: int, queue: ExecutionQueue) -> None:
    executor_process = Process(
        target=AgentExecutor.start_executor, args=(pool_size, queue)
    )
    executor_process.start()
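
To watch the stub executor drain the queue without the API layer, a throwaway script along these lines works (names from the PR; `demo-agent` is a placeholder payload):

```python
import time

from autogpt_server.agent_executor import start_executors
from autogpt_server.data import ExecutionQueue

if __name__ == "__main__":  # required where child processes start via spawn
    queue = ExecutionQueue()
    start_executors(pool_size=2, queue=queue)
    queue.add("demo-agent")
    time.sleep(10)  # watch the logs; the worker loops forever, stop with Ctrl-C
```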
13 changes: 13 additions & 0 deletions rnd/autogpt_server/autogpt_server/app.py
@@ -0,0 +1,13 @@
from autogpt_server.agent_api import start_server
from autogpt_server.agent_executor import start_executors
from autogpt_server.data import ExecutionQueue


def main() -> None:
    queue = ExecutionQueue()
    start_executors(5, queue)
    start_server(queue)


if __name__ == "__main__":
    main()
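
Once `main()` is running, a quick end-to-end smoke test from another process could look like this (assumes uvicorn's default 127.0.0.1:8000 bind and the `requests` package, neither of which this PR pins down):

```python
import requests

resp = requests.post("http://127.0.0.1:8000/agents/agent-123/execute")
print(resp.json())  # the executor process should then log five processing steps
```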
36 changes: 36 additions & 0 deletions rnd/autogpt_server/autogpt_server/data.py
@@ -0,0 +1,36 @@
import uuid
from multiprocessing import Queue
from queue import Empty


class Execution:
    """Data model for an execution of an Agent"""

    def __init__(self, execution_id: str, data: str):
        self.execution_id = execution_id
        self.data = data


# TODO: This shared class couples the api & executor to one machine.
#   Replace this with a persistent & remote-hosted queue.
#   One very likely candidate would be persisted Redis (Redis Queue).
#   It will also open the possibility of using it for other purposes like
#   caching, execution engine broker (like Celery), user session management etc.
class ExecutionQueue:
    """
    Queue for managing the execution of agents.
    This will be shared between different processes.
    """

    def __init__(self):
        self.queue = Queue()

    def add(self, data: str) -> str:
        execution_id = uuid.uuid4()
        self.queue.put(Execution(str(execution_id), data))
        return str(execution_id)

    def get(self) -> Execution | None:
        # Non-blocking get, so callers can poll and back off when the queue
        # is empty, matching the `Execution | None` return type.
        try:
            return self.queue.get_nowait()
        except Empty:
            return None

    def empty(self) -> bool:
        return self.queue.empty()
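
The TODO above points at a Redis-backed replacement. One possible drop-in with the same `add`/`get`/`empty` interface, sketched with the `redis-py` client and reusing the `Execution` class above (key name, serialization, and connection details are all assumptions):

```python
import json
import uuid

import redis  # assumes the redis-py package is installed


class RedisExecutionQueue:
    """Hypothetical remote-hosted replacement for ExecutionQueue."""

    def __init__(self, host: str = "localhost", port: int = 6379):
        self.client = redis.Redis(host=host, port=port)
        self.key = "agent_executions"  # assumed key name

    def add(self, data: str) -> str:
        execution_id = str(uuid.uuid4())
        payload = json.dumps({"execution_id": execution_id, "data": data})
        self.client.lpush(self.key, payload)  # push onto the list's head
        return execution_id

    def get(self) -> Execution | None:
        raw = self.client.rpop(self.key)  # pop from the tail (FIFO order)
        if raw is None:
            return None
        payload = json.loads(raw)
        return Execution(payload["execution_id"], payload["data"])

    def empty(self) -> bool:
        return self.client.llen(self.key) == 0
```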