Skip to content

Commit

Permalink
0002526: Improve performance of data gap detection
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Jul 29, 2016
1 parent 9672558 commit 57c9666
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 32 deletions.
Expand Up @@ -159,6 +159,7 @@ private ParameterConstants() {
public final static String ROUTING_LOG_STATS_ON_BATCH_ERROR = "routing.log.stats.on.batch.error";
public final static String ROUTING_COLLECT_STATS_UNROUTED = "routing.collect.stats.unrouted";
public final static String ROUTING_USE_FAST_GAP_DETECTOR = "routing.use.fast.gap.detector";
public final static String ROUTING_DETECT_INVALID_GAPS = "routing.detect.invalid.gaps";
public final static String ROUTING_QUERY_CHANNELS_FIRST = "routing.query.channels.first";

public final static String INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED = "incoming.batches.skip.duplicates";
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -86,6 +87,8 @@ public class DataGapFastDetector extends DataGapDetector implements ISqlRowMappe
protected Set<DataGap> gapsAdded;

protected Set<DataGap> gapsDeleted;

protected boolean detectInvalidGaps;

public DataGapFastDetector(IDataService dataService, IParameterService parameterService, IContextService contextService,
ISymmetricDialect symmetricDialect, IRouterService routerService, IStatisticManager statisticManager, INodeService nodeService) {
Expand All @@ -100,6 +103,7 @@ public DataGapFastDetector(IDataService dataService, IParameterService parameter

public void beforeRouting() {
maxDataToSelect = parameterService.getLong(ParameterConstants.ROUTING_LARGEST_GAP_SIZE);
detectInvalidGaps = parameterService.is(ParameterConstants.ROUTING_DETECT_INVALID_GAPS);
reset();

if (isFullGapAnalysis()) {
Expand All @@ -109,7 +113,9 @@ public void beforeRouting() {
log.info("Full gap analysis is running");
long ts = System.currentTimeMillis();
gaps = dataService.findDataGaps();
fixOverlappingGaps(gaps, processInfo);
if (detectInvalidGaps) {
fixOverlappingGaps(gaps, processInfo);
}
queryDataIdMap();
processInfo.setStatus(Status.OK);
log.info("Querying data in gaps from database took {} ms", System.currentTimeMillis() - ts);
Expand All @@ -121,7 +127,9 @@ public void beforeRouting() {
nodeService.findIdentityNodeId(), null, ProcessType.GAP_DETECT));
processInfo.setStatus(Status.QUERYING);
gaps = dataService.findDataGaps();
fixOverlappingGaps(gaps, processInfo);
if (detectInvalidGaps) {
fixOverlappingGaps(gaps, processInfo);
}
processInfo.setStatus(Status.OK);
}
}
Expand Down Expand Up @@ -213,13 +221,15 @@ public void afterRouting() {
expireChecked++;
}
if (isAllDataRead || isGapEmpty) {
if (dataGap.getStartId() == dataGap.getEndId()) {
log.info("Found a gap in data_id at {}. Skipping it because " +
(supportsTransactionViews ? "there are no pending transactions" : "the gap expired"), dataGap.getStartId());
} else {
log.info("Found a gap in data_id from {} to {}. Skipping it because " +
(supportsTransactionViews ? "there are no pending transactions" : "the gap expired"),
dataGap.getStartId(), dataGap.getEndId());
if (log.isDebugEnabled()) {
if (dataGap.getStartId() == dataGap.getEndId()) {
log.debug("Found a gap in data_id at {}. Skipping it because " +
(supportsTransactionViews ? "there are no pending transactions" : "the gap expired"), dataGap.getStartId());
} else {
log.debug("Found a gap in data_id from {} to {}. Skipping it because " +
(supportsTransactionViews ? "there are no pending transactions" : "the gap expired"),
dataGap.getStartId(), dataGap.getEndId());
}
}
dataService.deleteDataGap(transaction, dataGap);
gapsDeleted.add(dataGap);
Expand Down Expand Up @@ -256,10 +266,9 @@ public void afterRouting() {
}

if (System.currentTimeMillis() - printStats > 30000) {
log.info(
"The data gap detection process has been running for {}ms, detected {} rows that have been previously routed over a total gap range of {}, "
+ "inserted {} new gaps, deleted {} gaps, and checked data in {} gaps", new Object[] { System.currentTimeMillis() - ts,
dataIdCount, rangeChecked, gapsAdded.size(), gapsDeleted.size(), expireChecked });
log.info("The data gap detection process has been running for {}ms, detected {} rows over a gap range of {}, "
+ "inserted {} new gaps, deleted {} gaps, and checked data in {} gaps", new Object[] { System.currentTimeMillis() - ts,
dataIdCount, rangeChecked, gapsAdded.size(), gapsDeleted.size(), expireChecked });
printStats = System.currentTimeMillis();
}

Expand Down Expand Up @@ -309,15 +318,17 @@ public void afterRouting() {

protected boolean isOkayToAdd(DataGap dataGap) {
boolean isOkay = true;
if (gapsAll.contains(dataGap)) {
log.warn("Detected a duplicate data gap: " + dataGap);
isOkay = false;
} else if (dataGap.getStartId() > dataGap.getEndId()) {
log.warn("Detected an invalid gap range: " + dataGap);
isOkay = false;
} else if (dataGap.gapSize() < maxDataToSelect - 1 && dataGap.gapSize() >= (long) (maxDataToSelect * 0.75)) {
log.warn("Detected a very large gap range: " + dataGap);
isOkay = false;
if (detectInvalidGaps) {
if (gapsAll.contains(dataGap)) {
log.warn("Detected a duplicate data gap: " + dataGap);
isOkay = false;
} else if (dataGap.getStartId() > dataGap.getEndId()) {
log.warn("Detected an invalid gap range: " + dataGap);
isOkay = false;
} else if (dataGap.gapSize() < maxDataToSelect - 1 && dataGap.gapSize() >= (long) (maxDataToSelect * 0.75)) {
log.warn("Detected a very large gap range: " + dataGap);
isOkay = false;
}
}

if (isOkay) {
Expand Down Expand Up @@ -352,19 +363,27 @@ protected Map<DataGap, List<Long>> getDataIdMap() {
HashMap<DataGap, List<Long>> map = new HashMap<DataGap, List<Long>>();
Collections.sort(dataIds);

for (DataGap gap : gaps) {
map.put(gap, new ArrayList<Long>());
Iterator<Long> iterator = dataIds.iterator();
long dataId = -1;
if (iterator.hasNext()) {
dataId = iterator.next().longValue();
}

for (Long dataId : dataIds) {
long id = dataId.longValue();
for (DataGap gap : gaps) {
if (id >= gap.getStartId() && id <= gap.getEndId()) {
map.get(gap).add(dataId);
}

for (DataGap gap : gaps) {
List<Long> idList = map.get(gap);
if (idList == null) {
idList = new ArrayList<Long>();
map.put(gap, idList);
}
}

do {
if (dataId >= gap.getStartId() && dataId <= gap.getEndId()) {
idList.add(dataId);
} else {
break;
}
} while (iterator.hasNext() && (dataId = iterator.next().longValue()) != -1);
}
return map;
}

Expand Down
Expand Up @@ -977,6 +977,15 @@ routing.query.channels.first=true
# Type: boolean
routing.use.fast.gap.detector=true

# Run checks for duplicate, invalid-range, overlapping, and very large gaps while
# processing each gap. Enable this to log diagnostics and catch problems with gap
# detection. The checks add overhead to routing, so they are disabled by default.
#
# DatabaseOverridable: true
# Tags: routing
# Type: boolean
routing.detect.invalid.gaps=false

# For a busy system, how often to run checks on sym_data in order to expire gaps.
# Normally routing reads all data, so gap expiration can run without checking sym_data.
# When the system is busy, however, not all data is read, and gap expiration must query each
Expand Down
Expand Up @@ -90,6 +90,7 @@ public void setUp() throws Exception {
when(parameterService.getLong(ParameterConstants.ROUTING_LARGEST_GAP_SIZE)).thenReturn(50000000L);
when(parameterService.getLong(ParameterConstants.DBDIALECT_ORACLE_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS)).thenReturn(60000L);
when(parameterService.getLong(ParameterConstants.ROUTING_STALE_GAP_BUSY_EXPIRE_TIME)).thenReturn(60000L);
when(parameterService.is(ParameterConstants.ROUTING_DETECT_INVALID_GAPS)).thenReturn(true);

IExtensionService extensionService = mock(ExtensionService.class);
ISymmetricEngine engine = mock(AbstractSymmetricEngine.class);
Expand Down

0 comments on commit 57c9666

Please sign in to comment.