From 83014afb4b7705cd6b44e0ef14fde89b9b66ae6b Mon Sep 17 00:00:00 2001 From: chenson42 Date: Tue, 19 May 2009 19:31:54 +0000 Subject: [PATCH] SYMMETRICDS-91 - Added support to tweak compression settings for gzip outputstreams --- .../symmetric/common/ParameterConstants.java | 2 + .../transport/http/HttpOutgoingTransport.java | 14 ++- .../transport/http/HttpTransportManager.java | 2 +- .../symmetric/web/CompressionFilter.java | 3 +- .../web/compression/CompressionFilter.java | 23 ++++- .../CompressionResponseStream.java | 9 +- .../CompressionServletResponseWrapper.java | 13 ++- .../resources/symmetric-default.properties | 20 ++++ .../symmetric/test/CompressionExperiment.java | 91 +++++++++++++++++++ .../test/resources/symmetric-test.properties | 3 + 10 files changed, 170 insertions(+), 10 deletions(-) create mode 100644 symmetric/src/test/java/org/jumpmind/symmetric/test/CompressionExperiment.java diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index 2272589eb9..289d2487bb 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -80,6 +80,8 @@ public class ParameterConstants { public final static String TRANSPORT_HTTP_TIMEOUT = "http.timeout.ms"; public final static String TRANSPORT_HTTP_USE_COMPRESSION_CLIENT = "http.compression"; public final static String TRANSPORT_HTTP_COMPRESSION_DISABLED_SERVLET = "web.compression.disabled"; + public final static String TRANSPORT_HTTP_COMPRESSION_LEVEL = "compression.level"; + public final static String TRANSPORT_HTTP_COMPRESSION_STRATEGY = "compression.strategy"; public final static String TRANSPORT_TYPE = "transport.type"; public final static String TRANSPORT_HTTPS_VERIFIED_SERVERS = "https.verified.server.names"; diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java b/symmetric/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java index 0a76fba0f6..570f71dcd8 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/transport/http/HttpOutgoingTransport.java @@ -33,6 +33,8 @@ import org.apache.commons.io.IOUtils; import org.jumpmind.symmetric.common.Constants; +import org.jumpmind.symmetric.common.ParameterConstants; +import org.jumpmind.symmetric.service.IParameterService; import org.jumpmind.symmetric.transport.AuthenticationException; import org.jumpmind.symmetric.transport.ConnectionRejectedException; import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport; @@ -50,11 +52,14 @@ public class HttpOutgoingTransport implements IOutgoingWithResponseTransport { private int httpTimeout; private boolean useCompression; + + private IParameterService parameterService; - public HttpOutgoingTransport(URL url, int httpTimeout, boolean useCompression) { + public HttpOutgoingTransport(URL url, int httpTimeout, boolean useCompression, IParameterService parameterService) { this.url = url; this.httpTimeout = httpTimeout; this.useCompression = useCompression; + this.parameterService = parameterService; } public void close() throws IOException { @@ -114,7 +119,12 @@ public BufferedWriter open() throws IOException { } OutputStream out = connection.getOutputStream(); if (useCompression) { - out = new GZIPOutputStream(out); + out = new GZIPOutputStream(out) { + { + this.def.setLevel(parameterService.getInt(ParameterConstants.TRANSPORT_HTTP_COMPRESSION_LEVEL)); + this.def.setStrategy(parameterService.getInt(ParameterConstants.TRANSPORT_HTTP_COMPRESSION_STRATEGY)); + } + }; } OutputStreamWriter wout = new OutputStreamWriter(out, Constants.ENCODING); writer = new BufferedWriter(wout); diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/transport/http/HttpTransportManager.java b/symmetric/src/main/java/org/jumpmind/symmetric/transport/http/HttpTransportManager.java index ab8d71beb6..0ba4c5679d 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/transport/http/HttpTransportManager.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/transport/http/HttpTransportManager.java @@ -107,7 +107,7 @@ public IOutgoingWithResponseTransport getPushTransport(Node remote, Node local) URL url = new URL(buildURL("push", remote, local)); int httpTimeout = parameterService.getInt(ParameterConstants.TRANSPORT_HTTP_TIMEOUT); boolean useCompression = parameterService.is(ParameterConstants.TRANSPORT_HTTP_USE_COMPRESSION_CLIENT); - return new HttpOutgoingTransport(url, httpTimeout, useCompression); + return new HttpOutgoingTransport(url, httpTimeout, useCompression, parameterService); } public IIncomingTransport getRegisterTransport(Node node) throws IOException { diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/web/CompressionFilter.java b/symmetric/src/main/java/org/jumpmind/symmetric/web/CompressionFilter.java index a934df3f46..cf49fab5de 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/web/CompressionFilter.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/web/CompressionFilter.java @@ -86,7 +86,8 @@ public void destroy() { public void init(final FilterConfig filterConfig) throws ServletException { super.init(filterConfig); delegate = new org.jumpmind.symmetric.web.compression.CompressionFilter(); - + delegate.setCompressionLevel(parameterService.getInt(ParameterConstants.TRANSPORT_HTTP_COMPRESSION_LEVEL)); + delegate.setCompressionStrategy(parameterService.getInt(ParameterConstants.TRANSPORT_HTTP_COMPRESSION_STRATEGY)); delegate.init(filterConfig); } diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/web/compression/CompressionFilter.java b/symmetric/src/main/java/org/jumpmind/symmetric/web/compression/CompressionFilter.java index 5f727158e9..e49147d9c8 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/web/compression/CompressionFilter.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/web/compression/CompressionFilter.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.util.Enumeration; +import java.util.zip.Deflater; import javax.servlet.Filter; import javax.servlet.FilterChain; @@ -49,6 +50,10 @@ public class CompressionFilter implements Filter { static final Log logger = LogFactory.getLog(CompressionFilter.class); + + int compressionLevel = Deflater.DEFAULT_COMPRESSION; + + int compressionStrategy = Deflater.DEFAULT_STRATEGY; /** * The filter configuration object we are associated with. If this value is @@ -144,7 +149,7 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha } else { if (response instanceof HttpServletResponse) { CompressionServletResponseWrapper wrappedResponse = new CompressionServletResponseWrapper( - (HttpServletResponse) response); + (HttpServletResponse) response, compressionLevel, compressionStrategy); if (logger.isDebugEnabled()) { logger.debug("doFilter gets called with compression"); } @@ -157,6 +162,22 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha } } } + + public int getCompressionLevel() { + return compressionLevel; + } + + public void setCompressionLevel(int compressionLevel) { + this.compressionLevel = compressionLevel; + } + + public int getCompressionStrategy() { + return compressionStrategy; + } + + public void setCompressionStrategy(int compressionStrategy) { + this.compressionStrategy = compressionStrategy; + } /** * Set filter config This function is equivalent to init. Required by diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/web/compression/CompressionResponseStream.java b/symmetric/src/main/java/org/jumpmind/symmetric/web/compression/CompressionResponseStream.java index e13320e56d..077fd35fec 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/web/compression/CompressionResponseStream.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/web/compression/CompressionResponseStream.java @@ -64,11 +64,16 @@ public class CompressionResponseStream extends ServletOutputStream { * @param response * The associated response */ - public CompressionResponseStream(HttpServletResponse response) throws IOException { + public CompressionResponseStream(HttpServletResponse response, final int compressionLevel, final int compressionStrategy) throws IOException { this.closed = false; this.response = response; response.addHeader("Content-Encoding", "gzip"); - gzipstream = new GZIPOutputStream(response.getOutputStream()); + gzipstream = new GZIPOutputStream(response.getOutputStream()) { + { + this.def.setLevel(compressionLevel); + this.def.setStrategy(compressionStrategy); + } + }; } /** diff --git a/symmetric/src/main/java/org/jumpmind/symmetric/web/compression/CompressionServletResponseWrapper.java b/symmetric/src/main/java/org/jumpmind/symmetric/web/compression/CompressionServletResponseWrapper.java index ef2a84e44e..6495042926 100644 --- a/symmetric/src/main/java/org/jumpmind/symmetric/web/compression/CompressionServletResponseWrapper.java +++ b/symmetric/src/main/java/org/jumpmind/symmetric/web/compression/CompressionServletResponseWrapper.java @@ -19,6 +19,8 @@ import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; +import java.util.zip.Deflater; + import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponseWrapper; @@ -44,13 +46,18 @@ public class CompressionServletResponseWrapper extends HttpServletResponseWrappe static final Log logger = LogFactory.getLog(CompressionServletResponseWrapper.class); + int compressionLevel = Deflater.DEFAULT_COMPRESSION; + + int compressionStrategy = Deflater.DEFAULT_STRATEGY; + /** * Calls the parent constructor which creates a ServletResponse adaptor * wrapping the given response object. */ - - public CompressionServletResponseWrapper(HttpServletResponse response) { + public CompressionServletResponseWrapper(HttpServletResponse response, int compressionLevel, int compressionStrategy) { super(response); + this.compressionLevel = compressionLevel; + this.compressionStrategy = compressionStrategy; origResponse = response; if (logger.isDebugEnabled()) { logger.debug("CompressionServletResponseWrapper constructor gets called"); @@ -113,7 +120,7 @@ public ServletOutputStream createOutputStream() throws IOException { logger.debug("createOutputStream gets called"); } - CompressionResponseStream stream = new CompressionResponseStream(origResponse); + CompressionResponseStream stream = new CompressionResponseStream(origResponse, compressionLevel, compressionStrategy); return stream; } diff --git a/symmetric/src/main/resources/symmetric-default.properties b/symmetric/src/main/resources/symmetric-default.properties index 095e1a9601..b26e21ca03 100644 --- a/symmetric/src/main/resources/symmetric-default.properties +++ b/symmetric/src/main/resources/symmetric-default.properties @@ -270,6 +270,7 @@ http.timeout.ms=600000 # Whether or not to use compression over HTTP connections. # Currently, this setting only affects the push connection of the source node. # Compression on a pull is enabled using a filter in the web.xml for the PullServlet. +# @see web.compression.disabled to enable/disable the filter # # This property can be overridden in the database http.compression=true @@ -280,6 +281,25 @@ http.compression=true # This property can be overridden in the database web.compression.disabled=false +# Set the compression level this node will use when compressing synchronization payloads +# @see java.util.zip.Deflater +# NO_COMPRESSION = 0 +# BEST_SPEED = 1 +# BEST_COMPRESSION = 9 +# DEFAULT_COMPRESSION = -1 +# +# This property can be overridden in the database +compression.level=-1 + +# Set the compression strategy this node will use when compressing synchronization payloads +# @see java.util.zip.Deflater +# FILTERED = 1 +# HUFFMAN_ONLY = 2 +# DEFAULT_STRATEGY = 0 +# +# This property can be overridden in the database +compression.strategy=0 + # The base servlet path for when embedding SymmetricDS with another web application web.base.servlet.path= diff --git a/symmetric/src/test/java/org/jumpmind/symmetric/test/CompressionExperiment.java b/symmetric/src/test/java/org/jumpmind/symmetric/test/CompressionExperiment.java new file mode 100644 index 0000000000..0069fc88e5 --- /dev/null +++ b/symmetric/src/test/java/org/jumpmind/symmetric/test/CompressionExperiment.java @@ -0,0 +1,91 @@ +package org.jumpmind.symmetric.test; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.zip.Deflater; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.jumpmind.symmetric.io.GzipConfigurableOutputStream; +import org.junit.Assert; +import org.junit.Test; + +/** + * Quick little test ot see what type of compression gets the best results + */ +public class CompressionExperiment { + + Log logger = org.apache.commons.logging.LogFactory.getLog(getClass()); + + @Test + public void testCompression() throws IOException { + File csv = new File("/tmp/test.csv"); + if (csv.exists()) { + Assert.assertTrue(csv.exists()); + printFileSize(csv); + logger.info("Normal gzip"); + printFileSize(compress(csv, new IWrapCompression() { + public OutputStream wrap(OutputStream os) throws IOException { + return new GZIPOutputStream(os); + } + })); + logger.info("Buffered gzip"); + printFileSize(compress(csv, new IWrapCompression() { + public OutputStream wrap(OutputStream os) throws IOException { + return new GZIPOutputStream(os, 18192); + } + })); + logger.info("Max compress gzip orig"); + File max = compress(csv, new IWrapCompression() { + public OutputStream wrap(OutputStream os) throws IOException { + return new GZIPOutputStream(os, 18192) { + { + def.setLevel(Deflater.BEST_COMPRESSION); + def.setStrategy(Deflater.FILTERED); + } + }; + } + }); + printFileSize(max); + uncompress(max); + logger.info("Max compress gzip"); + max = compress(csv, new IWrapCompression() { + public OutputStream wrap(OutputStream os) throws IOException { + return new GzipConfigurableOutputStream(os, 18192, Deflater.BEST_COMPRESSION); + } + }); + printFileSize(max); + uncompress(max); + } + + } + + protected void printFileSize(File file) { + logger.info(file.getName() + " size is " + file.length() / 10000 + "kb"); + } + + protected void uncompress(File file) throws IOException { + GZIPInputStream is = new GZIPInputStream(new FileInputStream(file)); + IOUtils.readLines(is); + } + + protected File compress(File orig, IWrapCompression wrapper) throws IOException { + File compressed = File.createTempFile("test.", ".gz"); + FileOutputStream fos = new FileOutputStream(compressed); + OutputStream wos = wrapper.wrap(fos); + FileInputStream fis = new FileInputStream(orig); + IOUtils.copy(fis, wos); + IOUtils.closeQuietly(fis); + IOUtils.closeQuietly(wos); + return compressed; + } + + interface IWrapCompression { + public OutputStream wrap(OutputStream os) throws IOException; + } +} diff --git a/symmetric/src/test/resources/symmetric-test.properties b/symmetric/src/test/resources/symmetric-test.properties index 49e4d999ab..3bc90d19e3 100644 --- a/symmetric/src/test/resources/symmetric-test.properties +++ b/symmetric/src/test/resources/symmetric-test.properties @@ -71,6 +71,9 @@ start.heartbeat.job=false start.stat.flush.job=false purge.retention.minutes=0 auto.update.node.values.from.properties=false +stream.to.file.threshold.bytes=512 +compression.level=1 +compression.strategy=1 #ip.filters=10.5-1.254-1.*,11.100.200-100.*, 100.50.*.*, 50.25.2.0/24 #auto.sync.configuration=false \ No newline at end of file