
Danpf/debug cluster #221

Closed · wants to merge 10 commits

Conversation

@danpf (Contributor) commented Jan 16, 2019

EDIT4:
The 'edit2' fix is definitely wrong. My current thinking is that this occurs when a worker dies in the middle of sending data, which is really hard to test...
EDIT3:
The 'edit2' fix doesn't seem to work all the time. It seems to be related to when I scatter large files...?
EDIT2:
A possible fix (not yet reproducible here) is to use this patch on distributed. It doesn't really make sense, though...

diff --git a/distributed/client.py b/distributed/client.py
index 018d8ca6..d57fb7d5 100644
--- a/distributed/client.py
+++ b/distributed/client.py
@@ -667,6 +667,7 @@ class Client(Node):
         asynchronous = kwargs.pop('asynchronous', None)
         if asynchronous or self.asynchronous or getattr(thread_state, 'asynchronous', False):
             callback_timeout = kwargs.pop('callback_timeout', None)
+            callback_timeout = 1200
             future = func(*args, **kwargs)
             if callback_timeout is not None:
                 future = gen.with_timeout(timedelta(seconds=callback_timeout),

EDIT:
I made tests (Python 3.7 only) for everything and got them all to pass on my end, which suggests that I'm not reproducing my failures properly. I will continue to try to figure out what is causing them.


This is partly an issue report; I don't expect it to get merged, but it might be useful. I realize in advance that this is mainly a distributed bug/fault, but pid tracking/killing is easier to do in jobqueue than it is in distributed.

Purpose:
I want to use dask on a backfill queue, i.e. one where workers can be killed with signal 15 at any moment. This appears to work fine with gather, but never with as_completed. Since workers can be killed at any moment, I want to save my result files back at the scheduler using as_completed so things progress smoothly. Not sure if it's relevant, but a large amount of data has to be passed from the workers to the scheduler (on the order of 10+ GB total): each job's result is relatively small, but there are a very large number of them, and I think holding them all for a single gather call would be too expensive memory-wise.
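
For concreteness, the pattern I'm after is roughly the following sketch; process_target(), save_result(), the targets list, and the scheduler address are all placeholders, not code from this PR:

from dask.distributed import Client, as_completed

def process_target(target):
    # placeholder for the real per-job work run on a worker
    return "result for {}".format(target)

def save_result(key, result):
    # placeholder: write one result to disk on the scheduler/client side
    pass

targets = ["job-{}".format(i) for i in range(1000)]  # placeholder inputs
client = Client("tcp://scheduler:8786")              # placeholder address

futures = [client.submit(process_target, t) for t in targets]

# Consume results one at a time as they finish instead of holding
# everything (10+ GB in my case) for a single gather() call.
for future in as_completed(futures):
    save_result(future.key, future.result())
    future.release()  # drop the reference so the cluster can free the data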

Problem:
I consistently get errors similar to log1 or log2 below, or my jobs seem to stall and not repopulate.

Question:
I think that while waiting in as_completed, distributed needs to be checking for lost workers and updating those workers' jobs as needed; I'm not exactly sure how that should be done, though. Additionally, when the futures are cancelled they should somehow be replaced or re-run. At least, that's what I surmise from my error logs.
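
To illustrate, the kind of client-side workaround I imagine looks something like this sketch (same placeholder process_target() as above; this is not what distributed currently does, just what I'd like to be able to write):

from dask.distributed import as_completed

def run_with_resubmit(client, targets, max_retries=3):
    # Map each future back to its input so a lost/cancelled future can be
    # resubmitted; pure=False forces a genuinely new task on resubmission.
    pending = {client.submit(process_target, t): (t, 0) for t in targets}
    ac = as_completed(list(pending))
    results = {}
    for future in ac:
        target, tries = pending.pop(future)
        if future.status != "finished":
            # worker died, future cancelled, or task errored: try again
            if tries < max_retries:
                retry = client.submit(process_target, target, pure=False)
                pending[retry] = (target, tries + 1)
                ac.add(retry)
            continue
        results[target] = future.result()
    return results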

Important:
If you want to try this PR, you have to apply this patch to your distributed installation:

diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py
index 645869fc..6fb22ad7 100755
--- a/distributed/cli/dask_worker.py
+++ b/distributed/cli/dask_worker.py
@@ -4,6 +4,7 @@ import atexit
 import logging
 import os
 from sys import exit
+import psutil
 
 import click
 from distributed import Nanny, Worker
@@ -101,6 +102,11 @@ def main(scheduler, host, worker_port, listen_address, contact_address,
          bokeh_port, local_directory, scheduler_file, interface,
          death_timeout, preload, preload_argv, bokeh_prefix, tls_ca_file,
          tls_cert, tls_key):
+
+    if "--pid--" in name:
+        name = name.replace("--pid--", "--{}--".format(psutil.Process().pid))
+        logger.info("Overriding name to process pid: {}".format(name))
+
     enable_proctitle_on_current()
     enable_proctitle_on_children()
 

Related:
#122
I tried some techniques from that PR; however, I have still seen errors in my production runs even with try/except blocks...

Attempts to solve:
I also tried my own as_completed (asyncio only), which appears to work for my test case, but I'm still getting errors that look like log1 or log2 in production.

A big problem is that I'm using asyncio, and it doesn't seem to be a very popular option...
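
Roughly, that asyncio-only approach looks like the sketch below (same placeholder helpers as before; the scheduler address is made up):

import asyncio
from dask.distributed import Client, as_completed

async def main():
    # placeholder scheduler address; process_target/save_result as above
    client = await Client("tcp://scheduler:8786", asynchronous=True)
    targets = ["job-{}".format(i) for i in range(1000)]
    futures = [client.submit(process_target, t) for t in targets]
    # as_completed supports async iteration when the client is asynchronous
    async for future in as_completed(futures):
        result = await future          # result of one finished task
        save_result(future.key, result)
    await client.close()

asyncio.get_event_loop().run_until_complete(main())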

error log1

Traceback (most recent call last):
  File "run_on_all_targets.py", line 82, in <module>
    loop.run_until_complete(main())
  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
    return future.result()
concurrent.futures._base.CancelledError
Exception ignored in: <generator object Scheduler.add_client at 0x15553f4295e8>
Traceback (most recent call last):
  File "/home/danpf/git/distributed/distributed/scheduler.py", line 2039, in add_client
    self.remove_client(client=client)
  File "/home/danpf/git/distributed/distributed/scheduler.py", line 2078, in remove_client
    remove_client_from_events
  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/site-packages/tornado/ioloop.py", line 638, in call_later
    return self.call_at(self.time() + delay, callback, *args, **kwargs)
  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/site-packages/tornado/platform/asyncio.py", line 145, in call_at
    functools.partial(stack_context.wrap(callback), *args, **kwargs))
  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 641, in call_later
    context=context)
  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 651, in call_at
    self._check_closed()
  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 461, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

error log2

...
JobQueueCluster.scale_up was called with a number of workers lower that what is already running or pending
Traceback (most recent call last):
  File "run_c_ext.py", line 86, in <module>
    loop.run_until_complete(main())
  File "/home/danpf/.local/share/pyenv/versions/3.7.0/lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
    return future.result()
concurrent.futures._base.CancelledError
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO - Scheduler closing...
distributed.scheduler - INFO - Scheduler closing all comms
distributed.scheduler - INFO - Remove worker tcp://172.16.131.155:13673
distributed.core - INFO - Removing comms to tcp://172.16.131.155:13673
...

Sorry for being lengthy...

@guillaumeeb (Member) commented Jan 21, 2019

I cannot dive into this right now, but I just wanted to say it would be really interesting to have this working!

@danpf closed this Aug 1, 2019