Skip to content

Commit

Permalink
Fix database purge code in analytics-api
Browse files Browse the repository at this point in the history
This patch fixes the following issues in the db_purge() method.

1) The key type for MESSAGE_TABLE_TIMESTAMP is different from the
   other message index tables.

   pycassaShell output:

   >>> MESSAGETABLETIMESTAMP.key_validation_class
   'IntegerType'

   >>> MESSAGETABLESOURCE.key_validation_class
   'CompositeType(IntegerType, UTF8Type)'

   >>> MESSAGETABLEMESSAGETYPE.key_validation_class
   'CompositeType(IntegerType, UTF8Type)'

   The purge function always expects composite key and tries to extract
   the first element (timestamp) from the key.
   But for MESSAGE_TABLE_TIMESTAMP, the key is of type integer
   and hence it raises an exception. Side effect of this exception:
   data from the table MESSAGE_TABLE_TIMESTAMP is not purged.

2) object identity check fails for table.
   => if (table is MESSAGE_TABLE_SOURCE):
   The above check always fails due to object id mismatch.
   Therefore, data is not purged from the MessageTable.
   Replaced object identity check with equality check.

Change-Id: I51d619ac5275acf737094b7cdf36bb3d462fcf81
Closes-Bug: #1487966
  • Loading branch information
Sundaresan Rajangam committed Oct 27, 2015
1 parent 444742c commit 95077da
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 30 deletions.
29 changes: 15 additions & 14 deletions src/opserver/analytics_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,21 +276,27 @@ def db_purge(self, purge_cutoff, purge_id):
b = cf.batch()
try:
# get all columns only in case of one message index table
if (table is MESSAGE_TABLE_SOURCE):
if table == MESSAGE_TABLE_SOURCE:
cols_to_fetch = 1000000
else:
cols_to_fetch = 1

for key, cols in cf.get_range(column_count=cols_to_fetch):
t2 = key[0]
# key is of type integer for MESSAGE_TABLE_TIMESTAMP.
# For other tables, key is a composite type with
# first element being timestamp (integer).
if table == MESSAGE_TABLE_TIMESTAMP:
t2 = key
else:
t2 = key[0]
# each row will have equivalent of 2^23 = 8388608 usecs
row_time = (float(t2)*pow(2, RowTimeInBits))
if (row_time < purge_time):
per_table_deleted +=1
total_rows_deleted +=1
if (table is MESSAGE_TABLE_SOURCE):
if table == MESSAGE_TABLE_SOURCE:
# get message table uuids to delete
del_msg_uuids.append(list(cols.values()))
del_msg_uuids.extend(cols.values())
try:
b.remove(key)
except Exception as e:
Expand Down Expand Up @@ -318,26 +324,21 @@ def db_purge(self, purge_cutoff, purge_id):
"doesnot have uuid %s" % (purge_id, e))
purge_error_details.append("Exception: Message table "
"doesnot have uuid %s" % (e))


except Exception as e:
self._logger.error("Exception: Purge_id:%s table:%s "
"error: %s" % (purge_id, table, e))
purge_error_details.append("Exception: Table:%s "
"error: %s" % (table, e))
continue
self._logger.info("Purge_id %s deleted %d rows from table: %s"
% (purge_id, per_table_deleted, table))
self._logger.warning("Purge_id %s deleted %d rows from "
"table: %s" % (purge_id, per_table_deleted, table))

self._logger.info("Purge_id %s deleted %d rows from table: %s"
self._logger.warning("Purge_id %s deleted %d rows from table: %s"
% (purge_id, msg_table_deleted, COLLECTOR_GLOBAL_TABLE))
# end deleting all relevant UUIDs from message table


self._logger.info("Purge_id %s total rows deleted: %s"
self._logger.warning("Purge_id %s total rows deleted: %s"
% (purge_id, total_rows_deleted))
return (total_rows_deleted, purge_error_details)
# end purge_data
# end db_purge

def get_dbusage_info(self, rest_api_port):
"""Collects database usage information from all db nodes
Expand Down
63 changes: 48 additions & 15 deletions src/opserver/test/utils/analytics_fixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from opserver_introspect_utils import VerificationOpsSrv
from collector_introspect_utils import VerificationCollector
from alarmgen_introspect_utils import VerificationAlarmGen
from generator_introspect_utils import VerificationGenerator
from opserver.sandesh.viz.constants import COLLECTOR_GLOBAL_TABLE, SOURCE, MODULE
from opserver.opserver_util import OpServerUtils
from sandesh_common.vns.constants import NodeTypeNames, ModuleNames
Expand Down Expand Up @@ -2023,7 +2024,6 @@ def verify_collector_object_log(self, start_time, end_time):
@retry(delay=2, tries=5)
def verify_collector_object_log_before_purge(self, start_time, end_time):
self.logger.info('verify_collector_object_log_before_purge')
vns = VerificationOpsSrv('127.0.0.1', self.opserver_port);
res = self.verify_collector_object_log(start_time, end_time)
self.logger.info("collector object log before purging: %s" % res)
if not res:
Expand All @@ -2035,32 +2035,57 @@ def verify_database_purge_query(self, json_qstr):
self.logger.info('verify database purge query');
vns = VerificationOpsSrv('127.0.0.1', self.opserver_port);
res = vns.post_purge_query_json(json_qstr)
assert(res == 'started')
return True
try:
assert(res['status'] == 'started')
purge_id = res['purge_id']
except KeyError:
assert(False)
else:
return purge_id
# end verify_database_purge_query

@retry(delay=2, tries=5)
def verify_collector_object_log_after_purge(self, start_time, end_time):
    """Verify that no collector object logs remain after a purge.

    Retried (up to 5 times, 2s apart) because the purge completes
    asynchronously.  Returns True only when the object-log query over
    [start_time, end_time] comes back empty.
    """
    self.logger.info('verify_collector_object_log_after_purge')
    # NOTE(review): vns is not used below -- presumably a leftover from an
    # earlier revision; confirm before removing.
    vns = VerificationOpsSrv('127.0.0.1', self.opserver_port)
    res = self.verify_collector_object_log(start_time, end_time)
    self.logger.info("collector object log after purging: %s" % res)
    # An empty result list means every log in the window was purged.
    if res != []:
        return False
    return True
# end verify_collector_object_log_after_purge

@retry(delay=2, tries=5)
def verify_database_purge_status(self, purge_id):
    """Poll the DatabasePurgeInfo UVE and confirm the purge completed.

    Returns True only when the UVE's latest stats entry reports this
    purge_id with status 'success' and no error details; otherwise
    returns False (the @retry decorator re-polls on failure).
    """
    self.logger.info('verify database purge status: purge_id [%s]' %
                     (purge_id))
    try:
        introspect = VerificationGenerator('127.0.0.1',
                                           self.opserver.http_port)
        stats = introspect.get_uve('DatabasePurgeInfo')['stats'][0]
    except Exception as e:
        # UVE not published yet (or introspect unreachable) -- report
        # failure and let the retry decorator poll again.
        self.logger.error('Failed to get DatabasePurgeInfo UVE: %s' % (e))
        return False
    self.logger.info(str(stats))
    # Success requires a matching purge_id, an explicit 'success'
    # status, and an empty error-details field.
    return (stats['purge_id'] == purge_id and
            stats['purge_status'] == 'success' and
            not stats['purge_status_details'])
# end verify_database_purge_status

def verify_database_purge_with_percentage_input(self):
self.logger.info('verify database purge query')
vns = VerificationOpsSrv('127.0.0.1', self.opserver_port)
self.logger.info('verify database purge with percentage input')
end_time = UTCTimestampUsec()
start_time = end_time - 10*60*pow(10,6)
assert(self.verify_collector_object_log_before_purge(start_time, end_time))
json_qstr = json.dumps({'purge_input': 100})
assert(self.verify_database_purge_query(json_qstr))
purge_id = self.verify_database_purge_query(json_qstr)
assert(self.verify_database_purge_status(purge_id))
assert(self.verify_collector_object_log_after_purge(start_time, end_time))
return True
# end verify_database_purge_query
# end verify_database_purge_with_percentage_input

def verify_database_purge_support_utc_time_format(self):
self.logger.info('verify database purge support utc time format')
Expand All @@ -2069,7 +2094,8 @@ def verify_database_purge_support_utc_time_format(self):
end_time = OpServerUtils.convert_to_utc_timestamp_usec('now')
start_time = end_time - 20*60*pow(10,6)
assert(self.verify_collector_object_log_before_purge(start_time, end_time))
assert(self.verify_database_purge_query(json_qstr))
purge_id = self.verify_database_purge_query(json_qstr)
assert(self.verify_database_purge_status(purge_id))
assert(self.verify_collector_object_log_after_purge(start_time, end_time))
return True
# end verify_database_purge_support_utc_time_format
Expand All @@ -2082,7 +2108,8 @@ def verify_database_purge_support_datetime_format(self):
end_time = OpServerUtils.convert_to_utc_timestamp_usec(dt)
start_time = end_time - 30*60*pow(10,6)
assert(self.verify_collector_object_log_before_purge(start_time, end_time))
assert(self.verify_database_purge_query(json_qstr))
purge_id = self.verify_database_purge_query(json_qstr)
assert(self.verify_database_purge_status(purge_id))
assert(self.verify_collector_object_log_after_purge(start_time, end_time))
return True
# end verify_database_purge_support_datetime_format
Expand All @@ -2094,7 +2121,8 @@ def verify_database_purge_support_deltatime_format(self):
end_time = OpServerUtils.convert_to_utc_timestamp_usec('-1s')
start_time = end_time - 10*60*pow(10,6)
assert(self.verify_collector_object_log_before_purge(start_time, end_time))
assert(self.verify_database_purge_query(json_qstr))
purge_id = self.verify_database_purge_query(json_qstr)
assert(self.verify_database_purge_status(purge_id))
assert(self.verify_collector_object_log_after_purge(start_time, end_time))
return True
# end verify_database_purge_support_deltatime_format
Expand All @@ -2104,11 +2132,16 @@ def verify_database_purge_request_limit(self):
vns = VerificationOpsSrv('127.0.0.1', self.opserver_port)
json_qstr = json.dumps({'purge_input': 50})
res = vns.post_purge_query_json(json_qstr)
if (res == 'started'):
self.logger.info(str(res))
try:
assert(res['status'] == 'started')
purge_id = res['purge_id']
res1 = vns.post_purge_query_json(json_qstr)
if (res1 == 'running'):
return True
return False
assert(res1['status'] == 'running')
assert(res1['purge_id'] == purge_id)
except KeyError:
assert(False)
return True
# end verify_database_purge_request_limit

@retry(delay=1, tries=5)
Expand Down
1 change: 0 additions & 1 deletion src/opserver/test/utils/opserver_introspect_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ def post_purge_query_json(self, json_str, sync=True):
purge_request_url, json_str, sync)
if resp is not None:
res = json.loads(resp)
res = res['status']
except Exception as e:
print str(e)
finally:
Expand Down

0 comments on commit 95077da

Please sign in to comment.