From 2082477a6862067d26b745b9993439769eaffef5 Mon Sep 17 00:00:00 2001 From: chenson42 Date: Sun, 30 Mar 2008 14:45:15 +0000 Subject: [PATCH] 1929400 - Don't ACK multiple batches in the same transaction for fear of database deadlocks under stress conditions. --- .../symmetric/model/OutgoingBatch.java | 8 +- .../service/IAcknowledgeService.java | 4 +- .../service/impl/AcknowledgeService.java | 61 +++++++------- .../symmetric/service/impl/PushService.java | 5 +- .../service/impl/RegistrationService.java | 2 +- .../transport/AckResourceHandler.java | 4 +- .../internal/InternalTransportManager.java | 80 +++++++------------ .../service/impl/AcknowledgeServiceTest.java | 9 +-- .../impl/OutgoingBatchServiceTest.java | 5 +- .../service/mock/MockAcknowledgeService.java | 4 +- 10 files changed, 75 insertions(+), 107 deletions(-) diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java b/symmetric/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java index e60b9e619a..13ef230b6f 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java @@ -21,9 +21,7 @@ package org.jumpmind.symmetric.model; import java.io.Serializable; -import java.util.ArrayList; import java.util.Date; -import java.util.List; public class OutgoingBatch implements Serializable { @@ -99,10 +97,8 @@ public BatchType getBatchType() { return batchType; } - public List getBatchInfoList() { - List list = new ArrayList(); - list.add(new BatchInfo(this.batchId)); - return list; + public BatchInfo getBatchInfo() { + return new BatchInfo(this.batchId); } public void setBatchType(BatchType batchType) { diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/IAcknowledgeService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/IAcknowledgeService.java index 1e5d1410ce..6683b425e8 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/IAcknowledgeService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/IAcknowledgeService.java @@ -20,11 +20,9 @@ package org.jumpmind.symmetric.service; -import java.util.List; - import org.jumpmind.symmetric.model.BatchInfo; public interface IAcknowledgeService { - public void ack(List batches); + public void ack(BatchInfo batch); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java index c8a885968f..8329f203b1 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java @@ -24,7 +24,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.List; import org.jumpmind.symmetric.model.BatchInfo; import org.jumpmind.symmetric.model.OutgoingBatch.Status; @@ -44,41 +43,39 @@ public class AcknowledgeService extends AbstractService implements IAcknowledgeS private IOutgoingBatchHistoryService outgoingBatchHistoryService; @Transactional - public void ack(List batches) { - for (final BatchInfo batch : batches) { - final Integer id = new Integer(batch.getBatchId()); - // update the outgoing_batch record - jdbcTemplate.execute(new ConnectionCallback() { - public Object doInConnection(Connection conn) throws SQLException, DataAccessException { - PreparedStatement batchUpdate = null; - try { - batchUpdate = conn.prepareStatement(updateOutgoingBatchSql); - batchUpdate.setString(1, batch.isOk() ? Status.OK.name() : Status.ER.name()); - batchUpdate.setInt(2, id); - batchUpdate.executeUpdate(); - return null; - } finally { - JdbcUtils.closeStatement(batchUpdate); - } + public void ack(final BatchInfo batch) { + final Integer id = new Integer(batch.getBatchId()); + // update the outgoing_batch record + jdbcTemplate.execute(new ConnectionCallback() { + public Object doInConnection(Connection conn) throws SQLException, DataAccessException { + PreparedStatement batchUpdate = null; + try { + batchUpdate = conn.prepareStatement(updateOutgoingBatchSql); + batchUpdate.setString(1, batch.isOk() ? Status.OK.name() : Status.ER.name()); + batchUpdate.setInt(2, id); + batchUpdate.executeUpdate(); + return null; + } finally { + JdbcUtils.closeStatement(batchUpdate); } - }); - - // add a record to outgoing_batch_hist indicating success - if (batch.isOk()) { - outgoingBatchHistoryService.ok(id); } - // add a record to outgoing_batch_hist indicating an error - else { - if (batch.getErrorLine() != BatchInfo.UNDEFINED_ERROR_LINE_NUMBER) { - CallBackHandler handler = new CallBackHandler(batch.getErrorLine()); + }); + + // add a record to outgoing_batch_hist indicating success + if (batch.isOk()) { + outgoingBatchHistoryService.ok(id); + } + // add a record to outgoing_batch_hist indicating an error + else { + if (batch.getErrorLine() != BatchInfo.UNDEFINED_ERROR_LINE_NUMBER) { + CallBackHandler handler = new CallBackHandler(batch.getErrorLine()); - jdbcTemplate.query(selectDataIdSql, new Object[] { id }, handler); - final long dataId = handler.getDataId(); + jdbcTemplate.query(selectDataIdSql, new Object[] { id }, handler); + final long dataId = handler.getDataId(); - outgoingBatchHistoryService.error(id, dataId); - } else { - outgoingBatchHistoryService.error(id, 0l); - } + outgoingBatchHistoryService.error(id, dataId); + } else { + outgoingBatchHistoryService.error(id, 0l); } } } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java index b771256d0d..3131d3e8f1 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java @@ -127,7 +127,10 @@ private void pushToNode(Node remote) { batchInfo = parser.nextBatch(); } - ackService.ack(batches); + for (BatchInfo batch : batches) { + ackService.ack(batch); + } + } finally { transport.close(); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java index 75ded850c0..f61c9225eb 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java @@ -142,7 +142,7 @@ protected boolean writeConfiguration(Node node, OutputStream out) trigger, transport); // acknowledge right away, because the acknowledgment is not build into the registration // protocol. - acknowledgeService.ack(batch.getBatchInfoList()); + acknowledgeService.ack(batch.getBatchInfo()); } } dataExtractorService.extractNodeIdentityFor(node, transport); diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/transport/AckResourceHandler.java b/symmetric/src/main/java/org/jumpmind/symmetric/transport/AckResourceHandler.java index ff5a34e8fa..3021f1d1dc 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/transport/AckResourceHandler.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/transport/AckResourceHandler.java @@ -33,7 +33,9 @@ public class AckResourceHandler extends AbstractTransportResourceHandler { private IAcknowledgeService acknowledgeService; public void ack(List batches) throws IOException { - acknowledgeService.ack(batches); + for (BatchInfo batchInfo : batches) { + acknowledgeService.ack(batchInfo); + } } public void setAcknowledgeService(IAcknowledgeService acknowledgeService) { diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java b/symmetric/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java index 3793ccd6eb..9bc50f7b6b 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/transport/internal/InternalTransportManager.java @@ -57,8 +57,7 @@ /** * Coordinates interaction between two symmetric engines in the same JVM. */ -public class InternalTransportManager extends AbstractTransportManager - implements ITransportManager { +public class InternalTransportManager extends AbstractTransportManager implements ITransportManager { static final Log logger = LogFactory.getLog(InternalTransportManager.class); @@ -69,24 +68,21 @@ public InternalTransportManager(IRuntimeConfig config) { this.runtimeConfiguration = config; } - public IIncomingTransport getPullTransport(final Node remote, final Node local) - throws IOException { + public IIncomingTransport getPullTransport(final Node remote, final Node local) throws IOException { final PipedOutputStream respOs = new PipedOutputStream(); final PipedInputStream respIs = new PipedInputStream(respOs); runAtClient(remote.getSyncURL(), null, respOs, new IClientRunnable() { - public void run(BeanFactory factory, InputStream is, OutputStream os) - throws Exception { + public void run(BeanFactory factory, InputStream is, OutputStream os) throws Exception { // TODO this is duplicated from the Pull Servlet. It should be consolidated somehow! - INodeService nodeService = (INodeService)factory.getBean(Constants.NODE_SERVICE); + INodeService nodeService = (INodeService) factory.getBean(Constants.NODE_SERVICE); NodeSecurity security = nodeService.findNodeSecurity(local.getNodeId()); if (security.isInitialLoadEnabled()) { - ((IDataService)factory.getBean(Constants.DATA_SERVICE)).insertReloadEvent(local); + ((IDataService) factory.getBean(Constants.DATA_SERVICE)).insertReloadEvent(local); } IDataExtractorService extractor = (IDataExtractorService) factory .getBean(Constants.DATAEXTRACTOR_SERVICE); - IOutgoingTransport transport = new InternalOutgoingTransport( - respOs); + IOutgoingTransport transport = new InternalOutgoingTransport(respOs); extractor.extract(local, transport); transport.close(); } @@ -94,8 +90,7 @@ public void run(BeanFactory factory, InputStream is, OutputStream os) return new InternalIncomingTransport(respIs); } - public IOutgoingWithResponseTransport getPushTransport(final Node remote, final Node local) - throws IOException { + public IOutgoingWithResponseTransport getPushTransport(final Node remote, final Node local) throws IOException { final PipedOutputStream pushOs = new PipedOutputStream(); final PipedInputStream pushIs = new PipedInputStream(pushOs); @@ -104,56 +99,47 @@ public IOutgoingWithResponseTransport getPushTransport(final Node remote, final final PipedInputStream respIs = new PipedInputStream(respOs); runAtClient(remote.getSyncURL(), pushIs, respOs, new IClientRunnable() { - public void run(BeanFactory factory, InputStream is, OutputStream os) - throws Exception { + public void run(BeanFactory factory, InputStream is, OutputStream os) throws Exception { // This should be basically what the push servlet does ... - IDataLoaderService service = (IDataLoaderService) factory - .getBean(Constants.DATALOADER_SERVICE); + IDataLoaderService service = (IDataLoaderService) factory.getBean(Constants.DATALOADER_SERVICE); service.loadData(pushIs, respOs); } }); return new InternalOutgoingWithResponseTransport(pushOs, respIs); } - public IIncomingTransport getRegisterTransport(final Node client) - throws IOException { + public IIncomingTransport getRegisterTransport(final Node client) throws IOException { final PipedOutputStream respOs = new PipedOutputStream(); final PipedInputStream respIs = new PipedInputStream(respOs); - runAtClient(runtimeConfiguration.getRegistrationUrl(), null, respOs, - new IClientRunnable() { - public void run(BeanFactory factory, InputStream is, - OutputStream os) throws Exception { - // This should be basically what the registration servlet does ... - IRegistrationService service = (IRegistrationService) factory - .getBean(Constants.REGISTRATION_SERVICE); - service.registerNode(client, os); - } - }); + runAtClient(runtimeConfiguration.getRegistrationUrl(), null, respOs, new IClientRunnable() { + public void run(BeanFactory factory, InputStream is, OutputStream os) throws Exception { + // This should be basically what the registration servlet does ... + IRegistrationService service = (IRegistrationService) factory.getBean(Constants.REGISTRATION_SERVICE); + service.registerNode(client, os); + } + }); return new InternalIncomingTransport(respIs); } - public boolean sendAcknowledgement(Node remote, - List list, Node local) throws IOException { + public boolean sendAcknowledgement(Node remote, List list, Node local) throws IOException { try { if (list != null && list.size() > 0) { - SymmetricEngine remoteEngine = getTargetEngine(remote - .getSyncURL()); + SymmetricEngine remoteEngine = getTargetEngine(remote.getSyncURL()); List batches = new ArrayList(); for (IncomingBatchHistory loadStatus : list) { - if (loadStatus.getStatus() == Status.OK - || loadStatus.getStatus() == Status.SK) { + if (loadStatus.getStatus() == Status.OK || loadStatus.getStatus() == Status.SK) { batches.add(new BatchInfo(loadStatus.getBatchId())); } else { - batches.add(new BatchInfo(loadStatus.getBatchId(), - loadStatus.getFailedRowNumber())); + batches.add(new BatchInfo(loadStatus.getBatchId(), loadStatus.getFailedRowNumber())); } } - IAcknowledgeService service = (IAcknowledgeService) remoteEngine - .getApplicationContext().getBean( - Constants.ACKNOWLEDGE_SERVICE); - service.ack(batches); + IAcknowledgeService service = (IAcknowledgeService) remoteEngine.getApplicationContext().getBean( + Constants.ACKNOWLEDGE_SERVICE); + for (BatchInfo batchInfo : batches) { + service.ack(batchInfo); + } } return true; @@ -163,16 +149,15 @@ public boolean sendAcknowledgement(Node remote, } } - public void writeAcknowledgement(OutputStream out, - List list) throws IOException { + public void writeAcknowledgement(OutputStream out, List list) throws IOException { String data = getAcknowledgementData(list); PrintWriter pw = new PrintWriter(new OutputStreamWriter(out, ENCODING), true); pw.println(data); pw.close(); } - private void runAtClient(final String url, final InputStream is, - final OutputStream os, final IClientRunnable runnable) { + private void runAtClient(final String url, final InputStream is, final OutputStream os, + final IClientRunnable runnable) { new Thread() { public void run() { try { @@ -191,17 +176,14 @@ public void run() { private SymmetricEngine getTargetEngine(String url) { SymmetricEngine engine = SymmetricEngine.findEngineByUrl(url); if (engine == null) { - throw new NullPointerException( - "Could not find the engine reference for the following url: " - + url); + throw new NullPointerException("Could not find the engine reference for the following url: " + url); } else { return engine; } } interface IClientRunnable { - public void run(BeanFactory factory, InputStream is, OutputStream os) - throws Exception; + public void run(BeanFactory factory, InputStream is, OutputStream os) throws Exception; } } diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/AcknowledgeServiceTest.java b/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/AcknowledgeServiceTest.java index 7362c12a6c..ef251da0cf 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/AcknowledgeServiceTest.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/AcknowledgeServiceTest.java @@ -63,9 +63,7 @@ protected void setUp() { @Test(groups = "continuous") public void okTest() { cleanSlate(); - ArrayList list = new ArrayList(); - list.add(new BatchInfo("1")); - ackService.ack(list); + ackService.ack(new BatchInfo("1")); List history = getOutgoingBatchHistory("1"); Assert.assertEquals(history.size(), 1); @@ -120,10 +118,7 @@ public void errorErrorTest() { } protected void errorTestCore(String batchId, int errorLine, long expectedResults) { - ArrayList list = new ArrayList(); - list.add(new BatchInfo(batchId, errorLine)); - ackService.ack(list); - + ackService.ack(new BatchInfo(batchId, errorLine)); List history = getOutgoingBatchHistory(batchId); Assert.assertEquals(history.size(), 1); OutgoingBatchHistory hist = history.get(0); diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceTest.java b/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceTest.java index 11958962b3..7a549bfaa0 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceTest.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceTest.java @@ -25,7 +25,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -226,9 +225,7 @@ public void testErrorChannel() { String thirdBatchId = batches.get(2).getBatchId(); // Ack the first batch as an error, leaving the others as new - ArrayList ackList = new ArrayList(); - ackList.add(new BatchInfo(firstBatchId, 1)); - ackService.ack(ackList); + ackService.ack(new BatchInfo(firstBatchId, 1)); // Get the batches again. The error channel batches should be last batches = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID); diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockAcknowledgeService.java b/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockAcknowledgeService.java index f6a38a38d2..3d58242875 100644 --- a/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockAcknowledgeService.java +++ b/symmetric/src/test/java/org/jumpmind/symmetric/service/mock/MockAcknowledgeService.java @@ -22,14 +22,12 @@ */ package org.jumpmind.symmetric.service.mock; -import java.util.List; - import org.jumpmind.symmetric.model.BatchInfo; import org.jumpmind.symmetric.service.IAcknowledgeService; public class MockAcknowledgeService implements IAcknowledgeService { - public void ack(List batches) { + public void ack(BatchInfo batch) { }