From 3a42c7b671972001ed912ef8e907d5b8658554e9 Mon Sep 17 00:00:00 2001 From: patricker Date: Mon, 5 Nov 2018 11:33:11 -0700 Subject: [PATCH 1/2] NIFI-3229 When a queue contains only Penalized FlowFile's the next processor Tasks/Time statistics becomes extremely large --- .../nifi/controller/queue/FlowFileQueue.java | 5 +++++ .../nifi/controller/EventDrivenWorkerQueue.java | 2 +- .../controller/queue/StandardFlowFileQueue.java | 5 +++++ .../controller/queue/SwappablePriorityQueue.java | 16 ++++++++++++++++ .../SocketLoadBalancedFlowFileQueue.java | 5 +++++ .../clustered/partition/LocalQueuePartition.java | 5 +++++ .../SwappablePriorityQueueLocalPartition.java | 5 +++++ .../scheduling/EventDrivenSchedulingAgent.java | 4 ++-- .../nifi/controller/tasks/ConnectableTask.java | 4 ++-- .../java/org/apache/nifi/util/Connectables.java | 10 +++++++--- .../TestWriteAheadFlowFileRepository.java | 5 +++++ 11 files changed, 58 insertions(+), 8 deletions(-) diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java index 8870f1d1ce18..0ca75e0865c8 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java @@ -150,6 +150,11 @@ public interface FlowFileQueue { List poll(FlowFileFilter filter, Set expiredRecords); + /** + * @return the next flow file on the queue; null if empty + */ + FlowFileRecord peek(); + String getFlowFileExpiration(); int getFlowFileExpiration(TimeUnit timeUnit); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java index f36a45951425..ad6a4d9fd084 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/EventDrivenWorkerQueue.java @@ -257,7 +257,7 @@ private DelayProcessingReason getDelayReason(final Worker worker) { } } - if (connectable.hasIncomingConnection() && !Connectables.flowFilesQueued(connectable)) { + if (connectable.hasIncomingConnection() && !Connectables.nonPenalizedFlowFilesQueued(connectable)) { return DelayProcessingReason.NO_WORK; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java index 8872ba7e6119..66d878645330 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java @@ -129,6 +129,11 @@ public FlowFileRecord poll(final Set expiredRecords) { return queue.poll(expiredRecords, expirationMillis); } + @Override + public FlowFileRecord peek() { + return queue.peek(); + } + @Override public List poll(int maxResults, final Set expiredRecords) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java index 058c7149e55d..21c44662f02c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/SwappablePriorityQueue.java @@ -481,6 +481,22 @@ public FlowFileRecord poll(final Set expiredRecords, final long } } + public FlowFileRecord peek() { + writeLock.lock(); + try { + // peek at the first FlowFile in the queue + return doPeek(); + } finally { + writeLock.unlock("peek()"); + } + } + + private FlowFileRecord doPeek() { + migrateSwapToActive(); + + // Peek at the top FlowFile + return this.activeQueue.peek(); + } private FlowFileRecord doPoll(final Set expiredRecords, final long expirationMillis) { FlowFileRecord flowFile; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index 84731f769ee0..17ecc42c44ff 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -884,6 +884,11 @@ public List poll(FlowFileFilter filter, Set expi return flowFiles; } + @Override + public FlowFileRecord peek() { + return localPartition.peek(); + } + @Override public void acknowledge(final FlowFileRecord flowFile) { localPartition.acknowledge(flowFile); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java index 9ee0e0ea6d42..864d7a63190c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java @@ -68,6 +68,11 @@ public interface LocalQueuePartition extends QueuePartition { */ List poll(FlowFileFilter filter, Set expiredRecords); + /** + * @return the next flow file on the queue; null if empty + */ + FlowFileRecord peek(); + /** * Acknowledges that the given FlowFile has been accounted for and is no longer the responsibility of this partition * @param flowFile the FlowFile that has been accounted for diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java index e5e64d04bd8a..ee98cd8acf29 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java @@ -106,6 +106,11 @@ public List poll(final FlowFileFilter filter, final Set poll(FlowFileFilter filter, Set expi return null; } + @Override + public FlowFileRecord peek() { + return null; + } + @Override public String getFlowFileExpiration() { return null; From 47114d5b1825c8a5716599b9dc16d9a50591b3ac Mon Sep 17 00:00:00 2001 From: Peter Wicks Date: Wed, 14 Nov 2018 13:56:07 -0700 Subject: [PATCH 2/2] NIFI-3229 Unit Test Experiments for Lock performance testing. Signed-off-by: Peter Wicks --- .../org/apache/nifi/util/Connectables.java | 10 +++ .../TestStandardProcessSession.java | 64 +++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java index b79ae97aba0c..0ca617923c76 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java @@ -25,6 +25,16 @@ import org.apache.nifi.processor.Relationship; public class Connectables { + public static boolean flowFilesQueued(final Connectable connectable) { + for (final Connection conn : connectable.getIncomingConnections()) { + if (!conn.getFlowFileQueue().isActiveQueueEmpty()) { + return true; + } + } + + return false; + } + public static boolean nonPenalizedFlowFilesQueued(final Connectable connectable) { for (final Connection conn : connectable.getIncomingConnections()) { if (!conn.getFlowFileQueue().isActiveQueueEmpty()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index efe2bd4380e4..17f908506c05 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -46,8 +46,10 @@ import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.Connectables; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.NiFiProperties; +import org.apache.nifi.util.StopWatch; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -2043,6 +2045,68 @@ public void testMultipleReadCounts() throws IOException { flowFile = session.putAttribute(flowFile, "counter", "4"); } + @Test + @Ignore + public void testPerformanceOfFlowFilesInEmptyQueueDetection() throws IOException { + // Run a non-timed startup iteration, to reduce one-time hits being counted + Connectables.flowFilesQueued(connectable); + Connectables.nonPenalizedFlowFilesQueued(connectable); + + // Check if there are FlowFile's queued 1_000_000 times, when queue is empty + StopWatch elapsedWatch = new StopWatch(true); + + for(int i=0;i<1_000_000; i++){ + Connectables.flowFilesQueued(connectable); + } + + elapsedWatch.stop(); + System.out.println("1M checks for FlowFiles, empty queue: " + elapsedWatch.getDuration()); + + elapsedWatch.start(); + for(int i=0;i<1_000_000; i++){ + Connectables.nonPenalizedFlowFilesQueued(connectable); + } + + elapsedWatch.stop(); + System.out.println("1M checks for FlowFiles, non-penalized method, empty queue: " + elapsedWatch.getDuration()); + } + + @Test + @Ignore + public void testPerformanceOfFlowFilesInQueueDetection() throws IOException { + // Run a non-timed startup iteration, to reduce one-time hits being counted + Connectables.flowFilesQueued(connectable); + Connectables.nonPenalizedFlowFilesQueued(connectable); + + // Check if there are FlowFile's queued 1_000_000 times, when queue is empty + StopWatch elapsedWatch = new StopWatch(true); + + // Check if there are FlowFile's queued 1_000_000 times, when queue is not empty + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + + flowFileQueue.put(flowFileRecord); + + elapsedWatch.start(); + for(int i=0;i<1_000_000; i++){ + Connectables.flowFilesQueued(connectable); + } + + elapsedWatch.stop(); + System.out.println("1M checks for FlowFiles, non-empty queue: " + elapsedWatch.getDuration()); + + elapsedWatch.start(); + for(int i=0;i<1_000_000; i++){ + Connectables.nonPenalizedFlowFilesQueued(connectable); + } + + elapsedWatch.stop(); + System.out.println("1M checks for FlowFiles, non-penalized method, non-empty queue: " + elapsedWatch.getDuration()); + } + + private static class MockFlowFileRepository implements FlowFileRepository {