/
Cluster.py
186 lines (160 loc) · 5.86 KB
/
Cluster.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import time
from getpass import getuser
# Helper functions for submitting and monitoring jobs on the anthill cluster
from General import *
class jobOnCluster:
    """A list of shell commands run either locally or through ``qsub``.

    The ``type`` attribute selects the backend: ``'local'`` (the default)
    runs the commands synchronously with ``os.system``; ``'sge'`` hands them
    to ``qsub``.  Any other value makes ``submit`` a no-op.
    """

    def __init__(self, cmds, myid='', label=''):  # myid and label are optional
        self.cmds = cmds        # list of shell command strings
        self.running = 0        # 1 while an 'sge' submission is considered alive
        self.jobid = ''         # id returned by qsub, or the joined commands for 'local'
        self.tried = 0          # number of times submit() has launched the job
        self.myid = myid        # easy identifier (usually a string) to trace problems
        self.label = label      # path of a file whose existence means "job finished"
        self.type = 'local'     # set to 'sge' to use Sun Grid Engine
        # Exit status of the last local run.  Initialised here so that
        # checkJobRun()/checkjob() are safe to call before the first submit().
        self.err = 0

    def submit(self, time):
        """Launch the job; ``time`` is the wall-clock limit in hours (used by 'sge' only)."""
        if self.type == 'sge':
            self.jobid = qsub(self.cmds, hrs=int(time))
            self.running = 1
            self.tried += 1
        elif self.type == 'local':
            script = "\n".join(self.cmds)
            self.jobid = script
            # os.system blocks until the commands return, so the job is
            # already over by the time we record its status.
            self.err = os.system(script)
            self.running = 0
            self.tried += 1

    def checkjob(self):
        """Clear ``running`` once checkJobRun() reports 0."""
        if self.checkJobRun() == 0:
            self.running = 0

    def checkJobRun(self):
        """Report whether the job is still running.

        For 'sge' returns 1 if ``qstat`` lists ``jobid`` in a state other than
        'dr', else 0.  For 'local' returns the exit status of the last run
        (0 before any run).  Returns None for any other ``type``.
        """
        if self.type == 'sge':
            while True:
                try:
                    out = sub.check_output('qstat', stderr=sub.STDOUT, shell=True)
                    break
                except Exception:
                    # qstat failed; retry after a pause.  Exception (not a
                    # bare except) so Ctrl-C can still interrupt the loop.
                    time.sleep(5)
            if out is None:
                return 0
            # Python 3 returns bytes from check_output; on Python 2 bytes is
            # str and the output is left untouched.
            if isinstance(out, bytes) and not isinstance(out, str):
                out = out.decode('utf-8', 'replace')
            for line in out.split('\n'):
                fields = line.split()
                if not fields:
                    continue
                # fields[4] is presumably qstat's state column -- TODO confirm.
                if fields[0] == self.jobid and len(fields) > 4 and fields[4] not in ['dr']:
                    return 1
            return 0
        elif self.type == 'local':
            return self.err

    def checkfinish(self):  # label is a file; when it exists the job has finished
        """Return 1 if the ``label`` file exists, else 0."""
        return 1 if os.path.isfile(self.label) else 0
def qsub(cmds, user='', fileName=None, mem=2, hrs=3, ironfs=False, opts=None, maxJobs=2500, avoidNode=None):
    """Write ``cmds`` into a job script, submit it with ``qsub`` and return the job id.

    Parameters:
        cmds      -- iterable of shell command strings, one per script line.
        user      -- user name passed to numJobs() when throttling.
        fileName  -- script path; defaults to 'script-<timestamp>.sh' in the
                     current directory.  The script is left on disk.
        mem       -- written as '-l vf=<mem>G'; must be > 0.
        hrs       -- integer >= 1; written as '-l h_rt=<hrs-1>:59:59'.
        ironfs    -- when true, adds '-l ironfs'.
        opts      -- extra lines written verbatim into the script (None
                     entries are skipped).
        maxJobs   -- when an int, wait until numJobs(user) drops below it
                     before submitting.
        avoidNode -- host names written as "-l hostname='!a|b'".

    Returns the id parsed from qsub's output by parseJobID(), which is None
    when no id is found.
    """
    assert mem > 0, 'memory must be greater than 0'
    assert isinstance(hrs, int) and hrs >= 1, 'hrs must be and integer greater than 0'
    # None defaults instead of shared mutable lists.
    if opts is None:
        opts = []
    if avoidNode is None:
        avoidNode = []
    if fileName is None:
        fileName = 'script-%.15f.sh' % time.time()

    # Shebang needs the leading slash to be a valid interpreter path.
    script = ['#!/bin/bash\n\n',
              '#$ -cwd\n',
              '#$ -j y\n',
              '#$ -V\n',
              '#$ -l vf=' + str(mem) + 'G\n',
              '#$ -l h_rt=' + str(hrs - 1) + ':59:59\n']
    if len(avoidNode) > 0:
        script.append('#$ -l hostname=\'!' + '|'.join(avoidNode) + '\'\n')
    if ironfs:
        script.append('#$ -l ironfs\n')
    for opt in opts:
        if opt is not None:
            script.append('\n' + opt + '\n')
    for cmd in cmds:
        script.append('\n' + cmd)
    with open(fileName, 'w') as fH:
        fH.write(''.join(script))

    if isinstance(maxJobs, int):
        # Throttle: poll until the user's queue has room.
        while numJobs(user) >= maxJobs:
            time.sleep(2)

    p = sub.Popen(['qsub', fileName], stdout=sub.PIPE)
    std_out = p.communicate()[0]
    # Python 3 pipes yield bytes; the job-id regex works on text.  On
    # Python 2 bytes is str and the output is left untouched.
    if isinstance(std_out, bytes) and not isinstance(std_out, str):
        std_out = std_out.decode('utf-8', 'replace')
    print(std_out)
    return parseJobID(std_out)  # will return job id
def _resubmitOrGiveUp(job, rerun_time, giveup_time, subdir, odir):
    """Resubmit a stopped, unfinished job; return False once its retries are spent."""
    print('Job about ' + job.myid + ' may have died ...')
    if job.tried > giveup_time:
        print('have failed %d times, give up ...' % (giveup_time+1))
        return False
    print('resubmitting ... ')
    try:
        if subdir:
            # Jobs are resubmitted from the directory named after their id.
            os.chdir(job.myid)
        job.submit(rerun_time)
    finally:
        # Always return to the starting directory, even if submit() raises.
        os.chdir(odir)
    return True


def waitJobs(jobs, type = 'list', sleep_time=120, rerun_time=24, giveup_time=3, subdir = True):
    """Poll ``jobs`` until each has finished or been abandoned.

    Each pass calls ``checkjob()`` on every job.  A job whose ``running`` is 0
    is removed from ``jobs`` if ``checkfinish()`` is non-zero; otherwise it is
    resubmitted with ``submit(rerun_time)`` (from directory ``myid`` when
    ``subdir`` is true) unless ``tried`` already exceeds ``giveup_time``, in
    which case it is dropped and the overall result becomes False.

    Parameters:
        jobs        -- list (type='list') or dict (type='dict') of job
                       objects; emptied in place.
        type        -- 'list' or 'dict'; any other value returns True at once.
        sleep_time  -- seconds to sleep between passes while a job is running.
        rerun_time  -- value handed to ``submit`` on resubmission.
        giveup_time -- a job is abandoned once ``tried`` is greater than this.
        subdir      -- chdir into ``job.myid`` before resubmitting.

    Returns True if no job was abandoned, else False.
    """
    success = True
    odir = os.getcwd()

    if type == 'list':
        assert isinstance(jobs, list)
    elif type == 'dict':
        assert isinstance(jobs, dict)
    else:
        return success

    while len(jobs) > 0:
        # Iterate over a snapshot: the container is modified inside the loop,
        # which would skip list elements or raise on a dict.
        if type == 'list':
            snapshot = [(job, job) for job in jobs]
        else:
            snapshot = list(jobs.items())

        for key, job in snapshot:
            job.checkjob()
            if job.running != 0:
                continue
            if job.checkfinish() == 0:
                if _resubmitOrGiveUp(job, rerun_time, giveup_time, subdir, odir):
                    continue  # resubmitted: stays in the container
                success = False
            # Finished or abandoned: stop tracking it.
            if type == 'list':
                jobs.remove(job)
            else:
                jobs.pop(key)

        print('Running, '+ str(len(jobs)) + ' jobs left ...')
        remaining = jobs if type == 'list' else jobs.values()
        # Only wait when something is actually still running; jobs that were
        # just resubmitted and already stopped can be rechecked immediately.
        if any(job.running != 0 for job in remaining):
            time.sleep(sleep_time)
    return success
def numJobs(user=''):
    """Return the number of lines ``qstat -u <user>`` prints minus 2, floored at 0.

    ``user`` defaults to the current login name.  The 2 is presumably the
    header lines qstat prints above the job list -- TODO confirm.  If the
    command output cannot be parsed as an integer the call is retried every
    5 seconds.

    NOTE(review): ``user`` is interpolated into a shell command line; only
    pass trusted values.
    """
    if user == '':
        user = getuser()
    while True:
        out = sub.check_output('qstat -u ' + user + ' | grep "" | wc -l', stderr=sub.STDOUT, shell=True)
        try:
            return max(0, int(out) - 2)
        except ValueError:
            # Output was not a bare count (e.g. an error message was mixed
            # in); wait and ask again.  Narrowed from a bare except so that
            # unrelated errors and Ctrl-C are not swallowed.
            time.sleep(5)
def parseJobID(string):
    """Return the digits following the first 'job ' in ``string``, or None.

    ``string`` may be text or bytes (as read from a subprocess pipe on
    Python 3).  The id is returned as a string.
    """
    # Python 3 pipe output is bytes, which a text pattern cannot search.  On
    # Python 2 bytes is str and the input is left untouched.
    if isinstance(string, bytes) and not isinstance(string, str):
        string = string.decode('utf-8', 'replace')
    # Raw string so \s and \d reach the regex engine as written.
    match = re.search(r'job\s(\d+)', string)
    if match is not None:
        return match.group(1)
    return None
def createLocalSpace(user='', base='/data/scratch'):
    """Create and return a fresh, randomly named directory ``<base>/<user>/<number>``.

    Parameters:
        user -- sub-directory name; defaults to the current login name.
        base -- root under which the directory is created.

    The number is a random integer in [0, 1000000], zero-padded to at least
    six digits.  Raises OSError if the directory cannot be created for a
    reason other than already existing.
    """
    if user == '':
        user = getuser()
    while True:
        rint = str(random.randint(0, 1000000)).rjust(6, '0')
        ldir = os.path.join(base, user, rint)
        # Create first and react to the failure, rather than checking and
        # then creating: another process could claim the name in between.
        try:
            os.makedirs(ldir)
        except OSError:
            if os.path.isdir(ldir):
                continue  # name already taken; draw another
            raise
        return ldir
def destroyLocalSpace(ldir):
    """Recursively delete the directory tree rooted at ``ldir``."""
    shutil.rmtree(path=ldir)