Skip to content

Commit

Permalink
0004227: cleanup, get batch algo once for channel
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Jan 4, 2020
1 parent 3fcb138 commit 2a7af10
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 9 deletions.
Expand Up @@ -85,14 +85,14 @@ public class ChannelRouterContext extends SimpleRouterContext {
private List<Long> uncommittedDataIds = new ArrayList<Long>();
private long uncommittedDataEventCount = 0;
private long committedDataEventCount = 0;
private Map<String, IBatchAlgorithm> batchAlgorithms;
private IBatchAlgorithm batchAlgorithm;

public ChannelRouterContext(String nodeId, NodeChannel channel, ISqlTransaction transaction, Map<String, IBatchAlgorithm> batchAlgorithms)
public ChannelRouterContext(String nodeId, NodeChannel channel, ISqlTransaction transaction, IBatchAlgorithm batchAlgorithm)
throws SQLException {
super(nodeId, channel);
this.sqlTransaction = transaction;
this.sqlTransaction.setInBatchMode(true);
this.batchAlgorithms = batchAlgorithms;
this.batchAlgorithm = batchAlgorithm;
}

public List<DataEvent> getDataEventList() {
Expand Down Expand Up @@ -336,12 +336,8 @@ public void setOverrideContainsBigLob(boolean overrideContainsBigLob) {
this.overrideContainsBigLob = overrideContainsBigLob;
}

public Map<String, IBatchAlgorithm> getBatchAlgorithms() {
return batchAlgorithms;
}

public boolean isBatchComplete(OutgoingBatch batch, DataMetaData dataMetaData) {
return batchAlgorithms.get(channel.getBatchAlgorithm()).isBatchComplete(batch, dataMetaData, this);
return batchAlgorithm.isBatchComplete(batch, dataMetaData, this);
}

public int getMaxBatchesJdbcFlushSize() {
Expand Down
Expand Up @@ -484,10 +484,11 @@ protected long routeDataForChannel(ProcessInfo processInfo, final NodeChannel no
triggerRouters);
boolean onlyDefaultRoutersAssigned = onlyDefaultRoutersAssigned(nodeChannel.getChannel(),
parameterService.getNodeGroupId(), triggerRouters);
IBatchAlgorithm batchAlgorithm = extensionService.getExtensionPointMap(IBatchAlgorithm.class).get(nodeChannel.getBatchAlgorithm());

context = new ChannelRouterContext(sourceNode.getNodeId(), nodeChannel,
symmetricDialect.getPlatform().getSqlTemplate().startSqlTransaction(),
extensionService.getExtensionPointMap(IBatchAlgorithm.class));
batchAlgorithm);
context.setProduceCommonBatches(producesCommonBatches);
context.setProduceGroupBatches(useCommonGroups);
context.setOnlyDefaultRoutersAssigned(onlyDefaultRoutersAssigned);
Expand Down

0 comments on commit 2a7af10

Please sign in to comment.