@@ -1,5 +1,6 @@
import string
import os.path
import urllib

#
# Project:
@@ -25,6 +26,7 @@ def __init__(self):
self.frontend_descript_file = "frontend.descript"
self.group_descript_file = "group.descript"
self.params_descript_file = "params.cfg"
self.attrs_descript_file = "attrs.cfg"
self.signature_descript_file = "signatures.sha1"
self.signature_type = "sha1"

@@ -39,55 +41,89 @@ def __init__(self):
#
############################################################

# loads a file composed of
# loads a file or URL composed of
# NAME VAL
# and creates
# self.data[NAME]=VAL
# It also defines:
# self.config_file="name of file"
# If validate is defined, also defines
# self.hash_value
class ConfigFile:
def __init__(self,config_dir,config_file,convert_function=repr):
def __init__(self,config_dir,config_file,convert_function=repr,
validate=None): # if defined, must be (hash_algo,value)
self.config_dir=config_dir
self.config_file=config_file
self.load(os.path.join(config_dir,config_file),convert_function)
self.data={}
self.load(os.path.join(config_dir,config_file),convert_function,validate)
self.derive()

def load(self,fname,convert_function):
def open(self,fname):
if (fname[:5]=="http:") or (fname[:6]=="https:") or (fname[:4]=="ftp:"):
# one of the supported URLs
return urllib.urlopen(fname)
else:
# local file
return open(fname,"r")


def validate_func(self,data,validate,fname):
if validate!=None:
import hashCrypto
vhash=hashCrypto.get_hash(validate[0],data)
self.hash_value=vhash
if (validate[1]!=None) and (vhash!=validate[1]):
raise IOError, "Failed validation of '%s'. Hash %s computed to '%s', expected '%s'"%(fname,validate[0],vhash,validate[1])

def load(self,fname,convert_function,
validate=None): # if defined, must be (hash_algo,value)
self.data={}
fd=open(fname,"r")
fd=self.open(fname)
try:
lines=fd.readlines()
data=fd.read()
self.validate_func(data,validate,fname)
lines=data.splitlines()
del data
for line in lines:
if line[0]=="#":
continue # comment
if len(string.strip(line))==0:
continue # empty line
larr=string.split(line,None,1)
lname=larr[0]
if len(larr)==1:
lval=""
else:
lval=larr[1][:-1] #strip newline
exec("self.data['%s']=%s"%(lname,convert_function(lval)))
self.split_func(line,convert_function)
finally:
fd.close()

def split_func(self,line,convert_function):
larr=string.split(line,None,1)
lname=larr[0]
if len(larr)==1:
lval=""
else:
lval=larr[1]
exec("self.data['%s']=%s"%(lname,convert_function(lval)))

def derive(self):
return # by default, do nothing

# load from the group subdir
class GroupConfigFile(ConfigFile):
def __init__(self,base_dir,group_name,config_file,convert_function=repr):
ConfigFile.__init__(self,os.path.join(base_dir,"group_"+group_name),config_file,convert_function)
def __init__(self,base_dir,group_name,config_file,convert_function=repr,
validate=None): # if defined, must be (hash_algo,value)
ConfigFile.__init__(self,os.path.join(base_dir,"group_"+group_name),config_file,convert_function,validate)
self.group_name=group_name

# load both the main and group subdir config file
# and join the results
# Also defines:
# self.group_hash_value, if group_validate defined
class JoinConfigFile(ConfigFile):
def __init__(self,base_dir,group_name,config_file,convert_function=repr):
ConfigFile.__init__(self,base_dir,config_file,convert_function)
def __init__(self,base_dir,group_name,config_file,convert_function=repr,
main_validate=None,group_validate=None): # if defined, must be (hash_algo,value)
ConfigFile.__init__(self,base_dir,config_file,convert_function,main_validate)
self.group_name=group_name
group_obj=GroupConfigFile(base_dir,group_name,config_file,convert_function)
group_obj=GroupConfigFile(base_dir,group_name,config_file,convert_function,group_validate)
if group_validate!=None:
self.group_hash_value=group_obj.hash_value
#merge by overriding whatever is found in the subdir
for k in group_obj.data.keys():
self.data[k]=group_obj.data[k]
@@ -129,30 +165,40 @@ def __init__(self,base_dir,group_name):
else:
raise RuntimeError, "Unknown parameter type '%s' for '%s'!"%(type_str,k)

class SignatureDescript:
class AttrsDescript(JoinConfigFile):
def __init__(self,base_dir,group_name):
global frontendConfig
JoinConfigFile.__init__(self,base_dir,group_name,frontendConfig.attrs_descript_file,
str) # they are already in python form

# this one is the special frontend work dir signature file
class SignatureDescript(ConfigFile):
def __init__(self,config_dir):
global frontendConfig
self.config_dir=config_dir
self.config_file=frontendConfig.signature_descript_file
self.load(os.path.join(self.config_dir,self.config_file))
ConfigFile.__init__(self,config_dir,frontendConfig.signature_descript_file,
None) # Not used, redefining split_func
self.signature_type=frontendConfig.signature_type

def load(self,fname):
self.data={}
fd=open(fname,"r")
try:
lines=fd.readlines()
for line in lines:
if line[0]=="#":
continue # comment
if len(string.strip(line))==0:
continue # empty line
larr=string.split(line,None)
if len(larr)!=3:
raise RuntimeError, "Invalid line (expected 3 elements, found %i)"%len(larr)
self.data[larr[2]]=(larr[0],larr[1])
finally:
fd.close()
def split_func(self,line,convert_function):
larr=string.split(line,None)
if len(larr)!=3:
raise RuntimeError, "Invalid line (expected 3 elements, found %i)"%len(larr)
self.data[larr[2]]=(larr[0],larr[1])

# this one is the generic hash descript file
class BaseSignatureDescript(ConfigFile):
def __init__(self,config_dir,signature_fname,signature_type,validate=None):
ConfigFile.__init__(self,config_dir,signature_fname,
None, # Not used, redefining split_func
validate)
self.signature_type=signature_type

def split_func(self,line,convert_function):
larr=string.split(line,None,1)
if len(larr)!=2:
raise RuntimeError, "Invalid line (expected 2 elements, found %i)"%len(larr)
lval=larr[1]
self.data[lval]=larr[0]

############################################################
#
@@ -255,3 +301,78 @@ def __init__(self,base_dir,group_name):
self.group_descript_fname=gd[1]
self.group_descript_signature=gd[0]

class StageFiles:
def __init__(self,base_URL,descript_fname,validate_algo,signature_hash):
self.base_URL=base_URL
self.validate_algo=validate_algo
self.stage_descript=ConfigFile(base_URL, descript_fname, repr,
(validate_algo,None)) # just get the hash value... will validate later

self.signature_descript=BaseSignatureDescript(base_URL,self.stage_descript.data['signature'],validate_algo,(validate_algo,signature_hash))

if self.stage_descript.hash_value!=self.signature_descript.data[descript_fname]:
            raise IOError, "Descript file %s signature invalid, expected '%s' got '%s'"%(descript_fname,self.signature_descript.data[descript_fname],self.stage_descript.hash_value)

def get_stage_file(self,fname,repr):
return ConfigFile(self.base_URL,fname,repr,
(self.validate_algo,self.signature_descript.data[fname]))

def get_file_list(self,list_type): # example list_type == 'preentry_file_list'
if not self.stage_descript.data.has_key(list_type):
            raise KeyError,"Unknown list type '%s'; valid types are %s"%(list_type,self.stage_descript.data.keys())

list_fname=self.stage_descript.data[list_type]
return self.get_stage_file(self.stage_descript.data[list_type],
lambda x:string.split(x,None,4))

# this class knows how to interpret some of the files in the Stage area
class ExtStageFiles(StageFiles):
def __init__(self,base_URL,descript_fname,validate_algo,signature_hash):
StageFiles.__init__(self,base_URL,descript_fname,validate_algo,signature_hash)
self.preentry_file_list=None

def get_constants(self):
self.load_preentry_file_list()
return self.get_stage_file(self.preentry_file_list.data['constants.cfg'][0],repr)

def get_condor_vars(self):
self.load_preentry_file_list()
return self.get_stage_file(self.preentry_file_list.data['condor_vars.lst'][0],lambda x:string.split(x,None,6))

# internal
def load_preentry_file_list(self):
if self.preentry_file_list==None:
self.preentry_file_list=self.get_file_list('preentry_file_list')
# else, nothing to do

# this class knows how to interpret some of the files in the Stage area
# Will appropriately merge the main and the group ones
class MergeStageFiles:
def __init__(self,base_URL,validate_algo,
main_descript_fname,main_signature_hash,
group_name,group_descript_fname,group_signature_hash):
self.group_name=group_name
self.main_stage=ExtStageFiles(base_URL,main_descript_fname,validate_algo,main_signature_hash)
self.group_stage=ExtStageFiles(os.path.join(base_URL,"group_"+group_name),group_descript_fname,validate_algo,group_signature_hash)

def get_constants(self):
main_consts=self.main_stage.get_constants()
group_consts=self.group_stage.get_constants()
# group constants override the main ones
for k in group_consts.data.keys():
main_consts.data[k]=group_consts.data[k]
main_consts.group_name=self.group_name
main_consts.group_hash_value=group_consts.hash_value

return main_consts

def get_condor_vars(self):
main_cv=self.main_stage.get_condor_vars()
group_cv=self.group_stage.get_condor_vars()
# group condor_vars override the main ones
for k in group_cv.data.keys():
main_cv.data[k]=group_cv.data[k]
main_cv.group_name=self.group_name
main_cv.group_hash_value=group_cv.hash_value

return main_cv
@@ -124,8 +124,29 @@ def fetch_fork_result_list(pipe_ids):
return out


######################
# expand $$(attribute)
def expand_DD(qstr,attr_dict):
import re
robj=re.compile("\$\$\((?P<attrname>[^\)]*)\)")
while 1:
m=robj.search(qstr)
if m==None:
break # no more substitutions to do
attr_name=m.group('attrname')
if not attr_dict.has_key(attr_name):
raise KeyError, "Missing attribute %s"%attr_name
attr_val=attr_dict[attr_name]
if type(attr_val)==type(1):
attr_str=str(attr_val)
else: # assume it is a string for all other purposes... quote and escape existing quotes
attr_str='"%s"'%attr_val.replace('"','\\"')
qstr="%s%s%s"%(qstr[:m.start()],attr_str,qstr[m.end():])
return qstr


############################################################
def iterate_one(client_name,elementDescript,paramsDescript,signatureDescript,x509_proxy_plugin,stats,history_obj):
def iterate_one(client_name,elementDescript,paramsDescript,attr_dict,signatureDescript,x509_proxy_plugin,stats,history_obj):
frontend_name=elementDescript.frontend_data['FrontendName']
group_name=elementDescript.element_data['GroupName']
security_name=elementDescript.merged_data['SecurityName']
@@ -143,7 +164,7 @@ def iterate_one(client_name,elementDescript,paramsDescript,signatureDescript,x50
os.close(r)
try:
glidein_dict={}
factory_constraint=elementDescript.merged_data['FactoryQueryExpr']
factory_constraint=expand_DD(elementDescript.merged_data['FactoryQueryExpr'],attr_dict)
factory_pools=elementDescript.merged_data['FactoryCollectors']
for factory_pool in factory_pools:
factory_pool_node=factory_pool[0]
@@ -199,7 +220,7 @@ def iterate_one(client_name,elementDescript,paramsDescript,signatureDescript,x50
condorq_format_list=list(condorq_format_list)+list((('x509UserProxyFirstFQAN','s'),))
condorq_format_list=list(condorq_format_list)+list((('x509UserProxyFQAN','s'),))
condorq_dict=glideinFrontendLib.getCondorQ(elementDescript.merged_data['JobSchedds'],
elementDescript.merged_data['JobQueryExpr'],
expand_DD(elementDescript.merged_data['JobQueryExpr'],attr_dict),
condorq_format_list)

os.write(w,cPickle.dumps(condorq_dict))
@@ -392,7 +413,7 @@ def iterate_one(client_name,elementDescript,paramsDescript,signatureDescript,x50
os.close(r)
try:
if dt=='Real':
out=glideinFrontendLib.countRealRunning(elementDescript.merged_data['MatchExprCompiledObj'],condorq_dict_running,glidein_dict,condorq_match_list)
out=glideinFrontendLib.countRealRunning(elementDescript.merged_data['MatchExprCompiledObj'],condorq_dict_running,glidein_dict,attr_dict,condorq_match_list)
elif dt=='Glidein':
count_status_multi={}
for glideid in glidein_dict.keys():
@@ -404,7 +425,7 @@ def iterate_one(client_name,elementDescript,paramsDescript,signatureDescript,x50
count_status_multi[request_name][st]=glideinFrontendLib.countCondorStatus(c)
out=count_status_multi
else:
c,p,h=glideinFrontendLib.countMatch(elementDescript.merged_data['MatchExprCompiledObj'],condorq_dict_types[dt]['dict'],glidein_dict,condorq_match_list)
c,p,h=glideinFrontendLib.countMatch(elementDescript.merged_data['MatchExprCompiledObj'],condorq_dict_types[dt]['dict'],glidein_dict,attr_dict,condorq_match_list)
t=glideinFrontendLib.countCondorQ(condorq_dict_types[dt]['dict'])
out=(c,p,h,t)

@@ -765,7 +786,7 @@ def iterate_one(client_name,elementDescript,paramsDescript,signatureDescript,x50
return

############################################################
def iterate(parent_pid,elementDescript,paramsDescript,signatureDescript,x509_proxy_plugin):
def iterate(parent_pid,elementDescript,paramsDescript,attr_dict,signatureDescript,x509_proxy_plugin):
sleep_time=int(elementDescript.frontend_data['LoopDelay'])

factory_pools=elementDescript.merged_data['FactoryCollectors']
@@ -793,7 +814,7 @@ def iterate(parent_pid,elementDescript,paramsDescript,signatureDescript,x509_pro
# recreate every time (an easy way to start from a clean state)
stats['group']=glideinFrontendMonitoring.groupStats()

done_something=iterate_one(published_frontend_name,elementDescript,paramsDescript,signatureDescript,x509_proxy_plugin,stats,history_obj)
done_something=iterate_one(published_frontend_name,elementDescript,paramsDescript,attr_dict,signatureDescript,x509_proxy_plugin,stats,history_obj)

glideinFrontendLib.log_files.logActivity("Writing stats")
try:
@@ -858,8 +879,24 @@ def main(parent_pid, work_dir, group_name):
float(elementDescript.frontend_data['LogRetentionMaxMBs']))

paramsDescript=glideinFrontendConfig.ParamsDescript(work_dir,group_name)
attrsDescript=glideinFrontendConfig.AttrsDescript(work_dir,group_name)
signatureDescript=glideinFrontendConfig.GroupSignatureDescript(work_dir,group_name)

#
# We decided we will not use the data from the stage area
# Leaving it commented in the code, in case we decide in the future
# it was a good validation of the Web server health
#
#stageArea=glideinFrontendConfig.MergeStageFiles(elementDescript.frontend_data['WebURL'],
# signatureDescript.signature_type,
# signatureDescript.frontend_descript_fname,signatureDescript.frontend_descript_signature,
# group_name,
# signatureDescript.group_descript_fname,signatureDescript.group_descript_signature)
# constsDescript=stageArea.get_constants()
#

attr_dict=attrsDescript.data

glideinFrontendMonitoring.monitoringConfig.monitor_dir=os.path.join(work_dir,"monitor/group_%s"%group_name)

glideinFrontendInterface.frontendConfig.advertise_use_tcp=(elementDescript.frontend_data['AdvertiseWithTCP'] in ('True','1'))
@@ -895,7 +932,7 @@ def main(parent_pid, work_dir, group_name):
try:
try:
glideinFrontendLib.log_files.logActivity("Starting up")
iterate(parent_pid,elementDescript,paramsDescript,signatureDescript,x509_proxy_plugin)
iterate(parent_pid,elementDescript,paramsDescript,attr_dict,signatureDescript,x509_proxy_plugin)
except KeyboardInterrupt:
glideinFrontendLib.log_files.logActivity("Received signal...exit")
except:
@@ -209,7 +209,7 @@ def getCondorQUsers(condorq_dict):
# A special "glidein name" of (None, None, None) is used for jobs
# that don't match any "real glidein name"

def countMatch(match_obj,condorq_dict,glidein_dict,condorq_match_list=None):
def countMatch(match_obj,condorq_dict,glidein_dict,attr_dict,condorq_match_list=None):
out_glidein_counts={}
#new_out_counts: keys are site indexes(numbers),
#elements will be the number of real
@@ -337,7 +337,7 @@ def countMatch(match_obj,condorq_dict,glidein_dict,condorq_match_list=None):
final_unique[(None,None,None)]=count_unmatched
return (out_glidein_counts,final_out_counts,final_unique)

def countRealRunning(match_obj,condorq_dict,glidein_dict,condorq_match_list=None):
def countRealRunning(match_obj,condorq_dict,glidein_dict,attr_dict,condorq_match_list=None):
out_glidein_counts={}

if condorq_match_list!=None: