Skip to content

Commit

Permalink
tmp fts token cont
Browse files Browse the repository at this point in the history
  • Loading branch information
chaen committed Feb 6, 2024
1 parent 7b22bac commit 453a5fe
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 21 deletions.
52 changes: 32 additions & 20 deletions src/DIRAC/DataManagementSystem/Client/FTS3Job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from DIRAC.Resources.Storage.StorageElement import StorageElement

from DIRAC.FrameworkSystem.Client.Logger import gLogger
from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager

from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR
from DIRAC.Core.Utilities.DErrno import cmpError
Expand Down Expand Up @@ -422,6 +423,8 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
log.debug(f"Not preparing transfer for file {ftsFile.lfn}")
continue

srcToken, dstToken = None

sourceSURL, targetSURL = allSrcDstSURLs[ftsFile.lfn]
stageURL = allStageURLs.get(ftsFile.lfn)

Expand Down Expand Up @@ -478,26 +481,33 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
if self.activity:
trans_metadata["activity"] = self.activity

# will need https://gitlab.cern.ch/fts/fts-rest-flask/-/merge_requests/116
# If both supports tokens
# get an access token
# In [26]: pfnparse(url)
# Out[26]:
# {'OK': True,
# 'Value': {'Protocol': 'root',
# 'Host': 'x509up_u1000@eoslhcb.cern.ch',
# 'Port': '',
# 'WSUrl': '',
# 'Path': '//eos/lhcb/grid/user/lhcb/tata',
# 'FileName': 'yoyo.txt'}}
# In [24]: fullPath
# Out[24]: PosixPath('//eos/lhcb/grid/user/lhcb/tata')

# In [25]: fullPath.resolve()
# Out[25]: PosixPath('/eos/lhcb/grid/user/lhcb/tata')

# In [19]: fullPath.relative_to('//eos/lhcb')
# Out[19]: PosixPath('grid/user/lhcb/tata')
# Add tokens if both storages support it
if self.__seTokenSupport(hopSrcSEName) and self.__seTokenSupport(hopDstSEName):
res = srcSE.getWLCGTokenPath(ftsFile.lfn)
if not res["OK"]:
return res
srcTokenPath = res["Value"]
res = gTokenManager.getToken(
userGroup="lhcb_data",
requiredTimeLeft=3600,
scope=[f"storage.read:/{srcTokenPath}", "offline_access"],
)
if not res["OK"]:
return res
srcToken = res["Value"]

res = dstSE.getWLCGTokenPath(ftsFile.lfn)
if not res["OK"]:
return res
dstTokenPath = res["Value"]
res = gTokenManager.getToken(
userGroup="lhcb_data",
requiredTimeLeft=3600,
scope=[f"storage.create:/{dstTokenPath}", "offline_access"],
)
if not res["OK"]:
return res
dstToken = res["Value"]

# because of an xroot bug (https://github.com/xrootd/xrootd/issues/1433)
# the checksum needs to be lowercase. It does not impact the other
Expand All @@ -511,6 +521,8 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
filesize=ftsFile.size,
metadata=trans_metadata,
activity=self.activity,
source_token=srcToken,
destination_token=dstToken,
)

transfers.append(trans)
Expand Down
15 changes: 15 additions & 0 deletions src/DIRAC/Resources/Storage/StorageBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import shutil
import tempfile

from pathlib import Path

from DIRAC import S_OK, S_ERROR
from DIRAC.Core.Utilities.Pfn import pfnparse, pfnunparse
from DIRAC.Core.Utilities.ReturnValues import returnSingleResult
Expand Down Expand Up @@ -462,3 +464,16 @@ def getOccupancy(self, **kwargs):
finally:
# Clean the temporary dir
shutil.rmtree(tmpDirName)

def getWLCGTokenPath(self, lfn: str, wlcgTokenBasePath: str) -> str:
"""
Returns the path expected to be in a WLCG token
It basically consists of `basepath - tokenBasePath + LFN
The tokenBasePath is a configuration on the storage side.
"""

allDict = dict.fromkeys(["Protocol", "Host", "Port", "Path", "FileName", "Options"], "")
allDict.update({"Path": self.protocolParameters["Path"], "FileName": lfn.lstrip("/")})
fullPath = pfnunparse(allDict)["Value"]
return Path(fullPath).relative_to(Path(wlcgTokenBasePath))
16 changes: 15 additions & 1 deletion src/DIRAC/Resources/Storage/StorageElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from DIRAC.Core.Utilities import DErrno
from DIRAC.Core.Utilities.File import convertSizeUnits
from DIRAC.Core.Utilities.List import getIndexInList
from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR, returnSingleResult
from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR, returnSingleResult, convertToReturnValue
from DIRAC.Core.Utilities.TimeUtilities import toEpochMilliSeconds
from DIRAC.Resources.Storage.StorageFactory import StorageFactory
from DIRAC.Core.Utilities.Pfn import pfnparse
Expand Down Expand Up @@ -291,6 +291,7 @@ def __init__(self, name, protocolSections=None, vo=None, hideExceptions=False):
"isDirectory",
"isFile",
"getOccupancy",
"getWLCGTokenPath",
]

self.okMethods = [
Expand Down Expand Up @@ -952,6 +953,19 @@ def getLFNFromURL(self, urls):
# This is the generic wrapper for file operations
#

@convertToReturnValue
def getWLCGTokenPath(self, lfn: str):
wlcgTokenBasePath = self.options.get("WLCGTokenBasePath")
if not wlcgTokenBasePath:
raise ValueError("WLCGTokenBasePath not configured")

for storage in self.storages.values():
try:
return storage.getWLCGTokenPath(lfn, wlcgTokenBasePath)
except:
continue
raise RuntimeError("Could not get WLCGTokenPath")

def getURL(self, lfn, protocol=False, replicaDict=None):
"""execute 'getTransportURL' operation.
Expand Down
1 change: 1 addition & 0 deletions src/DIRAC/TransformationSystem/Client/Utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ def getPluginParam(self, name, default=None):
# First look at a generic value...
optionPath = f"TransformationPlugins/{name}"
value = Operations().getValue(optionPath, None)

self.logVerbose(f"Default plugin param {optionPath}: '{value}'")
# Then look at a plugin-specific value
optionPath = f"TransformationPlugins/{self.plugin}/{name}"
Expand Down

0 comments on commit 453a5fe

Please sign in to comment.