From ac40edf16b32dbb686a51e2c2d0b9363252f5356 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Sat, 17 Jun 2017 12:41:19 +0200 Subject: [PATCH 1/2] NIFI-3964 KeyValue lookup service and Grok patterns controller --- .../nifi/processors/standard/ExtractGrok.java | 29 +++++++++++++++++ .../processors/standard/TestExtractGrok.java | 29 ++++++++++++++++- .../nifi/lookup/KeyValueLookupService.java | 31 +++++++++++++++++++ .../lookup/SimpleKeyValueLookupService.java | 7 ++++- .../pom.xml | 4 +++ .../java/org/apache/nifi/grok/GrokReader.java | 18 +++++++++++ 6 files changed, 116 insertions(+), 2 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/KeyValueLookupService.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java index 5896b939f0d8..a4666fa504d4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java @@ -22,6 +22,7 @@ import io.krakens.grok.api.GrokCompiler; import io.krakens.grok.api.Match; import io.krakens.grok.api.exception.GrokException; + import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -36,6 +37,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.lookup.KeyValueLookupService; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; @@ -103,6 +105,15 @@ public class ExtractGrok extends AbstractProcessor { .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) .build(); + public static final PropertyDescriptor PATTERN_CONTROLLER = new PropertyDescriptor.Builder() + .name("grok-patterns-controller") + .displayName("Grok Patterns Controller") + .description("Controller service with dynamic properties to add custom patterns. " + + "It can be used in combination with the pattern file property.") + .identifiesControllerService(KeyValueLookupService.class) + .required(false) + .build(); + public static final PropertyDescriptor KEEP_EMPTY_CAPTURES = new PropertyDescriptor.Builder() .name("Keep Empty Captures") .description("If true, then empty capture values will be included in the returned capture map.") @@ -177,6 +188,7 @@ public class ExtractGrok extends AbstractProcessor { final List _descriptors = new ArrayList<>(); _descriptors.add(GROK_EXPRESSION); _descriptors.add(GROK_PATTERN_FILE); + _descriptors.add(PATTERN_CONTROLLER); _descriptors.add(DESTINATION); _descriptors.add(CHARACTER_SET); _descriptors.add(MAX_BUFFER_SIZE); @@ -230,6 +242,14 @@ protected Collection customValidate(final ValidationContext va grokCompiler.register(reader); } } + + if(validationContext.getProperty(PATTERN_CONTROLLER).isSet()) { + Map patterns = validationContext.getProperty(PATTERN_CONTROLLER).asControllerService(KeyValueLookupService.class).getAll(); + for(String name : patterns.keySet()) { + grokCompiler.register(name, patterns.get(name)); + } + } + grok = grokCompiler.compile(input, namedCaptures); } catch (final Exception e) { problems.add(new ValidationResult.Builder() @@ -242,6 +262,7 @@ protected Collection customValidate(final ValidationContext va } problems.add(new ValidationResult.Builder().subject(subject).input(input).valid(true).build()); + return problems; } @@ -269,6 +290,14 @@ public void onScheduled(final ProcessContext context) throws GrokException, IOEx grokCompiler.register(reader); } } + + if(context.getProperty(PATTERN_CONTROLLER).isSet()) { + Map patterns = context.getProperty(PATTERN_CONTROLLER).asControllerService(KeyValueLookupService.class).getAll(); + for(String name : patterns.keySet()) { + grokCompiler.register(name, patterns.get(name)); + } + } + grok = grokCompiler.compile(context.getProperty(GROK_EXPRESSION).getValue(), context.getProperty(NAMED_CAPTURES_ONLY).asBoolean()); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java index bc62cb4c8e59..3fd4de6e5428 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java @@ -17,6 +17,8 @@ package org.apache.nifi.processors.standard; +import org.apache.nifi.lookup.SimpleKeyValueLookupService; +import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -27,7 +29,6 @@ import java.nio.file.Path; import java.nio.file.Paths; - public class TestExtractGrok { private TestRunner testRunner; @@ -139,4 +140,30 @@ public void testExtractGrokWithNamedCapturesOnly() throws IOException { matched.assertAttributeNotExists("grok.BASE10NUM"); matched.assertAttributeNotExists("grok.COMMONAPACHELOG"); } + + public void testExtractGrokWithPatternController() throws IOException, InitializationException { + final SimpleKeyValueLookupService service = new SimpleKeyValueLookupService(); + testRunner.addControllerService("grok-patterns-controller", service); + testRunner.setProperty(service, "GREEDYDATA", ".*"); + testRunner.enableControllerService(service); + testRunner.assertValid(service); + + testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{GREEDYDATA:text}"); + testRunner.setProperty(ExtractGrok.PATTERN_CONTROLLER, "grok-patterns-controller"); + + testRunner.enqueue(GROK_TEXT_INPUT); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(ExtractGrok.REL_MATCH); + final MockFlowFile matched = testRunner.getFlowFilesForRelationship(ExtractGrok.REL_MATCH).get(0); + matched.assertAttributeEquals("grok.text","simple text not an apache log"); + } + + @Test + public void testExtractGrokUnsetProperties() throws IOException { + testRunner.setProperty(ExtractGrok.GROK_EXPRESSION, "%{TEST}"); + testRunner.enqueue(GROK_LOG_INPUT); + testRunner.assertNotValid(); + } + } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/KeyValueLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/KeyValueLookupService.java new file mode 100644 index 000000000000..d0ca15d0e4a4 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/KeyValueLookupService.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.lookup; + +import java.util.Map; + +public interface KeyValueLookupService extends StringLookupService { + + /** + * Method to return all the Key/Value properties defined in the controller service. + * + * @return All the defined key/value dynamic properties + */ + Map getAll(); + +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java index 2f99c36024d0..ad9fcc459b33 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java @@ -36,7 +36,7 @@ @Tags({"lookup", "enrich", "key", "value"}) @CapabilityDescription("Allows users to add key/value pairs as User-defined Properties. Each property that is added can be looked up by Property Name. " + "The coordinates that are passed to the lookup must contain the key 'key'.") -public class SimpleKeyValueLookupService extends AbstractControllerService implements StringLookupService { +public class SimpleKeyValueLookupService extends AbstractControllerService implements KeyValueLookupService { private static final String KEY = "key"; private static final Set REQUIRED_KEYS = Stream.of(KEY).collect(Collectors.toSet()); private volatile Map lookupValues = new HashMap<>(); @@ -72,6 +72,11 @@ public Optional lookup(final Map coordinates) { return Optional.ofNullable(lookupValues.get(key)); } + @Override + public Map getAll() { + return lookupValues; + } + @Override public Set getRequiredKeys() { return REQUIRED_KEYS; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml index 30694b50d9c9..81bfc7ce23c7 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -29,6 +29,10 @@ nifi-utils 1.8.0-SNAPSHOT + + org.apache.nifi + nifi-lookup-service-api + org.apache.nifi nifi-record-serialization-service-api diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java index 9e7293bc5ad1..55af547194d0 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java @@ -21,6 +21,7 @@ import io.krakens.grok.api.GrokCompiler; import io.krakens.grok.api.GrokUtils; import io.krakens.grok.api.exception.GrokException; + import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; @@ -31,6 +32,7 @@ import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.lookup.KeyValueLookupService; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.schema.access.SchemaAccessStrategy; import org.apache.nifi.schema.access.SchemaField; @@ -95,6 +97,14 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac .required(false) .build(); + static final PropertyDescriptor PATTERN_CONTROLLER = new PropertyDescriptor.Builder() + .name("grok-patterns-controller") + .displayName("Grok Patterns Controller") + .description("Controller service with dynamic properties to add custom patterns. It can be used in combination with the pattern file property.") + .identifiesControllerService(KeyValueLookupService.class) + .required(false) + .build(); + static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor.Builder() .name("Grok Expression") .description("Specifies the format of a log line in Grok format. This allows the Record Reader to understand how to parse each log line. " @@ -118,6 +128,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); properties.add(PATTERN_FILE); + properties.add(PATTERN_CONTROLLER); properties.add(GROK_EXPRESSION); properties.add(NO_MATCH_BEHAVIOR); return properties; @@ -139,6 +150,13 @@ public void preCompile(final ConfigurationContext context) throws GrokException, } } + if(context.getProperty(PATTERN_CONTROLLER).isSet()) { + Map patterns = context.getProperty(PATTERN_CONTROLLER).asControllerService(KeyValueLookupService.class).getAll(); + for(String name : patterns.keySet()) { + grokCompiler.register(name, patterns.get(name)); + } + } + grok = grokCompiler.compile(context.getProperty(GROK_EXPRESSION).getValue()); appendUnmatchedLine = context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue()); From fbf21f4414af320041ac46c1321f9f5a2b77be9a Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Wed, 18 Jul 2018 18:52:12 +0200 Subject: [PATCH 2/2] .register(Map<>) --- .../apache/nifi/processors/standard/ExtractGrok.java | 10 ++-------- .../nifi/processors/standard/TestExtractGrok.java | 5 ++++- .../src/main/java/org/apache/nifi/grok/GrokReader.java | 5 +---- 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java index a4666fa504d4..1d0100029bcf 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractGrok.java @@ -244,10 +244,7 @@ protected Collection customValidate(final ValidationContext va } if(validationContext.getProperty(PATTERN_CONTROLLER).isSet()) { - Map patterns = validationContext.getProperty(PATTERN_CONTROLLER).asControllerService(KeyValueLookupService.class).getAll(); - for(String name : patterns.keySet()) { - grokCompiler.register(name, patterns.get(name)); - } + grokCompiler.register(validationContext.getProperty(PATTERN_CONTROLLER).asControllerService(KeyValueLookupService.class).getAll()); } grok = grokCompiler.compile(input, namedCaptures); @@ -292,10 +289,7 @@ public void onScheduled(final ProcessContext context) throws GrokException, IOEx } if(context.getProperty(PATTERN_CONTROLLER).isSet()) { - Map patterns = context.getProperty(PATTERN_CONTROLLER).asControllerService(KeyValueLookupService.class).getAll(); - for(String name : patterns.keySet()) { - grokCompiler.register(name, patterns.get(name)); - } + grokCompiler.register(context.getProperty(PATTERN_CONTROLLER).asControllerService(KeyValueLookupService.class).getAll()); } grok = grokCompiler.compile(context.getProperty(GROK_EXPRESSION).getValue(), context.getProperty(NAMED_CAPTURES_ONLY).asBoolean()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java index 3fd4de6e5428..b0c18f81117b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractGrok.java @@ -141,10 +141,13 @@ public void testExtractGrokWithNamedCapturesOnly() throws IOException { matched.assertAttributeNotExists("grok.COMMONAPACHELOG"); } + @Test public void testExtractGrokWithPatternController() throws IOException, InitializationException { final SimpleKeyValueLookupService service = new SimpleKeyValueLookupService(); testRunner.addControllerService("grok-patterns-controller", service); - testRunner.setProperty(service, "GREEDYDATA", ".*"); + testRunner.setProperty(service, "GREEDYDATA", "%{SIMPLE} not an %{APACHE}"); + testRunner.setProperty(service, "SIMPLE", "simple text"); + testRunner.setProperty(service, "APACHE", "apache log"); testRunner.enableControllerService(service); testRunner.assertValid(service); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java index 55af547194d0..2205e97e2520 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java @@ -151,10 +151,7 @@ public void preCompile(final ConfigurationContext context) throws GrokException, } if(context.getProperty(PATTERN_CONTROLLER).isSet()) { - Map patterns = context.getProperty(PATTERN_CONTROLLER).asControllerService(KeyValueLookupService.class).getAll(); - for(String name : patterns.keySet()) { - grokCompiler.register(name, patterns.get(name)); - } + grokCompiler.register(context.getProperty(PATTERN_CONTROLLER).asControllerService(KeyValueLookupService.class).getAll()); } grok = grokCompiler.compile(context.getProperty(GROK_EXPRESSION).getValue());