# Learn how to do parallel processing

This is useful when searching for optimal hyperparameters in order to do minimizations at given hyperparameters in parallel

https://www.geeksforgeeks.org/parallel-processing-in-python/

In [1]:
import multiprocessing
import time

### useless

In [5]:
class Process(multiprocessing.Process): 
    def __init__(self, id): 
        super(Process, self).__init__() 
        self.id = id
                 
    def run(self):  
        print("I'm the process with id: {}".format(self.id)) 
        time.sleep(10)

In [8]:
# if __name__ == '__main__': 
p = Process(0) 
p.start() 
p.join() 
p = Process(1) 
p.start() 
p.join() 

I'm the process with id: 0
I'm the process with id: 1


### example 1

In [17]:
def run(i):  
    print("I'm the process with id: {}".format(i)+'\n') 
    time.sleep(10)

In [18]:
# if __name__ == '__main__': 
pool = multiprocessing.Pool() 
result_async = [pool.apply_async(run, args = (i, )) for i in
                range(10)] 
results = [r.get() for r in result_async] 
print("Output: {}".format(results)) 

I'm the process with id: 0
I'm the process with id: 1
I'm the process with id: 3
I'm the process with id: 2



I'm the process with id: 4
I'm the process with id: 6
I'm the process with id: 5

I'm the process with id: 7


I'm the process with id: 8

I'm the process with id: 9



Output: [None, None, None, None, None, None, None, None, None, None]


### example 2

In [11]:
def square(x): 
    time.sleep(10)
    return x * x 

In [21]:
# if __name__ == '__main__': 
pool = multiprocessing.Pool() 
result_async = [pool.apply_async(square, args = (i, )) for i in
                range(10)] 
results = [r.get() for r in result_async] 
print("Output: {}".format(results)) 

Output: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


In [26]:
result_async[0]

<multiprocessing.pool.ApplyResult at 0x7f31996d0e80>

In [25]:
result_async[0].get()

0

### with minimizer

In [2]:
import numpy as np
from scipy.optimize import minimize

In [6]:
def f(alpha, beta, gamma):
    # gamma = 1
    fun = (alpha-beta-gamma)**2
    grad = 2*(alpha-beta-gamma)
    # print('beta, fun: ', beta, fun, '\n')
    time.sleep(2)
    return fun, grad

In [13]:
def minimizer(beta=1, gamma=1):
    mini = minimize(f, 0, args = (beta, gamma), method = 'BFGS', jac = True)
    print(beta, 'finished')
    return mini, beta

In [10]:
betas = np.arange(10)
gammas = np.arange(10)

In [14]:
output = Parallel(n_jobs = len(betas))(delayed(minimizer)(beta=betas[i]) for i in range(len(betas)))

In [17]:
output[1][0]

      fun: 0.0
 hess_inv: array([[0.5]])
      jac: array([0.])
  message: 'Optimization terminated successfully.'
     nfev: 3
      nit: 2
     njev: 3
   status: 0
  success: True
        x: array([2.])

In [5]:
betas = np.arange(0, 10, 0.5)

betas

array([0. , 0.5, 1. , 1.5, 2. , 2.5, 3. , 3.5, 4. , 4.5, 5. , 5.5, 6. ,
       6.5, 7. , 7.5, 8. , 8.5, 9. , 9.5])

In [6]:
pool = multiprocessing.Pool()

# fixed_args = ((1.5))  # (gamma)
result_async = [pool.apply_async(minimizer, args = (betas[i], 1.5)) for i in range(len(betas))]
results = [r.get() for r in result_async]



0.51.50.01.0    finishedfinishedfinishedfinished



2.54.55.03.05.52.06.56.0 4.09.5 9.03.5     finished  finished   finishedfinishedfinished
finishedfinishedfinishedfinished
finishedfinishedfinished









7.08.07.5 8.5  finished finished
finishedfinished




In [63]:
results[0]

      fun: 0.0
 hess_inv: array([[0.5]])
      jac: array([0.])
  message: 'Optimization terminated successfully.'
     nfev: 3
      nit: 2
     njev: 3
   status: 0
  success: True
        x: array([1.5])

but you want to do it by multiprocessing (on a SLURM cluster):

you can use multiprocessing package on SLURM

https://gist.github.com/kaspermunch/64e11cd21e3953295e149e75bee51733

use pool.starmap

https://stackoverflow.com/questions/5442910/how-to-use-multiprocessing-pool-map-with-multiple-arguments

https://stackoverflow.com/questions/39974874/using-pythons-multiprocessing-on-slurm

In [11]:
pool = multiprocessing.Pool()

# fixed_args = ((1.5))  # (gamma)
results = pool.starmap(minimizer, [(betas[i], 1.5) for i in range(len(betas))])



0.50.0  finishedfinished

1.01.5  finishedfinished

9.05.54.02.05.09.54.53.06.53.5    finished    finished  6.02.5finishedfinished
finishedfinishedfinishedfinished
finishedfinished 
 






finishedfinished

8.07.58.57.0   finished finished
finishedfinished




In [13]:
results[1]

      fun: 0.0
 hess_inv: array([[0.5]])
      jac: array([0.])
  message: 'Optimization terminated successfully.'
     nfev: 3
      nit: 2
     njev: 3
   status: 0
  success: True
        x: array([2.])

### bussilab coretools

https://github.com/bussilab/py-bussilab/blob/master/bussilab/coretools.py

In [15]:
from bussilab import coretools

In [18]:
class MytoolResult(coretools.Result):
    """Result of a mytool calculation."""
    pass

In [19]:
def mytool():
    a = 3
    b = "ciao"
    return MytoolResult(a=a, b=b)

m=mytool()
print(m)

 a: 3
 b: 'ciao'


In [45]:
m

 a: 3
 b: 'ciao'

In [48]:
m.keys()

dict_keys(['a', 'b'])

In [50]:
m

 a: 3
 b: 'ciao'

In [22]:
m.b

'ciao'

In [51]:
class my_class:
    def __init__(self):
        self.a = 3
        self.b = 'ciao'

In [52]:
my_class

__main__.my_class

In [53]:
cl = my_class()

cl

<__main__.my_class at 0x7fb9726bb898>

In [28]:
my_dict = {'a': 3, 'b': 'ciao'}

my_dict

{'a': 3, 'b': 'ciao'}

In [26]:
print(cl)

<__main__.my_class object at 0x7fb9726bb6a0>


In [55]:
class MytoolResult(coretools.Result):
    """Result of a mytool calculation."""
    def __init__(self, a):
        super().__init__()
        self.a = a
        """Documentation for attribute a."""



In [56]:
def mytool():
    a = 3
    b = "ciao"
    result = MytoolResult(a)
    return result

m = mytool()
print(m)

 a: 3


In [33]:
class MytoolResult(coretools.Result):
    pass
    # """Result of a mytool calculation."""
    # def __init__(self, a, b):
    #     super().__init__()
    #     self.a = a
    #     """Documentation for attribute a."""
    #     self.b = b
    #     """Documentation for attribute b."""

In [54]:
def mytool():
    a = 3
    b = "ciao"
    result = coretools.Result()
    result.a = a
    result.b = b
    return result

m = mytool()
print(m)

 a: 3
 b: 'ciao'


In [31]:
m

 a: 3
 b: 'ciao'

### with minimizer (optimization of parameters at given hyperparameters)

NOTES:

multithread

mpi

In [None]:
import numpy as np

In [None]:
pool = multiprocessing.Pool()

fixed_args = (regularization=regularization, alpha=alpha, beta=beta, gamma=gamma, starting_pars=starting_pars)
result_async = [pool.apply_async(minimizer, args = (data_train[seed], *fixed_args, )) for seed in random_states]
results = [r.get() for r in result_async]



In [None]:
random_states = np.arange(5)

test_obs = {}
test_frames = {}

for seed in random_states:
    out = select_traintest(data, random_state=seed)
    test_obs[seed] = out[2]
    test_frames[seed] = out[3]

In [None]:
for seed in test_obs.keys():

    """ 2. minimize loss function on training set to get optimal parameters """

    out = select_traintest(data, test_frames=test_frames[seed], test_obs=test_obs[seed])
    data_train = out[0]
    data_test = out[1]

    mini = minimizer(
        data_train[seed], regularization=regularization, alpha=alpha, beta=beta, gamma=gamma, starting_pars=starting_pars)


### suggestion by Vitto

https://stackoverflow.com/questions/42220458/what-does-the-delayed-function-do-when-used-with-joblib-in-python

In [7]:
from joblib import Parallel, delayed

In [11]:
betas

array([0. , 0.5, 1. , 1.5, 2. , 2.5, 3. , 3.5, 4. , 4.5, 5. , 5.5, 6. ,
       6.5, 7. , 7.5, 8. , 8.5, 9. , 9.5])

In [12]:
delayed(minimizer(betas[i], 1.5) for i in range(len(betas)))

<function joblib.parallel.<genexpr>>

In [4]:
import numpy as np
from scipy.optimize import minimize
from joblib import Parallel, delayed

def f(alpha, beta, gamma):
    # gamma = 1
    fun = (alpha-beta-gamma)**2
    grad = 2*(alpha-beta-gamma)
    # print('beta, fun: ', beta, fun, '\n')
    time.sleep(2)
    return fun, grad

def minimizer(beta, gamma):
    mini = minimize(f, 0, args = (beta, gamma), method = 'BFGS', jac = True)
    print(beta, 'finished')
    return mini

betas = np.arange(0,100,10)

output = Parallel(n_jobs = len(betas))(delayed(minimizer)(beta, 1.5) for beta in betas)

print('---- finished ----')
print('output: ', output)


---- finished ----
output:  [      fun: 0.0
 hess_inv: array([[0.5]])
      jac: array([0.])
  message: 'Optimization terminated successfully.'
     nfev: 3
      nit: 2
     njev: 3
   status: 0
  success: True
        x: array([1.5]),       fun: 0.0
 hess_inv: array([[0.5]])
      jac: array([0.])
  message: 'Optimization terminated successfully.'
     nfev: 4
      nit: 2
     njev: 4
   status: 0
  success: True
        x: array([11.5]),       fun: 0.0
 hess_inv: array([[0.5]])
      jac: array([0.])
  message: 'Optimization terminated successfully.'
     nfev: 5
      nit: 3
     njev: 5
   status: 0
  success: True
        x: array([21.5]),       fun: 1.262177448353619e-29
 hess_inv: array([[0.5]])
      jac: array([7.10542736e-15])
  message: 'Optimization terminated successfully.'
     nfev: 5
      nit: 3
     njev: 5
   status: 0
  success: True
        x: array([31.5]),       fun: 2.0194839173657902e-28
 hess_inv: array([[0.5]])
      jac: array([-2.84217094e-14])
  message:

### suggestion by Ric

mpi4py