### Client

This notebook contains the client application code, as well as the code used to profile the server's performance.

In [None]:
import time 
import requests
import json 
import threading
from typing import Dict, List, Optional
import numpy as np

# Base address of the server under test. For a locally running container use
# "http://localhost:5000" instead.
URI = "http://chu-healthcare-dns-name.eastus.azurecontainer.io:5000"

# All routes hang off the same health-entities path, so they are derived from it.
_HEALTH_ENTITIES_ROUTE = "/text/analytics/v3.1/entities/health"
SYNC_ROUTE = _HEALTH_ENTITIES_ROUTE
ASYNC_SUBMIT_ROUTE = _HEALTH_ENTITIES_ROUTE + "/jobs"
# The "{}" placeholder is filled with the job id via str.format().
ASYNC_STATUS_ROUTE = ASYNC_SUBMIT_ROUTE + "/{}"

class InferenceCall(threading.Thread):
    """A single timed inference request, executed on its own thread.

    After the thread has finished (``join()`` returned) the outcome is
    available on these attributes:

    * ``latency``     -- wall-clock seconds spent in the request (for the
      asynchronous endpoint this includes submitting and polling the job).
    * ``throughput``  -- ``batch_size / latency`` (documents per second).
    * ``status_code`` -- HTTP status of the synchronous call; for the
      asynchronous endpoint 200 when the job succeeded, otherwise an error
      code; ``None`` when the request itself raised (e.g. connection error).
    * ``success``     -- ``True`` only when ``status_code == 200``.

    Args:
        call_server_sync: Use the synchronous route when true, otherwise
            submit a job on the asynchronous route and poll until it ends.
        batch_size: Number of documents placed in the request payload.
        timeout: Per-request timeout in seconds forwarded to ``requests``.
            ``None`` (the default) waits indefinitely.
    """

    # Job states of the asynchronous endpoint that mean "keep polling".
    _PENDING_STATUSES = ("notstarted", "running")

    def __init__(
        self,
        call_server_sync: bool = True,
        batch_size: int = 1,
        timeout: Optional[float] = None,
    ):
        super().__init__()
        self.batched_payload = InferenceCall.make_batched_payload(batch_size)
        self.call_server_sync = call_server_sync
        self.batch_size = batch_size
        self.timeout = timeout
        # Results; populated by run().
        self.latency = None
        self.throughput = None
        self.success = None
        self.status_code = None

    @staticmethod
    def make_batched_payload(batch_size: int) -> Dict:
        """Build a request body holding ``batch_size`` identical documents.

        Every document carries the same sentence repeated 102 times and a
        string id equal to its index in the batch.
        """
        payload = {
          "documents": [
            {
              "text": "Patient doesn't suffer from high blood pressure. " * 102,
              "id": f"{idx}",
              "language": "en",
              "isLanguageDefaulted": True,
              "isLanguageFinalized": False,
              "isAutoLanguageDetectionEnabled": False
            } for idx in range(batch_size)
          ]
        }
        return payload

    def _call_sync(self) -> int:
        """POST the payload to the synchronous route; return the HTTP status."""
        response = requests.post(
            URI + SYNC_ROUTE, json=self.batched_payload, timeout=self.timeout
        )
        return response.status_code

    def _call_async(self) -> int:
        """Submit a job, poll it until it ends, and return 200 or an error code."""
        submit_response = requests.post(
            URI + ASYNC_SUBMIT_ROUTE, json=self.batched_payload, timeout=self.timeout
        )
        operation_location = submit_response.headers.get("Operation-Location")
        if not operation_location:
            # Without the header there is no job id to poll: the submission
            # was rejected. Never report this as a success, even on a 2xx.
            print(f"Job submission failed with HTTP {submit_response.status_code}")
            if submit_response.status_code >= 400:
                return submit_response.status_code
            return 500

        job_uid = operation_location.split('/')[-1]
        status_url = URI + ASYNC_STATUS_ROUTE.format(job_uid)
        while True:
            status_response = requests.get(status_url, timeout=self.timeout)
            status = status_response.json()["status"].lower()
            if status == "succeeded":
                return 200
            if status not in self._PENDING_STATUSES:
                # Terminal state other than success: report it and stop polling.
                print(status)
                return 500
            time.sleep(0.01)

    def run(self) -> None:
        """Execute the request and record latency, throughput and outcome.

        Failures are recorded on the instance instead of propagating, so a
        broken request cannot leave the result attributes unset.
        """
        # perf_counter is monotonic, unlike time.time(), so the measured
        # interval cannot be distorted by system clock adjustments.
        start = time.perf_counter()
        try:
            if self.call_server_sync:
                status_code = self._call_sync()
            else:
                status_code = self._call_async()
        except (
            requests.RequestException,  # connection errors, timeouts, ...
            KeyError,                   # response JSON has no "status" field
            TypeError,                  # response JSON is not an object
            ValueError,                 # response body is not JSON
            AttributeError,             # "status" is not a string
        ) as exc:
            print(f"Inference call failed: {exc!r}")
            status_code = None
        elapsed = time.perf_counter() - start

        self.latency = elapsed
        self.throughput = self.batch_size / elapsed if elapsed > 0 else 0.0
        self.status_code = status_code
        self.success = status_code == 200
    
    
class Client:
    """Fires a group of concurrent inference calls and aggregates the results.

    Args:
        num_concurrent_calls: Number of ``InferenceCall`` threads to launch.
        call_server_sync: Passed through to every ``InferenceCall``.
        batch_size: Number of documents per request.

    After ``execute()`` the following attributes are populated:

    * ``latency``      -- seconds from starting the first thread until the
      last one has been joined.
    * ``throughput``   -- documents of successful calls per second.
    * ``success_rate`` -- fraction of calls that succeeded (0.0 when there
      are no calls).
    * ``success``      -- ``True`` when there was at least one call and all
      calls succeeded.
    * ``statuses``     -- mapping of status code to number of calls.
    """

    def __init__(
        self,
        num_concurrent_calls: int = 1,
        call_server_sync: bool = True,
        batch_size: int = 1,
    ):
        self.num_concurrent_calls = num_concurrent_calls
        self.call_server_sync = call_server_sync
        self.batch_size = batch_size

        self.calls = [
            InferenceCall(call_server_sync=call_server_sync, batch_size=batch_size)
            for _ in range(num_concurrent_calls)
        ]
        self.latency = None
        self.throughput = None
        self.success_rate = None
        self.success = False
        self.statuses = None

    def execute(self) -> None:
        """Run all calls concurrently, wait for them, and compute the metrics.

        The calls are threads and are started here, so this method is
        expected to be invoked once per ``Client`` instance.
        """
        # perf_counter is monotonic, so the interval is immune to system
        # clock adjustments during the run.
        start = time.perf_counter()
        for call in self.calls:
            call.start()
        for call in self.calls:
            call.join()
        self.latency = time.perf_counter() - start

        num_calls = len(self.calls)
        # A call whose thread died before recording a result still has
        # success=None; counting truthiness treats that as a failure instead
        # of raising TypeError on None arithmetic.
        num_successes = sum(1 for call in self.calls if call.success)

        self.success = num_calls > 0 and num_successes == num_calls
        self.success_rate = num_successes / num_calls if num_calls else 0.0
        if self.latency > 0:
            self.throughput = num_successes * self.batch_size / self.latency
        else:
            self.throughput = 0.0

        self.statuses = {}
        for call in self.calls:
            self.statuses[call.status_code] = self.statuses.get(call.status_code, 0) + 1
    

In [None]:
# Smoke test: two concurrent single-document calls against the asynchronous
# endpoint, followed by a summary of the aggregated metrics.
c = Client(num_concurrent_calls=2, call_server_sync=False, batch_size=1)
c.execute()

# The leading space in ' not' keeps the failure message from reading "wasnot".
print(f"The test call was{'' if c.success else ' not'} successful.")
print(f"- Latency: {c.latency:.5f} seconds")
print(f"- Throughput: {c.throughput} TPS")
print(f"- Success Rate: {100 * c.success_rate:.2f}%")

In [None]:
def run_concurrent_experiment(
    call_server_sync: bool,
    batch_size: int,
    num_concurrent_calls: List[int],
    num_trials: int,
    partial_data_name: str,
) -> Dict[int, List[Dict]]:
    """Measure the server at several concurrency levels over repeated trials.

    For each of ``num_trials`` rounds, one ``Client`` is executed per entry
    of ``num_concurrent_calls`` and its metrics are recorded. After every
    single trial the results gathered so far are written to
    ``<partial_data_name>.json`` so that an interrupted run keeps its data.

    Args:
        call_server_sync: Use the synchronous endpoint when true, otherwise
            the asynchronous one.
        batch_size: Number of documents per request.
        num_concurrent_calls: Concurrency levels to test.
        num_trials: Number of rounds over all concurrency levels.
        partial_data_name: Path of the checkpoint file, without ``.json``.

    Returns:
        Mapping of concurrency level to the list of per-trial result dicts
        with keys ``latency``, ``throughput``, ``success``, ``success_rate``
        and ``statuses``.
    """
    data = {ncc: [] for ncc in num_concurrent_calls}
    checkpoint_path = f'{partial_data_name}.json'
    for _ in range(num_trials):
        for ncc in num_concurrent_calls:
            trial = Client(
                num_concurrent_calls=ncc,
                call_server_sync=call_server_sync,
                batch_size=batch_size,
            )
            trial.execute()
            data[ncc].append(
                {
                    'latency': trial.latency,
                    'throughput': trial.throughput,
                    'success': trial.success,
                    'success_rate': trial.success_rate,
                    'statuses': trial.statuses,
                }
            )
            # Context manager guarantees the checkpoint is flushed and the
            # handle closed; a bare open() here leaked one handle per trial.
            with open(checkpoint_path, 'w') as checkpoint:
                json.dump(data, checkpoint, indent=4)
    return data

In [None]:
# Concurrency levels to sweep: every value from 1 to 24, then 25 to 50 in
# steps of 5. Each level is measured `num_trials` times.
num_trials = 20
num_concurrent_calls = [*range(1, 25), *range(25, 51, 5)]

In [None]:
# Experiment 1: synchronous endpoint, one document per request.
data1 = run_concurrent_experiment(
    call_server_sync=True,
    batch_size=1,
    num_concurrent_calls=num_concurrent_calls,
    num_trials=num_trials,
    partial_data_name="sync_endpoint_20_trials_bs_1",
)

In [None]:
# Experiment 2: asynchronous endpoint, one document per request.
data2 = run_concurrent_experiment(
    call_server_sync=False,
    batch_size=1,
    num_concurrent_calls=num_concurrent_calls,
    num_trials=num_trials,
    partial_data_name="async_endpoint_20_trials_bs_1",
)

In [None]:
# Experiment 3: synchronous endpoint, five documents per request.
data3 = run_concurrent_experiment(
    call_server_sync=True,
    batch_size=5,
    num_concurrent_calls=num_concurrent_calls,
    num_trials=num_trials,
    partial_data_name="sync_endpoint_20_trials_bs_5",
)

In [None]:
# Experiment 4: asynchronous endpoint, five documents per request.
data4 = run_concurrent_experiment(
    call_server_sync=False,
    batch_size=5,
    num_concurrent_calls=num_concurrent_calls,
    num_trials=num_trials,
    partial_data_name="async_endpoint_20_trials_bs_5",
)