In [16]:
#  this notebook showcases multi-threading to query the objective functions!
import torch as tr
# import scripts
from src.api_helper import env
import concurrent.futures
import multiprocessing
dtype=tr.float32

##### data

In [17]:
env.rosenbrock(tr.tensor([1,2],dtype=dtype))  # query the objective functions

tensor([100.])

In [35]:
n = 10000

In [36]:
def thread_function_io(name, query):
    print("Thread %s: starting", name)
    env.rosenbrock(query)
    print("Thread %s: finishing", name)

def thread_function_cpu(query):
    # print("Thread %s: starting", name)
    env.rosenbrock(query)
    # print("Thread %s: finishing", name)

names = [f"thread_{i}" for i in range(n)]
q = [tr.tensor([1,2],dtype=dtype)] * n

In [37]:
%%time
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:  # context 
    executor.map(thread_function_io, zip(names, q))

CPU times: user 397 ms, sys: 106 ms, total: 504 ms
Wall time: 485 ms


In [41]:
%%time
with concurrent.futures.ProcessPoolExecutor(3) as executor:  # context 
    executor.map(thread_function_cpu, q)

CPU times: user 5 s, sys: 4.26 s, total: 9.26 s
Wall time: 11 s


In [50]:
%%time
with multiprocessing.pool.ThreadPool(3) as pool:  #
    pool.map(thread_function_cpu,  q)

CPU times: user 2.68 s, sys: 1.65 s, total: 4.33 s
Wall time: 3.03 s


In [42]:
%%time
with multiprocessing.Pool(processes=3) as pool:  # because this is cpu bound 
    pool.map(thread_function_cpu,  q)

CPU times: user 14.4 ms, sys: 36.2 ms, total: 50.6 ms
Wall time: 357 ms


In [40]:
%%time 
for i in range(n):
    env.rosenbrock(q[i])

CPU times: user 862 ms, sys: 3.41 ms, total: 865 ms
Wall time: 868 ms


In [58]:
q = [tr.rand(2,) for _ in range(1)]
q

[tensor([0.3694, 0.0769])]

In [59]:
a = tr.zeros(2,)
for i in q:
    a = tr.cat([a,i])
a = a[2:].view(-1,2)
a


tensor([[0.3694, 0.0769]])

In [60]:
with multiprocessing.Pool(processes=5) as pool:  # because this is cpu bound 
    for i, r in enumerate(pool.map(env.rosenbrock,  a)):
        print(r)

tensor([0.7515])


In [56]:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:  # context 
    for i,r in enumerate(executor.map(env.rosenbrock,  q)):
        print(r)

tensor([46.2211])


In [57]:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:  # context 
    for i,r in enumerate(executor.map(env.rosenbrock,  a)):
        print(r)

tensor([46.2211])


In [66]:
from src.api_helper import api_utils

In [67]:
@api_utils.multi_process_transform
def transform_f(x):
    return env.rosenbrock(x)

In [None]:
def f(x):
    return env.rosenbrock(x)

In [68]:
with multiprocessing.Pool(processes=5) as pool:  # because this is cpu bound 
    for i, r in enumerate(pool.map(f,  a)):
        print(r)

AttributeError: Can't pickle local object 'api_utils.multi_process_transform.<locals>.wrapper'