-
Notifications
You must be signed in to change notification settings - Fork 172
/
InputDataResolution.py
160 lines (130 loc) · 7.06 KB
/
InputDataResolution.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
""" The input data resolution module is a plugin that
allows a VO to define its input data policy in a simple way using existing
utilities in DIRAC or extension code supplied by the VO.
The arguments dictionary from the Job Wrapper includes the file catalogue
result and in principle has all the necessary information to resolve input data
for applications.
"""
import DIRAC
from DIRAC import S_OK, S_ERROR, gLogger, gConfig
from DIRAC.Core.Utilities.ModuleFactory import ModuleFactory
from DIRAC.WorkloadManagementSystem.Client.PoolXMLSlice import PoolXMLSlice
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
# Name stored on every InputDataResolution instance and used to create its sub-logger.
COMPONENT_NAME = "InputDataResolution"
# When true, InputDataResolution.execute() also calls _createCatalog() once the
# input data has been resolved.
CREATE_CATALOG = False
class InputDataResolution:
    """Defines the Input Data Policy.

    The policy is a list of module paths. Each module is instantiated with the
    arguments dictionary and asked to resolve the files that the previous
    modules could not resolve.
    """

    #############################################################################
    def __init__(self, argumentsDict):
        """Standard constructor.

        ``argumentsDict`` is stored as-is (not copied) and completed in place:

        * ``Configuration["AllReplicas"]`` is always overwritten with the
          Operations option ``InputDataPolicy/AllReplicas`` (default ``False``).
        * ``Configuration["Protocol"]`` and ``Configuration["RemoteProtocol"]``
          are set only when absent, from ``InputDataPolicy/Protocols/Local``
          and ``InputDataPolicy/Protocols/Remote`` (default ``[]``).
        * ``InputDataDirectory`` is set only when absent, from
          ``/LocalSite/InputDataDirectory`` (default ``"CWD"``).

        :param dict argumentsDict: arguments used for the resolution
        """
        self.arguments = argumentsDict
        self.name = COMPONENT_NAME
        self.log = gLogger.getSubLogger(self.name)
        op = Operations()
        self.arguments.setdefault("Configuration", {})["AllReplicas"] = op.getValue(
            "InputDataPolicy/AllReplicas", False
        )
        self.arguments["Configuration"].setdefault("Protocol", op.getValue("InputDataPolicy/Protocols/Local", []))
        self.arguments["Configuration"].setdefault(
            "RemoteProtocol", op.getValue("InputDataPolicy/Protocols/Remote", [])
        )
        # By default put input data into the current directory
        self.arguments.setdefault("InputDataDirectory", gConfig.getValue("/LocalSite/InputDataDirectory", "CWD"))

    #############################################################################
    def execute(self):
        """Given the arguments from the Job Wrapper, this function calls existing
        utilities in DIRAC to resolve input data.

        :return: the result of the resolution, whose ``Value`` holds the
                 ``Successful`` and ``Failed`` entries. An error structure is
                 returned instead when the resolution itself fails, when some
                 files failed and ``IgnoreMissing`` is not set in the arguments,
                 when no file at all was resolved, or when the optional catalog
                 creation fails.
        """
        resolvedInputData = self.__resolveInputData()
        if not resolvedInputData["OK"]:
            self.log.error(f"InputData resolution failed with result:\n{resolvedInputData['Message']}")
            return resolvedInputData

        # For local running of this module we can expose an option to ignore missing files
        ignoreMissing = self.arguments.get("IgnoreMissing", False)

        # Missing some of the input files is a fatal error unless ignoreMissing option is defined
        failedReplicas = resolvedInputData["Value"].get("Failed", {})
        if failedReplicas and not ignoreMissing:
            failedList = "\n".join(sorted(failedReplicas))
            self.log.error(f"Failed to obtain access to the following files:\n{failedList}")
            return S_ERROR("Failed to access some of requested input data")

        if not resolvedInputData["Value"].get("Successful"):
            return S_ERROR("Could not access any requested input data")

        if CREATE_CATALOG:
            res = self._createCatalog(resolvedInputData)
            if not res["OK"]:
                return res

        return resolvedInputData

    #############################################################################
    def _createCatalog(self, resolvedInputData, catalogName="pool_xml_catalog.xml", pfnType="ROOT"):
        """By default uses PoolXMLSlice, VO extensions can modify at will.

        Each successfully resolved entry is tagged in place with a ``pfntype``
        key before the catalog slice is executed.

        :param dict resolvedInputData: either the complete result structure
            built by the resolution (entries under ``Value``) or directly the
            dictionary holding the ``Successful`` entry
        :param str catalogName: catalog file name, used unless
            ``Configuration["CatalogName"]`` is present in the arguments
        :param str pfnType: value stored under ``pfntype`` for every LFN
        :return: whatever ``PoolXMLSlice.execute`` returns
        """
        # execute() hands over the complete result structure, in which the
        # per-LFN results live under "Value". Looking up "Successful" at the top
        # level of that structure raised KeyError, so unwrap it when present
        # while still accepting the bare {"Successful": ...} dictionary.
        resolution = resolvedInputData.get("Value", resolvedInputData)
        successful = resolution["Successful"]

        catalogData = {}
        for lfn, mdata in successful.items():
            # The metadata dictionary is shared with the caller on purpose: the
            # pfntype tag is also visible in the structure returned by execute().
            mdata["pfntype"] = pfnType
            catalogData[lfn] = mdata
            self.log.verbose(f"Adding PFN file type {pfnType} for LFN:{lfn}")

        catalogName = self.arguments["Configuration"].get("CatalogName", catalogName)
        self.log.verbose(f"Catalog name will be: {catalogName}")

        appCatalog = PoolXMLSlice(catalogName)
        return appCatalog.execute(catalogData)

    #############################################################################
    def __resolveInputData(self):
        """This method controls the execution of the DIRAC input data modules according
        to the VO policy defined in the configuration service.

        The policy is taken from ``Job["InputDataPolicy"]`` in the arguments when
        set; otherwise from the Operations section ``InputDataPolicy``, using the
        option named after the site and falling back to ``Default``.

        :return: S_OK with a dictionary holding ``Successful`` (merged results of
                 all executed modules) and ``Failed`` (what the last executed
                 module could not resolve), or the error of the first module
                 that fails outright
        """
        site = self.arguments["Configuration"].get("SiteName", DIRAC.siteName())

        self.arguments.setdefault("Job", {})
        policy = self.arguments["Job"].get("InputDataPolicy", [])
        if policy:
            # In principle this can be a list of modules with the first taking precedence
            if isinstance(policy, str):
                policy = [policy]
            self.log.info(f"Job has a specific policy setting: {', '.join(policy)}")
        else:
            self.log.debug(f"Attempting to resolve input data policy for site {site}")
            inputDataPolicy = Operations().getOptionsDict("InputDataPolicy")
            if not inputDataPolicy["OK"]:
                return S_ERROR("Could not resolve InputDataPolicy from Operations InputDataPolicy")

            options = inputDataPolicy["Value"]
            policy = options.get(site, options.get("Default", []))
            if policy:
                # The option value is a comma-separated list of module paths
                policy = [x.strip() for x in policy.split(",")]
                if site in options:
                    prStr = "Found specific"
                else:
                    prStr = "Applying default"
                self.log.info("{} input data policy for site {}:\n{}".format(prStr, site, "\n".join(policy)))

        dataToResolve = []  # if none, all supplied input data is resolved
        successful = {}
        for modulePath in policy:
            result = self.__runModule(modulePath, dataToResolve)
            if not result["OK"]:
                self.log.warn(f"Problem during {modulePath} execution")
                return result

            result = result["Value"]
            successful.update(result.get("Successful", {}))
            # Only what this module failed on is handed to the next module
            dataToResolve = result.get("Failed", [])
            if dataToResolve:
                self.log.info("{} failed for the following files:\n{}".format(modulePath, "\n".join(dataToResolve)))
            else:
                self.log.info(f"All replicas resolved after {modulePath} execution")
                break

        if successful:
            self.log.verbose("Successfully resolved:", str(successful))

        return S_OK({"Successful": successful, "Failed": dataToResolve})

    #############################################################################
    def __runModule(self, modulePath, remainingReplicas):
        """This method provides a way to run the modules specified by the VO that
        govern the input data access policy for the current site. Using the
        InputDataPolicy section from Operations different modules can be defined for
        particular sites or for InputDataPolicy defined in the JDL of the jobs.

        :param str modulePath: path of the module to load through ModuleFactory
        :param remainingReplicas: data passed to the module's ``execute`` method
        :return: the ModuleFactory error when the module cannot be obtained,
                 otherwise the result of the module's ``execute`` call
        """
        self.log.info(f"Attempting to run {modulePath}")
        moduleFactory = ModuleFactory()
        moduleInstance = moduleFactory.getModule(modulePath, self.arguments)
        if not moduleInstance["OK"]:
            return moduleInstance

        module = moduleInstance["Value"]
        result = module.execute(remainingReplicas)
        return result
# EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#