Skip to content

Commit

Permalink
0002920: Offline mode extract is broken
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Nov 29, 2016
1 parent c8bf633 commit d1d1cbb
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 36 deletions.
Expand Up @@ -140,6 +140,7 @@
import org.jumpmind.symmetric.service.ITriggerRouterService;
import org.jumpmind.symmetric.service.impl.TransformService.TransformTableNodeGroupLink;
import org.jumpmind.symmetric.statistic.IStatisticManager;
import org.jumpmind.symmetric.transport.BatchBufferedWriter;
import org.jumpmind.symmetric.transport.IOutgoingTransport;
import org.jumpmind.symmetric.transport.TransportUtils;
import org.jumpmind.symmetric.util.SymmetricUtils;
Expand Down Expand Up @@ -981,8 +982,9 @@ protected boolean isPreviouslyExtracted(OutgoingBatch currentBatch) {
}

protected boolean isRetry(OutgoingBatch currentBatch, Node remoteNode) {
boolean offline = parameterService.is(ParameterConstants.NODE_OFFLINE, false);
IStagedResource previouslyExtracted = getStagedResource(currentBatch);
return previouslyExtracted != null && previouslyExtracted.exists() && previouslyExtracted.getState() != State.CREATE
return !offline && previouslyExtracted != null && previouslyExtracted.exists() && previouslyExtracted.getState() != State.CREATE
&& currentBatch.getStatus() != OutgoingBatch.Status.RS && currentBatch.getSentCount() > 0 && remoteNode.isVersionGreaterThanOrEqualTo(3, 8, 0);
}

Expand All @@ -992,11 +994,11 @@ protected OutgoingBatch sendOutgoingBatch(ProcessInfo processInfo, Node targetNo
currentBatch.setSentCount(currentBatch.getSentCount() + 1);

long ts = System.currentTimeMillis();

IStagedResource extractedBatch = getStagedResource(currentBatch);
if (extractedBatch != null) {
if (mode == ExtractMode.FOR_SYM_CLIENT && writer != null) {
if (!isRetry && parameterService.is(ParameterConstants.OUTGOING_BATCH_COPY_TO_INCOMING_STAGING)) {
if (!isRetry && parameterService.is(ParameterConstants.OUTGOING_BATCH_COPY_TO_INCOMING_STAGING) &&
!parameterService.is(ParameterConstants.NODE_OFFLINE, false)) {
ISymmetricEngine targetEngine = AbstractSymmetricEngine.findEngineByUrl(targetNode.getSyncUrl());
if (targetEngine != null) {
try {
Expand Down Expand Up @@ -1135,6 +1137,11 @@ protected void transferFromStaging(ExtractMode mode, BatchType batchType, Outgoi
totalThrottleTime, maxKBytesPerSec);
}
}

if (writer instanceof BatchBufferedWriter) {
((BatchBufferedWriter)writer).getBatchIds().add(batch.getBatchId());

}
} catch (Throwable t) {
throw new RuntimeException(t);
} finally {
Expand Down
Expand Up @@ -129,12 +129,13 @@ private void pushToNode(Node remote, RemoteNodeStatus status) {
ProcessInfo processInfo = statisticManager.newProcessInfo(new ProcessInfoKey(
identity.getNodeId(), status.getChannelId(), remote.getNodeId(), ProcessType.OFFLINE_PUSH));

List<OutgoingBatch> extractedBatches = null;
try {
transport = (FileOutgoingTransport) transportManager.getPushTransport(remote, identity, null, null);

List<OutgoingBatch> extractedBatches = dataExtractorService.extract(processInfo, remote, status.getChannelId(), transport);
extractedBatches = dataExtractorService.extract(processInfo, remote, status.getChannelId(), transport);
if (extractedBatches.size() > 0) {
log.info("Offline push data written for {}", remote);
log.info("Offline push data written for {} at {}", remote, transport.getOutgoingDir());
List<BatchAck> batchAcks = readAcks(extractedBatches, transport, transportManager, acknowledgeService);
status.updateOutgoingStatus(extractedBatches, batchAcks);
}
Expand All @@ -144,6 +145,7 @@ private void pushToNode(Node remote, RemoteNodeStatus status) {
}
} catch (Exception ex) {
processInfo.setStatus(Status.ERROR);
log.error("Failed to write offline file", ex);
} finally {
transport.close();
transport.complete(processInfo.getStatus() == Status.OK);
Expand Down
@@ -0,0 +1,19 @@
package org.jumpmind.symmetric.transport;

import java.io.BufferedWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;

public class BatchBufferedWriter extends BufferedWriter {

List<Long> batchIds = new ArrayList<Long>();

public BatchBufferedWriter(Writer out) {
super(out);
}

public List<Long> getBatchIds() {
return batchIds;
}
}
Expand Up @@ -28,15 +28,12 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.StringReader;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.jumpmind.symmetric.io.data.CsvConstants;
import org.jumpmind.symmetric.model.ChannelMap;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.transport.BatchBufferedWriter;
import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
import org.jumpmind.symmetric.web.WebConstants;

Expand All @@ -52,11 +49,18 @@ public class FileOutgoingTransport implements IOutgoingWithResponseTransport {

Node remoteNode;

String outgoingDir;

public FileOutgoingTransport(Node remoteNode, Node localNode, String outgoingDir) throws IOException {
this.outgoingDir = outgoingDir;
this.fileName = outgoingDir + File.separator + localNode.getNodeGroupId() + "-" + localNode.getNodeId() + "_to_" +
remoteNode.getNodeGroupId() + "-" + remoteNode.getNodeId() + "_" + System.currentTimeMillis();
this.remoteNode = remoteNode;
}

public String getOutgoingDir() {
return outgoingDir;
}

@Override
public BufferedWriter openWriter() {
Expand All @@ -81,7 +85,7 @@ public OutputStream openStream() {
@Override
public BufferedReader readResponse() throws IOException {
StringBuilder resp = new StringBuilder();
for (String batchId : writer.getBatchIds()) {
for (Long batchId : writer.getBatchIds()) {
resp.append(WebConstants.ACK_BATCH_NAME).append(batchId).append("=").append(WebConstants.ACK_BATCH_OK).append("&");
resp.append(WebConstants.ACK_NODE_ID).append(batchId).append("=").append(remoteNode.getNodeId()).append("&");
}
Expand All @@ -107,36 +111,11 @@ public ChannelMap getSuspendIgnoreChannelLists(IConfigurationService configurati

public void complete(boolean success) {
if (!success) {
new File(fileName).delete();
new File(fileName + ".tmp").delete();
} else if (writer != null) {
new File(fileName + ".tmp").renameTo(new File(fileName + ".csv"));
} else {
new File(fileName + ".tmp").renameTo(new File(fileName + ".zip"));
}
}

class BatchBufferedWriter extends BufferedWriter {
List<String> batchIds = new ArrayList<String>();
boolean isCommit = false;

public BatchBufferedWriter(Writer out) {
super(out);
}

public void write(String str) throws IOException {
super.write(str);
if (str.equals(CsvConstants.COMMIT)) {
isCommit = true;
} else if (!str.equals(",")) {
if (isCommit) {
batchIds.add(str);
}
isCommit = false;
}
}

public List<String> getBatchIds() {
return batchIds;
}
}
}

0 comments on commit d1d1cbb

Please sign in to comment.