# Asynchronous Batch Processing of Automated Big-bench
copyright 2024, Denis Rothman

**The goal of this notebook is to show how to run a corpus of 144 OpenAI API requests in less than 0.25 seconds per task with asynchronous batch processing!**

*The potential of the next generation of AI lies in evaluating and benchmarking itself across a wide range of tasks asynchronously, at full speed and at lower cost.*

**In July 2024**, OpenAI released [GPT-4o-mini](https://openai.com/index/gpt-4o-mini-advancing-cost-efficient-intelligence/).

`GPT-4o-mini` is faster. But how fast? And how much faster in advanced production environments?

Let's find out.

This educational notebook explores the speed of GPT-4, GPT-4o, and GPT-4o-mini in an asynchronous batch processing environment.

The notebook is divided into 7 sections:

1. Is GPT-4o-mini significantly faster than GPT-4o?
2. The corpus: Automating Big-bench tasks with OpenAI Generative AI
3. Defining asynchronous batch processing
4. Installing the environment
5. Retrieving the list of Big-bench prompts designed for this notebook
6. Running the tasks
7. Response times summary









# 1.Is GPT-4o-mini significantly faster than GPT-4o?

The question for the OpenAI community was to verify the cost, speed, and quality of GPT-4o-mini. These issues are addressed in a new educational notebook in this repository: [`Auto_Big_bench_GPT-4o-mini.ipynb`](https://github.com/Denis2054/Transformers-for-NLP-and-Computer-Vision-3rd-Edition/blob/main/Chapter15/Auto_Big_bench_GPT-4o-mini.ipynb)

`Auto_Big_bench_GPT-4o-mini.ipynb` shows that, at a lower cost and with similar quality, `GPT-4o-mini` runs about twice as fast as `GPT-4o` on the 144-task corpus:

* `GPT-4o-mini` response time: 802.35 seconds
* `GPT-4o` response time: 1659.37 seconds

This notebook takes the exploration further, into the asynchronous batch processing used in production environments.

The results temper the initial excitement over the newer OpenAI models, as the following response times obtained in this notebook show (see section *7.Response times summary*):

* `GPT-4`: 30.95 seconds (Average time per task: 0.2149 seconds)
* `GPT-4o`: 26.75 seconds (Average time per task: 0.1858 seconds)
* `GPT-4o-mini`: 21.54 seconds (Average time per task: 0.1496 seconds)

`GPT-4o-mini` is faster than `GPT-4o`, which in turn is faster than `GPT-4`.

However, asynchronous batch processing narrows these differences. `GPT-4o-mini` is about twice as fast as `GPT-4o` in a synchronous environment but only about 20% faster in an asynchronous batch processing environment.
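
The speed ratios quoted above can be recomputed from the reported wall-clock times. A minimal sketch, with the figures copied from this notebook (asynchronous runs) and from `Auto_Big_bench_GPT-4o-mini.ipynb` (synchronous runs); the variable names are illustrative only:

```python
# Wall-clock times reported in this notebook (asynchronous batch run, 144 tasks)
async_times = {"gpt-4": 30.95, "gpt-4o": 26.75, "gpt-4o-mini": 21.54}
# Wall-clock times reported in Auto_Big_bench_GPT-4o-mini.ipynb (synchronous run)
sync_times = {"gpt-4o": 1659.37, "gpt-4o-mini": 802.35}
n_tasks = 144

# How many times faster gpt-4o-mini is than gpt-4o in each setting
sync_speedup = sync_times["gpt-4o"] / sync_times["gpt-4o-mini"]
async_speedup = async_times["gpt-4o"] / async_times["gpt-4o-mini"]

# Fraction of wall-clock time saved by gpt-4o-mini in the asynchronous run
async_saving = 1 - async_times["gpt-4o-mini"] / async_times["gpt-4o"]

print(f"Synchronous speed-up:  {sync_speedup:.2f}x")
print(f"Asynchronous speed-up: {async_speedup:.2f}x ({async_saving:.0%} less time)")
for model, seconds in async_times.items():
    # Total wall-clock time divided by the number of tasks
    print(f"{model}: {seconds / n_tasks:.4f} s per task")
```

With these figures, `GPT-4o-mini` is about 2.07x faster than `GPT-4o` in the synchronous run but about 1.24x faster (roughly 19% less wall-clock time) in the asynchronous run. The per-task averages are total wall-clock time divided by 144; because the requests overlap, they are not the latency of any single request.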

*The choice of an LLM, as always, depends on each project you implement. For each project, you must factor in all the parameters (cost, speed, quality) and find the right LLM for your needs.*








# 2.The corpus: Automating Big bench tasks with OpenAI Generative AI

[Big-bench](https://github.com/google/BIG-bench/blob/main/bigbench/benchmark_tasks/README.md) contains more than 200 NLP tasks. Its goal is to evaluate language models.

In this notebook, we take Generative AI a step further. We do not ask the LLM to solve a Big-bench NLP problem and then apply metrics. We ask the LLM to explain each task, create an example of it, and solve that example itself!

The program feeds a sample of 140+ Big-bench tasks to the model, each as a two-part prompt:

**The first part contains the instruction** (explain the task, create an example, and solve it), ending with `Solve it:`.

**The second part is the description of a Big-bench task**, which follows `Solve it:`.
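
The two-part structure can be sketched as follows. Only the `Solve it:` separator comes from the notebook's code, which splits each prompt on that string; the helper names and the sample instruction and description are hypothetical:

```python
MARKER = "Solve it:"  # separator that the notebook's code splits on

def build_prompt(instruction: str, description: str) -> str:
    """Join the two parts: instruction first, then the Big-bench task description."""
    return f"{instruction} {MARKER} {description}"

def extract_task(prompt: str) -> str:
    """Return the text after the first 'Solve it:', or the whole prompt if the marker is missing."""
    _, sep, tail = prompt.partition(MARKER)
    return tail.strip() if sep else prompt.strip()

# Hypothetical instruction and task description, for illustration only
prompt = build_prompt("Explain the following task and provide an example.",
                      "sentiment analysis of movie reviews")
print(prompt)
print(extract_task(prompt))
```

Using `str.partition` instead of `split(...)[1]` means a prompt without the marker is returned as-is rather than raising `IndexError`.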

**The output is then displayed for human evaluation.**
Human evaluation plays an important role in LLM training and evaluation. Reinforcement Learning from Human Feedback (RLHF) helps mitigate the potential limits of automated models and evaluation metrics.

**Limit of the program:** The program does not run thousands of samples for each task. The goal is to show the potential of Large Language Models (LLMs).

**Potential:** GPT-4o, PaLM 2, and other foundation models are just the beginning of what will become, in one form or another, *Massive Multitask Language Understanding (MMLU)* models in the years to come.

# 3.Defining Asynchronous batch processing


### **Concepts and Philosophy of the Code**

#### **Asynchronous Batching:**
Asynchronous batching is a programming technique that allows multiple tasks to be processed concurrently in batches. This approach leverages asynchronous programming to improve efficiency and performance, especially when dealing with I/O-bound operations like network requests. By sending multiple requests concurrently rather than sequentially, the overall execution time is significantly reduced.
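
To see why concurrency helps with I/O-bound calls, here is a minimal sketch that simulates network requests with `asyncio.sleep` (no API is called; `fake_request` and the 0.2-second latency are assumptions). Run it as a standalone script, since `asyncio.run()` fails inside an already-running notebook event loop unless `nest_asyncio` is applied:

```python
import asyncio
import time

async def fake_request(task_id: int, latency: float = 0.2) -> str:
    """Stand-in for a network call: waits `latency` seconds without blocking the loop."""
    await asyncio.sleep(latency)
    return f"response {task_id}"

async def run_sequentially(n: int) -> list[str]:
    # Each request starts only after the previous one has finished
    return [await fake_request(i) for i in range(n)]

async def run_concurrently(n: int) -> list[str]:
    # All requests are in flight at the same time; gather waits for all of them
    return await asyncio.gather(*(fake_request(i) for i in range(n)))

def timed(coro_fn, n: int) -> float:
    """Run coro_fn(n) in a fresh event loop and return the elapsed seconds."""
    start = time.perf_counter()
    asyncio.run(coro_fn(n))
    return time.perf_counter() - start

sequential_s = timed(run_sequentially, 5)
concurrent_s = timed(run_concurrently, 5)
print(f"sequential: {sequential_s:.2f}s, concurrent: {concurrent_s:.2f}s")
```

Sequentially, five 0.2-second requests take about 1 second; concurrently, they finish in roughly the time of the slowest one.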

#### **Philosophy of the Code:**
The provided code is designed to efficiently handle and process a large number of API requests to OpenAI models (GPT-4 in the example below; GPT-4o and GPT-4o-mini in section 6). The key goals are:
1. **Efficiency:** Minimize the time spent waiting for API responses by processing requests concurrently.
2. **Scalability:** Handle large volumes of tasks by batching requests.
3. **Error Handling:** Manage unexpected responses gracefully to ensure robustness.


### **Detailed Line-by-Line Explanation**

```python
# Apply the nest_asyncio patch
nest_asyncio.apply()
```
- **Purpose:** `nest_asyncio` is a library that allows nested use of `asyncio.run()` within an existing event loop. This is particularly useful in environments like Jupyter notebooks where an event loop might already be running.
- **Explanation:** This line patches the existing event loop to allow nested asynchronous operations, ensuring compatibility and preventing runtime errors related to event loops.
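
The problem that `nest_asyncio` works around can be reproduced with the standard library alone. In this sketch (run as a standalone script), `already_inside_a_loop` plays the role of a notebook cell that calls `asyncio.run()` while an event loop is already running, and the inner call raises `RuntimeError`; the function names are illustrative:

```python
import asyncio

async def job() -> int:
    return 42

async def already_inside_a_loop() -> str:
    """Mimics a notebook cell: calls asyncio.run() while a loop is already running."""
    coro = job()
    try:
        asyncio.run(coro)
        return "no error"
    except RuntimeError as exc:
        coro.close()  # avoid a 'coroutine was never awaited' warning
        return str(exc)

message = asyncio.run(already_inside_a_loop())
print(message)
```

As an alternative to patching, recent IPython versions also support top-level `await` in notebook cells, which is another way to run a coroutine such as `process_tasks_in_batches(...)` there.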

```python
# Define the asynchronous function for API calls
async def fetch(session, url, payload, headers):
    async with session.post(url, json=payload, headers=headers) as response:
        # Check for a valid JSON response
        if response.headers.get('Content-Type') == 'application/json':
            return await response.json()
        else:
            # Handle unexpected content type
            text = await response.text()
            raise ValueError(f"Unexpected response content type: {response.headers.get('Content-Type')}, Content: {text}")
```
- **Purpose:** This function performs an asynchronous HTTP POST request to the specified URL with the given payload and headers.
- **Explanation:**
  - `async def fetch(session, url, payload, headers):`: Defines an asynchronous function named `fetch`.
  - `async with session.post(url, json=payload, headers=headers) as response:`: Sends a POST request asynchronously and waits for the response.
  - `if response.headers.get('Content-Type') == 'application/json':`: Checks if the response content type is JSON.
  - `return await response.json()`: If the response is JSON, it parses and returns the JSON content.
  - `else:`: If the response is not JSON, it handles the unexpected content type.
  - `text = await response.text()`: Reads the response text.
  - `raise ValueError(f"Unexpected response content type: {response.headers.get('Content-Type')}, Content: {text}")`: Raises an error with details of the unexpected response.
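
The strict equality test in `fetch` works as long as the server sends exactly `application/json`; a header such as `application/json; charset=utf-8` would be rejected. A minimal stdlib sketch of a parameter-tolerant check follows (`is_json_content_type` is a hypothetical helper). In aiohttp itself, `response.content_type` already returns the MIME type without parameters.

```python
from typing import Optional

def is_json_content_type(header_value: Optional[str]) -> bool:
    """Return True if a Content-Type header denotes JSON, ignoring parameters and case."""
    if not header_value:
        return False
    mime_type = header_value.split(";", 1)[0].strip().lower()
    return mime_type == "application/json"

samples = ["application/json", "application/json; charset=utf-8", "text/html; charset=utf-8", None]
for value in samples:
    strict = value == "application/json"    # the check used in fetch()
    tolerant = is_json_content_type(value)  # ignores "; charset=..." and case
    print(value, strict, tolerant)
```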

```python
# Create a function to process tasks in batches
async def process_tasks_in_batches(tasks, batch_size, headers):
    async with aiohttp.ClientSession() as session:
        for i in range(0, len(tasks), batch_size):
            batch = tasks[i:i + batch_size]
            tasks_to_fetch = [
                fetch(session, "https://api.openai.com/v1/chat/completions", {
                    "model": "gpt-4",  # Model is defined here
                    "messages": [
                        {"role": "system", "content": "You are an expert Natural Language Processing exercise expert."},
                        {"role": "assistant", "content": "1. You can explain any NLP task. 2. Create an example. 3. Solve the example."},
                        {"role": "user", "content": task}
                    ],
                    "temperature": 0.1  # Add the temperature parameter here and other parameters you need
                }, headers) for task in batch
            ]
            responses = await asyncio.gather(*tasks_to_fetch)
            for task_num, (input_text, response) in enumerate(zip(batch, responses), start=i+1):
                if 'choices' in response and response['choices']:
                    formatted_task = response['choices'][0]['message']['content'].replace('\n', '<br>')
                    parts = input_text.split('Solve it:')
                    bb_task = parts[1].strip()
                    display_response(task_num, input_text, formatted_task, bb_task)
                else:
                    print(f"Error in response for task {task_num}: {input_text}, Response: {response}")
            print(f"Processed {i + batch_size} tasks.")
```
- **Purpose:** This function processes a list of tasks in batches, making concurrent API calls for each batch and handling the responses.
- **Explanation:**
  - `async def process_tasks_in_batches(tasks, batch_size, headers):`: Defines an asynchronous function named `process_tasks_in_batches`.
  - `async with aiohttp.ClientSession() as session:`: Creates an asynchronous HTTP session.
  - `for i in range(0, len(tasks), batch_size):`: Iterates through the tasks in steps of `batch_size`.
  - `batch = tasks[i:i + batch_size]`: Extracts a batch of tasks.
  - `tasks_to_fetch = [...]`: Creates a list of `fetch` coroutines, one for each task in the current batch.
    - `fetch(session, "https://api.openai.com/v1/chat/completions", { ... }, headers) for task in batch`: For each task in the batch, it defines the payload and makes an asynchronous call using `fetch`.
  - `responses = await asyncio.gather(*tasks_to_fetch)`: Runs all fetch tasks concurrently and waits for all to complete.
  - `for task_num, (input_text, response) in enumerate(zip(batch, responses), start=i+1):`: Iterates over the tasks and their corresponding responses.
  - `if 'choices' in response and response['choices']:`: Checks if the response contains the expected 'choices' key.
    - `formatted_task = response['choices'][0]['message']['content'].replace('\n', '<br>')`: Formats the response content.
    - `parts = input_text.split('Solve it:')`: Splits the input text to extract the task description.
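    - `bb_task = parts[1].strip()`: Takes the text after `Solve it:` (the Big-bench task description) and strips leading/trailing whitespace. It raises `IndexError` if a prompt does not contain `Solve it:`.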
    - `display_response(task_num, input_text, formatted_task, bb_task)`: Renders the response as HTML with `display_response`, which is defined in the code cells of section 6.
  - `else: print(f"Error in response for task {task_num}: {input_text}, Response: {response}")`: Prints an error message if the response is invalid.
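  - `print(f"Processed {i + batch_size} tasks.")`: Logs progress after each batch. Because it prints `i + batch_size`, it overshoots when the last batch is partial: with 144 tasks and `batch_size = 150`, it reports 150 processed tasks. `min(i + batch_size, len(tasks))` would give the exact count.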
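
The batching loop can be exercised offline. A minimal sketch, assuming a hypothetical `fake_fetch` that returns an OpenAI-shaped dictionary after a random delay; `batch_size=50` is chosen only so that the last batch is partial. It shows that `asyncio.gather` returns results in submission order (so `zip(batch, responses)` pairs them correctly) and uses `min()` for an exact progress count. Run it as a standalone script or after `nest_asyncio.apply()`:

```python
import asyncio
import random

async def fake_fetch(task: str) -> dict:
    """Hypothetical stand-in for fetch(): random latency, OpenAI-like response shape."""
    await asyncio.sleep(random.uniform(0.01, 0.05))
    return {"choices": [{"message": {"content": f"answer to {task}"}}]}

async def process_in_batches(tasks: list[str], batch_size: int) -> list[tuple[str, str]]:
    results = []
    for i in range(0, len(tasks), batch_size):
        batch = tasks[i:i + batch_size]
        # gather() returns results in the order the awaitables were passed in,
        # not in completion order, so pairing them with zip(batch, ...) is safe
        responses = await asyncio.gather(*(fake_fetch(t) for t in batch))
        for task, response in zip(batch, responses):
            results.append((task, response["choices"][0]["message"]["content"]))
        done = min(i + batch_size, len(tasks))  # exact count, even for a partial last batch
        print(f"Processed {done} tasks.")
    return results

demo_tasks = [f"task {n}" for n in range(1, 145)]  # 144 placeholder tasks
results = asyncio.run(process_in_batches(demo_tasks, batch_size=50))
```

With 144 tasks and batches of 50, the progress lines report 50, 100, and 144 tasks.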

### Summary
The code leverages asynchronous batch processing to efficiently handle multiple API requests to OpenAI models. By processing tasks in batches and sending HTTP requests asynchronously, it minimizes the overall execution time. It also includes error handling for unexpected responses, which makes it more robust.
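
Sending every task in a single `gather` call (here `batch_size = 150` for 144 tasks) puts all requests in flight at once, which can hit rate limits. A common alternative, not used in this notebook, is to cap the number of concurrent requests with `asyncio.Semaphore`. A minimal sketch with a hypothetical `fake_request` that records the peak overlap (run as a standalone script or after `nest_asyncio.apply()`):

```python
import asyncio

async def bounded_gather(coros, limit: int):
    """Run the given coroutines concurrently, with at most `limit` in flight at once.

    Results come back in the same order as `coros`, like asyncio.gather().
    """
    semaphore = asyncio.Semaphore(limit)

    async def run_one(coro):
        async with semaphore:  # waits here while `limit` calls are already running
            return await coro

    return await asyncio.gather(*(run_one(c) for c in coros))

# Hypothetical request that records how many calls overlap
in_flight = 0
peak = 0

async def fake_request(n: int) -> int:
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)  # stands in for network latency
    in_flight -= 1
    return n * n

results = asyncio.run(bounded_gather([fake_request(n) for n in range(20)], limit=5))
print(results[:5], "peak concurrency:", peak)
```

Unlike fixed batches, the semaphore starts a new request as soon as one finishes, so a slow response does not hold back the rest of its batch.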

# 4.Installing the environment

**June 24 update:** OpenAI has updated its installation. There is no longer any need to install cohere and tiktoken separately; OpenAI fixed the issue!
The installation has now been shortened to OpenAI only.

Note:
In January 2024, OpenAI required dependencies that in turn required other dependencies, so the installation order mattered. On Google Colab (or your machine), if you encounter OpenAI installation issues, try the process below:  
a) uncomment and install cohere, and run it after the `!pip install openai` cell<br>
b) uncomment and install tiktoken, and run it after `!pip install cohere`  
c) then run `!pip install cohere` again   
d) then run `!pip install openai` again   

Hopefully, OpenAI will fix this in 2024, and we will update the installation accordingly. This is normal in a fast-moving market; we simply need to stay alert.

In [None]:
# Installing openai
!pip install openai

In [None]:
# API key
# Store your key in a file and read it (you can type it directly in the notebook, but it will be visible to anyone next to you)
from google.colab import drive
drive.mount('/content/drive')
with open("drive/MyDrive/files/api_key.txt", "r") as f:
    API_KEY = f.readline().strip()  # strip() removes a trailing newline that would corrupt the Authorization header

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
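
`f.readline()` keeps the trailing newline if the key file ends with one, and a newline inside `Bearer {API_KEY}` is not a valid HTTP header value. A minimal sketch of a stricter reader, demonstrated on a throwaway file with a fake key; `read_api_key` is a hypothetical helper, not part of any library:

```python
import os
import tempfile
from pathlib import Path

def read_api_key(path: str) -> str:
    """Read the first line of a key file, without the trailing newline."""
    lines = Path(path).read_text(encoding="utf-8").splitlines()
    key = lines[0].strip() if lines else ""
    if not key:
        raise ValueError(f"No API key found in {path}")
    return key

# Demonstration with a throwaway file and a fake key
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("sk-fake-key-for-demo\n")

with open(tmp.name, "r") as f:
    raw_line = f.readline()         # readline() keeps the "\n"
clean_key = read_api_key(tmp.name)  # the helper removes it
os.remove(tmp.name)

print(repr(raw_line), repr(clean_key))
# Usage in this notebook (path taken from the cell above):
# API_KEY = read_api_key("drive/MyDrive/files/api_key.txt")
```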


In [None]:
#The OpenAI Key
import openai
from openai import OpenAI
import os
os.environ['OPENAI_API_KEY'] =API_KEY
openai.api_key = os.getenv("OPENAI_API_KEY")

In [None]:
!pip install nest_asyncio

# 5.Retrieving the list of Big-bench prompts designed for this notebook

The list was created from the task list of [Big-bench](https://github.com/google/BIG-bench/blob/main/bigbench/benchmark_tasks/README.md).

In [None]:
!curl -L https://raw.githubusercontent.com/Denis2054/Transformers_3rd_Edition/master/Chapter15/tasks.txt --output "tasks.txt"



## Reading the file into a Pandas DataFrame

In [None]:
import pandas as pd

# read the file
df = pd.read_csv('tasks.txt', header=None, on_bad_lines='skip')

# If you want to add a column name after loading
df.columns = ['Tasks']

# print the dataframe
df

|     | Tasks |
|-----|-------|
| 0   | 1.Explain the following task 2.Provide an exam... |
| 1   | 1.Explain the following task 2.Provide an exam... |
| 2   | 1.Explain the following task 2.Provide an exam... |
| 3   | 1.Explain the following task 2.Provide an exam... |
| 4   | 1.Explain the following task 2.Provide an exam... |
| ... | ... |
| 139 | 1.Explain the following task 2.Provide an exam... |
| 140 | 1.Explain the following task 2.Provide an exam... |
| 141 | 1.Explain the following task 2.Provide an exam... |
| 142 | 1.Explain the following task 2.Provide an exam... |


In [None]:
nbt=len(df)
print("Number of tasks: ", nbt)

Number of tasks:  144
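
`pd.read_csv` treats commas as field separators. If a task description contains a comma, pandas may see extra fields on that line, and with `on_bad_lines='skip'` such lines are dropped without an error, so it is worth checking that the count matches the file. A minimal alternative, assuming `tasks.txt` holds one task per line with no header; `read_tasks` is a hypothetical helper demonstrated on a throwaway file:

```python
import os
import tempfile
from pathlib import Path

def read_tasks(path: str) -> list[str]:
    """Return one task per non-empty line, keeping any commas inside the text."""
    text = Path(path).read_text(encoding="utf-8")
    return [line.strip() for line in text.splitlines() if line.strip()]

# Demonstration with a throwaway file; the second line contains commas
sample = "First task description\nSecond task, with commas, inside\n\n"
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False, encoding="utf-8") as tmp:
    tmp.write(sample)
demo_tasks = read_tasks(tmp.name)
os.remove(tmp.name)
print(len(demo_tasks), demo_tasks[1])

# Usage in this notebook: tasks = read_tasks("tasks.txt")
```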


# 6.Running the tasks

Check OpenAI's policy for rate limits before running the tasks:
https://platform.openai.com/docs/guides/rate-limits/overview
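
One common way to stay within rate limits is to retry rate-limited calls with exponential backoff and random jitter. The notebook does not do this; the sketch below is a hedged illustration in which `RateLimitError`, `with_backoff`, and `flaky_call` are hypothetical (run it as a standalone script or after `nest_asyncio.apply()`):

```python
import asyncio
import random

class RateLimitError(Exception):
    """Hypothetical exception standing in for an HTTP 429 (rate limit) response."""

async def with_backoff(make_call, max_retries: int = 5, base_delay: float = 1.0):
    """Await make_call(), retrying on RateLimitError with exponential backoff and jitter.

    make_call must be a zero-argument function that returns a new coroutine each
    time, because a coroutine object can only be awaited once.
    """
    for attempt in range(max_retries + 1):
        try:
            return await make_call()
        except RateLimitError:
            if attempt == max_retries:
                raise  # give up after the last retry
            # base_delay * 1, 2, 4, ... scaled by a random factor in [1, 2)
            delay = base_delay * 2 ** attempt * (1 + random.random())
            await asyncio.sleep(delay)

# Demonstration: a fake call that is rate limited twice, then succeeds
attempts = 0

async def flaky_call() -> str:
    global attempts
    attempts += 1
    if attempts < 3:
        raise RateLimitError("429 Too Many Requests")
    return "ok"

result = asyncio.run(with_backoff(flaky_call, base_delay=0.01))
print(result, "after", attempts, "attempts")
```

In this notebook's `fetch`, a 429 response comes back as JSON with an `error` field rather than as an exception, so you would check `response.status` (an aiohttp response attribute) and raise such an exception yourself before wrapping the call in `with_backoff`.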


## Running the corpus with `GPT-4`



In [None]:
import asyncio
import aiohttp
import nest_asyncio
import os
import time
from IPython.display import display, HTML

# Apply the nest_asyncio patch
nest_asyncio.apply()

# Define the asynchronous function for API calls
async def fetch(session, url, payload, headers):
    async with session.post(url, json=payload, headers=headers) as response:
        # Check for a valid JSON response
        if response.headers.get('Content-Type') == 'application/json':
            return await response.json()
        else:
            # Handle unexpected content type
            text = await response.text()
            raise ValueError(f"Unexpected response content type: {response.headers.get('Content-Type')}, Content: {text}")

# Create a function to process tasks in batches
async def process_tasks_in_batches(tasks, batch_size, headers):
    async with aiohttp.ClientSession() as session:
        for i in range(0, len(tasks), batch_size):
            batch = tasks[i:i + batch_size]
            tasks_to_fetch = [
                fetch(session, "https://api.openai.com/v1/chat/completions", {
                    "model": "gpt-4",  # Model is defined here
                    "messages": [
                        {"role": "system", "content": "You are an expert Natural Language Processing exercise expert."},
                        {"role": "assistant", "content": "1. You can explain any NLP task. 2. Create an example. 3. Solve the example."},
                        {"role": "user", "content": task}
                    ],
                    "temperature": 0.1  # Add the temperature parameter here and other parameters you need
                }, headers) for task in batch
            ]
            responses = await asyncio.gather(*tasks_to_fetch)
            for task_num, (input_text, response) in enumerate(zip(batch, responses), start=i+1):
                if 'choices' in response and response['choices']:
                    formatted_task = response['choices'][0]['message']['content'].replace('\n', '<br>')
                    parts = input_text.split('Solve it:')
                    bb_task = parts[1].strip()
                    display_response(task_num, input_text, formatted_task, bb_task)
                else:
                    print(f"Error in response for task {task_num}: {input_text}, Response: {response}")
            print(f"Processed {i + batch_size} tasks.")

# Function to display the response
def display_response(task_num, input_text, formatted_task, bb_task):
    html_content = f"""
    <html>
      <body>
          <h1>Task {task_num}: {bb_task}</h1>
          <p>{formatted_task}</p>
      </body>
    </html>
    """
    display(HTML(html_content))

# Prepare the list of tasks
tasks = df['Tasks'].tolist()  # Assuming 'Tasks' is a column in the DataFrame
batch_size = 150  # Adjust the batch size as needed. In this case, all the tasks are sent at once.

# Set headers for API requests
headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

# Measure response time and run the batch processing
start_time = time.time()
asyncio.run(process_tasks_in_batches(tasks, batch_size, headers))
response_time_gpt4 = time.time() - start_time
print(f"Response Time: {response_time_gpt4:.2f} seconds")

Processed 150 tasks.
Response Time: 30.95 seconds


In [None]:
print(f"Response Time: {response_time_gpt4:.2f} seconds")

Response Time: 30.95 seconds
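
The three benchmark cells in this section differ only in the `"model"` string. A sketch of how the request body could be built once and reused; `build_payload` is a hypothetical helper, and the prompts and temperature are copied from the cell above:

```python
SYSTEM_PROMPT = "You are an expert Natural Language Processing exercise expert."
ASSISTANT_PROMPT = "1. You can explain any NLP task. 2. Create an example. 3. Solve the example."

def build_payload(model: str, task: str, temperature: float = 0.1) -> dict:
    """Return the chat-completions request body used by the cells in this section."""
    return {
        "model": model,
        "messages": [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "assistant", "content": ASSISTANT_PROMPT},
            {"role": "user", "content": task},
        ],
        "temperature": temperature,
    }

# One payload per model compared in this notebook, for a placeholder task
payloads = {model: build_payload(model, "placeholder task")
            for model in ("gpt-4", "gpt-4o", "gpt-4o-mini")}
print(payloads["gpt-4o-mini"]["model"], len(payloads["gpt-4o-mini"]["messages"]))
```

Adding `model` as a parameter of `process_tasks_in_batches` and passing `build_payload(model, task)` to `fetch` as the payload would let one cell benchmark all three models. This is a suggested refactor, not what the cells in this notebook do.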


## Running the corpus with `GPT-4o`


Check OpenAI's policy for rate limits before running the tasks:
https://platform.openai.com/docs/guides/rate-limits/overview

In [None]:
import asyncio
import aiohttp
import nest_asyncio
import os
import time
from IPython.display import display, HTML

# Apply the nest_asyncio patch
nest_asyncio.apply()

# Define the asynchronous function for API calls
async def fetch(session, url, payload, headers):
    async with session.post(url, json=payload, headers=headers) as response:
        # Check for a valid JSON response
        if response.headers.get('Content-Type') == 'application/json':
            return await response.json()
        else:
            # Handle unexpected content type
            text = await response.text()
            raise ValueError(f"Unexpected response content type: {response.headers.get('Content-Type')}, Content: {text}")

# Create a function to process tasks in batches
async def process_tasks_in_batches(tasks, batch_size, headers):
    async with aiohttp.ClientSession() as session:
        for i in range(0, len(tasks), batch_size):
            batch = tasks[i:i + batch_size]
            tasks_to_fetch = [
                fetch(session, "https://api.openai.com/v1/chat/completions", {
                    "model": "gpt-4o",  # Model is defined here
                    "messages": [
                        {"role": "system", "content": "You are an expert Natural Language Processing exercise expert."},
                        {"role": "assistant", "content": "1. You can explain any NLP task. 2. Create an example. 3. Solve the example."},
                        {"role": "user", "content": task}
                    ],
                    "temperature": 0.1  # Add the temperature parameter here and other parameters you need
                }, headers) for task in batch
            ]
            responses = await asyncio.gather(*tasks_to_fetch)
            for task_num, (input_text, response) in enumerate(zip(batch, responses), start=i+1):
                if 'choices' in response and response['choices']:
                    formatted_task = response['choices'][0]['message']['content'].replace('\n', '<br>')
                    parts = input_text.split('Solve it:')
                    bb_task = parts[1].strip()
                    display_response(task_num, input_text, formatted_task, bb_task)
                else:
                    print(f"Error in response for task {task_num}: {input_text}, Response: {response}")
            print(f"Processed {i + batch_size} tasks.")

# Function to display the response
def display_response(task_num, input_text, formatted_task, bb_task):
    html_content = f"""
    <html>
      <body>
          <h1>Task {task_num}: {bb_task}</h1>
          <p>{formatted_task}</p>
      </body>
    </html>
    """
    display(HTML(html_content))

# Prepare the list of tasks
tasks = df['Tasks'].tolist()  # Assuming 'Tasks' is a column in the DataFrame
batch_size = 150  # Adjust the batch size as needed. In this case, all the tasks are sent at once.

# Set headers for API requests
headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

# Measure response time and run the batch processing
start_time = time.time()
asyncio.run(process_tasks_in_batches(tasks, batch_size, headers))
response_time_gpt4o = time.time() - start_time
print(f"Response Time: {response_time_gpt4o:.2f} seconds")

Processed 150 tasks.
Response Time: 26.75 seconds


In [None]:
print(f"Response Time: {response_time_gpt4o:.2f} seconds")

Response Time: 26.75 seconds


## Running the corpus with `GPT-4o-mini`


Check OpenAI's policy for rate limits before running the tasks:
https://platform.openai.com/docs/guides/rate-limits/overview

In [None]:
import asyncio
import aiohttp
import nest_asyncio
import os
import time
from IPython.display import display, HTML

# Apply the nest_asyncio patch
nest_asyncio.apply()

# Define the asynchronous function for API calls
async def fetch(session, url, payload, headers):
    async with session.post(url, json=payload, headers=headers) as response:
        # Check for a valid JSON response
        if response.headers.get('Content-Type') == 'application/json':
            return await response.json()
        else:
            # Handle unexpected content type
            text = await response.text()
            raise ValueError(f"Unexpected response content type: {response.headers.get('Content-Type')}, Content: {text}")

# Create a function to process tasks in batches
async def process_tasks_in_batches(tasks, batch_size, headers):
    async with aiohttp.ClientSession() as session:
        for i in range(0, len(tasks), batch_size):
            batch = tasks[i:i + batch_size]
            tasks_to_fetch = [
                fetch(session, "https://api.openai.com/v1/chat/completions", {
                    "model": "gpt-4o-mini",  # Model is defined here
                    "messages": [
                        {"role": "system", "content": "You are an expert Natural Language Processing exercise expert."},
                        {"role": "assistant", "content": "1. You can explain any NLP task. 2. Create an example. 3. Solve the example."},
                        {"role": "user", "content": task}
                    ],
                    "temperature": 0.1  # Add the temperature parameter here and other parameters you need
                }, headers) for task in batch
            ]
            responses = await asyncio.gather(*tasks_to_fetch)
            for task_num, (input_text, response) in enumerate(zip(batch, responses), start=i+1):
                if 'choices' in response and response['choices']:
                    formatted_task = response['choices'][0]['message']['content'].replace('\n', '<br>')
                    parts = input_text.split('Solve it:')
                    bb_task = parts[1].strip()
                    display_response(task_num, input_text, formatted_task, bb_task)
                else:
                    print(f"Error in response for task {task_num}: {input_text}, Response: {response}")
            print(f"Processed {i + batch_size} tasks.")

# Function to display the response
def display_response(task_num, input_text, formatted_task, bb_task):
    html_content = f"""
    <html>
      <body>
          <h1>Task {task_num}: {bb_task}</h1>
          <p>{formatted_task}</p>
      </body>
    </html>
    """
    display(HTML(html_content))

# Prepare the list of tasks
tasks = df['Tasks'].tolist()  # Assuming 'Tasks' is a column in the DataFrame
batch_size = 150  # Adjust the batch size as needed. In this case, all the tasks are sent at once.

# Set headers for API requests
headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

# Measure response time and run the batch processing
start_time = time.time()
asyncio.run(process_tasks_in_batches(tasks, batch_size, headers))
response_time_gpt4omini = time.time() - start_time
print(f"Response Time: {response_time_gpt4omini:.2f} seconds")

Processed 150 tasks.
Response Time: 21.54 seconds


In [None]:
print(f"Response Time: {response_time_gpt4omini:.2f} seconds")

Response Time: 21.54 seconds


# 7.Response times summary

In [None]:
print("Response Times:")
print(f"GPT-4: {response_time_gpt4:.2f} seconds (Average time per task: {response_time_gpt4 / nbt:.4f} seconds)")
print(f"GPT-4o: {response_time_gpt4o:.2f} seconds (Average time per task: {response_time_gpt4o / nbt:.4f} seconds)")
print(f"GPT-4o-mini: {response_time_gpt4omini:.2f} seconds (Average time per task: {response_time_gpt4omini / nbt:.4f} seconds)")


Response Times:
GPT-4: 30.95 seconds (Average time per task: 0.2149 seconds)
GPT-4o: 26.75 seconds (Average time per task: 0.1858 seconds)
GPT-4o-mini: 21.54 seconds (Average time per task: 0.1496 seconds)
