
FLUME-418: Add batching and compression arguments to agent and collectors

- apply compression and batching before the retry logic and buffering in the sinks
- add a test to make sure the compressed version works
jmhsieh committed Jan 2, 2011
1 parent d02bf98 commit 0e7b248bd7648798f61dcbdcb49d5ad07015398b
@@ -62,17 +62,42 @@
BEST_EFFORT, // this is equivalent to syslog's best effort mechanism.
};
+ public static final String BATCH_COUNT = "batchCount";
+ public static final String BATCH_MILLIS = "batchMillis";
+
final EventSink sink;
public AgentSink(Context ctx, String dsthost, int port, ReliabilityMode mode)
throws FlumeSpecException {
Preconditions.checkNotNull(dsthost);
+ // Batching adds overhead, so if no settings are specified, don't do it.
+ String batchGzDeco = "";
+ String batchN = ctx.getValue(BATCH_COUNT);
+ int n = 1;
+ if (batchN != null) {
+ n = Integer.parseInt(batchN);
+ }
+
+ String batchLatency = ctx.getValue(BATCH_MILLIS);
+ int ms = 0; // 0 means never flush on a time trigger.
+ if (batchLatency != null) {
+ ms = Integer.parseInt(batchLatency);
+ }
+
+ if (n > 1) {
+ batchGzDeco += " batch(" + n + "," + ms + ") ";
+ }
+
+ if (ctx.getValue("compression") != null) {
+ // currently ignores the value; any setting enables gzip
+ batchGzDeco += " gzip ";
+ }
+
switch (mode) {
case ENDTOEND: {
- String snk = String.format(
- "{ ackedWriteAhead => { stubbornAppend => { insistentOpen => "
- + "rpcSink(\"%s\", %d)} } }", dsthost, port);
+ String snk = String.format("ackedWriteAhead " + batchGzDeco
+ + " stubbornAppend insistentOpen rpcSink(\"%s\", %d)", dsthost, port);
sink = FlumeBuilder.buildSink(ctx, snk);
break;
}
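For illustration, a minimal sketch of the spec the constructor composes in ENDTOEND mode when batching and compression are both requested (the host, port, and parameter values here are hypothetical). The batch and gzip decorators sit ahead of the retry logic, so what gets retried is the compressed batch:

    import com.cloudera.flume.conf.FlumeBuilder;
    import com.cloudera.flume.conf.LogicalNodeContext;
    import com.cloudera.flume.core.EventSink;

    public class AgentSinkSpecSketch {
      public static void main(String[] args) throws Exception {
        // With batchCount=100 and batchMillis=1000, batchGzDeco becomes
        // " batch(100,1000) gzip ", and the ENDTOEND spec expands to:
        String spec = "ackedWriteAhead batch(100,1000) gzip "
            + "stubbornAppend insistentOpen rpcSink(\"localhost\", 35853)";
        // Same builder call the AgentSink constructor makes.
        EventSink snk = FlumeBuilder.buildSink(
            LogicalNodeContext.testingContext(), spec);
      }
    }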
@@ -86,17 +111,19 @@ public AgentSink(Context ctx, String dsthost, int port, ReliabilityMode mode)
long maxCumulativeBo = conf.getFailoverMaxCumulativeBackoff();
String rpc = String.format("rpcSink(\"%s\", %d)", dsthost, port);
- String snk = String.format("< %s ? { diskFailover => { insistentAppend "
- + "=> { stubbornAppend => { insistentOpen(%d,%d,%d) => %s} } } } >",
- rpc, maxSingleBo, initialBo, maxCumulativeBo, rpc);
+ String snk = String.format(batchGzDeco
+ + " < %s ? diskFailover insistentAppend "
+ + " stubbornAppend insistentOpen(%d,%d,%d) %s >", rpc, maxSingleBo,
+ initialBo, maxCumulativeBo, rpc);
sink = FlumeBuilder.buildSink(ctx, snk);
break;
}
case BEST_EFFORT: {
- String snk = String.format("< { insistentOpen => { stubbornAppend => "
- + "rpcSink(\"%s\", %d) } } ? null>", dsthost, port);
+ String snk = String.format("< " + batchGzDeco
+ + " insistentOpen stubbornAppend rpcSink(\"%s\", %d) ? null>",
+ dsthost, port);
sink = FlumeBuilder.buildSink(ctx, snk);
break;
}
@@ -137,7 +164,9 @@ public static SinkBuilder e2eBuilder() {
return new SinkBuilder() {
@Override
public EventSink build(Context context, String... argv) {
- Preconditions.checkArgument(argv.length <= 2);
+ Preconditions.checkArgument(argv.length <= 2,
+ "usage: agentE2ESink(collectorhost[, port]{, " + BATCH_COUNT
+ + "=1}{, " + BATCH_MILLIS + "=0}{,compression=false})");
FlumeConfiguration conf = FlumeConfiguration.get();
String collector = conf.getCollectorHost();
int port = conf.getCollectorPort();
@@ -177,7 +206,9 @@ public static SinkBuilder dfoBuilder() {
return new SinkBuilder() {
@Override
public EventSink build(Context context, String... argv) {
- Preconditions.checkArgument(argv.length <= 2);
+ Preconditions.checkArgument(argv.length <= 2,
+ "usage: agentDFOSink(collectorhost[, port]{, " + BATCH_COUNT
+ + "=1}{, " + BATCH_MILLIS + "=0}{,compression=false})");
FlumeConfiguration conf = FlumeConfiguration.get();
String collector = conf.getCollectorHost();
int port = conf.getCollectorPort();
@@ -213,7 +244,9 @@ public static SinkBuilder beBuilder() {
return new SinkBuilder() {
@Override
public EventSink build(Context context, String... argv) {
- Preconditions.checkArgument(argv.length <= 2);
+ Preconditions.checkArgument(argv.length <= 2,
+ "usage: agentBESink(collectorhost[, port]{, " + BATCH_COUNT
+ + "=1}{, " + BATCH_MILLIS + "=0}{,compression=false})");
FlumeConfiguration conf = FlumeConfiguration.get();
String collector = conf.getCollectorHost();
int port = conf.getCollectorPort();
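As a usage sketch grounded in the messages above (the collector host and port are placeholders), any of the three agent sinks can now be built with the new keyword arguments; omitted keywords keep the defaults, i.e. no batching and no compression:

    import com.cloudera.flume.conf.FlumeBuilder;
    import com.cloudera.flume.conf.LogicalNodeContext;

    public class AgentSinkKwargsExample {
      public static void main(String[] args) throws Exception {
        // batchCount=1 and batchMillis=0 are the defaults; compression
        // is documented as a boolean, but any value currently enables gzip.
        String spec = "agentE2ESink(\"collector\", 35853, "
            + "batchCount=100, batchMillis=1000, compression=true)";
        FlumeBuilder.buildSink(LogicalNodeContext.testingContext(), spec);
      }
    }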
@@ -39,6 +39,8 @@
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.flume.core.MaskDecorator;
+import com.cloudera.flume.handlers.batch.GunzipDecorator;
+import com.cloudera.flume.handlers.batch.UnbatchingDecorator;
import com.cloudera.flume.handlers.debug.InsistentAppendDecorator;
import com.cloudera.flume.handlers.debug.InsistentOpenDecorator;
import com.cloudera.flume.handlers.debug.StubbornAppendSink;
@@ -115,14 +117,16 @@ public EventSink newSink(Context ctx) throws IOException {
// needs an extra mask before rolling, writing to disk and forwarding acks
// (roll detect).
- // { ackChecksumChecker => insistentAppend => stubbornAppend =>
- // insistentOpen => mask("rolltag") => roll(xx) { rollDetect =>
- // subsink } }
+ // gunzip unbatch ackChecksumChecker insistentAppend stubbornAppend
+ // insistentOpen mask("rolltag") roll(xx) { rollDetect subsink }
+
EventSink tmp = new MaskDecorator<EventSink>(roller, "rolltag");
tmp = new InsistentOpenDecorator<EventSink>(tmp, backoff1);
tmp = new StubbornAppendSink<EventSink>(tmp);
tmp = new InsistentAppendDecorator<EventSink>(tmp, backoff2);
- snk = new AckChecksumChecker<EventSink>(tmp, accum);
+ tmp = new AckChecksumChecker<EventSink>(tmp, accum);
+ tmp = new UnbatchingDecorator<EventSink>(tmp);
+ snk = new GunzipDecorator<EventSink>(tmp);
}
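On the receiving side the wrapping order matters: the collector must gunzip before it can unbatch, and unbatch before the ack checker sees individual events. A minimal sketch of just that outer wrapping, with a hypothetical downstream sink:

    import com.cloudera.flume.core.EventSink;
    import com.cloudera.flume.handlers.batch.GunzipDecorator;
    import com.cloudera.flume.handlers.batch.UnbatchingDecorator;

    public class CollectorDecoratorSketch {
      // Mirrors the ordering in CollectorSink.newSink(): incoming events
      // are gunzipped first, then unbatched, then passed downstream.
      static EventSink decorate(EventSink downstream) {
        EventSink tmp = new UnbatchingDecorator<EventSink>(downstream);
        return new GunzipDecorator<EventSink>(tmp);
      }
    }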
/**
@@ -17,8 +17,6 @@
*/
package com.cloudera.flume.agent;
-import static org.junit.Assert.assertEquals;
-
import java.io.File;
import java.io.IOException;
@@ -29,20 +27,13 @@
import org.slf4j.LoggerFactory;
import com.cloudera.flume.conf.Context;
+import com.cloudera.flume.conf.FlumeArgException;
import com.cloudera.flume.conf.FlumeBuilder;
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.conf.FlumeSpecException;
import com.cloudera.flume.conf.LogicalNodeContext;
-import com.cloudera.flume.conf.SinkFactoryImpl;
-import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
-import com.cloudera.flume.core.Event;
-import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSource;
-import com.cloudera.flume.core.EventUtil;
-import com.cloudera.flume.handlers.avro.AvroJsonOutputFormat;
-import com.cloudera.flume.handlers.debug.ConsoleEventSink;
-import com.cloudera.flume.handlers.debug.MemorySinkSource;
import com.cloudera.util.FileUtil;
/**
@@ -92,9 +83,39 @@ public void testBuilder() throws FlumeSpecException {
Assert.fail("unexpected fall through");
}
+ @Test
+ public void testBatchCompressBuilder() throws FlumeSpecException {
+ String snk1 = "agentSink(\"localhost\", 12345, compression=true)";
+ FlumeBuilder.buildSink(LogicalNodeContext.testingContext(), snk1);
+
+ String snk2 = "agentSink(\"localhost\", 12345, batchCount=100 )";
+ FlumeBuilder.buildSink(LogicalNodeContext.testingContext(), snk2);
+
+ String snk3 = "agentSink(\"localhost\", 12345, batchMillis=1000 )";
+ FlumeBuilder.buildSink(LogicalNodeContext.testingContext(), snk3);
+
+ String snk4 = "agentSink(\"localhost\", 12345, batchCount=100, batchMillis=1000)";
+ FlumeBuilder.buildSink(LogicalNodeContext.testingContext(), snk4);
+
+ String snk5 = "agentSink(\"localhost\", 12345, batchCount=100, batchMillis=1000, compression=true)";
+ FlumeBuilder.buildSink(LogicalNodeContext.testingContext(), snk5);
+ }
+
+ @Test(expected = FlumeArgException.class)
+ public void testBadBatchCompressBuilder() throws FlumeSpecException {
+ String snk1 = "agentSink(\"localhost\", 12345, batchCount=true)";
+ FlumeBuilder.buildSink(LogicalNodeContext.testingContext(), snk1);
+ }
+
+ @Test(expected = FlumeArgException.class)
+ public void testBadBatchCompressBuilder2() throws FlumeSpecException {
+ String snk1 = "agentSink(\"localhost\", 12345, batchMillis=1000, batchCount=true)";
+ FlumeBuilder.buildSink(LogicalNodeContext.testingContext(), snk1);
+ }
+
@Test
public void testDiskFailoverBuilder() throws FlumeSpecException {
- String snk = " agentFailoverSink";
+ String snk = "agentFailoverSink";
FlumeBuilder.buildSink(LogicalNodeContext.testingContext(), snk);
String snk2 = "agentFailoverSink(\"localhost\")";
@@ -229,7 +229,7 @@ MemorySinkSource setupAckRoll() throws IOException, InterruptedException {
+ "\",\"\")";
CollectorSink coll = (CollectorSink) FlumeBuilder.buildSink(new Context(),
snkspec);
- RollSink roll = coll.roller;
+ RollSink roll = coll.roller; // shortcut to roller exposed for testing
// normally inside wal
NaiveFileWALDeco.AckChecksumRegisterer<EventSink> snk = new NaiveFileWALDeco.AckChecksumRegisterer<EventSink>(
@@ -199,27 +199,55 @@ data outputted by collectors is specified by the
.Flume's Agent Tier Event Sinks
-[horizontal]
-+agentSink[("_machine_"[, _port_])]+ :: Defaults to +agentE2ESink+
-
-+agentE2ESink[("_machine_"[, _port_])]+ :: Agent sink with write ahead log and
-end-to-end ack. Optional arguments specify a +_machine_+, and the TCP
-+_port_+ pointing to a +collectorSource+. If none is specified, the values
-specified by the +flume.collector.event.host+ and the +flume.collector.port+
-properties will be used.
-
-+agentDFOSink[("_machine_"[, _port_])]+ :: DiskFailover Agent sink that stores
-to local disk on detected failure. This sink periodically checks with the
-+_machine:port_+ and resends events if becomes alive again. Optional arguments
-specify a +_machine_+, and the TCP +_port_+ pointing to a +collectorSource+.
-If none is specified, the values specified by the +flume.collector.event.host+
-and the +flume.collector.port+ properties will be used.
-
-+agentBESink[("_machine_"[, _port_])]+ :: BestEffort Agent sink. This drops
-messages on failures and continues sending. Optional arguments specify a
-+_collector_+, and the TCP +_PORT_+ pointing to a +collectorSource+. If none
-is specified, the values specified by the +flume.collector.event.host+ and the
-+flume.collector.port+ properties will be used.
+[horizontal]
++agentSink[("_machine_"[, _port_]{, batchCount=1}{, batchMillis=0}{, compression=false})]+ ::
+Defaults to +agentE2ESink+
+
++agentE2ESink[("_machine_"[, _port_]{, batchCount=1}{, batchMillis=0}{, compression=false})]+ ::
+Agent sink with write-ahead log and end-to-end ack. Optional
+arguments specify a +_machine_+ and the TCP +_port_+ pointing to a
++collectorSource+. If none is specified, the values of the
++flume.collector.event.host+ and +flume.collector.port+ properties
+are used. Batching and compression options can also be specified
+at the agent. By default neither batching nor compression is used.
+By specifying the +batchMillis+ integer keyword argument, the agent
+will attempt to send a batch at least every +batchMillis+ ms. By
+adding the +batchCount+ integer keyword argument, one can send
+events in groups of +batchCount+ events. By specifying the
++compression+ boolean argument, gzip compression (of batched events,
+if batching is enabled) will be applied.
+
++agentDFOSink[("_machine_"[, _port_]{, batchCount=1}{, batchMillis=0}{, compression=false})]+ ::
+Disk-failover agent sink that stores events to local disk on detected
+failure. This sink periodically checks with the +_machine:port_+ and
+resends events if it becomes alive again. Optional arguments specify
+a +_machine_+ and the TCP +_port_+ pointing to a +collectorSource+.
+If none is specified, the values of the +flume.collector.event.host+
+and +flume.collector.port+ properties are used. Batching and
+compression options can also be specified at the agent. By default
+neither batching nor compression is used. By specifying the
++batchMillis+ integer keyword argument, the agent will attempt to
+send a batch at least every +batchMillis+ ms. By adding the
++batchCount+ integer keyword argument, one can send events in groups
+of +batchCount+ events. By specifying the +compression+ boolean
+argument, gzip compression (of batched events, if batching is
+enabled) will be applied.
+
++agentBESink[("_machine_"[, _port_]{, batchCount=1}{, batchMillis=0}{, compression=false})]+ ::
+Best-effort agent sink. This drops messages on failure and continues
+sending. Optional arguments specify a +_machine_+ and the TCP
++_port_+ pointing to a +collectorSource+. If none is specified, the
+values of the +flume.collector.event.host+ and +flume.collector.port+
+properties are used. Batching and compression options can also be
+specified at the agent. By default neither batching nor compression
+is used. By specifying the +batchMillis+ integer keyword argument,
+the agent will attempt to send a batch at least every +batchMillis+
+ms. By adding the +batchCount+ integer keyword argument, one can
+send events in groups of +batchCount+ events. By specifying the
++compression+ boolean argument, gzip compression (of batched events,
+if batching is enabled) will be applied.
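+
+For example, +agentBESink("collector", 35853, batchCount=100,
+batchMillis=1000, compression=true)+ would send gzip-compressed
+batches of up to 100 events, flushing at least once per second (the
+host and port here are illustrative).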
+
+
+agentE2EChain("_m1_[:_p1_]"[, "_m2_[:_p2_]"[,...]])+ :: Agent sink
with write-ahead log and end-to-end ack and collector failover
@@ -1,4 +1,4 @@
-#!/usr/bin/env /bin/bash
+#!/usr/bin/env /bin/sh
# Licensed to Cloudera, Inc. under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -21,14 +21,16 @@
# specific number of events were delivered. It expects that a node
# and master are running on the localhost
-DIR=`dirname $0`
-FLUME="$DIR/../bin/flume"
+DIR=`dirname $0`/..
+FLUME_HOME=${FLUME_HOME:-$DIR/flume-distribution/target/flume-*/flume*}
+FLUME=${FLUME_HOME}/bin/flume
+
MASTER=localhost
NODE1=`hostname`
NODE2=`hostname`
-TMPDIR=/tmp/test-be
+FLUMELOGDIR=/tmp/test-be-`date +%Y%m%d-%H%M%S.%N`
-rm -rf $TMPDIR || { echo "Unable to delete $TMPDIR" ; exit 1 ; }
+rm -rf $FLUMELOGDIR || { echo "Unable to delete $FLUMELOGDIR" ; exit 1 ; }
$FLUME shell <<EOF
connect $MASTER
@@ -44,7 +46,7 @@ exec spawn $NODE2 node
exec noop 10000
## Setup an agent collector topology using autoBEChains.
-exec config collector autoCollectorSource 'collectorSink("file://$TMPDIR","data")'
+exec config collector autoCollectorSource 'collectorSink("file://$FLUMELOGDIR","data",5000)'
exec config node 'asciisynth(500)' '{delay(200)=>autoBEChain}'
## Wait for the node to become active, and then done.
@@ -56,7 +58,9 @@ exec decommission node
exec decommission collector
# waitForNodesDecommissioned
-wait 5000
+# waitForNodesDone 20000 collector ## this doesn't work anymore.
+exec noop 5000
+exec noop 5000
quit
EOF
@@ -68,12 +72,12 @@ fi
sleep 5s
-COUNT=`wc -l $TMPDIR/* | tail -1 | awk '{print $1}'`
+COUNT=`wc -l $FLUMELOGDIR/* | tail -1 | awk '{print $1}'`
echo count messages $COUNT
if [ "$COUNT" != 500 ] ; then
echo "FAIL"
exit 1
fi
echo "SUCCESS"
-rm -rf $TMPDIR || { echo "test succeeded but failed to rm $TMPDIR" ; exit 1; }
+rm -rf $FLUMELOGDIR || { echo "test succeeded but failed to rm $FLUMELOGDIR" ; exit 1; }