From 66b072a3d7348cb8dc1d2c6ab52634116b1bf6d7 Mon Sep 17 00:00:00 2001 From: charlesporter Date: Tue, 16 Jan 2018 20:14:45 -0800 Subject: [PATCH 1/3] ExtractGrok Supports multiple expressions -separated by comma or specified delimiter -option to return on first match or to run all expressions in list other enhancements -multiple pattern files -selectable result attribute prefix --- .gitignore | 2 + .../nifi/processors/standard/ExtractGrok.java | 264 ++++++++++++------ .../processors/standard/TestExtractGrok.java | 102 ++++++- .../resources/TestExtractGrok/apache_patterns | 6 + .../test/resources/TestExtractGrok/patterns | 11 +- 5 files changed, 282 insertions(+), 103 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractGrok/apache_patterns diff --git a/.gitignore b/.gitignore index a2e04fa15aba..66b35dd55df7 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,6 @@ nb-configuration.xml *.iml *.iws *~ +*.ipr +.shelf diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java index 8e20b9e0e84f..d7e7d97f0958 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java @@ -31,7 +31,6 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -43,15 +42,11 
@@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.nio.charset.Charset; import java.util.List; import java.util.Map; @@ -70,33 +65,51 @@ @Tags({"grok", "log", "text", "parse", "delimit", "extract"}) @CapabilityDescription("Evaluates one or more Grok Expressions against the content of a FlowFile, " + "adding the results as attributes or replacing the content of the FlowFile with a JSON " + - "notation of the matched content") + "notation of the matched content\n" + + "uses https://github.com/thekrakken/java-grok.") @WritesAttributes({ - @WritesAttribute(attribute = "grok.XXX", description = "When operating in flowfile-attribute mode, each of the Grok identifier that is matched in the flowfile " + - "will be added as an attribute, prefixed with \"grok.\" For example," + - "if the grok identifier \"timestamp\" is matched, then the value will be added to an attribute named \"grok.timestamp\"")}) + @WritesAttribute(attribute = "{result prefix}XXX", description = "When operating in flowfile-attribute mode, each of the Grok identifier that is matched in the flowfile " + + "will be added as an attribute, prefixed with \"{result prefix}\" For example," + + "if the grok identifier \"timestamp\" is matched, then the value will be added to an attribute named \"{result prefix}timestamp\""), + + @WritesAttribute(attribute = "ExtractGrok.exception", description = "if an error occurs, an exception will be written to this attribute, " + + "and the flow routed to 'unmatched' ") +}) 
public class ExtractGrok extends AbstractProcessor { public static final String FLOWFILE_ATTRIBUTE = "flowfile-attribute"; public static final String FLOWFILE_CONTENT = "flowfile-content"; - private static final String APPLICATION_JSON = "application/json"; - + public static final String APPLICATION_JSON = "application/json"; + public static final String GROK_EXPRESSION_KEY = "Grok Expression"; + public static final String GROK_PATTERN_FILE_KEY = "Grok Pattern file"; + public static final String DESTINATION_KEY = "Destination"; + public static final String CHARACTER_SET_KEY = "Character Set"; + public static final String MAXIMUM_BUFFER_SIZE_KEY = "Maximum Buffer Size"; + public static final String NAMED_CAPTURES_ONLY_KEY = "Named captures only"; + public static final String SINGLE_MATCH_KEY = "Single Match"; + public static final String RESULT_PREFIX_KEY = "result prefix"; + public static final String MATCHED_EXP_ATTR_KEY = "matched expression attribute"; + public static final String EXP_SEPARATOR_KEY = "expression-separator"; + public static final String PATTERN_FILE_LIST_SEPARATOR = ","; + + //properties public static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor.Builder() - .name("Grok Expression") - .description("Grok expression") + .name(GROK_EXPRESSION_KEY) + .description("Grok expressions, one or more grok expressions separated by ',' or other character as set in attribute" + EXP_SEPARATOR_KEY) .required(true) .addValidator(validateGrokExpression()) .build(); + public static final PropertyDescriptor GROK_PATTERN_FILE = new PropertyDescriptor.Builder() - .name("Grok Pattern file") - .description("Grok Pattern file definition") + .name(GROK_PATTERN_FILE_KEY) + .description("Grok Pattern file definition. 
May include multiple files, separated by " + PATTERN_FILE_LIST_SEPARATOR) .required(true) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .addValidator(validateMultipleFilesExist()) .build(); public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() - .name("Destination") + .name(DESTINATION_KEY) .description("Control if Grok output value is written as a new flowfile attributes, in this case " + "each of the Grok identifier that is matched in the flowfile will be added as an attribute, " + "prefixed with \"grok.\" or written in the flowfile content. Writing to flowfile content " + @@ -107,7 +120,7 @@ public class ExtractGrok extends AbstractProcessor { .build(); public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() - .name("Character Set") + .name(CHARACTER_SET_KEY) .description("The Character Set in which the file is encoded") .required(true) .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) @@ -115,7 +128,7 @@ public class ExtractGrok extends AbstractProcessor { .build(); public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() - .name("Maximum Buffer Size") + .name(MAXIMUM_BUFFER_SIZE_KEY) .description("Specifies the maximum amount of data to buffer (per file) in order to apply the Grok expressions. 
Files larger than the specified maximum will not be fully evaluated.") .required(true) .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) @@ -123,6 +136,54 @@ public class ExtractGrok extends AbstractProcessor { .defaultValue("1 MB") .build(); + public static final PropertyDescriptor NAMED_CAPTURES_ONLY = new PropertyDescriptor.Builder() + .name(NAMED_CAPTURES_ONLY_KEY) + .description("Only store named captures from grokList") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("false") + .build(); + + public static final PropertyDescriptor BREAK_ON_FIRST_MATCH = new PropertyDescriptor.Builder() + .name(SINGLE_MATCH_KEY) + .description("Stop on first matched expression.") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("true") + .build(); + + public static final PropertyDescriptor RESULT_PREFIX = new PropertyDescriptor.Builder() + .name(RESULT_PREFIX_KEY) + .description("Value to prefix attribute results with (avoid collisions with existing properties)" + + "\n\t (Does not apply when results returned as content)" + + "\n\t (May be empty, the dot (.) 
separator is not implied)") + .required(true) + .defaultValue("grok.") + .addValidator(Validator.VALID) + .build(); + + + public static final PropertyDescriptor EXPRESSION_SEPARATOR = new PropertyDescriptor.Builder() + .name(EXP_SEPARATOR_KEY) + .description("character to use to separate multiple grok expressions ") + .required(true) + .defaultValue(",") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + + public static final PropertyDescriptor MATCHED_EXP_ATTR = new PropertyDescriptor.Builder() + .name(MATCHED_EXP_ATTR_KEY) + .description("Name of attribute to receive the name(s) of the matched expression(s).") + .required(true) + .defaultValue("matched_expression") + .addValidator(Validator.VALID) + .build(); + + // relationships + public static final Relationship REL_MATCH = new Relationship.Builder() .name("matched") .description("FlowFiles are routed to this relationship when the Grok Expression is successfully evaluated and the FlowFile is modified as a result") @@ -133,10 +194,11 @@ public class ExtractGrok extends AbstractProcessor { .description("FlowFiles are routed to this relationship when no provided Grok Expression matches the content of the FlowFile") .build(); + private final static List descriptors; private final static Set relationships; - private volatile Grok grok = new Grok(); + private volatile List grokList = new ArrayList<>(); private final BlockingQueue bufferQueue = new LinkedBlockingQueue<>(); static { @@ -147,13 +209,23 @@ public class ExtractGrok extends AbstractProcessor { final List _descriptors = new ArrayList<>(); _descriptors.add(GROK_EXPRESSION); + _descriptors.add(EXPRESSION_SEPARATOR); _descriptors.add(GROK_PATTERN_FILE); _descriptors.add(DESTINATION); _descriptors.add(CHARACTER_SET); _descriptors.add(MAX_BUFFER_SIZE); + _descriptors.add(NAMED_CAPTURES_ONLY); + _descriptors.add(RESULT_PREFIX); + _descriptors.add(BREAK_ON_FIRST_MATCH); + _descriptors.add(MATCHED_EXP_ATTR); descriptors = 
Collections.unmodifiableList(_descriptors); } + private String resultPrefix = ""; + private boolean breakOnFirstMatch; + private String matchedExpressionAttribute; + private String expressionSeparator; + @Override public Set getRelationships() { return relationships; @@ -171,15 +243,28 @@ public void onStopped() { @OnScheduled public void onScheduled(final ProcessContext context) throws GrokException { + grokList.clear(); for (int i = 0; i < context.getMaxConcurrentTasks(); i++) { final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final byte[] buffer = new byte[maxBufferSize]; bufferQueue.add(buffer); } - grok = new Grok(); - grok.addPatternFromFile(context.getProperty(GROK_PATTERN_FILE).getValue()); - grok.compile(context.getProperty(GROK_EXPRESSION).getValue()); + resultPrefix = context.getProperty(RESULT_PREFIX).getValue(); + breakOnFirstMatch = context.getProperty(BREAK_ON_FIRST_MATCH).asBoolean() ; + matchedExpressionAttribute = context.getProperty(MATCHED_EXP_ATTR).getValue(); + expressionSeparator = context.getProperty(EXPRESSION_SEPARATOR).getValue(); + + String patterns = context.getProperty(GROK_EXPRESSION).getValue(); + for (String patternName : patterns.split(expressionSeparator)) { + Grok grok = new Grok(); + final String patternFileListString = context.getProperty(GROK_PATTERN_FILE).getValue(); + for (String patternFile : patternFileListString.split(PATTERN_FILE_LIST_SEPARATOR)) { + grok.addPatternFromFile(patternFile); + } + grok.compile(patternName, context.getProperty(NAMED_CAPTURES_ONLY).asBoolean()); + grokList.add(grok); + } } @Override @@ -188,6 +273,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session if (flowFile == null) { return; } + Map results = new HashMap<>(); + List matchedExpressionList = new ArrayList<>(); + final StopWatch stopWatch = new StopWatch(true); final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue()); final String 
contentString; @@ -199,78 +287,94 @@ public void onTrigger(final ProcessContext context, final ProcessSession session try { final byte[] byteBuffer = buffer; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(InputStream in) throws IOException { - StreamUtils.fillBuffer(in, byteBuffer, false); - } - }); + session.read(flowFile, in -> StreamUtils.fillBuffer(in, byteBuffer, false)); final long len = Math.min(byteBuffer.length, flowFile.getSize()); contentString = new String(byteBuffer, 0, (int) len, charset); } finally { bufferQueue.offer(buffer); } - final Match gm = grok.match(contentString); - gm.captures(); - - if (gm.toMap().isEmpty()) { - session.transfer(flowFile, REL_NO_MATCH); - getLogger().info("Did not match any Grok Expressions for FlowFile {}", new Object[]{flowFile}); - return; - } - - final ObjectMapper objectMapper = new ObjectMapper(); - switch (context.getProperty(DESTINATION).getValue()) { - case FLOWFILE_ATTRIBUTE: - Map grokResults = new HashMap<>(); - for (Map.Entry entry : gm.toMap().entrySet()) { - if (null != entry.getValue()) { - grokResults.put("grok." 
+ entry.getKey(), entry.getValue().toString()); + try{ + for (Grok grok : grokList) { + final Match gm = grok.match(contentString); + gm.captures(); + final Map localResults = gm.toMap(); + if (!localResults.isEmpty()) { + matchedExpressionList.add(grok.getOriginalGrokPattern()); + results.putAll(localResults); + if (breakOnFirstMatch) { + break; } } + } - flowFile = session.putAllAttributes(flowFile, grokResults); - session.getProvenanceReporter().modifyAttributes(flowFile); - session.transfer(flowFile, REL_MATCH); - getLogger().info("Matched {} Grok Expressions and added attributes to FlowFile {}", new Object[]{grokResults.size(), flowFile}); - - break; - case FLOWFILE_CONTENT: - FlowFile conFlowfile = session.write(flowFile, new StreamCallback() { - @Override - public void process(InputStream in, OutputStream out) throws IOException { - out.write(objectMapper.writeValueAsBytes(gm.toMap())); - } - }); - conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON); - session.getProvenanceReporter().modifyContent(conFlowfile, "Replaced content with parsed Grok fields and values", stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - session.transfer(conFlowfile, REL_MATCH); + if (results.isEmpty()) { + session.transfer(flowFile, REL_NO_MATCH); + getLogger().debug("Did not match any Grok Expression for FlowFile {}", new Object[]{flowFile}); + return; + } + + String matchedExpressions = StringUtils.join(matchedExpressionList, expressionSeparator); + flowFile = session.putAttribute(flowFile, matchedExpressionAttribute, matchedExpressions); - break; + switch (context.getProperty(DESTINATION).getValue()) { + case FLOWFILE_ATTRIBUTE: + Map grokResults = new HashMap<>(); + for (Map.Entry entry : results.entrySet()) { + if (null != entry.getValue()) { + grokResults.put(resultPrefix + entry.getKey(), entry.getValue().toString()); + } + } + flowFile = session.putAllAttributes(flowFile, grokResults); + 
session.getProvenanceReporter().modifyAttributes(flowFile); + session.transfer(flowFile, REL_MATCH); + getLogger().info("Matched {} Grok Expressions and added attributes to FlowFile {}", new Object[]{grokResults.size(), flowFile}); + break; + case FLOWFILE_CONTENT: + final ObjectMapper objectMapper = new ObjectMapper(); + FlowFile conFlowfile = session.write(flowFile, (in, out) -> { + out.write(objectMapper.writeValueAsBytes(results)); + }); + + conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON); + session.getProvenanceReporter().modifyContent(conFlowfile, "Replaced content with parsed Grok fields and values", stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(conFlowfile, REL_MATCH); + break; + } + }catch(ProcessException t){ + flowFile = session.putAttribute(flowFile, getClass().getSimpleName() + ".exception", t.getMessage()); + session.transfer(flowFile, REL_NO_MATCH); + getLogger().error("Did not match any Grok Expression for FlowFile {}", new Object[]{flowFile}); + return; } } + public static Validator validateMultipleFilesExist() { + return (subject, input, context) -> { + for (String s : input.split(PATTERN_FILE_LIST_SEPARATOR)) { + return StandardValidators.FILE_EXISTS_VALIDATOR.validate(subject, s, context); + } + return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); + }; + } - public static final Validator validateGrokExpression() { - return new Validator() { - @Override - public ValidationResult validate(String subject, String input, ValidationContext context) { - - Grok grok = new Grok(); - try { - grok.compile(input); - } catch (GrokException | java.util.regex.PatternSyntaxException e) { - return new ValidationResult.Builder() - .subject(subject) - .input(input) - .valid(false) - .explanation("Not a valid Grok Expression - " + e.getMessage()) - .build(); + public static Validator validateGrokExpression() { + return (subject, input, context) -> { + Grok grok 
= new Grok(); + String separator = context.getProperty(EXPRESSION_SEPARATOR).getValue(); + try { + for (String s : input.split(separator)) { + grok.compile(s); } - - return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); + } catch (GrokException | java.util.regex.PatternSyntaxException e) { + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation("Not a valid Grok Expression - " + e.getMessage()) + .build(); } + return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); }; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java index b503c40639da..3ed297ff84d0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java @@ -31,6 +31,7 @@ public class TestExtractGrok { private TestRunner testRunner; + private static final String PATTERNS = "src/test/resources/TestExtractGrok/patterns,src/test/resources/TestExtractGrok/apache_patterns"; private final static Path GROK_LOG_INPUT = Paths.get("src/test/resources/TestExtractGrok/apache.log"); private final static Path GROK_TEXT_INPUT = Paths.get("src/test/resources/TestExtractGrok/simple_text.log"); @@ -42,26 +43,20 @@ public void init() { @Test public void testExtractGrokWithMatchedContent() throws IOException { testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}"); - testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns"); + testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, PATTERNS); 
testRunner.enqueue(GROK_LOG_INPUT); testRunner.run(); testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_MATCH); final MockFlowFile matched = testRunner.getFlowFilesForRelationship(ExtractGrok.REL_MATCH).get(0); - - matched.assertAttributeEquals("grok.verb","GET"); - matched.assertAttributeEquals("grok.response","401"); - matched.assertAttributeEquals("grok.bytes","12846"); - matched.assertAttributeEquals("grok.clientip","64.242.88.10"); - matched.assertAttributeEquals("grok.auth","-"); - matched.assertAttributeEquals("grok.timestamp","07/Mar/2004:16:05:49 -0800"); - matched.assertAttributeEquals("grok.request","/twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables"); - matched.assertAttributeEquals("grok.httpversion","1.1"); + assertCommonApacheLog(matched); + matched.assertAttributeEquals("matched_expression", "%{COMMONAPACHELOG}"); + assertUnNamedApacheCaptures(matched); } @Test public void testExtractGrokWithUnMatchedContent() throws IOException { testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{ADDRESS}"); - testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns"); + testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, PATTERNS); testRunner.enqueue(GROK_TEXT_INPUT); testRunner.run(); testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_NO_MATCH); @@ -77,13 +72,94 @@ public void testExtractGrokWithNotFoundPatternFile() throws IOException { testRunner.assertNotValid(); } - @Test public void testExtractGrokWithBadGrokExpression() throws IOException { testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{TOTO"); - testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns"); + testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, PATTERNS); testRunner.enqueue(GROK_LOG_INPUT); testRunner.assertNotValid(); } + @Test + public void testExtractGrokWithNamedCapturesOnly() throws IOException { + 
testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}"); + testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, PATTERNS); + testRunner.setProperty(ExtractGrok.NAMED_CAPTURES_ONLY, "true"); + testRunner.enqueue(GROK_LOG_INPUT); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_MATCH); + final MockFlowFile matched = testRunner.getFlowFilesForRelationship(ExtractGrok.REL_MATCH).get(0); + assertCommonApacheLog(matched); + matched.assertAttributeEquals("matched_expression", "%{COMMONAPACHELOG}"); + assertNoUnNamedApacheCaptures(matched); + } + + + @Test + public void testExtractGrokMatchSecondPattern() throws IOException { + final String exp = "%{COMBINEDAPACHELOG},%{COMMONAPACHELOG}"; + testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, exp); + testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, PATTERNS); + testRunner.setProperty(ExtractGrok.NAMED_CAPTURES_ONLY, "true"); + testRunner.enqueue(GROK_LOG_INPUT); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_MATCH); + final MockFlowFile matched = testRunner.getFlowFilesForRelationship(ExtractGrok.REL_MATCH).get(0); + assertCommonApacheLog(matched); + matched.assertAttributeEquals("matched_expression", "%{COMMONAPACHELOG}"); + matched.assertAttributeNotExists("grok.program"); + } + + @Test + public void testExtractGrokMatchBothOfTwoPatterns() throws IOException { + final String exp = "%{SYSLOGPROG},%{COMMONAPACHELOG}"; + testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, exp); + testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, PATTERNS); + testRunner.setProperty(ExtractGrok.NAMED_CAPTURES_ONLY, "false"); + testRunner.setProperty(ExtractGrok.BREAK_ON_FIRST_MATCH, "false"); + testRunner.enqueue(GROK_LOG_INPUT); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_MATCH); + final MockFlowFile matched = testRunner.getFlowFilesForRelationship(ExtractGrok.REL_MATCH).get(0); + assertCommonApacheLog(matched); + 
matched.assertAttributeEquals("matched_expression", exp); + matched.assertAttributeEquals("grok.program", "64.242.88.10"); + } + + @Test + public void testSetEmptyResultPrefix() throws IOException { + testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{SYSLOGPROG}"); + testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, PATTERNS); + testRunner.setProperty(ExtractGrok.RESULT_PREFIX_KEY,""); + testRunner.setProperty(ExtractGrok.NAMED_CAPTURES_ONLY, "true"); + + testRunner.enqueue(GROK_LOG_INPUT); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_MATCH); + final MockFlowFile matched = testRunner.getFlowFilesForRelationship(ExtractGrok.REL_MATCH).get(0); + matched.assertAttributeEquals("program", "64.242.88.10"); + } + + private void assertUnNamedApacheCaptures(MockFlowFile matched) { + matched.assertAttributeExists("grok.COMMONAPACHELOG"); + matched.assertAttributeExists("grok.INT"); + matched.assertAttributeExists("grok.HOUR"); + } + + private void assertNoUnNamedApacheCaptures(MockFlowFile matched) { + matched.assertAttributeNotExists("grok.COMMONAPACHELOG"); + matched.assertAttributeNotExists("grok.INT"); + matched.assertAttributeNotExists("grok.BASE10NUM"); + } + + private void assertCommonApacheLog(MockFlowFile matched) { + matched.assertAttributeEquals("grok.verb", "GET"); + matched.assertAttributeEquals("grok.response", "401"); + matched.assertAttributeEquals("grok.bytes", "12846"); + matched.assertAttributeEquals("grok.clientip", "64.242.88.10"); + matched.assertAttributeEquals("grok.auth", "-"); + matched.assertAttributeEquals("grok.timestamp", "07/Mar/2004:16:05:49 -0800"); + matched.assertAttributeEquals("grok.request", "/twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables"); + matched.assertAttributeEquals("grok.httpversion", "1.1"); + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractGrok/apache_patterns 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractGrok/apache_patterns new file mode 100644 index 000000000000..63e38b69a4d0 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractGrok/apache_patterns @@ -0,0 +1,6 @@ +COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) +COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent} +COMMONAPACHELOG_DATATYPED %{IPORHOST:clientip} %{USER:ident;boolean} %{USER:auth} \[%{HTTPDATE:timestamp;date;dd/MMM/yyyy:HH:mm:ss Z}\] "(?:%{WORD:verb;string} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion;float})?|%{DATA:rawrequest})" %{NUMBER:response;int} (?:%{NUMBER:bytes;long}|-) + +# Log Levels +LOGLEVEL ([A|a]lert|ALERT|[T|t]race|TRACE|[D|d]ebug|DEBUG|[N|n]otice|NOTICE|[I|i]nfo|INFO|[W|w]arn?(?:ing)?|WARN?(?:ING)?|[E|e]rr?(?:or)?|ERR?(?:OR)?|[C|c]rit?(?:ical)?|CRIT?(?:ICAL)?|[F|f]atal|FATAL|[S|s]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?) 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractGrok/patterns b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractGrok/patterns index 94eaaa8b588c..fbd5a974022d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractGrok/patterns +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractGrok/patterns @@ -1,4 +1,3 @@ - USERNAME [a-zA-Z0-9._-]+ USER %{USERNAME:UNWANTED} INT (?:[+-]?(?:[0-9]+)) @@ -96,12 +95,4 @@ QS %{QUOTEDSTRING:UNWANTED} # Log formats SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}: -MESSAGESLOG %{SYSLOGBASE} %{DATA} - -COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) -COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent} -COMMONAPACHELOG_DATATYPED %{IPORHOST:clientip} %{USER:ident;boolean} %{USER:auth} \[%{HTTPDATE:timestamp;date;dd/MMM/yyyy:HH:mm:ss Z}\] "(?:%{WORD:verb;string} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion;float})?|%{DATA:rawrequest})" %{NUMBER:response;int} (?:%{NUMBER:bytes;long}|-) - - -# Log Levels -LOGLEVEL ([A|a]lert|ALERT|[T|t]race|TRACE|[D|d]ebug|DEBUG|[N|n]otice|NOTICE|[I|i]nfo|INFO|[W|w]arn?(?:ing)?|WARN?(?:ING)?|[E|e]rr?(?:or)?|ERR?(?:OR)?|[C|c]rit?(?:ical)?|CRIT?(?:ICAL)?|[F|f]atal|FATAL|[S|s]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?) 
\ No newline at end of file +MESSAGESLOG %{SYSLOGBASE} %{DATA} \ No newline at end of file From dd8bc224c5193c8b6b47fb8babf6cb6c8323eef3 Mon Sep 17 00:00:00 2001 From: charlesporter Date: Tue, 16 Jan 2018 20:14:45 -0800 Subject: [PATCH 2/3] ExtractGrok Supports multiple expressions -separated by comma or specified delimiter -option to return on first match or to run all expressions in list other enhancements -multiple pattern files -selectable result attribute prefix --- .gitignore | 2 + .../nifi/processors/standard/ExtractGrok.java | 260 ++++++++++++------ .../processors/standard/TestExtractGrok.java | 101 +++++-- .../resources/TestExtractGrok/apache_patterns | 6 + .../test/resources/TestExtractGrok/patterns | 11 +- 5 files changed, 265 insertions(+), 115 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractGrok/apache_patterns diff --git a/.gitignore b/.gitignore index a2e04fa15aba..66b35dd55df7 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,6 @@ nb-configuration.xml *.iml *.iws *~ +*.ipr +.shelf diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java index a7e421bd9f80..b14dacaf4e82 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java @@ -31,7 +31,6 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import 
org.apache.nifi.components.Validator; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -43,15 +42,11 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.nio.charset.Charset; import java.util.List; import java.util.Map; @@ -70,33 +65,51 @@ @Tags({"grok", "log", "text", "parse", "delimit", "extract"}) @CapabilityDescription("Evaluates one or more Grok Expressions against the content of a FlowFile, " + "adding the results as attributes or replacing the content of the FlowFile with a JSON " + - "notation of the matched content") + "notation of the matched content\n" + + "uses https://github.com/thekrakken/java-grok.") @WritesAttributes({ - @WritesAttribute(attribute = "grok.XXX", description = "When operating in flowfile-attribute mode, each of the Grok identifier that is matched in the flowfile " + - "will be added as an attribute, prefixed with \"grok.\" For example," + - "if the grok identifier \"timestamp\" is matched, then the value will be added to an attribute named \"grok.timestamp\"")}) + @WritesAttribute(attribute = "{result prefix}XXX", description = "When operating in flowfile-attribute mode, each of the Grok identifier that is matched in the flowfile " + + "will be added as an attribute, prefixed with \"{result prefix}\" For example," + + "if the grok identifier \"timestamp\" is matched, then the value will be added to an attribute named \"{result prefix}timestamp\""), + + @WritesAttribute(attribute = "ExtractGrok.exception", description = "if an 
error occurs, an exception will be written to this attribute, " + + "and the flow routed to 'unmatched' ") +}) public class ExtractGrok extends AbstractProcessor { public static final String FLOWFILE_ATTRIBUTE = "flowfile-attribute"; public static final String FLOWFILE_CONTENT = "flowfile-content"; - private static final String APPLICATION_JSON = "application/json"; - + public static final String APPLICATION_JSON = "application/json"; + public static final String GROK_EXPRESSION_KEY = "Grok Expression"; + public static final String GROK_PATTERN_FILE_KEY = "Grok Pattern file"; + public static final String DESTINATION_KEY = "Destination"; + public static final String CHARACTER_SET_KEY = "Character Set"; + public static final String MAXIMUM_BUFFER_SIZE_KEY = "Maximum Buffer Size"; + public static final String NAMED_CAPTURES_ONLY_KEY = "Named captures only"; + public static final String SINGLE_MATCH_KEY = "Single Match"; + public static final String RESULT_PREFIX_KEY = "result prefix"; + public static final String MATCHED_EXP_ATTR_KEY = "matched expression attribute"; + public static final String EXP_SEPARATOR_KEY = "expression-separator"; + public static final String PATTERN_FILE_LIST_SEPARATOR = ","; + + //properties public static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor.Builder() - .name("Grok Expression") - .description("Grok expression") + .name(GROK_EXPRESSION_KEY) + .description("Grok expressions, one or more grok expressions separated by ',' or other character as set in property '" + EXP_SEPARATOR_KEY + "'") .required(true) .addValidator(validateGrokExpression()) .build(); + public static final PropertyDescriptor GROK_PATTERN_FILE = new PropertyDescriptor.Builder() - .name("Grok Pattern file") - .description("Grok Pattern file definition. 
May include multiple files, separated by " + PATTERN_FILE_LIST_SEPARATOR) .required(true) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .addValidator(validateMultipleFilesExist()) .build(); public static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() - .name("Destination") + .name(DESTINATION_KEY) .description("Control if Grok output value is written as a new flowfile attributes, in this case " + "each of the Grok identifier that is matched in the flowfile will be added as an attribute, " + "prefixed with \"grok.\" or written in the flowfile content. Writing to flowfile content " + @@ -107,7 +120,7 @@ public class ExtractGrok extends AbstractProcessor { .build(); public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() - .name("Character Set") + .name(CHARACTER_SET_KEY) .description("The Character Set in which the file is encoded") .required(true) .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) @@ -115,7 +128,7 @@ public class ExtractGrok extends AbstractProcessor { .build(); public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() - .name("Maximum Buffer Size") + .name(MAXIMUM_BUFFER_SIZE_KEY) .description("Specifies the maximum amount of data to buffer (per file) in order to apply the Grok expressions. 
Files larger than the specified maximum will not be fully evaluated.") .required(true) .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) @@ -123,14 +136,53 @@ public class ExtractGrok extends AbstractProcessor { .defaultValue("1 MB") .build(); - public static final PropertyDescriptor NAMED_CAPTURES_ONLY = new PropertyDescriptor.Builder() - .name("Named captures only") - .description("Only store named captures from grok") + + public static final PropertyDescriptor NAMED_CAPTURES_ONLY = new PropertyDescriptor.Builder() + .name(NAMED_CAPTURES_ONLY_KEY) + .description("Only store named captures from grok") .required(true) .allowableValues("true", "false") .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .defaultValue("false") .build(); + public static final PropertyDescriptor BREAK_ON_FIRST_MATCH = new PropertyDescriptor.Builder() + .name(SINGLE_MATCH_KEY) + .description("Stop on first matched expression.") + .required(true) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("true") + .build(); + + public static final PropertyDescriptor RESULT_PREFIX = new PropertyDescriptor.Builder() + .name(RESULT_PREFIX_KEY) + .description("Value to prefix attribute results with (avoid collisions with existing properties)" + + "\n\t (Does not apply when results returned as content)" + + "\n\t (May be empty, the dot (.) 
separator is not implied)") + .required(true) + .defaultValue("grok.") + .addValidator(Validator.VALID) + .build(); + + + public static final PropertyDescriptor EXPRESSION_SEPARATOR = new PropertyDescriptor.Builder() + .name(EXP_SEPARATOR_KEY) + .description("character to use to separate multiple grok expressions ") + .required(true) + .defaultValue(",") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + + public static final PropertyDescriptor MATCHED_EXP_ATTR = new PropertyDescriptor.Builder() + .name(MATCHED_EXP_ATTR_KEY) + .description("Name of attribute to receive the name(s) of the matched expression(s).") + .required(true) + .defaultValue("matched_expression") + .addValidator(Validator.VALID) + .build(); + + // relationships public static final Relationship REL_MATCH = new Relationship.Builder() .name("matched") @@ -142,10 +194,11 @@ public class ExtractGrok extends AbstractProcessor { .description("FlowFiles are routed to this relationship when no provided Grok Expression matches the content of the FlowFile") .build(); + private final static List descriptors; private final static Set relationships; - private volatile Grok grok = new Grok(); + private volatile List grokList = new ArrayList<>(); private final BlockingQueue bufferQueue = new LinkedBlockingQueue<>(); static { @@ -156,14 +209,23 @@ public class ExtractGrok extends AbstractProcessor { final List _descriptors = new ArrayList<>(); _descriptors.add(GROK_EXPRESSION); + _descriptors.add(EXPRESSION_SEPARATOR); _descriptors.add(GROK_PATTERN_FILE); _descriptors.add(DESTINATION); _descriptors.add(CHARACTER_SET); _descriptors.add(MAX_BUFFER_SIZE); _descriptors.add(NAMED_CAPTURES_ONLY); + _descriptors.add(RESULT_PREFIX); + _descriptors.add(BREAK_ON_FIRST_MATCH); + _descriptors.add(MATCHED_EXP_ATTR); descriptors = Collections.unmodifiableList(_descriptors); } + private String resultPrefix = ""; + private boolean breakOnFirstMatch; + private String matchedExpressionAttribute; + private 
String expressionSeparator; + @Override public Set getRelationships() { return relationships; @@ -181,15 +243,28 @@ public void onStopped() { @OnScheduled public void onScheduled(final ProcessContext context) throws GrokException { + grokList.clear(); for (int i = 0; i < context.getMaxConcurrentTasks(); i++) { final int maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final byte[] buffer = new byte[maxBufferSize]; bufferQueue.add(buffer); } - grok = new Grok(); - grok.addPatternFromFile(context.getProperty(GROK_PATTERN_FILE).getValue()); - grok.compile(context.getProperty(GROK_EXPRESSION).getValue(), context.getProperty(NAMED_CAPTURES_ONLY).asBoolean()); + resultPrefix = context.getProperty(RESULT_PREFIX).getValue(); + breakOnFirstMatch = context.getProperty(BREAK_ON_FIRST_MATCH).asBoolean() ; + matchedExpressionAttribute = context.getProperty(MATCHED_EXP_ATTR).getValue(); + expressionSeparator = context.getProperty(EXPRESSION_SEPARATOR).getValue(); + + String patterns = context.getProperty(GROK_EXPRESSION).getValue(); + for (String patternName : patterns.split(expressionSeparator)) { + Grok grok = new Grok(); + final String patternFileListString = context.getProperty(GROK_PATTERN_FILE).getValue(); + for (String patternFile : patternFileListString.split(PATTERN_FILE_LIST_SEPARATOR)) { + grok.addPatternFromFile(patternFile); + } + grok.compile(patternName, context.getProperty(NAMED_CAPTURES_ONLY).asBoolean()); + grokList.add(grok); + } } @Override @@ -198,6 +273,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session if (flowFile == null) { return; } + Map results = new HashMap<>(); + List matchedExpressionList = new ArrayList<>(); + final StopWatch stopWatch = new StopWatch(true); final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).getValue()); final String contentString; @@ -209,78 +287,94 @@ public void onTrigger(final ProcessContext context, final ProcessSession session 
try { final byte[] byteBuffer = buffer; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(InputStream in) throws IOException { - StreamUtils.fillBuffer(in, byteBuffer, false); - } - }); + session.read(flowFile, in -> StreamUtils.fillBuffer(in, byteBuffer, false)); final long len = Math.min(byteBuffer.length, flowFile.getSize()); contentString = new String(byteBuffer, 0, (int) len, charset); } finally { bufferQueue.offer(buffer); } - final Match gm = grok.match(contentString); - gm.captures(); - - if (gm.toMap().isEmpty()) { - session.transfer(flowFile, REL_NO_MATCH); - getLogger().info("Did not match any Grok Expressions for FlowFile {}", new Object[]{flowFile}); - return; - } - - final ObjectMapper objectMapper = new ObjectMapper(); - switch (context.getProperty(DESTINATION).getValue()) { - case FLOWFILE_ATTRIBUTE: - Map grokResults = new HashMap<>(); - for (Map.Entry entry : gm.toMap().entrySet()) { - if (null != entry.getValue()) { - grokResults.put("grok." 
+ entry.getKey(), entry.getValue().toString()); + try{ + for (Grok grok : grokList) { + final Match gm = grok.match(contentString); + gm.captures(); + final Map localResults = gm.toMap(); + if (!localResults.isEmpty()) { + matchedExpressionList.add(grok.getOriginalGrokPattern()); + results.putAll(localResults); + if (breakOnFirstMatch) { + break; } } + } - flowFile = session.putAllAttributes(flowFile, grokResults); - session.getProvenanceReporter().modifyAttributes(flowFile); - session.transfer(flowFile, REL_MATCH); - getLogger().info("Matched {} Grok Expressions and added attributes to FlowFile {}", new Object[]{grokResults.size(), flowFile}); - - break; - case FLOWFILE_CONTENT: - FlowFile conFlowfile = session.write(flowFile, new StreamCallback() { - @Override - public void process(InputStream in, OutputStream out) throws IOException { - out.write(objectMapper.writeValueAsBytes(gm.toMap())); - } - }); - conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON); - session.getProvenanceReporter().modifyContent(conFlowfile, "Replaced content with parsed Grok fields and values", stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - session.transfer(conFlowfile, REL_MATCH); + if (results.isEmpty()) { + session.transfer(flowFile, REL_NO_MATCH); + getLogger().debug("Did not match any Grok Expression for FlowFile {}", new Object[]{flowFile}); + return; + } + + String matchedExpressions = StringUtils.join(matchedExpressionList, expressionSeparator); + flowFile = session.putAttribute(flowFile, matchedExpressionAttribute, matchedExpressions); - break; + switch (context.getProperty(DESTINATION).getValue()) { + case FLOWFILE_ATTRIBUTE: + Map grokResults = new HashMap<>(); + for (Map.Entry entry : results.entrySet()) { + if (null != entry.getValue()) { + grokResults.put(resultPrefix + entry.getKey(), entry.getValue().toString()); + } + } + flowFile = session.putAllAttributes(flowFile, grokResults); + 
session.getProvenanceReporter().modifyAttributes(flowFile); + session.transfer(flowFile, REL_MATCH); + getLogger().info("Matched {} Grok Expressions and added attributes to FlowFile {}", new Object[]{grokResults.size(), flowFile}); + break; + case FLOWFILE_CONTENT: + final ObjectMapper objectMapper = new ObjectMapper(); + FlowFile conFlowfile = session.write(flowFile, (in, out) -> { + out.write(objectMapper.writeValueAsBytes(results)); + }); + + conFlowfile = session.putAttribute(conFlowfile, CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON); + session.getProvenanceReporter().modifyContent(conFlowfile, "Replaced content with parsed Grok fields and values", stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(conFlowfile, REL_MATCH); + break; + } + }catch(ProcessException t){ + flowFile = session.putAttribute(flowFile, getClass().getSimpleName() + ".exception", t.getMessage()); + session.transfer(flowFile, REL_NO_MATCH); + getLogger().error("Did not match any Grok Expression for FlowFile {}", new Object[]{flowFile}); + return; + } } + public static Validator validateMultipleFilesExist() { + return (subject, input, context) -> { + for (String s : input.split(PATTERN_FILE_LIST_SEPARATOR)) { + final ValidationResult result = StandardValidators.FILE_EXISTS_VALIDATOR.validate(subject, s, context); if (!result.isValid()) { return result; } + } + return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); + }; + } - public static final Validator validateGrokExpression() { - return new Validator() { - @Override - public ValidationResult validate(String subject, String input, ValidationContext context) { - - Grok grok = new Grok(); - try { - grok.compile(input); - } catch (GrokException | java.util.regex.PatternSyntaxException e) { - return new ValidationResult.Builder() - .subject(subject) - .input(input) - .valid(false) - .explanation("Not a valid Grok Expression - " + e.getMessage()) - .build(); + public static Validator validateGrokExpression() { + return (subject, input, context) -> { + Grok grok 
= new Grok(); + String separator = context.getProperty(EXPRESSION_SEPARATOR).getValue(); + try { + for (String s : input.split(separator)) { + grok.compile(s); } - - return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); + } catch (GrokException | java.util.regex.PatternSyntaxException e) { + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation("Not a valid Grok Expression - " + e.getMessage()) + .build(); } + return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); }; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java index b5891ad7dd11..ef43b803f993 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java @@ -31,6 +31,7 @@ public class TestExtractGrok { private TestRunner testRunner; + private static final String PATTERNS = "src/test/resources/TestExtractGrok/patterns,src/test/resources/TestExtractGrok/apache_patterns"; private final static Path GROK_LOG_INPUT = Paths.get("src/test/resources/TestExtractGrok/apache.log"); private final static Path GROK_TEXT_INPUT = Paths.get("src/test/resources/TestExtractGrok/simple_text.log"); @@ -42,26 +43,20 @@ public void init() { @Test public void testExtractGrokWithMatchedContent() throws IOException { testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}"); - testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns"); + testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, PATTERNS); 
testRunner.enqueue(GROK_LOG_INPUT); testRunner.run(); testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_MATCH); final MockFlowFile matched = testRunner.getFlowFilesForRelationship(ExtractGrok.REL_MATCH).get(0); - - matched.assertAttributeEquals("grok.verb","GET"); - matched.assertAttributeEquals("grok.response","401"); - matched.assertAttributeEquals("grok.bytes","12846"); - matched.assertAttributeEquals("grok.clientip","64.242.88.10"); - matched.assertAttributeEquals("grok.auth","-"); - matched.assertAttributeEquals("grok.timestamp","07/Mar/2004:16:05:49 -0800"); - matched.assertAttributeEquals("grok.request","/twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables"); - matched.assertAttributeEquals("grok.httpversion","1.1"); + assertCommonApacheLog(matched); + matched.assertAttributeEquals("matched_expression", "%{COMMONAPACHELOG}"); + assertUnNamedApacheCaptures(matched); } @Test public void testExtractGrokWithUnMatchedContent() throws IOException { testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{ADDRESS}"); - testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns"); + testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, PATTERNS); testRunner.enqueue(GROK_TEXT_INPUT); testRunner.run(); testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_NO_MATCH); @@ -80,7 +75,7 @@ public void testExtractGrokWithNotFoundPatternFile() throws IOException { @Test public void testExtractGrokWithBadGrokExpression() throws IOException { testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{TOTO"); - testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns"); + testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, PATTERNS); testRunner.enqueue(GROK_LOG_INPUT); testRunner.assertNotValid(); } @@ -88,24 +83,86 @@ public void testExtractGrokWithBadGrokExpression() throws IOException { @Test public void testExtractGrokWithNamedCapturesOnly() throws IOException 
{ testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{COMMONAPACHELOG}"); - testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, "src/test/resources/TestExtractGrok/patterns"); + testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, PATTERNS); + testRunner.setProperty(ExtractGrok.NAMED_CAPTURES_ONLY, "true"); testRunner.enqueue(GROK_LOG_INPUT); testRunner.run(); testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_MATCH); final MockFlowFile matched = testRunner.getFlowFilesForRelationship(ExtractGrok.REL_MATCH).get(0); - matched.assertAttributeEquals("grok.verb","GET"); - matched.assertAttributeEquals("grok.response","401"); - matched.assertAttributeEquals("grok.bytes","12846"); - matched.assertAttributeEquals("grok.clientip","64.242.88.10"); - matched.assertAttributeEquals("grok.auth","-"); - matched.assertAttributeEquals("grok.timestamp","07/Mar/2004:16:05:49 -0800"); - matched.assertAttributeEquals("grok.request","/twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables"); - matched.assertAttributeEquals("grok.httpversion","1.1"); + assertCommonApacheLog(matched); + matched.assertAttributeEquals("matched_expression", "%{COMMONAPACHELOG}"); + assertNoUnNamedApacheCaptures(matched); + } + + + @Test + public void testExtractGrokMatchSecondPattern() throws IOException { + final String exp = "%{COMBINEDAPACHELOG},%{COMMONAPACHELOG}"; + testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, exp); + testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, PATTERNS); + testRunner.setProperty(ExtractGrok.NAMED_CAPTURES_ONLY, "true"); + testRunner.enqueue(GROK_LOG_INPUT); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_MATCH); + final MockFlowFile matched = testRunner.getFlowFilesForRelationship(ExtractGrok.REL_MATCH).get(0); + assertCommonApacheLog(matched); + matched.assertAttributeEquals("matched_expression", "%{COMMONAPACHELOG}"); + matched.assertAttributeNotExists("grok.program"); + } + @Test + public void 
testExtractGrokMatchBothOfTwoPatterns() throws IOException { + final String exp = "%{SYSLOGPROG},%{COMMONAPACHELOG}"; + testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, exp); + testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, PATTERNS); + testRunner.setProperty(ExtractGrok.NAMED_CAPTURES_ONLY, "false"); + testRunner.setProperty(ExtractGrok.BREAK_ON_FIRST_MATCH, "false"); + testRunner.enqueue(GROK_LOG_INPUT); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_MATCH); + final MockFlowFile matched = testRunner.getFlowFilesForRelationship(ExtractGrok.REL_MATCH).get(0); + assertCommonApacheLog(matched); + matched.assertAttributeEquals("matched_expression", exp); + matched.assertAttributeEquals("grok.program", "64.242.88.10"); + } + + @Test + public void testSetEmptyResultPrefix() throws IOException { + testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{SYSLOGPROG}"); + testRunner.setProperty(ExtractGrok.GROK_PATTERN_FILE, PATTERNS); + testRunner.setProperty(ExtractGrok.RESULT_PREFIX_KEY,""); + testRunner.setProperty(ExtractGrok.NAMED_CAPTURES_ONLY, "true"); + + testRunner.enqueue(GROK_LOG_INPUT); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_MATCH); + final MockFlowFile matched = testRunner.getFlowFilesForRelationship(ExtractGrok.REL_MATCH).get(0); + matched.assertAttributeEquals("program", "64.242.88.10"); + } + + private void assertUnNamedApacheCaptures(MockFlowFile matched) { + matched.assertAttributeExists("grok.COMMONAPACHELOG"); + matched.assertAttributeExists("grok.INT"); + matched.assertAttributeExists("grok.HOUR"); + } + + private void assertNoUnNamedApacheCaptures(MockFlowFile matched) { + matched.assertAttributeNotExists("grok.COMMONAPACHELOG"); matched.assertAttributeNotExists("grok.INT"); matched.assertAttributeNotExists("grok.BASE10NUM"); - matched.assertAttributeNotExists("grok.COMMONAPACHELOG"); + } + + private void assertCommonApacheLog(MockFlowFile matched) { + 
matched.assertAttributeEquals("grok.verb", "GET"); + matched.assertAttributeEquals("grok.response", "401"); + matched.assertAttributeEquals("grok.bytes", "12846"); + matched.assertAttributeEquals("grok.clientip", "64.242.88.10"); + matched.assertAttributeEquals("grok.auth", "-"); + matched.assertAttributeEquals("grok.timestamp", "07/Mar/2004:16:05:49 -0800"); + matched.assertAttributeEquals("grok.request", "/twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables"); + matched.assertAttributeEquals("grok.httpversion", "1.1"); + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractGrok/apache_patterns b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractGrok/apache_patterns new file mode 100644 index 000000000000..63e38b69a4d0 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractGrok/apache_patterns @@ -0,0 +1,6 @@ +COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) +COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent} +COMMONAPACHELOG_DATATYPED %{IPORHOST:clientip} %{USER:ident;boolean} %{USER:auth} \[%{HTTPDATE:timestamp;date;dd/MMM/yyyy:HH:mm:ss Z}\] "(?:%{WORD:verb;string} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion;float})?|%{DATA:rawrequest})" %{NUMBER:response;int} (?:%{NUMBER:bytes;long}|-) + +# Log Levels +LOGLEVEL ([A|a]lert|ALERT|[T|t]race|TRACE|[D|d]ebug|DEBUG|[N|n]otice|NOTICE|[I|i]nfo|INFO|[W|w]arn?(?:ing)?|WARN?(?:ING)?|[E|e]rr?(?:or)?|ERR?(?:OR)?|[C|c]rit?(?:ical)?|CRIT?(?:ICAL)?|[F|f]atal|FATAL|[S|s]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?) 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractGrok/patterns b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractGrok/patterns index 94eaaa8b588c..fbd5a974022d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractGrok/patterns +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractGrok/patterns @@ -1,4 +1,3 @@ - USERNAME [a-zA-Z0-9._-]+ USER %{USERNAME:UNWANTED} INT (?:[+-]?(?:[0-9]+)) @@ -96,12 +95,4 @@ QS %{QUOTEDSTRING:UNWANTED} # Log formats SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}: -MESSAGESLOG %{SYSLOGBASE} %{DATA} - -COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) -COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent} -COMMONAPACHELOG_DATATYPED %{IPORHOST:clientip} %{USER:ident;boolean} %{USER:auth} \[%{HTTPDATE:timestamp;date;dd/MMM/yyyy:HH:mm:ss Z}\] "(?:%{WORD:verb;string} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion;float})?|%{DATA:rawrequest})" %{NUMBER:response;int} (?:%{NUMBER:bytes;long}|-) - - -# Log Levels -LOGLEVEL ([A|a]lert|ALERT|[T|t]race|TRACE|[D|d]ebug|DEBUG|[N|n]otice|NOTICE|[I|i]nfo|INFO|[W|w]arn?(?:ing)?|WARN?(?:ING)?|[E|e]rr?(?:or)?|ERR?(?:OR)?|[C|c]rit?(?:ical)?|CRIT?(?:ICAL)?|[F|f]atal|FATAL|[S|s]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?) 
\ No newline at end of file +MESSAGESLOG %{SYSLOGBASE} %{DATA} \ No newline at end of file From c6cd348e416045d69cbb78f07f0996e43e58b760 Mon Sep 17 00:00:00 2001 From: charlesporter Date: Fri, 19 Jan 2018 11:01:54 -0800 Subject: [PATCH 3/3] add apache_patterns to rat excludes --- .../nifi-standard-bundle/nifi-standard-processors/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 3cdd7879ea8a..46a7aed25a30 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -491,6 +491,7 @@ src/test/resources/TestExtractGrok/apache.log src/test/resources/TestExtractGrok/simple_text.log src/test/resources/TestExtractGrok/patterns + src/test/resources/TestExtractGrok/apache_patterns src/test/resources/TestUpdateRecord/input/person.json src/test/resources/TestUpdateRecord/input/person-address.json src/test/resources/TestUpdateRecord/input/person-with-null-array.json