Skip to content

Commit

Permalink
NIFI-3545: Release M FlowFilews once N signals arrive
Browse files Browse the repository at this point in the history
- Support multiplle incoming FlowFiles to Wait processor, up to Wait
  Buffer Count
- Added Releasable FlowFile Count, which controls how many FlowFiles can
  be released when wait condition is met
- Added special meaning to Notify delta Zero(0) to clear a signal
  counter back to zero

  This closes apache#1554

Signed-off-by: Aldrin Piri <aldrin@apache.org>
  • Loading branch information
ijokarumawak authored and aperepel committed Mar 20, 2017
1 parent 74e483d commit 182393e
Show file tree
Hide file tree
Showing 5 changed files with 617 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ public class Notify extends AbstractProcessor {
"be evaluated against a FlowFile in order to determine the signal counter delta. " +
"Specify how much the counter should increase. " +
"For example, if multiple signal events are processed at upstream flow in batch oriented way, " +
"the number of events processed can be notified with this property at once.")
"the number of events processed can be notified with this property at once. " +
"Zero (0) has a special meaning, it clears target count back to 0, which is especially useful when used with Wait " +
Wait.RELEASABLE_FLOWFILE_COUNT.getDisplayName() + " = Zero (0) mode, to provide 'open-close-gate' type of flow control. " +
"One (1) can open a corresponding Wait processor, and Zero (0) can negate it as if closing a gate.")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(true)
Expand Down Expand Up @@ -171,7 +174,8 @@ private class SignalBuffer {

int incrementDelta(final String counterName, final int delta) {
int current = deltas.containsKey(counterName) ? deltas.get(counterName) : 0;
int updated = current + delta;
// Zero (0) clears count.
int updated = delta == 0 ? 0 : current + delta;
deltas.put(counterName, updated);
return updated;
}
Expand Down
Loading

0 comments on commit 182393e

Please sign in to comment.