diff --git a/Unified/actor.py b/Unified/actor.py index f7372b64c..680b6cb15 100755 --- a/Unified/actor.py +++ b/Unified/actor.py @@ -1,6 +1,6 @@ #!/usr/bin/env python from assignSession import * -from utils import workflowInfo, sendEmail, componentInfo, campaignInfo, unifiedConfiguration, siteInfo, sendLog, setDatasetStatus, moduleLock, invalidate +from utils import workflowInfo, sendEmail, componentInfo, campaignInfo, unifiedConfiguration, siteInfo, sendLog, setDatasetStatus, moduleLock, invalidate, wtcInfo from utils import closeoutInfo, userLock import reqMgrClient import wtcClient @@ -342,6 +342,7 @@ def actor(url,options=None): SI = siteInfo() UC = unifiedConfiguration() WC = wtcClient() + WI = wtcInfo() action_list = WC.get_actions() if action_list is None: @@ -436,15 +437,11 @@ def actor(url,options=None): session.commit() #=========================================================== elif to_force: - wfi.sendLog('actor','Bypassing from workflow traffic controler request') - forcing = json.loads(open('/afs/cern.ch/user/v/vlimant/public/ops/forcecomplete.json').read()) - forcing.append( wfname ) - open('/afs/cern.ch/user/v/vlimant/public/ops/forcecomplete.json','w').write( json.dumps( sorted(set(forcing)) )) + wfi.sendLog('actor','Force-completing from workflow traffic controler request') + WI.add(action='force', keyword = wfname, user = action_list[wfname].get( 'user', 'unified')) elif to_hold: wfi.sendLog('actor','Holding on workflow traffic controler request') - holding = json.loads(open('/afs/cern.ch/user/v/vlimant/public/ops/onhold.json').read()) - holding.append( wfname ) - open('/afs/cern.ch/user/v/vlimant/public/ops/onhold.json','w').write( json.dumps( sorted(set(holding)) )) + WI.add(action='hold', keyword = wfname, user = action_list[wfname].get( 'user', 'unified')) #=========================================================== elif to_acdc: if 'AllSteps' in tasks: diff --git a/Unified/checkor.py b/Unified/checkor.py index 5279a750f..c80d75658 100755 --- a/Unified/checkor.py +++ b/Unified/checkor.py @@ -1,6 +1,6 @@ #!/usr/bin/env python from assignSession import * -from utils import getWorkflows, workflowInfo, getDatasetEventsAndLumis, findCustodialLocation, getDatasetEventsPerLumi, siteInfo, getDatasetPresence, campaignInfo, getWorkflowById, forceComplete, makeReplicaRequest, getDatasetSize, getDatasetFiles, sendLog, reqmgr_url, dbs_url, dbs_url_writer, getForceCompletes, display_time, checkMemory, ThreadHandler +from utils import getWorkflows, workflowInfo, getDatasetEventsAndLumis, findCustodialLocation, getDatasetEventsPerLumi, siteInfo, getDatasetPresence, campaignInfo, getWorkflowById, forceComplete, makeReplicaRequest, getDatasetSize, getDatasetFiles, sendLog, reqmgr_url, dbs_url, dbs_url_writer, display_time, checkMemory, ThreadHandler, wtcInfo from utils import componentInfo, unifiedConfiguration, userLock, moduleLock, dataCache, unified_url, getDatasetLumisAndFiles, getDatasetRuns, duplicateAnalyzer, invalidateFiles, findParent, do_html_in_each_module import phedexClient import dbs3Client @@ -120,48 +120,32 @@ def time_point(label="",sub_lap=False, percent=None): ## retrieve bypass and onhold configuration bypasses = [] forcings = [] - overrides = getForceCompletes() holdings = [] - - actors = UC.get('allowed_bypass') + WI = wtcInfo() + actors = [a for a,_ in UC.get('allowed_bypass')] + for user,extending in WI.getHold().items(): + if not user in actors: + print user,"is not allowed to hold" + continue + print user,"is holding",extending + holdings.extend( extending ) - for bypassor,email in actors: - bypass_file = '/afs/cern.ch/user/%s/%s/public/ops/bypass.json'%(bypassor[0],bypassor) - if not os.path.isfile(bypass_file): - #sendLog('checkor','no file %s',bypass_file) + for user,extending in WI.getBypass().items(): + if not user in actors: + print user,"is not allowed to bypass" continue - try: - print "Can read bypass from", bypassor - extending = json.loads(open(bypass_file).read()) - print bypassor,"is bypassing",json.dumps(sorted(extending)) - bypasses.extend( extending ) - except: - sendLog('checkor',"cannot get by-passes from %s for %s"%(bypass_file ,bypassor)) - sendEmail("malformated by-pass information","%s is not json readable"%(bypass_file), destination=[email]) + print user,"is bypassing",extending + bypasses.extend( extending ) - holding_file = '/afs/cern.ch/user/%s/%s/public/ops/onhold.json'%(bypassor[0],bypassor) - if not os.path.isfile(holding_file): - #sendLog('checkor',"no file %s"%holding_file) + overrides = {} + for user,extending in WI.getForce().items(): + if not user in actors: + print user,"is not allowed to force complete" continue - try: - extending = json.loads(open(holding_file).read()) - print bypassor,"is holding",json.dumps(sorted(extending)) - holdings.extend( extending ) - except: - sendLog('checkor',"cannot get holdings from %s for %s"%(holding_file, bypassor)) - sendEmail("malformated by-pass information","%s is not json readable"%(holding_file), destination=[email]) - - unhold= ['fabozzi_Run2016B-v2-Tau-07Aug17_ver2_8029_170831_201310_7397'] - for unh in unhold: - if unh in holdings: - holdings.remove(unh) - - - for rider,extending in overrides.items(): - print rider,"bypasses by forcecompleting",json.dumps(sorted(extending)) + print user,"is force-completing",extending bypasses.extend( extending ) - + overrides[user] = extending if use_mcm: ## this is a list of prepids that are good to complete diff --git a/Unified/completor.py b/Unified/completor.py index d8b68ad5d..a59613ec6 100755 --- a/Unified/completor.py +++ b/Unified/completor.py @@ -2,7 +2,7 @@ from assignSession import * import sys import reqMgrClient -from utils import workflowInfo, getWorkflowById, forceComplete, getDatasetEventsAndLumis, componentInfo, monitor_dir, reqmgr_url, unifiedConfiguration, getForceCompletes, getAllStuckDataset, monitor_pub_dir, moduleLock , eosFile, eosRead +from utils import workflowInfo, getWorkflowById, forceComplete, getDatasetEventsAndLumis, componentInfo, monitor_dir, reqmgr_url, unifiedConfiguration, getAllStuckDataset, monitor_pub_dir, moduleLock , eosFile, eosRead, wtcInfo from utils import campaignInfo, siteInfo, sendLog, sendEmail from collections import defaultdict import json @@ -85,7 +85,8 @@ def completor(url, specific): long_lasting = {} - overrides = getForceCompletes() + WI = wtcInfo() + overrides = WI.getForce() if use_mcm: ## add all workflow that mcm wants to get force completed mcm_force = mcm.get('/restapi/requests/forcecomplete') diff --git a/campaignsConfiguration.py b/campaignsConfiguration.py index 56d48b1d9..790ff2225 100644 --- a/campaignsConfiguration.py +++ b/campaignsConfiguration.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python from utils import mongo_client import optparse import ssl,pymongo diff --git a/utils.py b/utils.py index 9f998869b..3aa4015ff 100755 --- a/utils.py +++ b/utils.py @@ -6093,6 +6093,94 @@ def checkIfBlockIsAtASite(url,block,site): return False +class wtcInfo: + def __init__(self): + self.client = mongo_client() + self.db = self.client.unified.wtcInfo + + def add(self, action, keyword, user=None): + ##add an item for the action (hold, bypass, force) for the keyword + if not keyword: + print "blank keyword is not allowed" + return + n = time.gmtime() + now = time.mktime( n ) + nows = time.asctime( n ) + document= { + 'user' : user if user else os.environ.get('USER',None), + 'keyword' : keyword, + 'action' : action, + 'time' : now, + 'date' : nows} + self.db.update_one( {'keyword' : keyword}, + {"$set": document}, + upsert = True + ) + def sync(self): + + force = getForceCompletes() + for user,items in force.items(): + for item in items: + print user, item + self.add( action='force', keyword=item, user = user) + + + UC = unifiedConfiguration() + actors = UC.get('allowed_bypass') + + for bypassor,email in actors: + bypass_file = '/afs/cern.ch/user/%s/%s/public/ops/bypass.json'%(bypassor[0],bypassor) + if not os.path.isfile(bypass_file): + continue + try: + print "Can read bypass from", bypassor + extending = json.loads(open(bypass_file).read()) + print bypassor,"is bypassing",json.dumps(sorted(extending)) + for ex in extending: + self.add( action = 'bypass' , keyword = ex, user = bypassor) + except: + pass + + holding_file = '/afs/cern.ch/user/%s/%s/public/ops/onhold.json'%(bypassor[0],bypassor) + if not os.path.isfile(holding_file): + continue + try: + extending = json.loads(open(holding_file).read()) + print bypassor,"is holding",json.dumps(sorted(extending)) + for ex in extending: + self.add( action = 'hold' , keyword = ex, user = bypassor) + except: + pass + + def _get(self, action): + r= defaultdict(list) + for i in self.db.find({'action' : action}): + r[i['user']].append(i['keyword']) + return dict(r) + + def getHold(self): + return self._get('hold') + def getBypass(self): + return self._get('bypass') + def getForce(self): + return self._get('force') + + def clean(self): + wfns = [] + for s in ['announced','normal-archived','rejected','aborted']: + wfns.extend( getWorkflows( reqmgr_url , s)) + for item in self.db.find(): + key = item['keyword'] + if any([key in wfn for wfn in wfns]): + print item.get('keyword'),"can go" + self.db.delete_one({'_id' : item.get('_id',None)}) + + def remove(self, keyword): + ## will remove from the db anything that the item matches on + for item in self.db.find(): + if item.get('keyword',None) in keyword: + print item,"goes away" + self.db.delete_one({'_id' : item.get('_id',None)}) def getForceCompletes(): overrides = {} diff --git a/wtcActions.py b/wtcActions.py new file mode 100644 index 000000000..211316e3e --- /dev/null +++ b/wtcActions.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python +from utils import wtcInfo +import optparse +import os +user = os.environ.get('USER') + +parser = optparse.OptionParser() +parser.add_option('--action', choices=['hold','bypass','force']) +parser.add_option('--keyword') +parser.add_option('--pop',action='store_true') +(options,args) = parser.parse_args() + +WI = wtcInfo() +if options.pop: + WI.remove( options.keyword ) +else: + WI.add( action= options.action, + keyword = options.keyword, + user= user) +