Skip to content

Commit

Permalink
0003259: Retry batch should be flushed when written to the output stream
Browse files Browse the repository at this point in the history
so we don't hit timeouts
  • Loading branch information
chenson42 committed Sep 26, 2017
1 parent 693fc92 commit 455a19f
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 166 deletions.

This file was deleted.

Expand Up @@ -1205,6 +1205,8 @@ protected void transferFromStaging(ExtractMode mode, BatchType batchType, Outgoi
writer.newLine();
}
}

writer.flush();
} else {
long totalCharsRead = 0, totalBytesRead = 0;
int numCharsRead = 0, numBytesRead = 0;
Expand Down
Expand Up @@ -241,7 +241,7 @@ public OutputStream openStream() {
os = connection.getOutputStream();

if (!fileUpload && useCompression) {
os = new GZIPOutputStream(os) {
os = new GZIPOutputStream(os, 128, true) {
{
this.def.setLevel(compressionLevel);
this.def.setStrategy(compressionStrategy);
Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.exception.IoException;
import org.jumpmind.symmetric.AbstractSymmetricEngine;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
Expand Down Expand Up @@ -177,8 +178,10 @@ public int getHttpTimeOutInMs() {
return engine.getParameterService().getInt(ParameterConstants.TRANSPORT_HTTP_TIMEOUT);
}

public boolean isUseCompression() {
return engine.getParameterService().is(ParameterConstants.TRANSPORT_HTTP_USE_COMPRESSION_CLIENT);
public boolean isUseCompression(Node targetNode) {
// if the node is local, no need to use compression
ISymmetricEngine targetEngine = AbstractSymmetricEngine.findEngineByUrl(targetNode.getSyncUrl());
return engine.getParameterService().is(ParameterConstants.TRANSPORT_HTTP_USE_COMPRESSION_CLIENT) && targetEngine == null;
}

public int getCompressionLevel() {
Expand Down Expand Up @@ -231,23 +234,23 @@ public IOutgoingWithResponseTransport getPushTransport(Node remote, Node local,
String securityToken, Map<String, String> requestProperties,
String registrationUrl) throws IOException {
URL url = new URL(buildURL("push", remote, local, securityToken, registrationUrl));
return new HttpOutgoingTransport(url, getHttpTimeOutInMs(), isUseCompression(),
return new HttpOutgoingTransport(url, getHttpTimeOutInMs(), isUseCompression(remote),
getCompressionStrategy(), getCompressionLevel(), getBasicAuthUsername(),
getBasicAuthPassword(), isOutputStreamEnabled(), getOutputStreamSize(), false, requestProperties);
}

public IOutgoingWithResponseTransport getPushTransport(Node remote, Node local,
String securityToken, String registrationUrl) throws IOException {
URL url = new URL(buildURL("push", remote, local, securityToken, registrationUrl));
return new HttpOutgoingTransport(url, getHttpTimeOutInMs(), isUseCompression(),
return new HttpOutgoingTransport(url, getHttpTimeOutInMs(), isUseCompression(remote),
getCompressionStrategy(), getCompressionLevel(), getBasicAuthUsername(),
getBasicAuthPassword(), isOutputStreamEnabled(), getOutputStreamSize(), false);
}

public IOutgoingWithResponseTransport getFilePushTransport(Node remote, Node local,
String securityToken, String registrationUrl) throws IOException {
URL url = new URL(buildURL("filesync/push", remote, local, securityToken, registrationUrl));
return new HttpOutgoingTransport(url, getHttpTimeOutInMs(), isUseCompression(),
return new HttpOutgoingTransport(url, getHttpTimeOutInMs(), isUseCompression(remote),
getCompressionStrategy(), getCompressionLevel(), getBasicAuthUsername(),
getBasicAuthPassword(), isOutputStreamEnabled(), getOutputStreamSize(), true);
}
Expand Down

0 comments on commit 455a19f

Please sign in to comment.