## Parallelization

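By default, the whole population is evaluated at once as a NumPy array.

However, some problems can only be evaluated one solution at a time, in which case `elementwise_evaluation` is necessary.

Parallelizing the individual evaluations then makes sense.

The following options are covered:

i) vectorized through NumPy

ii) elementwise and serialized (default)

iii) threads

iv) Dask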


### Vectorized through NumPy
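
In the vectorized mode, `_evaluate` receives the whole population as a matrix with one row per solution, so the objective can be computed with a single array operation. The sketch below uses NumPy only and is not pymoo's implementation; the function names `evaluate_elementwise` and `evaluate_vectorized` are hypothetical and serve only to contrast the two styles.

```python
import numpy as np


def evaluate_elementwise(X):
    """Evaluate one solution (row) at a time with a Python loop."""
    return np.array([[x.sum()] for x in X])


def evaluate_vectorized(X):
    """Evaluate the whole population with a single NumPy call."""
    # axis=1 sums over the variables of each solution; keepdims yields shape (n, 1)
    return X.sum(axis=1, keepdims=True)


rng = np.random.default_rng(0)
X = rng.random((10, 2))  # 10 solutions with 2 variables each

F_loop = evaluate_elementwise(X)
F_vec = evaluate_vectorized(X)
```

Both functions return one objective value per row; the vectorized version avoids the Python-level loop, which is why it is preferable whenever the objective can be written with array operations.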

### Elementwise and Serialized

In [1]:
import numpy as np
import time
from pymoo.model.problem import Problem


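class MyProblem(Problem):

    def __init__(self, **kwargs):
        # elementwise_evaluation=True: _evaluate is called once per solution
        super().__init__(n_var=2, n_obj=1, elementwise_evaluation=True, **kwargs)

    def _evaluate(self, x, out, *args, **kwargs):
        # Simulate an expensive evaluation that takes one second per solution
        time.sleep(1)
        out["F"] = x.sum()

problem = MyProblem()
# 10 random solutions with 2 variables each
X = np.random.random((10,2))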

In [2]:
problem.parallelization = None

In [3]:
%time F = problem.evaluate(X)

CPU times: user 2.57 ms, sys: 1.77 ms, total: 4.34 ms
Wall time: 10 s


### Multiple Threads 

This option uses Python's built-in `multiprocessing` module.

By default, n-1 threads are used, but the number can be changed.
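
To illustrate the idea outside of pymoo, here is a minimal standard-library sketch of evaluating solutions on a thread pool. It is not pymoo's implementation: `evaluate_one` and `evaluate_threaded` are hypothetical names, and reading "n-1" as the CPU count minus one is an assumption made for this example.

```python
import time
from multiprocessing import cpu_count
from multiprocessing.pool import ThreadPool


def evaluate_one(x):
    """Hypothetical stand-in for an expensive elementwise objective."""
    time.sleep(0.05)  # simulate waiting on I/O or an external solver
    return sum(x)


def evaluate_threaded(X, n_threads=None):
    """Evaluate every solution in X on a thread pool, preserving order."""
    if n_threads is None:
        # Assumption for this sketch: "n-1" means CPU count minus one
        n_threads = max(1, cpu_count() - 1)
    with ThreadPool(n_threads) as pool:
        return pool.map(evaluate_one, X)


X = [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6], [0.7, 0.8]]

F_serial = [evaluate_one(x) for x in X]
F_threaded = evaluate_threaded(X, n_threads=4)
```

Threads pay off here because the simulated evaluation spends its time waiting rather than computing; for CPU-bound pure-Python objectives, the interpreter lock would limit the speedup.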


In [4]:
problem.parallelization = "threads"

In [5]:
%time F = problem.evaluate(X)

CPU times: user 23 ms, sys: 31.6 ms, total: 54.6 ms
Wall time: 2.11 s


### Dask

In [17]:
from dask.distributed import Client, LocalCluster
import numpy as np

from pymoo.model.problem import Problem

# Set to True to start a local cluster; otherwise connect to a running scheduler
local = False

if local:
    cluster = LocalCluster(dashboard_address=":9300")
    client = Client(cluster)
else:
    client = Client(address="localhost:9000")
    #client = Client(address="host-94108.dhcp.egr.msu.edu:8786")


def task(x):
    # Reshape the flat decision vector into a 1000x1000 matrix, invert it, and sum the entries
    return np.linalg.inv(x.reshape((1000, 1000))).sum()


class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=1000000, n_obj=1, elementwise_evaluation=False, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):
        # Submit one task per solution to the Dask workers, then collect the results
        jobs = client.map(task, X)
        # np.vstack replaces np.row_stack, an alias deprecated in newer NumPy releases
        out["F"] = np.vstack([job.result() for job in jobs])


X = np.random.random((100, 1000000))
%time F = MyProblem().evaluate(X)



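# Serial baseline for comparison: the same tasks, run one after another in the local process
class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=1000000, n_obj=1, elementwise_evaluation=False, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):
        # np.vstack replaces np.row_stack, an alias deprecated in newer NumPy releases
        out["F"] = np.vstack([task(x) for x in X])

%time F = MyProblem().evaluate(X)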



  (array([0.24855179, 0.44244268, 0.52615791, ..., 0 ...  0.24339124]),)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s))


KeyboardInterrupt: 

CPU times: user 30.4 s, sys: 3.2 s, total: 33.6 s
Wall time: 8.81 s


