
Closing dangling stream - running dask Client in separate event loop #4546

@gabpeters

Description


Hey everyone, I have the following issue:
We have a Windows web application which needs to run with the asyncio ProactorEventLoop, and
we are currently setting up a dask Client as a new backend executor. As is well known, dask requires the asyncio SelectorEventLoop.
I am running the dask Client in a separate event loop in a new thread and submit jobs to the client, which mostly works fine.
But after some time I always get issues like in #2507, which shows
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://192.168.178.23:63882 remote=tcp://192.168.178.23:8786>
Unfortunately this blocks some of the jobs that have been submitted to the client, which is really annoying.

This is a minimal snippet of my code. Unfortunately it does not reproduce the error on its own, since the error only occurs once a larger number of web requests (more than roughly 8-10) call jobs.

import asyncio
from threading import Thread

from dask.distributed import Client, LocalCluster


def test():
    return 5


class ClientWrapper:
    _client = None
    _cluster = None

    def __init__(self):
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        self._selector_loop = asyncio.new_event_loop()

    async def submit(self, func):
        if self._client is None:
            await self.start_client()
        return await SubmitteronLoop().submit(func, self._client, self._selector_loop)

    async def _init_client_locally(self):
        self._client = await Client(await LocalCluster(asynchronous=True), asynchronous=True)

    async def start_client(self):
        self._thr = Thread(target=self.run_thread)
        self._thr.start()
        while self._client is None:
            await asyncio.sleep(0.001)

    def run_thread(self):
        self._selector_loop.create_task(self._init_client_locally())
        self._selector_loop.run_forever()
        
class SubmitteronLoop:
    _result = None
    async def submit(self, func, client, loop):
        loop.create_task(self.submit_client(func, client))
        while self._result is None:
            await asyncio.sleep(0.001)
        return self._result
    
    async def submit_client(self, func, client):
        self._result = await client.submit(func, pure=False)


async def start():
    cw = ClientWrapper()
    print(await cw.submit(test))

asyncio.run(start())

Remark: this error does NOT occur when I connect to an external scheduler via await Client("tcp://scheduler address").
In that case everything works fine,
so the dropped connection to the scheduler must be related to LocalCluster.

My setup:

  • Dask version: 2.6.0
  • Python version: 3.7
  • Operating System: Windows, 32-bit
  • Install method (conda, pip, source): conda

Does somebody have an idea on how to handle this?
Thanks in advance!
