Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode);
final AtomicReference<Signal> signalRef = new AtomicReference<>();
// This map contains original counts before those are consumed to release incoming FlowFiles.
final HashMap<String, Long> originalSignalCounts = new HashMap<>();

final Consumer<FlowFile> transferToFailure = flowFile -> {
flowFile = session.penalize(flowFile);
Expand All @@ -324,7 +326,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}

final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream()
.map(f -> copySignalAttributes(session, f, signalRef.get(), replaceOriginalAttributes)).collect(Collectors.toList());
.map(f -> copySignalAttributes(session, f, signalRef.get(), originalSignalCounts, replaceOriginalAttributes)).collect(Collectors.toList());
session.transfer(flowFilesWithSignalAttributes, relationship);
};

Expand All @@ -349,6 +351,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
// get notifying signal
try {
signal = protocol.getSignal(signalId);
if (signal != null) {
originalSignalCounts.putAll(signal.getCounts());
}
signalRef.set(signal);
} catch (final IOException e) {
throw new ProcessException(String.format("Failed to get signal for %s due to %s", signalId, e), e);
Expand Down Expand Up @@ -423,29 +428,20 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
boolean waitProgressed = false;
if (signal != null && !candidates.isEmpty()) {

if (releasableFlowFileCount > 1) {
signal.releaseCandidatese(targetCounterName, targetCount, releasableFlowFileCount, candidates,
if (releasableFlowFileCount > 0) {
signal.releaseCandidates(targetCounterName, targetCount, releasableFlowFileCount, candidates,
released -> getFlowFilesFor.apply(REL_SUCCESS).addAll(released),
waiting -> getFlowFilesFor.apply(REL_WAIT).addAll(waiting));
waitCompleted = signal.getTotalCount() == 0 && signal.getReleasableCount() == 0;
waitProgressed = !getFlowFilesFor.apply(REL_SUCCESS).isEmpty();

} else {
// releasableFlowFileCount = 0 or 1
boolean reachedTargetCount = StringUtils.isBlank(targetCounterName)
? signal.isTotalCountReached(targetCount)
: signal.isCountReached(targetCounterName, targetCount);

if (reachedTargetCount) {
if (releasableFlowFileCount == 0) {
getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates);
} else {
// releasableFlowFileCount = 1
getFlowFilesFor.apply(REL_SUCCESS).add(candidates.remove(0));
getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
// If releasableFlowFileCount == 0, leave signal as it is,
// so that any number of FlowFile can be released as long as target count condition matches.
waitCompleted = true;
}
getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates);
} else {
getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
}
Expand All @@ -470,7 +466,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

}

private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final boolean replaceOriginal) {
private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final Map<String, Long> originalCount, final boolean replaceOriginal) {
if (signal == null) {
return flowFile;
}
Expand All @@ -488,8 +484,7 @@ private FlowFile copySignalAttributes(final ProcessSession session, final FlowFi
}

// Copy counter attributes
final Map<String, Long> counts = signal.getCounts();
final long totalCount = counts.entrySet().stream().mapToLong(e -> {
final long totalCount = originalCount.entrySet().stream().mapToLong(e -> {
final Long count = e.getValue();
attributesToCopy.put("wait.counter." + e.getKey(), String.valueOf(count));
return count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.standard;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
Expand Down Expand Up @@ -45,6 +46,7 @@ public class WaitNotifyProtocol {
private static final Logger logger = LoggerFactory.getLogger(WaitNotifyProtocol.class);

public static final String DEFAULT_COUNT_NAME = "default";
public static final String CONSUMED_COUNT_NAME = "consumed";
private static final int MAX_REPLACE_RETRY_COUNT = 5;
private static final int REPLACE_RETRY_WAIT_MILLIS = 10;

Expand Down Expand Up @@ -86,16 +88,24 @@ public void setAttributes(Map<String, String> attributes) {
this.attributes = attributes;
}

@JsonIgnore
public long getTotalCount() {
return counts.values().stream().mapToLong(Long::longValue).sum();
}

public boolean isTotalCountReached(final long targetCount) {
final long totalCount = counts.values().stream().mapToLong(Long::longValue).sum();
return totalCount >= targetCount;
return getTotalCount() >= targetCount;
}

public boolean isCountReached(final String counterName, final long targetCount) {
return getCount(counterName) >= targetCount;
}

public long getCount(final String counterName) {
if (counterName == null || counterName.isEmpty()) {
return getTotalCount();
}

final Long count = counts.get(counterName);
return count != null ? count : 0;
}
Expand All @@ -115,20 +125,17 @@ public void setReleasableCount(int releasableCount) {
* Caller of this method is responsible for updating cache storage after processing released and waiting candidates
* by calling {@link #replace(Signal)}. Caller should rollback what it processed with these candidates if complete call failed.</p>
*
* @param _counterName signal counter name to consume from.
* @param counterName signal counter name to consume from. If not specified, total counter is used, and 'consumed' counter is added to subtract consumed counters from total counter.
* @param requiredCountForPass number of required signals to acquire a pass.
* @param releasableCandidateCountPerPass number of releasable candidate per pass.
* @param candidates candidates waiting for being allowed to pass.
* @param released function to process allowed candidates to pass.
* @param waiting function to process candidates those should remain in waiting queue.
* @param <E> Type of candidate
*/
public <E> void releaseCandidatese(final String _counterName, final long requiredCountForPass,
final int releasableCandidateCountPerPass, final List<E> candidates,
final Consumer<List<E>> released, final Consumer<List<E>> waiting) {

// counterName is mandatory otherwise, we can't decide which counter to convert into pass count.
final String counterName = _counterName == null || _counterName.length() == 0 ? DEFAULT_COUNT_NAME : _counterName;
public <E> void releaseCandidates(final String counterName, final long requiredCountForPass,
final int releasableCandidateCountPerPass, final List<E> candidates,
final Consumer<List<E>> released, final Consumer<List<E>> waiting) {

final int candidateSize = candidates.size();
if (releasableCount < candidateSize) {
Expand All @@ -137,11 +144,18 @@ public <E> void releaseCandidatese(final String _counterName, final long require
final long signalCount = getCount(counterName);
releasableCount += (signalCount / requiredCountForPass) * releasableCandidateCountPerPass;
final long reducedSignalCount = signalCount % requiredCountForPass;
counts.put(counterName, reducedSignalCount);
if (counterName != null && !counterName.isEmpty()) {
// Update target counter with reduced count.
counts.put(counterName, reducedSignalCount);
} else {
// If target counter name is not specified, add consumed count to subtract from accumulated total count.
Long consumedCount = counts.getOrDefault(CONSUMED_COUNT_NAME, 0L);
consumedCount -= signalCount - reducedSignalCount;
counts.put(CONSUMED_COUNT_NAME, consumedCount);
}
}

int releaseCount = Math.min(releasableCount, candidateSize);

released.accept(candidates.subList(0, releaseCount));
waiting.accept(candidates.subList(releaseCount, candidateSize));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -459,8 +460,65 @@ public void testWaitForSpecificCount() throws InitializationException, IOExcepti

runner.clearTransferState();

assertNull("The key no longer exist", protocol.getSignal("key"));
}

@Test
public void testDecrementCache() throws ConcurrentModificationException, IOException {
Map<String, String> cachedAttributes = new HashMap<>();
cachedAttributes.put("both", "notifyValue");
cachedAttributes.put("uuid", "notifyUuid");
cachedAttributes.put("notify.only", "notifyValue");

// Setup existing cache entry.
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);

// A flow file comes in Notify and increment the counter
protocol.notify("key", "counter", 1, cachedAttributes);

// another flow files comes in Notify and increment the counter
protocol.notify("key", "counter", 1, cachedAttributes);

runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "1");
runner.assertValid();

final Map<String, String> waitAttributes = new HashMap<>();
waitAttributes.put("releaseSignalAttribute", "key");
waitAttributes.put("signalCounterName", "counter");
waitAttributes.put("wait.only", "waitValue");
waitAttributes.put("both", "waitValue");
waitAttributes.put("uuid", UUID.randomUUID().toString());
String flowFileContent = "content";
runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes);

/*
* 1st iteration
*/
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
outputFlowFile.assertAttributeEquals("wait.counter.counter", "2");

// expect counter to be decremented to 0 and releasable count remains 1.
assertEquals("0", Long.toString(protocol.getSignal("key").getCount("counter")));
assertEquals("1", Long.toString(protocol.getSignal("key").getReleasableCount()));

// introduce a second flow file with the same signal attribute
runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes);

/*
* 2nd iteration
*/
runner.clearTransferState();
runner.run();
runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
// All counters are consumed.
outputFlowFile.assertAttributeEquals("wait.counter.counter", "0");

assertNull("The key no longer exist", protocol.getSignal("key"));
runner.clearTransferState();
}

private class TestIteration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.nifi.processors.standard.WaitNotifyProtocol.CONSUMED_COUNT_NAME;
import static org.apache.nifi.processors.standard.WaitNotifyProtocol.DEFAULT_COUNT_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -267,12 +268,12 @@ public void testReleaseCandidate() throws Exception {
final List<Integer> waiting = new ArrayList<>();

// Test default name.
final String counterName = null;
final String counterName = DEFAULT_COUNT_NAME;

final BiConsumer<Long, Integer> releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> {
released.clear();
waiting.clear();
signal.releaseCandidatese(counterName, requiredCountForPass, releasableCandidatePerPass, candidates,
signal.releaseCandidates(counterName, requiredCountForPass, releasableCandidatePerPass, candidates,
r -> released.addAll(r), w -> waiting.addAll(w));
};

Expand Down Expand Up @@ -336,4 +337,108 @@ public void testReleaseCandidate() throws Exception {

}


@Test
public void testReleaseCandidateTotal() throws Exception {
final List<Integer> candidates = IntStream.range(0, 10).boxed().collect(Collectors.toList());
final Signal signal = new Signal();
final List<Integer> released = new ArrayList<>();
final List<Integer> waiting = new ArrayList<>();

// Test empty counter name, should use total counters.
final String emptyCounterName = null;

final BiConsumer<Long, Integer> releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> {
released.clear();
waiting.clear();
signal.releaseCandidates(emptyCounterName, requiredCountForPass, releasableCandidatePerPass, candidates,
r -> released.addAll(r), w -> waiting.addAll(w));
};

final String counterA = "counterA";
final String counterB = "counterB";
final String counterC = "counterC";

final Field releasableCount = Signal.class.getDeclaredField("releasableCount");
releasableCount.setAccessible(true);

// No counter, should wait.
releaseCandidate.accept(3L, 1);
assertEquals(0, released.size());
assertEquals(10, waiting.size());
assertEquals(0, signal.getCount(emptyCounterName));
assertEquals(0, signal.getCount(CONSUMED_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));

// Counter is not enough yet.
signal.getCounts().put(counterA, 1L);
signal.getCounts().remove(CONSUMED_COUNT_NAME);
releaseCandidate.accept(3L, 1);
assertEquals(0, released.size());
assertEquals(10, waiting.size());
assertEquals(1, signal.getCount(emptyCounterName)); // Counter incremented, but not enough
assertEquals(0, signal.getCount(CONSUMED_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));

// Counter reached the target.
signal.getCounts().put(counterA, 1L);
signal.getCounts().put(counterB, 1L);
signal.getCounts().put(counterC, 1L);
signal.getCounts().remove(CONSUMED_COUNT_NAME);
releaseCandidate.accept(3L, 1);
assertEquals(1, released.size());
assertEquals(9, waiting.size());
assertEquals(0, signal.getCount(emptyCounterName)); // Counter 3 was converted into 1 release
assertEquals(-3, signal.getCount(CONSUMED_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));

// Counter reached the target for two candidates.
signal.getCounts().put(counterA, 1L);
signal.getCounts().put(counterB, 2L);
signal.getCounts().put(counterC, 3L);
signal.getCounts().remove(CONSUMED_COUNT_NAME);
releaseCandidate.accept(3L, 1);
assertEquals(2, released.size());
assertEquals(8, waiting.size());
assertEquals(0, signal.getCount(emptyCounterName)); // Counter 3 was converted into 1 release
assertEquals(-6, signal.getCount(CONSUMED_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));

// Counter reached the target for two candidates, and reminder is 2.
signal.getCounts().put(counterA, 3L);
signal.getCounts().put(counterB, 3L);
signal.getCounts().put(counterC, 5L);
signal.getCounts().remove(CONSUMED_COUNT_NAME);
releaseCandidate.accept(3L, 1);
assertEquals(3, released.size()); // 11 / 3 = 3
assertEquals(7, waiting.size());
assertEquals(2, signal.getCount(emptyCounterName));
assertEquals(-9, signal.getCount(CONSUMED_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));

// Counter reached the target for two pass count and each pass can release 2 candidates.
signal.getCounts().put(counterA, 1L);
signal.getCounts().put(counterB, 2L);
signal.getCounts().put(counterC, 3L);
signal.getCounts().remove(CONSUMED_COUNT_NAME);
releaseCandidate.accept(3L, 2);
assertEquals(4, released.size()); // (6 / 3) * 2 = 4
assertEquals(6, waiting.size());
assertEquals(0, signal.getCount(emptyCounterName));
assertEquals(-6, signal.getCount(CONSUMED_COUNT_NAME));
assertEquals(0, releasableCount.getInt(signal));

// If there are counts more than enough to release current candidates, unused releasableCount should remain.
signal.getCounts().put(counterA, 10L);
signal.getCounts().put(counterB, 20L);
signal.getCounts().put(counterC, 20L);
signal.getCounts().remove(CONSUMED_COUNT_NAME);
releaseCandidate.accept(3L, 2);
assertEquals(10, released.size()); // (50 / 3) * 2 = 32. Used 10.
assertEquals(0, waiting.size());
assertEquals(2, signal.getCount(emptyCounterName)); // 50 % 3 = 2.
assertEquals(-48, signal.getCount(CONSUMED_COUNT_NAME)); // 50 % 3 = 2.
assertEquals(22, releasableCount.getInt(signal)); // 32 - 10 = 22.

}
}