Skip to content

Commit

Permalink
0001352: The batch link on the outgoing batch screen is broken
Browse files Browse the repository at this point in the history
0001353: The batch URL feature is broken
  • Loading branch information
chenson42 committed Jul 27, 2013
1 parent 49592a8 commit df81d50
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 33 deletions.
Expand Up @@ -25,7 +25,6 @@
import java.util.Date;
import java.util.List;

import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.StructureDataWriter.PayloadType;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.OutgoingBatch;
Expand Down Expand Up @@ -54,13 +53,9 @@ public interface IDataExtractorService {
public boolean extractBatchRange(Writer writer, String nodeId, long startBatchId, long endBatchId);

public boolean extractBatchRange(Writer writer, String nodeId, Date startBatchTime,
Date endBatchTime, String... channelIds);

public OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targetNode,
IDataWriter dataWriter, OutgoingBatch currentBatch,
boolean streamToFileEnabled, boolean updateBatchStatistics);
Date endBatchTime, String... channelIds);

public boolean extractOutgoingBatch(String nodeId, long batchId, Writer writer);
public boolean extractOnlyOutgoingBatch(String nodeId, long batchId, Writer writer);

public RemoteNodeStatuses queueWork(boolean force);

Expand Down
Expand Up @@ -125,7 +125,7 @@ public class DataExtractorService extends AbstractService implements IDataExtrac

final static long MS_PASSED_BEFORE_BATCH_REQUERIED = 5000;

private enum ExtractMode { TO_STREAM, TO_PAYLOAD };
protected enum ExtractMode { FOR_SYM_CLIENT, FOR_PAYLOAD_CLIENT, EXTRACT_ONLY };

private IOutgoingBatchService outgoingBatchService;

Expand Down Expand Up @@ -360,7 +360,7 @@ public List<OutgoingBatchWithPayload> extractToPayload(ProcessInfo processInfo,
useDelimiterIdentifiers, symmetricDialect.getBinaryEncoding(),
useJdbcTimestampFormat, useUpsertStatements);
List<OutgoingBatch> extractedBatches = extract(processInfo, targetNode,
activeBatches, writer, ExtractMode.TO_PAYLOAD);
activeBatches, writer, ExtractMode.FOR_PAYLOAD_CLIENT);

List<OutgoingBatchWithPayload> batchesWithPayload = new ArrayList<OutgoingBatchWithPayload>();
for (OutgoingBatch batch : extractedBatches) {
Expand Down Expand Up @@ -405,7 +405,7 @@ public List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode,
transport.openWriter(), targetNode.requires13Compatiblity());

return extract(processInfo, targetNode, activeBatches, dataWriter,
ExtractMode.TO_STREAM);
ExtractMode.FOR_SYM_CLIENT);
}

}
Expand All @@ -414,9 +414,18 @@ public List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode,

}

public boolean extractOutgoingBatch(String nodeId, long batchId, Writer writer) {
/**
* This method will extract an outgoing batch, but will not update the outgoing batch status
*/
public boolean extractOnlyOutgoingBatch(String nodeId, long batchId, Writer writer) {
boolean extracted = false;
Node targetNode = nodeService.findNode(nodeId);

Node targetNode = null;
if (Constants.UNROUTED_NODE_ID.equals(nodeId)) {
targetNode = new Node(nodeId, parameterService.getNodeGroupId());
} else {
targetNode = nodeService.findNode(nodeId);
}
if (targetNode != null) {
OutgoingBatch batch = outgoingBatchService.findOutgoingBatch(batchId, nodeId);
if (batch != null) {
Expand All @@ -425,7 +434,7 @@ public boolean extractOutgoingBatch(String nodeId, long batchId, Writer writer)
List<OutgoingBatch> batches = new ArrayList<OutgoingBatch>(1);
batches.add(batch);
batches = extract(new ProcessInfo(), targetNode, batches, dataWriter,
ExtractMode.TO_STREAM);
ExtractMode.EXTRACT_ONLY);
extracted = batches.size() > 0;
}
}
Expand Down Expand Up @@ -468,9 +477,9 @@ protected List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode,
log.info(
"Batch {} is marked as ready but it has been deleted. Rescheduling it for extraction",
currentBatch.getNodeBatchId());
currentBatch.setStatus(Status.RQ);
outgoingBatchService.updateOutgoingBatch(currentBatch);
resetExtractRequest(currentBatch);
if (changeBatchStatus(Status.RQ, currentBatch, mode)) {
resetExtractRequest(currentBatch);
}
break;
} else if (currentBatch.getStatus() == Status.RQ) {
log.info(
Expand All @@ -485,20 +494,20 @@ protected List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode,
} else {
processInfo.setStatus(ProcessInfo.Status.EXTRACTING);
currentBatch = extractOutgoingBatch(processInfo, targetNode, dataWriter,
currentBatch, streamToFileEnabled, true);
currentBatch, streamToFileEnabled, true, mode);
}

if (streamToFileEnabled || mode == ExtractMode.TO_PAYLOAD) {
if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT) {
processInfo.setStatus(ProcessInfo.Status.TRANSFERRING);
currentBatch = sendOutgoingBatch(processInfo, targetNode, currentBatch,
dataWriter);
dataWriter, mode);
}

processedBatches.add(currentBatch);

if (currentBatch.getStatus() != Status.OK) {
currentBatch.setLoadCount(currentBatch.getLoadCount() + 1);
changeBatchStatus(Status.LD, currentBatch);
changeBatchStatus(Status.LD, currentBatch, mode);

bytesSentCount += currentBatch.getByteCount();
batchesSentCount++;
Expand Down Expand Up @@ -569,11 +578,17 @@ protected List<OutgoingBatch> extract(ProcessInfo processInfo, Node targetNode,
}
}

final protected void changeBatchStatus(Status status, OutgoingBatch currentBatch) {
final protected boolean changeBatchStatus(Status status, OutgoingBatch currentBatch, ExtractMode mode) {
if (currentBatch.getStatus() != Status.IG) {
currentBatch.setStatus(status);
}
outgoingBatchService.updateOutgoingBatch(currentBatch);
if (mode != ExtractMode.EXTRACT_ONLY) {
outgoingBatchService.updateOutgoingBatch(currentBatch);
return true;
} else {
return false;
}

}

/**
Expand All @@ -588,9 +603,10 @@ final protected OutgoingBatch requeryIfEnoughTimeHasPassed(long ts, OutgoingBatc
return currentBatch;
}

public OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targetNode,
IDataWriter dataWriter, OutgoingBatch currentBatch, boolean useStagingDataWriter, boolean updateBatchStatistics) {
if (currentBatch.getStatus() != Status.OK) {
protected OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targetNode,
IDataWriter dataWriter, OutgoingBatch currentBatch, boolean useStagingDataWriter,
boolean updateBatchStatistics, ExtractMode mode) {
if (currentBatch.getStatus() != Status.OK || ExtractMode.EXTRACT_ONLY == mode) {
Node sourceNode = nodeService.findIdentity();

TransformWriter transformExtractWriter = null;
Expand Down Expand Up @@ -654,7 +670,7 @@ public OutgoingBatch extractOutgoingBatch(ProcessInfo processInfo, Node targetNo
synchronized (lock) {
if (!isPreviouslyExtracted(currentBatch)) {
currentBatch.setExtractCount(currentBatch.getExtractCount() + 1);
changeBatchStatus(Status.QY, currentBatch);
changeBatchStatus(Status.QY, currentBatch, mode);
IDataReader dataReader = new ExtractDataReader(
symmetricDialect.getPlatform(), new SelectFromSymDataSource(
currentBatch, sourceNode, targetNode));
Expand Down Expand Up @@ -740,10 +756,10 @@ protected boolean isPreviouslyExtracted(OutgoingBatch currentBatch) {
}

protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNode,
OutgoingBatch currentBatch, IDataWriter dataWriter) {
if (currentBatch.getStatus() != Status.OK) {
OutgoingBatch currentBatch, IDataWriter dataWriter, ExtractMode mode) {
if (currentBatch.getStatus() != Status.OK || ExtractMode.EXTRACT_ONLY == mode) {
currentBatch.setSentCount(currentBatch.getSentCount() + 1);
changeBatchStatus(Status.SE, currentBatch);
changeBatchStatus(Status.SE, currentBatch, mode);

long ts = System.currentTimeMillis();

Expand Down Expand Up @@ -976,7 +992,7 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
*/
extractOutgoingBatch(processInfo, targetNode,
new MultiBatchStagingWriter(identity.getNodeId(), stagingManager, batches,
channel.getMaxBatchSize()), batches.get(0), false, false);
channel.getMaxBatchSize()), batches.get(0), false, false, ExtractMode.FOR_SYM_CLIENT);

ISqlTransaction transaction = null;
try {
Expand Down
Expand Up @@ -361,8 +361,9 @@ public List<OutgoingBatch> sendFiles(ProcessInfo processInfo, Node targetNode,
processInfo.incrementBatchCount();
processInfo.setCurrentBatchId(currentBatch.getBatchId());

engine.getDataExtractorService().extractOutgoingBatch(processInfo, targetNode,
dataWriter, currentBatch, false, true);
((DataExtractorService) engine.getDataExtractorService()).extractOutgoingBatch(
processInfo, targetNode, dataWriter, currentBatch, false, true,
DataExtractorService.ExtractMode.FOR_SYM_CLIENT);

/*
* check to see if max bytes to sync has been reached and
Expand Down
Expand Up @@ -52,7 +52,7 @@ public void handleWithCompression(HttpServletRequest req, HttpServletResponse re
if (dashIndex > 0) {
String nodeId = nodeIdBatchId.substring(0, dashIndex);
String batchId = nodeIdBatchId.substring(dashIndex + 1, nodeIdBatchId.length());
if (!dataExtractorService.extractOutgoingBatch(nodeId, Long.parseLong(batchId),
if (!dataExtractorService.extractOnlyOutgoingBatch(nodeId, Long.parseLong(batchId),
res.getWriter())) {
ServletUtils.sendError(res, HttpServletResponse.SC_NOT_FOUND);
} else {
Expand Down

0 comments on commit df81d50

Please sign in to comment.