Skip to content

Commit

Permalink
clients: getting rsemgr upload to the uploadclient - rucio#3084
Browse files Browse the repository at this point in the history
  • Loading branch information
TomasJavurek committed Nov 28, 2019
1 parent fa2c75d commit 694ffb6
Showing 1 changed file with 73 additions and 55 deletions.
128 changes: 73 additions & 55 deletions lib/rucio/client/uploadclient.py
Expand Up @@ -33,10 +33,11 @@
import time

from rucio.client.client import Client
from rucio.common.exception import (RucioException, RSEBlacklisted, DataIdentifierAlreadyExists,
DataIdentifierNotFound, NoFilesUploaded, NotAllFilesUploaded,
ResourceTemporaryUnavailable, ServiceUnavailable, InputValidationError)
from rucio.common.utils import adler32, execute, generate_uuid, md5, send_trace
from rucio.common.config import config_get_int
from rucio.common.exception import (RucioException, RSEBlacklisted, RSEChecksumUnavailable, RSEOperationNotSupported,
DataIdentifierAlreadyExists, DataIdentifierNotFound, NoFilesUploaded, NotAllFilesUploaded,
FileReplicaAlreadyExists, ResourceTemporaryUnavailable, ServiceUnavailable, InputValidationError)
from rucio.common.utils import adler32, execute, generate_uuid, make_valid_did, md5, send_trace
from rucio.rse import rsemanager as rsemgr
from rucio import version

Expand Down Expand Up @@ -130,7 +131,9 @@ def upload(self, items, summary_file_path=None, traces_copy_out=None):
registered_dataset_dids = set()
num_succeeded = 0
summary = []

summary_exception = {'PFN missing for NON-DETERMINISTIC DST': 0,
'Both DID and REPLICA exist': 0,
'REPLICA exists at DST': 0}
for file in files:
basename = file['basename']
logger.info('Preparing upload for file %s' % basename)
Expand Down Expand Up @@ -161,6 +164,7 @@ def upload(self, items, summary_file_path=None, traces_copy_out=None):
is_deterministic = rse_settings.get('deterministic', True)
if not is_deterministic and not pfn:
logger.error('PFN has to be defined for NON-DETERMINISTIC RSE.')
summary_exception['PFN missing for NON-DETERMINISTIC DST'] += 1
continue
if pfn and is_deterministic:
logger.warning('Upload with given pfn implies that no_register is True, except non-deterministic RSEs')
Expand All @@ -177,6 +181,7 @@ def upload(self, items, summary_file_path=None, traces_copy_out=None):
self.client.get_did(file['did_scope'], file['did_name'])
logger.info('File already registered. Skipping upload.')
trace['stateReason'] = 'File already exists'
summary_exception['Both DID and REPLICA exist'] += 1
continue
except DataIdentifierNotFound:
logger.info('File already exists on RSE. Previous left overs will be overwritten.')
Expand All @@ -185,15 +190,18 @@ def upload(self, items, summary_file_path=None, traces_copy_out=None):
if rsemgr.exists(rse_settings, pfn):
logger.info('File already exists on RSE with given pfn. Skipping upload. Existing replica has to be removed first.')
trace['stateReason'] = 'File already exists'
summary_exception['REPLICA exists at DST'] += 1
continue
elif rsemgr.exists(rse_settings, file_did):
logger.info('File already exists on RSE with different pfn. Skipping upload.')
trace['stateReason'] = 'File already exists'
summary_exception['REPLICA exists at DST'] += 1
continue
else:
if rsemgr.exists(rse_settings, pfn if pfn else file_did):
logger.info('File already exists on RSE. Skipping upload')
trace['stateReason'] = 'File already exists'
summary_exception['REPLICA exists at DST'] += 1
continue
protocols = rsemgr.get_protocols_ordered(rse_settings=rse_settings, operation='write', scheme=force_scheme)
protocols.reverse()
Expand All @@ -217,16 +225,16 @@ def upload(self, items, summary_file_path=None, traces_copy_out=None):
trace['protocol'] = cur_scheme
trace['transferStart'] = time.time()
try:
state = rsemgr.upload(rse_settings=rse_settings,
lfns=lfn,
source_dir=file['dirname'],
force_scheme=cur_scheme,
force_pfn=pfn,
transfer_timeout=file.get('transfer_timeout'),
delete_existing=delete_existing,
sign_service=sign_service)
success = state['success']
file['upload_result'] = state
self._upload_item(rse_settings=rse_settings,
lfn=lfn,
source_dir=file['dirname'],
force_scheme=cur_scheme,
force_pfn=pfn,
transfer_timeout=file.get('transfer_timeout'),
delete_existing=delete_existing,
sign_service=sign_service)
success = True
file['upload_result'] = {0: True, 1: None, 'success': True, 'pfn': pfn} # needs to be removed
except (ServiceUnavailable, ResourceTemporaryUnavailable) as error:
logger.warning('Upload attempt failed')
logger.debug('Exception: %s' % str(error))
Expand Down Expand Up @@ -281,9 +289,9 @@ def upload(self, items, summary_file_path=None, traces_copy_out=None):
json.dump(final_summary, summary_file, sort_keys=True, indent=1)

if num_succeeded == 0:
raise NoFilesUploaded()
raise NoFilesUploaded(str(summary_exception))
elif num_succeeded != len(files):
raise NotAllFilesUploaded()
raise NotAllFilesUploaded(str(summary_exception))
return 0

def _register_file(self, file, registered_dataset_dids):
Expand Down Expand Up @@ -496,16 +504,11 @@ def _upload_item(self, rse_settings, lfn, source_dir=None, force_pfn=None, force
:param sign_service: use the given service (e.g. gcs, s3, swift) to sign the URL
:raises RucioException(msg): general exception with msg for more details.
"""
# TODO:
# - deletion of previous attempts is done with delete protocl, but list is taken from read protocol
# - exceptions
# - checksums

"""
logger = self.logger

# Construct protocol for write and read operation.
protocol_write = self._create_protocol(rse_settings, 'write', scheme=force_scheme)
protocol_write = self._create_protocol(rse_settings, 'write', force_scheme=force_scheme)
protocol_read = self._create_protocol(rse_settings, 'read')

base_name = lfn.get('filename', lfn['name'])
Expand All @@ -517,34 +520,40 @@ def _upload_item(self, rse_settings, lfn, source_dir=None, force_pfn=None, force
logger.warning('Missing checksum for file %s:%s' % (lfn['scope'], name))

# Getting pfn
pfn = None
readpfn = None
try:
pfn = list(protocol_write.lfns2pfns(make_valid_did(lfn)).values())[0]
readpfn = list(protocol_read.lfns2pfns(make_valid_did(lfn)).values())[0]
except Exception as error:
logger.warning('Failed to create PFN for LFN: %s' % (log_prefix, lfn))
pfn = force_pfn if force_pfn
readpfn = pfn
logger.warning('Failed to create PFN for LFN: %s' % lfn)
logger.warning(str(error))
if force_pfn:
pfn = force_pfn
readpfn = pfn

# Auth. mostly for object stores
if sign_service:
pfn = self.client.get_signed_url(sign_service, 'write', pfn) # NOQA pylint: disable=undefined-variable
readpfn = self.client.get_signed_url(sign_service, 'read', pfn) # NOQA pylint: disable=undefined-variable

# Create a name of tmp file if renaming operation is supported
pfn_tmp = '%s.rucio.upload' % pfn if protocol_write.renaming else: pfn
pfn_tmp = '%s.rucio.upload' % pfn if protocol_write.renaming else pfn
readpfn_tmp = '%s.rucio.upload' % readpfn if protocol_write.renaming else readpfn

# Either DID eixsts or not register_after_upload
if protocol_write.overwrite is False and delete_existing is False and protocol_read.exists(pfn):
raise exception.FileReplicaAlreadyExists('File %s in scope %s already exists on storage as PFN %s' % (name, scope, pfn)) # wrong exception ?
# Either DID exists or not register_after_upload
if protocol_write.overwrite is False and delete_existing is False and protocol_read.exists(readpfn):
raise FileReplicaAlreadyExists('File %s in scope %s already exists on storage as PFN %s' % (name, scope, pfn)) # wrong exception ?

# Removing tmp from earlier attempts
if protocol_read.exists('%s.rucio.upload' % pfn):
if protocol_read.exists('%s.rucio.upload' % readpfn):
try:
# Construct protocol for delete operation.
protocol_delete = self._create_protocol(rse_settings, 'delete')
protocol_delete.delete('%s.rucio.upload' % list(protocol_delete.lfns2pfns(make_valid_did(lfn)).values())[0])
protocol_delete.close()
except Exception as e:
raise exception.RSEOperationNotSupported('Unable to remove temporary file %s.rucio.upload: %s' % (pfn, str(e)))
raise RSEOperationNotSupported('Unable to remove temporary file %s.rucio.upload: %s' % (pfn, str(e)))

# Removing not registered files from earlier attempts
if delete_existing:
Expand All @@ -554,51 +563,60 @@ def _upload_item(self, rse_settings, lfn, source_dir=None, force_pfn=None, force
protocol_delete.delete('%s' % list(protocol_delete.lfns2pfns(make_valid_did(lfn)).values())[0])
protocol_delete.close()
except Exception as error:
raise exception.RSEOperationNotSupported('Unable to remove file %s: %s' % (pfn, str(error))
raise RSEOperationNotSupported('Unable to remove file %s: %s' % (pfn, str(error)))

# Process the upload of the tmp file
try:
protocol_write.put(base_name, pfn_tmp, source_dir, transfer_timeout=transfer_timeout)
except Exception as error:
raise error

# Checksum verification, obsolete, see Gabriele changes.
# Checksum verification, obsolete, see Gabriele changes.
try:
stats = None
# Get the stats of given pfn through several attempts
retries = config_get_int('client', 'protocol_stat_retries', raise_exception=False, default=6) # needs to be hardcoded?
for attempt in range(retries):
try:
stats = protocol.stat(pfn)
except exception.RSEChecksumUnavailable as error:
# The stat succeeded here, but the checksum failed
raise error
except Exception as e:
sleep(2**attempt)
stats = self._retry_protocol_stat(protocol_read, readpfn_tmp)
if not isinstance(stats, dict):
raise exception.RucioException('Could not get protocol.stats for given PFN: %s' pfn)
raise RucioException('Could not get protocol.stats for given PFN: %s' % pfn)

# The checksum and filesize check
if ('filesize' in stats) and ('filesize' in lfn):
if stats['filesize'] != lfn['filesize']:
raise exception.RucioException('Checksum after upload does not match the origin.')
if int(stats['filesize']) != int(lfn['filesize']):
raise RucioException('Filesize after upload does not match the origin.')
if rse_settings['verify_checksum'] is not False:
if ('adler32' in stats) and ('adler32' in lfn):
if stats['adler32'] != lfn['adler32']:
raise exception.RucioException('Checksum after upload does not match the origin.')
raise RucioException('Checksum after upload does not match the origin.')

except Exception as error:
raise error
raise error

# The upload finished successful and the file can be renamed
try:
if protocol_write.renaming:
protocol_write.rename(pfn_tmp, pfn)
except Exception as e:
raise exception.RucioException('Unable to rename the tmp file %s.' % pfn_tmp)
raise RucioException('Unable to rename the tmp file %s.' % pfn_tmp)

protocol_write.close()
protocol_read.close()

def _retry_protocol_stat(self, protocol, pfn):
    """
    Stat a file through the given protocol, retrying with exponential backoff.

    The number of attempts inside the loop is read from the ``protocol_stat_retries``
    option of the ``client`` config section (default 6). After a failed attempt number
    ``i`` (counting from 0) the method sleeps ``2**i`` seconds, i.e. 1s, 2s, 4s, 8s,
    16s, 32s with the default. When every attempt has failed, one final stat is
    issued and any exception it raises propagates to the caller.

    :param protocol: The protocol to use to reach this file
    :param pfn: Physical file name of the target for the protocol stat
    :returns: the result of ``protocol.stat(pfn)``
    :raises RSEChecksumUnavailable: re-raised immediately, without further retries
    """
    max_attempts = config_get_int('client', 'protocol_stat_retries', raise_exception=False, default=6)
    for backoff_exponent in range(max_attempts):
        try:
            return protocol.stat(pfn)
        except RSEChecksumUnavailable:
            # The stat succeeded here, but the checksum failed: retrying would not help.
            raise
        except Exception:
            # Any other failure: wait 2**attempt seconds before the next try.
            time.sleep(2 ** backoff_exponent)
    # Last chance after all backoff rounds; failures are no longer swallowed.
    return protocol.stat(pfn)

def _create_protocol(self, rse_settings, operation, force_scheme=None):
"""
Protocol construction.
Expand All @@ -607,10 +625,10 @@ def _create_protocol(self, rse_settings, operation, force_scheme=None):
:param: force_scheme custom scheme
"""
try:
protocol = rsemgr.create_protocol(rse_settings, 'write', scheme=force_scheme)
protocol = rsemgr.create_protocol(rse_settings, operation, scheme=force_scheme)
protocol.connect()
except Exception as error:
self.logger.warning('%sFailed to create protocol for LFN: %s' % (log_prefix, lfn))
self.logger.debug('scheme: %s, exception: %s' % (scheme, error))
self.logger.warning('Failed to create protocol for operation: %s' % operation)
self.logger.debug('scheme: %s, exception: %s' % (force_scheme, error))
raise error
return protocol

0 comments on commit 694ffb6

Please sign in to comment.