Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,18 @@
import com.github.palindromicity.syslog.StructuredDataPolicy;
import com.github.palindromicity.syslog.SyslogParserBuilder;
import org.apache.nifi.syslog.events.Syslog5424Event;
import org.apache.nifi.syslog.keyproviders.SyslogPrefixedKeyProvider;
import org.apache.nifi.syslog.utils.NifiStructuredDataPolicy;
import org.apache.nifi.syslog.utils.NilHandlingPolicy;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/**
* Parses a Syslog message from a ByteBuffer into a Syslog5424Event instance.
* For 5424 we use simple-syslog-5424 since it parsers out structured data.
*/
public class StrictSyslog5424Parser {
private Charset charset;
private com.github.palindromicity.syslog.SyslogParser parser;

public StrictSyslog5424Parser() {
this(StandardCharsets.UTF_8, NilHandlingPolicy.NULL, NifiStructuredDataPolicy.FLATTEN, new SyslogPrefixedKeyProvider());
}
private final com.github.palindromicity.syslog.SyslogParser parser;

public StrictSyslog5424Parser(final Charset charset, final NilHandlingPolicy nilPolicy,
NifiStructuredDataPolicy structuredDataPolicy, KeyProvider keyProvider) {
this.charset = charset;
public StrictSyslog5424Parser(final NilHandlingPolicy nilPolicy,
final NifiStructuredDataPolicy structuredDataPolicy, final KeyProvider keyProvider) {
parser = new SyslogParserBuilder()
.withNilPolicy(NilPolicy.valueOf(nilPolicy.name()))
.withStructuredDataPolicy(StructuredDataPolicy.valueOf(structuredDataPolicy.name()))
Expand All @@ -52,54 +41,17 @@ public StrictSyslog5424Parser(final Charset charset, final NilHandlingPolicy nil
}

/**
* Parses a Syslog5424Event from a {@code ByteBuffer}.
*
* @param buffer a {@code ByteBuffer} containing a syslog message
* @return a Syslog5424Event parsed from the {@code {@code byte array}}
*/
public Syslog5424Event parseEvent(final ByteBuffer buffer) {
return parseEvent(buffer, null);
}

/**
* Parses a Syslog5424Event from a {@code ByteBuffer}.
*
* @param buffer a {@code ByteBuffer} containing a syslog message
* @param sender the hostname of the syslog server that sent the message
* @return a Syslog5424Event parsed from the {@code byte array}
*/
public Syslog5424Event parseEvent(final ByteBuffer buffer, final String sender) {
if (buffer == null) {
return null;
}
return parseEvent(bufferToBytes(buffer), sender);
}

/**
* Parses a Syslog5424Event from a {@code byte array}.
* Parses a Syslog5424Event from a String
*
* @param bytes a {@code byte array} containing a syslog message
* @param sender the hostname of the syslog server that sent the message
* @return a Syslog5424Event parsed from the {@code byte array}
* @param line a {@code String} containing a syslog message
* @return a Syslog5424Event parsed from the input line
*/
public Syslog5424Event parseEvent(final byte[] bytes, final String sender) {
if (bytes == null || bytes.length == 0) {
return null;
}

// remove trailing new line before parsing
int length = bytes.length;
if (bytes[length - 1] == '\n') {
length = length - 1;
}

final String message = new String(bytes, 0, length, charset);

public Syslog5424Event parseEvent(final String line) {
final Syslog5424Event.Builder builder = new Syslog5424Event.Builder()
.valid(false).fullMessage(message).rawMessage(bytes).sender(sender);
.valid(false).fullMessage(line);

try {
parser.parseLine(message, builder::fieldMap);
parser.parseLine(line, builder::fieldMap);
builder.valid(true);
} catch (Exception e) {
// this is not a valid 5424 message
Expand All @@ -110,21 +62,4 @@ public Syslog5424Event parseEvent(final byte[] bytes, final String sender) {
// either invalid w/original msg, or fully parsed event
return builder.build();
}

public String getCharsetName() {
return charset == null ? StandardCharsets.UTF_8.name() : charset.name();
}


private byte[] bufferToBytes(ByteBuffer buffer) {
if (buffer == null) {
return null;
}
if (buffer.position() != 0) {
buffer.flip();
}
byte bytes[] = new byte[buffer.limit()];
buffer.get(bytes, 0, buffer.limit());
return bytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -42,27 +39,13 @@

public abstract class BaseStrictSyslog5424ParserTest {

private static final Charset CHARSET = Charset.forName("UTF-8");
private static final String NIL_VALUE = "-";
private StrictSyslog5424Parser parser;

protected abstract NilHandlingPolicy getPolicy();

protected void validateForPolicy(String expected, Object actual) {
switch (getPolicy()) {
case DASH:
assertEquals(actual, NIL_VALUE);
break;
case OMIT:
case NULL:
assertNull(actual);

}
}

@BeforeEach
public void setup() {
parser = new StrictSyslog5424Parser(CHARSET, getPolicy(), NifiStructuredDataPolicy.FLATTEN, new SyslogPrefixedKeyProvider());
parser = new StrictSyslog5424Parser(getPolicy(), NifiStructuredDataPolicy.FLATTEN, new SyslogPrefixedKeyProvider());
}

@Test
Expand All @@ -74,18 +57,12 @@ public void testRFC5424WithVersion() {
final String appName = "su";
final String procId = "-";
final String msgId = "ID17";
final String structuredData = "-";
final String body = "'su root' failed for lonvick on /dev/pts/8";

final String message = "<" + pri + ">" + version + " " + stamp + " " + host + " "
+ appName + " " + procId + " " + msgId + " " + "-" + " " + body;

final byte[] bytes = message.getBytes(CHARSET);
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.clear();
buffer.put(bytes);

final Syslog5424Event event = parser.parseEvent(buffer);
final Syslog5424Event event = parser.parseEvent(message);
assertNotNull(event);
assertTrue(event.isValid());
assertFalse(event.getFieldMap().isEmpty());
Expand All @@ -97,7 +74,6 @@ public void testRFC5424WithVersion() {
assertEquals(stamp, fieldMap.get(SyslogAttributes.SYSLOG_TIMESTAMP.key()));
assertEquals(host, fieldMap.get(SyslogAttributes.SYSLOG_HOSTNAME.key()));
assertEquals(appName, fieldMap.get(Syslog5424Attributes.SYSLOG_APP_NAME.key()));
validateForPolicy(procId, fieldMap.get(Syslog5424Attributes.SYSLOG_PROCID.key()));
assertEquals(msgId, fieldMap.get(Syslog5424Attributes.SYSLOG_MESSAGEID.key()));

Pattern structuredPattern = new SyslogPrefixedKeyProvider().getStructuredElementIdParamNamePattern();
Expand All @@ -121,18 +97,12 @@ public void testRFC5424WithoutVersion() {
final String appName = "su";
final String procId = "-";
final String msgId = "ID17";
final String structuredData = "-";
final String body = "'su root' failed for lonvick on /dev/pts/8";

final String message = "<" + pri + ">" + version + " " + stamp + " " + host + " "
+ appName + " " + procId + " " + msgId + " " + "-" + " " + body;

final byte[] bytes = message.getBytes(CHARSET);
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.clear();
buffer.put(bytes);

final Syslog5424Event event = parser.parseEvent(buffer);
final Syslog5424Event event = parser.parseEvent(message);
assertFalse(event.isValid());
}

Expand All @@ -141,12 +111,7 @@ public void testTrailingNewLine() {
final String message = "<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - " +
"ID47 - 'su root' failed for lonvick on /dev/pts/8\n";

final byte[] bytes = message.getBytes(CHARSET);
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.clear();
buffer.put(bytes);

final Syslog5424Event event = parser.parseEvent(buffer);
final Syslog5424Event event = parser.parseEvent(message);
assertNotNull(event);
assertTrue(event.isValid());
}
Expand All @@ -166,12 +131,7 @@ public void testVariety() {
+ "[exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\" eventID=\"2022\"] Removing instance");

for (final String message : messages) {
final byte[] bytes = message.getBytes(CHARSET);
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.clear();
buffer.put(bytes);

final Syslog5424Event event = parser.parseEvent(buffer);
final Syslog5424Event event = parser.parseEvent(message);
assertTrue(event.isValid());
}
}
Expand All @@ -190,67 +150,30 @@ public void testMessagePartNotRequired() {
+ "[exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\" eventID=\"2022\"]");

for (final String message : messages) {
final byte[] bytes = message.getBytes(CHARSET);
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.clear();
buffer.put(bytes);

final Syslog5424Event event = parser.parseEvent(buffer);
final Syslog5424Event event = parser.parseEvent(message);
assertTrue(event.isValid());
assertNull(event.getFieldMap().get(SyslogAttributes.SYSLOG_BODY.key()));
}


}

@Test
public void testInvalidPriority() {
final String message = "10 Oct 13 14:14:43 localhost some body of the message";

final byte[] bytes = message.getBytes(CHARSET);
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.clear();
buffer.put(bytes);

final Syslog5424Event event = parser.parseEvent(buffer);
final Syslog5424Event event = parser.parseEvent(message);
assertNotNull(event);
assertFalse(event.isValid());
assertEquals(message, event.getFullMessage());
}

@Test
public void testParseWithSender() {
final String sender = "127.0.0.1";
final String message = "<14>1 2014-06-20T09:14:07+00:00 loggregator"
+ " d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01"
+ " [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"]"
+ "[exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\" eventID=\"2022\"] Removing instance";

final byte[] bytes = message.getBytes(CHARSET);
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.clear();
buffer.put(bytes);

final Syslog5424Event event = parser.parseEvent(buffer, sender);
assertNotNull(event);
assertTrue(event.isValid());
assertEquals(sender, event.getSender());
assertEquals("Removing instance", event.getFieldMap().get(SyslogAttributes.SYSLOG_BODY.key()));
}

@Test
public void testParseWithBOM() {
final String message = "<14>1 2014-06-20T09:14:07+00:00 loggregator"
+ " d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01"
+ " [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"]"
+ "[exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\" eventID=\"2022\"] \uFEFFMessage with some Umlauts äöü";

final byte[] bytes = message.getBytes(CHARSET);
final ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
((Buffer)buffer).clear();
buffer.put(bytes);

final Syslog5424Event event = parser.parseEvent(buffer);
final Syslog5424Event event = parser.parseEvent(message);
assertNotNull(event);
assertTrue(event.isValid());
assertEquals("Message with some Umlauts äöü", event.getFieldMap().get(SyslogAttributes.SYSLOG_BODY.key()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.syslog.keyproviders.SyslogPrefixedKeyProvider;
Expand All @@ -46,8 +45,6 @@
import org.apache.nifi.syslog.events.Syslog5424Event;
import org.apache.nifi.syslog.attributes.SyslogAttributes;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -56,7 +53,6 @@
import java.util.Map;
import java.util.Set;


@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
Expand Down Expand Up @@ -128,6 +124,7 @@ public class ParseSyslog5424 extends AbstractProcessor {

private volatile StrictSyslog5424Parser parser;

private volatile Charset charset;

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
Expand All @@ -148,11 +145,9 @@ public Set<Relationship> getRelationships() {

@OnScheduled
public void onScheduled(final ProcessContext context) {
final String charsetName = context.getProperty(CHARSET).getValue();
charset = Charset.forName(context.getProperty(CHARSET).getValue());
final String nilPolicyString = context.getProperty(NIL_POLICY).getValue();
parser = new StrictSyslog5424Parser(Charset.forName(charsetName),
NilHandlingPolicy.valueOf(nilPolicyString),
NifiStructuredDataPolicy.FLATTEN,new SyslogPrefixedKeyProvider());
parser = new StrictSyslog5424Parser(NilHandlingPolicy.valueOf(nilPolicyString), NifiStructuredDataPolicy.FLATTEN, new SyslogPrefixedKeyProvider());
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
Expand All @@ -168,24 +163,20 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}

final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, buffer);
}
});
session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
final String line = new String(buffer, charset).trim();

final Syslog5424Event syslogEvent;
try {
syslogEvent = parser.parseEvent(buffer, null);
syslogEvent = parser.parseEvent(line);
} catch (final ProcessException pe) {
getLogger().error("Failed to parse {} as a Syslog 5424 message due to {}; routing to failure", new Object[] {flowFile, pe});
getLogger().error("Failed to parse {} as a Syslog 5424 message; routing to failure", flowFile, pe);
session.transfer(flowFile, REL_FAILURE);
return;
}

if (syslogEvent == null || !syslogEvent.isValid()) {
getLogger().error("Failed to parse {} as a Syslog message: it does not conform to any of the RFC formats supported; routing to failure", new Object[] {flowFile});
getLogger().error("Failed to parse {} as a Syslog message: it does not conform to any of the RFC formats supported; routing to failure", flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
Expand All @@ -197,7 +188,6 @@ public void process(final InputStream in) throws IOException {
session.transfer(flowFile, REL_SUCCESS);
}


private static Map<String,String> convertMap(Map<String, Object> map) {
Map<String,String> returnMap = new HashMap<>();
map.forEach((key,value) -> returnMap.put(key,(String)value));
Expand Down
Loading