Running endless loops in parallel #196

Open
rd-marcel-haugwitz opened this issue Jan 14, 2021 · 11 comments

Comments

@rd-marcel-haugwitz

Hi,
I would like to run two classes with endless loops in parallel,
but only one of the loops starts.
Am I missing something? I only see the output of one method in the console.

import time
from charm4py import Chare, charm, coro
starttime = time.time()

class Ax(Chare):
    @coro
    def run(self):
        while True:
            print("ax")
            time.sleep(1)

class Ox(Chare):
    @coro
    def run(self):
        while True:
            print("ox")
            time.sleep(1)

def main(args):
    ax_proxy = Chare(Ax)
    ox_proxy = Chare(Ox)
    combined_proxies = charm.combine(ax_proxy, ox_proxy)
    combined_proxies.run()

charm.start(main)
@ZwFink
Contributor

ZwFink commented Jan 14, 2021

How many PEs did you use to run the above code? Entry methods will run to completion or until the coroutine running the entry method voluntarily yields execution. If the above is running on one PE, one entry method will run forever. Here you will probably want at least 3 PEs: one for each Ox/Ax and one for the main chare. To ensure the Ox/Ax chares are placed on different PEs, you can change the chare creation to the following:

    ax_proxy = Chare(Ax, onPE=1)
    ox_proxy = Chare(Ox, onPE=2)
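
To illustrate the run-to-completion behavior described above without Charm4py, here is a toy single-PE scheduler built on Python generators. It is only a model, not how Charm4py is implemented, and the names `run_on_one_pe`, `greedy`, and `polite` are made up for this sketch. A task keeps the PE until it yields or finishes, so a task that never yields blocks everything queued behind it.

```python
from collections import deque


def run_on_one_pe(tasks):
    """Toy single-PE scheduler: run each task until it yields or finishes."""
    log = []
    queue = deque(task(log) for task in tasks)
    while queue:
        current = queue.popleft()
        try:
            next(current)          # run until the task gives up control
            queue.append(current)  # it yielded, so the others get a turn
        except StopIteration:
            pass                   # the task ran to completion
    return log


def greedy(name, steps):
    """Task that never yields inside its loop (like a loop using time.sleep)."""
    def task(log):
        for i in range(steps):
            log.append(f"{name}{i}")
        return
        yield  # unreachable; it only turns this function into a generator
    return task


def polite(name, steps):
    """Task that hands control back after every iteration."""
    def task(log):
        for i in range(steps):
            log.append(f"{name}{i}")
            yield
    return task


greedy_log = run_on_one_pe([greedy("ax", 3), greedy("ox", 3)])
polite_log = run_on_one_pe([polite("ax", 3), polite("ox", 3)])
print(greedy_log)  # all "ax" entries come before any "ox" entry
print(polite_log)  # "ax" and "ox" entries alternate
```

With a `while True` loop in place of the bounded `for` loop, the greedy "ax" task would never finish and "ox" would never print, which matches the single-PE symptom.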

@rd-marcel-haugwitz
Author

Hi ZwFink,

Hmm, I applied your suggestions and tried 3 PEs and it still does not work...

import time
from charm4py import Chare, charm, coro, Array
starttime = time.time()

class Ax(Chare):

    @coro
    def run(self):
        while True:
            print('AX - I am element', self.thisIndex, 'on PE', charm.myPe())
            time.sleep(1)

class Ox(Chare):

    @coro
    def run(self):
        while True:
            print('OX - I am element', self.thisIndex, 'on PE', charm.myPe())
            time.sleep(1)

def main(args):
    ax_proxy = Chare(Ax, onPE=1)
    ox_proxy = Chare(Ox, onPE=2)

    combined_proxies = charm.combine(ax_proxy, ox_proxy)
    combined_proxies.run()

charm.start(main)

In IntelliJ the output looks like this:

C:\Users\a12f206\.conda\envs\py36_scos\python.exe -m charmrun.start +p3 C:/dev/IdeaProjects/sco-backend-dummy/src/charm4py_test.py
C:\Users\a12f206\.conda\envs\py36_scos\lib\site-packages\numpy\_distributor_init.py:32: UserWarning: loaded more than 1 DLL from .libs:
C:\Users\a12f206\.conda\envs\py36_scos\lib\site-packages\numpy\.libs\libopenblas.PYQHXLVVQ7VESDPUVUADXEVJOBGHJPAY.gfortran-win_amd64.dll
C:\Users\a12f206\.conda\envs\py36_scos\lib\site-packages\numpy\.libs\libopenblas.QVLO2T66WEPI7JZ63PS3HMOHFEY472BC.gfortran-win_amd64.dll
  stacklevel=1)
C:\Users\a12f206\.conda\envs\py36_scos\lib\site-packages\numpy\_distributor_init.py:32: UserWarning: loaded more than 1 DLL from .libs:
C:\Users\a12f206\.conda\envs\py36_scos\lib\site-packages\numpy\.libs\libopenblas.PYQHXLVVQ7VESDPUVUADXEVJOBGHJPAY.gfortran-win_amd64.dll
C:\Users\a12f206\.conda\envs\py36_scos\lib\site-packages\numpy\.libs\libopenblas.QVLO2T66WEPI7JZ63PS3HMOHFEY472BC.gfortran-win_amd64.dll
  stacklevel=1)
Charmrun> started all node programs in 1.555 seconds.
Converse/Charm++ Commit ID: v6.10.0-beta1-17-ga5b6b3259
Charm++> Disabling isomalloc because mmap() does not work.
Charm++> scheduler running in netpoll mode.
CharmLB> Load balancer assumes all CPUs are same.
Charm4py> Running Charm4py version 1.0 on Python 3.6.12 (CPython). Using 'cython' interface to access Charm++
Charm++> Running on 1 hosts (1 sockets x 2 cores x 2 PUs = 4-way SMP)
Charm++> cpu topology info is gathered in 0.001 seconds.
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1
AX - I am element (0,) on PE 1

@martinkoenig

Hi @ZwFink, this issue is related to the one you helped me with a few days ago (#194). @rd-marcel-haugwitz and I are working on this together.

The last hint you gave me was this:

class ClientMap(ArrayMap):
    def procNum(self, index):
        return 0
...
myMap = Group(ClientMap)
controller = Array(Controller, args=[num_clients, num_cameras_per_client], map=myMap)
controller.run_forever()

This works perfectly fine.
Now we have a second Controller with totally different tasks and functions. Both Controllers have a run() function with no arguments. So we tried to combine both Controller proxies into one, just like in the example Marcel provided.
Maybe this helps to get a bigger picture.

Is this bad practice? Is there a better way to achieve this?

@rd-marcel-haugwitz
Author

Hi @ZwFink and @martinkoenig,

With the ClientMap it works when I start with at least 2 PEs :-)

import time
from charm4py import Chare, charm, coro, Array, ArrayMap, Group
starttime = time.time()

class ClientMap(ArrayMap):

    pNum = -1

    def __init__(self, pNum):
        self.pNum = pNum

    def procNum(self, index):
        return self.pNum

class Ax(Chare):

    @coro
    def run(self):
        while True:
            print('AX - I am element', self.thisIndex, 'on PE', charm.myPe())
            time.sleep(1)

class Ox(Chare):

    @coro
    def run(self):
        while True:
            print('OX - I am element', self.thisIndex, 'on PE', charm.myPe())
            time.sleep(1)

def main(args):

    my_map1 = Group(ClientMap, args=[0])
    controllerAx = Array(Ax, 1,  args=[], map=my_map1)

    my_map2 = Group(ClientMap, args=[1])
    controllerOx = Array(Ox, 1, args=[], map=my_map2)

    controllerOx.run()
    controllerAx.run()

charm.start(main)

Thank you both for your help!

Marcel

@rd-marcel-haugwitz
Author

Hi,

the client map helped, but unfortunately we still have a problem.

In our project we make use of Reducers and Futures, which do not work in combination with endless loops.
The method Ax.runAx() never finishes...

Do you have any idea? Are we doing something wrong?

import time
from charm4py import Chare, charm, coro, Array, ArrayMap, Group, Future, Reducer


class ClientMap(ArrayMap):
    pNum = -1

    def __init__(self, pNum):
        self.pNum = pNum

    def procNum(self, index):
        return self.pNum

class Wax(Chare):

    def test(self, args):
        print("I NEVER REACH THIS POINT :-(")
        delta = args[0][0]
        future = args[0][1]
        future(delta + 1)

    @coro
    def run(self, future):
        print('WAX START - I am element', self.thisIndex, 'on PE', charm.myPe())
        delta = 5
        self.reduce( self.thisProxy[0].test, [delta, future], Reducer.gather)


class Ax(Chare):

    def __init__(self):
        self.wax = Chare(Wax)

    @coro
    def runAx(self):
        print('AX START - I am element', self.thisIndex, 'on PE', charm.myPe())
        future = Future()
        self.wax.run(future)
        results = future.get()
        print('I NEVER REACH THIS POINT :-(  .... Result of wax is', results)

class Ox(Chare):

    @coro
    def run_endless(self):
        while True:
            print('OX - I am element', self.thisIndex, 'on PE', charm.myPe())
            time.sleep(1)

def main(args):

    my_map1 = Group(ClientMap, args=[0])
    controllerAx = Array(Ax, 1,  args=[], map=my_map1)
    controllerAx.runAx()

    my_map2 = Group(ClientMap, args=[1])
    controllerOx = Array(Ox, 1, args=[], map=my_map2)
    controllerOx.run_endless()

charm.start(main)

@ZwFink
Contributor

ZwFink commented Jan 15, 2021

That's interesting that using a PE map solves the problem. I think there are two problems here:

  1. If 2 chares are running on the same PE and one does not ever yield its execution, then the second will never get to run.
  2. I haven't had time to go into the source yet, but Charm4Py changes the behavior of print() to match that of Charm++. You can read more about it here. It appears that when chares do not voluntarily yield execution, the output is buffered locally in some cases. I'm not sure yet whether there is a bug causing one chare's output to be shown while the other's is not.

I think this issue was first a case of number 1, but then perhaps became a case of number 2. If we change the original example you provided to use charm.sleep(time), which, when called from a coroutine, suspends the chare that is running for at least time seconds, you will see the output of both chares regardless of whether they are put on a single PE or not:

import time
from charm4py import Chare, charm, coro, Array, Future
starttime = time.time()

class Ax(Chare):

    def __init__(self, done_future):
        self.done_future = done_future
    @coro
    def run(self):
        iter_num = 0
        while iter_num < 1000:
            iter_num += 1
            print(f'AX - I am element with counter {iter_num} ', self.thisIndex, 'on PE', charm.myPe())
            if not iter_num % 100:
                charm.sleep(1)
        self.done_future()

class Ox(Chare):
    def __init__(self, done_future):
        self.done_future = done_future

    @coro
    def run(self):
        iter_num = 0
        while iter_num < 1000:
            iter_num += 1
            print(f'OX - I am element with counter {iter_num} ', self.thisIndex, 'on PE', charm.myPe())
            if not iter_num % 100:
                charm.sleep(1)
        self.done_future()

def main(args):
    done_future = Future(2)
    ax_proxy = Chare(Ax, onPE=1, args=[done_future])
    ox_proxy = Chare(Ox, onPE=1, args=[done_future])

    combined_proxies = charm.combine(ax_proxy, ox_proxy)
    combined_proxies.run()
    done_future.get()
    charm.exit()

charm.start(main)

I also changed the code so that a finite number of iterations is done, but this was just to control the amount of output printed to the screen. In the output you will see that both Ax and Ox are printing. Also, if you change the above code so Ax/Ox are on separate PEs, you will see that both run at the same time but one always displays its output before the other. Finally, changing the code so Ax/Ox write to a file rather than stdout shows that both run in parallel when placed on different PEs.
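
One way to do the file-based check mentioned above is to have each chare append timestamped lines to its own file and merge the files afterwards. The sketch below is plain Python with hypothetical helpers (`log_line`, `merged_timeline`). The loop at the bottom is only a stand-in for two chares; in a real run each chare would call `log_line` from its `run` method with its own file name.

```python
import os
import tempfile
import time


def log_line(path, message):
    """Append one timestamped line to `path`, closing the file on each call."""
    with open(path, "a", encoding="utf-8") as handle:
        handle.write(f"{time.time():.6f} {message}\n")


def merged_timeline(paths):
    """Read several log files and return their lines sorted by timestamp."""
    lines = []
    for path in paths:
        with open(path, encoding="utf-8") as handle:
            lines.extend(line.rstrip("\n") for line in handle)
    return sorted(lines, key=lambda line: float(line.split(" ", 1)[0]))


# Stand-in for two chares on different PEs, each writing to its own file.
log_dir = tempfile.mkdtemp()
ax_log = os.path.join(log_dir, "ax.log")
ox_log = os.path.join(log_dir, "ox.log")
for step in range(2):
    log_line(ax_log, f"AX step {step}")
    log_line(ox_log, f"OX step {step}")

timeline = merged_timeline([ax_log, ox_log])
print("\n".join(timeline))
```

If the merged timeline interleaves AX and OX entries while the console shows only one of them, the chares are running in parallel and the difference is in how stdout is handled.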

@ZwFink
Contributor

ZwFink commented Jan 15, 2021

And regarding your most recent code, it looks like reductions might currently require all PEs to participate in some form, even if the PE's chares do not contribute anything. This might be a technical limitation of Charm++, which I will look into. For now, you can call charm.sleep(0) (or some small amount of time) after each loop iteration, and this will get @rd-marcel-haugwitz's latest example to run correctly. For example, I changed the run_endless function to the following:

    @coro
    def run_endless(self):
        while True:
            print('OX - I am element', self.thisIndex, 'on PE', charm.myPe())
            time.sleep(1)
            charm.sleep(0)

And the output contains:

AX START - I am element (0,) on PE 0
WAX START - I am element (0,) on PE 0
OX - I am element (0,) on PE 1
OX - I am element (0,) on PE 1
I NEVER REACH THIS POINT :-(
I NEVER REACH THIS POINT :-(  .... Result of wax is 6
OX - I am element (0,) on PE 1
OX - I am element (0,) on PE 1

It's important to remember to use charm.sleep in a coroutine for this to work, otherwise time.sleep is used, which will not fix this problem.

@rd-marcel-haugwitz
Author

Hi @ZwFink,

Thanks for your help!
The code sample I provided is just dummy code. In our real code we do not have the endless loop with time.sleep() but a stream of camera images that have to be processed. That means that charm.sleep unfortunately won't help in our case.

(@martinkoenig)

@ZwFink
Contributor

ZwFink commented Jan 16, 2021

Of course, but I'm saying that charm.sleep(0) could be used to overcome the issues with reductions outlined above at the cost of a few microseconds per camera image. You could do something like:

while True:
  image_data = get_image(image_source)
  image_processor.process(image_data)
  charm.sleep(0)

Here the runtime won't actually sleep, but it will do the bookkeeping needed for reductions happening on other PEs to make progress and then continue with the next loop iteration. Does that make sense? It's a bit wonky but will fix that particular issue.
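
If the per-image cost matters, a possible variation is to yield only every few images. This is a sketch under assumptions, not something measured here: the helper `process_stream` and its parameters are hypothetical, and in a real Charm4py coroutine `yield_to_runtime` would be something like `lambda: charm.sleep(0)`. The stand-ins at the bottom exist only so the sketch runs on its own.

```python
def process_stream(frames, process, yield_to_runtime, yield_every=1):
    """Process frames, calling yield_to_runtime() after every `yield_every` frames.

    Returns the number of times control was handed back.
    """
    if yield_every < 1:
        raise ValueError("yield_every must be at least 1")
    yields = 0
    for count, frame in enumerate(frames, start=1):
        process(frame)
        if count % yield_every == 0:
            yield_to_runtime()
            yields += 1
    return yields


# Stand-ins so the sketch runs without Charm4py or a camera.
processed = []
yield_calls = []
num_yields = process_stream(
    frames=range(10),
    process=processed.append,
    yield_to_runtime=lambda: yield_calls.append("yield"),
    yield_every=4,
)
print(num_yields)
```

A larger `yield_every` lowers the overhead but makes reductions on other PEs wait longer, so the right value depends on the application.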

@ZwFink
Contributor

ZwFink commented Jan 16, 2021

I should have specified earlier that charm.sleep(n), when called from a coroutine, does not mean "do nothing until n seconds have passed" but rather "do any useful work you might have until at least n seconds have passed". Charm4Py will always check for other useful work, even when n=0. So the charm.sleep(0) above allows the runtime to do what it needs for the reductions to make progress on other PEs. Each charm.sleep call sends one message; in a quick measurement on the default Charm4Py build, each call to charm.sleep(0) took 25-30 microseconds.
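
As a rough mental model of that description (not Charm4py's actual implementation), the sketch below uses a hypothetical `cooperative_sleep` function that first runs whatever work is pending and only then waits out any remaining time. The `pending_work` queue stands in for messages the runtime could not otherwise process while the loop holds the PE.

```python
import time
from collections import deque


def cooperative_sleep(seconds, pending_work):
    """Toy model: handle pending work first, then wait out the remaining time."""
    deadline = time.monotonic() + seconds
    results = []
    while pending_work:
        job = pending_work.popleft()
        results.append(job())
    remaining = deadline - time.monotonic()
    if remaining > 0:
        time.sleep(remaining)  # nothing left to do, so really wait
    return results


# Even with seconds=0, the pending work is handled before returning.
pending = deque([
    lambda: "reduction message handled",
    lambda: "future delivered",
])
handled = cooperative_sleep(0, pending)
print(handled)
```

By contrast, `time.sleep(n)` corresponds to skipping the `while` loop entirely, which is why it does not help reductions make progress.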

@rd-marcel-haugwitz
Author

Hi @ZwFink,
thanks a lot for your quick help. That makes sense, and it works in the sample app I provided.
But unfortunately it is still more complicated on our side...
Our application needs some REST endpoints. What we are trying to do is run a Flask web server on one PE.
The endless loop is not under our control but somewhere deep in the Flask code. We cannot simply add charm.sleep somewhere.

I added the webserver to the sample code (Installation of flask is required):

import time
from charm4py import Chare, charm, coro, Array, ArrayMap, Group, Future, Reducer
import flask
from flask import Flask

### web server classes
##############################################

class EndpointAction(object):

    def __init__(self, action):
        self.action = action

    def __call__(self, *args):
        # Perform the action
        answer = self.action()
        # Create the answer (bundle it in a correctly formatted HTTP answer)
        self.response = flask.Response(answer, status=200, headers={})
        # Send it
        return self.response


class FlaskAppWrapper(Chare):
    app = None

    def __init__(self, name):
        self.app = Flask(name)
        # Add root endpoint
        self.add_endpoint(endpoint="/", endpoint_name="/", handler=self.indexAction)
        # You can also add options here : "... , methods=['POST'], ... "

    @coro
    def run(self):
        self.app.run() ##### I can't add charm.sleep here :-/

    def add_endpoint(self, endpoint=None, endpoint_name=None, handler=None):
        self.app.add_url_rule(endpoint, endpoint_name, EndpointAction(handler))

    def indexAction(self):
        # Dummy action
        return "Web server is running..."
        # Test it with curl 127.0.0.1:5000



### charm4py application
##############################################

class ClientMap(ArrayMap):
    pNum = -1

    def __init__(self, pNum):
        self.pNum = pNum

    def procNum(self, index):
        return self.pNum

class Wax(Chare):

    def test(self, args):
        print("I NEVER REACH THIS POINT :-(")
        delta = args[0][0]
        future = args[0][1]
        future(delta + 1)

    @coro
    def run(self, future):
        print('WAX START - I am element', self.thisIndex, 'on PE', charm.myPe())
        delta = 5
        self.reduce( self.thisProxy[0].test, [delta, future], Reducer.gather)


class Ax(Chare):
    def __init__(self):
        self.wax = Chare(Wax)

    @coro
    def runAx(self):
        print('AX START - I am element', self.thisIndex, 'on PE', charm.myPe())
        future = Future()
        self.wax.run(future)
        results = future.get()
        print('I NEVER REACH THIS POINT :-(  .... Result of wax is', results)


def main(args):

    my_map1 = Group(ClientMap, args=[0])
    controllerAx = Array(Ax, 1,  args=[], map=my_map1)
    controllerAx.runAx()

    my_map2 = Group(ClientMap, args=[1])
    web_server = Array(FlaskAppWrapper, 1, args=["my_web_server"], map=my_map2)

    print("Starting server on http://127.0.0.1:5000/")
    web_server.run()

charm.start(main)

@martinkoenig
