Skip to content
Permalink
Browse files

0003885: Routing run immediately when channel is full

  • Loading branch information...
elong
elong committed Mar 6, 2019
1 parent 30890c9 commit 1c1bcb223ec03e3796aafd4afb8f6d9d210d43f2
@@ -132,6 +132,8 @@
protected boolean syncTriggersBeforeInitialLoadAttempted = false;

protected boolean firstTimeCheck = true;

protected boolean hasMaxDataRoutedOnChannel;

public RouterService(ISymmetricEngine engine) {
super(engine.getParameterService(), engine.getSymmetricDialect());
@@ -215,19 +217,27 @@ synchronized public long routeData(boolean force) {
}
}
insertInitialLoadEvents();
engine.getClusterService().refreshLock(ClusterConstants.ROUTE);

long ts = System.currentTimeMillis();
gapDetector.beforeRouting();

dataCount = routeDataForEachChannel();
ts = System.currentTimeMillis() - ts;
if (dataCount > 0 || ts > Constants.LONG_OPERATION_THRESHOLD) {
log.info("Routed {} data events in {} ms", dataCount, ts);
}
if (dataCount > 0) {
gapDetector.afterRouting();
}
do {
engine.getClusterService().refreshLock(ClusterConstants.ROUTE);

long ts = System.currentTimeMillis();
hasMaxDataRoutedOnChannel = false;
gapDetector.beforeRouting();

dataCount = routeDataForEachChannel();
ts = System.currentTimeMillis() - ts;
if (dataCount > 0 || ts > Constants.LONG_OPERATION_THRESHOLD) {
log.info("Routed {} data events in {} ms", dataCount, ts);
}
if (dataCount > 0) {
gapDetector.afterRouting();
}
if (hasMaxDataRoutedOnChannel) {
log.debug("Immediately routing again because a channel reached max data to route");
}
} while (hasMaxDataRoutedOnChannel);

} finally {
if (!force) {
engine.getClusterService().unlock(ClusterConstants.ROUTE);
@@ -756,6 +766,7 @@ protected int routeDataForChannel(ProcessInfo processInfo, final NodeChannel nod
completeBatchesAndCommit(context);
gapDetector.addDataIds(context.getDataIds());
gapDetector.setIsAllDataRead(context.getDataIds().size() < context.getChannel().getMaxDataToRoute());
hasMaxDataRoutedOnChannel |= context.getDataIds().size() >= context.getChannel().getMaxDataToRoute();
context.incrementStat(System.currentTimeMillis() - insertTs,
ChannelRouterContext.STAT_INSERT_DATA_EVENTS_MS);

@@ -1240,6 +1240,14 @@ routing.detect.invalid.gaps=true
# Tags: routing
routing.stale.gap.busy.expire.time.ms=7200000

# Start routing again immediately when that last run of routing reached the max data to route
# for a channel.
#
# DatabaseOverridable: true
# Type: boolean
# Tags: routing
routing.immediate.if.max.channel=true

# This is the number of data events that will be batched and committed together while building a batch.
# Note that this only kicks in if the prospective batch size is bigger than the configured max batch size.
#

0 comments on commit 1c1bcb2

Please sign in to comment.
You can’t perform that action at this time.