diff --git a/Unified/assignor.py b/Unified/assignor.py index a2caa6cd0..c11d1168c 100755 --- a/Unified/assignor.py +++ b/Unified/assignor.py @@ -2,20 +2,16 @@ from assignSession import * import reqMgrClient from utils import workflowInfo, campaignInfo, siteInfo, userLock, unifiedConfiguration, reqmgr_url, monitor_pub_dir, monitor_dir, global_SI -from utils import getWorkLoad, getDatasetPresence, getDatasets, findCustodialLocation, getDatasetBlocksFraction, getDatasetEventsPerLumi, getLFNbase, getDatasetBlocks, lockInfo, isHEPCloudReady, do_html_in_each_module +from utils import getDatasetEventsPerLumi, getLFNbase, lockInfo, isHEPCloudReady, do_html_in_each_module from utils import componentInfo, sendEmail, sendLog, getWorkflows, closeAllBlocks, eosRead #from utils import lockInfo -from utils import moduleLock, notRunningBefore +from utils import moduleLock import optparse -import itertools -import time from htmlor import htmlor -import os import random import json import copy import os -import sys def assignor(url ,specific = None, talk=True, options=None): if userLock() and not options.manual: return @@ -61,7 +57,6 @@ def assignor(url ,specific = None, talk=True, options=None): #if options.partial and not specific: # pass - dataset_endpoints = json.loads(eosRead('%s/dataset_endpoints.json'%monitor_dir)) aaa_mapping = json.loads(eosRead('%s/equalizor.json'%monitor_pub_dir))['mapping'] all_stuck = set() all_stuck.update( json.loads(eosRead('%s/stuck_transfers.json'%monitor_pub_dir) )) @@ -110,13 +105,9 @@ def rank( wfn ): options_text="" if options.early: options_text+=", early option is ON" - if options.partial: - options_text+=", partial option is ON" - options_text+=", good fraction is %.2f"%options.good_enough - - wfh.sendLog('assignor',"%s to be assigned%s"%(wfo.name, options_text)) + wfh.sendLog('assignor',"%s to be assigned %s"%(wfo.name, options_text)) ## the site whitelist takes into account siteInfo, campaignInfo, memory and cores (lheinput,primary,parent,secondary, sites_allowed) = wfh.getSiteWhiteList() @@ -200,19 +191,15 @@ def rank( wfn ): continue - original_sites_allowed = copy.deepcopy( sites_allowed ) wfh.sendLog('assignor',"Site white list %s"%sorted(sites_allowed)) - override_sec_location = CI.get(wfh.request['Campaign'], 'SecondaryLocation', []) blocks = wfh.getBlocks() if blocks: wfh.sendLog('assignor',"Needs {} blocks in input {}".format(len(blocks), '\n'.join(blocks))) wfh.sendLog('assignor',"Allowed %s"%sorted(sites_allowed)) - secondary_locations=None primary_aaa = options.primary_aaa secondary_aaa = options.secondary_aaa - do_partial = False #options.good_enough if options.partial else 0 if 'Campaign' in wfh.request and wfh.request['Campaign'] in CI.campaigns: assign_parameters.update( CI.campaigns[wfh.request['Campaign']] ) @@ -221,131 +208,22 @@ def rank( wfn ): primary_aaa = primary_aaa or assign_parameters['primary_AAA'] if 'secondary_AAA' in assign_parameters: secondary_aaa = secondary_aaa or assign_parameters['secondary_AAA'] - if 'partial_copy' in assign_parameters: - ## can this only work if there is a stuck input ? maybe not - ## this is a number. 0 means no - print "Could do partial disk copy assignment" - if is_stuck or options.partial: - do_partial = assign_parameters['partial_copy'] - wfh.sendLog('assignor',"Overiding partial copy assignment to %.2f fraction"% do_partial) - #sendEmail('stuck input to assignment','%s is stuck for assigning %s and going fractional'%(','.join( is_stuck), wfo.name)) - - do_partial = options.good_enough if options.partial else do_partial - - - for sec in list(secondary): - if override_sec_location: - print "We don't care where the secondary is" - print "Cannot pass for now" - #sendEmail("tempting to pass sec location check","but we cannot yet IMO") - #pass - - presence = getDatasetPresence( url, sec ) - print sec - print json.dumps(presence, indent=2) - one_secondary_locations = [site for (site,(there,frac)) in presence.items() if frac>98.] - - if secondary_aaa: - if not one_secondary_locations: - sec_availability = getDatasetBlocksFraction( url, sec ) - if sec_availability >=1. and options.go: - ## there is at least one copy of each block on disk. We should go ahead and let it go. - wfh.sendLog('assignor',"The secondary %s is available %s times on disk, and usable"%( sec, sec_availability)) - else: - ## not even a copy on disk anywhere !!!! - sites_allowed = [] ## will block the assignment - wfh.sendLog('assignor',"The secondary %s is nowhere on disk"% sec) - #just continue without checking - continue - #one_secondary_locations = [site for (site,(there,frac)) in presence.items() if there] - if secondary_locations==None: - secondary_locations = one_secondary_locations - else: - secondary_locations = list(set(secondary_locations) & set(one_secondary_locations)) - ## reduce the site white list to site with secondary only - #sites_allowed = [site for site in sites_allowed if any([osite.startswith(site) for osite in one_secondary_locations])] - sites_allowed = [site for site in sites_allowed if SI.CE_to_SE(site) in one_secondary_locations] - - wfh.sendLog('assignor',"Intersecting with secondary requirement, now allowed %s"%sorted(sites_allowed)) + wfh.sendLog('assignor',"Initial values for primary_AAA=%s and secondary_AAA=%s"%(primary_aaa, secondary_aaa)) - initial_sites_allowed = copy.deepcopy( sites_allowed ) ## keep track of this, after secondary input location restriction : that's how you want to operate it + ## keep track of this, after secondary input location restriction : that's how you want to operate it + initial_sites_allowed = copy.deepcopy( sites_allowed ) - sites_all_data = copy.deepcopy( sites_allowed ) - sites_with_data = copy.deepcopy( sites_allowed ) - sites_with_any_data = copy.deepcopy( sites_allowed ) - primary_locations = None - available_fractions = {} set_lfn = '/store/mc' ## by default - endpoints = set() for prim in list(primary): - if prim in dataset_endpoints: - print "endpoints from stagor",dataset_endpoints[prim] - endpoints.update( dataset_endpoints[prim] ) set_lfn = getLFNbase( prim ) ## if they are requested for processing, they should bbe all closed already + # FIXME: remove this closeAllBlocks closeAllBlocks(url, prim, blocks) - presence = getDatasetPresence( url, prim , only_blocks=blocks) - if talk: - print prim - print json.dumps(presence, indent=2) - available_fractions[prim] = getDatasetBlocksFraction(url, prim, sites = [SI.CE_to_SE(site) for site in sites_allowed] , only_blocks = blocks) - if primary_aaa: - available_fractions[prim] = getDatasetBlocksFraction(url, prim, only_blocks = blocks) - - sites_all_data = [site for site in sites_with_data if SI.CE_to_SE(site) in [psite for (psite,(there,frac)) in presence.items() if there]] - if primary_aaa: - sites_all_data = set() - for (psite,(there,frac)) in presence.items(): - if there: - sites_all_data.update( SI.SE_to_CEs(psite) ) - sites_all_data = list(sites_all_data) - #sites_all_data = list(set([SI.SE_to_CE(psite) for (psite,(there,frac)) in presence.items() if there])) - sites_with_data = [site for site in sites_with_data if SI.CE_to_SE(site) in [psite for (psite,frac) in presence.items() if frac[1]>90.]] - sites_with_any_data = [site for site in sites_with_any_data if SI.CE_to_SE(site) in presence.keys()] - if primary_aaa: - sites_with_any_data = set() - for psite in presence.keys(): - sites_with_any_data.update( SI.SE_to_CEs(psite) ) - sites_with_any_data = list(sites_with_any_data) - #sites_with_any_data = list(set([SI.SE_to_CE(psite) for psite in presence.keys()])) - - holding_but_not_allowed = set() - for se_site in presence.keys(): - if not (set(SI.SE_to_CEs(se_site)) & set(sites_allowed)): - holding_but_not_allowed.add( se_site ) - #wfh.sendLog('assignor',"Holding the data but not allowed %s"%sorted(list(set([se_site for se_site in presence.keys() if not SI.SE_to_CE(se_site) in sites_allowed])))) - wfh.sendLog('assignor',"Holding the data but not allowed %s"%sorted( holding_but_not_allowed )) - if primary_locations==None: - primary_locations = presence.keys() - else: - primary_locations = list(set(primary_locations) & set(presence.keys() )) - - sites_with_data = list(set(sites_with_data)) - sites_with_any_data = list(set(sites_with_any_data)) - - opportunistic_sites=[] - down_time = False - ## opportunistic running where any piece of data is available - if secondary_locations or primary_locations: - ## intersection of both any pieces of the primary and good IO - #opportunistic_sites = [SI.SE_to_CE(site) for site in list((set(secondary_locations) & set(primary_locations) & set(SI.sites_with_goodIO)) - set(sites_allowed))] - if secondary_locations and primary_locations: - opportunistic_sites = [SI.SE_to_CE(site) for site in list((set(secondary_locations) & set(primary_locations)) - set([SI.CE_to_SE(site) for site in sites_allowed]))] - elif primary_locations: - opportunistic_sites = [SI.SE_to_CE(site) for site in list(set(primary_locations) - set([SI.CE_to_SE(site) for site in sites_allowed]))] - else: - opportunistic_sites = [] - wfh.sendLog('assignor',"We could be running in addition at %s"% sorted(opportunistic_sites)) - if any([osite in SI.sites_not_ready for osite in opportunistic_sites]): - wfh.sendLog('assignor',"One of the usable site is in downtime %s"%([osite for osite in opportunistic_sites if osite in SI.sites_not_ready])) - down_time = True - ## should this be send back to considered ? - ## should be 2 but for the time-being let's lower it to get things going - copies_wanted,cpuh = wfh.getNCopies() + _copies_wanted,cpuh = wfh.getNCopies() wfh.sendLog('assignor',"we need %s CPUh"%cpuh) if cpuh>max_cpuh_block and not options.go: #sendEmail('large workflow','that wf %s has a large number of CPUh %s, not assigning, please check the logs'%(wfo.name, cpuh))#,destination=['Dmytro.Kovalskyi@cern.ch']) @@ -353,31 +231,19 @@ def rank( wfn ): wfh.sendLog('assignor',"Requiring a large number of CPUh %s, not assigning"%cpuh) continue - if 'Campaign' in wfh.request and wfh.request['Campaign'] in CI.campaigns and 'maxcopies' in CI.campaigns[wfh.request['Campaign']]: - copies_needed_from_campaign = CI.campaigns[wfh.request['Campaign']]['maxcopies'] - copies_wanted = min(copies_needed_from_campaign, copies_wanted) - - if not options.early: - less_copies_than_requested = UC.get("less_copies_than_requested") - copies_wanted = max(1,copies_wanted-less_copies_than_requested) # take one out for the efficiency - else: - ## find out whether there is a site in the whitelist, that is lacking jobs and reduce to 1 copy needed to get things going - pass - - wfh.sendLog('assignor',"needed availability fraction %s"% copies_wanted) - ## should also check on number of sources, if large enough, we should be able to overflow most, efficiently ## default back to white list to original white list with any data wfh.sendLog('assignor',"Allowed sites :%s"% sorted(sites_allowed)) + # TODO Alan on 1/april/2020: keep the AAA functionality if primary_aaa: ## remove the sites not reachable localy if not in having the data - if not sites_all_data: + if not sites_allowed: wfh.sendLog('assignor',"Overiding the primary on AAA setting to Off") primary_aaa=False else: - aaa_grid = set(sites_all_data) + aaa_grid = set(sites_allowed) for site in list(aaa_grid): aaa_grid.update( aaa_mapping.get(site,[]) ) sites_allowed = list(set(initial_sites_allowed) & aaa_grid) @@ -395,10 +261,8 @@ def rank( wfn ): continue if not primary_aaa: - if not isStoreResults: - sites_allowed = sites_with_any_data - else: - ## if we are dealing with a StoreResults request, we don't need to check dataset availability and + if isStoreResults: + ## if we are dealing with a StoreResults request, we don't need to check dataset availability and ## should use the SiteWhiteList set in the original request if 'SiteWhitelist' in wfh.request: sites_allowed = wfh.request['SiteWhitelist'] @@ -407,24 +271,8 @@ def rank( wfn ): sendLog('assignor','Cannot assign StoreResults request because SiteWhitelist is missing', level='critical') n_stalled += 1 continue - available_fractions = {} wfh.sendLog('assignor',"Selected for any data %s"%sorted(sites_allowed)) - ### check on endpoints for on-going transfers - if do_partial: - if endpoints: - end_sites = [SI.SE_to_CE(s) for s in endpoints] - sites_allowed = list(set(sites_allowed + end_sites)) - if down_time and not any(osite in SI.sites_not_ready for osite in end_sites): - print "Flip the status of downtime, since our destinations are good" - down_time = False - print "with added endpoints",sorted(end_sites) - else: - print "Cannot do partial assignment without knowin the endpoints" - n_stalled+=1 - continue - - #if not len(sites_allowed): # if not options.early: # wfh.sendLog('assignor',"cannot be assign with no matched sites") @@ -438,48 +286,6 @@ def rank( wfn ): allowed_and_low = sorted(set(low_pressure) & set(sites_allowed)) if allowed_and_low: wfh.sendLog('assignor',"The workflow can run at %s under low pressure currently"%( ','.join( allowed_and_low ))) - copies_wanted = max(1., copies_wanted-1.) - - - if available_fractions and not all([available>=copies_wanted for available in available_fractions.values()]): - not_even_once = not all([available>=1. for available in available_fractions.values()]) - above_good = all([available >= do_partial for available in available_fractions.values()]) - wfh.sendLog('assignor',"The input dataset is not available %s times, only %s"%( copies_wanted, available_fractions.values())) - if down_time and not options.go and not options.early: - wfo.status = 'considered' - session.commit() - wfh.sendLog('assignor',"sending back to considered because of site downtime, instead of waiting") - #sendEmail( "cannot be assigned due to downtime","%s is not sufficiently available, due to down time of a site in the whitelist. check the assignor logs. sending back to considered."% wfo.name) - sendLog('assignor','%s is not sufficiently available, due to down time of a site in the whitelist. sending back to considered.'%( wfo.name ), level='delay') - n_stalled+=1 - continue - #pass - - print json.dumps(available_fractions) - if (options.go and not_even_once) or not options.go: - known = [] - try: - known = json.loads(open('cannot_assign.json').read()) - except: - pass - if not wfo.name in known and not options.limit and not options.go and not options.early and not (do_partial and above_good): - wfh.sendLog('assignor',"cannot be assigned, %s is not sufficiently available.\n %s"%(wfo.name,json.dumps(available_fractions))) - #sendEmail( "cannot be assigned","%s is not sufficiently available.\n %s"%(wfo.name,json.dumps(available_fractions))) - known.append( wfo.name ) - open('cannot_assign.json','w').write(json.dumps( known, indent=2)) - - if options.early: - if wfo.status == 'considered': - wfh.sendLog('assignor',"setting considered-tried") - wfo.status = 'considered-tried' - session.commit() - else: - print "tried but status is",wfo.status - if do_partial and above_good: - print "Will move on with partial locations" - else: - n_stalled+=1 - continue if not len(sites_allowed) and not options.SiteWhitelist: if not options.early: @@ -595,6 +401,7 @@ def pick_campaign( assign_parameters, parameters): parameters['EventsPerJob'] = eventsPerJob else: spl = wfh.getSplittings()[0] + # FIXME: decide which of the lines below needs to remain... eventsPerJobEstimated = spl['events_per_job'] if 'events_per_job' in spl else None eventsPerJobEstimated = spl['avg_events_per_job'] if 'avg_events_per_job' in spl else None if eventsPerJobEstimated and eventsPerJobEstimated > eventsPerJob: @@ -666,8 +473,6 @@ def pick_campaign( assign_parameters, parameters): parser.add_option('-t','--test', help='Only test the assignment',action='store_true',dest='test',default=False) parser.add_option('-m','--manual', help='Manual assignment, bypassing lock check',action='store_true',dest='manual',default=False) parser.add_option('-e', '--early', help='Fectch from early statuses',default=False, action="store_true") - parser.add_option('-p', '--partial', help='Let the workflow assign to place with any part of the data, existent of being made',default=False, action="store_true") - parser.add_option('--good_enough', help='Only useful with --partial option, determines whether to get the workflow started', default=0.5, type=float) parser.add_option('--go',help="Overrides the campaign go",default=False,action='store_true') parser.add_option('--team',help="Specify the agent to use",default=None) parser.add_option('--primary_aaa',help="Force to use the secondary location restriction, if any, and use the full site whitelist initially provided to run that type of wf",default=False, action='store_true') diff --git a/Unified/injector.py b/Unified/injector.py index 2e5c8b066..de4a3a5e9 100755 --- a/Unified/injector.py +++ b/Unified/injector.py @@ -222,7 +222,7 @@ def injector(url, options, specific): parser = optparse.OptionParser() parser.add_option('-i','--invalidate',help="fetch invalidations from mcm",default=False,action='store_true') parser.add_option('-w','--wmstatus',help="from which status in req-mgr",default="assignment-approved") - parser.add_option('-s','--setstatus',help="What status to set locally",default="considered") + parser.add_option('-s','--setstatus',help="What status to set locally",default="staged") parser.add_option('-u','--user',help="What user to fetch workflow from",default="pdmvserv") parser.add_option('-r','--replace',help="the workflow name that should be used for replacement",default=None) parser.add_option('--user_relval',help="The user that can inject workflows for relvals", default=None) diff --git a/Unified/transferor.py b/Unified/transferor.py index e2f1dccf9..fdc96db9a 100755 --- a/Unified/transferor.py +++ b/Unified/transferor.py @@ -32,6 +32,9 @@ def transferor(url ,specific = None, talk=True, options=None): execute = False else: execute = True + # FIXME Alan on 1/april/2020: does NOT get any real data placement made + execute = False + print "transferor module set to execute data placement: %s", execute SI = siteInfo() CI = campaignInfo() @@ -245,6 +248,8 @@ def prio_and_size( i, j): else: wfo.status = 'trouble' ## so that we look or a replacement else: + # TODO Alan on 1/april/2020: it looks like this has to be done for + # any workflow not in assignment-approved wfo.status = 'away' wfh.sendLog('transferor', '%s in status %s, setting %s'%( wfo.name,wfh.request['RequestStatus'],wfo.status)) continue @@ -345,7 +350,8 @@ def prio_and_size( i, j): if options and options.tosites: sites_allowed = options.tosites.split(',') - + # TODO Alan on 1/april/2020: the 2 lines below can be removed once we + # handle data locking from WMCore for dataset in list(primary)+list(parent)+list(secondary): LI.lock( dataset , reason='staging' ) @@ -413,7 +419,6 @@ def prio_and_size( i, j): prim_to_distribute = [site for site in sites_allowed if not SI.CE_to_SE(site) in prim_location] prim_to_distribute = [site for site in prim_to_distribute if not SI.CE_to_SE(site) in prim_destination] ## take out the ones that cannot receive transfers - potential_destinations = len(prim_to_distribute) #prim_to_distribute = [site for site in prim_to_distribute if not SI.CE_to_SE(site) in SI.sites_veto_transfer] prim_to_distribute = [site for site in prim_to_distribute if (SI.disk[SI.CE_to_SE(site)] or wfh.isRelval())] @@ -599,8 +604,9 @@ def prio_and_size( i, j): ## no explicit transfer required this time if staging: ## but using existing ones - wfh.sendLog('transferor', "latches on existing transfers, and nothing else, settin staging") - wfo.status = 'staging' + wfh.sendLog('transferor', "latches on existing transfers, and nothing else, setting staging") + wfh.sendLog('transferor', "forcing it from staging to staged in favor of MicroServices") + wfo.status = 'staged' needs_transfer+=1 else: wfh.sendLog('transferor', "should just be assigned now to %s"%sorted(sites_allowed)) @@ -611,11 +617,13 @@ def prio_and_size( i, j): continue else: ## there is an explicit transfer required + # TODO Alan on 1/april/2020: this needs to be kept if staging: ## and also using an existing one wfh.sendLog('transferor', "latches on existing transfers") if not options.test: - wfo.status = 'staging' + wfh.sendLog('transferor', "forcing it from staging to staged in favor of MicroServices") + wfo.status = 'staged' wfh.sendLog('transferor', "setting %s status to %s"%(wfo.name,wfo.status)) #session.commit() wfh.sendLog('transferor',"needs a transfer") @@ -668,8 +676,9 @@ def prio_and_size( i, j): if execute: sendLog('transferor', details_text) else: - print "Would make a replica to",site,"(CE)",site_se,"(SE) for" + print "DRY-RUN: Would make a replica to",site,"(CE)",site_se,"(SE) for" print details_text + continue ## operate the transfer if options and options.stop: @@ -702,7 +711,12 @@ def prio_and_size( i, j): transfered_items[ph].update( items ) else: sendLog('transferor','Could not make a replica request for items %s to site %s'%(items,site_se),level='critical') - + + if not transfered_items: + sendLog('transferor', + 'Could not make a replica request for items %s to site %s' % (items_to_transfer, site), + level='critical') + continue #result = makeReplicaRequest(url, site_se, items_to_transfer, 'prestaging', priority=priority, approve=True) #phedexids = [o['id'] for o in result['phedex']['request_created']]: #else: @@ -711,10 +725,6 @@ def prio_and_size( i, j): # fake_id-=1 - - if not transfered_items: - sendLog('transferor','Could not make a replica request for items %s to site %s'%(items_to_transfer,site),level='critical') - continue for phedexid,items in transfered_items.items(): print phedexid,"transfer created" for transfering in list(set(map(lambda it : it.split('#')[0], items))): @@ -731,11 +741,14 @@ def prio_and_size( i, j): wf_id_in_prestaging.add( wfid ) #session.commit() + # TODO Alan on 1/april/2020: it might be that this module will be resume in the lines below only for wfid in wf_id_in_prestaging: + wfh.sendLog('transferor', "transfers made for wfid: %s" % wfid) + wfh.sendLog('transferor', "forcing it from staging to staged in favor of MicroServices") tr_wf = session.query(Workflow).get(wfid) if tr_wf and tr_wf.status!='staging': if execute: - tr_wf.status = 'staging' + tr_wf.status = 'staged' if talk: print "setting",tr_wf.name,"to staging" #session.commit()