In [6]:
import requests
import time
import concurrent.futures
import base64

In [7]:
def base64_encode(image_path):
    """Return the contents of the file at ``image_path`` as base64 text.

    The file is read in binary mode, its bytes are base64-encoded, and the
    encoded bytes are decoded to a ``str`` using UTF-8.
    """
    with open(image_path, mode="rb") as handle:
        raw_bytes = handle.read()
    encoded_bytes = base64.b64encode(raw_bytes)
    return encoded_bytes.decode("utf-8")

def test_single_request(encoded_image, url):
    start_time = time.time()
    response = requests.post(url, json={"image": encoded_image})
    elapsed_time = time.time() - start_time

    if response.status_code == 200:
        return elapsed_time
    else:
        return None

In [8]:
def load_test(encoded_image, url, num_requests, max_workers=None):
    """Fire ``num_requests`` single-image requests in parallel and collect latencies.

    Args:
        encoded_image: Base64 text of the image sent with every request.
        url: Endpoint passed to ``test_single_request``.
        num_requests: Number of requests to submit.
        max_workers: Size of the thread pool. Defaults to ``num_requests`` so
            every request gets its own thread and they are all in flight at
            once; pass a smaller value to cap the thread count.

    Returns:
        List of latencies (seconds) for the requests that succeeded, in
        completion order. Requests for which ``test_single_request`` returned
        ``None`` are left out.
    """
    if num_requests <= 0:
        # Nothing to submit; also avoids ThreadPoolExecutor rejecting a
        # non-positive max_workers.
        return []

    # The executor's default pool size is capped relative to the CPU count,
    # which would silently limit how many requests run at the same time.
    pool_size = num_requests if max_workers is None else max_workers

    with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as executor:
        futures = [
            executor.submit(test_single_request, encoded_image, url)
            for _ in range(num_requests)
        ]
        outcomes = [f.result() for f in concurrent.futures.as_completed(futures)]

    return [latency for latency in outcomes if latency is not None]

In [14]:
def run_load_tests(url, sample_image, num_requests):
    """Run a single-image load test and print latency statistics.

    Args:
        url: Endpoint to send the requests to.
        sample_image: Path of the image file; it is base64-encoded once and
            reused for every request.
        num_requests: Number of requests handed to ``load_test``.

    Prints the average, minimum and maximum latency over the successful
    requests. If no request succeeded, prints a notice instead of statistics.
    """
    encoded_image = base64_encode(sample_image)

    print(f"Starting load test with {num_requests} concurrent requests...")
    latencies = load_test(encoded_image, url, num_requests)

    if not latencies:
        # load_test drops failed requests, so the list can be empty; without
        # this guard the average below would divide by zero.
        print("No successful requests; latency statistics are unavailable.")
        return

    average_latency = sum(latencies) / len(latencies)
    print(f"Average latency: {average_latency:.4f} seconds")
    print(f"Min latency: {min(latencies):.4f} seconds")
    print(f"Max latency: {max(latencies):.4f} seconds")

# Single-request run against the local /predict/ endpoint on port 8888, using the sample cat image.
run_load_tests("http://localhost:8888/predict/", "public/sample_images/sample_cat.jpeg", 1)

Starting load test with 1 concurrent requests...
Average latency: 0.5274 seconds
Min latency: 0.5274 seconds
Max latency: 0.5274 seconds


### Batch requests (multiple images per request) ###

In [15]:
def test_batch_request(encoded_images, url):
    start_time = time.time()
    response = requests.post(url, json={"images": encoded_images})
    elapsed_time = time.time() - start_time

    if response.status_code == 200:
        return elapsed_time, response.json()
    else:
        print(f"Request failed with status code: {response.status_code}")
        return None

In [32]:
def concurrent_batch_test(encoded_images, url, num_requests, max_workers=None):
    """Fire ``num_requests`` batch requests in parallel and collect the results.

    Args:
        encoded_images: List of base64 image strings sent with every request.
        url: Endpoint passed to ``test_batch_request``.
        num_requests: Number of batch requests to submit.
        max_workers: Size of the thread pool. Defaults to ``num_requests`` so
            every request gets its own thread and they are all in flight at
            once; pass a smaller value to cap the thread count.

    Returns:
        List of the non-``None`` values returned by ``test_batch_request``, in
        completion order.
    """
    if num_requests <= 0:
        # Nothing to submit; also avoids ThreadPoolExecutor rejecting a
        # non-positive max_workers.
        return []

    # The executor's default pool size is capped relative to the CPU count,
    # which would silently limit how many requests run at the same time.
    pool_size = num_requests if max_workers is None else max_workers

    with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as executor:
        futures = [
            executor.submit(test_batch_request, encoded_images, url)
            for _ in range(num_requests)
        ]
        outcomes = [f.result() for f in concurrent.futures.as_completed(futures)]

    return [outcome for outcome in outcomes if outcome is not None]

In [37]:
def run_load_test_batch(url, sample_image, batch_size, num_requests):
    """Run a batched load test and print the average latency.

    Args:
        url: Endpoint to send the batch requests to.
        sample_image: Path of the image file; it is base64-encoded once and
            repeated ``batch_size`` times to form each request's batch.
        batch_size: Number of copies of the image in every request.
        num_requests: Number of batch requests handed to
            ``concurrent_batch_test``.

    Prints the average latency over the successful requests. If no request
    succeeded, prints a notice instead.
    """
    encoded_image = base64_encode(sample_image)
    encoded_images = [encoded_image] * batch_size

    print(f"Starting batch test with {num_requests} concurrent requests...")
    test_results = concurrent_batch_test(encoded_images, url, num_requests)

    # Each result is an (elapsed_seconds, response_json) pair.
    latencies = [elapsed for elapsed, _ in test_results]
    if not latencies:
        # Failed requests are dropped upstream, so the list can be empty;
        # without this guard the average below would divide by zero.
        print("No successful requests; latency statistics are unavailable.")
        return

    average_latency = sum(latencies) / len(latencies)
    print(f"Average latency: {average_latency:.4f} seconds")
  
# 50 requests of 10 images each against the local /predict/ endpoint on port 9000, using the sample cat image.
run_load_test_batch("http://localhost:9000/predict/", "public/sample_images/sample_cat.jpeg", 10, 50)

Starting batch test with 50 concurrent requests...
Average latency: 5.9855 seconds
