Skip to content

Commit

Permalink
SYMMETRICDS-91 - Added support to tweak compression settings for gzip…
Browse files Browse the repository at this point in the history
… outputstreams
  • Loading branch information
chenson42 committed May 19, 2009
1 parent 2e8756a commit 83014af
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 10 deletions.
Expand Up @@ -80,6 +80,8 @@ public class ParameterConstants {
public final static String TRANSPORT_HTTP_TIMEOUT = "http.timeout.ms";
public final static String TRANSPORT_HTTP_USE_COMPRESSION_CLIENT = "http.compression";
public final static String TRANSPORT_HTTP_COMPRESSION_DISABLED_SERVLET = "web.compression.disabled";
public final static String TRANSPORT_HTTP_COMPRESSION_LEVEL = "compression.level";
public final static String TRANSPORT_HTTP_COMPRESSION_STRATEGY = "compression.strategy";
public final static String TRANSPORT_TYPE = "transport.type";
public final static String TRANSPORT_HTTPS_VERIFIED_SERVERS = "https.verified.server.names";

Expand Down
Expand Up @@ -33,6 +33,8 @@

import org.apache.commons.io.IOUtils;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.transport.AuthenticationException;
import org.jumpmind.symmetric.transport.ConnectionRejectedException;
import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
Expand All @@ -50,11 +52,14 @@ public class HttpOutgoingTransport implements IOutgoingWithResponseTransport {
private int httpTimeout;

private boolean useCompression;

private IParameterService parameterService;

public HttpOutgoingTransport(URL url, int httpTimeout, boolean useCompression) {
public HttpOutgoingTransport(URL url, int httpTimeout, boolean useCompression, IParameterService parameterService) {
this.url = url;
this.httpTimeout = httpTimeout;
this.useCompression = useCompression;
this.parameterService = parameterService;
}

public void close() throws IOException {
Expand Down Expand Up @@ -114,7 +119,12 @@ public BufferedWriter open() throws IOException {
}
OutputStream out = connection.getOutputStream();
if (useCompression) {
out = new GZIPOutputStream(out);
out = new GZIPOutputStream(out) {
{
this.def.setLevel(parameterService.getInt(ParameterConstants.TRANSPORT_HTTP_COMPRESSION_LEVEL));
this.def.setStrategy(parameterService.getInt(ParameterConstants.TRANSPORT_HTTP_COMPRESSION_STRATEGY));
}
};
}
OutputStreamWriter wout = new OutputStreamWriter(out, Constants.ENCODING);
writer = new BufferedWriter(wout);
Expand Down
Expand Up @@ -107,7 +107,7 @@ public IOutgoingWithResponseTransport getPushTransport(Node remote, Node local)
URL url = new URL(buildURL("push", remote, local));
int httpTimeout = parameterService.getInt(ParameterConstants.TRANSPORT_HTTP_TIMEOUT);
boolean useCompression = parameterService.is(ParameterConstants.TRANSPORT_HTTP_USE_COMPRESSION_CLIENT);
return new HttpOutgoingTransport(url, httpTimeout, useCompression);
return new HttpOutgoingTransport(url, httpTimeout, useCompression, parameterService);
}

public IIncomingTransport getRegisterTransport(Node node) throws IOException {
Expand Down
Expand Up @@ -86,7 +86,8 @@ public void destroy() {
public void init(final FilterConfig filterConfig) throws ServletException {
super.init(filterConfig);
delegate = new org.jumpmind.symmetric.web.compression.CompressionFilter();

delegate.setCompressionLevel(parameterService.getInt(ParameterConstants.TRANSPORT_HTTP_COMPRESSION_LEVEL));
delegate.setCompressionStrategy(parameterService.getInt(ParameterConstants.TRANSPORT_HTTP_COMPRESSION_STRATEGY));
delegate.init(filterConfig);
}

Expand Down
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.Enumeration;
import java.util.zip.Deflater;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
Expand Down Expand Up @@ -49,6 +50,10 @@
public class CompressionFilter implements Filter {

static final Log logger = LogFactory.getLog(CompressionFilter.class);

int compressionLevel = Deflater.DEFAULT_COMPRESSION;

int compressionStrategy = Deflater.DEFAULT_STRATEGY;

/**
* The filter configuration object we are associated with. If this value is
Expand Down Expand Up @@ -144,7 +149,7 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
} else {
if (response instanceof HttpServletResponse) {
CompressionServletResponseWrapper wrappedResponse = new CompressionServletResponseWrapper(
(HttpServletResponse) response);
(HttpServletResponse) response, compressionLevel, compressionStrategy);
if (logger.isDebugEnabled()) {
logger.debug("doFilter gets called with compression");
}
Expand All @@ -157,6 +162,22 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
}
}
}

public int getCompressionLevel() {
return compressionLevel;
}

public void setCompressionLevel(int compressionLevel) {
this.compressionLevel = compressionLevel;
}

public int getCompressionStrategy() {
return compressionStrategy;
}

public void setCompressionStrategy(int compressionStrategy) {
this.compressionStrategy = compressionStrategy;
}

/**
* Set filter config This function is equivalent to init. Required by
Expand Down
Expand Up @@ -64,11 +64,16 @@ public class CompressionResponseStream extends ServletOutputStream {
* @param response
* The associated response
*/
public CompressionResponseStream(HttpServletResponse response) throws IOException {
public CompressionResponseStream(HttpServletResponse response, final int compressionLevel, final int compressionStrategy) throws IOException {
this.closed = false;
this.response = response;
response.addHeader("Content-Encoding", "gzip");
gzipstream = new GZIPOutputStream(response.getOutputStream());
gzipstream = new GZIPOutputStream(response.getOutputStream()) {
{
this.def.setLevel(compressionLevel);
this.def.setStrategy(compressionStrategy);
}
};
}

/**
Expand Down
Expand Up @@ -19,6 +19,8 @@
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.zip.Deflater;

import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpServletResponseWrapper;
Expand All @@ -44,13 +46,18 @@ public class CompressionServletResponseWrapper extends HttpServletResponseWrappe

static final Log logger = LogFactory.getLog(CompressionServletResponseWrapper.class);

int compressionLevel = Deflater.DEFAULT_COMPRESSION;

int compressionStrategy = Deflater.DEFAULT_STRATEGY;

/**
* Calls the parent constructor which creates a ServletResponse adaptor
* wrapping the given response object.
*/

public CompressionServletResponseWrapper(HttpServletResponse response) {
public CompressionServletResponseWrapper(HttpServletResponse response, int compressionLevel, int compressionStrategy) {
super(response);
this.compressionLevel = compressionLevel;
this.compressionStrategy = compressionStrategy;
origResponse = response;
if (logger.isDebugEnabled()) {
logger.debug("CompressionServletResponseWrapper constructor gets called");
Expand Down Expand Up @@ -113,7 +120,7 @@ public ServletOutputStream createOutputStream() throws IOException {
logger.debug("createOutputStream gets called");
}

CompressionResponseStream stream = new CompressionResponseStream(origResponse);
CompressionResponseStream stream = new CompressionResponseStream(origResponse, compressionLevel, compressionStrategy);
return stream;

}
Expand Down
20 changes: 20 additions & 0 deletions symmetric/src/main/resources/symmetric-default.properties
Expand Up @@ -270,6 +270,7 @@ http.timeout.ms=600000
# Whether or not to use compression over HTTP connections.
# Currently, this setting only affects the push connection of the source node.
# Compression on a pull is enabled using a filter in the web.xml for the PullServlet.
# @see web.compression.disabled to enable/disable the filter
#
# This property can be overridden in the database
http.compression=true
Expand All @@ -280,6 +281,25 @@ http.compression=true
# This property can be overridden in the database
web.compression.disabled=false

# Set the compression level this node will use when compressing synchronization payloads
# @see java.util.zip.Deflater
# NO_COMPRESSION = 0
# BEST_SPEED = 1
# BEST_COMPRESSION = 9
# DEFAULT_COMPRESSION = -1
#
# This property can be overridden in the database
compression.level=-1

# Set the compression strategy this node will use when compressing synchronization payloads
# @see java.util.zip.Deflater
# FILTERED = 1
# HUFFMAN_ONLY = 2
# DEFAULT_STRATEGY = 0
#
# This property can be overridden in the database
compression.strategy=0

# The base servlet path for when embedding SymmetricDS with another web application
web.base.servlet.path=

Expand Down
@@ -0,0 +1,91 @@
package org.jumpmind.symmetric.test;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.jumpmind.symmetric.io.GzipConfigurableOutputStream;
import org.junit.Assert;
import org.junit.Test;

/**
* Quick little test ot see what type of compression gets the best results
*/
public class CompressionExperiment {

Log logger = org.apache.commons.logging.LogFactory.getLog(getClass());

@Test
public void testCompression() throws IOException {
File csv = new File("/tmp/test.csv");
if (csv.exists()) {
Assert.assertTrue(csv.exists());
printFileSize(csv);
logger.info("Normal gzip");
printFileSize(compress(csv, new IWrapCompression() {
public OutputStream wrap(OutputStream os) throws IOException {
return new GZIPOutputStream(os);
}
}));
logger.info("Buffered gzip");
printFileSize(compress(csv, new IWrapCompression() {
public OutputStream wrap(OutputStream os) throws IOException {
return new GZIPOutputStream(os, 18192);
}
}));
logger.info("Max compress gzip orig");
File max = compress(csv, new IWrapCompression() {
public OutputStream wrap(OutputStream os) throws IOException {
return new GZIPOutputStream(os, 18192) {
{
def.setLevel(Deflater.BEST_COMPRESSION);
def.setStrategy(Deflater.FILTERED);
}
};
}
});
printFileSize(max);
uncompress(max);
logger.info("Max compress gzip");
max = compress(csv, new IWrapCompression() {
public OutputStream wrap(OutputStream os) throws IOException {
return new GzipConfigurableOutputStream(os, 18192, Deflater.BEST_COMPRESSION);
}
});
printFileSize(max);
uncompress(max);
}

}

protected void printFileSize(File file) {
logger.info(file.getName() + " size is " + file.length() / 10000 + "kb");
}

protected void uncompress(File file) throws IOException {
GZIPInputStream is = new GZIPInputStream(new FileInputStream(file));
IOUtils.readLines(is);
}

protected File compress(File orig, IWrapCompression wrapper) throws IOException {
File compressed = File.createTempFile("test.", ".gz");
FileOutputStream fos = new FileOutputStream(compressed);
OutputStream wos = wrapper.wrap(fos);
FileInputStream fis = new FileInputStream(orig);
IOUtils.copy(fis, wos);
IOUtils.closeQuietly(fis);
IOUtils.closeQuietly(wos);
return compressed;
}

interface IWrapCompression {
public OutputStream wrap(OutputStream os) throws IOException;
}
}
3 changes: 3 additions & 0 deletions symmetric/src/test/resources/symmetric-test.properties
Expand Up @@ -71,6 +71,9 @@ start.heartbeat.job=false
start.stat.flush.job=false
purge.retention.minutes=0
auto.update.node.values.from.properties=false
stream.to.file.threshold.bytes=512
compression.level=1
compression.strategy=1
#ip.filters=10.5-1.254-1.*,11.100.200-100.*, 100.50.*.*, 50.25.2.0/24

#auto.sync.configuration=false

0 comments on commit 83014af

Please sign in to comment.