Cost Optimization Techniques for LLM API Usage
This notebook demonstrates strategies to optimize costs when working with LLM APIs:
- Implementing batching
- Implementing prompt caching
- Token usage optimization techniques

!!! Prepare .env file based on .env.template !!!

In [None]:
# Install required external libraries
%pip install python-dotenv openai anthropic boto3 tiktoken pydantic IPython


# Import

In [None]:
import json
import os
import time
from enum import Enum
from pathlib import Path

import anthropic
import boto3
from dotenv import load_dotenv
from IPython.display import display, JSON
from openai import AzureOpenAI, OpenAI
from pydantic import BaseModel, Field
from openai.lib._parsing._completions import type_to_response_format_param

from openai_callback import (
    get_llm_debug_anthropic,
    get_llm_debug_openai, 
    get_llm_debug_bedrock
)
from timer import Timer


In [None]:
load_dotenv()

# Prerequisites

In [None]:
# Model names and versions used for different LLM providers

OPENAI_MODEL = "gpt-4o-mini"
AZURE_OPENAI_MODEL = "gpt-4o-mini"

OPENAI_MODEL_BATCH = "gpt-4o-mini"
AZURE_OPENAI_MODEL_BATCH = "gpt-4o-mini-batch"

ANTHROPIC_MODEL = "claude-3-5-haiku-20241022"

AWS_BEDROCK_ANTHROPIC_MODEL = "anthropic.claude-3-5-haiku-20241022-v1:0"

## Transcripts

In [None]:
transcripts_dir = "./transcripts/"
example_transcript_path = os.path.join(transcripts_dir, "16.txt")
example_transcript = open(example_transcript_path, "r").read()

## Models

In [None]:
class TopicEnum(str, Enum):
    ORDER_STATUS = "Order Status"
    PRODUCT_INQUIRY = "Product Inquiry"
    RETURN_EXCHANGE = "Return/Exchange"
    TECHNICAL_SUPPORT = "Technical Support"
    BILLING_PAYMENT = "Billing/Payment"
    DELIVERY_ISSUE = "Delivery Issue"
    PROMOTIONS_DISCOUNTS = "Promotions/Discounts"
    ACCOUNT_ASSISTANCE = "Account Assistance"
    FEEDBACK_COMPLAINT = "Feedback/Complaint"
    STORE_LOCATOR = "Store Locator"
    WARRANTY_GUARANTEE = "Warranty/Guarantee"
    FRAUD_SECURITY = "Fraud/Security"
    OTHER = "Other"

class Offer(BaseModel):
    offering_name: str = Field(...)
    price: str | None = Field(None)
    amount_with_unit: str | None = Field(None)
    background: str = Field(...)
    additional_notes: str | None = Field(None)

class CompetitorOffer(Offer):
    competitor_name: str = Field(...)

class Insight(BaseModel):
    main_call_topic: TopicEnum = Field(
        ..., description="Main topic category (must match predefined categories)"
    )
    secondary_call_topics: list[TopicEnum] = Field(
        ..., description="Secondary topic categories (must match predefined categories)"
    )
    summary: str = Field(...)
    offers: list[Offer] = Field(...)

## Instructions

In [None]:
system_prompt = open("./system_prompt.txt", "r").read()

# Cost Optimization

## Level 1 - Prompt Caching
Prompt caching is a performance optimization technique used in language models that stores tokenized prefixes of prompts (including system and user messages). By caching these commonly used components, it significantly reduces both costs and processing time while maintaining the ability to generate unique outputs for each request.

The KV (key-value) cache stores intermediate attention computations generated during sequence processing. These cached representations allow the model to skip recomputing values for identical prefix tokens in subsequent requests. While this provides substantial performance benefits, the cache is sensitive to changes - any modification to the prefix tokens invalidates the cache and requires recomputation from that point forward.
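
To make the prefix sensitivity concrete, the sketch below compares two token sequences and reports how many leading tokens a cache could reuse. It is a toy model under stated assumptions: the token IDs are made up and `shared_prefix_length` is a hypothetical helper, not part of any provider SDK.

```python
def shared_prefix_length(cached_tokens: list[int], new_tokens: list[int]) -> int:
    """Return how many leading tokens of new_tokens match cached_tokens."""
    length = 0
    for cached, new in zip(cached_tokens, new_tokens):
        if cached != new:
            break
        length += 1
    return length


# Toy token IDs: the first four tokens stand in for a shared system prompt.
first_request = [101, 7, 7, 42, 900, 901]
same_prefix = [101, 7, 7, 42, 555, 556, 557]
edited_start = [999, 7, 7, 42, 900, 901]

reusable = shared_prefix_length(first_request, same_prefix)
invalidated = shared_prefix_length(first_request, edited_start)
print(f"reusable tokens: {reusable}, after editing the first token: {invalidated}")
```

Changing a single token at the start leaves nothing to reuse, which is why static content belongs at the beginning of the prompt.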

Key Use Cases:
- Conversational AI: Efficiently handling conversation history and context
- Document Processing: Optimizing analysis of large documents and texts
- Few-Shot Learning: Maintaining consistent instruction sets and examples
- Batch Processing: Handling repetitive tasks with similar prompts
- Tool-Augmented LLMs: Supporting multiple rounds of tool interactions efficiently

### Anthropic's Prompt Caching

Anthropic's prompt caching feature offers significant performance and cost benefits:

Key Benefits:
- Reduces costs by up to 90% and latency by up to 85% for long prompts
- 5-minute cache lifetime (TTL), refreshed each time the cached content is used
- Can be combined with Batch Inference for additional optimization

Implementation:
- Enabled by adding "cache_control" block to messages
- Requires minimum prompt length:
  - 1024 tokens: Claude 3.7/3.5 Sonnet, Claude 3 Opus
  - 2048 tokens: Claude 3.5 Haiku, Claude 3 Haiku

Pricing Structure:
- Cache write: 25% more than the base input token price
- Cache read: 90% less than the base input token price

Cacheable Components:
- System and user messages
- Images
- Tools and tool definitions/usage

Pricing Table (per million tokens):
| **Model**                 | **Standard Input** | **Cache Write** | **Cache Read** | **Output** |
|---------------------------|-------------------|-----------------|----------------|------------|
| **Claude 3.5/3.7 Sonnet** | $3.00             | $3.75           | $0.30         | $15.00     |
| **Claude 3 Opus**         | $15.00            | $18.75          | $1.50         | $75.00     |
| **Claude 3.5 Haiku**      | $0.80             | $1.00           | $0.08         | $4.00      |
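
The write premium and read discount can be combined into a rough break-even estimate. The sketch below uses the Claude 3.5 Haiku prices from the table and assumes that every request shares the same prefix and arrives before the cache expires; the function names are hypothetical.

```python
# Prices in USD per million tokens, copied from the Claude 3.5 Haiku row above.
BASE_INPUT = 0.80
CACHE_WRITE = 1.00
CACHE_READ = 0.08


def input_cost_without_cache(prefix_tokens: int, requests: int) -> float:
    """Every request pays the base input price for the shared prefix."""
    return requests * prefix_tokens * BASE_INPUT / 1_000_000


def input_cost_with_cache(prefix_tokens: int, requests: int) -> float:
    """The first request writes the cache; the remaining requests read it."""
    if requests == 0:
        return 0.0
    write = prefix_tokens * CACHE_WRITE
    reads = (requests - 1) * prefix_tokens * CACHE_READ
    return (write + reads) / 1_000_000


# Hypothetical workload: a 10,000-token prefix reused by 100 requests.
plain = input_cost_without_cache(10_000, 100)
cached = input_cost_with_cache(10_000, 100)
print(f"without cache: {plain:.4f} USD, with cache: {cached:.4f} USD")
```

A single request costs more with caching than without it because of the write premium, so caching only pays off when the prefix is reused.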

References:
- [Anthropic Documentation](https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching)

In [None]:
# Output schema prefix, needed because structured output is not supported by Anthropic here.
# json.dumps renders the schema as valid JSON rather than as a Python dict repr.
model_json_schema_anthropic = f"""
As a genius expert, your task is to understand the content and provide
the parsed objects in json that match the following json_schema:
{json.dumps(Insight.model_json_schema())}
Make sure to return an instance of the JSON, not the schema itself
"""

In [None]:
# Test Anthropic's prompt caching feature by making multiple API calls with the same transcript
anthropic_client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))

example_transcripts = [
    open(os.path.join(transcripts_dir, "16.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "16.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "16.txt"), "r").read(),
]

for _example_transcript in example_transcripts:
    with Timer() as timer:
        message = anthropic_client.messages.create(
            model=ANTHROPIC_MODEL,
            max_tokens=1000,
            temperature=0,
            system=[
                {
                    "type": "text",
                    "text": "\n\n".join(["aaa"+system_prompt, model_json_schema_anthropic]),
                    "cache_control": {"type": "ephemeral"},
                }
            ],
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": f"[Transcript]\n {_example_transcript}",
                            "cache_control": {"type": "ephemeral"},
                        }
                    ],
                }
            ],
        )

    # Pause between calls outside the Timer block so the delay is not counted as latency
    time.sleep(1)

    print(f"Elapsed time: {timer.elapsed_time}")
    print(get_llm_debug_anthropic(message.usage.model_dump(), message.model))
    print()

### Anthropic


In [None]:
# Test Anthropic's system message caching with multiple transcripts

anthropic_client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))

example_transcripts = [
    open(os.path.join(transcripts_dir, "15.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "16.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "17.txt"), "r").read(),
]

for _example_transcript in example_transcripts:
    with Timer() as timer:
        message = anthropic_client.messages.create(
            model=ANTHROPIC_MODEL,
            max_tokens=1000,
            temperature=0,
            system=[
                {
                    "type": "text",
                    "text": "\n\n".join(["a"+system_prompt, model_json_schema_anthropic]),
                    "cache_control": {"type": "ephemeral"},
                }
            ],
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": f"[Transcript]\n {_example_transcript}",
                            "cache_control": {"type": "ephemeral"},
                        }
                    ],
                }
            ],
        )

    print(f"Elapsed time: {timer.elapsed_time}")
    print(get_llm_debug_anthropic(message.usage.model_dump(), message.model))
    print()

### AWS Bedrock

AWS Bedrock provides prompt caching similar to Anthropic's implementation, with a few key differences:
- Batch inference requests still accumulate costs even when using prompt caching
- The service is currently in preview/beta and not yet generally available (GA)
- Cache hits can help reduce latency and costs for repeated prompts
- Best practice is to structure prompts with static content at the beginning

### OpenAI

OpenAI Prompt Caching

Performance Benefits:
- Reduces latency by up to 80%
- Reduces the cost of cached input tokens by 50%
- Automatic - no configuration needed

Technical Details:
- Available for prompts with 1024+ tokens
- Cache hits occur in 128 token increments
- Cache duration: 5-10 minutes (up to 1 hour during off-peak)
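
The 1024-token minimum and the 128-token increments imply that the reported cached token count follows the sequence 1024, 1152, 1280, and so on. The sketch below computes the largest value that a given matching prefix could produce. It is an upper bound under these stated rules, not a prediction: a real request can still report fewer cached tokens or none at all. `max_cached_tokens` is a hypothetical helper.

```python
MIN_CACHEABLE_TOKENS = 1024  # shorter prompts are never cached
CACHE_INCREMENT = 128        # cache hits grow in steps of this size


def max_cached_tokens(matching_prefix_tokens: int) -> int:
    """Largest cached token count possible for a prefix of the given length."""
    if matching_prefix_tokens < MIN_CACHEABLE_TOKENS:
        return 0
    extra = matching_prefix_tokens - MIN_CACHEABLE_TOKENS
    return MIN_CACHEABLE_TOKENS + (extra // CACHE_INCREMENT) * CACHE_INCREMENT


for prefix in (900, 1024, 1151, 1152, 2000):
    print(prefix, max_cached_tokens(prefix))
```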

Cacheable Content:
- System messages
- User messages  
- Images
- Tools and tool calls
- Structured output specifications

Supported Models:
- GPT-4.5 Preview
- GPT-4o (except gpt-4o-2024-05-13 and chatgpt-4o-latest)
- GPT-4o Mini
- GPT-4o Realtime Preview
- O1 Preview
- O1 Mini

Pricing (per million tokens):

| Model                      | Input (Regular) | Input (Cached) | Output    |
|----------------------------|----------------|----------------|-----------|
| GPT-4o (2024-08-06)       | $2.50          | $1.25         | $10.00    |
| GPT-4o Fine-tuned         | $3.75          | $1.875        | $15.00    |
| GPT-4o Mini (2024-07-18)  | $0.15          | $0.075        | $0.60     |
| GPT-4o Mini Fine-tuned    | $0.30          | $0.15         | $1.20     |
| O1 Preview                | $15.00         | $7.50         | $60.00    |
| O1 Mini                   | $3.00          | $1.50         | $12.00    |

Best Practices:
1. Place static/repeated content at the start of prompts
2. Put dynamic content at the end
3. Use longer prompts when possible (1024+ tokens)
4. Make API calls during off-peak hours for better cache retention
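
The first two practices can be illustrated by comparing how much leading text two requests share under each ordering. The sketch below counts shared characters as a stand-in for shared tokens, which is an approximation; the instruction text and all function names are hypothetical.

```python
import os

# Hypothetical stand-in for a long, static system prompt.
STATIC_INSTRUCTIONS = "Extract the main topic, a summary, and all offers. " * 30


def static_first(transcript: str) -> list[dict[str, str]]:
    """Static instructions lead; the per-request transcript comes last."""
    return [
        {"role": "system", "content": STATIC_INSTRUCTIONS},
        {"role": "user", "content": f"[Transcript]\n {transcript}"},
    ]


def dynamic_first(transcript: str) -> list[dict[str, str]]:
    """Anti-pattern: per-request content precedes the static instructions."""
    content = f"[Transcript]\n {transcript}\n\n{STATIC_INSTRUCTIONS}"
    return [{"role": "system", "content": content}]


def shared_prefix_chars(a: list[dict[str, str]], b: list[dict[str, str]]) -> int:
    """Length of the common leading text of two rendered message lists."""
    def render(messages: list[dict[str, str]]) -> str:
        return "\n".join(f"{m['role']}: {m['content']}" for m in messages)

    return len(os.path.commonprefix([render(a), render(b)]))


good = shared_prefix_chars(static_first("Agent: Hello"), static_first("Customer: Hi"))
bad = shared_prefix_chars(dynamic_first("Agent: Hello"), dynamic_first("Customer: Hi"))
print(f"static first: {good} shared characters, dynamic first: {bad}")
```

With the static instructions first, the whole instruction block is shared between requests; with the transcript first, the shared prefix ends where the transcripts begin to differ.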

In [None]:
# Test OpenAI's prompt caching feature by making multiple API calls with the same transcript

example_transcripts = [
    open(os.path.join(transcripts_dir, "16.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "16.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "16.txt"), "r").read(),
]
openai_client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY"),
)
for _example_transcript in example_transcripts:
    with Timer() as timer:
        openai_result = openai_client.beta.chat.completions.parse(
                model=OPENAI_MODEL,
                temperature=0,
                max_completion_tokens=1024,
                messages=[
                    {
                        "role": "system",
                        "content": "asdad" + system_prompt
                    },
                    {
                        "role": "user",
                        "content": f"[Transcript]\n {_example_transcript}"
                    }
                ],
                response_format=Insight,
        )


    openai_response = openai_result.choices[0].message.content
    openai_insight: Insight = openai_result.choices[0].message.parsed
    openai_completion_tokens = openai_result.usage.completion_tokens
    openai_prompt_tokens = openai_result.usage.prompt_tokens
    openai_cached_tokens = openai_result.usage.prompt_tokens_details.cached_tokens

    display(JSON(openai_insight.model_dump(), expanded=True))

    print(f"Completion Tokens: {openai_completion_tokens}")
    print(f"Prompt Tokens: {openai_prompt_tokens}")
    print(f"Cached Tokens: {openai_cached_tokens}")
    print(f"Elapsed time: {timer.elapsed_time}")
    print()

In [None]:
# Test OpenAI's system message caching with multiple transcripts
 
example_transcripts = [
    open(os.path.join(transcripts_dir, "15.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "16.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "17.txt"), "r").read(),
]
openai_client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY"),
)
for _example_transcript in example_transcripts:
    with Timer() as timer:
        openai_result = openai_client.beta.chat.completions.parse(
                model=OPENAI_MODEL,
                temperature=0,
                max_completion_tokens=1024,
                messages=[
                    {
                        "role": "system",
                        "content": "d" + system_prompt
                    },
                    {
                        "role": "user",
                        "content": f"[Transcript]\n {_example_transcript}"
                    }
                ],
                response_format=Insight,
        )


    openai_response = openai_result.choices[0].message.content
    openai_insight: Insight = openai_result.choices[0].message.parsed
    openai_completion_tokens = openai_result.usage.completion_tokens
    openai_prompt_tokens = openai_result.usage.prompt_tokens
    openai_cached_tokens = openai_result.usage.prompt_tokens_details.cached_tokens

    display(JSON(openai_insight.model_dump(), expanded=True))

    print(f"Completion Tokens: {openai_completion_tokens}")
    print(f"Prompt Tokens: {openai_prompt_tokens}")
    print(f"Cached Tokens: {openai_cached_tokens}")
    print(f"Elapsed time: {timer.elapsed_time}")
    print()

### Azure OpenAI

Azure OpenAI provides prompt caching that reuses a shared prompt prefix (such as the system prompt) across API calls, reducing both input cost and latency.

Key features of Azure OpenAI prompt caching:
- Officially supported in API version 2024-10-01-preview and later
- Model support started with the o1 family (o1-preview, o1-mini); check the Azure documentation for the current list, since the example below runs against gpt-4o-mini
- Cached tokens are tracked and reported in the usage.prompt_tokens_details.cached_tokens field
- Caching does not lower the prompt token count; cached tokens are billed at a discount instead
- Reduces latency by reusing the computation for the cached prefix instead of reprocessing it
- Implements the same caching logic and behavior as OpenAI's base API
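
As a rough way to read the `cached_tokens` field, the sketch below computes the share of prompt tokens served from the cache. It works on a plain dictionary shaped like the usage object; `cached_fraction` is a hypothetical helper and the numbers are illustrative, not measured results.

```python
def cached_fraction(usage: dict) -> float:
    """Return the share of prompt tokens that were served from the cache."""
    prompt_tokens = usage.get("prompt_tokens") or 0
    details = usage.get("prompt_tokens_details") or {}
    cached_tokens = details.get("cached_tokens") or 0
    return cached_tokens / prompt_tokens if prompt_tokens else 0.0


# Illustrative numbers only.
first_call = {"prompt_tokens": 2048, "prompt_tokens_details": {"cached_tokens": 0}}
second_call = {"prompt_tokens": 2048, "prompt_tokens_details": {"cached_tokens": 1536}}

for usage in (first_call, second_call):
    print(f"cached share: {cached_fraction(usage):.0%}")
```

With the SDK response objects used in this notebook, `azure_openai_result.usage.model_dump()` should yield a dictionary of this shape.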

The example below demonstrates Azure OpenAI's prompt caching capabilities by processing multiple transcripts with a shared system prompt. Monitor the cached_tokens metric to observe the caching in action.


In [None]:
# Process multiple transcripts using Azure OpenAI with prompt caching
example_transcripts = [
    open(os.path.join(transcripts_dir, "15.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "16.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "17.txt"), "r").read(),
]
azure_openai_client = AzureOpenAI(
    api_version="2024-10-01-preview",
    api_key=os.environ.get("AZURE_OPENAI_API_KEY"),
    azure_endpoint=os.environ.get("AZURE_OPENAI_API_ENDPOINT"),
)

for _example_transcript in example_transcripts:
    with Timer() as timer:
        azure_openai_result = azure_openai_client.beta.chat.completions.parse(
                model=AZURE_OPENAI_MODEL,  # on Azure this is the deployment name
                temperature=0,
                max_completion_tokens=1024,
                messages=[
                    {
                        "role": "system",
                        "content": "agdfa" + system_prompt
                    },
                    {
                        "role": "user",
                        "content": f"[Transcript]\n {_example_transcript}"
                    }
                ],
                response_format=Insight,
        )


    azure_openai_response = azure_openai_result.choices[0].message.content
    azure_openai_insight: Insight = azure_openai_result.choices[0].message.parsed
    azure_openai_completion_tokens = azure_openai_result.usage.completion_tokens
    azure_openai_prompt_tokens = azure_openai_result.usage.prompt_tokens
    azure_openai_cached_tokens = azure_openai_result.usage.prompt_tokens_details.cached_tokens

    display(JSON(azure_openai_insight.model_dump(), expanded=True))

    print(f"Completion Tokens: {azure_openai_completion_tokens}")
    print(f"Prompt Tokens: {azure_openai_prompt_tokens}")
    print(f"Cached Tokens: {azure_openai_cached_tokens}")
    print(f"Elapsed time: {timer.elapsed_time}")
    print()

## Level 2 - Batch Inference

When to use batch inference:
- Processing large volumes of data (thousands of records)
- Latency is not critical (responses can be delayed)
- Cost optimization is a priority (50% cheaper than on-demand)
- Running extensive evaluations, analyses, or model comparisons
- Data processing can be done asynchronously
- Resource utilization needs to be optimized

### Anthropic

Key Features
- Efficient prompt caching support for optimized processing
- Cost-effective with 50% discount compared to standard API pricing
- Asynchronous batch processing with results typically within 1 hour

Pricing (per Million Tokens)

 | Model             | On-demand Input | Batch Input   | On-demand Output | Batch Output  |
 |-------------------|-----------------|---------------|------------------|---------------|
 | Claude 3.7 Sonnet | \$3.00 / MTok   | \$1.50 / MTok | \$15.00 / MTok   | \$7.50 / MTok |
 | Claude 3.5 Sonnet | \$3.00 / MTok   | \$1.50 / MTok | \$15.00 / MTok   | \$7.50 / MTok |
 | Claude 3.5 Haiku  | \$0.80 / MTok   | \$0.40 / MTok | \$4.00 / MTok    | \$2.00 / MTok |
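
The table translates into a simple estimate for a whole job. The sketch below uses the Claude 3.5 Haiku on-demand prices and the 50% batch discount stated above; the workload size is hypothetical and `job_cost` is an illustrative helper.

```python
BATCH_DISCOUNT = 0.50  # batch requests cost half the on-demand price


def job_cost(
    input_tokens: int,
    output_tokens: int,
    input_price: float,
    output_price: float,
    batch: bool = False,
) -> float:
    """Cost in USD, given prices per million tokens."""
    on_demand = (input_tokens * input_price + output_tokens * output_price) / 1_000_000
    return on_demand * (1 - BATCH_DISCOUNT) if batch else on_demand


# Hypothetical workload: 1,000 transcripts, about 3,000 input and 500 output tokens each.
total_input_tokens = 1_000 * 3_000
total_output_tokens = 1_000 * 500

on_demand_cost = job_cost(total_input_tokens, total_output_tokens, 0.80, 4.00)
batch_cost = job_cost(total_input_tokens, total_output_tokens, 0.80, 4.00, batch=True)
print(f"on-demand: {on_demand_cost:.2f} USD, batch: {batch_cost:.2f} USD")
```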

Quota Limits & Processing Details

Tier 1
- Queue capacity: 100,000 batch requests
- Rate limit: 50 requests per minute (RPM)

Tier 2  
- Queue capacity: 200,000 batch requests
- Rate limit: 1,000 requests per minute (RPM)

General Limits
- Maximum requests per batch: 100,000
- Maximum batch size: Either 100,000 messages or 256 MB
- Processing time: Usually within 1 hour, maximum 24 hours
- Result retention: 29 days from creation
- Expiration: Batches expire if not processed within 24 hours
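
When a workload exceeds these limits, the requests have to be spread over several batches. The sketch below splits a list of request dictionaries by count and by serialized size. Measuring size as the length of the UTF-8 encoded JSON is an assumption about how the limit is applied, and `split_into_batches` is a hypothetical helper rather than an SDK function.

```python
import json
from typing import Iterator

MAX_REQUESTS_PER_BATCH = 100_000
MAX_BATCH_BYTES = 256 * 1024 * 1024


def split_into_batches(
    requests: list[dict],
    max_requests: int = MAX_REQUESTS_PER_BATCH,
    max_bytes: int = MAX_BATCH_BYTES,
) -> Iterator[list[dict]]:
    """Yield consecutive groups of requests that respect both limits."""
    batch: list[dict] = []
    batch_bytes = 0
    for request in requests:
        size = len(json.dumps(request).encode("utf-8"))
        if size > max_bytes:
            raise ValueError("a single request exceeds the batch size limit")
        # Start a new batch when adding this request would break a limit.
        if batch and (len(batch) >= max_requests or batch_bytes + size > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(request)
        batch_bytes += size
    if batch:
        yield batch


# Small limits keep the demonstration readable.
demo_requests = [{"custom_id": str(i)} for i in range(5)]
demo_sizes = [len(b) for b in split_into_batches(demo_requests, max_requests=2)]
print(demo_sizes)
```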

For more information, visit: https://console.anthropic.com/

In [None]:
import anthropic
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request

client = anthropic.Anthropic()

In [None]:
# Prepare batch requests for multiple transcripts to be processed by Anthropic's API
requests = []
timestamp = int(time.time())

example_transcripts = [
    open(os.path.join(transcripts_dir, "14.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "15.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "16.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "17.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "18.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "19.txt"), "r").read(),
]

for i, _example_transcript in enumerate(example_transcripts):
    _request = Request(
        custom_id=f"{i}--anthropic_batch_example--{timestamp}",
        params=MessageCreateParamsNonStreaming(
            model=ANTHROPIC_MODEL,
            max_tokens=1024,
            system=[
                {
                    "type": "text",
                    "text": "\n\n".join([system_prompt, model_json_schema_anthropic]),
                    "cache_control": {"type": "ephemeral"},
                }
            ],
            messages=[
                {
                    "role": "user",
                    "content": f"[Transcript]\n {_example_transcript}"
                }
            ],
        )
    )
    requests.append(_request)


In [None]:
message_batch = client.messages.batches.create(
    requests=requests
)

message_batch_id = message_batch.id

print(message_batch)

In [None]:
message_batch_id = "msgbatch_01BYGbkrw2KrVUxx4MrFF48u"
message_batch_id = "msgbatch_016LieMPc3tdMMi3wY5nQK3D" 

In [None]:
message_batch = client.messages.batches.retrieve(
    message_batch_id
)
message_batch

In [None]:
for result in client.messages.batches.results(
    message_batch_id,
):
    match result.result.type:
        case "succeeded":
            print(f"Success! {result.custom_id}")
            print(result)
            # IPython's JSON display expects a dict or list, not a serialized string
            result_dict = result.model_dump(mode="json")
            display(JSON(result_dict, expanded=True))
            usage = result_dict["result"]["message"]["usage"]
            model = result_dict["result"]["message"]["model"]
            print(get_llm_debug_anthropic(usage, model))
            print()
        case "errored":
            if result.result.error.type == "invalid_request":
                print(f"Validation error {result.custom_id}")
            else:
                print(f"Server error {result.custom_id}")
        case "expired":
            print(f"Request expired {result.custom_id}")

### AWS Bedrock
Key Benefits:
- Cost-effective: Up to 50% lower costs compared to standard API calls
- Flexible processing: 24-hour window for batch completion
- High throughput: Process large volumes of requests efficiently

Technical Specifications:

Message Batch Limits:
- Maximum 50,000 message requests per batch
- Maximum 200 MB batch size
- Token quotas vary by account and model

Processing Details:
- Results typically available within 1 hour
- Maximum processing window of 24 hours
- Results expire after 24 hours
- Batch processing uses separate quota from standard API rate limits


In [None]:
# json.dumps renders the schema as valid JSON rather than as a Python dict repr
model_json_schema_anthropic = f"""
As a genius expert, your task is to understand the content and provide
the parsed objects in json that match the following json_schema:
{json.dumps(Insight.model_json_schema())}
Make sure to return an instance of the JSON, not the schema itself
"""

In [None]:
role_arn = "arn:aws:iam::<arn>"
model_id = "anthropic.claude-3-5-haiku-20241022-v1:0"
bucket_name = os.environ["S3_BATCH_BUCKET_NAME"]
output_s3_bucket_path = f"s3://{bucket_name}/batch_output/"
bedrock_anthropic_batch_input_path = Path("./bedrock_anthropic_batch_input.jsonl")
object_name = os.path.basename(bedrock_anthropic_batch_input_path)

In [None]:
bedrock_anthropic_message_requests = []
timestamp = int(time.time())

example_transcripts = [open(os.path.join(transcripts_dir, f"{i}.txt"), "r").read() for i in range(1, 101)]

for i, _example_transcript in enumerate(example_transcripts):
    _request =  {
        "recordId": f"{i}--aws_bedrock_batch_example--{timestamp}",
        "modelInput": {
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,
            "system": [
                {
                        "type": "text",
                        "text": "\n\n".join([system_prompt,model_json_schema_anthropic]),
                        "cache_control": {"type": "ephemeral"},
                },
            ],
            "messages": [
                {
                        "role": "user",
                        "content": f"[Transcript]\n {_example_transcript}"
                }
            ]
        },
    }
    bedrock_anthropic_message_requests.append(_request)

bedrock_anthropic_batch_input_path.write_text("\n".join(json.dumps(bedrock_anthropic_message_request) for bedrock_anthropic_message_request in bedrock_anthropic_message_requests))

In [None]:
s3_client = boto3.client('s3')
bedrock_client = boto3.client(service_name="bedrock", region_name="us-west-2")

In [None]:
# upload_file expects a string path in older boto3 versions, so convert the Path explicitly
s3_client.upload_file(str(bedrock_anthropic_batch_input_path), bucket_name, object_name)
input_s3_object_path = f"s3://{bucket_name}/{object_name}"

In [None]:
timestamp = int(time.time())
job_name = f"example-batch-job-{timestamp}"

inputDataConfig = ({
    "s3InputDataConfig": {
        "s3Uri": input_s3_object_path
    }
})

outputDataConfig = ({
    "s3OutputDataConfig": {
        "s3Uri": output_s3_bucket_path
    }
})
response = bedrock_client.create_model_invocation_job(
    roleArn=role_arn,
    modelId=model_id,
    jobName=job_name,
    inputDataConfig=inputDataConfig,
    outputDataConfig=outputDataConfig
)
job_arn = response.get('jobArn')

In [None]:
job_arn = "arn:aws:bedrock:us-west-2:711156763240:model-invocation-job/4n5cf3uuyym9"
job_arn = "arn:aws:bedrock:us-west-2:711156763240:model-invocation-job/zxcg13d36ewe"

In [None]:
response = bedrock_client.get_model_invocation_job(jobIdentifier=job_arn)
display(JSON(response, expanded=True))

In [None]:
s3 = boto3.client("s3")

# The output URI has the form s3://<bucket>/<prefix>/; results are written under <prefix>/<job id>/
output_s3_uri = response["outputDataConfig"]["s3OutputDataConfig"]["s3Uri"]
output_bucket, output_prefix = output_s3_uri.split("/")[2:4]
job_id = response["jobArn"].split("/")[-1]
results_key = f"{output_prefix}/{job_id}/bedrock_anthropic_batch_input.jsonl.out"

results_response = s3.get_object(Bucket=output_bucket, Key=results_key)
data = results_response["Body"].read().decode("utf-8")

# Show the first two result records
for line in data.splitlines()[:2]:
    _data = json.loads(line)
    display(JSON(_data))


### Azure OpenAI

The schema that Pydantic generates does not meet OpenAI's strict structured-output requirements:
- It is missing "additionalProperties": false, which prevents extra fields
- "strict": True must be set to enforce strict validation
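
To show what the missing pieces look like, the sketch below walks a JSON schema and marks every object as closed, with all of its properties required. It is a simplified illustration on a hand-written schema, not the SDK's implementation; `make_strict` is a hypothetical helper, and `type_to_response_format_param` remains the safer choice in practice.

```python
import copy


def make_strict(schema: dict) -> dict:
    """Return a copy in which every object forbids extra keys and requires all properties."""
    strict = copy.deepcopy(schema)
    _visit(strict)
    return strict


def _visit(node: object) -> None:
    if isinstance(node, dict):
        if node.get("type") == "object" and "properties" in node:
            node["additionalProperties"] = False
            node["required"] = list(node["properties"])
        for value in node.values():
            _visit(value)
    elif isinstance(node, list):
        for item in node:
            _visit(item)


# Hand-written example schema with a nested definition.
example_schema = {
    "type": "object",
    "properties": {
        "summary": {"type": "string"},
        "offers": {"type": "array", "items": {"$ref": "#/$defs/Offer"}},
    },
    "required": ["summary"],
    "$defs": {
        "Offer": {"type": "object", "properties": {"price": {"type": "string"}}},
    },
}

strict_schema = make_strict(example_schema)
print(strict_schema["required"], strict_schema["$defs"]["Offer"]["additionalProperties"])
```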

In [None]:
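# Compare the raw Pydantic schema with the strict-mode schema produced by the SDK helper
print(json.dumps(Insight.model_json_schema(), indent=2))
print(json.dumps(type_to_response_format_param(Insight)["json_schema"]["schema"], indent=2))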

Azure OpenAI Batch Processing Quota Limits & Constraints

Queue Limits
- Tier 1: Max 100,000 requests in processing queue
- Tier 2: Max 200,000 requests in processing queue

Rate Limits
- Tier 1: 50 requests per minute (RPM)
- Tier 2: 1,000 requests per minute (RPM)

Batch Size Limits
- Maximum requests per batch: 100,000 (both tiers)
- Message batch size limit: 100,000 requests OR 256 MB
 
Time Constraints
- Expected completion: Most batches complete within 1 hour
- Maximum processing time: 24 hours before expiration
- Results retention period: 29 days from creation

Technical Requirements
- Strict schema validation enforced
- Requires a dedicated batch deployment (this notebook uses one named gpt-4o-mini-batch)

In [None]:
message_requests = []
timestamp = int(time.time())

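# Valid: the SDK helper produces a strict-mode schema (it adds "additionalProperties": false)
json_schema = type_to_response_format_param(Insight)

# Invalid: the raw Pydantic schema lacks "additionalProperties": false, so strict validation rejects it.
# This assignment overrides the one above and reproduces the "invalid schema" batch below;
# comment it out to submit the valid schema instead.
json_schema = {
    "type": "json_schema",
    "json_schema": {
        "schema": Insight.model_json_schema(),
        "name": "Insight",
        "strict": True,
    },
}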

example_transcripts = [
    open(os.path.join(transcripts_dir, "14.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "15.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "16.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "17.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "18.txt"), "r").read(),
    open(os.path.join(transcripts_dir, "19.txt"), "r").read(),
]

for i, _example_transcript in enumerate(example_transcripts):
    _result = {
        "custom_id": f"{i}--azure_openai--{timestamp}",
        "method": "POST",
        "url": "/chat/completions",
        "body": {
            "model": AZURE_OPENAI_MODEL_BATCH,
            "messages": [
                {
                    "role": "system",
                    "content": "" + system_prompt
                },
                {
                    "role": "user",
                    "content": f"[Transcript]\n {_example_transcript}"
                }
            ],
            "response_format": json_schema,

        }
    }
    message_requests.append(_result)

In [None]:
azure_openai_batch_input_path = Path("./azure_openai_batch_input.jsonl")
azure_openai_batch_input_path.write_text("\n".join(json.dumps(message_request) for message_request in message_requests))

In [None]:
azure_openai_client = AzureOpenAI(
    api_version="2024-10-01-preview",
    api_key=os.environ.get("AZURE_OPENAI_API_KEY"),
    azure_endpoint=os.environ.get("AZURE_OPENAI_API_ENDPOINT"),
)

In [None]:
file = azure_openai_client.files.create(
  file=open(azure_openai_batch_input_path, "rb"),
  purpose="batch"
)
print(file.model_dump_json(indent=2))
file_id = file.id

In [None]:
batch_response = azure_openai_client.batches.create(
    input_file_id=file_id,
    endpoint="/chat/completions",
    completion_window="24h",
)
batch_id = batch_response.id
print(batch_response.model_dump_json(indent=2))

In [None]:
#invalid schema
batch_id = "batch_2183d1c7-297b-4a75-8775-8ba489da90d0"

batch_response = azure_openai_client.batches.retrieve(batch_id)
status = batch_response.status
print(f"{time.time_ns()} Batch Id: {batch_id},  Status: {status}")

output_file_id = batch_response.output_file_id
error_file_id = batch_response.error_file_id

if output_file_id:
    file_response = azure_openai_client.files.content(output_file_id)
    raw_responses = file_response.text.strip().split('\n')
    for raw_response in raw_responses:
        if not raw_response:
            continue
        json_response = json.loads(raw_response)
        formatted_json = json.dumps(json_response, indent=2)
        print(formatted_json)

if error_file_id:
    file_response = azure_openai_client.files.content(error_file_id)
    raw_responses = file_response.text.strip().split('\n')
    for raw_response in raw_responses:
        if not raw_response:
            continue
        json_response = json.loads(raw_response)
        formatted_json = json.dumps(json_response, indent=2)
        print(formatted_json)


In [None]:
# valid schema
batch_id = "batch_ce73fcc8-5b70-4154-8aa0-be9961a0d147"

batch_response = azure_openai_client.batches.retrieve(batch_id)
status = batch_response.status
print(f"{time.time_ns()} Batch Id: {batch_id},  Status: {status}")

output_file_id = batch_response.output_file_id
error_file_id = batch_response.error_file_id

if output_file_id:
    file_response = azure_openai_client.files.content(output_file_id)
    raw_responses = file_response.text.strip().split('\n')

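    for raw_response in raw_responses:
        # Skip empty lines so json.loads does not fail on an empty output file
        if not raw_response:
            continue
        json_response = json.loads(raw_response)
        formatted_json = json.dumps(json_response, indent=2)
        print(formatted_json)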


if error_file_id:
    file_response = azure_openai_client.files.content(error_file_id)
    raw_responses = file_response.text.strip().split('\n')
    for raw_response in raw_responses:
        if not raw_response:
            continue
        json_response = json.loads(raw_response)
        formatted_json = json.dumps(json_response, indent=2)
        print(formatted_json)

### OpenAI

Key Benefits:
- 50% cost reduction compared to standard API
- Guaranteed 24-hour turnaround time (most batches complete within 1 hour)
- Does not consume tokens from standard per-model rate limits

Quota & Limitations:
- Batch size: Maximum 50,000 messages or 200 MB per batch
- Output tokens: Unlimited
- Request submissions: Unlimited
- Results availability: Up to 24 hours
- Data retention: Results expire after 24 hours

Best Practices:
- Monitor batch status regularly
- Download results promptly before expiration
- Consider batch size for optimal processing
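
Regular status monitoring can be sketched as a polling loop that stops once the batch reaches a terminal state. The status source is passed in as a callable so the loop can be tried without network access; `wait_for_batch` and its parameters are hypothetical, and the default interval is an arbitrary choice.

```python
import time
from typing import Callable

# Statuses after which a batch no longer changes.
TERMINAL_STATUSES = {"completed", "failed", "expired", "cancelled"}


def wait_for_batch(
    get_status: Callable[[], str],
    poll_seconds: float = 60.0,
    max_polls: int = 1440,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status until it returns a terminal status or max_polls is reached."""
    for _ in range(max_polls):
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        sleep(poll_seconds)
    raise TimeoutError(f"batch still running after {max_polls} polls")


# Simulated status sequence so the example runs offline and without waiting.
fake_statuses = iter(["validating", "in_progress", "finalizing", "completed"])
final_status = wait_for_batch(lambda: next(fake_statuses), poll_seconds=0, sleep=lambda _: None)
print(final_status)
```

In this notebook the callable could be `lambda: openai_client.batches.retrieve(openai_batch_id).status`.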


In [None]:
message_requests = []
timestamp = int(time.time())

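# Valid: the SDK helper produces a strict-mode schema (it adds "additionalProperties": false)
json_schema = type_to_response_format_param(Insight)

# Invalid: the raw Pydantic schema lacks "additionalProperties": false, so strict validation rejects it.
# This assignment overrides the one above; comment it out to submit the valid schema instead.
json_schema = {
    "type": "json_schema",
    "json_schema": {
        "schema": Insight.model_json_schema(),
        "name": "Insight",
        "strict": True,
    },
}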

# Read transcripts 14-19; Path.read_text() closes each file after reading.
example_transcripts = [
    Path(transcripts_dir, f"{n}.txt").read_text()
    for n in range(14, 20)
]

for i, _example_transcript in enumerate(example_transcripts):
    _result = {
        # custom_id links each output line back to its request
        "custom_id": f"{i}--openai--{timestamp}",
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": OPENAI_MODEL_BATCH,
            "messages": [
                {
                    "role": "system",
                    "content": system_prompt
                },
                {
                    "role": "user",
                    "content": f"[Transcript]\n {_example_transcript}"
                }
            ],
            "response_format": json_schema,
        }
    }
    message_requests.append(_result)

In [None]:
openai_batch_input_path = Path("./openai_batch_input.jsonl")
openai_batch_input_path.write_text("\n".join(json.dumps(message_request) for message_request in message_requests))


In [None]:
openai_client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY"),
)

In [None]:
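# Upload the JSONL input; the context manager closes the file handle afterwards.
with open(openai_batch_input_path, "rb") as batch_input_file:
    file = openai_client.files.create(
        file=batch_input_file,
        purpose="batch"
    )
print(file.model_dump_json(indent=2))
openai_file_id = file.id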

In [None]:
openai_batch_response = openai_client.batches.create(
    input_file_id=openai_file_id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
)
openai_batch_id = openai_batch_response.id
print(openai_batch_response.model_dump_json(indent=2))

In [None]:
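# Hard-coded ID of a previously submitted batch (one that used a valid schema).
# This overrides the ID from the cell above; comment it out to check the batch just created.
openai_batch_id = "batch_67def026bdfc81908b85f6b125273a67"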

openai_batch_response = openai_client.batches.retrieve(openai_batch_id)
status = openai_batch_response.status
print(f"{time.time_ns()} Batch Id: {openai_batch_id},  Status: {status}")

output_file_id = openai_batch_response.output_file_id
error_file_id = openai_batch_response.error_file_id

if output_file_id:
    file_response = openai_client.files.content(output_file_id)
    raw_responses = file_response.text.strip().split('\n')

    for raw_response in raw_responses:
        if not raw_response:
            continue
        json_response = json.loads(raw_response)
        display(JSON(json_response, expanded=True))
        # Print token usage and cost details for this request
        body = json_response["response"]["body"]
        print(get_llm_debug_openai(body["usage"], body["model"]))
        print()

if error_file_id:
    file_response = openai_client.files.content(error_file_id)
    raw_responses = file_response.text.strip().split('\n')
    for raw_response in raw_responses:
        if not raw_response:
            continue
        json_response = json.loads(raw_response)
        formatted_json = json.dumps(json_response, indent=2)
        print(formatted_json)
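
The output file is not guaranteed to list results in the same order as the input, so results are best matched to requests through `custom_id`. The sketch below indexes the output lines by `custom_id` and parses each structured-output message as JSON. `index_batch_results` is a hypothetical helper, and it assumes each line follows the `response.body.choices[0].message.content` layout used in the cell above.

```python
import json


def index_batch_results(jsonl_text):
    """Map custom_id -> parsed message content (None for failed requests)."""
    results = {}
    for line in jsonl_text.splitlines():
        if not line.strip():
            continue
        record = json.loads(line)
        response = record.get("response") or {}
        if record.get("error") or response.get("status_code") != 200:
            # Keep the key so failed requests stay visible to the caller.
            results[record["custom_id"]] = None
            continue
        content = response["body"]["choices"][0]["message"]["content"]
        # With a JSON-schema response format the content is a JSON string.
        results[record["custom_id"]] = json.loads(content)
    return results
```

In this notebook it could be called as `index_batch_results(file_response.text)` after downloading the output file.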


# Level 0

## Schema optimisation


In [None]:
import tiktoken

encoding = tiktoken.encoding_for_model('gpt-4o')

In [None]:
# JSON schema representation of nested Pydantic models
from pydantic import BaseModel
import json

class InnerClass(BaseModel):
    inner_attribute: str

class OuterClass(BaseModel):
    outer_attribute: str
    inner_class: InnerClass

schema = json.dumps(OuterClass.model_json_schema())
print(schema)
print(f"schema tokens: {len(encoding.encode(str(schema)))}")


In [None]:
# Compare token size between JSON and YAML formats
# Requires PyYAML (`pip install pyyaml`), which is not listed in the install cell at the top.
import yaml

schema_yaml = yaml.dump(json.loads(schema), default_flow_style=False)

print(schema_yaml)
print(f"schema_yaml tokens: {len(encoding.encode(str(schema_yaml)))}")

### Flattened schema


In [None]:
from pydantic import BaseModel
import json

# Flattened version of the nested schema that combines attributes into a single class
class OuterClassFlat(BaseModel):
    outer_attribute: str
    inner_attribute: str

schema = json.dumps(OuterClassFlat.model_json_schema())
print(schema)
print(f"schema tokens: {len(encoding.encode(str(schema)))}")

In [None]:
# Compare token size between JSON and YAML formats
import yaml

schema_yaml = yaml.dump(json.loads(schema), default_flow_style=False)

print(schema_yaml)
print(f"schema_yaml tokens: {len(encoding.encode(str(schema_yaml)))}")


### Lightweight schema
 
Techniques to reduce schema size:
- Remove non-essential field metadata (titles, descriptions, examples)
- Use shorter aliases for verbose attribute names
- Minimize enum value repetition
- Strip optional fields where possible
- Use compact JSON format
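
Two of these techniques, removing field metadata and using compact JSON, can be sketched with the standard library alone. `strip_schema_metadata` is a hypothetical helper, and the sample schema is hand-written to resemble Pydantic output. The helper removes `title`, `description`, and `examples` keywords but keeps properties that happen to share those names.

```python
import json

# Annotation-only JSON Schema keywords that do not affect validation.
METADATA_KEYS = {"title", "description", "examples"}


def strip_schema_metadata(node):
    """Return a copy of a JSON schema without annotation-only keywords."""
    if isinstance(node, list):
        return [strip_schema_metadata(item) for item in node]
    if not isinstance(node, dict):
        return node
    cleaned = {}
    for key, value in node.items():
        if key in METADATA_KEYS:
            continue
        if key in ("properties", "$defs") and isinstance(value, dict):
            # Keys at this level are field or definition names, so keep them all.
            cleaned[key] = {name: strip_schema_metadata(sub)
                            for name, sub in value.items()}
        else:
            cleaned[key] = strip_schema_metadata(value)
    return cleaned


sample_schema = {
    "title": "OuterClass",
    "type": "object",
    "properties": {
        "outer_attribute": {"title": "Outer Attribute", "type": "string"},
        "title": {"title": "Title", "description": "Document title", "type": "string"},
    },
    "required": ["outer_attribute", "title"],
}

# separators=(",", ":") drops the spaces json.dumps inserts by default.
compact_schema = json.dumps(strip_schema_metadata(sample_schema), separators=(",", ":"))
print(compact_schema)
```

Descriptions often carry instructions the model relies on, so it is worth checking output quality after stripping them.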

In [None]:
%pip install -U git+https://github.com/nicholishen/tooldantic.git


In [None]:
from tooldantic import GenericSchemaGenerator
from pydantic import BaseModel
import json

class Color(Enum):
    BLUE = "blue"
    RED = "red"


class InnerClass(BaseModel):
    inner_attribute: str
    color: Color


class OuterClass(BaseModel):
    outer_attribute: str
    inner_class: InnerClass
    inner_class2: InnerClass


schema_td = json.dumps(OuterClass.model_json_schema(schema_generator=GenericSchemaGenerator))
ref_schema = json.dumps(OuterClass.model_json_schema())

print(ref_schema)
print(schema_td)

encoding = tiktoken.encoding_for_model('gpt-4o')
print(f"reference schema tokens: {len(encoding.encode(ref_schema))}")
print(f"tooldantic schema tokens: {len(encoding.encode(schema_td))}")


In [None]:
import yaml

schema_yaml = yaml.dump(json.loads(schema_td), default_flow_style=False)

print(schema_yaml)
print(f"schema_yaml tokens: {len(encoding.encode(str(schema_yaml)))}")
