From 04ea2915a2be9cf70215877c8b7809fd1d14cffd Mon Sep 17 00:00:00 2001
From: zhongyang
Date: Thu, 9 Nov 2023 16:09:43 +0800
Subject: [PATCH 01/10] fetch RPM automatically from api.openai.com

---
 config/config.yaml             |  4 ++++
 metagpt/config.py              |  1 +
 metagpt/provider/openai_api.py | 30 +++++++++++++++++++++++++++++-
 3 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/config/config.yaml b/config/config.yaml
index b2c50991d..c27146a18 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -9,6 +9,10 @@ OPENAI_API_BASE: "https://api.openai.com/v1"
 #OPENAI_PROXY: "http://127.0.0.1:8118"
 #OPENAI_API_KEY: "YOUR_API_KEY"
 OPENAI_API_MODEL: "gpt-4"
+## To have RPM and MAX_TOKENS configured automatically, log in at https://platform.openai.com/account/limits, open your browser's developer tools, and look for a 'rate_limits' request in the Network tab.
+## If it is not there, reload the page (Ctrl+R) while recording. In that request's headers, find 'Authorization: sess-xxx'; 'sess-xxx' is the session key.
+## To set RPM manually instead, set RPM and either leave OPENAI_LIMIT_SESSION_KEY unset or set it to "".
+# OPENAI_LIMIT_SESSION_KEY: "YOUR_API_LIMITS_KEY"
 MAX_TOKENS: 1500
 RPM: 10
 
diff --git a/metagpt/config.py b/metagpt/config.py
index 27455d38d..9cbadb8ae 100644
--- a/metagpt/config.py
+++ b/metagpt/config.py
@@ -44,6 +44,7 @@ def __init__(self, yaml_file=default_yaml_file):
         logger.info("Config loading done.")
         self.global_proxy = self._get("GLOBAL_PROXY")
         self.openai_api_key = self._get("OPENAI_API_KEY")
+        self.openai_limit_session_key = self._get("OPENAI_LIMIT_SESSION_KEY", "")
         self.anthropic_api_key = self._get("Anthropic_API_KEY")
         if (not self.openai_api_key or "YOUR_API_KEY" == self.openai_api_key) and (
             not self.anthropic_api_key or "YOUR_API_KEY" == self.anthropic_api_key
diff --git a/metagpt/provider/openai_api.py b/metagpt/provider/openai_api.py
index 6ebed2c16..5dd96c3be 100644
--- a/metagpt/provider/openai_api.py
+++ b/metagpt/provider/openai_api.py
@@ -6,9 +6,11 @@
 """
 import asyncio
 import time
+import json
 from typing import NamedTuple, Union
 
 import openai
+import requests
 from openai.error import APIConnectionError
 from tenacity import (
     after_log,
@@ -157,7 +159,7 @@ def __init_openai(self, config):
         if config.openai_api_type:
             openai.api_type = config.openai_api_type
             openai.api_version = config.openai_api_version
-        self.rpm = int(config.get("RPM", 10))
+        self.rpm = self.__get_rpm(config)
 
     async def _achat_completion_stream(self, messages: list[dict]) -> str:
         response = await openai.ChatCompletion.acreate(**self._cons_kwargs(messages), stream=True)
@@ -323,3 +325,29 @@ async def amoderation(self, content: Union[str, list[str]]):
     async def _amoderation(self, content: Union[str, list[str]]):
         rsp = await self.llm.Moderation.acreate(input=content)
         return rsp
+
+    @staticmethod
+    def __get_rpm(config) -> int:
+        limit_session_key = config.get("OPENAI_LIMIT_SESSION_KEY", "")
+        default_rpm = int(config.get("RPM", 10))
+        if len(limit_session_key) > 0:
+            try:
+                response = requests.get(
+                    "https://api.openai.com/dashboard/rate_limits",
+                    headers={'Authorization': f'Bearer {limit_session_key}'},
+                    timeout=10,
+                    proxies={'https': openai.proxy}
+                )
+            except Exception as e:
+                logger.error(f"Connection to api.openai.com failed: {e}. Setting rpm to default parameter.")
+                return default_rpm
+
+            if response.status_code == 200:
+                limit_dict = json.loads(response.text)[config.openai_api_model]
+                return limit_dict['max_requests_per_1_minute']
+            else:
+                error = json.loads(response.text)['error']
+                raise ValueError(f"Get rpm from api.openai.com error. {error['message']}")
+        else:
+            return default_rpm
+
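The `/dashboard/rate_limits` endpoint used above is an undocumented dashboard API, so its payload format is not guaranteed anywhere. Judging from the lookups `__get_rpm` performs, it presumably maps model names to per-model limit objects. A minimal sketch of that assumed shape and the parsing it implies (all numbers are illustrative):

```python
# Hypothetical payload inferred from the lookups in __get_rpm above;
# the real response of this undocumented endpoint may differ.
rate_limits = {
    "gpt-4": {
        "max_requests_per_1_minute": 500,
        "max_tokens_per_1_minute": 10000,
    },
    "gpt-3.5-turbo": {
        "max_requests_per_1_minute": 3500,
        "max_tokens_per_1_minute": 90000,
    },
}


def parse_rpm(payload: dict, model: str, default_rpm: int = 10) -> int:
    """Mirror of __get_rpm's parsing, with an explicit fallback for unknown models."""
    limit_dict = payload.get(model)
    if limit_dict is None:
        return default_rpm
    return int(limit_dict["max_requests_per_1_minute"])


assert parse_rpm(rate_limits, "gpt-4") == 500
assert parse_rpm(rate_limits, "unknown-model") == 10
```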
From e199d1e7cc224519b04e0cb39213c06e6f239d1b Mon Sep 17 00:00:00 2001
From: zhongyang
Date: Thu, 9 Nov 2023 19:05:13 +0800
Subject: [PATCH 02/10] rename OPENAI_LIMIT_SESSION_KEY to OPENAI_SESSION_KEY

---
 config/config.yaml             |  4 ++--
 metagpt/config.py              |  2 +-
 metagpt/provider/openai_api.py | 13 ++++++-------
 3 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/config/config.yaml b/config/config.yaml
index c27146a18..6f647e01b 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -11,8 +11,8 @@ OPENAI_API_BASE: "https://api.openai.com/v1"
 OPENAI_API_MODEL: "gpt-4"
 ## To have RPM and MAX_TOKENS configured automatically, log in at https://platform.openai.com/account/limits, open your browser's developer tools, and look for a 'rate_limits' request in the Network tab.
 ## If it is not there, reload the page (Ctrl+R) while recording. In that request's headers, find 'Authorization: sess-xxx'; 'sess-xxx' is the session key.
-## To set RPM manually instead, set RPM and either leave OPENAI_LIMIT_SESSION_KEY unset or set it to "".
-# OPENAI_LIMIT_SESSION_KEY: "YOUR_API_LIMITS_KEY"
+## To set RPM manually instead, set RPM and either leave OPENAI_SESSION_KEY unset or set it to "".
+# OPENAI_SESSION_KEY: ""
 MAX_TOKENS: 1500
 RPM: 10
 
diff --git a/metagpt/config.py b/metagpt/config.py
index 9cbadb8ae..47f6216bb 100644
--- a/metagpt/config.py
+++ b/metagpt/config.py
@@ -44,7 +44,7 @@ def __init__(self, yaml_file=default_yaml_file):
         logger.info("Config loading done.")
         self.global_proxy = self._get("GLOBAL_PROXY")
         self.openai_api_key = self._get("OPENAI_API_KEY")
-        self.openai_limit_session_key = self._get("OPENAI_LIMIT_SESSION_KEY", "")
+        self.openai_session_key = self._get("OPENAI_SESSION_KEY", "")
         self.anthropic_api_key = self._get("Anthropic_API_KEY")
         if (not self.openai_api_key or "YOUR_API_KEY" == self.openai_api_key) and (
             not self.anthropic_api_key or "YOUR_API_KEY" == self.anthropic_api_key
diff --git a/metagpt/provider/openai_api.py b/metagpt/provider/openai_api.py
index 5dd96c3be..4622db455 100644
--- a/metagpt/provider/openai_api.py
+++ b/metagpt/provider/openai_api.py
@@ -20,7 +20,7 @@
     wait_fixed,
 )
 
-from metagpt.config import CONFIG
+from metagpt.config import CONFIG, Config
 from metagpt.logs import logger
 from metagpt.provider.base_gpt_api import BaseGPTAPI
 from metagpt.utils.singleton import Singleton
@@ -152,7 +152,7 @@ def __init__(self):
         self._cost_manager = CostManager()
         RateLimiter.__init__(self, rpm=self.rpm)
 
-    def __init_openai(self, config):
+    def __init_openai(self, config: Config):
         openai.api_key = config.openai_api_key
         if config.openai_api_base:
             openai.api_base = config.openai_api_base
@@ -327,14 +327,14 @@ async def _amoderation(self, content: Union[str, list[str]]):
         return rsp
 
     @staticmethod
-    def __get_rpm(config) -> int:
-        limit_session_key = config.get("OPENAI_LIMIT_SESSION_KEY", "")
+    def __get_rpm(config: Config) -> int:
+        session_key = config.get("OPENAI_SESSION_KEY", "")
         default_rpm = int(config.get("RPM", 10))
-        if len(limit_session_key) > 0:
+        if len(session_key) > 0:
             try:
                 response = requests.get(
                     "https://api.openai.com/dashboard/rate_limits",
-                    headers={'Authorization': f'Bearer {limit_session_key}'},
+                    headers={'Authorization': f'Bearer {session_key}'},
                     timeout=10,
                     proxies={'https': openai.proxy}
                 )
@@ -350,4 +350,3 @@ def __get_rpm(config: Config) -> int:
             raise ValueError(f"Get rpm from api.openai.com error. {error['message']}")
         else:
             return default_rpm
-
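After this rename, the two supported setups in `config/config.yaml` look roughly like this (values are placeholders):

```yaml
# Mode 1: fixed rate limit, no dashboard lookup
RPM: 10
# OPENAI_SESSION_KEY: ""

# Mode 2: fetch the real limit at startup via a dashboard session key
# (the sess-... value is a placeholder, not a real key)
OPENAI_SESSION_KEY: "sess-xxxxxxxxxxxxxxxx"
RPM: 10   # still used as the fallback if the lookup fails
```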
From 8ef7e0ee88db9e5f582f6171fb6c5c6ab63131da Mon Sep 17 00:00:00 2001
From: zhongyang
Date: Thu, 9 Nov 2023 20:16:38 +0800
Subject: [PATCH 03/10] use double quotes consistently in __get_rpm

---
 metagpt/provider/openai_api.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/metagpt/provider/openai_api.py b/metagpt/provider/openai_api.py
index 4622db455..9d7a42838 100644
--- a/metagpt/provider/openai_api.py
+++ b/metagpt/provider/openai_api.py
@@ -334,9 +334,9 @@ def __get_rpm(config: Config) -> int:
             try:
                 response = requests.get(
                     "https://api.openai.com/dashboard/rate_limits",
-                    headers={'Authorization': f'Bearer {session_key}'},
+                    headers={"Authorization": f"Bearer {session_key}"},
                     timeout=10,
-                    proxies={'https': openai.proxy}
+                    proxies={"https": openai.proxy}
                 )
             except Exception as e:
                 logger.error(f"Connection to api.openai.com failed: {e}. Setting rpm to default parameter.")
@@ -344,9 +344,9 @@ def __get_rpm(config: Config) -> int:
 
             if response.status_code == 200:
                 limit_dict = json.loads(response.text)[config.openai_api_model]
-                return limit_dict['max_requests_per_1_minute']
+                return limit_dict["max_requests_per_1_minute"]
             else:
-                error = json.loads(response.text)['error']
+                error = json.loads(response.text)["error"]
                 raise ValueError(f"Get rpm from api.openai.com error. {error['message']}")
         else:
             return default_rpm
From 2f2dcc79fefd061ba81c98f129b079051a779f66 Mon Sep 17 00:00:00 2001
From: zhongyang
Date: Mon, 4 Dec 2023 16:25:46 +0800
Subject: [PATCH 04/10] fix async problems in RPM fetching

---
 metagpt/provider/openai_api.py | 40 +++++++++++++++++++++++-----------
 1 file changed, 27 insertions(+), 13 deletions(-)

diff --git a/metagpt/provider/openai_api.py b/metagpt/provider/openai_api.py
index 9d7a42838..a65f14ab9 100644
--- a/metagpt/provider/openai_api.py
+++ b/metagpt/provider/openai_api.py
@@ -5,6 +5,7 @@
 @File    : openai.py
 """
 import asyncio
+import aiohttp
 import time
 import json
 from typing import NamedTuple, Union
@@ -159,7 +160,7 @@ def __init_openai(self, config: Config):
         if config.openai_api_type:
             openai.api_type = config.openai_api_type
             openai.api_version = config.openai_api_version
-        self.rpm = self.__get_rpm(config)
+        self.rpm = self.aget_rpm(config)
 
     async def _achat_completion_stream(self, messages: list[dict]) -> str:
         response = await openai.ChatCompletion.acreate(**self._cons_kwargs(messages), stream=True)
@@ -325,28 +326,41 @@ async def amoderation(self, content: Union[str, list[str]]):
     async def _amoderation(self, content: Union[str, list[str]]):
         rsp = await self.llm.Moderation.acreate(input=content)
         return rsp
+
+    def aget_rpm(self, config: Config):
+        loop = asyncio.get_event_loop()
+        rpm = loop.run_until_complete(self._aget_rpm(config))
+        return rpm
 
-    @staticmethod
-    def __get_rpm(config: Config) -> int:
+    async def _aget_rpm(self, config: Config):
         session_key = config.get("OPENAI_SESSION_KEY", "")
         default_rpm = int(config.get("RPM", 10))
         if len(session_key) > 0:
             try:
-                response = requests.get(
+                async with aiohttp.ClientSession() as session:
+                    async with session.get(
                         "https://api.openai.com/dashboard/rate_limits",
                         headers={"Authorization": f"Bearer {session_key}"},
                         timeout=10,
-                    proxies={"https": openai.proxy}
-                )
+                        proxies=config.openai_proxy
+                    ) as response:
+                        if response.status_code == 200:
+                            response_content = json.loads(await response.text())
+                            if config.openai_api_model not in response_content:
+                                raise ValueError("Get rpm from api.openai.com error. \
+                                    You have entered a model name that is not supported by OpenAI, or the input is incorrect. \
+                                    Please enter the correct name in the configuration file. \
+                                    Setting rpm to default parameter.")
+
+                            limit_dict = response_content[config.openai_api_model]
+                            return limit_dict["max_requests_per_1_minute"]
+                        else:
+                            error = json.loads(response.text)["error"]
+                            logger.error(f"Connection to api.openai.com failed: {error}. Setting rpm to default parameter.")
+                            return default_rpm
+
             except Exception as e:
                 logger.error(f"Connection to api.openai.com failed: {e}. Setting rpm to default parameter.")
                 return default_rpm
-
-            if response.status_code == 200:
-                limit_dict = json.loads(response.text)[config.openai_api_model]
-                return limit_dict["max_requests_per_1_minute"]
-            else:
-                error = json.loads(response.text)["error"]
-                raise ValueError(f"Get rpm from api.openai.com error. {error['message']}")
         else:
             return default_rpm
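Two loose ends remain in this revision, both addressed in the next patch: `aiohttp` takes a single `proxy=` URL rather than a requests-style `proxies` mapping, and its responses expose `status`, not `status_code`. The `loop.run_until_complete()` call in `aget_rpm` is also fragile, because it raises if an event loop is already running, as this self-contained sketch shows:

```python
import asyncio


async def fetch_rpm() -> int:
    return 10  # stand-in for the real dashboard call


def sync_wrapper() -> int:
    # Mirrors aget_rpm above: fine when called from plain synchronous code...
    return asyncio.get_event_loop().run_until_complete(fetch_rpm())


async def main():
    # ...but inside a running loop this raises
    # "RuntimeError: This event loop is already running".
    try:
        sync_wrapper()
    except RuntimeError as e:
        print(f"caught: {e}")


asyncio.run(main())
```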
From 156e7635a9e3cc0ebefde5509ea239d8b609ab75 Mon Sep 17 00:00:00 2001
From: zhongyang
Date: Mon, 4 Dec 2023 17:41:38 +0800
Subject: [PATCH 05/10] move async RPM fetching into RateLimiter

---
 metagpt/provider/openai_api.py | 108 +++++++++++++++++----------------
 1 file changed, 56 insertions(+), 52 deletions(-)

diff --git a/metagpt/provider/openai_api.py b/metagpt/provider/openai_api.py
index a65f14ab9..04e1974d8 100644
--- a/metagpt/provider/openai_api.py
+++ b/metagpt/provider/openai_api.py
@@ -21,7 +21,7 @@
     wait_fixed,
 )
 
-from metagpt.config import CONFIG, Config
+from metagpt.config import CONFIG
 from metagpt.logs import logger
 from metagpt.provider.base_gpt_api import BaseGPTAPI
 from metagpt.utils.singleton import Singleton
@@ -36,16 +36,57 @@
 class RateLimiter:
     """Rate control class, each call goes through wait_if_needed, sleep if rate control is needed"""
 
-    def __init__(self, rpm):
+    def __init__(self):
         self.last_call_time = 0
+        self.rpm = None
+        self.interval = None
         # Here 1.1 is used because even if the calls are made strictly according to time,
         # they will still be QOS'd; consider switching to simple error retry later
-        self.interval = 1.1 * 60 / rpm
-        self.rpm = rpm
 
     def split_batches(self, batch):
+        if self.rpm is None:
+            raise ValueError("You must run update_rpm before calling split_batches.")
         return [batch[i : i + self.rpm] for i in range(0, len(batch), self.rpm)]
 
+    async def update_rpm(self):
+        if self.rpm is None:
+            self.rpm = await self._aget_rpm()
+            self.interval = 1.1 * 60 / self.rpm
+            logger.info(f'Setting rpm to {self.rpm}')
+
+    async def _aget_rpm(self):
+        session_key = CONFIG.get("OPENAI_SESSION_KEY", "")
+        default_rpm = int(CONFIG.get("RPM", 10))
+        if len(session_key) > 0:
+            try:
+                async with aiohttp.ClientSession() as session:
+                    async with session.get(
+                        "https://api.openai.com/dashboard/rate_limits",
+                        headers={"Authorization": f"Bearer {session_key}"},
+                        timeout=10,
+                        proxy=openai.proxy
+                    ) as response:
+                        if response.status == 200:
+                            response_content = json.loads(await response.text())
+                            if CONFIG.openai_api_model not in response_content:
+                                raise ValueError("Get rpm from api.openai.com error. \
+                                    You have entered a model name that is not supported by OpenAI, or the input is incorrect. \
+                                    Please enter the correct name in the configuration file. \
+                                    Setting rpm to default parameter.")
+
+                            limit_dict = response_content[CONFIG.openai_api_model]
+                            return limit_dict["max_requests_per_1_minute"]
+                        else:
+                            error = json.loads(await response.text())["error"]
+                            logger.error(f"Connection to api.openai.com failed: {error}. Setting rpm to default parameter.")
+                            return default_rpm
+
+            except Exception as exp:
+                logger.error(f"Connection to api.openai.com failed, error type: {type(exp).__name__}, error message: {str(exp)}. Setting rpm to default parameter.")
+                return default_rpm
+        else:
+            return default_rpm
+
     async def wait_if_needed(self, num_requests):
         current_time = time.time()
         elapsed_time = current_time - self.last_call_time
@@ -146,21 +187,20 @@ class OpenAIGPTAPI(BaseGPTAPI, RateLimiter):
     """
 
     def __init__(self):
-        self.__init_openai(CONFIG)
+        self.__init_openai()
         self.llm = openai
         self.model = CONFIG.openai_api_model
         self.auto_max_tokens = False
         self._cost_manager = CostManager()
-        RateLimiter.__init__(self, rpm=self.rpm)
 
-    def __init_openai(self, config: Config):
-        openai.api_key = config.openai_api_key
-        if config.openai_api_base:
-            openai.api_base = config.openai_api_base
-        if config.openai_api_type:
-            openai.api_type = config.openai_api_type
-            openai.api_version = config.openai_api_version
-        self.rpm = self.aget_rpm(config)
+    def __init_openai(self):
+        openai.api_key = CONFIG.openai_api_key
+        if CONFIG.openai_api_base:
+            openai.api_base = CONFIG.openai_api_base
+        if CONFIG.openai_api_type:
+            openai.api_type = CONFIG.openai_api_type
+            openai.api_version = CONFIG.openai_api_version
+        self.rpm = None
 
     async def _achat_completion_stream(self, messages: list[dict]) -> str:
         response = await openai.ChatCompletion.acreate(**self._cons_kwargs(messages), stream=True)
@@ -258,6 +298,8 @@ def _calc_usage(self, messages: list[dict], rsp: str) -> dict:
 
     async def acompletion_batch(self, batch: list[list[dict]]) -> list[dict]:
         """Return full JSON"""
+
+        await self.update_rpm()
         split_batches = self.split_batches(batch)
         all_results = []
 
@@ -326,41 +368,3 @@ async def amoderation(self, content: Union[str, list[str]]):
     async def _amoderation(self, content: Union[str, list[str]]):
         rsp = await self.llm.Moderation.acreate(input=content)
         return rsp
-
-    def aget_rpm(self, config: Config):
-        loop = asyncio.get_event_loop()
-        rpm = loop.run_until_complete(self._aget_rpm(config))
-        return rpm
-
-    async def _aget_rpm(self, config: Config):
-        session_key = config.get("OPENAI_SESSION_KEY", "")
-        default_rpm = int(config.get("RPM", 10))
-        if len(session_key) > 0:
-            try:
-                async with aiohttp.ClientSession() as session:
-                    async with session.get(
-                        "https://api.openai.com/dashboard/rate_limits",
-                        headers={"Authorization": f"Bearer {session_key}"},
-                        timeout=10,
-                        proxies=config.openai_proxy
-                    ) as response:
-                        if response.status_code == 200:
-                            response_content = json.loads(await response.text())
-                            if config.openai_api_model not in response_content:
-                                raise ValueError("Get rpm from api.openai.com error. \
-                                    You have entered a model name that is not supported by OpenAI, or the input is incorrect. \
-                                    Please enter the correct name in the configuration file. \
-                                    Setting rpm to default parameter.")
-
-                            limit_dict = response_content[config.openai_api_model]
-                            return limit_dict["max_requests_per_1_minute"]
-                        else:
-                            error = json.loads(response.text)["error"]
-                            logger.error(f"Connection to api.openai.com failed: {error}. Setting rpm to default parameter.")
-                            return default_rpm
-
-            except Exception as e:
-                logger.error(f"Connection to api.openai.com failed: {e}. Setting rpm to default parameter.")
-                return default_rpm
-        else:
-            return default_rpm
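With the limiter now lazily initialized, `update_rpm` must run before `split_batches`; `acompletion_batch` does this itself, so a typical caller only awaits the batch call. A hypothetical driver, assuming a valid API key in the local config (the messages are illustrative):

```python
import asyncio

from metagpt.provider.openai_api import OpenAIGPTAPI


async def main():
    api = OpenAIGPTAPI()
    batch = [[{"role": "user", "content": f"Say hi #{i}"}] for i in range(25)]
    # acompletion_batch awaits update_rpm() internally, so the RPM is resolved
    # (from the dashboard, or the RPM config fallback) before split_batches runs.
    results = await api.acompletion_batch(batch)
    print(len(results))


asyncio.run(main())
```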
From 22c7ea8cf0a43d3bdd1af14410f6b75343612e15 Mon Sep 17 00:00:00 2001
From: zhongyang
Date: Mon, 18 Dec 2023 19:12:33 +0800
Subject: [PATCH 06/10] add docstrings, type hints, and a test for update_rpm

---
 metagpt/provider/openai_api.py            | 47 +++++++++++++++++++++--
 tests/metagpt/provider/test_openai_api.py | 20 ++++++++++
 2 files changed, 63 insertions(+), 4 deletions(-)
 create mode 100644 tests/metagpt/provider/test_openai_api.py

diff --git a/metagpt/provider/openai_api.py b/metagpt/provider/openai_api.py
index 04e1974d8..3c0dc85f7 100644
--- a/metagpt/provider/openai_api.py
+++ b/metagpt/provider/openai_api.py
@@ -8,7 +8,7 @@
 import aiohttp
 import time
 import json
-from typing import NamedTuple, Union
+from typing import NamedTuple, Union, List, Dict
 
 import openai
 import requests
@@ -43,18 +43,45 @@ def __init__(self):
         # Here 1.1 is used because even if the calls are made strictly according to time,
         # they will still be QOS'd; consider switching to simple error retry later
 
-    def split_batches(self, batch):
+    def split_batches(self, batch: List[Dict[str, str]]):
+        """
+        Splits a batch of requests into smaller batches based on the current RPM.
+
+        Args:
+            batch (list): A batch of requests to be split.
+
+        Returns:
+            list: A list of smaller batches, each not exceeding the RPM limit.
+
+        Raises:
+            ValueError: If RPM is not set before calling this method.
+        """
         if self.rpm is None:
             raise ValueError("You must run update_rpm before calling split_batches.")
         return [batch[i : i + self.rpm] for i in range(0, len(batch), self.rpm)]
 
     async def update_rpm(self):
+        """
+        Asynchronously updates the RPM (requests per minute) limit.
+
+        This method fetches the RPM limit from an external API and updates the rate limiting parameters.
+        It is designed to be run before making any batched API calls.
+        """
         if self.rpm is None:
             self.rpm = await self._aget_rpm()
             self.interval = 1.1 * 60 / self.rpm
             logger.info(f'Setting rpm to {self.rpm}')
 
-    async def _aget_rpm(self):
+    async def _aget_rpm(self) -> int:
+        """
+        Asynchronously fetches the RPM (requests per minute) limit from an external API.
+
+        This is an internal method used by update_rpm to fetch the current RPM limit. It uses
+        the OPENAI_SESSION_KEY for authorization and falls back to a default RPM value in case of failure.
+
+        Returns:
+            int: The fetched or default RPM value.
+        """
         session_key = CONFIG.get("OPENAI_SESSION_KEY", "")
         default_rpm = int(CONFIG.get("RPM", 10))
         if len(session_key) > 0:
@@ -87,7 +114,19 @@ async def _aget_rpm(self):
         else:
             return default_rpm
 
-    async def wait_if_needed(self, num_requests):
+    async def wait_if_needed(self, num_requests: int):
+        """
+        Asynchronously waits before making API requests if the rate limit is about to be exceeded.
+
+        This method calculates the time elapsed since the last API call and determines if a delay
+        is required to stay within the RPM limit. If a delay is needed, it pauses the execution
+        for the required amount of time.
+
+        Args:
+            num_requests (int): The number of upcoming API requests for which to check the rate limit.
+
+        The method updates `self.last_call_time` to the current time after the waiting period, if any.
+        """
         current_time = time.time()
         elapsed_time = current_time - self.last_call_time
 
diff --git a/tests/metagpt/provider/test_openai_api.py b/tests/metagpt/provider/test_openai_api.py
new file mode 100644
index 000000000..f86400e0c
--- /dev/null
+++ b/tests/metagpt/provider/test_openai_api.py
@@ -0,0 +1,20 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+@Time    : 2023/12/18 19:08
+@Author  : zhongyang
+@File    : test_openai_api.py
+"""
+
+import pytest
+
+from metagpt.llm import LLM
+from metagpt.logs import logger
+
+
+@pytest.mark.asyncio
+async def test_update_rpm():
+    llm = LLM()
+
+    await llm.update_rpm()
+    assert isinstance(llm.rpm, int)
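The committed test hits the live dashboard endpoint whenever a session key is configured. An offline companion test is possible by monkeypatching `_aget_rpm`; a sketch, assuming `LLM()` is constructible from the local config:

```python
import pytest

from metagpt.llm import LLM


@pytest.mark.asyncio
async def test_update_rpm_offline(monkeypatch):
    llm = LLM()

    async def fake_aget_rpm():
        # Pretend the dashboard reported 60 requests per minute.
        return 60

    # Patch the instance so no network access or session key is needed.
    monkeypatch.setattr(llm, "_aget_rpm", fake_aget_rpm)
    await llm.update_rpm()
    assert llm.rpm == 60
    assert llm.interval == pytest.approx(1.1 * 60 / 60)
```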
From 0e712c4090b95561d86acc8bd755254b55545121 Mon Sep 17 00:00:00 2001
From: zhongyang
Date: Thu, 4 Jan 2024 16:30:52 +0800
Subject: [PATCH 07/10] comment out duplicated block in _aget_rpm

---
 metagpt/provider/openai_api.py | 48 +++++++++++++++++-----------------
 1 file changed, 24 insertions(+), 24 deletions(-)

diff --git a/metagpt/provider/openai_api.py b/metagpt/provider/openai_api.py
index 184331db3..9d4f34c27 100644
--- a/metagpt/provider/openai_api.py
+++ b/metagpt/provider/openai_api.py
@@ -122,31 +122,31 @@ async def _aget_rpm(self) -> int:
                             logger.error(f"Connection to api.openai.com failed: {error}. Setting rpm to default parameter.")
                             return default_rpm
 
-            except Exception as exp:
-                logger.error(f"Connection to api.openai.com failed, error type: {type(exp).__name__}, error message: {str(exp)}. Setting rpm to default parameter.")
-                return default_rpm
-
-            async with aiohttp.ClientSession() as session:
-                async with session.get(
-                    "https://api.openai.com/dashboard/rate_limits",
-                    headers={"Authorization": f"Bearer {session_key}"},
-                    timeout=10,
-                    proxy=openai.proxy
-                ) as response:
-                    if response.status == 200:
-                        response_content = json.loads(await response.text())
-                        if CONFIG.openai_api_model not in response_content:
-                            raise ValueError("Get rpm from api.openai.com error. \
-                                You have entered a model name that is not supported by OpenAI, or the input is incorrect. \
-                                Please enter the correct name in the configuration file. \
-                                Setting rpm to default parameter.")
-
-                        limit_dict = response_content[CONFIG.openai_api_model]
-                        return limit_dict["max_requests_per_1_minute"]
-                    else:
-                        error = json.loads(await response.text())["error"]
-                        logger.error(f"Connection to api.openai.com failed: {error}. Setting rpm to default parameter.")
-                        return default_rpm
+            # except Exception as exp:
+            #     logger.error(f"Connection to api.openai.com failed, error type: {type(exp).__name__}, error message: {str(exp)}. Setting rpm to default parameter.")
+            #     return default_rpm
+
+            # async with aiohttp.ClientSession() as session:
+            #     async with session.get(
+            #         "https://api.openai.com/dashboard/rate_limits",
+            #         headers={"Authorization": f"Bearer {session_key}"},
+            #         timeout=10,
+            #         proxy=openai.proxy
+            #     ) as response:
+            #         if response.status == 200:
+            #             response_content = json.loads(await response.text())
+            #             if CONFIG.openai_api_model not in response_content:
+            #                 raise ValueError("Get rpm from api.openai.com error. \
+            #                     You have entered a model name that is not supported by OpenAI, or the input is incorrect. \
+            #                     Please enter the correct name in the configuration file. \
+            #                     Setting rpm to default parameter.")
+
+            #             limit_dict = response_content[CONFIG.openai_api_model]
+            #             return limit_dict["max_requests_per_1_minute"]
+            #         else:
+            #             error = json.loads(await response.text())["error"]
+            #             logger.error(f"Connection to api.openai.com failed: {error}. Setting rpm to default parameter.")
+            #             return default_rpm
 
             except Exception as exp:
                 logger.error(f"Connection to api.openai.com failed, error type: {type(exp).__name__}, error message: {str(exp)}. Setting rpm to default parameter.")
                 return default_rpm
From 89b6f2935a071defc2566fe460f39bb91ad0ed93 Mon Sep 17 00:00:00 2001
From: zhongyang
Date: Thu, 4 Jan 2024 16:32:14 +0800
Subject: [PATCH 08/10] add an async aget helper to ahttp_client

---
 metagpt/utils/ahttp_client.py | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/metagpt/utils/ahttp_client.py b/metagpt/utils/ahttp_client.py
index b4a33e9d7..bbb8e3442 100644
--- a/metagpt/utils/ahttp_client.py
+++ b/metagpt/utils/ahttp_client.py
@@ -28,6 +28,24 @@ async def apost(
             return data
 
 
+async def aget(
+    url: str,
+    params: Optional[Mapping[str, str]] = None,
+    headers: Optional[dict] = None,
+    as_json: bool = False,
+    encoding: str = "utf-8",
+    timeout: int = DEFAULT_TIMEOUT.total,
+) -> Union[str, dict]:
+    async with aiohttp.ClientSession() as session:
+        async with session.get(url=url, params=params, headers=headers, timeout=timeout) as resp:
+            if as_json:
+                data = await resp.json()
+            else:
+                data = await resp.read()
+                data = data.decode(encoding)
+            return data
+
+
 async def apost_stream(
     url: str,
     params: Optional[Mapping[str, str]] = None,
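A quick usage sketch of the new helper (the URL is illustrative, not part of the patch):

```python
import asyncio

from metagpt.utils.ahttp_client import aget


async def main():
    # as_json=False (the default) returns the decoded body as str;
    # as_json=True parses the body and returns a dict.
    body = await aget("https://httpbin.org/get", params={"q": "rpm"}, as_json=True)
    print(body["args"])  # -> {'q': 'rpm'}


asyncio.run(main())
```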
From 12c04ddabdd94aa82382fd92e0015cf9682dba03 Mon Sep 17 00:00:00 2001
From: zhongyang
Date: Thu, 8 Feb 2024 10:47:37 +0800
Subject: [PATCH 09/10] fetch RPM through the aget helper

---
 metagpt/provider/openai_api.py | 56 +++++++++++++++++++++++++++++++++-
 metagpt/utils/ahttp_client.py  |  3 ++-
 2 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/metagpt/provider/openai_api.py b/metagpt/provider/openai_api.py
index 8dcfb8a3a..42d8fa720 100644
--- a/metagpt/provider/openai_api.py
+++ b/metagpt/provider/openai_api.py
@@ -12,6 +12,8 @@
 import json
 from typing import AsyncIterator, Union
 
+import openai
+
 from openai import APIConnectionError, AsyncOpenAI, AsyncStream
 from openai._base_client import AsyncHttpxClientWrapper
 from openai.types import CompletionUsage
@@ -30,6 +32,7 @@
 from metagpt.provider.constant import GENERAL_FUNCTION_SCHEMA, GENERAL_TOOL_CHOICE
 from metagpt.provider.llm_provider_registry import register_provider
 from metagpt.schema import Message
+from metagpt.utils.ahttp_client import aget
 from metagpt.utils.cost_manager import Costs
 from metagpt.utils.exceptions import handle_exception
 from metagpt.utils.token_counter import (
@@ -232,4 +235,55 @@ def _get_max_tokens(self, messages: list[dict]):
     @handle_exception
     async def amoderation(self, content: Union[str, list[str]]):
         """Moderate content."""
-        return await self.aclient.moderations.create(input=content)
\ No newline at end of file
+        return await self.aclient.moderations.create(input=content)
+
+    async def update_rpm(self):
+        """
+        Asynchronously updates the RPM (requests per minute) limit.
+
+        This method fetches the RPM limit from an external API and updates the rate limiting parameters.
+        It is designed to be run before making any batched API calls.
+        """
+        self.rpm = await self._aget_rpm()
+        self.interval = 1.1 * 60 / self.rpm
+        logger.info(f'Setting rpm to {self.rpm}')
+
+    async def _aget_rpm(self) -> int:
+        """
+        Asynchronously fetches the RPM (requests per minute) limit from an external API.
+
+        This is an internal method used by update_rpm to fetch the current RPM limit. It uses
+        the OPENAI_SESSION_KEY for authorization and falls back to a default RPM value in case of failure.
+
+        Returns:
+            int: The fetched or default RPM value.
+        """
+        session_key = CONFIG.openai_session_key
+        default_rpm = int(CONFIG.get("RPM", 10))
+        if len(session_key) > 0:
+            try:
+                response = await aget(
+                    url="https://api.openai.com/dashboard/rate_limits",
+                    headers={"Authorization": f"Bearer {session_key}"},
+                    proxy=CONFIG.openai_proxy
+                )
+                response_content = json.loads(response)
+                if "error" not in response_content:
+                    if CONFIG.openai_api_model not in response_content:
+                        raise ValueError("Get rpm from api.openai.com error. \
+                            You have entered a model name that is not supported by OpenAI, or the input is incorrect. \
+                            Please enter the correct name in the configuration file. \
+                            Setting rpm to default parameter.")
+
+                    limit_dict = response_content[CONFIG.openai_api_model]
+                    return limit_dict["max_requests_per_1_minute"]
+                else:
+                    error = response_content["error"]
+                    logger.error(f"Connection to api.openai.com failed: {error}. Setting rpm to default parameter.")
+                    return default_rpm
+
+            except Exception as exp:
+                logger.error(f"Connection to api.openai.com failed, error type: {type(exp).__name__}, error message: {str(exp)}. Setting rpm to default parameter.")
+                return default_rpm
+        else:
+            return default_rpm
diff --git a/metagpt/utils/ahttp_client.py b/metagpt/utils/ahttp_client.py
index bbb8e3442..a45bf51a4 100644
--- a/metagpt/utils/ahttp_client.py
+++ b/metagpt/utils/ahttp_client.py
@@ -35,9 +35,10 @@ async def aget(
     as_json: bool = False,
     encoding: str = "utf-8",
     timeout: int = DEFAULT_TIMEOUT.total,
+    proxy: Optional[str] = None,
 ) -> Union[str, dict]:
     async with aiohttp.ClientSession() as session:
-        async with session.get(url=url, params=params, headers=headers, timeout=timeout) as resp:
+        async with session.get(url=url, params=params, headers=headers, timeout=timeout, proxy=proxy) as resp:
             if as_json:
                 data = await resp.json()
             else:
From 4021f45767025f23f621a0fa5fd3a791d137648e Mon Sep 17 00:00:00 2001
From: zhongyang
Date: Thu, 8 Feb 2024 14:54:21 +0800
Subject: [PATCH 10/10] adapt automatic RPM fetching to config2's nested llm
 config

---
 config/config2.yaml.example               |  1 +
 metagpt/configs/llm_config.py             |  2 ++
 metagpt/provider/openai_api.py            | 14 +++++++-------
 tests/metagpt/provider/test_openai_api.py |  9 ++++-----
 4 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/config/config2.yaml.example b/config/config2.yaml.example
index 2217f1b2c..6d43ede51 100644
--- a/config/config2.yaml.example
+++ b/config/config2.yaml.example
@@ -3,6 +3,7 @@ llm:
   base_url: "YOUR_BASE_URL"
   api_key: "YOUR_API_KEY"
   model: "gpt-4-turbo-preview"  # or gpt-3.5-turbo-1106 / gpt-4-1106-preview
+  openai_session_key: ""  # To have RPM configured automatically, log in at https://platform.openai.com/account/limits, open your browser's developer tools, and look for a 'rate_limits' request in the Network tab. If it is not there, reload the page (Ctrl+R) while recording. In that request's headers, find 'Authorization: sess-xxx'; 'sess-xxx' is the session key. To set RPM manually instead, set rpm and either leave openai_session_key unset or set it to "".
   proxy: "YOUR_PROXY"
 
diff --git a/metagpt/configs/llm_config.py b/metagpt/configs/llm_config.py
index fb923d3e4..d35f660df 100644
--- a/metagpt/configs/llm_config.py
+++ b/metagpt/configs/llm_config.py
@@ -40,6 +40,7 @@ class LLMConfig(YamlModel):
     api_type: LLMType = LLMType.OPENAI
     base_url: str = "https://api.openai.com/v1"
     api_version: Optional[str] = None
+    openai_session_key: Optional[str] = None
 
     model: Optional[str] = None  # also stands for DEPLOYMENT_NAME
 
@@ -63,6 +64,7 @@ class LLMConfig(YamlModel):
     logprobs: Optional[bool] = None  # https://cookbook.openai.com/examples/using_logprobs
     top_logprobs: Optional[int] = None
     timeout: int = 60
+    rpm: int = 10
 
     # For Network
     proxy: Optional[str] = None
diff --git a/metagpt/provider/openai_api.py b/metagpt/provider/openai_api.py
index 8ef4c85be..c8cac073c 100644
--- a/metagpt/provider/openai_api.py
+++ b/metagpt/provider/openai_api.py
@@ -66,7 +66,7 @@ def __init__(self, config: LLMConfig):
         self.cost_manager: Optional[CostManager] = None
 
     def _init_model(self):
-        self.model = self.config.model  # Used in _calc_usage & _cons_kwargs
+        self.model = self.config.llm.model  # Used in _calc_usage & _cons_kwargs
 
     def _init_client(self):
         """https://github.com/openai/openai-python#async-usage"""
@@ -74,7 +74,7 @@ def _init_client(self):
         self.aclient = AsyncOpenAI(**kwargs)
 
     def _make_client_kwargs(self) -> dict:
-        kwargs = {"api_key": self.config.api_key, "base_url": self.config.base_url}
+        kwargs = {"api_key": self.config.llm.api_key, "base_url": self.config.llm.base_url}
 
         # to use proxy, openai v1 needs http_client
         if proxy_params := self._get_proxy_params():
@@ -317,24 +317,24 @@ async def _aget_rpm(self) -> int:
         Returns:
             int: The fetched or default RPM value.
         """
-        session_key = self.config.openai_session_key
-        default_rpm = int(self.config.get("RPM", 10))
+        session_key = self.config.llm.openai_session_key
+        default_rpm = self.config.llm.rpm
         if len(session_key) > 0:
             try:
                 response = await aget(
                     url="https://api.openai.com/dashboard/rate_limits",
                     headers={"Authorization": f"Bearer {session_key}"},
-                    proxy=self.config.openai_proxy
+                    proxy=self.config.llm.proxy
                 )
                 response_content = json.loads(response)
                 if "error" not in response_content:
-                    if self.config.openai_api_model not in response_content:
+                    if self.model not in response_content:
                         raise ValueError("Get rpm from api.openai.com error. \
                             You have entered a model name that is not supported by OpenAI, or the input is incorrect. \
                             Please enter the correct name in the configuration file. \
                             Setting rpm to default parameter.")
 
-                    limit_dict = response_content[self.config.openai_api_model]
+                    limit_dict = response_content[self.model]
                     return limit_dict["max_requests_per_1_minute"]
                 else:
                     error = response_content["error"]
diff --git a/tests/metagpt/provider/test_openai_api.py b/tests/metagpt/provider/test_openai_api.py
index f86400e0c..de26d01fe 100644
--- a/tests/metagpt/provider/test_openai_api.py
+++ b/tests/metagpt/provider/test_openai_api.py
@@ -8,13 +8,12 @@
 
 import pytest
 
-from metagpt.llm import LLM
 from metagpt.logs import logger
-
+from metagpt.provider.openai_api import OpenAILLM
+from metagpt.config2 import config
 
 @pytest.mark.asyncio
 async def test_update_rpm():
-    llm = LLM()
-
+    llm = OpenAILLM(config)
     await llm.update_rpm()
-    assert isinstance(llm.rpm, int)
+    assert isinstance(llm.rpm, float)
\ No newline at end of file
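Under config2, the whole flow reduces to constructing the provider and awaiting `update_rpm`. A closing sketch, assuming a valid `config2.yaml` with `openai_session_key` either set to a `sess-` value or left empty to fall back to `config.llm.rpm` (note that, as in the updated test above, `OpenAILLM` is constructed here with the full `Config` object, matching the `self.config.llm.*` accesses in this final patch):

```python
import asyncio

from metagpt.config2 import config
from metagpt.provider.openai_api import OpenAILLM


async def main():
    llm = OpenAILLM(config)
    # With openai_session_key set, this queries the dashboard endpoint;
    # otherwise it falls back to config.llm.rpm.
    await llm.update_rpm()
    print(llm.rpm, llm.interval)


asyncio.run(main())
```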