Skip to content

Commit

Permalink
0001370: When initial loads are configured to extract in the backgrou…
Browse files Browse the repository at this point in the history
…nd multiple load requests can queue up on top of each other
  • Loading branch information
chenson42 committed Aug 7, 2013
1 parent d92aff2 commit db4701d
Showing 1 changed file with 43 additions and 12 deletions.
Expand Up @@ -992,28 +992,60 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
request.getEndBatchId() });
List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatchRange(
request.getStartBatchId(), request.getEndBatchId()).getBatches();

ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(identity
.getNodeId(), nodeCommunication.getNodeId(),
ProcessInfoKey.ProcessType.INITIAL_LOAD_EXTRACT_JOB));
try {
Channel channel = configurationService.getChannel(batches.get(0).getChannelId());
/*
* "Trick" the extractor to extract one reload batch, but we
* will split it across the N batches when writing it
*/
extractOutgoingBatch(processInfo, targetNode,
new MultiBatchStagingWriter(identity.getNodeId(), stagingManager, batches,
channel.getMaxBatchSize()), batches.get(0), false, false, ExtractMode.FOR_SYM_CLIENT);
boolean areBatchesOk = true;

// check to see if batches have been OK'd by another reload request
for (OutgoingBatch outgoingBatch : batches) {
if (outgoingBatch.getStatus() != Status.OK) {
areBatchesOk = false;
}
}

if (!areBatchesOk) {

Channel channel = configurationService
.getChannel(batches.get(0).getChannelId());
/*
* "Trick" the extractor to extract one reload batch, but we
* will split it across the N batches when writing it
*/
extractOutgoingBatch(processInfo, targetNode,
new MultiBatchStagingWriter(identity.getNodeId(), stagingManager,
batches, channel.getMaxBatchSize()), batches.get(0), false,
false, ExtractMode.FOR_SYM_CLIENT);

}

// re-query the batches to see if they have been OK'd while extracting
batches = outgoingBatchService.getOutgoingBatchRange(
request.getStartBatchId(), request.getEndBatchId()).getBatches();

areBatchesOk = true;

// check to see if batches have been OK'd by another reload request
// while extracting
for (OutgoingBatch outgoingBatch : batches) {
if (outgoingBatch.getStatus() != Status.OK) {
areBatchesOk = false;
}
}

ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
updateExtractRequestStatus(transaction, request.getRequestId(),
ExtractStatus.OK);

for (OutgoingBatch outgoingBatch : batches) {
outgoingBatch.setStatus(Status.NE);
outgoingBatchService.updateOutgoingBatch(transaction, outgoingBatch);
if (!areBatchesOk) {
for (OutgoingBatch outgoingBatch : batches) {
outgoingBatch.setStatus(Status.NE);
outgoingBatchService.updateOutgoingBatch(transaction, outgoingBatch);
}
}
transaction.commit();

Expand All @@ -1039,7 +1071,6 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
processInfo.setStatus(org.jumpmind.symmetric.model.ProcessInfo.Status.ERROR);
throw ex;
}

} else {
log.warn("An extract was requested, but no extract records where found for node {}",
nodeCommunication.getNodeId());
Expand Down

0 comments on commit db4701d

Please sign in to comment.