Skip to content

Commit

Permalink
Support messages >1024 bytes in BeatsFrameDecoder (#12)
Browse files Browse the repository at this point in the history
Fixes #10
(cherry picked from commit 68271be)
  • Loading branch information
joschi authored and Jochen Schalanda committed Oct 6, 2016
1 parent 270f3b9 commit b66d097
Show file tree
Hide file tree
Showing 2 changed files with 268 additions and 154 deletions.
180 changes: 127 additions & 53 deletions src/main/java/org/graylog/plugins/beats/BeatsFrameDecoder.java
Expand Up @@ -18,14 +18,15 @@

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Ints;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,7 +37,7 @@
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Arrays;
import java.util.List;
import java.util.zip.InflaterInputStream;

Expand All @@ -45,7 +46,7 @@
*
* @see <a href="https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md">Lumberjack protocol</a>
*/
public class BeatsFrameDecoder extends FrameDecoder {
public class BeatsFrameDecoder extends ReplayingDecoder<BeatsFrameDecoder.DecodingState> {
private static final Logger LOG = LoggerFactory.getLogger(BeatsFrameDecoder.class);

private static final byte PROTOCOL_VERSION = '2';
Expand All @@ -55,59 +56,106 @@ public class BeatsFrameDecoder extends FrameDecoder {
private static final byte FRAME_JSON = 'J';
private static final byte FRAME_WINDOW_SIZE = 'W';

enum DecodingState {
PROTOCOL_VERSION,
FRAME_TYPE,
FRAME_COMPRESSED,
FRAME_DATA,
FRAME_JSON,
FRAME_WINDOW_SIZE
}

private long windowSize;
private long sequenceNum;


public BeatsFrameDecoder() {
super(true);
super(DecodingState.PROTOCOL_VERSION, true);
}

@Override
protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, ChannelBuffer channelBuffer) throws Exception {
final Iterable<ChannelBuffer> events = processBuffer(channel, channelBuffer);
if (events == null) {
return null;
} else {
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, DecodingState state) throws Exception {
ChannelBuffer[] events = null;
switch (state) {
case PROTOCOL_VERSION:
checkVersion(buffer);
checkpoint(DecodingState.FRAME_TYPE);
case FRAME_TYPE:
final byte frameType = buffer.readByte();
switch (frameType) {
case FRAME_WINDOW_SIZE:
checkpoint(DecodingState.FRAME_WINDOW_SIZE);
break;
case FRAME_DATA:
checkpoint(DecodingState.FRAME_DATA);
break;
case FRAME_COMPRESSED:
checkpoint(DecodingState.FRAME_COMPRESSED);
break;
case FRAME_JSON:
checkpoint(DecodingState.FRAME_JSON);
break;
default:
throw new Exception("Unknown frame type: " + frameType);
}
return null;
case FRAME_WINDOW_SIZE:
processWindowSizeFrame(buffer);
break;
case FRAME_DATA:
events = parseDataFrame(channel, buffer);
break;
case FRAME_COMPRESSED:
events = processCompressedFrame(channel, buffer);
break;
case FRAME_JSON:
events = parseJsonFrame(channel, buffer);
break;
default:
throw new Exception("Unknown decoding state: " + state);
}

try {
return events;
} finally {
checkpoint(DecodingState.PROTOCOL_VERSION);
}
}

@Nullable
private Iterable<ChannelBuffer> processBuffer(Channel channel, ChannelBuffer channelBuffer) throws IOException {
channelBuffer.markReaderIndex();
@SuppressWarnings("unused")
byte version = channelBuffer.readByte();
if (LOG.isTraceEnabled() && version != PROTOCOL_VERSION) {
LOG.trace("Unknown beats protocol version: {}", version);
}
byte frameType = channelBuffer.readByte();
private ChannelBuffer[] processUncompressedBuffer(Channel channel, ChannelBuffer buffer) throws Exception {
buffer.markReaderIndex();
checkVersion(buffer);
byte frameType = buffer.readByte();

Iterable<ChannelBuffer> events = null;
ChannelBuffer[] events = null;
switch (frameType) {
case FRAME_WINDOW_SIZE:
processWindowSizeFrame(channelBuffer);
processWindowSizeFrame(buffer);
break;
case FRAME_DATA:
events = Collections.singleton(parseDataFrame(channelBuffer));
sendACK(channel);
events = parseDataFrame(channel, buffer);
break;
case FRAME_COMPRESSED:
events = processCompressedFrame(channel, channelBuffer);
events = processCompressedFrame(channel, buffer);
break;
case FRAME_JSON:
events = Collections.singleton(parseJsonFrame(channelBuffer));
sendACK(channel);
events = parseJsonFrame(channel, buffer);
break;
default:
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot understand frame type: {}", Character.getName(frameType));
}
break;
throw new Exception("Unknown frame type: " + frameType);
}

return events;
}

private void checkVersion(ChannelBuffer channelBuffer) throws Exception {
byte version = channelBuffer.readByte();
if (version != PROTOCOL_VERSION) {
throw new Exception("Unknown beats protocol version: {}");
}
}

/**
* @see <a href="https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#ack-frame-type">'ack' frame type</a>
*/
Expand All @@ -126,18 +174,26 @@ private void sendACK(Channel channel) throws IOException {
/**
* <a href="https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#json-frame-type">'json' frame type</a>
*/
private ChannelBuffer parseJsonFrame(ChannelBuffer channelBuffer) throws IOException {
sequenceNum = channelBuffer.readUnsignedInt();
LOG.trace("Received sequence number {}", sequenceNum);
private ChannelBuffer[] parseJsonFrame(Channel channel, ChannelBuffer channelBuffer) throws IOException {
if (channelBuffer.readableBytes() >= 4) {
sequenceNum = channelBuffer.readUnsignedInt();
LOG.trace("Received sequence number {}", sequenceNum);

final int jsonLength = Ints.saturatedCast(channelBuffer.readUnsignedInt());

final int jsonLength = Ints.saturatedCast(channelBuffer.readUnsignedInt());
return channelBuffer.readSlice(jsonLength);
final ChannelBuffer buffer = channelBuffer.readSlice(jsonLength);
sendACK(channel);

return new ChannelBuffer[]{buffer};
}

return null;
}

/**
* @see <a href="https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#compressed-frame-type">'compressed' frame type</a>
*/
private Iterable<ChannelBuffer> processCompressedFrame(Channel channel, ChannelBuffer channelBuffer) throws IOException {
private ChannelBuffer[] processCompressedFrame(Channel channel, ChannelBuffer channelBuffer) throws Exception {
if (channelBuffer.readableBytes() >= 4) {
final long payloadLength = channelBuffer.readUnsignedInt();
if (channelBuffer.readableBytes() < payloadLength) {
Expand All @@ -157,15 +213,15 @@ private Iterable<ChannelBuffer> processCompressedFrame(Channel channel, ChannelB
return null;
}

private Iterable<ChannelBuffer> processCompressedDataFrames(Channel channel, ChannelBuffer channelBuffer) throws IOException {
private ChannelBuffer[] processCompressedDataFrames(Channel channel, ChannelBuffer channelBuffer) throws Exception {
final List<ChannelBuffer> events = new ArrayList<>();
while (channelBuffer.readable()) {
final Iterable<ChannelBuffer> buffers = processBuffer(channel, channelBuffer);
final ChannelBuffer[] buffers = processUncompressedBuffer(channel, channelBuffer);
if (buffers != null) {
Iterables.addAll(events, buffers);
Iterables.addAll(events, Arrays.asList(buffers));
}
}
return events;
return events.toArray(new ChannelBuffer[0]);
}

/**
Expand All @@ -183,23 +239,31 @@ private void processWindowSizeFrame(ChannelBuffer channelBuffer) {
/**
* @see <a href="https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#data-frame-type">'data' frame type</a>
*/
private ChannelBuffer parseDataFrame(ChannelBuffer channelBuffer) throws IOException {
sequenceNum = channelBuffer.readUnsignedInt();
LOG.trace("Received sequence number {}", sequenceNum);

final int pairs = Ints.saturatedCast(channelBuffer.readUnsignedInt());
final JsonFactory jsonFactory = new JsonFactory();
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (final JsonGenerator jg = jsonFactory.createGenerator(outputStream)) {
jg.writeStartObject();
for (int i = 0; i < pairs; i++) {
final String key = parseDataItem(channelBuffer);
final String value = parseDataItem(channelBuffer);
jg.writeStringField(key, value);
private ChannelBuffer[] parseDataFrame(Channel channel, ChannelBuffer channelBuffer) throws IOException {
if (channelBuffer.readableBytes() >= 8) {
sequenceNum = channelBuffer.readUnsignedInt();
LOG.trace("Received sequence number {}", sequenceNum);

final int pairs = Ints.saturatedCast(channelBuffer.readUnsignedInt());
final JsonFactory jsonFactory = new JsonFactory();
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (final JsonGenerator jg = jsonFactory.createGenerator(outputStream)) {
jg.writeStartObject();
for (int i = 0; i < pairs; i++) {
final String key = parseDataItem(channelBuffer);
final String value = parseDataItem(channelBuffer);
jg.writeStringField(key, value);
}
jg.writeEndObject();
}
jg.writeEndObject();

final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(outputStream.toByteArray());
sendACK(channel);

return new ChannelBuffer[]{buffer};
}
return ChannelBuffers.wrappedBuffer(outputStream.toByteArray());

return null;
}

private String parseDataItem(ChannelBuffer channelBuffer) {
Expand All @@ -208,4 +272,14 @@ private String parseDataItem(ChannelBuffer channelBuffer) {
channelBuffer.readBytes(bytes);
return new String(bytes, StandardCharsets.UTF_8);
}

@VisibleForTesting
long getWindowSize() {
return windowSize;
}

@VisibleForTesting
long getSequenceNum() {
return sequenceNum;
}
}

0 comments on commit b66d097

Please sign in to comment.