Skip to content

Commit

Permalink
0004094: Routing common batch mode for groups of nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Sep 26, 2019
1 parent c58da50 commit 8389629
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 27 deletions.
Expand Up @@ -22,9 +22,7 @@

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.DataGap;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.Node;
Expand Down Expand Up @@ -62,8 +60,6 @@ public boolean shouldDataBeRouted(SimpleRouterContext context, DataMetaData data

public List<DataGap> getDataGaps();

public void stop ();

public Set<Channel> getCommomBatchChannels(List<Channel> channels, String nodeGroupId, List<TriggerRouter> triggerRouters);
public void stop();

}
Expand Up @@ -583,6 +583,7 @@ protected String qualifyUsingDataGaps(List<DataGap> dataGaps, int numberOfGapsTo
return FormatUtils.replace("dataRange", gapClause.toString(), sql);
}

@Deprecated
protected boolean producesCommonBatches(Channel channel, String nodeGroupId, List<TriggerRouter> triggerRouters) {
String channelId = channel.getChannelId();
Boolean producesCommonBatches = commonBatchesLastKnownState.get(channelId);
Expand Down Expand Up @@ -709,8 +710,9 @@ protected long routeDataForChannel(ProcessInfo processInfo, final NodeChannel no
long ts = System.currentTimeMillis();
long dataCount = -1;
try {
boolean useCommonGroups = parameterService.is(ParameterConstants.ROUTING_USE_COMMON_GROUPS);
List<TriggerRouter> triggerRouters = engine.getTriggerRouterService().getTriggerRouters(false);
boolean producesCommonBatches = producesCommonBatches(nodeChannel.getChannel(), parameterService.getNodeGroupId(),
boolean producesCommonBatches = !useCommonGroups && producesCommonBatches(nodeChannel.getChannel(), parameterService.getNodeGroupId(),
triggerRouters);
boolean onlyDefaultRoutersAssigned = onlyDefaultRoutersAssigned(nodeChannel.getChannel(),
parameterService.getNodeGroupId(), triggerRouters);
Expand All @@ -719,7 +721,7 @@ protected long routeDataForChannel(ProcessInfo processInfo, final NodeChannel no
symmetricDialect.getPlatform().getSqlTemplate().startSqlTransaction(),
extensionService.getExtensionPointMap(IBatchAlgorithm.class));
context.setProduceCommonBatches(producesCommonBatches);
context.setProduceGroupBatches(parameterService.is(ParameterConstants.ROUTING_USE_COMMON_GROUPS));
context.setProduceGroupBatches(useCommonGroups);
context.setOnlyDefaultRoutersAssigned(onlyDefaultRoutersAssigned);
context.setDataGaps(gapDetector.getDataGaps());
context.setOverrideContainsBigLob(isOverrideContainsBigLob);
Expand Down Expand Up @@ -847,13 +849,12 @@ protected void completeBatchesAndCommit(ChannelRouterContext context) {

Set<IDataRouter> usedRouters = new HashSet<IDataRouter>(context.getUsedDataRouters());
List<OutgoingBatch> batches = new ArrayList<OutgoingBatch>(context.getBatchesByNodes().values());
completeBatches(context, batches, usedRouters);

for (Map<String, OutgoingBatch> groupBatches : context.getBatchesByGroups().values()) {
batches = new ArrayList<OutgoingBatch>(groupBatches.values());
completeBatches(context, batches, usedRouters);
batches.addAll(groupBatches.values());
}


completeBatches(context, batches, usedRouters);
context.commit();

for (IDataRouter dataRouter : usedRouters) {
Expand All @@ -870,7 +871,6 @@ protected void completeBatches(ChannelRouterContext context, List<OutgoingBatch>
}

for (OutgoingBatch batch : batches) {
batch.setRouterMillis(System.currentTimeMillis() - batch.getCreateTime().getTime());
for (IDataRouter dataRouter : usedRouters) {
dataRouter.completeBatch(context, batch);
}
Expand All @@ -879,6 +879,7 @@ protected void completeBatches(ChannelRouterContext context, List<OutgoingBatch>
} else {
batch.setStatus(Status.NE);
}
batch.setRouterMillis((System.currentTimeMillis() - batch.getCreateTime().getTime()) / batches.size());
engine.getOutgoingBatchService().updateOutgoingBatch(context.getSqlTransaction(), batch);
}
}
Expand Down Expand Up @@ -959,7 +960,7 @@ protected long selectDataAndRoute(ProcessInfo processInfo, ChannelRouterContext
.getInt(ParameterConstants.ROUTING_FLUSH_JDBC_BATCH_SIZE);
try {
long ts = System.currentTimeMillis();
long startTime = System.currentTimeMillis();
long startTime = ts;
nextData = reader.take();
do {
if (nextData != null) {
Expand Down Expand Up @@ -1158,8 +1159,8 @@ protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext con
nodeIds = new HashSet<String>(1);
nodeIds.add(Constants.UNROUTED_NODE_ID);
}

if (context.isProduceGroupBatches() && !context.isProduceCommonBatches()) {
if (context.isProduceGroupBatches()) {
Map<Integer, Map<String, OutgoingBatch>> batchesByGroups = context.getBatchesByGroups();
int groupKey = nodeIds.hashCode();
batches = batchesByGroups.get(groupKey);
Expand Down Expand Up @@ -1192,7 +1193,7 @@ protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext con
} else {
context.setLastLoadId(-1);
}

for (String nodeId : nodeIds) {
if (nodeId != null) {
OutgoingBatch batch = batches.get(nodeId);
Expand Down Expand Up @@ -1326,15 +1327,4 @@ protected Table buildTableFromTriggerHistory(TriggerHistory triggerHistory) {
return table;
}

@Override
public Set<Channel> getCommomBatchChannels(List<Channel> channels, String nodeGroupId, List<TriggerRouter> triggerRouters) {
Set<Channel> commonBatchChannels = new HashSet<Channel>();

for (Channel channel : channels) {
if (producesCommonBatches(channel, nodeGroupId, triggerRouters)) {
commonBatchChannels.add(channel);
}
}
return commonBatchChannels;
}
}

0 comments on commit 8389629

Please sign in to comment.