In [1]:
import aiohttp
import asyncio
import numpy as np
import json
import dask
from dask import delayed
from dask.distributed import Client, LocalCluster

In [2]:
class ModelClient():
    
    MODEL_HOST = 'http://192.168.64.8:30123'
    
    def __init__(self, model_host = MODEL_HOST):
        self.model_host = model_host

    async def async_predict(self, indata):    
        async with aiohttp.ClientSession() as session:
            async with session.post(self.model_host + '/predict', json=self.numpy_to_json(indata)) as resp:
                response_json = await resp.json()
                return self.json_to_numpy(response_json)
    
    def predict(self, indata):
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(self.async_predict(indata))
    
    def json_to_numpy(self, json_str, key='data'):
        return np.array(json.loads(json_str)[key])

    def numpy_to_json(self, nd_array, key='data'):
        try:
            if nd_array.shape[1] == 1:
                nd_array.reshape(nd_array.shape[0])
            nd_array_list =  { key : nd_array.tolist() } 
            return json.dumps(nd_array_list)
        except Exception as e:
            nd_array_list =  { key : [] } 
            return json.dumps(nd_array_list)


In [3]:
client = Client()
client

0,1
Client  Scheduler: tcp://127.0.0.1:36105,Cluster  Workers: 8  Cores: 8  Memory: 8.36 GB


In [8]:
indata = np.random.randint(100000, size=(1000, 20))
modelClient = ModelClient()

result_delayed = [delayed(modelClient.predict)(indata) for i in range(100)]

%time result = dask.compute(*result_delayed)

CPU times: user 1.86 s, sys: 842 ms, total: 2.71 s
Wall time: 19.8 s


In [6]:
100000 / 20

5000.0