From 7291859596e3d7ac3cad8b9655e14fe556d758c6 Mon Sep 17 00:00:00 2001 From: Matt Gilman Date: Mon, 15 May 2017 12:52:46 -0400 Subject: [PATCH] NIFI-3868: - Ensuring we do not attempt to group bulletins that lack permissions. - Only group bulletins when all nodes report the same message. - Retain the most recent bulletin. --- .../BulletinBoardEndpointMerger.java | 2 +- .../ControllerBulletinsEndpointMerger.java | 6 +- .../nifi/cluster/manager/BulletinMerger.java | 44 ++++++++-- .../manager/ComponentEntityMerger.java | 2 +- .../cluster/manager/BulletinMergerTest.java | 86 +++++++++++++++++++ 5 files changed, 130 insertions(+), 10 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/manager/BulletinMergerTest.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java index 10019128b03d..0aa705c5ce69 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/BulletinBoardEndpointMerger.java @@ -61,7 +61,7 @@ protected void mergeResponses(BulletinBoardDTO clientDto, Map BULLETIN_COMPARATOR = new Comparator() { @@ -54,7 +55,7 @@ public int compare(BulletinEntity o1, BulletinEntity o2) { * * @param bulletins bulletins */ - public static List mergeBulletins(final Map> bulletins) { + public static List mergeBulletins(final Map> bulletins, final int totalNodes) { final List bulletinEntities = new ArrayList<>(); for (final Map.Entry> entry : bulletins.entrySet()) { @@ -76,9 +77,42 @@ public static List mergeBulletins(final Map entities = Lists.newArrayList(); - final Map> groupingEntities = bulletinEntities.stream().collect(Collectors.groupingBy(b -> b.getBulletin().getMessage())); - groupingEntities.values().stream().map(e -> e.get(0)).forEach(entities::add); + // group by message when permissions allow + final Map> groupingEntities = bulletinEntities.stream() + .filter(bulletinEntity -> bulletinEntity.getCanRead()) + .collect(Collectors.groupingBy(b -> b.getBulletin().getMessage())); + + // add one from each grouped bulletin when all nodes report the same message + groupingEntities.forEach((message, groupedBulletinEntities) -> { + if (groupedBulletinEntities.size() == totalNodes) { + // get the most current bulletin + final BulletinEntity selectedBulletinEntity = groupedBulletinEntities.stream() + .max(Comparator.comparingLong(bulletinEntity -> { + if (bulletinEntity.getTimestamp() == null) { + return 0; + } else { + return bulletinEntity.getTimestamp().getTime(); + } + })).orElse(null); + + // should never be null, but just in case + if (selectedBulletinEntity != null) { + selectedBulletinEntity.setNodeAddress(ALL_NODES_MESSAGE); + selectedBulletinEntity.getBulletin().setNodeAddress(ALL_NODES_MESSAGE); + entities.add(selectedBulletinEntity); + } + } else { + // since all nodes didn't report the exact same bulletin, keep them all + entities.addAll(groupedBulletinEntities); + } + }); + + // ensure we also get the remainder of the bulletin entities + bulletinEntities.stream() + .filter(bulletinEntity -> !bulletinEntity.getCanRead()) + .forEach(entities::add); + // ensure the bulletins are sorted by time Collections.sort(entities, (BulletinEntity o1, BulletinEntity o2) -> { final int timeComparison = o1.getTimestamp().compareTo(o2.getTimestamp()); if (timeComparison != 0) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java index eda3c0f66840..f7c28fd889e1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ComponentEntityMerger.java @@ -59,7 +59,7 @@ default void merge(final EntityType clientEntity, final Map> nodeMap = new HashMap<>(); + nodeMap.put(node1, new ArrayList<>()); + nodeMap.put(node2, new ArrayList<>()); + + nodeMap.get(node1).add(bulletinEntity1); + nodeMap.get(node1).add(bulletinEntity2); + nodeMap.get(node1).add(unauthorizedBulletin); + + nodeMap.get(node2).add(copyOfBulletin1); + + final List bulletinEntities = BulletinMerger.mergeBulletins(nodeMap, nodeMap.size()); + assertEquals(bulletinEntities.size(), 3); + assertTrue(bulletinEntities.contains(copyOfBulletin1)); + assertEquals(copyOfBulletin1.getNodeAddress(), ALL_NODES_MESSAGE); + assertTrue(bulletinEntities.contains(bulletinEntity2)); + assertTrue(bulletinEntities.contains(unauthorizedBulletin)); + } + +} \ No newline at end of file