Skip to content

Commit

Permalink
0003723: Purge stranded batches can get deadlock or timeout when
Browse files Browse the repository at this point in the history
outgoing_batches is large
  • Loading branch information
erilong committed Sep 13, 2018
1 parent f89429e commit 2926a7d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 21 deletions.
Expand Up @@ -202,24 +202,38 @@ private long purgeOutgoingBatch(final Calendar time) {
}

private long purgeStrandedBatches() {
int updateStrandedBatchesCount = sqlTemplate.update(getSql("updateStrandedBatches"),
OutgoingBatch.Status.OK.name(), 1, OutgoingBatch.Status.OK.name());
if (updateStrandedBatchesCount > 0) {
log.info(
"Set the status to {} for {} batches that no longer are associated with valid nodes",
OutgoingBatch.Status.OK.name(), updateStrandedBatchesCount);
statisticManager.incrementPurgedBatchOutgoingRows(updateStrandedBatchesCount);
int totalRowsPurged = 0;
log.info("Looking for old nodes in batches");
List<String> nodes = sqlTemplateDirty.query(getSql("selectNodesWithStrandedBatches"),
new StringMapper(), 1, OutgoingBatch.Status.OK.name());
if (nodes.size() > 0) {
log.info("Found {} old nodes in batches", nodes.size());
for (String nodeId : nodes) {
int rowsPurged = sqlTemplate.update(getSql("updateStrandedBatches"),
OutgoingBatch.Status.OK.name(), nodeId, OutgoingBatch.Status.OK.name());
log.info("Set the status to {} for {} batches associated with node ID {}",
OutgoingBatch.Status.OK.name(), rowsPurged, nodeId);
totalRowsPurged += rowsPurged;
statisticManager.incrementPurgedBatchOutgoingRows(rowsPurged);
}
}

updateStrandedBatchesCount = sqlTemplate.update(getSql("updateStrandedBatchesByChannel"),
OutgoingBatch.Status.OK.name(), OutgoingBatch.Status.OK.name());
if (updateStrandedBatchesCount > 0) {
log.info("Set the status to {} for {} batches that no longer are associated with valid channels",
OutgoingBatch.Status.OK.name(), updateStrandedBatchesCount);
statisticManager.incrementPurgedBatchOutgoingRows(updateStrandedBatchesCount);
log.info("Looking for old channels in batches");
List<String> channels = sqlTemplateDirty.query(getSql("selectChannelsWithStrandedBatches"),
new StringMapper(), OutgoingBatch.Status.OK.name());
if (channels.size() > 0) {
log.info("Found {} old channels in batches", channels.size());
for (String channelId : channels) {
int rowsPurged = sqlTemplate.update(getSql("updateStrandedBatchesByChannel"),
OutgoingBatch.Status.OK.name(), channelId, OutgoingBatch.Status.OK.name());
log.info("Set the status to {} for {} batches associated with channel ID {}",
OutgoingBatch.Status.OK.name(), rowsPurged, channelId);
totalRowsPurged += rowsPurged;
statisticManager.incrementPurgedBatchOutgoingRows(rowsPurged);
}
}

return updateStrandedBatchesCount;
return totalRowsPurged;
}

private long purgeStrandedChannels() {
Expand Down
Expand Up @@ -57,13 +57,15 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac
putSql("selectDataRangeSql" ,
"select min(data_id) as min_id, max(data_id) as max_id from $(data) where data_id < (select max(data_id) from $(data)) " );

putSql("updateStrandedBatches" ,
"update $(outgoing_batch) set status=? where node_id not " +
" in (select node_id from $(node) where sync_enabled=?) and status != ? " );
putSql("selectNodesWithStrandedBatches", "select distinct node_id from $(outgoing_batch) " +
"where node_id not in (select node_id from $(node) where sync_enabled = ?) and status != ?");

putSql("updateStrandedBatchesByChannel",
"update $(outgoing_batch) set status=? where channel_id not " +
"in (select channel_id from $(channel)) and status != ?");
putSql("updateStrandedBatches", "update $(outgoing_batch) set status=? where node_id=? and status != ?");

putSql("selectChannelsWithStrandedBatches", "select distinct channel_id from $(outgoing_batch) " +
"where channel_id not in (select channel_id from $(channel)) and status != ?");

putSql("updateStrandedBatchesByChannel", "update $(outgoing_batch) set status=? where channel_id=? and status != ?");

putSql("deleteStrandedData" ,
"delete from $(data) where " +
Expand Down

0 comments on commit 2926a7d

Please sign in to comment.