Skip to content

Commit

Permalink
0002653: Make unrouted data collection optional
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Jun 29, 2016
1 parent 9a262a1 commit bd1206c
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 12 deletions.
Expand Up @@ -156,6 +156,7 @@ private ParameterConstants() {
public final static String ROUTING_DATA_READER_ORDER_BY_DATA_ID_ENABLED = "routing.data.reader.order.by.gap.id.enabled";
public final static String ROUTING_DATA_READER_THRESHOLD_GAPS_TO_USE_GREATER_QUERY = "routing.data.reader.threshold.gaps.to.use.greater.than.query";
public final static String ROUTING_LOG_STATS_ON_BATCH_ERROR = "routing.log.stats.on.batch.error";
public final static String ROUTING_COLLECT_STATS_UNROUTED = "routing.collect.stats.unrouted";
public final static String ROUTING_USE_FAST_GAP_DETECTOR = "routing.use.fast.gap.detector";

public final static String INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED = "incoming.batches.skip.duplicates";
Expand Down
Expand Up @@ -536,19 +536,22 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod
gapDetector.setIsAllDataRead(context.getDataIds().size() < context.getChannel().getMaxDataToRoute());
context.incrementStat(System.currentTimeMillis() - insertTs,
ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS);
Data lastDataProcessed = context.getLastDataProcessed();
if (lastDataProcessed != null && lastDataProcessed.getDataId() > 0) {
String channelId = nodeChannel.getChannelId();
long queryTs = System.currentTimeMillis();
long dataLeftToRoute = sqlTemplate.queryForInt(
getSql("selectUnroutedCountForChannelSql"), channelId,
lastDataProcessed.getDataId());
queryTs = System.currentTimeMillis() - queryTs;
if (queryTs > Constants.LONG_OPERATION_THRESHOLD) {
log.warn("Unrouted query for channel {} took longer than expected", channelId, queryTs);
log.info("The query took {} ms", queryTs);

if (parameterService.is(ParameterConstants.ROUTING_COLLECT_STATS_UNROUTED)) {
Data lastDataProcessed = context.getLastDataProcessed();
if (lastDataProcessed != null && lastDataProcessed.getDataId() > 0) {
String channelId = nodeChannel.getChannelId();
long queryTs = System.currentTimeMillis();
long dataLeftToRoute = sqlTemplate.queryForInt(
getSql("selectUnroutedCountForChannelSql"), channelId,
lastDataProcessed.getDataId());
queryTs = System.currentTimeMillis() - queryTs;
if (queryTs > Constants.LONG_OPERATION_THRESHOLD) {
log.warn("Unrouted query for channel {} took longer than expected", channelId, queryTs);
log.info("The query took {} ms", queryTs);
}
engine.getStatisticManager().setDataUnRouted(channelId, dataLeftToRoute);
}
engine.getStatisticManager().setDataUnRouted(channelId, dataLeftToRoute);
}
}
} catch (Exception e) {
Expand Down
Expand Up @@ -943,6 +943,12 @@ routing.data.reader.threshold.gaps.to.use.greater.than.query=100
# Tags: routing
routing.log.stats.on.batch.error=false

# Enable to collect unrouted data statistics into the stat tables for graphs.
#
# DatabaseOverridable: true
# Tags: routing
routing.collect.stats.unrouted=false

# Use a faster method of gap detection that uses the output of the work from router service
# instead of querying for it.
#
Expand Down

0 comments on commit bd1206c

Please sign in to comment.