Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ public synchronized void add(final EventSumValue other) {
this.contentReadNanos += other.contentReadNanos;
this.contentWriteNanos += other.contentWriteNanos;
this.sessionCommitNanos += other.sessionCommitNanos;
this.gcMillis += other.gcMillis;

final Map<String, Long> eventCounters = other.counters;
if (eventCounters != null) {
Expand Down Expand Up @@ -192,6 +193,7 @@ public synchronized void subtract(final EventSumValue other) {
this.contentReadNanos -= other.contentReadNanos;
this.contentWriteNanos -= other.contentWriteNanos;
this.sessionCommitNanos -= other.sessionCommitNanos;
this.gcMillis -= other.gcMillis;

final Map<String, Long> eventCounters = other.counters;
if (eventCounters != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,25 +685,15 @@ protected ProcessorStatus getProcessorStatus(final FlowFileEvent flowFileEvent,
status.setCounters(flowFileEvent.getCounters());
}

final ProcessingPerformanceStatus performanceStatus = PerformanceMetricsUtil.getPerformanceMetrics(flowFileEvent, procNode);
status.setProcessingPerformanceStatus(performanceStatus);
final ProcessingPerformanceStatus perfStatus = createProcessingPerformanceStatus(flowFileEvent, procNode);
status.setProcessingPerformanceStatus(perfStatus);
}

// Determine the run status and get any validation error... only validating while STOPPED
// is a trade-off we are willing to make, even though processor validity could change due to
// environmental conditions (property configured with a file path and the file being externally
// removed). This saves on validation costs that would be unnecessary most of the time.
if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) {
status.setRunStatus(RunStatus.Disabled);
} else if (ScheduledState.RUNNING.equals(procNode.getScheduledState())) {
status.setRunStatus(RunStatus.Running);
} else if (procNode.getValidationStatus() == ValidationStatus.VALIDATING) {
status.setRunStatus(RunStatus.Validating);
} else if (procNode.getValidationStatus() == ValidationStatus.INVALID && procNode.getActiveThreadCount() == 0) {
status.setRunStatus(RunStatus.Invalid);
} else {
status.setRunStatus(RunStatus.Stopped);
}
status.setRunStatus(determineRunStatus(procNode));

status.setExecutionNode(procNode.getExecutionNode());
status.setTerminatedThreadCount(procNode.getTerminatedThreadCount());
Expand All @@ -712,6 +702,31 @@ protected ProcessorStatus getProcessorStatus(final FlowFileEvent flowFileEvent,
return status;
}

private ProcessingPerformanceStatus createProcessingPerformanceStatus(final FlowFileEvent flowFileEvent, final ProcessorNode procNode) {
final ProcessingPerformanceStatus perfStatus = new ProcessingPerformanceStatus();
perfStatus.setIdentifier(procNode.getIdentifier());
perfStatus.setCpuDuration(flowFileEvent.getCpuNanoseconds());
perfStatus.setContentReadDuration(flowFileEvent.getContentReadNanoseconds());
perfStatus.setContentWriteDuration(flowFileEvent.getContentWriteNanoseconds());
perfStatus.setSessionCommitDuration(flowFileEvent.getSessionCommitNanoseconds());
perfStatus.setGarbageCollectionDuration(flowFileEvent.getGargeCollectionMillis());
return perfStatus;
}

private RunStatus determineRunStatus(final ProcessorNode procNode) {
if (ScheduledState.DISABLED.equals(procNode.getScheduledState())) {
return RunStatus.Disabled;
} else if (ScheduledState.RUNNING.equals(procNode.getScheduledState())) {
return RunStatus.Running;
} else if (procNode.getValidationStatus() == ValidationStatus.VALIDATING) {
return RunStatus.Validating;
} else if (procNode.getValidationStatus() == ValidationStatus.INVALID && procNode.getActiveThreadCount() == 0) {
return RunStatus.Invalid;
}

return RunStatus.Stopped;
}

/**
* Returns the status of all components in the controller. This request is
* not in the context of a user so the results will be unfiltered.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

public class StatusHistoryUtil {

Expand Down Expand Up @@ -79,7 +79,7 @@ public static StatusDescriptorDTO createStatusDescriptorDto(final MetricDescript

public static List<StatusDescriptorDTO> createFieldDescriptorDtos(final Collection<MetricDescriptor<?>> metricDescriptors) {
final StatusDescriptorDTO[] standardMetricDescriptors = new StatusDescriptorDTO[metricDescriptors.size()];
final List<StatusDescriptorDTO> counterMetricDescriptors = new LinkedList<>();
final List<StatusDescriptorDTO> counterMetricDescriptors = new ArrayList<>();

for (final MetricDescriptor<?> metricDescriptor : metricDescriptors) {
if (metricDescriptor instanceof StandardMetricDescriptor) {
Expand All @@ -91,9 +91,10 @@ public static List<StatusDescriptorDTO> createFieldDescriptorDtos(final Collecti
}
}

// Ordered standard metric descriptors are added first than counter metric descriptors in the order of appearance.
// Ordered standard metric descriptors are added first, then counter metric descriptors in lexicographical order of their label.
counterMetricDescriptors.sort(Comparator.comparing(StatusDescriptorDTO::getLabel));
final List<StatusDescriptorDTO> result = new ArrayList<>(metricDescriptors.size());
result.addAll(Arrays.asList(standardMetricDescriptors).stream().filter(i -> i != null).collect(Collectors.toList()));
result.addAll(Arrays.stream(standardMetricDescriptors).filter(Objects::nonNull).toList());
result.addAll(counterMetricDescriptors);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package org.apache.nifi.controller.status.history;

import org.apache.nifi.controller.status.ProcessingPerformanceStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public enum ProcessorStatusDescriptor {
BYTES_READ(
Expand Down Expand Up @@ -164,8 +166,176 @@ public Long reduce(final List<StatusSnapshot> values) {
}
},
true
),

CPU_MILLIS(
"cpuTime",
"CPU Time (5 mins)",
"The total amount of time that the Processor has used the CPU in the past 5 minutes. " +
"Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.",
Formatter.DURATION,
status -> nanosToMillis(status, ProcessingPerformanceStatus::getCpuDuration)
),

CPU_PERCENTAGE(
"cpuPercentage",
"CPU Percentage (5 mins)",
"Of the time that the Processor was running in the past 5 minutes, the percentage of that time that the Processor was using the CPU. " +
"Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.",
Formatter.COUNT,
status -> nanosValue(status, ProcessingPerformanceStatus::getCpuDuration) * 100 / Math.max(1, status.getProcessingNanos()),
processingPercentage(CPU_MILLIS.getDescriptor()),
true
),

CONTENT_REPO_READ_MILLIS(
"contentRepoReadTime",
"Content Repo Read Time (5 mins)",
"The amount of time that the Processor has spent reading from the Content Repository in the past 5 minutes. " +
"Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.",
Formatter.DURATION,
status -> nanosToMillis(status, ProcessingPerformanceStatus::getContentReadDuration)
),

CONTENT_REPO_READ_PERCENTAGE(
"contentRepoReadPercentage",
"Content Repo Read Percentage (5 mins)",
"Of the time that the Processor was running in the past 5 minutes, the percentage of that time that the Processor was reading from the Content Repository. " +
"Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.",
Formatter.COUNT,
status -> nanosValue(status, ProcessingPerformanceStatus::getContentReadDuration) * 100 / Math.max(1, status.getProcessingNanos()),
processingPercentage(CONTENT_REPO_READ_MILLIS.getDescriptor()),
true
),

CONTENT_REPO_WRITE_MILLIS(
"contentRepoWriteTime",
"Content Repo Write Time (5 mins)",
"The total amount of time that the Processor has spent writing to the Content Repository in the past 5 minutes. " +
"Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.",
Formatter.DURATION,
status -> nanosToMillis(status, ProcessingPerformanceStatus::getContentWriteDuration)
),

CONTENT_REPO_WRITE_PERCENTAGE(
"contentRepoWritePercentage",
"Content Repo Write Percentage (5 mins)",
"Of the time that the Processor was running in the past 5 minutes, the percentage of that time that the Processor was writing to the Content Repository. " +
"Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.",
Formatter.COUNT,
status -> nanosValue(status, ProcessingPerformanceStatus::getContentWriteDuration) * 100 / Math.max(1, status.getProcessingNanos()),
processingPercentage(CONTENT_REPO_WRITE_MILLIS.getDescriptor()),
true
),

SESSION_COMMIT_MILLIS(
"sessionCommitTime",
"Session Commit Time (5 mins)",
"The total amount of time that the Processor has spent waiting for the framework to commit its ProcessSession in the past 5 minutes. " +
"Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.",
Formatter.DURATION,
status -> nanosToMillis(status, ProcessingPerformanceStatus::getSessionCommitDuration)
),

SESSION_COMMIT_PERCENTAGE(
"sessionCommitPercentage",
"Session Commit Percentage (5 mins)",
"Of the time that the Processor was running in the past 5 minutes, the percentage of that time that the Processor was waiting for hte framework to commit its ProcessSession. " +
"Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.",
Formatter.COUNT,
status -> nanosValue(status, ProcessingPerformanceStatus::getSessionCommitDuration) * 100 / Math.max(1, status.getProcessingNanos()),
processingPercentage(SESSION_COMMIT_MILLIS.getDescriptor()),
true
),

GARBAGE_COLLECTION_MILLIS(
"garbageCollectionTime",
"Garbage Collection Time (5 mins)",
"The total amount of time that the Processor has spent blocked on Garbage Collection in the past 5 minutes. " +
"Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.",
Formatter.DURATION,
status -> {
final ProcessingPerformanceStatus perfStatus = status.getProcessingPerformanceStatus();
if (perfStatus == null) {
return 0L;
}
// Garbage Collection is reported in milliseconds rather than nanos.
return perfStatus.getGarbageCollectionDuration();
}
),

GARBAGE_COLLECTION_PERCENTAGE(
"garbageCollectionPercentage",
"Garbage Collection Percentage (5 mins)",
"Of the time that the Processor was running in the past 5 minutes, the percentage of that time that the Processor spent blocked on Garbage Collection. " +
"Note, this metric may be unavailable, depending on configuration of the `nifi.performance.tracking.percentage` property.",
Formatter.COUNT,
status -> {
final ProcessingPerformanceStatus perfStatus = status.getProcessingPerformanceStatus();
if (perfStatus == null) {
return 0L;
}
// Garbage Collection is reported in milliseconds rather than nanos.
final long percentage = perfStatus.getGarbageCollectionDuration() * 100 / Math.max(1, status.getProcessingNanos());
if (percentage == 0 && perfStatus.getGarbageCollectionDuration() > 0) {
// If the value is non-zero but less than 1%, we want to return 1%.
return 1L;
}
return percentage;
},
processingPercentage(GARBAGE_COLLECTION_MILLIS.getDescriptor()),
true
);

private static long nanosToMillis(final ProcessorStatus procStatus, final Function<ProcessingPerformanceStatus, Long> metricTransform) {
final ProcessingPerformanceStatus perfStatus = procStatus.getProcessingPerformanceStatus();
if (perfStatus == null) {
return 0;
}

final long nanos = metricTransform.apply(perfStatus);
final long millis = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
if (millis == 0 && nanos > 0) {
// If the value is non-zero but less than 1ms, we want to return 1ms.
return 1;
}

return millis;
}

private static ValueReducer<StatusSnapshot, Long> processingPercentage(final MetricDescriptor<?> metricDescriptor) {
return new ValueReducer<>() {
@Override
public Long reduce(final List<StatusSnapshot> values) {
long procNanos = 0L;
long metricMillis = 0L;

for (final StatusSnapshot snapshot : values) {
final long millis = snapshot.getStatusMetric(metricDescriptor);
metricMillis += millis;

final long taskNanos = snapshot.getStatusMetric(ProcessorStatusDescriptor.TASK_NANOS.getDescriptor());
procNanos += taskNanos;
}

if (procNanos == 0) {
return 0L;
}

final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
return metricMillis * 100 / procMillis;
}
};
}

private static long nanosValue(final ProcessorStatus procStatus, final Function<ProcessingPerformanceStatus, Long> metricTransform) {
final ProcessingPerformanceStatus perfStatus = procStatus.getProcessingPerformanceStatus();
if (perfStatus == null) {
return 0;
}

return metricTransform.apply(perfStatus);
}


private final MetricDescriptor<ProcessorStatus> descriptor;
Expand Down