# joblib trials

In [1]:
import joblib

In [2]:
joblib.__version__

'0.16.0'

In [3]:
from math import sqrt

In [4]:
%%time
# Sequential baseline: sqrt(i ** 2) for i = 0..99, computed in this process.
temp = list(map(sqrt, (i ** 2 for i in range(100))))

CPU times: user 76 µs, sys: 50 µs, total: 126 µs
Wall time: 137 µs


In [5]:
from joblib import Parallel, delayed

In [6]:
%%time
# Ten sqrt calls spread over two workers, with no backend preference given.
temp_p = Parallel(n_jobs=2)(
    delayed(sqrt)(k ** 2) for k in range(10)
)

CPU times: user 40.6 ms, sys: 16.3 ms, total: 56.9 ms
Wall time: 378 ms


In [7]:
%%time
# Same ten calls on two workers, this time hinting that threads are preferred.
temp_p = Parallel(n_jobs=2, prefer="threads")(
    delayed(sqrt)(k ** 2) for k in range(10)
)

CPU times: user 4.32 ms, sys: 1.66 ms, total: 5.99 ms
Wall time: 104 ms


In [8]:
from joblib import parallel_backend

In [9]:
%%time

# useful when calling a library that uses joblib.Parallel 
# internally without exposing backend selection as part of 
# its public API.

# Backend and worker count are supplied by the surrounding context, so the
# bare Parallel() below is constructed without any arguments of its own.
with parallel_backend('threading', n_jobs=2):
    Parallel()(
        delayed(sqrt)(k ** 2) for k in range(10)
    )

CPU times: user 12.5 ms, sys: 0 ns, total: 12.5 ms
Wall time: 103 ms


In [10]:
%%time

# more efficient to use the context manager API of the 
# joblib.Parallel class to re-use the same pool of workers 
# for several calls to the joblib.Parallel object.

accumulator = 0.
n_iter = 0
with Parallel(n_jobs=2) as parallel:
    # Each pass submits five tasks to the same Parallel object and waits for
    # all of them before updating the running total.
    while accumulator < 1000:
        results = parallel(
            [delayed(sqrt)(accumulator + k ** 2) for k in range(5)]
        )
        # synchronization barrier: all five results are needed for the sum
        accumulator = accumulator + sum(results)
        n_iter += 1

CPU times: user 48.1 ms, sys: 1.39 ms, total: 49.5 ms
Wall time: 45.7 ms


In [11]:
(accumulator, n_iter)

(1136.5969161564717, 14)

Some third-party libraries (for example the BLAS or OpenMP runtimes used by numpy) manage their own internal thread pools. When these libraries are used with joblib.Parallel, each worker will spawn its own thread pool, resulting in a massive over-subscription of resources that can slow down the computation compared to a sequential one. To cope with this problem, joblib tells supported third-party libraries to use a limited number of threads in workers managed by the 'loky' backend: by default each worker process will have environment variables set to allow a maximum of cpu_count() // n_jobs threads, so that the total number of threads used by all the workers does not exceed the number of CPUs of the host. The `inner_max_num_threads` argument of `parallel_backend` overrides this default, as sketched in the next cell.

In [12]:
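# Illustrative only: `func` and `data` are placeholders, so the snippet is left
# commented out. It caps each loky worker at 2 threads for nested libraries.
# with parallel_backend("loky", inner_max_num_threads=2):
#     results = Parallel(n_jobs=4)(delayed(func)(x, y) for x, y in data)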

In [13]:
from math import modf
from joblib import Parallel, delayed

In [14]:
# modf yields (fractional part, integer part) pairs; evaluate it on
# 0, 0.5, ..., 4.5 with a single worker.
r = Parallel(n_jobs=1)(
    delayed(modf)(k / 2.) for k in range(10)
)
# Transpose the list of pairs: `res` gets the fractional parts and `i` the
# integer parts.
res, i = zip(*r)

In [15]:
res, i

((0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5),
 (0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0))

In [16]:
from time import sleep

In [17]:
# Ten 0.2 s sleeps shared by two workers; verbose=10 turns on progress logging.
r = Parallel(n_jobs=2, verbose=10)(
    delayed(sleep)(.2) for _ in range(10)
)

[Parallel(n_jobs=2)]: Using backend LokyBackend with 2 concurrent workers.
[Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    0.2s
[Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:    1.0s finished


In [18]:
from heapq import nlargest

In [19]:
%%time

from math import sqrt
from joblib import Parallel, delayed
def producer():
    """Lazily yield the integers 0 through 5.

    A 'Produced <n>' line is printed right before each value is handed to the
    consumer, which makes it visible when each item is actually pulled.
    """
    index = 0
    while index < 6:
        print('Produced %s' % index)
        yield index
        index += 1
# pre_dispatch limits how many tasks are pulled from the generator ahead of
# the workers, so the 'Produced' lines interleave with the progress log.
out = Parallel(
    n_jobs=2,
    verbose=100,
    pre_dispatch='1.5*n_jobs',
)(delayed(sqrt)(item) for item in producer())

[Parallel(n_jobs=2)]: Using backend LokyBackend with 2 concurrent workers.
Produced 0
Produced 1
Produced 2
[Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    0.3s
Produced 3
Produced 4
[Parallel(n_jobs=2)]: Done   2 tasks      | elapsed:    0.3s
[Parallel(n_jobs=2)]: Done   3 tasks      | elapsed:    0.3s
Produced 5
[Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    0.3s
[Parallel(n_jobs=2)]: Done   6 out of   6 | elapsed:    0.3s remaining:    0.0s
[Parallel(n_jobs=2)]: Done   6 out of   6 | elapsed:    0.3s finished
CPU times: user 16.7 ms, sys: 16.4 ms, total: 33.1 ms
Wall time: 408 ms


In [20]:
out

[0.0, 1.0, 1.4142135623730951, 1.7320508075688772, 2.0, 2.23606797749979]

# DE with joblib

In [21]:
import numpy as np
# Single source of truth for the RNG seed: the global NumPy generator is
# seeded from this variable, so changing it here actually takes effect.
seed = 0
np.random.seed(seed)


def eggholder(x):
    """Evaluate the Eggholder test function at ``x``.

    x: indexable with at least two numeric entries; only x[0] and x[1] are read.
    Returns the function value at that point.
    """
    shifted = x[1] + 47
    first_term = -shifted * np.sin(np.sqrt(abs(x[0] / 2 + shifted)))
    second_term = x[0] * np.sin(np.sqrt(abs(x[0] - shifted)))
    return first_term - second_term


# (low, high) limits of the search box, one pair per coordinate
bounds = [(-512, 512)] * 2

In [22]:
import time

def eggholder_objective(x):
    '''Objective wrapper around the eggholder function (minimizing)

    x: a list/array of length 2 with each value in [0, 1]; it is rescaled to
       the box described by the global ``bounds`` before being evaluated.

    Returns (fitness, cost): the eggholder value at the rescaled point and the
    wall-clock seconds spent inside this call.
    '''
    tic = time.time()
    lows = [bounds[0][0], bounds[1][0]]
    spans = [bounds[0][1] - bounds[0][0], bounds[1][1] - bounds[1][0]]

    # work on a copy so the caller's vector is left untouched
    scaled = x.copy()
    for idx, unit_value in enumerate(x):
        scaled[idx] = lows[idx] + unit_value * spans[idx]

    # the fitness drives the selection process in DE:
    # the lower the value, the better x is judged to be
    fitness = eggholder(scaled)
    # runtime of this objective evaluation
    cost = time.time() - tic

    return fitness, cost

In [23]:
%%time

### introducing sleep

# range (in seconds) of the short, "normal" simulated evaluation time
sleep_low = 0.2
sleep_high = 0.5

def eggholder_objective_sleep(x):
    '''Evaluates x on the eggholder function (minimizing), with a simulated delay

    x: a list/array of length 2 with each value in [0, 1]

    Returns (fitness, cost): the eggholder value at the rescaled point and the
    wall-clock seconds spent in this call, including the artificial sleep.
    '''
    start = time.time()

    # Delegate rescaling and evaluation to eggholder_objective so the two
    # objectives cannot drift apart. Its own cost is discarded because the
    # cost reported here has to include the sleep below.
    fitness, _ = eggholder_objective(x)

    # 25% chance of a long sleep of 2 seconds to simulate bottlenecks
    if np.random.uniform() > 0.75:
        time.sleep(2)
    else:
        time.sleep(np.random.uniform(low=sleep_low, high=sleep_high))
    # runtime of objective, sleep included
    cost = time.time() - start

    return fitness, cost

CPU times: user 0 ns, sys: 12 µs, total: 12 µs
Wall time: 21.5 µs


In [24]:
from denas import DE, PDE, AsyncPDE

In [25]:
# Sequential

# Settings of the sequential DE run, gathered in one place.
de_settings = dict(
    dimensions=len(bounds),
    configspace=False,            # custom search space rather than a ConfigSpace
    f=eggholder_objective_sleep,  # the objective function
    pop_size=20,                  # tunable (minimum limit determined by mutation strategy)
    mutation_factor=0.5,          # tunable (determines exploration)
    crossover_prob=0.5,           # tunable (determines exploitation)
    strategy='rand1_bin',         # tunable (trades off exploration-exploitation)
)
de = DE(**de_settings)

start = time.time()
traj, runtime, hist = de.run(generations=10, verbose=True)
end = time.time() - start  # elapsed seconds of the run, despite the name

print("Time taken for sequential processing: {} seconds".format(end))
print(de.inc_config, de.inc_score)

Initializing and evaluating new population...
Running evolutionary search...
Generation 1 /10 -- -547.8635
Generation 2 /10 -- -547.8635
Generation 3 /10 -- -794.7629
Generation 4 /10 -- -794.7629
Generation 5 /10 -- -794.7629
Generation 6 /10 -- -794.7629
Generation 7 /10 -- -794.7629
Generation 8 /10 -- -794.7629
Generation 9 /10 -- -794.7629
Generation 10/10 -- -794.7629

Run complete!
Time taken for sequential processing: 164.77414059638977 seconds
[0.06276866 0.88686323] -794.7628545499281


In [26]:
# Parallel (1 worker)

num_workers = 1

# Time the construction of the optimizer separately from the run itself.
start = time.time()
pde_settings = dict(
    dimensions=len(bounds),
    configspace=False,            # custom search space rather than a ConfigSpace
    f=eggholder_objective_sleep,  # the objective function
    pop_size=20,                  # tunable (minimum limit determined by mutation strategy)
    mutation_factor=0.5,          # tunable (determines exploration)
    crossover_prob=0.5,           # tunable (determines exploitation)
    strategy='rand1_bin',         # tunable (trades off exploration-exploitation)
    num_workers=num_workers,
)
de = PDE(**pde_settings)
load_time = time.time() - start

start = time.time()
traj, runtime, hist = de.run(generations=10, verbose=True)
end = time.time() - start  # elapsed seconds of the run, despite the name

print("Time to initialise client: {} seconds".format(load_time))
print("Time taken for parallel processing with {} workers: {} seconds".format(num_workers, end))
print(de.inc_config, de.inc_score)

Initializing and evaluating new population...
Running evolutionary search...
Generation 1 /10 -- -700.0901
Generation 2 /10 -- -700.0901
Generation 3 /10 -- -700.0901
Generation 4 /10 -- -700.0901
Generation 5 /10 -- -700.0901
Generation 6 /10 -- -700.0901
Generation 7 /10 -- -700.0901
Generation 8 /10 -- -700.0901
Generation 9 /10 -- -700.0901
Generation 10/10 -- -741.1396

Run complete!
Time to initialise client: 0.00013637542724609375 seconds
Time taken for parallel processing with 1 workers: 163.5778317451477 seconds
[0.99217751 0.87456858] -741.1396119056885


In [29]:
# Parallel (2 workers)

num_workers = 2

# Time the construction of the optimizer separately from the run itself.
start = time.time()
pde_settings = dict(
    dimensions=len(bounds),
    configspace=False,            # custom search space rather than a ConfigSpace
    f=eggholder_objective_sleep,  # the objective function
    pop_size=20,                  # tunable (minimum limit determined by mutation strategy)
    mutation_factor=0.5,          # tunable (determines exploration)
    crossover_prob=0.5,           # tunable (determines exploitation)
    strategy='rand1_bin',         # tunable (trades off exploration-exploitation)
    num_workers=num_workers,
)
de = PDE(**pde_settings)
load_time = time.time() - start

start = time.time()
traj, runtime, hist = de.run(generations=10, verbose=True)
end = time.time() - start  # elapsed seconds of the run, despite the name

print("Time to initialise client: {} seconds".format(load_time))
print("Time taken for parallel processing with {} workers: {} seconds".format(num_workers, end))
print(de.inc_config, de.inc_score)

Initializing and evaluating new population...
Running evolutionary search...
Generation 1 /10 -- -425.5248
Generation 2 /10 -- -538.0669
Generation 3 /10 -- -538.0669
Generation 4 /10 -- -630.6388
Generation 5 /10 -- -630.6388
Generation 6 /10 -- -630.6388
Generation 7 /10 -- -638.312
Generation 8 /10 -- -638.312
Generation 9 /10 -- -638.312
Generation 10/10 -- -750.5804

Run complete!
Time to initialise client: 0.00013947486877441406 seconds
Time taken for parallel processing with 2 workers: 84.96044039726257 seconds
[0.11792983 0.09285839] -750.5804122818176


In [27]:
# Parallel (4 workers)

num_workers = 4

# Time the construction of the optimizer separately from the run itself.
start = time.time()
pde_settings = dict(
    dimensions=len(bounds),
    configspace=False,            # custom search space rather than a ConfigSpace
    f=eggholder_objective_sleep,  # the objective function
    pop_size=20,                  # tunable (minimum limit determined by mutation strategy)
    mutation_factor=0.5,          # tunable (determines exploration)
    crossover_prob=0.5,           # tunable (determines exploitation)
    strategy='rand1_bin',         # tunable (trades off exploration-exploitation)
    num_workers=num_workers,
)
de = PDE(**pde_settings)
load_time = time.time() - start

start = time.time()
traj, runtime, hist = de.run(generations=10, verbose=True)
end = time.time() - start  # elapsed seconds of the run, despite the name

print("Time to initialise client: {} seconds".format(load_time))
print("Time taken for parallel processing with {} workers: {} seconds".format(num_workers, end))
print(de.inc_config, de.inc_score)

Initializing and evaluating new population...
Running evolutionary search...
Generation 1 /10 -- -416.1928
Generation 2 /10 -- -436.4751
Generation 3 /10 -- -436.4751
Generation 4 /10 -- -541.1988
Generation 5 /10 -- -853.7498
Generation 6 /10 -- -853.7498
Generation 7 /10 -- -853.7498
Generation 8 /10 -- -853.7498
Generation 9 /10 -- -853.7498
Generation 10/10 -- -853.7498

Run complete!
Time to initialise client: 0.0003871917724609375 seconds
Time taken for parallel processing with 4 workers: 47.543803215026855 seconds
[0.82788893 0.98450151] -866.91432169631


In [28]:
# Parallel (20 workers)

num_workers = 20

# Time the construction of the optimizer separately from the run itself.
start = time.time()
pde_settings = dict(
    dimensions=len(bounds),
    configspace=False,            # custom search space rather than a ConfigSpace
    f=eggholder_objective_sleep,  # the objective function
    pop_size=20,                  # tunable (minimum limit determined by mutation strategy)
    mutation_factor=0.5,          # tunable (determines exploration)
    crossover_prob=0.5,           # tunable (determines exploitation)
    strategy='rand1_bin',         # tunable (trades off exploration-exploitation)
    num_workers=num_workers,
)
de = PDE(**pde_settings)
load_time = time.time() - start

start = time.time()
traj, runtime, hist = de.run(generations=10, verbose=True)
end = time.time() - start  # elapsed seconds of the run, despite the name

print("Time to initialise client: {} seconds".format(load_time))
print("Time taken for parallel processing with {} workers: {} seconds".format(num_workers, end))
print(de.inc_config, de.inc_score)

Initializing and evaluating new population...
Running evolutionary search...
Generation 1 /10 -- -882.5454
Generation 2 /10 -- -882.5454
Generation 3 /10 -- -882.5454
Generation 4 /10 -- -882.5454
Generation 5 /10 -- -882.5454
Generation 6 /10 -- -882.5454
Generation 7 /10 -- -882.5454
Generation 8 /10 -- -882.5454
Generation 9 /10 -- -882.5454
Generation 10/10 -- -882.5454

Run complete!
Time to initialise client: 0.00017309188842773438 seconds
Time taken for parallel processing with 20 workers: 25.20801043510437 seconds
[0.97180013 0.93021658] -882.5453891573479


<br>

Wall-clock time, in seconds, taken for 10 DE generations with a population size of 20, using the objective with simulated sleep. <br>


| Sequential | Parallel (n=1) | Parallel (n=2) | Parallel (n=4) | Parallel (n=20) |
| :-: | :-: | :-: | :-: | :-: |
| 164.77 | 163.58 | 84.96 | 47.54 | 25.21 | 