local.py
import logging
import os

from tornado.process import Subprocess

from .core import Job, JobQueueCluster, job_parameters, cluster_parameters

logger = logging.getLogger(__name__)


class LocalJob(Job):
    __doc__ = """ Use Dask Jobqueue with local bash commands

    This is mostly for testing. It uses all the same machinery of
    dask-jobqueue, but rather than submitting jobs to some external job
    queueing system, it launches them locally. For normal local use, please
    see ``dask.distributed.LocalCluster``

    Parameters
    ----------
    {job}
    """.format(
        job=job_parameters
    )

    config_name = "local"

    def __init__(
        self,
        scheduler=None,
        name=None,
        queue=None,
        project=None,
        resource_spec=None,
        walltime=None,
        config_name=None,
        **kwargs
    ):
        # Instantiate args and parameters from parent abstract class
        super().__init__(
            scheduler=scheduler,
            name=name,
            config_name=config_name,
            shebang="",
            **kwargs
        )

        # Declare class attribute that shall be overridden: local jobs need
        # no batch-scheduler directives, so the job header stays empty.
        self.job_header = ""

        logger.debug("Job script: \n %s" % self.job_script())
    async def _submit_job(self, script_filename):
        # Should we make this async friendly?
        with open(script_filename) as f:
            text = f.read().strip().split()
        self.process = Subprocess(
            text, stdout=Subprocess.STREAM, stderr=Subprocess.STREAM
        )

        lines = []
        while True:
            line = await self.process.stderr.read_until(
                b"\n"
            )  # make sure that we start
            lines.append(line.decode())
            if b"Registered to:" in line:
                break
            if b"error" in line.lower():
                raise Exception("Worker failed\n\n" + "".join(lines))

        return str(self.process.pid)

    @classmethod
    async def _close_job(self, job_id, cancel_command):
        # job_id is the pid of the local worker process; terminate it with SIGKILL
        os.kill(int(job_id), 9)
        # from distributed.utils_test import terminate_process
        # terminate_process(self.process)


class LocalCluster(JobQueueCluster):
    __doc__ = """ Use dask-jobqueue with local bash commands

    This is mostly for testing. It uses all the same machinery of
    dask-jobqueue, but rather than submitting jobs to some external job
    queueing system, it launches them locally. For normal local use, please
    see ``dask.distributed.LocalCluster``

    Parameters
    ----------
    {job}
    {cluster}

    Examples
    --------
    >>> from dask_jobqueue import LocalCluster
    >>> cluster = LocalCluster(cores=2, memory="4 GB")
    >>> cluster.scale(jobs=3)  # ask for 3 jobs

    See Also
    --------
    dask.distributed.LocalCluster
    """.format(
        job=job_parameters, cluster=cluster_parameters
    )

    job_cls = LocalJob
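

# A minimal usage sketch: LocalCluster is driven like the other dask-jobqueue
# cluster classes -- ``scale`` launches local worker "jobs" and a
# ``dask.distributed.Client`` can then submit work to them. The cores/memory
# values are illustrative, mirroring the docstring example above.
if __name__ == "__main__":
    from dask.distributed import Client

    cluster = LocalCluster(cores=2, memory="4 GB")
    cluster.scale(jobs=2)  # start two local worker processes

    with Client(cluster) as client:
        # run a trivial task to confirm the local workers registered
        print(client.submit(sum, [1, 2, 3]).result())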