From d051399b7c9f277799e60b5772138996be8528eb Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Wed, 8 Feb 2017 00:13:23 +0900 Subject: [PATCH 1/4] NIFI-3414: Added EnforceOrder processor Use it with FirstInFirstOutPrioritizer, it can enforce original ordering of 'out-of-order' FlowFiles. nifi-mock is modified to support FlowFile assertion using Prioritizer. --- .../org/apache/nifi/util/MockFlowFile.java | 16 +- .../apache/nifi/util/MockProcessSession.java | 39 +- .../nifi-standard-processors/pom.xml | 5 + .../processors/standard/EnforceOrder.java | 531 ++++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../processors/standard/TestEnforceOrder.java | 436 ++++++++++++++ 6 files changed, 1004 insertions(+), 24 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEnforceOrder.java diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java index df87de59c55d..b23f2eb64d3a 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java @@ -51,10 +51,14 @@ public class MockFlowFile implements FlowFileRecord { private byte[] data = new byte[0]; + private long lastEnqueuedDate = 0; + private long enqueuedIndex = 0; + public MockFlowFile(final long id) { this.creationTime = System.nanoTime(); this.id = id; entryDate = System.currentTimeMillis(); + lastEnqueuedDate = entryDate; attributes.put(CoreAttributes.FILENAME.key(), String.valueOf(System.nanoTime()) + ".mockFlowFile"); attributes.put(CoreAttributes.PATH.key(), "target"); @@ -290,7 +294,11 @@ public byte[] toByteArray() { @Override public Long getLastQueueDate() { - return entryDate; + return 
lastEnqueuedDate; + } + + public void setLastEnqueuedDate(long lastEnqueuedDate) { + this.lastEnqueuedDate = lastEnqueuedDate; } @Override @@ -315,7 +323,11 @@ public long getLineageStartIndex() { @Override public long getQueueDateIndex() { - return 0; + return enqueuedIndex; + } + + public void setEnqueuedIndex(long enqueuedIndex) { + this.enqueuedIndex = enqueuedIndex; } public boolean isAttributeEqual(final String attributeName, final String expectedValue) { diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java index 3228e1f0a9b9..faf6e42988b4 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java @@ -38,6 +38,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -79,6 +80,8 @@ public class MockProcessSession implements ProcessSession { private boolean rolledback = false; private final Set removedFlowFiles = new HashSet<>(); + private static final AtomicLong enqueuedIndex = new AtomicLong(0L); + public MockProcessSession(final SharedSessionState sharedState, final Processor processor) { this.processor = processor; this.sharedState = sharedState; @@ -715,8 +718,18 @@ public void transfer(final FlowFile flowFile) { throw new IllegalArgumentException("I only accept MockFlowFile"); } + final MockFlowFile mockFlowFile = (MockFlowFile) flowFile; beingProcessed.remove(flowFile.getId()); - processorQueue.offer((MockFlowFile) flowFile); + processorQueue.offer(mockFlowFile); + updateLastQueuedDate(mockFlowFile); + + } + + private void updateLastQueuedDate(MockFlowFile mockFlowFile) { + // Simulate StandardProcessSession.updateLastQueuedDate, + // which is called when a flow file is transferred to a relationship. 
+ mockFlowFile.setLastEnqueuedDate(System.currentTimeMillis()); + mockFlowFile.setEnqueuedIndex(enqueuedIndex.incrementAndGet()); } @Override @@ -737,14 +750,11 @@ public void transfer(final FlowFile flowFile, final Relationship relationship) { } validateState(flowFile); - List list = transferMap.get(relationship); - if (list == null) { - list = new ArrayList<>(); - transferMap.put(relationship, list); - } + List list = transferMap.computeIfAbsent(relationship, r -> new ArrayList<>()); beingProcessed.remove(flowFile.getId()); list.add((MockFlowFile) flowFile); + updateLastQueuedDate((MockFlowFile) flowFile); } @Override @@ -753,23 +763,8 @@ public void transfer(final Collection flowFiles, final Relationship re transfer(flowFiles); return; } - if(!processor.getRelationships().contains(relationship)){ - throw new IllegalArgumentException("this relationship " + relationship.getName() + " is not known"); - } - - for (final FlowFile flowFile : flowFiles) { - validateState(flowFile); - } - - List list = transferMap.get(relationship); - if (list == null) { - list = new ArrayList<>(); - transferMap.put(relationship, list); - } - for (final FlowFile flowFile : flowFiles) { - beingProcessed.remove(flowFile.getId()); - list.add((MockFlowFile) flowFile); + transfer(flowFile, relationship); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 1a18b0869d65..dc7730923fc9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -214,6 +214,11 @@ language governing permissions and limitations under the License. 
--> nifi-ssl-context-service test + + org.apache.nifi + nifi-standard-prioritizers + test + org.apache.derby derby diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java new file mode 100644 index 000000000000..2367d9945eb3 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java @@ -0,0 +1,531 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.StringUtils.isBlank; + +@EventDriven +@Tags({"sort", "order"}) 
+@InputRequirement(Requirement.INPUT_REQUIRED) +@TriggerSerially +@CapabilityDescription("Enforces expected ordering of FlowFiles that belong to the same data group. " + + " Although PriorityAttributePrioritizer can be used on a connection to ensure that flow files going through that connection are in priority order," + + " depending on error-handling, branching, and other flow designs, it is possible for FlowFiles to get out-of-order." + + " EnforceOrder can be used to enforce original ordering for those FlowFiles." + + " [IMPORTANT] In order to take effect of EnforceOrder, FirstInFirstOutPrioritizer should be used at EVERY downstream relationship" + + " UNTIL the order of FlowFiles physically get FIXED by operation such as MergeContent or being stored to the final destination.") +@Stateful(scopes = Scope.LOCAL, description = "EnforceOrder uses following states per ordering group:" + + " '.target' is an order number which is being waited to arrive next." + + " When a FlowFile with a matching order arrives, or a FlowFile overtakes the FlowFile being waited for because of wait timeout," + + " target order will be updated to (FlowFile.order + 1)." + + " '.max' is the maximum order number for a group." + + " '.updatedAt' is a timestamp when the order of a group was updated last time." + + " These managed states will be removed automatically once a group is determined as inactive, see 'Inactive Timeout' for detail.") +@WritesAttributes({ + @WritesAttribute(attribute = EnforceOrder.ATTR_STARTED_AT, + description = "All FlowFiles going through this processor will have this attribute. 
This value is used to determine wait timeout."), + @WritesAttribute(attribute = EnforceOrder.ATTR_RESULT, + description = "All FlowFiles going through this processor will have this attribute denoting which relationship it was routed to."), + @WritesAttribute(attribute = EnforceOrder.ATTR_DETAIL, + description = "FlowFiles routed to 'failure' or 'skipped' relationship will have this attribute describing details."), + @WritesAttribute(attribute = EnforceOrder.ATTR_EXPECTED_ORDER, + description = "FlowFiles routed to 'wait' or 'skipped' relationship will have this attribute denoting expected order when the FlowFile was processed.") +}) +public class EnforceOrder extends AbstractProcessor { + + public static final String ATTR_STARTED_AT = "EnforceOrder.startedAt"; + public static final String ATTR_EXPECTED_ORDER = "EnforceOrder.expectedOrder"; + public static final String ATTR_RESULT = "EnforceOrder.result"; + public static final String ATTR_DETAIL = "EnforceOrder.detail"; + private static final Function STATE_TARGET_ORDER = groupId -> groupId + ".target"; + private static final String STATE_SUFFIX_UPDATED_AT = ".updatedAt"; + private static final Function STATE_UPDATED_AT = groupId -> groupId + STATE_SUFFIX_UPDATED_AT; + private static final Function STATE_MAX_ORDER = groupId -> groupId + ".max"; + + public static final PropertyDescriptor GROUP_IDENTIFIER = new PropertyDescriptor.Builder() + .name("Group Identifier") + .description("EnforceOrder is capable of multiple ordering groups." + + " 'Group Identifier' is used to determine which group a FlowFile belongs to." + + " This property will be evaluated with each incoming FlowFile." 
+ + " If evaluated result is empty, the FlowFile will be routed to failure.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("${filename}") + .build(); + + public static final PropertyDescriptor ORDER_ATTRIBUTE = new PropertyDescriptor.Builder() + .name("Order Attribute") + .description("A name of FlowFile attribute whose value will be used to enforce order of FlowFiles within a group." + + " If a FlowFile does not have this attribute, or its value is not an integer, the FlowFile will be routed to failure.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + public static final PropertyDescriptor INITIAL_ORDER = new PropertyDescriptor.Builder() + .name("Initial Order") + .description("When the first FlowFile of a group arrives, initial target order will be computed and stored in the managed state." + + " After that, target order will start being tracked by EnforceOrder and stored in the state management store." + + " If Expression Language is used but evaluated result was not an integer, then the FlowFile will be routed to failure," + + " and initial order will be left unknown until consecutive FlowFiles provide a valid initial order.") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("1") + .build(); + + public static final PropertyDescriptor MAX_ORDER = new PropertyDescriptor.Builder() + .name("Maximum Order") + .description("If specified, any FlowFiles that has larger order will be routed to failure." + + " This property is computed only once for a given group." + + " After a maximum order is computed, it will be persisted in the state management store and used for other FlowFiles belonging to the same group." 
+ + " If Expression Language is used but evaluated result was not an integer, then the FlowFile will be routed to failure," + + " and maximum order will be left unknown until consecutive FlowFiles provide a valid maximum order.") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor WAIT_TIMEOUT = new PropertyDescriptor.Builder() + .name("Wait Timeout") + .description("Indicates the duration after which waiting FlowFiles will be routed to the 'overtook' relationship.") + .required(true) + .defaultValue("10 min") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + public static final PropertyDescriptor INACTIVE_TIMEOUT = new PropertyDescriptor.Builder() + .name("Inactive Timeout") + .description("Indicates the duration after which state for an inactive group will be cleared from managed state." + + " Group is determined as inactive if any new incoming FlowFile has not seen for a group for specified duration." + + " Inactive Timeout must be longer than Wait Timeout." + + " If a FlowFile arrives late after its group is already cleared, it will be treated as a brand new group," + + " but will never match the order since expected preceding FlowFiles are already gone." + + " The FlowFile will eventually timeout for waiting and routed to 'overtook'." 
+ + " To avoid this, group states should be kept long enough, however, shorter duration would be helpful for reusing the same group identifier again.") + .required(true) + .defaultValue("30 min") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + public static final PropertyDescriptor BATCH_COUNT = new PropertyDescriptor.Builder() + .name("Batch Count") + .description("The maximum number of FlowFiles that EnforceOrder can process at an execution.") + .required(true) + .defaultValue("1000") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile with a matching order number will be routed to this relationship.") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFiles which does not have required attributes, or fails to compute those will be routed to this relationship") + .build(); + + public static final Relationship REL_WAIT = new Relationship.Builder() + .name("wait") + .description("A FlowFile with non matching order will be routed to this relationship") + .build(); + + public static final Relationship REL_OVERTOOK = new Relationship.Builder() + .name("overtook") + .description("A FlowFile that waited for preceding FlowFiles longer than Wait Timeout and overtook those FlowFiles, will be routed to this relationship.") + .build(); + + public static final Relationship REL_SKIPPED = new Relationship.Builder() + .name("skipped") + .description("A FlowFile that has an order younger than current, which means arrived too late and skipped, will be routed to this relationship.") + .build(); + + private final Set relationships; + + public EnforceOrder() { + final Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_WAIT); + 
+ rels.add(REL_OVERTOOK); + rels.add(REL_FAILURE); + rels.add(REL_SKIPPED); + relationships = Collections.unmodifiableSet(rels); + } + + @Override + protected List getSupportedPropertyDescriptors() { + final List descriptors = new ArrayList<>(); + descriptors.add(GROUP_IDENTIFIER); + descriptors.add(ORDER_ATTRIBUTE); + descriptors.add(INITIAL_ORDER); + descriptors.add(MAX_ORDER); + descriptors.add(BATCH_COUNT); + descriptors.add(WAIT_TIMEOUT); + descriptors.add(INACTIVE_TIMEOUT); + return descriptors; + } + + @Override + public Set getRelationships() { + return relationships; + } + + + @Override + protected Collection customValidate(ValidationContext validationContext) { + final List results = new ArrayList<>(super.customValidate(validationContext)); + + final Long waitTimeoutMillis = validationContext.getProperty(WAIT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); + final Long inactiveTimeoutMillis = validationContext.getProperty(INACTIVE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); + + if (waitTimeoutMillis >= inactiveTimeoutMillis) { + results.add(new ValidationResult.Builder().input(validationContext.getProperty(INACTIVE_TIMEOUT).getValue()) + .subject(INACTIVE_TIMEOUT.getDisplayName()) + .explanation(String.format("%s should be longer than %s", + INACTIVE_TIMEOUT.getDisplayName(), WAIT_TIMEOUT.getDisplayName())) + .valid(false) + .build()); + } + + return results; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + + final ComponentLog logger = getLogger(); + final Integer batchCount = context.getProperty(BATCH_COUNT).asInteger(); + + final StateMap stateMap; + try { + stateMap = context.getStateManager().getState(Scope.LOCAL); + } catch (final IOException e) { + logger.error("Failed to retrieve state from StateManager due to " + e, e); + context.yield(); + return; + } + + final OrderingContext oc = new OrderingContext(context, session); + + oc.groupStates.putAll(stateMap.toMap());
+ for (int i = 0; i < batchCount; i++) { + + oc.setFlowFile(session.get()); + if (oc.flowFile == null) { + break; + } + + if (!oc.computeGroupId() + || !oc.computeOrder() + || !oc.computeInitialOrder() + || !oc.computeMaxOrder()) { + continue; + } + + // At this point, the flow file is confirmed to be valid. + oc.markFlowFileValied(); + } + + oc.transferFlowFiles(); + + oc.cleanupInactiveStates(); + + try { + context.getStateManager().setState(oc.groupStates, Scope.LOCAL); + } catch (final IOException e) { + throw new RuntimeException("Failed to update state due to " + e + + ". Session will be rollback and processor will be yielded for a while.", e); + } + + } + + private class OrderingContext { + + private final ComponentLog logger = getLogger(); + private final ProcessSession processSession; + private final ProcessContext processContext; + + // Following properties are static global setting for all groups. + private final String orderAttribute; + private final Long waitTimeoutMillis; + private final Function getOrder; + + private final Map groupStates = new HashMap<>(); + private final long now = System.currentTimeMillis(); + + // Following properties are computed per flow file. + private final PropertyValue groupIdentifierProperty ; + + // Followings are per group objects. + private final PropertyValue initOrderProperty; + private final PropertyValue maxOrderProperty; + private final Map> flowFileGroups = new TreeMap<>(); + + // Current variables within incoming FlowFiles loop. 
+ private FlowFile flowFile; + private String groupId; + private Integer order; + + private OrderingContext(final ProcessContext processContext, final ProcessSession processSession) { + this.processContext = processContext; + this.processSession = processSession; + + orderAttribute = processContext.getProperty(ORDER_ATTRIBUTE).getValue(); + waitTimeoutMillis = processContext.getProperty(WAIT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); + getOrder = flowFile -> Integer.parseInt(flowFile.getAttribute(orderAttribute)); + + + groupIdentifierProperty = processContext.getProperty(GROUP_IDENTIFIER); + + initOrderProperty = processContext.getProperty(INITIAL_ORDER); + maxOrderProperty = processContext.getProperty(MAX_ORDER); + } + + private void setFlowFile(final FlowFile flowFile) { + this.flowFile = flowFile; + this.groupId = null; + this.order = null; + } + + private boolean computeGroupId() { + groupId = groupIdentifierProperty.evaluateAttributeExpressions(flowFile).getValue(); + if (isBlank(groupId)) { + transferToFailure(flowFile, "Failed to get Group Identifier."); + return false; + } + return true; + } + + private boolean computeOrder() { + try { + order = getOrder.apply(flowFile); + } catch (final NumberFormatException e) { + transferToFailure(flowFile, "Failed to parse order attribute due to " + e, e); + return false; + } + return true; + } + + private boolean computeMaxOrder() { + if (maxOrderProperty.isSet()) { + // Compute maxOrder for this group if it's not there yet. 
+ final String maxOrderStr = groupStates.computeIfAbsent(STATE_MAX_ORDER.apply(groupId), + k -> maxOrderProperty.evaluateAttributeExpressions(flowFile).getValue()); + if (isBlank(maxOrderStr)) { + transferToFailure(flowFile, String.format("%s was specified but result was empty.", MAX_ORDER.getDisplayName())); + return false; + } + + final Integer maxOrder; + try { + maxOrder = Integer.parseInt(maxOrderStr); + } catch (final NumberFormatException e) { + final String msg = String.format("Failed to get Maximum Order for group [%s] due to %s", groupId, e); + transferToFailure(flowFile, msg, e); + return false; + } + + // Check max order. + if (order > maxOrder) { + final String msg = String.format("Order (%d) is greater than the Maximum Order (%d) for Group [%s]", order, maxOrder, groupId); + transferToFailure(flowFile, msg); + return false; + } + } + return true; + } + + private boolean computeInitialOrder() { + // Compute initial order. Use asInteger() to check if it's a valid integer. + final String stateKeyOrder = STATE_TARGET_ORDER.apply(groupId); + try { + final AtomicReference computedInitOrder = new AtomicReference<>(); + groupStates.computeIfAbsent(stateKeyOrder, k -> { + final String initOrderStr = initOrderProperty.evaluateAttributeExpressions(flowFile).getValue(); + final int initOrder = Integer.parseInt(initOrderStr); + computedInitOrder.set(initOrderStr); + return initOrderStr; + }); + // If these map modification is in the computeIfAbsent function, it causes this issue. + // JDK-8071667 : HashMap.computeIfAbsent() adds entry that HashMap.get() does not find. 
+ // http://bugs.java.com/bugdatabase/view_bug.do?bug_id=8071667 + if (!isBlank(computedInitOrder.get())) { + groupStates.put(STATE_UPDATED_AT.apply(groupId), String.valueOf(now)); + } + + } catch (final NumberFormatException e) { + final String msg = String.format("Failed to get Initial Order for Group [%s] due to %s", groupId, e); + transferToFailure(flowFile, msg, e); + return false; + } + return true; + } + + private void markFlowFileValied() { + final List groupedFlowFiles = flowFileGroups.computeIfAbsent(groupId, k -> new ArrayList<>()); + + final FlowFile validFlowFile; + if (isBlank(flowFile.getAttribute(ATTR_STARTED_AT))) { + validFlowFile = processSession.putAttribute(flowFile, ATTR_STARTED_AT, String.valueOf(now)); + } else { + validFlowFile = flowFile; + } + + groupedFlowFiles.add(validFlowFile); + } + + private void transferFlowFiles() { + flowFileGroups.entrySet().stream().filter(entry -> !entry.getValue().isEmpty()).map(entry -> { + // Sort flow files within each group. + final List groupedFlowFiles = entry.getValue(); + groupedFlowFiles.sort(Comparator.comparing(getOrder)); + return entry; + }).forEach(entry -> { + // Check current state. + final String groupId = entry.getKey(); + final String stateKeyOrder = STATE_TARGET_ORDER.apply(groupId); + final int previousTargetOrder = Integer.parseInt(groupStates.get(stateKeyOrder)); + final AtomicInteger targetOrder = new AtomicInteger(previousTargetOrder); + final List groupedFlowFiles = entry.getValue(); + final String maxOrderStr = groupStates.get(STATE_MAX_ORDER.apply(groupId)); + + groupedFlowFiles.forEach(f -> { + final Integer order = getOrder.apply(f); + final boolean isMaxOrder = !isBlank(maxOrderStr) && order.equals(Integer.parseInt(maxOrderStr)); + + if (order == targetOrder.get()) { + transferResult(f, REL_SUCCESS, null, null); + if (!isMaxOrder) { + // If max order is specified and this FlowFile has the max order, don't increment target anymore. 
+ targetOrder.incrementAndGet(); + } + + } else if (order > targetOrder.get()) { + + if (now - Long.parseLong(f.getAttribute(ATTR_STARTED_AT)) > waitTimeoutMillis) { + transferResult(f, REL_OVERTOOK, null, targetOrder.get()); + targetOrder.set(isMaxOrder ? order : order + 1); + } else { + transferResult(f, REL_WAIT, null, targetOrder.get()); + } + + } else { + final String msg = String.format("Skipped, FlowFile order was %d but current target is %d", order, targetOrder.get()); + logger.warn(msg + ". {}", new Object[]{f}); + transferResult(f, REL_SKIPPED, msg, targetOrder.get()); + } + + }); + + if (previousTargetOrder != targetOrder.get()) { + groupStates.put(stateKeyOrder, String.valueOf(targetOrder.get())); + groupStates.put(STATE_UPDATED_AT.apply(groupId), String.valueOf(now)); + } + }); + } + + private FlowFile transferResult(final FlowFile flowFile, final Relationship result, final String detail, final Integer expectedOrder) { + final Map attributes = new HashMap<>(); + attributes.put(ATTR_RESULT, result.getName()); + if (expectedOrder != null) { + attributes.put(ATTR_EXPECTED_ORDER, expectedOrder.toString()); + } + if (!isBlank(detail)) { + attributes.put(ATTR_DETAIL, detail); + } + final FlowFile resultFlowFile = processSession.putAllAttributes(flowFile, attributes); + processSession.transfer(resultFlowFile, result); + return resultFlowFile; + } + + private void transferToFailure(final FlowFile flowFile, final String message) { + transferToFailure(flowFile, message, null); + } + + private void transferToFailure(final FlowFile flowFile, final String message, final Throwable cause) { + if (cause != null) { + getLogger().warn(message + " {}", new Object[]{flowFile}, cause); + } else { + getLogger().warn(message + " {}", new Object[]{flowFile}); + } + transferResult(flowFile, REL_FAILURE, message, null); + } + + private void cleanupInactiveStates() { + final Long inactiveTimeout = processContext.getProperty(INACTIVE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); 
+ final List inactiveGroups = groupStates.keySet().stream() + .filter(k -> k.endsWith(STATE_SUFFIX_UPDATED_AT) && (now - Long.parseLong(groupStates.get(k)) > inactiveTimeout)) + .map(k -> k.substring(0, k.length() - STATE_SUFFIX_UPDATED_AT.length())) + .collect(Collectors.toList()); + inactiveGroups.forEach(groupId -> { + groupStates.remove(STATE_TARGET_ORDER.apply(groupId)); + groupStates.remove(STATE_UPDATED_AT.apply(groupId)); + groupStates.remove(STATE_MAX_ORDER.apply(groupId)); + }); + } + + } + + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index cfcc85afb3a3..9de5ab65ea94 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -23,6 +23,7 @@ org.apache.nifi.processors.standard.DetectDuplicate org.apache.nifi.processors.standard.DistributeLoad org.apache.nifi.processors.standard.DuplicateFlowFile org.apache.nifi.processors.standard.EncryptContent +org.apache.nifi.processors.standard.EnforceOrder org.apache.nifi.processors.standard.EvaluateJsonPath org.apache.nifi.processors.standard.EvaluateXPath org.apache.nifi.processors.standard.EvaluateXQuery diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEnforceOrder.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEnforceOrder.java new file mode 100644 index 000000000000..161e6bf4a847 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEnforceOrder.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.prioritizer.FirstInFirstOutPrioritizer; +import org.apache.nifi.state.MockStateManager; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class TestEnforceOrder { + + @Test + public void testDefaultPropertyValidation() { + final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class); + + // Default values should not be valid. + runner.assertNotValid(); + + // Set required properties. + runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index"); + runner.assertValid(); + } + + @Test + public void testCustomPropertyValidation() { + final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class); + + // Set required properties. 
+ runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index"); + runner.assertValid(); + + // Inactive Timeout should be longer than Wait Timeout + runner.setProperty(EnforceOrder.WAIT_TIMEOUT, "30 sec"); + runner.setProperty(EnforceOrder.INACTIVE_TIMEOUT, "29 sec"); + runner.assertNotValid(); + + // Inactive Timeout should be longer than Wait Timeout + runner.setProperty(EnforceOrder.INACTIVE_TIMEOUT, "30 sec"); + runner.assertNotValid(); + + // Inactive Timeout should be longer than Wait Timeout + runner.setProperty(EnforceOrder.INACTIVE_TIMEOUT, "31 sec"); + runner.assertValid(); + } + + + private static class Ordered { + private final Map map = new HashMap<>(); + private Ordered(final int index) { + map.put("index", String.valueOf(index)); + } + + private static Ordered i(final int index) { + return new Ordered(index); + } + + private static Ordered i(final String group, final int index) { + return new Ordered(index).put("group", group); + } + + private Ordered put(final String key, final String value) { + map.put(key, value); + return this; + } + + private Map map() { + return map; + } + + private static MockFlowFile enqueue(final TestRunner runner, final String group, final int index) { + return runner.enqueue(group + "." 
+ index, i(group, index).map()); + } + } + + @Test + public void testSort() { + final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class); + + runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}"); + runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index"); + runner.assertValid(); + Ordered.enqueue(runner, "b", 1); + Ordered.enqueue(runner, "a", 2); + Ordered.enqueue(runner, "a", 1); + + runner.run(); + + runner.assertAllFlowFilesTransferred(EnforceOrder.REL_SUCCESS, 3); + + final List succeeded = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS); + succeeded.sort(new FirstInFirstOutPrioritizer()); + succeeded.get(0).assertContentEquals("a.1"); + succeeded.get(1).assertContentEquals("a.2"); + succeeded.get(2).assertContentEquals("b.1"); + } + + @Test + public void testDuplicatedOrder() { + final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class); + + runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}"); + runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index"); + runner.assertValid(); + Ordered.enqueue(runner, "b", 1); + Ordered.enqueue(runner, "a", 2); + Ordered.enqueue(runner, "a", 1); + Ordered.enqueue(runner, "a", 2); + Ordered.enqueue(runner, "a", 3); + + runner.run(); + + final List succeeded = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS); + assertEquals(4, succeeded.size()); + succeeded.sort(new FirstInFirstOutPrioritizer()); + succeeded.get(0).assertContentEquals("a.1"); + succeeded.get(1).assertContentEquals("a.2"); + succeeded.get(2).assertContentEquals("a.3"); + succeeded.get(3).assertContentEquals("b.1"); + + // It's not possible to distinguish skipped from duplicated FlowFiles, since we only track the target order number.
+ final List skipped = runner.getFlowFilesForRelationship(EnforceOrder.REL_SKIPPED); + assertEquals(1, skipped.size()); + skipped.get(0).assertContentEquals("a.2"); + skipped.get(0).assertAttributeEquals(EnforceOrder.ATTR_EXPECTED_ORDER, "3"); + } + + @Test + public void testNoGroupIdentifier() { + final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class); + + runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}"); + runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index"); + runner.assertValid(); + Ordered.enqueue(runner, "b", 1); + Ordered.enqueue(runner, "a", 2); + runner.enqueue("no group id", Ordered.i(1).map()); // without group attribute + Ordered.enqueue(runner, "a", 1); + + runner.run(); + + final List succeeded = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS); + assertEquals(3, succeeded.size()); + succeeded.sort(new FirstInFirstOutPrioritizer()); + succeeded.get(0).assertContentEquals("a.1"); + succeeded.get(1).assertContentEquals("a.2"); + succeeded.get(2).assertContentEquals("b.1"); + + final List failed = runner.getFlowFilesForRelationship(EnforceOrder.REL_FAILURE); + assertEquals(1, failed.size()); + failed.get(0).assertAttributeExists(EnforceOrder.ATTR_DETAIL); + } + + @Test + public void testIllegalOrderValue() { + final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class); + + runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}"); + runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index"); + runner.assertValid(); + Ordered.enqueue(runner, "b", 1); + Ordered.enqueue(runner, "a", 2); + runner.enqueue("illegal order", Ordered.i("a", 1).put("index", "non-integer").map()); + Ordered.enqueue(runner, "a", 1); + + runner.run(); + + final List succeeded = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS); + assertEquals(3, succeeded.size()); + succeeded.sort(new FirstInFirstOutPrioritizer()); + succeeded.get(0).assertContentEquals("a.1"); + succeeded.get(1).assertContentEquals("a.2"); + 
succeeded.get(2).assertContentEquals("b.1"); + + final List failed = runner.getFlowFilesForRelationship(EnforceOrder.REL_FAILURE); + assertEquals(1, failed.size()); + failed.get(0).assertAttributeExists(EnforceOrder.ATTR_DETAIL); + failed.get(0).assertContentEquals("illegal order"); + } + + @Test + public void testInitialOrderValue() { + final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class); + + runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}"); + runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index"); + runner.setProperty(EnforceOrder.INITIAL_ORDER, "${index.start}"); + runner.setProperty(EnforceOrder.MAX_ORDER, "${index.max}"); + runner.assertValid(); + runner.enqueue("b.0", Ordered.i("b", 0).put("index.start", "0").put("index.max", "99").map()); + runner.enqueue("a.100", Ordered.i("a", 100).put("index.start", "100").put("index.max", "103").map()); + runner.enqueue("a.101", Ordered.i("a", 101).put("index.start", "100").put("index.max", "103").map()); + runner.enqueue("illegal initial order", Ordered.i("c", 1).put("index.start", "non-integer").map()); + runner.enqueue("without initial order", Ordered.i("d", 1).map()); + // Even if this flow file doesn't have initial order attribute, this will be routed to success. + // Because target order for group b is already computed from b.0. 
+ Ordered.enqueue(runner, "b", 1); + + runner.run(); + + List succeeded = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS); + assertEquals(4, succeeded.size()); + succeeded.sort(new FirstInFirstOutPrioritizer()); + succeeded.get(0).assertContentEquals("a.100"); + succeeded.get(1).assertContentEquals("a.101"); + succeeded.get(2).assertContentEquals("b.0"); + succeeded.get(3).assertContentEquals("b.1"); + + final List failed = runner.getFlowFilesForRelationship(EnforceOrder.REL_FAILURE); + assertEquals(2, failed.size()); + failed.get(0).assertAttributeExists(EnforceOrder.ATTR_DETAIL); + failed.get(0).assertContentEquals("illegal initial order"); + failed.get(1).assertAttributeExists(EnforceOrder.ATTR_DETAIL); + failed.get(1).assertContentEquals("without initial order"); + + final MockStateManager stateManager = runner.getStateManager(); + stateManager.assertStateEquals("a.target", "102", Scope.LOCAL); + stateManager.assertStateEquals("a.max", "103", Scope.LOCAL); + stateManager.assertStateEquals("b.target", "2", Scope.LOCAL); + stateManager.assertStateEquals("b.max", "99", Scope.LOCAL); + + runner.clearTransferState(); + + } + + @Test + public void testMaxOrder() { + final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class); + + runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${fragment.identifier}"); + runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index"); + runner.setProperty(EnforceOrder.MAX_ORDER, "${fragment.count}"); + runner.assertValid(); + runner.enqueue("b.1", Ordered.i(1).put("fragment.identifier", "b").put("fragment.count", "3").map()); + runner.enqueue("a.2", Ordered.i(2).put("fragment.identifier", "a").put("fragment.count", "2").map()); + runner.enqueue("without max order", Ordered.i(1).put("fragment.identifier", "c").map()); + runner.enqueue("illegal max order", Ordered.i(1).put("fragment.identifier", "d").put("fragment.count", "X").map()); + runner.enqueue("a.1", Ordered.i(1).put("fragment.identifier", 
"a").put("fragment.count", "2").map()); + runner.enqueue("a.3", Ordered.i(3).put("fragment.identifier", "a").put("fragment.count", "2").map()); // Exceed max + + runner.run(); + + final List succeeded = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS); + succeeded.sort(new FirstInFirstOutPrioritizer()); + assertEquals(3, succeeded.size()); + succeeded.get(0).assertContentEquals("a.1"); + succeeded.get(1).assertContentEquals("a.2"); + succeeded.get(2).assertContentEquals("b.1"); + + final List failed = runner.getFlowFilesForRelationship(EnforceOrder.REL_FAILURE); + assertEquals(3, failed.size()); + failed.get(0).assertContentEquals("without max order"); + failed.get(1).assertContentEquals("illegal max order"); + failed.get(2).assertContentEquals("a.3"); // exceeds max order + + final MockStateManager stateManager = runner.getStateManager(); + stateManager.assertStateEquals("a.target", "2", Scope.LOCAL); + stateManager.assertStateEquals("a.max", "2", Scope.LOCAL); + } + + @Test + public void testWaitOvertakeSkip() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class); + + runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}"); + runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index"); + runner.setProperty(EnforceOrder.MAX_ORDER, "10"); + runner.assertValid(); + Ordered.enqueue(runner, "b", 1); + Ordered.enqueue(runner, "a", 2); + Ordered.enqueue(runner, "a", 1); + Ordered.enqueue(runner, "a", 5); // waits for a.3 and a.4 + Ordered.enqueue(runner, "b", 3); // waits for b.2 + Ordered.enqueue(runner, "c", 9); // waits for c.1 to 8 + Ordered.enqueue(runner, "d", 10); // waits for d.1 to 9 + + runner.run(); + + List succeeded = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS); + assertEquals(3, succeeded.size()); + final FirstInFirstOutPrioritizer fifo = new FirstInFirstOutPrioritizer(); + succeeded.sort(fifo); + succeeded.get(0).assertContentEquals("a.1"); + 
succeeded.get(1).assertContentEquals("a.2"); + succeeded.get(2).assertContentEquals("b.1"); + + List waiting = runner.getFlowFilesForRelationship(EnforceOrder.REL_WAIT); + assertEquals(4, waiting.size()); + waiting.get(0).assertContentEquals("a.5"); + waiting.get(1).assertContentEquals("b.3"); + waiting.get(2).assertContentEquals("c.9"); + waiting.get(3).assertContentEquals("d.10"); + waiting.get(0).assertAttributeExists("EnforceOrder.startedAt"); + waiting.get(1).assertAttributeExists("EnforceOrder.startedAt"); + waiting.get(2).assertAttributeExists("EnforceOrder.startedAt"); + waiting.get(3).assertAttributeExists("EnforceOrder.startedAt"); + + final MockStateManager stateManager = runner.getStateManager(); + stateManager.assertStateEquals("a.target", "3", Scope.LOCAL); + stateManager.assertStateEquals("b.target", "2", Scope.LOCAL); + stateManager.assertStateEquals("c.target", "1", Scope.LOCAL); + stateManager.assertStateEquals("d.target", "1", Scope.LOCAL); + stateManager.assertStateSet("a.updatedAt", Scope.LOCAL); + stateManager.assertStateSet("b.updatedAt", Scope.LOCAL); + stateManager.assertStateSet("c.updatedAt", Scope.LOCAL); + stateManager.assertStateSet("d.updatedAt", Scope.LOCAL); + + // Run it again with waiting files. + runner.clearTransferState(); + waiting.forEach(runner::enqueue); + runner.run(); + + runner.assertAllFlowFilesTransferred(EnforceOrder.REL_WAIT, 4); + waiting = runner.getFlowFilesForRelationship(EnforceOrder.REL_WAIT); + + // Run it again with shorter wait timeout to make overtaking happen. 
+ runner.clearTransferState(); + runner.setProperty(EnforceOrder.WAIT_TIMEOUT, "10 ms"); + Thread.sleep(20); + waiting.forEach(runner::enqueue); + Ordered.enqueue(runner, "b", 2); // arrived in time + Ordered.enqueue(runner, "a", 6); // a.3 and a.4 have not arrived yet + runner.run(); + + succeeded = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS); + succeeded.sort(fifo); + assertEquals(3, succeeded.size()); + succeeded.get(0).assertContentEquals("a.6"); // This is ok because a.5 was there. + succeeded.get(1).assertContentEquals("b.2"); + succeeded.get(2).assertContentEquals("b.3"); + + List overtook = runner.getFlowFilesForRelationship(EnforceOrder.REL_OVERTOOK); + assertEquals(3, overtook.size()); + overtook.get(0).assertContentEquals("a.5"); // overtook a.3. + overtook.get(0).assertAttributeEquals(EnforceOrder.ATTR_EXPECTED_ORDER, "3"); + overtook.get(1).assertContentEquals("c.9"); // overtook c.1 - 8. + overtook.get(1).assertAttributeEquals(EnforceOrder.ATTR_EXPECTED_ORDER, "1"); + overtook.get(2).assertContentEquals("d.10"); // overtook d.1 - 9. + overtook.get(2).assertAttributeEquals(EnforceOrder.ATTR_EXPECTED_ORDER, "1"); + + stateManager.assertStateEquals("a.target", "7", Scope.LOCAL); + stateManager.assertStateEquals("b.target", "4", Scope.LOCAL); + stateManager.assertStateEquals("c.target", "10", Scope.LOCAL); // it was c.9, so +1 + stateManager.assertStateEquals("d.target", "10", Scope.LOCAL); // it was d.10 (max) so don't +1 + + // Simulate a.3 and a.4 arriving, but too late.
+ runner.clearTransferState(); + Ordered.enqueue(runner, "a", 3); + Ordered.enqueue(runner, "a", 4); + runner.run(); + + runner.assertAllFlowFilesTransferred(EnforceOrder.REL_SKIPPED, 2); + final List skipped = runner.getFlowFilesForRelationship(EnforceOrder.REL_SKIPPED); + skipped.get(0).assertContentEquals("a.3"); + skipped.get(0).assertAttributeExists(EnforceOrder.ATTR_DETAIL); + skipped.get(1).assertContentEquals("a.4"); + skipped.get(1).assertAttributeExists(EnforceOrder.ATTR_DETAIL); + + } + + @Test + public void testCleanInactiveGroups() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class); + + runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}"); + runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index"); + runner.assertValid(); + Ordered.enqueue(runner, "b", 1); + Ordered.enqueue(runner, "a", 2); + Ordered.enqueue(runner, "c", 1); + Ordered.enqueue(runner, "a", 1); + + runner.run(); + + runner.assertAllFlowFilesTransferred(EnforceOrder.REL_SUCCESS, 4); + + // Run it again with shorter inactive timeout + runner.clearTransferState(); + runner.setProperty(EnforceOrder.WAIT_TIMEOUT, "5 ms"); + runner.setProperty(EnforceOrder.INACTIVE_TIMEOUT, "10 ms"); + + Thread.sleep(15); + + // No group b. + Ordered.enqueue(runner, "a", 3); + Ordered.enqueue(runner, "c", 2); + + runner.run(); + + // Group b was determined as inactive, thus its states should be removed. + final MockStateManager stateManager = runner.getStateManager(); + stateManager.assertStateEquals("a.target", "4", Scope.LOCAL); + stateManager.assertStateNotSet("b.target", Scope.LOCAL); + stateManager.assertStateEquals("c.target", "3", Scope.LOCAL); + stateManager.assertStateSet("a.updatedAt", Scope.LOCAL); + stateManager.assertStateNotSet("b.updatedAt", Scope.LOCAL); + stateManager.assertStateSet("c.updatedAt", Scope.LOCAL); + + // If b comes again, it'll be treated as brand new group. 
+ runner.clearTransferState(); + Ordered.enqueue(runner, "b", 2); + + runner.run(); + stateManager.assertStateEquals("b.target", "1", Scope.LOCAL); + stateManager.assertStateSet("b.updatedAt", Scope.LOCAL); + + // b.2 should be routed to wait, since there's no b.1. It will eventually overtake. + runner.assertAllFlowFilesTransferred(EnforceOrder.REL_WAIT, 1); + final List waiting = runner.getFlowFilesForRelationship(EnforceOrder.REL_WAIT); + waiting.get(0).assertContentEquals("b.2"); + + } + +} \ No newline at end of file From b4ccc40cbfeec5f0c60de4c9a2d43ad81e355263 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Thu, 16 Feb 2017 21:03:43 +0900 Subject: [PATCH 2/4] NIFI-3414: Added EnforceOrder processor Incorporated review comments, added displayNames. --- .../processors/standard/EnforceOrder.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java index 2367d9945eb3..a2ef8a6d8b13 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java @@ -98,7 +98,8 @@ public class EnforceOrder extends AbstractProcessor { private static final Function STATE_MAX_ORDER = groupId -> groupId + ".max"; public static final PropertyDescriptor GROUP_IDENTIFIER = new PropertyDescriptor.Builder() - .name("Group Identifier") + .name("group-id") + .displayName("Group Identifier") .description("EnforceOrder is capable of multiple ordering groups." + " 'Group Identifier' is used to determine which group a FlowFile belongs to." 
+ " This property will be evaluated with each incoming FlowFile." + @@ -110,7 +111,8 @@ public class EnforceOrder extends AbstractProcessor { .build(); public static final PropertyDescriptor ORDER_ATTRIBUTE = new PropertyDescriptor.Builder() - .name("Order Attribute") + .name("order-attribute") + .displayName("Order Attribute") .description("A name of FlowFile attribute whose value will be used to enforce order of FlowFiles within a group." + " If a FlowFile does not have this attribute, or its value is not an integer, the FlowFile will be routed to failure.") .required(true) @@ -119,7 +121,8 @@ public class EnforceOrder extends AbstractProcessor { .build(); public static final PropertyDescriptor INITIAL_ORDER = new PropertyDescriptor.Builder() - .name("Initial Order") + .name("initial-order") + .displayName("Initial Order") .description("When the first FlowFile of a group arrives, initial target order will be computed and stored in the managed state." + " After that, target order will start being tracked by EnforceOrder and stored in the state management store." + " If Expression Language is used but evaluated result was not an integer, then the FlowFile will be routed to failure," + @@ -131,7 +134,8 @@ public class EnforceOrder extends AbstractProcessor { .build(); public static final PropertyDescriptor MAX_ORDER = new PropertyDescriptor.Builder() - .name("Maximum Order") + .name("maximum-order") + .displayName("Maximum Order") .description("If specified, any FlowFiles that has larger order will be routed to failure." + " This property is computed only once for a given group." + " After a maximum order is computed, it will be persisted in the state management store and used for other FlowFiles belonging to the same group." 
+ @@ -143,7 +147,8 @@ public class EnforceOrder extends AbstractProcessor { .build(); public static final PropertyDescriptor WAIT_TIMEOUT = new PropertyDescriptor.Builder() - .name("Wait Timeout") + .name("wait-timeout") + .displayName("Wait Timeout") .description("Indicates the duration after which waiting FlowFiles will be routed to the 'overtook' relationship.") .required(true) .defaultValue("10 min") @@ -152,7 +157,8 @@ public class EnforceOrder extends AbstractProcessor { .build(); public static final PropertyDescriptor INACTIVE_TIMEOUT = new PropertyDescriptor.Builder() - .name("Inactive Timeout") + .name("inactive-timeout") + .displayName("Inactive Timeout") .description("Indicates the duration after which state for an inactive group will be cleared from managed state." + " Group is determined as inactive if any new incoming FlowFile has not seen for a group for specified duration." + " Inactive Timeout must be longer than Wait Timeout." + @@ -167,7 +173,8 @@ public class EnforceOrder extends AbstractProcessor { .build(); public static final PropertyDescriptor BATCH_COUNT = new PropertyDescriptor.Builder() - .name("Batch Count") + .name("batch-count") + .displayName("Batch Count") .description("The maximum number of FlowFiles that EnforceOrder can process at an execution.") .required(true) .defaultValue("1000") From 85ff15ba9ae4893e33256fa2970b951325ed238a Mon Sep 17 00:00:00 2001 From: ijokarumawak Date: Mon, 10 Apr 2017 11:12:59 +0900 Subject: [PATCH 3/4] NIFI-3414: Added EnforceOrder processor Incorporate review comments: - Moved nifi-standard-prioritizers dependency to top level nifi/pom.xml. - Changed default initial order from 1 to 0. - Fixed typos. - Use session.get(batchCount). 
--- .../nifi-framework-bundle/pom.xml | 5 ---- .../processors/standard/EnforceOrder.java | 25 +++++++++++-------- .../processors/standard/TestEnforceOrder.java | 16 ++++++++---- pom.xml | 5 ++++ 4 files changed, 31 insertions(+), 20 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/pom.xml index 1ea79c020080..1b343d759e80 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/pom.xml @@ -128,11 +128,6 @@ nifi-authorizer 1.2.0-SNAPSHOT - - org.apache.nifi - nifi-standard-prioritizers - 1.2.0-SNAPSHOT - diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java index a2ef8a6d8b13..3b8c7e7a14d5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java @@ -130,13 +130,13 @@ public class EnforceOrder extends AbstractProcessor { .required(true) .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .expressionLanguageSupported(true) - .defaultValue("1") + .defaultValue("0") .build(); public static final PropertyDescriptor MAX_ORDER = new PropertyDescriptor.Builder() .name("maximum-order") .displayName("Maximum Order") - .description("If specified, any FlowFiles that has larger order will be routed to failure." + + .description("If specified, any FlowFiles that have larger order will be routed to failure." + " This property is computed only once for a given group." + " After a maximum order is computed, it will be persisted in the state management store and used for other FlowFiles belonging to the same group." 
+ " If Expression Language is used but evaluated result was not an integer, then the FlowFile will be routed to failure," + @@ -260,9 +260,15 @@ protected Collection customValidate(ValidationContext validati @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final ComponentLog logger = getLogger(); final Integer batchCount = context.getProperty(BATCH_COUNT).asInteger(); + List flowFiles = session.get(batchCount); + if (flowFiles == null || flowFiles.isEmpty()) { + return; + } + final StateMap stateMap; try { stateMap = context.getStateManager().getState(Scope.LOCAL); @@ -276,9 +282,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session oc.groupStates.putAll(stateMap.toMap()); - for (int i = 0; i < batchCount; i++) { - - oc.setFlowFile(session.get()); + for (FlowFile flowFile : flowFiles) { + oc.setFlowFile(flowFile); if (oc.flowFile == null) { break; } @@ -291,7 +296,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } // At this point, the flow file is confirmed to be valid. - oc.markFlowFileValied(); + oc.markFlowFileValid(); } oc.transferFlowFiles(); @@ -410,7 +415,8 @@ private boolean computeInitialOrder() { final AtomicReference computedInitOrder = new AtomicReference<>(); groupStates.computeIfAbsent(stateKeyOrder, k -> { final String initOrderStr = initOrderProperty.evaluateAttributeExpressions(flowFile).getValue(); - final int initOrder = Integer.parseInt(initOrderStr); + // Parse it to check if it is a valid integer. 
+ Integer.parseInt(initOrderStr); computedInitOrder.set(initOrderStr); return initOrderStr; }); @@ -429,7 +435,7 @@ private boolean computeInitialOrder() { return true; } - private void markFlowFileValied() { + private void markFlowFileValid() { final List groupedFlowFiles = flowFileGroups.computeIfAbsent(groupId, k -> new ArrayList<>()); final FlowFile validFlowFile; @@ -492,7 +498,7 @@ private void transferFlowFiles() { }); } - private FlowFile transferResult(final FlowFile flowFile, final Relationship result, final String detail, final Integer expectedOrder) { + private void transferResult(final FlowFile flowFile, final Relationship result, final String detail, final Integer expectedOrder) { final Map attributes = new HashMap<>(); attributes.put(ATTR_RESULT, result.getName()); if (expectedOrder != null) { @@ -503,7 +509,6 @@ private FlowFile transferResult(final FlowFile flowFile, final Relationship resu } final FlowFile resultFlowFile = processSession.putAllAttributes(flowFile, attributes); processSession.transfer(resultFlowFile, result); - return resultFlowFile; } private void transferToFailure(final FlowFile flowFile, final String message) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEnforceOrder.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEnforceOrder.java index 161e6bf4a847..a31117a7adab 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEnforceOrder.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEnforceOrder.java @@ -102,9 +102,9 @@ public void testSort() { runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}"); runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index"); runner.assertValid(); - Ordered.enqueue(runner, "b", 1); - 
Ordered.enqueue(runner, "a", 2); + Ordered.enqueue(runner, "b", 0); Ordered.enqueue(runner, "a", 1); + Ordered.enqueue(runner, "a", 0); runner.run(); @@ -112,9 +112,9 @@ public void testSort() { final List succeeded = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS); succeeded.sort(new FirstInFirstOutPrioritizer()); - succeeded.get(0).assertContentEquals("a.1"); - succeeded.get(1).assertContentEquals("a.2"); - succeeded.get(2).assertContentEquals("b.1"); + succeeded.get(0).assertContentEquals("a.0"); + succeeded.get(1).assertContentEquals("a.1"); + succeeded.get(2).assertContentEquals("b.0"); } @Test @@ -123,6 +123,7 @@ public void testDuplicatedOrder() { runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}"); runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index"); + runner.setProperty(EnforceOrder.INITIAL_ORDER, "1"); runner.assertValid(); Ordered.enqueue(runner, "b", 1); Ordered.enqueue(runner, "a", 2); @@ -153,6 +154,7 @@ public void testNoGroupIdentifier() { runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}"); runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index"); + runner.setProperty(EnforceOrder.INITIAL_ORDER, "1"); runner.assertValid(); Ordered.enqueue(runner, "b", 1); Ordered.enqueue(runner, "a", 2); @@ -179,6 +181,7 @@ public void testIllegalOrderValue() { runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}"); runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index"); + runner.setProperty(EnforceOrder.INITIAL_ORDER, "1"); runner.assertValid(); Ordered.enqueue(runner, "b", 1); Ordered.enqueue(runner, "a", 2); @@ -251,6 +254,7 @@ public void testMaxOrder() { runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${fragment.identifier}"); runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index"); + runner.setProperty(EnforceOrder.INITIAL_ORDER, "1"); runner.setProperty(EnforceOrder.MAX_ORDER, "${fragment.count}"); runner.assertValid(); runner.enqueue("b.1", Ordered.i(1).put("fragment.identifier", 
"b").put("fragment.count", "3").map()); @@ -286,6 +290,7 @@ public void testWaitOvertakeSkip() throws Exception { runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}"); runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index"); + runner.setProperty(EnforceOrder.INITIAL_ORDER, "1"); runner.setProperty(EnforceOrder.MAX_ORDER, "10"); runner.assertValid(); Ordered.enqueue(runner, "b", 1); @@ -386,6 +391,7 @@ public void testCleanInactiveGroups() throws Exception { runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}"); runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index"); + runner.setProperty(EnforceOrder.INITIAL_ORDER, "1"); runner.assertValid(); Ordered.enqueue(runner, "b", 1); Ordered.enqueue(runner, "a", 2); diff --git a/pom.xml b/pom.xml index 4e2b3cd93989..91aae37e1fdc 100644 --- a/pom.xml +++ b/pom.xml @@ -970,6 +970,11 @@ language governing permissions and limitations under the License. --> 1.2.0-SNAPSHOT nar + + org.apache.nifi + nifi-standard-prioritizers + 1.2.0-SNAPSHOT + org.apache.nifi nifi-jetty-bundle From 575cc7ac6e915c4744bc565a9ee2ebebea40e225 Mon Sep 17 00:00:00 2001 From: ijokarumawak Date: Mon, 10 Apr 2017 23:17:36 +0900 Subject: [PATCH 4/4] NIFI-3414: Added EnforceOrder processor When a FlowFile is transferred to success, remove attributes previously set when it was transferred to wait or failure. 
--- .../processors/standard/EnforceOrder.java | 10 +++- .../processors/standard/TestEnforceOrder.java | 51 +++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java index 3b8c7e7a14d5..fa3d1b64dab7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java @@ -507,7 +507,15 @@ private void transferResult(final FlowFile flowFile, final Relationship result, if (!isBlank(detail)) { attributes.put(ATTR_DETAIL, detail); } - final FlowFile resultFlowFile = processSession.putAllAttributes(flowFile, attributes); + + FlowFile resultFlowFile = processSession.putAllAttributes(flowFile, attributes); + // Remove attributes left over from an earlier routing (e.g. wait or failure) that do not apply to this result. + if (expectedOrder == null) { + resultFlowFile = processSession.removeAttribute(resultFlowFile, ATTR_EXPECTED_ORDER); + } + if (detail == null) { + resultFlowFile = processSession.removeAttribute(resultFlowFile, ATTR_DETAIL); + } processSession.transfer(resultFlowFile, result); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEnforceOrder.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEnforceOrder.java index a31117a7adab..0a179a3289df 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEnforceOrder.java +++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEnforceOrder.java @@ -439,4 +439,55 @@ public void testCleanInactiveGroups() throws Exception { } + @Test + public void testClearOldProperties() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(EnforceOrder.class); + + runner.setProperty(EnforceOrder.GROUP_IDENTIFIER, "${group}"); + runner.setProperty(EnforceOrder.ORDER_ATTRIBUTE, "index"); + runner.setProperty(EnforceOrder.INITIAL_ORDER, "1"); + runner.assertValid(); + Ordered.enqueue(runner, "a", 2); + Ordered.enqueue(runner, "b", 1); + + runner.run(); + + runner.assertTransferCount(EnforceOrder.REL_WAIT, 1); + MockFlowFile a2 = runner.getFlowFilesForRelationship(EnforceOrder.REL_WAIT).get(0); + a2.assertAttributeEquals(EnforceOrder.ATTR_RESULT, "wait"); + a2.assertAttributeExists(EnforceOrder.ATTR_STARTED_AT); + a2.assertAttributeNotExists(EnforceOrder.ATTR_DETAIL); + a2.assertAttributeEquals(EnforceOrder.ATTR_EXPECTED_ORDER, "1"); + a2.assertContentEquals("a.2"); + + runner.assertTransferCount(EnforceOrder.REL_SUCCESS, 1); + MockFlowFile b1 = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS).get(0); + b1.assertAttributeEquals(EnforceOrder.ATTR_RESULT, "success"); + b1.assertAttributeExists(EnforceOrder.ATTR_STARTED_AT); + b1.assertAttributeNotExists(EnforceOrder.ATTR_DETAIL); + b1.assertAttributeNotExists(EnforceOrder.ATTR_EXPECTED_ORDER); + b1.assertContentEquals("b.1"); + + runner.clearTransferState(); + + Ordered.enqueue(runner, "a", 1); + runner.enqueue(a2); + + runner.run(); + + runner.assertAllFlowFilesTransferred(EnforceOrder.REL_SUCCESS, 2); + MockFlowFile a1 = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS).get(0); + a1.assertAttributeEquals(EnforceOrder.ATTR_RESULT, "success"); + a1.assertAttributeExists(EnforceOrder.ATTR_STARTED_AT); + a1.assertAttributeNotExists(EnforceOrder.ATTR_DETAIL); + 
a1.assertAttributeNotExists(EnforceOrder.ATTR_EXPECTED_ORDER); + a1.assertContentEquals("a.1"); + + a2 = runner.getFlowFilesForRelationship(EnforceOrder.REL_SUCCESS).get(1); + a2.assertAttributeEquals(EnforceOrder.ATTR_RESULT, "success"); + a2.assertAttributeExists(EnforceOrder.ATTR_STARTED_AT); + a2.assertAttributeNotExists(EnforceOrder.ATTR_DETAIL); + a2.assertAttributeNotExists(EnforceOrder.ATTR_EXPECTED_ORDER); // Should be cleared. + a2.assertContentEquals("a.2"); + } } \ No newline at end of file