Permalink
Browse files

FLUME-620: Collector fails due to an infinite loop during file-rollin…

…g if the body size is greater than 30kb

- Added option for Thrift event body truncation
- Added option for Avro event body truncation
  • Loading branch information...
jmhsieh committed Jun 15, 2011
1 parent 8963b90 commit 545596911f3702778dca8591e65839e833414f60
@@ -18,63 +18,88 @@
package com.cloudera.flume.handlers.avro;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collections;
-import java.util.Date;
import java.util.HashMap;
import java.util.Map;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.commons.lang.StringEscapeUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.google.common.base.Preconditions;
/**
- * This wraps a Avro generated AvroFlumeEvent with a Flume Event interface.
+ * This utility class contains methods that convert Avro-generated
+ * AvroFlumeEvents into an FlumeEvents and vice versa.
*/
-public class AvroEventAdaptor extends Event {
- AvroFlumeEvent evt = null;
+public class AvroEventConvertUtil {
- AvroEventAdaptor(AvroFlumeEvent evt) {
- super();
- this.evt = evt;
+ private AvroEventConvertUtil() {
}
- @Override
- public byte[] getBody() {
- return evt.body.array();
+ public static final Logger LOG = LoggerFactory
+ .getLogger(AvroEventConvertUtil.class);
+
+ public static Event toFlumeEvent(AvroFlumeEvent evt) {
+ return toFlumeEvent(evt, false);
}
- @Override
- public Priority getPriority() {
- return convert(evt.priority);
+ public static Event toFlumeEvent(AvroFlumeEvent evt, boolean truncates) {
+ Preconditions.checkArgument(evt != null, "AvorFlumeEvent is null!");
+
+ byte[] body = convertBody(evt.body, truncates);
+ com.cloudera.flume.handlers.avro.Priority p = evt.priority;
+ p = (p == null) ? com.cloudera.flume.handlers.avro.Priority.INFO : p;
+ String host = (evt.host == null) ? "" : evt.host.toString();
+ Map<String, byte[]> attrs = getAttrs(evt.fields);
+ return new EventImpl(body, evt.timestamp, toFlumePriority(p), evt.nanos,
+ host, attrs);
}
- @Override
- public long getTimestamp() {
- return evt.timestamp;
+ private static byte[] convertBody(ByteBuffer buf, boolean truncates) {
+ if (buf == null) {
+ LOG.warn("Avro Event had null body! returning empty body");
+ return new byte[0];
+ }
+ byte[] bytes = buf.array();
+ int maxSz = (int) FlumeConfiguration.get().getEventMaxSizeBytes();
+ if (bytes.length > maxSz) {
+ Preconditions.checkArgument(truncates,
+ "Unexpected too long Avro Event body: max is " + maxSz
+ + " but body was buf.length");
+ byte[] trunc = Arrays.copyOf(bytes, maxSz);
+ return trunc;
+ }
+ // normal case
+ return bytes;
}
- public static Priority convert(com.cloudera.flume.handlers.avro.Priority p) {
+ private static com.cloudera.flume.core.Event.Priority toFlumePriority(
+ com.cloudera.flume.handlers.avro.Priority p) {
Preconditions.checkNotNull(p, "Priority argument must be valid.");
switch (p) {
case FATAL:
- return Priority.FATAL;
+ return com.cloudera.flume.core.Event.Priority.FATAL;
case ERROR:
- return Priority.ERROR;
+ return com.cloudera.flume.core.Event.Priority.ERROR;
case WARN:
- return Priority.WARN;
+ return com.cloudera.flume.core.Event.Priority.WARN;
case INFO:
- return Priority.INFO;
+ return com.cloudera.flume.core.Event.Priority.INFO;
case DEBUG:
- return Priority.DEBUG;
+ return com.cloudera.flume.core.Event.Priority.DEBUG;
case TRACE:
- return Priority.TRACE;
+ return com.cloudera.flume.core.Event.Priority.TRACE;
default:
throw new IllegalStateException("Unknown value " + p);
}
}
- public static com.cloudera.flume.handlers.avro.Priority convert(Priority p) {
+ private static com.cloudera.flume.handlers.avro.Priority toAvroPriority(
+ com.cloudera.flume.core.Event.Priority p) {
Preconditions.checkNotNull(p, "Argument must not be null.");
switch (p) {
case FATAL:
@@ -94,28 +119,11 @@ public static Priority convert(com.cloudera.flume.handlers.avro.Priority p) {
}
}
- @Override
- public String toString() {
- String mbody = StringEscapeUtils.escapeJava(new String(getBody()));
- return "[" + getPriority().toString() + " " + new Date(getTimestamp())
- + "] " + mbody;
- }
-
- @Override
- public long getNanos() {
- return evt.nanos;
- }
-
- @Override
- public String getHost() {
- return evt.host.toString();
- }
-
- public static AvroFlumeEvent convert(Event e) {
+ public static AvroFlumeEvent toAvroEvent(Event e) {
AvroFlumeEvent tempAvroEvt = new AvroFlumeEvent();
tempAvroEvt.timestamp = e.getTimestamp();
- tempAvroEvt.priority = convert(e.getPriority());
+ tempAvroEvt.priority = toAvroPriority(e.getPriority());
ByteBuffer bbuf = ByteBuffer.wrap(e.getBody());
tempAvroEvt.body = bbuf;
@@ -132,50 +140,16 @@ public static AvroFlumeEvent convert(Event e) {
return tempAvroEvt;
}
- /**
- * This returns the FlumeEvent corresponding to the AvroEvent passed in the
- * constructor of this object.
- */
- public Event toFlumeEvent() {
- Preconditions.checkNotNull(evt, "AvroFlumeEvent is not initialized");
- return new EventImpl(this.getBody(), this.getTimestamp(), this
- .getPriority(), this.getNanos(), this.getHost(), this.getAttrs());
- }
-
- @Override
- public byte[] get(String attr) {
- return evt.fields.get(attr).array();
- }
-
- @Override
- public Map<String, byte[]> getAttrs() {
- if (evt.fields == null) {
+ private static Map<String, byte[]> getAttrs(
+ Map<CharSequence, ByteBuffer> fields) {
+ if (fields == null) {
return Collections.<String, byte[]> emptyMap();
}
HashMap<String, byte[]> tempMap = new HashMap<String, byte[]>();
- for (CharSequence u : evt.fields.keySet()) {
- tempMap.put(u.toString(), evt.fields.get(u).array());
+ for (CharSequence u : fields.keySet()) {
+ tempMap.put(u.toString(), fields.get(u).array());
}
return tempMap;
}
- @Override
- public void set(String attr, byte[] value) {
- if (evt.fields.get(attr) != null) {
- throw new IllegalArgumentException(
- "Event already had an event with attribute " + attr);
- }
- ByteBuffer bbuf = ByteBuffer.wrap(value);
- evt.fields.put(attr, bbuf);
- }
-
- @Override
- public void hierarchicalMerge(String prefix, Event e) {
- throw new NotImplementedException();
- }
-
- @Override
- public void merge(Event e) {
- throw new NotImplementedException();
- }
}
@@ -69,7 +69,7 @@ public AvroEventSink(String host, int port) {
@Override
public void append(Event e) throws IOException, InterruptedException {
// convert the flumeEvent to AvroEevent
- AvroFlumeEvent afe = AvroEventAdaptor.convert(e);
+ AvroFlumeEvent afe = AvroEventConvertUtil.toAvroEvent(e);
// Make sure client side is initialized.
this.ensureInitialized();
try {
@@ -54,6 +54,7 @@
static final Logger LOG = LoggerFactory.getLogger(AvroEventSource.class);
+ public static final String C_TRUNCATE = "truncate";
public static final String A_SERVERPORT = "serverPort";
public static final String A_QUEUE_CAPACITY = "queueCapacity";
public static final String A_QUEUE_FREE = "queueFree";
@@ -68,16 +69,17 @@
final AtomicLong enqueued = new AtomicLong();
final AtomicLong dequeued = new AtomicLong();
final AtomicLong bytesIn = new AtomicLong();
-
+ final boolean shouldTruncate;
boolean closed = true;
/**
* Create a Avro event source listening on port with a qsize buffer.
*/
- public AvroEventSource(int port, int qsize) {
+ public AvroEventSource(int port, int qsize, boolean truncates) {
this.port = port;
this.svr = new FlumeEventAvroServerImpl(port);
this.q = new LinkedBlockingQueue<Event>(qsize);
+ this.shouldTruncate = truncates;
}
/**
@@ -98,14 +100,15 @@ synchronized public ReportEvent getMetrics() {
/**
* This constructor allows the for an arbitrary blocking queue implementation.
*/
- public AvroEventSource(int port, BlockingQueue<Event> q) {
+ public AvroEventSource(int port, BlockingQueue<Event> q, boolean truncates) {
Preconditions.checkNotNull(q);
this.port = port;
this.q = q;
+ this.shouldTruncate = truncates;
}
public AvroEventSource(int port) {
- this(port, DEFAULT_QUEUE_SIZE);
+ this(port, DEFAULT_QUEUE_SIZE, false);
}
/**
@@ -132,9 +135,9 @@ synchronized public void open() throws IOException {
@Override
public void append(AvroFlumeEvent evt) {
// convert AvroEvent evt -> e
- AvroEventAdaptor adapt = new AvroEventAdaptor(evt);
+ Event e = AvroEventConvertUtil.toFlumeEvent(evt, shouldTruncate);
try {
- enqueue(adapt.toFlumeEvent());
+ enqueue(e);
} catch (IOException e1) {
e1.printStackTrace();
}
@@ -166,9 +169,8 @@ synchronized public void close() throws IOException {
if (Clock.unixTime() - start > maxSleep) {
if (sz == q.size()) {
// no progress made, timeout and close it.
- LOG
- .warn("Close timed out due to no progress. Closing despite having "
- + q.size() + " values still enqueued");
+ LOG.warn("Close timed out due to no progress. Closing despite having "
+ + q.size() + " values still enqueued");
return;
}
// there was some progress, go another cycle.
@@ -221,10 +223,14 @@ public static SourceBuilder builder() {
return new SourceBuilder() {
@Override
public EventSource build(Context ctx, String... argv) {
- Preconditions
- .checkArgument(argv.length == 1, "usage: avroSource(port)");
+ Preconditions.checkArgument(argv.length == 1,
+ "usage: avroSource(port{," + C_TRUNCATE + "=false})");
int port = Integer.parseInt(argv[0]);
- return new AvroEventSource(port);
+
+ String val = ctx.getObj(C_TRUNCATE, String.class);
+ boolean truncates = (val == null) ? false : Boolean.parseBoolean(val);
+
+ return new AvroEventSource(port, DEFAULT_QUEUE_SIZE, truncates);
}
};
}
@@ -123,7 +123,7 @@ public void append(Event e) throws IOException,
q.add(e);
super.append(e);
}
- }));
+ }, false));
Factory protFactory = new TBinaryProtocol.Factory(true, true);
TSaneServerSocket serverTransport = new TSaneServerSocket(port);
Oops, something went wrong.

0 comments on commit 5455969

Please sign in to comment.