From 5adcb0b2db68eef18fb7bcc84634436d36c97be7 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Thu, 3 Aug 2017 22:56:27 +0200 Subject: [PATCH 1/2] NIFI-4028 - fix cache update when Wait releases flow files --- .../apache/nifi/processors/standard/Wait.java | 10 +++- .../nifi/processors/standard/TestWait.java | 58 +++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java index fccd443817bb..9016e0dca3d1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java @@ -444,7 +444,15 @@ public void onTrigger(final ProcessContext context, final ProcessSession session getFlowFilesFor.apply(REL_WAIT).addAll(candidates); // If releasableFlowFileCount == 0, leave signal as it is, // so that any number of FlowFile can be released as long as target count condition matches. - waitCompleted = true; + + // update the counter + try { + waitCompleted = targetCounterName == null ? true : + protocol.notify(signalId, targetCounterName, (int) -targetCount, null).getCount(targetCounterName) == 0L; + } catch (final IOException e) { + session.rollback(); + throw new ProcessException(String.format("Unable to communicate with cache while updating %s due to %s", signalId, e), e); + } } } else { getFlowFilesFor.apply(REL_WAIT).addAll(candidates); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java index 71187b6def24..012dbbe8ade7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -463,6 +464,63 @@ public void testWaitForSpecificCount() throws InitializationException, IOExcepti } + @Test + public void testDecrementCache() throws ConcurrentModificationException, IOException { + Map cachedAttributes = new HashMap<>(); + cachedAttributes.put("both", "notifyValue"); + cachedAttributes.put("uuid", "notifyUuid"); + cachedAttributes.put("notify.only", "notifyValue"); + + // Setup existing cache entry. + final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service); + + // A flow file comes in Notify and increment the counter + protocol.notify("key", "counter", 1, cachedAttributes); + + // another flow files comes in Notify and increment the counter + protocol.notify("key", "counter", 1, cachedAttributes); + + runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); + runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}"); + runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "1"); + runner.assertValid(); + + final Map waitAttributes = new HashMap<>(); + waitAttributes.put("releaseSignalAttribute", "key"); + waitAttributes.put("signalCounterName", "counter"); + waitAttributes.put("wait.only", "waitValue"); + waitAttributes.put("both", "waitValue"); + waitAttributes.put("uuid", UUID.randomUUID().toString()); + String flowFileContent = "content"; + runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes); + + /* + * 1st iteration + */ + runner.run(); + runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1); + MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); + outputFlowFile.assertAttributeEquals("wait.counter.counter", "2"); + + // expect counter to be decremented to 1 + assertEquals("1", Long.toString(protocol.getSignal("key").getCount("counter"))); + + // introduce a second flow file with the same signal attribute + runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes); + + /* + * 2nd iteration + */ + runner.clearTransferState(); + runner.run(); + runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1); + outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); + outputFlowFile.assertAttributeEquals("wait.counter.counter", "1"); + + assertNull("The key no longer exist", protocol.getSignal("key")); + runner.clearTransferState(); + } + private class TestIteration { final List released = new ArrayList<>(); final List waiting = new ArrayList<>(); From e7889445fc847ca0c8527bfa3397d24ea58bf8ee Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Mon, 14 Aug 2017 16:00:09 +0900 Subject: [PATCH 2/2] NIFI-4028: Refactored Wait processor. - Consolidated implementation for the cases of releasableFlowCount is 1 or more, in order to reduce complexity and behavior differences - Added 'consumed' counter when total counter is used to release incoming FlowFiles - Fixed method name typo, releaseCandidates --- .../apache/nifi/processors/standard/Wait.java | 37 ++---- .../standard/WaitNotifyProtocol.java | 36 ++++-- .../nifi/processors/standard/TestWait.java | 10 +- .../standard/TestWaitNotifyProtocol.java | 109 +++++++++++++++++- 4 files changed, 149 insertions(+), 43 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java index 9016e0dca3d1..4e5ae5dd32c2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java @@ -305,6 +305,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue(); final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode); final AtomicReference signalRef = new AtomicReference<>(); + // This map contains original counts before those are consumed to release incoming FlowFiles. + final HashMap originalSignalCounts = new HashMap<>(); final Consumer transferToFailure = flowFile -> { flowFile = session.penalize(flowFile); @@ -324,7 +326,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } final List flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream() - .map(f -> copySignalAttributes(session, f, signalRef.get(), replaceOriginalAttributes)).collect(Collectors.toList()); + .map(f -> copySignalAttributes(session, f, signalRef.get(), originalSignalCounts, replaceOriginalAttributes)).collect(Collectors.toList()); session.transfer(flowFilesWithSignalAttributes, relationship); }; @@ -349,6 +351,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session // get notifying signal try { signal = protocol.getSignal(signalId); + if (signal != null) { + originalSignalCounts.putAll(signal.getCounts()); + } signalRef.set(signal); } catch (final IOException e) { throw new ProcessException(String.format("Failed to get signal for %s due to %s", signalId, e), e); @@ -423,37 +428,20 @@ public void onTrigger(final ProcessContext context, final ProcessSession session boolean waitProgressed = false; if (signal != null && !candidates.isEmpty()) { - if (releasableFlowFileCount > 1) { - signal.releaseCandidatese(targetCounterName, targetCount, releasableFlowFileCount, candidates, + if (releasableFlowFileCount > 0) { + signal.releaseCandidates(targetCounterName, targetCount, releasableFlowFileCount, candidates, released -> getFlowFilesFor.apply(REL_SUCCESS).addAll(released), waiting -> getFlowFilesFor.apply(REL_WAIT).addAll(waiting)); + waitCompleted = signal.getTotalCount() == 0 && signal.getReleasableCount() == 0; waitProgressed = !getFlowFilesFor.apply(REL_SUCCESS).isEmpty(); } else { - // releasableFlowFileCount = 0 or 1 boolean reachedTargetCount = StringUtils.isBlank(targetCounterName) ? signal.isTotalCountReached(targetCount) : signal.isCountReached(targetCounterName, targetCount); if (reachedTargetCount) { - if (releasableFlowFileCount == 0) { - getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates); - } else { - // releasableFlowFileCount = 1 - getFlowFilesFor.apply(REL_SUCCESS).add(candidates.remove(0)); - getFlowFilesFor.apply(REL_WAIT).addAll(candidates); - // If releasableFlowFileCount == 0, leave signal as it is, - // so that any number of FlowFile can be released as long as target count condition matches. - - // update the counter - try { - waitCompleted = targetCounterName == null ? true : - protocol.notify(signalId, targetCounterName, (int) -targetCount, null).getCount(targetCounterName) == 0L; - } catch (final IOException e) { - session.rollback(); - throw new ProcessException(String.format("Unable to communicate with cache while updating %s due to %s", signalId, e), e); - } - } + getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates); } else { getFlowFilesFor.apply(REL_WAIT).addAll(candidates); } @@ -478,7 +466,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } - private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final boolean replaceOriginal) { + private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final Map originalCount, final boolean replaceOriginal) { if (signal == null) { return flowFile; } @@ -496,8 +484,7 @@ private FlowFile copySignalAttributes(final ProcessSession session, final FlowFi } // Copy counter attributes - final Map counts = signal.getCounts(); - final long totalCount = counts.entrySet().stream().mapToLong(e -> { + final long totalCount = originalCount.entrySet().stream().mapToLong(e -> { final Long count = e.getValue(); attributesToCopy.put("wait.counter." + e.getKey(), String.valueOf(count)); return count; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java index 61834553ffc8..2c9c9fdfcc5d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.standard; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; @@ -45,6 +46,7 @@ public class WaitNotifyProtocol { private static final Logger logger = LoggerFactory.getLogger(WaitNotifyProtocol.class); public static final String DEFAULT_COUNT_NAME = "default"; + public static final String CONSUMED_COUNT_NAME = "consumed"; private static final int MAX_REPLACE_RETRY_COUNT = 5; private static final int REPLACE_RETRY_WAIT_MILLIS = 10; @@ -86,9 +88,13 @@ public void setAttributes(Map attributes) { this.attributes = attributes; } + @JsonIgnore + public long getTotalCount() { + return counts.values().stream().mapToLong(Long::longValue).sum(); + } + public boolean isTotalCountReached(final long targetCount) { - final long totalCount = counts.values().stream().mapToLong(Long::longValue).sum(); - return totalCount >= targetCount; + return getTotalCount() >= targetCount; } public boolean isCountReached(final String counterName, final long targetCount) { @@ -96,6 +102,10 @@ public boolean isCountReached(final String counterName, final long targetCount) } public long getCount(final String counterName) { + if (counterName == null || counterName.isEmpty()) { + return getTotalCount(); + } + final Long count = counts.get(counterName); return count != null ? count : 0; } @@ -115,7 +125,7 @@ public void setReleasableCount(int releasableCount) { * Caller of this method is responsible for updating cache storage after processing released and waiting candidates * by calling {@link #replace(Signal)}. Caller should rollback what it processed with these candidates if complete call failed.

* - * @param _counterName signal counter name to consume from. + * @param counterName signal counter name to consume from. If not specified, total counter is used, and 'consumed' counter is added to subtract consumed counters from total counter. * @param requiredCountForPass number of required signals to acquire a pass. * @param releasableCandidateCountPerPass number of releasable candidate per pass. * @param candidates candidates waiting for being allowed to pass. @@ -123,12 +133,9 @@ public void setReleasableCount(int releasableCount) { * @param waiting function to process candidates those should remain in waiting queue. * @param Type of candidate */ - public void releaseCandidatese(final String _counterName, final long requiredCountForPass, - final int releasableCandidateCountPerPass, final List candidates, - final Consumer> released, final Consumer> waiting) { - - // counterName is mandatory otherwise, we can't decide which counter to convert into pass count. - final String counterName = _counterName == null || _counterName.length() == 0 ? DEFAULT_COUNT_NAME : _counterName; + public void releaseCandidates(final String counterName, final long requiredCountForPass, + final int releasableCandidateCountPerPass, final List candidates, + final Consumer> released, final Consumer> waiting) { final int candidateSize = candidates.size(); if (releasableCount < candidateSize) { @@ -137,11 +144,18 @@ public void releaseCandidatese(final String _counterName, final long require final long signalCount = getCount(counterName); releasableCount += (signalCount / requiredCountForPass) * releasableCandidateCountPerPass; final long reducedSignalCount = signalCount % requiredCountForPass; - counts.put(counterName, reducedSignalCount); + if (counterName != null && !counterName.isEmpty()) { + // Update target counter with reduced count. + counts.put(counterName, reducedSignalCount); + } else { + // If target counter name is not specified, add consumed count to subtract from accumulated total count. + Long consumedCount = counts.getOrDefault(CONSUMED_COUNT_NAME, 0L); + consumedCount -= signalCount - reducedSignalCount; + counts.put(CONSUMED_COUNT_NAME, consumedCount); + } } int releaseCount = Math.min(releasableCount, candidateSize); - released.accept(candidates.subList(0, releaseCount)); waiting.accept(candidates.subList(releaseCount, candidateSize)); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java index 012dbbe8ade7..a4df2f37e99b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java @@ -460,8 +460,6 @@ public void testWaitForSpecificCount() throws InitializationException, IOExcepti runner.clearTransferState(); - assertNull("The key no longer exist", protocol.getSignal("key")); - } @Test @@ -502,8 +500,9 @@ public void testDecrementCache() throws ConcurrentModificationException, IOExcep MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); outputFlowFile.assertAttributeEquals("wait.counter.counter", "2"); - // expect counter to be decremented to 1 - assertEquals("1", Long.toString(protocol.getSignal("key").getCount("counter"))); + // expect counter to be decremented to 0 and releasable count remains 1. + assertEquals("0", Long.toString(protocol.getSignal("key").getCount("counter"))); + assertEquals("1", Long.toString(protocol.getSignal("key").getReleasableCount())); // introduce a second flow file with the same signal attribute runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes); @@ -515,7 +514,8 @@ public void testDecrementCache() throws ConcurrentModificationException, IOExcep runner.run(); runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1); outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0); - outputFlowFile.assertAttributeEquals("wait.counter.counter", "1"); + // All counters are consumed. + outputFlowFile.assertAttributeEquals("wait.counter.counter", "0"); assertNull("The key no longer exist", protocol.getSignal("key")); runner.clearTransferState(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java index e3f982c8e128..01983d523088 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java @@ -37,6 +37,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.nifi.processors.standard.WaitNotifyProtocol.CONSUMED_COUNT_NAME; import static org.apache.nifi.processors.standard.WaitNotifyProtocol.DEFAULT_COUNT_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -267,12 +268,12 @@ public void testReleaseCandidate() throws Exception { final List waiting = new ArrayList<>(); // Test default name. - final String counterName = null; + final String counterName = DEFAULT_COUNT_NAME; final BiConsumer releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> { released.clear(); waiting.clear(); - signal.releaseCandidatese(counterName, requiredCountForPass, releasableCandidatePerPass, candidates, + signal.releaseCandidates(counterName, requiredCountForPass, releasableCandidatePerPass, candidates, r -> released.addAll(r), w -> waiting.addAll(w)); }; @@ -336,4 +337,108 @@ public void testReleaseCandidate() throws Exception { } + + @Test + public void testReleaseCandidateTotal() throws Exception { + final List candidates = IntStream.range(0, 10).boxed().collect(Collectors.toList()); + final Signal signal = new Signal(); + final List released = new ArrayList<>(); + final List waiting = new ArrayList<>(); + + // Test empty counter name, should use total counters. + final String emptyCounterName = null; + + final BiConsumer releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> { + released.clear(); + waiting.clear(); + signal.releaseCandidates(emptyCounterName, requiredCountForPass, releasableCandidatePerPass, candidates, + r -> released.addAll(r), w -> waiting.addAll(w)); + }; + + final String counterA = "counterA"; + final String counterB = "counterB"; + final String counterC = "counterC"; + + final Field releasableCount = Signal.class.getDeclaredField("releasableCount"); + releasableCount.setAccessible(true); + + // No counter, should wait. + releaseCandidate.accept(3L, 1); + assertEquals(0, released.size()); + assertEquals(10, waiting.size()); + assertEquals(0, signal.getCount(emptyCounterName)); + assertEquals(0, signal.getCount(CONSUMED_COUNT_NAME)); + assertEquals(0, releasableCount.getInt(signal)); + + // Counter is not enough yet. + signal.getCounts().put(counterA, 1L); + signal.getCounts().remove(CONSUMED_COUNT_NAME); + releaseCandidate.accept(3L, 1); + assertEquals(0, released.size()); + assertEquals(10, waiting.size()); + assertEquals(1, signal.getCount(emptyCounterName)); // Counter incremented, but not enough + assertEquals(0, signal.getCount(CONSUMED_COUNT_NAME)); + assertEquals(0, releasableCount.getInt(signal)); + + // Counter reached the target. + signal.getCounts().put(counterA, 1L); + signal.getCounts().put(counterB, 1L); + signal.getCounts().put(counterC, 1L); + signal.getCounts().remove(CONSUMED_COUNT_NAME); + releaseCandidate.accept(3L, 1); + assertEquals(1, released.size()); + assertEquals(9, waiting.size()); + assertEquals(0, signal.getCount(emptyCounterName)); // Counter 3 was converted into 1 release + assertEquals(-3, signal.getCount(CONSUMED_COUNT_NAME)); + assertEquals(0, releasableCount.getInt(signal)); + + // Counter reached the target for two candidates. + signal.getCounts().put(counterA, 1L); + signal.getCounts().put(counterB, 2L); + signal.getCounts().put(counterC, 3L); + signal.getCounts().remove(CONSUMED_COUNT_NAME); + releaseCandidate.accept(3L, 1); + assertEquals(2, released.size()); + assertEquals(8, waiting.size()); + assertEquals(0, signal.getCount(emptyCounterName)); // Counter 3 was converted into 1 release + assertEquals(-6, signal.getCount(CONSUMED_COUNT_NAME)); + assertEquals(0, releasableCount.getInt(signal)); + + // Counter reached the target for two candidates, and reminder is 2. + signal.getCounts().put(counterA, 3L); + signal.getCounts().put(counterB, 3L); + signal.getCounts().put(counterC, 5L); + signal.getCounts().remove(CONSUMED_COUNT_NAME); + releaseCandidate.accept(3L, 1); + assertEquals(3, released.size()); // 11 / 3 = 3 + assertEquals(7, waiting.size()); + assertEquals(2, signal.getCount(emptyCounterName)); + assertEquals(-9, signal.getCount(CONSUMED_COUNT_NAME)); + assertEquals(0, releasableCount.getInt(signal)); + + // Counter reached the target for two pass count and each pass can release 2 candidates. + signal.getCounts().put(counterA, 1L); + signal.getCounts().put(counterB, 2L); + signal.getCounts().put(counterC, 3L); + signal.getCounts().remove(CONSUMED_COUNT_NAME); + releaseCandidate.accept(3L, 2); + assertEquals(4, released.size()); // (6 / 3) * 2 = 4 + assertEquals(6, waiting.size()); + assertEquals(0, signal.getCount(emptyCounterName)); + assertEquals(-6, signal.getCount(CONSUMED_COUNT_NAME)); + assertEquals(0, releasableCount.getInt(signal)); + + // If there are counts more than enough to release current candidates, unused releasableCount should remain. + signal.getCounts().put(counterA, 10L); + signal.getCounts().put(counterB, 20L); + signal.getCounts().put(counterC, 20L); + signal.getCounts().remove(CONSUMED_COUNT_NAME); + releaseCandidate.accept(3L, 2); + assertEquals(10, released.size()); // (50 / 3) * 2 = 32. Used 10. + assertEquals(0, waiting.size()); + assertEquals(2, signal.getCount(emptyCounterName)); // 50 % 3 = 2. + assertEquals(-48, signal.getCount(CONSUMED_COUNT_NAME)); // 50 % 3 = 2. + assertEquals(22, releasableCount.getInt(signal)); // 32 - 10 = 22. + + } } \ No newline at end of file