In [1]:
import nest_asyncio
nest_asyncio.apply()
import asyncio
import concurrent.futures
import multiprocessing as mp
import sys
sys.path.append('/Users/jack/Documents/Concurrency')
from asyncio_practice.setup_logger import logger

def blocking_io():
    """Read from ``/dev/urandom`` with an ordinary blocking file read.

    File operations such as this one can block the event loop, so the
    caller is expected to run this function in a thread pool.

    Returns:
        bytes: whatever ``read(100)`` returns for the opened device.
    """
    with open("/dev/urandom", "rb") as random_source:
        payload = random_source.read(100)
    return payload


def cpu_bound(limit=10 ** 7):
    """Sum the squares of every integer in ``range(limit)``.

    CPU-bound operations like this one block the event loop; in general it
    is preferable to run them in a process pool.

    Args:
        limit: exclusive upper bound of the range. Defaults to ``10 ** 7``,
            the workload size that used to be hard-coded, so calling the
            function with no arguments behaves exactly as before.

    Returns:
        int: ``0**2 + 1**2 + ... + (limit - 1)**2``; ``0`` when ``limit``
        is zero or negative because the range is then empty.
    """
    return sum(i * i for i in range(limit))


async def main():
    """Run the two helper functions through three different executors.

    Each call is awaited in turn via ``run_in_executor`` and its result is
    logged at debug level before the next executor is tried.
    """
    event_loop = asyncio.get_running_loop()

    # 1. Passing ``None`` selects the loop's default executor.
    default_result = await event_loop.run_in_executor(None, blocking_io)
    logger.debug(f'default thread pool {default_result}')

    # 2. A thread pool owned by this block; closed when the ``with`` exits.
    with concurrent.futures.ThreadPoolExecutor() as thread_pool:
        thread_result = await event_loop.run_in_executor(thread_pool, blocking_io)
        logger.debug(f'custom thread pool {thread_result}')

    # 3. A process pool whose workers are started with the 'fork' context.
    fork_context = mp.get_context('fork')
    with concurrent.futures.ProcessPoolExecutor(mp_context=fork_context) as process_pool:
        process_result = await event_loop.run_in_executor(process_pool, cpu_bound)
        logger.debug(f'custom process pool {process_result}')


# Top-level ``await`` is only valid in an interactive/notebook session; in a
# plain script the commented-out ``asyncio.run`` call below is the equivalent.
await main()
# asyncio.run(main())

2021-06-20 12:18:33 | MainThread |[36m DEBUG    [0m| root | default thread pool b"\x88\xcd\x11\xf9\xaf\x0b8x\x9e\xa4njoO\xa8eR\xc5,\xd80\xfe\x857\x931\xe9=\xd9\xfegi=u\x05\xd1p\xbd^I\xb9\xb0k\xc3y\xe6\xad\xdc\xbbi\xf5v\xf4\tM\x9e\x9a\xf1%\xed\xcd\x86_9\xcbN&\x9c\xa2\xf20\xb1\xd4\x90\xc5%Qk\xf2\xa3\xa1\xb15\xad\xe6\xf4H\xc3\x92,\xdc\xbc\x85'fX\xf4\xe0\xdd\x88"
2021-06-20 12:18:33 | MainThread |[36m DEBUG    [0m| root | custom thread pool b'0\xcbg\xa1l\xf3\xdbs\xf3\x89\xd0p\x85\xd4\x15_\xd4\x94/\x06\x96\x88\xb2\x94\xd4,\x97\xb9-%|)\xb37kR\x84\x9eV\xb1\x17\x1a\x1bO\x19\x1e\xaf4\xa0!\xa0r@\x87#\'\xc3\n\xc5+\xa6&\xbc\x87!n>\xc3\xb3\x99\xb7r62\xb5\x88=q\x8a"\x08\x9e\xe8\x17=:\x1dnv|\x1b\x9c5\xb7Hl\x1eV[;'
2021-06-20 12:18:34 | MainThread |[36m DEBUG    [0m| root | custom process pool 333333283333335000000


In [1]:
# event loop contains cpu-bound code block
import nest_asyncio
nest_asyncio.apply()
import asyncio
import concurrent.futures
import multiprocessing as mp
from multiprocessing import Pool
import sys
sys.path.append('/Users/jack/Documents/Concurrency')
from asyncio_practice.setup_logger import logger
import psutil
import tensorflow as tf

# ``logical=False`` asks psutil for physical cores rather than logical ones.
num_cpus = psutil.cpu_count(logical=False)
logger.info(f'num_cpus: {num_cpus}')
# Model path and iteration counter; neither is read again within this cell.
filename = '/tmp/model'
iteration = 0


2021-06-20 12:42:15 | MainThread |[36m DEBUG    [0m| tensorflow | Falling back to TensorFlow client; we recommended you install the Cloud TPU client directly with pip install cloud-tpu-client.
2021-06-20 12:42:15 | MainThread |[36m DEBUG    [0m| h5py._conv | Creating converter from 7 to 5
2021-06-20 12:42:15 | MainThread |[36m DEBUG    [0m| h5py._conv | Creating converter from 5 to 7
2021-06-20 12:42:15 | MainThread |[36m DEBUG    [0m| h5py._conv | Creating converter from 7 to 5
2021-06-20 12:42:15 | MainThread |[36m DEBUG    [0m| h5py._conv | Creating converter from 5 to 7
2021-06-20 12:42:16 | MainThread |[32m INFO     [0m| root | 8


In [4]:
import nest_asyncio
nest_asyncio.apply()
import asyncio
import concurrent.futures
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import sys
sys.path.append('/Users/jack/Documents/Concurrency')
from asyncio_practice.setup_logger import logger
import psutil
import tensorflow as tf

# Physical core count; passed as ``max_workers`` to the process pool in ``main``.
num_cpus = psutil.cpu_count(logical=False)
# Path that ``evaluate_next_batch`` passes to ``tf.keras.models.load_model``.
filename = '/tmp/model'
# Disabled: the start method is chosen per pool via ``mp.get_context('fork')``.
# mp.set_start_method('fork')
# Shared counter: ``run_model`` increments it, ``reporter`` polls it.
iteration = 0

def evaluate_next_batch():
    """Load the saved model from ``filename`` and predict on the MNIST test images.

    Both the model and the dataset are loaded on every call. The test images
    are divided by 255.0 before being passed to ``predict``.

    Returns:
        Whatever ``model.predict`` returns for the scaled test images.
    """
    # Core pinning (to limit contention between processes, since TensorFlow
    # uses multiple threads) is left disabled in this variant:
    # if sys.platform == 'linux':
    #     psutil.Process().cpu_affinity([i])
    keras_model = tf.keras.models.load_model(filename)
    logger.debug('Model is loaded')
    mnist_splits = tf.keras.datasets.mnist.load_data()
    test_images = mnist_splits[1][0]
    scaled_images = test_images / 255.0
    predictions = keras_model.predict(scaled_images)
    logger.info('Finished prediction')
    return predictions

async def exec_model(loop, pool):
    """Run ``evaluate_next_batch`` on ``pool`` via ``loop`` and return its result.

    Args:
        loop: object providing ``run_in_executor`` (the running event loop).
        pool: executor handed to ``run_in_executor``.
    """
    pending_prediction = loop.run_in_executor(pool, evaluate_next_batch)
    return await pending_prediction

async def run_model(loop, pool):
    """Run one model evaluation after another until ``iteration`` reaches 50.

    Each evaluation is dispatched to ``pool`` through the ``exec_model``
    helper and awaited before the module-level ``iteration`` counter is
    incremented, so evaluations run strictly one at a time.

    Args:
        loop: the running event loop, forwarded to ``exec_model``.
        pool: executor on which each evaluation runs.
    """
    global iteration
    while iteration < 50:
        # The prediction itself is not used here, so it is not bound to a name.
        await exec_model(loop, pool)
        iteration += 1

async def reporter():
    """Log 'Please Wait' every 0.5 seconds while ``iteration`` is below 50.

    The module-level ``iteration`` counter is only read here, never written.
    """
    while True:
        if iteration >= 50:
            return
        logger.info('Please Wait')
        await asyncio.sleep(0.5)


async def main():
    """Run ``run_model`` and ``reporter`` concurrently on a forked process pool.

    The pool is sized to ``num_cpus`` workers. It is managed by a ``with``
    block so its worker processes are shut down once both tasks have
    finished (or if either raises); previously the pool was never shut down
    and its workers outlived ``main``.
    """
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=num_cpus, mp_context=mp.get_context('fork')) as pool:
        model_task = loop.create_task(run_model(loop, pool))
        reporter_task = loop.create_task(reporter())
        logger.info(model_task)
        gathered = asyncio.gather(model_task, reporter_task)
        await gathered
        logger.info(gathered)

# Top-level ``await`` is only valid in an interactive/notebook session.
await main()


2021-06-20 12:25:54 | MainThread |[32m INFO     [0m| root | <Task pending name='Task-9' coro=<run_model() running at <ipython-input-4-3b345f63338e>:35>>
2021-06-20 12:25:54 | MainThread |[32m INFO     [0m| root | Please Wait
2021-06-20 12:25:55 | MainThread |[36m DEBUG    [0m| root | Model is loaded
2021-06-20 12:25:55 | MainThread |[32m INFO     [0m| root | Please Wait
2021-06-20 12:25:55 | MainThread |[32m INFO     [0m| root | Finished prediction
2021-06-20 12:25:55 | MainThread |[36m DEBUG    [0m| root | Model is loaded
2021-06-20 12:25:55 | MainThread |[32m INFO     [0m| root | Please Wait
2021-06-20 12:25:56 | MainThread |[32m INFO     [0m| root | Finished prediction
2021-06-20 12:25:56 | MainThread |[32m INFO     [0m| root | Please Wait
2021-06-20 12:25:56 | MainThread |[36m DEBUG    [0m| root | Model is loaded
2021-06-20 12:25:56 | MainThread |[32m INFO     [0m| root | Please Wait
2021-06-20 12:25:57 | MainThread |[32m INFO     [0m| root | Finished predic

In [2]:
import multiprocessing as mp
import concurrent.futures
from multiprocessing import Pool
import psutil
import sys
import tensorflow as tf
import asyncio
import sys
import time
sys.path.append('/Users/jack/Documents/Concurrency')
from multiprocessing_practice.setup_logger import logger

# Physical core count; used both as the pool size and as the number of
# ``evaluate_next_batch`` calls submitted per iteration below.
num_cpus = psutil.cpu_count(logical=False)

# Path that ``evaluate_next_batch`` passes to ``tf.keras.models.load_model``.
filename = '/tmp/model'

def evaluate_next_batch(i):
    """Load the saved model and predict on the MNIST test images.

    Args:
        i: index of the CPU core this process is pinned to on Linux; it is
            not used on other platforms.

    Returns:
        Whatever ``model.predict`` returns for the test images divided by 255.0.
    """
    # On Linux, pin the process to one core to prevent contention between
    # the different processes, since TensorFlow uses multiple threads.
    on_linux = sys.platform == 'linux'
    if on_linux:
        psutil.Process().cpu_affinity([i])
    keras_model = tf.keras.models.load_model(filename)
    mnist_splits = tf.keras.datasets.mnist.load_data()
    scaled_images = mnist_splits[1][0] / 255.0
    return keras_model.predict(scaled_images)

logger.info('Started')
# ``perf_counter`` is a monotonic clock meant for measuring intervals; a
# difference of ``time.time()`` values can be skewed by system clock changes.
begin = time.perf_counter()
for _ in range(50):
    # A fresh pool of forked workers is created and torn down on every
    # iteration, so pool start-up cost is part of what is being timed.
    with concurrent.futures.ProcessPoolExecutor(mp_context=mp.get_context('fork'), max_workers=num_cpus) as pool:
        # ``map`` returns a lazy iterator and an exception raised in a worker
        # is only re-raised when its result is retrieved. Draining the
        # iterator makes worker failures surface instead of being dropped.
        list(pool.map(evaluate_next_batch, range(num_cpus)))
end = time.perf_counter()
logger.info('Finished')
logger.info(f'Time Elapsed : {end-begin}')

2021-06-20 12:18:41 | MainThread |[36m DEBUG    [0m| tensorflow | Falling back to TensorFlow client; we recommended you install the Cloud TPU client directly with pip install cloud-tpu-client.
2021-06-20 12:18:42 | MainThread |[36m DEBUG    [0m| h5py._conv | Creating converter from 7 to 5
2021-06-20 12:18:42 | MainThread |[36m DEBUG    [0m| h5py._conv | Creating converter from 5 to 7
2021-06-20 12:18:42 | MainThread |[36m DEBUG    [0m| h5py._conv | Creating converter from 7 to 5
2021-06-20 12:18:42 | MainThread |[36m DEBUG    [0m| h5py._conv | Creating converter from 5 to 7
2021-06-20 12:18:42 | MainThread |[32m INFO     [0m| root | Started
2021-06-20 12:19:52 | MainThread |[32m INFO     [0m| root | Finished
2021-06-20 12:19:52 | MainThread |[32m INFO     [0m| root | Time Elapsed : 69.5207359790802


In [5]:
import sys
sys.path.append('/Users/jack/Documents/Concurrency')
from asyncio_practice.setup_logger import logger
import asyncio
import time
import numpy
import torch
import onnxruntime as rt

# Length each input tensor is reshaped to, as ``(1, max_seq_length)``, in ``recv_data``.
max_seq_length = 128
# Number of dataset entries ``recv_data`` hands out before it returns ``None``.
total_samples = 50

# NOTE(review): depending on the torch version/arguments, ``torch.load`` may
# unpickle arbitrary objects; only load files from a trusted source.
dataset = torch.load('onnx_models/tensor_dataset.pt')

# ONNX model file opened with ``rt.InferenceSession`` in ``run``.
filename = 'onnx_models/optimized_model_cpu.onnx'
# Index of the next dataset entry to hand out; advanced by ``recv_data``.
i = 0
async def recv_data():
    """Return the next sample as a dict of model inputs, or ``None`` when done.

    Reads entry ``i`` of the module-level ``dataset``, converts its first
    three elements to arrays reshaped to ``(1, max_seq_length)``, advances
    ``i`` and then sleeps for 0.5 seconds before returning. Once ``i`` has
    reached ``total_samples`` it returns ``None`` immediately.
    """
    global i
    if i >= total_samples:
        return None
    sample = dataset[i]
    input_names = ('input_ids', 'input_mask', 'segment_ids')
    feeds = {
        name: sample[position].cpu().reshape(1, max_seq_length).numpy()
        for position, name in enumerate(input_names)
    }
    i += 1
    await asyncio.sleep(0.5)
    # logger.debug('send data')
    return feeds

async def run(id):
    """Fetch samples with ``recv_data`` and run inference until none are left.

    A new ``InferenceSession`` (limited to one intra-op thread) is built for
    every sample, and ``session.run`` is called directly in this coroutine,
    so nothing else on the event loop runs while a prediction is in progress.

    Args:
        id: label used only to tag this worker's log messages.
    """
    while True:
        logger.debug(f'id: {id} waiting data')
        rt_inputs = await recv_data()
        if rt_inputs is None:
            break
        logger.debug(f'id: {id} received data')
        logger.debug(f'id: {id} waiting prediction')
        options = rt.SessionOptions()
        options.intra_op_num_threads = 1
        session = rt.InferenceSession(filename, sess_options=options)
        session.run(None, rt_inputs)
        logger.debug(f'id: {id} finished prediction')

async def main():
    """Start two ``run`` workers (ids 1 and 2) as tasks and wait for both."""
    loop = asyncio.get_running_loop()
    workers = [loop.create_task(run(id=worker_id)) for worker_id in (1, 2)]
    await asyncio.gather(*workers)

# inference
logger.info('Started')
begin = time.time()
await main()
end = time.time()
logger.info('Finished')
logger.info(f'Time Elapsed : {end-begin}')

2021-06-20 16:56:44 | MainThread |[32m INFO     [0m| root | Started
2021-06-20 16:56:44 | MainThread |[36m DEBUG    [0m| root | id: 1 waiting data
2021-06-20 16:56:44 | MainThread |[36m DEBUG    [0m| root | id: 2 waiting data
2021-06-20 16:56:44 | MainThread |[36m DEBUG    [0m| root | id: 1 received data
2021-06-20 16:56:44 | MainThread |[36m DEBUG    [0m| root | id: 1 waiting prediction
2021-06-20 16:56:45 | MainThread |[36m DEBUG    [0m| root | id: 1 finished prediction
2021-06-20 16:56:45 | MainThread |[36m DEBUG    [0m| root | id: 1 waiting data
2021-06-20 16:56:45 | MainThread |[36m DEBUG    [0m| root | id: 2 received data
2021-06-20 16:56:45 | MainThread |[36m DEBUG    [0m| root | id: 2 waiting prediction
2021-06-20 16:56:46 | MainThread |[36m DEBUG    [0m| root | id: 2 finished prediction
2021-06-20 16:56:46 | MainThread |[36m DEBUG    [0m| root | id: 2 waiting data
2021-06-20 16:56:46 | MainThread |[36m DEBUG    [0m| root | id: 1 received data
2021-06-20

In [4]:
import sys
sys.path.append('/Users/jack/Documents/Concurrency')
from asyncio_practice.setup_logger import logger
import asyncio
import time
import numpy
import torch
import onnxruntime as rt

import psutil
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

# Length each input tensor is reshaped to, as ``(1, max_seq_length)``, in ``recv_data``.
max_seq_length = 128
# Number of dataset entries ``recv_data`` hands out before it returns ``None``.
total_samples = 50

# NOTE(review): depending on the torch version/arguments, ``torch.load`` may
# unpickle arbitrary objects; only load files from a trusted source.
dataset = torch.load('onnx_models/tensor_dataset.pt')

# ONNX model file opened with ``rt.InferenceSession`` in ``exec_model``.
filename = 'onnx_models/optimized_model_cpu.onnx'
# Index of the next dataset entry to hand out; advanced by ``recv_data``.
i = 0
async def recv_data():
    """Hand out the next dataset entry as model inputs, or ``None`` when exhausted.

    Entry ``i`` of the module-level ``dataset`` supplies three tensors; each
    is converted to an array reshaped to ``(1, max_seq_length)``. The index
    ``i`` is advanced and the coroutine sleeps 0.5 seconds before returning
    the dict. When ``i`` is no longer below ``total_samples`` it returns
    ``None`` without sleeping.
    """
    global i
    if not i < total_samples:
        return None

    entry = dataset[i]

    def as_model_input(tensor):
        # Same conversion chain for all three inputs.
        return tensor.cpu().reshape(1, max_seq_length).numpy()

    rt_inputs = {
        'input_ids': as_model_input(entry[0]),
        'input_mask': as_model_input(entry[1]),
        'segment_ids': as_model_input(entry[2]),
    }
    i += 1
    await asyncio.sleep(0.5)
    # logger.debug('send data')
    return rt_inputs

def exec_model(rt_inputs):
    """Build an inference session for ``filename`` and run it on ``rt_inputs``.

    A new session, limited to one intra-op thread, is created on every call.

    Args:
        rt_inputs: input feed passed as the second argument to ``session.run``.

    Returns:
        The value returned by ``session.run(None, rt_inputs)``.
    """
    options = rt.SessionOptions()
    options.intra_op_num_threads = 1
    session = rt.InferenceSession(filename, sess_options=options)
    return session.run(None, rt_inputs)

async def run(loop, pool, id):
    """Fetch samples with ``recv_data`` and predict on ``pool`` until none are left.

    Args:
        loop: event loop whose ``run_in_executor`` dispatches ``exec_model``.
        pool: executor on which ``exec_model`` runs.
        id: label used only to tag this worker's log messages.
    """
    while True:
        logger.debug(f'id: {id} waiting data')
        rt_inputs = await recv_data()
        if rt_inputs is None:
            break
        logger.debug(f'id: {id} received data')
        logger.debug(f'id: {id} waiting prediction')
        # Awaiting the executor future lets the other worker proceed meanwhile.
        await loop.run_in_executor(pool, exec_model, rt_inputs)
        logger.debug(f'id: {id} finished prediction')

async def main():
    """Run two ``run`` workers (ids 1 and 2) that share one forked process pool.

    The pool has one worker per physical core. It is managed by a ``with``
    block so its worker processes are shut down once both tasks have
    finished (or if either raises); previously the pool was never shut down
    and its workers outlived ``main``.
    """
    num_cpus = psutil.cpu_count(logical=False)
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=num_cpus, mp_context=mp.get_context('fork')) as pool:
        task1 = loop.create_task(run(loop, pool, id=1))
        task2 = loop.create_task(run(loop, pool, id=2))
        await asyncio.gather(task1, task2)

# inference
logger.info('Started')
begin = time.time()
await main()
end = time.time()
logger.info('Finished')
logger.info(f'Time Elapsed : {end-begin}')

2021-06-20 16:55:54 | MainThread |[32m INFO     [0m| root | Started
2021-06-20 16:55:54 | MainThread |[36m DEBUG    [0m| root | id: 1 waiting data
2021-06-20 16:55:54 | MainThread |[36m DEBUG    [0m| root | id: 2 waiting data
2021-06-20 16:55:55 | MainThread |[36m DEBUG    [0m| root | id: 1 received data
2021-06-20 16:55:55 | MainThread |[36m DEBUG    [0m| root | id: 1 waiting prediction
2021-06-20 16:55:55 | MainThread |[36m DEBUG    [0m| root | id: 2 received data
2021-06-20 16:55:55 | MainThread |[36m DEBUG    [0m| root | id: 2 waiting prediction
2021-06-20 16:55:56 | MainThread |[36m DEBUG    [0m| root | id: 2 finished prediction
2021-06-20 16:55:56 | MainThread |[36m DEBUG    [0m| root | id: 2 waiting data
2021-06-20 16:55:56 | MainThread |[36m DEBUG    [0m| root | id: 1 finished prediction
2021-06-20 16:55:56 | MainThread |[36m DEBUG    [0m| root | id: 1 waiting data
2021-06-20 16:55:56 | MainThread |[36m DEBUG    [0m| root | id: 2 received data
2021-06-20

In [7]:
import sys
sys.path.append('/Users/jack/Documents/Concurrency')
from asyncio_practice.setup_logger import logger
import asyncio
import time
import numpy
import torch
import onnxruntime as rt

import psutil
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

# Length each input tensor is reshaped to, as ``(1, max_seq_length)``, in ``recv_data``.
max_seq_length = 128
# Number of dataset entries ``recv_data`` hands out before it returns ``None``.
total_samples = 50

# NOTE(review): depending on the torch version/arguments, ``torch.load`` may
# unpickle arbitrary objects; only load files from a trusted source.
dataset = torch.load('onnx_models/tensor_dataset.pt')

# ONNX model file opened with ``rt.InferenceSession`` in ``init``.
filename = 'onnx_models/optimized_model_cpu.onnx'
# Index of the next dataset entry to hand out; advanced by ``recv_data``.
i = 0
async def recv_data():
    """Produce the next sample's model inputs, or ``None`` after the last one.

    While ``i`` is below ``total_samples``, entry ``i`` of the module-level
    ``dataset`` is turned into a dict of three arrays, each reshaped to
    ``(1, max_seq_length)``; ``i`` is then incremented and the coroutine
    sleeps 0.5 seconds before returning the dict. Otherwise ``None`` is
    returned straight away.
    """
    global i
    if i >= total_samples:
        return None

    current = dataset[i]
    target_shape = (1, max_seq_length)
    input_ids = current[0].cpu().reshape(*target_shape).numpy()
    input_mask = current[1].cpu().reshape(*target_shape).numpy()
    segment_ids = current[2].cpu().reshape(*target_shape).numpy()
    rt_inputs = {
        'input_ids': input_ids,
        'input_mask': input_mask,
        'segment_ids': segment_ids,
    }
    i += 1
    await asyncio.sleep(0.5)
    # logger.debug('send data')
    return rt_inputs

# Module-level inference session; stays ``None`` until ``init`` has been called.
sess = None
def init():
    """Create the inference session for ``filename`` and store it in ``sess``.

    The session is limited to one intra-op thread. This function is passed
    as ``initializer`` to the process pool in ``main``.
    """
    global sess
    options = rt.SessionOptions()
    options.intra_op_num_threads = 1
    sess = rt.InferenceSession(filename, sess_options=options)
    logger.debug('Initialized model')

def exec_model(rt_inputs):
    """Run the module-level session ``sess`` on ``rt_inputs``.

    Args:
        rt_inputs: input feed passed as the second argument to ``sess.run``.

    Returns:
        The value returned by ``sess.run(None, rt_inputs)``.
    """
    # ``sess`` is only read here, so no ``global`` declaration is required.
    return sess.run(None, rt_inputs)

async def run(loop, pool, id):
    """Pull samples from ``recv_data`` and predict on ``pool`` until exhausted.

    Args:
        loop: event loop whose ``run_in_executor`` dispatches ``exec_model``.
        pool: executor on which ``exec_model`` runs.
        id: label used only to tag this worker's log messages.
    """
    while True:
        logger.debug(f'id: {id} waiting data')
        rt_inputs = await recv_data()
        if rt_inputs is None:
            # ``recv_data`` signals the end of the dataset with ``None``.
            break
        logger.debug(f'id: {id} received data')
        logger.debug(f'id: {id} waiting prediction')
        prediction_future = loop.run_in_executor(pool, exec_model, rt_inputs)
        await prediction_future
        logger.debug(f'id: {id} finished prediction')

async def main():
    """Run two ``run`` workers (ids 1 and 2) on a pool that preloads the model.

    The forked process pool has one worker per physical core and passes
    ``init`` as its ``initializer``. It is managed by a ``with`` block so its
    worker processes are shut down once both tasks have finished (or if
    either raises); previously the pool was never shut down and its workers
    outlived ``main``.
    """
    num_cpus = psutil.cpu_count(logical=False)
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=num_cpus, mp_context=mp.get_context('fork'), initializer=init) as pool:
        task1 = loop.create_task(run(loop, pool, id=1))
        task2 = loop.create_task(run(loop, pool, id=2))
        await asyncio.gather(task1, task2)

# inference
logger.info('Started')
begin = time.time()
await main()
end = time.time()
logger.info('Finished')
logger.info(f'Time Elapsed : {end-begin}')

2021-06-20 16:59:49 | MainThread |[32m INFO     [0m| root | Started
2021-06-20 16:59:49 | MainThread |[36m DEBUG    [0m| root | id: 1 waiting data
2021-06-20 16:59:49 | MainThread |[36m DEBUG    [0m| root | id: 2 waiting data
2021-06-20 16:59:49 | MainThread |[36m DEBUG    [0m| root | id: 1 received data
2021-06-20 16:59:49 | MainThread |[36m DEBUG    [0m| root | id: 1 waiting prediction
2021-06-20 16:59:49 | MainThread |[36m DEBUG    [0m| root | id: 2 received data
2021-06-20 16:59:49 | MainThread |[36m DEBUG    [0m| root | id: 2 waiting prediction
2021-06-20 16:59:55 | MainThread |[36m DEBUG    [0m| root | Initialized model
2021-06-20 16:59:55 | MainThread |[36m DEBUG    [0m| root | Initialized model
2021-06-20 16:59:55 | MainThread |[36m DEBUG    [0m| root | Initialized model
2021-06-20 16:59:55 | MainThread |[36m DEBUG    [0m| root | Initialized model
2021-06-20 16:59:55 | MainThread |[36m DEBUG    [0m| root | Initialized model
2021-06-20 16:59:55 | MainThre