Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
0002526: Improve performance of data gap detection
  • Loading branch information
erilong committed Mar 17, 2016
1 parent ed03253 commit 3a215dc
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 9 deletions.
Expand Up @@ -73,6 +73,8 @@ public class ChannelRouterContext extends SimpleRouterContext {
private long maxPeekAheadQueueSize;
private List<DataGap> dataGaps = new ArrayList<DataGap>();
private Set<String> transactions = new HashSet<String>();
private long lastDataId;
private List<Long> dataIds = new ArrayList<Long>();

public ChannelRouterContext(String nodeId, NodeChannel channel, ISqlTransaction transaction)
throws SQLException {
Expand All @@ -91,6 +93,10 @@ public void clearDataEventsList() {

/**
 * Queues a new {@link DataEvent} for sending and records its data ID.
 * <p>
 * The data ID is appended to {@code dataIds} only when it differs from the ID
 * seen on the previous call, so only consecutive repeats are collapsed.
 * Because {@code lastDataId} is a {@code long} field with no initializer, it
 * starts at 0; a first call with data ID 0 is therefore not recorded.
 *
 * @param dataId   ID of the data row the event refers to
 * @param batchId  ID of the batch the event belongs to
 * @param routerId ID of the router that produced the event
 */
public void addDataEvent(long dataId, long batchId, String routerId) {
    DataEvent event = new DataEvent(dataId, batchId, routerId);
    dataEventsToSend.add(event);
    if (lastDataId == dataId) {
        // Same ID as the last one tracked; nothing new to record.
        return;
    }
    dataIds.add(dataId);
    lastDataId = dataId;
}

public Map<String, OutgoingBatch> getBatchesByNodes() {
Expand Down Expand Up @@ -268,4 +274,8 @@ public boolean isOnlyDefaultRoutersAssigned() {
return onlyDefaultRoutersAssigned;
}

/**
 * Returns the data IDs recorded by this context.
 * <p>
 * The backing list itself is returned, not a copy, so changes made by the
 * caller are visible to this context.
 *
 * @return the list of recorded data IDs
 */
public List<Long> getDataIds() {
    return this.dataIds;
}

}
Expand Up @@ -258,4 +258,16 @@ protected boolean isDataGapExpired(long dataId, long databaseTime) {
}
}

/**
 * Hook invoked after routing has completed. This implementation does nothing.
 * NOTE(review): presumably meant to be overridden by a subclass such as
 * DataGapFastDetector - confirm against that class.
 */
public void afterRouting() {
}

/**
 * Looks up the current data gaps by delegating to
 * {@code dataService.findDataGaps()}.
 *
 * @return whatever the data service returns for the current gaps
 */
public List<DataGap> getDataGaps() {
    final List<DataGap> gaps = dataService.findDataGaps();
    return gaps;
}

/**
 * Accepts a list of data IDs. This implementation ignores the argument and
 * does nothing.
 * NOTE(review): presumably a hook for a subclass that tracks routed data IDs
 * - confirm.
 *
 * @param dataIds data IDs offered to the detector; unused here
 */
public void addDataIds(List<Long> dataIds) {
}

/**
 * Accepts a flag indicating whether all data was read. This implementation
 * ignores the argument and does nothing.
 * NOTE(review): presumably a hook for a subclass - confirm.
 *
 * @param isAllDataRead flag offered to the detector; unused here
 */
public void setIsAllDataRead(boolean isAllDataRead) {
}
}
Expand Up @@ -111,6 +111,7 @@ public DataGapRouteReader(ChannelRouterContext context, ISymmetricEngine engine)
if (lastSelectUsedGreaterThanQueryByEngineName.get(engineName) == null) {
lastSelectUsedGreaterThanQueryByEngineName.put(engineName, Boolean.FALSE);
}
this.dataGaps = new ArrayList<DataGap>(context.getDataGaps());
}

public void run() {
Expand Down Expand Up @@ -264,11 +265,6 @@ protected ISqlReadCursor<Data> prepareCursor() {
ParameterConstants.ROUTING_MAX_GAPS_TO_QUALIFY_IN_SQL, 100);

int maxGapsBeforeGreaterThanQuery = parameterService.getInt(ParameterConstants.ROUTING_DATA_READER_THRESHOLD_GAPS_TO_USE_GREATER_QUERY, 100);

this.dataGaps = engine.getDataService().findDataGaps();
if (this.dataGaps != null) {
context.setDataGaps(new ArrayList<DataGap>(this.dataGaps));
}

boolean useGreaterThanDataId = false;
if (maxGapsBeforeGreaterThanQuery > 0 && this.dataGaps.size() > maxGapsBeforeGreaterThanQuery) {
Expand Down
Expand Up @@ -65,6 +65,7 @@
import org.jumpmind.symmetric.route.ColumnMatchDataRouter;
import org.jumpmind.symmetric.route.ConfigurationChangedDataRouter;
import org.jumpmind.symmetric.route.DataGapDetector;
import org.jumpmind.symmetric.route.DataGapFastDetector;
import org.jumpmind.symmetric.route.DataGapRouteReader;
import org.jumpmind.symmetric.route.DefaultBatchAlgorithm;
import org.jumpmind.symmetric.route.DefaultDataRouter;
Expand Down Expand Up @@ -171,15 +172,25 @@ synchronized public long routeData(boolean force) {
insertInitialLoadEvents();

long ts = System.currentTimeMillis();
DataGapDetector gapDetector = new DataGapDetector(
engine.getDataService(), parameterService, symmetricDialect,
this, engine.getStatisticManager(), engine.getNodeService());
DataGapDetector gapDetector = null;
if (parameterService.is("routing.use.fast.gap.detector")) {
gapDetector = new DataGapFastDetector(
engine.getDataService(), parameterService, symmetricDialect,
this, engine.getStatisticManager(), engine.getNodeService());
} else {
gapDetector = new DataGapDetector(
engine.getDataService(), parameterService, symmetricDialect,
this, engine.getStatisticManager(), engine.getNodeService());
}
gapDetector.beforeRouting();
dataCount = routeDataForEachChannel(gapDetector);
ts = System.currentTimeMillis() - ts;
if (dataCount > 0 || ts > Constants.LONG_OPERATION_THRESHOLD) {
log.info("Routed {} data events in {} ms", dataCount, ts);
}
if (dataCount > 0) {
gapDetector.afterRouting();
}
} finally {
if (!force) {
engine.getClusterService().unlock(ClusterConstants.ROUTE);
Expand Down Expand Up @@ -332,6 +343,7 @@ protected int routeDataForEachChannel(DataGapDetector gapDetector) {
sourceNode
, gapDetector);
} else {
gapDetector.setIsAllDataRead(false);
if (log.isDebugEnabled()) {
log.debug(
"Not routing the {} channel. It is either disabled or suspended.",
Expand Down Expand Up @@ -466,7 +478,8 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod
symmetricDialect.getPlatform().getSqlTemplate().startSqlTransaction());
context.setProduceCommonBatches(producesCommonBatches);
context.setOnlyDefaultRoutersAssigned(onlyDefaultRoutersAssigned);

context.setDataGaps(gapDetector.getDataGaps());

dataCount = selectDataAndRoute(processInfo, context);
return dataCount;
} catch (DelayRoutingException ex) {
Expand Down Expand Up @@ -506,6 +519,8 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod
context.getDataEventList());
context.clearDataEventsList();
completeBatchesAndCommit(context);
gapDetector.addDataIds(context.getDataIds());
gapDetector.setIsAllDataRead(context.getDataIds().size() < context.getChannel().getMaxDataToRoute());
context.incrementStat(System.currentTimeMillis() - insertTs,
ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS);
Data lastDataProcessed = context.getLastDataProcessed();
Expand Down Expand Up @@ -545,6 +560,12 @@ protected void completeBatchesAndCommit(ChannelRouterContext context) {
Set<IDataRouter> usedRouters = new HashSet<IDataRouter>(context.getUsedDataRouters());
List<OutgoingBatch> batches = new ArrayList<OutgoingBatch>(context.getBatchesByNodes()
.values());

// TODO: use context variable instead
if (!engine.getParameterService().is("routing.full.gap.analysis")) {
engine.getParameterService().saveParameter(parameterService.getExternalId(), parameterService.getNodeGroupId(),
"routing.full.gap.analysis", "true", "RouterService");
}
context.commit();

if (engine.getParameterService().is(ParameterConstants.ROUTING_LOG_STATS_ON_BATCH_ERROR)) {
Expand Down
Expand Up @@ -52,6 +52,9 @@ public RouterServiceSqlMap(IDatabasePlatform platform, Map<String, String> repla
""
+ "select distinct(data_id) from $(data_event) where data_id >=? and data_id <= ? order by data_id asc ");

putSql("selectDataIdFromDataSql",
"select data_id from $(data) where $(dataRange) order by data_id asc");

putSql("selectUnroutedCountForChannelSql", ""
+ "select count(*) from $(data) where channel_id=? and data_id >=? ");

Expand Down
Expand Up @@ -884,6 +884,14 @@ routing.data.reader.threshold.gaps.to.use.greater.than.query=100
# Tags: routing
routing.log.stats.on.batch.error=false

# Use a faster method of gap detection that reuses the results produced by the router service
# during routing instead of querying for them.
#
# DatabaseOverridable: true
# Tags: routing
# Type: boolean
routing.use.fast.gap.detector=true

# This is the number of data events that will be batched and committed together while building a batch.
# Note that this only kicks in if the prospective batch size is bigger than the configured max batch size.
#
Expand Down

0 comments on commit 3a215dc

Please sign in to comment.