# Load test the deployed web application

In this notebook, we test the latency of the deployed web application by sending a number of duplicate questions as asynchronous requests.

In [1]:
import asyncio
import json
import random
import urllib.request
from timeit import default_timer

import aiohttp
from tqdm import tqdm
import requests
import pandas as pd
from utilities import text_to_json

In [2]:
print(aiohttp.__version__) 

3.3.2


We will test our deployed service with 100 calls, keeping at most 4 requests in flight at any time. Because we have deployed only one pod on one node, increasing the number of concurrent calls does not really increase throughput. Feel free to try different values and see how the service responds.

In [3]:
NUMBER_OF_REQUESTS = 100  # Total number of requests
CONCURRENT_REQUESTS = 4   # Number of requests at a time
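
The concurrency cap described above can be checked without touching the network. The following is a minimal sketch, not part of the original notebook: `simulated_request` and `simulate` are hypothetical names, and `asyncio.sleep` stands in for the HTTP round trip. It records the peak number of calls in flight, which should never exceed the semaphore size.

```python
import asyncio


async def simulated_request(sem, tracker, delay=0.01):
    # Hold the semaphore for the whole simulated call, as a real request would.
    async with sem:
        tracker["in_flight"] += 1
        tracker["peak"] = max(tracker["peak"], tracker["in_flight"])
        await asyncio.sleep(delay)  # stands in for the HTTP round trip
        tracker["in_flight"] -= 1


async def simulate(total_requests, concurrent_requests):
    # Returns the highest number of simulated calls that ran at the same time.
    sem = asyncio.Semaphore(concurrent_requests)
    tracker = {"in_flight": 0, "peak": 0}
    await asyncio.gather(
        *(simulated_request(sem, tracker) for _ in range(total_requests))
    )
    return tracker["peak"]


if __name__ == "__main__":
    # asyncio.run needs Python 3.7+; in a recent Jupyter cell, `await simulate(20, 4)` works instead.
    print(asyncio.run(simulate(20, 4)))
```

When there are fewer requests than semaphore slots, the peak is simply the number of requests.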

Get the IP address of our service.

In [4]:
service_json = !kubectl get service azure-ml -o json
service_dict = json.loads(''.join(service_json))
app_url = service_dict['status']['loadBalancer']['ingress'][0]['ip']
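
The lookup above raises an exception if the load balancer has not been assigned an external IP yet. A more defensive variant might look like the sketch below; `extract_ingress_ip` is a hypothetical helper, and both payloads are trimmed, made-up examples in the shape the cell above indexes into (the address comes from a range reserved for documentation).

```python
import json


def extract_ingress_ip(service_dict):
    """Return the first load-balancer IP, or None if none is assigned yet."""
    ingress = (
        service_dict.get("status", {})
        .get("loadBalancer", {})
        .get("ingress")
    ) or []
    return ingress[0].get("ip") if ingress else None


# Made-up payloads: one with an assigned IP, one still waiting for it.
ready = json.loads('{"status": {"loadBalancer": {"ingress": [{"ip": "203.0.113.10"}]}}}')
pending = json.loads('{"status": {"loadBalancer": {}}}')

print(extract_ingress_ip(ready))
print(extract_ingress_ip(pending))
```

Returning `None` lets the caller decide whether to wait and retry instead of failing with a `KeyError`.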

In [5]:
scoring_url = 'http://{}/score'.format(app_url)
version_url = 'http://{}/version'.format(app_url)
health_url = 'http://{}/'.format(app_url)

In [6]:
!curl $health_url

Healthy

In [7]:
!curl $version_url # Reports the lightgbm version

2.1.2

In [8]:
dupes_test_path = 'dupes_test.tsv'
dupes_test = pd.read_csv(dupes_test_path, sep='\t', encoding='latin1')
dupes_to_score = dupes_test.iloc[:NUMBER_OF_REQUESTS,4]

In [10]:
url_list = [[scoring_url, jsontext] for jsontext in dupes_to_score.apply(text_to_json)]

In [11]:
def decode(result):
    return json.loads(result.decode("utf-8"))

In [12]:
async def fetch(url, session, data, headers):
    start_time = default_timer()
    async with session.request('post', url, data=data, headers=headers) as response:
        resp = await response.read()
        elapsed = default_timer() - start_time
        return resp, elapsed

In [13]:
async def bound_fetch(sem, url, session, data, headers):
    # Getter function with semaphore.
    async with sem:
        return await fetch(url, session, data, headers)

In [14]:
async def await_with_progress(coros):
    results=[]
    for f in tqdm(asyncio.as_completed(coros), total=len(coros)):
        result = await f
        results.append((decode(result[0]),result[1]))
    return results

In [15]:
async def run(url_list, num_concurrent=CONCURRENT_REQUESTS):
    headers = {'content-type': 'application/json'}
    tasks = []
    # create instance of Semaphore
    sem = asyncio.Semaphore(num_concurrent)

    # Create a client session so that we don't open a new connection
    # for each request.
    async with aiohttp.ClientSession() as session:
        for url, data in url_list:
            # pass Semaphore and session to every POST request
            task = asyncio.ensure_future(bound_fetch(sem, url, session, data, headers))
            tasks.append(task)
        return await await_with_progress(tasks)
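
Note that `asyncio.as_completed` yields results in completion order, so the list returned by `run` is not guaranteed to line up with `url_list`. The sketch below illustrates the difference using sleeps in place of requests; `tagged`, `completion_order`, and `submission_order` are hypothetical names introduced only for this illustration.

```python
import asyncio


async def tagged(index, delay):
    # The delay stands in for a request of varying duration.
    await asyncio.sleep(delay)
    return index


async def completion_order(delays):
    # as_completed hands back whichever task finishes first.
    tasks = [asyncio.ensure_future(tagged(i, d)) for i, d in enumerate(delays)]
    return [await f for f in asyncio.as_completed(tasks)]


async def submission_order(delays):
    # gather returns results in the order the awaitables were passed in.
    return await asyncio.gather(*(tagged(i, d) for i, d in enumerate(delays)))


if __name__ == "__main__":
    delays = [0.2, 0.0, 0.1]
    print(asyncio.run(completion_order(delays)))
    print(asyncio.run(submission_order(delays)))
```

If each response needs to be matched to its question, one option is to return the input index alongside the response, as `tagged` does here.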

Below we run the 100 requests against our deployed service.

In [16]:
loop = asyncio.get_event_loop()
start_time = default_timer()
complete_responses = loop.run_until_complete(asyncio.ensure_future(run(url_list, num_concurrent=CONCURRENT_REQUESTS)))
elapsed = default_timer() - start_time
print('Total Elapsed {}'.format(elapsed))
print('Avg time taken {0:4.2f} ms'.format(1000*elapsed/len(url_list)))

100%|██████████| 100/100 [00:07<00:00, 13.70it/s]

Total Elapsed 7.304544539190829
Avg time taken 73.05 ms
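
The 73.05 ms figure is the total wall-clock time divided by the number of requests, that is, the reciprocal of throughput (about 13.7 requests per second). It is not the latency of an individual request, which `fetch` measures separately and returns as the second element of each tuple. A minimal sketch for summarizing those per-request timings follows; `summarize_latencies` is a hypothetical helper, and the sample values are synthetic, not measurements from the service.

```python
import math
import statistics


def summarize_latencies(elapsed_seconds):
    """Summarize per-request latencies (seconds in, milliseconds out)."""
    ms = sorted(1000 * e for e in elapsed_seconds)
    # Nearest-rank 95th percentile: smallest value with at least 95% of samples at or below it.
    rank = max(1, math.ceil(0.95 * len(ms)))
    return {
        "mean_ms": statistics.mean(ms),
        "median_ms": statistics.median(ms),
        "p95_ms": ms[rank - 1],
        "max_ms": ms[-1],
    }


# Synthetic values for illustration only.
sample = [0.25, 0.5, 0.125, 0.75, 1.0]
print(summarize_latencies(sample))
```

With the notebook's data, the input would be `[elapsed for _, elapsed in complete_responses]`. Because the timer in `fetch` starts after the semaphore is acquired, these values exclude time spent waiting for a free slot.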

In [17]:
# Example response
complete_responses[0]

({'result': [[[27928, 27943, 0.8253809764787108],
    [14220321, 14220323, 0.35839553716891404],
    [23667086, 23667087, 0.2802815688638847],
    [13840429, 13840431, 0.13780540985428547],
    [149055, 149150, 0.09027890385269859],
    [14028959, 8716680, 0.07342530890949217],
    [7506844, 7506937, 0.06974038517581685],
    [951021, 951057, 0.06040331310628502],
    [111102, 111111, 0.05082323110520758],
    [750486, 750506, 0.03981735707256131],
    [1885557, 1885660, 0.0393630763988023],
    [1726630, 1726662, 0.03125891860989419],
    [805107, 805113, 0.03104900973709915],
    [3127429, 3127440, 0.026947309156889858],
    [3384504, 3384534, 0.02496890912836022],
    [20279484, 20279485, 0.02168826727522807],
    [8228281, 8228308, 0.020223091279871732],
    [1527803, 1527820, 0.02017140612455014],
    [901115, 901144, 0.01958429824005327],
    [1584370, 1584377, 0.014974984750806518],
    [194397, 194399, 0.013461759741770203],
    [203198, 1207393, 0.012889765461493624],
    [684

Let's use the number of original questions to count the successful responses.

In [25]:
no_questions = len(complete_responses[0][0]['result'][0])

In [27]:
num_successful = [len(i[0]['result'][0]) for i in complete_responses].count(no_questions)
print('Successful {} out of {}'.format(num_successful, len(url_list)))

Successful 100 out of 100
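
The count above assumes every response decodes to a dictionary with a `result` key; a response without one would raise a `KeyError`. The sketch below is one way to tolerate such cases. `count_successful` is a hypothetical helper, and the sample responses, including the error payload, are made up to mirror the `(decoded_json, elapsed_seconds)` shape used in this notebook.

```python
def count_successful(responses, expected_len):
    """Count responses whose first result list has the expected number of entries."""
    ok = 0
    for payload, _elapsed in responses:
        result = payload.get("result") if isinstance(payload, dict) else None
        if result and len(result[0]) == expected_len:
            ok += 1
    return ok


# Made-up responses for illustration only.
sample_responses = [
    ({"result": [[[1, 2, 0.9], [3, 4, 0.1]]]}, 0.25),
    ({"error": "timeout"}, 0.5),          # hypothetical failure payload
    ({"result": [[[1, 2, 0.9]]]}, 0.25),  # fewer entries than expected
]
print(count_successful(sample_responses, expected_len=2))
```

With the notebook's data, the call would be `count_successful(complete_responses, no_questions)`.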


To tear down the cluster and all related resources, go to the last section of the [deploy on AKS notebook](05_Deploy_On_AKS.ipynb).