import logging
import os
import time

from parsl.channels import LocalChannel
from parsl.launchers import AprunLauncher
from parsl.providers.cluster_provider import ClusterProvider
from parsl.providers.cobalt.template import template_string
from parsl.providers.error import ScaleOutFailed
from parsl.utils import RepresentationMixin, wtime_to_minutes

logger = logging.getLogger(__name__)
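
# Map Cobalt job states (as reported by qstat) onto parsl's internal
# job status strings.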
translate_table = {
    'QUEUED': 'PENDING',
    'STARTING': 'PENDING',
    'RUNNING': 'RUNNING',
    'EXITING': 'COMPLETED',
    'KILLING': 'COMPLETED'
}


class CobaltProvider(ClusterProvider, RepresentationMixin):
    """Cobalt Execution Provider.

    This provider uses Cobalt to submit (qsub), obtain the status of (qstat), and cancel (qdel)
    jobs. The script to be submitted is created from a template file in this same module.

    Parameters
    ----------
    channel : Channel
        Channel for accessing this provider. Possible channels include
        :class:`~parsl.channels.LocalChannel` (the default),
        :class:`~parsl.channels.SSHChannel`, or
        :class:`~parsl.channels.SSHInteractiveLoginChannel`.
    nodes_per_block : int
        Nodes to provision per block.
    init_blocks : int
        Number of blocks to provision at the start of the run.
    min_blocks : int
        Minimum number of blocks to maintain.
    max_blocks : int
        Maximum number of blocks to maintain.
    parallelism : float
        Ratio of provisioned task slots to active tasks.
    walltime : str
        Walltime requested per block in HH:MM:SS format.
    account : str
        Account that the job will be charged against.
    queue : str
        Cobalt queue to request blocks from.
    scheduler_options : str
        String to prepend to the submit script to the scheduler.
    worker_init : str
        Command to be run before starting a worker, such as 'module load Anaconda; source activate env'.
    launcher : Launcher
        Launcher for this provider. Possible launchers include
        :class:`~parsl.launchers.AprunLauncher` (the default) or
        :class:`~parsl.launchers.SingleNodeLauncher`.
    cmd_timeout : int
        Timeout in seconds for commands sent to the scheduler.
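
    Example
    -------
    An illustrative configuration sketch; the executor choice, queue name, and
    account below are placeholders rather than part of this module::

        from parsl.config import Config
        from parsl.executors import HighThroughputExecutor
        from parsl.providers import CobaltProvider

        config = Config(
            executors=[
                HighThroughputExecutor(
                    provider=CobaltProvider(
                        queue='debug',
                        account='MY_ALLOCATION',
                        nodes_per_block=1,
                        walltime='00:30:00',
                    ),
                ),
            ],
        )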
"""

    def __init__(self,
                 channel=LocalChannel(),
                 nodes_per_block=1,
                 init_blocks=0,
                 min_blocks=0,
                 max_blocks=10,
                 parallelism=1,
                 walltime="00:10:00",
                 account=None,
                 queue=None,
                 scheduler_options='',
                 worker_init='',
                 launcher=AprunLauncher(),
                 cmd_timeout=10):
        label = 'cobalt'
        super().__init__(label,
                         channel=channel,
                         nodes_per_block=nodes_per_block,
                         init_blocks=init_blocks,
                         min_blocks=min_blocks,
                         max_blocks=max_blocks,
                         parallelism=parallelism,
                         walltime=walltime,
                         launcher=launcher,
                         cmd_timeout=cmd_timeout)

        self.account = account
        self.queue = queue
        self.scheduler_options = scheduler_options
        self.worker_init = worker_init

    def _status(self):
        """Internal: do not call. Updates the status of all tracked jobs.

        Statuses are written in place into self.resources; nothing is returned.
        """
        jobs_missing = list(self.resources.keys())

        retcode, stdout, stderr = self.execute_wait("qstat -u $USER")
        # execute_wait failed; do not update any statuses.
        if retcode != 0:
            return

        for line in stdout.split('\n'):
            if line.startswith('='):
                continue
            parts = line.upper().split()
            if parts and parts[0] != 'JOBID':
                job_id = parts[0]
                if job_id not in self.resources:
                    continue
                # The fifth whitespace-separated column is taken as the job state.
                status = translate_table.get(parts[4], 'UNKNOWN')
                self.resources[job_id]['status'] = status
                jobs_missing.remove(job_id)

        # qstat does not report on jobs that have left the queue, so fill in the
        # blanks for missing jobs; this may lose some information about why a
        # job failed.
        for missing_job in jobs_missing:
            if self.resources[missing_job]['status'] in ['RUNNING', 'KILLING', 'EXITING']:
                self.resources[missing_job]['status'] = translate_table['EXITING']

    def submit(self, command, tasks_per_node, job_name="parsl.cobalt"):
        """Submits the command as a Cobalt job of parallel elements.

        Submit returns an ID that corresponds to the task that was just submitted.

        nodes_per_block nodes are provisioned, and the command is wrapped by the
        launcher so that it is invoked tasks_per_node times on each node;
        tasks_per_node must be a positive integer.

        Args:
            - command (string): Command-line invocation to be made on the remote side.
            - tasks_per_node (int): Command invocations to be launched per node.

        Kwargs:
            - job_name (string): Name for the job, must be unique.

        Returns:
            - None: At capacity, cannot provision more.
            - job_id (string): Identifier for the job.
        """
        if self.provisioned_blocks >= self.max_blocks:
            logger.warning("[%s] at capacity, cannot add more blocks now", self.label)
            return None

        account_opt = '-A {}'.format(self.account) if self.account is not None else ''
        queue_opt = '-q {}'.format(self.queue) if self.queue is not None else ''

        job_name = "parsl.{0}.{1}".format(job_name, time.time())

        script_path = "{0}/{1}.submit".format(self.script_dir, job_name)
        script_path = os.path.abspath(script_path)

        job_config = {}
        job_config["scheduler_options"] = self.scheduler_options
        job_config["worker_init"] = self.worker_init

        logger.debug("Requesting nodes_per_block:%s tasks_per_node:%s",
                     self.nodes_per_block, tasks_per_node)

        # Wrap the command with the launcher so it runs tasks_per_node times per node.
        job_config["user_script"] = self.launcher(command, tasks_per_node, self.nodes_per_block)

        logger.debug("Writing submit script")
        self._write_submit_script(template_string, script_path, job_name, job_config)

        channel_script_path = self.channel.push_file(script_path, self.channel.script_dir)

        submit_cmd = 'qsub -n {0} {1} -t {2} {3} {4}'.format(
            self.nodes_per_block, queue_opt, wtime_to_minutes(self.walltime), account_opt, channel_script_path)
        logger.debug("Executing {}".format(submit_cmd))

        retcode, stdout, stderr = self.execute_wait(submit_cmd)

        # TODO: fix this block
        if retcode != 0:
            logger.error("Failed command: {0}".format(submit_cmd))
            logger.error("Launch failed stdout:\n{0}\nstderr:{1}\n".format(stdout, stderr))

        logger.debug("Retcode:%s STDOUT:%s STDERR:%s", retcode, stdout.strip(), stderr.strip())

        job_id = None
        if retcode == 0:
            # qsub should return only one line, containing the job id.
            job_id = stdout.strip()
            self.resources[job_id] = {'job_id': job_id, 'status': 'PENDING'}
        else:
            logger.error("Submission of command to scale_out failed: {0}".format(stderr))
            raise ScaleOutFailed(self.__class__, "Request to submit job to local scheduler failed")

        logger.debug("Returning job id: {0}".format(job_id))
        return job_id

    def cancel(self, job_ids):
        """Cancels the jobs specified by a list of job ids.

        Args:
            job_ids : [<job_id> ...]

        Returns:
            [True/False...] : If the cancel operation fails, the entire list will be False.
        """
        job_id_list = ' '.join(job_ids)
        retcode, stdout, stderr = self.execute_wait("qdel {0}".format(job_id_list))

        rets = None
        if retcode == 0:
            for jid in job_ids:
                # Cobalt's KILLING state maps to COMPLETED in translate_table.
                self.resources[jid]['status'] = translate_table['KILLING']
            rets = [True for i in job_ids]
        else:
            rets = [False for i in job_ids]

        return rets
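

# ----------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module): drives the
# submit/status/cancel lifecycle by hand. In normal use a parsl executor
# manages these calls and sets up the script directories; the directory
# setup, queue name, and account below are assumptions for demonstration.
# ----------------------------------------------------------------------------
if __name__ == '__main__':
    provider = CobaltProvider(
        queue='debug',            # placeholder queue name
        account='MY_ALLOCATION',  # placeholder allocation
        nodes_per_block=1,
        walltime='00:10:00',
    )
    # Normally the executor assigns these paths; set them by hand here.
    provider.script_dir = '.'
    provider.channel.script_dir = '/tmp'

    job_id = provider.submit("echo 'hello from cobalt'", tasks_per_node=1)
    if job_id is not None:
        print('Submitted job:', job_id)
        print('Status:', provider.status([job_id]))
        provider.cancel([job_id])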