From 00eb25b1d450716aacbae2d9d1f67b865f6b8b22 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 20 May 2025 15:07:45 -0400 Subject: [PATCH] NIFI-14586, NIFI-14587: Expose Processors' Performance Metrics in the UI as part of the Status History; in doing so, I discovered a bug in which the GC time was not being tracked properly and fixed it. Sorted counter values lexicographically. --- .../repository/metrics/EventSumValue.java | 2 + .../nifi/reporting/AbstractEventAccess.java | 41 +++-- .../reporting/PerformanceMetricsUtil.java | 37 ---- .../status/history/StatusHistoryUtil.java | 11 +- .../history/ProcessorStatusDescriptor.java | 170 ++++++++++++++++++ 5 files changed, 206 insertions(+), 55 deletions(-) delete mode 100644 nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/PerformanceMetricsUtil.java diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java index 316623d11822..62ca00eacfe7 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java @@ -150,6 +150,7 @@ public synchronized void add(final EventSumValue other) { this.contentReadNanos += other.contentReadNanos; this.contentWriteNanos += other.contentWriteNanos; this.sessionCommitNanos += other.sessionCommitNanos; + this.gcMillis += other.gcMillis; final Map eventCounters = other.counters; if (eventCounters != null) { @@ -192,6 +193,7 @@ public synchronized void subtract(final EventSumValue other) { this.contentReadNanos -= other.contentReadNanos; this.contentWriteNanos -= other.contentWriteNanos; this.sessionCommitNanos -= other.sessionCommitNanos; + this.gcMillis -= other.gcMillis; final Map eventCounters = other.counters; if (eventCounters != null) { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java index 1fc75efa5ffd..2345472f7646 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java @@ -685,25 +685,15 @@ protected ProcessorStatus getProcessorStatus(final FlowFileEvent flowFileEvent, status.setCounters(flowFileEvent.getCounters()); } - final ProcessingPerformanceStatus performanceStatus = PerformanceMetricsUtil.getPerformanceMetrics(flowFileEvent, procNode); - status.setProcessingPerformanceStatus(performanceStatus); + final ProcessingPerformanceStatus perfStatus = createProcessingPerformanceStatus(flowFileEvent, procNode); + status.setProcessingPerformanceStatus(perfStatus); } // Determine the run status and get any validation error... only validating while STOPPED // is a trade-off we are willing to make, even though processor validity could change due to // environmental conditions (property configured with a file path and the file being externally // removed). This saves on validation costs that would be unnecessary most of the time. - if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) { - status.setRunStatus(RunStatus.Disabled); - } else if (ScheduledState.RUNNING.equals(procNode.getScheduledState())) { - status.setRunStatus(RunStatus.Running); - } else if (procNode.getValidationStatus() == ValidationStatus.VALIDATING) { - status.setRunStatus(RunStatus.Validating); - } else if (procNode.getValidationStatus() == ValidationStatus.INVALID && procNode.getActiveThreadCount() == 0) { - status.setRunStatus(RunStatus.Invalid); - } else { - status.setRunStatus(RunStatus.Stopped); - } + status.setRunStatus(determineRunStatus(procNode)); status.setExecutionNode(procNode.getExecutionNode()); status.setTerminatedThreadCount(procNode.getTerminatedThreadCount()); @@ -712,6 +702,31 @@ protected ProcessorStatus getProcessorStatus(final FlowFileEvent flowFileEvent, return status; } + private ProcessingPerformanceStatus createProcessingPerformanceStatus(final FlowFileEvent flowFileEvent, final ProcessorNode procNode) { + final ProcessingPerformanceStatus perfStatus = new ProcessingPerformanceStatus(); + perfStatus.setIdentifier(procNode.getIdentifier()); + perfStatus.setCpuDuration(flowFileEvent.getCpuNanoseconds()); + perfStatus.setContentReadDuration(flowFileEvent.getContentReadNanoseconds()); + perfStatus.setContentWriteDuration(flowFileEvent.getContentWriteNanoseconds()); + perfStatus.setSessionCommitDuration(flowFileEvent.getSessionCommitNanoseconds()); + perfStatus.setGarbageCollectionDuration(flowFileEvent.getGargeCollectionMillis()); + return perfStatus; + } + + private RunStatus determineRunStatus(final ProcessorNode procNode) { + if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) { + return RunStatus.Disabled; + } else if (ScheduledState.RUNNING.equals(procNode.getScheduledState())) { + return RunStatus.Running; + } else if (procNode.getValidationStatus() == ValidationStatus.VALIDATING) { + return RunStatus.Validating; + } else if (procNode.getValidationStatus() == ValidationStatus.INVALID && procNode.getActiveThreadCount() == 0) { + return RunStatus.Invalid; + } + + return RunStatus.Stopped; + } + /** * Returns the status of all components in the controller. This request is * not in the context of a user so the results will be unfiltered. diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/PerformanceMetricsUtil.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/PerformanceMetricsUtil.java deleted file mode 100644 index 78731d93b0cf..000000000000 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/PerformanceMetricsUtil.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.reporting; - -import org.apache.nifi.controller.ProcessorNode; -import org.apache.nifi.controller.repository.FlowFileEvent; -import org.apache.nifi.controller.status.ProcessingPerformanceStatus; - -public class PerformanceMetricsUtil { - - public static ProcessingPerformanceStatus getPerformanceMetrics(final FlowFileEvent fileEvent, final ProcessorNode processorNode) { - final ProcessingPerformanceStatus newMetrics = new ProcessingPerformanceStatus(); - - newMetrics.setIdentifier(processorNode.getProcessGroup().getIdentifier()); - newMetrics.setCpuDuration(fileEvent.getCpuNanoseconds()); - newMetrics.setContentReadDuration(fileEvent.getContentReadNanoseconds()); - newMetrics.setContentWriteDuration(fileEvent.getContentWriteNanoseconds()); - newMetrics.setSessionCommitDuration(fileEvent.getSessionCommitNanoseconds()); - newMetrics.setGarbageCollectionDuration(fileEvent.getGargeCollectionMillis()); - - return newMetrics; - } -} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java index f15733e344ca..4091b7a746c1 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java @@ -23,16 +23,16 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; public class StatusHistoryUtil { @@ -79,7 +79,7 @@ public static StatusDescriptorDTO createStatusDescriptorDto(final MetricDescript public static List createFieldDescriptorDtos(final Collection> metricDescriptors) { final StatusDescriptorDTO[] standardMetricDescriptors = new StatusDescriptorDTO[metricDescriptors.size()]; - final List counterMetricDescriptors = new LinkedList<>(); + final List counterMetricDescriptors = new ArrayList<>(); for (final MetricDescriptor metricDescriptor : metricDescriptors) { if (metricDescriptor instanceof StandardMetricDescriptor) { @@ -91,9 +91,10 @@ public static List createFieldDescriptorDtos(final Collecti } } - // Ordered standard metric descriptors are added first than counter metric descriptors in the order of appearance. + // Ordered standard metric descriptors are added first, then counter metric descriptors in lexicographical order of their label. + counterMetricDescriptors.sort(Comparator.comparing(StatusDescriptorDTO::getLabel)); final List result = new ArrayList<>(metricDescriptors.size()); - result.addAll(Arrays.asList(standardMetricDescriptors).stream().filter(i -> i != null).collect(Collectors.toList())); + result.addAll(Arrays.stream(standardMetricDescriptors).filter(Objects::nonNull).toList()); result.addAll(counterMetricDescriptors); return result; } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-status-history-shared/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java b/nifi-framework-bundle/nifi-framework/nifi-framework-status-history-shared/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java index feff921299de..e701b2be2f2c 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-status-history-shared/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-status-history-shared/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java @@ -17,11 +17,13 @@ package org.apache.nifi.controller.status.history; +import org.apache.nifi.controller.status.ProcessingPerformanceStatus; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.function.Function; public enum ProcessorStatusDescriptor { BYTES_READ( @@ -164,8 +166,176 @@ public Long reduce(final List values) { } }, true + ), + + CPU_MILLIS( + "cpuTime", + "CPU Time (5 mins)", + "The total amount of time that the Processor has used the CPU in the past 5 minutes. " + + "Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.", + Formatter.DURATION, + status -> nanosToMillis(status, ProcessingPerformanceStatus::getCpuDuration) + ), + + CPU_PERCENTAGE( + "cpuPercentage", + "CPU Percentage (5 mins)", + "Of the time that the Processor was running in the past 5 minutes, the percentage of that time that the Processor was using the CPU. " + + "Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.", + Formatter.COUNT, + status -> nanosValue(status, ProcessingPerformanceStatus::getCpuDuration) * 100 / Math.max(1, status.getProcessingNanos()), + processingPercentage(CPU_MILLIS.getDescriptor()), + true + ), + + CONTENT_REPO_READ_MILLIS( + "contentRepoReadTime", + "Content Repo Read Time (5 mins)", + "The amount of time that the Processor has spent reading from the Content Repository in the past 5 minutes. " + + "Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.", + Formatter.DURATION, + status -> nanosToMillis(status, ProcessingPerformanceStatus::getContentReadDuration) + ), + + CONTENT_REPO_READ_PERCENTAGE( + "contentRepoReadPercentage", + "Content Repo Read Percentage (5 mins)", + "Of the time that the Processor was running in the past 5 minutes, the percentage of that time that the Processor was reading from the Content Repository. " + + "Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.", + Formatter.COUNT, + status -> nanosValue(status, ProcessingPerformanceStatus::getContentReadDuration) * 100 / Math.max(1, status.getProcessingNanos()), + processingPercentage(CONTENT_REPO_READ_MILLIS.getDescriptor()), + true + ), + + CONTENT_REPO_WRITE_MILLIS( + "contentRepoWriteTime", + "Content Repo Write Time (5 mins)", + "The total amount of time that the Processor has spent writing to the Content Repository in the past 5 minutes. " + + "Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.", + Formatter.DURATION, + status -> nanosToMillis(status, ProcessingPerformanceStatus::getContentWriteDuration) + ), + + CONTENT_REPO_WRITE_PERCENTAGE( + "contentRepoWritePercentage", + "Content Repo Write Percentage (5 mins)", + "Of the time that the Processor was running in the past 5 minutes, the percentage of that time that the Processor was writing to the Content Repository. " + + "Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.", + Formatter.COUNT, + status -> nanosValue(status, ProcessingPerformanceStatus::getContentWriteDuration) * 100 / Math.max(1, status.getProcessingNanos()), + processingPercentage(CONTENT_REPO_WRITE_MILLIS.getDescriptor()), + true + ), + + SESSION_COMMIT_MILLIS( + "sessionCommitTime", + "Session Commit Time (5 mins)", + "The total amount of time that the Processor has spent waiting for the framework to commit its ProcessSession in the past 5 minutes. " + + "Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.", + Formatter.DURATION, + status -> nanosToMillis(status, ProcessingPerformanceStatus::getSessionCommitDuration) + ), + + SESSION_COMMIT_PERCENTAGE( + "sessionCommitPercentage", + "Session Commit Percentage (5 mins)", + "Of the time that the Processor was running in the past 5 minutes, the percentage of that time that the Processor was waiting for hte framework to commit its ProcessSession. " + + "Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.", + Formatter.COUNT, + status -> nanosValue(status, ProcessingPerformanceStatus::getSessionCommitDuration) * 100 / Math.max(1, status.getProcessingNanos()), + processingPercentage(SESSION_COMMIT_MILLIS.getDescriptor()), + true + ), + + GARBAGE_COLLECTION_MILLIS( + "garbageCollectionTime", + "Garbage Collection Time (5 mins)", + "The total amount of time that the Processor has spent blocked on Garbage Collection in the past 5 minutes. " + + "Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.", + Formatter.DURATION, + status -> { + final ProcessingPerformanceStatus perfStatus = status.getProcessingPerformanceStatus(); + if (perfStatus == null) { + return 0L; + } + // Garbage Collection is reported in milliseconds rather than nanos. + return perfStatus.getGarbageCollectionDuration(); + } + ), + + GARBAGE_COLLECTION_PERCENTAGE( + "garbageCollectionPercentage", + "Garbage Collection Percentage (5 mins)", + "Of the time that the Processor was running in the past 5 minutes, the percentage of that time that the Processor spent blocked on Garbage Collection. " + + "Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.", + Formatter.COUNT, + status -> { + final ProcessingPerformanceStatus perfStatus = status.getProcessingPerformanceStatus(); + if (perfStatus == null) { + return 0L; + } + // Garbage Collection is reported in milliseconds rather than nanos. + final long percentage = perfStatus.getGarbageCollectionDuration() * 100 / Math.max(1, status.getProcessingNanos()); + if (percentage == 0 && perfStatus.getGarbageCollectionDuration() > 0) { + // If the value is non-zero but less than 1%, we want to return 1%. + return 1L; + } + return percentage; + }, + processingPercentage(GARBAGE_COLLECTION_MILLIS.getDescriptor()), + true ); + private static long nanosToMillis(final ProcessorStatus procStatus, final Function metricTransform) { + final ProcessingPerformanceStatus perfStatus = procStatus.getProcessingPerformanceStatus(); + if (perfStatus == null) { + return 0; + } + + final long nanos = metricTransform.apply(perfStatus); + final long millis = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS); + if (millis == 0 && nanos > 0) { + // If the value is non-zero but less than 1ms, we want to return 1ms. + return 1; + } + + return millis; + } + + private static ValueReducer processingPercentage(final MetricDescriptor metricDescriptor) { + return new ValueReducer<>() { + @Override + public Long reduce(final List values) { + long procNanos = 0L; + long metricMillis = 0L; + + for (final StatusSnapshot snapshot : values) { + final long millis = snapshot.getStatusMetric(metricDescriptor); + metricMillis += millis; + + final long taskNanos = snapshot.getStatusMetric(ProcessorStatusDescriptor.TASK_NANOS.getDescriptor()); + procNanos += taskNanos; + } + + if (procNanos == 0) { + return 0L; + } + + final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS); + return metricMillis * 100 / procMillis; + } + }; + } + + private static long nanosValue(final ProcessorStatus procStatus, final Function metricTransform) { + final ProcessingPerformanceStatus perfStatus = procStatus.getProcessingPerformanceStatus(); + if (perfStatus == null) { + return 0; + } + + return metricTransform.apply(perfStatus); + } private final MetricDescriptor descriptor;