Skip to content

Commit

Permalink
0002526: Improve performance of data gap detection
Browse files Browse the repository at this point in the history
avoid context service calls when running non-clustered
  • Loading branch information
erilong committed Jul 21, 2016
1 parent 62e882c commit db5c9ff
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 41 deletions.
Expand Up @@ -265,4 +265,8 @@ public void addDataIds(List<Long> dataIds) {

public void setIsAllDataRead(boolean isAllDataRead) {
}

public void setFullGapAnalysis(boolean isFullGapAnalysis) {
}

}
Expand Up @@ -76,9 +76,11 @@ public class DataGapFastDetector extends DataGapDetector implements ISqlRowMappe
protected boolean isAllDataRead = true;

protected long maxDataToSelect;

protected boolean isFullGapAnalysis = true;

protected boolean isBusyExpire;
protected long lastBusyExpireRunTime;

protected Set<DataGap> gapsAll;

protected Set<DataGap> gapsAdded;
Expand All @@ -94,31 +96,32 @@ public DataGapFastDetector(IDataService dataService, IParameterService parameter
this.symmetricDialect = symmetricDialect;
this.statisticManager = statisticManager;
this.nodeService = nodeService;
reset();
}

public void beforeRouting() {
reset();
ProcessInfo processInfo = this.statisticManager.newProcessInfo(new ProcessInfoKey(
nodeService.findIdentityNodeId(), null, ProcessType.GAP_DETECT));
processInfo.setStatus(Status.QUERYING);
maxDataToSelect = parameterService.getLong(ParameterConstants.ROUTING_LARGEST_GAP_SIZE);
gaps = dataService.findDataGaps();
processInfo.setStatus(Status.OK);
fixOverlappingGaps(gaps, processInfo);

if (contextService.is(ContextConstants.ROUTING_FULL_GAP_ANALYSIS)) {
if (isFullGapAnalysis()) {
log.info("Full gap analysis is running");
long ts = System.currentTimeMillis();
queryDataIdMap();
log.info("Querying data in gaps from database took {} ms", System.currentTimeMillis() - ts);
afterRouting();
log.info("Full gap analysis is done after {} ms", System.currentTimeMillis() - ts);
gaps = dataService.findDataGaps();
reset();
gaps = dataService.findDataGaps();
log.info("Full gap analysis is done after {} ms", System.currentTimeMillis() - ts);
}
processInfo.setStatus(Status.OK);
}

protected void reset() {
isAllDataRead = true;
dataIds = new ArrayList<Long>();
gapsAll = new HashSet<DataGap>();
gapsAdded = new HashSet<DataGap>();
Expand Down Expand Up @@ -153,8 +156,9 @@ public void afterRouting() {
}

Date currentDate = new Date(currentTime);
boolean isBusyExpire = false;
if (!isAllDataRead) {
long lastBusyExpireRunTime = contextService.getLong(ContextConstants.ROUTING_LAST_BUSY_EXPIRE_RUN_TIME);
long lastBusyExpireRunTime = getLastBusyExpireRunTime();
long busyExpireMillis = parameterService.getLong(ParameterConstants.ROUTING_STALE_GAP_BUSY_EXPIRE_TIME);
isBusyExpire = lastBusyExpireRunTime == 0 || System.currentTimeMillis() - lastBusyExpireRunTime >= busyExpireMillis;
}
Expand Down Expand Up @@ -278,10 +282,10 @@ public void afterRouting() {
dataService.insertDataGap(newGap);
}
}

contextService.save(ContextConstants.ROUTING_FULL_GAP_ANALYSIS, "false");
setFullGapAnalysis(false);
if (!isAllDataRead && expireChecked > 0) {
contextService.save(ContextConstants.ROUTING_LAST_BUSY_EXPIRE_RUN_TIME, String.valueOf(System.currentTimeMillis()));
setLastBusyExpireRunTime(System.currentTimeMillis());
}

long updateTimeInMs = System.currentTimeMillis() - ts;
Expand Down Expand Up @@ -435,4 +439,32 @@ public void setIsAllDataRead(boolean isAllDataRead) {
this.isAllDataRead &= isAllDataRead;
}

public boolean isFullGapAnalysis() {
if (parameterService.is(ParameterConstants.CLUSTER_LOCKING_ENABLED)) {
isFullGapAnalysis = contextService.is(ContextConstants.ROUTING_FULL_GAP_ANALYSIS);
}
return isFullGapAnalysis;
}

public void setFullGapAnalysis(boolean isFullGapAnalysis) {
if (parameterService.is(ParameterConstants.CLUSTER_LOCKING_ENABLED)) {
contextService.save(ContextConstants.ROUTING_FULL_GAP_ANALYSIS, Boolean.toString(isFullGapAnalysis));
}
this.isFullGapAnalysis = isFullGapAnalysis;
}

protected long getLastBusyExpireRunTime() {
if (parameterService.is(ParameterConstants.CLUSTER_LOCKING_ENABLED)) {
lastBusyExpireRunTime = contextService.getLong(ContextConstants.ROUTING_LAST_BUSY_EXPIRE_RUN_TIME);
}
return lastBusyExpireRunTime;
}

protected void setLastBusyExpireRunTime(long lastBusyExpireRunTime) {
if (parameterService.is(ParameterConstants.CLUSTER_LOCKING_ENABLED)) {
contextService.save(ContextConstants.ROUTING_LAST_BUSY_EXPIRE_RUN_TIME, String.valueOf(lastBusyExpireRunTime));
}
this.lastBusyExpireRunTime = lastBusyExpireRunTime;
}

}
Expand Up @@ -44,7 +44,6 @@
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SyntaxParsingException;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ContextConstants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.model.Channel;
Expand Down Expand Up @@ -114,6 +113,8 @@ public class RouterService extends AbstractService implements IRouterService {

protected IExtensionService extensionService;

protected DataGapDetector gapDetector;

protected boolean syncTriggersBeforeInitialLoadAttempted = false;

protected boolean firstTimeCheckForAbandonedBatches = true;
Expand Down Expand Up @@ -141,6 +142,14 @@ public RouterService(ISymmetricEngine engine) {

setSqlMap(new RouterServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens()));

if (parameterService.is(ParameterConstants.ROUTING_USE_FAST_GAP_DETECTOR)) {
gapDetector = new DataGapFastDetector(engine.getDataService(), parameterService, engine.getContextService(),
symmetricDialect, this, engine.getStatisticManager(), engine.getNodeService());
} else {
gapDetector = new DataGapDetector(engine.getDataService(), parameterService, symmetricDialect,
this, engine.getStatisticManager(), engine.getNodeService());
}
}

/**
Expand Down Expand Up @@ -185,18 +194,8 @@ synchronized public long routeData(boolean force) {
insertInitialLoadEvents();

long ts = System.currentTimeMillis();
DataGapDetector gapDetector = null;
if (parameterService.is(ParameterConstants.ROUTING_USE_FAST_GAP_DETECTOR)) {
gapDetector = new DataGapFastDetector(
engine.getDataService(), parameterService, engine.getContextService(), symmetricDialect,
this, engine.getStatisticManager(), engine.getNodeService());
} else {
gapDetector = new DataGapDetector(
engine.getDataService(), parameterService, symmetricDialect,
this, engine.getStatisticManager(), engine.getNodeService());
}
gapDetector.beforeRouting();
dataCount = routeDataForEachChannel(gapDetector);
dataCount = routeDataForEachChannel();
ts = System.currentTimeMillis() - ts;
if (dataCount > 0 || ts > Constants.LONG_OPERATION_THRESHOLD) {
log.info("Routed {} data events in {} ms", dataCount, ts);
Expand Down Expand Up @@ -289,7 +288,7 @@ protected void insertInitialLoadEvents() {
}
}
if (isInitialLoadQueued) {
engine.getContextService().save(ContextConstants.ROUTING_FULL_GAP_ANALYSIS, "true");
gapDetector.setFullGapAnalysis(true);
}
}
}
Expand Down Expand Up @@ -345,7 +344,7 @@ protected void sendReverseInitialLoad() {
* thread pool here and waiting for all channels to be processed. The other
* reason is to reduce the number of connections we are required to have.
*/
protected int routeDataForEachChannel(DataGapDetector gapDetector) {
protected int routeDataForEachChannel() {
int dataCount = 0;
Node sourceNode = engine.getNodeService().findIdentity();
ProcessInfo processInfo = engine.getStatisticManager().newProcessInfo(
Expand All @@ -355,15 +354,12 @@ protected int routeDataForEachChannel(DataGapDetector gapDetector) {
final List<NodeChannel> channels = engine.getConfigurationService().getNodeChannels(false);
Set<String> readyChannels = null;
if (parameterService.is(ParameterConstants.ROUTING_QUERY_CHANNELS_FIRST)) {
readyChannels = getReadyChannels(gapDetector);
readyChannels = getReadyChannels();
}
for (NodeChannel nodeChannel : channels) {
if (nodeChannel.isEnabled() && (readyChannels == null || readyChannels.contains(nodeChannel.getChannelId()))) {
processInfo.setCurrentChannelId(nodeChannel.getChannelId());
dataCount += routeDataForChannel(processInfo,
nodeChannel,
sourceNode
, gapDetector);
dataCount += routeDataForChannel(processInfo, nodeChannel, sourceNode);
} else {
gapDetector.setIsAllDataRead(false);
if (log.isDebugEnabled()) {
Expand All @@ -382,7 +378,7 @@ protected int routeDataForEachChannel(DataGapDetector gapDetector) {
return dataCount;
}

protected Set<String> getReadyChannels(DataGapDetector gapDetector) {
protected Set<String> getReadyChannels() {
List<DataGap> dataGaps = gapDetector.getDataGaps();
int dataIdSqlType = engine.getSymmetricDialect().getSqlTypeForIds();
int numberOfGapsToQualify = parameterService.getInt(ParameterConstants.ROUTING_MAX_GAPS_TO_QUALIFY_IN_SQL, 100);
Expand Down Expand Up @@ -553,8 +549,7 @@ protected boolean onlyDefaultRoutersAssigned(Channel channel, String nodeGroupId
return onlyDefaultRoutersAssigned;
}

protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nodeChannel, final Node sourceNode,
DataGapDetector gapDetector) {
protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nodeChannel, final Node sourceNode) {
ChannelRouterContext context = null;
long ts = System.currentTimeMillis();
int dataCount = -1;
Expand Down Expand Up @@ -651,9 +646,7 @@ protected void completeBatchesAndCommit(ChannelRouterContext context) {
List<OutgoingBatch> batches = new ArrayList<OutgoingBatch>(context.getBatchesByNodes()
.values());

if (!engine.getContextService().is(ContextConstants.ROUTING_FULL_GAP_ANALYSIS)) {
engine.getContextService().save(ContextConstants.ROUTING_FULL_GAP_ANALYSIS, "true");
}
gapDetector.setFullGapAnalysis(true);
context.commit();

if (engine.getParameterService().is(ParameterConstants.ROUTING_LOG_STATS_ON_BATCH_ERROR)) {
Expand Down
Expand Up @@ -972,7 +972,7 @@ routing.query.channels.first=true
# Use a faster method of gap detection that uses the output of the work from router service
# instead of querying for it.
#
# DatabaseOverridable: true
# DatabaseOverridable: false
# Tags: routing
# Type: boolean
routing.use.fast.gap.detector=true
Expand Down
Expand Up @@ -67,6 +67,7 @@ public class DataGapDetectorTest {
IRouterService routerService;
IStatisticManager statisticManager;
INodeService nodeService;
DataGapFastDetector detector;

@Before
public void setUp() throws Exception {
Expand Down Expand Up @@ -108,6 +109,8 @@ public void setUp() throws Exception {

nodeService = mock(NodeService.class);
when(nodeService.findIdentity()).thenReturn(new Node(NODE_ID, NODE_GROUP_ID));
detector = newGapDetector();
detector.setFullGapAnalysis(false);
}

protected DataGapFastDetector newGapDetector() {
Expand All @@ -116,7 +119,7 @@ protected DataGapFastDetector newGapDetector() {

protected void runGapDetector(List<DataGap> dataGaps, List<Long> dataIds, boolean isAllDataRead) {
when(dataService.findDataGaps()).thenReturn(dataGaps);
DataGapFastDetector detector = newGapDetector();

detector.beforeRouting();
detector.addDataIds(dataIds);
detector.setIsAllDataRead(isAllDataRead);
Expand All @@ -131,7 +134,6 @@ public List<DataGap> answer(InvocationOnMock invocation) {
}
});

DataGapFastDetector detector = newGapDetector();
detector.beforeRouting();
detector.addDataIds(dataIds);
detector.setIsAllDataRead(isAllDataRead);
Expand All @@ -158,6 +160,7 @@ public void testNewGap() throws Exception {

@Test
public void testNewGapFull() throws Exception {
detector.setFullGapAnalysis(true);
when(contextService.is(ContextConstants.ROUTING_FULL_GAP_ANALYSIS)).thenReturn(true);

List<Long> dataIds = new ArrayList<Long>();
Expand Down Expand Up @@ -203,6 +206,7 @@ public void testTwoNewGaps() throws Exception {

@Test
public void testTwoNewGapsFull() throws Exception {
detector.setFullGapAnalysis(true);
when(contextService.is(ContextConstants.ROUTING_FULL_GAP_ANALYSIS)).thenReturn(true);

List<Long> dataIds = new ArrayList<Long>();
Expand Down Expand Up @@ -257,7 +261,8 @@ public void testGapInGap() throws Exception {
}

@Test
public void testGapInGapFull() throws Exception {
public void testGapInGapFull() throws Exception {
detector.setFullGapAnalysis(true);
when(contextService.is(ContextConstants.ROUTING_FULL_GAP_ANALYSIS)).thenReturn(true);

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -408,6 +413,7 @@ public void testGapExpireOracleBusyChannel() throws Exception {

@Test
public void testGapsBeforeAndAfterFull() throws Exception {
detector.setFullGapAnalysis(true);
when(contextService.is(ContextConstants.ROUTING_FULL_GAP_ANALYSIS)).thenReturn(true);
List<Long> dataIds = new ArrayList<Long>();
dataIds.add(843L);
Expand Down Expand Up @@ -505,6 +511,7 @@ public void testGapsOverlapThenData() throws Exception {

@Test
public void testGapsOverlapThenDataFull() throws Exception {
detector.setFullGapAnalysis(true);
when(contextService.is(ContextConstants.ROUTING_FULL_GAP_ANALYSIS)).thenReturn(true);
List<Long> dataIds = new ArrayList<Long>();
dataIds.add(30953883L);
Expand Down

0 comments on commit db5c9ff

Please sign in to comment.