In [None]:
%%bash
rm -r Pubyc-Cluster-Overhead

In [None]:
import os
os.mkdir('./Pubyc-Cluster-Overhead')

In [7]:
%%file ./Pubyc-Cluster-Overhead/PubycTaskManager.py
from __future__ import print_function

from multiprocessing.managers import BaseManager
from multiprocessing import JoinableQueue, Queue

class TaskManager(BaseManager):
    pass

if __name__ == '__main__':
    from sys import argv, exit
    if len(argv) != 2:
        print('usage:', argv[0], b'socket_nr')
        exit(0)
    master_socket = int(argv[1])
    task_queue = JoinableQueue()
    result_queue = Queue()
    TaskManager.register('get_job_queue', 
                         callable = lambda:task_queue)
    TaskManager.register('get_result_queue', 
                         callable = lambda:result_queue)
    m = TaskManager(address = ('', master_socket), 
                    authkey = b'secret')
    print('starting queue server, socket', master_socket)
    m.get_server().serve_forever()

Overwriting ./Pubyc-Cluster-Overhead/PubycTaskManager.py


In [8]:
%%file ./Pubyc-Cluster-Overhead/PubycWorker.py
from PubycTaskManager import TaskManager
from multiprocessing import cpu_count, Process

def worker_single_planet(job_queue, result_queue):
    while True:
        args = job_queue.get()
        result_queue.put(((0, 0, 0, 0, 0, 0), 0))
        job_queue.task_done()
        
def start_workers(m):
    job_queue, result_queue = m.get_job_queue(), m.get_result_queue()
    nr_of_processes = cpu_count()
    processes = [
            Process(target = worker_single_planet, args = (job_queue, result_queue))
            for i in range(nr_of_processes)
    ]
    
    for p in processes:
        p.start()
    return nr_of_processes


if __name__ == '__main__':
    from sys import argv, exit
    if len(argv) < 3:
        print('usage:', argv[0], 'server_IP server_socket')
        exit(0)
    server_ip = argv[1]
    server_socket = int(argv[2])
    TaskManager.register('get_job_queue')
    TaskManager.register('get_result_queue')
    m = TaskManager(address=(server_ip, server_socket), authkey = b'secret')
    m.connect()
    nr_of_processes = start_workers(m)
    print(nr_of_processes, 'workers started')

Overwriting ./Pubyc-Cluster-Overhead/PubycWorker.py


In [9]:
%%file ./Pubyc-Cluster-Overhead/PubycMaster.py

from multiprocessing import cpu_count
from PubycTaskManager import TaskManager
import numpy as np
import time

def single_step_arguments(partsize, dt, position, speed, masse, l_in): 
    
    planet_number = len(position)-1
    parts = planet_number // partsize
    upper_planet_index = 0
    lower_planet_index = 0
    
    while (upper_planet_index - planet_number) < 0:
        
        if upper_planet_index + partsize < planet_number:
            upper_planet_index += partsize
        else:
            upper_planet_index = planet_number

        l_in.append((dt, position, speed, masse, lower_planet_index, upper_planet_index))  
        lower_planet_index = upper_planet_index + 1 
    
    return l_in

def update_list(result_tuple, position, speed):
        position[result_tuple[1]] = (result_tuple[0][0], result_tuple[0][1], result_tuple[0][2])
        speed[result_tuple[1]] = (result_tuple[0][3], result_tuple[0][4], result_tuple[0][5]) 

def single_step(job_queue, result_queue, partsize, dt, position, speed, masse):
    arguments_list = []
    result_list = []    
    
    t1 = time.time()
    arguments_list = single_step_arguments(partsize, dt, position, speed, masse, arguments_list)
    t2 = time.time()
    print('finished create argument_list with time: ', (t2-t1) * 1000, 'ms')

    
    t1 = time.time()
    for parameter_set in arguments_list:
        #print(parameter_set)
        job_queue.put(parameter_set)
    t2 = time.time()
    print('finished jop_queue with time:            ', (t2-t1) * 1000, 'ms')
    
    
    #while result_queue.empty():
    #    time.sleep(0.05)
    
    t1 = time.time()    
    while not result_queue.empty():
        update_list(result_queue.get(), position, speed)
    t2 = time.time()
    print('finished result_queue with time:         ', (t2-t1) * 1000, 'ms')
        
        
    t1 = time.time()
    job_queue.join()
    t2 = time.time()
    print('finished join with time:                 ', (t2-t1) * 1000, 'ms\n')

        
        
if __name__ == '__main__':
    from sys import argv, exit
    if len(argv) != 8:
        print('usage:', argv[0], 'server_IP server_socket number_of_trys number_of_parts worker_count')
        exit(0)
    server_ip = argv[1]
    server_socket = int(argv[2])
    args_dt = argv[3]
    partsize = argv[4]
    position = argv[5]
    speed = argv[6]
    masse = argv[7]
    
    TaskManager.register('get_job_queue')
    TaskManager.register('get_result_queue')
    taskmanager = TaskManager(address=(server_ip, server_socket), authkey = b'secret')
    taskmanager.connect()

    t1 = time.time()
    result = single_step(taskmanager, partsize, args_dt, position, speed, masse)
    t2 = time.time()
    print(' result: ', result)
    print(' time:   ', t2-t1, ' s\n')

Overwriting ./Pubyc-Cluster-Overhead/PubycMaster.py


## TimeIt

In [24]:
%load_ext autoreload

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [28]:
from random import random

number_of_bodys = 10000

python_position = []
python_speed = []
python_mass = []

for i in range(number_of_bodys):
    
    python_position.append((random() * 10 ** 12, random() * 10 ** 12, random()* 10 ** 12))
    python_speed.append((random()* 10 ** 5, random()* 10 ** 5, random()* 10 ** 5))
    python_mass.append(random()* 10 ** 30)

In [29]:
import os
import sys
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append("./Pubyc-Cluster-Overhead")

from PubycMaster import single_step
from PubycTaskManager import TaskManager
import numpy as np

position = np.array(python_position, dtype=np.float64)
speed = np.array(python_speed,dtype=np.float64)
mass = np.array(python_mass,dtype=np.float64)
dt = 60 * 60 * 24 * 52

TaskManager.register('get_job_queue')
TaskManager.register('get_result_queue')
server_ip = "192.168.178.203"
server_socket = 12345
taskmanager = TaskManager(address=(server_ip, server_socket), authkey = b'secret')
taskmanager.connect()
job_queue, result_queue = taskmanager.get_job_queue(), taskmanager.get_result_queue()
partsize = (int) (number_of_bodys / 32)
#partsize = 1

In [30]:
%%timeit
%autoreload
import os
import sys
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append("./Pubyc-Cluster-Overhead")

from PubycMaster import single_step
from PubycTaskManager import TaskManager
taskmanager
single_step(job_queue, result_queue, partsize, dt, position, speed, mass)
#10000 1PC 11.3 s ± 273 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
#20000 1PC 48 s ± 538 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

#10000 2PCs 9.97 s ± 384 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
#20000 2PCs 37.6 s ± 999 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

finished create argument_list with time:  0.011444091796875 ms
finished jop_queue with time:             98.38390350341797 ms
finished result_queue with time:          700.5751132965088 ms
finished join with time:                  7733.75391960144 ms

finished create argument_list with time:  0.01049041748046875 ms
finished jop_queue with time:             39.022207260131836 ms
finished result_queue with time:          1826.5118598937988 ms
finished join with time:                  6263.530254364014 ms

finished create argument_list with time:  0.018358230590820312 ms
finished jop_queue with time:             45.90773582458496 ms
finished result_queue with time:          1919.2500114440918 ms
finished join with time:                  5596.287965774536 ms

finished create argument_list with time:  0.01049041748046875 ms
finished jop_queue with time:             43.82443428039551 ms
finished result_queue with time:          1864.8629188537598 ms
finished join with time:                  

In [22]:
%autoreload
import os
import sys
import pprofile
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append("./Pubyc-Cluster-Overhead")

from PubycMaster import single_step
from PubycTaskManager import TaskManager

profiler = pprofile.Profile()
with profiler:
    single_step(job_queue, result_queue, partsize, dt, position, speed, mass)
profiler.print_stats()

finished create argument_list with time:  0.8223056793212891 ms
finished jop_queue with time:             42.24801063537598 ms
finished result_queue with time:          53.04837226867676 ms
finished join with time:                  0.6468296051025391 ms

Total duration: 0.103864s
File: /opt/anaconda/lib/python3.6/multiprocessing/connection.py
File duration: 0.0630791s (60.73%)
Line #|      Hits|         Time| Time per hit|      %|Source code
------+----------+-------------+-------------+-------+-----------
     1|         0|            0|            0|  0.00%|#
     2|         0|            0|            0|  0.00%|# A higher level module for using sockets (or Windows named pipes)
     3|         0|            0|            0|  0.00%|#
     4|         0|            0|            0|  0.00%|# multiprocessing/connection.py
     5|         0|            0|            0|  0.00%|#
     6|         0|            0|            0|  0.00%|# Copyright (c) 2006-2008, R Oudkerk
     7|         0|    

  'files will cause errors when annotating.' % (stream, )



   770|         0|            0|            0|  0.00%|        import xmlrpc.client as xmlrpclib
   771|         0|            0|            0|  0.00%|        obj = Listener.accept(self)
   772|         0|            0|            0|  0.00%|        return ConnectionWrapper(obj, _xml_dumps, _xml_loads)
   773|         0|            0|            0|  0.00%|
   774|         0|            0|            0|  0.00%|def XmlClient(*args, **kwds):
   775|         0|            0|            0|  0.00%|    global xmlrpclib
   776|         0|            0|            0|  0.00%|    import xmlrpc.client as xmlrpclib
   777|         0|            0|            0|  0.00%|    return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
   778|         0|            0|            0|  0.00%|
   779|         0|            0|            0|  0.00%|#
   780|         0|            0|            0|  0.00%|# Wait
   781|         0|            0|            0|  0.00%|#
   782|         0|            0| 