Skip to content

Commit

Permalink
0002674: Route for only channels that have data waiting to route
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Jul 8, 2016
1 parent bda0b3d commit 80e6d17
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 3 deletions.
Expand Up @@ -158,6 +158,7 @@ private ParameterConstants() {
public final static String ROUTING_LOG_STATS_ON_BATCH_ERROR = "routing.log.stats.on.batch.error";
public final static String ROUTING_COLLECT_STATS_UNROUTED = "routing.collect.stats.unrouted";
public final static String ROUTING_USE_FAST_GAP_DETECTOR = "routing.use.fast.gap.detector";
public final static String ROUTING_QUERY_CHANNELS_FIRST = "routing.query.channels.first";

public final static String INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED = "incoming.batches.skip.duplicates";
@Deprecated
Expand Down
Expand Up @@ -39,6 +39,8 @@
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.Row;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SyntaxParsingException;
import org.jumpmind.symmetric.common.Constants;
Expand Down Expand Up @@ -91,6 +93,7 @@
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IRouterService;
import org.jumpmind.symmetric.statistic.StatisticConstants;
import org.jumpmind.util.FormatUtils;

/**
* @see IRouterService
Expand Down Expand Up @@ -346,10 +349,13 @@ protected int routeDataForEachChannel(DataGapDetector gapDetector) {
new ProcessInfoKey(sourceNode.getNodeId(), null, ProcessType.ROUTER_JOB));
processInfo.setStatus(ProcessInfo.Status.PROCESSING);
try {
final List<NodeChannel> channels = engine.getConfigurationService().getNodeChannels(
false);
final List<NodeChannel> channels = engine.getConfigurationService().getNodeChannels(false);
Set<String> readyChannels = null;
if (parameterService.is(ParameterConstants.ROUTING_QUERY_CHANNELS_FIRST)) {
readyChannels = getReadyChannels(gapDetector);
}
for (NodeChannel nodeChannel : channels) {
if (nodeChannel.isEnabled()) {
if (nodeChannel.isEnabled() && (readyChannels == null || readyChannels.contains(nodeChannel.getChannelId()))) {
processInfo.setCurrentChannelId(nodeChannel.getChannelId());
dataCount += routeDataForChannel(processInfo,
nodeChannel,
Expand All @@ -373,6 +379,64 @@ protected int routeDataForEachChannel(DataGapDetector gapDetector) {
return dataCount;
}

protected Set<String> getReadyChannels(DataGapDetector gapDetector) {
List<DataGap> dataGaps = gapDetector.getDataGaps();
int dataIdSqlType = engine.getSymmetricDialect().getSqlTypeForIds();
int numberOfGapsToQualify = parameterService.getInt(ParameterConstants.ROUTING_MAX_GAPS_TO_QUALIFY_IN_SQL, 100);
int maxGapsBeforeGreaterThanQuery = parameterService.getInt(
ParameterConstants.ROUTING_DATA_READER_THRESHOLD_GAPS_TO_USE_GREATER_QUERY, 100);
String sql;
Object[] args;
int[] types;
if (maxGapsBeforeGreaterThanQuery > 0 && dataGaps.size() > maxGapsBeforeGreaterThanQuery) {
sql = getSql("selectChannelsUsingStartDataId");
args = new Object[] { dataGaps.get(0).getStartId() };
types = new int[] { dataIdSqlType };

} else {
sql = qualifyUsingDataGaps(dataGaps, numberOfGapsToQualify, getSql("selectChannelsUsingGapsSql"));
int numberOfArgs = 2 * (numberOfGapsToQualify < dataGaps.size() ? numberOfGapsToQualify : dataGaps.size());
args = new Object[numberOfArgs];
types = new int[numberOfArgs];

for (int i = 0; i < numberOfGapsToQualify && i < dataGaps.size(); i++) {
DataGap gap = dataGaps.get(i);
args[i * 2] = gap.getStartId();
types[i * 2] = dataIdSqlType;
if ((i + 1) == numberOfGapsToQualify && (i + 1) < dataGaps.size()) {
args[i * 2 + 1] = dataGaps.get(dataGaps.size() - 1).getEndId();
} else {
args[i * 2 + 1] = gap.getEndId();
}
types[i * 2 + 1] = dataIdSqlType;
}

}
final Set<String> readyChannels = new HashSet<String>();
sqlTemplate.query(sql, new ISqlRowMapper<String>() {
public String mapRow(Row row) {
readyChannels.add(row.getString("channel_id"));
return null;
}
}, args, types);
return readyChannels;
}

protected String qualifyUsingDataGaps(List<DataGap> dataGaps, int numberOfGapsToQualify,
String sql) {
StringBuilder gapClause = new StringBuilder();
for (int i = 0; i < numberOfGapsToQualify && i < dataGaps.size(); i++) {
if (i == 0) {
gapClause.append("(");
} else {
gapClause.append(" or ");
}
gapClause.append("(data_id between ? and ?)");
}
gapClause.append(")");
return FormatUtils.replace("dataRange", gapClause.toString(), sql);
}

protected boolean producesCommonBatches(Channel channel, String nodeGroupId, List<TriggerRouter> triggerRouters) {
String channelId = channel.getChannelId();
Boolean producesCommonBatches = !Constants.CHANNEL_CONFIG.equals(channelId)
Expand Down
Expand Up @@ -29,6 +29,10 @@ public class RouterServiceSqlMap extends AbstractSqlMap {
public RouterServiceSqlMap(IDatabasePlatform platform, Map<String, String> replacementTokens) {
super(platform, replacementTokens);

putSql("selectChannelsUsingGapsSql", "select distinct channel_id from $(data) where $(dataRange)");

putSql("selectChannelsUsingStartDataId", "select distinct channel_id from $(data) where data_id >= ?");

putSql("selectDataUsingGapsSql",
""
+ "select $(selectDataUsingGapsSqlHint) d.data_id, d.table_name, d.event_type, d.row_data as row_data, d.pk_data as pk_data, d.old_data as old_data, "
Expand Down
Expand Up @@ -956,6 +956,13 @@ routing.log.stats.on.batch.error=false
# Tags: routing
routing.collect.stats.unrouted=false

# Enable to query for which channels have data waiting, and then only route for those channels.
#
# DatabaseOverridable: true
# Tags: routing
# Type: boolean
routing.query.channels.first=true

# Use a faster method of gap detection that uses the output of the work from router service
# instead of querying for it.
#
Expand Down

0 comments on commit 80e6d17

Please sign in to comment.