Skip to content

Commit

Permalink
for file Operations (link,rename,mkdir,remove) in subscribers, separate
Browse files Browse the repository at this point in the history
the file op from the fileEvents, so that when fileEvents don't include
the operation, need to skip without falling through to file transfer.
working on #796
  • Loading branch information
petersilva committed Nov 6, 2023
1 parent 6ba871f commit ab43a40
Showing 1 changed file with 32 additions and 9 deletions.
41 changes: 32 additions & 9 deletions sarracenia/flow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1475,6 +1475,7 @@ def do_download(self) -> None:

if not os.path.isdir(msg['new_dir']):
try:
logger.info( "missing destination directories, makedirs: {msg['new_dir']} " )
self.worklist.directories_ok.append(msg['new_dir'])
os.makedirs(msg['new_dir'], 0o775, True)
except Exception as ex:
Expand All @@ -1486,6 +1487,7 @@ def do_download(self) -> None:

if 'fileOp' in msg :
if 'rename' in msg['fileOp']:

if 'renameUnlink' in msg:
self.removeOneFile(msg['fileOp']['rename'])
msg.setReport(201, 'old unlinked %s' % msg['fileOp']['rename'])
Expand All @@ -1503,7 +1505,12 @@ def do_download(self) -> None:
msg.setReport(201, 'renamed')
continue

elif ('directory' in msg['fileOp']) and ('remove' in msg['fileOp'] ) and ( 'rmdir' in self.o.fileEvents):
elif ('directory' in msg['fileOp']) and ('remove' in msg['fileOp'] ):
if 'rmdir' in self.o.fileEvents:
msg.setReport(202, "skipping rmdir %s" % new_path)
self.worklist.ok.append(msg)
continue

if self.removeOneFile(new_path):
msg.setReport(201, 'rmdired')
self.worklist.ok.append(msg)
Expand All @@ -1515,7 +1522,12 @@ def do_download(self) -> None:
self.reject(msg, 500, "rmdir %s failed" % new_path)
continue

elif ('remove' in msg['fileOp']) and ('delete' in self.o.fileEvents):
elif ('remove' in msg['fileOp']):
if 'delete' in self.o.fileEvents:
msg.setReport(202, "skipping delete %s" % new_path)
self.worklist.ok.append(msg)
continue

if self.removeOneFile(new_path):
msg.setReport(201, 'removed')
self.worklist.ok.append(msg)
Expand All @@ -1529,7 +1541,12 @@ def do_download(self) -> None:

# no elif because if rename fails and operation is an mkdir or a symlink..
# need to retry as ordinary creation, similar to normal file copy case.
if ('directory' in msg['fileOp']) and ('mkdir' in self.o.fileEvents):
if 'directory' in msg['fileOp']:
if 'mkdir' not in self.o.fileEvents:
msg.setReport(202, "skipping mkdir %s" % new_path)
self.worklist.ok.append(msg)
continue

if self.mkdir(msg):
msg.setReport(201, 'made directory')
self.worklist.ok.append(msg)
Expand All @@ -1539,7 +1556,12 @@ def do_download(self) -> None:
self.reject(msg, 500, "mkdir %s failed" % msg['new_file'])
continue

elif 'link' in msg['fileOp'] or 'hlink' in msg['fileOp'] and ('link' in self.o.fileEvents):
elif 'link' in msg['fileOp'] or 'hlink' in msg['fileOp']:
if 'link' not in self.o.fileEvents:
msg.setReport(202, "skipping link %s" % new_path)
self.worklist.ok.append(msg)
continue

if self.link1file(msg):
msg.setReport(201, 'linked')
self.worklist.ok.append(msg)
Expand Down Expand Up @@ -1914,9 +1936,10 @@ def download(self, msg, options) -> bool:
'AcceptSizeWrong download size mismatch, received %d of expected %d bytes for %s'
% (len_written, block_length, new_inflight_path))
else:
logger.error(
'incomplete download only %d of expected %d bytes for %s'
% (len_written, block_length, new_inflight_path))
if len_written > block_length:
logger.error( f'download more {len_written} than expected {block_length} bytes for {new_inflight_path}' )
else:
logger.error( f'incomplete download only {len_written} of expected {block_length} bytes for {new_inflight_path}' )
return False
# when len_written is different than block_length
msg['size'] = len_written
Expand Down Expand Up @@ -2143,13 +2166,13 @@ def send(self, msg, options):
if 'directory' in msg['fileOp'] :
if 'contentType' not in msg:
msg['contentType'] = 'text/directory'
if hasattr(self.proto[self.scheme], 'delete'):
if hasattr(self.proto[self.scheme], 'mkdir'):
logger.debug( f"message is to mkdir {new_file}")
if not self.o.dry_run:
self.proto[self.scheme].mkdir(new_file)
self.metrics['flow']['transferTxFiles'] += 1
return True
logger.error("%s, delete not supported" % self.scheme)
logger.error("%s, mkdir not supported" % self.scheme)
return False


Expand Down

0 comments on commit ab43a40

Please sign in to comment.