Skip to content

Commit

Permalink
NIFI-190: Initial commit of Wait and Notify processors
Browse files Browse the repository at this point in the history
This closes #1329.

Signed-off-by: Bryan Bende <bbende@apache.org>
  • Loading branch information
Joe Gresock authored and bbende committed Dec 16, 2016
1 parent 9a5986b commit 34627f7
Show file tree
Hide file tree
Showing 7 changed files with 1,079 additions and 0 deletions.
@@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;

@EventDriven
@SupportsBatching
@Tags({"map", "cache", "notify", "distributed", "signal", "release"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Caches a release signal identifier in the distributed cache, optionally along with "
+ "the FlowFile's attributes. Any flow files held at a corresponding Wait processor will be "
+ "released once this signal in the cache is discovered.")
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
"org.apache.nifi.processors.standard.Wait"})
public class Notify extends AbstractProcessor {

// Identifies the distributed map cache client
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("Distributed Cache Service")
.description("The Controller Service that is used to check for release signals from a corresponding Notify processor")
.required(true)
.identifiesControllerService(DistributedMapCacheClient.class)
.build();

// Selects the FlowFile attribute or expression, whose value is used as cache key
public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER = new PropertyDescriptor.Builder()
.name("Release Signal Identifier")
.description("A value, or the results of an Attribute Expression Language statement, which will " +
"be evaluated against a FlowFile in order to determine the release signal cache key")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.expressionLanguageSupported(true)
.build();

// Specifies an optional regex used to identify which attributes to cache
public static final PropertyDescriptor ATTRIBUTE_CACHE_REGEX = new PropertyDescriptor.Builder()
.name("Attribute Cache Regex")
.description("Any attributes whose names match this regex will be stored in the distributed cache to be "
+ "copied to any FlowFiles released from a corresponding Wait processor. Note that the "
+ "uuid attribute will not be cached regardless of this value. If blank, no attributes "
+ "will be cached.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.expressionLanguageSupported(false)
.build();

public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles where the release signal has been successfully entered in the cache will be routed to this relationship")
.build();

public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship")
.build();

private final Set<Relationship> relationships;

private final Serializer<String> keySerializer = new StringSerializer();
private final Serializer<Map<String, String>> valueSerializer = new FlowFileAttributesSerializer();

public Notify() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(rels);
}

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(RELEASE_SIGNAL_IDENTIFIER);
descriptors.add(DISTRIBUTED_CACHE_SERVICE);
descriptors.add(ATTRIBUTE_CACHE_REGEX);
return descriptors;
}

@Override
public Set<Relationship> getRelationships() {
return relationships;
}

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}

final ComponentLog logger = getLogger();

// cache key is computed from attribute 'RELEASE_SIGNAL_IDENTIFIER' with expression language support
final String cacheKey = context.getProperty(RELEASE_SIGNAL_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();

// if the computed value is null, or empty, we transfer the flow file to failure relationship
if (StringUtils.isBlank(cacheKey)) {
logger.error("FlowFile {} has no attribute for given Release Signal Identifier", new Object[] {flowFile});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}

// the cache client used to interact with the distributed cache
final DistributedMapCacheClient cache = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);

try {
final String attributeCacheRegex = (context.getProperty(ATTRIBUTE_CACHE_REGEX).isSet())
? context.getProperty(ATTRIBUTE_CACHE_REGEX).getValue()
: null;

Map<String, String> attributesToCache = new HashMap<>();
if (StringUtils.isNotEmpty(attributeCacheRegex)) {
attributesToCache = flowFile.getAttributes().entrySet()
.stream().filter(e -> (!e.getKey().equals("uuid") && e.getKey().matches(attributeCacheRegex)))
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
}

if (logger.isDebugEnabled()) {
logger.debug("Cached release signal identifier {} from FlowFile {}", new Object[] {cacheKey, flowFile});
}

cache.put(cacheKey, attributesToCache, keySerializer, valueSerializer);

session.transfer(flowFile, REL_SUCCESS);
} catch (final IOException e) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
logger.error("Unable to communicate with cache when processing {} due to {}", new Object[] {flowFile, e});
}
}

/**
* Simple string serializer, used for serializing the cache key
*/
public static class StringSerializer implements Serializer<String> {

@Override
public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
out.write(value.getBytes(StandardCharsets.UTF_8));
}
}

}

0 comments on commit 34627f7

Please sign in to comment.