From de8b65e3b16f0185029fd53f201334347e0f5ffd Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Fri, 15 Apr 2016 16:49:47 +0200 Subject: [PATCH 1/2] NIFI-1777 Prevent deleting a connection going to a running processor --- .../org/apache/nifi/connectable/StandardConnection.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java index d43a3db6aa3c..11761470d5c4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -392,5 +392,12 @@ public void verifyCanDelete() { throw new IllegalStateException("Source of Connection (" + source + ") is running"); } } + + final Connectable dest = destination.get(); + if (dest.isRunning()) { + if (!ConnectableType.FUNNEL.equals(dest.getConnectableType())) { + throw new IllegalStateException("Destination of Connection (" + dest + ") is running"); + } + } } } From 27d4096e0cc872b9af681561394a8a0a1da996cd Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Mon, 18 Apr 2016 10:13:23 +0200 Subject: [PATCH 2/2] NIFI-1777 Added unit tests to test processor removal --- .../scheduling/TestProcessorLifecycle.java | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index 560c4cbe8cf0..b6962e35131f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -23,7 +23,9 @@ import java.io.File; import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -45,6 +47,7 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.FlowController; @@ -574,6 +577,73 @@ public void validateStartSucceedsOnProcessorWithEnabledService() throws Exceptio fc.shutdown(true); } + /** + * Test deletion of processor when connected to another + * @throws Exception exception + */ + @Test + public void validateProcessorDeletion() throws Exception { + FlowController fc = this.buildFlowControllerForTest(); + ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); + this.setControllerRootGroup(fc, testGroup); + + ProcessorNode testProcNodeA = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + testProcNodeA.setProperty("P", "hello"); + testGroup.addProcessor(testProcNodeA); + + ProcessorNode testProcNodeB = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString()); + testProcNodeB.setProperty("P", "hello"); + testGroup.addProcessor(testProcNodeB); + + Collection relationNames = new ArrayList(); + relationNames.add("relation"); + Connection connection = fc.createConnection(UUID.randomUUID().toString(), Connection.class.getName(), testProcNodeA, testProcNodeB, relationNames); + testGroup.addConnection(connection); + + ProcessScheduler ps = fc.getProcessScheduler(); + ps.startProcessor(testProcNodeA); + ps.startProcessor(testProcNodeB); + + try { + testGroup.removeProcessor(testProcNodeA); + fail(); + } catch (Exception e) { + // should throw exception because processor running + } + + try { + testGroup.removeProcessor(testProcNodeB); + fail(); + } catch (Exception e) { + // should throw exception because processor running + } + + ps.stopProcessor(testProcNodeB); + Thread.sleep(100); + + try { + testGroup.removeProcessor(testProcNodeA); + fail(); + } catch (Exception e) { + // should throw exception because destination processor running + } + + try { + testGroup.removeProcessor(testProcNodeB); + fail(); + } catch (Exception e) { + // should throw exception because source processor running + } + + ps.stopProcessor(testProcNodeA); + Thread.sleep(100); + + testGroup.removeProcessor(testProcNodeA); + testGroup.removeProcessor(testProcNodeB); + testGroup.shutdown(); + fc.shutdown(true); + } + /** * Scenario where onTrigger() is executed with random delay limited to * 'delayLimit', yet with guaranteed exit from onTrigger().