Skip to content

Commit

Permalink
Merge pull request #16549 from ggovi/conddb-utilities-conddb-tool-fix…
Browse files Browse the repository at this point in the history
…1-80X

fixes for the conddb copy functions
  • Loading branch information
cmsbuild committed Dec 6, 2016
2 parents 72dac4a + 433a9cb commit 0125196
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 55 deletions.
19 changes: 17 additions & 2 deletions CondCore/Utilities/python/conddblib.py
Expand Up @@ -273,6 +273,8 @@ class IOV:
'payload_hash':(DbRef(Payload,'hash'),_Col.pk) }


# the string 'GLOBAL' is a reserved keyword in sqlalchemy (upper case); using it in the model causes the two GT tables to be unreadable (bug).
# The only choice is to use lower-case names, and rename the tables in sqlite after creation!
class GlobalTag:
__tablename__ = 'global_tag'
columns = { 'name':(sqlalchemy.String(name_length),_Col.pk),
Expand All @@ -282,7 +284,6 @@ class GlobalTag:
'insertion_time':(sqlalchemy.TIMESTAMP,_Col.notNull),
'snapshot_time':(sqlalchemy.TIMESTAMP,_Col.notNull) }


class GlobalTagMap:
__tablename__ = 'global_tag_map'
columns = { 'global_tag_name':(DbRef(GlobalTag,'name'),_Col.pk),
Expand Down Expand Up @@ -348,6 +349,7 @@ def __init__(self, url, init=False):
'cms_orcon_prod',
'cmsintr_lb',
}
self._url = url
self._backendName = ('sqlite' if self._is_sqlite else 'oracle' )
self._schemaName = ( None if self._is_sqlite else schema_name )
logging.debug(' ... using db "%s", schema "%s"' % (url, self._schemaName) )
Expand All @@ -369,6 +371,7 @@ def get_dbtype(self,theType):
def session(self):
s = self._session()
s.get_dbtype = self.get_dbtype
s._is_sqlite = self._is_sqlite
return s

@property
Expand Down Expand Up @@ -428,7 +431,19 @@ def init(self, drop=False):
self.get_dbtype(GlobalTag).__table__.create(bind = self.engine)
self.get_dbtype(GlobalTagMap).__table__.create(bind = self.engine)
#self.metadata.create_all(self.engine)

if self.is_sqlite:
# horrible hack, but no choice because of the sqlalchemy bug ( see comment in the model)
import sqlite3
import string
conn = sqlite3.connect( self._url.database )
c = conn.cursor()
stmt = string.Template('ALTER TABLE $before RENAME TO $after')
c.execute( stmt.substitute( before=GlobalTag.__tablename__, after='TMP0' ) )
c.execute( stmt.substitute( before='TMP0', after=GlobalTag.__tablename__.upper() ) )
c.execute( stmt.substitute( before=GlobalTagMap.__tablename__, after='TMP1' ) )
c.execute( stmt.substitute( before='TMP1', after=GlobalTagMap.__tablename__.upper() ) )
conn.commit()
conn.close()
# TODO: Create indexes
#logger.debug('Creating indexes...')

Expand Down
130 changes: 77 additions & 53 deletions CondCore/Utilities/scripts/conddb
Expand Up @@ -123,7 +123,6 @@ def _run_editor(editor, tempfd):


def _parse_timestamp(timestamp):

try:
return datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f')
except ValueError:
Expand Down Expand Up @@ -403,7 +402,9 @@ def output_table(args, table, headers, filters=None, output_file=None, no_first_
filters = [None] * len(headers)

def max_length_filter(s):
    """Render a cell value on a single line, truncated for table output.

    Newlines and carriage returns are replaced by spaces so a value never
    spans multiple output rows; values longer than conddb.name_length are
    cut and suffixed with '...' unless the caller disabled truncation via
    no_max_length (closure variable of the enclosing output_table).
    """
    # '\r' is chr(13); normalize both line terminators to spaces.
    s = str(s).replace('\n', ' ').replace('\r', ' ')
    if len(s) > conddb.name_length and not no_max_length:
        return '%s...' % s[:conddb.name_length]
    return s

new_table = [[] for i in range(len(table))]
Expand Down Expand Up @@ -901,39 +902,49 @@ def diff(args):
if not (is_tag1 and is_tag2) and not (is_global_tag1 and is_global_tag2):
raise Exception('There are no tag or global tag pairs named %s and %s in the database(s).' % (args.first, args.second))

def convertRunToTimes(startRun, stopRun=None):

if not stopRun :
stopRun = startRun + 1

startTime1, stopTime1 = runToTime(startRun)
startTime2, stopTime2 = runToTime(stopRun)

timeMap = { 'start' : {
def convertRunToTimes( fromRun, toRun ):
    """Map a [fromRun, toRun] run interval onto the other IOV time types.

    Builds the boundary values for all supported time types ('run',
    'time', 'lumi'; 'hash' has no ordering and stays None) so that a
    copy over a run range can be applied to tags of any time type.

    Either bound may be None, in which case the corresponding entries
    remain None (open-ended interval).

    Returns a dict {'from': {...}, 'to': {...}} keyed by time type name.
    """
    fromTime = None
    fromLumi = None
    toTime = None
    toLumi = None
    if fromRun is not None:
        startTime1, stopTime1 = runToTime( fromRun )
        # the time we get may be a bit delayed (7-10 sec according to Salvatore)
        fromTime = startTime1 - 15.
        # lumi IOVs pack (run << 32 | lumisection); use the first lumisection
        fromLumi = fromRun << 32 | 0x1
    if toRun is not None:
        startTime2, stopTime2 = runToTime( toRun )
        toTime = stopTime2 + 15.
        toLumi = toRun << 32 | 0x1

    timeMap = { 'from' : {
                    'hash' : None,
                    'run'  : fromRun,
                    'time' : fromTime,
                    'lumi' : fromLumi,
                },
                'to' : {
                    'hash' : None,
                    'run'  : toRun,
                    'time' : toTime,
                    'lumi' : toLumi,
                }
              }

    logging.debug("convertRunToTimes> start: %s stop %s \n timeMap: %s " % (fromRun, toRun, str(timeMap)))

    return timeMap

def runToTime(runNr):

connStr = conddb._getCMSFrontierSQLAlchemyConnectionString('PromptProd', 'CMS_CONDITIONS')

connection = conddb.connect(connStr)
url = conddb.make_url( connStr, True )
connection = conddb.connect(url)
session = connection.session()

IOV = session.get_dbtype(conddb.IOV)

startIOV = session.query(IOV.insertion_time).filter(IOV.tag_name == 'runinfo_start_31X_hlt', IOV.since == runNr).all()
stopIOV = session.query(IOV.insertion_time).filter(IOV.tag_name == 'runinfo_31X_hlt', IOV.since == runNr).all()

Expand All @@ -950,30 +961,36 @@ def _update_tag_log(session,the_tag,the_timestamp,the_action,note):
TagLog = session.get_dbtype(conddb.TagLog)
session.add(TagLog(tag_name=the_tag, event_time=the_timestamp, action=the_action, user_name=userName, host_name=hostName, command=userCommand, user_text=note ))

def _copy_tag(args, copyTime, session1, session2, isSQLite, first, second, fromIOV=None, toIOV=None, timeMap=None):
logging.info('Copying tag %s to %s ...', str_db_object(args.db, first), str_db_object(args.destdb, second))

def _copy_tag(args, copyTime, session1, session2, first, second, fromIOV=None, toIOV=None, timeMap=None):
Tag1 = session1.get_dbtype(conddb.Tag)
Tag2 = session2.get_dbtype(conddb.Tag)
# Copy the tag
tag = _rawdict(session1.query(Tag1).get(first))
tag['name'] = second
if isSQLite:

if session2._is_sqlite:
if tag['end_of_validity'] >= maxSince:
tag['end_of_validity'] = -1
else:
if tag['end_of_validity'] == -1:
if tag['end_of_validity'] == -1 or tag['end_of_validity'] > maxSince :
tag['end_of_validity'] = maxSince
tag['insertion_time'] = copyTime
tag['modification_time'] = copyTime

if timeMap:
fromIOV = timeMap['from'][ tag['time_type'].lower().strip() ]
toIOV = timeMap['to'] [ tag['time_type'].lower().strip() ]

if fromIOV is None:
fromIOV = 1

if timeMap and not fromIOV and not toIOV:
fromIOV = timeMap['start'][ tag['time_type'].lower().strip() ]
toIOV = timeMap['stop'] [ tag['time_type'].lower().strip() ]
selectStr = 'from since=%s' %fromIOV
if toIOV is not None:
selectStr += ' to since=%s' %toIOV
if args.snapshot is not None:
selectStr += ' selecting insertion time < %s' %args.snapshot

logging.info('Copying tag %s to %s %s', str_db_object(args.db, first), str_db_object(args.destdb, second), selectStr)

query = session2.query(Tag2).filter(Tag2.name == second )
destExists = False
Expand Down Expand Up @@ -1029,27 +1046,7 @@ def _copy_tag(args, copyTime, session1, session2, isSQLite, first, second, fromI
scalar()
logging.debug('The closest smaller IOV than the given starting one (--from %s) is %s...', fromVal, prev_iov)


# Copy the distinct payloads referenced in the IOVs of the tag
# FIXME: Put the DISTINCT query as a subquery (we can't directly use distinct on BLOBs)
query = session1.query(IOV1.payload_hash).filter(IOV1.tag_name == first)
if prev_iov is not None:
query = query.filter(IOV1.since >= prev_iov)
if toIOV is not None:
toVal = toIOV
logging.debug("filtering with TO %s of type %s for tag: %s to " % (toIOV, tag['time_type'], str(tag['name'])) )
query = query.filter(IOV1.since <= toVal)
query = query.distinct()
for (payload_hash, ) in query:
if _exists(session2, Payload2.hash, payload_hash):
logging.info('Skipping copy of payload %s to %s since it already exists...', str_db_object(args.db, payload_hash), str_db_object(args.destdb, payload_hash))
else:
logging.info('Copying payload %s to %s ...', str_db_object(args.db, payload_hash), str_db_object(args.destdb, payload_hash))
payload = _rawdict(session1.query(Payload1).filter(Payload1.hash == payload_hash).one())
payload['insertion_time'] = copyTime
session2.add(Payload2(** payload))

# Copy the IOVs of the tag
# Select the input IOVs
query = session1.query(IOV1).filter(IOV1.tag_name == first)
if prev_iov is not None:
query = query.filter(IOV1.since >= prev_iov)
Expand All @@ -1058,6 +1055,7 @@ def _copy_tag(args, copyTime, session1, session2, isSQLite, first, second, fromI
query = query.filter(_inserted_before(IOV1,args.snapshot))
query = query.order_by(IOV1.since, IOV1.insertion_time.desc())
iovs = {}
hashes = set()
for iov in query:
iov = _rawdict(iov)
iov['tag_name'] = second
Expand All @@ -1073,10 +1071,28 @@ def _copy_tag(args, copyTime, session1, session2, isSQLite, first, second, fromI
first_iov = False

since = iov['since']

if since not in iovs.keys():
# updating the insertion time to the execution time.
# Because of that, for a given since, only the most recent will be added.
iovs[since] = iov['payload_hash']
hashes.add( iov['payload_hash'] )
else:
logging.warning('Skipping older iov with since %s...', since)
logging.info('Found %s iovs and %s referenced payloads to copy.',len(iovs.keys()), len(hashes))
# Copy the payloads referenced in the selected iovs
np = 0
for h in hashes:
if _exists(session2, Payload2.hash, h):
logging.info('Skipping copy of payload %s to %s since it already exists...', str_db_object(args.db, h), str_db_object(args.destdb, h))
else:
logging.info('Copying payload %s to %s ...', str_db_object(args.db, h), str_db_object(args.destdb, h))
payload = _rawdict(session1.query(Payload1).filter(Payload1.hash == h).one())
payload['insertion_time'] = copyTime
session2.add(Payload2(** payload))
np += 1
logging.info('%s payload(s) copied.',np)
# Calculate if extra iovs are needed - for the override mode ( they will have already their payloads copied )
extraiovs = {}
if args.override:
# the interval to be overriden is defined by the new iov set boundaries,
Expand All @@ -1098,7 +1114,7 @@ def _copy_tag(args, copyTime, session1, session2, isSQLite, first, second, fromI
if newSince < since:
extraiovs[since] = iovs[newSince]
break

# Copy the set of IOVs collected
niovs = 0
for k,v in iovs.items():
logging.debug('Copying IOV %s -> %s...', k, v)
Expand Down Expand Up @@ -1148,7 +1164,7 @@ def copy(args):

copyTime = datetime.datetime.now()

niovs = _copy_tag(args, copyTime, session1, session2, connection2.is_sqlite, args.first, args.second, getattr(args, 'from'), args.to)
niovs = _copy_tag(args, copyTime, session1, session2, args.first, args.second, getattr(args, 'from'), args.to)

_confirm_changes(args)
note = args.note
Expand All @@ -1165,6 +1181,7 @@ def copy(args):
if args.second is None:
args.second = args.first

# 'from' is a keyword!
timeMap = convertRunToTimes(getattr(args, 'from'), args.to)

logging.info('Copying global tag %s to %s ...', str_db_object(args.db, args.first), str_db_object(args.destdb, args.second))
Expand All @@ -1177,6 +1194,12 @@ def copy(args):
global_tag = _rawdict(session1.query(GlobalTag1).get(args.first))
global_tag['name'] = args.second
global_tag['validity'] = 0 # XXX: SQLite does not work with long ints...
if args.snapshot is None:
args.snapshot = str(global_tag['snapshot_time'].strftime("%Y-%m-%d %H:%M:%S"))
else:
global_tag['snapshot_time'] = _parse_timestamp(args.snapshot)
if _exists(session2, GlobalTag2.name, args.second):
raise Exception('A GlobalTag named "%s" already exists in %s' %(args.second, args.destdb))
session2.add(GlobalTag2(**global_tag))

# Copy the tags of the global tag
Expand All @@ -1189,7 +1212,8 @@ def copy(args):
logging.warn('Skipping copy of tag %s to %s since it already exists... *The tags may differ in content*', str_db_object(args.db, tag), str_db_object(args.destdb, tag))
else:
logging.debug('going to copy tag %s to %s ... ', str_db_object(args.db, tag), str_db_object(args.destdb, tag))
_copy_tag(args, session1, session2, tag, tag, timeMap=timeMap)
copyTime = datetime.datetime.now()
_copy_tag(args, copyTime, session1, session2, tag, tag, timeMap=timeMap)

# Copy the map of the global tag
query = session1.query(GlobalTagMap1).filter(GlobalTagMap1.global_tag_name == args.first)
Expand Down

0 comments on commit 0125196

Please sign in to comment.