Skip to content

Commit

Permalink
0002526: Improve performance of data gap detection
Browse files Browse the repository at this point in the history
don't query for database time on each routing run
  • Loading branch information
erilong committed Jul 19, 2016
1 parent 6802e59 commit a8f859c
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 13 deletions.
Expand Up @@ -32,17 +32,20 @@ public enum Status {GP,SK,OK};
private long startId;
private long endId;
private Date createTime;
private Date lastUpdateTime;

public DataGap(long startId, long endId) {
this.startId = startId;
this.endId = endId;
this.createTime = new Date();
this.lastUpdateTime = createTime;
}

public DataGap(long startId, long endId, Date createTime) {
this.startId = startId;
this.endId = endId;
this.createTime = createTime;
this.lastUpdateTime = createTime;
}

@Override
Expand All @@ -61,7 +64,15 @@ public long getStartId() {
public Date getCreateTime() {
return createTime;
}


public Date getLastUpdateTime() {
return lastUpdateTime;
}

public void setLastUpdateTime(Date lastUpdateTime) {
this.lastUpdateTime = lastUpdateTime;
}

public boolean contains(DataGap gap) {
return startId <= gap.startId && endId >= gap.endId;
}
Expand Down
Expand Up @@ -138,7 +138,7 @@ public void afterRouting() {
long gapTimoutInMs = parameterService.getLong(ParameterConstants.ROUTING_STALE_DATA_ID_GAP_TIME);
final int dataIdIncrementBy = parameterService.getInt(ParameterConstants.DATA_ID_INCREMENT_BY);

long databaseTime = symmetricDialect.getDatabaseTime();
long currentTime = System.currentTimeMillis();
ISqlTemplate sqlTemplate = symmetricDialect.getPlatform().getSqlTemplate();

boolean supportsTransactionViews = symmetricDialect.supportsTransactionViews();
Expand All @@ -149,8 +149,10 @@ public void afterRouting() {
earliestTransactionTime = date.getTime() - parameterService.getLong(
ParameterConstants.DBDIALECT_ORACLE_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS, 60000);
}
currentTime = symmetricDialect.getDatabaseTime();
}

Date currentDate = new Date(currentTime);
if (!isAllDataRead) {
long lastBusyExpireRunTime = contextService.getLong(ContextConstants.ROUTING_LAST_BUSY_EXPIRE_RUN_TIME);
long busyExpireMillis = parameterService.getLong(ParameterConstants.ROUTING_STALE_GAP_BUSY_EXPIRE_TIME);
Expand Down Expand Up @@ -191,7 +193,7 @@ public void afterRouting() {
if (supportsTransactionViews) {
isExpired = createTime != null && (createTime.getTime() < earliestTransactionTime || earliestTransactionTime == 0);
} else {
isExpired = createTime != null && databaseTime - createTime.getTime() > gapTimoutInMs;
isExpired = createTime != null && currentTime - createTime.getTime() > gapTimoutInMs;
}

if (isExpired) {
Expand Down Expand Up @@ -221,13 +223,13 @@ public void afterRouting() {
processInfo.incrementCurrentDataCount();
if (lastDataId == -1 && dataGap.getStartId() + dataIdIncrementBy <= dataId) {
// there was a new gap at the start
DataGap newGap = new DataGap(dataGap.getStartId(), dataId - 1);
DataGap newGap = new DataGap(dataGap.getStartId(), dataId - 1, currentDate);
if (isOkayToAdd(newGap)) {
dataService.insertDataGap(transaction, newGap);
}
} else if (lastDataId != -1 && lastDataId + dataIdIncrementBy != dataId && lastDataId != dataId) {
// found a gap somewhere in the existing gap
DataGap newGap = new DataGap(lastDataId + 1, dataId - 1);
DataGap newGap = new DataGap(lastDataId + 1, dataId - 1, currentDate);
if (isOkayToAdd(newGap)) {
dataService.insertDataGap(transaction, newGap);
}
Expand All @@ -237,7 +239,7 @@ public void afterRouting() {

// if we found data in the gap
if (lastDataId != -1 && !lastGap && lastDataId + dataIdIncrementBy <= dataGap.getEndId()) {
DataGap newGap = new DataGap(lastDataId + dataIdIncrementBy, dataGap.getEndId());
DataGap newGap = new DataGap(lastDataId + dataIdIncrementBy, dataGap.getEndId(), currentDate);
if (isOkayToAdd(newGap)) {
dataService.insertDataGap(transaction, newGap);
}
Expand Down Expand Up @@ -270,7 +272,7 @@ public void afterRouting() {
}

if (lastDataId != -1) {
DataGap newGap = new DataGap(lastDataId + 1, lastDataId + maxDataToSelect);
DataGap newGap = new DataGap(lastDataId + 1, lastDataId + maxDataToSelect, currentDate);
if (isOkayToAdd(newGap)) {
log.debug("Inserting new last data gap: {}", newGap);
dataService.insertDataGap(newGap);
Expand Down
Expand Up @@ -1456,15 +1456,16 @@ public void insertDataGap(DataGap gap) {
public void insertDataGap(ISqlTransaction transaction, DataGap gap) {
log.debug("Inserting data gap: {}", gap);
transaction.prepareAndExecute(getSql("insertDataGapSql"),
new Object[] { DataGap.Status.GP.name(), AppUtils.getHostName(), gap.getStartId(), gap.getEndId() }, new int[] {
Types.VARCHAR, Types.VARCHAR, Types.NUMERIC, Types.NUMERIC });
new Object[] { DataGap.Status.GP.name(), AppUtils.getHostName(), gap.getStartId(), gap.getEndId(),
gap.getLastUpdateTime(), gap.getCreateTime() }, new int[] {
Types.VARCHAR, Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP, Types.TIMESTAMP });
}

public void updateDataGap(DataGap gap, DataGap.Status status) {
sqlTemplate.update(
getSql("updateDataGapSql"),
new Object[] { status.name(), AppUtils.getHostName(), gap.getStartId(),
gap.getEndId() }, new int[] { Types.VARCHAR, Types.VARCHAR,
new Object[] { status.name(), AppUtils.getHostName(), gap.getLastUpdateTime(), gap.getStartId(),
gap.getEndId() }, new int[] { Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP,
symmetricDialect.getSqlTypeForIds(), symmetricDialect.getSqlTypeForIds() });
}

Expand Down
Expand Up @@ -96,11 +96,11 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace

putSql("insertDataGapSql",
""
+ "insert into $(data_gap) (status, last_update_hostname, start_id, end_id, last_update_time, create_time) values(?, ?, ?, ?, current_timestamp, current_timestamp) ");
+ "insert into $(data_gap) (status, last_update_hostname, start_id, end_id, last_update_time, create_time) values(?, ?, ?, ?, ?, ?) ");

putSql("updateDataGapSql",
""
+ "update $(data_gap) set status=?, last_update_hostname=?, last_update_time=current_timestamp where start_id=? and end_id=? ");
+ "update $(data_gap) set status=?, last_update_hostname=?, last_update_time=? where start_id=? and end_id=? ");

putSql("deleteDataGapSql",
"delete from $(data_gap) where start_id=? and end_id=? ");
Expand Down
Expand Up @@ -306,6 +306,7 @@ public void testGapExpire() throws Exception {
dataGaps.add(new DataGap(5, 6));
dataGaps.add(new DataGap(7, 50000006));

when(symmetricDialect.supportsTransactionViews()).thenReturn(true);
when(symmetricDialect.getDatabaseTime()).thenReturn(System.currentTimeMillis() + 60001L);
runGapDetector(dataGaps, new ArrayList<Long>(), true);

Expand All @@ -322,6 +323,7 @@ public void testGapExpireBusyChannel() throws Exception {
dataGaps.add(new DataGap(5, 6));
dataGaps.add(new DataGap(7, 50000006));

when(symmetricDialect.supportsTransactionViews()).thenReturn(true);
when(symmetricDialect.getDatabaseTime()).thenReturn(System.currentTimeMillis() + 60001L);
when(dataService.countDataInRange(4, 7)).thenReturn(1);
runGapDetector(dataGaps, new ArrayList<Long>(), false);
Expand All @@ -340,6 +342,7 @@ public void testGapBusyExpireRun() throws Exception {
dataGaps.add(new DataGap(5, 6));
dataGaps.add(new DataGap(7, 50000006));

when(symmetricDialect.supportsTransactionViews()).thenReturn(true);
when(symmetricDialect.getDatabaseTime()).thenReturn(System.currentTimeMillis() + 60001L);
when(contextService.getLong(ContextConstants.ROUTING_LAST_BUSY_EXPIRE_RUN_TIME)).thenReturn(System.currentTimeMillis() - 61000);
runGapDetector(dataGaps, new ArrayList<Long>(), false);
Expand Down

0 comments on commit a8f859c

Please sign in to comment.