
Commit

Merge remote-tracking branch 'release/integration' into v6r18-fixes-6
* release/integration: (45 commits)
  support for MySQL 5.6
  mostly pylints
  support for MySQL 5.6
  minor fixes
  timedelta fix 3
  Timedelta fix 2
  Fixed timedelta
  Added S_OK()
  Fixed getSiteMask() call
  Removed double call of selectStatusElement()
  Sites can now be degraded & Updated getUsableSites()
  v6r17p14 notes
  Strip right paddings
  Added method to get both SEs and CEs
  Fixed policy configuration and removed hardcoded return
  Reorder the import
  Updated documentation
  Add error logging in RemoveReplica operation handler
  Add a script to list the requests in the ReqProxies
  Mock SiteClient
  ...
fstagni committed Mar 22, 2017
2 parents ff3ee86 + 1ebea7a commit 6093c7b
Showing 37 changed files with 988 additions and 284 deletions.
39 changes: 22 additions & 17 deletions AccountingSystem/DB/AccountingDB.py
@@ -471,7 +471,7 @@ def deleteType( self, typeName ):
if not retVal[ 'OK' ]:
return retVal
retVal = self._update( "DELETE FROM `%s` WHERE name='%s'" % ( _getTableName( "catalog", "Types" ), typeName ) )
- del( self.dbCatalog[ typeName ] )
+ del self.dbCatalog[ typeName ]
return S_OK()

def __getIdForKeyValue( self, typeName, keyName, keyValue, conn = False ):
@@ -843,10 +843,12 @@ def __extractFromBucket( self, typeName, startTime, bucketLength, keyValues, buc
value = bucketValues[ pos ]
fullFieldName = "`%s`.`%s`" % ( tableName, valueField )
sqlValList.append( "%s=GREATEST(0,%s-(%s*%s))" % ( fullFieldName, fullFieldName, value, proportion ) )
- sqlValList.append( "`%s`.`entriesInBucket`=GREATEST(0,`%s`.`entriesInBucket`-(%s*%s))" % ( tableName,
-     tableName,
-     bucketValues[-1],
-     proportion ) )
+ sqlValList.append( "`%s`.`entriesInBucket`=GREATEST(0,`%s`.`entriesInBucket`-(%s*%s))" % (
+     tableName,
+     tableName,
+     bucketValues[-1],
+     proportion
+ ))
cmd += ", ".join( sqlValList )
cmd += " WHERE `%s`.`startTime`='%s' AND `%s`.`bucketLength`='%s' AND " % (
tableName,
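
For orientation, a minimal standalone sketch of the clause this hunk assembles: when a record's contribution is removed from a bucket it only partially covers, each accumulated field is reduced by value*proportion, and GREATEST(0, ...) keeps the stored totals from going negative. The table and field names below are hypothetical, not taken from the real schema.

# Hedged sketch (hypothetical names) of the proportional-deduction clause built above
tableName  = "ac_bucket_SomeType"   # hypothetical bucket table
valueField = "CPUTime"              # hypothetical accumulated field
value      = 1200.0                 # contribution carried by the record being removed
proportion = 0.5                    # fraction of the record that overlaps this bucket

fullFieldName = "`%s`.`%s`" % ( tableName, valueField )
clause = "%s=GREATEST(0,%s-(%s*%s))" % ( fullFieldName, fullFieldName, value, proportion )
print( clause )
# `ac_bucket_SomeType`.`CPUTime`=GREATEST(0,`ac_bucket_SomeType`.`CPUTime`-(1200.0*0.5))
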
@@ -1217,18 +1219,21 @@ def __slowCompactBucketsForType( self, typeName ):
secondsLimit = self.dbBucketsLength[ typeName ][ bPos ][0]
bucketLength = self.dbBucketsLength[ typeName ][ bPos ][1]
timeLimit = ( nowEpoch - nowEpoch % bucketLength ) - secondsLimit
- nextBucketLength = self.dbBucketsLength[ typeName ][ bPos + 1 ][1]
- self.log.info( "[COMPACT] Compacting data newer than %s with bucket size %s for %s" % ( Time.fromEpoch( timeLimit ), bucketLength, typeName ) )
+ self.log.info( "[COMPACT] Compacting data newer than %s with bucket size %s for %s" % (
+     Time.fromEpoch( timeLimit ),
+     bucketLength,
+     typeName))
querySize = 10000
previousRecordsSelected = querySize
totalCompacted = 0
while previousRecordsSelected == querySize:
#Retrieve the data
- self.log.info( "[COMPACT] Retrieving buckets to compact newer than %s with size %s" % ( Time.fromEpoch( timeLimit ),
-     bucketLength ) )
+ self.log.info( "[COMPACT] Retrieving buckets to compact newer than %s with size %s" % (
+     Time.fromEpoch( timeLimit ),
+     bucketLength))
roundStartTime = time.time()
result = self.__selectIndividualForCompactBuckets( typeName, timeLimit, bucketLength,
-     nextBucketLength, querySize )
+     querySize )
if not result[ 'OK' ]:
#self.__rollbackTransaction( connObj )
return result
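
The while loop above drains the backlog in fixed-size rounds. A hedged, self-contained sketch of that pattern follows; the helper is a stand-in, not the DIRAC API.

# Drain pattern: select at most querySize rows per round and compact them; once a
# round returns fewer than querySize rows, everything newer than the limit is done.
pendingRows = list( range( 25000 ) )   # stand-in for the bucket rows to compact

def selectBucketsToCompact( limit ):
  """Hypothetical stand-in for __selectIndividualForCompactBuckets."""
  batch = pendingRows[ :limit ]
  del pendingRows[ :limit ]
  return batch

querySize = 10000
previousRecordsSelected = querySize
totalCompacted = 0
while previousRecordsSelected == querySize:
  records = selectBucketsToCompact( querySize )
  # ... the real code groups these records into coarser buckets and writes them back ...
  previousRecordsSelected = len( records )
  totalCompacted += previousRecordsSelected
print( "Compacted %s records" % totalCompacted )   # Compacted 25000 records
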
@@ -1265,7 +1270,7 @@ def __slowCompactBucketsForType( self, typeName ):
#return self.__commitTransaction( connObj )
return S_OK()

- def __selectIndividualForCompactBuckets( self, typeName, timeLimit, bucketLength, nextBucketLength, querySize, connObj = False ):
+ def __selectIndividualForCompactBuckets( self, typeName, timeLimit, bucketLength, querySize, connObj = False ):
"""
Nasty SQL query to get ideal buckets using grouping by date calculations and adding value contents
"""
@@ -1398,10 +1403,11 @@ def regenerateBuckets( self, typeName ):
whereString = "%s <= %d" % ( endTimeTableField,
endRangeTime )
else:
- whereString = "%s > %d AND %s <= %d" % ( startTimeTableField,
-     startRangeTime,
-     endTimeTableField,
-     endRangeTime )
+ whereString = "%s > %d AND %s <= %d" % (
+     startTimeTableField,
+     startRangeTime,
+     endTimeTableField,
+     endRangeTime)
sameBucketCondition = "(%s) = (%s)" % ( bucketizedStart, bucketizedEnd )
#Records that fit in a bucket
sqlQuery = "SELECT %s, %s, COUNT(%s) FROM `%s` WHERE %s AND %s GROUP BY %s, %s" % (
@@ -1412,8 +1418,7 @@
whereString,
sameBucketCondition,
groupingString,
- bucketizedStart
- )
+ bucketizedStart)
sqlQueries.append( sqlQuery )
#Records that fit in more than one bucket
sqlQuery = "SELECT %s, %s, %s, 1 FROM `%s` WHERE %s AND NOT %s" % ( startTimeTableField,
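As background for the grouping and compaction queries in AccountingDB.py above, a hedged sketch of the date-based bucketing they rely on: a record is assigned to a fixed-length bucket by flooring its start time to a multiple of the bucket length, and the values of all records mapping to the same bucket start are summed. The helper below is plain Python for illustration; the SQL in the file expresses the same floor with arithmetic on the time columns.

# Hedged illustration of the bucketization idea (not the DIRAC API).
def bucketStart( epoch, bucketLength ):
  """Start (epoch seconds) of the bucket containing a record started at `epoch`."""
  return epoch - epoch % bucketLength

# Two records started 300 s apart land in the same one-hour (3600 s) bucket:
print( bucketStart( 1490180000, 3600 ) )   # 1490176800
print( bucketStart( 1490180300, 3600 ) )   # 1490176800
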
24 changes: 17 additions & 7 deletions AccountingSystem/Service/DataStoreHandler.py
@@ -7,9 +7,10 @@
from DIRAC import S_OK, S_ERROR, gConfig, gLogger
from DIRAC.AccountingSystem.DB.MultiAccountingDB import MultiAccountingDB
from DIRAC.ConfigurationSystem.Client import PathFinder
- from DIRAC.Core.DISET.RequestHandler import RequestHandler
+ from DIRAC.Core.DISET.RequestHandler import RequestHandler,getServiceOption
from DIRAC.Core.Utilities import Time
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
+ from DIRAC.Core.DISET.RPCClient import RPCClient

__RCSID__ = "$Id$"

@@ -21,11 +22,14 @@ class DataStoreHandler( RequestHandler ):
def initializeHandler( cls, svcInfoDict ):
multiPath = PathFinder.getDatabaseSection( "Accounting/MultiDB" )
cls.__acDB = MultiAccountingDB( multiPath )
- cls.__acDB.autoCompactDB() #pylint: disable=no-member
- result = cls.__acDB.markAllPendingRecordsAsNotTaken() #pylint: disable=no-member
- if not result[ 'OK' ]:
-   return result
- gThreadScheduler.addPeriodicTask( 60, cls.__acDB.loadPendingRecords ) #pylint: disable=no-member
+ #we can run multiple services in read only mode. In that case we do not bucket
+ cls.runBucketing = getServiceOption( svcInfoDict, 'RunBucketing', True )
+ if cls.runBucketing:
+   cls.__acDB.autoCompactDB() #pylint: disable=no-member
+   result = cls.__acDB.markAllPendingRecordsAsNotTaken() #pylint: disable=no-member
+   if not result[ 'OK' ]:
+     return result
+   gThreadScheduler.addPeriodicTask( 60, cls.__acDB.loadPendingRecords ) #pylint: disable=no-member
return S_OK()

types_registerType = [ basestring, list, list, list ]
@@ -146,7 +150,13 @@ def export_compactDB( self ):
"""
Compact the db by grouping buckets
"""
- return self.__acDB.compactBuckets() #pylint: disable=no-member
+ #if we are running slaves (not only one service) we can redirect the request to the master
+ #For more information please read the Administrative guide Accounting part!
+ #ADVICE: If you want to trigger the bucketing, please make sure the bucketing is not running!!!!
+ if self.runBucketing:
+   return self.__acDB.compactBuckets() #pylint: disable=no-member
+ else:
+   return RPCClient('Accounting/DataStoreMaster').compactDB()

types_remove = [ basestring, datetime.datetime, datetime.datetime, list ]
def export_remove( self, typeName, startTime, endTime, valuesList ):
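A brief usage sketch for the redirect above: clients keep calling compactDB on whichever DataStore endpoint they use, and an instance started with RunBucketing disabled forwards the request to 'Accounting/DataStoreMaster'. The snippet assumes an initialised DIRAC client environment (configuration and proxy); the service name 'Accounting/DataStore' is the usual one but depends on the installation.

# Hedged usage sketch: trigger bucket compaction through any DataStore endpoint.
from DIRAC.Core.DISET.RPCClient import RPCClient

res = RPCClient( 'Accounting/DataStore' ).compactDB()
if not res[ 'OK' ]:
  print( "compactDB failed: %s" % res[ 'Message' ] )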
