Skip to content

Commit

Permalink
[1861766] Add optional compression on push
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Mar 7, 2008
1 parent 19e2443 commit cb0958e
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 10 deletions.
Expand Up @@ -40,9 +40,11 @@ public class TransportManagerFactoryBean implements FactoryBean {

private int httpTimeout;

private boolean useCompression;

public Object getObject() throws Exception {
if (TRANSPORT_HTTP.equalsIgnoreCase(transport)) {
return new HttpTransportManager(runtimeConfiguration, nodeService, httpTimeout);
return new HttpTransportManager(runtimeConfiguration, nodeService, httpTimeout, useCompression);
} else if (TRANSPORT_INTERNAL.equalsIgnoreCase(transport)) {
return new InternalTransportManager(runtimeConfiguration);
} else {
Expand Down Expand Up @@ -76,4 +78,8 @@ public void setHttpTimeout(int httpTimeout) {
this.httpTimeout = httpTimeout;
}

public void setUseCompression(boolean useCompression) {
this.useCompression = useCompression;
}

}
Expand Up @@ -27,6 +27,7 @@
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.zip.GZIPOutputStream;

import javax.servlet.http.HttpServletResponse;

Expand All @@ -47,10 +48,13 @@ public class HttpOutgoingTransport implements IOutgoingWithResponseTransport {
HttpURLConnection connection;

int httpTimeout;

boolean useCompression;

public HttpOutgoingTransport(URL url, int httpTimeout) {
public HttpOutgoingTransport(URL url, int httpTimeout, boolean useCompression) {
this.url = url;
this.httpTimeout = httpTimeout;
this.useCompression = useCompression;
}

public void close() throws IOException {
Expand Down Expand Up @@ -86,7 +90,13 @@ public BufferedWriter open() throws IOException {
connection.setReadTimeout(httpTimeout);
connection.setRequestMethod("PUT");
connection.setRequestProperty("accept-encoding", "gzip");
if (useCompression) {
connection.addRequestProperty("Content-Type", "gzip");
}
OutputStream out = connection.getOutputStream();
if (useCompression) {
out = new GZIPOutputStream(out);
}
OutputStreamWriter wout = new OutputStreamWriter(out, "UTF-8");
writer = new BufferedWriter(wout);
return writer;
Expand Down
Expand Up @@ -57,11 +57,15 @@ public class HttpTransportManager extends AbstractTransportManager implements IT
private INodeService nodeService;

private int httpTimeout;

private boolean useCompression;

public HttpTransportManager(IRuntimeConfig config, INodeService nodeService, int httpTimeout) {
public HttpTransportManager(IRuntimeConfig config, INodeService nodeService, int httpTimeout,
boolean useCompression) {
this.runtimeConfiguration = config;
this.nodeService = nodeService;
this.httpTimeout = httpTimeout;
this.useCompression = useCompression;
}

public boolean sendAcknowledgement(Node remote, List<IncomingBatchHistory> list, Node local) throws IOException {
Expand Down Expand Up @@ -105,7 +109,7 @@ public IIncomingTransport getPullTransport(Node remote, Node local) throws IOExc

public IOutgoingWithResponseTransport getPushTransport(Node remote, Node local) throws IOException {
URL url = new URL(buildURL("push", remote, local));
return new HttpOutgoingTransport(url, httpTimeout);
return new HttpOutgoingTransport(url, httpTimeout, useCompression);
}

public IIncomingTransport getRegisterTransport(Node node) throws IOException {
Expand Down Expand Up @@ -183,8 +187,4 @@ private String addNodeId(String base, String nodeId, String connector) {
return sb.toString();
}

public void setHttpTimeout(int httpTimeout) {
this.httpTimeout = httpTimeout;
}

}
Expand Up @@ -25,7 +25,9 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.zip.GZIPInputStream;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
Expand Down Expand Up @@ -57,10 +59,18 @@ protected OutputStream createOutputStream(HttpServletResponse resp) throws IOExc

protected InputStream createInputStream(HttpServletRequest req) throws IOException {
InputStream is = null;

String contentType = req.getHeader("Content-Type");
boolean useCompression = contentType != null && contentType.equalsIgnoreCase("gzip");

if (getLogger().isDebugEnabled()) {
StringBuilder b = new StringBuilder();
BufferedReader reader = req.getReader();
BufferedReader reader = null;
if (useCompression) {
getLogger().debug("Received compressed stream");
reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(req.getInputStream())));
} else {
reader = req.getReader();
}
String line = null;
do {
line = reader.readLine();
Expand All @@ -74,6 +84,9 @@ protected InputStream createInputStream(HttpServletRequest req) throws IOExcepti
is = new ByteArrayInputStream(b.toString().getBytes());
} else {
is = req.getInputStream();
if (useCompression) {
is = new GZIPInputStream(is);
}
}

return is;
Expand Down
5 changes: 5 additions & 0 deletions symmetric/src/main/resources/symmetric-default.properties
Expand Up @@ -110,6 +110,11 @@ symmetric.runtime.initial.load.create.first=false
# Sets both the connection and read timeout on the internal HttpUrlConnection
symmetric.runtime.http.timeout.ms=600000

# Whether or not to use compression over HTTP connections.
# Currently, this setting only affects the push connection of the source node.
# Compression on a pull is enabled using a filter in the web.xml for the PullServlet.
symmetric.runtime.http.compression=true

# When starting jobs, symmetric attempts to randomize the start time to spread out load. This is the
# maximum wait period before starting a job.
symmetric.runtime.job.random.max.start.time.ms=10000
Expand Down
1 change: 1 addition & 0 deletions symmetric/src/main/resources/symmetric-services.xml
Expand Up @@ -10,6 +10,7 @@
<property name="runtimeConfiguration" ref="runtimeConfiguration" />
<property name="nodeService" ref="nodeService" />
<property name="httpTimeout" value="${symmetric.runtime.http.timeout.ms}" />
<property name="useCompression" value="${symmetric.runtime.http.compression}" />
</bean>

<bean id="bootstrapService" class="org.jumpmind.symmetric.service.impl.BootstrapService"
Expand Down

0 comments on commit cb0958e

Please sign in to comment.