-
Notifications
You must be signed in to change notification settings - Fork 39
/
condor_ce_router_defaults
executable file
·309 lines (272 loc) · 14.3 KB
/
condor_ce_router_defaults
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
#!/usr/bin/python
import os
import re
import pwd
import classad
# Template for the generated job router defaults that this script prints.
#
# It carries three placeholders that are filled in later in this file:
#   * one "%s" slot inside eval_set_environment, filled with the
#     advertise_pilots fragment via the "%" operator;
#   * "@osg_environment@", replaced in main() with the merged OSG
#     environment string;
#   * "@accounting_group@", replaced in main() with the accounting group
#     expression (main() removes the AccountingGroupOSG lines instead when
#     there is no expression to insert).
#
# Because the whole literal goes through the "%" operator, any literal
# percent sign added to it must be written as "%%".
JOB_ROUTER_DEFAULTS = """JOB_ROUTER_DEFAULTS_GENERATED @=jrd
[ MaxIdleJobs = 2000;
MaxJobs = $(CONDORCE_MAX_JOBS);
/* by default, accept all jobs */
Requirements = True;
/* now modify routed job attributes */
/* remove routed job if the client disappears for 48 hours or it is idle for 6 */
/*set_PeriodicRemove = (LastClientContact - time() > 48*60*60) ||
(JobStatus == 1 && (time() - QDate) > 6*60);*/
delete_PeriodicRemove = true;
delete_CondorCE = true;
delete_TotalSubmitProcs = true;
set_RoutedJob = true;
copy_environment = "orig_environment";
set_osg_environment = "@osg_environment@";
set_CondorCECollectorHost = "$(COLLECTOR_HOST)";
eval_set_environment = debug(strcat("HOME=", userHome(Owner, "/"), %s" ",
ifThenElse(orig_environment is undefined,
osg_environment,
strcat(osg_environment, " ", orig_environment))));
/* Set new requirements */
/* set_requirements = LastClientContact - time() < 30*60; */
set_requirements = True;
/* Note default memory request of 2GB */
/* Note yet another nested condition allow pass attributes (maxMemory,xcount,jobtype,queue)
via gWMS Factory described within ClassAd */
eval_set_OriginalMemory = ifThenElse(maxMemory isnt undefined,
maxMemory,
ifThenElse(default_maxMemory isnt undefined,
default_maxMemory,
2000));
/* Duplicate OriginalMemory expression and add remote_ prefix.
This passes the attribute from gridmanager to BLAHP. */
eval_set_remote_OriginalMemory = ifThenElse(maxMemory isnt undefined,
maxMemory,
ifThenElse(default_maxMemory isnt undefined,
default_maxMemory,
2000));
set_JOB_GLIDEIN_Memory = "$$(TotalMemory:0)";
set_JobMemory = JobIsRunning ? int(MATCH_EXP_JOB_GLIDEIN_Memory)*95/100 : OriginalMemory;
set_RequestMemory = ifThenElse(WantWholeNode is true,
!isUndefined(TotalMemory) ? TotalMemory*95/100 : JobMemory,
OriginalMemory);
eval_set_remote_queue = ifThenElse(batch_queue isnt undefined,
batch_queue,
ifThenElse(queue isnt undefined,
queue,
ifThenElse(default_queue isnt undefined,
default_queue,
"")));
/* HTCondor uses RequestCpus; blahp uses SMPGranularity and NodeNumber. Default is 1 core. */
copy_RequestCpus = "orig_RequestCpus";
eval_set_OriginalCpus = ifThenElse(xcount isnt undefined,
xcount,
ifThenElse(orig_RequestCpus isnt undefined,
ifThenElse(orig_RequestCpus > 1,
orig_RequestCpus,
ifThenElse(default_xcount isnt undefined,
default_xcount,
1)),
ifThenElse(default_xcount isnt undefined,
default_xcount,
1)));
set_GlideinCpusIsGood = !isUndefined(MATCH_EXP_JOB_GLIDEIN_Cpus) && (int(MATCH_EXP_JOB_GLIDEIN_Cpus) isnt error);
set_JOB_GLIDEIN_Cpus = "$$(ifThenElse(WantWholeNode is true, !isUndefined(TotalCpus) ? TotalCpus : JobCpus, OriginalCpus))";
set_JobIsRunning = (JobStatus =!= 1) && (JobStatus =!= 5) && GlideinCpusIsGood;
set_JobCpus = JobIsRunning ? int(MATCH_EXP_JOB_GLIDEIN_Cpus) : OriginalCpus;
set_RequestCpus = ifThenElse(WantWholeNode is true,
!isUndefined(TotalCpus) ? TotalCpus : JobCpus,
OriginalCpus);
eval_set_remote_SMPGranularity = ifThenElse(xcount isnt undefined,
xcount,
ifThenElse(default_xcount isnt undefined,
default_xcount,
1));
eval_set_remote_NodeNumber = ifThenElse(xcount isnt undefined,
xcount,
ifThenElse(default_xcount isnt undefined,
default_xcount,
1));
/* If remote_cerequirements is a string, BLAH will parse it as an expression before examining it */
eval_set_remote_cerequirements = strcat(ifThenElse(default_remote_cerequirements isnt undefined,
strcat(string(default_remote_cerequirements), " && "),
""),
ifThenElse(maxWallTime isnt undefined,
strcat("Walltime == ",
string(60*maxWallTime),
" && CondorCE == 1"),
ifThenElse(default_maxWallTime isnt undefined,
strcat("Walltime == ",
string(60*default_maxWallTime),
" && CondorCE == 1"),
"CondorCE == 1")));
copy_OnExitHold = "orig_OnExitHold";
set_OnExitHold = ifThenElse(orig_OnExitHold isnt undefined,
orig_OnExitHold,
false) ||
ifThenElse(minWalltime isnt undefined && RemoteWallClockTime isnt undefined,
RemoteWallClockTime < 60*minWallTime,
false);
copy_OnExitHoldReason = "orig_OnExitHoldReason";
set_OnExitHoldReason = ifThenElse((orig_OnExitHold isnt undefined) && orig_OnExitHold,
ifThenElse(orig_OnExitHoldReason isnt undefined,
orig_OnExitHoldReason,
strcat("The on_exit_hold expression (",
unparse(orig_OnExitHold),
") evaluated to TRUE.")),
ifThenElse(minWalltime isnt undefined &&
RemoteWallClockTime isnt undefined &&
(RemoteWallClockTime < 60*minWallTime),
strcat("The job's wall clock time, ",
int(RemoteWallClockTime/60),
"min, is less than the minimum specified by the job (",
minWalltime,
")"),
"Job held for unknown reason."));
copy_OnExitHoldSubCode = "orig_OnExitHoldSubCode";
set_OnExitHoldSubCode = ifThenElse((orig_OnExitHold isnt undefined) && orig_OnExitHold,
ifThenElse(orig_OnExitHoldSubCode isnt undefined,
orig_OnExitHoldSubCode,
1),
42);
set_AccountingGroupOSG = @accounting_group@;
eval_set_AccountingGroup = AccountingGroupOSG;
]
@jrd
"""
# Allow admins to prevent pilots from advertising back to the CE.
# https://ticket.grid.iu.edu/26666
#
# When DISABLE_GLIDEIN_ADS is set to "true" (any letter case) the
# CONDORCE_COLLECTOR_HOST fragment is left out of the routed job's
# environment expression; otherwise it is spliced into the template's
# single "%s" slot.
if os.environ.get('DISABLE_GLIDEIN_ADS', '').lower() == 'true':
    advertise_pilots = ''
else:
    advertise_pilots = '" CONDORCE_COLLECTOR_HOST=", CondorCECollectorHost, '

JOB_ROUTER_DEFAULTS = JOB_ROUTER_DEFAULTS % advertise_pilots

# Environment files read by main(), in order; entries from later files
# overwrite entries with the same name from earlier ones.
osg_environment_files = [
    "/var/lib/osg/osg-job-environment.conf",
    "/var/lib/osg/osg-local-job-environment.conf",
]
def parse_extattr(extattr_file):
    """
    Parse the extended-attribute mapping table.

    Each usable line has the form ``<pattern> <group>``: the text before the
    last space is the pattern and the final space-separated word is the
    group.  Blank lines, comment lines (first non-blank character is ``#``)
    and lines that contain no space are skipped.

    Returns a list of ``(pattern, group)`` string tuples in file order, or an
    empty list when ``extattr_file`` does not exist.
    """
    if not os.path.exists(extattr_file):
        return []
    results = []
    # The context manager closes the handle even if parsing raises.
    with open(extattr_file) as fd:
        for line in fd:
            # Strip before testing for '#' so indented comments are ignored
            # instead of being parsed as a mapping.
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            info = line.rsplit(" ", 1)
            if len(info) != 2:
                continue
            results.append((str(info[0].strip()), str(info[1]).strip()))
    return results
def parse_uids(uid_file):
    """
    Parse the uid mapping table.

    Each usable line has the form ``<uid-or-username> <group>``, split at the
    first space.  A numeric first field is translated to a username with
    ``pwd.getpwuid``; anything else is taken as the username verbatim.
    Blank lines, comment lines (first non-blank character is ``#``) and
    lines that contain no space are skipped.

    Returns a list of ``(username, group)`` tuples in file order, or an empty
    list when ``uid_file`` does not exist.  A ``KeyError`` raised by
    ``pwd.getpwuid`` for a numeric uid with no password entry is not caught
    here.
    """
    if not os.path.exists(uid_file):
        return []
    results = []
    # The context manager closes the handle even if parsing raises.
    with open(uid_file) as fd:
        for line in fd:
            # Strip before testing for '#' so indented comments are ignored
            # instead of being parsed as a mapping.
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            info = line.split(" ", 1)
            if len(info) != 2:
                continue
            owner = info[0]
            group = str(info[1]).strip()
            try:
                uid = int(owner)
            except ValueError:
                # Not a number: the field already is a username.
                results.append((owner, group))
            else:
                results.append((pwd.getpwuid(uid).pw_name, group))
    return results
def set_accounting_group(uid_file="/etc/osg/uid_table.txt", extattr_file="/etc/osg/extattr_table.txt"):
    """
    Build the expression substituted for ``@accounting_group@``.

    The result is a chain of nested ``ifThenElse(...)`` calls: one per uid
    mapping (matching on ``Owner``), then two per extended-attribute mapping
    (a regexp on ``x509UserProxySubject`` followed by a regexp on
    ``x509UserProxyFirstFQAN``).  Every match yields ``<group>.<Owner>`` and
    the innermost fallback is plain ``Owner``.

    Returns the string ``"Owner"`` when the ``classad`` module is falsy,
    ``None`` when both mapping tables are empty, and the expression string
    otherwise.
    """
    attr_mappings = parse_extattr(extattr_file)
    uid_mappings = parse_uids(uid_file)
    if not classad:
        return "Owner"
    if not (attr_mappings or uid_mappings):
        return None

    # Each clause opens one ifThenElse( that is closed at the very end.
    clauses = []
    for owner, group in uid_mappings:
        clauses.append('ifThenElse(Owner == %s, strcat(%s, ".", Owner), '
                       % (classad.quote(owner), classad.quote(group)))
    for pattern, group in attr_mappings:
        quoted = (classad.quote(pattern), classad.quote(group))
        clauses.append('ifThenElse(regexp(%s, x509UserProxySubject), strcat(%s, ".", Owner), '
                       % quoted)
        clauses.append('ifThenElse(x509UserProxyFirstFQAN isnt Undefined && '
                       'regexp(%s, x509UserProxyFirstFQAN), strcat(%s, ".", Owner), '
                       % quoted)

    # One closing parenthesis per opened clause.
    return "".join(clauses) + "Owner" + ")" * len(clauses)
def condor_env_escape(val):
    """
    Escape the environment variable to match Condor's escape sequence.

    From condor_submit's man page:
    1 Put double quote marks around the entire argument string. Any literal
      double quote marks within the string must be escaped by repeating the
      double quote mark.
    2 Use white space (space or tab characters) to separate environment
      entries.
    3 To put any white space in an environment entry, surround the space and
      as much of the surrounding entry as desired with single quote marks.
    4 To insert a literal single quote mark, repeat the single quote mark
      anywhere inside of a section surrounded by single quote marks.

    THIS IS NOT A GENERIC ESCAPER; we assume this only works on the OSG
    environment file format. We also assume the input is valid.

    Returns ``val`` wrapped in single quotes, with one pair of enclosing
    double quotes removed, backslashes dropped, and every remaining double
    or single quote doubled.
    """
    # Require at least two characters: a lone '"' both starts and ends with a
    # quote and would otherwise be sliced down to an empty string.
    if len(val) >= 2 and val.startswith('"') and val.endswith('"'):
        val = val[1:-1]
    # NOTE(review): backslashes are removed whether or not the value was
    # quoted -- confirm this matches the intended layout of the original.
    val = val.replace('\\', '')  # Nuke escape sequences.
    val = val.replace('"', '""')
    val = val.replace("'", "''")
    return "'" + val + "'"
# Patterns used by read_osg_environment_file(), which applies each of them
# with .match(), i.e. anchored at the start of the text.

# "export NAME": group 1 is the variable name.
export_line_re = re.compile(r'^export\s+([a-zA-Z_]\w*)')
# "NAME=value": group 1 is the name, group 2 the non-empty remainder.
variable_line_re = re.compile(r'([a-zA-Z_]\w*)=(.+)')
# "$NAME", optionally wrapped in double quotes: group 1 is NAME (may be empty).
shell_var_re = re.compile(r'"?\$(\w*)"?')
def read_osg_environment_file(filename):
    """
    Parse the OSG environment file.

    This file is maddening because it APPEARS to be a file you can source
    with bash; however, it has a very limited syntax.

    After stripping whitespace and skipping ``#`` comment lines, two kinds
    of lines are recognised:

    * ``export NAME`` marks NAME as exported.
    * ``NAME=value`` assigns a value.  When the value starts with a
      ``$VAR`` reference (optionally double-quoted), the value of VAR in
      this process's environment is used instead, and the assignment is
      ignored if VAR is unset or empty.

    Returns a dict mapping every exported name that was also assigned to
    its value as escaped by condor_env_escape().  Errors from opening the
    file are not caught.
    """
    exported = set()  # set: membership is tested once per assigned variable
    env = {}
    # The context manager closes the handle even if parsing raises.
    with open(filename, 'r') as fd:
        for line in fd:
            line = line.strip()
            # Ignore comments
            if line.startswith("#"):
                continue
            m = export_line_re.match(line)
            if m:
                exported.add(m.group(1))
            m = variable_line_re.match(line)
            if not m:
                continue
            job_var, value = m.groups()
            shell_var = shell_var_re.match(value)
            if shell_var:
                # Value refers to a variable of the CE's own environment.
                ce_var = os.getenv(shell_var.group(1))
                if ce_var:
                    env[job_var] = condor_env_escape(ce_var)
            else:
                env[job_var] = condor_env_escape(value)
    # Only variables that were explicitly exported are passed on.
    return dict((name, val) for name, val in env.items() if name in exported)
def main():
    """
    Print the generated job router defaults to standard output.

    Reads every readable file listed in osg_environment_files, merges the
    results (later files override earlier ones), substitutes the resulting
    environment string for ``@osg_environment@`` in JOB_ROUTER_DEFAULTS and
    then either substitutes the accounting group expression for
    ``@accounting_group@`` or, when set_accounting_group() returns None,
    removes every line mentioning AccountingGroupOSG.
    """
    # Read in the environment files and merge their contents, in order.
    master_env = {}
    for filename in osg_environment_files:
        if os.path.exists(filename) and os.access(filename, os.R_OK):
            for key, val in read_osg_environment_file(filename).items():
                master_env[key] = val

    # Mangle environment string
    env_string = " ".join(["%s=%s" % item for item in master_env.items()])
    accounting_group_str = set_accounting_group()
    defaults_with_env = JOB_ROUTER_DEFAULTS.replace("@osg_environment@", env_string)

    # Test for None explicitly rather than relying on str.replace() raising
    # TypeError, which would also hide unrelated TypeErrors.
    if accounting_group_str is None:
        # No accounting group mappings: remove them from the classad.
        output = re.sub(r'.*AccountingGroupOSG.*\n', '', defaults_with_env)
    else:
        output = defaults_with_env.replace("@accounting_group@", accounting_group_str)

    # A parenthesised single argument prints identically under Python 2's
    # print statement and Python 3's print function.
    print(output)
# Generate and print the defaults only when run as a script, not on import.
if __name__ == "__main__":
    main()