From 87b3f991e66bf072b35d49bee338a4e08656a5a8 Mon Sep 17 00:00:00 2001 From: Bryan Rosander Date: Wed, 9 Nov 2016 21:11:33 -0500 Subject: [PATCH] MINIFI-136 - Fixing ordering issue in ConfigTransformer --- .../bootstrap/util/ConfigTransformer.java | 16 +- .../bootstrap/util/ConfigTransformerTest.java | 40 ++++- .../test/resources/config-funnel-and-rpg.yml | 170 ++++++++++++++++++ 3 files changed, 217 insertions(+), 9 deletions(-) create mode 100644 minifi-bootstrap/src/test/resources/config-funnel-and-rpg.yml diff --git a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java index 19d3268d7..09776de9e 100644 --- a/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java +++ b/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java @@ -366,14 +366,6 @@ protected static void addProcessGroup(Document doc, Element element, ProcessGrou addProcessor(element, processorConfig); } - for (RemoteProcessingGroupSchema remoteProcessingGroupSchema : processGroupSchema.getRemoteProcessingGroups()) { - addRemoteProcessGroup(element, remoteProcessingGroupSchema); - } - - for (FunnelSchema funnelSchema : processGroupSchema.getFunnels()) { - addFunnel(element, funnelSchema); - } - for (PortSchema portSchema : processGroupSchema.getInputPortSchemas()) { addPort(doc, element, portSchema, "inputPort"); } @@ -382,12 +374,20 @@ protected static void addProcessGroup(Document doc, Element element, ProcessGrou addPort(doc, element, portSchema, "outputPort"); } + for (FunnelSchema funnelSchema : processGroupSchema.getFunnels()) { + addFunnel(element, funnelSchema); + } + for (ProcessGroupSchema child : processGroupSchema.getProcessGroupSchemas()) { Element processGroups = doc.createElement("processGroup"); element.appendChild(processGroups); addProcessGroup(doc, processGroups, child, parentGroupIdResolver); } + for (RemoteProcessingGroupSchema remoteProcessingGroupSchema : processGroupSchema.getRemoteProcessingGroups()) { + addRemoteProcessGroup(element, remoteProcessingGroupSchema); + } + for (ConnectionSchema connectionConfig : processGroupSchema.getConnections()) { addConnection(element, connectionConfig, parentGroupIdResolver); } diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java index b41dc90bd..05f2abf81 100644 --- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java +++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java @@ -44,18 +44,23 @@ import javax.xml.xpath.XPathFactory; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; public class ConfigTransformerTest { - + public static final Map PG_ELEMENT_ORDER_MAP = generateOrderMap( + Arrays.asList("processor", "inputPort", "outputPort", "funnel", "processGroup", "remoteProcessGroup", "connection")); private XPathFactory xPathFactory; private Document document; private Element config; @@ -109,6 +114,11 @@ public void testFunnelsTransform() throws Exception { testConfigFileTransform("stress-test-framework-funnel.yml"); } + @Test + public void testFunnelAndRpgTransform() throws Exception { + testConfigFileTransform("config-funnel-and-rpg.yml"); + } + public void testConfigFileTransform(String configFile) throws Exception { ConfigSchema configSchema = SchemaLoader.loadConfigSchemaFromYaml(ConfigTransformerTest.class.getClassLoader().getResourceAsStream(configFile)); @@ -124,6 +134,8 @@ private void testProcessGroup(Element element, ProcessGroupSchema processGroupSc assertEquals(processGroupSchema.getName(), getText(element, "name")); assertEquals(nullToEmpty(processGroupSchema.getComment()), nullToEmpty(getText(element, "comment"))); + checkOrderOfChildren(element, PG_ELEMENT_ORDER_MAP); + NodeList processorElements = (NodeList) xPathFactory.newXPath().evaluate("processor", element, XPathConstants.NODESET); assertEquals(processGroupSchema.getProcessors().size(), processorElements.getLength()); for (int i = 0; i < processorElements.getLength(); i++) { @@ -256,4 +268,30 @@ private String getText(Element element, String path) throws XPathExpressionExcep private String nullToEmpty(Object val) { return val == null ? "" : val.toString(); } + + private static Map generateOrderMap(List elements) { + Map map = new HashMap<>(); + for (int i = 0; i < elements.size(); i++) { + map.put(elements.get(i), i); + } + return Collections.unmodifiableMap(map); + } + + private static void checkOrderOfChildren(Element element, Map orderMap) { + int elementOrderList = 0; + NodeList childNodes = element.getChildNodes(); + String lastOrderedElementName = null; + for (int i = 0; i < childNodes.getLength(); i++) { + String nodeName = childNodes.item(i).getNodeName(); + Integer index = orderMap.get(nodeName); + if (index != null) { + if (elementOrderList > index) { + fail("Found " + nodeName + " after " + lastOrderedElementName + "; expected all " + nodeName + " elements to come before the following elements: " + orderMap.entrySet().stream() + .filter(e -> e.getValue() > index ).sorted(Comparator.comparingInt(e -> e.getValue())).map(e -> e.getKey()).collect(Collectors.joining(", "))); + } + lastOrderedElementName = nodeName; + elementOrderList = index; + } + } + } } diff --git a/minifi-bootstrap/src/test/resources/config-funnel-and-rpg.yml b/minifi-bootstrap/src/test/resources/config-funnel-and-rpg.yml new file mode 100644 index 000000000..bcc91d616 --- /dev/null +++ b/minifi-bootstrap/src/test/resources/config-funnel-and-rpg.yml @@ -0,0 +1,170 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the \"License\"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an \"AS IS\" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +MiNiFi Config Version: 2 +Flow Controller: + name: MiNiFi test Funnels multiple dest + comment: '' +Core Properties: + flow controller graceful shutdown period: 10 sec + flow service write delay interval: 500 ms + administrative yield duration: 30 sec + bored yield duration: 10 millis + max concurrent threads: 1 +FlowFile Repository: + partitions: 256 + checkpoint interval: 2 mins + always sync: false + Swap: + threshold: 20000 + in period: 5 sec + in threads: 1 + out period: 5 sec + out threads: 4 +Content Repository: + content claim max appendable size: 10 MB + content claim max flow files: 100 + always sync: false +Provenance Repository: + provenance rollover time: 1 min +Component Status Repository: + buffer size: 1440 + snapshot frequency: 1 min +Security Properties: + keystore: '' + keystore type: '' + keystore password: '' + key password: '' + truststore: '' + truststore type: '' + truststore password: '' + ssl protocol: '' + Sensitive Props: + key: + algorithm: PBEWITHMD5AND256BITAES-CBC-OPENSSL + provider: BC +Processors: +- id: f028f52b-e4da-44fe-94b0-93eab6918cde + name: GenerateFlowFile + class: org.apache.nifi.processors.standard.GenerateFlowFile + max concurrent tasks: 1 + scheduling strategy: CRON_DRIVEN + scheduling period: 0/10 * * * * ? + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: [] + Properties: + Batch Size: '1' + Data Format: Binary + File Size: 10B + Unique FlowFiles: 'false' +- id: dd424151-3e2c-427e-be9e-80b67e574d03 + name: UpdateAttribute + class: org.apache.nifi.processors.attributes.UpdateAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: [] + Properties: + Delete Attributes Expression: + a: 'true' +- id: 11101824-f926-4250-9dfc-8b52e04212bf + name: UpdateAttribute + class: org.apache.nifi.processors.attributes.UpdateAttribute + max concurrent tasks: 1 + scheduling strategy: TIMER_DRIVEN + scheduling period: 0 sec + penalization period: 30 sec + yield period: 1 sec + run duration nanos: 0 + auto-terminated relationships list: [] + Properties: + Delete Attributes Expression: + b: 'true' +Process Groups: [] +Input Ports: [] +Output Ports: [] +Funnels: +- id: b34f32fd-e9c3-4907-9e09-484506fc4417 +Connections: +- id: caac3cbe-45cd-4934-a918-f364c38e0bdb + name: GenerateFlowFile/success/b34f32fd-e9c3-4907-9e09-484506fc4417 + source id: f028f52b-e4da-44fe-94b0-93eab6918cde + source relationship names: + - success + destination id: b34f32fd-e9c3-4907-9e09-484506fc4417 + max work queue size: 0 + max work queue data size: 0 MB + flowfile expiration: 0 sec + queue prioritizer class: '' +- id: a238c7ed-8c3f-4574-b5cb-75aa2d642298 + name: UpdateAttribute/success/2c1d6dcc-b11c-4ba7-8c6f-ad9e4f730cbe + source id: 11101824-f926-4250-9dfc-8b52e04212bf + source relationship names: + - success + destination id: 2c1d6dcc-b11c-4ba7-8c6f-ad9e4f730cbe + max work queue size: 0 + max work queue data size: 0 MB + flowfile expiration: 0 sec + queue prioritizer class: '' +- id: e943a827-d60b-4dd8-ad23-e4f3ab63f906 + name: UpdateAttribute/success/8d2c579e-4ad2-4922-a311-a37b7e551b7a + source id: dd424151-3e2c-427e-be9e-80b67e574d03 + source relationship names: + - success + destination id: 8d2c579e-4ad2-4922-a311-a37b7e551b7a + max work queue size: 0 + max work queue data size: 0 MB + flowfile expiration: 0 sec + queue prioritizer class: '' +- id: 45d9f462-e03d-4e96-98e8-17ea9844ca96 + name: b34f32fd-e9c3-4907-9e09-484506fc4417//UpdateAttribute + source id: b34f32fd-e9c3-4907-9e09-484506fc4417 + source relationship names: [] + destination id: dd424151-3e2c-427e-be9e-80b67e574d03 + max work queue size: 0 + max work queue data size: 0 MB + flowfile expiration: 0 sec + queue prioritizer class: '' +- id: 0eacb4c0-3c1d-4858-884b-5b3c8d1b1f6c + name: b34f32fd-e9c3-4907-9e09-484506fc4417//UpdateAttribute + source id: b34f32fd-e9c3-4907-9e09-484506fc4417 + source relationship names: [] + destination id: 11101824-f926-4250-9dfc-8b52e04212bf + max work queue size: 0 + max work queue data size: 0 MB + flowfile expiration: 0 sec + queue prioritizer class: '' +Remote Processing Groups: +- name: NiFi Flow + url: http://localhost:8080/nifi + comment: '' + timeout: 30 sec + yield period: 10 sec + Input Ports: + - id: 8d2c579e-4ad2-4922-a311-a37b7e551b7a + name: input port + comment: '' + max concurrent tasks: 1 + use compression: false + - id: 2c1d6dcc-b11c-4ba7-8c6f-ad9e4f730cbe + name: provenance + comment: '' + max concurrent tasks: 1 + use compression: false