# Parallelization

There are often significant opportunities to parallelize calls to Azure OpenAI for different tasks such as classification or document processing.

Parallelization is a straightforward technique that is not unique to GenAI or LLM workloads. As the document processing case study shows, the key step is to work out how a sequential process can be broken into a number of independent tasks; once the tasks are independent, the approaches below can be applied.

In [2]:
import copy
import datetime
import json
import os
import textwrap
import time

from dotenv import load_dotenv
from openai import AzureOpenAI

# Load environment variables
load_dotenv()

model = os.getenv("MODELGPT432k")

user_messages = [
    "GPT-3 (Generative Pre-trained Transformer 3) - OpenAI's powerful language model capable of writing like a human",
    "DALL-E - OpenAI's AI system that can create images from textual descriptions",
    "Scientific research has led to significant advancements in medicine and healthcare.",
    "CLIP (Contrastive Language-Image Pretraining) - OpenAI's model that understands images in the context of natural language",
    "Science has contributed to our understanding of the natural world and the universe.",
    "Codex - OpenAI's AI system that can understand and generate code, powering GitHub Copilot",
    "GPT-4 - OpenAI's rumored next iteration of their language model with anticipated improvements",
    "Azure AI - Microsoft's suite of AI services, including machine learning, cognitive services, and conversational AI",
    "The collaboration and exchange of scientific knowledge across international borders have facilitated global progress in various fields.",
    "Scientific innovations have improved communication and connectivity through technology.",
    "Microsoft Turing Models - A series of large-scale language models developed by Microsoft",
    "Microsoft Project Brainwave - Real-time AI platform for cloud and edge computing",
    "Microsoft AI for Earth - A program applying AI to environmental challenges",
    "Microsoft AI for Health - An initiative leveraging AI for health-related research",
    "Scientific innovations have improved communication and connectivity through technology.",
    "OpenAI's API - Providing access to GPT-3 and other models for various applications",
]

system_message = '''
You are a helpful AI assistant that categorizes the text into one of two categories: SCIENCE, AI.
###Important:
Do not add any additional information.
Make sure to complete all elements of the array'''


## Scenario: Document Classification

A number of article titles are being classified as either belonging to the _Science_ or _AI_ categories.

### A: Base case - Sequential Execution

**Time taken: 180 seconds**

Each article is evaluated by making sequential calls, one by one, to Azure OpenAI.

In [4]:
client = AzureOpenAI(
        api_version=os.getenv("API_VERSION"),
        azure_endpoint=os.getenv("AZURE_ENDPOINT"),
        api_key=os.getenv("API_KEY")
    )

responses = []

start_time = time.time()

for user_message in user_messages:

    completion = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message},
        ],
    )

    result = completion.choices[0].message.content
    responses.append(result)

end_time = time.time()
e2e_time = end_time - start_time

print(responses)
print("Time taken for execution : ", e2e_time)


['["AI"]', '"AI"', '{"category": "SCIENCE"}', '["AI"]', '["Science has contributed to our understanding of the natural world and the universe.", "SCIENCE"]', '["Codex - OpenAI\'s AI system that can understand and generate code, powering GitHub Copilot", "AI"]', '["AI"]', '["AI"]', '["SCIENCE"]', '{"category": "SCIENCE"}', '["AI"]', '["AI"]', '"AI"', '["AI"]', '"SCIENCE"', '["AI"]']
Time taken for execution :  180.36897778511047
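
The replies above come back in several shapes (a bare string, a JSON string, a one-element array, an array that echoes the title, and an object with a `category` key), so they need normalizing before they can be compared or counted. The following is a minimal sketch of one way to do that; `normalize_label` and `VALID_LABELS` are hypothetical names introduced here for illustration and are not part of the original notebook.

```python
import json

# Assumption: the only acceptable labels are the two named in the system message.
VALID_LABELS = {"SCIENCE", "AI"}

def normalize_label(raw):
    """Return 'SCIENCE' or 'AI' extracted from a raw model reply, or None."""
    try:
        parsed = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        # Not valid JSON (for example a bare AI without quotes): use the text as-is.
        parsed = raw

    # Reduce every observed shape to a flat list of candidate values.
    if isinstance(parsed, dict):
        candidates = list(parsed.values())
    elif isinstance(parsed, list):
        candidates = parsed
    else:
        candidates = [parsed]

    # Check the last element first, because replies that echo the title
    # put the label at the end of the array.
    for candidate in reversed(candidates):
        if isinstance(candidate, str) and candidate.strip().upper() in VALID_LABELS:
            return candidate.strip().upper()
    return None

raw_replies = ['["AI"]', '"AI"', '{"category": "SCIENCE"}', "SCIENCE"]
print([normalize_label(reply) for reply in raw_replies])
```

Free-text replies that only mention a label inside a sentence return `None` in this sketch, so they can be flagged for review rather than guessed at.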


### B: Batching requests

**Time taken: 4.7 seconds**

Rather than calling the API once per article, all of the articles are sent in a single request as one JSON array. This removes the per-call overhead of establishing the network connection and loading the prompt, and lets the LLM generate all of the labels as a single array of responses.

In [26]:
def aoai_call(system_message, prompt, model):
    """Send one chat completion request and return (reply text, elapsed seconds)."""
    client = AzureOpenAI(
        api_version=os.getenv("API_VERSION"),
        azure_endpoint=os.getenv("AZURE_ENDPOINT"),
        api_key=os.getenv("API_KEY"),
    )

    start_time = time.time()

    completion = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": prompt},
        ],
    )

    end_time = time.time()
    e2e_time = end_time - start_time

    # Read the reply text directly from the response object
    completion_text = completion.choices[0].message.content

    return completion_text, e2e_time

# Send all article titles in one request, serialized as a single JSON array
aoai_call(system_message, json.dumps(user_messages), model)


('["AI", "AI", "SCIENCE", "AI", "SCIENCE", "AI", "AI", "AI", "SCIENCE", "SCIENCE", "AI", "AI", "AI", "AI", "SCIENCE", "AI"]',
 4.722882986068726)
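
A batched reply is only useful if it contains exactly one label per input, in the same order. The sketch below shows one way to check that before pairing labels with titles; `parse_batch_reply` is a hypothetical helper introduced for illustration, and the assumption that the reply is a plain JSON array of label strings is taken from the output shown above.

```python
import json

def parse_batch_reply(reply_text, inputs, valid_labels=("SCIENCE", "AI")):
    """Pair each input with its label; raise ValueError if the reply is malformed."""
    # json.JSONDecodeError is a subclass of ValueError, so invalid JSON
    # surfaces as the same exception type as the checks below.
    labels = json.loads(reply_text)
    if not isinstance(labels, list):
        raise ValueError("expected a JSON array of labels")
    if len(labels) != len(inputs):
        raise ValueError(f"expected {len(inputs)} labels, got {len(labels)}")
    unknown = [label for label in labels if label not in valid_labels]
    if unknown:
        raise ValueError(f"unexpected labels: {unknown}")
    return list(zip(inputs, labels))

titles = [
    "DALL-E - OpenAI's AI system that can create images from textual descriptions",
    "Science has contributed to our understanding of the natural world and the universe.",
]
pairs = parse_batch_reply('["AI", "SCIENCE"]', titles)
for title, label in pairs:
    print(label, "<-", title)
```

If the check fails, a reasonable fallback is to resend the batch or to classify the affected articles individually.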

### C: Parallelize requests using asynchronous calls

**Time taken: 2.5 seconds**

The API is called asynchronously, which speeds the application up considerably. As long as the articles fit within one batch, the response time is no longer proportional to the number of articles to classify; it is roughly the time taken by the slowest single response. The batch size parameter (`api_call_batch_size`) controls how many articles are classified simultaneously, because the limit on the application's speed is now the amount of compute quota available.
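
Because throughput is bounded by quota, concurrent requests can be rejected when that quota is exceeded (Azure OpenAI signals this with an HTTP 429 response). One common way to cope is to retry with exponential backoff. The following is a minimal sketch under stated assumptions: `RateLimitedError` and `with_backoff` are hypothetical names, and the simulated `flaky_call` stands in for a real request so the example runs without network access.

```python
import asyncio
import random

class RateLimitedError(Exception):
    """Hypothetical exception representing an HTTP 429 response."""

async def with_backoff(make_call, max_attempts=5, base_delay=0.5):
    """Await make_call(), retrying with exponential backoff when rate limited."""
    for attempt in range(max_attempts):
        try:
            return await make_call()
        except RateLimitedError:
            if attempt == max_attempts - 1:
                raise  # Out of attempts: let the caller see the error
            # Double the wait on each attempt and add jitter so that
            # concurrent tasks do not all retry at the same instant.
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            await asyncio.sleep(delay)

attempts = {"count": 0}

async def flaky_call():
    """Simulated request that is rate limited twice before succeeding."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RateLimitedError("simulated 429")
    return "AI"

result = asyncio.run(with_backoff(flaky_call, base_delay=0.01))
print(result, attempts["count"])
```

Inside a notebook, where an event loop is already running, `await with_backoff(flaky_call, base_delay=0.01)` would replace the `asyncio.run(...)` call.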

In [6]:
import asyncio
import os

from aiohttp import ClientSession

async def fetch(session, system_message, user_message):
    url = f'{os.getenv("AZURE_ENDPOINT")}/openai/deployments/gpt-4/chat/completions?api-version={os.getenv("API_VERSION")}'
    headers = {
        "Content-Type": "application/json",
        "api-key": os.getenv("API_KEY")
    }  
    data = {
        "model": "gpt-4",  # Adjust the model as needed
        "messages": [
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message}
        ]
    }
    async with session.post(url, json=data, headers=headers) as response:
        return await response.json()

async def main(system_message, user_messages):
    # Maximum number of requests in flight at once; tune this to the available quota
    api_call_batch_size = 16

    async with ClientSession() as session:
        tasks = []
        responses = []
        for user_message in user_messages:
            # await asyncio.sleep(1)  # Optional delay to avoid hitting the rate limit
            task = asyncio.create_task(fetch(session, system_message, user_message))
            tasks.append(task)
            if len(tasks) >= api_call_batch_size:
                # Wait for the whole batch to finish before starting the next one
                responses.extend(await asyncio.gather(*tasks))
                tasks = []
        responses.extend(await asyncio.gather(*tasks))  # Process the last (possibly empty) batch
        return responses

system_message = '''
You are a helpful AI assistant that categorizes the text into one of two categories: SCIENCE, AI.
###Important:
Do not add any additional information'''

start = time.time()
responses_async = await main(system_message, user_messages)
end = time.time()

run_time = end - start
print(f"Total time taken: {run_time} seconds")
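# Extract the reply text from each response
response_content = []
for response in responses_async:
    try:
        response_content.append(response["choices"][0]["message"]["content"])
    except (KeyError, IndexError, TypeError):
        # Error payloads (for example, rate-limit errors) have no "choices" entry; skip them
        pass
print(response_content)
print(len(response_content))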

Total time taken: 2.5420188903808594 seconds
['AI', 'AI', 'SCIENCE', 'AI', 'This text belongs to the category: SCIENCE.', 'AI', 'AI', 'AI', 'SCIENCE', 'SCIENCE', 'AI', 'AI', 'AI', 'AI', 'SCIENCE', 'AI']
16
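
The batch loop above waits for every request in a batch to finish before starting the next batch, so one slow response delays the whole batch. An alternative is to cap the number of in-flight requests with an `asyncio.Semaphore`, which starts a new request as soon as any earlier one completes. The sketch below illustrates the idea only: `classify_stub` is a hypothetical stand-in that sleeps instead of calling Azure OpenAI, and `classify_all` and `max_concurrency` are names introduced here.

```python
import asyncio

async def classify_stub(text):
    """Hypothetical stand-in for a real API call; sleeps instead of using the network."""
    await asyncio.sleep(0.05)
    return "AI" if "AI" in text else "SCIENCE"

async def classify_all(texts, max_concurrency=4):
    """Classify all texts with at most max_concurrency calls in flight at once."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(text):
        # Each task waits here until one of the max_concurrency slots is free.
        async with semaphore:
            return await classify_stub(text)

    # gather returns results in the same order as the inputs.
    return await asyncio.gather(*(bounded(text) for text in texts))

sample_texts = [
    "Azure AI - Microsoft's suite of AI services",
    "Science has contributed to our understanding of the natural world and the universe.",
    "DALL-E - OpenAI's AI system that can create images from textual descriptions",
]
labels = asyncio.run(classify_all(sample_texts, max_concurrency=2))
print(labels)
```

Inside a notebook, `await classify_all(sample_texts, max_concurrency=2)` would replace the `asyncio.run(...)` call, since an event loop is already running there.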
