Skip to content
This repository was archived by the owner on Sep 12, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions Unified/actor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python
from assignSession import *
from utils import workflowInfo, sendEmail, componentInfo, campaignInfo, unifiedConfiguration, siteInfo, sendLog, setDatasetStatus, moduleLock, invalidate
from utils import workflowInfo, sendEmail, componentInfo, campaignInfo, unifiedConfiguration, siteInfo, sendLog, setDatasetStatus, moduleLock, invalidate, wtcInfo
from utils import closeoutInfo, userLock
import reqMgrClient
import wtcClient
Expand Down Expand Up @@ -342,6 +342,7 @@ def actor(url,options=None):
SI = siteInfo()
UC = unifiedConfiguration()
WC = wtcClient()
WI = wtcInfo()

action_list = WC.get_actions()
if action_list is None:
Expand Down Expand Up @@ -436,15 +437,11 @@ def actor(url,options=None):
session.commit()
#===========================================================
elif to_force:
wfi.sendLog('actor','Bypassing from workflow traffic controler request')
forcing = json.loads(open('/afs/cern.ch/user/v/vlimant/public/ops/forcecomplete.json').read())
forcing.append( wfname )
open('/afs/cern.ch/user/v/vlimant/public/ops/forcecomplete.json','w').write( json.dumps( sorted(set(forcing)) ))
wfi.sendLog('actor','Force-completing from workflow traffic controler request')
WI.add(action='force', keyword = wfname, user = action_list[wfname].get( 'user', 'unified'))
elif to_hold:
wfi.sendLog('actor','Holding on workflow traffic controler request')
holding = json.loads(open('/afs/cern.ch/user/v/vlimant/public/ops/onhold.json').read())
holding.append( wfname )
open('/afs/cern.ch/user/v/vlimant/public/ops/onhold.json','w').write( json.dumps( sorted(set(holding)) ))
WI.add(action='hold', keyword = wfname, user = action_list[wfname].get( 'user', 'unified'))
#===========================================================
elif to_acdc:
if 'AllSteps' in tasks:
Expand Down
56 changes: 20 additions & 36 deletions Unified/checkor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python
from assignSession import *
from utils import getWorkflows, workflowInfo, getDatasetEventsAndLumis, findCustodialLocation, getDatasetEventsPerLumi, siteInfo, getDatasetPresence, campaignInfo, getWorkflowById, forceComplete, makeReplicaRequest, getDatasetSize, getDatasetFiles, sendLog, reqmgr_url, dbs_url, dbs_url_writer, getForceCompletes, display_time, checkMemory, ThreadHandler
from utils import getWorkflows, workflowInfo, getDatasetEventsAndLumis, findCustodialLocation, getDatasetEventsPerLumi, siteInfo, getDatasetPresence, campaignInfo, getWorkflowById, forceComplete, makeReplicaRequest, getDatasetSize, getDatasetFiles, sendLog, reqmgr_url, dbs_url, dbs_url_writer, display_time, checkMemory, ThreadHandler, wtcInfo
from utils import componentInfo, unifiedConfiguration, userLock, moduleLock, dataCache, unified_url, getDatasetLumisAndFiles, getDatasetRuns, duplicateAnalyzer, invalidateFiles, findParent, do_html_in_each_module
import phedexClient
import dbs3Client
Expand Down Expand Up @@ -120,48 +120,32 @@ def time_point(label="",sub_lap=False, percent=None):
## retrieve bypass and onhold configuration
bypasses = []
forcings = []
overrides = getForceCompletes()
holdings = []


actors = UC.get('allowed_bypass')
WI = wtcInfo()
actors = [a for a,_ in UC.get('allowed_bypass')]
for user,extending in WI.getHold().items():
if not user in actors:
print user,"is not allowed to hold"
continue
print user,"is holding",extending
holdings.extend( extending )

for bypassor,email in actors:
bypass_file = '/afs/cern.ch/user/%s/%s/public/ops/bypass.json'%(bypassor[0],bypassor)
if not os.path.isfile(bypass_file):
#sendLog('checkor','no file %s',bypass_file)
for user,extending in WI.getBypass().items():
if not user in actors:
print user,"is not allowed to bypass"
continue
try:
print "Can read bypass from", bypassor
extending = json.loads(open(bypass_file).read())
print bypassor,"is bypassing",json.dumps(sorted(extending))
bypasses.extend( extending )
except:
sendLog('checkor',"cannot get by-passes from %s for %s"%(bypass_file ,bypassor))
sendEmail("malformated by-pass information","%s is not json readable"%(bypass_file), destination=[email])
print user,"is bypassing",extending
bypasses.extend( extending )

holding_file = '/afs/cern.ch/user/%s/%s/public/ops/onhold.json'%(bypassor[0],bypassor)
if not os.path.isfile(holding_file):
#sendLog('checkor',"no file %s"%holding_file)
overrides = {}
for user,extending in WI.getForce().items():
if not user in actors:
print user,"is not allowed to force complete"
continue
try:
extending = json.loads(open(holding_file).read())
print bypassor,"is holding",json.dumps(sorted(extending))
holdings.extend( extending )
except:
sendLog('checkor',"cannot get holdings from %s for %s"%(holding_file, bypassor))
sendEmail("malformated by-pass information","%s is not json readable"%(holding_file), destination=[email])

unhold= ['fabozzi_Run2016B-v2-Tau-07Aug17_ver2_8029_170831_201310_7397']
for unh in unhold:
if unh in holdings:
holdings.remove(unh)


for rider,extending in overrides.items():
print rider,"bypasses by forcecompleting",json.dumps(sorted(extending))
print user,"is force-completing",extending
bypasses.extend( extending )

overrides[user] = extending

if use_mcm:
## this is a list of prepids that are good to complete
Expand Down
5 changes: 3 additions & 2 deletions Unified/completor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from assignSession import *
import sys
import reqMgrClient
from utils import workflowInfo, getWorkflowById, forceComplete, getDatasetEventsAndLumis, componentInfo, monitor_dir, reqmgr_url, unifiedConfiguration, getForceCompletes, getAllStuckDataset, monitor_pub_dir, moduleLock , eosFile, eosRead
from utils import workflowInfo, getWorkflowById, forceComplete, getDatasetEventsAndLumis, componentInfo, monitor_dir, reqmgr_url, unifiedConfiguration, getAllStuckDataset, monitor_pub_dir, moduleLock , eosFile, eosRead, wtcInfo
from utils import campaignInfo, siteInfo, sendLog, sendEmail
from collections import defaultdict
import json
Expand Down Expand Up @@ -85,7 +85,8 @@ def completor(url, specific):

long_lasting = {}

overrides = getForceCompletes()
WI = wtcInfo()
overrides = WI.getForce()
if use_mcm:
## add all workflow that mcm wants to get force completed
mcm_force = mcm.get('/restapi/requests/forcecomplete')
Expand Down
1 change: 1 addition & 0 deletions campaignsConfiguration.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!/usr/bin/env python
from utils import mongo_client
import optparse
import ssl,pymongo
Expand Down
88 changes: 88 additions & 0 deletions utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6093,6 +6093,94 @@ def checkIfBlockIsAtASite(url,block,site):

return False

class wtcInfo:
def __init__(self):
self.client = mongo_client()
self.db = self.client.unified.wtcInfo

def add(self, action, keyword, user=None):
##add an item for the action (hold, bypass, force) for the keyword
if not keyword:
print "blank keyword is not allowed"
return
n = time.gmtime()
now = time.mktime( n )
nows = time.asctime( n )
document= {
'user' : user if user else os.environ.get('USER',None),
'keyword' : keyword,
'action' : action,
'time' : now,
'date' : nows}
self.db.update_one( {'keyword' : keyword},
{"$set": document},
upsert = True
)
def sync(self):

force = getForceCompletes()
for user,items in force.items():
for item in items:
print user, item
self.add( action='force', keyword=item, user = user)


UC = unifiedConfiguration()
actors = UC.get('allowed_bypass')

for bypassor,email in actors:
bypass_file = '/afs/cern.ch/user/%s/%s/public/ops/bypass.json'%(bypassor[0],bypassor)
if not os.path.isfile(bypass_file):
continue
try:
print "Can read bypass from", bypassor
extending = json.loads(open(bypass_file).read())
print bypassor,"is bypassing",json.dumps(sorted(extending))
for ex in extending:
self.add( action = 'bypass' , keyword = ex, user = bypassor)
except:
pass

holding_file = '/afs/cern.ch/user/%s/%s/public/ops/onhold.json'%(bypassor[0],bypassor)
if not os.path.isfile(holding_file):
continue
try:
extending = json.loads(open(holding_file).read())
print bypassor,"is holding",json.dumps(sorted(extending))
for ex in extending:
self.add( action = 'hold' , keyword = ex, user = bypassor)
except:
pass

def _get(self, action):
r= defaultdict(list)
for i in self.db.find({'action' : action}):
r[i['user']].append(i['keyword'])
return dict(r)

def getHold(self):
return self._get('hold')
def getBypass(self):
return self._get('bypass')
def getForce(self):
return self._get('force')

def clean(self):
wfns = []
for s in ['announced','normal-archived','rejected','aborted']:
wfns.extend( getWorkflows( reqmgr_url , s))
for item in self.db.find():
key = item['keyword']
if any([key in wfn for wfn in wfns]):
print item.get('keyword'),"can go"
self.db.delete_one({'_id' : item.get('_id',None)})

def remove(self, keyword):
## will remove from the db anything that the item matches on
for item in self.db.find():
if item.get('keyword',None) in keyword:
print item,"goes away"
self.db.delete_one({'_id' : item.get('_id',None)})

def getForceCompletes():
overrides = {}
Expand Down
20 changes: 20 additions & 0 deletions wtcActions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env python
from utils import wtcInfo
import optparse
import os
user = os.environ.get('USER')

parser = optparse.OptionParser()
parser.add_option('--action', choices=['hold','bypass','force'])
parser.add_option('--keyword')
parser.add_option('--pop',action='store_true')
(options,args) = parser.parse_args()

WI = wtcInfo()
if options.pop:
WI.remove( options.keyword )
else:
WI.add( action= options.action,
keyword = options.keyword,
user= user)