Skip to content

Commit

Permalink
1929400 - Don't ACK multiple batches in the same transaction for fear…
Browse files Browse the repository at this point in the history
… of database deadlocks under stress conditions.
  • Loading branch information
chenson42 committed Mar 30, 2008
1 parent b3812eb commit 2082477
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 107 deletions.
Expand Up @@ -21,9 +21,7 @@
package org.jumpmind.symmetric.model;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class OutgoingBatch implements Serializable {

Expand Down Expand Up @@ -99,10 +97,8 @@ public BatchType getBatchType() {
return batchType;
}

public List<BatchInfo> getBatchInfoList() {
List<BatchInfo> list = new ArrayList<BatchInfo>();
list.add(new BatchInfo(this.batchId));
return list;
public BatchInfo getBatchInfo() {
return new BatchInfo(this.batchId);
}

public void setBatchType(BatchType batchType) {
Expand Down
Expand Up @@ -20,11 +20,9 @@

package org.jumpmind.symmetric.service;

import java.util.List;

import org.jumpmind.symmetric.model.BatchInfo;

public interface IAcknowledgeService
{
public void ack(List<BatchInfo> batches);
public void ack(BatchInfo batch);
}
Expand Up @@ -24,7 +24,6 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

import org.jumpmind.symmetric.model.BatchInfo;
import org.jumpmind.symmetric.model.OutgoingBatch.Status;
Expand All @@ -44,41 +43,39 @@ public class AcknowledgeService extends AbstractService implements IAcknowledgeS
private IOutgoingBatchHistoryService outgoingBatchHistoryService;

@Transactional
public void ack(List<BatchInfo> batches) {
for (final BatchInfo batch : batches) {
final Integer id = new Integer(batch.getBatchId());
// update the outgoing_batch record
jdbcTemplate.execute(new ConnectionCallback() {
public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
PreparedStatement batchUpdate = null;
try {
batchUpdate = conn.prepareStatement(updateOutgoingBatchSql);
batchUpdate.setString(1, batch.isOk() ? Status.OK.name() : Status.ER.name());
batchUpdate.setInt(2, id);
batchUpdate.executeUpdate();
return null;
} finally {
JdbcUtils.closeStatement(batchUpdate);
}
public void ack(final BatchInfo batch) {
final Integer id = new Integer(batch.getBatchId());
// update the outgoing_batch record
jdbcTemplate.execute(new ConnectionCallback() {
public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
PreparedStatement batchUpdate = null;
try {
batchUpdate = conn.prepareStatement(updateOutgoingBatchSql);
batchUpdate.setString(1, batch.isOk() ? Status.OK.name() : Status.ER.name());
batchUpdate.setInt(2, id);
batchUpdate.executeUpdate();
return null;
} finally {
JdbcUtils.closeStatement(batchUpdate);
}
});

// add a record to outgoing_batch_hist indicating success
if (batch.isOk()) {
outgoingBatchHistoryService.ok(id);
}
// add a record to outgoing_batch_hist indicating an error
else {
if (batch.getErrorLine() != BatchInfo.UNDEFINED_ERROR_LINE_NUMBER) {
CallBackHandler handler = new CallBackHandler(batch.getErrorLine());
});

// add a record to outgoing_batch_hist indicating success
if (batch.isOk()) {
outgoingBatchHistoryService.ok(id);
}
// add a record to outgoing_batch_hist indicating an error
else {
if (batch.getErrorLine() != BatchInfo.UNDEFINED_ERROR_LINE_NUMBER) {
CallBackHandler handler = new CallBackHandler(batch.getErrorLine());

jdbcTemplate.query(selectDataIdSql, new Object[] { id }, handler);
final long dataId = handler.getDataId();
jdbcTemplate.query(selectDataIdSql, new Object[] { id }, handler);
final long dataId = handler.getDataId();

outgoingBatchHistoryService.error(id, dataId);
} else {
outgoingBatchHistoryService.error(id, 0l);
}
outgoingBatchHistoryService.error(id, dataId);
} else {
outgoingBatchHistoryService.error(id, 0l);
}
}
}
Expand Down
Expand Up @@ -127,7 +127,10 @@ private void pushToNode(Node remote) {
batchInfo = parser.nextBatch();
}

ackService.ack(batches);
for (BatchInfo batch : batches) {
ackService.ack(batch);
}

} finally {
transport.close();
}
Expand Down
Expand Up @@ -142,7 +142,7 @@ protected boolean writeConfiguration(Node node, OutputStream out)
trigger, transport);
// acknowledge right away, because the acknowledgment is not build into the registration
// protocol.
acknowledgeService.ack(batch.getBatchInfoList());
acknowledgeService.ack(batch.getBatchInfo());
}
}
dataExtractorService.extractNodeIdentityFor(node, transport);
Expand Down
Expand Up @@ -33,7 +33,9 @@ public class AckResourceHandler extends AbstractTransportResourceHandler {
private IAcknowledgeService acknowledgeService;

public void ack(List<BatchInfo> batches) throws IOException {
acknowledgeService.ack(batches);
for (BatchInfo batchInfo : batches) {
acknowledgeService.ack(batchInfo);
}
}

public void setAcknowledgeService(IAcknowledgeService acknowledgeService) {
Expand Down
Expand Up @@ -57,8 +57,7 @@
/**
* Coordinates interaction between two symmetric engines in the same JVM.
*/
public class InternalTransportManager extends AbstractTransportManager
implements ITransportManager {
public class InternalTransportManager extends AbstractTransportManager implements ITransportManager {

static final Log logger = LogFactory.getLog(InternalTransportManager.class);

Expand All @@ -69,33 +68,29 @@ public InternalTransportManager(IRuntimeConfig config) {
this.runtimeConfiguration = config;
}

public IIncomingTransport getPullTransport(final Node remote, final Node local)
throws IOException {
public IIncomingTransport getPullTransport(final Node remote, final Node local) throws IOException {
final PipedOutputStream respOs = new PipedOutputStream();
final PipedInputStream respIs = new PipedInputStream(respOs);

runAtClient(remote.getSyncURL(), null, respOs, new IClientRunnable() {
public void run(BeanFactory factory, InputStream is, OutputStream os)
throws Exception {
public void run(BeanFactory factory, InputStream is, OutputStream os) throws Exception {
// TODO this is duplicated from the Pull Servlet. It should be consolidated somehow!
INodeService nodeService = (INodeService)factory.getBean(Constants.NODE_SERVICE);
INodeService nodeService = (INodeService) factory.getBean(Constants.NODE_SERVICE);
NodeSecurity security = nodeService.findNodeSecurity(local.getNodeId());
if (security.isInitialLoadEnabled()) {
((IDataService)factory.getBean(Constants.DATA_SERVICE)).insertReloadEvent(local);
((IDataService) factory.getBean(Constants.DATA_SERVICE)).insertReloadEvent(local);
}
IDataExtractorService extractor = (IDataExtractorService) factory
.getBean(Constants.DATAEXTRACTOR_SERVICE);
IOutgoingTransport transport = new InternalOutgoingTransport(
respOs);
IOutgoingTransport transport = new InternalOutgoingTransport(respOs);
extractor.extract(local, transport);
transport.close();
}
});
return new InternalIncomingTransport(respIs);
}

public IOutgoingWithResponseTransport getPushTransport(final Node remote, final Node local)
throws IOException {
public IOutgoingWithResponseTransport getPushTransport(final Node remote, final Node local) throws IOException {

final PipedOutputStream pushOs = new PipedOutputStream();
final PipedInputStream pushIs = new PipedInputStream(pushOs);
Expand All @@ -104,56 +99,47 @@ public IOutgoingWithResponseTransport getPushTransport(final Node remote, final
final PipedInputStream respIs = new PipedInputStream(respOs);

runAtClient(remote.getSyncURL(), pushIs, respOs, new IClientRunnable() {
public void run(BeanFactory factory, InputStream is, OutputStream os)
throws Exception {
public void run(BeanFactory factory, InputStream is, OutputStream os) throws Exception {
// This should be basically what the push servlet does ...
IDataLoaderService service = (IDataLoaderService) factory
.getBean(Constants.DATALOADER_SERVICE);
IDataLoaderService service = (IDataLoaderService) factory.getBean(Constants.DATALOADER_SERVICE);
service.loadData(pushIs, respOs);
}
});
return new InternalOutgoingWithResponseTransport(pushOs, respIs);
}

public IIncomingTransport getRegisterTransport(final Node client)
throws IOException {
public IIncomingTransport getRegisterTransport(final Node client) throws IOException {

final PipedOutputStream respOs = new PipedOutputStream();
final PipedInputStream respIs = new PipedInputStream(respOs);

runAtClient(runtimeConfiguration.getRegistrationUrl(), null, respOs,
new IClientRunnable() {
public void run(BeanFactory factory, InputStream is,
OutputStream os) throws Exception {
// This should be basically what the registration servlet does ...
IRegistrationService service = (IRegistrationService) factory
.getBean(Constants.REGISTRATION_SERVICE);
service.registerNode(client, os);
}
});
runAtClient(runtimeConfiguration.getRegistrationUrl(), null, respOs, new IClientRunnable() {
public void run(BeanFactory factory, InputStream is, OutputStream os) throws Exception {
// This should be basically what the registration servlet does ...
IRegistrationService service = (IRegistrationService) factory.getBean(Constants.REGISTRATION_SERVICE);
service.registerNode(client, os);
}
});
return new InternalIncomingTransport(respIs);
}

public boolean sendAcknowledgement(Node remote,
List<IncomingBatchHistory> list, Node local) throws IOException {
public boolean sendAcknowledgement(Node remote, List<IncomingBatchHistory> list, Node local) throws IOException {
try {
if (list != null && list.size() > 0) {
SymmetricEngine remoteEngine = getTargetEngine(remote
.getSyncURL());
SymmetricEngine remoteEngine = getTargetEngine(remote.getSyncURL());
List<BatchInfo> batches = new ArrayList<BatchInfo>();
for (IncomingBatchHistory loadStatus : list) {
if (loadStatus.getStatus() == Status.OK
|| loadStatus.getStatus() == Status.SK) {
if (loadStatus.getStatus() == Status.OK || loadStatus.getStatus() == Status.SK) {
batches.add(new BatchInfo(loadStatus.getBatchId()));
} else {
batches.add(new BatchInfo(loadStatus.getBatchId(),
loadStatus.getFailedRowNumber()));
batches.add(new BatchInfo(loadStatus.getBatchId(), loadStatus.getFailedRowNumber()));
}
}
IAcknowledgeService service = (IAcknowledgeService) remoteEngine
.getApplicationContext().getBean(
Constants.ACKNOWLEDGE_SERVICE);
service.ack(batches);
IAcknowledgeService service = (IAcknowledgeService) remoteEngine.getApplicationContext().getBean(
Constants.ACKNOWLEDGE_SERVICE);
for (BatchInfo batchInfo : batches) {
service.ack(batchInfo);
}

}
return true;
Expand All @@ -163,16 +149,15 @@ public boolean sendAcknowledgement(Node remote,
}
}

public void writeAcknowledgement(OutputStream out,
List<IncomingBatchHistory> list) throws IOException {
public void writeAcknowledgement(OutputStream out, List<IncomingBatchHistory> list) throws IOException {
String data = getAcknowledgementData(list);
PrintWriter pw = new PrintWriter(new OutputStreamWriter(out, ENCODING), true);
pw.println(data);
pw.close();
}

private void runAtClient(final String url, final InputStream is,
final OutputStream os, final IClientRunnable runnable) {
private void runAtClient(final String url, final InputStream is, final OutputStream os,
final IClientRunnable runnable) {
new Thread() {
public void run() {
try {
Expand All @@ -191,17 +176,14 @@ public void run() {
private SymmetricEngine getTargetEngine(String url) {
SymmetricEngine engine = SymmetricEngine.findEngineByUrl(url);
if (engine == null) {
throw new NullPointerException(
"Could not find the engine reference for the following url: "
+ url);
throw new NullPointerException("Could not find the engine reference for the following url: " + url);
} else {
return engine;
}
}

interface IClientRunnable {
public void run(BeanFactory factory, InputStream is, OutputStream os)
throws Exception;
public void run(BeanFactory factory, InputStream is, OutputStream os) throws Exception;
}

}
Expand Up @@ -63,9 +63,7 @@ protected void setUp() {
@Test(groups = "continuous")
public void okTest() {
cleanSlate();
ArrayList<BatchInfo> list = new ArrayList<BatchInfo>();
list.add(new BatchInfo("1"));
ackService.ack(list);
ackService.ack(new BatchInfo("1"));

List<OutgoingBatchHistory> history = getOutgoingBatchHistory("1");
Assert.assertEquals(history.size(), 1);
Expand Down Expand Up @@ -120,10 +118,7 @@ public void errorErrorTest() {
}

protected void errorTestCore(String batchId, int errorLine, long expectedResults) {
ArrayList<BatchInfo> list = new ArrayList<BatchInfo>();
list.add(new BatchInfo(batchId, errorLine));
ackService.ack(list);

ackService.ack(new BatchInfo(batchId, errorLine));
List<OutgoingBatchHistory> history = getOutgoingBatchHistory(batchId);
Assert.assertEquals(history.size(), 1);
OutgoingBatchHistory hist = history.get(0);
Expand Down
Expand Up @@ -25,7 +25,6 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -226,9 +225,7 @@ public void testErrorChannel() {
String thirdBatchId = batches.get(2).getBatchId();

// Ack the first batch as an error, leaving the others as new
ArrayList<BatchInfo> ackList = new ArrayList<BatchInfo>();
ackList.add(new BatchInfo(firstBatchId, 1));
ackService.ack(ackList);
ackService.ack(new BatchInfo(firstBatchId, 1));

// Get the batches again. The error channel batches should be last
batches = batchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
Expand Down
Expand Up @@ -22,14 +22,12 @@
*/
package org.jumpmind.symmetric.service.mock;

import java.util.List;

import org.jumpmind.symmetric.model.BatchInfo;
import org.jumpmind.symmetric.service.IAcknowledgeService;

public class MockAcknowledgeService implements IAcknowledgeService {

public void ack(List<BatchInfo> batches) {
public void ack(BatchInfo batch) {

}

Expand Down

0 comments on commit 2082477

Please sign in to comment.