From 4d7d8d21f211d646def86271a9b69c2633f9adf9 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Fri, 6 Apr 2018 16:58:33 +0200 Subject: [PATCH] NIFIREG-160 - WIP - Hook provider --- .../registry/extension/ExtensionManager.java | 2 + .../registry/provider/ProviderFactory.java | 8 + .../provider/StandardProviderFactory.java | 53 ++++++ .../flow/StandardFlowSnapshotContext.java | 14 ++ .../provider/hook/ScriptFlowHookProvider.java | 152 ++++++++++++++++++ .../registry/service/RegistryService.java | 89 ++++++++++ ...apache.nifi.registry.hook.FlowHookProvider | 15 ++ .../src/main/xsd/providers.xsd | 1 + .../flow/TestStandardFlowSnapshotContext.java | 4 +- .../hook/TestScriptFlowHookProvider.java | 45 ++++++ .../registry/service/TestRegistryService.java | 5 +- .../provider/hook/bad-script-provider.xml | 30 ++++ .../registry/flow/FlowSnapshotContext.java | 5 + .../nifi/registry/hook/FlowHookEvent.java | 31 ++++ .../nifi/registry/hook/FlowHookException.java | 31 ++++ .../nifi/registry/hook/FlowHookProvider.java | 81 ++++++++++ .../src/main/resources/conf/providers.xml | 8 + 17 files changed, 572 insertions(+), 2 deletions(-) create mode 100644 nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptFlowHookProvider.java create mode 100644 nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.hook.FlowHookProvider create mode 100644 nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptFlowHookProvider.java create mode 100644 nifi-registry-framework/src/test/resources/provider/hook/bad-script-provider.xml create mode 100644 nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookEvent.java create mode 100644 nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookException.java create mode 100644 nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookProvider.java diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java index 4c8b5ac94..7f2b9e14e 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java @@ -22,6 +22,7 @@ import org.apache.nifi.registry.security.authorization.Authorizer; import org.apache.nifi.registry.security.authorization.UserGroupProvider; import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.hook.FlowHookProvider; import org.apache.nifi.registry.properties.NiFiRegistryProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +57,7 @@ public class ExtensionManager { classes.add(AccessPolicyProvider.class); classes.add(Authorizer.class); classes.add(IdentityProvider.class); + classes.add(FlowHookProvider.class); EXTENSION_CLASSES = Collections.unmodifiableList(classes); } diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java index d5b40a22f..bb62eb29a 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java @@ -16,7 +16,10 @@ */ package org.apache.nifi.registry.provider; +import java.util.List; + import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.hook.FlowHookProvider; /** * A factory for obtaining the configured providers. @@ -35,4 +38,9 @@ public interface ProviderFactory { */ FlowPersistenceProvider getFlowPersistenceProvider(); + /** + * @return the configured FlowHookProviders + */ + List getFlowHookProviders(); + } diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java index 5f2da59c7..34f307cad 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java @@ -18,6 +18,7 @@ import org.apache.nifi.registry.extension.ExtensionManager; import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.hook.FlowHookProvider; import org.apache.nifi.registry.properties.NiFiRegistryProperties; import org.apache.nifi.registry.provider.generated.Property; import org.apache.nifi.registry.provider.generated.Providers; @@ -39,6 +40,7 @@ import javax.xml.validation.SchemaFactory; import java.io.File; import java.lang.reflect.Constructor; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -72,6 +74,7 @@ private static JAXBContext initializeJaxbContext() { private final AtomicReference providersHolder = new AtomicReference<>(null); private FlowPersistenceProvider flowPersistenceProvider; + private List flowHookProviders; @Autowired public StandardProviderFactory(final NiFiRegistryProperties properties, final ExtensionManager extensionManager) { @@ -151,6 +154,56 @@ public synchronized FlowPersistenceProvider getFlowPersistenceProvider() { return flowPersistenceProvider; } + @Bean + @Override + public List getFlowHookProviders() { + if (flowHookProviders == null) { + flowHookProviders = new ArrayList(); + + if (providersHolder.get() == null) { + throw new ProviderFactoryException("ProviderFactory must be initialized before obtaining a Provider"); + } + + final Providers providers = providersHolder.get(); + final List jaxbFlowHookProvider = providers.getFlowHookProvider(); + + if(jaxbFlowHookProvider == null || jaxbFlowHookProvider.isEmpty()) { + // no hook provided + return flowHookProviders; + } + + for (org.apache.nifi.registry.provider.generated.Provider flowHookProvider : jaxbFlowHookProvider) { + + final String flowHookProviderClassName = flowHookProvider.getClazz(); + FlowHookProvider hook; + + try { + final ClassLoader classLoader = extensionManager.getExtensionClassLoader(flowHookProviderClassName); + if (classLoader == null) { + throw new IllegalStateException("Extension not found in any of the configured class loaders: " + flowHookProviderClassName); + } + + final Class rawFlowHookProviderClass = Class.forName(flowHookProviderClassName, true, classLoader); + final Class flowHookProviderClass = rawFlowHookProviderClass.asSubclass(FlowHookProvider.class); + + final Constructor constructor = flowHookProviderClass.getConstructor(); + hook = (FlowHookProvider) constructor.newInstance(); + + LOGGER.info("Instantiated FlowHookProvider with class name {}", new Object[] {flowHookProviderClassName}); + } catch (Exception e) { + throw new ProviderFactoryException("Error creating FlowHookProvider with class name: " + flowHookProviderClassName, e); + } + + final ProviderConfigurationContext configurationContext = createConfigurationContext(flowHookProvider.getProperty()); + hook.onConfigured(configurationContext); + flowHookProviders.add(hook); + LOGGER.info("Configured FlowHookProvider with class name {}", new Object[] {flowHookProviderClassName}); + } + } + + return flowHookProviders; + } + private ProviderConfigurationContext createConfigurationContext(final List configProperties) { final Map properties = new HashMap<>(); diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java index db60783bb..1728513eb 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java @@ -33,6 +33,7 @@ public class StandardFlowSnapshotContext implements FlowSnapshotContext { private final String flowName; private final int version; private final String comments; + private final String author; private final long snapshotTimestamp; private StandardFlowSnapshotContext(final Builder builder) { @@ -42,6 +43,7 @@ private StandardFlowSnapshotContext(final Builder builder) { this.flowName = builder.flowName; this.version = builder.version; this.comments = builder.comments; + this.author = builder.author; this.snapshotTimestamp = builder.snapshotTimestamp; Validate.notBlank(bucketId); @@ -87,6 +89,11 @@ public long getSnapshotTimestamp() { return snapshotTimestamp; } + @Override + public String getAuthor() { + return author; + } + /** * Builder for creating instances of StandardFlowSnapshotContext. */ @@ -98,6 +105,7 @@ public static class Builder { private String flowName; private int version; private String comments; + private String author; private long snapshotTimestamp; public Builder() { @@ -111,6 +119,7 @@ public Builder(final Bucket bucket, final VersionedFlow versionedFlow, final Ver flowName(versionedFlow.getName()); version(snapshotMetadata.getVersion()); comments(snapshotMetadata.getComments()); + author(snapshotMetadata.getAuthor()); snapshotTimestamp(snapshotMetadata.getTimestamp()); } @@ -144,6 +153,11 @@ public Builder comments(final String comments) { return this; } + public Builder author(final String author) { + this.author = author; + return this; + } + public Builder snapshotTimestamp(final long snapshotTimestamp) { this.snapshotTimestamp = snapshotTimestamp; return this; diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptFlowHookProvider.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptFlowHookProvider.java new file mode 100644 index 000000000..d57df1658 --- /dev/null +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptFlowHookProvider.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider.hook; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.registry.flow.FlowSnapshotContext; +import org.apache.nifi.registry.hook.FlowHookEvent; +import org.apache.nifi.registry.hook.FlowHookException; +import org.apache.nifi.registry.hook.FlowHookProvider; +import org.apache.nifi.registry.provider.ProviderConfigurationContext; +import org.apache.nifi.registry.provider.ProviderCreationException; +import org.apache.nifi.registry.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A FlowHookProvider that is used to execute a script before a flow snapshot version is committed. + */ +public class ScriptFlowHookProvider implements FlowHookProvider { + + static final Logger LOGGER = LoggerFactory.getLogger(ScriptFlowHookProvider.class); + static final String SCRIPT_PATH_PROP = "Script Path"; + static final String SCRIPT_WORKDIR_PROP = "Working Directory"; + private File scriptFile; + private File workDirFile; + + @Override + public void postCreateBucket(String bucketId) throws FlowHookException { + this.executeScript(FlowHookEvent.CREATE_BUCKET, bucketId, null, null, null, null); + } + + @Override + public void postCreateFlow(String bucketId, String flowId) throws FlowHookException { + this.executeScript(FlowHookEvent.CREATE_FLOW, bucketId, flowId, null, null, null); + } + + @Override + public void postDeleteBucket(String bucketId) throws FlowHookException { + this.executeScript(FlowHookEvent.DELETE_BUCKET, bucketId, null, null, null, null); + } + + @Override + public void postDeleteFlow(String bucketId, String flowId) throws FlowHookException { + this.executeScript(FlowHookEvent.DELETE_FLOW, bucketId, flowId, null, null, null); + } + + @Override + public void postDeleteFlowVersion(String bucketId, String flowId, int version) throws FlowHookException { + this.executeScript(FlowHookEvent.DELETE_FLOW, bucketId, flowId, Integer.toString(version), null, null); + } + + @Override + public void postUpdateBucket(String bucketId) throws FlowHookException { + this.executeScript(FlowHookEvent.UPDATE_BUCKET, bucketId, null, null, null, null); + } + + @Override + public void postUpdateFlow(String bucketId, String flowId) throws FlowHookException { + this.executeScript(FlowHookEvent.UPDATE_FLOW, bucketId, flowId, null, null, null); + } + + @Override + public void postCreateFlowVersion(final FlowSnapshotContext flowSnapshotContext) throws FlowHookException { + this.executeScript(FlowHookEvent.CREATE_VERSION, flowSnapshotContext.getBucketId(), flowSnapshotContext.getFlowId(), + Integer.toString(flowSnapshotContext.getVersion()), flowSnapshotContext.getComments(), flowSnapshotContext.getAuthor()); + } + + private void executeScript(final FlowHookEvent eventType, String bucketId, String flowId, String version, String comment, String author) { + List command = new ArrayList(); + command.add(scriptFile.getAbsolutePath()); + command.add(eventType.name()); + command.add(bucketId); + + if(flowId != null) { + command.add(flowId); + } + + if(version != null) { + command.add(version); + } + + if(comment != null) { + command.add(comment); + } + + if(author != null) { + command.add(author); + } + + final String commandString = StringUtils.join(command, " "); + final ProcessBuilder builder = new ProcessBuilder(command); + builder.directory(workDirFile); + LOGGER.debug("Execution of " + commandString); + + try { + builder.start(); + } catch (IOException e) { + LOGGER.error("Execution of {0} failed with: {1}", new Object[] { commandString, e.getLocalizedMessage() }, e); + } + } + + @Override + public void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException { + final Map props = configurationContext.getProperties(); + if (!props.containsKey(SCRIPT_PATH_PROP)) { + throw new ProviderCreationException("The property " + SCRIPT_PATH_PROP + " must be provided"); + } + + final String scripPath = props.get(SCRIPT_PATH_PROP); + if (StringUtils.isBlank(scripPath)) { + throw new ProviderCreationException("The property " + SCRIPT_PATH_PROP + " cannot be null or blank"); + } + + if(props.containsKey(SCRIPT_WORKDIR_PROP) && !StringUtils.isBlank(props.get(SCRIPT_WORKDIR_PROP))) { + final String workdir = props.get(SCRIPT_WORKDIR_PROP); + try { + workDirFile = new File(workdir); + FileUtils.ensureDirectoryExistAndCanRead(workDirFile); + } catch (IOException e) { + throw new ProviderCreationException("The working directory " + workdir + " cannot be read."); + } + } + + scriptFile = new File(scripPath); + if(scriptFile.isFile() && scriptFile.canExecute()) { + LOGGER.info("Configured ScriptFlowHookProvider with script {}", new Object[] {scriptFile.getAbsolutePath()}); + } else { + throw new ProviderCreationException("The script file " + scriptFile.getAbsolutePath() + " cannot be executed."); + } + } + +} diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java index b03116251..e3650e393 100644 --- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java +++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java @@ -42,6 +42,7 @@ import org.apache.nifi.registry.flow.diff.FlowDifference; import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow; import org.apache.nifi.registry.flow.diff.StandardFlowComparator; +import org.apache.nifi.registry.hook.FlowHookProvider; import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext; import org.apache.nifi.registry.serialization.Serializer; import org.slf4j.Logger; @@ -84,6 +85,7 @@ public class RegistryService { private final MetadataService metadataService; private final FlowPersistenceProvider flowPersistenceProvider; + private final List flowHookProviders; private final Serializer processGroupSerializer; private final Validator validator; @@ -94,10 +96,12 @@ public class RegistryService { @Autowired public RegistryService(final MetadataService metadataService, final FlowPersistenceProvider flowPersistenceProvider, + final List flowHookProviders, final Serializer processGroupSerializer, final Validator validator) { this.metadataService = metadataService; this.flowPersistenceProvider = flowPersistenceProvider; + this.flowHookProviders = flowHookProviders; this.processGroupSerializer = processGroupSerializer; this.validator = validator; Validate.notNull(this.metadataService); @@ -134,6 +138,17 @@ public Bucket createBucket(final Bucket bucket) { } final BucketEntity createdBucket = metadataService.createBucket(DataModelMapper.map(bucket)); + + // call the post-event hook + for(FlowHookProvider flowHookProvider : flowHookProviders) { + try { + flowHookProvider.postCreateBucket(createdBucket.getId()); + } catch (Exception e) { + // we don't want to throw anything here, hook are provided on best effort + LOGGER.error("Error while calling post-event hook", e); + } + } + return DataModelMapper.map(createdBucket); } finally { writeLock.unlock(); @@ -225,6 +240,17 @@ public Bucket updateBucket(final Bucket bucket) { // perform the actual update final BucketEntity updatedBucket = metadataService.updateBucket(existingBucketById); + + // call the post-event hook + for(FlowHookProvider flowHookProvider : flowHookProviders) { + try { + flowHookProvider.postUpdateBucket(existingBucketById.getId()); + } catch (Exception e) { + // we don't want to throw anything here, hook are provided on best effort + LOGGER.error("Error while calling post-event hook", e); + } + } + return DataModelMapper.map(updatedBucket); } finally { writeLock.unlock(); @@ -253,6 +279,16 @@ public Bucket deleteBucket(final String bucketIdentifier) { // now delete the bucket from the metadata provider, which deletes all flows referencing it metadataService.deleteBucket(existingBucket); + // call the post-event hook + for(FlowHookProvider flowHookProvider : flowHookProviders) { + try { + flowHookProvider.postDeleteBucket(existingBucket.getId()); + } catch (Exception e) { + // we don't want to throw anything here, hook are provided on best effort + LOGGER.error("Error while calling post-event hook", e); + } + } + return DataModelMapper.map(existingBucket); } finally { writeLock.unlock(); @@ -356,6 +392,17 @@ public VersionedFlow createFlow(final String bucketIdentifier, final VersionedFl // persist the flow and return the created entity final FlowEntity createdFlow = metadataService.createFlow(flowEntity); + + // call the post-event hook + for(FlowHookProvider flowHookProvider : flowHookProviders) { + try { + flowHookProvider.postCreateFlow(existingBucket.getId(), createdFlow.getId()); + } catch (Exception e) { + // we don't want to throw anything here, hook are provided on best effort + LOGGER.error("Error while calling post-event hook", e); + } + } + return DataModelMapper.map(existingBucket, createdFlow); } finally { writeLock.unlock(); @@ -477,6 +524,17 @@ public VersionedFlow updateFlow(final VersionedFlow versionedFlow) { // perform the actual update final FlowEntity updatedFlow = metadataService.updateFlow(existingFlow); + + // call the post-event hook + for(FlowHookProvider flowHookProvider : flowHookProviders) { + try { + flowHookProvider.postUpdateFlow(existingFlow.getBucketId(), existingFlow.getId()); + } catch (Exception e) { + // we don't want to throw anything here, hook are provided on best effort + LOGGER.error("Error while calling post-event hook", e); + } + } + return DataModelMapper.map(existingBucket, updatedFlow); } finally { writeLock.unlock(); @@ -517,6 +575,16 @@ public VersionedFlow deleteFlow(final String bucketIdentifier, final String flow // now delete the flow from the metadata provider metadataService.deleteFlow(existingFlow); + // call the post-event hook + for(FlowHookProvider flowHookProvider : flowHookProviders) { + try { + flowHookProvider.postDeleteFlow(existingFlow.getBucketId(), existingFlow.getId()); + } catch (Exception e) { + // we don't want to throw anything here, hook are provided on best effort + LOGGER.error("Error while calling post-event hook", e); + } + } + return DataModelMapper.map(existingBucket, existingFlow); } finally { writeLock.unlock(); @@ -610,6 +678,16 @@ public VersionedFlowSnapshot createFlowSnapshot(final VersionedFlowSnapshot flow } final VersionedFlow updatedVersionedFlow = DataModelMapper.map(existingBucket, updatedFlow); + // call the post-event hook + for(FlowHookProvider flowHookProvider : flowHookProviders) { + try { + flowHookProvider.postCreateFlowVersion(context); + } catch (Exception e) { + // we don't want to throw anything here, hook are provided on best effort + LOGGER.error("Error while calling post-event hook", e); + } + } + flowSnapshot.setBucket(bucket); flowSnapshot.setFlow(updatedVersionedFlow); return flowSnapshot; @@ -822,6 +900,17 @@ public VersionedFlowSnapshotMetadata deleteFlowSnapshot(final String bucketIdent // delete the snapshot itself metadataService.deleteFlowSnapshot(snapshotEntity); + + // call the post-event hook + for(FlowHookProvider flowHookProvider : flowHookProviders) { + try { + flowHookProvider.postDeleteFlowVersion(bucketIdentifier, flowIdentifier, version); + } catch (Exception e) { + // we don't want to throw anything here, hook are provided on best effort + LOGGER.error("Error while calling post-event hook", e); + } + } + return DataModelMapper.map(existingBucket, snapshotEntity); } finally { writeLock.unlock(); diff --git a/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.hook.FlowHookProvider b/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.hook.FlowHookProvider new file mode 100644 index 000000000..150103aee --- /dev/null +++ b/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.hook.FlowHookProvider @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.registry.provider.hook.ScriptFlowHookProvider \ No newline at end of file diff --git a/nifi-registry-framework/src/main/xsd/providers.xsd b/nifi-registry-framework/src/main/xsd/providers.xsd index 1202f9efd..16086af59 100644 --- a/nifi-registry-framework/src/main/xsd/providers.xsd +++ b/nifi-registry-framework/src/main/xsd/providers.xsd @@ -43,6 +43,7 @@ + diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestStandardFlowSnapshotContext.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestStandardFlowSnapshotContext.java index 3e0a106b4..bff2ef915 100644 --- a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestStandardFlowSnapshotContext.java +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestStandardFlowSnapshotContext.java @@ -17,7 +17,6 @@ package org.apache.nifi.registry.provider.flow; import org.apache.nifi.registry.flow.FlowSnapshotContext; -import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext; import org.junit.Assert; import org.junit.Test; @@ -31,6 +30,7 @@ public void testBuilder() { final String flowName = "Some Flow"; final int version = 2; final String comments = "Some Comments"; + final String author = "anonymous"; final long timestamp = System.currentTimeMillis(); final FlowSnapshotContext context = new StandardFlowSnapshotContext.Builder() @@ -40,6 +40,7 @@ public void testBuilder() { .flowName(flowName) .version(version) .comments(comments) + .author(author) .snapshotTimestamp(timestamp) .build(); @@ -49,6 +50,7 @@ public void testBuilder() { Assert.assertEquals(flowName, context.getFlowName()); Assert.assertEquals(version, context.getVersion()); Assert.assertEquals(comments, context.getComments()); + Assert.assertEquals(author, context.getAuthor()); Assert.assertEquals(timestamp, context.getSnapshotTimestamp()); } diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptFlowHookProvider.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptFlowHookProvider.java new file mode 100644 index 000000000..5bc888cb7 --- /dev/null +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptFlowHookProvider.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider.hook; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import org.apache.nifi.registry.extension.ExtensionManager; +import org.apache.nifi.registry.properties.NiFiRegistryProperties; +import org.apache.nifi.registry.provider.ProviderCreationException; +import org.apache.nifi.registry.provider.ProviderFactory; +import org.apache.nifi.registry.provider.StandardProviderFactory; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestScriptFlowHookProvider { + + @Test(expected = ProviderCreationException.class) + public void testBadScriptProvider() { + final NiFiRegistryProperties props = new NiFiRegistryProperties(); + props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/hook/bad-script-provider.xml"); + + final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class); + when(extensionManager.getExtensionClassLoader(any(String.class))).thenReturn(this.getClass().getClassLoader()); + + final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager); + providerFactory.initialize(); + providerFactory.getFlowHookProviders(); + } + +} diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java index c35cda79a..cd5ea2281 100644 --- a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java +++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java @@ -30,6 +30,7 @@ import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; import org.apache.nifi.registry.flow.VersionedProcessGroup; import org.apache.nifi.registry.flow.VersionedProcessor; +import org.apache.nifi.registry.hook.FlowHookProvider; import org.apache.nifi.registry.serialization.Serializer; import org.apache.nifi.registry.serialization.VersionedProcessGroupSerializer; import org.junit.Assert; @@ -72,6 +73,7 @@ public class TestRegistryService { private MetadataService metadataService; private FlowPersistenceProvider flowPersistenceProvider; + private FlowHookProvider flowHookProvider; private Serializer snapshotSerializer; private Validator validator; @@ -81,12 +83,13 @@ public class TestRegistryService { public void setup() { metadataService = mock(MetadataService.class); flowPersistenceProvider = mock(FlowPersistenceProvider.class); + // flowHookProvider = mock(FlowHookProvider.class); snapshotSerializer = mock(VersionedProcessGroupSerializer.class); final ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory(); validator = validatorFactory.getValidator(); - registryService = new RegistryService(metadataService, flowPersistenceProvider, snapshotSerializer, validator); + registryService = new RegistryService(metadataService, flowPersistenceProvider, new ArrayList(), snapshotSerializer, validator); } // ---------------------- Test Bucket methods --------------------------------------------- diff --git a/nifi-registry-framework/src/test/resources/provider/hook/bad-script-provider.xml b/nifi-registry-framework/src/test/resources/provider/hook/bad-script-provider.xml new file mode 100644 index 000000000..ca6cc6c90 --- /dev/null +++ b/nifi-registry-framework/src/test/resources/provider/hook/bad-script-provider.xml @@ -0,0 +1,30 @@ + + + + + + org.apache.nifi.registry.provider.MockFlowPersistenceProvider + flow foo + flow bar + + + + org.apache.nifi.registry.provider.hook.ScriptFlowHookProvider + + + + + \ No newline at end of file diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java index c5e06f560..9c2a8183b 100644 --- a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java @@ -56,4 +56,9 @@ public interface FlowSnapshotContext { */ long getSnapshotTimestamp(); + /** + * @return the author of the snapshot + */ + String getAuthor(); + } diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookEvent.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookEvent.java new file mode 100644 index 000000000..c0aebc728 --- /dev/null +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.registry.hook; + +public enum FlowHookEvent { + CREATE_BUCKET, + CREATE_FLOW, + CREATE_VERSION, + + UPDATE_BUCKET, + UPDATE_FLOW, + + DELETE_BUCKET, + DELETE_FLOW, + DELETE_VERSION; +} diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookException.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookException.java new file mode 100644 index 000000000..85033ca1a --- /dev/null +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.hook; + +/** + * An Exception for errors encountered when a FlowHookProvider executes an action before/after a commit. + */ +public class FlowHookException extends RuntimeException { + + public FlowHookException(String message) { + super(message); + } + + public FlowHookException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookProvider.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookProvider.java new file mode 100644 index 000000000..599cbed09 --- /dev/null +++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/FlowHookProvider.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.hook; + +import org.apache.nifi.registry.flow.FlowSnapshotContext; +import org.apache.nifi.registry.provider.Provider; + +/** + * A service that defines post event action hook + * + * NOTE: Although this interface is intended to be an extension point, it is not yet considered stable and thus may + * change across releases until the registry matures. + */ +public interface FlowHookProvider extends Provider { + + /** + * @param bucketId + * @throws FlowHookException + */ + default void postCreateBucket(String bucketId) throws FlowHookException { } + + /** + * @param bucketId + * @param flowId + * @throws FlowHookException + */ + default void postCreateFlow(String bucketId, String flowId) throws FlowHookException { } + + /** + * @param flowSnapshotContext + * @throws FlowHookException + */ + default void postCreateFlowVersion(FlowSnapshotContext flowSnapshotContext) throws FlowHookException { } + + /** + * @param bucketId + * @throws FlowHookException + */ + default void postDeleteBucket(String bucketId) throws FlowHookException { } + + /** + * @param bucketId + * @param flowId + * @throws FlowHookException + */ + default void postDeleteFlow(String bucketId, String flowId) throws FlowHookException { } + + /** + * @param flowSnapshotContext + * @throws FlowHookException + */ + default void postDeleteFlowVersion(String bucketIdentifier, String flowIdentifier, int version) throws FlowHookException { } + + /** + * @param bucketId + * @throws FlowHookException + */ + default void postUpdateBucket(String bucketId) throws FlowHookException { } + + /** + * @param bucketId + * @param flowId + * @throws FlowHookException + */ + default void postUpdateFlow(String bucketId, String flowId) throws FlowHookException { } + +} diff --git a/nifi-registry-resources/src/main/resources/conf/providers.xml b/nifi-registry-resources/src/main/resources/conf/providers.xml index 40bf01297..81f828a31 100644 --- a/nifi-registry-resources/src/main/resources/conf/providers.xml +++ b/nifi-registry-resources/src/main/resources/conf/providers.xml @@ -20,4 +20,12 @@ ./flow_storage + + \ No newline at end of file