From ebc2f7465c5357da5db43ec34065941c4a7b458b Mon Sep 17 00:00:00 2001 From: chenson42 Date: Sat, 18 Jul 2009 01:31:32 +0000 Subject: [PATCH] Added some notes for review --- .../route/DefaultBatchAlgorithm.java | 10 ++++----- .../symmetric/route/IBatchAlgorithm.java | 15 ++++++------- .../route/TransactionalBatchAlgorithm.java | 9 ++++---- .../service/impl/RoutingService.java | 21 ++++++++++++------- symmetric/src/main/resources/ddl-config.xml | 6 ++++-- 5 files changed, 35 insertions(+), 26 deletions(-) diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/DefaultBatchAlgorithm.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/DefaultBatchAlgorithm.java index 74ad7a3124..df9a6c2c7f 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/DefaultBatchAlgorithm.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/DefaultBatchAlgorithm.java @@ -19,16 +19,16 @@ */ package org.jumpmind.symmetric.route; -import org.jumpmind.symmetric.model.Data; -import org.jumpmind.symmetric.model.NodeChannel; +import org.jumpmind.symmetric.model.DataMetaData; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.OutgoingBatchHistory; public class DefaultBatchAlgorithm implements IBatchAlgorithm { - public boolean completeBatch(NodeChannel channel, OutgoingBatchHistory history, OutgoingBatch batch, Data data, - boolean databaseTransactionBoundary) { - return history.getDataEventCount() >= channel.getMaxBatchSize() && databaseTransactionBoundary; + public boolean completeBatch(OutgoingBatchHistory history, OutgoingBatch batch, DataMetaData dataMetaData, + IRoutingContext routingContext) { + return history.getDataEventCount() >= dataMetaData.getChannel().getMaxBatchSize() + && routingContext.isEncountedTransactionBoundary(); } public boolean isAutoRegister() { diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/IBatchAlgorithm.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/IBatchAlgorithm.java index 6dd60820ea..ea580554f5 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/IBatchAlgorithm.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/IBatchAlgorithm.java @@ -20,17 +20,18 @@ package org.jumpmind.symmetric.route; import org.jumpmind.symmetric.ext.IExtensionPoint; -import org.jumpmind.symmetric.model.Data; -import org.jumpmind.symmetric.model.NodeChannel; +import org.jumpmind.symmetric.model.DataMetaData; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.OutgoingBatchHistory; /** - * A possible extension point that could be configured by channels to allow further control over batching algorithms. I am thinking that - * we provide two implementations that can be configured at the channel level: our default batching based on the number of events and another - * implementation that would batch only on transaction boundaries. + * An extension point that can be configured for a channel to allow further control over batching algorithms. + *

+ * This is the point where the decision is made whether to end a batch or not. + * * @since 2.0 */ -public interface IBatchAlgorithm extends IExtensionPoint { - public boolean completeBatch(NodeChannel channel, OutgoingBatchHistory history, OutgoingBatch batch, Data data, boolean databaseTransactionBoundary); +public interface IBatchAlgorithm extends IExtensionPoint { + public boolean completeBatch(OutgoingBatchHistory history, OutgoingBatch batch, DataMetaData dataMetaData, + IRoutingContext routingContext); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/route/TransactionalBatchAlgorithm.java b/symmetric/src/main/java/org/jumpmind/symmetric/route/TransactionalBatchAlgorithm.java index a31bbd96a3..7921bced06 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/route/TransactionalBatchAlgorithm.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/route/TransactionalBatchAlgorithm.java @@ -19,16 +19,15 @@ */ package org.jumpmind.symmetric.route; -import org.jumpmind.symmetric.model.Data; -import org.jumpmind.symmetric.model.NodeChannel; +import org.jumpmind.symmetric.model.DataMetaData; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.OutgoingBatchHistory; public class TransactionalBatchAlgorithm implements IBatchAlgorithm { - public boolean completeBatch(NodeChannel channel, OutgoingBatchHistory history, OutgoingBatch batch, Data data, - boolean databaseTransactionBoundary) { - return databaseTransactionBoundary; + public boolean completeBatch(OutgoingBatchHistory history, OutgoingBatch batch, DataMetaData dataMetaData, + IRoutingContext routingContext) { + return routingContext.isEncountedTransactionBoundary(); } public boolean isAutoRegister() { diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RoutingService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RoutingService.java index 3e1d38f18a..38f0d64ed4 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RoutingService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RoutingService.java @@ -170,10 +170,21 @@ protected void filterDisabledNodes(Set nodes) { } } + /** + * Pre-read data and fill up a queue so we can peek ahead to see if we have crossed a database transaction boundary. + * Then route each {@link Data} while continuing to keep the queue filled until the result set is entirely read. + * + * @param conn + * The connection to use for selecting the data. + * @param context + * The current context of the routing process + */ protected void selectDataAndRoute(Connection conn, IRoutingContext context) throws SQLException { PreparedStatement ps = null; ResultSet rs = null; try { + // TODO add a flag to sym_trigger to indicate whether we need to read the row_data and or old_data for + // routing. We will get better performance if we don't read the data. ps = conn.prepareStatement(getSql("selectDataToBatchSql"), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); ps.setFetchSize(dbDialect.getStreamingResultsFetchSize()); @@ -181,8 +192,6 @@ protected void selectDataAndRoute(Connection conn, IRoutingContext context) thro int peekAheadLength = parameterService.getInt(ParameterConstants.ROUTING_PEEK_AHEAD_WINDOW); Map transactionIdDataId = new HashMap(); LinkedList dataQueue = new LinkedList(); - // pre-populate data queue so we can look ahead to see if a - // transaction has finished. for (int i = 0; i < peekAheadLength && rs.next(); i++) { readData(rs, dataQueue, transactionIdDataId); } @@ -255,11 +264,9 @@ protected void insertDataEvents(IRoutingContext routingContext, DataMetaData dat routingContext.setRouted(true); dataService.insertDataEvent(routingContext.getJdbcTemplate(), dataMetaData.getData().getDataId(), nodeId, batch.getBatchId()); - if (batchAlgorithms.get(routingContext.getChannel().getBatchAlgorithm()).completeBatch( - routingContext.getChannel(), history, batch, dataMetaData.getData(), - routingContext.isEncountedTransactionBoundary())) { - // TODO Add route_time_ms to history. Also - // fix outgoing batch so we don't end up + if (batchAlgorithms.get(routingContext.getChannel().getBatchAlgorithm()).completeBatch(history, + batch, dataMetaData, routingContext)) { + // TODO Add route_time_ms to history. Also fix outgoing batch so we don't end up // with so many history records outgoingBatchService.insertOutgoingBatchHistory(routingContext.getJdbcTemplate(), history); routingContext.getBatchesByNodes().remove(nodeId); diff --git a/symmetric/src/main/resources/ddl-config.xml b/symmetric/src/main/resources/ddl-config.xml index c5e5b11e20..c1a21f9773 100644 --- a/symmetric/src/main/resources/ddl-config.xml +++ b/symmetric/src/main/resources/ddl-config.xml @@ -21,9 +21,11 @@ + - + @@ -241,7 +243,7 @@ - +