From 830f7a86d175ccafa2d4c72f15fa6d134b9a46a2 Mon Sep 17 00:00:00 2001 From: chenson42 Date: Mon, 31 Mar 2008 21:22:01 +0000 Subject: [PATCH] 1930496 - Make building of batches transactional so only one connection is reserved from the database. --- .../service/IOutgoingBatchService.java | 2 + .../service/impl/DataExtractorService.java | 4 +- .../symmetric/service/impl/DataService.java | 4 +- .../service/impl/OutgoingBatchService.java | 140 +++++++++--------- .../mock/MockOutgoingBatchService.java | 7 +- 5 files changed, 84 insertions(+), 73 deletions(-) diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java index e683a2d017..216a1c2084 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java @@ -31,6 +31,8 @@ public interface IOutgoingBatchService { public void insertOutgoingBatch(final OutgoingBatch outgoingBatch); public void buildOutgoingBatches(String nodeId, final List channels); + + public void buildOutgoingBatches(final String nodeId, final NodeChannel channel); public List getOutgoingBatches(String nodeId); diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 24dfca126b..8ec8cd2166 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -179,7 +179,9 @@ public boolean extract(Node node, final IExtractListener handler) throws Excepti List channels = configurationService.getChannelsFor(true); - outgoingBatchService.buildOutgoingBatches(node.getNodeId(), channels); + for (NodeChannel nodeChannel : channels) { + outgoingBatchService.buildOutgoingBatches(node.getNodeId(), nodeChannel); + } List batches = filterMaxNumberOfOutgoingBatches(outgoingBatchService.getOutgoingBatches(node .getNodeId()), channels); diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index e601ca9aa2..ab18125ba3 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -209,12 +209,10 @@ public void insertReloadEvent(Node targetNode) { * data events and reload batches. */ private void buildReloadBatches(String nodeId) { - List channels = new ArrayList(1); NodeChannel channel = new NodeChannel(); channel.setId(Constants.CHANNEL_RELOAD); channel.setEnabled(true); - channels.add(channel); - outgoingBatchService.buildOutgoingBatches(nodeId, channels); + outgoingBatchService.buildOutgoingBatches(nodeId, channel); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index add6154edd..ae436e5f47 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -46,6 +46,7 @@ import org.springframework.jdbc.core.PreparedStatementCallback; import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.support.JdbcUtils; +import org.springframework.transaction.annotation.Transactional; public class OutgoingBatchService extends AbstractService implements IOutgoingBatchService { @@ -73,7 +74,7 @@ public class OutgoingBatchService extends AbstractService implements IOutgoingBa private JdbcTemplate outgoingBatchQueryTemplate; - private IOutgoingBatchHistoryService historyService; + private IOutgoingBatchHistoryService historyService; private IDbDialect dbDialect; @@ -86,7 +87,15 @@ public class OutgoingBatchService extends AbstractService implements IOutgoingBa * other than possibly leaving a batch row w/out data every now and then or leaving a batch w/out the * associated history row. */ + @Transactional public void buildOutgoingBatches(final String nodeId, final List channels) { + for (NodeChannel nodeChannel : channels) { + buildOutgoingBatches(nodeId, nodeChannel); + } + } + + @Transactional + public void buildOutgoingBatches(final String nodeId, final NodeChannel channel) { jdbcTemplate.execute(new ConnectionCallback() { public Object doInConnection(Connection conn) throws SQLException, DataAccessException { @@ -96,89 +105,86 @@ public Object doInConnection(Connection conn) throws SQLException, DataAccessExc update.setQueryTimeout(jdbcTemplate.getQueryTimeout()); - for (NodeChannel channel : channels) { + if (channel.isSuspended()) { + logger.warn(channel.getId() + " channel for " + nodeId + " is currently suspended."); + } else if (channel.isEnabled()) { + // determine which transactions will be part of this batch on this channel + PreparedStatement select = null; + ResultSet results = null; - if (channel.isSuspended()) { - logger.warn(channel.getId() + " channel for " + nodeId + " is currently suspended."); - } else if (channel.isEnabled()) { - // determine which transactions will be part of this batch on this channel - PreparedStatement select = null; - ResultSet results = null; + try { - try { + select = conn.prepareStatement(selectEventsToBatchSql); - select = conn.prepareStatement(selectEventsToBatchSql); + select.setQueryTimeout(jdbcTemplate.getQueryTimeout()); - select.setQueryTimeout(jdbcTemplate.getQueryTimeout()); + select.setString(1, nodeId); + select.setString(2, channel.getId()); + results = select.executeQuery(); - select.setString(1, nodeId); - select.setString(2, channel.getId()); - results = select.executeQuery(); + int count = 0; + boolean peekAheadMode = false; + int peekAheadCountDown = batchSizePeekAhead; + Set transactionIds = new HashSet(); - int count = 0; - boolean peekAheadMode = false; - int peekAheadCountDown = batchSizePeekAhead; - Set transactionIds = new HashSet(); + OutgoingBatch newBatch = new OutgoingBatch(); + newBatch.setBatchType(BatchType.EVENTS); + newBatch.setChannelId(channel.getId()); + newBatch.setNodeId(nodeId); - OutgoingBatch newBatch = new OutgoingBatch(); - newBatch.setBatchType(BatchType.EVENTS); - newBatch.setChannelId(channel.getId()); - newBatch.setNodeId(nodeId); - - // node channel is setup to ignore, just mark the batch as already processed. - if (channel.isIgnored()) { - newBatch.setStatus(Status.OK); - } + // node channel is setup to ignore, just mark the batch as already processed. + if (channel.isIgnored()) { + newBatch.setStatus(Status.OK); + } - if (results.next()) { + if (results.next()) { - insertOutgoingBatch(newBatch); + insertOutgoingBatch(newBatch); - do { - String trxId = results.getString(1); - if (trxId != null) { - transactionIds.add(trxId); - } + do { + String trxId = results.getString(1); + if (trxId != null) { + transactionIds.add(trxId); + } - if (!peekAheadMode - || (peekAheadMode && (trxId != null && transactionIds.contains(trxId)))) { - peekAheadCountDown = batchSizePeekAhead; + if (!peekAheadMode + || (peekAheadMode && (trxId != null && transactionIds.contains(trxId)))) { + peekAheadCountDown = batchSizePeekAhead; - int dataId = results.getInt(2); + int dataId = results.getInt(2); - update.clearParameters(); - update.setLong(1, Long.valueOf(newBatch.getBatchId())); - update.setString(2, nodeId); - update.setLong(3, dataId); - update.addBatch(); + update.clearParameters(); + update.setLong(1, Long.valueOf(newBatch.getBatchId())); + update.setString(2, nodeId); + update.setLong(3, dataId); + update.addBatch(); - count++; + count++; - } else { - peekAheadCountDown--; - } + } else { + peekAheadCountDown--; + } - if (count > channel.getMaxBatchSize()) { - peekAheadMode = true; - } + if (count > channel.getMaxBatchSize()) { + peekAheadMode = true; + } - // put this in so we don't build up too many - // statements to send to the server. - if (count % 10000 == 0) { - update.executeBatch(); - } + // put this in so we don't build up too many + // statements to send to the server. + if (count % 10000 == 0) { + update.executeBatch(); + } - } while (results.next() && peekAheadCountDown != 0); + } while (results.next() && peekAheadCountDown != 0); - historyService.created(new Integer(newBatch.getBatchId()), count); - } + historyService.created(new Integer(newBatch.getBatchId()), count); + } - } finally { + } finally { - JdbcUtils.closeResultSet(results); - JdbcUtils.closeStatement(select); + JdbcUtils.closeResultSet(results); + JdbcUtils.closeStatement(select); - } } update.executeBatch(); @@ -191,7 +197,7 @@ public Object doInConnection(Connection conn) throws SQLException, DataAccessExc }); } - public void insertOutgoingBatch(final OutgoingBatch outgoingBatch) { + public void insertOutgoingBatch(final OutgoingBatch outgoingBatch) { long batchId = dbDialect.insertWithGeneratedKey(createBatchSql, "sym_outgoing_batch_batch_id", new PreparedStatementCallback() { public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException { @@ -212,8 +218,8 @@ public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, D */ @SuppressWarnings("unchecked") public List getOutgoingBatches(String nodeId) { - return (List) outgoingBatchQueryTemplate.query(selectOutgoingBatchSql, - new Object[] { nodeId }, new OutgoingBatchMapper()); + return (List) outgoingBatchQueryTemplate.query(selectOutgoingBatchSql, new Object[] { nodeId }, + new OutgoingBatchMapper()); } @SuppressWarnings("unchecked") @@ -252,13 +258,13 @@ public boolean isInitialLoadComplete(String nodeId) { if (security == null || security.isInitialLoadEnabled()) { return false; } - + List statuses = (List) jdbcTemplate.queryForList(initialLoadStatusSql, new Object[] { nodeId }, String.class); if (statuses == null || statuses.size() == 0) { throw new RuntimeException("The initial load has not been started for " + nodeId); } - + for (String status : statuses) { if (!Status.OK.name().equals(status)) { return false; diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockOutgoingBatchService.java b/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockOutgoingBatchService.java index 94b4f91b1f..a9a6da176f 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockOutgoingBatchService.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockOutgoingBatchService.java @@ -32,8 +32,11 @@ public void buildOutgoingBatches(String nodeId, List channels) { } - public List getOutgoingBatchRange(String startBatchId, - String endBatchId) { + public void buildOutgoingBatches(String nodeId, NodeChannel channel) { + + } + + public List getOutgoingBatchRange(String startBatchId, String endBatchId) { return null; }