From 36e4c57ebc400ba1617d20a1494534ac15d7e2b2 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 5 Aug 2016 15:52:45 -0400 Subject: [PATCH] NIFI-2493: Do not fingerprint Remote Ports' running state. When synchronizing remote flow with local flow, start/stop remote group ports as appropriate based on the inherited flow --- .../controller/StandardFlowSynchronizer.java | 50 +++++++++++++++++++ .../nifi/fingerprint/FingerprintFactory.java | 3 +- 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 0e0c74b7d366..d1822efebf12 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -772,6 +772,56 @@ private ProcessGroup updateProcessGroup(final FlowController controller, final P } } + // Update scheduled state of Remote Group Ports + final List remoteProcessGroupList = getChildrenByTagName(processGroupElement, "remoteProcessGroup"); + for (final Element remoteGroupElement : remoteProcessGroupList) { + final RemoteProcessGroupDTO remoteGroupDto = FlowFromDOMFactory.getRemoteProcessGroup(remoteGroupElement, encryptor); + final RemoteProcessGroup rpg = processGroup.getRemoteProcessGroup(remoteGroupDto.getId()); + + // input ports + final List inputPortElements = getChildrenByTagName(remoteGroupElement, "inputPort"); + for (final Element inputPortElement : inputPortElements) { + final RemoteProcessGroupPortDescriptor portDescriptor = FlowFromDOMFactory.getRemoteProcessGroupPort(inputPortElement); + final String inputPortId = portDescriptor.getId(); + final RemoteGroupPort inputPort = rpg.getInputPort(inputPortId); + if (inputPort == null) { + continue; + } + + if (portDescriptor.isTransmitting()) { + if (inputPort.getScheduledState() != ScheduledState.RUNNING && inputPort.getScheduledState() != ScheduledState.STARTING) { + rpg.startTransmitting(inputPort); + } + } else { + if (inputPort.getScheduledState() != ScheduledState.STOPPED && inputPort.getScheduledState() != ScheduledState.STOPPING) { + rpg.stopTransmitting(inputPort); + } + } + } + + // output ports + final List outputPortElements = getChildrenByTagName(remoteGroupElement, "outputPort"); + for (final Element outputPortElement : outputPortElements) { + final RemoteProcessGroupPortDescriptor portDescriptor = FlowFromDOMFactory.getRemoteProcessGroupPort(outputPortElement); + final String outputPortId = portDescriptor.getId(); + final RemoteGroupPort outputPort = rpg.getOutputPort(outputPortId); + if (outputPort == null) { + continue; + } + + if (portDescriptor.isTransmitting()) { + if (outputPort.getScheduledState() != ScheduledState.RUNNING && outputPort.getScheduledState() != ScheduledState.STARTING) { + rpg.startTransmitting(outputPort); + } + } else { + if (outputPort.getScheduledState() != ScheduledState.STOPPED && outputPort.getScheduledState() != ScheduledState.STOPPING) { + rpg.stopTransmitting(outputPort); + } + } + } + } + + // add labels final List labelNodeList = getChildrenByTagName(processGroupElement, "label"); for (final Element labelElement : labelNodeList) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index 26f83b5ffdaf..20bdb6046d54 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -778,7 +778,7 @@ public int compare(final Element o1, final Element o2) { } private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final Element remoteGroupPortElement) { - for (final String childName : new String[]{"id", "scheduledState", "maxConcurrentTasks", "useCompression"}) { + for (final String childName : new String[] {"id", "maxConcurrentTasks", "useCompression"}) { appendFirstValue(builder, DomUtils.getChildNodesByTagName(remoteGroupPortElement, childName)); } @@ -787,7 +787,6 @@ private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, private StringBuilder addRemoteGroupPortFingerprint(final StringBuilder builder, final RemoteProcessGroupPortDTO port) { builder.append(port.getId()); - builder.append(Boolean.TRUE.equals(port.isTransmitting()) ? "RUNNING" : "STOPPED"); builder.append(port.getConcurrentlySchedulableTaskCount()); builder.append(port.getUseCompression()); return builder;