From ca9c19c4d462eeeb8ece4eb5a67dc358349a7343 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Wed, 29 Aug 2018 14:58:15 +0200 Subject: [PATCH 1/4] NIFI-5561 - Add component name filtering to S2S Provenance Reporting Task --- .../provenance/ProvenanceEventConsumer.java | 32 ++++++++++++++++++- .../SiteToSiteProvenanceReportingTask.java | 23 +++++++++++++ ...TestSiteToSiteProvenanceReportingTask.java | 32 +++++++++++++++++++ 3 files changed, 86 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java index 34734754fe7b..515aa4c2aa1c 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java @@ -68,6 +68,8 @@ public class ProvenanceEventConsumer { private String startPositionValue = PROVENANCE_START_POSITION.getDefaultValue(); private Pattern componentTypeRegex; private Pattern componentTypeRegexExclude; + private Pattern componentNameRegex; + private Pattern componentNameRegexExclude; private List eventTypes = new ArrayList<>(); private List eventTypesExclude = new ArrayList<>(); private List componentIds = new ArrayList<>(); @@ -99,6 +101,18 @@ public void setComponentTypeRegexExclude(final String componentTypeRegex) { } } + public void setComponentNameRegex(final String componentNameRegex) { + if (!StringUtils.isBlank(componentNameRegex)) { + this.componentNameRegex = Pattern.compile(componentNameRegex); + } + } + + public void setComponentNameRegexExclude(final String componentNameRegexExclude) { + if (!StringUtils.isBlank(componentNameRegexExclude)) { + this.componentNameRegexExclude = Pattern.compile(componentNameRegexExclude); + } + } + public void addTargetEventType(final ProvenanceEventType... types) { Collections.addAll(eventTypes, types); } @@ -241,7 +255,8 @@ private long updateLastEventId(final List events, final S private boolean isFilteringEnabled() { return componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty() - || componentTypeRegexExclude != null || !eventTypesExclude.isEmpty() || !componentIdsExclude.isEmpty(); + || componentTypeRegexExclude != null || !eventTypesExclude.isEmpty() || !componentIdsExclude.isEmpty() + || componentNameRegex != null || componentNameRegexExclude != null; } private List filterEvents(ComponentMapHolder componentMapHolder, List provenanceEvents) { @@ -249,12 +264,15 @@ private List filterEvents(ComponentMapHolder componentMap List filteredEvents = new ArrayList<>(); for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) { + if (!eventTypesExclude.isEmpty() && eventTypesExclude.contains(provenanceEventRecord.getEventType())) { continue; } + if (!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) { continue; } + final String componentId = provenanceEventRecord.getComponentId(); if (!componentIdsExclude.isEmpty()) { if (componentIdsExclude.contains(componentId)) { @@ -281,6 +299,7 @@ private List filterEvents(ComponentMapHolder componentMap } } } + if (!componentIds.isEmpty() && !componentIds.contains(componentId)) { // If we aren't filtering it out based on component ID, let's see if this component has a parent process group IDs // that is being filtered on @@ -305,9 +324,20 @@ private List filterEvents(ComponentMapHolder componentMap if (componentTypeRegexExclude != null && componentTypeRegexExclude.matcher(provenanceEventRecord.getComponentType()).matches()) { continue; } + if (componentTypeRegex != null && !componentTypeRegex.matcher(provenanceEventRecord.getComponentType()).matches()) { continue; } + + final String componentName = componentMapHolder.getComponentName(provenanceEventRecord.getComponentId()); + if (componentNameRegexExclude != null && componentName != null && componentNameRegexExclude.matcher(componentName).matches()) { + continue; + } + + if (componentNameRegex != null && componentName != null && !componentNameRegex.matcher(componentName).matches()) { + continue; + } + filteredEvents.add(provenanceEventRecord); } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java index ec45c5966583..cd1723af2853 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java @@ -151,6 +151,25 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + static final PropertyDescriptor FILTER_COMPONENT_NAME = new PropertyDescriptor.Builder() + .name("s2s-prov-task-name-filter") + .displayName("Component Name to Include") + .description("Regular expression to filter the provenance events based on the component name. Only the events matching the regular " + + "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.") + .required(false) + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); + + static final PropertyDescriptor FILTER_COMPONENT_NAME_EXCLUDE = new PropertyDescriptor.Builder() + .name("s2s-prov-task-name-filter-exclude") + .displayName("Component Name to Exclude") + .description("Regular expression to exclude the provenance events based on the component name. The events matching the regular " + + "expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. " + + "If a component name is included in Component Name to Include and excluded here, then the exclusion takes precedence and the event will not be sent.") + .required(false) + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); + static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder() .name("start-position") .displayName("Start Position") @@ -177,6 +196,8 @@ public void onScheduled(final ConfigurationContext context) throws IOException { // initialize component type filtering consumer.setComponentTypeRegex(context.getProperty(FILTER_COMPONENT_TYPE).getValue()); consumer.setComponentTypeRegexExclude(context.getProperty(FILTER_COMPONENT_TYPE_EXCLUDE).getValue()); + consumer.setComponentNameRegex(context.getProperty(FILTER_COMPONENT_NAME).getValue()); + consumer.setComponentNameRegexExclude(context.getProperty(FILTER_COMPONENT_NAME_EXCLUDE).getValue()); final String[] targetEventTypes = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE).getValue(), ',')); if(targetEventTypes != null) { @@ -231,6 +252,8 @@ protected List getSupportedPropertyDescriptors() { properties.add(FILTER_COMPONENT_TYPE_EXCLUDE); properties.add(FILTER_COMPONENT_ID); properties.add(FILTER_COMPONENT_ID_EXCLUDE); + properties.add(FILTER_COMPONENT_NAME); + properties.add(FILTER_COMPONENT_NAME_EXCLUDE); properties.add(START_POSITION); return properties; } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java index d39df59c9f8e..9d74807fa62b 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java @@ -272,6 +272,38 @@ public void testFilterComponentTypeSuccess() throws IOException, InitializationE assertEquals(3, task.dataSent.size()); } + @Test + public void testFilterComponentName() throws IOException, InitializationException { + final Map properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000"); + properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_NAME, "Processor in .*"); + properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_NAME_EXCLUDE, ".*PGB"); + + // A001 has name "Processor in PGA" and should be picked + ProvenanceEventRecord event = createProvenanceEventRecord("A001", "dummy"); + MockSiteToSiteProvenanceReportingTask task = setup(event, properties, 1); + task.initialize(initContext); + task.onScheduled(confContext); + task.onTrigger(context); + + assertEquals(1, task.dataSent.size()); + JsonNode reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0); + assertEquals("A001", reportedEvent.get("componentId").asText()); + assertEquals("Processor in PGA", reportedEvent.get("componentName").asText()); + + // B001 has name "Processor in PGB" and should not be picked + event = createProvenanceEventRecord("B001", "dummy"); + task = setup(event, properties, 1); + task.initialize(initContext); + task.onScheduled(confContext); + task.onTrigger(context); + + assertEquals(0, task.dataSent.size()); + } + @Test public void testFilterComponentTypeExcludeSuccess() throws IOException, InitializationException { final Map properties = new HashMap<>(); From 9d43fb6d0691122e64dfff65fcc89dd37b7748f2 Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Wed, 29 Aug 2018 21:52:37 -0700 Subject: [PATCH 2/4] NIFI-5561 Added regression test for ProvenanceEventConsumer#isFilteringEnabled(). --- .../ProvenanceEventConsumerTest.groovy | 90 +++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/test/groovy/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumerTest.groovy diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/test/groovy/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumerTest.groovy b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/test/groovy/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumerTest.groovy new file mode 100644 index 000000000000..67811db330d5 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/test/groovy/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumerTest.groovy @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.util.provenance + + +import org.junit.After +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +@RunWith(JUnit4.class) +class ProvenanceEventConsumerTest extends GroovyTestCase { + private static final Logger logger = LoggerFactory.getLogger(ProvenanceEventConsumerTest.class) + + @BeforeClass + static void setUpOnce() { + logger.metaClass.methodMissing = { String name, args -> + logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") + } + } + + @Before + void setUp() { + super.setUp() + + } + + @After + void tearDown() { + + } + + @Test + void testIsFilteringEnabled() { + // Arrange + ProvenanceEventConsumer noFilterPopulated = new ProvenanceEventConsumer() + + Map patternFilteredConsumers = [:] + def regexProperties = ["componentTypeRegex", "componentTypeRegexExclude", "componentNameRegex", "componentNameRegexExclude"] + regexProperties.each { String prop -> + ProvenanceEventConsumer consumer = new ProvenanceEventConsumer() + consumer."${prop}" = prop + logger.info("Created regex-filtered PEC with ${prop} set") + patternFilteredConsumers[prop] = consumer + } + + Map listFilteredConsumers = [:] + def listProperties = ["eventTypes", "eventTypesExclude", "componentIds", "componentIdsExclude"] + listProperties.each { String prop -> + ProvenanceEventConsumer consumer = new ProvenanceEventConsumer() + consumer."${prop}" = [prop] + logger.info("Created list-filtered PEC with ${prop} set") + listFilteredConsumers[prop] = consumer + } + + def allFilteredConsumers = patternFilteredConsumers + listFilteredConsumers + logger.info("Created ${allFilteredConsumers.size()} filtered consumers") + + // Act + boolean unfilteredConsumerHasFilteringEnabled = noFilterPopulated.isFilteringEnabled() + logger.info("Unfiltered PEC has filtering enabled: ${unfilteredConsumerHasFilteringEnabled}") + + def filteredResults = allFilteredConsumers.collectEntries { prop, consumer -> + [prop, consumer.isFilteringEnabled()] + } + logger.info("Filtered PEC results: ${filteredResults}") + + // Assert + assert !unfilteredConsumerHasFilteringEnabled + assert filteredResults.every() + } +} From a3e14eb3ee857d0ab6a4900cb918e8b6da1f4506 Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Wed, 29 Aug 2018 22:03:35 -0700 Subject: [PATCH 3/4] NIFI-5561 Changed isFilteringEnabled implementation to be expandable as other attributes are added using Streams. --- .../provenance/ProvenanceEventConsumer.java | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java index 515aa4c2aa1c..02b37c07285e 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java @@ -36,8 +36,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.BiConsumer; import java.util.regex.Pattern; +import java.util.stream.Stream; public class ProvenanceEventConsumer { @@ -254,9 +256,19 @@ private long updateLastEventId(final List events, final S private boolean isFilteringEnabled() { - return componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty() - || componentTypeRegexExclude != null || !eventTypesExclude.isEmpty() || !componentIdsExclude.isEmpty() - || componentNameRegex != null || componentNameRegexExclude != null; + // Collect all non-blank patterns + boolean anyPatternPresent = Stream.of(componentTypeRegex, componentTypeRegexExclude, componentNameRegex, componentNameRegexExclude) + .filter(Objects::nonNull) + .map(Pattern::toString) + .anyMatch(StringUtils::isNotBlank); + + // Collect all non-empty lists + boolean anyListPresent = Stream.of(eventTypes, eventTypesExclude, componentIds, componentIdsExclude) + .filter(Objects::nonNull) + .anyMatch(list -> !list.isEmpty()); + + // If either is present, filtering is enabled + return anyPatternPresent || anyListPresent; } private List filterEvents(ComponentMapHolder componentMapHolder, List provenanceEvents) { From 2d82162db8d68ec259d7cbca639c30538e7b4d49 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Thu, 30 Aug 2018 13:25:10 +0200 Subject: [PATCH 4/4] EL + indentation --- .../SiteToSiteProvenanceReportingTask.java | 116 ++++++++++-------- 1 file changed, 62 insertions(+), 54 deletions(-) diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java index cd1723af2853..7fcc3b1b15a7 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java @@ -78,29 +78,30 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReportingTask { static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue("beginning-of-stream", "Beginning of Stream", - "Start reading provenance Events from the beginning of the stream (the oldest event first)"); + "Start reading provenance Events from the beginning of the stream (the oldest event first)"); static final AllowableValue END_OF_STREAM = new AllowableValue("end-of-stream", "End of Stream", - "Start reading provenance Events from the end of the stream, ignoring old events"); + "Start reading provenance Events from the end of the stream, ignoring old events"); static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder() - .name("Platform") - .displayName("Platform") - .description("The value to use for the platform field in each provenance event.") - .required(true) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) - .defaultValue("nifi") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Platform") + .displayName("Platform") + .description("The value to use for the platform field in each provenance event.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .defaultValue("nifi") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); static final PropertyDescriptor FILTER_EVENT_TYPE = new PropertyDescriptor.Builder() - .name("s2s-prov-task-event-filter") - .displayName("Event Type to Include") - .description("Comma-separated list of event types that will be used to filter the provenance events sent by the reporting task. " - + "Available event types are " + Arrays.deepToString(ProvenanceEventType.values()) + ". If no filter is set, all the events are sent. If " - + "multiple filters are set, the filters are cumulative.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("s2s-prov-task-event-filter") + .displayName("Event Type to Include") + .description("Comma-separated list of event types that will be used to filter the provenance events sent by the reporting task. " + + "Available event types are " + Arrays.deepToString(ProvenanceEventType.values()) + ". If no filter is set, all the events are sent. If " + + "multiple filters are set, the filters are cumulative.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); static final PropertyDescriptor FILTER_EVENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder() .name("s2s-prov-task-event-filter-exclude") @@ -110,17 +111,19 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti + "multiple filters are set, the filters are cumulative. If an event type is included in Event Type to Include and excluded here, then the " + "exclusion takes precedence and the event will not be sent.") .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); static final PropertyDescriptor FILTER_COMPONENT_TYPE = new PropertyDescriptor.Builder() - .name("s2s-prov-task-type-filter") - .displayName("Component Type to Include") - .description("Regular expression to filter the provenance events based on the component type. Only the events matching the regular " - + "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.") - .required(false) - .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .build(); + .name("s2s-prov-task-type-filter") + .displayName("Component Type to Include") + .description("Regular expression to filter the provenance events based on the component type. Only the events matching the regular " + + "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); static final PropertyDescriptor FILTER_COMPONENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder() .name("s2s-prov-task-type-filter-exclude") @@ -129,17 +132,19 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti + "expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. " + "If a component type is included in Component Type to Include and excluded here, then the exclusion takes precedence and the event will not be sent.") .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .build(); static final PropertyDescriptor FILTER_COMPONENT_ID = new PropertyDescriptor.Builder() - .name("s2s-prov-task-id-filter") - .displayName("Component ID to Include") - .description("Comma-separated list of component UUID that will be used to filter the provenance events sent by the reporting task. If no " - + "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("s2s-prov-task-id-filter") + .displayName("Component ID to Include") + .description("Comma-separated list of component UUID that will be used to filter the provenance events sent by the reporting task. If no " + + "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); static final PropertyDescriptor FILTER_COMPONENT_ID_EXCLUDE = new PropertyDescriptor.Builder() .name("s2s-prov-task-id-filter-exclude") @@ -148,17 +153,19 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti + "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. If a component UUID is included in " + "Component ID to Include and excluded here, then the exclusion takes precedence and the event will not be sent.") .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); static final PropertyDescriptor FILTER_COMPONENT_NAME = new PropertyDescriptor.Builder() - .name("s2s-prov-task-name-filter") - .displayName("Component Name to Include") - .description("Regular expression to filter the provenance events based on the component name. Only the events matching the regular " - + "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.") - .required(false) - .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .build(); + .name("s2s-prov-task-name-filter") + .displayName("Component Name to Include") + .description("Regular expression to filter the provenance events based on the component name. Only the events matching the regular " + + "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .build(); static final PropertyDescriptor FILTER_COMPONENT_NAME_EXCLUDE = new PropertyDescriptor.Builder() .name("s2s-prov-task-name-filter-exclude") @@ -167,17 +174,18 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti + "expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. " + "If a component name is included in Component Name to Include and excluded here, then the exclusion takes precedence and the event will not be sent.") .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) .build(); static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder() - .name("start-position") - .displayName("Start Position") - .description("If the Reporting Task has never been run, or if its state has been reset by a user, specifies where in the stream of Provenance Events the Reporting Task should start") - .allowableValues(BEGINNING_OF_STREAM, END_OF_STREAM) - .defaultValue(BEGINNING_OF_STREAM.getValue()) - .required(true) - .build(); + .name("start-position") + .displayName("Start Position") + .description("If the Reporting Task has never been run, or if its state has been reset by a user, specifies where in the stream of Provenance Events the Reporting Task should start") + .allowableValues(BEGINNING_OF_STREAM, END_OF_STREAM) + .defaultValue(BEGINNING_OF_STREAM.getValue()) + .required(true) + .build(); private volatile ProvenanceEventConsumer consumer; @@ -194,12 +202,12 @@ public void onScheduled(final ConfigurationContext context) throws IOException { consumer.setLogger(getLogger()); // initialize component type filtering - consumer.setComponentTypeRegex(context.getProperty(FILTER_COMPONENT_TYPE).getValue()); - consumer.setComponentTypeRegexExclude(context.getProperty(FILTER_COMPONENT_TYPE_EXCLUDE).getValue()); - consumer.setComponentNameRegex(context.getProperty(FILTER_COMPONENT_NAME).getValue()); - consumer.setComponentNameRegexExclude(context.getProperty(FILTER_COMPONENT_NAME_EXCLUDE).getValue()); + consumer.setComponentTypeRegex(context.getProperty(FILTER_COMPONENT_TYPE).evaluateAttributeExpressions().getValue()); + consumer.setComponentTypeRegexExclude(context.getProperty(FILTER_COMPONENT_TYPE_EXCLUDE).evaluateAttributeExpressions().getValue()); + consumer.setComponentNameRegex(context.getProperty(FILTER_COMPONENT_NAME).evaluateAttributeExpressions().getValue()); + consumer.setComponentNameRegexExclude(context.getProperty(FILTER_COMPONENT_NAME_EXCLUDE).evaluateAttributeExpressions().getValue()); - final String[] targetEventTypes = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE).getValue(), ',')); + final String[] targetEventTypes = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE).evaluateAttributeExpressions().getValue(), ',')); if(targetEventTypes != null) { for(String type : targetEventTypes) { try { @@ -210,7 +218,7 @@ public void onScheduled(final ConfigurationContext context) throws IOException { } } - final String[] targetEventTypesExclude = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE_EXCLUDE).getValue(), ',')); + final String[] targetEventTypesExclude = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE_EXCLUDE).evaluateAttributeExpressions().getValue(), ',')); if(targetEventTypesExclude != null) { for(String type : targetEventTypesExclude) { try { @@ -222,12 +230,12 @@ public void onScheduled(final ConfigurationContext context) throws IOException { } // initialize component ID filtering - final String[] targetComponentIds = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID).getValue(), ',')); + final String[] targetComponentIds = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID).evaluateAttributeExpressions().getValue(), ',')); if(targetComponentIds != null) { consumer.addTargetComponentId(targetComponentIds); } - final String[] targetComponentIdsExclude = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID_EXCLUDE).getValue(), ',')); + final String[] targetComponentIdsExclude = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID_EXCLUDE).evaluateAttributeExpressions().getValue(), ',')); if(targetComponentIdsExclude != null) { consumer.addTargetComponentIdExclude(targetComponentIdsExclude); }