/
simqueue.py
171 lines (136 loc) · 4.5 KB
/
simqueue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# (c) 2015-2018 Acellera Ltd http://www.acellera.com
# All Rights Reserved
# Distributed under HTMD Software License Agreement
# No redistribution in whole or part
#
from abc import ABCMeta, abstractmethod
import logging
logger = logging.getLogger(__name__)
class RetrieveError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class SubmitError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class InProgressError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class ProjectNotExistError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class SimQueue(metaclass=ABCMeta):
def __init__(self):
super().__init__()
self._sentinel = 'htmd.queues.done'
# For synchronous
self._dirs = None
@abstractmethod
def retrieve(self):
""" Subclasses need to implement this method """
pass
@abstractmethod
def submit(self, dirs):
""" Subclasses need to implement this method """
pass
def _submitinit(self, dirs):
if isinstance(dirs, str):
dirs = [dirs, ]
if self._dirs is None:
self._dirs = []
self._dirs.extend(dirs)
return dirs
@abstractmethod
def inprogress(self):
""" Subclasses need to implement this method """
pass
def wait(self, sentinel=False):
""" Blocks script execution until all queued work completes
Parameters
----------
sentinel : bool, default=False
If False, it relies on the queueing system reporting to determine the number of running jobs. If True, it
relies on the filesystem, in particular on the existence of a sentinel file for job completion.
Examples
--------
>>> self.wait()
"""
from time import sleep
import sys
while (self.inprogress() if not sentinel else self.notcompleted()) != 0:
self.retrieve()
sys.stdout.flush()
sleep(5)
def notcompleted(self):
"""Returns the sum of the number of job directories which do not have the sentinel file for completion.
Returns
-------
total : int
Total number of directories which have not completed
"""
import os
total = 0
if self._dirs is None:
raise RuntimeError('This method relies on running synchronously.')
for i in self._dirs:
if not os.path.exists(os.path.join(i, self._sentinel)):
total += 1
return total
def _cleanSentinel(self, d):
import os
if os.path.exists(os.path.join(d, self._sentinel)):
try:
os.remove(os.path.join(d, self._sentinel))
except:
logger.warning('Could not remove {} sentinel from {}'.format(self._sentinel, d))
else:
logger.info('Removed existing {} sentinel from {}'.format(self._sentinel, d))
def _getRunScript(self, d):
import os
runscript = os.path.abspath(os.path.join(d, 'run.sh'))
if not os.path.exists(runscript):
raise FileExistsError('File {} does not exist.'.format(runscript))
if not os.access(runscript, os.X_OK):
raise PermissionError('File {} does not have execution permissions.'.format(runscript))
return runscript
@abstractmethod
def stop(self):
""" Subclasses need to implement this method """
pass
@property
@abstractmethod
def ncpu(self):
""" Subclasses need to have this property """
pass
@ncpu.setter
@abstractmethod
def ncpu(self, value):
""" Subclasses need to have this setter """
pass
@property
@abstractmethod
def ngpu(self):
""" Subclasses need to have this property """
pass
@ngpu.setter
@abstractmethod
def ngpu(self, value):
""" Subclasses need to have this setter """
pass
@property
@abstractmethod
def memory(self):
""" Subclasses need to have this property. This property is expected to return a integer in MiB"""
pass
@memory.setter
@abstractmethod
def memory(self, value):
""" Subclasses need to have this setter """
pass