Error connecting to dask client - giving error There is no current event loop in thread when not using asyncio #3156

Open
ameetshah1983 opened this issue Oct 18, 2019 · 5 comments

Comments

@ameetshah1983

We are trying to connect to a Dask cluster with Client(scheduler_url) from inside a concurrent.futures ThreadPoolExecutor worker thread, and we receive the error below.

Traceback (most recent call last):
  File "/home/app/dask.py", line 848, in get_dask_client
    return Client(scheduler)
  File "/base/anaconda/lib/python3.6/site-packages/distributed/client.py", line 668, in __init__
    self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous)
  File "/base/anaconda/lib/python3.6/site-packages/distributed/utils.py", line 364, in __init__
    current = IOLoop.current()
  File "/base/andaconda/pithon3/lib/python3.6/site-packages/tornado/ioloop.py", line 265, in current
    loop = asyncio.get_event_loop()
  File "/base/anaconda/pithon3/lib/python3.6/asyncio/events.py", line 694, in get_event_loop
    return get_event_loop_policy().get_event_loop()
  File "/base/anaconda/pithon3/lib/python3.6/asyncio/events.py", line 602, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.

The function that submits the future is an async function, but the function being executed is not async and does not use asyncio in any way. It doesn't need an event loop, but it looks like Tornado requires one for some reason.
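For anyone following along, the last frames of the traceback can be reproduced without Dask or Tornado. The snippet below is a minimal sketch using only the standard library; the helper name probe_event_loop is made up for illustration. It shows that asyncio.get_event_loop() raises RuntimeError in an executor worker thread when no loop has been set for that thread.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def probe_event_loop():
    """Hypothetical helper: return the error text raised in this thread, if any."""
    try:
        # Worker threads have no event loop unless one was set explicitly.
        asyncio.get_event_loop()
    except RuntimeError as exc:
        return str(exc)
    return None


with ThreadPoolExecutor(max_workers=1) as pool:
    # Runs in a worker thread, like get_dask_client in the traceback above.
    message = pool.submit(probe_event_loop).result()

print(message)
```

This only demonstrates the asyncio behaviour that tornado's IOLoop.current() runs into; it says nothing about why distributed asks for the current loop in the first place.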

@mrocklin
Member

mrocklin commented Oct 19, 2019 via email

@ameetshah1983
Author

ameetshah1983 commented Oct 21, 2019

Sorry for the delay in getting back. It looks like the issue occurs when running a Sanic service in a container on Linux; testing locally on Windows shows no issues. Code to reproduce the issue:

from distributed import Client
import asyncio
import concurrent.futures

x = concurrent.futures.ThreadPoolExecutor(max_workers=10)

def test999():
    cl = Client('some_machine:8786')
    print(cl.scheduler)
    return cl

@app.route('/test123', methods=['POST'])
   async def test123(request):
    x.submit(test999)
    return

I worked around the issue for now, based on the thread tornadoweb/tornado#2308, by adding the following (AnyThreadEventLoopPolicy is imported from tornado.platform.asyncio):
asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
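An alternative that avoids a process-wide policy is to give the submitted function its own event loop for the duration of the call. This is only a sketch using the standard library: run_with_event_loop and has_event_loop are hypothetical names, and it has not been tested against distributed, so whether it is enough for Client(...) is an assumption that may depend on the distributed and tornado versions in use.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def run_with_event_loop(func, *args, **kwargs):
    """Hypothetical wrapper: call func with a current event loop set in this thread."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return func(*args, **kwargs)
    finally:
        # Detach and close the loop so worker threads do not leak it.
        asyncio.set_event_loop(None)
        loop.close()


def has_event_loop():
    """Stand-in for the real work: report whether asyncio finds a current loop."""
    try:
        asyncio.get_event_loop()
    except RuntimeError:
        return False
    return True


with ThreadPoolExecutor(max_workers=2) as pool:
    without_loop = pool.submit(has_event_loop).result()
    with_loop = pool.submit(run_with_event_loop, has_event_loop).result()

print(without_loop, with_loop)
```

In the reproduction above this would correspond to x.submit(run_with_event_loop, test999). Note that the loop is closed as soon as the function returns, so anything that keeps using that loop afterwards would need a different lifetime.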

@mrocklin
Member

Looking briefly at the code above, it doesn't appear to be runnable without some effort, like setting up a scheduler, indenting the code correctly, defining app, and so on. As a result, I'll come back to this when I have more time (which may not be for a very long while).

@jamesstidard

Hi,

I'm experiencing the exact same issue. I've put together a self-contained script that reproduces it.

requirements:

$ pip install sanic==19.9.0 dask[complete]==2.8.1

script:

import time
import asyncio

from sanic import Sanic
from sanic.response import json

from dask import delayed
from dask.distributed import Client, get_client


@delayed(pure=True)
def sleep(wait):
    time.sleep(wait)
    return wait


@delayed(pure=True)
def task(wait):
    # Grab the client and build a
    # nested graph to execute
    client = get_client()
    sleeps = client.compute([sleep(w) for w in wait])
    result = client.gather(sleeps)
    return result


async def create_server():
    app = Sanic(__name__)

    @app.listener("before_server_start")
    async def _(app_, loop_):
        # for every app process that will run
        # attach their own client
        app_.dask_client = await Client(asynchronous=True, processes=False)

    @app.listener("after_server_stop")
    async def after_server_stop(app_, loop_):
        # clean up dask clients on shutdown
        await app_.dask_client.close()

    @app.route("/")
    async def go(request):
        # handle GET request at http://0.0.0.0:8080/
        client = request.app.dask_client
        result = await client.compute(task([1, 2, 3, 4, 5]))
        return json(result)

    return app


if __name__ == "__main__":
    app = asyncio.run(create_server())
    app.run(host="0.0.0.0", port=8080, workers=1)

If you run this file and open the browser at http://0.0.0.0:8080/ you should see the stack trace:

[2019-11-28 15:00:44 +0000] [46564] [INFO] Goin' Fast @ http://0.0.0.0:8080
[2019-11-28 15:00:46 +0000] [46564] [INFO] Starting worker [46564]
distributed.worker - WARNING -  Compute Failed
Function:  task
args:      ([1, 2, 3, 4, 5])
kwargs:    {}
Exception: RuntimeError('There is no current event loop in thread "Dask-Worker-Threads\'-46584-2".')

[2019-11-28 15:00:54 +0000] [46564] [ERROR] Exception occurred while handling uri: 'http://0.0.0.0:8080/'
Traceback (most recent call last):
  File "/private/var/folders/jd/dzc7lf2x00s5tf0130jm99k80000gn/T/pyground.XXXXXXX.VU3HEsU4/.venv/lib/python3.7/site-packages/sanic/app.py", line 942, in handle_request
    response = await response
  File "/private/var/folders/jd/dzc7lf2x00s5tf0130jm99k80000gn/T/pyground.XXXXXXX.VU3HEsU4/main.py", line 39, in go
    result = await client.compute(task([1, 2, 3, 4, 5]))
  File "/private/var/folders/jd/dzc7lf2x00s5tf0130jm99k80000gn/T/pyground.XXXXXXX.VU3HEsU4/.venv/lib/python3.7/site-packages/distributed/client.py", line 234, in _result
    raise exc.with_traceback(tb)
  File "/private/var/folders/jd/dzc7lf2x00s5tf0130jm99k80000gn/T/pyground.XXXXXXX.VU3HEsU4/main.py", line 19, in task
    client = get_client()
  File "/private/var/folders/jd/dzc7lf2x00s5tf0130jm99k80000gn/T/pyground.XXXXXXX.VU3HEsU4/.venv/lib/python3.7/site-packages/distributed/worker.py", line 3018, in get_client
    return worker._get_client(timeout=timeout)
  File "/private/var/folders/jd/dzc7lf2x00s5tf0130jm99k80000gn/T/pyground.XXXXXXX.VU3HEsU4/.venv/lib/python3.7/site-packages/distributed/worker.py", line 2909, in _get_client
    asynchronous = self.loop is IOLoop.current()
  File "/private/var/folders/jd/dzc7lf2x00s5tf0130jm99k80000gn/T/pyground.XXXXXXX.VU3HEsU4/.venv/lib/python3.7/site-packages/tornado/ioloop.py", line 265, in current
    loop = asyncio.get_event_loop()
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/events.py", line 644, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread "Dask-Worker-Threads'-46584-2".
[2019-11-28 15:00:54 +0000] - (sanic.access)[INFO][127.0.0.1:53966]: GET http://0.0.0.0:8080/  500 144

If you toggle processes=False instead, this will not produce the error and you'll get the expected result: after 5 seconds, a response of [1,2,3,4,5] in the browser.

@mrocklin I hope this makes this issue a bit more approachable now.

Thanks for your work.

@mrocklin
Member

> @mrocklin I hope this makes this issue a bit more approachable now.

Thanks @jamesstidard that does help, but it's evidently still not something that I'm personally going to jump into given current time constraints. I don't have any real experience with Sanic.

However, if other people have time to investigate this, that would be great. My guess is that Sanic is doing something odd with processes or the event loop that clashes with what Dask does. A couple of things to try:

  1. Try using the spawn multiprocessing method (see the distributed.worker.multiprocessing-context config value) rather than fork or forkserver, in case there is some odd state that is being copied poorly.
  2. Investigate whether Sanic uses processes or anything similar by default, and if so see what can be done with the configuration settings there (sorry, I don't know much about Sanic internals).
  3. See if things work without get_client(), and if so just avoid using it. It adds a lot of extra complexity that, at least in this simple example, isn't necessary.
