Skip to content

Commit

Permalink
0002318: Performance improvements in gap detection during routing. Im…
Browse files Browse the repository at this point in the history
…proves routing performance with lots of gaps.
  • Loading branch information
chenson42 committed Jun 22, 2015
1 parent c183083 commit 4f5ff94
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 202 deletions.
Expand Up @@ -20,18 +20,18 @@
*/
package org.jumpmind.symmetric.route;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.jumpmind.db.sql.ISqlTemplate;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.mapper.NumberMapper;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.model.DataGap;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessInfo.Status;
import org.jumpmind.symmetric.model.ProcessInfoKey;
import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.INodeService;
Expand Down Expand Up @@ -79,17 +79,23 @@ public DataGapDetector(IDataService dataService, IParameterService parameterServ
* dual route data.
*/
public void beforeRouting() {
long printStats = System.currentTimeMillis();
ProcessInfo processInfo = this.statisticManager.newProcessInfo(new ProcessInfoKey(
nodeService.findIdentityNodeId(), null, ProcessType.GAP_DETECT));
try {
long ts = System.currentTimeMillis();
final List<DataGap> gaps = removeAbandonedGaps(dataService.findDataGaps());
processInfo.setStatus(Status.QUERYING);
final List<DataGap> gaps = dataService.findDataGaps();
long lastDataId = -1;
final int dataIdIncrementBy = parameterService
.getInt(ParameterConstants.DATA_ID_INCREMENT_BY);
final long maxDataToSelect = parameterService
.getInt(ParameterConstants.ROUTING_LARGEST_GAP_SIZE);
long databaseTime = symmetricDialect.getDatabaseTime();
int idsFilled = 0;
int newGapsInserted = 0;
int rangeChecked = 0;
int gapsDeleted = 0;
for (final DataGap dataGap : gaps) {
final boolean lastGap = dataGap.equals(gaps.get(gaps.size() - 1));
String sql = routerService.getSql("selectDistinctDataIdFromDataEventUsingGapsSql");
Expand All @@ -99,76 +105,99 @@ public void beforeRouting() {
processInfo.setStatus(Status.QUERYING);
List<Number> ids = sqlTemplate.query(sql, new NumberMapper(), params);
processInfo.setStatus(Status.PROCESSING);
for (Number number : ids) {
long dataId = number.longValue();
processInfo.incrementCurrentDataCount();
if (lastDataId == -1 && dataGap.getStartId() + dataIdIncrementBy <= dataId) {
// there was a new gap at the start
dataService.insertDataGap(new DataGap(dataGap.getStartId(), dataId - 1));
} else if (lastDataId != -1 && lastDataId + dataIdIncrementBy != dataId
&& lastDataId != dataId) {
// found a gap somewhere in the existing gap
dataService.insertDataGap(new DataGap(lastDataId + 1, dataId - 1));

idsFilled += ids.size();
rangeChecked += dataGap.getEndId() - dataGap.getStartId();

ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
for (Number number : ids) {
long dataId = number.longValue();
processInfo.incrementCurrentDataCount();
if (lastDataId == -1 && dataGap.getStartId() + dataIdIncrementBy <= dataId) {
// there was a new gap at the start
dataService.insertDataGap(transaction, new DataGap(dataGap.getStartId(), dataId - 1));
newGapsInserted++;
} else if (lastDataId != -1 && lastDataId + dataIdIncrementBy != dataId && lastDataId != dataId) {
// found a gap somewhere in the existing gap
dataService.insertDataGap(transaction, new DataGap(lastDataId + 1, dataId - 1));
newGapsInserted++;
}
lastDataId = dataId;
}
lastDataId = dataId;
}

// if we found data in the gap
if (lastDataId != -1) {
if (!lastGap && lastDataId + dataIdIncrementBy <= dataGap.getEndId()) {
dataService.insertDataGap(new DataGap(lastDataId + dataIdIncrementBy,
dataGap.getEndId()));
}
// if we found data in the gap
if (lastDataId != -1) {
if (!lastGap && lastDataId + dataIdIncrementBy <= dataGap.getEndId()) {
dataService.insertDataGap(transaction, new DataGap(lastDataId + dataIdIncrementBy, dataGap.getEndId()));
newGapsInserted++;
}

dataService.deleteDataGap(dataGap);

// if we did not find data in the gap and it was not the
// last gap
} else if (!lastGap) {
if (dataService.countDataInRange(dataGap.getStartId() - 1,
dataGap.getEndId() + 1) == 0) {
if (symmetricDialect.supportsTransactionViews()) {
long transactionViewClockSyncThresholdInMs = parameterService
.getLong(
ParameterConstants.DBDIALECT_ORACLE_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS,
60000);
Date createTime = dataService
.findCreateTimeOfData(dataGap.getEndId() + 1);
if (createTime != null
&& !symmetricDialect
.areDatabaseTransactionsPendingSince(createTime
.getTime()
+ transactionViewClockSyncThresholdInMs)) {
if (dataService.countDataInRange(dataGap.getStartId() - 1,
dataGap.getEndId() + 1) == 0) {
if (dataGap.getStartId() == dataGap.getEndId()) {
log.info(
"Found a gap in data_id at {}. Skipping it because there are no pending transactions in the database",
dataGap.getStartId());
} else {
log.info(
"Found a gap in data_id from {} to {}. Skipping it because there are no pending transactions in the database",
dataGap.getStartId(), dataGap.getEndId());
dataService.deleteDataGap(transaction, dataGap);
gapsDeleted++;

// if we did not find data in the gap and it was not the
// last gap
} else if (!lastGap) {
if (dataService.countDataInRange(dataGap.getStartId() - 1, dataGap.getEndId() + 1) == 0) {
if (symmetricDialect.supportsTransactionViews()) {
long transactionViewClockSyncThresholdInMs = parameterService.getLong(
ParameterConstants.DBDIALECT_ORACLE_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS, 60000);
Date createTime = dataService.findCreateTimeOfData(dataGap.getEndId() + 1);
if (createTime != null
&& !symmetricDialect.areDatabaseTransactionsPendingSince(createTime.getTime()
+ transactionViewClockSyncThresholdInMs)) {
if (dataService.countDataInRange(dataGap.getStartId() - 1, dataGap.getEndId() + 1) == 0) {
if (dataGap.getStartId() == dataGap.getEndId()) {
log.info(
"Found a gap in data_id at {}. Skipping it because there are no pending transactions in the database",
dataGap.getStartId());
} else {
log.info(
"Found a gap in data_id from {} to {}. Skipping it because there are no pending transactions in the database",
dataGap.getStartId(), dataGap.getEndId());
}

dataService.deleteDataGap(transaction, dataGap);
gapsDeleted++;
}

dataService.deleteDataGap(dataGap);
}
} else if (isDataGapExpired(dataGap.getEndId() + 1, databaseTime)) {
if (dataGap.getStartId() == dataGap.getEndId()) {
log.info("Found a gap in data_id at {}. Skipping it because the gap expired", dataGap.getStartId());
} else {
log.info("Found a gap in data_id from {} to {}. Skipping it because the gap expired",
dataGap.getStartId(), dataGap.getEndId());
}
dataService.deleteDataGap(transaction, dataGap);
gapsDeleted++;
}
} else if (isDataGapExpired(dataGap.getEndId() + 1, databaseTime)) {
if (dataGap.getStartId() == dataGap.getEndId()) {
log.info(
"Found a gap in data_id at {}. Skipping it because the gap expired",
dataGap.getStartId());
} else {
log.info(
"Found a gap in data_id from {} to {}. Skipping it because the gap expired",
dataGap.getStartId(), dataGap.getEndId());
}
dataService.deleteDataGap(dataGap);
}
} else {
dataService.checkForAndUpdateMissingChannelIds(dataGap.getStartId() - 1,
dataGap.getEndId() + 1);
}
}

if (System.currentTimeMillis() - printStats > 30000) {
log.info(
"The data gap detection process has been running for {}ms, detected {} rows that have been previously routed over a total gap range of {}, "
+ "inserted {} new gaps, and deleted {} gaps", new Object[] { System.currentTimeMillis() - ts,
idsFilled, rangeChecked, newGapsInserted, gapsDeleted });
printStats = System.currentTimeMillis();
}

transaction.commit();
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} finally {
if (transaction != null) {
transaction.close();
}
}
}
Expand All @@ -190,27 +219,6 @@ public void beforeRouting() {

}

/**
* If the system was shutdown in the middle of processing a large gap we
* could end up with a gap containing other gaps.
*
* @param gaps
*/
protected List<DataGap> removeAbandonedGaps(List<DataGap> gaps) {
List<DataGap> finalList = new ArrayList<DataGap>(gaps);
for (final DataGap dataGap1 : gaps) {
for (final DataGap dataGap2 : gaps) {
if (!dataGap1.equals(dataGap2) && dataGap1.contains(dataGap2)) {
finalList.remove(dataGap2);
if (dataService != null) {
dataService.deleteDataGap(dataGap2);
}
}
}
}
return finalList;
}

protected boolean isDataGapExpired(long dataId, long databaseTime) {
long gapTimoutInMs = parameterService
.getLong(ParameterConstants.ROUTING_STALE_DATA_ID_GAP_TIME);
Expand Down

0 comments on commit 4f5ff94

Please sign in to comment.