From 3371a07068dbbd5b3984f386cf61bc297ba608a9 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Fri, 6 Apr 2018 16:58:33 +0200 Subject: [PATCH 1/2] NIFIREG-160 - Initial hook provider --- .../registry/extension/ExtensionManager.java | 2 + .../registry/provider/ProviderFactory.java | 8 + .../provider/StandardProviderFactory.java | 53 ++++++ .../flow/StandardFlowSnapshotContext.java | 9 ++ .../provider/hook/ScriptFlowHookProvider.java | 152 ++++++++++++++++++ .../registry/service/RegistryService.java | 89 ++++++++++ ...apache.nifi.registry.hook.FlowHookProvider | 15 ++ .../src/main/xsd/providers.xsd | 1 + .../flow/TestStandardFlowSnapshotContext.java | 4 +- .../hook/TestScriptFlowHookProvider.java | 45 ++++++ .../registry/service/TestRegistryService.java | 5 +- .../provider/hook/bad-script-provider.xml | 30 ++++ .../registry/flow/FlowSnapshotContext.java | 2 +- .../nifi/registry/hook/FlowHookEvent.java | 31 ++++ .../nifi/registry/hook/FlowHookException.java | 31 ++++ .../nifi/registry/hook/FlowHookProvider.java | 81 ++++++++++ .../src/main/resources/conf/providers.xml | 9 ++ 17 files changed, 564 insertions(+), 3 deletions(-) create mode 100644 nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptFlowHookProvider.java create mode 100644 nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.hook.FlowHookProvider create mode 100644 nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptFlowHookProvider.java create mode 100644 nifi-registry-framework/src/test/resources/provider/hook/bad-script-provider.xml create mode 100644 nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookEvent.java create mode 100644 nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookException.java create mode 100644 nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookProvider.java diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java index 4c8b5ac94..7f2b9e14e 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java @@ -22,6 +22,7 @@ import org.apache.nifi.registry.security.authorization.Authorizer; import org.apache.nifi.registry.security.authorization.UserGroupProvider; import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.hook.FlowHookProvider; import org.apache.nifi.registry.properties.NiFiRegistryProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +57,7 @@ public class ExtensionManager { classes.add(AccessPolicyProvider.class); classes.add(Authorizer.class); classes.add(IdentityProvider.class); + classes.add(FlowHookProvider.class); EXTENSION_CLASSES = Collections.unmodifiableList(classes); } diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java index d5b40a22f..bb62eb29a 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java @@ -16,7 +16,10 @@ */ package org.apache.nifi.registry.provider; +import java.util.List; + import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.hook.FlowHookProvider; /** * A factory for obtaining the configured providers. @@ -35,4 +38,9 @@ public interface ProviderFactory { */ FlowPersistenceProvider getFlowPersistenceProvider(); + /** + * @return the configured FlowHookProviders + */ + List getFlowHookProviders(); + } diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java index 5f2da59c7..34f307cad 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java @@ -18,6 +18,7 @@ import org.apache.nifi.registry.extension.ExtensionManager; import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.hook.FlowHookProvider; import org.apache.nifi.registry.properties.NiFiRegistryProperties; import org.apache.nifi.registry.provider.generated.Property; import org.apache.nifi.registry.provider.generated.Providers; @@ -39,6 +40,7 @@ import javax.xml.validation.SchemaFactory; import java.io.File; import java.lang.reflect.Constructor; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -72,6 +74,7 @@ private static JAXBContext initializeJaxbContext() { private final AtomicReference providersHolder = new AtomicReference<>(null); private FlowPersistenceProvider flowPersistenceProvider; + private List flowHookProviders; @Autowired public StandardProviderFactory(final NiFiRegistryProperties properties, final ExtensionManager extensionManager) { @@ -151,6 +154,56 @@ public synchronized FlowPersistenceProvider getFlowPersistenceProvider() { return flowPersistenceProvider; } + @Bean + @Override + public List getFlowHookProviders() { + if (flowHookProviders == null) { + flowHookProviders = new ArrayList(); + + if (providersHolder.get() == null) { + throw new ProviderFactoryException("ProviderFactory must be initialized before obtaining a Provider"); + } + + final Providers providers = providersHolder.get(); + final List jaxbFlowHookProvider = providers.getFlowHookProvider(); + + if(jaxbFlowHookProvider == null || jaxbFlowHookProvider.isEmpty()) { + // no hook provided + return flowHookProviders; + } + + for (org.apache.nifi.registry.provider.generated.Provider flowHookProvider : jaxbFlowHookProvider) { + + final String flowHookProviderClassName = flowHookProvider.getClazz(); + FlowHookProvider hook; + + try { + final ClassLoader classLoader = extensionManager.getExtensionClassLoader(flowHookProviderClassName); + if (classLoader == null) { + throw new IllegalStateException("Extension not found in any of the configured class loaders: " + flowHookProviderClassName); + } + + final Class rawFlowHookProviderClass = Class.forName(flowHookProviderClassName, true, classLoader); + final Class flowHookProviderClass = rawFlowHookProviderClass.asSubclass(FlowHookProvider.class); + + final Constructor constructor = flowHookProviderClass.getConstructor(); + hook = (FlowHookProvider) constructor.newInstance(); + + LOGGER.info("Instantiated FlowHookProvider with class name {}", new Object[] {flowHookProviderClassName}); + } catch (Exception e) { + throw new ProviderFactoryException("Error creating FlowHookProvider with class name: " + flowHookProviderClassName, e); + } + + final ProviderConfigurationContext configurationContext = createConfigurationContext(flowHookProvider.getProperty()); + hook.onConfigured(configurationContext); + flowHookProviders.add(hook); + LOGGER.info("Configured FlowHookProvider with class name {}", new Object[] {flowHookProviderClassName}); + } + } + + return flowHookProviders; + } + private ProviderConfigurationContext createConfigurationContext(final List configProperties) { final Map properties = new HashMap<>(); diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java index 513f37fa2..1f6f84073 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java @@ -33,6 +33,7 @@ public class StandardFlowSnapshotContext implements FlowSnapshotContext { private final String flowName; private final int version; private final String comments; + private final String author; private final long snapshotTimestamp; private final String author; @@ -43,6 +44,7 @@ private StandardFlowSnapshotContext(final Builder builder) { this.flowName = builder.flowName; this.version = builder.version; this.comments = builder.comments; + this.author = builder.author; this.snapshotTimestamp = builder.snapshotTimestamp; this.author = builder.author; @@ -105,6 +107,7 @@ public static class Builder { private String flowName; private int version; private String comments; + private String author; private long snapshotTimestamp; private String author; @@ -119,6 +122,7 @@ public Builder(final Bucket bucket, final VersionedFlow versionedFlow, final Ver flowName(versionedFlow.getName()); version(snapshotMetadata.getVersion()); comments(snapshotMetadata.getComments()); + author(snapshotMetadata.getAuthor()); snapshotTimestamp(snapshotMetadata.getTimestamp()); author(snapshotMetadata.getAuthor()); } @@ -153,6 +157,11 @@ public Builder comments(final String comments) { return this; } + public Builder author(final String author) { + this.author = author; + return this; + } + public Builder snapshotTimestamp(final long snapshotTimestamp) { this.snapshotTimestamp = snapshotTimestamp; return this; diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptFlowHookProvider.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptFlowHookProvider.java new file mode 100644 index 000000000..d57df1658 --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptFlowHookProvider.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider.hook; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.registry.flow.FlowSnapshotContext; +import org.apache.nifi.registry.hook.FlowHookEvent; +import org.apache.nifi.registry.hook.FlowHookException; +import org.apache.nifi.registry.hook.FlowHookProvider; +import org.apache.nifi.registry.provider.ProviderConfigurationContext; +import org.apache.nifi.registry.provider.ProviderCreationException; +import org.apache.nifi.registry.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A FlowHookProvider that is used to execute a script before a flow snapshot version is committed. + */ +public class ScriptFlowHookProvider implements FlowHookProvider { + + static final Logger LOGGER = LoggerFactory.getLogger(ScriptFlowHookProvider.class); + static final String SCRIPT_PATH_PROP = "Script Path"; + static final String SCRIPT_WORKDIR_PROP = "Working Directory"; + private File scriptFile; + private File workDirFile; + + @Override + public void postCreateBucket(String bucketId) throws FlowHookException { + this.executeScript(FlowHookEvent.CREATE_BUCKET, bucketId, null, null, null, null); + } + + @Override + public void postCreateFlow(String bucketId, String flowId) throws FlowHookException { + this.executeScript(FlowHookEvent.CREATE_FLOW, bucketId, flowId, null, null, null); + } + + @Override + public void postDeleteBucket(String bucketId) throws FlowHookException { + this.executeScript(FlowHookEvent.DELETE_BUCKET, bucketId, null, null, null, null); + } + + @Override + public void postDeleteFlow(String bucketId, String flowId) throws FlowHookException { + this.executeScript(FlowHookEvent.DELETE_FLOW, bucketId, flowId, null, null, null); + } + + @Override + public void postDeleteFlowVersion(String bucketId, String flowId, int version) throws FlowHookException { + this.executeScript(FlowHookEvent.DELETE_FLOW, bucketId, flowId, Integer.toString(version), null, null); + } + + @Override + public void postUpdateBucket(String bucketId) throws FlowHookException { + this.executeScript(FlowHookEvent.UPDATE_BUCKET, bucketId, null, null, null, null); + } + + @Override + public void postUpdateFlow(String bucketId, String flowId) throws FlowHookException { + this.executeScript(FlowHookEvent.UPDATE_FLOW, bucketId, flowId, null, null, null); + } + + @Override + public void postCreateFlowVersion(final FlowSnapshotContext flowSnapshotContext) throws FlowHookException { + this.executeScript(FlowHookEvent.CREATE_VERSION, flowSnapshotContext.getBucketId(), flowSnapshotContext.getFlowId(), + Integer.toString(flowSnapshotContext.getVersion()), flowSnapshotContext.getComments(), flowSnapshotContext.getAuthor()); + } + + private void executeScript(final FlowHookEvent eventType, String bucketId, String flowId, String version, String comment, String author) { + List command = new ArrayList(); + command.add(scriptFile.getAbsolutePath()); + command.add(eventType.name()); + command.add(bucketId); + + if(flowId != null) { + command.add(flowId); + } + + if(version != null) { + command.add(version); + } + + if(comment != null) { + command.add(comment); + } + + if(author != null) { + command.add(author); + } + + final String commandString = StringUtils.join(command, " "); + final ProcessBuilder builder = new ProcessBuilder(command); + builder.directory(workDirFile); + LOGGER.debug("Execution of " + commandString); + + try { + builder.start(); + } catch (IOException e) { + LOGGER.error("Execution of {0} failed with: {1}", new Object[] { commandString, e.getLocalizedMessage() }, e); + } + } + + @Override + public void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException { + final Map props = configurationContext.getProperties(); + if (!props.containsKey(SCRIPT_PATH_PROP)) { + throw new ProviderCreationException("The property " + SCRIPT_PATH_PROP + " must be provided"); + } + + final String scripPath = props.get(SCRIPT_PATH_PROP); + if (StringUtils.isBlank(scripPath)) { + throw new ProviderCreationException("The property " + SCRIPT_PATH_PROP + " cannot be null or blank"); + } + + if(props.containsKey(SCRIPT_WORKDIR_PROP) && !StringUtils.isBlank(props.get(SCRIPT_WORKDIR_PROP))) { + final String workdir = props.get(SCRIPT_WORKDIR_PROP); + try { + workDirFile = new File(workdir); + FileUtils.ensureDirectoryExistAndCanRead(workDirFile); + } catch (IOException e) { + throw new ProviderCreationException("The working directory " + workdir + " cannot be read."); + } + } + + scriptFile = new File(scripPath); + if(scriptFile.isFile() && scriptFile.canExecute()) { + LOGGER.info("Configured ScriptFlowHookProvider with script {}", new Object[] {scriptFile.getAbsolutePath()}); + } else { + throw new ProviderCreationException("The script file " + scriptFile.getAbsolutePath() + " cannot be executed."); + } + } + +} diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java index 23f1d14a9..95c62c39f 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java @@ -42,6 +42,7 @@ import org.apache.nifi.registry.flow.diff.FlowDifference; import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow; import org.apache.nifi.registry.flow.diff.StandardFlowComparator; +import org.apache.nifi.registry.hook.FlowHookProvider; import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext; import org.apache.nifi.registry.serialization.Serializer; import org.slf4j.Logger; @@ -84,6 +85,7 @@ public class RegistryService { private final MetadataService metadataService; private final FlowPersistenceProvider flowPersistenceProvider; + private final List flowHookProviders; private final Serializer processGroupSerializer; private final Validator validator; @@ -94,10 +96,12 @@ public class RegistryService { @Autowired public RegistryService(final MetadataService metadataService, final FlowPersistenceProvider flowPersistenceProvider, + final List flowHookProviders, final Serializer processGroupSerializer, final Validator validator) { this.metadataService = metadataService; this.flowPersistenceProvider = flowPersistenceProvider; + this.flowHookProviders = flowHookProviders; this.processGroupSerializer = processGroupSerializer; this.validator = validator; Validate.notNull(this.metadataService); @@ -134,6 +138,17 @@ public Bucket createBucket(final Bucket bucket) { } final BucketEntity createdBucket = metadataService.createBucket(DataModelMapper.map(bucket)); + + // call the post-event hook + for(FlowHookProvider flowHookProvider : flowHookProviders) { + try { + flowHookProvider.postCreateBucket(createdBucket.getId()); + } catch (Exception e) { + // we don't want to throw anything here, hook are provided on best effort + LOGGER.error("Error while calling post-event hook", e); + } + } + return DataModelMapper.map(createdBucket); } finally { writeLock.unlock(); @@ -225,6 +240,17 @@ public Bucket updateBucket(final Bucket bucket) { // perform the actual update final BucketEntity updatedBucket = metadataService.updateBucket(existingBucketById); + + // call the post-event hook + for(FlowHookProvider flowHookProvider : flowHookProviders) { + try { + flowHookProvider.postUpdateBucket(existingBucketById.getId()); + } catch (Exception e) { + // we don't want to throw anything here, hook are provided on best effort + LOGGER.error("Error while calling post-event hook", e); + } + } + return DataModelMapper.map(updatedBucket); } finally { writeLock.unlock(); @@ -253,6 +279,16 @@ public Bucket deleteBucket(final String bucketIdentifier) { // now delete the bucket from the metadata provider, which deletes all flows referencing it metadataService.deleteBucket(existingBucket); + // call the post-event hook + for(FlowHookProvider flowHookProvider : flowHookProviders) { + try { + flowHookProvider.postDeleteBucket(existingBucket.getId()); + } catch (Exception e) { + // we don't want to throw anything here, hook are provided on best effort + LOGGER.error("Error while calling post-event hook", e); + } + } + return DataModelMapper.map(existingBucket); } finally { writeLock.unlock(); @@ -356,6 +392,17 @@ public VersionedFlow createFlow(final String bucketIdentifier, final VersionedFl // persist the flow and return the created entity final FlowEntity createdFlow = metadataService.createFlow(flowEntity); + + // call the post-event hook + for(FlowHookProvider flowHookProvider : flowHookProviders) { + try { + flowHookProvider.postCreateFlow(existingBucket.getId(), createdFlow.getId()); + } catch (Exception e) { + // we don't want to throw anything here, hook are provided on best effort + LOGGER.error("Error while calling post-event hook", e); + } + } + return DataModelMapper.map(existingBucket, createdFlow); } finally { writeLock.unlock(); @@ -497,6 +544,17 @@ public VersionedFlow updateFlow(final VersionedFlow versionedFlow) { // perform the actual update final FlowEntity updatedFlow = metadataService.updateFlow(existingFlow); + + // call the post-event hook + for(FlowHookProvider flowHookProvider : flowHookProviders) { + try { + flowHookProvider.postUpdateFlow(existingFlow.getBucketId(), existingFlow.getId()); + } catch (Exception e) { + // we don't want to throw anything here, hook are provided on best effort + LOGGER.error("Error while calling post-event hook", e); + } + } + return DataModelMapper.map(existingBucket, updatedFlow); } finally { writeLock.unlock(); @@ -537,6 +595,16 @@ public VersionedFlow deleteFlow(final String bucketIdentifier, final String flow // now delete the flow from the metadata provider metadataService.deleteFlow(existingFlow); + // call the post-event hook + for(FlowHookProvider flowHookProvider : flowHookProviders) { + try { + flowHookProvider.postDeleteFlow(existingFlow.getBucketId(), existingFlow.getId()); + } catch (Exception e) { + // we don't want to throw anything here, hook are provided on best effort + LOGGER.error("Error while calling post-event hook", e); + } + } + return DataModelMapper.map(existingBucket, existingFlow); } finally { writeLock.unlock(); @@ -630,6 +698,16 @@ public VersionedFlowSnapshot createFlowSnapshot(final VersionedFlowSnapshot flow } final VersionedFlow updatedVersionedFlow = DataModelMapper.map(existingBucket, updatedFlow); + // call the post-event hook + for(FlowHookProvider flowHookProvider : flowHookProviders) { + try { + flowHookProvider.postCreateFlowVersion(context); + } catch (Exception e) { + // we don't want to throw anything here, hook are provided on best effort + LOGGER.error("Error while calling post-event hook", e); + } + } + flowSnapshot.setBucket(bucket); flowSnapshot.setFlow(updatedVersionedFlow); return flowSnapshot; @@ -879,6 +957,17 @@ public VersionedFlowSnapshotMetadata deleteFlowSnapshot(final String bucketIdent // delete the snapshot itself metadataService.deleteFlowSnapshot(snapshotEntity); + + // call the post-event hook + for(FlowHookProvider flowHookProvider : flowHookProviders) { + try { + flowHookProvider.postDeleteFlowVersion(bucketIdentifier, flowIdentifier, version); + } catch (Exception e) { + // we don't want to throw anything here, hook are provided on best effort + LOGGER.error("Error while calling post-event hook", e); + } + } + return DataModelMapper.map(existingBucket, snapshotEntity); } finally { writeLock.unlock(); diff --git a/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.hook.FlowHookProvider b/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.hook.FlowHookProvider new file mode 100644 index 000000000..150103aee --- /dev/null +++ b/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.hook.FlowHookProvider @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.registry.provider.hook.ScriptFlowHookProvider \ No newline at end of file diff --git a/nifi-registry-framework/src/main/xsd/providers.xsd b/nifi-registry-framework/src/main/xsd/providers.xsd index 1202f9efd..16086af59 100644 --- a/nifi-registry-framework/src/main/xsd/providers.xsd +++ b/nifi-registry-framework/src/main/xsd/providers.xsd @@ -43,6 +43,7 @@ + diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestStandardFlowSnapshotContext.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestStandardFlowSnapshotContext.java index 3e0a106b4..bff2ef915 100644 --- a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestStandardFlowSnapshotContext.java +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestStandardFlowSnapshotContext.java @@ -17,7 +17,6 @@ package org.apache.nifi.registry.provider.flow; import org.apache.nifi.registry.flow.FlowSnapshotContext; -import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext; import org.junit.Assert; import org.junit.Test; @@ -31,6 +30,7 @@ public void testBuilder() { final String flowName = "Some Flow"; final int version = 2; final String comments = "Some Comments"; + final String author = "anonymous"; final long timestamp = System.currentTimeMillis(); final FlowSnapshotContext context = new StandardFlowSnapshotContext.Builder() @@ -40,6 +40,7 @@ public void testBuilder() { .flowName(flowName) .version(version) .comments(comments) + .author(author) .snapshotTimestamp(timestamp) .build(); @@ -49,6 +50,7 @@ public void testBuilder() { Assert.assertEquals(flowName, context.getFlowName()); Assert.assertEquals(version, context.getVersion()); Assert.assertEquals(comments, context.getComments()); + Assert.assertEquals(author, context.getAuthor()); Assert.assertEquals(timestamp, context.getSnapshotTimestamp()); } diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptFlowHookProvider.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptFlowHookProvider.java new file mode 100644 index 000000000..5bc888cb7 --- /dev/null +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptFlowHookProvider.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider.hook; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import org.apache.nifi.registry.extension.ExtensionManager; +import org.apache.nifi.registry.properties.NiFiRegistryProperties; +import org.apache.nifi.registry.provider.ProviderCreationException; +import org.apache.nifi.registry.provider.ProviderFactory; +import org.apache.nifi.registry.provider.StandardProviderFactory; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestScriptFlowHookProvider { + + @Test(expected = ProviderCreationException.class) + public void testBadScriptProvider() { + final NiFiRegistryProperties props = new NiFiRegistryProperties(); + props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/hook/bad-script-provider.xml"); + + final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class); + when(extensionManager.getExtensionClassLoader(any(String.class))).thenReturn(this.getClass().getClassLoader()); + + final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager); + providerFactory.initialize(); + providerFactory.getFlowHookProviders(); + } + +} diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java index 95e2d1a8c..1f21248e9 100644 --- a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java @@ -30,6 +30,7 @@ import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.registry.flow.VersionedProcessor; +import org.apache.nifi.registry.hook.FlowHookProvider; import org.apache.nifi.registry.serialization.Serializer; import org.apache.nifi.registry.serialization.VersionedProcessGroupSerializer; import org.junit.Assert; @@ -72,6 +73,7 @@ public class TestRegistryService { private MetadataService metadataService; private FlowPersistenceProvider flowPersistenceProvider; + private FlowHookProvider flowHookProvider; private Serializer snapshotSerializer; private Validator validator; @@ -81,12 +83,13 @@ public class TestRegistryService { public void setup() { metadataService = mock(MetadataService.class); flowPersistenceProvider = mock(FlowPersistenceProvider.class); + // flowHookProvider = mock(FlowHookProvider.class); snapshotSerializer = mock(VersionedProcessGroupSerializer.class); final ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory(); validator = validatorFactory.getValidator(); - registryService = new RegistryService(metadataService, flowPersistenceProvider, snapshotSerializer, validator); + registryService = new RegistryService(metadataService, flowPersistenceProvider, new ArrayList(), snapshotSerializer, validator); } // ---------------------- Test Bucket methods --------------------------------------------- diff --git a/nifi-registry-framework/src/test/resources/provider/hook/bad-script-provider.xml b/nifi-registry-framework/src/test/resources/provider/hook/bad-script-provider.xml new file mode 100644 index 000000000..ca6cc6c90 --- /dev/null +++ b/nifi-registry-framework/src/test/resources/provider/hook/bad-script-provider.xml @@ -0,0 +1,30 @@ + + + + + + org.apache.nifi.registry.provider.MockFlowPersistenceProvider + flow foo + flow bar + + + + org.apache.nifi.registry.provider.hook.ScriptFlowHookProvider + + + + + \ No newline at end of file diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java index c4bdd4650..9c2a8183b 100644 --- a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java @@ -57,7 +57,7 @@ public interface FlowSnapshotContext { long getSnapshotTimestamp(); /** - * @return the name of the user who created the snapshot + * @return the author of the snapshot */ String getAuthor(); diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookEvent.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookEvent.java new file mode 100644 index 000000000..c0aebc728 --- /dev/null +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.hook; + +public enum FlowHookEvent { + CREATE_BUCKET, + CREATE_FLOW, + CREATE_VERSION, + + UPDATE_BUCKET, + UPDATE_FLOW, + + DELETE_BUCKET, + DELETE_FLOW, + DELETE_VERSION; +} diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookException.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookException.java new file mode 100644 index 000000000..85033ca1a --- /dev/null +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.hook; + +/** + * An Exception for errors encountered when a FlowHookProvider executes an action before/after a commit. + */ +public class FlowHookException extends RuntimeException { + + public FlowHookException(String message) { + super(message); + } + + public FlowHookException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookProvider.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookProvider.java new file mode 100644 index 000000000..599cbed09 --- /dev/null +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookProvider.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.hook; + +import org.apache.nifi.registry.flow.FlowSnapshotContext; +import org.apache.nifi.registry.provider.Provider; + +/** + * A service that defines post event action hook + * + * NOTE: Although this interface is intended to be an extension point, it is not yet considered stable and thus may + * change across releases until the registry matures. + */ +public interface FlowHookProvider extends Provider { + + /** + * @param bucketId + * @throws FlowHookException + */ + default void postCreateBucket(String bucketId) throws FlowHookException { } + + /** + * @param bucketId + * @param flowId + * @throws FlowHookException + */ + default void postCreateFlow(String bucketId, String flowId) throws FlowHookException { } + + /** + * @param flowSnapshotContext + * @throws FlowHookException + */ + default void postCreateFlowVersion(FlowSnapshotContext flowSnapshotContext) throws FlowHookException { } + + /** + * @param bucketId + * @throws FlowHookException + */ + default void postDeleteBucket(String bucketId) throws FlowHookException { } + + /** + * @param bucketId + * @param flowId + * @throws FlowHookException + */ + default void postDeleteFlow(String bucketId, String flowId) throws FlowHookException { } + + /** + * @param flowSnapshotContext + * @throws FlowHookException + */ + default void postDeleteFlowVersion(String bucketIdentifier, String flowIdentifier, int version) throws FlowHookException { } + + /** + * @param bucketId + * @throws FlowHookException + */ + default void postUpdateBucket(String bucketId) throws FlowHookException { } + + /** + * @param bucketId + * @param flowId + * @throws FlowHookException + */ + default void postUpdateFlow(String bucketId, String flowId) throws FlowHookException { } + +} diff --git a/nifi-registry-resources/src/main/resources/conf/providers.xml b/nifi-registry-resources/src/main/resources/conf/providers.xml index 720bee2c0..3d7d39285 100644 --- a/nifi-registry-resources/src/main/resources/conf/providers.xml +++ b/nifi-registry-resources/src/main/resources/conf/providers.xml @@ -29,4 +29,13 @@ --> + + + \ No newline at end of file From f99cf19d49bf5d301ada83a5c128b645da00458f Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Tue, 8 May 2018 13:38:33 -0400 Subject: [PATCH 2/2] NIFIREG-160 - Making event hooks asynchronous --- .../nifi/registry/event/EventFactory.java | 94 ++++++++++ .../nifi/registry/event/EventService.java | 112 ++++++++++++ .../nifi/registry/event/StandardEvent.java | 124 ++++++++++++++ .../registry/event/StandardEventField.java | 49 ++++++ .../registry/extension/ExtensionManager.java | 4 +- .../registry/provider/ProviderFactory.java | 4 +- .../provider/StandardProviderFactory.java | 46 ++--- .../flow/StandardFlowSnapshotContext.java | 9 - .../hook/LoggingEventHookProvider.java | 57 +++++++ ...ider.java => ScriptEventHookProvider.java} | 76 ++------- .../registry/service/RegistryService.java | 89 ---------- ...ache.nifi.registry.hook.EventHookProvider} | 3 +- .../src/main/xsd/providers.xsd | 2 +- .../nifi/registry/event/TestEventFactory.java | 161 ++++++++++++++++++ .../nifi/registry/event/TestEventService.java | 97 +++++++++++ .../registry/event/TestStandardEvent.java | 47 +++++ ....java => TestScriptEventHookProvider.java} | 4 +- .../registry/service/TestRegistryService.java | 6 +- .../provider/hook/bad-script-provider.xml | 6 +- .../org/apache/nifi/registry/hook/Event.java | 50 ++++++ .../apache/nifi/registry/hook/EventField.java | 33 ++++ ...FlowHookEvent.java => EventFieldName.java} | 19 +-- ...Exception.java => EventHookException.java} | 8 +- .../nifi/registry/hook/EventHookProvider.java | 39 +++++ .../apache/nifi/registry/hook/EventType.java | 72 ++++++++ .../nifi/registry/hook/FlowHookProvider.java | 81 --------- .../src/main/resources/conf/logback.xml | 23 +++ .../src/main/resources/conf/providers.xml | 13 +- .../web/api/AccessPolicyResource.java | 6 +- .../nifi/registry/web/api/AccessResource.java | 5 +- .../registry/web/api/ApplicationResource.java | 14 ++ .../api/AuthorizableApplicationResource.java | 5 +- .../registry/web/api/BucketFlowResource.java | 14 +- .../nifi/registry/web/api/BucketResource.java | 15 +- .../nifi/registry/web/api/FlowResource.java | 6 +- .../nifi/registry/web/api/ItemResource.java | 6 +- .../nifi/registry/web/api/TenantResource.java | 5 +- pom.xml | 4 +- 38 files changed, 1093 insertions(+), 315 deletions(-) create mode 100644 nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventFactory.java create mode 100644 nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventService.java create mode 100644 nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEvent.java create mode 100644 nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEventField.java create mode 100644 nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/LoggingEventHookProvider.java rename nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/{ScriptFlowHookProvider.java => ScriptEventHookProvider.java} (54%) rename nifi-registry-framework/src/main/resources/META-INF/services/{org.apache.nifi.registry.hook.FlowHookProvider => org.apache.nifi.registry.hook.EventHookProvider} (86%) create mode 100644 nifi-registry-framework/src/test/java/org/apache/nifi/registry/event/TestEventFactory.java create mode 100644 nifi-registry-framework/src/test/java/org/apache/nifi/registry/event/TestEventService.java create mode 100644 nifi-registry-framework/src/test/java/org/apache/nifi/registry/event/TestStandardEvent.java rename nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/{TestScriptFlowHookProvider.java => TestScriptEventHookProvider.java} (95%) create mode 100644 nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/Event.java create mode 100644 nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventField.java rename nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/{FlowHookEvent.java => EventFieldName.java} (82%) rename nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/{FlowHookException.java => EventHookException.java} (76%) create mode 100644 nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventHookProvider.java create mode 100644 nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventType.java delete mode 100644 nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookProvider.java diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventFactory.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventFactory.java new file mode 100644 index 000000000..ba9e7759f --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventFactory.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.event; + +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.hook.Event; +import org.apache.nifi.registry.hook.EventFieldName; +import org.apache.nifi.registry.hook.EventType; +import org.apache.nifi.registry.security.authorization.user.NiFiUserUtils; + +/** + * Factory to create Events from domain objects. + */ +public class EventFactory { + + public static Event bucketCreated(final Bucket bucket) { + return new StandardEvent.Builder() + .eventType(EventType.CREATE_BUCKET) + .addField(EventFieldName.BUCKET_ID, bucket.getIdentifier()) + .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity()) + .build(); + } + + public static Event bucketUpdated(final Bucket bucket) { + return new StandardEvent.Builder() + .eventType(EventType.UPDATE_BUCKET) + .addField(EventFieldName.BUCKET_ID, bucket.getIdentifier()) + .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity()) + .build(); + } + + public static Event bucketDeleted(final Bucket bucket) { + return new StandardEvent.Builder() + .eventType(EventType.DELETE_BUCKET) + .addField(EventFieldName.BUCKET_ID, bucket.getIdentifier()) + .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity()) + .build(); + } + + public static Event flowCreated(final VersionedFlow versionedFlow) { + return new StandardEvent.Builder() + .eventType(EventType.CREATE_FLOW) + .addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier()) + .addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier()) + .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity()) + .build(); + } + + public static Event flowUpdated(final VersionedFlow versionedFlow) { + return new StandardEvent.Builder() + .eventType(EventType.UPDATE_FLOW) + .addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier()) + .addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier()) + .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity()) + .build(); + } + + public static Event flowDeleted(final VersionedFlow versionedFlow) { + return new StandardEvent.Builder() + .eventType(EventType.DELETE_FLOW) + .addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier()) + .addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier()) + .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity()) + .build(); + } + + public static Event flowVersionCreated(final VersionedFlowSnapshot versionedFlowSnapshot) { + return new StandardEvent.Builder() + .eventType(EventType.CREATE_FLOW_VERSION) + .addField(EventFieldName.BUCKET_ID, versionedFlowSnapshot.getSnapshotMetadata().getBucketIdentifier()) + .addField(EventFieldName.FLOW_ID, versionedFlowSnapshot.getSnapshotMetadata().getFlowIdentifier()) + .addField(EventFieldName.VERSION, String.valueOf(versionedFlowSnapshot.getSnapshotMetadata().getVersion())) + .addField(EventFieldName.USER, versionedFlowSnapshot.getSnapshotMetadata().getAuthor()) + .addField(EventFieldName.COMMENT, versionedFlowSnapshot.getSnapshotMetadata().getComments()) + .build(); + } + +} diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventService.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventService.java new file mode 100644 index 000000000..ef30f8450 --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventService.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.event; + +import org.apache.nifi.registry.hook.Event; +import org.apache.nifi.registry.hook.EventHookProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Service used for publishing events and passing events to the hook providers. + */ +@Service +public class EventService implements DisposableBean { + + private static final Logger LOGGER = LoggerFactory.getLogger(EventService.class); + + // Should only be a few events in the queue at a time, but setting a capacity just so it isn't unbounded + static final int EVENT_QUEUE_SIZE = 10_000; + + private final BlockingQueue eventQueue; + private final ExecutorService scheduledExecutorService; + private final List eventHookProviders; + + @Autowired + public EventService(final List eventHookProviders) { + this.eventQueue = new LinkedBlockingQueue<>(EVENT_QUEUE_SIZE); + this.scheduledExecutorService = Executors.newSingleThreadExecutor(); + this.eventHookProviders = new ArrayList<>(eventHookProviders); + } + + @PostConstruct + public void postConstruct() { + LOGGER.info("Starting event consumer..."); + + this.scheduledExecutorService.execute(() -> { + while (!Thread.interrupted()) { + try { + final Event event = eventQueue.poll(1000, TimeUnit.MILLISECONDS); + if (event == null) { + continue; + } + + // event was available so notify each provider, contain errors per-provider + for(final EventHookProvider provider : eventHookProviders) { + try { + provider.handle(event); + } catch (Exception e) { + LOGGER.error("Error handling event hook", e); + } + } + } catch (InterruptedException e) { + LOGGER.warn("Interrupted while polling event queue"); + return; + } + } + }); + + LOGGER.info("Event consumer started!"); + } + + @Override + public void destroy() throws Exception { + LOGGER.info("Shutting down event consumer..."); + this.scheduledExecutorService.shutdownNow(); + LOGGER.info("Event consumer shutdown!"); + } + + public void publish(final Event event) { + if (event == null) { + return; + } + + try { + event.validate(); + + final boolean queued = eventQueue.offer(event); + if (!queued) { + LOGGER.error("Unable to queue event because queue is full"); + } + } catch (IllegalStateException e) { + LOGGER.error("Invalid event due to: " + e.getMessage(), e); + } + } + +} diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEvent.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEvent.java new file mode 100644 index 000000000..4ad459d9a --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEvent.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.event; + +import org.apache.commons.lang3.Validate; +import org.apache.nifi.registry.hook.Event; +import org.apache.nifi.registry.hook.EventField; +import org.apache.nifi.registry.hook.EventFieldName; +import org.apache.nifi.registry.hook.EventType; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * Standard implementation of Event. + */ +public class StandardEvent implements Event { + + private final EventType eventType; + + private final List eventFields; + + private StandardEvent(final Builder builder) { + this.eventType = builder.eventType; + this.eventFields = Collections.unmodifiableList(builder.eventFields == null + ? Collections.emptyList() : new ArrayList<>(builder.eventFields)); + Validate.notNull(this.eventType); + } + + @Override + public EventType getEventType() { + return eventType; + } + + @Override + public List getFields() { + return eventFields; + } + + @Override + public EventField getField(final EventFieldName fieldName) { + if (fieldName == null) { + return null; + } + + return eventFields.stream().filter(e -> fieldName.equals(e.getName())).findFirst().orElse(null); + } + + @Override + public void validate() throws IllegalStateException { + final int numProvidedFields = eventFields.size(); + final int numRequiredFields = eventType.getFieldNames().size(); + + if (numProvidedFields != numRequiredFields) { + throw new IllegalStateException(numRequiredFields + " fields were required, but only " + numProvidedFields + " were provided"); + } + + for (int i=0; i < numRequiredFields; i++) { + final EventFieldName required = eventType.getFieldNames().get(i); + final EventFieldName provided = eventFields.get(i).getName(); + if (!required.equals(provided)) { + throw new IllegalStateException("Expected " + required.name() + ", but found " + provided.name()); + } + } + } + + /** + * Builder for Events. + */ + public static class Builder { + + private EventType eventType; + private List eventFields = new ArrayList<>(); + + public Builder eventType(final EventType eventType) { + this.eventType = eventType; + return this; + } + + public Builder addField(final EventFieldName name, final String value) { + this.eventFields.add(new StandardEventField(name, value)); + return this; + } + + public Builder addField(final EventField arg) { + if (arg != null) { + this.eventFields.add(arg); + } + return this; + } + + public Builder addFields(final Collection fields) { + if (fields != null) { + this.eventFields.addAll(fields); + } + return this; + } + + public Builder clearFields() { + this.eventFields.clear(); + return this; + } + + public Event build() { + return new StandardEvent(this); + } + } +} diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEventField.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEventField.java new file mode 100644 index 000000000..7ff94c529 --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEventField.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.event; + +import org.apache.commons.lang3.Validate; +import org.apache.nifi.registry.hook.EventField; +import org.apache.nifi.registry.hook.EventFieldName; + +/** + * Standard implementation of EventField. + */ +public class StandardEventField implements EventField { + + private final EventFieldName name; + + private final String value; + + public StandardEventField(final EventFieldName name, final String value) { + this.name = name; + this.value = value; + Validate.notNull(this.name); + Validate.notBlank(this.value); + } + + @Override + public EventFieldName getName() { + return name; + } + + @Override + public String getValue() { + return value; + } + +} diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java index 7f2b9e14e..ca3259d71 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java @@ -17,12 +17,12 @@ package org.apache.nifi.registry.extension; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.registry.hook.EventHookProvider; import org.apache.nifi.registry.security.authentication.IdentityProvider; import org.apache.nifi.registry.security.authorization.AccessPolicyProvider; import org.apache.nifi.registry.security.authorization.Authorizer; import org.apache.nifi.registry.security.authorization.UserGroupProvider; import org.apache.nifi.registry.flow.FlowPersistenceProvider; -import org.apache.nifi.registry.hook.FlowHookProvider; import org.apache.nifi.registry.properties.NiFiRegistryProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +57,7 @@ public class ExtensionManager { classes.add(AccessPolicyProvider.class); classes.add(Authorizer.class); classes.add(IdentityProvider.class); - classes.add(FlowHookProvider.class); + classes.add(EventHookProvider.class); EXTENSION_CLASSES = Collections.unmodifiableList(classes); } diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java index bb62eb29a..a3f3276f0 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java @@ -19,7 +19,7 @@ import java.util.List; import org.apache.nifi.registry.flow.FlowPersistenceProvider; -import org.apache.nifi.registry.hook.FlowHookProvider; +import org.apache.nifi.registry.hook.EventHookProvider; /** * A factory for obtaining the configured providers. @@ -41,6 +41,6 @@ public interface ProviderFactory { /** * @return the configured FlowHookProviders */ - List getFlowHookProviders(); + List getEventHookProviders(); } diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java index 34f307cad..65ba91495 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java @@ -18,7 +18,7 @@ import org.apache.nifi.registry.extension.ExtensionManager; import org.apache.nifi.registry.flow.FlowPersistenceProvider; -import org.apache.nifi.registry.hook.FlowHookProvider; +import org.apache.nifi.registry.hook.EventHookProvider; import org.apache.nifi.registry.properties.NiFiRegistryProperties; import org.apache.nifi.registry.provider.generated.Property; import org.apache.nifi.registry.provider.generated.Providers; @@ -74,7 +74,7 @@ private static JAXBContext initializeJaxbContext() { private final AtomicReference providersHolder = new AtomicReference<>(null); private FlowPersistenceProvider flowPersistenceProvider; - private List flowHookProviders; + private List eventHookProviders; @Autowired public StandardProviderFactory(final NiFiRegistryProperties properties, final ExtensionManager extensionManager) { @@ -156,52 +156,52 @@ public synchronized FlowPersistenceProvider getFlowPersistenceProvider() { @Bean @Override - public List getFlowHookProviders() { - if (flowHookProviders == null) { - flowHookProviders = new ArrayList(); + public List getEventHookProviders() { + if (eventHookProviders == null) { + eventHookProviders = new ArrayList<>(); if (providersHolder.get() == null) { throw new ProviderFactoryException("ProviderFactory must be initialized before obtaining a Provider"); } final Providers providers = providersHolder.get(); - final List jaxbFlowHookProvider = providers.getFlowHookProvider(); + final List jaxbHookProvider = providers.getEventHookProvider(); - if(jaxbFlowHookProvider == null || jaxbFlowHookProvider.isEmpty()) { + if(jaxbHookProvider == null || jaxbHookProvider.isEmpty()) { // no hook provided - return flowHookProviders; + return eventHookProviders; } - for (org.apache.nifi.registry.provider.generated.Provider flowHookProvider : jaxbFlowHookProvider) { + for (org.apache.nifi.registry.provider.generated.Provider hookProvider : jaxbHookProvider) { - final String flowHookProviderClassName = flowHookProvider.getClazz(); - FlowHookProvider hook; + final String hookProviderClassName = hookProvider.getClazz(); + EventHookProvider hook; try { - final ClassLoader classLoader = extensionManager.getExtensionClassLoader(flowHookProviderClassName); + final ClassLoader classLoader = extensionManager.getExtensionClassLoader(hookProviderClassName); if (classLoader == null) { - throw new IllegalStateException("Extension not found in any of the configured class loaders: " + flowHookProviderClassName); + throw new IllegalStateException("Extension not found in any of the configured class loaders: " + hookProviderClassName); } - final Class rawFlowHookProviderClass = Class.forName(flowHookProviderClassName, true, classLoader); - final Class flowHookProviderClass = rawFlowHookProviderClass.asSubclass(FlowHookProvider.class); + final Class rawHookProviderClass = Class.forName(hookProviderClassName, true, classLoader); + final Class hookProviderClass = rawHookProviderClass.asSubclass(EventHookProvider.class); - final Constructor constructor = flowHookProviderClass.getConstructor(); - hook = (FlowHookProvider) constructor.newInstance(); + final Constructor constructor = hookProviderClass.getConstructor(); + hook = (EventHookProvider) constructor.newInstance(); - LOGGER.info("Instantiated FlowHookProvider with class name {}", new Object[] {flowHookProviderClassName}); + LOGGER.info("Instantiated EventHookProvider with class name {}", new Object[] {hookProviderClassName}); } catch (Exception e) { - throw new ProviderFactoryException("Error creating FlowHookProvider with class name: " + flowHookProviderClassName, e); + throw new ProviderFactoryException("Error creating EventHookProvider with class name: " + hookProviderClassName, e); } - final ProviderConfigurationContext configurationContext = createConfigurationContext(flowHookProvider.getProperty()); + final ProviderConfigurationContext configurationContext = createConfigurationContext(hookProvider.getProperty()); hook.onConfigured(configurationContext); - flowHookProviders.add(hook); - LOGGER.info("Configured FlowHookProvider with class name {}", new Object[] {flowHookProviderClassName}); + eventHookProviders.add(hook); + LOGGER.info("Configured EventHookProvider with class name {}", new Object[] {hookProviderClassName}); } } - return flowHookProviders; + return eventHookProviders; } private ProviderConfigurationContext createConfigurationContext(final List configProperties) { diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java index 1f6f84073..1728513eb 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java @@ -35,7 +35,6 @@ public class StandardFlowSnapshotContext implements FlowSnapshotContext { private final String comments; private final String author; private final long snapshotTimestamp; - private final String author; private StandardFlowSnapshotContext(final Builder builder) { this.bucketId = builder.bucketId; @@ -46,7 +45,6 @@ private StandardFlowSnapshotContext(final Builder builder) { this.comments = builder.comments; this.author = builder.author; this.snapshotTimestamp = builder.snapshotTimestamp; - this.author = builder.author; Validate.notBlank(bucketId); Validate.notBlank(bucketName); @@ -109,7 +107,6 @@ public static class Builder { private String comments; private String author; private long snapshotTimestamp; - private String author; public Builder() { @@ -124,7 +121,6 @@ public Builder(final Bucket bucket, final VersionedFlow versionedFlow, final Ver comments(snapshotMetadata.getComments()); author(snapshotMetadata.getAuthor()); snapshotTimestamp(snapshotMetadata.getTimestamp()); - author(snapshotMetadata.getAuthor()); } public Builder bucketId(final String bucketId) { @@ -167,11 +163,6 @@ public Builder snapshotTimestamp(final long snapshotTimestamp) { return this; } - public Builder author(final String author) { - this.author = author; - return this; - } - public StandardFlowSnapshotContext build() { return new StandardFlowSnapshotContext(this); } diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/LoggingEventHookProvider.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/LoggingEventHookProvider.java new file mode 100644 index 000000000..a942d5a51 --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/LoggingEventHookProvider.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider.hook; + +import org.apache.nifi.registry.hook.Event; +import org.apache.nifi.registry.hook.EventField; +import org.apache.nifi.registry.hook.EventHookException; +import org.apache.nifi.registry.hook.EventHookProvider; +import org.apache.nifi.registry.provider.ProviderConfigurationContext; +import org.apache.nifi.registry.provider.ProviderCreationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LoggingEventHookProvider implements EventHookProvider { + + static final Logger LOGGER = LoggerFactory.getLogger(LoggingEventHookProvider.class); + + @Override + public void onConfigured(final ProviderConfigurationContext configurationContext) throws ProviderCreationException { + // nothing to do + } + + @Override + public void handle(final Event event) throws EventHookException { + final StringBuilder builder = new StringBuilder() + .append(event.getEventType()) + .append(" ["); + + int count = 0; + for (final EventField argument : event.getFields()) { + if (count > 0) { + builder.append(", "); + } + builder.append(argument.getName()).append("=").append(argument.getValue()); + count++; + } + + builder.append("] "); + + LOGGER.info(builder.toString()); + } + +} diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptFlowHookProvider.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptEventHookProvider.java similarity index 54% rename from nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptFlowHookProvider.java rename to nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptEventHookProvider.java index d57df1658..4900a2bfd 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptFlowHookProvider.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptEventHookProvider.java @@ -23,10 +23,9 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.registry.flow.FlowSnapshotContext; -import org.apache.nifi.registry.hook.FlowHookEvent; -import org.apache.nifi.registry.hook.FlowHookException; -import org.apache.nifi.registry.hook.FlowHookProvider; +import org.apache.nifi.registry.hook.Event; +import org.apache.nifi.registry.hook.EventField; +import org.apache.nifi.registry.hook.EventHookProvider; import org.apache.nifi.registry.provider.ProviderConfigurationContext; import org.apache.nifi.registry.provider.ProviderCreationException; import org.apache.nifi.registry.util.FileUtils; @@ -34,77 +33,24 @@ import org.slf4j.LoggerFactory; /** - * A FlowHookProvider that is used to execute a script before a flow snapshot version is committed. + * A EventHookProvider that is used to execute a script to handle the event. */ -public class ScriptFlowHookProvider implements FlowHookProvider { +public class ScriptEventHookProvider implements EventHookProvider { - static final Logger LOGGER = LoggerFactory.getLogger(ScriptFlowHookProvider.class); + static final Logger LOGGER = LoggerFactory.getLogger(ScriptEventHookProvider.class); static final String SCRIPT_PATH_PROP = "Script Path"; static final String SCRIPT_WORKDIR_PROP = "Working Directory"; private File scriptFile; private File workDirFile; @Override - public void postCreateBucket(String bucketId) throws FlowHookException { - this.executeScript(FlowHookEvent.CREATE_BUCKET, bucketId, null, null, null, null); - } - - @Override - public void postCreateFlow(String bucketId, String flowId) throws FlowHookException { - this.executeScript(FlowHookEvent.CREATE_FLOW, bucketId, flowId, null, null, null); - } - - @Override - public void postDeleteBucket(String bucketId) throws FlowHookException { - this.executeScript(FlowHookEvent.DELETE_BUCKET, bucketId, null, null, null, null); - } - - @Override - public void postDeleteFlow(String bucketId, String flowId) throws FlowHookException { - this.executeScript(FlowHookEvent.DELETE_FLOW, bucketId, flowId, null, null, null); - } - - @Override - public void postDeleteFlowVersion(String bucketId, String flowId, int version) throws FlowHookException { - this.executeScript(FlowHookEvent.DELETE_FLOW, bucketId, flowId, Integer.toString(version), null, null); - } - - @Override - public void postUpdateBucket(String bucketId) throws FlowHookException { - this.executeScript(FlowHookEvent.UPDATE_BUCKET, bucketId, null, null, null, null); - } - - @Override - public void postUpdateFlow(String bucketId, String flowId) throws FlowHookException { - this.executeScript(FlowHookEvent.UPDATE_FLOW, bucketId, flowId, null, null, null); - } - - @Override - public void postCreateFlowVersion(final FlowSnapshotContext flowSnapshotContext) throws FlowHookException { - this.executeScript(FlowHookEvent.CREATE_VERSION, flowSnapshotContext.getBucketId(), flowSnapshotContext.getFlowId(), - Integer.toString(flowSnapshotContext.getVersion()), flowSnapshotContext.getComments(), flowSnapshotContext.getAuthor()); - } - - private void executeScript(final FlowHookEvent eventType, String bucketId, String flowId, String version, String comment, String author) { + public void handle(final Event event) { List command = new ArrayList(); command.add(scriptFile.getAbsolutePath()); - command.add(eventType.name()); - command.add(bucketId); - - if(flowId != null) { - command.add(flowId); - } - - if(version != null) { - command.add(version); - } - - if(comment != null) { - command.add(comment); - } + command.add(event.getEventType().name()); - if(author != null) { - command.add(author); + for (EventField arg : event.getFields()) { + command.add(arg.getValue()); } final String commandString = StringUtils.join(command, " "); @@ -143,7 +89,7 @@ public void onConfigured(ProviderConfigurationContext configurationContext) thro scriptFile = new File(scripPath); if(scriptFile.isFile() && scriptFile.canExecute()) { - LOGGER.info("Configured ScriptFlowHookProvider with script {}", new Object[] {scriptFile.getAbsolutePath()}); + LOGGER.info("Configured ScriptEventHookProvider with script {}", new Object[] {scriptFile.getAbsolutePath()}); } else { throw new ProviderCreationException("The script file " + scriptFile.getAbsolutePath() + " cannot be executed."); } diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java index 95c62c39f..23f1d14a9 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java @@ -42,7 +42,6 @@ import org.apache.nifi.registry.flow.diff.FlowDifference; import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow; import org.apache.nifi.registry.flow.diff.StandardFlowComparator; -import org.apache.nifi.registry.hook.FlowHookProvider; import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext; import org.apache.nifi.registry.serialization.Serializer; import org.slf4j.Logger; @@ -85,7 +84,6 @@ public class RegistryService { private final MetadataService metadataService; private final FlowPersistenceProvider flowPersistenceProvider; - private final List flowHookProviders; private final Serializer processGroupSerializer; private final Validator validator; @@ -96,12 +94,10 @@ public class RegistryService { @Autowired public RegistryService(final MetadataService metadataService, final FlowPersistenceProvider flowPersistenceProvider, - final List flowHookProviders, final Serializer processGroupSerializer, final Validator validator) { this.metadataService = metadataService; this.flowPersistenceProvider = flowPersistenceProvider; - this.flowHookProviders = flowHookProviders; this.processGroupSerializer = processGroupSerializer; this.validator = validator; Validate.notNull(this.metadataService); @@ -138,17 +134,6 @@ public Bucket createBucket(final Bucket bucket) { } final BucketEntity createdBucket = metadataService.createBucket(DataModelMapper.map(bucket)); - - // call the post-event hook - for(FlowHookProvider flowHookProvider : flowHookProviders) { - try { - flowHookProvider.postCreateBucket(createdBucket.getId()); - } catch (Exception e) { - // we don't want to throw anything here, hook are provided on best effort - LOGGER.error("Error while calling post-event hook", e); - } - } - return DataModelMapper.map(createdBucket); } finally { writeLock.unlock(); @@ -240,17 +225,6 @@ public Bucket updateBucket(final Bucket bucket) { // perform the actual update final BucketEntity updatedBucket = metadataService.updateBucket(existingBucketById); - - // call the post-event hook - for(FlowHookProvider flowHookProvider : flowHookProviders) { - try { - flowHookProvider.postUpdateBucket(existingBucketById.getId()); - } catch (Exception e) { - // we don't want to throw anything here, hook are provided on best effort - LOGGER.error("Error while calling post-event hook", e); - } - } - return DataModelMapper.map(updatedBucket); } finally { writeLock.unlock(); @@ -279,16 +253,6 @@ public Bucket deleteBucket(final String bucketIdentifier) { // now delete the bucket from the metadata provider, which deletes all flows referencing it metadataService.deleteBucket(existingBucket); - // call the post-event hook - for(FlowHookProvider flowHookProvider : flowHookProviders) { - try { - flowHookProvider.postDeleteBucket(existingBucket.getId()); - } catch (Exception e) { - // we don't want to throw anything here, hook are provided on best effort - LOGGER.error("Error while calling post-event hook", e); - } - } - return DataModelMapper.map(existingBucket); } finally { writeLock.unlock(); @@ -392,17 +356,6 @@ public VersionedFlow createFlow(final String bucketIdentifier, final VersionedFl // persist the flow and return the created entity final FlowEntity createdFlow = metadataService.createFlow(flowEntity); - - // call the post-event hook - for(FlowHookProvider flowHookProvider : flowHookProviders) { - try { - flowHookProvider.postCreateFlow(existingBucket.getId(), createdFlow.getId()); - } catch (Exception e) { - // we don't want to throw anything here, hook are provided on best effort - LOGGER.error("Error while calling post-event hook", e); - } - } - return DataModelMapper.map(existingBucket, createdFlow); } finally { writeLock.unlock(); @@ -544,17 +497,6 @@ public VersionedFlow updateFlow(final VersionedFlow versionedFlow) { // perform the actual update final FlowEntity updatedFlow = metadataService.updateFlow(existingFlow); - - // call the post-event hook - for(FlowHookProvider flowHookProvider : flowHookProviders) { - try { - flowHookProvider.postUpdateFlow(existingFlow.getBucketId(), existingFlow.getId()); - } catch (Exception e) { - // we don't want to throw anything here, hook are provided on best effort - LOGGER.error("Error while calling post-event hook", e); - } - } - return DataModelMapper.map(existingBucket, updatedFlow); } finally { writeLock.unlock(); @@ -595,16 +537,6 @@ public VersionedFlow deleteFlow(final String bucketIdentifier, final String flow // now delete the flow from the metadata provider metadataService.deleteFlow(existingFlow); - // call the post-event hook - for(FlowHookProvider flowHookProvider : flowHookProviders) { - try { - flowHookProvider.postDeleteFlow(existingFlow.getBucketId(), existingFlow.getId()); - } catch (Exception e) { - // we don't want to throw anything here, hook are provided on best effort - LOGGER.error("Error while calling post-event hook", e); - } - } - return DataModelMapper.map(existingBucket, existingFlow); } finally { writeLock.unlock(); @@ -698,16 +630,6 @@ public VersionedFlowSnapshot createFlowSnapshot(final VersionedFlowSnapshot flow } final VersionedFlow updatedVersionedFlow = DataModelMapper.map(existingBucket, updatedFlow); - // call the post-event hook - for(FlowHookProvider flowHookProvider : flowHookProviders) { - try { - flowHookProvider.postCreateFlowVersion(context); - } catch (Exception e) { - // we don't want to throw anything here, hook are provided on best effort - LOGGER.error("Error while calling post-event hook", e); - } - } - flowSnapshot.setBucket(bucket); flowSnapshot.setFlow(updatedVersionedFlow); return flowSnapshot; @@ -957,17 +879,6 @@ public VersionedFlowSnapshotMetadata deleteFlowSnapshot(final String bucketIdent // delete the snapshot itself metadataService.deleteFlowSnapshot(snapshotEntity); - - // call the post-event hook - for(FlowHookProvider flowHookProvider : flowHookProviders) { - try { - flowHookProvider.postDeleteFlowVersion(bucketIdentifier, flowIdentifier, version); - } catch (Exception e) { - // we don't want to throw anything here, hook are provided on best effort - LOGGER.error("Error while calling post-event hook", e); - } - } - return DataModelMapper.map(existingBucket, snapshotEntity); } finally { writeLock.unlock(); diff --git a/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.hook.FlowHookProvider b/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.hook.EventHookProvider similarity index 86% rename from nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.hook.FlowHookProvider rename to nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.hook.EventHookProvider index 150103aee..2676a35de 100644 --- a/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.hook.FlowHookProvider +++ b/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.hook.EventHookProvider @@ -12,4 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.registry.provider.hook.ScriptFlowHookProvider \ No newline at end of file +org.apache.nifi.registry.provider.hook.ScriptEventHookProvider +org.apache.nifi.registry.provider.hook.LoggingEventHookProvider \ No newline at end of file diff --git a/nifi-registry-framework/src/main/xsd/providers.xsd b/nifi-registry-framework/src/main/xsd/providers.xsd index 16086af59..ce82dcc55 100644 --- a/nifi-registry-framework/src/main/xsd/providers.xsd +++ b/nifi-registry-framework/src/main/xsd/providers.xsd @@ -43,7 +43,7 @@ - + diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/event/TestEventFactory.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/event/TestEventFactory.java new file mode 100644 index 000000000..95dd6abf0 --- /dev/null +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/event/TestEventFactory.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.event; + +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; +import org.apache.nifi.registry.flow.VersionedProcessGroup; +import org.apache.nifi.registry.hook.Event; +import org.apache.nifi.registry.hook.EventFieldName; +import org.apache.nifi.registry.hook.EventType; +import org.junit.Before; +import org.junit.Test; + +import java.util.UUID; + +import static org.junit.Assert.assertEquals; + +public class TestEventFactory { + + private Bucket bucket; + private VersionedFlow versionedFlow; + private VersionedFlowSnapshot versionedFlowSnapshot; + + @Before + public void setup() { + bucket = new Bucket(); + bucket.setName("Bucket1"); + bucket.setIdentifier(UUID.randomUUID().toString()); + bucket.setCreatedTimestamp(System.currentTimeMillis()); + + versionedFlow = new VersionedFlow(); + versionedFlow.setIdentifier(UUID.randomUUID().toString()); + versionedFlow.setName("Flow 1"); + versionedFlow.setBucketIdentifier(bucket.getIdentifier()); + versionedFlow.setBucketName(bucket.getName()); + + VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata(); + metadata.setAuthor("user1"); + metadata.setComments("This is flow 1"); + metadata.setVersion(1); + metadata.setBucketIdentifier(bucket.getIdentifier()); + metadata.setFlowIdentifier(versionedFlow.getIdentifier()); + + versionedFlowSnapshot = new VersionedFlowSnapshot(); + versionedFlowSnapshot.setSnapshotMetadata(metadata); + versionedFlowSnapshot.setFlowContents(new VersionedProcessGroup()); + } + + @Test + public void testBucketCreatedEvent() { + final Event event = EventFactory.bucketCreated(bucket); + event.validate(); + + assertEquals(EventType.CREATE_BUCKET, event.getEventType()); + assertEquals(2, event.getFields().size()); + + assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue()); + assertEquals("unknown", event.getField(EventFieldName.USER).getValue()); + } + + @Test + public void testBucketUpdatedEvent() { + final Event event = EventFactory.bucketUpdated(bucket); + event.validate(); + + assertEquals(EventType.UPDATE_BUCKET, event.getEventType()); + assertEquals(2, event.getFields().size()); + + assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue()); + assertEquals("unknown", event.getField(EventFieldName.USER).getValue()); + } + + @Test + public void testBucketDeletedEvent() { + final Event event = EventFactory.bucketDeleted(bucket); + event.validate(); + + assertEquals(EventType.DELETE_BUCKET, event.getEventType()); + assertEquals(2, event.getFields().size()); + + assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue()); + assertEquals("unknown", event.getField(EventFieldName.USER).getValue()); + } + + @Test + public void testFlowCreated() { + final Event event = EventFactory.flowCreated(versionedFlow); + event.validate(); + + assertEquals(EventType.CREATE_FLOW, event.getEventType()); + assertEquals(3, event.getFields().size()); + + assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue()); + assertEquals(versionedFlow.getIdentifier(), event.getField(EventFieldName.FLOW_ID).getValue()); + assertEquals("unknown", event.getField(EventFieldName.USER).getValue()); + } + + @Test + public void testFlowUpdated() { + final Event event = EventFactory.flowUpdated(versionedFlow); + event.validate(); + + assertEquals(EventType.UPDATE_FLOW, event.getEventType()); + assertEquals(3, event.getFields().size()); + + assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue()); + assertEquals(versionedFlow.getIdentifier(), event.getField(EventFieldName.FLOW_ID).getValue()); + assertEquals("unknown", event.getField(EventFieldName.USER).getValue()); + } + + @Test + public void testFlowDeleted() { + final Event event = EventFactory.flowDeleted(versionedFlow); + event.validate(); + + assertEquals(EventType.DELETE_FLOW, event.getEventType()); + assertEquals(3, event.getFields().size()); + + assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue()); + assertEquals(versionedFlow.getIdentifier(), event.getField(EventFieldName.FLOW_ID).getValue()); + assertEquals("unknown", event.getField(EventFieldName.USER).getValue()); + } + + @Test + public void testFlowVersionedCreated() { + final Event event = EventFactory.flowVersionCreated(versionedFlowSnapshot); + event.validate(); + + assertEquals(EventType.CREATE_FLOW_VERSION, event.getEventType()); + assertEquals(5, event.getFields().size()); + + assertEquals(bucket.getIdentifier(), event.getField(EventFieldName.BUCKET_ID).getValue()); + assertEquals(versionedFlow.getIdentifier(), event.getField(EventFieldName.FLOW_ID).getValue()); + + assertEquals(String.valueOf(versionedFlowSnapshot.getSnapshotMetadata().getVersion()), + event.getField(EventFieldName.VERSION).getValue()); + + assertEquals(versionedFlowSnapshot.getSnapshotMetadata().getAuthor(), + event.getField(EventFieldName.USER).getValue()); + + assertEquals(versionedFlowSnapshot.getSnapshotMetadata().getComments(), + event.getField(EventFieldName.COMMENT).getValue()); + } + +} diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/event/TestEventService.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/event/TestEventService.java new file mode 100644 index 000000000..0270dd8ec --- /dev/null +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/event/TestEventService.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.event; + +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.hook.Event; +import org.apache.nifi.registry.hook.EventHookException; +import org.apache.nifi.registry.hook.EventHookProvider; +import org.apache.nifi.registry.provider.ProviderConfigurationContext; +import org.apache.nifi.registry.provider.ProviderCreationException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +public class TestEventService { + + private CapturingEventHook eventHook; + private EventService eventService; + + @Before + public void setup() { + eventHook = new CapturingEventHook(); + eventService = new EventService(Collections.singletonList(eventHook)); + eventService.postConstruct(); + } + + @After + public void teardown() throws Exception { + eventService.destroy(); + } + + @Test + public void testPublishConsume() throws InterruptedException { + final Bucket bucket = new Bucket(); + bucket.setIdentifier(UUID.randomUUID().toString()); + + final Event bucketCreatedEvent = EventFactory.bucketCreated(bucket); + eventService.publish(bucketCreatedEvent); + + final Event bucketDeletedEvent = EventFactory.bucketDeleted(bucket); + eventService.publish(bucketDeletedEvent); + + Thread.sleep(1000); + + final List events = eventHook.getEvents(); + Assert.assertEquals(2, events.size()); + + final Event firstEvent = events.get(0); + Assert.assertEquals(bucketCreatedEvent.getEventType(), firstEvent.getEventType()); + + final Event secondEvent = events.get(1); + Assert.assertEquals(bucketDeletedEvent.getEventType(), secondEvent.getEventType()); + } + + /** + * Simple implementation of EventHookProvider that captures event for later verification. + */ + private class CapturingEventHook implements EventHookProvider { + + private List events = new ArrayList<>(); + + @Override + public void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException { + + } + + @Override + public void handle(Event event) throws EventHookException { + events.add(event); + } + + public List getEvents() { + return events; + } + } + +} diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/event/TestStandardEvent.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/event/TestStandardEvent.java new file mode 100644 index 000000000..8e9f73f7c --- /dev/null +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/event/TestStandardEvent.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.event; + +import org.apache.nifi.registry.hook.Event; +import org.apache.nifi.registry.hook.EventField; +import org.apache.nifi.registry.hook.EventFieldName; +import org.apache.nifi.registry.hook.EventType; +import org.junit.Assert; +import org.junit.Test; + +public class TestStandardEvent { + + @Test(expected = IllegalStateException.class) + public void testInvalidEvent() { + final Event event = new StandardEvent.Builder() + .eventType(EventType.CREATE_BUCKET) + .build(); + + event.validate(); + } + + @Test + public void testGetFieldWhenDoesNotExist() { + final Event event = new StandardEvent.Builder() + .eventType(EventType.CREATE_BUCKET) + .build(); + + final EventField field = event.getField(EventFieldName.BUCKET_ID); + Assert.assertNull(field); + } + +} diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptFlowHookProvider.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptEventHookProvider.java similarity index 95% rename from nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptFlowHookProvider.java rename to nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptEventHookProvider.java index 5bc888cb7..ab2499809 100644 --- a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptFlowHookProvider.java +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptEventHookProvider.java @@ -27,7 +27,7 @@ import org.junit.Test; import org.mockito.Mockito; -public class TestScriptFlowHookProvider { +public class TestScriptEventHookProvider { @Test(expected = ProviderCreationException.class) public void testBadScriptProvider() { @@ -39,7 +39,7 @@ public void testBadScriptProvider() { final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager); providerFactory.initialize(); - providerFactory.getFlowHookProviders(); + providerFactory.getEventHookProviders(); } } diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java index 1f21248e9..1008e198a 100644 --- a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java @@ -30,7 +30,6 @@ import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.registry.flow.VersionedProcessor; -import org.apache.nifi.registry.hook.FlowHookProvider; import org.apache.nifi.registry.serialization.Serializer; import org.apache.nifi.registry.serialization.VersionedProcessGroupSerializer; import org.junit.Assert; @@ -73,7 +72,6 @@ public class TestRegistryService { private MetadataService metadataService; private FlowPersistenceProvider flowPersistenceProvider; - private FlowHookProvider flowHookProvider; private Serializer snapshotSerializer; private Validator validator; @@ -83,13 +81,13 @@ public class TestRegistryService { public void setup() { metadataService = mock(MetadataService.class); flowPersistenceProvider = mock(FlowPersistenceProvider.class); - // flowHookProvider = mock(FlowHookProvider.class); + // eventHookProvider = mock(EventHookProvider.class); snapshotSerializer = mock(VersionedProcessGroupSerializer.class); final ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory(); validator = validatorFactory.getValidator(); - registryService = new RegistryService(metadataService, flowPersistenceProvider, new ArrayList(), snapshotSerializer, validator); + registryService = new RegistryService(metadataService, flowPersistenceProvider, snapshotSerializer, validator); } // ---------------------- Test Bucket methods --------------------------------------------- diff --git a/nifi-registry-framework/src/test/resources/provider/hook/bad-script-provider.xml b/nifi-registry-framework/src/test/resources/provider/hook/bad-script-provider.xml index ca6cc6c90..568e756de 100644 --- a/nifi-registry-framework/src/test/resources/provider/hook/bad-script-provider.xml +++ b/nifi-registry-framework/src/test/resources/provider/hook/bad-script-provider.xml @@ -21,10 +21,10 @@ flow bar - - org.apache.nifi.registry.provider.hook.ScriptFlowHookProvider + + org.apache.nifi.registry.provider.hook.ScriptEventHookProvider - + \ No newline at end of file diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/Event.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/Event.java new file mode 100644 index 000000000..294896208 --- /dev/null +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/Event.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.hook; + +import java.util.List; + +/** + * An event that will be passed to EventHookProviders. + */ +public interface Event { + + /** + * @return the type of the event + */ + EventType getEventType(); + + /** + * @return the fields of the event in the order they were added to the event + */ + List getFields(); + + /** + * @param fieldName the name of the field to return + * @return the EventField with the given name, or null if it does not exist + */ + EventField getField(EventFieldName fieldName); + + /** + * Will be called before publishing the event to ensure the event contains the required + * fields for the given event type in the order specified by the type. + * + * @throws IllegalStateException if the event does not contain the required fields + */ + void validate() throws IllegalStateException; + +} diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventField.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventField.java new file mode 100644 index 000000000..485926654 --- /dev/null +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventField.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.hook; + +/** + * A field for an event. + */ +public interface EventField { + + /** + * @return the name of the field + */ + EventFieldName getName(); + + /** + * @return the value of the field + */ + String getValue(); +} diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookEvent.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventFieldName.java similarity index 82% rename from nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookEvent.java rename to nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventFieldName.java index c0aebc728..35b0cfe00 100644 --- a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookEvent.java +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventFieldName.java @@ -14,18 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.registry.hook; -public enum FlowHookEvent { - CREATE_BUCKET, - CREATE_FLOW, - CREATE_VERSION, +/** + * Enumeration of possible field names for an EventField. + */ +public enum EventFieldName { - UPDATE_BUCKET, - UPDATE_FLOW, + BUCKET_ID, + FLOW_ID, + VERSION, + USER, + COMMENT; - DELETE_BUCKET, - DELETE_FLOW, - DELETE_VERSION; } diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookException.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventHookException.java similarity index 76% rename from nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookException.java rename to nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventHookException.java index 85033ca1a..2d91735aa 100644 --- a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookException.java +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventHookException.java @@ -17,15 +17,15 @@ package org.apache.nifi.registry.hook; /** - * An Exception for errors encountered when a FlowHookProvider executes an action before/after a commit. + * An Exception for errors encountered when a EventHookProvider executes an action before/after a commit. */ -public class FlowHookException extends RuntimeException { +public class EventHookException extends RuntimeException { - public FlowHookException(String message) { + public EventHookException(String message) { super(message); } - public FlowHookException(String message, Throwable cause) { + public EventHookException(String message, Throwable cause) { super(message, cause); } } diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventHookProvider.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventHookProvider.java new file mode 100644 index 000000000..e134e8fa9 --- /dev/null +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventHookProvider.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.hook; + +import org.apache.nifi.registry.provider.Provider; + +/** + * An extension point that will be passed events produced by actions take in the registry. + * + * The list of event types can be found in {@link org.apache.nifi.registry.hook.EventType}. + * + * NOTE: Although this interface is intended to be an extension point, it is not yet considered stable and thus may + * change across releases until the registry matures. + */ +public interface EventHookProvider extends Provider { + + /** + * Handles the given event. + * + * @param event the event to handle + * @throws EventHookException if an error occurs handling the event + */ + void handle(Event event) throws EventHookException; + +} diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventType.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventType.java new file mode 100644 index 000000000..ce6930a31 --- /dev/null +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventType.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.hook; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * Enumeration of possible EventTypes with the expected fields for each event. + * + * Producers of events must produce events with the fields in the same order specified here. + */ +public enum EventType { + + CREATE_BUCKET( + EventFieldName.BUCKET_ID, + EventFieldName.USER), + CREATE_FLOW( + EventFieldName.BUCKET_ID, + EventFieldName.FLOW_ID, + EventFieldName.USER), + CREATE_FLOW_VERSION( + EventFieldName.BUCKET_ID, + EventFieldName.FLOW_ID, + EventFieldName.VERSION, + EventFieldName.USER, + EventFieldName.COMMENT), + + UPDATE_BUCKET( + EventFieldName.BUCKET_ID, + EventFieldName.USER), + UPDATE_FLOW( + EventFieldName.BUCKET_ID, + EventFieldName.FLOW_ID, + EventFieldName.USER), + + DELETE_BUCKET( + EventFieldName.BUCKET_ID, + EventFieldName.USER), + DELETE_FLOW( + EventFieldName.BUCKET_ID, + EventFieldName.FLOW_ID, + EventFieldName.USER); + + + private List fieldNames; + + EventType(EventFieldName... fieldNames) { + this.fieldNames = Collections.unmodifiableList(Arrays.asList(fieldNames)); + } + + public List getFieldNames() { + return this.fieldNames; + } + +} diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookProvider.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookProvider.java deleted file mode 100644 index 599cbed09..000000000 --- a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookProvider.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.registry.hook; - -import org.apache.nifi.registry.flow.FlowSnapshotContext; -import org.apache.nifi.registry.provider.Provider; - -/** - * A service that defines post event action hook - * - * NOTE: Although this interface is intended to be an extension point, it is not yet considered stable and thus may - * change across releases until the registry matures. - */ -public interface FlowHookProvider extends Provider { - - /** - * @param bucketId - * @throws FlowHookException - */ - default void postCreateBucket(String bucketId) throws FlowHookException { } - - /** - * @param bucketId - * @param flowId - * @throws FlowHookException - */ - default void postCreateFlow(String bucketId, String flowId) throws FlowHookException { } - - /** - * @param flowSnapshotContext - * @throws FlowHookException - */ - default void postCreateFlowVersion(FlowSnapshotContext flowSnapshotContext) throws FlowHookException { } - - /** - * @param bucketId - * @throws FlowHookException - */ - default void postDeleteBucket(String bucketId) throws FlowHookException { } - - /** - * @param bucketId - * @param flowId - * @throws FlowHookException - */ - default void postDeleteFlow(String bucketId, String flowId) throws FlowHookException { } - - /** - * @param flowSnapshotContext - * @throws FlowHookException - */ - default void postDeleteFlowVersion(String bucketIdentifier, String flowIdentifier, int version) throws FlowHookException { } - - /** - * @param bucketId - * @throws FlowHookException - */ - default void postUpdateBucket(String bucketId) throws FlowHookException { } - - /** - * @param bucketId - * @param flowId - * @throws FlowHookException - */ - default void postUpdateFlow(String bucketId, String flowId) throws FlowHookException { } - -} diff --git a/nifi-registry-resources/src/main/resources/conf/logback.xml b/nifi-registry-resources/src/main/resources/conf/logback.xml index 72e863cbb..7d65bda2a 100644 --- a/nifi-registry-resources/src/main/resources/conf/logback.xml +++ b/nifi-registry-resources/src/main/resources/conf/logback.xml @@ -58,6 +58,24 @@ + + ${org.apache.nifi.registry.bootstrap.config.log.dir}/nifi-registry-event.log + + + ${org.apache.nifi.registry.bootstrap.config.log.dir}/nifi-registry-event_%d.log + + 5 + + + %date ## %msg%n + + + %date %level [%thread] %logger{40} %msg%n @@ -94,6 +112,11 @@ + + + + + diff --git a/nifi-registry-resources/src/main/resources/conf/providers.xml b/nifi-registry-resources/src/main/resources/conf/providers.xml index 3d7d39285..e59e277ae 100644 --- a/nifi-registry-resources/src/main/resources/conf/providers.xml +++ b/nifi-registry-resources/src/main/resources/conf/providers.xml @@ -31,11 +31,18 @@ --> + + + \ No newline at end of file diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/AccessPolicyResource.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/AccessPolicyResource.java index ac84b2f72..713b36907 100644 --- a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/AccessPolicyResource.java +++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/AccessPolicyResource.java @@ -27,6 +27,7 @@ import org.apache.nifi.registry.authorization.AccessPolicy; import org.apache.nifi.registry.authorization.AccessPolicySummary; import org.apache.nifi.registry.authorization.Resource; +import org.apache.nifi.registry.event.EventService; import org.apache.nifi.registry.exception.ResourceNotFoundException; import org.apache.nifi.registry.security.authorization.Authorizer; import org.apache.nifi.registry.security.authorization.AuthorizerCapabilityDetection; @@ -73,8 +74,9 @@ public class AccessPolicyResource extends AuthorizableApplicationResource { @Autowired public AccessPolicyResource( Authorizer authorizer, - AuthorizationService authorizationService) { - super(authorizationService); + AuthorizationService authorizationService, + EventService eventService) { + super(authorizationService, eventService); this.authorizer = authorizer; } diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/AccessResource.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/AccessResource.java index 080429585..d310d0c3c 100644 --- a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/AccessResource.java +++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/AccessResource.java @@ -24,6 +24,7 @@ import io.swagger.annotations.Authorization; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.registry.authorization.CurrentUser; +import org.apache.nifi.registry.event.EventService; import org.apache.nifi.registry.exception.AdministrationException; import org.apache.nifi.registry.properties.NiFiRegistryProperties; import org.apache.nifi.registry.security.authentication.AuthenticationRequest; @@ -86,7 +87,9 @@ public AccessResource( JwtService jwtService, X509IdentityProvider x509IdentityProvider, @Nullable KerberosSpnegoIdentityProvider kerberosSpnegoIdentityProvider, - @Nullable IdentityProvider identityProvider) { + @Nullable IdentityProvider identityProvider, + EventService eventService) { + super(eventService); this.properties = properties; this.jwtService = jwtService; this.x509IdentityProvider = x509IdentityProvider; diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/ApplicationResource.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/ApplicationResource.java index ed5c28123..34369d0b3 100644 --- a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/ApplicationResource.java +++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/ApplicationResource.java @@ -17,6 +17,9 @@ package org.apache.nifi.registry.web.api; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.Validate; +import org.apache.nifi.registry.event.EventService; +import org.apache.nifi.registry.hook.Event; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +55,17 @@ public class ApplicationResource { @Context private UriInfo uriInfo; + private final EventService eventService; + + public ApplicationResource(final EventService eventService) { + this.eventService = eventService; + Validate.notNull(this.eventService); + } + + protected void publish(final Event event) { + eventService.publish(event); + } + protected String generateResourceUri(final String... path) { final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder(); uriBuilder.segment(path); diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/AuthorizableApplicationResource.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/AuthorizableApplicationResource.java index 8f983cf84..83240c7ac 100644 --- a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/AuthorizableApplicationResource.java +++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/AuthorizableApplicationResource.java @@ -18,6 +18,7 @@ import org.apache.nifi.registry.authorization.Resource; import org.apache.nifi.registry.bucket.BucketItem; +import org.apache.nifi.registry.event.EventService; import org.apache.nifi.registry.security.authorization.AuthorizableLookup; import org.apache.nifi.registry.security.authorization.RequestAction; import org.apache.nifi.registry.security.authorization.resource.Authorizable; @@ -38,7 +39,9 @@ public class AuthorizableApplicationResource extends ApplicationResource { protected final AuthorizableLookup authorizableLookup; protected AuthorizableApplicationResource( - AuthorizationService authorizationService) { + AuthorizationService authorizationService, + EventService eventService) { + super(eventService); this.authorizationService = authorizationService; this.authorizableLookup = authorizationService.getAuthorizableLookup(); } diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java index 64f183062..942a3d4d4 100644 --- a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java +++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java @@ -27,6 +27,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.nifi.registry.bucket.BucketItem; import org.apache.nifi.registry.diff.VersionedFlowDifference; +import org.apache.nifi.registry.event.EventFactory; +import org.apache.nifi.registry.event.EventService; import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; @@ -76,8 +78,9 @@ public BucketFlowResource( final RegistryService registryService, final LinkService linkService, final PermissionsService permissionsService, - final AuthorizationService authorizationService) { - super(authorizationService); + final AuthorizationService authorizationService, + final EventService eventService) { + super(authorizationService, eventService); this.registryService = registryService; this.linkService = linkService; this.permissionsService =permissionsService; @@ -111,7 +114,10 @@ public Response createFlow( authorizeBucketAccess(RequestAction.WRITE, bucketId); verifyPathParamsMatchBody(bucketId, flow); + final VersionedFlow createdFlow = registryService.createFlow(bucketId, flow); + publish(EventFactory.flowCreated(createdFlow)); + permissionsService.populateItemPermissions(createdFlow); linkService.populateFlowLinks(createdFlow); return Response.status(Response.Status.OK).entity(createdFlow).build(); @@ -222,6 +228,7 @@ public Response updateFlow( setBucketItemMetadataIfMissing(bucketId, flowId, flow); final VersionedFlow updatedFlow = registryService.updateFlow(flow); + publish(EventFactory.flowUpdated(updatedFlow)); permissionsService.populateItemPermissions(updatedFlow); linkService.populateFlowLinks(updatedFlow); @@ -256,6 +263,7 @@ public Response deleteFlow( authorizeBucketAccess(RequestAction.DELETE, bucketId); final VersionedFlow deletedFlow = registryService.deleteFlow(bucketId, flowId); + publish(EventFactory.flowDeleted(deletedFlow)); return Response.status(Response.Status.OK).entity(deletedFlow).build(); } @@ -300,6 +308,8 @@ public Response createFlowVersion( snapshot.getSnapshotMetadata().setAuthor(userIdentity); final VersionedFlowSnapshot createdSnapshot = registryService.createFlowSnapshot(snapshot); + publish(EventFactory.flowVersionCreated(createdSnapshot)); + if (createdSnapshot.getSnapshotMetadata() != null) { linkService.populateSnapshotLinks(createdSnapshot.getSnapshotMetadata()); } diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketResource.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketResource.java index 5f75e8030..e905973c9 100644 --- a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketResource.java +++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketResource.java @@ -27,6 +27,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.nifi.registry.bucket.Bucket; import org.apache.nifi.registry.bucket.BucketItem; +import org.apache.nifi.registry.event.EventFactory; +import org.apache.nifi.registry.event.EventService; import org.apache.nifi.registry.field.Fields; import org.apache.nifi.registry.security.authorization.RequestAction; import org.apache.nifi.registry.security.authorization.exception.AccessDeniedException; @@ -83,8 +85,9 @@ public BucketResource( final RegistryService registryService, final LinkService linkService, final PermissionsService permissionsService, - final AuthorizationService authorizationService) { - super(authorizationService); + final AuthorizationService authorizationService, + final EventService eventService) { + super(authorizationService, eventService); this.registryService = registryService; this.linkService = linkService; this.permissionsService = permissionsService; @@ -110,7 +113,10 @@ public Response createBucket( @ApiParam(value = "The bucket to create", required = true) final Bucket bucket) { authorizeAccess(RequestAction.WRITE); + final Bucket createdBucket = registryService.createBucket(bucket); + publish(EventFactory.bucketCreated(createdBucket)); + permissionsService.populateBucketPermissions(createdBucket); linkService.populateBucketLinks(createdBucket); return Response.status(Response.Status.OK).entity(createdBucket).build(); @@ -224,6 +230,8 @@ public Response updateBucket( authorizeBucketAccess(RequestAction.WRITE, bucketId); final Bucket updatedBucket = registryService.updateBucket(bucket); + publish(EventFactory.bucketUpdated(updatedBucket)); + permissionsService.populateBucketPermissions(updatedBucket); linkService.populateBucketLinks(updatedBucket); return Response.status(Response.Status.OK).entity(updatedBucket).build(); @@ -256,7 +264,10 @@ public Response deleteBucket( throw new BadRequestException("Bucket id cannot be blank"); } authorizeBucketAccess(RequestAction.DELETE, bucketId); + final Bucket deletedBucket = registryService.deleteBucket(bucketId); + publish(EventFactory.bucketDeleted(deletedBucket)); + return Response.status(Response.Status.OK).entity(deletedBucket).build(); } diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/FlowResource.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/FlowResource.java index 2c04d600d..02283341b 100644 --- a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/FlowResource.java +++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/FlowResource.java @@ -25,6 +25,7 @@ import io.swagger.annotations.Extension; import io.swagger.annotations.ExtensionProperty; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.registry.event.EventService; import org.apache.nifi.registry.field.Fields; import org.apache.nifi.registry.flow.VersionedFlow; import org.apache.nifi.registry.flow.VersionedFlowSnapshot; @@ -65,8 +66,9 @@ public class FlowResource extends AuthorizableApplicationResource { public FlowResource(final RegistryService registryService, final LinkService linkService, final PermissionsService permissionsService, - final AuthorizationService authorizationService) { - super(authorizationService); + final AuthorizationService authorizationService, + final EventService eventService) { + super(authorizationService, eventService); this.registryService = registryService; this.linkService = linkService; this.permissionsService = permissionsService; diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/ItemResource.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/ItemResource.java index e7ae6beaf..02b63d2e3 100644 --- a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/ItemResource.java +++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/ItemResource.java @@ -25,6 +25,7 @@ import io.swagger.annotations.Extension; import io.swagger.annotations.ExtensionProperty; import org.apache.nifi.registry.bucket.BucketItem; +import org.apache.nifi.registry.event.EventService; import org.apache.nifi.registry.field.Fields; import org.apache.nifi.registry.security.authorization.RequestAction; import org.apache.nifi.registry.service.AuthorizationService; @@ -73,8 +74,9 @@ public ItemResource( final RegistryService registryService, final LinkService linkService, final PermissionsService permissionsService, - final AuthorizationService authorizationService) { - super(authorizationService); + final AuthorizationService authorizationService, + final EventService eventService) { + super(authorizationService, eventService); this.registryService = registryService; this.linkService = linkService; this.permissionsService = permissionsService; diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/TenantResource.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/TenantResource.java index d00038dd4..7215ccbec 100644 --- a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/TenantResource.java +++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/TenantResource.java @@ -27,6 +27,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.nifi.registry.authorization.User; import org.apache.nifi.registry.authorization.UserGroup; +import org.apache.nifi.registry.event.EventService; import org.apache.nifi.registry.exception.ResourceNotFoundException; import org.apache.nifi.registry.security.authorization.Authorizer; import org.apache.nifi.registry.security.authorization.AuthorizerCapabilityDetection; @@ -70,8 +71,8 @@ public class TenantResource extends AuthorizableApplicationResource { private Authorizer authorizer; @Autowired - public TenantResource(AuthorizationService authorizationService) { - super(authorizationService); + public TenantResource(AuthorizationService authorizationService, EventService eventService) { + super(authorizationService, eventService); authorizer = authorizationService.getAuthorizer(); } diff --git a/pom.xml b/pom.xml index 94bb5e0e2..86deaa4c6 100644 --- a/pom.xml +++ b/pom.xml @@ -110,8 +110,8 @@ 2.1 2.26 2.9.2 - 2.0.0.M7 - 5.0.0.RELEASE + 2.0.2.RELEASE + 5.0.5.RELEASE 4.2.0