Danpf/debug cluster #221

Closed. Wants to merge 10 commits.

Changes from all commits
4 changes: 4 additions & 0 deletions .travis.yml
@@ -35,6 +35,10 @@ matrix:
       env:
         - OS=ubuntu-14.04
         - JOBQUEUE=slurm
+    - python: "3.7"
+      env:
+        - OS=ubuntu-14.04
+        - JOBQUEUE=none

 env:
   global:
1 change: 1 addition & 0 deletions dask_jobqueue/__init__.py
@@ -7,6 +7,7 @@
 from .sge import SGECluster
 from .lsf import LSFCluster
 from .oar import OARCluster
+from .debug import DEBUGCluster

 from ._version import get_versions
 __version__ = get_versions()['version']
52 changes: 52 additions & 0 deletions dask_jobqueue/debug.py
@@ -0,0 +1,52 @@
from __future__ import absolute_import, division, print_function

import logging
import os

import dask

from .core import JobQueueCluster, docstrings

logger = logging.getLogger(__name__)


class DEBUGCluster(JobQueueCluster):
    __doc__ = docstrings.with_indents(""" Launch Dask locally with os.system calls

    Parameters
    ----------
    %(JobQueueCluster.parameters)s

    Examples
    --------
    >>> from dask_jobqueue import DEBUGCluster
    >>> cluster = DEBUGCluster(queue='regular')
    >>> cluster.scale(10)  # this may take a few seconds to launch

    >>> from dask.distributed import Client
    >>> client = Client(cluster)

    This also works with adaptive clusters. This automatically launches and
    kills workers based on load.

    >>> cluster.adapt()
    """, 4)

    # Override class variables
    submit_command = 'python '
    cancel_command = 'kill '

    def __init__(self, config_name='debug', **kwargs):
        super(DEBUGCluster, self).__init__(config_name=config_name, **kwargs)
        if "python" not in self.shebang:
            self.shebang = "#!/usr/bin/env python"
        # Wrap the generated dask-worker command in a small Python launcher
        # that spawns the worker via subprocess and prints the child PID,
        # which JobQueueCluster then parses as the job id.
        og_cmd_template = self._command_template
        self._command_template = (
            "import subprocess\n"
            "import shlex\n"
            "CMD='xxx'\n"
            "lf = open('logfile', 'a')\n"
            "print(subprocess.Popen(shlex.split(CMD), stderr=subprocess.STDOUT, stdout=lf).pid)")
        # "print(subprocess.Popen(shlex.split(CMD), stderr=subprocess.STDOUT, stdout=subprocess.DEVNULL).pid)"
        self._command_template = self._command_template.replace('xxx', og_cmd_template)
        self._command_template = self._command_template.replace("${JOB_ID}", "pid")

        self.job_header = ''
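
For context, the constructor above turns each job script into a small Python launcher rather than a batch-scheduler submission. A rough sketch of what a generated script could look like, with a placeholder dask-worker command line (the real CMD string is produced by JobQueueCluster, with its `${JOB_ID}` placeholder rewritten to `pid`):

#!/usr/bin/env python
import subprocess
import shlex

# Placeholder: the actual command is the cluster's generated dask-worker
# invocation, not this literal string.
CMD = 'dask-worker tcp://127.0.0.1:8786 --nthreads 1 --memory-limit 1GB'

# Append worker output to a log file, start the worker as a subprocess,
# and print its PID; the cluster parses that PID as the job id and can
# later cancel the "job" with `kill <pid>`.
lf = open('logfile', 'a')
print(subprocess.Popen(shlex.split(CMD), stderr=subprocess.STDOUT, stdout=lf).pid)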
24 changes: 24 additions & 0 deletions dask_jobqueue/jobqueue.yaml
@@ -138,3 +138,27 @@ jobqueue:
     mem: null
     job-extra: []
     log-directory: null
+
+  debug:
+    name: dask-worker
+
+    # Dask worker options
+    cores: 1                 # Total number of cores per job
+    memory: 1                # Total amount of memory per job
+    processes: 1             # Number of Python processes per job
+
+    interface: null          # Network interface to use like eth0 or ib0
+    death-timeout: 60        # Number of seconds to wait if a worker can not find a scheduler
+    local-directory: null    # Location of fast local storage like /scratch or $TMPDIR
+
+    # Debug resource manager options
+    shebang: "#!/usr/bin/env python"
+    queue: null
+    project: null
+    walltime: '00:30'
+    extra: []
+    env-extra: []
+    ncpus: null
+    mem: null
+    job-extra: []
+    log-directory: null
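
These defaults live under the `jobqueue.debug` key and can be overridden through dask's configuration system or directly at construction time. A minimal usage sketch (the keyword values here mirror the ones used in the tests below):

from dask_jobqueue import DEBUGCluster

# Constructor keywords override the jobqueue.debug defaults above;
# these particular values are the ones test_debug.py uses.
cluster = DEBUGCluster(cores=1, memory="1gb",
                       extra=["--no-nanny", "--no-bokeh"])
cluster.scale(2)  # spawns two local "jobs" via the python/kill commands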
213 changes: 213 additions & 0 deletions dask_jobqueue/tests/test_debug.py
@@ -0,0 +1,213 @@
from __future__ import absolute_import, division, print_function

import asyncio
import subprocess
import time

from dask_jobqueue import DEBUGCluster
from distributed import Client, as_completed


def sleep_abit(x):
    time.sleep(1)
    return x + 10


def test_sync_gather():
    """
    This test is now working.
    """
    d = DEBUGCluster(cores=1, memory="1gb", extra=["--no-nanny", "--no-bokeh"])
    d.adapt(minimum=3, maximum=3)
    # d.scale(3)
    c = Client(d)

    # Wait until the three adaptively launched workers have registered
    while len(c._scheduler_identity["workers"]) < 3:
        time.sleep(0.1)

    ret = c.map(sleep_abit, list(range(10)))
    time.sleep(1.2)

    # Kill every worker process and remember its PID
    old_pids = []
    for k, v in c._scheduler_identity["workers"].items():
        pid = int(v['id'].split('--')[-2])
        old_pids.append(pid)
        subprocess.call("kill -9 {}".format(pid).split())
    not_done_count = sum([1 for task in ret if task.status == 'pending'])
    assert not_done_count == 7

    # gather() should rerun the lost tasks on freshly launched workers
    final_ret = c.gather(ret)
    new_pids = []
    for k, v in c._scheduler_identity["workers"].items():
        pid = int(v['id'].split('--')[-2])
        new_pids.append(pid)
    for pid in new_pids:
        assert pid not in old_pids

    assert len(final_ret) == 10
    c.close()


async def _a_g_main():
    d = DEBUGCluster(cores=1, memory="1gb", extra=["--no-nanny", "--no-bokeh"])
    d.adapt(minimum=3, maximum=3)
    # d.scale(3)
    c = await Client(d, asynchronous=True)

    while len(c._scheduler_identity["workers"]) < 3:
        await asyncio.sleep(1)

    ret = c.map(sleep_abit, list(range(10)))

    await asyncio.sleep(1.2)

    # Kill every worker process and remember its PID
    old_pids = []
    for k, v in c._scheduler_identity["workers"].items():
        pid = int(v['id'].split('--')[-2])
        old_pids.append(pid)
        subprocess.call("kill -9 {}".format(pid).split())

    not_done_count = sum([1 for task in ret if task.status == 'pending'])
    assert not_done_count == 7

    # Gathering should rerun the lost tasks on freshly launched workers
    final_ret = await asyncio.gather(*ret)
    new_pids = []
    for k, v in c._scheduler_identity["workers"].items():
        pid = int(v['id'].split('--')[-2])
        new_pids.append(pid)
    for pid in new_pids:
        assert pid not in old_pids
    assert len(final_ret) == 10
    await c.close()


def test_async_gather():
    """
    This test is now working.
    """
    loop = asyncio.get_event_loop()
    loop.run_until_complete(_a_g_main())


def test_sync_as_completed():
    d = DEBUGCluster(cores=1, memory="1gb", extra=["--no-nanny", "--no-bokeh"])
    d.adapt(minimum=3, maximum=3)
    # d.scale(3)
    c = Client(d)

    while len(c._scheduler_identity["workers"]) < 3:
        time.sleep(0.1)

    ret = c.map(sleep_abit, list(range(10)))

    old_pids = []
    new_pids = []
    count = 0
    work_queue = as_completed(ret)
    for fut in work_queue:
        result = fut.result()
        count += 1
        if count == 3:
            # Kill every worker mid-stream and remember the PIDs
            for k, v in c._scheduler_identity["workers"].items():
                pid = int(v['id'].split('--')[-2])
                old_pids.append(pid)
                subprocess.call("kill -9 {}".format(pid).split())
        elif count == 7:
            # By now replacement workers should have been launched
            for k, v in c._scheduler_identity["workers"].items():
                pid = int(v['id'].split('--')[-2])
                new_pids.append(pid)
            for pid in new_pids:
                assert pid not in old_pids
    assert count == 10
    c.close()


async def _a_c_main():
    d = DEBUGCluster(cores=1, memory="1gb", extra=["--no-nanny", "--no-bokeh"])
    d.adapt(minimum=3, maximum=3)
    # d.scale(3)
    c = await Client(d, asynchronous=True)

    while len(c._scheduler_identity["workers"]) < 3:
        await asyncio.sleep(1)

    ret = c.map(sleep_abit, list(range(10)))

    old_pids = []
    new_pids = []
    count = 0
    work_queue = as_completed(ret)
    async for fut in work_queue:
        result = await fut
        count += 1
        if count == 3:
            # Kill every worker mid-stream and remember the PIDs
            for k, v in c._scheduler_identity["workers"].items():
                pid = int(v['id'].split('--')[-2])
                old_pids.append(pid)
                subprocess.call("kill -9 {}".format(pid).split())
        elif count == 7:
            # By now replacement workers should have been launched
            for k, v in c._scheduler_identity["workers"].items():
                pid = int(v['id'].split('--')[-2])
                new_pids.append(pid)
            for pid in new_pids:
                assert pid not in old_pids
    assert count == 10

    await c.close()


def test_async_as_completed():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(_a_c_main())


# not sure
# def local_main():
#     d = LocalCluster(ncores=1, n_workers=1)
#     d.adapt(minimum=3, maximum=3)
#     # d.scale(3)
#     c = Client(d)

#     while len(c._scheduler_identity["workers"]) < 3:
#         sleepy()

#     print("finally")
#     print(c._scheduler_identity)

#     ret = c.map(lambda x: sleep_abit(x), list(range(10)))
#     # proc = psutil.Process().pid
#     print("THIS IS", psutil.Process())

#     count = 0
#     subprocess.run("ps -u $USER", shell=True)
#     subprocess.run("pstree $USER -acp", shell=True)
#     work_queue = as_completed(ret)
#     for ret in work_queue:
#         try:
#             result = ret.result()
#         except KilledWorker:
#             c.retry([ret])
#             work_queue.add(ret)
#         else:
#             print("result", result)
#             count += 1
#             if count == 3:
#                 for k, v in c._scheduler_identity["workers"].items():
#                     print(v)
#                     pid = int(v['name'])
#                     subprocess.call(f"kill -15 {pid}", shell=True)
#     assert count == 10

#     c.close()


# if __name__ == "__main__":
#     if sys.argv[1] == "async_a_c":
#         loop = asyncio.get_event_loop()
#         loop.run_until_complete(a_c_main())
#     if sys.argv[1] == "async_g":
#         loop = asyncio.get_event_loop()
#         loop.run_until_complete(a_g_main())
#     # elif sys.argv[1] == "local":
#     #     local_main()
#     else:
#         main()