Skip to content

Commit

Permalink
Refactor BeatsCodec to emit single messages
Browse files Browse the repository at this point in the history
Closes #5
  • Loading branch information
Jochen Schalanda committed May 18, 2016
1 parent e8e6d26 commit 6b35d65
Show file tree
Hide file tree
Showing 13 changed files with 93 additions and 321 deletions.
27 changes: 5 additions & 22 deletions src/main/java/org/graylog/plugins/beats/BeatsCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.AbstractCodec;
import org.graylog2.plugin.inputs.codecs.MultiMessageCodec;
import org.graylog2.plugin.journal.RawMessage;
import org.joda.time.DateTime;
import org.slf4j.Logger;
Expand All @@ -36,17 +35,14 @@
import javax.annotation.Nullable;
import javax.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Objects.requireNonNull;
import static org.graylog.plugins.beats.MapUtils.flatten;

@Codec(name = "beats", displayName = "Beats")
public class BeatsCodec extends AbstractCodec implements MultiMessageCodec {
public class BeatsCodec extends AbstractCodec {
private static final Logger LOG = LoggerFactory.getLogger(BeatsCodec.class);
private static final String MAP_KEY_SEPARATOR = "_";

Expand All @@ -61,30 +57,17 @@ public BeatsCodec(@Assisted Configuration configuration, ObjectMapper objectMapp
@Nullable
@Override
public Message decode(@Nonnull RawMessage rawMessage) {
throw new UnsupportedOperationException("MultiMessageCodec " + getClass() + " does not support decode()");
}

@Nullable
@Override
public Collection<Message> decodeMessages(@Nonnull RawMessage rawMessage) {
final byte[] payload = rawMessage.getPayload();
final List<Map<String, Object>> events;
final Map<String, Object> event;
try {
events = objectMapper.readValue(payload, new TypeReference<List<Map<String, Object>>>() {
event = objectMapper.readValue(payload, new TypeReference<Map<String, Object>>() {
});
} catch (IOException e) {
LOG.error("Couldn't decode raw message {}", rawMessage);
return null;
}

final List<Message> messages = new ArrayList<>(events.size());
for (Map<String, Object> event : events) {
final Message message = parseEvent(event);
if (message != null) {
messages.add(message);
}
}

return messages;
return parseEvent(event);
}

@Nullable
Expand Down
71 changes: 34 additions & 37 deletions src/main/java/org/graylog/plugins/beats/BeatsFrameDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.graylog.plugins.beats;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Ints;
import org.jboss.netty.buffer.ChannelBuffer;
Expand All @@ -30,18 +30,15 @@

import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.InflaterInputStream;

import static java.util.Objects.requireNonNull;

/**
* FrameDecoder for the Beats/Lumberjack protocol.
*
Expand All @@ -57,27 +54,25 @@ public class BeatsFrameDecoder extends FrameDecoder {
private static final byte FRAME_JSON = 'J';
private static final byte FRAME_WINDOW_SIZE = 'W';

private final ObjectMapper objectMapper;
private long windowSize;
private long sequenceNum;

public BeatsFrameDecoder(ObjectMapper objectMapper) {
this.objectMapper = requireNonNull(objectMapper);
public BeatsFrameDecoder() {
super(true);
}

@Override
protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, ChannelBuffer channelBuffer) throws Exception {
final List<Map<String, Object>> events = processBuffer(channel, channelBuffer);

final ChannelBuffer[] events = processBuffer(channel, channelBuffer);
if (events == null) {
return null;
} else {
return ChannelBuffers.copiedBuffer(objectMapper.writeValueAsBytes(events));
return events;
}
}

@Nullable
private List<Map<String, Object>> processBuffer(Channel channel, ChannelBuffer channelBuffer) throws IOException {
private ChannelBuffer[] processBuffer(Channel channel, ChannelBuffer channelBuffer) throws IOException {
channelBuffer.markReaderIndex();
@SuppressWarnings("unused")
byte version = channelBuffer.readByte();
Expand All @@ -86,20 +81,20 @@ private List<Map<String, Object>> processBuffer(Channel channel, ChannelBuffer c
}
byte frameType = channelBuffer.readByte();

List<Map<String, Object>> events = null;
ChannelBuffer[] events = null;
switch (frameType) {
case FRAME_WINDOW_SIZE:
processWindowSizeFrame(channelBuffer);
break;
case FRAME_DATA:
events = Collections.singletonList(parseDataFrame(channelBuffer));
events = new ChannelBuffer[]{parseDataFrame(channelBuffer)};
sendACK(channel);
break;
case FRAME_COMPRESSED:
events = processCompressedFrame(channel, channelBuffer);
break;
case FRAME_JSON:
events = Collections.singletonList(parseJsonFrame(channelBuffer));
events = new ChannelBuffer[]{parseJsonFrame(channelBuffer)};
sendACK(channel);
break;
default:
Expand All @@ -108,6 +103,7 @@ private List<Map<String, Object>> processBuffer(Channel channel, ChannelBuffer c
}
break;
}

return events;
}

Expand All @@ -129,21 +125,18 @@ private void sendACK(Channel channel) throws IOException {
/**
* <a href="https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#json-frame-type">'json' frame type</a>
*/
private Map<String, Object> parseJsonFrame(ChannelBuffer channelBuffer) throws IOException {
private ChannelBuffer parseJsonFrame(ChannelBuffer channelBuffer) throws IOException {
sequenceNum = channelBuffer.readUnsignedInt();
LOG.trace("Received sequence number {}", sequenceNum);

final int jsonLength = Ints.saturatedCast(channelBuffer.readUnsignedInt());
final byte[] data = new byte[jsonLength];
channelBuffer.readBytes(data);
return objectMapper.readValue(data, new TypeReference<Map<String, Object>>() {
});
return channelBuffer.readSlice(jsonLength);
}

/**
* @see <a href="https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#compressed-frame-type">'compressed' frame type</a>
*/
private List<Map<String, Object>> processCompressedFrame(Channel channel, ChannelBuffer channelBuffer) throws IOException {
private ChannelBuffer[] processCompressedFrame(Channel channel, ChannelBuffer channelBuffer) throws IOException {
if (channelBuffer.readableBytes() >= 4) {
final long payloadLength = channelBuffer.readUnsignedInt();
if (channelBuffer.readableBytes() < payloadLength) {
Expand All @@ -163,15 +156,15 @@ private List<Map<String, Object>> processCompressedFrame(Channel channel, Channe
return null;
}

private List<Map<String, Object>> processCompressedDataFrames(Channel channel, ChannelBuffer channelBuffer) throws IOException {
final List<Map<String, Object>> events = new ArrayList<>();
private ChannelBuffer[] processCompressedDataFrames(Channel channel, ChannelBuffer channelBuffer) throws IOException {
final List<ChannelBuffer> events = new ArrayList<>();
while (channelBuffer.readable()) {
final List<Map<String, Object>> buffer = processBuffer(channel, channelBuffer);
if (buffer != null) {
events.addAll(buffer);
final ChannelBuffer[] buffers = processBuffer(channel, channelBuffer);
if (buffers != null) {
Collections.addAll(events, buffers);
}
}
return events;
return events.toArray(new ChannelBuffer[events.size()]);
}

/**
Expand All @@ -189,19 +182,23 @@ private void processWindowSizeFrame(ChannelBuffer channelBuffer) {
/**
* @see <a href="https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#data-frame-type">'data' frame type</a>
*/
private Map<String, Object> parseDataFrame(ChannelBuffer channelBuffer) {
private ChannelBuffer parseDataFrame(ChannelBuffer channelBuffer) throws IOException {
sequenceNum = channelBuffer.readUnsignedInt();
LOG.trace("Received sequence number {}", sequenceNum);

int pairs = Ints.saturatedCast(channelBuffer.readUnsignedInt());
final Map<String, Object> data = new HashMap<>(pairs);
for (int i = 0; i < pairs; i++) {
final String key = parseDataItem(channelBuffer);
final String value = parseDataItem(channelBuffer);
data.put(key, value);
final int pairs = Ints.saturatedCast(channelBuffer.readUnsignedInt());
final JsonFactory jsonFactory = new JsonFactory();
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (final JsonGenerator jg = jsonFactory.createGenerator(outputStream)) {
jg.writeStartObject();
for (int i = 0; i < pairs; i++) {
final String key = parseDataItem(channelBuffer);
final String value = parseDataItem(channelBuffer);
jg.writeStringField(key, value);
}
jg.writeEndObject();
}

return data;
return ChannelBuffers.wrappedBuffer(outputStream.toByteArray());
}

private String parseDataItem(ChannelBuffer channelBuffer) {
Expand Down
23 changes: 8 additions & 15 deletions src/main/java/org/graylog/plugins/beats/BeatsTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.assistedinject.Assisted;
import org.graylog2.plugin.LocalMetricRegistry;
Expand All @@ -43,30 +42,24 @@
import java.util.concurrent.ThreadFactory;

import static com.codahale.metrics.MetricRegistry.name;
import static java.util.Objects.requireNonNull;

public class BeatsTransport extends AbstractTcpTransport {
private final ObjectMapper objectMapper;

@Inject
public BeatsTransport(@Assisted Configuration configuration,
ThroughputCounter throughputCounter,
LocalMetricRegistry localRegistry,
@Named("bossPool") Executor bossPool,
ConnectionCounter connectionCounter,
ObjectMapper objectMapper) {
this(configuration, throughputCounter, localRegistry, bossPool, executorService("beats-worker", "beats-transport-worker-%d", localRegistry), connectionCounter, objectMapper);
ConnectionCounter connectionCounter) {
this(configuration, throughputCounter, localRegistry, bossPool, executorService("beats-worker", "beats-transport-worker-%d", localRegistry), connectionCounter);
}

private BeatsTransport(Configuration configuration,
ThroughputCounter throughputCounter,
LocalMetricRegistry localRegistry,
Executor bossPool,
Executor workerPool,
ConnectionCounter connectionCounter,
ObjectMapper objectMapper) {
ThroughputCounter throughputCounter,
LocalMetricRegistry localRegistry,
Executor bossPool,
Executor workerPool,
ConnectionCounter connectionCounter) {
super(configuration, throughputCounter, localRegistry, bossPool, workerPool, connectionCounter);
this.objectMapper = requireNonNull(objectMapper);
}

private static Executor executorService(final String executorName, final String threadNameFormat, final MetricRegistry metricRegistry) {
Expand All @@ -81,7 +74,7 @@ private static Executor executorService(final String executorName, final String
protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getFinalChannelHandlers(MessageInput input) {
final LinkedHashMap<String, Callable<? extends ChannelHandler>> finalChannelHandlers = super.getFinalChannelHandlers(input);
final LinkedHashMap<String, Callable<? extends ChannelHandler>> handlers = new LinkedHashMap<>();
handlers.put("beats", () -> new BeatsFrameDecoder(objectMapper));
handlers.put("beats", BeatsFrameDecoder::new);
handlers.putAll(finalChannelHandlers);

return handlers;
Expand Down
51 changes: 18 additions & 33 deletions src/test/java/org/graylog/plugins/beats/BeatsCodecTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

import java.util.Collection;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -52,20 +51,17 @@ public void setUp() throws Exception {
codec = new BeatsCodec(configuration, objectMapper);
}

@Test(expected = UnsupportedOperationException.class)
public void decodeThrowsUnsupportedOperationException() throws Exception {
codec.decode(new RawMessage(new byte[0]));
@Test
public void decodeReturnsNullIfPayloadCouldNotBeDecoded() throws Exception {
assertThat(codec.decode(new RawMessage(new byte[0]))).isNull();
}

@Test
public void decodeMessagesHandlesFilebeatMessages() throws Exception {
final byte[] json = Resources.toByteArray(Resources.getResource("BeatsCodecTest/filebeat.json"));
final byte[] json = Resources.toByteArray(Resources.getResource("filebeat.json"));
final RawMessage rawMessage = new RawMessage(json);
final Collection<Message> messages = codec.decodeMessages(rawMessage);
assertThat(messages)
.isNotNull()
.hasSize(1);
final Message message = messages.iterator().next();
final Message message = codec.decode(rawMessage);
assertThat(message).isNotNull();
assertThat(message.getMessage()).isEqualTo("TEST");
assertThat(message.getSource()).isEqualTo("example.local");
assertThat(message.getTimestamp()).isEqualTo(new DateTime(2016, 4, 1, 0, 0, DateTimeZone.UTC));
Expand All @@ -79,13 +75,10 @@ public void decodeMessagesHandlesFilebeatMessages() throws Exception {

@Test
public void decodeMessagesHandlesPacketbeatMessages() throws Exception {
final byte[] json = Resources.toByteArray(Resources.getResource("BeatsCodecTest/packetbeat-dns.json"));
final byte[] json = Resources.toByteArray(Resources.getResource("packetbeat-dns.json"));
final RawMessage rawMessage = new RawMessage(json);
final Collection<Message> messages = codec.decodeMessages(rawMessage);
assertThat(messages)
.isNotNull()
.hasSize(1);
final Message message = messages.iterator().next();
final Message message = codec.decode(rawMessage);
assertThat(message).isNotNull();
assertThat(message.getSource()).isEqualTo("example.local");
assertThat(message.getTimestamp()).isEqualTo(new DateTime(2016, 4, 1, 0, 0, DateTimeZone.UTC));
assertThat(message.getField("facility")).isEqualTo("packetbeat");
Expand All @@ -94,13 +87,10 @@ public void decodeMessagesHandlesPacketbeatMessages() throws Exception {

@Test
public void decodeMessagesHandlesTopbeatMessages() throws Exception {
final byte[] json = Resources.toByteArray(Resources.getResource("BeatsCodecTest/topbeat-system.json"));
final byte[] json = Resources.toByteArray(Resources.getResource("topbeat-system.json"));
final RawMessage rawMessage = new RawMessage(json);
final Collection<Message> messages = codec.decodeMessages(rawMessage);
assertThat(messages)
.isNotNull()
.hasSize(1);
final Message message = messages.iterator().next();
final Message message = codec.decode(rawMessage);
assertThat(message).isNotNull();
assertThat(message.getSource()).isEqualTo("example.local");
assertThat(message.getTimestamp()).isEqualTo(new DateTime(2016, 4, 1, 0, 0, DateTimeZone.UTC));
assertThat(message.getField("facility")).isEqualTo("topbeat");
Expand All @@ -110,23 +100,18 @@ public void decodeMessagesHandlesTopbeatMessages() throws Exception {
@Test
@Ignore("Ignored until JSON payload is added to test assets")
public void decodeMessagesHandlesWinlogbeatMessages() throws Exception {
final byte[] json = Resources.toByteArray(Resources.getResource("BeatsCodecTest/winlogbeat.json"));
final byte[] json = Resources.toByteArray(Resources.getResource("winlogbeat.json"));
final RawMessage rawMessage = new RawMessage(json);
final Collection<Message> messages = codec.decodeMessages(rawMessage);
assertThat(messages)
.isNotNull()
.hasSize(1);
final Message message = codec.decode(rawMessage);
assertThat(message).isNotNull();
}

@Test
public void decodeMessagesHandleGenericBeatMessages() throws Exception {
final byte[] json = Resources.toByteArray(Resources.getResource("BeatsCodecTest/generic.json"));
final byte[] json = Resources.toByteArray(Resources.getResource("generic.json"));
final RawMessage rawMessage = new RawMessage(json);
final Collection<Message> messages = codec.decodeMessages(rawMessage);
assertThat(messages)
.isNotNull()
.hasSize(1);
final Message message = messages.iterator().next();
final Message message = codec.decode(rawMessage);
assertThat(message).isNotNull();
assertThat(message.getSource()).isEqualTo("unknown");
assertThat(message.getTimestamp()).isEqualTo(new DateTime(2016, 4, 1, 0, 0, DateTimeZone.UTC));
assertThat(message.getField("facility")).isEqualTo("genericbeat");
Expand Down
Loading

0 comments on commit 6b35d65

Please sign in to comment.