multiple mad instances #43

Open
rdemaria opened this issue Oct 4, 2018 · 3 comments
Comments

@rdemaria
Collaborator

rdemaria commented Oct 4, 2018

I would like to start a certain number of MAD-X instances (say 16) to parallelize some studies. I understand that communication with the Madx object is blocking, even though MAD-X itself runs in a separate process. I thought of creating a list of Madx objects and a thread pool to send the commands, but it does not work reliably:

from concurrent.futures import ThreadPoolExecutor
from cpymad.madx import Madx

class MadxPool(object):
    def __init__(self, mad_instances=1, workers=None):
        self.mad_instances = mad_instances
        if workers is None:
            self.workers = mad_instances
        else:
            self.workers = workers
        self.madout = [[] for thread in range(self.mad_instances)]
        self.mad = [Madx(stdout=out.append) for out in self.madout]
        self.pool = ThreadPoolExecutor(max_workers=self.workers)

    def input(self, text):
        for mad in self.mad:
            self.pool.submit(mad.input, text)

    def input_map(self, text_list):
        for mad, text in zip(self.mad, text_list):
            self.pool.submit(mad.input, text)

mad=MadxPool(16)
mad.input(<heavy script>)

The mad instances run in parallel and partially work; however, I get a few errors like this:

Traceback (most recent call last):
  File "/home/rdemaria/.pyenv/versions/anaconda3-5.2.0/lib/python3.6/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/home/rdemaria/.pyenv/versions/anaconda3-5.2.0/lib/python3.6/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/home/rdemaria/.pyenv/versions/anaconda3-5.2.0/lib/python3.6/site-packages/minrpc/service.py", line 120, in <module>
    Service.stdio_main(sys.argv[1:])
  File "/home/rdemaria/.pyenv/versions/anaconda3-5.2.0/lib/python3.6/site-packages/minrpc/service.py", line 38, in stdio_main
    svc.run()
  File "/home/rdemaria/.pyenv/versions/anaconda3-5.2.0/lib/python3.6/site-packages/minrpc/service.py", line 53, in run
    while self._communicate():
  File "/home/rdemaria/.pyenv/versions/anaconda3-5.2.0/lib/python3.6/site-packages/minrpc/service.py", line 74, in _communicate
    return self._dispatch(request)
  File "/home/rdemaria/.pyenv/versions/anaconda3-5.2.0/lib/python3.6/site-packages/minrpc/service.py", line 90, in _dispatch
    self._reply_data(response)
  File "/home/rdemaria/.pyenv/versions/anaconda3-5.2.0/lib/python3.6/site-packages/minrpc/service.py", line 111, in _reply_data
    self._conn.send(('data', (data,)))
  File "/home/rdemaria/.pyenv/versions/anaconda3-5.2.0/lib/python3.6/site-packages/minrpc/connection.py", line 53, in send
    return pickle.dump(data, self._send, -1)
BrokenPipeError: [Errno 32] Broken pipe

Do you suggest another way? A process pool complains because the Madx object is not picklable.
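For illustration, here is a minimal, self-contained sketch of why a process pool rejects such an object: cpymad's Madx holds a live connection to the MAD-X subprocess, and OS-level handles of that kind cannot be pickled. `FakeMadx` is a hypothetical stand-in, with a socket playing the role of the subprocess connection:

```python
import pickle
import socket

class FakeMadx:
    """Hypothetical stand-in for cpymad.madx.Madx."""
    def __init__(self):
        # Stand-in for the pipe/connection a real Madx instance owns;
        # sockets, like pipes, cannot be pickled.
        self._conn = socket.socket()

mad = FakeMadx()
try:
    pickle.dumps(mad)
    picklable = True
except TypeError:
    picklable = False
print(picklable)  # False
```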

@coldfix
Member

coldfix commented Oct 4, 2018

Hi, there is at least one error because your code doesn't ensure that at most one thread is using a particular Madx instance at a time. If two threads talk to the same Madx at the same time, the commands/responses get interleaved and lead to crashes.

Can you try adding this guarantee, e.g.:

    def input(self,text):
        jobs = [self.pool.submit(mad.input, text) for mad in self.mad]
        return [job.result() for job in jobs]

    def input_map(self, texts):
        jobs = [self.pool.submit(mad.input, text) for mad, text in zip(self.mad, texts)]
        return [job.result() for job in jobs]
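A self-contained sketch of why this guarantee helps, using a hypothetical `FakeMadx` stand-in (not cpymad) that raises if two threads ever touch one instance concurrently. With one task per instance per batch, and `result()` blocking until the batch is done, the check never fires:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class FakeMadx:
    """Hypothetical stand-in for cpymad.madx.Madx."""
    def __init__(self):
        self._busy = threading.Lock()
        self.history = []

    def input(self, text):
        # Fail loudly if two threads talk to this instance at once.
        if not self._busy.acquire(blocking=False):
            raise RuntimeError("concurrent access to one instance")
        try:
            self.history.append(text)
            return len(text)
        finally:
            self._busy.release()

class MadxPool:
    def __init__(self, n=4):
        self.mad = [FakeMadx() for _ in range(n)]
        self.pool = ThreadPoolExecutor(max_workers=n)

    def input(self, text):
        # One job per instance; waiting on the futures keeps two
        # batches from overlapping on the same instance.
        jobs = [self.pool.submit(mad.input, text) for mad in self.mad]
        return [job.result() for job in jobs]

pool = MadxPool(4)
results = pool.input("twiss;")
print(results)  # [6, 6, 6, 6]
```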

@coldfix
Member

coldfix commented Oct 4, 2018

I guess I could also investigate adding an async mode to Madx, in which input() and commands return futures instead of blocking. Read access, however, would still be synchronous (because updating the entire API would be significantly more work).
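For a rough idea of the shape such an API could take, here is a hedged sketch built on top of the existing blocking call: a single-worker executor per instance serializes commands while handing back futures. `AsyncMadx` and `FakeMadx` are hypothetical names, not part of cpymad:

```python
from concurrent.futures import ThreadPoolExecutor

class FakeMadx:
    """Hypothetical stand-in for cpymad.madx.Madx."""
    def input(self, text):
        return "processed: " + text

class AsyncMadx:
    """Hypothetical future-returning wrapper around a blocking instance."""
    def __init__(self, mad):
        self._mad = mad
        # One worker thread serializes all commands to this instance,
        # so the at-most-one-thread-per-instance guarantee holds.
        self._pool = ThreadPoolExecutor(max_workers=1)

    def input(self, text):
        # Returns immediately; call .result() to block for completion.
        return self._pool.submit(self._mad.input, text)

amad = AsyncMadx(FakeMadx())
future = amad.input("call, file='optics.madx';")
print(future.result())  # processed: call, file='optics.madx';
```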

@rdemaria
Collaborator Author

rdemaria commented Oct 5, 2018

It looks like it is working with your fix. I have also implemented a general method:

    def map(self, target, *args):
        lst = zip(self.mad, zip(*args))
        jobs = [self.pool.submit(target, mad, *arg) for mad, arg in lst]
        return [job.result() for job in jobs]

However, while in an IPython shell, I am seeing a call that takes much longer than expected, with the threads at about 3% CPU load. It could be some I/O locking, but I need to investigate further. At the moment I am using 16 mad instances on a 16-core machine.
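A self-contained illustration of the general map() above, again with a hypothetical `FakeMadx` stand-in: `zip(*args)` transposes the argument lists so each instance receives its own argument tuple, and collecting the futures' results keeps the call blocking until every job has finished.

```python
from concurrent.futures import ThreadPoolExecutor

class FakeMadx:
    """Hypothetical stand-in for cpymad.madx.Madx."""
    def __init__(self, name):
        self.name = name

class MadxPool:
    def __init__(self, n):
        self.mad = [FakeMadx("mad%d" % i) for i in range(n)]
        self.pool = ThreadPoolExecutor(max_workers=n)

    def map(self, target, *args):
        # Pair instance i with the i-th element of each argument list.
        lst = zip(self.mad, zip(*args))
        jobs = [self.pool.submit(target, mad, *arg) for mad, arg in lst]
        return [job.result() for job in jobs]

def job(mad, seed, scale):
    # A user-supplied function taking the instance plus per-instance args.
    return (mad.name, seed * scale)

pool = MadxPool(3)
out = pool.map(job, [1, 2, 3], [10, 10, 10])
print(out)  # [('mad0', 10), ('mad1', 20), ('mad2', 30)]
```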
