From 145582a9d3d7b6a00d14039bbb9e1f8d1a9ba5a1 Mon Sep 17 00:00:00 2001 From: Otto Fowler Date: Tue, 19 Jun 2018 18:22:26 -0400 Subject: [PATCH 1/4] NIFI-5325 A Syslog Parser that fully supports RFC 5424 Structured Data --- .../src/main/resources/META-INF/NOTICE | 8 + .../nifi-standard-processors/pom.xml | 4 + .../processors/standard/ParseSyslog5424.java | 174 +++++++++++++++ .../syslog/StrictSyslog5424Parser.java | 210 ++++++++++++++++++ .../standard/syslog/Syslog5424Attributes.java | 43 ++++ .../standard/syslog/Syslog5424Event.java | 103 +++++++++ .../org.apache.nifi.processor.Processor | 1 + .../standard/TestParseSyslog5424.java | 62 ++++++ .../util/BaseStrictSyslog5424ParserTest.java | 205 +++++++++++++++++ .../StrictSyslog5424ParserDashPolicyTest.java | 25 +++ .../StrictSyslog5424ParserNullPolicyTest.java | 25 +++ .../StrictSyslog5424ParserOmitPolicyTest.java | 25 +++ nifi-nar-bundles/nifi-standard-bundle/pom.xml | 5 + 13 files changed, 890 insertions(+) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/StrictSyslog5424Parser.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/Syslog5424Attributes.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/Syslog5424Event.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog5424.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/BaseStrictSyslog5424ParserTest.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/StrictSyslog5424ParserDashPolicyTest.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/StrictSyslog5424ParserNullPolicyTest.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/StrictSyslog5424ParserOmitPolicyTest.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE index d5eb63493eda..32ae600d888d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE @@ -218,6 +218,14 @@ The following binary components are provided under the Apache Software License v Copyright 2006 - 2013 Pentaho Corporation. All rights reserved. Copyright 2000-2005, 2014-2016 Julian Hyde + (ASLv2) simple-syslog-5424 + The following NOTICE information applies: + + simple-syslog-5424 + https://github.com/palindromicity/simple-syslog-5424 + + Copyright 2018 simple-syslog-5424 authors. + ************************ Common Development and Distribution License 1.1 ************************ diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 67c511246f75..9bbdec1b1081 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -327,6 +327,10 @@ 1.7.0-SNAPSHOT test + + com.github.palindromicity + simple-syslog-5424 + diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java new file mode 100644 index 000000000000..de800b3d94cf --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import com.github.palindromicity.syslog.NilPolicy; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.syslog.StrictSyslog5424Parser; +import org.apache.nifi.processors.standard.syslog.Syslog5424Event; +import org.apache.nifi.stream.io.StreamUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + + +@EventDriven +@SideEffectFree +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"logs", "syslog", "syslog5424", "attributes", "system", "event", "message"}) +@CapabilityDescription("Attempts to parse the contents of a well formed Syslog message in accordance to RFC5424 " + + "format and adds attributes to the FlowFile for each of the parts of the Syslog message, including Structured Data." + + "Structured Data will be written to attributes as on attribute per item id + parameter "+ + "see https://tools.ietf.org/html/rfc5424." + + "Note: ParseSyslog5424 follows the specification more closely than ParseSyslog. If your Syslog producer " + + "does not follow the spec closely, with regards to using '-' for missing header entries for example, those logs " + + "will fail with this parser, where they would not fail with ParseSyslog.") +@WritesAttributes({@WritesAttribute(attribute = "syslog.priority", description = "The priority of the Syslog message."), + @WritesAttribute(attribute = "syslog.severity", description = "The severity of the Syslog message derived from the priority."), + @WritesAttribute(attribute = "syslog.facility", description = "The facility of the Syslog message derived from the priority."), + @WritesAttribute(attribute = "syslog.version", description = "The optional version from the Syslog message."), + @WritesAttribute(attribute = "syslog.timestamp", description = "The timestamp of the Syslog message."), + @WritesAttribute(attribute = "syslog.hostname", description = "The hostname or IP address of the Syslog message."), + @WritesAttribute(attribute = "syslog.appname", description = "The appname of the Syslog message."), + @WritesAttribute(attribute = "syslog.procid", description = "The procid of the Syslog message."), + @WritesAttribute(attribute = "syslog.messageid", description = "The messageid the Syslog message."), + @WritesAttribute(attribute = "syslog.structuredData", description = "Multiple entries per structuredData of the Syslog message."), + @WritesAttribute(attribute = "syslog.sender", description = "The hostname of the Syslog server that sent the message."), + @WritesAttribute(attribute = "syslog.body", description = "The body of the Syslog message, everything after the hostname.")}) +@SeeAlso({ListenSyslog.class, ParseSyslog.class, PutSyslog.class}) +public class ParseSyslog5424 extends AbstractProcessor { + + public static final AllowableValue OMIT = new AllowableValue(NilPolicy.OMIT.name(),NilPolicy.OMIT.name(),"The missing field will not have an attribute added."); + public static final AllowableValue NULL = new AllowableValue(NilPolicy.NULL.name(),NilPolicy.NULL.name(),"The missing field will have an empty attribute added."); + public static final AllowableValue DASH = new AllowableValue(NilPolicy.DASH.name(),NilPolicy.DASH.name(),"The missing field will have an attribute added with the value of '-'."); + + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("Specifies which character set of the Syslog messages") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + public static final PropertyDescriptor NIL_POLICY = new PropertyDescriptor.Builder() + .name("nil_policy") + .displayName("NIL Policy") + .description("Defines how NIL values are handled for header fields.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .allowableValues(OMIT,NULL,DASH) + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .defaultValue(NULL.getValue()) + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that could not be parsed as a Syslog message will be transferred to this Relationship without any attributes being added") + .build(); + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Any FlowFile that is successfully parsed as a Syslog message will be to this Relationship.") + .build(); + + private StrictSyslog5424Parser parser; + + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(2); + properties.add(CHARSET); + properties.add(NIL_POLICY); + return properties; + } + + @Override + public Set getRelationships() { + final Set relationships = new HashSet<>(); + relationships.add(REL_FAILURE); + relationships.add(REL_SUCCESS); + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final String charsetName = context.getProperty(CHARSET).getValue(); + final String nilPolicyString = context.getProperty(NIL_POLICY).getValue(); + + // If the parser already exists and uses the same charset, it does not need to be re-initialized + if (parser == null || !parser.getCharsetName().equals(charsetName)) { + parser = new StrictSyslog5424Parser(Charset.forName(charsetName),NilPolicy.valueOf(nilPolicyString)); + } + + final byte[] buffer = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer); + } + }); + + final Syslog5424Event event; + try { + event = parser.parseEvent(buffer, null); + } catch (final ProcessException pe) { + getLogger().error("Failed to parse {} as a Syslog message due to {}; routing to failure", new Object[] {flowFile, pe}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + if (event == null || !event.isValid()) { + getLogger().error("Failed to parse {} as a Syslog message: it does not conform to any of the RFC formats supported; routing to failure", new Object[] {flowFile}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + flowFile = session.putAllAttributes(flowFile, event.getFieldMap()); + session.transfer(flowFile, REL_SUCCESS); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/StrictSyslog5424Parser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/StrictSyslog5424Parser.java new file mode 100644 index 000000000000..0b8e13f1470a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/StrictSyslog5424Parser.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.syslog; + +import com.github.palindromicity.syslog.DefaultKeyProvider; +import com.github.palindromicity.syslog.KeyProvider; +import com.github.palindromicity.syslog.NilPolicy; +import com.github.palindromicity.syslog.SyslogParserBuilder; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Pattern; + +/** + * Parses a Syslog message from a ByteBuffer into a SyslogEvent instance. + * + * The Syslog regular expressions below were adapted from the Apache Flume project for RFC3164 logs. + * For 5424 we use simple-syslog-5424 since it parsers out structured data. + */ +public class StrictSyslog5424Parser { + private Charset charset; + private com.github.palindromicity.syslog.SyslogParser parser; + KeyProvider keyProvider = new DefaultKeyProvider(); + + public StrictSyslog5424Parser() { + this(StandardCharsets.UTF_8, NilPolicy.NULL); + } + + public StrictSyslog5424Parser(final Charset charset, final NilPolicy nilPolicy) { + this.charset = charset; + parser = new SyslogParserBuilder() + .withNilPolicy(nilPolicy) + .withKeyProvider(new NifiKeyProvider()) + .build(); + } + + /** + * Parses a Syslog5424Event from a byte buffer. + * + * @param buffer a byte buffer containing a syslog message + * @return a Syslog5424Event parsed from the byte array + */ + public Syslog5424Event parseEvent(final ByteBuffer buffer) { + return parseEvent(buffer, null); + } + + /** + * Parses a Syslog5424Event from a byte buffer. + * + * @param buffer a byte buffer containing a syslog message + * @param sender the hostname of the syslog server that sent the message + * @return a Syslog5424Event parsed from the byte array + */ + public Syslog5424Event parseEvent(final ByteBuffer buffer, final String sender) { + if (buffer == null) { + return null; + } + if (buffer.position() != 0) { + buffer.flip(); + } + byte bytes[] = new byte[buffer.limit()]; + buffer.get(bytes, 0, buffer.limit()); + return parseEvent(bytes, sender); + } + + /** + * Parses a Syslog5424Event from a byte array. + * + * @param bytes a byte array containing a syslog message + * @param sender the hostname of the syslog server that sent the message + * @return a Syslog5424Event parsed from the byte array + */ + public Syslog5424Event parseEvent(final byte[] bytes, final String sender) { + if (bytes == null || bytes.length == 0) { + return null; + } + + // remove trailing new line before parsing + int length = bytes.length; + if (bytes[length - 1] == '\n') { + length = length - 1; + } + + final String message = new String(bytes, 0, length, charset); + + final Syslog5424Event.Builder builder = new Syslog5424Event.Builder() + .valid(false).fullMessage(message).rawMessage(bytes).sender(sender); + + try { + parser.parseLine(message,(map)-> { + builder.fieldMap(convertMap(map)); + }); + builder.valid(true); + return builder.build(); + } catch (Exception e) { + // this is not a valid 5424 message + builder.valid(false); + } + + // either invalid w/original msg, or fully parsed event + return builder.build(); + } + + public String getCharsetName() { + return charset == null ? StandardCharsets.UTF_8.name() : charset.name(); + } + + private static Map convertMap(Map map) { + Map returnMap = new HashMap<>(); + map.forEach((key,value) -> returnMap.put(key,(String)value)); + return returnMap; + } + + public static class NifiKeyProvider implements KeyProvider { + private Pattern pattern; + + public NifiKeyProvider(){} + + @Override + public String getMessage() { + return SyslogAttributes.BODY.key(); + } + + @Override + public String getHeaderAppName() { + return Syslog5424Attributes.APP_NAME.key(); + } + + @Override + public String getHeaderHostName() { + return SyslogAttributes.HOSTNAME.key(); + } + + @Override + public String getHeaderPriority() { + return SyslogAttributes.PRIORITY.key(); + } + + @Override + public String getHeaderFacility() { + return SyslogAttributes.FACILITY.key(); + } + + @Override + public String getHeaderSeverity() { + return SyslogAttributes.SEVERITY.key(); + } + + + @Override + public String getHeaderProcessId() { + return Syslog5424Attributes.PROCID.key(); + } + + @Override + public String getHeaderTimeStamp() { + return SyslogAttributes.TIMESTAMP.key(); + } + + @Override + public String getHeaderMessageId() { + return Syslog5424Attributes.MESSAGEID.key(); + } + + @Override + public String getHeaderVersion() { + return SyslogAttributes.VERSION.key(); + } + + @Override + public String getStructuredBase() { + return Syslog5424Attributes.STRUCTURED_BASE.key(); + } + + @Override + public String getStructuredElementIdFormat() { + return Syslog5424Attributes.STRUCTURED_ELEMENT_ID_FMT.key(); + } + + @Override + public String getStructuredElementIdParamNameFormat() { + return Syslog5424Attributes.STRUCTURED_ELEMENT_ID_PNAME_FMT.key(); + } + + @Override + public Pattern getStructuredElementIdParamNamePattern() { + if (pattern == null) { + pattern = Pattern.compile(Syslog5424Attributes.STRUCTURED_ELEMENT_ID_PNAME_PATTERN.key()); + } + return pattern; + } + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/Syslog5424Attributes.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/Syslog5424Attributes.java new file mode 100644 index 000000000000..284cc840c523 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/Syslog5424Attributes.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.syslog; + +import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey; + +/** + * FlowFile Attributes for each Syslog message. + */ +public enum Syslog5424Attributes implements FlowFileAttributeKey { + + APP_NAME("syslog.appName"), + PROCID("syslog.procid"), + MESSAGEID("syslog.messageid"), + STRUCTURED_BASE("syslog.structuredData"), + STRUCTURED_ELEMENT_ID_FMT("syslog.structuredData.%s"), + STRUCTURED_ELEMENT_ID_PNAME_FMT("syslog.structuredData.%s.%s"), + STRUCTURED_ELEMENT_ID_PNAME_PATTERN("syslog.structuredData\\.(.*)\\.(.*)$"); + private String key; + + Syslog5424Attributes(String key) { + this.key = key; + } + + @Override + public String key() { + return key; + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/Syslog5424Event.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/Syslog5424Event.java new file mode 100644 index 000000000000..89910a9e5922 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/Syslog5424Event.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.syslog; + +import java.util.Map; + +/** + * Encapsulates the parsed information for a single Syslog 5424 event. + */ +public class Syslog5424Event { + private final Map fieldMap; + private final String fullMessage; + private final byte[] rawMessage; + private final String sender; + private final boolean valid; + + private Syslog5424Event(final Builder builder) { + this.fieldMap = builder.fieldMap; + this.fullMessage = builder.fullMessage; + this.rawMessage = builder.rawMessage; + this.sender = builder.sender; + this.valid = builder.valid; + } + + public Map getFieldMap() { + return fieldMap; + } + + public String getFullMessage() { + return fullMessage; + } + + public byte[] getRawMessage() { + return rawMessage; + } + + public String getSender() { + return sender; + } + + public boolean isValid() { + return valid; + } + + public static final class Builder { + private String fullMessage; + private String sender; + private Map fieldMap; + private byte[] rawMessage; + private boolean valid; + + public void reset() { + this.fieldMap = null; + this.sender = null; + this.fullMessage = null; + this.valid = false; + } + + public Builder sender(String sender) { + this.sender = sender; + return this; + } + + public Builder fieldMap(Map fieldMap) { + this.fieldMap = fieldMap; + return this; + } + + public Builder fullMessage(String fullMessage) { + this.fullMessage = fullMessage; + return this; + } + + public Builder rawMessage(byte[] rawMessage) { + this.rawMessage = rawMessage; + return this; + } + + public Builder valid(boolean valid) { + this.valid = valid; + return this; + } + + public Syslog5424Event build() { + return new Syslog5424Event(this); + } + } + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 72f0768b15f9..8fc361d62a92 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -75,6 +75,7 @@ org.apache.nifi.processors.standard.MonitorActivity org.apache.nifi.processors.standard.Notify org.apache.nifi.processors.standard.ParseCEF org.apache.nifi.processors.standard.ParseSyslog +org.apache.nifi.processors.standard.ParseSyslog5424 org.apache.nifi.processors.standard.PartitionRecord org.apache.nifi.processors.standard.PostHTTP org.apache.nifi.processors.standard.PutDatabaseRecord diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog5424.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog5424.java new file mode 100644 index 000000000000..a45768c58671 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog5424.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import com.github.palindromicity.syslog.NilPolicy; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +public class TestParseSyslog5424 { + private static final String SYSLOG_LINE_ALL = "<14>1 2014-06-20T09:14:07+00:00 loggregator" + + " d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01" + + " [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"]" + + " [exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\" eventID=\"2022\"] Removing instance"; + private static final String SYSLOG_LINE_NILS= "<14>1 2014-06-20T09:14:07+00:00 -" + + " d0602076-b14a-4c55-852a-981e7afeed38 - -" + + " [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"]" + + " [exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\" eventID=\"2022\"] Removing instance"; + + @Test + public void testValidMessage() { + final TestRunner runner = TestRunners.newTestRunner(new ParseSyslog5424()); + runner.setProperty(ParseSyslog5424.NIL_POLICY,NilPolicy.DASH.name()); + runner.enqueue(SYSLOG_LINE_ALL.getBytes()); + runner.run(); + runner.assertAllFlowFilesTransferred(ParseSyslog5424.REL_SUCCESS,1); + } + + @Test + public void testValidMessageWithNils() { + final TestRunner runner = TestRunners.newTestRunner(new ParseSyslog5424()); + runner.setProperty(ParseSyslog5424.NIL_POLICY,NilPolicy.DASH.name()); + runner.enqueue(SYSLOG_LINE_NILS.getBytes()); + runner.run(); + runner.assertAllFlowFilesTransferred(ParseSyslog5424.REL_SUCCESS,1); + } + + @Test + public void testInvalidMessage() { + final TestRunner runner = TestRunners.newTestRunner(new ParseSyslog5424()); + runner.setProperty(ParseSyslog5424.NIL_POLICY, NilPolicy.OMIT.name()); + runner.enqueue(" yesterday localhost\n".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(ParseSyslog5424.REL_FAILURE, 1); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/BaseStrictSyslog5424ParserTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/BaseStrictSyslog5424ParserTest.java new file mode 100644 index 000000000000..f6217cbb2c05 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/BaseStrictSyslog5424ParserTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import com.github.palindromicity.syslog.NilPolicy; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.processors.standard.syslog.StrictSyslog5424Parser; +import org.apache.nifi.processors.standard.syslog.Syslog5424Attributes; +import org.apache.nifi.processors.standard.syslog.Syslog5424Event; +import org.apache.nifi.processors.standard.syslog.SyslogAttributes; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +public abstract class BaseStrictSyslog5424ParserTest { + + private static final Charset CHARSET = Charset.forName("UTF-8"); + private static final String NIL_VALUE = "-"; + private StrictSyslog5424Parser parser; + + protected abstract NilPolicy getPolicy(); + + protected void validateForPolicy(String expected, String actual) { + switch (getPolicy()) { + case DASH: + Assert.assertEquals(actual, NIL_VALUE); + break; + case OMIT: + case NULL: + Assert.assertNull(actual); + + } + } + + @Before + public void setup() { + parser = new StrictSyslog5424Parser(CHARSET, getPolicy()); + } + + @Test + public void testRFC5424WithVersion() { + final String pri = "34"; + final String version = "1"; + final String stamp = "2003-10-11T22:14:15.003Z"; + final String host = "mymachine.example.com"; + final String appName = "su"; + final String procId = "-"; + final String msgId = "ID17"; + final String structuredData = "-"; + final String body = "BOM'su root' failed for lonvick on /dev/pts/8"; + + final String message = "<" + pri + ">" + version + " " + stamp + " " + host + " " + + appName + " " + procId + " " + msgId + " " + "-" + " " + body; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final Syslog5424Event event = parser.parseEvent(buffer); + Assert.assertNotNull(event); + Assert.assertTrue(event.isValid()); + Assert.assertFalse(event.getFieldMap().isEmpty()); + Map fieldMap = event.getFieldMap(); + Assert.assertEquals(pri, fieldMap.get(SyslogAttributes.PRIORITY.key())); + Assert.assertEquals("2", fieldMap.get(SyslogAttributes.SEVERITY.key())); + Assert.assertEquals("4", fieldMap.get(SyslogAttributes.FACILITY.key())); + Assert.assertEquals(version, fieldMap.get(SyslogAttributes.VERSION.key())); + Assert.assertEquals(stamp, fieldMap.get(SyslogAttributes.TIMESTAMP.key())); + Assert.assertEquals(host, fieldMap.get(SyslogAttributes.HOSTNAME.key())); + Assert.assertEquals(appName, fieldMap.get(Syslog5424Attributes.APP_NAME.key())); + validateForPolicy(procId, fieldMap.get(Syslog5424Attributes.PROCID.key())); + Assert.assertEquals(msgId, fieldMap.get(Syslog5424Attributes.MESSAGEID.key())); + + Pattern structuredPattern = new StrictSyslog5424Parser.NifiKeyProvider().getStructuredElementIdParamNamePattern(); + fieldMap.forEach((key,value) -> { + if (!StringUtils.isBlank(value)) { + Assert.assertFalse(structuredPattern.matcher(value).matches()); + } + }); + + Assert.assertEquals(body, fieldMap.get(SyslogAttributes.BODY.key())); + Assert.assertEquals(message, event.getFullMessage()); + Assert.assertNull(event.getSender()); + } + + @Test + public void testRFC5424WithoutVersion() { + final String pri = "34"; + final String version = "-"; + final String stamp = "2003-10-11T22:14:15.003Z"; + final String host = "mymachine.example.com"; + final String appName = "su"; + final String procId = "-"; + final String msgId = "ID17"; + final String structuredData = "-"; + final String body = "BOM'su root' failed for lonvick on /dev/pts/8"; + + final String message = "<" + pri + ">" + version + " " + stamp + " " + host + " " + + appName + " " + procId + " " + msgId + " " + "-" + " " + body; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final Syslog5424Event event = parser.parseEvent(buffer); + Assert.assertFalse(event.isValid()); + } + + @Test + public void testTrailingNewLine() { + final String message = "<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - " + + "ID47 - BOM'su root' failed for lonvick on /dev/pts/8\n"; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final Syslog5424Event event = parser.parseEvent(buffer); + Assert.assertNotNull(event); + Assert.assertTrue(event.isValid()); + } + + @Test + public void testVariety() { + final List messages = new ArrayList<>(); + + // supported examples from RFC 5424 + messages.add("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - " + + "ID47 - BOM'su root' failed for lonvick on /dev/pts/8"); + messages.add("<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc " + + "8710 - - %% It's time to make the do-nuts."); + messages.add("<14>1 2014-06-20T09:14:07+00:00 loggregator" + + " d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01" + + " [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"]" + + " [exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\" eventID=\"2022\"] Removing instance"); + + for (final String message : messages) { + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final Syslog5424Event event = parser.parseEvent(buffer); + Assert.assertTrue(event.isValid()); + } + } + + @Test + public void testInvalidPriority() { + final String message = "10 Oct 13 14:14:43 localhost some body of the message"; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final Syslog5424Event event = parser.parseEvent(buffer); + Assert.assertNotNull(event); + Assert.assertFalse(event.isValid()); + Assert.assertEquals(message, event.getFullMessage()); + } + + @Test + public void testParseWithSender() { + final String sender = "127.0.0.1"; + final String message = "<14>1 2014-06-20T09:14:07+00:00 loggregator" + + " d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01" + + " [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"]" + + " [exampleSDID@32480 iut=\"4\" eventSource=\"Other Application\" eventID=\"2022\"] Removing instance"; + + final byte[] bytes = message.getBytes(CHARSET); + final ByteBuffer buffer = ByteBuffer.allocate(bytes.length); + buffer.clear(); + buffer.put(bytes); + + final Syslog5424Event event = parser.parseEvent(buffer, sender); + Assert.assertNotNull(event); + Assert.assertTrue(event.isValid()); + Assert.assertEquals(sender, event.getSender()); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/StrictSyslog5424ParserDashPolicyTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/StrictSyslog5424ParserDashPolicyTest.java new file mode 100644 index 000000000000..9b78d4d51974 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/StrictSyslog5424ParserDashPolicyTest.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import com.github.palindromicity.syslog.NilPolicy; + +public class StrictSyslog5424ParserDashPolicyTest extends BaseStrictSyslog5424ParserTest { + protected NilPolicy getPolicy() { + return NilPolicy.DASH; + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/StrictSyslog5424ParserNullPolicyTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/StrictSyslog5424ParserNullPolicyTest.java new file mode 100644 index 000000000000..4a7a3d671855 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/StrictSyslog5424ParserNullPolicyTest.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import com.github.palindromicity.syslog.NilPolicy; + +public class StrictSyslog5424ParserNullPolicyTest extends BaseStrictSyslog5424ParserTest { + protected NilPolicy getPolicy() { + return NilPolicy.NULL; + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/StrictSyslog5424ParserOmitPolicyTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/StrictSyslog5424ParserOmitPolicyTest.java new file mode 100644 index 000000000000..0e58f2a6e060 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/StrictSyslog5424ParserOmitPolicyTest.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import com.github.palindromicity.syslog.NilPolicy; + +public class StrictSyslog5424ParserOmitPolicyTest extends BaseStrictSyslog5424ParserTest { + protected NilPolicy getPolicy() { + return NilPolicy.OMIT; + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/pom.xml index b80e4c914b78..64e36cc8239b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/pom.xml @@ -380,6 +380,11 @@ avro 1.8.1 + + com.github.palindromicity + simple-syslog-5424 + 0.0.5 + From b2c45cc804d4b002578dcab6ec82fdd19945dc4d Mon Sep 17 00:00:00 2001 From: Otto Fowler Date: Wed, 20 Jun 2018 16:30:53 -0400 Subject: [PATCH 2/4] address review comments --- nifi-assembly/NOTICE | 7 +++++ .../src/main/resources/META-INF/LICENSE | 29 ++++++++++++++++++- .../processors/standard/ParseSyslog5424.java | 4 +-- .../syslog/StrictSyslog5424Parser.java | 5 +--- 4 files changed, 38 insertions(+), 7 deletions(-) diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index 9f413d6fb505..6bd2bb7c24fb 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -1713,6 +1713,13 @@ The following binary components are provided under the Apache Software License v server-metrics Copyright 2011-2015 Metamarkets Group Inc. + (ASLv2) simple-syslog-5424 + The following NOTICE information applies: + + simple-syslog-5424 + https://github.com/palindromicity/simple-syslog-5424 + + Copyright 2018 simple-syslog-5424 authors. ************************ Common Development and Distribution License 1.1 diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/LICENSE index a8db80fbebc4..a09260fe7e4c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/LICENSE +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/LICENSE @@ -206,7 +206,34 @@ APACHE NIFI SUBCOMPONENTS: The Apache NiFi project contains subcomponents with separate copyright notices and license terms. Your use of the source code for the these subcomponents is subject to the terms and conditions of the following -licenses. +licenses. + +The binary distribution of this product bundles 'ANTLR 4' which is available + under a "3-clause BSD" license. For details see http://www.antlr.org/license.html + + Copyright (c) 2012 Terence Parr and Sam Harwell + All rights reserved. + Redistribution and use in source and binary forms, with or without modification, are permitted + provided that the following conditions are met: + + Redistributions of source code must retain the above copyright notice, this list of + conditions and the following disclaimer. + Redistributions in binary form must reproduce the above copyright notice, this list of + conditions and the following disclaimer in the documentation and/or other materials + provided with the distribution. + + Neither the name of the author nor the names of its contributors may be used to endorse + or promote products derived from this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY + EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. The binary distribution of this product bundles 'Bouncy Castle JDK 1.5' under an MIT style license. diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java index de800b3d94cf..7540f0bce4d8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java @@ -59,7 +59,7 @@ @Tags({"logs", "syslog", "syslog5424", "attributes", "system", "event", "message"}) @CapabilityDescription("Attempts to parse the contents of a well formed Syslog message in accordance to RFC5424 " + "format and adds attributes to the FlowFile for each of the parts of the Syslog message, including Structured Data." + - "Structured Data will be written to attributes as on attribute per item id + parameter "+ + "Structured Data will be written to attributes as one attribute per item id + parameter "+ "see https://tools.ietf.org/html/rfc5424." + "Note: ParseSyslog5424 follows the specification more closely than ParseSyslog. If your Syslog producer " + "does not follow the spec closely, with regards to using '-' for missing header entries for example, those logs " + @@ -111,7 +111,7 @@ public class ParseSyslog5424 extends AbstractProcessor { .description("Any FlowFile that is successfully parsed as a Syslog message will be to this Relationship.") .build(); - private StrictSyslog5424Parser parser; + private volatile StrictSyslog5424Parser parser; @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/StrictSyslog5424Parser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/StrictSyslog5424Parser.java index 0b8e13f1470a..880c4600ec41 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/StrictSyslog5424Parser.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/StrictSyslog5424Parser.java @@ -29,15 +29,12 @@ import java.util.regex.Pattern; /** - * Parses a Syslog message from a ByteBuffer into a SyslogEvent instance. - * - * The Syslog regular expressions below were adapted from the Apache Flume project for RFC3164 logs. + * Parses a Syslog message from a ByteBuffer into a Syslog5424Event instance. * For 5424 we use simple-syslog-5424 since it parsers out structured data. */ public class StrictSyslog5424Parser { private Charset charset; private com.github.palindromicity.syslog.SyslogParser parser; - KeyProvider keyProvider = new DefaultKeyProvider(); public StrictSyslog5424Parser() { this(StandardCharsets.UTF_8, NilPolicy.NULL); From 5fce51f12070f7b36772122644363cbb0d8d7866 Mon Sep 17 00:00:00 2001 From: Otto Fowler Date: Thu, 21 Jun 2018 10:20:15 -0400 Subject: [PATCH 3/4] per review, add property for including body in attributes --- .../processors/standard/ParseSyslog5424.java | 34 +++++++++++++--- .../standard/TestParseSyslog5424.java | 40 +++++++++++++++++++ 2 files changed, 68 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java index 7540f0bce4d8..2a8d2bd1bb80 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java @@ -41,6 +41,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.syslog.StrictSyslog5424Parser; import org.apache.nifi.processors.standard.syslog.Syslog5424Event; +import org.apache.nifi.processors.standard.syslog.SyslogAttributes; import org.apache.nifi.stream.io.StreamUtils; import java.io.IOException; @@ -49,6 +50,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; @@ -102,6 +104,17 @@ public class ParseSyslog5424 extends AbstractProcessor { .defaultValue(NULL.getValue()) .build(); + public static final PropertyDescriptor INCLUDE_BODY_IN_ATTRIBUTES = new PropertyDescriptor.Builder() + .name("include_policy") + .displayName("Include Message Body in Attributes") + .description("If true, then the Syslog Message body will be included in the attributes.") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true","false") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .required(false) + .defaultValue("true") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") .description("Any FlowFile that could not be parsed as a Syslog message will be transferred to this Relationship without any attributes being added") @@ -119,6 +132,7 @@ protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(2); properties.add(CHARSET); properties.add(NIL_POLICY); + properties.add(INCLUDE_BODY_IN_ATTRIBUTES); return properties; } @@ -139,6 +153,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final String charsetName = context.getProperty(CHARSET).getValue(); final String nilPolicyString = context.getProperty(NIL_POLICY).getValue(); + boolean includeBody = true; + + if (context.getProperty(INCLUDE_BODY_IN_ATTRIBUTES).isSet()) { + includeBody = context.getProperty(INCLUDE_BODY_IN_ATTRIBUTES).asBoolean(); + } // If the parser already exists and uses the same charset, it does not need to be re-initialized if (parser == null || !parser.getCharsetName().equals(charsetName)) { @@ -153,22 +172,25 @@ public void process(final InputStream in) throws IOException { } }); - final Syslog5424Event event; + final Syslog5424Event syslogEvent; try { - event = parser.parseEvent(buffer, null); + syslogEvent = parser.parseEvent(buffer, null); } catch (final ProcessException pe) { - getLogger().error("Failed to parse {} as a Syslog message due to {}; routing to failure", new Object[] {flowFile, pe}); + getLogger().error("Failed to parse {} as a Syslog 5424 message due to {}; routing to failure", new Object[] {flowFile, pe}); session.transfer(flowFile, REL_FAILURE); return; } - if (event == null || !event.isValid()) { + if (syslogEvent == null || !syslogEvent.isValid()) { getLogger().error("Failed to parse {} as a Syslog message: it does not conform to any of the RFC formats supported; routing to failure", new Object[] {flowFile}); session.transfer(flowFile, REL_FAILURE); return; } - - flowFile = session.putAllAttributes(flowFile, event.getFieldMap()); + Map attributeMap = syslogEvent.getFieldMap(); + if (!includeBody) { + attributeMap.remove(SyslogAttributes.BODY.key()); + } + flowFile = session.putAllAttributes(flowFile, attributeMap); session.transfer(flowFile, REL_SUCCESS); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog5424.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog5424.java index a45768c58671..eb88d6c752b1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog5424.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog5424.java @@ -18,10 +18,15 @@ package org.apache.nifi.processors.standard; import com.github.palindromicity.syslog.NilPolicy; +import org.apache.nifi.processors.standard.syslog.SyslogAttributes; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.Assert; import org.junit.Test; +import java.util.List; + public class TestParseSyslog5424 { private static final String SYSLOG_LINE_ALL = "<14>1 2014-06-20T09:14:07+00:00 loggregator" + " d0602076-b14a-4c55-852a-981e7afeed38 DEA MSG-01" @@ -59,4 +64,39 @@ public void testInvalidMessage() { runner.assertAllFlowFilesTransferred(ParseSyslog5424.REL_FAILURE, 1); } + + @Test + public void testDefaultHasBodyAttribute() { + final TestRunner runner = TestRunners.newTestRunner(new ParseSyslog5424()); + runner.setProperty(ParseSyslog5424.NIL_POLICY,NilPolicy.DASH.name()); + runner.enqueue(SYSLOG_LINE_NILS.getBytes()); + runner.run(); + runner.assertAllFlowFilesTransferred(ParseSyslog5424.REL_SUCCESS,1); + List results = runner.getFlowFilesForRelationship(ParseSyslog5424.REL_SUCCESS); + Assert.assertNotNull(results.get(0).getAttribute(SyslogAttributes.BODY.key())); + } + + @Test + public void testIncludeBodyAttributeTrue() { + final TestRunner runner = TestRunners.newTestRunner(new ParseSyslog5424()); + runner.setProperty(ParseSyslog5424.NIL_POLICY,NilPolicy.DASH.name()); + runner.setProperty(ParseSyslog5424.INCLUDE_BODY_IN_ATTRIBUTES,"true"); + runner.enqueue(SYSLOG_LINE_NILS.getBytes()); + runner.run(); + runner.assertAllFlowFilesTransferred(ParseSyslog5424.REL_SUCCESS,1); + List results = runner.getFlowFilesForRelationship(ParseSyslog5424.REL_SUCCESS); + Assert.assertNotNull(results.get(0).getAttribute(SyslogAttributes.BODY.key())); + } + + @Test + public void testIncludeBodyAttributeFalse() { + final TestRunner runner = TestRunners.newTestRunner(new ParseSyslog5424()); + runner.setProperty(ParseSyslog5424.NIL_POLICY,NilPolicy.DASH.name()); + runner.setProperty(ParseSyslog5424.INCLUDE_BODY_IN_ATTRIBUTES,"false"); + runner.enqueue(SYSLOG_LINE_NILS.getBytes()); + runner.run(); + runner.assertAllFlowFilesTransferred(ParseSyslog5424.REL_SUCCESS,1); + List results = runner.getFlowFilesForRelationship(ParseSyslog5424.REL_SUCCESS); + Assert.assertNull(results.get(0).getAttribute(SyslogAttributes.BODY.key())); + } } From 116b2b15fd1f1b1bab5900772ba423616bddf74d Mon Sep 17 00:00:00 2001 From: Otto Fowler Date: Thu, 21 Jun 2018 10:27:41 -0400 Subject: [PATCH 4/4] per review, create processor in OnScheduled --- .../nifi/processors/standard/ParseSyslog5424.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java index 2a8d2bd1bb80..1436921cfde8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ParseSyslog5424.java @@ -28,6 +28,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; @@ -144,6 +145,12 @@ public Set getRelationships() { return relationships; } + @OnScheduled + public void onScheduled(final ProcessContext context) { + final String charsetName = context.getProperty(CHARSET).getValue(); + final String nilPolicyString = context.getProperty(NIL_POLICY).getValue(); + parser = new StrictSyslog5424Parser(Charset.forName(charsetName),NilPolicy.valueOf(nilPolicyString)); + } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); @@ -151,19 +158,12 @@ public void onTrigger(final ProcessContext context, final ProcessSession session return; } - final String charsetName = context.getProperty(CHARSET).getValue(); - final String nilPolicyString = context.getProperty(NIL_POLICY).getValue(); boolean includeBody = true; if (context.getProperty(INCLUDE_BODY_IN_ATTRIBUTES).isSet()) { includeBody = context.getProperty(INCLUDE_BODY_IN_ATTRIBUTES).asBoolean(); } - // If the parser already exists and uses the same charset, it does not need to be re-initialized - if (parser == null || !parser.getCharsetName().equals(charsetName)) { - parser = new StrictSyslog5424Parser(Charset.forName(charsetName),NilPolicy.valueOf(nilPolicyString)); - } - final byte[] buffer = new byte[(int) flowFile.getSize()]; session.read(flowFile, new InputStreamCallback() { @Override