Skip to content
This repository was archived by the owner on Sep 12, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 14 additions & 209 deletions Unified/assignor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,16 @@
from assignSession import *
import reqMgrClient
from utils import workflowInfo, campaignInfo, siteInfo, userLock, unifiedConfiguration, reqmgr_url, monitor_pub_dir, monitor_dir, global_SI
from utils import getWorkLoad, getDatasetPresence, getDatasets, findCustodialLocation, getDatasetBlocksFraction, getDatasetEventsPerLumi, getLFNbase, getDatasetBlocks, lockInfo, isHEPCloudReady, do_html_in_each_module
from utils import getDatasetEventsPerLumi, getLFNbase, lockInfo, isHEPCloudReady, do_html_in_each_module
from utils import componentInfo, sendEmail, sendLog, getWorkflows, closeAllBlocks, eosRead
#from utils import lockInfo
from utils import moduleLock, notRunningBefore
from utils import moduleLock
import optparse
import itertools
import time
from htmlor import htmlor
import os
import random
import json
import copy
import os
import sys

def assignor(url ,specific = None, talk=True, options=None):
if userLock() and not options.manual: return
Expand Down Expand Up @@ -61,7 +57,6 @@ def assignor(url ,specific = None, talk=True, options=None):
#if options.partial and not specific:
# pass

dataset_endpoints = json.loads(eosRead('%s/dataset_endpoints.json'%monitor_dir))
aaa_mapping = json.loads(eosRead('%s/equalizor.json'%monitor_pub_dir))['mapping']
all_stuck = set()
all_stuck.update( json.loads(eosRead('%s/stuck_transfers.json'%monitor_pub_dir) ))
Expand Down Expand Up @@ -110,13 +105,9 @@ def rank( wfn ):

options_text=""
if options.early: options_text+=", early option is ON"
if options.partial:
options_text+=", partial option is ON"
options_text+=", good fraction is %.2f"%options.good_enough



wfh.sendLog('assignor',"%s to be assigned%s"%(wfo.name, options_text))
wfh.sendLog('assignor',"%s to be assigned %s"%(wfo.name, options_text))

## the site whitelist takes into account siteInfo, campaignInfo, memory and cores
(lheinput,primary,parent,secondary, sites_allowed) = wfh.getSiteWhiteList()
Expand Down Expand Up @@ -200,19 +191,15 @@ def rank( wfn ):
continue


original_sites_allowed = copy.deepcopy( sites_allowed )
wfh.sendLog('assignor',"Site white list %s"%sorted(sites_allowed))
override_sec_location = CI.get(wfh.request['Campaign'], 'SecondaryLocation', [])

blocks = wfh.getBlocks()
if blocks:
wfh.sendLog('assignor',"Needs {} blocks in input {}".format(len(blocks), '\n'.join(blocks)))
wfh.sendLog('assignor',"Allowed %s"%sorted(sites_allowed))
secondary_locations=None

primary_aaa = options.primary_aaa
secondary_aaa = options.secondary_aaa
do_partial = False #options.good_enough if options.partial else 0

if 'Campaign' in wfh.request and wfh.request['Campaign'] in CI.campaigns:
assign_parameters.update( CI.campaigns[wfh.request['Campaign']] )
Expand All @@ -221,163 +208,42 @@ def rank( wfn ):
primary_aaa = primary_aaa or assign_parameters['primary_AAA']
if 'secondary_AAA' in assign_parameters:
secondary_aaa = secondary_aaa or assign_parameters['secondary_AAA']
if 'partial_copy' in assign_parameters:
## can this only work if there is a stuck input ? maybe not
## this is a number. 0 means no
print "Could do partial disk copy assignment"
if is_stuck or options.partial:
do_partial = assign_parameters['partial_copy']
wfh.sendLog('assignor',"Overiding partial copy assignment to %.2f fraction"% do_partial)
#sendEmail('stuck input to assignment','%s is stuck for assigning %s and going fractional'%(','.join( is_stuck), wfo.name))

do_partial = options.good_enough if options.partial else do_partial


for sec in list(secondary):
if override_sec_location:
print "We don't care where the secondary is"
print "Cannot pass for now"
#sendEmail("tempting to pass sec location check","but we cannot yet IMO")
#pass

presence = getDatasetPresence( url, sec )
print sec
print json.dumps(presence, indent=2)
one_secondary_locations = [site for (site,(there,frac)) in presence.items() if frac>98.]

if secondary_aaa:
if not one_secondary_locations:
sec_availability = getDatasetBlocksFraction( url, sec )
if sec_availability >=1. and options.go:
## there is at least one copy of each block on disk. We should go ahead and let it go.
wfh.sendLog('assignor',"The secondary %s is available %s times on disk, and usable"%( sec, sec_availability))
else:
## not even a copy on disk anywhere !!!!
sites_allowed = [] ## will block the assignment
wfh.sendLog('assignor',"The secondary %s is nowhere on disk"% sec)
#just continue without checking
continue

#one_secondary_locations = [site for (site,(there,frac)) in presence.items() if there]
if secondary_locations==None:
secondary_locations = one_secondary_locations
else:
secondary_locations = list(set(secondary_locations) & set(one_secondary_locations))
## reduce the site white list to site with secondary only
#sites_allowed = [site for site in sites_allowed if any([osite.startswith(site) for osite in one_secondary_locations])]
sites_allowed = [site for site in sites_allowed if SI.CE_to_SE(site) in one_secondary_locations]

wfh.sendLog('assignor',"Intersecting with secondary requirement, now allowed %s"%sorted(sites_allowed))
wfh.sendLog('assignor',"Initial values for primary_AAA=%s and secondary_AAA=%s"%(primary_aaa, secondary_aaa))

initial_sites_allowed = copy.deepcopy( sites_allowed ) ## keep track of this, after secondary input location restriction : that's how you want to operate it
## keep track of this, after secondary input location restriction : that's how you want to operate it
initial_sites_allowed = copy.deepcopy( sites_allowed )

sites_all_data = copy.deepcopy( sites_allowed )
sites_with_data = copy.deepcopy( sites_allowed )
sites_with_any_data = copy.deepcopy( sites_allowed )
primary_locations = None
available_fractions = {}
set_lfn = '/store/mc' ## by default

endpoints = set()
for prim in list(primary):
if prim in dataset_endpoints:
print "endpoints from stagor",dataset_endpoints[prim]
endpoints.update( dataset_endpoints[prim] )
set_lfn = getLFNbase( prim )
## if they are requested for processing, they should bbe all closed already
# FIXME: remove this closeAllBlocks
closeAllBlocks(url, prim, blocks)
presence = getDatasetPresence( url, prim , only_blocks=blocks)
if talk:
print prim
print json.dumps(presence, indent=2)
available_fractions[prim] = getDatasetBlocksFraction(url, prim, sites = [SI.CE_to_SE(site) for site in sites_allowed] , only_blocks = blocks)
if primary_aaa:
available_fractions[prim] = getDatasetBlocksFraction(url, prim, only_blocks = blocks)

sites_all_data = [site for site in sites_with_data if SI.CE_to_SE(site) in [psite for (psite,(there,frac)) in presence.items() if there]]
if primary_aaa:
sites_all_data = set()
for (psite,(there,frac)) in presence.items():
if there:
sites_all_data.update( SI.SE_to_CEs(psite) )
sites_all_data = list(sites_all_data)
#sites_all_data = list(set([SI.SE_to_CE(psite) for (psite,(there,frac)) in presence.items() if there]))
sites_with_data = [site for site in sites_with_data if SI.CE_to_SE(site) in [psite for (psite,frac) in presence.items() if frac[1]>90.]]
sites_with_any_data = [site for site in sites_with_any_data if SI.CE_to_SE(site) in presence.keys()]
if primary_aaa:
sites_with_any_data = set()
for psite in presence.keys():
sites_with_any_data.update( SI.SE_to_CEs(psite) )
sites_with_any_data = list(sites_with_any_data)
#sites_with_any_data = list(set([SI.SE_to_CE(psite) for psite in presence.keys()]))

holding_but_not_allowed = set()
for se_site in presence.keys():
if not (set(SI.SE_to_CEs(se_site)) & set(sites_allowed)):
holding_but_not_allowed.add( se_site )
#wfh.sendLog('assignor',"Holding the data but not allowed %s"%sorted(list(set([se_site for se_site in presence.keys() if not SI.SE_to_CE(se_site) in sites_allowed]))))
wfh.sendLog('assignor',"Holding the data but not allowed %s"%sorted( holding_but_not_allowed ))
if primary_locations==None:
primary_locations = presence.keys()
else:
primary_locations = list(set(primary_locations) & set(presence.keys() ))

sites_with_data = list(set(sites_with_data))
sites_with_any_data = list(set(sites_with_any_data))

opportunistic_sites=[]
down_time = False
## opportunistic running where any piece of data is available
if secondary_locations or primary_locations:
## intersection of both any pieces of the primary and good IO
#opportunistic_sites = [SI.SE_to_CE(site) for site in list((set(secondary_locations) & set(primary_locations) & set(SI.sites_with_goodIO)) - set(sites_allowed))]
if secondary_locations and primary_locations:
opportunistic_sites = [SI.SE_to_CE(site) for site in list((set(secondary_locations) & set(primary_locations)) - set([SI.CE_to_SE(site) for site in sites_allowed]))]
elif primary_locations:
opportunistic_sites = [SI.SE_to_CE(site) for site in list(set(primary_locations) - set([SI.CE_to_SE(site) for site in sites_allowed]))]
else:
opportunistic_sites = []
wfh.sendLog('assignor',"We could be running in addition at %s"% sorted(opportunistic_sites))
if any([osite in SI.sites_not_ready for osite in opportunistic_sites]):
wfh.sendLog('assignor',"One of the usable site is in downtime %s"%([osite for osite in opportunistic_sites if osite in SI.sites_not_ready]))
down_time = True
## should this be send back to considered ?


## should be 2 but for the time-being let's lower it to get things going
copies_wanted,cpuh = wfh.getNCopies()
_copies_wanted,cpuh = wfh.getNCopies()
wfh.sendLog('assignor',"we need %s CPUh"%cpuh)
if cpuh>max_cpuh_block and not options.go:
#sendEmail('large workflow','that wf %s has a large number of CPUh %s, not assigning, please check the logs'%(wfo.name, cpuh))#,destination=['Dmytro.Kovalskyi@cern.ch'])
sendLog('assignor','%s requires a large numbr of CPUh %s , not assigning, please check with requester'%( wfo.name, cpuh), level='critical')
wfh.sendLog('assignor',"Requiring a large number of CPUh %s, not assigning"%cpuh)
continue

if 'Campaign' in wfh.request and wfh.request['Campaign'] in CI.campaigns and 'maxcopies' in CI.campaigns[wfh.request['Campaign']]:
copies_needed_from_campaign = CI.campaigns[wfh.request['Campaign']]['maxcopies']
copies_wanted = min(copies_needed_from_campaign, copies_wanted)

if not options.early:
less_copies_than_requested = UC.get("less_copies_than_requested")
copies_wanted = max(1,copies_wanted-less_copies_than_requested) # take one out for the efficiency
else:
## find out whether there is a site in the whitelist, that is lacking jobs and reduce to 1 copy needed to get things going
pass

wfh.sendLog('assignor',"needed availability fraction %s"% copies_wanted)

## should also check on number of sources, if large enough, we should be able to overflow most, efficiently

## default back to white list to original white list with any data
wfh.sendLog('assignor',"Allowed sites :%s"% sorted(sites_allowed))

# TODO Alan on 1/april/2020: keep the AAA functionality
if primary_aaa:
## remove the sites not reachable localy if not in having the data
if not sites_all_data:
if not sites_allowed:
wfh.sendLog('assignor',"Overiding the primary on AAA setting to Off")
primary_aaa=False
else:
aaa_grid = set(sites_all_data)
aaa_grid = set(sites_allowed)
for site in list(aaa_grid):
aaa_grid.update( aaa_mapping.get(site,[]) )
sites_allowed = list(set(initial_sites_allowed) & aaa_grid)
Expand All @@ -395,10 +261,8 @@ def rank( wfn ):
continue

if not primary_aaa:
if not isStoreResults:
sites_allowed = sites_with_any_data
else:
## if we are dealing with a StoreResults request, we don't need to check dataset availability and
if isStoreResults:
## if we are dealing with a StoreResults request, we don't need to check dataset availability and
## should use the SiteWhiteList set in the original request
if 'SiteWhitelist' in wfh.request:
sites_allowed = wfh.request['SiteWhitelist']
Expand All @@ -407,24 +271,8 @@ def rank( wfn ):
sendLog('assignor','Cannot assign StoreResults request because SiteWhitelist is missing', level='critical')
n_stalled += 1
continue
available_fractions = {}
wfh.sendLog('assignor',"Selected for any data %s"%sorted(sites_allowed))

### check on endpoints for on-going transfers
if do_partial:
if endpoints:
end_sites = [SI.SE_to_CE(s) for s in endpoints]
sites_allowed = list(set(sites_allowed + end_sites))
if down_time and not any(osite in SI.sites_not_ready for osite in end_sites):
print "Flip the status of downtime, since our destinations are good"
down_time = False
print "with added endpoints",sorted(end_sites)
else:
print "Cannot do partial assignment without knowin the endpoints"
n_stalled+=1
continue


#if not len(sites_allowed):
# if not options.early:
# wfh.sendLog('assignor',"cannot be assign with no matched sites")
Expand All @@ -438,48 +286,6 @@ def rank( wfn ):
allowed_and_low = sorted(set(low_pressure) & set(sites_allowed))
if allowed_and_low:
wfh.sendLog('assignor',"The workflow can run at %s under low pressure currently"%( ','.join( allowed_and_low )))
copies_wanted = max(1., copies_wanted-1.)


if available_fractions and not all([available>=copies_wanted for available in available_fractions.values()]):
not_even_once = not all([available>=1. for available in available_fractions.values()])
above_good = all([available >= do_partial for available in available_fractions.values()])
wfh.sendLog('assignor',"The input dataset is not available %s times, only %s"%( copies_wanted, available_fractions.values()))
if down_time and not options.go and not options.early:
wfo.status = 'considered'
session.commit()
wfh.sendLog('assignor',"sending back to considered because of site downtime, instead of waiting")
#sendEmail( "cannot be assigned due to downtime","%s is not sufficiently available, due to down time of a site in the whitelist. check the assignor logs. sending back to considered."% wfo.name)
sendLog('assignor','%s is not sufficiently available, due to down time of a site in the whitelist. sending back to considered.'%( wfo.name ), level='delay')
n_stalled+=1
continue
#pass

print json.dumps(available_fractions)
if (options.go and not_even_once) or not options.go:
known = []
try:
known = json.loads(open('cannot_assign.json').read())
except:
pass
if not wfo.name in known and not options.limit and not options.go and not options.early and not (do_partial and above_good):
wfh.sendLog('assignor',"cannot be assigned, %s is not sufficiently available.\n %s"%(wfo.name,json.dumps(available_fractions)))
#sendEmail( "cannot be assigned","%s is not sufficiently available.\n %s"%(wfo.name,json.dumps(available_fractions)))
known.append( wfo.name )
open('cannot_assign.json','w').write(json.dumps( known, indent=2))

if options.early:
if wfo.status == 'considered':
wfh.sendLog('assignor',"setting considered-tried")
wfo.status = 'considered-tried'
session.commit()
else:
print "tried but status is",wfo.status
if do_partial and above_good:
print "Will move on with partial locations"
else:
n_stalled+=1
continue

if not len(sites_allowed) and not options.SiteWhitelist:
if not options.early:
Expand Down Expand Up @@ -595,6 +401,7 @@ def pick_campaign( assign_parameters, parameters):
parameters['EventsPerJob'] = eventsPerJob
else:
spl = wfh.getSplittings()[0]
# FIXME: decide which of the lines below needs to remain...
eventsPerJobEstimated = spl['events_per_job'] if 'events_per_job' in spl else None
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert

eventsPerJobEstimated = spl['avg_events_per_job'] if 'avg_events_per_job' in spl else None
if eventsPerJobEstimated and eventsPerJobEstimated > eventsPerJob:
Expand Down Expand Up @@ -666,8 +473,6 @@ def pick_campaign( assign_parameters, parameters):
parser.add_option('-t','--test', help='Only test the assignment',action='store_true',dest='test',default=False)
parser.add_option('-m','--manual', help='Manual assignment, bypassing lock check',action='store_true',dest='manual',default=False)
parser.add_option('-e', '--early', help='Fectch from early statuses',default=False, action="store_true")
parser.add_option('-p', '--partial', help='Let the workflow assign to place with any part of the data, existent of being made',default=False, action="store_true")
parser.add_option('--good_enough', help='Only useful with --partial option, determines whether to get the workflow started', default=0.5, type=float)
parser.add_option('--go',help="Overrides the campaign go",default=False,action='store_true')
parser.add_option('--team',help="Specify the agent to use",default=None)
parser.add_option('--primary_aaa',help="Force to use the secondary location restriction, if any, and use the full site whitelist initially provided to run that type of wf",default=False, action='store_true')
Expand Down
Loading