Skip to content

Commit

Permalink
0005450: Routing default batch algorithm protection against large
Browse files Browse the repository at this point in the history
transactions
  • Loading branch information
erilong committed Sep 8, 2022
1 parent 2e0091d commit 4697850
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 3 deletions.
2 changes: 1 addition & 1 deletion symmetric-assemble/src/asciidoc/configuration/channels.ad
Expand Up @@ -26,7 +26,7 @@ Channel ID:: Identifier used through the system to identify a given channel.
.Channel Batching Algorithms
|===

|Default|All changes that happen in a transaction are guaranteed to be batched together. Multiple transactions will be batched and committed together until there is no more data to be sent or the max_batch_size is reached.
|Default|All changes that happen in a transaction are guaranteed to be batched together. Multiple transactions will be batched and committed together until there is no more data to be sent or the max_batch_size is reached. The routing.max.batch.size.exceed.percent parameter is used to keep batch sizes within a percentage over the max_batch_size, or it can be set to zero for no limit.

|Transactional|Batches will map directly to database transactions. If there are many small database transactions, then there will be many batches. The max_batch_size column has no effect.

Expand Down
Expand Up @@ -203,6 +203,7 @@ private ParameterConstants() {
public final static String ROUTING_USE_NON_COMMON_FOR_INCOMING = "routing.use.non.common.for.incoming";
public final static String ROUTING_GAPS_USE_TRANSACTION_VIEW = "routing.gaps.use.transaction.view";
public final static String ROUTING_GAPS_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS = "routing.gaps.transaction.view.clock.sync.threshold";
public final static String ROUTING_MAX_BATCH_SIZE_EXCEED_PERCENT = "routing.max.batch.size.exceed.percent";
public final static String INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED = "incoming.batches.skip.duplicates";
@Deprecated
public final static String INCOMING_BATCH_DELETE_ON_LOAD = "incoming.batch.delete.on.load";
Expand Down
Expand Up @@ -31,7 +31,7 @@ public class DefaultBatchAlgorithm implements IBatchAlgorithm, IBuiltInExtension
public static final String NAME = "default";

/**
 * Decides whether the outgoing batch currently being built should be closed.
 *
 * A batch is complete when either of the following holds:
 * <ul>
 * <li>its data row count has reached the channel's max batch size and the routing
 * context reports that a transaction boundary was encountered; or</li>
 * <li>the routing context carries a positive "batch size not to exceed" value and the
 * data row count has reached it, regardless of transaction boundaries.</li>
 * </ul>
 * A "batch size not to exceed" of zero or less disables the second rule.
 *
 * @param batch
 *            the batch whose data row count is inspected
 * @param dataMetaData
 *            supplies the node channel from which the max batch size is read
 * @param routingContext
 *            supplies the transaction-boundary flag and the batch size cap
 * @return true if the batch should be closed, false if more rows may be added
 */
public boolean isBatchComplete(OutgoingBatch batch, DataMetaData dataMetaData, SimpleRouterContext routingContext) {
    // Normal completion: the channel's max size is reached and the transaction has ended.
    return (batch.getDataRowCount() >= dataMetaData.getNodeChannel().getMaxBatchSize()
            && routingContext.isEncountedTransactionBoundary())
            // Forced completion: cap growth while still waiting for a transaction boundary.
            || (routingContext.getBatchSizeNotToExceed() > 0
                    && batch.getDataRowCount() >= routingContext.getBatchSizeNotToExceed());
}
}
Expand Up @@ -40,6 +40,7 @@ public class SimpleRouterContext extends Context {
protected Map<String, Long> stats = new HashMap<String, Long>();
protected String nodeId;
protected boolean requestGapDetection = false;
protected int batchSizeNotToExceed;

public SimpleRouterContext() {
}
Expand Down Expand Up @@ -77,6 +78,14 @@ public boolean isEncountedTransactionBoundary() {
return this.encountedTransactionBoundary;
}

/**
 * Returns the batch size cap stored on this routing context.
 *
 * @return the stored value; the int field is zero unless a value has been set
 */
public int getBatchSizeNotToExceed() {
    return this.batchSizeNotToExceed;
}

/**
 * Stores the batch size cap on this routing context.
 *
 * @param batchSizeNotToExceed
 *            the value to store; it is saved as given, with no validation
 */
public void setBatchSizeNotToExceed(int batchSizeNotToExceed) {
this.batchSizeNotToExceed = batchSizeNotToExceed;
}

synchronized public void incrementStat(long amount, String name) {
Long val = stats.get(name);
if (val == null) {
Expand Down
Expand Up @@ -468,6 +468,10 @@ protected long routeDataForChannel(ProcessInfo processInfo, final NodeChannel no
context.setDataGaps(gapDetector.getDataGaps());
context.setOverrideContainsBigLob(isOverrideContainsBigLob);
context.setMaxBatchesJdbcFlushSize(parameterService.getInt(ParameterConstants.ROUTING_FLUSH_BATCHES_JDBC_BATCH_SIZE, 5000));
int maxBatchSizeExceedPercent = parameterService.getInt(ParameterConstants.ROUTING_MAX_BATCH_SIZE_EXCEED_PERCENT);
if (maxBatchSizeExceedPercent > 0) {
context.setBatchSizeNotToExceed((int) (nodeChannel.getMaxBatchSize() * (1 + (maxBatchSizeExceedPercent / 100f))));
}
if (overrideBatchesByNodes != null) {
context.getBatchesByNodes().putAll(overrideBatchesByNodes);
}
Expand Down
11 changes: 11 additions & 0 deletions symmetric-core/src/main/resources/symmetric-default.properties
Expand Up @@ -1634,6 +1634,17 @@ routing.use.common.groups=true
# Tags: routing
routing.use.non.common.for.incoming=true

# The percentage by which a batch may exceed the channel's max batch size while seeking a transaction boundary
# using the default batch algorithm. Use zero to indicate that the batch size can grow as large as needed
# to include the complete transaction. For example, a setting of 100 percent with a channel's max batch size of 1000
# allows a batch to grow to 2000 rows before it is forced complete. This setting protects against a large
# transaction producing batches that far exceed the channel's max batch size and have trouble loading on the target database.
#
# DatabaseOverridable: true
# Type: integer
# Tags: routing
routing.max.batch.size.exceed.percent=100

# This is the number of data events that will be batched and committed together while building a batch.
# Note that this only kicks in if the prospective batch size is bigger than the configured max batch size.
#
Expand Down

0 comments on commit 4697850

Please sign in to comment.