From cb0958e2bb60f683b3179c016707edc87f5fa041 Mon Sep 17 00:00:00 2001 From: erilong Date: Fri, 7 Mar 2008 22:18:24 +0000 Subject: [PATCH] [1861766] Add optional compression on push --- .../transport/TransportManagerFactoryBean.java | 8 +++++++- .../transport/http/HttpOutgoingTransport.java | 12 +++++++++++- .../transport/http/HttpTransportManager.java | 12 ++++++------ .../jumpmind/symmetric/web/AbstractServlet.java | 17 +++++++++++++++-- .../main/resources/symmetric-default.properties | 5 +++++ .../src/main/resources/symmetric-services.xml | 1 + 6 files changed, 45 insertions(+), 10 deletions(-) diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/transport/TransportManagerFactoryBean.java b/symmetric/src/main/java/org/jumpmind/symmetric/transport/TransportManagerFactoryBean.java index 7e8a53321e..d903076d0b 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/transport/TransportManagerFactoryBean.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/transport/TransportManagerFactoryBean.java @@ -40,9 +40,11 @@ public class TransportManagerFactoryBean implements FactoryBean { private int httpTimeout; + private boolean useCompression; + public Object getObject() throws Exception { if (TRANSPORT_HTTP.equalsIgnoreCase(transport)) { - return new HttpTransportManager(runtimeConfiguration, nodeService, httpTimeout); + return new HttpTransportManager(runtimeConfiguration, nodeService, httpTimeout, useCompression); } else if (TRANSPORT_INTERNAL.equalsIgnoreCase(transport)) { return new InternalTransportManager(runtimeConfiguration); } else { @@ -76,4 +78,8 @@ public void setHttpTimeout(int httpTimeout) { this.httpTimeout = httpTimeout; } + public void setUseCompression(boolean useCompression) { + this.useCompression = useCompression; + } + } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java b/symmetric/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java index c67149d7b3..4108f4b909 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java @@ -27,6 +27,7 @@ import java.io.OutputStreamWriter; import java.net.HttpURLConnection; import java.net.URL; +import java.util.zip.GZIPOutputStream; import javax.servlet.http.HttpServletResponse; @@ -47,10 +48,13 @@ public class HttpOutgoingTransport implements IOutgoingWithResponseTransport { HttpURLConnection connection; int httpTimeout; + + boolean useCompression; - public HttpOutgoingTransport(URL url, int httpTimeout) { + public HttpOutgoingTransport(URL url, int httpTimeout, boolean useCompression) { this.url = url; this.httpTimeout = httpTimeout; + this.useCompression = useCompression; } public void close() throws IOException { @@ -86,7 +90,13 @@ public BufferedWriter open() throws IOException { connection.setReadTimeout(httpTimeout); connection.setRequestMethod("PUT"); connection.setRequestProperty("accept-encoding", "gzip"); + if (useCompression) { + connection.addRequestProperty("Content-Type", "gzip"); + } OutputStream out = connection.getOutputStream(); + if (useCompression) { + out = new GZIPOutputStream(out); + } OutputStreamWriter wout = new OutputStreamWriter(out, "UTF-8"); writer = new BufferedWriter(wout); return writer; diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/transport/http/HttpTransportManager.java b/symmetric/src/main/java/org/jumpmind/symmetric/transport/http/HttpTransportManager.java index 29f40172cd..5d1ce8dd96 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/transport/http/HttpTransportManager.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/transport/http/HttpTransportManager.java @@ -57,11 +57,15 @@ public class HttpTransportManager extends AbstractTransportManager implements IT private INodeService nodeService; private int httpTimeout; + + private boolean useCompression; - public HttpTransportManager(IRuntimeConfig config, INodeService nodeService, int httpTimeout) { + public HttpTransportManager(IRuntimeConfig config, INodeService nodeService, int httpTimeout, + boolean useCompression) { this.runtimeConfiguration = config; this.nodeService = nodeService; this.httpTimeout = httpTimeout; + this.useCompression = useCompression; } public boolean sendAcknowledgement(Node remote, List list, Node local) throws IOException { @@ -105,7 +109,7 @@ public IIncomingTransport getPullTransport(Node remote, Node local) throws IOExc public IOutgoingWithResponseTransport getPushTransport(Node remote, Node local) throws IOException { URL url = new URL(buildURL("push", remote, local)); - return new HttpOutgoingTransport(url, httpTimeout); + return new HttpOutgoingTransport(url, httpTimeout, useCompression); } public IIncomingTransport getRegisterTransport(Node node) throws IOException { @@ -183,8 +187,4 @@ private String addNodeId(String base, String nodeId, String connector) { return sb.toString(); } - public void setHttpTimeout(int httpTimeout) { - this.httpTimeout = httpTimeout; - } - } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/web/AbstractServlet.java b/symmetric/src/main/java/org/jumpmind/symmetric/web/AbstractServlet.java index 325f116f54..93455f9e42 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/web/AbstractServlet.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/web/AbstractServlet.java @@ -25,7 +25,9 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; +import java.util.zip.GZIPInputStream; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; @@ -57,10 +59,18 @@ protected OutputStream createOutputStream(HttpServletResponse resp) throws IOExc protected InputStream createInputStream(HttpServletRequest req) throws IOException { InputStream is = null; - + String contentType = req.getHeader("Content-Type"); + boolean useCompression = contentType != null && contentType.equalsIgnoreCase("gzip"); + if (getLogger().isDebugEnabled()) { StringBuilder b = new StringBuilder(); - BufferedReader reader = req.getReader(); + BufferedReader reader = null; + if (useCompression) { + getLogger().debug("Received compressed stream"); + reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(req.getInputStream()))); + } else { + reader = req.getReader(); + } String line = null; do { line = reader.readLine(); @@ -74,6 +84,9 @@ protected InputStream createInputStream(HttpServletRequest req) throws IOExcepti is = new ByteArrayInputStream(b.toString().getBytes()); } else { is = req.getInputStream(); + if (useCompression) { + is = new GZIPInputStream(is); + } } return is; diff --git a/symmetric/src/main/resources/symmetric-default.properties b/symmetric/src/main/resources/symmetric-default.properties index 8c7b04b62e..1e484dc501 100644 --- a/symmetric/src/main/resources/symmetric-default.properties +++ b/symmetric/src/main/resources/symmetric-default.properties @@ -110,6 +110,11 @@ symmetric.runtime.initial.load.create.first=false # Sets both the connection and read timeout on the internal HttpUrlConnection symmetric.runtime.http.timeout.ms=600000 +# Whether or not to use compression over HTTP connections. +# Currently, this setting only affects the push connection of the source node. +# Compression on a pull is enabled using a filter in the web.xml for the PullServlet. +symmetric.runtime.http.compression=true + # When starting jobs, symmetric attempts to randomize the start time to spread out load. This is the # maximum wait period before starting a job. symmetric.runtime.job.random.max.start.time.ms=10000 diff --git a/symmetric/src/main/resources/symmetric-services.xml b/symmetric/src/main/resources/symmetric-services.xml index 5c9874960f..9537f1ee58 100644 --- a/symmetric/src/main/resources/symmetric-services.xml +++ b/symmetric/src/main/resources/symmetric-services.xml @@ -10,6 +10,7 @@ +