In [45]:
%%file ./TaskmanagerCarlo.py

from __future__ import print_function

from multiprocessing.managers import BaseManager
from multiprocessing import JoinableQueue, Queue

class TaskManager(BaseManager):
    """Manager subclass used as a private registry for the shared queues.

    It adds nothing to BaseManager; the queue accessors are attached at
    runtime through ``TaskManager.register(...)``.
    """

if __name__ == '__main__':
    # Queue server: hosts one job queue and one result queue and serves them
    # to remote processes through a TaskManager until the process is killed.
    from sys import argv, exit
    if len(argv) != 2:
        # The placeholder must be a text literal: a bytes literal is printed
        # as b'socket_nr' under Python 3.
        print('usage:', argv[0], 'socket_nr')
        exit(0)
    master_socket = int(argv[1])
    # JoinableQueue so that a producer can join() until every job has been
    # acknowledged with task_done(); results only need a plain Queue.
    task_queue = JoinableQueue()
    result_queue = Queue()
    # The lambdas always hand out the same two queue objects, so every
    # connected client shares them.
    TaskManager.register('get_job_queue',
                         callable=lambda: task_queue)
    TaskManager.register('get_result_queue',
                         callable=lambda: result_queue)
    # NOTE(review): an empty host binds to all interfaces and the auth key is
    # hard-coded; the clients in this project use the same key, so changing it
    # has to happen everywhere at once.
    m = TaskManager(address=('', master_socket),
                    authkey=b'secret')
    print('starting queue server, socket', master_socket)
    m.get_server().serve_forever()

Overwriting ./TaskmanagerCarlo.py


In [46]:
%%file ./workerCarloCython.pyx
from libc.stdlib cimport rand, srand, RAND_MAX
cimport cython

cpdef monte_carlo(quantity, seed):
    """Python-callable entry point: forward both arguments to the C-level
    sampler and hand back the hit count it returns."""
    cdef long hits = cdef_monte_carlo(quantity, seed)
    return hits

# Draw `quantity` pseudo-random points in the unit square and count how many
# satisfy x*x + y*y < 1.
#
# quantity: number of points to draw.
# seed:     value handed to the C library's srand() before sampling, so the
#           same (quantity, seed) pair replays the same rand() sequence
#           within one process.
# returns:  the number of points that passed the test.
@cython.cdivision(True)
cdef long cdef_monte_carlo(long quantity, long seed):
    cdef long hits = 0
    # Explicit C counter: the loop below then compiles to a plain C loop
    # independent of Cython's type-inference settings.
    cdef long i
    cdef double width
    cdef double height
    # srand() takes an unsigned int; make the narrowing explicit.
    srand(<unsigned int>seed)

    for i in range(quantity):
        # The cast applies to rand() first, so this is a floating-point
        # division yielding a value in [0, 1].
        width = <double>rand() / RAND_MAX
        height = <double>rand() / RAND_MAX
        if width * width + height * height < 1:
            hits += 1

    return hits

Overwriting ./workerCarloCython.pyx


In [47]:
%%file ./workerCarlo.py

from __future__ import print_function

from multiprocessing import cpu_count, Process
from TaskmanagerCarlo import TaskManager
from workerCarloCython import monte_carlo

def __worker_monte_carlo(job_queue, result_queue):
    """Serve jobs forever: take one, run it, publish the outcome.

    Each job taken from ``job_queue`` is unpacked into two values that are
    passed positionally to ``monte_carlo``. The return value is put on
    ``result_queue`` before the job is acknowledged with ``task_done()``.
    The loop never exits on its own.
    """
    while True:
        quantity, seed = job_queue.get()
        result_queue.put(monte_carlo(quantity, seed))
        job_queue.task_done()

def __start_workers(m):
    """Launch one worker process per CPU reported by ``cpu_count()``.

    ``m`` must provide ``get_job_queue()`` and ``get_result_queue()``; the
    two queues they return are passed to every worker. All Process objects
    are created first and started afterwards.

    Returns the number of processes that were started.
    """
    job_queue = m.get_job_queue()
    result_queue = m.get_result_queue()
    nr_of_processes = cpu_count()

    workers = []
    for _ in range(nr_of_processes):
        workers.append(Process(target=__worker_monte_carlo,
                               args=(job_queue, result_queue)))

    for worker in workers:
        worker.start()
    return nr_of_processes

if __name__ == '__main__':
    # Worker entry point: connect to the queue server named on the command
    # line and start the local worker processes.
    from sys import argv, exit

    if len(argv) < 3:
        print('usage:', argv[0], 'server_IP server_socket')
        exit(0)

    server_address = (argv[1], int(argv[2]))

    # Client side: only the accessor names are registered, no callables.
    for accessor in ('get_job_queue', 'get_result_queue'):
        TaskManager.register(accessor)

    manager = TaskManager(address=server_address, authkey=b'secret')
    manager.connect()
    print(__start_workers(manager), 'workers started')

Overwriting ./workerCarlo.py


In [48]:
%%file ./setupCarlo.py
# Usage: python3 setupCarlo.py build_ext --inplace
# Windows: additional option --compiler=mingw32
from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext
import numpy


# The worker script itself is compiled as an extension: optimised, linked
# against libm, no OpenMP.
worker_script_extension = Extension(
    "workerCarlo",
    ["workerCarlo.py"],
    extra_compile_args=['-O3'],
    libraries=['m'],
)

# The Cython sampler is additionally compiled and linked with OpenMP.
# To build without OpenMP, use extra_compile_args=['-O3'] and drop
# extra_link_args; a per-extension include_dirs=[numpy.get_include()]
# may be needed on some setups.
sampler_extension = Extension(
    "workerCarloCython",
    ["workerCarloCython.pyx"],
    extra_compile_args=['-O3', '-fopenmp'],
    libraries=['m'],
    extra_link_args=['-fopenmp'],
)

ext_modules = [worker_script_extension, sampler_extension]

setup(
    name='cython demo',
    cmdclass={'build_ext': build_ext},
    ext_modules=ext_modules,
    include_dirs=[numpy.get_include()],
)

Overwriting ./setupCarlo.py


In [49]:
%%bash
# Compile the extension modules in place
python3 setupCarlo.py build_ext --inplace

running build_ext
cythoning workerCarlo.py to workerCarlo.c
building 'workerCarlo' extension
gcc -pthread -B /opt/anaconda/compiler_compat -Wl,--sysroot=/ -Wsign-compare -DNDEBUG -g -fwrapv -O3 -Wall -Wstrict-prototypes -fPIC -I/opt/anaconda/lib/python3.6/site-packages/numpy/core/include -I/opt/anaconda/include/python3.6m -c workerCarlo.c -o build/temp.linux-x86_64-3.6/workerCarlo.o -O3
gcc -pthread -shared -B /opt/anaconda/compiler_compat -L/opt/anaconda/lib -Wl,-rpath=/opt/anaconda/lib -Wl,--no-as-needed -Wl,--sysroot=/ build/temp.linux-x86_64-3.6/workerCarlo.o -L/opt/anaconda/lib -lm -lpython3.6m -o /home/shiroten/Nextcloud HS-Augsburg/Studium Dokumente/Aktuelles Semester/Rösch - Programmieren 3/Programmieren-3---Private/prog3-10/workerCarlo.cpython-36m-x86_64-linux-gnu.so
cythoning workerCarloCython.pyx to workerCarloCython.c
building 'workerCarloCython' extension
gcc -pthread -B /opt/anaconda/compiler_compat -Wl,--sysroot=/ -Wsign-compare -DNDEBUG -g -fwrapv -O3 -Wall -Wstrict-pro

In [53]:
%%file ./masterCarlo.py

from __future__ import print_function

from TaskmanagerCarlo import TaskManager
import random, time

def __calculate(m, number_of_trys, number_of_parts, worker_count):
    """Estimate pi by farming Monte Carlo sampling jobs out over the queues.

    The requested number of samples is split into
    ``number_of_parts * worker_count`` equally sized jobs. Each job is a
    ``(sample_count, seed)`` tuple put on the job queue; one hit count per
    job is then read from the result queue.

    Args:
        m: connected manager providing ``get_job_queue()`` and
            ``get_result_queue()``.
        number_of_trys: total number of samples requested. Because of the
            integer split the number actually processed can be slightly
            smaller.
        number_of_parts: number of jobs per worker.
        worker_count: number of workers the load is split for.

    Returns:
        The estimate ``4 * hits / samples`` as a float (also printed).

    Raises:
        ValueError: if ``number_of_parts`` or ``worker_count`` is below 1,
            or if the split leaves less than one sample per job.
    """
    if number_of_parts < 1 or worker_count < 1:
        raise ValueError('number_of_parts and worker_count must be at least 1')

    # Floor division: the sample count of a job must stay an integer. True
    # division would produce floats under Python 3.
    worker_trys = number_of_trys // worker_count
    worker_trys_part = worker_trys // number_of_parts
    if worker_trys_part < 1:
        raise ValueError('number_of_trys is too small for '
                         'number_of_parts * worker_count jobs')

    job_queue, result_queue = m.get_job_queue(), m.get_result_queue()

    # One independent seed per job, drawn from [0, 2**31 - 2].
    in_list = [(worker_trys_part, random.randrange(2**31 - 1))
               for _ in range(number_of_parts * worker_count)]

    for parameter_set in in_list:
        job_queue.put(parameter_set)

    job_queue.join()

    # Exactly one result is expected per job, so fetch that many with a
    # blocking get() instead of polling empty(), which is not reliable for
    # multiprocessing queues.
    result_list = [result_queue.get() for _ in in_list]

    print(result_list)

    hits = sum(result_list)
    trys = worker_trys_part * len(in_list)

    estimate = 4.0 * hits / trys
    print(estimate)
    return estimate


if __name__ == '__main__':
    # Master entry point: connect to the queue server, run one calculation
    # and report what __calculate returned together with the wall-clock time.
    from sys import argv, exit

    if len(argv) != 6:
        print('usage:', argv[0], 'server_IP server_socket number_of_trys number_of_parts worker_count')
        exit(0)

    server_ip = argv[1]
    # The four remaining arguments are integers, converted in order.
    server_socket, number_of_trys, number_of_parts, worker_count = (
        int(raw) for raw in argv[2:6])

    # Client side: only the accessor names are registered, no callables.
    for accessor in ('get_job_queue', 'get_result_queue'):
        TaskManager.register(accessor)

    manager = TaskManager(address=(server_ip, server_socket), authkey=b'secret')
    manager.connect()

    started = time.time()
    result = __calculate(manager, number_of_trys, number_of_parts, worker_count)
    elapsed = time.time() - started
    print(' result: ', result)
    print(' time:   ', elapsed, ' s\n')

Overwriting ./masterCarlo.py
