diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/DefaultBatchAlgorithm.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/DefaultBatchAlgorithm.java index 74ad7a3124..df9a6c2c7f 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/DefaultBatchAlgorithm.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/DefaultBatchAlgorithm.java @@ -19,16 +19,16 @@ */ package org.jumpmind.symmetric.route; -import org.jumpmind.symmetric.model.Data; -import org.jumpmind.symmetric.model.NodeChannel; +import org.jumpmind.symmetric.model.DataMetaData; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.OutgoingBatchHistory; public class DefaultBatchAlgorithm implements IBatchAlgorithm { - public boolean completeBatch(NodeChannel channel, OutgoingBatchHistory history, OutgoingBatch batch, Data data, - boolean databaseTransactionBoundary) { - return history.getDataEventCount() >= channel.getMaxBatchSize() && databaseTransactionBoundary; + public boolean completeBatch(OutgoingBatchHistory history, OutgoingBatch batch, DataMetaData dataMetaData, + IRoutingContext routingContext) { + return history.getDataEventCount() >= dataMetaData.getChannel().getMaxBatchSize() + && routingContext.isEncountedTransactionBoundary(); } public boolean isAutoRegister() { diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/IBatchAlgorithm.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/IBatchAlgorithm.java index 6dd60820ea..ea580554f5 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/IBatchAlgorithm.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/IBatchAlgorithm.java @@ -20,17 +20,18 @@ package org.jumpmind.symmetric.route; import org.jumpmind.symmetric.ext.IExtensionPoint; -import org.jumpmind.symmetric.model.Data; -import org.jumpmind.symmetric.model.NodeChannel; +import org.jumpmind.symmetric.model.DataMetaData; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.OutgoingBatchHistory; /** - * A possible extension point that could be configured by channels to allow further control over batching algorithms. I am thinking that - * we provide two implementations that can be configured at the channel level: our default batching based on the number of events and another - * implementation that would batch only on transaction boundaries. + * An extension point that can be configured for a channel to allow further control over batching algorithms. + *
+ * This is the point where the decision is made whether to end a batch or not.
+ *
* @since 2.0
*/
-public interface IBatchAlgorithm extends IExtensionPoint {
- public boolean completeBatch(NodeChannel channel, OutgoingBatchHistory history, OutgoingBatch batch, Data data, boolean databaseTransactionBoundary);
+public interface IBatchAlgorithm extends IExtensionPoint {
+ public boolean completeBatch(OutgoingBatchHistory history, OutgoingBatch batch, DataMetaData dataMetaData,
+ IRoutingContext routingContext);
}
diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/TransactionalBatchAlgorithm.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/TransactionalBatchAlgorithm.java
index a31bbd96a3..7921bced06 100644
--- a/symmetric/src/main/java/org/jumpmind/symmetric/route/TransactionalBatchAlgorithm.java
+++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/TransactionalBatchAlgorithm.java
@@ -19,16 +19,15 @@
*/
package org.jumpmind.symmetric.route;
-import org.jumpmind.symmetric.model.Data;
-import org.jumpmind.symmetric.model.NodeChannel;
+import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatchHistory;
public class TransactionalBatchAlgorithm implements IBatchAlgorithm {
- public boolean completeBatch(NodeChannel channel, OutgoingBatchHistory history, OutgoingBatch batch, Data data,
- boolean databaseTransactionBoundary) {
- return databaseTransactionBoundary;
+ public boolean completeBatch(OutgoingBatchHistory history, OutgoingBatch batch, DataMetaData dataMetaData,
+ IRoutingContext routingContext) {
+ return routingContext.isEncountedTransactionBoundary();
}
public boolean isAutoRegister() {
diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RoutingService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RoutingService.java
index 3e1d38f18a..38f0d64ed4 100644
--- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RoutingService.java
+++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RoutingService.java
@@ -170,10 +170,21 @@ protected void filterDisabledNodes(Set