From 7f9f3ff7b299ad6761aabf0a33ea5b375fcaf87a Mon Sep 17 00:00:00 2001 From: jpercivall Date: Fri, 1 Apr 2016 17:27:42 -0400 Subject: [PATCH 1/6] NIFI-1582 added state to UpdateAttribute as well as updated a few parts that hadn't be touched in years (referenced the 'FlowFileMetadataEnhancer' processor'. Also added a 'NUMBER_VALIDATOR' to StandardValidators --- .../processor/util/StandardValidators.java | 20 ++ .../attributes/UpdateAttribute.java | 291 ++++++++++++------ .../additionalDetails.html | 70 ++++- .../attributes/TestUpdateAttribute.java | 229 +++++++++++++- 4 files changed, 494 insertions(+), 116 deletions(-) diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java index a577bc8118cc..772aa8e97c5e 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java @@ -23,6 +23,8 @@ import java.nio.charset.Charset; import java.nio.charset.UnsupportedCharsetException; import java.time.Instant; +import java.text.NumberFormat; +import java.text.ParseException; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -126,6 +128,24 @@ public ValidationResult validate(final String subject, final String value, final } }; + public static final Validator NUMBER_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { + return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build(); + } + + String reason = null; + try { + NumberFormat.getInstance().parse(value); + } catch (ParseException e) { + reason = "not a valid Number"; + } + + return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build(); + } + }; + public static final Validator PORT_VALIDATOR = createLongValidator(1, 65535, true); /** diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java index 08f4ee9ebc2d..ca130d4b7378 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.attributes; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -37,15 +38,21 @@ import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; import org.apache.nifi.expression.AttributeExpression; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -66,67 +73,20 @@ import org.apache.nifi.update.attributes.Rule; import org.apache.nifi.update.attributes.serde.CriteriaSerDe; -/** - * This processor supports updating flowfile attributes and can do so - * conditionally or unconditionally. It can also delete flowfile attributes - * that match a regular expression. - * - * Like the FlowFileMetadataEnhancer, it can - * be configured with an arbitrary number of optional properties to define how - * attributes should be updated. Each optional property represents an action - * that is applied to all incoming flow files. An action is comprised of an - * attribute key and a format string. The format string supports the following - * parameters. - * - * - * When creating the optional properties, enter the attribute key as the - * property name and the desired format string as the value. The optional - * properties are considered default actions and are applied unconditionally. - * - * In addition to the default actions, this processor has a user interface (UI) - * where conditional actions can be specified. In the UI, rules can be created. - * Rules are comprised of an arbitrary number of conditions and actions. In - * order for a rule to be activated, all conditions must evaluate to true. - * - * A rule condition is comprised of an attribute key and a regular expression. A - * condition evaluates to true when the flowfile contains the attribute - * specified and it's value matches the specified regular expression. - * - * A rule action follows the same definition as a rule above. It includes an - * attribute key and a format string. The format string supports the same - * parameters defined above. - * - * When a rule is activated (because conditions evaluate to true), all actions - * in that rule are executed. Once each action has been applied, any remaining - * default actions will be applied. This means that if rule action and a default - * action modify the same attribute, only the rule action will execute. Default - * actions will only execute when the attribute in question is not modified as - * part of an activated rule. - * - * The incoming flow file is cloned for each rule that is activated. If no rule - * is activated, any default actions are applied to the original flowfile and it - * is transferred. - * - * This processor only supports a SUCCESS relationship. - * - * Note: In order for configuration changes made in the custom UI to take - * effect, the processor must be stopped and started. - */ @EventDriven @SideEffectFree +@SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({"attributes", "modification", "update", "delete", "Attribute Expression Language"}) +@Tags({"attributes", "modification", "update", "delete", "Attribute Expression Language", "state"}) @CapabilityDescription("Updates the Attributes for a FlowFile by using the Attribute Expression Language and/or deletes the attributes based on a regular expression") @DynamicProperty(name = "A FlowFile attribute to update", value = "The value to set it to", supportsExpressionLanguage = true, description = "Updates a FlowFile attribute specified by the Dynamic Property's key with the value specified by the Dynamic Property's value") @WritesAttribute(attribute = "See additional details", description = "This processor may write or remove zero or more attributes as described in additional details") +@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "Gives the option to store values not only on the FlowFile but as stateful variables to be referenced in a recursive manner." + + "State is stored either local or clustered depend on the property.") public class UpdateAttribute extends AbstractProcessor implements Searchable { + private Scope scope = null; private final AtomicReference criteriaCache = new AtomicReference<>(null); private final ConcurrentMap propertyValues = new ConcurrentHashMap<>(); @@ -162,19 +122,46 @@ public ValidationResult validate(String subject, String input, ValidationContext // static properties public static final PropertyDescriptor DELETE_ATTRIBUTES = new PropertyDescriptor.Builder() .name("Delete Attributes Expression") - .description("Regular expression for attributes to be deleted from flowfiles.") + .description("Regular expression for attributes to be deleted from FlowFiles.") .required(false) .addValidator(DELETE_PROPERTY_VALIDATOR) .expressionLanguageSupported(true) .build(); + + public static final AllowableValue LOCATION_STATELESS = new AllowableValue("Stateless", "Stateless", "Do not store state."); + public static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", "Local", "Store the state locally."); + public static final AllowableValue LOCATION_CLUSTER = new AllowableValue("Cluster", "Cluster", "Store the state at the cluster level."); + + public static final PropertyDescriptor STATE_LOCATION = new PropertyDescriptor.Builder() + .name("State Location") + .description("Select where or not state will be store and if so, where to store it. Selecting 'Stateless' will offer the default functionality of purely updating the attributes on a " + + "FlowFile in a stateless manner. Selecting 'Local' or 'Cluster' will not only store the attributes on the FlowFile but also in the Processors state. See the 'Stateful Usage' " + + "topic of the 'Additional Details' section of this processor's documentation for more information") + .required(true) + .allowableValues(LOCATION_STATELESS, LOCATION_LOCAL, LOCATION_CLUSTER) + .defaultValue(LOCATION_STATELESS.getValue()) + .build(); + public static final PropertyDescriptor STATEFUL_VARIABLES_INIT_VALUE = new PropertyDescriptor.Builder() + .name("Stateful Variables Initial Value") + .description("If using state to set/reference variables then this value is used to set the initial value of the stateful variable. This will only be used in the @OnScheduled method " + + "when state does not contain a value for the variable.") + .required(false) + .defaultValue("0") + .addValidator(StandardValidators.NUMBER_VALIDATOR) + .build(); + // relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() - .description("All FlowFiles are routed to this relationship").name("success").build(); + .description("All successful FlowFiles are routed to this relationship").name("success").build(); + public static final Relationship REL_FAILED_SET_STATE = new Relationship.Builder() + .description("A failure to set the state after adding the attributes to the FlowFile will route the FlowFile here. If the processor is set to 'Stateless' then all FlowFiles will " + + "route to success").name("set state fail").build(); public UpdateAttribute() { final Set relationshipSet = new HashSet<>(); relationshipSet.add(REL_SUCCESS); + relationshipSet.add(REL_FAILED_SET_STATE); relationships = Collections.unmodifiableSet(relationshipSet); } @@ -187,24 +174,85 @@ public Set getRelationships() { protected List getSupportedPropertyDescriptors() { List descriptors = new ArrayList<>(); descriptors.add(DELETE_ATTRIBUTES); + descriptors.add(STATE_LOCATION); + descriptors.add(STATEFUL_VARIABLES_INIT_VALUE); return Collections.unmodifiableList(descriptors); } @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .required(false) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) - .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) - .expressionLanguageSupported(true) - .dynamic(true) - .build(); + if(scope != null){ + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } else { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + super.onPropertyModified(descriptor, oldValue, newValue); + + if (descriptor.equals(STATE_LOCATION)) { + if (LOCATION_CLUSTER.getValue().equalsIgnoreCase(newValue)) { + scope = Scope.CLUSTER; + } else if (LOCATION_LOCAL.getValue().equalsIgnoreCase(newValue)) { + scope = Scope.LOCAL; + } else { + scope = null; + } + } } @OnScheduled - public void clearPropertyValueMap() { + public void onScheduled(final ProcessContext context) throws IOException { + criteriaCache.set(CriteriaSerDe.deserialize(context.getAnnotationData())); + propertyValues.clear(); + + if(scope != null) { + StateManager stateManager = context.getStateManager(); + StateMap state = stateManager.getState(scope); + HashMap tempMap = new HashMap<>(); + tempMap.putAll(state.toMap()); + String initValue = context.getProperty(STATEFUL_VARIABLES_INIT_VALUE).getValue(); + + // Initialize the stateful default actions + for (PropertyDescriptor entry : context.getProperties().keySet()) { + if (entry.isDynamic()) { + if(!tempMap.containsKey(entry.getName()+"_state")) { + tempMap.put(entry.getName() + "_state", initValue); + } + } + } + + // Initialize the stateful actions if the criteria exists + final Criteria criteria = criteriaCache.get(); + if (criteria != null) { + for (Rule rule : criteria.getRules()) { + for (Action action : rule.getActions()) { + if (!tempMap.containsKey(action.getAttribute() + "_state")) { + tempMap.put(action.getAttribute() + "_state", initValue); + } + } + } + } + + context.getStateManager().setState(tempMap, scope); + } } @Override @@ -325,20 +373,12 @@ public Collection search(final SearchContext context) { } } - @OnScheduled - public void parseAnnotationData(final ProcessContext context) { - criteriaCache.set(CriteriaSerDe.deserialize(context.getAnnotationData())); - } - @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { final ComponentLog logger = getLogger(); final Criteria criteria = criteriaCache.get(); - List flowFiles = session.get(100); - if (flowFiles.isEmpty()) { - return; - } + FlowFile flowFile = session.get(); final Map properties = context.getProperties(); @@ -353,45 +393,70 @@ public void onTrigger(final ProcessContext context, final ProcessSession session // because is the original flowfile is used for all matching rules. in this // case the order of the matching rules is preserved in the list final Map> matchedRules = new HashMap<>(); + Map statefulAttributes = null; + + matchedRules.clear(); - for (FlowFile flowFile : flowFiles) { - matchedRules.clear(); + try { + if (scope != null) { + statefulAttributes = new HashMap<>(context.getStateManager().getState(scope).toMap()); + } else { + statefulAttributes = null; + } + } catch (IOException e) { + logger.error("Failed to update attributes for {} due to failing to get state; transferring FlowFile back to '{}'", new Object[]{flowFile, Relationship.SELF.getName()}, e); + session.transfer(flowFile); + context.yield(); + return; + } - // if there is update criteria specified, evaluate it - if (criteria != null && evaluateCriteria(session, context, criteria, flowFile, matchedRules)) { - // apply the actions for each rule and transfer the flowfile - for (final Map.Entry> entry : matchedRules.entrySet()) { - FlowFile match = entry.getKey(); - final List rules = entry.getValue(); + // if there is update criteria specified, evaluate it + if (criteria != null && evaluateCriteria(session, context, criteria, flowFile, matchedRules, statefulAttributes)) { + // apply the actions for each rule and transfer the flowfile + for (final Map.Entry> entry : matchedRules.entrySet()) { + FlowFile match = entry.getKey(); + final List rules = entry.getValue(); - // execute each matching rule(s) - match = executeActions(session, context, rules, defaultActions, match); + // execute each matching rule(s) + try { + match = executeActions(session, context, rules, defaultActions, match, statefulAttributes); logger.info("Updated attributes for {}; transferring to '{}'", new Object[]{match, REL_SUCCESS.getName()}); // transfer the match session.getProvenanceReporter().modifyAttributes(match); session.transfer(match, REL_SUCCESS); + } catch (IOException e) { + logger.error("Failed to update attributes for {} due to a failure to set the state afterwards; transferring to '{}'", new Object[]{match, REL_FAILED_SET_STATE.getName()}, e); + session.transfer(match, REL_FAILED_SET_STATE); + return; } - } else { - // transfer the flowfile to no match (that has the default actions applied) - flowFile = executeActions(session, context, null, defaultActions, flowFile); + } + } else { + // transfer the flowfile to no match (that has the default actions applied) + try { + flowFile = executeActions(session, context, null, defaultActions, flowFile, statefulAttributes); logger.info("Updated attributes for {}; transferring to '{}'", new Object[]{flowFile, REL_SUCCESS.getName()}); session.getProvenanceReporter().modifyAttributes(flowFile); session.transfer(flowFile, REL_SUCCESS); + } catch (IOException e) { + logger.error("Failed to update attributes for {} due to failures setting state afterwards; transferring to '{}'", new Object[]{flowFile, REL_FAILED_SET_STATE.getName()}, e); + session.transfer(flowFile, REL_FAILED_SET_STATE); + return; } } } //Evaluates the specified Criteria on the specified flowfile. Clones the // specified flow file for each rule that is applied. - private boolean evaluateCriteria(final ProcessSession session, final ProcessContext context, final Criteria criteria, final FlowFile flowfile, final Map> matchedRules) { - final ComponentLog logger = getLogger(); + private boolean evaluateCriteria(final ProcessSession session, final ProcessContext context, final Criteria criteria, final FlowFile flowfile, final Map> matchedRules, final Map statefulAttributes) { + final ComponentLog logger = getLogger(); final List rules = criteria.getRules(); // consider each rule and hold a copy of the flowfile for each matched rule for (final Rule rule : rules) { // evaluate the rule - if (evaluateRule(context, rule, flowfile)) { + if (evaluateRule(context, rule, flowfile, statefulAttributes)) { final FlowFile flowfileToUse; // determine if we should use the original flow file or clone @@ -421,12 +486,12 @@ private boolean evaluateCriteria(final ProcessSession session, final ProcessCont } //Evaluates the specified rule on the specified flowfile. - private boolean evaluateRule(final ProcessContext context, final Rule rule, FlowFile flowfile) { + private boolean evaluateRule(final ProcessContext context, final Rule rule, FlowFile flowfile, final Map statefulAttributes) { // go through each condition for (final Condition condition : rule.getConditions()) { // fail if any condition is not met - if (!evaluateCondition(context, condition, flowfile)) { + if (!evaluateCondition(context, condition, flowfile, statefulAttributes)) { return false; } } @@ -448,18 +513,19 @@ private PropertyValue getPropertyValue(final String text, final ProcessContext c } //Evaluates the specified condition on the specified flowfile. - private boolean evaluateCondition(final ProcessContext context, final Condition condition, final FlowFile flowfile) { + private boolean evaluateCondition(final ProcessContext context, final Condition condition, final FlowFile flowfile, final Map statefulAttributes) { try { // evaluate the expression for the given flow file - return getPropertyValue(condition.getExpression(), context).evaluateAttributeExpressions(flowfile).asBoolean(); + return getPropertyValue(condition.getExpression(), context).evaluateAttributeExpressions(flowfile, statefulAttributes).asBoolean(); } catch (final ProcessException pe) { throw new ProcessException(String.format("Unable to evaluate condition '%s': %s.", condition.getExpression(), pe), pe); } } // Executes the specified action on the specified flowfile. - private FlowFile executeActions(final ProcessSession session, final ProcessContext context, final List rules, final Map defaultActions, final FlowFile flowfile) { - final ComponentLog logger = getLogger(); + private FlowFile executeActions(final ProcessSession session, final ProcessContext context, final List rules, final Map defaultActions, final FlowFile flowfile, + final Map statefulAttributes) throws IOException { + final ComponentLog logger = getLogger(); final Map actions = new HashMap<>(defaultActions); final String ruleName = (rules == null || rules.isEmpty()) ? "default" : rules.get(rules.size() - 1).getName(); @@ -489,17 +555,32 @@ private FlowFile executeActions(final ProcessSession session, final ProcessConte final Map attributesToUpdate = new HashMap<>(actions.size()); final Set attributesToDelete = new HashSet<>(actions.size()); + final Map statefulAttributesToSet; + + if (statefulAttributes != null){ + statefulAttributesToSet = new HashMap<>(); + } else { + statefulAttributesToSet = null; + } + + // go through each action for (final Action action : actions.values()) { if (!action.getAttribute().equals(DELETE_ATTRIBUTES.getName())) { try { - final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile).getValue(); + final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, statefulAttributes).getValue(); // log if appropriate if (logger.isDebugEnabled()) { logger.debug(String.format("%s setting attribute '%s' = '%s' for %s per rule '%s'.", this, action.getAttribute(), newAttributeValue, flowfile, ruleName)); } + if (statefulAttributesToSet != null) { + if(!action.getAttribute().equals("UpdateAttribute.matchedRule")) { + statefulAttributesToSet.put(action.getAttribute() + "_state", newAttributeValue); + } + } + attributesToUpdate.put(action.getAttribute(), newAttributeValue); } catch (final ProcessException pe) { throw new ProcessException(String.format("Unable to evaluate new value for attribute '%s': %s.", action.getAttribute(), pe), pe); @@ -545,8 +626,14 @@ private FlowFile executeActions(final ProcessSession session, final ProcessConte } } - // update and delete the flowfile attributes - return session.removeAllAttributes(session.putAllAttributes(flowfile, attributesToUpdate), attributesToDelete); + // update and delete the FlowFile attributes + FlowFile returnFlowfile = session.removeAllAttributes(session.putAllAttributes(flowfile, attributesToUpdate), attributesToDelete); + + if(statefulAttributesToSet != null) { + context.getStateManager().setState(statefulAttributesToSet, scope); + } + + return returnFlowfile; } // Gets the default actions. @@ -554,10 +641,12 @@ private Map getDefaultActions(final Map defaultActions = new HashMap<>(); for (final Map.Entry entry : properties.entrySet()) { - final Action action = new Action(); - action.setAttribute(entry.getKey().getName()); - action.setValue(entry.getValue()); - defaultActions.put(action.getAttribute(), action); + if(entry.getKey() != STATE_LOCATION && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE) { + final Action action = new Action(); + action.setAttribute(entry.getKey().getName()); + action.setValue(entry.getValue()); + defaultActions.put(action.getAttribute(), action); + } } return defaultActions; diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html index cd4d34fca80a..89db3a3f275c 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html @@ -247,6 +247,68 @@

Description:

Once all changes have been saved in the Advanced UI, the UI can be closed using the X in the top right corner.

+

+ Stateful Usage +

+ +

+ By selecting either the "Local" or "Cluster" option for the "State Location" property UpdateAttribute will not only store the evaluated properties as attributes of the FlowFile but + also as stateful variables to be referenced in a recursive fashion. This enables the processor to calculate things like the sum or count of incoming FlowFiles. A dynamic property can be + referenced as a stateful variable like so: + +

    +
  • Dynamic Property +
      +
    • key : theCount
    • +
    • value : ${theCount_state:plus(1)}
    • +
    +
  • +
+ + This example will keep a count of the total number of FlowFiles that have passed through the processor. To use logic on top of State, simply use the "Advanced Usage" of UpdateAttribute. + All Actions will be stored as stateful attributes as well as being added to FlowFiles. Using the "Advanced Usage" it is possible to keep track of things like a maximum value of the + flow so far. This would be done by having a condition of "${maxValue_state:lt(${value})}" and an action of attribute:"maxValue", value:"${value}". + + The "Stateful Variables Initial Value" property is used to initialize the stateful variables. Some logic rules will require a very high initial value, like using the Advanced rules to + determine the minimum value. + + + If stateful properties reference other stateful properties then the value for the other stateful properties will be an iteration behind. For example, attempting to calculate the + average of the incoming stream requires the sum and count. If all three properties are set in the same UpdateAttribute (like below) then the Average will always not include the most + recent values of count and sum: + +
    +
  • Count +
      +
    • key : theCount
    • +
    • value : ${theCount_state:plus(1)}
    • +
    +
  • + +
  • Sum +
      +
    • key : theSum
    • +
    • value : ${theSum_state:plus(${flowfileValue})}
    • +
    +
  • + +
  • Average +
      +
    • key : theAverage
    • +
    • value : ${theSum_state:divide(theCount_state)}
    • +
    +
  • +
+ + Instead, since average only relies on theCount and theSum attributes (which are added to the FlowFile as well) there should be a following Stateless UpdateAttribute which properly + calculates the average. + + In the event that the processor is unable to get the state at the beginning of the onTrigger, the FlowFile will be pushed back to the originating relationship and the processor will yield. + If the processor is able to get the state at the beginning of the onTrigger but unable to set the state after adding attributes to the FlowFile, the FlowFile will be transferred to + "set state fail". This is normally due to the state not being the most up to date version (another thread has replaced the state with another version). In most use-cases this relationship + should loop back to the processor since the only affected attributes will be overwritten. +

+

Properties:

@@ -267,7 +329,13 @@

Description:

  • success
    • If the processor successfully updates the specified attribute(s), then the FlowFile follows this relationship.
    • -
  • + + +
  • set state fail +
      +
    • If the processor is running statefully, and fails to set the state after adding attributes to the FlowFile, then the FlowFile will be routed to this relationship.
    • +
    +
  • diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java index 90b51bd6b82a..cd944d308793 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java @@ -105,17 +105,218 @@ public void testDefault() throws Exception { @Test public void testDefaultAddAttribute() throws Exception { final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty("NewAttr", "abc${'Hello${Goose}'}!"); + runner.setProperty("NewAttr", "${one:plus(${two})}"); final Map attributes = new HashMap<>(); - attributes.put("Goose", "Geese"); - attributes.put("HelloGeese", "123"); + attributes.put("one", "1"); + attributes.put("two", "2"); runner.enqueue(new byte[0], attributes); runner.run(); runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 1); - runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).get(0).assertAttributeEquals("NewAttr", "abc123!"); + runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).get(0).assertAttributeEquals("NewAttr", "3"); + } + + @Test + public void testDefaultState() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); + runner.setProperty(UpdateAttribute.STATE_LOCATION, UpdateAttribute.LOCATION_LOCAL); + runner.setProperty("count", "${count_state:plus(1)}"); + runner.setProperty("sum", "${sum_state:plus(${pencils})}"); + + runner.assertValid(); + + final Map attributes2 = new HashMap<>(); + attributes2.put("pencils", "2"); + + runner.enqueue(new byte[0],attributes2); + runner.enqueue(new byte[0],attributes2); + + final Map attributes3 = new HashMap<>(); + attributes3.put("pencils", "3"); + runner.enqueue(new byte[0], attributes3); + runner.enqueue(new byte[0], attributes3); + + final Map attributes5 = new HashMap<>(); + attributes5.put("pencils", "5"); + runner.enqueue(new byte[0], attributes5); + + runner.run(5); + + runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 5); + runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).get(4).assertAttributeEquals("count", "5"); + runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).get(4).assertAttributeEquals("sum", "15"); + } + + @Test + public void testStateWithInitValue() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); + runner.setProperty(UpdateAttribute.STATE_LOCATION, UpdateAttribute.LOCATION_LOCAL); + runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "10"); + runner.setProperty("count", "${count_state:plus(1)}"); + runner.setProperty("sum", "${sum_state:plus(${pencils})}"); + + runner.assertValid(); + + final Map attributes2 = new HashMap<>(); + attributes2.put("pencils", "2"); + + runner.enqueue(new byte[0],attributes2); + runner.enqueue(new byte[0],attributes2); + + final Map attributes3 = new HashMap<>(); + attributes3.put("pencils", "3"); + runner.enqueue(new byte[0], attributes3); + runner.enqueue(new byte[0], attributes3); + + final Map attributes5 = new HashMap<>(); + attributes5.put("pencils", "5"); + runner.enqueue(new byte[0], attributes5); + + runner.run(5); + + runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 5); + runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).get(4).assertAttributeEquals("count", "15"); + runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS).get(4).assertAttributeEquals("sum", "25"); + } + + @Test + public void testRuleHitWithState() throws Exception { + final Criteria criteria = getCriteria(); + addRule(criteria, "rule", Arrays.asList( + // conditions + "${maxValue_state:lt(${value})}"), getMap( + // actions + "maxValue", "${value}")); + + TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); + runner.setProperty(UpdateAttribute.STATE_LOCATION, UpdateAttribute.LOCATION_LOCAL); + runner.setAnnotationData(serialize(criteria)); + + final Map attributes = new HashMap<>(); + attributes.put("value", "1"); + runner.enqueue(new byte[0], attributes); + runner.run(); + attributes.put("value", "2"); + runner.enqueue(new byte[0], attributes); + runner.run(); + attributes.put("value", "4"); + runner.enqueue(new byte[0], attributes); + runner.run(); + attributes.put("value", "1"); + runner.enqueue(new byte[0], attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 4); + final List result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS); + result.get(2).assertAttributeEquals("maxValue", "4"); + result.get(3).assertAttributeEquals("maxValue", null); + } + + @Test + public void testRuleHitWithStateWithDefault() throws Exception { + final Criteria criteria = getCriteria(); + addRule(criteria, "rule", Arrays.asList( + // conditions + "${maxValue_state:lt(${value})}"), getMap( + // actions + "maxValue", "${value}")); + + TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); + runner.setProperty(UpdateAttribute.STATE_LOCATION, UpdateAttribute.LOCATION_LOCAL); + runner.setAnnotationData(serialize(criteria)); + runner.setProperty("maxValue", "${maxValue_state}"); + + final Map attributes = new HashMap<>(); + attributes.put("value", "1"); + runner.enqueue(new byte[0], attributes); + runner.run(); + attributes.put("value", "2"); + runner.enqueue(new byte[0], attributes); + runner.run(); + attributes.put("value", "4"); + runner.enqueue(new byte[0], attributes); + runner.run(); + attributes.put("value", "1"); + runner.enqueue(new byte[0], attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 4); + final List result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS); + result.get(2).assertAttributeEquals("maxValue", "4"); + result.get(3).assertAttributeEquals("maxValue", "4"); + } + + @Test + public void testRuleHitWithStateWithInitValue() throws Exception { + final Criteria criteria = getCriteria(); + addRule(criteria, "rule", Arrays.asList( + // conditions + "${minValue_state:ge(${value})}"), getMap( + // actions + "minValue", "${value}")); + + TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); + runner.setProperty(UpdateAttribute.STATE_LOCATION, UpdateAttribute.LOCATION_LOCAL); + runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "5"); + runner.setAnnotationData(serialize(criteria)); + + final Map attributes = new HashMap<>(); + attributes.put("value", "1"); + runner.enqueue(new byte[0], attributes); + runner.run(); + attributes.put("value", "2"); + runner.enqueue(new byte[0], attributes); + runner.run(); + attributes.put("value", "4"); + runner.enqueue(new byte[0], attributes); + runner.run(); + attributes.put("value", "1"); + runner.enqueue(new byte[0], attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 4); + final List result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS); + result.get(3).assertAttributeEquals("minValue", "1"); + } + + @Test + public void testMultipleRulesWithStateAndDelete() throws Exception { + final Criteria criteria = getCriteria(); + addRule(criteria, "rule", Arrays.asList( + // conditions + "${maxValue_state:lt(${value})}"), getMap( + // actions + "maxValue", "${value}")); + + TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); + runner.setProperty(UpdateAttribute.STATE_LOCATION, UpdateAttribute.LOCATION_LOCAL); + runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "badValue"); + runner.setAnnotationData(serialize(criteria)); + runner.setProperty("maxValue", "${maxValue_state}"); + runner.setProperty("theCount", "${theCount_state:plus(1)}"); + + final Map attributes = new HashMap<>(); + attributes.put("value", "1"); + attributes.put("badValue", "10"); + runner.enqueue(new byte[0], attributes); + runner.run(); + attributes.put("value", "2"); + runner.enqueue(new byte[0], attributes); + runner.run(); + attributes.put("value", "5"); + runner.enqueue(new byte[0], attributes); + runner.run(); + attributes.put("value", "1"); + runner.enqueue(new byte[0], attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(UpdateAttribute.REL_SUCCESS, 4); + final List result = runner.getFlowFilesForRelationship(UpdateAttribute.REL_SUCCESS); + result.get(3).assertAttributeEquals("maxValue", "5"); + result.get(3).assertAttributeEquals("theCount", "4"); + result.get(0).assertAttributeEquals("badValue", null); } @Test @@ -423,7 +624,7 @@ public void testMultipleRuleHitsWithUseOriginalDoesntApplyDefaultsRepeatedly() t @Test public void testSimpleDelete() { final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty("Delete Attributes Expression", "attribute.2"); + runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "attribute.2"); final Map attributes = new HashMap<>(); attributes.put("attribute.1", "value.1"); @@ -441,7 +642,7 @@ public void testSimpleDelete() { @Test public void testRegexDotDelete() { final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty("Delete Attributes Expression", "attribute.2"); + runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "attribute.2"); final Map attributes = new HashMap<>(); attributes.put("attribute.1", "value.1"); @@ -461,7 +662,7 @@ public void testRegexDotDelete() { @Test public void testRegexLiteralDotDelete() { final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty("Delete Attributes Expression", "attribute\\.2"); + runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "attribute\\.2"); final Map attributes = new HashMap<>(); attributes.put("attribute.1", "value.1"); @@ -481,7 +682,7 @@ public void testRegexLiteralDotDelete() { @Test public void testRegexGroupDelete() { final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty("Delete Attributes Expression", "(attribute\\.[2-5]|sample.*)"); + runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "(attribute\\.[2-5]|sample.*)"); final Map attributes = new HashMap<>(); attributes.put("attribute.1", "value.1"); @@ -507,7 +708,7 @@ public void testRegexGroupDelete() { @Test public void testAttributeKey() { final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty("Delete Attributes Expression", "(attribute\\.[2-5]|sample.*)"); + runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "(attribute\\.[2-5]|sample.*)"); final Map attributes = new HashMap<>(); attributes.put("attribute.1", "value.1"); @@ -533,7 +734,7 @@ public void testAttributeKey() { @Test public void testExpressionLiteralDelete() { final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty("Delete Attributes Expression", "${literal('attribute\\.'):append(${literal(6)})}"); + runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "${literal('attribute\\.'):append(${literal(6)})}"); final Map attributes = new HashMap<>(); attributes.put("attribute.1", "value.1"); @@ -559,7 +760,7 @@ public void testExpressionLiteralDelete() { @Test public void testExpressionRegexDelete() { final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty("Delete Attributes Expression", "${literal('(attribute\\.'):append(${literal('[2-5]')}):append(${literal('|sample.*)')})}"); + runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "${literal('(attribute\\.'):append(${literal('[2-5]')}):append(${literal('|sample.*)')})}"); final Map attributes = new HashMap<>(); attributes.put("attribute.1", "value.1"); @@ -585,7 +786,7 @@ public void testExpressionRegexDelete() { @Test public void testAttributeListDelete() { final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty("Delete Attributes Expression", "attribute.1|attribute.2|sample.1|simple.1"); + runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "attribute.1|attribute.2|sample.1|simple.1"); final Map attributes = new HashMap<>(); attributes.put("attribute.1", "value.1"); @@ -611,14 +812,14 @@ public void testAttributeListDelete() { @Test public void testInvalidRegex() { final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty("Delete Attributes Expression", "("); + runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "("); runner.assertNotValid(); } @Test public void testInvalidRegexInAttribute() { final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty("Delete Attributes Expression", "${butter}"); + runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "${butter}"); runner.assertValid(); final Map attributes = new HashMap<>(); From 5ab5530cee3dd2f6c3f5699091171cf9a771651e Mon Sep 17 00:00:00 2001 From: jpercivall Date: Wed, 6 Apr 2016 13:14:19 -0400 Subject: [PATCH 2/6] NIFI-1582 removing the option to use cluster state --- .../attributes/UpdateAttribute.java | 51 ++++++++----------- .../additionalDetails.html | 2 +- .../attributes/TestUpdateAttribute.java | 12 ++--- 3 files changed, 28 insertions(+), 37 deletions(-) diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java index ca130d4b7378..00b41f8259e7 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java @@ -44,7 +44,6 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; @@ -82,11 +81,10 @@ @DynamicProperty(name = "A FlowFile attribute to update", value = "The value to set it to", supportsExpressionLanguage = true, description = "Updates a FlowFile attribute specified by the Dynamic Property's key with the value specified by the Dynamic Property's value") @WritesAttribute(attribute = "See additional details", description = "This processor may write or remove zero or more attributes as described in additional details") -@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "Gives the option to store values not only on the FlowFile but as stateful variables to be referenced in a recursive manner." + - "State is stored either local or clustered depend on the property.") +@Stateful(scopes = {Scope.LOCAL}, description = "Gives the option to store values not only on the FlowFile but as stateful variables to be referenced in a recursive manner.") public class UpdateAttribute extends AbstractProcessor implements Searchable { - private Scope scope = null; + private boolean stateful = false; private final AtomicReference criteriaCache = new AtomicReference<>(null); private final ConcurrentMap propertyValues = new ConcurrentHashMap<>(); @@ -128,19 +126,14 @@ public ValidationResult validate(String subject, String input, ValidationContext .expressionLanguageSupported(true) .build(); - - public static final AllowableValue LOCATION_STATELESS = new AllowableValue("Stateless", "Stateless", "Do not store state."); - public static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", "Local", "Store the state locally."); - public static final AllowableValue LOCATION_CLUSTER = new AllowableValue("Cluster", "Cluster", "Store the state at the cluster level."); - - public static final PropertyDescriptor STATE_LOCATION = new PropertyDescriptor.Builder() - .name("State Location") - .description("Select where or not state will be store and if so, where to store it. Selecting 'Stateless' will offer the default functionality of purely updating the attributes on a " + - "FlowFile in a stateless manner. Selecting 'Local' or 'Cluster' will not only store the attributes on the FlowFile but also in the Processors state. See the 'Stateful Usage' " + + public static final PropertyDescriptor STORE_STATE = new PropertyDescriptor.Builder() + .name("Store State") + .description("Select whether or not state will be stored. Selecting 'Stateless' will offer the default functionality of purely updating the attributes on a " + + "FlowFile in a stateless manner. Selecting 'Stateful' will not only store the attributes on the FlowFile but also in the Processors state. See the 'Stateful Usage' " + "topic of the 'Additional Details' section of this processor's documentation for more information") .required(true) - .allowableValues(LOCATION_STATELESS, LOCATION_LOCAL, LOCATION_CLUSTER) - .defaultValue(LOCATION_STATELESS.getValue()) + .allowableValues("false", "true") + .defaultValue("false") .build(); public static final PropertyDescriptor STATEFUL_VARIABLES_INIT_VALUE = new PropertyDescriptor.Builder() .name("Stateful Variables Initial Value") @@ -174,14 +167,14 @@ public Set getRelationships() { protected List getSupportedPropertyDescriptors() { List descriptors = new ArrayList<>(); descriptors.add(DELETE_ATTRIBUTES); - descriptors.add(STATE_LOCATION); + descriptors.add(STORE_STATE); descriptors.add(STATEFUL_VARIABLES_INIT_VALUE); return Collections.unmodifiableList(descriptors); } @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - if(scope != null){ + if(!stateful){ return new PropertyDescriptor.Builder() .name(propertyDescriptorName) .required(false) @@ -206,13 +199,11 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { super.onPropertyModified(descriptor, oldValue, newValue); - if (descriptor.equals(STATE_LOCATION)) { - if (LOCATION_CLUSTER.getValue().equalsIgnoreCase(newValue)) { - scope = Scope.CLUSTER; - } else if (LOCATION_LOCAL.getValue().equalsIgnoreCase(newValue)) { - scope = Scope.LOCAL; + if (descriptor.equals(STORE_STATE)) { + if ("true".equalsIgnoreCase(newValue)) { + stateful = true; } else { - scope = null; + stateful = false; } } } @@ -223,9 +214,9 @@ public void onScheduled(final ProcessContext context) throws IOException { propertyValues.clear(); - if(scope != null) { + if(stateful) { StateManager stateManager = context.getStateManager(); - StateMap state = stateManager.getState(scope); + StateMap state = stateManager.getState(Scope.LOCAL); HashMap tempMap = new HashMap<>(); tempMap.putAll(state.toMap()); String initValue = context.getProperty(STATEFUL_VARIABLES_INIT_VALUE).getValue(); @@ -251,7 +242,7 @@ public void onScheduled(final ProcessContext context) throws IOException { } } - context.getStateManager().setState(tempMap, scope); + context.getStateManager().setState(tempMap, Scope.LOCAL); } } @@ -398,8 +389,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session matchedRules.clear(); try { - if (scope != null) { - statefulAttributes = new HashMap<>(context.getStateManager().getState(scope).toMap()); + if (stateful) { + statefulAttributes = new HashMap<>(context.getStateManager().getState(Scope.LOCAL).toMap()); } else { statefulAttributes = null; } @@ -630,7 +621,7 @@ private FlowFile executeActions(final ProcessSession session, final ProcessConte FlowFile returnFlowfile = session.removeAllAttributes(session.putAllAttributes(flowfile, attributesToUpdate), attributesToDelete); if(statefulAttributesToSet != null) { - context.getStateManager().setState(statefulAttributesToSet, scope); + context.getStateManager().setState(statefulAttributesToSet, Scope.LOCAL); } return returnFlowfile; @@ -641,7 +632,7 @@ private Map getDefaultActions(final Map defaultActions = new HashMap<>(); for (final Map.Entry entry : properties.entrySet()) { - if(entry.getKey() != STATE_LOCATION && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE) { + if(entry.getKey() != STORE_STATE && entry.getKey() != STATEFUL_VARIABLES_INIT_VALUE) { final Action action = new Action(); action.setAttribute(entry.getKey().getName()); action.setValue(entry.getValue()); diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html index 89db3a3f275c..6574fd24829a 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html @@ -252,7 +252,7 @@

    Description:

    - By selecting either the "Local" or "Cluster" option for the "State Location" property UpdateAttribute will not only store the evaluated properties as attributes of the FlowFile but + By selecting "Stateful" option for the "Store State" property UpdateAttribute will not only store the evaluated properties as attributes of the FlowFile but also as stateful variables to be referenced in a recursive fashion. This enables the processor to calculate things like the sum or count of incoming FlowFiles. A dynamic property can be referenced as a stateful variable like so: diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java index cd944d308793..7178e3d4eff7 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java @@ -121,7 +121,7 @@ public void testDefaultAddAttribute() throws Exception { @Test public void testDefaultState() throws Exception { final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty(UpdateAttribute.STATE_LOCATION, UpdateAttribute.LOCATION_LOCAL); + runner.setProperty(UpdateAttribute.STORE_STATE, "true"); runner.setProperty("count", "${count_state:plus(1)}"); runner.setProperty("sum", "${sum_state:plus(${pencils})}"); @@ -152,7 +152,7 @@ public void testDefaultState() throws Exception { @Test public void testStateWithInitValue() throws Exception { final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty(UpdateAttribute.STATE_LOCATION, UpdateAttribute.LOCATION_LOCAL); + runner.setProperty(UpdateAttribute.STORE_STATE, "true"); runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "10"); runner.setProperty("count", "${count_state:plus(1)}"); runner.setProperty("sum", "${sum_state:plus(${pencils})}"); @@ -191,7 +191,7 @@ public void testRuleHitWithState() throws Exception { "maxValue", "${value}")); TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty(UpdateAttribute.STATE_LOCATION, UpdateAttribute.LOCATION_LOCAL); + runner.setProperty(UpdateAttribute.STORE_STATE, "true"); runner.setAnnotationData(serialize(criteria)); final Map attributes = new HashMap<>(); @@ -224,7 +224,7 @@ public void testRuleHitWithStateWithDefault() throws Exception { "maxValue", "${value}")); TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty(UpdateAttribute.STATE_LOCATION, UpdateAttribute.LOCATION_LOCAL); + runner.setProperty(UpdateAttribute.STORE_STATE, "true"); runner.setAnnotationData(serialize(criteria)); runner.setProperty("maxValue", "${maxValue_state}"); @@ -258,7 +258,7 @@ public void testRuleHitWithStateWithInitValue() throws Exception { "minValue", "${value}")); TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty(UpdateAttribute.STATE_LOCATION, UpdateAttribute.LOCATION_LOCAL); + runner.setProperty(UpdateAttribute.STORE_STATE, "true"); runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "5"); runner.setAnnotationData(serialize(criteria)); @@ -291,7 +291,7 @@ public void testMultipleRulesWithStateAndDelete() throws Exception { "maxValue", "${value}")); TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty(UpdateAttribute.STATE_LOCATION, UpdateAttribute.LOCATION_LOCAL); + runner.setProperty(UpdateAttribute.STORE_STATE, "true"); runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "badValue"); runner.setAnnotationData(serialize(criteria)); runner.setProperty("maxValue", "${maxValue_state}"); From 7cf00ac0b846686592ea060ad945547d5124a5e4 Mon Sep 17 00:00:00 2001 From: jpercivall Date: Mon, 18 Apr 2016 20:27:25 -0400 Subject: [PATCH 3/6] NIFI-1582 addressing Oleg's comments --- .../attributes/UpdateAttribute.java | 33 ++++++++----------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java index 00b41f8259e7..414cb463dbf1 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java @@ -174,23 +174,20 @@ protected List getSupportedPropertyDescriptors() { @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - if(!stateful){ - return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .required(false) - .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .dynamic(true) + PropertyDescriptor.Builder propertyBuilder = new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true); + + if (stateful) { + return propertyBuilder + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) .build(); } else { - return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .required(false) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) - .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) - .expressionLanguageSupported(true) - .dynamic(true) + return propertyBuilder + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); } } @@ -200,11 +197,7 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String super.onPropertyModified(descriptor, oldValue, newValue); if (descriptor.equals(STORE_STATE)) { - if ("true".equalsIgnoreCase(newValue)) { - stateful = true; - } else { - stateful = false; - } + stateful = "true".equalsIgnoreCase(newValue); } } From cc5b34f08cdb3203372cb678761187e93ace5fff Mon Sep 17 00:00:00 2001 From: jpercivall Date: Wed, 4 May 2016 08:58:08 -0400 Subject: [PATCH 4/6] NIFI-1582 No longer forcing numbers as the init value and adding getStateValue() to EL instead of using 'ATTRIBUTE_state' --- .../apache/nifi/components/PropertyValue.java | 23 ++++ .../language/antlr/AttributeExpressionLexer.g | 1 + .../antlr/AttributeExpressionParser.g | 2 +- .../language/AttributesAndState.java | 102 ++++++++++++++++++ .../language/EmptyPreparedQuery.java | 5 + .../language/InvalidPreparedQuery.java | 5 +- .../expression/language/PreparedQuery.java | 1 + .../attribute/expression/language/Query.java | 31 +++++- .../language/StandardPreparedQuery.java | 9 +- .../language/StandardPropertyValue.java | 8 +- .../functions/GetStateVariableEvaluator.java | 59 ++++++++++ .../expression/language/TestQuery.java | 35 +++++- .../asciidoc/expression-language-guide.adoc | 16 +++ .../apache/nifi/util/MockPropertyValue.java | 8 +- .../scheduling/ConnectableProcessContext.java | 6 ++ .../attributes/UpdateAttribute.java | 69 ++++++++---- .../additionalDetails.html | 15 +-- .../attributes/TestUpdateAttribute.java | 35 +++--- 18 files changed, 372 insertions(+), 58 deletions(-) create mode 100644 nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/AttributesAndState.java create mode 100644 nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/evaluation/functions/GetStateVariableEvaluator.java diff --git a/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java b/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java index 666ada5539a5..efe76ee591b9 100644 --- a/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java +++ b/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java @@ -257,6 +257,29 @@ public interface PropertyValue { */ PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map additionalAttributes, AttributeValueDecorator decorator) throws ProcessException; + + /** + *

    + * Replaces values in the Property Value using the NiFi Expression + * Language; a PropertyValue with the new value is then returned, supporting + * call chaining. + *

    + * + * @param flowFile to evaluate attributes of + * @param additionalAttributes a Map of additional attributes that the Expression can reference. If entries in + * this Map conflict with entries in the FlowFile's attributes, the entries in this Map are given a higher priority. + * @param decorator the decorator to use in order to update the values returned by the Expression Language + * @param stateValues a Map of the state values to be referenced explicitly by specific state accessing functions + * + * @return a PropertyValue with the new value is returned, supporting call + * chaining + * + * @throws ProcessException if the Expression cannot be compiled or evaluating + * the Expression against the given attributes causes an Exception to be thrown + */ + public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map additionalAttributes, AttributeValueDecorator decorator, Map stateValues) + throws ProcessException; + /** *

    * Replaces values in the Property Value using the NiFi Expression Language. diff --git a/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionLexer.g b/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionLexer.g index f09eba8e3721..071fda9a2a90 100644 --- a/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionLexer.g +++ b/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionLexer.g @@ -137,6 +137,7 @@ UNESCAPE_HTML3 : 'unescapeHtml3'; UNESCAPE_HTML4 : 'unescapeHtml4'; BASE64_ENCODE : 'base64Encode'; BASE64_DECODE : 'base64Decode'; +GET_STATE_VALUE: 'getStateValue'; // 1 arg functions SUBSTRING_AFTER : 'substringAfter'; diff --git a/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionParser.g b/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionParser.g index eb50a280f970..11cbec866ac8 100644 --- a/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionParser.g +++ b/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionParser.g @@ -129,7 +129,7 @@ functionCall : functionRef -> booleanLiteral : TRUE | FALSE; zeroArgStandaloneFunction : (IP | UUID | NOW | NEXT_INT | HOSTNAME | RANDOM) LPAREN! RPAREN!; -oneArgStandaloneFunction : ((TO_LITERAL | MATH)^ LPAREN! anyArg RPAREN!) | +oneArgStandaloneFunction : ((TO_LITERAL | MATH | GET_STATE_VALUE)^ LPAREN! anyArg RPAREN!) | (HOSTNAME^ LPAREN! booleanLiteral RPAREN!); standaloneFunction : zeroArgStandaloneFunction | oneArgStandaloneFunction; diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/AttributesAndState.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/AttributesAndState.java new file mode 100644 index 000000000000..b4ec5b2e2251 --- /dev/null +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/AttributesAndState.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.attribute.expression.language; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +/* + *This class is passed to Evaluators so that certain evaluators that specifically work with state will have access to the state values explicitly. + *It implements Map so that other evaluators don't have to be changed. + */ +public class AttributesAndState implements Map { + + private final Map stateMap; + private final Map attributes; + + public AttributesAndState(Map attributes, Map state) { + super(); + stateMap = state; + this.attributes = attributes; + } + + public Map getStateMap() { + return stateMap; + } + + @Override + public int size() { + return attributes.size(); + } + + @Override + public boolean isEmpty() { + return attributes.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return attributes.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + return attributes.containsValue(value); + } + + @Override + public String get(Object key) { + return attributes.get(key); + } + + @Override + public String put(String key, String value) { + throw new UnsupportedOperationException(); + } + + @Override + public String remove(Object key) { + throw new UnsupportedOperationException(); + } + + @Override + public void putAll(Map m) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + + @Override + public Set keySet() { + return attributes.keySet(); + } + + @Override + public Collection values() { + return attributes.values(); + } + + @Override + public Set> entrySet() { + return attributes.entrySet(); + } +} diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java index 5ed00ed74ec3..403753163f60 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java @@ -33,4 +33,9 @@ public class EmptyPreparedQuery implements PreparedQuery { public String evaluateExpressions(Map valueLookup, AttributeValueDecorator decorator) throws ProcessException { return value; } + + @Override + public String evaluateExpressions(Map attributes, AttributeValueDecorator decorator, Map stateVariables) throws ProcessException { + return value; + } } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java index cbf6c663b56e..1033c713eb9a 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java @@ -43,5 +43,8 @@ public String evaluateExpressions(final Map valueLookup, final At throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation); } - + @Override + public String evaluateExpressions( Map valueLookup, AttributeValueDecorator decorator, Map stateVariables) throws ProcessException { + throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation); + } } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java index e1a1db7bdda5..53f7296234cb 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java @@ -25,4 +25,5 @@ public interface PreparedQuery { String evaluateExpressions(Map valueLookup, AttributeValueDecorator decorator) throws ProcessException; + String evaluateExpressions(final Map valueLookup, final AttributeValueDecorator decorator, final Map stateVariables) throws ProcessException; } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Query.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Query.java index fb48b0ffe331..06caf5361f77 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Query.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/Query.java @@ -49,6 +49,7 @@ import org.apache.nifi.attribute.expression.language.evaluation.functions.FormatEvaluator; import org.apache.nifi.attribute.expression.language.evaluation.functions.FromRadixEvaluator; import org.apache.nifi.attribute.expression.language.evaluation.functions.GetDelimitedFieldEvaluator; +import org.apache.nifi.attribute.expression.language.evaluation.functions.GetStateVariableEvaluator; import org.apache.nifi.attribute.expression.language.evaluation.functions.GreaterThanEvaluator; import org.apache.nifi.attribute.expression.language.evaluation.functions.GreaterThanOrEqualEvaluator; import org.apache.nifi.attribute.expression.language.evaluation.functions.HostnameEvaluator; @@ -149,6 +150,7 @@ import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.FIND; import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.FORMAT; import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.GET_DELIMITED_FIELD; +import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.GET_STATE_VALUE; import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.GREATER_THAN; import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.GREATER_THAN_OR_EQUAL; import static org.apache.nifi.attribute.expression.language.antlr.AttributeExpressionParser.HOSTNAME; @@ -384,8 +386,9 @@ static int findEndQuoteChar(final String value, final int quoteStart) { return -1; } - static String evaluateExpression(final Tree tree, final String queryText, final Map valueMap, final AttributeValueDecorator decorator) throws ProcessException { - final Object evaluated = Query.fromTree(tree, queryText).evaluate(valueMap).getValue(); + static String evaluateExpression(final Tree tree, final String queryText, final Map valueMap, final AttributeValueDecorator decorator, + final Map stateVariables) throws ProcessException { + final Object evaluated = Query.fromTree(tree, queryText).evaluate(valueMap, stateVariables).getValue(); if (evaluated == null) { return null; } @@ -395,6 +398,11 @@ static String evaluateExpression(final Tree tree, final String queryText, final return decorator == null ? escaped : decorator.decorate(escaped); } + static String evaluateExpressions(final String rawValue, Map expressionMap, final AttributeValueDecorator decorator, final Map stateVariables) + throws ProcessException { + return Query.prepare(rawValue).evaluateExpressions(expressionMap, decorator, stateVariables); + } + static String evaluateExpressions(final String rawValue, final Map valueLookup) throws ProcessException { return evaluateExpressions(rawValue, valueLookup, null); } @@ -563,13 +571,22 @@ public ResultType getResultType() { } QueryResult evaluate(final Map map) { + return evaluate(map, null); + } + + QueryResult evaluate(final Map attributes, final Map stateMap) { if (evaluated.getAndSet(true)) { throw new IllegalStateException("A Query cannot be evaluated more than once"); } - - return evaluator.evaluate(map); + if (stateMap != null) { + AttributesAndState attributesAndState = new AttributesAndState(attributes, stateMap); + return evaluator.evaluate(attributesAndState); + } else { + return evaluator.evaluate(attributes); + } } + Tree getTree() { return this.tree; } @@ -747,6 +764,12 @@ private static Evaluator buildEvaluator(final Tree tree) { throw new AttributeExpressionLanguageParsingException("Call to math() as the subject must take exactly 1 parameter"); } } + case GET_STATE_VALUE: { + final Tree childTree = tree.getChild(0); + final Evaluator argEvaluator = buildEvaluator(childTree); + final Evaluator stringEvaluator = toStringEvaluator(argEvaluator); + return new GetStateVariableEvaluator(stringEvaluator); + } default: throw new AttributeExpressionLanguageParsingException("Unexpected token: " + tree.toString()); } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java index 7473b3b11fcc..39cfb25f9f70 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPreparedQuery.java @@ -38,14 +38,14 @@ public StandardPreparedQuery(final List queryStrings, final Map valueMap, final AttributeValueDecorator decorator) throws ProcessException { + public String evaluateExpressions(final Map valMap, final AttributeValueDecorator decorator, final Map stateVariables) throws ProcessException { final StringBuilder sb = new StringBuilder(); for (final String val : queryStrings) { final Tree tree = trees.get(val); if (tree == null) { sb.append(val); } else { - final String evaluated = Query.evaluateExpression(tree, val, valueMap, decorator); + final String evaluated = Query.evaluateExpression(tree, val, valMap, decorator, stateVariables); if (evaluated != null) { sb.append(evaluated); } @@ -54,4 +54,9 @@ public String evaluateExpressions(final Map valueMap, final Attr return sb.toString(); } + @Override + public String evaluateExpressions(final Map valMap, final AttributeValueDecorator decorator) + throws ProcessException { + return evaluateExpressions(valMap, decorator, null); + } } diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPropertyValue.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPropertyValue.java index d70b2d86ac0f..94c1c5021e2f 100644 --- a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPropertyValue.java +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/StandardPropertyValue.java @@ -145,11 +145,17 @@ public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final @Override public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map additionalAttributes, final AttributeValueDecorator decorator) throws ProcessException { + return evaluateAttributeExpressions(flowFile, additionalAttributes, decorator, null); + } + + @Override + public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map additionalAttributes, AttributeValueDecorator decorator, Map stateValues) + throws ProcessException { if (rawValue == null || preparedQuery == null) { return this; } final ValueLookup lookup = new ValueLookup(variableRegistry, flowFile, additionalAttributes); - return new StandardPropertyValue(preparedQuery.evaluateExpressions(lookup, decorator), serviceLookup, null, variableRegistry); + return new StandardPropertyValue(preparedQuery.evaluateExpressions(lookup, decorator, stateValues), serviceLookup, null); } @Override diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/evaluation/functions/GetStateVariableEvaluator.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/evaluation/functions/GetStateVariableEvaluator.java new file mode 100644 index 000000000000..8808e17eb66d --- /dev/null +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/evaluation/functions/GetStateVariableEvaluator.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.attribute.expression.language.evaluation.functions; + +import org.apache.nifi.attribute.expression.language.AttributesAndState; +import org.apache.nifi.attribute.expression.language.evaluation.Evaluator; +import org.apache.nifi.attribute.expression.language.evaluation.QueryResult; +import org.apache.nifi.attribute.expression.language.evaluation.StringEvaluator; +import org.apache.nifi.attribute.expression.language.evaluation.StringQueryResult; + +import java.util.Map; + +public class GetStateVariableEvaluator extends StringEvaluator { + + private final Evaluator subject; + + public GetStateVariableEvaluator(final Evaluator subject) { + this.subject = subject; + } + + @Override + public QueryResult evaluate(Map attributes) { + if (!(attributes instanceof AttributesAndState)){ + return new StringQueryResult(null); + } + + final String subjectValue = subject.evaluate(attributes).getValue(); + if (subjectValue == null) { + return new StringQueryResult(null); + } + + AttributesAndState attributesAndState = (AttributesAndState) attributes; + + Map stateMap = attributesAndState.getStateMap(); + String stateValue = stateMap.get(subjectValue); + + return new StringQueryResult(stateValue); + } + + @Override + public Evaluator getSubjectEvaluator() { + return subject; + } +} diff --git a/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java index f47fbbb7592b..b666b261d39b 100644 --- a/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java +++ b/nifi-commons/nifi-expression-language/src/test/java/org/apache/nifi/attribute/expression/language/TestQuery.java @@ -72,6 +72,7 @@ public void testCompilation() { assertValid("${hostname()}"); assertValid("${literal(3)}"); assertValid("${random()}"); + assertValid("${getStateValue('the_count')}"); // left here because it's convenient for looking at the output //System.out.println(Query.compile("").evaluate(null)); } @@ -1488,6 +1489,32 @@ public void testEvaluateWithinCurlyBraces() { assertEquals("{ xyz }", Query.evaluateExpressions(query, attributes)); } + @Test + public void testGetStateValue() { + final Map stateValues = new HashMap<>(); + stateValues.put("abc", "xyz"); + stateValues.put("123", "qwe"); + stateValues.put("true", "asd"); + stateValues.put("iop", "098"); + + final Map attributes = new HashMap<>(); + attributes.put("abc", "iop"); + attributes.put("4321", "123"); + attributes.put("false", "bnm"); + + String query = "${getStateValue('abc')}"; + verifyEquals(query, attributes, stateValues, "xyz"); + + query = "${getStateValue(${'4321':toString()})}"; + verifyEquals(query, attributes, stateValues, "qwe"); + + query = "${getStateValue(${literal(true):toString()})}"; + verifyEquals(query, attributes, stateValues, "asd"); + + query = "${getStateValue(${abc}):equals('098')}"; + verifyEquals(query, attributes, stateValues, true); + } + @Test public void testLiteralFunction() { final Map attrs = Collections.emptyMap(); @@ -1658,11 +1685,15 @@ public void testUnescapeFunctions() { } private void verifyEquals(final String expression, final Map attributes, final Object expectedResult) { + verifyEquals(expression,attributes, null, expectedResult); + } + + private void verifyEquals(final String expression, final Map attributes, final Map stateValues, final Object expectedResult) { Query.validateExpression(expression, false); - assertEquals(String.valueOf(expectedResult), Query.evaluateExpressions(expression, attributes, null)); + assertEquals(String.valueOf(expectedResult), Query.evaluateExpressions(expression, attributes, null, stateValues)); final Query query = Query.compile(expression); - final QueryResult result = query.evaluate(attributes); + final QueryResult result = query.evaluate(attributes, stateValues); if (expectedResult instanceof Long) { if (ResultType.NUMBER.equals(result.getResultType())) { diff --git a/nifi-docs/src/main/asciidoc/expression-language-guide.adoc b/nifi-docs/src/main/asciidoc/expression-language-guide.adoc index c5e2a77ff60d..7af3be73f96d 100644 --- a/nifi-docs/src/main/asciidoc/expression-language-guide.adoc +++ b/nifi-docs/src/main/asciidoc/expression-language-guide.adoc @@ -1974,6 +1974,22 @@ an error when validating the function. `${literal( ${allMatchingAttributes('a.*'):count()} ):gt(3)}` returns true if there are more than 3 attributes whose names begin with the letter `a`. +[.function] +=== getStateValue + +*Description*: [.description]#Access a processor's state values by passing in the String key and getting the value back as a String. This + is a special Expression Language function that only works with processors that explicitly allow EL to query state. Currently only UpdateAttribute + does.# + +*Subject Type*: [.subjectless]#No Subject# + +*Arguments*: + + - [.String]#_Key_# : [.argDesc]#The key to use when accessing the state map.# + +*Return Type*: [.returnType]#String# + +*Examples*: UpdateAttribute processor has stored the key "count" with value "20" in state. '${getStateValue("count")}` returns `20`. [[multi]] == Evaluating Multiple Attributes diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java index 86c6ee7f9f2b..b6752a758864 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java @@ -171,12 +171,18 @@ public PropertyValue evaluateAttributeExpressions(final Map attr @Override public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final Map additionalAttributes, final AttributeValueDecorator decorator) throws ProcessException { + return evaluateAttributeExpressions(flowFile, additionalAttributes, decorator, null); + } + + @Override + public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map additionalAttributes, AttributeValueDecorator decorator, Map stateValues) + throws ProcessException { markEvaluated(); if (rawValue == null) { return this; } - final PropertyValue newValue = stdPropValue.evaluateAttributeExpressions(flowFile, additionalAttributes, decorator); + final PropertyValue newValue = stdPropValue.evaluateAttributeExpressions(flowFile, additionalAttributes, decorator, stateValues); return new MockPropertyValue(newValue.getValue(), serviceLookup, propertyDescriptor, true, variableRegistry); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java index 169cdee0a9ff..7c4ce77d32c8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java @@ -160,6 +160,12 @@ public PropertyValue evaluateAttributeExpressions(Map attributes public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map additionalAttributes, AttributeValueDecorator decorator) throws ProcessException { return null; } + + @Override + public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, Map additionalAttributes, AttributeValueDecorator decorator, Map stateValues) + throws ProcessException { + return null; + } }; } diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java index 414cb463dbf1..f220e795dc50 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java @@ -84,11 +84,37 @@ @Stateful(scopes = {Scope.LOCAL}, description = "Gives the option to store values not only on the FlowFile but as stateful variables to be referenced in a recursive manner.") public class UpdateAttribute extends AbstractProcessor implements Searchable { + + public static final String DO_NOT_STORE_STATE = "do not store state"; + public static final String STORE_STATE_LOCALLY = "store state locally"; + private boolean stateful = false; private final AtomicReference criteriaCache = new AtomicReference<>(null); private final ConcurrentMap propertyValues = new ConcurrentHashMap<>(); - private final Set relationships; + private final static Set statelessRelationshipSet; + private final static Set statefulRelationshipSet; + + // relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .description("All successful FlowFiles are routed to this relationship").name("success").build(); + public static final Relationship REL_FAILED_SET_STATE = new Relationship.Builder() + .description("A failure to set the state after adding the attributes to the FlowFile will route the FlowFile here.").name("set state fail").build(); + + static { + Set tempStatelessSet = new HashSet<>(); + tempStatelessSet.add(REL_SUCCESS); + + statelessRelationshipSet = Collections.unmodifiableSet(tempStatelessSet); + + Set tempStatefulSet = new HashSet<>(); + tempStatefulSet.add(REL_SUCCESS); + tempStatefulSet.add(REL_FAILED_SET_STATE); + + statefulRelationshipSet = Collections.unmodifiableSet(tempStatefulSet); + } + + private volatile Set relationships; private static final Validator DELETE_PROPERTY_VALIDATOR = new Validator() { private final Validator DPV_RE_VALIDATOR = StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true); @@ -132,8 +158,8 @@ public ValidationResult validate(String subject, String input, ValidationContext "FlowFile in a stateless manner. Selecting 'Stateful' will not only store the attributes on the FlowFile but also in the Processors state. See the 'Stateful Usage' " + "topic of the 'Additional Details' section of this processor's documentation for more information") .required(true) - .allowableValues("false", "true") - .defaultValue("false") + .allowableValues(DO_NOT_STORE_STATE, STORE_STATE_LOCALLY) + .defaultValue(DO_NOT_STORE_STATE) .build(); public static final PropertyDescriptor STATEFUL_VARIABLES_INIT_VALUE = new PropertyDescriptor.Builder() .name("Stateful Variables Initial Value") @@ -141,21 +167,12 @@ public ValidationResult validate(String subject, String input, ValidationContext "when state does not contain a value for the variable.") .required(false) .defaultValue("0") - .addValidator(StandardValidators.NUMBER_VALIDATOR) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - // relationships - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .description("All successful FlowFiles are routed to this relationship").name("success").build(); - public static final Relationship REL_FAILED_SET_STATE = new Relationship.Builder() - .description("A failure to set the state after adding the attributes to the FlowFile will route the FlowFile here. If the processor is set to 'Stateless' then all FlowFiles will " + - "route to success").name("set state fail").build(); public UpdateAttribute() { - final Set relationshipSet = new HashSet<>(); - relationshipSet.add(REL_SUCCESS); - relationshipSet.add(REL_FAILED_SET_STATE); - relationships = Collections.unmodifiableSet(relationshipSet); + relationships = statelessRelationshipSet; } @Override @@ -197,7 +214,13 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String super.onPropertyModified(descriptor, oldValue, newValue); if (descriptor.equals(STORE_STATE)) { - stateful = "true".equalsIgnoreCase(newValue); + if (DO_NOT_STORE_STATE.equals(newValue)){ + stateful = false; + relationships = statelessRelationshipSet; + } else { + stateful = true; + relationships = statefulRelationshipSet; + } } } @@ -217,8 +240,8 @@ public void onScheduled(final ProcessContext context) throws IOException { // Initialize the stateful default actions for (PropertyDescriptor entry : context.getProperties().keySet()) { if (entry.isDynamic()) { - if(!tempMap.containsKey(entry.getName()+"_state")) { - tempMap.put(entry.getName() + "_state", initValue); + if(!tempMap.containsKey(entry.getName())) { + tempMap.put(entry.getName(), initValue); } } } @@ -228,8 +251,8 @@ public void onScheduled(final ProcessContext context) throws IOException { if (criteria != null) { for (Rule rule : criteria.getRules()) { for (Action action : rule.getActions()) { - if (!tempMap.containsKey(action.getAttribute() + "_state")) { - tempMap.put(action.getAttribute() + "_state", initValue); + if (!tempMap.containsKey(action.getAttribute())) { + tempMap.put(action.getAttribute(), initValue); } } } @@ -496,11 +519,11 @@ private PropertyValue getPropertyValue(final String text, final ProcessContext c return currentValue; } - //Evaluates the specified condition on the specified flowfile. + // Evaluates the specified condition on the specified flowfile. private boolean evaluateCondition(final ProcessContext context, final Condition condition, final FlowFile flowfile, final Map statefulAttributes) { try { // evaluate the expression for the given flow file - return getPropertyValue(condition.getExpression(), context).evaluateAttributeExpressions(flowfile, statefulAttributes).asBoolean(); + return getPropertyValue(condition.getExpression(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).asBoolean(); } catch (final ProcessException pe) { throw new ProcessException(String.format("Unable to evaluate condition '%s': %s.", condition.getExpression(), pe), pe); } @@ -552,7 +575,7 @@ private FlowFile executeActions(final ProcessSession session, final ProcessConte for (final Action action : actions.values()) { if (!action.getAttribute().equals(DELETE_ATTRIBUTES.getName())) { try { - final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, statefulAttributes).getValue(); + final String newAttributeValue = getPropertyValue(action.getValue(), context).evaluateAttributeExpressions(flowfile, null, null, statefulAttributes).getValue(); // log if appropriate if (logger.isDebugEnabled()) { @@ -561,7 +584,7 @@ private FlowFile executeActions(final ProcessSession session, final ProcessConte if (statefulAttributesToSet != null) { if(!action.getAttribute().equals("UpdateAttribute.matchedRule")) { - statefulAttributesToSet.put(action.getAttribute() + "_state", newAttributeValue); + statefulAttributesToSet.put(action.getAttribute(), newAttributeValue); } } diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html index 6574fd24829a..5c59ded4a9e6 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html @@ -252,7 +252,7 @@

    Description:

    - By selecting "Stateful" option for the "Store State" property UpdateAttribute will not only store the evaluated properties as attributes of the FlowFile but + By selecting "store state locally" option for the "Store State" property UpdateAttribute will not only store the evaluated properties as attributes of the FlowFile but also as stateful variables to be referenced in a recursive fashion. This enables the processor to calculate things like the sum or count of incoming FlowFiles. A dynamic property can be referenced as a stateful variable like so: @@ -260,14 +260,14 @@

    Description:

  • Dynamic Property
    • key : theCount
    • -
    • value : ${theCount_state:plus(1)}
    • +
    • value : ${getStateValue("theCount"):plus(1)}
  • This example will keep a count of the total number of FlowFiles that have passed through the processor. To use logic on top of State, simply use the "Advanced Usage" of UpdateAttribute. All Actions will be stored as stateful attributes as well as being added to FlowFiles. Using the "Advanced Usage" it is possible to keep track of things like a maximum value of the - flow so far. This would be done by having a condition of "${maxValue_state:lt(${value})}" and an action of attribute:"maxValue", value:"${value}". + flow so far. This would be done by having a condition of "${getStateValue("maxValue"):lt(${value})}" and an action of attribute:"maxValue", value:"${value}". The "Stateful Variables Initial Value" property is used to initialize the stateful variables. Some logic rules will require a very high initial value, like using the Advanced rules to determine the minimum value. @@ -281,21 +281,21 @@

    Description:

  • Count
    • key : theCount
    • -
    • value : ${theCount_state:plus(1)}
    • +
    • value : ${getStateValue("theCount"):plus(1)}
  • Sum
    • key : theSum
    • -
    • value : ${theSum_state:plus(${flowfileValue})}
    • +
    • value : ${getStateValue("theSum"):plus(${flowfileValue})}
  • Average
    • key : theAverage
    • -
    • value : ${theSum_state:divide(theCount_state)}
    • +
    • value : ${getStateValue("theSum"):divide(getStateValue("theCount"))}
  • @@ -307,6 +307,9 @@

    Description:

    If the processor is able to get the state at the beginning of the onTrigger but unable to set the state after adding attributes to the FlowFile, the FlowFile will be transferred to "set state fail". This is normally due to the state not being the most up to date version (another thread has replaced the state with another version). In most use-cases this relationship should loop back to the processor since the only affected attributes will be overwritten. + + Note: Currently the only "stateful" option is to store state locally. This is done because the current implementation of clustered state relies on Zookeeper and Zookeeper isn't designed + for the type of load/throughput UpdateAttribute with state would demand. In the future, if/when multiple different clustered state options are added, UpdateAttribute will be updated.

    diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java index 7178e3d4eff7..749f28279e70 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java @@ -34,6 +34,7 @@ import org.junit.Test; +import static org.apache.nifi.processors.attributes.UpdateAttribute.STORE_STATE_LOCALLY; import static org.junit.Assert.assertEquals; /** @@ -121,9 +122,9 @@ public void testDefaultAddAttribute() throws Exception { @Test public void testDefaultState() throws Exception { final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty(UpdateAttribute.STORE_STATE, "true"); - runner.setProperty("count", "${count_state:plus(1)}"); - runner.setProperty("sum", "${sum_state:plus(${pencils})}"); + runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY); + runner.setProperty("count", "${getStateValue('count'):plus(1)}"); + runner.setProperty("sum", "${getStateValue('sum'):plus(${pencils})}"); runner.assertValid(); @@ -152,10 +153,10 @@ public void testDefaultState() throws Exception { @Test public void testStateWithInitValue() throws Exception { final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty(UpdateAttribute.STORE_STATE, "true"); + runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY); runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "10"); - runner.setProperty("count", "${count_state:plus(1)}"); - runner.setProperty("sum", "${sum_state:plus(${pencils})}"); + runner.setProperty("count", "${getStateValue('count'):plus(1)}"); + runner.setProperty("sum", "${getStateValue('sum'):plus(${pencils})}"); runner.assertValid(); @@ -186,12 +187,12 @@ public void testRuleHitWithState() throws Exception { final Criteria criteria = getCriteria(); addRule(criteria, "rule", Arrays.asList( // conditions - "${maxValue_state:lt(${value})}"), getMap( + "${getStateValue('maxValue'):lt(${value})}"), getMap( // actions "maxValue", "${value}")); TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty(UpdateAttribute.STORE_STATE, "true"); + runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY); runner.setAnnotationData(serialize(criteria)); final Map attributes = new HashMap<>(); @@ -219,14 +220,14 @@ public void testRuleHitWithStateWithDefault() throws Exception { final Criteria criteria = getCriteria(); addRule(criteria, "rule", Arrays.asList( // conditions - "${maxValue_state:lt(${value})}"), getMap( + "${getStateValue('maxValue'):lt(${value})}"), getMap( // actions "maxValue", "${value}")); TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty(UpdateAttribute.STORE_STATE, "true"); + runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY); runner.setAnnotationData(serialize(criteria)); - runner.setProperty("maxValue", "${maxValue_state}"); + runner.setProperty("maxValue", "${getStateValue('maxValue')}"); final Map attributes = new HashMap<>(); attributes.put("value", "1"); @@ -253,12 +254,12 @@ public void testRuleHitWithStateWithInitValue() throws Exception { final Criteria criteria = getCriteria(); addRule(criteria, "rule", Arrays.asList( // conditions - "${minValue_state:ge(${value})}"), getMap( + "${getStateValue('minValue'):ge(${value})}"), getMap( // actions "minValue", "${value}")); TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty(UpdateAttribute.STORE_STATE, "true"); + runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY); runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "5"); runner.setAnnotationData(serialize(criteria)); @@ -286,16 +287,16 @@ public void testMultipleRulesWithStateAndDelete() throws Exception { final Criteria criteria = getCriteria(); addRule(criteria, "rule", Arrays.asList( // conditions - "${maxValue_state:lt(${value})}"), getMap( + "${getStateValue('maxValue'):lt(${value})}"), getMap( // actions "maxValue", "${value}")); TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); - runner.setProperty(UpdateAttribute.STORE_STATE, "true"); + runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY); runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "badValue"); runner.setAnnotationData(serialize(criteria)); - runner.setProperty("maxValue", "${maxValue_state}"); - runner.setProperty("theCount", "${theCount_state:plus(1)}"); + runner.setProperty("maxValue", "${getStateValue('maxValue')}"); + runner.setProperty("theCount", "${getStateValue('theCount'):plus(1)}"); final Map attributes = new HashMap<>(); attributes.put("value", "1"); From 45d58825845115cfa8775f1bf305f9764d925418 Mon Sep 17 00:00:00 2001 From: jpercivall Date: Tue, 6 Dec 2016 14:54:52 -0500 Subject: [PATCH 5/6] NIFI-1582 Removing init state value --- .../nifi/processors/attributes/UpdateAttribute.java | 9 ++++++++- .../nifi/update/attributes/TestUpdateAttribute.java | 7 ++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java index f220e795dc50..c11f4029a744 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java @@ -166,7 +166,6 @@ public ValidationResult validate(String subject, String input, ValidationContext .description("If using state to set/reference variables then this value is used to set the initial value of the stateful variable. This will only be used in the @OnScheduled method " + "when state does not contain a value for the variable.") .required(false) - .defaultValue("0") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -266,6 +265,14 @@ public void onScheduled(final ProcessContext context) throws IOException { protected Collection customValidate(final ValidationContext context) { final List reasons = new ArrayList<>(super.customValidate(context)); + if (!context.getProperty(STORE_STATE).getValue().equals(DO_NOT_STORE_STATE)){ + String initValue = context.getProperty(STATEFUL_VARIABLES_INIT_VALUE).getValue(); + if (initValue == null){ + reasons.add(new ValidationResult.Builder().subject(STATEFUL_VARIABLES_INIT_VALUE.getDisplayName()).valid(false) + .explanation("initial state value much be set if the processor is configured to store state.").build()); + } + } + Criteria criteria = null; try { criteria = CriteriaSerDe.deserialize(context.getAnnotationData()); diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java index 749f28279e70..1812a509f32a 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/test/java/org/apache/nifi/update/attributes/TestUpdateAttribute.java @@ -120,12 +120,14 @@ public void testDefaultAddAttribute() throws Exception { } @Test - public void testDefaultState() throws Exception { + public void testBasicState() throws Exception { final TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY); runner.setProperty("count", "${getStateValue('count'):plus(1)}"); runner.setProperty("sum", "${getStateValue('sum'):plus(${pencils})}"); + runner.assertNotValid(); + runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "0"); runner.assertValid(); final Map attributes2 = new HashMap<>(); @@ -193,6 +195,7 @@ public void testRuleHitWithState() throws Exception { TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY); + runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "0"); runner.setAnnotationData(serialize(criteria)); final Map attributes = new HashMap<>(); @@ -226,6 +229,7 @@ public void testRuleHitWithStateWithDefault() throws Exception { TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY); + runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "0"); runner.setAnnotationData(serialize(criteria)); runner.setProperty("maxValue", "${getStateValue('maxValue')}"); @@ -294,6 +298,7 @@ public void testMultipleRulesWithStateAndDelete() throws Exception { TestRunner runner = TestRunners.newTestRunner(new UpdateAttribute()); runner.setProperty(UpdateAttribute.STORE_STATE, STORE_STATE_LOCALLY); runner.setProperty(UpdateAttribute.DELETE_ATTRIBUTES, "badValue"); + runner.setProperty(UpdateAttribute.STATEFUL_VARIABLES_INIT_VALUE, "0"); runner.setAnnotationData(serialize(criteria)); runner.setProperty("maxValue", "${getStateValue('maxValue')}"); runner.setProperty("theCount", "${getStateValue('theCount'):plus(1)}"); From dcb77f766c90cbafd2e2a4b59b70c2f2a44d7c31 Mon Sep 17 00:00:00 2001 From: jpercivall Date: Tue, 6 Dec 2016 16:00:05 -0500 Subject: [PATCH 6/6] NIFI-1582 Adding documentation for the changes to Init State value --- .../nifi/processors/attributes/UpdateAttribute.java | 8 ++++---- .../additionalDetails.html | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java index c11f4029a744..22e558bd25c9 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/java/org/apache/nifi/processors/attributes/UpdateAttribute.java @@ -155,8 +155,8 @@ public ValidationResult validate(String subject, String input, ValidationContext public static final PropertyDescriptor STORE_STATE = new PropertyDescriptor.Builder() .name("Store State") .description("Select whether or not state will be stored. Selecting 'Stateless' will offer the default functionality of purely updating the attributes on a " + - "FlowFile in a stateless manner. Selecting 'Stateful' will not only store the attributes on the FlowFile but also in the Processors state. See the 'Stateful Usage' " + - "topic of the 'Additional Details' section of this processor's documentation for more information") + "FlowFile in a stateless manner. Selecting a stateful option will not only store the attributes on the FlowFile but also in the Processors " + + "state. See the 'Stateful Usage' topic of the 'Additional Details' section of this processor's documentation for more information") .required(true) .allowableValues(DO_NOT_STORE_STATE, STORE_STATE_LOCALLY) .defaultValue(DO_NOT_STORE_STATE) @@ -164,9 +164,9 @@ public ValidationResult validate(String subject, String input, ValidationContext public static final PropertyDescriptor STATEFUL_VARIABLES_INIT_VALUE = new PropertyDescriptor.Builder() .name("Stateful Variables Initial Value") .description("If using state to set/reference variables then this value is used to set the initial value of the stateful variable. This will only be used in the @OnScheduled method " + - "when state does not contain a value for the variable.") + "when state does not contain a value for the variable. This is required if running statefully but can be empty if needed.") .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(Validator.VALID) .build(); diff --git a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html index 5c59ded4a9e6..8a60c8fb87b3 100644 --- a/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html +++ b/nifi-nar-bundles/nifi-update-attribute-bundle/nifi-update-attribute-processor/src/main/resources/docs/org.apache.nifi.processors.attributes.UpdateAttribute/additionalDetails.html @@ -269,7 +269,7 @@

    Description:

    All Actions will be stored as stateful attributes as well as being added to FlowFiles. Using the "Advanced Usage" it is possible to keep track of things like a maximum value of the flow so far. This would be done by having a condition of "${getStateValue("maxValue"):lt(${value})}" and an action of attribute:"maxValue", value:"${value}". - The "Stateful Variables Initial Value" property is used to initialize the stateful variables. Some logic rules will require a very high initial value, like using the Advanced rules to + The "Stateful Variables Initial Value" property is used to initialize the stateful variables and is required to be set if running statefully. Some logic rules will require a very high initial value, like using the Advanced rules to determine the minimum value.