From 11fa5ff13b5fcc5b8f5b1de577b8a061cc4ca174 Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Thu, 6 Jul 2017 20:53:23 +0900 Subject: [PATCH] NIFI-4155: Expand EnforceOrder capability to cluster - Added 'Ordering Scope' to specify how it should manage state, to choose from local and cluster. - Avoid unnecessary status update if it does not make any progress, only update it when order proceeds. - If updating state fails, due to it is already updated by other nodes, rollback the session to process the same FlowFiles again. --- .../processors/standard/EnforceOrder.java | 38 ++++++++++++++++--- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java index fa3d1b64dab7..8f9803adfce3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java @@ -25,10 +25,12 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.flowfile.FlowFile; @@ -182,6 +184,21 @@ public class EnforceOrder extends AbstractProcessor { .expressionLanguageSupported(false) .build(); + public static final AllowableValue ORDERING_SCOPE_LOCAL = new AllowableValue("local", "Local", + "Enforce ordering within a node. Each node enforces ordering with FlowFiles it processes individually."); + public static final AllowableValue ORDERING_SCOPE_CLUSTER = new AllowableValue("cluster", "Cluster", + "Enforce ordering across a NiFi cluster. Since it can be costly to enforce order among nodes, it's recommended to use if and only if necessary."); + public static final PropertyDescriptor ORDERING_SCOPE = new PropertyDescriptor.Builder() + .name("ordering-scope") + .displayName("Ordering Scope") + .description("Specify how ordering is enforced with a NiFi cluster. This property does not have any effect if this is a standalone NiFi.") + .required(true) + .allowableValues(ORDERING_SCOPE_LOCAL, ORDERING_SCOPE_CLUSTER) + .defaultValue(ORDERING_SCOPE_LOCAL.getValue()) + .addValidator(Validator.VALID) + .expressionLanguageSupported(false) + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("A FlowFile with a matching order number will be routed to this relationship.") @@ -229,6 +246,7 @@ protected List getSupportedPropertyDescriptors() { descriptors.add(BATCH_COUNT); descriptors.add(WAIT_TIMEOUT); descriptors.add(INACTIVE_TIMEOUT); + descriptors.add(ORDERING_SCOPE); return descriptors; } @@ -269,9 +287,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session return; } + final Scope scope = Scope.valueOf(context.getProperty(ORDERING_SCOPE).getValue().toUpperCase()); final StateMap stateMap; try { - stateMap = context.getStateManager().getState(Scope.LOCAL); + stateMap = context.getStateManager().getState(scope); } catch (final IOException e) { logger.error("Failed to retrieve state from StateManager due to {}" + e, e); context.yield(); @@ -303,11 +322,18 @@ public void onTrigger(final ProcessContext context, final ProcessSession session oc.cleanupInactiveStates(); - try { - context.getStateManager().setState(oc.groupStates, Scope.LOCAL); - } catch (final IOException e) { - throw new RuntimeException("Failed to update state due to " + e - + ". Session will be rollback and processor will be yielded for a while.", e); + // Update state only if it proceeds. + if (!stateMap.toMap().equals(oc.groupStates)) { + try { + if (!context.getStateManager().replace(stateMap, oc.groupStates, scope)) { + // The state is updated from other node within the same cluster. + // Discard current ordering progress. + session.rollback(false); + } + } catch (final IOException e) { + throw new RuntimeException("Failed to update state due to " + e + + ". Session will be rollback and processor will be yielded for a while.", e); + } } }