Skip to content

Commit

Permalink
Merge pull request #3127 from phicharp/v6r15-PhC-160930
Browse files Browse the repository at this point in the history
[v6r15] Minor fixes
  • Loading branch information
Andrei Tsaregorodtsev committed Oct 7, 2016
2 parents f23daca + 4bd05b0 commit 9c98151
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 36 deletions.
73 changes: 39 additions & 34 deletions DataManagementSystem/Agent/RequestOperations/RemoveFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,49 +73,54 @@ def __call__( self ):

# We check the status of the SE from the LFN that are successful
# No idea what to do with the others...
succ = res['Value']['Successful']
targetSEs = set( [se for lfn in succ for se in succ[lfn] ] )
replicas = res['Value']['Successful']
targetSEs = set( [se for lfn in replicas for se in replicas[lfn] ] )

bannedTargets = set()
if targetSEs:
bannedTargets = self.checkSEsRSS( targetSEs, access = 'RemoveAccess' )
if not bannedTargets['OK']:
gMonitor.addMark( "RemoveFileAtt" )
gMonitor.addMark( "RemoveFileFail" )
return bannedTargets

if bannedTargets['Value']:
return S_OK( "%s targets are banned for removal" % ",".join( bannedTargets['Value'] ) )
bannedTargets = set( bannedTargets['Value'] )
if bannedTargets and 'always banned' in self.operation.Error:
return S_OK( "%s targets are always banned for removal" % ",".join( sorted( bannedTargets ) ) )

# # prepare waiting file dict
toRemoveDict = dict( [ ( opFile.LFN, opFile ) for opFile in waitingFiles ] )
gMonitor.addMark( "RemoveFileAtt", len( toRemoveDict ) )

# # 1st step - bulk removal
self.log.debug( "bulk removal of %s files" % len( toRemoveDict ) )
bulkRemoval = self.bulkRemoval( toRemoveDict )
if not bulkRemoval["OK"]:
self.log.error( "Bulk file removal failed", bulkRemoval["Message"] )
else:
gMonitor.addMark( "RemoveFileOK", len( toRemoveDict ) - len( bulkRemoval["Value"] ) )
toRemoveDict = bulkRemoval["Value"]

# # 2nd step - single file removal
for lfn, opFile in toRemoveDict.items():
self.log.info( "removing single file %s" % lfn )
singleRemoval = self.singleRemoval( opFile )
if not singleRemoval["OK"]:
self.log.error( 'Error removing single file', singleRemoval["Message"] )
gMonitor.addMark( "RemoveFileFail", 1 )
# # We take only files that have no replica at the banned SEs... If no replica, don't
toRemoveDict = dict( ( opFile.LFN, opFile ) for opFile in waitingFiles if not bannedTargets.intersection( replicas.get( opFile.LFN, [] ) ) )

if toRemoveDict:
gMonitor.addMark( "RemoveFileAtt", len( toRemoveDict ) )
# # 1st step - bulk removal
self.log.debug( "bulk removal of %s files" % len( toRemoveDict ) )
bulkRemoval = self.bulkRemoval( toRemoveDict )
if not bulkRemoval["OK"]:
self.log.error( "Bulk file removal failed", bulkRemoval["Message"] )
else:
self.log.info( "file %s has been removed" % lfn )
gMonitor.addMark( "RemoveFileOK", 1 )

# # set
failedFiles = [ ( lfn, opFile ) for ( lfn, opFile ) in toRemoveDict.items()
if opFile.Status in ( "Failed", "Waiting" ) ]
if failedFiles:
self.operation.Error = "failed to remove %d files" % len( failedFiles )

gMonitor.addMark( "RemoveFileOK", len( toRemoveDict ) - len( bulkRemoval["Value"] ) )
toRemoveDict = bulkRemoval["Value"]

# # 2nd step - single file removal
for lfn, opFile in toRemoveDict.items():
self.log.info( "removing single file %s" % lfn )
singleRemoval = self.singleRemoval( opFile )
if not singleRemoval["OK"]:
self.log.error( 'Error removing single file', singleRemoval["Message"] )
gMonitor.addMark( "RemoveFileFail", 1 )
else:
self.log.info( "file %s has been removed" % lfn )
gMonitor.addMark( "RemoveFileOK", 1 )

# # set
failedFiles = [ ( lfn, opFile ) for ( lfn, opFile ) in toRemoveDict.items()
if opFile.Status in ( "Failed", "Waiting" ) ]
if failedFiles:
self.operation.Error = "failed to remove %d files" % len( failedFiles )

if bannedTargets:
return S_OK( "%s targets are banned for removal" % ",".join( sorted( bannedTargets ) ) )
return S_OK()

def bulkRemoval( self, toRemoveDict ):
Expand Down Expand Up @@ -147,7 +152,7 @@ def bulkRemoval( self, toRemoveDict ):
opFile.Status = "Done"

# # return files still waiting
toRemoveDict = dict( [ ( opFile.LFN, opFile ) for opFile in self.operation if opFile.Status == "Waiting" ] )
toRemoveDict = dict( ( lfn, opFile ) for lfn, opFile in toRemoveDict.iteritems() if opFile.Status == "Waiting" )
return S_OK( toRemoveDict )

def singleRemoval( self, opFile ):
Expand Down
7 changes: 5 additions & 2 deletions TransformationSystem/Client/FileReport.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,12 @@ def commit( self ):
""" Commit pending file status update records
"""
if not self.statusDict:
return S_OK({})
return S_OK( {} )

return self.transClient.setFileStatusForTransformation( self.transformation, self.statusDict, force = self.force )
result = self.transClient.setFileStatusForTransformation( self.transformation, self.statusDict, force = self.force )
if result['OK']:
self.statusDict = {}
return result

def generateForwardDISET( self ):
""" Commit the accumulated records and generate request eventually
Expand Down

0 comments on commit 9c98151

Please sign in to comment.