Skip to content

Commit

Permalink
NIFI-3431: Support batch update in Notify processor
Browse files Browse the repository at this point in the history
- Added Signal Counter Delta property
- Added Signal Buffer Count property
- Added processor property name and display name
- Changed IOException handling from routing it to failure to throw
  RuntimeException, so that NiFi framework can yield the processor for a while and try again

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes apache#1466.
  • Loading branch information
ijokarumawak authored and aperepel committed Mar 20, 2017
1 parent efa6687 commit 31bc287
Show file tree
Hide file tree
Showing 4 changed files with 333 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
Expand All @@ -35,6 +34,7 @@
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
Expand All @@ -59,54 +59,83 @@ public class Notify extends AbstractProcessor {

// Identifies the distributed map cache client
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("Distributed Cache Service")
.description("The Controller Service that is used to cache release signals in order to release files queued at a corresponding Wait processor")
.required(true)
.identifiesControllerService(AtomicDistributedMapCacheClient.class)
.build();
.name("distributed-cache-service")
.displayName("Distributed Cache Service")
.description("The Controller Service that is used to cache release signals in order to release files queued at a corresponding Wait processor")
.required(true)
.identifiesControllerService(AtomicDistributedMapCacheClient.class)
.build();

// Selects the FlowFile attribute or expression, whose value is used as cache key
public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new PropertyDescriptor.Builder()
.name("Release Signal Identifier")
.description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the release signal cache key")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(true)
.build();
.name("release-signal-id")
.displayName("Release Signal Identifier")
.description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the release signal cache key")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(true)
.build();

public static final PropertyDescriptor SIGNAL_COUNTER_NAME = new PropertyDescriptor.Builder()
.name("Signal Counter Name")
.description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the signal counter name. " +
"Signal counter name is useful when a corresponding Wait processor needs to know the number of occurrences " +
"of different types of events, such as success or failure, or destination data source names, etc.")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(true)
.defaultValue(WaitNotifyProtocol.DEFAULT_COUNT_NAME)
.build();
.name("signal-counter-name")
.displayName("Signal Counter Name")
.description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the signal counter name. " +
"Signal counter name is useful when a corresponding Wait processor needs to know the number of occurrences " +
"of different types of events, such as success or failure, or destination data source names, etc.")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(true)
.defaultValue(WaitNotifyProtocol.DEFAULT_COUNT_NAME)
.build();

public static final PropertyDescriptor SIGNAL_COUNTER_DELTA = new PropertyDescriptor.Builder()
.name("signal-counter-delta")
.displayName("Signal Counter Delta")
.description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the signal counter delta. " +
"Specify how much the counter should increase. " +
"For example, if multiple signal events are processed at upstream flow in batch oriented way, " +
"the number of events processed can be notified with this property at once.")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(true)
.defaultValue("1")
.build();

public static final PropertyDescriptor SIGNAL_BUFFER_COUNT = new PropertyDescriptor.Builder()
.name("signal-buffer-count")
.displayName("Signal Buffer Count")
.description("Specify the maximum number of incoming flow files that can be buffered until signals are notified to cache service. " +
"The more buffer can provide the better performance, as it reduces the number of interactions with cache service " +
"by grouping signals by signal identifier when multiple incoming flow files share the same signal identifier.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("1")
.build();

// Specifies an optional regex used to identify which attributes to cache
public static final PropertyDescriptor ATTRIBUTE_CACHE_REGEX = new PropertyDescriptor.Builder()
.name("Attribute Cache Regex")
.description("Any attributes whose names match this regex will be stored in the distributed cache to be "
+ "copied to any FlowFiles released from a corresponding Wait processor. Note that the "
+ "uuid attribute will not be cached regardless of this value. If blank, no attributes "
+ "will be cached.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.expressionLanguageSupported(false)
.build();
.name("attribute-cache-regex")
.displayName("Attribute Cache Regex")
.description("Any attributes whose names match this regex will be stored in the distributed cache to be "
+ "copied to any FlowFiles released from a corresponding Wait processor. Note that the "
+ "uuid attribute will not be cached regardless of this value. If blank, no attributes "
+ "will be cached.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.expressionLanguageSupported(false)
.build();

public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles where the release signal has been successfully entered in the cache will be routed to this relationship")
.build();
.name("success")
.description("All FlowFiles where the release signal has been successfully entered in the cache will be routed to this relationship")
.build();

public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship")
.build();
.name("failure")
.description("When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship")
.build();

private final Set<Relationship> relationships;

Expand All @@ -122,6 +151,8 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(RELEASE_SIGNAL_IDENTIFIER);
descriptors.add(SIGNAL_COUNTER_NAME);
descriptors.add(SIGNAL_COUNTER_DELTA);
descriptors.add(SIGNAL_BUFFER_COUNT);
descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(ATTRIBUTE_CACHE_REGEX);
return descriptors;
Expand All @@ -132,58 +163,100 @@ public Set<Relationship> getRelationships() {
return relationships;
}

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
private class SignalBuffer {

FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final Map<String, Integer> deltas = new HashMap<>();
final Map<String, String> attributesToCache = new HashMap<>();
final List<FlowFile> flowFiles = new ArrayList<>();

final ComponentLog logger = getLogger();
int incrementDelta(final String counterName, final int delta) {
int current = deltas.containsKey(counterName) ? deltas.get(counterName) : 0;
int updated = current + delta;
deltas.put(counterName, updated);
return updated;
}
}

// Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
final String signalId = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
final String counterName = context.getProperty(SIGNAL_COUNTER_NAME).evaluateAttributeExpressions(flowFile).getValue();
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

// if the computed value is null, or empty, we transfer the flow file to failure relationship
if (StringUtils.isBlank(signalId)) {
logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
final ComponentLog logger = getLogger();
final PropertyValue signalIdProperty = context.getProperty(RELEASE_SIGNAL_IDENTIFIER);
final PropertyValue counterNameProperty = context.getProperty(SIGNAL_COUNTER_NAME);
final PropertyValue deltaProperty = context.getProperty(SIGNAL_COUNTER_DELTA);
final String attributeCacheRegex = context.getProperty(ATTRIBUTE_CACHE_REGEX).getValue();
final Integer bufferCount = context.getProperty(SIGNAL_BUFFER_COUNT).asInteger();

// the cache client used to interact with the distributed cache
// the cache client used to interact with the distributed cache.
final AtomicDistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(AtomicDistributedMapCacheClient.class);
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);

try {
final String attributeCacheRegex = (context.getProperty(ATTRIBUTE_CACHE_REGEX).isSet())
? context.getProperty(ATTRIBUTE_CACHE_REGEX).getValue()
: null;
final Map<String, SignalBuffer> signalBuffers = new HashMap<>();

for (int i = 0; i < bufferCount; i++) {

final FlowFile flowFile = session.get();
if (flowFile == null) {
break;
}

// Signal id is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
final String signalId = signalIdProperty.evaluateAttributeExpressions(flowFile).getValue();

// if the computed value is null, or empty, we transfer the flow file to failure relationship
if (StringUtils.isBlank(signalId)) {
logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
session.transfer(flowFile, REL_FAILURE);
continue;
}

String counterName = counterNameProperty.evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isEmpty(counterName)) {
counterName = WaitNotifyProtocol.DEFAULT_COUNT_NAME;
}

int delta = 1;
if (deltaProperty.isSet()) {
final String deltaStr = deltaProperty.evaluateAttributeExpressions(flowFile).getValue();
try {
delta = Integer.parseInt(deltaStr);
} catch (final NumberFormatException e) {
logger.error("Failed to calculate delta for FlowFile {} due to {}", new Object[] {flowFile, e}, e);
session.transfer(flowFile, REL_FAILURE);
continue;
}
}

if (!signalBuffers.containsKey(signalId)) {
signalBuffers.put(signalId, new SignalBuffer());
}
final SignalBuffer signalBuffer = signalBuffers.get(signalId);

Map<String, String> attributesToCache = new HashMap<>();
if (StringUtils.isNotEmpty(attributeCacheRegex)) {
attributesToCache = flowFile.getAttributes().entrySet()
flowFile.getAttributes().entrySet()
.stream().filter(e -> (!e.getKey().equals("uuid") && e.getKey().matches(attributeCacheRegex)))
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
.forEach(e -> signalBuffer.attributesToCache.put(e.getKey(), e.getValue()));
}

signalBuffer.incrementDelta(counterName, delta);
signalBuffer.flowFiles.add(flowFile);

if (logger.isDebugEnabled()) {
logger.debug("Cached release signal identifier {} counterName {} from FlowFile {}", new Object[] {signalId, counterName, flowFile});
}

// In case of ConcurrentModificationException, just throw the exception so that processor can
// retry after yielding for a while.
protocol.notify(signalId, counterName, 1, attributesToCache);

session.transfer(flowFile, REL_SUCCESS);
} catch (final IOException e) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e});
}

signalBuffers.forEach((signalId, signalBuffer) -> {
// In case of Exception, just throw the exception so that processor can
// retry after yielding for a while.
try {
protocol.notify(signalId, signalBuffer.deltas, signalBuffer.attributesToCache);
session.transfer(signalBuffer.flowFiles, REL_SUCCESS);
} catch (IOException e) {
throw new RuntimeException(String.format("Unable to communicate with cache when processing %s due to %s", signalId, e), e);
}
});
}

}
Loading

0 comments on commit 31bc287

Please sign in to comment.