CHUKWA-664. Added network compression between agent and collector. (Sourygna Luangsay via Eric Yang)

git-svn-id: https://svn.apache.org/repos/asf/incubator/chukwa/trunk@1411817 13f79535-47bb-0310-9956-ffa450edef68
macroadster committed Nov 20, 2012
1 parent 6b1bd0a commit 941c7213e9e63c6247960c22eb7380d982fb352f
Showing 5 changed files with 121 additions and 14 deletions.
@@ -6,6 +6,8 @@ Trunk (unreleased changes)

CHUKWA-635. Collect swap usage. (Eric Yang)

CHUKWA-664. Added network compression between agent and collector. (Sourygna Luangsay via Eric Yang)

IMPROVEMENTS

CHUKWA-648. Make Chukwa Reduce Type to support hierarchy format. (Jie Huang via asrabkin)
@@ -33,4 +33,18 @@
<description>Location of Chukwa data on HDFS</description>
</property>

<!-- uncomment to enable network compression
<property>
<name>chukwaAgent.output.compress</name>
<value>true</value>
<description>Set to true to compress data on the wire between the agent and the collector</description>
</property>
<property>
<name>chukwaAgent.output.compression.type</name>
<value>org.apache.hadoop.io.compress.GzipCodec</value>
<description>Compression codec class to use when network compression is enabled</description>
</property>
-->

</configuration>
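For reference, the same two keys can also be set programmatically on a Hadoop Configuration object instead of in chukwa-agent-conf.xml; a minimal sketch (the property names and the GzipCodec class come from this patch, the EnableWireCompression class itself is illustrative):

import org.apache.hadoop.conf.Configuration;

public class EnableWireCompression {
  // Equivalent to uncommenting the block above in chukwa-agent-conf.xml.
  public static Configuration compressedConf() {
    Configuration conf = new Configuration();
    conf.setBoolean("chukwaAgent.output.compress", true);
    conf.set("chukwaAgent.output.compression.type",
        "org.apache.hadoop.io.compress.GzipCodec");
    return conf;
  }
}

Both the ChukwaHttpSender(Configuration) constructor and the collector servlet read these keys and instantiate the codec through ReflectionUtils, as the hunks below show.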
@@ -740,6 +740,7 @@ private static Configuration readConfig() {
log.info("Config - CHUKWA_CONF_DIR: [" + chukwaConf.toString() + "]");
File agentConf = new File(chukwaConf, "chukwa-agent-conf.xml");
conf.addResource(new Path(agentConf.getAbsolutePath()));
conf.addResource(new Path( new File(chukwaConf, "chukwa-common.xml").getAbsolutePath()));
if (conf.get("chukwaAgent.checkpoint.dir") == null)
conf.set("chukwaAgent.checkpoint.dir", new File(chukwaHome, "var")
.getAbsolutePath());
@@ -21,19 +21,29 @@

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;

public class ServletCollector extends HttpServlet {
@@ -48,6 +58,10 @@ public class ServletCollector extends HttpServlet {

private static final long serialVersionUID = 6286162898591407111L;
Logger log = Logger.getRootLogger();// .getLogger(ServletCollector.class);

static boolean COMPRESS;
static String CODEC_NAME;
static CompressionCodec codec;

public void setWriter(ChukwaWriter w) {
writer = w;
@@ -103,6 +117,20 @@ public void run() {
log.warn("failed to use user-chosen writer class, defaulting to SeqFileWriter", e);
}

COMPRESS = conf.getBoolean("chukwaAgent.output.compress", false);
if( COMPRESS) {
CODEC_NAME = conf.get( "chukwaAgent.output.compression.type", "org.apache.hadoop.io.compress.DefaultCodec");
Class<?> codecClass = null;
try {
codecClass = Class.forName( CODEC_NAME);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
log.info("codec " + CODEC_NAME + " loaded for network compression");
} catch (ClassNotFoundException e) {
log.warn("failed to create codec " + CODEC_NAME + ". Network compression won't be enabled.", e);
COMPRESS = false;
}
}

// We default to here if the pipeline construction failed or didn't happen.
try {
if (writer == null) {
@@ -132,7 +160,17 @@ protected void accept(HttpServletRequest req, HttpServletResponse resp)
java.io.InputStream in = req.getInputStream();

ServletOutputStream l_out = resp.getOutputStream();

DataInputStream di = null;
boolean compressNetwork = COMPRESS;
if( compressNetwork){
InputStream cin = codec.createInputStream( in);
di = new DataInputStream(cin);
}
else {
di = new DataInputStream(in);
}

final int numEvents = di.readInt();
// log.info("saw " + numEvents+ " in request");
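Note that the collector decides whether to decompress from its own static COMPRESS flag, read from the same chukwaAgent.output.compress key at startup; there is no per-request negotiation in this patch, so agents and collectors must be configured consistently or the readInt() framing above will be applied to a stream in the wrong format.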

@@ -29,20 +29,25 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpException;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.HttpMethodBase;
import org.apache.commons.httpclient.HttpMethodRetryHandler;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.RequestEntity;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.hadoop.chukwa.datacollection.sender.metrics.HttpSenderMetrics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;

/**
@@ -81,6 +86,10 @@ public class ChukwaHttpSender implements ChukwaSender {
int postID = 0;

protected Iterator<String> collectors;

static boolean COMPRESS;
static String CODEC_NAME;
static CompressionCodec codec;

static {
connectionManager = new MultiThreadedHttpConnectionManager();
@@ -106,12 +115,21 @@ static class BuffersRequestEntity implements RequestEntity {
public BuffersRequestEntity(List<DataOutputBuffer> buf) {
buffers = buf;
}

private long getUncompressedContentLenght(){
long len = 4;// first we send post length, then buffers
for (DataOutputBuffer b : buffers)
len += b.getLength();
return len;
}

public long getContentLength() {
if( COMPRESS) {
return -1;
}
else {
return getUncompressedContentLenght();
}
}

public String getContentType() {
@@ -122,11 +140,23 @@ public boolean isRepeatable() {
return true;
}

private void doWriteRequest( DataOutputStream out ) throws IOException {
out.writeInt(buffers.size());
for (DataOutputBuffer b : buffers)
out.write(b.getData(), 0, b.getLength());
}

public void writeRequest(OutputStream out) throws IOException {
if( COMPRESS) {
CompressionOutputStream cos = codec.createOutputStream(out);
DataOutputStream dos = new DataOutputStream( cos);
doWriteRequest( dos);
cos.finish();
}
else {
DataOutputStream dos = new DataOutputStream( out);
doWriteRequest( dos);
}
}
}
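Two details of this request path are worth calling out: when compression is on, getContentLength() returns -1 because the compressed size is not known up front (commons-httpclient treats a negative length as "unknown" and, over HTTP/1.1, typically falls back to chunked transfer encoding), and writeRequest() must call finish() on the CompressionOutputStream so the codec trailer is flushed before the request completes. The following minimal, self-contained round trip shows the same stream pairing used between sender and collector, assuming only hadoop-common on the classpath (the CodecRoundTrip class and the payload are illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class CodecRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(
        Class.forName("org.apache.hadoop.io.compress.GzipCodec"), conf);

    // Sender side: frame the payload (count, then bytes) inside a compressed stream.
    ByteArrayOutputStream wire = new ByteArrayOutputStream();
    CompressionOutputStream cos = codec.createOutputStream(wire);
    DataOutputStream dos = new DataOutputStream(cos);
    dos.writeInt(1);                              // number of chunks in this post
    dos.write("example chunk".getBytes("UTF-8"));
    dos.flush();
    cos.finish();                                 // flush the codec trailer, as writeRequest() does

    // Collector side: wrap the stream in the codec before reading the framing.
    DataInputStream di = new DataInputStream(
        codec.createInputStream(new ByteArrayInputStream(wire.toByteArray())));
    System.out.println("numEvents = " + di.readInt());
  }
}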

@@ -140,6 +170,19 @@ public ChukwaHttpSender(Configuration c) {
WAIT_FOR_COLLECTOR_REBOOT = c.getInt("chukwaAgent.sender.retryInterval",
20 * 1000);
COLLECTOR_TIMEOUT = c.getInt(COLLECTOR_TIMEOUT_OPT, 30*1000);
COMPRESS = c.getBoolean("chukwaAgent.output.compress", false);
if( COMPRESS) {
CODEC_NAME = c.get( "chukwaAgent.output.compression.type", "org.apache.hadoop.io.compress.DefaultCodec");
Class<?> codecClass = null;
try {
codecClass = Class.forName( CODEC_NAME);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, c);
log.info("codec " + CODEC_NAME + " loaded for network compression");
} catch (ClassNotFoundException e) {
log.warn("failed to create codec " + CODEC_NAME + ". Network compression won't be enabled.", e);
COMPRESS = false;
}
}
}

/**
@@ -199,7 +242,16 @@ public List<CommitListEntry> send(List<Chunk> toSend)

PostMethod method = new PostMethod();
method.setRequestEntity(postData);
StringBuilder sb = new StringBuilder( ">>>>>> HTTP post_");
sb.append( thisPost).append( " to ").append( currCollector).append( " length = ");
if( COMPRESS) {
sb.append( ((BuffersRequestEntity)postData).getUncompressedContentLenght())
.append( " of uncompressed data");
}
else {
sb.append( postData.getContentLength());
}
log.info( sb);

List<CommitListEntry> results = postAndParseResponse(method, commitResults);
log.info("post_" + thisPost + " sent " + toSendSize + " chunks, got back " + results.size() + " acks");
