Skip to content

Commit

Permalink
Added some notes for review
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Jul 18, 2009
1 parent 2e7c336 commit ebc2f74
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 26 deletions.
Expand Up @@ -19,16 +19,16 @@
*/
package org.jumpmind.symmetric.route;

import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatchHistory;

public class DefaultBatchAlgorithm implements IBatchAlgorithm {

public boolean completeBatch(NodeChannel channel, OutgoingBatchHistory history, OutgoingBatch batch, Data data,
boolean databaseTransactionBoundary) {
return history.getDataEventCount() >= channel.getMaxBatchSize() && databaseTransactionBoundary;
public boolean completeBatch(OutgoingBatchHistory history, OutgoingBatch batch, DataMetaData dataMetaData,
IRoutingContext routingContext) {
return history.getDataEventCount() >= dataMetaData.getChannel().getMaxBatchSize()
&& routingContext.isEncountedTransactionBoundary();
}

public boolean isAutoRegister() {
Expand Down
Expand Up @@ -20,17 +20,18 @@
package org.jumpmind.symmetric.route;

import org.jumpmind.symmetric.ext.IExtensionPoint;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatchHistory;

/**
* A possible extension point that could be configured by channels to allow further control over batching algorithms. I am thinking that
* we provide two implementations that can be configured at the channel level: our default batching based on the number of events and another
* implementation that would batch only on transaction boundaries.
* An extension point that can be configured for a channel to allow further control over batching algorithms.
* <P>
* This is the point where the decision is made whether to end a batch or not.
*
* @since 2.0
*/
public interface IBatchAlgorithm extends IExtensionPoint {
public boolean completeBatch(NodeChannel channel, OutgoingBatchHistory history, OutgoingBatch batch, Data data, boolean databaseTransactionBoundary);
public interface IBatchAlgorithm extends IExtensionPoint {
public boolean completeBatch(OutgoingBatchHistory history, OutgoingBatch batch, DataMetaData dataMetaData,
IRoutingContext routingContext);
}
Expand Up @@ -19,16 +19,15 @@
*/
package org.jumpmind.symmetric.route;

import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.NodeChannel;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatchHistory;

public class TransactionalBatchAlgorithm implements IBatchAlgorithm {

public boolean completeBatch(NodeChannel channel, OutgoingBatchHistory history, OutgoingBatch batch, Data data,
boolean databaseTransactionBoundary) {
return databaseTransactionBoundary;
public boolean completeBatch(OutgoingBatchHistory history, OutgoingBatch batch, DataMetaData dataMetaData,
IRoutingContext routingContext) {
return routingContext.isEncountedTransactionBoundary();
}

public boolean isAutoRegister() {
Expand Down
Expand Up @@ -170,19 +170,28 @@ protected void filterDisabledNodes(Set<Node> nodes) {
}
}

/**
* Pre-read data and fill up a queue so we can peek ahead to see if we have crossed a database transaction boundary.
* Then route each {@link Data} while continuing to keep the queue filled until the result set is entirely read.
*
* @param conn
* The connection to use for selecting the data.
* @param context
* The current context of the routing process
*/
protected void selectDataAndRoute(Connection conn, IRoutingContext context) throws SQLException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
// TODO add a flag to sym_trigger to indicate whether we need to read the row_data and or old_data for
// routing. We will get better performance if we don't read the data.
ps = conn.prepareStatement(getSql("selectDataToBatchSql"), ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(dbDialect.getStreamingResultsFetchSize());
rs = ps.executeQuery();
int peekAheadLength = parameterService.getInt(ParameterConstants.ROUTING_PEEK_AHEAD_WINDOW);
Map<String, Long> transactionIdDataId = new HashMap<String, Long>();
LinkedList<Data> dataQueue = new LinkedList<Data>();
// pre-populate data queue so we can look ahead to see if a
// transaction has finished.
for (int i = 0; i < peekAheadLength && rs.next(); i++) {
readData(rs, dataQueue, transactionIdDataId);
}
Expand Down Expand Up @@ -255,11 +264,9 @@ protected void insertDataEvents(IRoutingContext routingContext, DataMetaData dat
routingContext.setRouted(true);
dataService.insertDataEvent(routingContext.getJdbcTemplate(), dataMetaData.getData().getDataId(),
nodeId, batch.getBatchId());
if (batchAlgorithms.get(routingContext.getChannel().getBatchAlgorithm()).completeBatch(
routingContext.getChannel(), history, batch, dataMetaData.getData(),
routingContext.isEncountedTransactionBoundary())) {
// TODO Add route_time_ms to history. Also
// fix outgoing batch so we don't end up
if (batchAlgorithms.get(routingContext.getChannel().getBatchAlgorithm()).completeBatch(history,
batch, dataMetaData, routingContext)) {
// TODO Add route_time_ms to history. Also fix outgoing batch so we don't end up
// with so many history records
outgoingBatchService.insertOutgoingBatchHistory(routingContext.getJdbcTemplate(), history);
routingContext.getBatchesByNodes().remove(nodeId);
Expand Down
6 changes: 4 additions & 2 deletions symmetric/src/main/resources/ddl-config.xml
Expand Up @@ -21,9 +21,11 @@
<column name="row_data" type="LONGVARCHAR" />
<column name="pk_data" type="LONGVARCHAR" />
<column name="old_data" type="LONGVARCHAR" />
<!-- It might be nice to add a routing_data element that can be select similar
to the old node select which would be available to the routers -->
<column name="trigger_hist_id" type="INTEGER" required="true" />
<column name="transaction_id" type="VARCHAR" size="1000" />
<column name="source_node_id" type="VARCHAR" size="50" />
<column name="source_node_id" type="VARCHAR" size="50" />
<column name="create_time" type="TIMESTAMP" />
<index name="idx_data_create_time">
<index-column name="create_time" />
Expand Down Expand Up @@ -241,7 +243,7 @@
</index>
</table>

<!-- This table configures thresholds at which JMX notifications will be fired -->
<!-- This table configures thresholds at which JMX notifications will be fired-->
<table name="statistic_alert">
<column name="statistic_name" type="VARCHAR" size="50" required="true" primaryKey="true"/>
<column name="threshold_total_max" type="DECIMAL" size="20" />
Expand Down

0 comments on commit ebc2f74

Please sign in to comment.