# Parallel job execution with dask - Using custom-defined modules

This notebook contains an example showing how to execute, in parallel on multiple remote workers, a function that uses a subroutine defined in the custom module `../example_code/very_hard_problems.py`.

In [1]:
import sys,os,time,dask.bag
import numpy as np

# Make the sibling `example_code` directory (next to this notebook's working directory)
# importable. os.path handles the platform's path separator, whereas splitting and
# joining on a literal '/' only works on POSIX-style paths.
sys.path.append(os.path.join(os.path.dirname(os.getcwd()), 'example_code'))
import very_hard_problems as vhp

In [2]:
# Create a KubeCluster (from dask_kubernetes) configured by the YAML file ../worker.yaml.
# NOTE(review): presumably worker.yaml describes the worker pods — confirm against the file.
from dask_kubernetes import KubeCluster
cluster = KubeCluster.from_yaml('../worker.yaml')
# Bare last expression: renders the cluster's widget as the cell output.
cluster

VBox(children=(HTML(value='<h2>KubeCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    .…

In [3]:
# Connect a distributed Client to the cluster created in the previous cell.
# `c` is used by later cells to upload files and to query versions.
from dask.distributed import Client, progress
c = Client(cluster)
# Bare last expression: displays the scheduler address, dashboard link and
# the cluster's worker / core / memory totals.
c

0,1
Client  Scheduler: tcp://10.16.113.16:44901  Dashboard: /user/ingmar/proxy/8787/status,Cluster  Workers: 20  Cores: 40  Memory: 120.00 GB


In [4]:
# Send each custom-defined module file to the cluster through the client,
# so the code in ../example_code/ is available there as well as locally.
files_to_upload = ['very_hard_problems.py']

for module_file in files_to_upload:
    source_path = '../example_code/' + module_file
    c.upload_file(source_path)

In [5]:
# Gather Python / OS / package version information for the scheduler and each worker
# and display it as the cell output.
# NOTE(review): check=True presumably makes the call fail on mismatched versions —
# confirm against the distributed Client.get_versions documentation.
c.get_versions(check=True)

{'scheduler': {'host': (('python', '3.6.3.final.0'),
   ('python-bits', 64),
   ('OS', 'Linux'),
   ('OS-release', '4.4.111+'),
   ('machine', 'x86_64'),
   ('processor', 'x86_64'),
   ('byteorder', 'little'),
   ('LC_ALL', 'en_US.UTF-8'),
   ('LANG', 'en_US.UTF-8'),
   ('LOCALE', 'en_US.UTF-8')),
  'packages': {'required': (('dask', '0.20.2'),
    ('distributed', '1.24.2'),
    ('msgpack', '0.5.6'),
    ('cloudpickle', '0.6.1'),
    ('tornado', '5.0.2'),
    ('toolz', '0.9.0')),
   'optional': (('numpy', '1.15.1'),
    ('pandas', '0.23.2'),
    ('bokeh', '1.0.1'),
    ('lz4', '2.1.2'),
    ('dask_ml', '0.11.0'),
    ('blosc', '1.5.1'))}},
 'workers': {'tcp://10.16.113.29:44822': {'host': (('python', '3.6.3.final.0'),
    ('python-bits', 64),
    ('OS', 'Linux'),
    ('OS-release', '4.4.111+'),
    ('machine', 'x86_64'),
    ('processor', 'x86_64'),
    ('byteorder', 'little'),
    ('LC_ALL', 'en_US.UTF-8'),
    ('LANG', 'en_US.UTF-8'),
    ('LOCALE', 'en_US.UTF-8')),
   'packages': {'

In [6]:
# Baseline: time one local call so the parallel run in the next cell has a reference.
# perf_counter() is a monotonic, high-resolution clock meant for measuring intervals;
# time.time() follows the wall clock and can jump if the system time is adjusted.
tic = time.perf_counter()
vhp.very_hard_problem(10000000)
print('Serial execution of very_hard_problem takes', time.perf_counter() - tic, 'seconds per process')

Serial execution of very_hard_problem takes 1.1686387062072754 seconds per process


In [7]:
# Evaluate very_hard_problem for `num_procs` consecutive problem sizes through a dask bag
# and time the whole batch.
num_procs = 20
base_size = 10000000  # same problem size as the serial baseline cell
parameters = np.arange(base_size, base_size + num_procs)

# One bag element per parameter, each mapped through the function.
parameters_bag = dask.bag.from_sequence(parameters)
mapping = parameters_bag.map(vhp.very_hard_problem)

# Time only the compute() call. perf_counter() is monotonic and high-resolution, so the
# measured interval cannot be distorted by wall-clock adjustments the way time.time() can.
tic = time.perf_counter()
mapping.compute()
time_elapsed = time.perf_counter() - tic
print('Parallel execution takes ' + str(time_elapsed) + ' seconds in total, ' + str(time_elapsed/num_procs) + ' per process')

Parallel execution takes 1.3404245376586914 seconds in total, 0.06702122688293458 per process
