From 575e09da4570d06c5401c1a5974a890537f2cead Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 25 May 2017 15:31:43 -0400 Subject: [PATCH] NIFI-3981: When serializing flow to cluster, use the Scheduled State of ports as they are configured to be, not the current state, since the current state may change as soon as the FlowController has finished initializing --- .../nifi/controller/FlowController.java | 51 ++++++++++++++++++- .../controller/StandardFlowSynchronizer.java | 12 ++--- .../serialization/ScheduledStateLookup.java | 15 +++++- .../serialization/StandardFlowSerializer.java | 16 +++--- 4 files changed, 78 insertions(+), 16 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 34ea266d7a33..aef6d46d4001 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -1518,7 +1518,29 @@ public void shutdown(final boolean kill) { public void serialize(final FlowSerializer serializer, final OutputStream os) throws FlowSerializationException { readLock.lock(); try { - final ScheduledStateLookup scheduledStateLookup = procNode -> startConnectablesAfterInitialization.contains(procNode) ? ScheduledState.RUNNING : procNode.getScheduledState(); + final ScheduledStateLookup scheduledStateLookup = new ScheduledStateLookup() { + @Override + public ScheduledState getScheduledState(final ProcessorNode procNode) { + if (startConnectablesAfterInitialization.contains(procNode)) { + return ScheduledState.RUNNING; + } + + return procNode.getScheduledState(); + } + + @Override + public ScheduledState getScheduledState(final Port port) { + if (startConnectablesAfterInitialization.contains(port)) { + return ScheduledState.RUNNING; + } + if (startRemoteGroupPortsAfterInitialization.contains(port)) { + return ScheduledState.RUNNING; + } + + return port.getScheduledState(); + } + }; + serializer.serialize(this, os, scheduledStateLookup); } finally { readLock.unlock(); @@ -2922,6 +2944,33 @@ public void startConnectable(final Connectable connectable) { } } + public void stopConnectable(final Connectable connectable) { + final ProcessGroup group = requireNonNull(connectable).getProcessGroup(); + + writeLock.lock(); + try { + switch (requireNonNull(connectable).getConnectableType()) { + case FUNNEL: + // Ignore. We don't support stopping funnels. + break; + case INPUT_PORT: + case REMOTE_INPUT_PORT: + startConnectablesAfterInitialization.remove(connectable); + group.stopInputPort((Port) connectable); + break; + case OUTPUT_PORT: + case REMOTE_OUTPUT_PORT: + startConnectablesAfterInitialization.remove(connectable); + group.stopOutputPort((Port) connectable); + break; + default: + throw new IllegalArgumentException(); + } + } finally { + writeLock.unlock(); + } + } + public void startTransmitting(final RemoteGroupPort remoteGroupPort) { writeLock.lock(); try { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 975f95425d30..6f1e8e1055d2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -774,19 +774,19 @@ private ProcessGroup updateProcessGroup(final FlowController controller, final P case DISABLED: // switch processor do disabled. This means we have to stop it (if it's already stopped, this method does nothing), // and then we have to disable it. - port.getProcessGroup().stopInputPort(port); + controller.stopConnectable(port); port.getProcessGroup().disableInputPort(port); break; case RUNNING: // we want to run now. Make sure processor is not disabled and then start it. port.getProcessGroup().enableInputPort(port); - port.getProcessGroup().startInputPort(port); + controller.startConnectable(port); break; case STOPPED: if (port.getScheduledState() == ScheduledState.DISABLED) { port.getProcessGroup().enableInputPort(port); } else if (port.getScheduledState() == ScheduledState.RUNNING) { - port.getProcessGroup().stopInputPort(port); + controller.stopConnectable(port); } break; } @@ -803,19 +803,19 @@ private ProcessGroup updateProcessGroup(final FlowController controller, final P case DISABLED: // switch processor do disabled. This means we have to stop it (if it's already stopped, this method does nothing), // and then we have to disable it. - port.getProcessGroup().stopOutputPort(port); + controller.stopConnectable(port); port.getProcessGroup().disableOutputPort(port); break; case RUNNING: // we want to run now. Make sure processor is not disabled and then start it. port.getProcessGroup().enableOutputPort(port); - port.getProcessGroup().startOutputPort(port); + controller.startConnectable(port); break; case STOPPED: if (port.getScheduledState() == ScheduledState.DISABLED) { port.getProcessGroup().enableOutputPort(port); } else if (port.getScheduledState() == ScheduledState.RUNNING) { - port.getProcessGroup().stopOutputPort(port); + controller.stopConnectable(port); } break; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java index 07f6017b5ced..39693b8987fe 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/ScheduledStateLookup.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller.serialization; +import org.apache.nifi.connectable.Port; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ScheduledState; @@ -24,5 +25,17 @@ public interface ScheduledStateLookup { ScheduledState getScheduledState(ProcessorNode procNode); - public static final ScheduledStateLookup IDENTITY_LOOKUP = ProcessorNode::getScheduledState; + ScheduledState getScheduledState(Port port); + + public static final ScheduledStateLookup IDENTITY_LOOKUP = new ScheduledStateLookup() { + @Override + public ScheduledState getScheduledState(final ProcessorNode procNode) { + return procNode.getScheduledState(); + } + + @Override + public ScheduledState getScheduledState(final Port port) { + return port.getScheduledState(); + } + }; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index fea1ecb6419f..702932cce66e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -159,19 +159,19 @@ private void addProcessGroup(final Element parentElement, final ProcessGroup gro if (group.isRootGroup()) { for (final Port port : group.getInputPorts()) { - addRootGroupPort(element, (RootGroupPort) port, "inputPort"); + addRootGroupPort(element, (RootGroupPort) port, "inputPort", scheduledStateLookup); } for (final Port port : group.getOutputPorts()) { - addRootGroupPort(element, (RootGroupPort) port, "outputPort"); + addRootGroupPort(element, (RootGroupPort) port, "outputPort", scheduledStateLookup); } } else { for (final Port port : group.getInputPorts()) { - addPort(element, port, "inputPort"); + addPort(element, port, "inputPort", scheduledStateLookup); } for (final Port port : group.getOutputPorts()) { - addPort(element, port, "outputPort"); + addPort(element, port, "outputPort", scheduledStateLookup); } } @@ -330,7 +330,7 @@ private void addRemoteGroupPort(final Element parentElement, final RemoteGroupPo parentElement.appendChild(element); } - private void addPort(final Element parentElement, final Port port, final String elementName) { + private void addPort(final Element parentElement, final Port port, final String elementName, final ScheduledStateLookup scheduledStateLookup) { final Document doc = parentElement.getOwnerDocument(); final Element element = doc.createElement(elementName); parentElement.appendChild(element); @@ -338,12 +338,12 @@ private void addPort(final Element parentElement, final Port port, final String addTextElement(element, "name", port.getName()); addPosition(element, port.getPosition()); addTextElement(element, "comments", port.getComments()); - addTextElement(element, "scheduledState", port.getScheduledState().name()); + addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(port).name()); parentElement.appendChild(element); } - private void addRootGroupPort(final Element parentElement, final RootGroupPort port, final String elementName) { + private void addRootGroupPort(final Element parentElement, final RootGroupPort port, final String elementName, final ScheduledStateLookup scheduledStateLookup) { final Document doc = parentElement.getOwnerDocument(); final Element element = doc.createElement(elementName); parentElement.appendChild(element); @@ -351,7 +351,7 @@ private void addRootGroupPort(final Element parentElement, final RootGroupPort p addTextElement(element, "name", port.getName()); addPosition(element, port.getPosition()); addTextElement(element, "comments", port.getComments()); - addTextElement(element, "scheduledState", port.getScheduledState().name()); + addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(port).name()); addTextElement(element, "maxConcurrentTasks", String.valueOf(port.getMaxConcurrentTasks())); for (final String user : port.getUserAccessControl()) { addTextElement(element, "userAccessControl", user);