From 2aa6bd1e135cd4959cef5a483c454dc9a1591439 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 15 Feb 2022 14:25:36 -0500 Subject: [PATCH] NIFI-9689: When all FlowFiles in a FlowFile Queue are penalized, do not schedule the destination to run. Also expose this fact via the ConnectionStatusSnapshotDTO, as this allows the front-end to render this information to the user in order to avoid confusion when it appears that the Processor has data but does nothing Signed-off-by: Matthew Burgess This closes #5771 --- .../controller/status/ConnectionStatus.java | 12 +++++ .../status/FlowFileAvailability.java | 26 ++++++++++ .../controller/status/ProcessGroupStatus.java | 27 +++++++++- .../nifi/controller/queue/FlowFileQueue.java | 6 +++ .../status/ConnectionStatusSnapshotDTO.java | 10 ++++ .../ConnectionStatusEndpointMerger.java | 2 +- .../nifi/cluster/manager/StatusMerger.java | 16 ++++-- .../nifi/reporting/AbstractEventAccess.java | 1 + .../org/apache/nifi/util/Connectables.java | 9 ++-- .../queue/StandardFlowFileQueue.java | 6 +++ .../queue/SwappablePriorityQueue.java | 26 ++++++++++ .../SocketLoadBalancedFlowFileQueue.java | 6 +++ .../partition/LocalQueuePartition.java | 6 +++ .../SwappablePriorityQueueLocalPartition.java | 6 +++ .../nifi/controller/MockFlowFileRecord.java | 21 +++++--- .../TestSocketLoadBalancedFlowFileQueue.java | 17 +++++++ .../TestWriteAheadFlowFileRepository.java | 51 +++++++++++-------- .../controller/tasks/TestConnectableTask.java | 31 +++++------ .../apache/nifi/web/api/dto/DtoFactory.java | 7 +-- .../queue/StatelessFlowFileQueue.java | 7 +++ 20 files changed, 235 insertions(+), 58 deletions(-) create mode 100644 nifi-api/src/main/java/org/apache/nifi/controller/status/FlowFileAvailability.java diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java index 0fe5a99a8dff..754483e5edab 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java @@ -44,6 +44,7 @@ public class ConnectionStatus implements Cloneable { private long maxQueuedBytes; private long totalQueuedDuration; private long maxQueuedDuration; + private FlowFileAvailability flowFileAvailability; public String getId() { return id; @@ -214,6 +215,14 @@ public void setMaxQueuedDuration(long maxQueuedDuration) { this.maxQueuedDuration = maxQueuedDuration; } + public FlowFileAvailability getFlowFileAvailability() { + return flowFileAvailability; + } + + public void setFlowFileAvailability(final FlowFileAvailability availability) { + this.flowFileAvailability = availability; + } + @Override public ConnectionStatus clone() { final ConnectionStatus clonedObj = new ConnectionStatus(); @@ -230,6 +239,7 @@ public ConnectionStatus clone() { clonedObj.sourceName = sourceName; clonedObj.destinationId = destinationId; clonedObj.destinationName = destinationName; + clonedObj.flowFileAvailability = flowFileAvailability; if (predictions != null) { clonedObj.setPredictions(predictions.clone()); @@ -265,6 +275,8 @@ public String toString() { builder.append(backPressureDataSizeThreshold); builder.append(", backPressureObjectThreshold="); builder.append(backPressureObjectThreshold); + builder.append(", flowFileAvailability="); + builder.append(flowFileAvailability); builder.append(", inputCount="); builder.append(inputCount); builder.append(", inputBytes="); diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/FlowFileAvailability.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/FlowFileAvailability.java new file mode 100644 index 000000000000..d805312dfaab --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/FlowFileAvailability.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.status; + +public enum FlowFileAvailability { + ACTIVE_QUEUE_EMPTY, + + HEAD_OF_QUEUE_PENALIZED, + + FLOWFILE_AVAILABLE; +} diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java index 758a059802fe..3e4720ef6b0d 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java @@ -16,11 +16,12 @@ */ package org.apache.nifi.controller.status; +import org.apache.nifi.registry.flow.VersionedFlowState; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import org.apache.nifi.registry.flow.VersionedFlowState; /** */ @@ -255,7 +256,6 @@ public void setBytesTransferred(long bytesTransferred) { @Override public ProcessGroupStatus clone() { - final ProcessGroupStatus clonedObj = new ProcessGroupStatus(); clonedObj.id = id; @@ -447,6 +447,7 @@ public static void merge(final ProcessGroupStatus target, final ProcessGroupStat merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes()); merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount()); merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes()); + merged.setFlowFileAvailability(mergeFlowFileAvailability(merged.getFlowFileAvailability(), statusToMerge.getFlowFileAvailability())); } target.setConnectionStatus(mergedConnectionMap.values()); @@ -588,4 +589,26 @@ public static void merge(final ProcessGroupStatus target, final ProcessGroupStat target.setRemoteProcessGroupStatus(mergedRemoteGroupMap.values()); } + + public static FlowFileAvailability mergeFlowFileAvailability(final FlowFileAvailability availabilityA, final FlowFileAvailability availabilityB) { + if (availabilityA == availabilityB) { + return availabilityA; + } + if (availabilityA == null) { + return availabilityB; + } + if (availabilityB == null) { + return availabilityA; + } + + if (availabilityA == FlowFileAvailability.FLOWFILE_AVAILABLE || availabilityB == FlowFileAvailability.FLOWFILE_AVAILABLE) { + return FlowFileAvailability.FLOWFILE_AVAILABLE; + } + + if (availabilityA == FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED || availabilityB == FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED) { + return FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED; + } + + return FlowFileAvailability.FLOWFILE_AVAILABLE; + } } diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java index d4b6b2ed65bb..8ed2d6c1497f 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java @@ -18,6 +18,7 @@ import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.SwapSummary; +import org.apache.nifi.controller.status.FlowFileAvailability; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.processor.FlowFileFilter; @@ -109,6 +110,11 @@ public interface FlowFileQueue { */ boolean isEmpty(); + /** + * @return the FlowFile Availability for this queue + */ + FlowFileAvailability getFlowFileAvailability(); + /** * @return true if the queue is empty or contains only FlowFiles that already are being processed * by others, false if the queue contains at least one FlowFile that is available for processing, diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java index e8c9df611acb..ba507ec93554 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatusSnapshotDTO.java @@ -48,6 +48,7 @@ public class ConnectionStatusSnapshotDTO implements Cloneable { private String queuedCount; private Integer percentUseCount; private Integer percentUseBytes; + private String flowFileAvailability; /* getters / setters */ /** @@ -283,6 +284,15 @@ public void setPercentUseBytes(Integer percentUseBytes) { this.percentUseBytes = percentUseBytes; } + @ApiModelProperty("The availability of FlowFiles in this connection") + public String getFlowFileAvailability() { + return flowFileAvailability; + } + + public void setFlowFileAvailability(final String availability) { + this.flowFileAvailability = availability; + } + @Override public ConnectionStatusSnapshotDTO clone() { final ConnectionStatusSnapshotDTO other = new ConnectionStatusSnapshotDTO(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectionStatusEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectionStatusEndpointMerger.java index db455d4bd7a3..d993187696e8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectionStatusEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ConnectionStatusEndpointMerger.java @@ -52,7 +52,7 @@ protected void mergeResponses(ConnectionStatusEntity clientEntity, Map e.getValue() == clientEntity) - .map(e -> e.getKey()) + .map(Map.Entry::getKey) .findFirst() .orElse(null); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java index bf702773ff72..0ad131fe79ca 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/StatusMerger.java @@ -17,6 +17,8 @@ package org.apache.nifi.cluster.manager; +import org.apache.nifi.controller.status.FlowFileAvailability; +import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.RunStatus; import org.apache.nifi.controller.status.TransmissionStatus; import org.apache.nifi.registry.flow.VersionedFlowState; @@ -274,7 +276,7 @@ public static void merge(final ProcessGroupStatusSnapshotDTO target, final boole } private static Collection replaceNull(final Collection collection) { - return (collection == null) ? Collections.emptyList() : collection; + return (collection == null) ? Collections.emptyList() : collection; } @@ -490,6 +492,11 @@ public static void merge(final ConnectionStatusSnapshotDTO target, final boolean target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued()); target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued()); + final FlowFileAvailability targetFlowFileAvailability = target.getFlowFileAvailability() == null ? null : FlowFileAvailability.valueOf(target.getFlowFileAvailability()); + final FlowFileAvailability toMergeFlowFileAvailability = toMerge.getFlowFileAvailability() == null ? null : FlowFileAvailability.valueOf(toMerge.getFlowFileAvailability()); + final FlowFileAvailability mergedFlowFileAvailability = ProcessGroupStatus.mergeFlowFileAvailability(targetFlowFileAvailability, toMergeFlowFileAvailability); + target.setFlowFileAvailability(mergedFlowFileAvailability == null ? null : mergedFlowFileAvailability.name()); + if (target.getPercentUseBytes() == null) { target.setPercentUseBytes(toMerge.getPercentUseBytes()); } else if (toMerge.getPercentUseBytes() != null) { @@ -543,14 +550,15 @@ public static void merge(final ConnectionStatusSnapshotDTO target, final boolean } private static long minNonNegative(long a, long b){ - if(a < 0){ + if (a < 0) { return b; - }else if(b < 0){ + } else if (b < 0) { return a; - }else{ + } else { return Math.min(a, b); } } + public static void updatePrettyPrintedFields(final ConnectionStatusSnapshotDTO target) { target.setQueued(prettyPrint(target.getFlowFilesQueued(), target.getBytesQueued())); target.setQueuedCount(formatCount(target.getFlowFilesQueued())); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java index 56e87c6d5e36..e98a3f7e71c7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java @@ -247,6 +247,7 @@ ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStat connStatus.setTotalQueuedDuration(conn.getFlowFileQueue().getTotalQueuedDuration(now)); long minLastQueueDate = conn.getFlowFileQueue().getMinLastQueueDate(); connStatus.setMaxQueuedDuration(minLastQueueDate == 0 ? 0 : now - minLastQueueDate); + connStatus.setFlowFileAvailability(conn.getFlowFileQueue().getFlowFileAvailability()); final FlowFileEvent connectionStatusReport = statusReport.getReportEntry(conn.getIdentifier()); if (connectionStatusReport != null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/Connectables.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/Connectables.java index 5d74ebb4818c..4a36be5a85cc 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/Connectables.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/Connectables.java @@ -16,18 +16,19 @@ */ package org.apache.nifi.util; -import java.util.Collection; -import java.util.List; - import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.status.FlowFileAvailability; import org.apache.nifi.processor.Relationship; +import java.util.Collection; +import java.util.List; + public class Connectables { public static boolean flowFilesQueued(final Connectable connectable) { for (final Connection conn : connectable.getIncomingConnections()) { - if (!conn.getFlowFileQueue().isActiveQueueEmpty()) { + if (conn.getFlowFileQueue().getFlowFileAvailability() == FlowFileAvailability.FLOWFILE_AVAILABLE) { return true; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java index a87fcfe46d54..3828588b3505 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java @@ -22,6 +22,7 @@ import org.apache.nifi.controller.repository.FlowFileSwapManager; import org.apache.nifi.controller.repository.SwapSummary; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.status.FlowFileAvailability; import org.apache.nifi.events.EventReporter; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.processor.FlowFileFilter; @@ -177,6 +178,11 @@ public boolean isEmpty() { return queue.getFlowFileQueueSize().isEmpty(); } + @Override + public FlowFileAvailability getFlowFileAvailability() { + return queue.getFlowFileAvailability(); + } + @Override public boolean isActiveQueueEmpty() { final FlowFileQueueSize queueSize = queue.getFlowFileQueueSize(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java index 34da62c6150c..92129a58ab68 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java @@ -23,6 +23,7 @@ import org.apache.nifi.controller.repository.SwapContents; import org.apache.nifi.controller.repository.SwapSummary; import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.status.FlowFileAvailability; import org.apache.nifi.controller.swap.StandardSwapSummary; import org.apache.nifi.events.EventReporter; import org.apache.nifi.flowfile.FlowFile; @@ -440,6 +441,31 @@ public boolean isActiveQueueEmpty() { return queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0; } + public FlowFileAvailability getFlowFileAvailability() { + // If queue is empty, avoid obtaining a lock. + if (isActiveQueueEmpty()) { + return FlowFileAvailability.ACTIVE_QUEUE_EMPTY; + } + + final FlowFileRecord top; + readLock.lock(); + try { + top = activeQueue.peek(); + } finally { + readLock.unlock("isFlowFileAvailable"); + } + + if (top == null) { + return FlowFileAvailability.ACTIVE_QUEUE_EMPTY; + } + + if (top.isPenalized()) { + return FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED; + } + + return FlowFileAvailability.FLOWFILE_AVAILABLE; + } + public void acknowledge(final FlowFileRecord flowFile) { logger.trace("{} Acknowledging {}", this, flowFile); incrementUnacknowledgedQueueSize(-1, -flowFile.getSize()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index 6b8d418cfe9d..2a0c504afde3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -27,6 +27,7 @@ import org.apache.nifi.controller.queue.ConnectionEventListener; import org.apache.nifi.controller.queue.DropFlowFileRequest; import org.apache.nifi.controller.queue.DropFlowFileState; +import org.apache.nifi.controller.status.FlowFileAvailability; import org.apache.nifi.controller.queue.FlowFileQueueContents; import org.apache.nifi.controller.queue.IllegalClusterStateException; import org.apache.nifi.controller.queue.LoadBalanceStrategy; @@ -559,6 +560,11 @@ public boolean isEmpty() { return size().getObjectCount() == 0; } + @Override + public FlowFileAvailability getFlowFileAvailability() { + return localPartition.getFlowFileAvailability(); + } + @Override public boolean isActiveQueueEmpty() { return localPartition.isActiveQueueEmpty(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java index 84f0bab3bce4..0679b509ce6a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller.queue.clustered.partition; +import org.apache.nifi.controller.status.FlowFileAvailability; import org.apache.nifi.controller.queue.FlowFileQueueContents; import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics; import org.apache.nifi.controller.queue.PollStrategy; @@ -38,6 +39,11 @@ public interface LocalQueuePartition extends QueuePartition { */ boolean isActiveQueueEmpty(); + /** + * @return the availability of FlowFiles in the queue + */ + FlowFileAvailability getFlowFileAvailability(); + /** * @return true if there is at least one FlowFile that has not yet been acknowledged, false if all FlowFiles have been acknowledged. */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java index 03e8e1848aae..ae41e554bc7f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java @@ -20,6 +20,7 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.queue.DropFlowFileAction; import org.apache.nifi.controller.queue.DropFlowFileRequest; +import org.apache.nifi.controller.status.FlowFileAvailability; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueueContents; import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics; @@ -102,6 +103,11 @@ public boolean isActiveQueueEmpty() { return priorityQueue.isActiveQueueEmpty(); } + @Override + public FlowFileAvailability getFlowFileAvailability() { + return priorityQueue.getFlowFileAvailability(); + } + @Override public FlowFileRecord poll(final Set expiredRecords, final PollStrategy pollStrategy) { return priorityQueue.poll(expiredRecords, getExpiration(), pollStrategy); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockFlowFileRecord.java index 4369fa21ae64..1b62b69af491 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockFlowFileRecord.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/MockFlowFileRecord.java @@ -17,16 +17,16 @@ package org.apache.nifi.controller; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + public class MockFlowFileRecord implements FlowFileRecord { private static final AtomicLong idGenerator = new AtomicLong(0L); @@ -37,6 +37,9 @@ public class MockFlowFileRecord implements FlowFileRecord { private final ContentClaim contentClaim; private long lastQueuedDate = System.currentTimeMillis() + 1; + private volatile long penaltyExpiration = 0L; + + public MockFlowFileRecord() { this(1L); } @@ -85,7 +88,7 @@ public long getLineageStartDate() { @Override public boolean isPenalized() { - return false; + return penaltyExpiration > System.currentTimeMillis(); } @Override @@ -110,7 +113,11 @@ public int compareTo(final FlowFile o) { @Override public long getPenaltyExpirationMillis() { - return 0; + return penaltyExpiration; + } + + public void setPenaltyExpiration(final long timestamp) { + penaltyExpiration = timestamp; } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java index 0deb91733132..83eea46a34d9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java @@ -25,6 +25,7 @@ import org.apache.nifi.controller.MockFlowFileRecord; import org.apache.nifi.controller.MockSwapManager; import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.controller.status.FlowFileAvailability; import org.apache.nifi.controller.queue.NopConnectionEventListener; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry; @@ -139,6 +140,22 @@ private NodeIdentifier createNodeIdentifier(final String uuid) { "localhost", nodePort++, "localhost", nodePort++, nodePort++, true, Collections.emptySet()); } + @Test + public void testFlowFileAvailability() { + assertTrue(queue.isEmpty()); + assertSame(FlowFileAvailability.ACTIVE_QUEUE_EMPTY, queue.getFlowFileAvailability()); + + final MockFlowFileRecord penalizedFlowFile = new MockFlowFileRecord(0L); + penalizedFlowFile.setPenaltyExpiration(System.currentTimeMillis() + 500_000L); + queue.put(penalizedFlowFile); + + assertFalse(queue.isEmpty()); + assertSame(FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED, queue.getFlowFileAvailability()); + + penalizedFlowFile.setPenaltyExpiration(System.currentTimeMillis() - 1); + assertFalse(queue.isEmpty()); + assertSame(FlowFileAvailability.FLOWFILE_AVAILABLE, queue.getFlowFileAvailability()); + } @Test public void testPriorities() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index 712a9e7c3a7b..39696bff4a81 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -16,31 +16,10 @@ */ package org.apache.nifi.controller.repository; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.when; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.queue.DropFlowFileStatus; +import org.apache.nifi.controller.status.FlowFileAvailability; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.FlowFileQueueSize; import org.apache.nifi.controller.queue.ListFlowFileStatus; @@ -76,6 +55,29 @@ import org.wali.MinimalLockingWriteAheadLog; import org.wali.WriteAheadRepository; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + @SuppressWarnings("deprecation") public class TestWriteAheadFlowFileRepository { @@ -188,6 +190,11 @@ public boolean isEmpty() { return false; } + @Override + public FlowFileAvailability getFlowFileAvailability() { + return isActiveQueueEmpty() ? FlowFileAvailability.ACTIVE_QUEUE_EMPTY : FlowFileAvailability.FLOWFILE_AVAILABLE; + } + @Override public boolean isActiveQueueEmpty() { return false; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java index ca4e13694860..92f922712fab 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java @@ -17,16 +17,6 @@ package org.apache.nifi.controller.tasks; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.when; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; @@ -38,14 +28,25 @@ import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.repository.RepositoryContext; import org.apache.nifi.controller.repository.StandardRepositoryContext; -import org.apache.nifi.controller.scheduling.RepositoryContextFactory; import org.apache.nifi.controller.scheduling.LifecycleState; +import org.apache.nifi.controller.scheduling.RepositoryContextFactory; import org.apache.nifi.controller.scheduling.SchedulingAgent; +import org.apache.nifi.controller.status.FlowFileAvailability; import org.apache.nifi.encrypt.PropertyEncryptor; import org.apache.nifi.processor.Processor; import org.junit.Test; import org.mockito.Mockito; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + public class TestConnectableTask { @@ -90,10 +91,10 @@ public void testIsWorkToDo() { // Test with only a single connection that is self-looping and empty final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); - when(flowFileQueue.isActiveQueueEmpty()).thenReturn(true); + when(flowFileQueue.getFlowFileAvailability()).thenReturn(FlowFileAvailability.ACTIVE_QUEUE_EMPTY); final FlowFileQueue nonEmptyQueue = Mockito.mock(FlowFileQueue.class); - when(nonEmptyQueue.isActiveQueueEmpty()).thenReturn(false); + when(nonEmptyQueue.getFlowFileAvailability()).thenReturn(FlowFileAvailability.FLOWFILE_AVAILABLE); when(selfLoopingConnection.getFlowFileQueue()).thenReturn(nonEmptyQueue); assertFalse(task.invoke().isYield()); @@ -139,7 +140,7 @@ public void testIsWorkToDoFunnels() { when(funnel.getIncomingConnections()).thenReturn(Collections.singletonList(selfLoopingConnection)); final FlowFileQueue emptyQueue = Mockito.mock(FlowFileQueue.class); - when(emptyQueue.isActiveQueueEmpty()).thenReturn(true); + when(emptyQueue.getFlowFileAvailability()).thenReturn(FlowFileAvailability.ACTIVE_QUEUE_EMPTY); when(selfLoopingConnection.getFlowFileQueue()).thenReturn(emptyQueue); final Set outgoingConnections = new HashSet<>(); @@ -173,7 +174,7 @@ public void testIsWorkToDoFunnels() { // Adding input FlowFiles. final FlowFileQueue nonEmptyQueue = Mockito.mock(FlowFileQueue.class); - when(nonEmptyQueue.isActiveQueueEmpty()).thenReturn(false); + when(nonEmptyQueue.getFlowFileAvailability()).thenReturn(FlowFileAvailability.FLOWFILE_AVAILABLE); when(incomingFromAnotherComponent.getFlowFileQueue()).thenReturn(nonEmptyQueue); assertFalse("When a Funnel has both incoming and outgoing connections and FlowFiles to process, then it should be executed.", task.invoke().isYield()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 0b0d7f81770e..ae2fd44ace3f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -708,7 +708,7 @@ public ConnectionDTO createConnectionDto(final Connection connection) { dto.setBackPressureObjectThreshold(flowFileQueue.getBackPressureObjectThreshold()); dto.setBackPressureDataSizeThreshold(flowFileQueue.getBackPressureDataSizeThreshold()); dto.setFlowFileExpiration(flowFileQueue.getFlowFileExpiration()); - dto.setPrioritizers(new ArrayList()); + dto.setPrioritizers(new ArrayList<>()); for (final FlowFilePrioritizer comparator : flowFileQueue.getPriorities()) { dto.getPrioritizers().add(comparator.getClass().getCanonicalName()); } @@ -717,7 +717,7 @@ public ConnectionDTO createConnectionDto(final Connection connection) { for (final Relationship selectedRelationship : connection.getRelationships()) { if (!Relationship.ANONYMOUS.equals(selectedRelationship)) { if (dto.getSelectedRelationships() == null) { - dto.setSelectedRelationships(new TreeSet(Collator.getInstance(Locale.US))); + dto.setSelectedRelationships(new TreeSet<>(Collator.getInstance(Locale.US))); } dto.getSelectedRelationships().add(selectedRelationship.getName()); @@ -728,7 +728,7 @@ public ConnectionDTO createConnectionDto(final Connection connection) { for (final Relationship availableRelationship : connection.getSource().getRelationships()) { if (!Relationship.ANONYMOUS.equals(availableRelationship)) { if (dto.getAvailableRelationships() == null) { - dto.setAvailableRelationships(new TreeSet(Collator.getInstance(Locale.US))); + dto.setAvailableRelationships(new TreeSet<>(Collator.getInstance(Locale.US))); } dto.getAvailableRelationships().add(availableRelationship.getName()); @@ -1177,6 +1177,7 @@ public ConnectionStatusDTO createConnectionStatusDto(final ConnectionStatus conn snapshot.setFlowFilesQueued(connectionStatus.getQueuedCount()); snapshot.setBytesQueued(connectionStatus.getQueuedBytes()); + snapshot.setFlowFileAvailability(connectionStatus.getFlowFileAvailability().name()); snapshot.setFlowFilesIn(connectionStatus.getInputCount()); snapshot.setBytesIn(connectionStatus.getInputBytes()); diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java index 3b4a621d3527..998122849757 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/queue/StatelessFlowFileQueue.java @@ -18,6 +18,7 @@ package org.apache.nifi.stateless.queue; import org.apache.nifi.controller.queue.DropFlowFileStatus; +import org.apache.nifi.controller.status.FlowFileAvailability; import org.apache.nifi.controller.queue.ListFlowFileStatus; import org.apache.nifi.controller.queue.LoadBalanceCompression; import org.apache.nifi.controller.queue.LoadBalanceStrategy; @@ -123,6 +124,12 @@ public boolean isEmpty() { return flowFiles.isEmpty() && unacknowledgedCount.get() == 0; } + @Override + public FlowFileAvailability getFlowFileAvailability() { + // Penalization is ignored in stateless so we can just rely on whether or not the active queue is empty + return isActiveQueueEmpty() ? FlowFileAvailability.ACTIVE_QUEUE_EMPTY : FlowFileAvailability.FLOWFILE_AVAILABLE; + } + @Override public boolean isActiveQueueEmpty() { return flowFiles.isEmpty();