## File Upload Testing in Azure OpenAI (AI Foundry)

### Environment Setup

In [1]:
# Import required packages
import os
import shutil
import time
import json
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from openai import AzureOpenAI
from azure.identity import DefaultAzureCredential, get_bearer_token_provider

In [2]:
# Set file batch configuration
TOTAL_FILES = 200
FILE_SIZE_KB = 5

In [3]:
# Set up environment variables for Azure OpenAI
AOAI_API_BASE = os.getenv("AZURE_OPENAI_API_BASE")
AOAI_API_VERSION = os.getenv("AZURE_OPENAI_API_VERSION")
AOAI_DEPLOYMENT = os.getenv("AZURE_OPENAI_API_DEPLOY")

### Azure OpenAI Client Setup

In [4]:
# Initialise token provider
token_provider = get_bearer_token_provider(
    DefaultAzureCredential(),
    "https://cognitiveservices.azure.com/.default"
)

In [5]:
# Initialise Azure OpenAI client
client = AzureOpenAI(  
    azure_endpoint = AOAI_API_BASE,
    azure_ad_token_provider = token_provider,
    api_version = AOAI_API_VERSION,
)

### File Batch and Vector Store Setup

In [6]:
# Create test files
def create_test_files():
    test_dir = os.path.join(os.getcwd(), "test_files")
    
    if os.path.exists(test_dir):
        shutil.rmtree(test_dir)
    os.makedirs(test_dir)
    
    files = []
    content_base = "This is test content for vector store batch upload testing. " * (FILE_SIZE_KB * 15)
    
    print(f"Creating {TOTAL_FILES} test files ...")
    for i in range(TOTAL_FILES):
        file_path = os.path.join(test_dir, f"test_doc_{i:03d}.txt")
        content = f"Document {i+1}\n{content_base}\nEnd of document {i+1}\nTimestamp: {time.time()}"
        
        with open(file_path, 'w') as f:
            f.write(content)
        files.append(file_path)
    
    print(f"Created {TOTAL_FILES} files")
    return files, test_dir

test_files, temp_dir = create_test_files()

Creating 200 test files ...
Created 200 files


In [7]:
# Create vector store
def create_vector_store():
    vector_store = client.vector_stores.create(
        name=f"Rate Test Vector Store - {int(time.time())}"
    )
    print(f"Created vector store: {vector_store.id}")
    return vector_store.id

vector_store_id = create_vector_store()

Created vector store: vs_vSoLHyPUBKcUNS20sLjXOfEK


### Option 1: Individual Sequential Test (10 files)

In [8]:
# Helper function to run individual sequential test
def run_individual_sequential_test(file_paths, vector_store_id):
    print("\n" + "="*70)
    print(f"Running Individual Sequential Test: {len(file_paths)} files")
    print("="*70)
    
    start_time = time.time()
    successful_uploads = 0
    
    for i, file_path in enumerate(file_paths):
        try:
            # Step 1: Upload from local disk
            with open(file_path, 'rb') as f:
                uploaded_file = client.files.create(file=f, purpose="assistants")
            
            # Step 2: Add to vector store
            client.vector_stores.files.create(
                vector_store_id=vector_store_id,
                file_id=uploaded_file.id
            )
            successful_uploads += 1
            print(f"  Processed file {i+1}/{len(file_paths)}")
        except Exception as e:
            print(f"  Failed to process file {i+1}: {e}")
            
    total_duration = time.time() - start_time
    completion_rate = successful_uploads / total_duration if total_duration > 0 else 0
    
    print("\n--- Individual Test Results ---")
    return {
        "Test Scenario": "1. Individual Sequential",
        "Files": len(file_paths),
        "Workers": 1,
        "Successful": successful_uploads,
        "Submission Rate (RPS)": f"{completion_rate:.2f}",
        "Completion Rate (files/sec)": f"{completion_rate:.2f}",
        "Total Time (s)": f"{total_duration:.2f}",
        "Rate Limit Errors": 0
    }

# Run the individual test on the first 10 files
individual_test_results = run_individual_sequential_test(test_files[:10], vector_store_id)
print(json.dumps(individual_test_results, indent=2))


Running Individual Sequential Test: 10 files
  Processed file 1/10
  Processed file 2/10
  Processed file 3/10
  Processed file 4/10
  Processed file 5/10
  Processed file 6/10
  Processed file 7/10
  Processed file 8/10
  Processed file 9/10
  Processed file 10/10

--- Individual Test Results ---
{
  "Test Scenario": "1. Individual Sequential",
  "Files": 10,
  "Workers": 1,
  "Successful": 10,
  "Submission Rate (RPS)": "0.56",
  "Completion Rate (files/sec)": "0.56",
  "Total Time (s)": "17.70",
  "Rate Limit Errors": 0
}


### Option 2: Individual Upload of Files to Vector Store

In [9]:
# Helper function to run unified concurrent test
def run_unified_concurrent_test(file_paths, vector_store_id, num_workers):
    print("\n" + "="*70)
    print(f"Running Unified Concurrent Test: {len(file_paths)} files with {num_workers} workers")
    print(f"Target Vector Store: {vector_store_id}")
    print("="*70)
    
    results = []

    # The task for each thread. This represents one complete, realistic operation.
    def upload_and_add_worker(file_path):
        request_start_time = time.time()
        try:
            # Step 1: Upload the file from the local machine.
            with open(file_path, 'rb') as f:
                uploaded_file = client.files.create(file=f, purpose="assistants")
            
            # Step 2: Add the now-uploaded file to the vector store.
            client.vector_stores.files.create(
                vector_store_id=vector_store_id,
                file_id=uploaded_file.id
            )
            return {'success': True, 'request_time': request_start_time, 'completion_time': time.time(), 'error': None}
        except Exception as e:
            return {'success': False, 'request_time': request_start_time, 'completion_time': time.time(), 'error': str(e)}

    # Use ThreadPoolExecutor to run the complete workflow concurrently
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(upload_and_add_worker, fp) for fp in file_paths]
        for i, future in enumerate(futures):
            results.append(future.result())
            if (i + 1) % 20 == 0:
                print(f"  ... {i+1}/{len(file_paths)} tasks completed.")

    # --- Analyze the results ---
    request_times = [r['request_time'] for r in results]
    completion_times = [r['completion_time'] for r in results]
    successful_results = [r for r in results if r['success']]
    
    submission_window = max(request_times) - min(request_times)
    submission_rate = len(file_paths) / submission_window if submission_window > 0 else float('inf')
    
    total_duration = max(completion_times) - min(request_times)
    completion_rate = len(successful_results) / total_duration if total_duration > 0 else 0
    
    rate_limit_errors = sum(1 for r in results if not r['success'] and '429' in r['error'])

    return {
        "Test Scenario": f"2. Concurrent ({num_workers} Workers)",
        "Files": len(file_paths),
        "Workers": num_workers,
        "Successful": len(successful_results),
        "Submission Rate (RPS)": f"{submission_rate:.2f}",
        "Completion Rate (files/sec)": f"{completion_rate:.2f}",
        "Total Time (s)": f"{total_duration:.2f}",
        "Rate Limit Errors": rate_limit_errors
    }

In [10]:
# Test different batch sizes with 200 files each
scenarios_to_run = [20, 50, 100]
all_concurrent_results = []
for worker_count in scenarios_to_run:
    # **# BOLD: Call the test function, passing the main_vector_store_id.**
    test_result = run_unified_concurrent_test(
        file_paths=test_files,
        vector_store_id=vector_store_id,
        num_workers=worker_count
    )
    all_concurrent_results.append(test_result)


Running Unified Concurrent Test: 200 files with 20 workers
Target Vector Store: vs_vSoLHyPUBKcUNS20sLjXOfEK
  ... 20/200 tasks completed.
  ... 40/200 tasks completed.
  ... 60/200 tasks completed.
  ... 80/200 tasks completed.
  ... 100/200 tasks completed.
  ... 120/200 tasks completed.
  ... 140/200 tasks completed.
  ... 160/200 tasks completed.
  ... 180/200 tasks completed.
  ... 200/200 tasks completed.

Running Unified Concurrent Test: 200 files with 50 workers
Target Vector Store: vs_vSoLHyPUBKcUNS20sLjXOfEK
  ... 20/200 tasks completed.
  ... 40/200 tasks completed.
  ... 60/200 tasks completed.
  ... 80/200 tasks completed.
  ... 100/200 tasks completed.
  ... 120/200 tasks completed.
  ... 140/200 tasks completed.
  ... 160/200 tasks completed.
  ... 180/200 tasks completed.
  ... 200/200 tasks completed.

Running Unified Concurrent Test: 200 files with 100 workers
Target Vector Store: vs_vSoLHyPUBKcUNS20sLjXOfEK
  ... 20/200 tasks completed.
  ... 40/200 tasks completed.


### Statistical Analysis

In [11]:
# Performance Summary
final_results = [individual_test_results] + all_concurrent_results
summary_df = pd.DataFrame(final_results)

print("\n\n" + "="*110)
print("PERFORMANCE ANALYSIS OF UNIFIED UPLOAD WORKFLOW")
print("="*110)
# Reorder columns for clarity
summary_df = summary_df[[
    "Test Scenario", "Files", "Workers", "Successful", "Submission Rate (RPS)", 
    "Completion Rate (files/sec)", "Total Time (s)", "Rate Limit Errors"
]]
print(summary_df.to_string(index=False))
print("="*110)

print("\nThis summary reflects the performance of the complete, realistic workflow: uploading a file from your machine directly into a vector store.")

print("\n1. REQUEST SUBMISSION RATE (The '30 RPS' Limit):")
print("   - This measures how fast we can INITIATE the complete upload process.")
print("   - As you increase the number of 'Workers' (concurrent threads), this rate increases significantly.")

print("\n2. COMPLETION RATE (End-to-End Throughput):")
print("   - This measures how fast the entire process FINISHES for each file.")
print("   - This rate is naturally lower, because of the time required to ingest and index the files.")
print("   - The server processes the queue asynchronously, so completion will always lag behind submission.")




PERFORMANCE ANALYSIS OF UNIFIED UPLOAD WORKFLOW
              Test Scenario  Files  Workers  Successful Submission Rate (RPS) Completion Rate (files/sec) Total Time (s)  Rate Limit Errors
   1. Individual Sequential     10        1          10                  0.56                        0.56          17.70                  0
 2. Concurrent (20 Workers)    200       20         200                  8.78                        7.88          25.37                  0
 2. Concurrent (50 Workers)    200       50         200                 15.42                       10.49          19.06                  0
2. Concurrent (100 Workers)    200      100         199                 19.47                        9.44          21.08                  1

This summary reflects the performance of the complete, realistic workflow: uploading a file from your machine directly into a vector store.

1. REQUEST SUBMISSION RATE (The '30 RPS' Limit):
   - This measures how fast we can INITIATE the complete up

### Housekeeping

In [12]:
# Remove test files and vector store
def cleanup():
    print("\n" + "="*50)
    print("CLEANUP")
    print("="*50)
    
    # Clean up local files
    if 'temp_dir' in globals() and temp_dir and os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)
        print(f"Deleted test files in the local directory")
    
    # Clean up the vector store and any remaining files.
    print("Deleting main vector store and all uploaded files from Azure OpenAI...")
    
    all_uploaded_files = client.files.list()
    file_ids_to_delete = [f.id for f in all_uploaded_files]
    
    deleted_count = 0
    print(f"Found {len(file_ids_to_delete)} total files in the account to delete...")
    for file_id in file_ids_to_delete:
        try:
            client.files.delete(file_id)
            deleted_count += 1
        except Exception:
            # Ignore errors for files that might already be deleted
            pass
    print(f"Successfully deleted {deleted_count}/{len(file_ids_to_delete)} files.")
    
    # Delete the main vector store
    try:
        client.vector_stores.delete(vector_store_id=vector_store_id)
        print(f"Deleted vector store: {vector_store_id}")
    except Exception as e:
        print(f"Failed to delete vector store {vector_store_id}: {e}")

cleanup()


CLEANUP
Deleted test files in the local directory
Deleting main vector store and all uploaded files from Azure OpenAI...
Found 609 total files in the account to delete...
Successfully deleted 609/609 files.
Deleted vector store: vs_vSoLHyPUBKcUNS20sLjXOfEK
