-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a Syslog octet counting frame decoder. (RFC6587)
See: http://tools.ietf.org/html/rfc6587#section-3.4.1 Autodetect framing type to transparently support both methods on the same input.
- Loading branch information
Showing
7 changed files
with
418 additions
and
3 deletions.
There are no files selected for viewing
81 changes: 81 additions & 0 deletions
81
...og2-inputs/src/main/java/org/graylog2/inputs/syslog/tcp/SyslogOctetCountFrameDecoder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
/** | ||
* This file is part of Graylog2. | ||
* | ||
* Graylog2 is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU General Public License as published by | ||
* the Free Software Foundation, either version 3 of the License, or | ||
* (at your option) any later version. | ||
* | ||
* Graylog2 is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* GNU General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU General Public License | ||
* along with Graylog2. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
package org.graylog2.inputs.syslog.tcp; | ||
|
||
import com.google.common.base.Charsets; | ||
import org.jboss.netty.buffer.ChannelBuffer; | ||
import org.jboss.netty.channel.Channel; | ||
import org.jboss.netty.channel.ChannelHandlerContext; | ||
import org.jboss.netty.handler.codec.frame.FrameDecoder; | ||
|
||
/** | ||
* Implements a Netty {@link FrameDecoder} for the Syslog octet counting framing. (RFC6587) | ||
* | ||
* @see <a href="http://tools.ietf.org/html/rfc6587#section-3.4.1">RFC6587 Octet Counting</a> | ||
*/ | ||
public class SyslogOctetCountFrameDecoder extends FrameDecoder { | ||
@Override | ||
protected Object decode(final ChannelHandlerContext ctx, | ||
final Channel channel, | ||
final ChannelBuffer buffer) throws Exception { | ||
final int frameSizeValueLength = findFrameSizeValueLength(buffer); | ||
|
||
// We have not found the frame length value byte size yet. | ||
if (frameSizeValueLength <= 0) { | ||
return null; | ||
} | ||
|
||
// Convert the frame length value bytes into an integer without mutating the buffer reader index. | ||
final String lengthString = buffer.slice(buffer.readerIndex(), frameSizeValueLength).toString(Charsets.UTF_8); | ||
final int length = Integer.parseInt(lengthString); | ||
|
||
if (buffer.readableBytes() < length) { | ||
// We cannot read the complete frame yet. | ||
return null; | ||
} else { | ||
// Skip the frame length value bytes and the whitespace that follows it. | ||
buffer.skipBytes(frameSizeValueLength + 1); | ||
} | ||
|
||
final ChannelBuffer frame = extractFrame(buffer, buffer.readerIndex(), length); | ||
|
||
// Advance the reader index because extractFrame() does not do that. | ||
buffer.skipBytes(length); | ||
|
||
return frame; | ||
} | ||
|
||
/** | ||
* Find the byte length of the frame length value. | ||
* | ||
* @param buffer The channel buffer | ||
* @return The length of the frame length value | ||
*/ | ||
private int findFrameSizeValueLength(final ChannelBuffer buffer) { | ||
final int n = buffer.writerIndex(); | ||
|
||
for (int i = buffer.readerIndex(); i < n; i ++) { | ||
final byte b = buffer.getByte(i); | ||
|
||
if (b == ' ') { | ||
return i - buffer.readerIndex(); | ||
} | ||
} | ||
|
||
return -1; // Not found. | ||
} | ||
} |
61 changes: 61 additions & 0 deletions
61
...g2-inputs/src/main/java/org/graylog2/inputs/syslog/tcp/SyslogTCPFramingRouterHandler.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/** | ||
* This file is part of Graylog2. | ||
* | ||
* Graylog2 is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU General Public License as published by | ||
* the Free Software Foundation, either version 3 of the License, or | ||
* (at your option) any later version. | ||
* | ||
* Graylog2 is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* GNU General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU General Public License | ||
* along with Graylog2. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
package org.graylog2.inputs.syslog.tcp; | ||
|
||
import org.jboss.netty.buffer.ChannelBuffer; | ||
import org.jboss.netty.channel.ChannelHandlerContext; | ||
import org.jboss.netty.channel.MessageEvent; | ||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler; | ||
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; | ||
|
||
public class SyslogTCPFramingRouterHandler extends SimpleChannelUpstreamHandler { | ||
private final int maxFrameLength; | ||
private final ChannelBuffer[] delimiter; | ||
private boolean routed = false; | ||
|
||
public SyslogTCPFramingRouterHandler(int maxFrameLength, ChannelBuffer[] delimiter) { | ||
this.maxFrameLength = maxFrameLength; | ||
this.delimiter = delimiter; | ||
} | ||
|
||
@Override | ||
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { | ||
final ChannelBuffer message = (ChannelBuffer) e.getMessage(); | ||
|
||
if (! message.readable()) { | ||
return; | ||
} | ||
|
||
if (! routed) { | ||
if (usesOctetCountFraming(message)) { | ||
ctx.getPipeline().addAfter(ctx.getName(), "framer-octet", new SyslogOctetCountFrameDecoder()); | ||
} else { | ||
ctx.getPipeline().addAfter(ctx.getName(), "framer-delimiter", new DelimiterBasedFrameDecoder(maxFrameLength, delimiter)); | ||
} | ||
|
||
routed = true; | ||
} | ||
|
||
ctx.sendUpstream(e); | ||
} | ||
|
||
private boolean usesOctetCountFraming(ChannelBuffer message) { | ||
// Octet counting framing needs to start with a non-zero digit. | ||
// See: http://tools.ietf.org/html/rfc6587#section-3.4.1 | ||
return '0' < message.getByte(0) && message.getByte(0) <= '9'; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
76 changes: 76 additions & 0 deletions
76
graylog2-inputs/src/main/java/org/graylog2/inputs/transports/SyslogTcpTransport.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
/** | ||
* This file is part of Graylog2. | ||
* | ||
* Graylog2 is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU General Public License as published by | ||
* the Free Software Foundation, either version 3 of the License, or | ||
* (at your option) any later version. | ||
* | ||
* Graylog2 is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* GNU General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU General Public License | ||
* along with Graylog2. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
package org.graylog2.inputs.transports; | ||
|
||
import com.google.common.collect.Maps; | ||
import com.google.inject.assistedinject.Assisted; | ||
import com.google.inject.assistedinject.AssistedInject; | ||
import org.graylog2.inputs.syslog.tcp.SyslogTCPFramingRouterHandler; | ||
import org.graylog2.plugin.ConfigClass; | ||
import org.graylog2.plugin.FactoryClass; | ||
import org.graylog2.plugin.LocalMetricRegistry; | ||
import org.graylog2.plugin.configuration.Configuration; | ||
import org.graylog2.plugin.inputs.MessageInput; | ||
import org.graylog2.plugin.inputs.transports.Transport; | ||
import org.graylog2.plugin.inputs.util.ConnectionCounter; | ||
import org.graylog2.plugin.inputs.util.ThroughputCounter; | ||
import org.jboss.netty.channel.ChannelHandler; | ||
|
||
import javax.inject.Named; | ||
import javax.inject.Provider; | ||
import java.util.LinkedHashMap; | ||
import java.util.concurrent.Callable; | ||
import java.util.concurrent.Executor; | ||
|
||
public class SyslogTcpTransport extends TcpTransport { | ||
@AssistedInject | ||
public SyslogTcpTransport(@Assisted Configuration configuration, | ||
@Named("bossPool") Executor bossPool, | ||
@Named("cached") Provider<Executor> workerPoolProvider, | ||
ThroughputCounter throughputCounter, | ||
ConnectionCounter connectionCounter, | ||
LocalMetricRegistry localRegistry) { | ||
super(configuration, bossPool, workerPoolProvider, throughputCounter, connectionCounter, localRegistry); | ||
} | ||
|
||
@Override | ||
protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getFinalChannelHandlers(MessageInput input) { | ||
final LinkedHashMap<String, Callable<? extends ChannelHandler>> finalChannelHandlers = Maps.newLinkedHashMap(); | ||
|
||
finalChannelHandlers.putAll(super.getFinalChannelHandlers(input)); | ||
|
||
// Replace the "framer" channel handler inserted by the parent. | ||
finalChannelHandlers.put("framer", new Callable<ChannelHandler>() { | ||
@Override | ||
public ChannelHandler call() throws Exception { | ||
return new SyslogTCPFramingRouterHandler(maxFrameLength, delimiter); | ||
} | ||
}); | ||
|
||
return finalChannelHandlers; | ||
} | ||
|
||
@FactoryClass | ||
public interface Factory extends Transport.Factory<SyslogTcpTransport> { | ||
SyslogTcpTransport create(Configuration configuration); | ||
} | ||
|
||
@ConfigClass | ||
public static class Config extends TcpTransport.Config { | ||
|
||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
90 changes: 90 additions & 0 deletions
90
...inputs/src/test/java/org/graylog2/inputs/syslog/tcp/SyslogOctetCountFrameDecoderTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
/** | ||
* This file is part of Graylog2. | ||
* | ||
* Graylog2 is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU General Public License as published by | ||
* the Free Software Foundation, either version 3 of the License, or | ||
* (at your option) any later version. | ||
* | ||
* Graylog2 is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* GNU General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU General Public License | ||
* along with Graylog2. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
package org.graylog2.inputs.syslog.tcp; | ||
|
||
import com.google.common.base.Charsets; | ||
import org.jboss.netty.buffer.ChannelBuffer; | ||
import org.jboss.netty.buffer.ChannelBuffers; | ||
import org.jboss.netty.handler.codec.embedder.CodecEmbedderException; | ||
import org.jboss.netty.handler.codec.embedder.DecoderEmbedder; | ||
import org.testng.annotations.BeforeMethod; | ||
import org.testng.annotations.Test; | ||
|
||
import static org.testng.Assert.assertEquals; | ||
import static org.testng.Assert.assertFalse; | ||
import static org.testng.Assert.assertNull; | ||
import static org.testng.Assert.assertTrue; | ||
|
||
public class SyslogOctetCountFrameDecoderTest { | ||
private DecoderEmbedder<ChannelBuffer> embedder; | ||
|
||
@BeforeMethod | ||
public void setUp() throws Exception { | ||
embedder = new DecoderEmbedder<ChannelBuffer>(new SyslogOctetCountFrameDecoder()); | ||
} | ||
|
||
@Test | ||
public void testDecode() throws Exception { | ||
final ChannelBuffer buf1 = ChannelBuffers.copiedBuffer("123 <45>1 2014-10-21T10:21:09+00:00 c4dc57ba1ebb syslog-ng 7120 - [meta sequenceId=\"1\"] syslog-ng starting up; version='3.5.3'\n", Charsets.UTF_8); | ||
final ChannelBuffer buf2 = ChannelBuffers.copiedBuffer("186 <45>1 2014-10-21T10:21:09+00:00 c4dc57ba1ebb syslog-ng 7120 - [meta sequenceId=\"2\"] Syslog connection established; fd='9', server='AF_INET(172.17.42.1:6666)', local='AF_INET(0.0.0.0:0)'\n", Charsets.UTF_8); | ||
final ChannelBuffer buf3 = ChannelBuffers.wrappedBuffer(buf1, buf2, buf1); | ||
|
||
assertTrue(embedder.offer(buf1)); | ||
assertTrue(embedder.offer(buf2)); | ||
assertTrue(embedder.offer(buf3)); | ||
|
||
assertEquals(embedder.poll().toString(Charsets.UTF_8), "<45>1 2014-10-21T10:21:09+00:00 c4dc57ba1ebb syslog-ng 7120 - [meta sequenceId=\"1\"] syslog-ng starting up; version='3.5.3'\n"); | ||
assertEquals(embedder.poll().toString(Charsets.UTF_8), "<45>1 2014-10-21T10:21:09+00:00 c4dc57ba1ebb syslog-ng 7120 - [meta sequenceId=\"2\"] Syslog connection established; fd='9', server='AF_INET(172.17.42.1:6666)', local='AF_INET(0.0.0.0:0)'\n"); | ||
|
||
assertEquals(embedder.poll().toString(Charsets.UTF_8), "<45>1 2014-10-21T10:21:09+00:00 c4dc57ba1ebb syslog-ng 7120 - [meta sequenceId=\"1\"] syslog-ng starting up; version='3.5.3'\n"); | ||
assertEquals(embedder.poll().toString(Charsets.UTF_8), "<45>1 2014-10-21T10:21:09+00:00 c4dc57ba1ebb syslog-ng 7120 - [meta sequenceId=\"2\"] Syslog connection established; fd='9', server='AF_INET(172.17.42.1:6666)', local='AF_INET(0.0.0.0:0)'\n"); | ||
assertEquals(embedder.poll().toString(Charsets.UTF_8), "<45>1 2014-10-21T10:21:09+00:00 c4dc57ba1ebb syslog-ng 7120 - [meta sequenceId=\"1\"] syslog-ng starting up; version='3.5.3'\n"); | ||
|
||
assertNull(embedder.poll()); | ||
} | ||
|
||
@Test | ||
public void testIncompleteFrameLengthValue() throws Exception { | ||
final ChannelBuffer buf1 = ChannelBuffers.copiedBuffer("12", Charsets.UTF_8); | ||
final ChannelBuffer buf2 = ChannelBuffers.copiedBuffer("3 <45>1 2014-10-21T10:21:09+00:00 c4dc57ba1ebb syslog-ng 7120 - [meta sequenceId=\"1\"] syslog-ng starting up; version='3.5.3'\n", Charsets.UTF_8); | ||
|
||
assertFalse(embedder.offer(buf1)); | ||
assertNull(embedder.poll()); | ||
|
||
assertTrue(embedder.offer(buf2)); | ||
assertEquals(embedder.poll().toString(Charsets.UTF_8), "<45>1 2014-10-21T10:21:09+00:00 c4dc57ba1ebb syslog-ng 7120 - [meta sequenceId=\"1\"] syslog-ng starting up; version='3.5.3'\n"); | ||
} | ||
|
||
@Test | ||
public void testIncompleteFrames() throws Exception { | ||
final ChannelBuffer buf1 = ChannelBuffers.copiedBuffer("123 <45>1 2014-10-21T10:21:09+00:00 c4dc57ba1ebb syslog-ng 7120 - ", Charsets.UTF_8); | ||
final ChannelBuffer buf2 = ChannelBuffers.copiedBuffer("[meta sequenceId=\"1\"] syslog-ng starting up; version='3.5.3'\n", Charsets.UTF_8); | ||
|
||
assertFalse(embedder.offer(buf1)); | ||
assertNull(embedder.poll()); | ||
|
||
assertTrue(embedder.offer(buf2)); | ||
assertEquals(embedder.poll().toString(Charsets.UTF_8), "<45>1 2014-10-21T10:21:09+00:00 c4dc57ba1ebb syslog-ng 7120 - [meta sequenceId=\"1\"] syslog-ng starting up; version='3.5.3'\n"); | ||
} | ||
|
||
@Test(expectedExceptions = CodecEmbedderException.class) | ||
public void testBrokenFrames() throws Exception { | ||
final ChannelBuffer buf1 = ChannelBuffers.copiedBuffer("1 2014-10-21T10:21:09+00:00 c4dc57ba1ebb syslog-ng 7120 - ", Charsets.UTF_8); | ||
|
||
embedder.offer(buf1); | ||
} | ||
} |
Oops, something went wrong.
6a77e02
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Forgot to mention. This was squashed from #743.