Skip to content

Commit

Permalink
0002526: Improve performance of data gap detection
Browse files Browse the repository at this point in the history
More aggressive prevention of duplicate/overlapping gaps by checking
against existing gaps not just new gaps
  • Loading branch information
erilong committed Apr 21, 2016
1 parent d3bd1f5 commit d0cd632
Show file tree
Hide file tree
Showing 2 changed files with 231 additions and 34 deletions.
Expand Up @@ -77,6 +77,8 @@ public class DataGapFastDetector extends DataGapDetector implements ISqlRowMappe

protected long maxDataToSelect;

protected Set<DataGap> gapsAll;

protected Set<DataGap> gapsAdded;

protected Set<DataGap> gapsDeleted;
Expand All @@ -97,6 +99,7 @@ public void beforeRouting() {
ProcessInfo processInfo = this.statisticManager.newProcessInfo(new ProcessInfoKey(
nodeService.findIdentityNodeId(), null, ProcessType.GAP_DETECT));
processInfo.setStatus(Status.QUERYING);
maxDataToSelect = parameterService.getLong(ParameterConstants.ROUTING_LARGEST_GAP_SIZE);
gaps = dataService.findDataGaps();
processInfo.setStatus(Status.OK);

Expand All @@ -114,6 +117,7 @@ public void beforeRouting() {

protected void reset() {
dataIds = new ArrayList<Long>();
gapsAll = new HashSet<DataGap>();
gapsAdded = new HashSet<DataGap>();
gapsDeleted = new HashSet<DataGap>();
}
Expand All @@ -130,7 +134,6 @@ public void afterRouting() {
long printStats = System.currentTimeMillis();
long gapTimoutInMs = parameterService.getLong(ParameterConstants.ROUTING_STALE_DATA_ID_GAP_TIME);
final int dataIdIncrementBy = parameterService.getInt(ParameterConstants.DATA_ID_INCREMENT_BY);
maxDataToSelect = parameterService.getLong(ParameterConstants.ROUTING_LARGEST_GAP_SIZE);

long databaseTime = symmetricDialect.getDatabaseTime();
ISqlTemplate sqlTemplate = symmetricDialect.getPlatform().getSqlTemplate();
Expand All @@ -150,6 +153,7 @@ public void afterRouting() {
long lastDataId = -1;
int dataIdCount = 0;
int rangeChecked = 0;
gapsAll.addAll(gaps);
Map<DataGap, List<Long>> dataIdMap = getDataIdMap();

for (final DataGap dataGap : gaps) {
Expand All @@ -163,39 +167,14 @@ public void afterRouting() {
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
for (Number number : ids) {
long dataId = number.longValue();
processInfo.incrementCurrentDataCount();
if (lastDataId == -1 && dataGap.getStartId() + dataIdIncrementBy <= dataId) {
// there was a new gap at the start
DataGap newGap = new DataGap(dataGap.getStartId(), dataId - 1);
if (isOkayToAdd(newGap)) {
dataService.insertDataGap(transaction, newGap);
}
} else if (lastDataId != -1 && lastDataId + dataIdIncrementBy != dataId && lastDataId != dataId) {
// found a gap somewhere in the existing gap
DataGap newGap = new DataGap(lastDataId + 1, dataId - 1);
if (isOkayToAdd(newGap)) {
dataService.insertDataGap(transaction, newGap);
}
}
lastDataId = dataId;
}

// if we found data in the gap
if (lastDataId != -1) {
if (!lastGap && lastDataId + dataIdIncrementBy <= dataGap.getEndId()) {
DataGap newGap = new DataGap(lastDataId + dataIdIncrementBy, dataGap.getEndId());
if (isOkayToAdd(newGap)) {
dataService.insertDataGap(transaction, newGap);
}
}

if (ids.size() > 0) {
dataService.deleteDataGap(transaction, dataGap);
gapsDeleted.add(dataGap);
gapsAll.remove(dataGap);

// if we did not find data in the gap and it was not the
// last gap
// if we did not find data in the gap and it was not the last gap
} else if (!lastGap) {
Date createTime = dataGap.getCreateTime();
if (supportsTransactionViews) {
Expand All @@ -213,6 +192,7 @@ public void afterRouting() {

dataService.deleteDataGap(transaction, dataGap);
gapsDeleted.add(dataGap);
gapsAll.remove(dataGap);
}
}
} else if (createTime != null && databaseTime - createTime.getTime() > gapTimoutInMs) {
Expand All @@ -225,10 +205,38 @@ public void afterRouting() {
}
dataService.deleteDataGap(transaction, dataGap);
gapsDeleted.add(dataGap);
gapsAll.remove(dataGap);
}
}
}

for (Number number : ids) {
long dataId = number.longValue();
processInfo.incrementCurrentDataCount();
if (lastDataId == -1 && dataGap.getStartId() + dataIdIncrementBy <= dataId) {
// there was a new gap at the start
DataGap newGap = new DataGap(dataGap.getStartId(), dataId - 1);
if (isOkayToAdd(newGap)) {
dataService.insertDataGap(transaction, newGap);
}
} else if (lastDataId != -1 && lastDataId + dataIdIncrementBy != dataId && lastDataId != dataId) {
// found a gap somewhere in the existing gap
DataGap newGap = new DataGap(lastDataId + 1, dataId - 1);
if (isOkayToAdd(newGap)) {
dataService.insertDataGap(transaction, newGap);
}
}
lastDataId = dataId;
}

// if we found data in the gap
if (lastDataId != -1 && !lastGap && lastDataId + dataIdIncrementBy <= dataGap.getEndId()) {
DataGap newGap = new DataGap(lastDataId + dataIdIncrementBy, dataGap.getEndId());
if (isOkayToAdd(newGap)) {
dataService.insertDataGap(transaction, newGap);
}
}

if (System.currentTimeMillis() - printStats > 30000) {
log.info(
"The data gap detection process has been running for {}ms, detected {} rows that have been previously routed over a total gap range of {}, "
Expand Down Expand Up @@ -278,7 +286,7 @@ public void afterRouting() {

protected boolean isOkayToAdd(DataGap dataGap) {
boolean isOkay = true;
if (gapsAdded.contains(dataGap)) {
if (gapsAll.contains(dataGap)) {
log.warn("Detected a duplicate data gap: " + dataGap);
isOkay = false;
} else if (dataGap.getStartId() > dataGap.getEndId()) {
Expand All @@ -288,7 +296,7 @@ protected boolean isOkayToAdd(DataGap dataGap) {
log.warn("Detected a very large gap range: " + dataGap);
isOkay = false;
} else {
for (DataGap gapAdded : gapsAdded) {
for (DataGap gapAdded : gapsAll) {
if (dataGap.overlaps(gapAdded)) {
log.warn("Detected an overlapping data gap: " + dataGap);
isOkay = false;
Expand All @@ -299,6 +307,7 @@ protected boolean isOkayToAdd(DataGap dataGap) {

if (isOkay) {
gapsAdded.add(dataGap);
gapsAll.add(dataGap);
} else {
log.info("Data IDs: " + dataIds.toString());
log.info("Data Gaps: " + gaps.toString());
Expand Down
Expand Up @@ -5,10 +5,13 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doAnswer;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

import org.jumpmind.db.platform.DatabaseInfo;
Expand Down Expand Up @@ -398,8 +401,136 @@ public void testGapsBeforeAndAfterFull() throws Exception {
verifyNoMoreInteractions(dataService);
}

// Uncomment to run random data through data gap detector
//@Test
public void testGapsOverlap() throws Exception {
List<Long> dataIds = new ArrayList<Long>();

List<DataGap> dataGaps = new ArrayList<DataGap>();
dataGaps.add(new DataGap(30953883, 80953883));
dataGaps.add(new DataGap(30953884, 80953883));

runGapDetector(dataGaps, dataIds, true);

verify(dataService).findDataGaps();
verify(dataService).deleteDataGap(sqlTransaction, new DataGap(30953884, 80953883));
verifyNoMoreInteractions(dataService);
}

//@Test
public void testGapsOverlapMultiple() throws Exception {
List<Long> dataIds = new ArrayList<Long>();

List<DataGap> dataGaps = new ArrayList<DataGap>();
dataGaps.add(new DataGap(1, 10));
dataGaps.add(new DataGap(3, 8));
dataGaps.add(new DataGap(4, 6));
dataGaps.add(new DataGap(4, 8));
dataGaps.add(new DataGap(4, 5));
dataGaps.add(new DataGap(5, 10));
dataGaps.add(new DataGap(6, 11));

runGapDetector(dataGaps, dataIds, true);

verify(dataService).findDataGaps();
verify(dataService).deleteDataGap(sqlTransaction, new DataGap(1, 10));
verify(dataService).deleteDataGap(sqlTransaction, new DataGap(3, 8));
verify(dataService).insertDataGap(sqlTransaction, new DataGap(1, 10));

verify(dataService).deleteDataGap(sqlTransaction, new DataGap(1, 10));
verify(dataService).deleteDataGap(sqlTransaction, new DataGap(4, 6));
verify(dataService).insertDataGap(sqlTransaction, new DataGap(1, 10));

verify(dataService).deleteDataGap(sqlTransaction, new DataGap(1, 10));
verify(dataService).deleteDataGap(sqlTransaction, new DataGap(4, 8));
verify(dataService).insertDataGap(sqlTransaction, new DataGap(1, 10));

verify(dataService).deleteDataGap(sqlTransaction, new DataGap(1, 10));
verify(dataService).deleteDataGap(sqlTransaction, new DataGap(4, 5));
verify(dataService).insertDataGap(sqlTransaction, new DataGap(1, 10));

verify(dataService).deleteDataGap(sqlTransaction, new DataGap(1, 10));
verify(dataService).deleteDataGap(sqlTransaction, new DataGap(5, 10));
verify(dataService).insertDataGap(sqlTransaction, new DataGap(1, 10));

verify(dataService).deleteDataGap(sqlTransaction, new DataGap(1, 10));
verify(dataService).deleteDataGap(sqlTransaction, new DataGap(6, 11));
verify(dataService).insertDataGap(sqlTransaction, new DataGap(1, 11));

verifyNoMoreInteractions(dataService);
}

//@Test
public void testGapsOverlapThenData() throws Exception {
List<Long> dataIds = new ArrayList<Long>();
dataIds.add(30953883L);

List<DataGap> dataGaps = new ArrayList<DataGap>();
dataGaps.add(new DataGap(30953883, 80953883));
dataGaps.add(new DataGap(30953884, 80953883));

runGapDetector(dataGaps, dataIds, true);

verify(dataService).findDataGaps();
verify(dataService).deleteDataGap(sqlTransaction, new DataGap(30953884, 80953883));
verify(dataService).deleteDataGap(sqlTransaction, new DataGap(30953883, 80953883));
verify(dataService).insertDataGap(new DataGap(30953884, 80953883));
verifyNoMoreInteractions(dataService);
}

//@Test
public void testGapsOverlapThenDataFull() throws Exception {
when(contextService.is(ContextConstants.ROUTING_FULL_GAP_ANALYSIS)).thenReturn(true);
List<Long> dataIds = new ArrayList<Long>();
dataIds.add(30953883L);

@SuppressWarnings("unchecked")
ISqlRowMapper<Long> mapper = (ISqlRowMapper<Long>) Matchers.anyObject();
String sql = Matchers.anyString();
when(sqlTemplate.query(sql, mapper, Matchers.eq(30953883L), Matchers.eq(80953883L))).thenReturn(dataIds);

List<DataGap> dataGaps1 = new ArrayList<DataGap>();
dataGaps1.add(new DataGap(30953883, 80953883));
dataGaps1.add(new DataGap(30953884, 80953883));

List<DataGap> dataGaps2 = new ArrayList<DataGap>();
dataGaps2.add(new DataGap(30953884, 80953883));

dataIds = new ArrayList<Long>();
dataIds.add(30953883L);

runGapDetector(dataGaps1, dataGaps2, dataIds, true);

verify(dataService, VerificationModeFactory.times(2)).findDataGaps();
verify(dataService).deleteDataGap(sqlTransaction, new DataGap(30953884, 80953883));
verify(dataService).deleteDataGap(sqlTransaction, new DataGap(30953883, 80953883));
verify(dataService).insertDataGap(new DataGap(30953884, 80953883));
verifyNoMoreInteractions(dataService);
}

//@Test
public void testGapsOverlapAfterLastGap() throws Exception {
List<Long> dataIds = new ArrayList<Long>();
dataIds.add(30953883L);

List<DataGap> dataGaps = new ArrayList<DataGap>();
dataGaps.add(new DataGap(30953883, 80953883));
dataGaps.add(new DataGap(30953884, 80953883));
dataGaps.add(new DataGap(30953885, 81953883));
dataGaps.add(new DataGap(30953885, 30953885));

runGapDetector(dataGaps, dataIds, true);

verify(dataService).findDataGaps();
verify(dataService).deleteDataGap(sqlTransaction, new DataGap(30953884, 80953883));
verify(dataService).deleteDataGap(sqlTransaction, new DataGap(30953885, 81953883));
verify(dataService).deleteDataGap(sqlTransaction, new DataGap(30953885, 30953885));

verify(dataService).deleteDataGap(sqlTransaction, new DataGap(30953883, 80953883));
verify(dataService).insertDataGap(new DataGap(30953884, 80953883));
verifyNoMoreInteractions(dataService);
}

@Test
public void testRandom() throws Exception {
ThreadLocalRandom rand = ThreadLocalRandom.current();
for (int loop = 0; loop < 5000; loop++) {
Expand All @@ -421,10 +552,38 @@ public void testRandom() throws Exception {
}
}
}


final Set<DataGap> allGaps = new HashSet<DataGap>(dataGaps);
doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
DataGap gap = (DataGap) args[1];
checkDeleteGap(allGaps, gap);
return null;
}
}).when(dataService).deleteDataGap(Matchers.eq(sqlTransaction), (DataGap) Matchers.anyObject());

doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
DataGap gap = (DataGap) args[1];
checkInsertGap(allGaps, gap);
return null;
}
}).when(dataService).insertDataGap(Matchers.eq(sqlTransaction), (DataGap) Matchers.anyObject());

doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
DataGap gap = (DataGap) args[0];
checkInsertGap(allGaps, gap);
return null;
}
}).when(dataService).insertDataGap((DataGap) Matchers.anyObject());

runGapDetector(dataGaps, dataIds, true);

verify(dataService).findDataGaps();
verify(dataService).findDataGaps();

int index = 0;
int lastIndex = dataGaps.size() - 1;
Expand Down Expand Up @@ -472,5 +631,34 @@ public void testRandom() throws Exception {
Mockito.reset(dataService);
}
}

private void checkInsertGap(Set<DataGap> allGaps, DataGap gap) {
checkGap(allGaps, gap);
if (allGaps.contains(gap)) {
throw new RuntimeException("Detected a duplicate data gap: " + gap);
}
for (DataGap gapAdded : allGaps) {
if (gap.overlaps(gapAdded)) {
throw new RuntimeException("Detected an overlapping data gap: " + gap);
}
}
allGaps.add(gap);
}

private void checkDeleteGap(Set<DataGap> allGaps, DataGap gap) {
checkGap(allGaps, gap);
if (!allGaps.contains(gap)) {
throw new RuntimeException("Detected a delete of a non-existent data gap: " + gap);
}
allGaps.remove(gap);
}

private void checkGap(Set<DataGap> allGaps, DataGap gap) {
if (gap.getStartId() > gap.getEndId()) {
throw new RuntimeException("Detected an invalid gap range: " + gap);
} else if (gap.gapSize() < 50000000 - 1 && gap.gapSize() >= (long) (50000000 * 0.75)) {
throw new RuntimeException("Detected a very large gap range: " + gap);
}
}

}

0 comments on commit d0cd632

Please sign in to comment.