In [2]:
import asyncio
import os
from typing import List, Dict, Optional

from aiohttp_retry import ExponentialRetry, RetryClient
from tqdm.notebook import tqdm

In [3]:
# Your own OpenAI API key. It is read from the OPENAI_API_KEY environment
# variable so the secret never has to be saved inside the notebook; the
# fallback is an empty string, in which case requests are sent unauthenticated.
API_KEY = os.environ.get("OPENAI_API_KEY", "")
# Chat-completions endpoint that every request is POSTed to.
url = "https://api.openai.com/v1/chat/completions"
# Headers attached to every request.
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {API_KEY}",
}
# Optional proxy: the HTTP client below is created with trust_env=True, so
# uncommenting these lines routes the requests through the given proxy.
# os.environ['HTTP_PROXY'] = 'http://172.19.160.1:7890'
# os.environ['HTTPS_PROXY'] = 'http://172.19.160.1:7890'

In [7]:
async def async_invoke_chatgpt(
    texts: List[str],
    pool_size=3,
    retry=3,
    finished_results: Optional[Dict[int, str]] = None,
):
    """Send every text to the chat-completions endpoint concurrently.

    Each text is sent as a single user message to ``gpt-3.5-turbo`` and the
    content of the first returned choice is stored under the text's index.

    Args:
        texts: Prompts to send; the position in this list is the result key.
        pool_size: Maximum number of requests in flight at once (>= 1).
        retry: Used twice: as the number of HTTP attempts per request
            (exponential back-off) and as the maximum number of rounds in
            which still-missing indices are re-submitted.
        finished_results: Optional cache of earlier answers keyed by index.
            It is updated IN PLACE, so partial progress survives a failed or
            interrupted call and can be passed back in to resume.

    Returns:
        The result dictionary ``{index: content}``. It may be incomplete when
        some requests still fail after ``retry`` rounds; a summary of the
        failures is printed in that case.

    Raises:
        ValueError: If ``pool_size`` is smaller than 1.
    """
    if pool_size < 1:
        # Semaphore(0) would make every task wait forever.
        raise ValueError("pool_size must be at least 1")

    # Reuse the caller's dict on purpose so results are visible to the caller
    # even if this coroutine is cancelled half-way.
    results_dict = {} if finished_results is None else finished_results
    # Caps the number of concurrent requests.
    semaphore = asyncio.Semaphore(pool_size)
    # 429 (rate limit) is retried in addition to the transient 5xx statuses.
    retry_options = ExponentialRetry(
        attempts=retry,
        statuses={429, 500, 502, 503, 504},
        exceptions={ConnectionResetError},
    )

    def pending_indices():
        # Only indices that belong to `texts` count; stray keys in the cache
        # must not make the job look finished.
        return [i for i in range(len(texts)) if i not in results_dict]

    pending = pending_indices()
    # Start the bar at the number of answers that are already cached.
    pbar = tqdm(total=len(texts), initial=len(texts) - len(pending))

    async def process_row(session, text, index):
        async with semaphore:
            data = {
                "model": "gpt-3.5-turbo",
                "messages": [{"role": "user", "content": f"{text}"}],
            }
            async with session.post(url, json=data) as response:
                json_response = await response.json()
            try:
                content = json_response["choices"][0]["message"]["content"]
            except (KeyError, IndexError, TypeError) as exc:
                # Error payloads carry no "choices"; surface the payload
                # instead of a bare KeyError.
                raise RuntimeError(
                    f"Unexpected API response for index {index}: {json_response!r}"
                ) from exc
            # No lock needed: there is no await between these two statements
            # and all tasks run on the same event loop thread.
            results_dict[index] = content
            pbar.update(1)

    last_errors = {}
    try:
        if pending:
            # One shared session for all requests instead of one per text.
            async with RetryClient(
                headers=headers, retry_options=retry_options, trust_env=True
            ) as session:
                count = 0
                while pending and count < retry:
                    # return_exceptions=True keeps one failing request from
                    # aborting the others and lets the next round retry it.
                    outcomes = await asyncio.gather(
                        *(process_row(session, texts[i], i) for i in pending),
                        return_exceptions=True,
                    )
                    for index, outcome in zip(pending, outcomes):
                        if isinstance(outcome, BaseException):
                            last_errors[index] = outcome
                    pending = pending_indices()
                    count += 1  # one more retry round used
    finally:
        # Always release the progress bar, even on cancellation or error.
        pbar.close()

    if pending:
        details = {i: repr(last_errors[i]) for i in pending if i in last_errors}
        print(f"{len(pending)} request(s) did not complete: {details}")
    return results_dict

In [8]:
# Sample prompts; each list position becomes the key of its answer.
texts: List[str] = [
    "Example 1",
    "Example 2",
    "Example 3",
]

In [None]:
# Result cache keyed by text index. Re-running this cell discards any
# answers collected so far.
predicts = dict()

In [None]:
predicts: Dict[int, str] = await async_invoke_chatgpt(texts, 3, finished_results=predicts)