forked from CMSCompOps/WorkflowWebTools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworkflowinfo.py
580 lines (431 loc) · 17.8 KB
/
workflowinfo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
#pylint: skip-file
"""
Module containing and returning information about workflows.
:authors: Daniel Abercrombie <dabercro@mit.edu>
"""
from __future__ import print_function
import os
import re
import json
import time
import datetime
import threading
from collections import defaultdict
from functools import wraps
from cmstoolbox.webtools import get_json
from cmstoolbox.sitereadiness import site_list
from . import serverconfig
def cached_json(attribute, timeout=None):
"""
A decorator for caching dictionaries in local files.
:param str attribute: The key of the :py:class:`WorkflowInfo` cache to
set using the decorated function.
:param int timeout: The amount of time before refreshing the JSON file, in seconds.
:returns: Function decorator
:rtype: func
"""
def cache_decorator(func):
"""
The actual decorator (since decorator takes an argument)
:param func func: A function to modify
:returns: Decorated function
:rtype: func
"""
@wraps(func)
def function_wrapper(self, *args, **kwargs):
"""
Executes the :py:class:`WorkflowInfo` function
:returns: Output of the originally decorated function
:rtype: dict
"""
tmout = timeout or serverconfig.config_dict()['cache_refresh'].get(attribute)
if not os.path.exists(self.cache_dir):
os.mkdir(self.cache_dir)
self.cachelock.acquire()
if attribute not in self.cachelocks:
self.cachelocks[attribute] = threading.Lock()
self.cachelocks[attribute].acquire()
self.cachelock.release()
check_var = self.cache.get(attribute)
if check_var is None:
file_name = self.cache_filename(attribute)
if os.path.exists(file_name) and \
(tmout is None or time.time() - tmout < os.stat(file_name).st_mtime):
try:
with open(file_name, 'r') as cache_file:
check_var = json.load(cache_file)
except ValueError:
print('JSON file no good. Deleting %s. Try again later.' % file_name)
os.remove(file_name)
# If still None, call the wrapped function
if check_var is None:
check_var = func(self, *args, **kwargs)
with open(file_name, 'w') as cache_file:
json.dump(check_var, cache_file)
self.cache[attribute] = check_var
self.cachelocks[attribute].release()
return check_var or {}
return function_wrapper
return cache_decorator
def list_workflows(status):
"""
Get the list of workflows currently in a given status.
For a list of valid requests, visit the
`Request Manager Interface <https://cmsweb.cern.ch/reqmgr2/>`_.
:param str status: The status of the workflow lists being looked for
:returns: A list of workflows matching the status
:rtype: list
"""
request = get_json('cmsweb.cern.ch', '/reqmgr2/data/request',
params={'status': status, 'detail': 'false'},
use_cert=True)
return request['result']
def errors_for_workflow(workflow, url='cmsweb.cern.ch'):
"""
Get the useful status information from a workflow
:param str workflow: the name of the workflow request
:param str url: the base url to find the information at
:returns: a dictionary containing error codes in the following format::
{step: {errorcode: {site: number_errors}}}
:rtype: dict
"""
result = get_json(url,
'/wmstatsserver/data/jobdetail/%s' % workflow,
use_cert=True)
output = {}
if not result['result']:
return output
for step, stepdata in result['result'][0].get(workflow, {}).items():
errors = {}
for code, codedata in stepdata.get('jobfailed', {}).items():
sites = {}
for site, sitedata in codedata.items():
if sitedata['errorCount']:
sites[site] = sitedata['errorCount']
if sites:
errors[code] = sites
if errors:
output[step] = errors
return output
def explain_errors(workflow, errorcode):
"""
Get example errors for a given workflow and errorcode
:param str workflow: is the workflow name
:param str errorcode: is the error code
:returns: a dict of log snippets from different sites.
:rtype: list
"""
result = get_json('cmsweb.cern.ch',
'/wmstatsserver/data/jobdetail/%s' % workflow,
use_cert=True)
output = []
if not result['results']:
return output
for stepdata in result['result'][0].get(workflow, {}).values():
for sitedata in stepdata.get('jobfailed', {}).get(errorcode, {}).values():
for samples in sitedata['samples'][0]['errors'].values():
output.extend(samples)
return output
class Info(object):
"""
Implements shared operations on the cache
"""
def __init__(self):
# Stores things using the cached_json decorator
self.cache = {}
self.cache_dir = os.path.join(os.environ.get('TMPDIR', '/tmp'), 'workflowinfo')
self.bak_dir = os.path.join(self.cache_dir, 'bak')
self.cachelock = threading.Lock()
self.cachelocks = {}
def __str__(self):
pass
def cache_filename(self, attribute):
"""
Return the name of the file for caching
:param str attribute: The information to store in the file
:returns: The full file name to store the cache
:rtype: str
"""
return os.path.join(self.cache_dir, '%s_%s.cache.json' % (self, attribute))
def reset(self):
"""
Reset the cache for this object and clear out the files.
"""
print('Reseting %s' % self)
if not os.path.exists(self.bak_dir):
os.mkdir(self.bak_dir)
for attribute in self.cache:
cache_file = self.cache_filename(attribute)
if os.path.exists(cache_file):
os.rename(cache_file, cache_file.replace(self.cache_dir, self.bak_dir))
self.cache.clear()
class WorkflowInfo(Info):
"""
Class that holds methods for accessing various information about a workflow.
"""
def __init__(self, workflow, url='cmsweb.cern.ch'):
"""
Initialize the workflow info class
:param str workflow: is the name of the workflow
:param str url: is the url to fetch information from
"""
super(WorkflowInfo, self).__init__()
self.workflow = workflow
self.url = url
# Is set first time get_explanation() is called
self.explanations = None
def __str__(self):
return 'workflowinfo_%s' % self.workflow
@cached_json('workflow_params')
def get_workflow_parameters(self):
"""
Get the workflow parameters from ReqMgr2, or returns a cached value.
See the `ReqMgr 2 wiki <https://github.com/dmwm/WMCore/wiki/reqmgr2-apis>`_
for more details.
:returns: Parameters for the workflow from ReqMgr2.
:rtype: dict
"""
try:
result = get_json(self.url,
'/reqmgr2/data/request',
params={'name': self.workflow},
use_https=True, use_cert=True)
for params in result['result']:
for key, item in params.items():
if key == self.workflow:
return item
except Exception as error:
print('Failed to get from reqmgr', self.workflow)
print(str(error))
return None
@cached_json('errors')
def get_errors(self, get_unreported=False):
"""
A wrapper for :py:func:`errors_for_workflow` if you happen to have
a :py:class:`WorkflowInfo` object already.
:param bool get_unreported: Get the unreported errors from ACDC server
:returns: a dictionary containing error codes in the following format::
{step: {errorcode: {site: number_errors}}}
:rtype: dict
"""
output = errors_for_workflow(self.workflow, self.url)
if get_unreported:
acdc_server_response = get_json(
'cmsweb.cern.ch', '/couchdb/acdcserver/_design/ACDC/_view/byCollectionName',
{'key': '"%s"' % self.workflow, 'include_docs': 'true', 'reduce': 'false'},
use_cert=True)
for row in acdc_server_response['rows']:
task = row['doc']['fileset_name']
new_output = output.get(task, {})
new_errorcode = new_output.get('NotReported', {})
for file_replica in row['doc']['files'].values():
for site in file_replica['locations']:
new_errorcode[site] = 0
new_output['NotReported'] = new_errorcode
output[task] = new_output
for step in list(output):
if True in [(steptype in step) for steptype in ['LogCollect', 'Cleanup']]:
output.pop(step)
return output
@cached_json('reqdetail')
def _get_reqdetail(self):
"""
Get the request detail from the wmstatsserver
:returns: The request detail json from the server or cache
:rtype: dict
"""
reqDetail = {self.workflow : {}}
raw = get_json(self.url,
'/wmstatsserver/data/request/%s' % self.workflow,
use_cert=True)
result = raw.get('result', [])
if not result: return reqDetail
reqDetail[self.workflow] = result[0].get(self.workflow, {})
return reqDetail
def get_failure_rate(self):
"""
Get failure rate if you happen to have a `WorkflowInfo` object
:returns: workflow's failure rate
:rtype: float
"""
result = self._get_reqdetail()
frate = 0.
wf_agents = result.get(self.workflow, {}).get('AgentJobInfo', {})
if not wf_agents:
return frate
nsuccess = 0
nfailure = 0
for agent, agentdata in wf_agents.items():
status = agentdata.get('status', {})
if not status: continue
nsuccess += status.get('success', 0)
for ftype, num in status.get('failure', {}).items():
nfailure += num
try:
frate = float(nfailure)/(nfailure+nsuccess)
except ZeroDivisionError:
pass
return frate
def sum_errors(self):
"""
:returns: The total number of errors reported by this workflow
:rtype: int
"""
errors = self.get_errors(True)
return sum([num for codes in errors.values() for sites in codes.values()
for num in sites.values()])
@cached_json('recovery_info')
def get_recovery_info(self):
"""
Get the recovery info for this workflow.
:returns: a dictionary containing the information used in recovery.
The keys in this dictionary are arranged like the following::
{ task: { 'sites_to_run': list(sites), 'missing_to_run': int() } }
:rtype: dict
"""
recovery_info = {}
docs = get_json(self.url,
'/couchdb/acdcserver/_design/ACDC/_view/byCollectionName',
params={'key': '"%s"' % self.workflow,
'include_docs': 'true',
'reduce': 'false'},
use_cert=True)
recovery_docs = [row['doc'] for row in docs.get('rows', [])]
site_white_list = set(self.get_workflow_parameters()['SiteWhitelist'])
for doc in recovery_docs:
task = doc['fileset_name']
# For each task, we have the following keys:
# sites - a set of sites that the recovery docs say to run on.
for replica, info in doc['files'].items():
# For fake files, just return the site whitelist
if replica.startswith('MCFakeFile'):
locations = site_white_list
else:
locations = set(info['locations'])
vals = recovery_info.get(task, {})
if not vals:
recovery_info[task] = {}
recovery_info[task]['sites_to_run'] = \
list(set(vals.get('sites_to_run', set())) | locations)
recovery_info[task]['missing_to_run'] = \
(vals.get('missing_to_run', 0) + info['events'])
return recovery_info
def site_to_run(self, task):
"""
Gets a list of sites that a task in the workflow can run at
:param str task: The full name of the task to find sites for
:returns: a list of site to run at
:rtype: list
"""
site_set = self.get_recovery_info().get(task, {}).get('sites_to_run', [])
out_list = []
all_site_list = site_list()
for site in site_set:
if site.startswith('T0_') or site.endswith('_MSS') or site.endswith('_Export'):
continue
clean_site = re.sub(r'_(ECHO_)?(Disk)$', '', site)
if clean_site not in out_list and clean_site and \
clean_site in all_site_list:
out_list.append(clean_site)
out_list.sort()
return out_list
@cached_json('jobdetail')
def _get_jobdetail(self):
"""
Get the jobdetail from the wmstatsserver
:returns: The job detail json from the server or cache
:rtype: dict
"""
return get_json(self.url,
'/wmstatsserver/data/jobdetail/%s' % self.workflow,
use_cert=True)
def get_explanation(self, errorcode, step=''):
"""
Gets a list of error logs for a given error code.
:param str errorcode: The error code to explain
:param str step: The full name of the step to return explanations from
:returns: list of error logs
:rtype: list
"""
if self.explanations is None:
self.explanations = defaultdict(lambda: defaultdict(lambda: []))
result = self._get_jobdetail()
for stepname, stepdata in result['result'][0].get(self.workflow, {}).items():
# Get the errors from both 'jobfailed' and 'submitfailed' details
for error, site in [(error, site) for status in ['jobfailed', 'submitfailed'] \
for error, site in stepdata.get(status, {}).items()]:
if error == '0':
continue
for sitename, samples in site.items():
for detail in [values for sample in samples['samples']
for errs in sample['errors'].values()
for values in errs]:
self.explanations[error][stepname].append('\n\n'.join(
['Site name: %s' % sitename,
'%s (Exit code: %s)' % (detail['type'], detail['exitCode']),
detail['details']]))
explain = self.explanations.get(errorcode, {'': ['No info for this error code']})
if step in explain.keys():
return explain[step]
return [val for expl in explain.values() for val in expl]
def get_prep_id(self):
"""
:returns: the PrepID for this workflow
:rtype: str
"""
return str(self.get_workflow_parameters().get('PrepID', 'NoPrepID'))
def get_monitoring_info(self):
"""
:returns: the information to send to CMSMONIT
:rtype: dict
"""
# Dummy call to get self.explanations filled
self.get_explanation(0)
return {
'errors': self.get_errors(True),
'prepID': self.get_prep_id(),
'params': self.get_workflow_parameters(),
'recovery': self.get_recovery_info(),
'logs': self.explanations
}
class PrepIDInfo(Info):
"""
A class that just holds a small amount of information about a given PrepID.
"""
def __init__(self, prep_id, url='cmsweb.cern.ch'):
super(PrepIDInfo, self).__init__()
self.prep_id = prep_id
self.url = url
def __str__(self):
return 'prepIDinfo_%s' % self.prep_id
@cached_json('requests')
def get_requests(self):
"""
:returns: The requests for the Prep ID from ReqMgr2 API
:rtype: dict
"""
if self.prep_id == 'NoPrepID':
return None
result = get_json(self.url, '/reqmgr2/data/request',
params={'prep_id': self.prep_id, 'detail': 'true'},
use_cert=True)
if not result['result']:
return None
return result['result'][0]
def get_workflows_requesttime(self):
"""
:returns: A list of tuples containing (workflow name, timestamp of request)
:rtype: list
"""
request = self.get_requests()
return [(workflow, time.mktime(datetime.datetime(*value['RequestDate']).timetuple())) \
for workflow, value in request.items()]
def get_workflows(self):
"""
:returns: A list of tuples containing (workflow name, timestamp of request)
:rtype: list
"""
return [
workflow for workflow, _ in
self.get_workflows_requesttime()
]