From 278f2ae1c395219a584debf252d8491fe507ad17 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 2 Aug 2018 14:17:36 -0400 Subject: [PATCH 1/2] NIFI-5466: Keep a running total of stats for each component. Refactored FlowFileEvent and repository in order to provide more efficient storage of objects on Java heap by allowing the same 'EMPTY' object to be reused - Refactored VolatileComponentStatusRepository to avoid holding on to ProcessorStatus objects, etc, and only keep what they need - Updated VolatileComponentStatusRepository to ensure that we are efficiently storing metrics for processors, etc. that are not running --- .../status/history/MetricDescriptor.java | 5 +- .../status/history/StatusSnapshot.java | 12 +- .../StatusHistoryEndpointMerger.java | 11 +- .../controller/repository/FlowFileEvent.java | 2 - .../repository/FlowFileEventRepository.java | 7 +- .../repository/RepositoryStatusReport.java | 2 +- .../nifi/controller/FlowController.java | 26 +- .../repository/StandardProcessSession.java | 16 +- .../StandardRepositoryStatusReport.java | 7 +- .../metrics/EmptyFlowFileEvent.java | 98 ++++++ .../repository/metrics/EventContainer.java | 6 +- .../repository/metrics/EventSum.java | 38 ++- .../repository/metrics/EventSumValue.java | 63 +++- .../metrics/RingBufferEventRepository.java | 16 +- .../SecondPrecisionEventContainer.java | 96 ++++-- .../metrics/StandardFlowFileEvent.java | 14 +- .../EventDrivenSchedulingAgent.java | 18 +- .../status/history/ComponentDetails.java | 107 +++++++ .../history/ComponentStatusHistory.java | 37 +++ .../history/ConnectionStatusDescriptor.java | 30 +- .../status/history/EmptyStatusSnapshot.java | 50 +++ .../status/history/IndexableMetric.java | 5 + .../status/history/MetricRollingBuffer.java | 180 +++++++++++ .../history/ProcessGroupStatusDescriptor.java | 59 ++-- .../history/ProcessorStatusDescriptor.java | 96 +++--- .../RemoteProcessGroupStatusDescriptor.java | 53 ++-- .../history/StandardMetricDescriptor.java | 18 +- .../status/history/StandardStatusHistory.java | 34 +-- .../history/StandardStatusSnapshot.java | 58 +++- .../status/history/StatusHistoryUtil.java | 18 +- .../VolatileComponentStatusRepository.java | 285 +++++++----------- .../controller/tasks/ConnectableTask.java | 14 +- .../nifi/groups/StandardProcessGroup.java | 2 +- .../apache/nifi/util/ComponentMetrics.java | 172 +++++++++++ .../nifi/util/ComponentStatusReport.java | 137 --------- .../TestRingBufferEventRepository.java | 31 +- .../TestSecondPrecisionEventContainer.java | 119 ++++++++ .../history/TestMetricRollingBuffer.java | 110 +++++++ 38 files changed, 1445 insertions(+), 607 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentDetails.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusHistory.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmptyStatusSnapshot.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/IndexableMetric.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/MetricRollingBuffer.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java delete mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentStatusReport.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/metrics/TestSecondPrecisionEventContainer.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/TestMetricRollingBuffer.java diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java index 8fdce05cadf6..c0c52b6af765 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/MetricDescriptor.java @@ -23,13 +23,14 @@ */ public interface MetricDescriptor { - public enum Formatter { - + enum Formatter { COUNT, DURATION, DATA_SIZE }; + int getMetricIdentifier(); + /** * Specifies how the values should be formatted * diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java index 551ceb255244..da794eea5b16 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusSnapshot.java @@ -17,7 +17,7 @@ package org.apache.nifi.controller.status.history; import java.util.Date; -import java.util.Map; +import java.util.Set; /** * A StatusSnapshot represents a Component's status report at some point in time @@ -29,10 +29,16 @@ public interface StatusSnapshot { */ Date getTimestamp(); + Set> getMetricDescriptors(); + + Long getStatusMetric(MetricDescriptor descriptor); + /** - * @return a Map of MetricDescriptor to value + * Returns an instance of StatusSnapshot that has all the same information as {@code this} except for + * Counters. If {@code this} does not contain any counters, the object returned may (or may not) be {@code this}. + * @return a StatusSnapshot without counters */ - Map, Long> getStatusMetrics(); + StatusSnapshot withoutCounters(); /** * @return a {@link ValueReducer} that is capable of merging multiple diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java index 49e952b1154e..8e4c26bece15 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/StatusHistoryEndpointMerger.java @@ -41,6 +41,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -156,7 +157,7 @@ public NodeResponse merge(URI uri, String method, Set successfulRe return counters.getOrDefault(descriptorDto.getField(), 0L); }; - final MetricDescriptor metricDescriptor = new StandardMetricDescriptor<>(descriptorDto.getField(), + final MetricDescriptor metricDescriptor = new StandardMetricDescriptor<>(() -> 0, descriptorDto.getField(), descriptorDto.getLabel(), descriptorDto.getDescription(), Formatter.COUNT, valueMapper); metricDescriptors.put(fieldName, metricDescriptor); @@ -197,11 +198,7 @@ private List mergeStatusHistories(final List nodeToSnapshotMap = dateToNodeSnapshots.get(normalizedDate); - if (nodeToSnapshotMap == null) { - nodeToSnapshotMap = new HashMap<>(); - dateToNodeSnapshots.put(normalizedDate, nodeToSnapshotMap); - } + Map nodeToSnapshotMap = dateToNodeSnapshots.computeIfAbsent(normalizedDate, k -> new HashMap<>()); nodeToSnapshotMap.put(nodeStatusSnapshot.getNodeId(), snapshot); } } @@ -220,7 +217,7 @@ private List mergeStatusHistories(final List> metricDescriptors) { - final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(); + final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(new HashSet<>(metricDescriptors.values())); snapshot.setTimestamp(snapshotDto.getTimestamp()); // Default all metrics to 0 so that if a counter has not yet been registered, it will have a value of 0 instead diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java index 26cea50e7456..7b131ccf2db1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java @@ -20,8 +20,6 @@ public interface FlowFileEvent { - String getComponentIdentifier(); - int getFlowFilesIn(); int getFlowFilesOut(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java index 1781d1866dbc..8fac237add2e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java @@ -25,15 +25,16 @@ public interface FlowFileEventRepository extends Closeable { * Updates the repository to include a new FlowFile processing event * * @param event new event + * @param componentIdentifier the ID of the component that the event belongs to * @throws java.io.IOException ioe */ - void updateRepository(FlowFileEvent event) throws IOException; + void updateRepository(FlowFileEvent event, String componentIdentifier) throws IOException; /** - * @param sinceEpochMillis age of report + * @param now the current time * @return a report of processing activity since the given time */ - RepositoryStatusReport reportTransferEvents(long sinceEpochMillis); + RepositoryStatusReport reportTransferEvents(long now); /** * Causes any flow file events of the given entry age in epoch milliseconds diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java index e434905d46a0..5479e26236b7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/RepositoryStatusReport.java @@ -20,7 +20,7 @@ public interface RepositoryStatusReport { - void addReportEntry(FlowFileEvent entry); + void addReportEntry(FlowFileEvent entry, String componentId); Map getReportEntries(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index b83749c94fd1..21c61e9368b1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -103,6 +103,7 @@ import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.controller.repository.io.LimitedInputStream; +import org.apache.nifi.controller.repository.metrics.EmptyFlowFileEvent; import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent; import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent; import org.apache.nifi.controller.scheduling.RepositoryContextFactory; @@ -630,7 +631,11 @@ private FlowController( timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { @Override public void run() { - componentStatusRepository.capture(getControllerStatus(), getGarbageCollectionStatus()); + try { + componentStatusRepository.capture(getControllerStatus(), getGarbageCollectionStatus()); + } catch (final Exception e) { + LOG.error("Failed to capture component stats for Stats History", e); + } } }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS); @@ -3333,18 +3338,7 @@ private ProcessorStatus getProcessorStatus(final RepositoryStatusReport report, status.setType(isProcessorAuthorized ? procNode.getComponentType() : "Processor"); final FlowFileEvent entry = report.getReportEntries().get(procNode.getIdentifier()); - if (entry == null) { - status.setInputBytes(0L); - status.setInputCount(0); - status.setOutputBytes(0L); - status.setOutputCount(0); - status.setBytesWritten(0L); - status.setBytesRead(0L); - status.setProcessingNanos(0); - status.setInvocations(0); - status.setAverageLineageDuration(0L); - status.setFlowFilesRemoved(0); - } else { + if (entry != null && entry != EmptyFlowFileEvent.INSTANCE) { final int processedCount = entry.getFlowFilesOut(); final long numProcessedBytes = entry.getContentSizeOut(); status.setOutputBytes(numProcessedBytes); @@ -4117,13 +4111,9 @@ public int getActiveThreadCount() { } private RepositoryStatusReport getProcessorStats() { - // processed in last 5 minutes - return getProcessorStats(System.currentTimeMillis() - 300000); + return flowFileEventRepository.reportTransferEvents(System.currentTimeMillis()); } - private RepositoryStatusReport getProcessorStats(final long since) { - return flowFileEventRepository.reportTransferEvents(since); - } // // Clustering methods diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 72556456682b..9741cffe02ff 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -528,7 +528,7 @@ private void updateEventRepository(final Checkpoint checkpoint) { try { // update event repository final Connectable connectable = context.getConnectable(); - final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier()); + final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(); flowFileEvent.setBytesRead(checkpoint.bytesRead); flowFileEvent.setBytesWritten(checkpoint.bytesWritten); flowFileEvent.setContentSizeIn(checkpoint.contentSizeIn); @@ -553,10 +553,10 @@ private void updateEventRepository(final Checkpoint checkpoint) { final Map counters = combineCounters(checkpoint.countersOnCommit, checkpoint.immediateCounters); flowFileEvent.setCounters(counters); - context.getFlowFileEventRepository().updateRepository(flowFileEvent); + context.getFlowFileEventRepository().updateRepository(flowFileEvent, connectable.getIdentifier()); - for (final FlowFileEvent connectionEvent : checkpoint.connectionCounts.values()) { - context.getFlowFileEventRepository().updateRepository(connectionEvent); + for (final Map.Entry entry : checkpoint.connectionCounts.entrySet()) { + context.getFlowFileEventRepository().updateRepository(entry.getValue(), entry.getKey()); } } catch (final IOException ioe) { LOG.error("FlowFile Event Repository failed to update", ioe); @@ -1052,14 +1052,14 @@ private synchronized void rollback(final boolean penalize, final boolean rollbac } final Connectable connectable = context.getConnectable(); - final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier()); + final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(); flowFileEvent.setBytesRead(bytesRead); flowFileEvent.setBytesWritten(bytesWritten); flowFileEvent.setCounters(immediateCounters); // update event repository try { - context.getFlowFileEventRepository().updateRepository(flowFileEvent); + context.getFlowFileEventRepository().updateRepository(flowFileEvent, connectable.getIdentifier()); } catch (final Exception e) { LOG.error("Failed to update FlowFileEvent Repository due to " + e); if (LOG.isDebugEnabled()) { @@ -1458,7 +1458,7 @@ private void incrementConnectionInputCounts(final Connection connection, final R } private void incrementConnectionInputCounts(final String connectionId, final int flowFileCount, final long bytes) { - final StandardFlowFileEvent connectionEvent = connectionCounts.computeIfAbsent(connectionId, id -> new StandardFlowFileEvent(id)); + final StandardFlowFileEvent connectionEvent = connectionCounts.computeIfAbsent(connectionId, id -> new StandardFlowFileEvent()); connectionEvent.setContentSizeIn(connectionEvent.getContentSizeIn() + bytes); connectionEvent.setFlowFilesIn(connectionEvent.getFlowFilesIn() + flowFileCount); } @@ -1468,7 +1468,7 @@ private void incrementConnectionOutputCounts(final Connection connection, final } private void incrementConnectionOutputCounts(final String connectionId, final int flowFileCount, final long bytes) { - final StandardFlowFileEvent connectionEvent = connectionCounts.computeIfAbsent(connectionId, id -> new StandardFlowFileEvent(id)); + final StandardFlowFileEvent connectionEvent = connectionCounts.computeIfAbsent(connectionId, id -> new StandardFlowFileEvent()); connectionEvent.setContentSizeOut(connectionEvent.getContentSizeOut() + bytes); connectionEvent.setFlowFilesOut(connectionEvent.getFlowFilesOut() + flowFileCount); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryStatusReport.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryStatusReport.java index 3e3005956dd5..a36716844aff 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryStatusReport.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryStatusReport.java @@ -54,13 +54,14 @@ public FlowFileEvent getReportEntry(final String componentId) { * Adds an entry to the report. * * @param entry an entry + * @param componentId the id of the component that the entry belongs to */ @Override - public void addReportEntry(FlowFileEvent entry) { + public void addReportEntry(FlowFileEvent entry, final String componentId) { if (entry == null) { throw new NullPointerException("report entry may not be null"); } - this.entries.put(entry.getComponentIdentifier(), entry); + this.entries.put(componentId, entry); } @Override @@ -69,7 +70,7 @@ public String toString() { for (final String key : this.entries.keySet()) { final FlowFileEvent entry = this.entries.get(key); strb.append("[") - .append(entry.getComponentIdentifier()).append(", ") + .append(key).append(", ") .append(entry.getFlowFilesIn()).append(", ") .append(entry.getContentSizeIn()).append(", ") .append(entry.getFlowFilesOut()).append(", ") diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java new file mode 100644 index 000000000000..496a1e111466 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java @@ -0,0 +1,98 @@ +package org.apache.nifi.controller.repository.metrics; + +import org.apache.nifi.controller.repository.FlowFileEvent; + +import java.util.Collections; +import java.util.Map; + +public class EmptyFlowFileEvent implements FlowFileEvent { + public static final EmptyFlowFileEvent INSTANCE = new EmptyFlowFileEvent(); + + private EmptyFlowFileEvent() { + } + + @Override + public int getFlowFilesIn() { + return 0; + } + + @Override + public int getFlowFilesOut() { + return 0; + } + + @Override + public int getFlowFilesRemoved() { + return 0; + } + + @Override + public long getContentSizeIn() { + return 0; + } + + @Override + public long getContentSizeOut() { + return 0; + } + + @Override + public long getContentSizeRemoved() { + return 0; + } + + @Override + public long getBytesRead() { + return 0; + } + + @Override + public long getBytesWritten() { + return 0; + } + + @Override + public long getProcessingNanoseconds() { + return 0; + } + + @Override + public long getAverageLineageMillis() { + return 0; + } + + @Override + public long getAggregateLineageMillis() { + return 0; + } + + @Override + public int getFlowFilesReceived() { + return 0; + } + + @Override + public long getBytesReceived() { + return 0; + } + + @Override + public int getFlowFilesSent() { + return 0; + } + + @Override + public long getBytesSent() { + return 0; + } + + @Override + public int getInvocations() { + return 0; + } + + @Override + public Map getCounters() { + return Collections.emptyMap(); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java index 9dd3c8ef06c0..d193b7d7c5d1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventContainer.java @@ -20,9 +20,9 @@ import org.apache.nifi.controller.repository.FlowFileEvent; public interface EventContainer { - public void addEvent(FlowFileEvent event); + void addEvent(FlowFileEvent event); - public void purgeEvents(long cutoffEpochMillis); + void purgeEvents(long cutoffEpochMillis); - public FlowFileEvent generateReport(String componentId, long sinceEpochMillis); + FlowFileEvent generateReport(long sinceEpochMillis); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java index b1c9120392fc..5fef08b93c91 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSum.java @@ -17,31 +17,30 @@ package org.apache.nifi.controller.repository.metrics; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.nifi.controller.repository.FlowFileEvent; -public class EventSum { +import java.util.concurrent.atomic.AtomicReference; +public class EventSum { private final AtomicReference ref = new AtomicReference<>(); public EventSumValue getValue() { final EventSumValue value = ref.get(); - return value == null ? new EventSumValue() : value; + return value == null ? new EventSumValue(System.currentTimeMillis()) : value; } - public void addOrReset(final FlowFileEvent event) { - final long expectedMinute = System.currentTimeMillis() / 60000; + public EventSumValue addOrReset(final FlowFileEvent event, final long timestamp) { + final long expectedSecond = timestamp / 1000; EventSumValue curValue; while (true) { curValue = ref.get(); - if (curValue == null || curValue.getMinuteTimestamp() != expectedMinute) { - final EventSumValue newValue = new EventSumValue(); + if (curValue == null || (curValue.getTimestamp() / 1000) != expectedSecond) { + final EventSumValue newValue = new EventSumValue(timestamp); final boolean replaced = ref.compareAndSet(curValue, newValue); if (replaced) { - curValue = newValue; - break; + newValue.add(event); + return curValue; } } else { break; @@ -49,5 +48,24 @@ public void addOrReset(final FlowFileEvent event) { } curValue.add(event); + return null; + } + + + public EventSumValue reset(final long ifOlderThan) { + while (true) { + final EventSumValue curValue = ref.get(); + if (curValue == null) { + return null; + } + + if (curValue.getTimestamp() < ifOlderThan) { + if (ref.compareAndSet(curValue, null)) { + return curValue; + } + } else { + return null; + } + } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java index 90990df69020..210f7ace8288 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java @@ -17,13 +17,14 @@ package org.apache.nifi.controller.repository.metrics; +import org.apache.nifi.controller.repository.FlowFileEvent; + import java.util.Collections; import java.util.HashMap; import java.util.Map; -import org.apache.nifi.controller.repository.FlowFileEvent; - public class EventSumValue { + private volatile boolean empty = true; private int flowFilesIn = 0; private int flowFilesOut = 0; @@ -44,16 +45,16 @@ public class EventSumValue { private int invocations = 0; private Map counters; - private final long minuteTimestamp; private final long millisecondTimestamp; - public EventSumValue() { - this.millisecondTimestamp = System.currentTimeMillis(); - this.minuteTimestamp = millisecondTimestamp / 60000; + public EventSumValue(final long timestamp) { + this.millisecondTimestamp = timestamp; } public synchronized void add(final FlowFileEvent flowFileEvent) { + empty = false; + this.aggregateLineageMillis += flowFileEvent.getAggregateLineageMillis(); this.bytesRead += flowFileEvent.getBytesRead(); this.bytesReceived += flowFileEvent.getBytesReceived(); @@ -84,8 +85,12 @@ public synchronized void add(final FlowFileEvent flowFileEvent) { } } - public synchronized FlowFileEvent toFlowFileEvent(final String componentId) { - final StandardFlowFileEvent event = new StandardFlowFileEvent(componentId); + public synchronized FlowFileEvent toFlowFileEvent() { + if (empty) { + return EmptyFlowFileEvent.INSTANCE; + } + + final StandardFlowFileEvent event = new StandardFlowFileEvent(); event.setAggregateLineageMillis(aggregateLineageMillis); event.setBytesRead(bytesRead); event.setBytesReceived(bytesReceived); @@ -106,6 +111,10 @@ public synchronized FlowFileEvent toFlowFileEvent(final String componentId) { } public synchronized void add(final EventSumValue other) { + if (other.empty) { + return; + } + synchronized (other) { this.aggregateLineageMillis += other.aggregateLineageMillis; this.bytesRead += other.bytesRead; @@ -139,8 +148,42 @@ public synchronized void add(final EventSumValue other) { } } - public long getMinuteTimestamp() { - return minuteTimestamp; + public synchronized void subtract(final EventSumValue other) { + if (other.empty) { + return; + } + + synchronized (other) { + this.aggregateLineageMillis -= other.aggregateLineageMillis; + this.bytesRead -= other.bytesRead; + this.bytesReceived -= other.bytesReceived; + this.bytesSent -= other.bytesSent; + this.bytesWritten -= other.bytesWritten; + this.contentSizeIn -= other.contentSizeIn; + this.contentSizeOut -= other.contentSizeOut; + this.contentSizeRemoved -= other.contentSizeRemoved; + this.flowFilesIn -= other.flowFilesIn; + this.flowFilesOut -= other.flowFilesOut; + this.flowFilesReceived -= other.flowFilesReceived; + this.flowFilesRemoved -= other.flowFilesRemoved; + this.flowFilesSent -= other.flowFilesSent; + this.invocations -= other.invocations; + this.processingNanos -= other.processingNanos; + + final Map eventCounters = other.counters; + if (eventCounters != null) { + if (counters == null) { + counters = new HashMap<>(); + } + + for (final Map.Entry entry : eventCounters.entrySet()) { + final String counterName = entry.getKey(); + final Long counterValue = entry.getValue(); + + counters.compute(counterName, (key, value) -> value == null ? counterValue : counterValue - value); + } + } + } } public long getTimestamp() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java index c60f98da5c12..bcd03448f34e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/RingBufferEventRepository.java @@ -16,14 +16,14 @@ */ package org.apache.nifi.controller.repository.metrics; -import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - import org.apache.nifi.controller.repository.FlowFileEvent; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.repository.StandardRepositoryStatusReport; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + public class RingBufferEventRepository implements FlowFileEventRepository { private final int numMinutes; @@ -38,8 +38,7 @@ public void close() throws IOException { } @Override - public void updateRepository(final FlowFileEvent event) { - final String componentId = event.getComponentIdentifier(); + public void updateRepository(final FlowFileEvent event, final String componentId) { final EventContainer eventContainer = componentEventMap.computeIfAbsent(componentId, id -> new SecondPrecisionEventContainer(numMinutes)); eventContainer.addEvent(event); } @@ -48,10 +47,7 @@ public void updateRepository(final FlowFileEvent event) { public StandardRepositoryStatusReport reportTransferEvents(final long sinceEpochMillis) { final StandardRepositoryStatusReport report = new StandardRepositoryStatusReport(); - componentEventMap.entrySet().stream() - .map(entry -> entry.getValue().generateReport(entry.getKey(), sinceEpochMillis)) - .forEach(event -> report.addReportEntry(event)); - + componentEventMap.forEach((componentId, container) -> report.addReportEntry(container.generateReport(sinceEpochMillis), componentId)); return report; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java index 72a8cfc39a7f..2482177d3daf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/SecondPrecisionEventContainer.java @@ -18,13 +18,24 @@ package org.apache.nifi.controller.repository.metrics; import org.apache.nifi.controller.repository.FlowFileEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicLong; public class SecondPrecisionEventContainer implements EventContainer { + private static final Logger logger = LoggerFactory.getLogger(SecondPrecisionEventContainer.class); + private final int numBins; private final EventSum[] sums; + private final EventSumValue aggregateValue = new EventSumValue(0); + private final AtomicLong lastUpdateSecond = new AtomicLong(0); public SecondPrecisionEventContainer(final int numMinutes) { - numBins = 1 + numMinutes * 60; + // number of bins is number of seconds in 'numMinutes' plus 1. We add one because + // we want to have the 'current bin' that we are adding values to, in addition to the + // previous (X = numMinutes * 60) bins of values that have completed + numBins = numMinutes * 60 + 1; sums = new EventSum[numBins]; for (int i = 0; i < numBins; i++) { @@ -34,11 +45,62 @@ public SecondPrecisionEventContainer(final int numMinutes) { @Override public void addEvent(final FlowFileEvent event) { - final int second = (int) (System.currentTimeMillis() / 1000); - final int binIdx = second % numBins; + addEvent(event, System.currentTimeMillis()); + } + + protected void addEvent(final FlowFileEvent event, final long timestamp) { + final long second = timestamp / 1000; + final int binIdx = (int) (second % numBins); final EventSum sum = sums[binIdx]; - sum.addOrReset(event); + final EventSumValue replaced = sum.addOrReset(event, timestamp); + + aggregateValue.add(event); + + if (replaced == null) { + logger.debug("Updated bin {}. Did NOT replace.", binIdx); + } else { + logger.debug("Replaced bin {}", binIdx); + aggregateValue.subtract(replaced); + } + + // If there are any buckets that have expired, we need to update our aggregate value to reflect that. + processExpiredBuckets(second); + } + + private void processExpiredBuckets(final long currentSecond) { + final long lastUpdate = lastUpdateSecond.get(); + if (currentSecond > lastUpdate) { + final boolean updated = lastUpdateSecond.compareAndSet(lastUpdate, currentSecond); + if (updated) { + if (lastUpdate == 0L) { + // First update, so nothing to expire + return; + } + + final int secondsElapsed = (int) (currentSecond - lastUpdate); + + int index = (int) (currentSecond % numBins); + final long expirationTimestamp = 1000 * (currentSecond - numBins); + + int expired = 0; + for (int i=0; i < secondsElapsed; i++) { + index--; + if (index < 0) { + index = sums.length - 1; + } + + final EventSum expiredSum = sums[index]; + final EventSumValue expiredValue = expiredSum.reset(expirationTimestamp); + if (expiredValue != null) { + aggregateValue.subtract(expiredValue); + expired++; + } + } + + logger.debug("Expired {} bins", expired); + } + } } @Override @@ -47,23 +109,17 @@ public void purgeEvents(final long cutoffEpochMilliseconds) { } @Override - public FlowFileEvent generateReport(final String componentId, final long sinceEpochMillis) { - final EventSumValue eventSumValue = new EventSumValue(); - final long second = sinceEpochMillis / 1000; - final int startBinIdx = (int) (second % numBins); - - for (int i = 0; i < numBins; i++) { - int binIdx = (startBinIdx + i) % numBins; - final EventSum sum = sums[binIdx]; - - final EventSumValue sumValue = sum.getValue(); - if (sumValue.getTimestamp() >= sinceEpochMillis) { - eventSumValue.add(sumValue); - } + public FlowFileEvent generateReport(final long now) { + final long second = now / 1000 + 1; + final long lastUpdate = lastUpdateSecond.get(); + final long secondsSinceUpdate = second - lastUpdate; + if (secondsSinceUpdate > numBins) { + logger.debug("EventContainer hasn't been updated in {} seconds so will generate report as Empty FlowFile Event", secondsSinceUpdate); + return EmptyFlowFileEvent.INSTANCE; } - final FlowFileEvent flowFileEvent = eventSumValue.toFlowFileEvent(componentId); - return flowFileEvent; + logger.debug("Will expire up to {} bins", secondsSinceUpdate); + processExpiredBuckets(second); + return aggregateValue.toFlowFileEvent(); } - } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java index 40ec983ca71e..fc0067514041 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java @@ -16,13 +16,11 @@ */ package org.apache.nifi.controller.repository.metrics; -import java.util.Map; - import org.apache.nifi.controller.repository.FlowFileEvent; -public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable { +import java.util.Map; - private final String componentId; +public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable { private int flowFilesIn; private int flowFilesOut; @@ -41,13 +39,7 @@ public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable { private int invocations; private Map counters; - public StandardFlowFileEvent(final String componentId) { - this.componentId = componentId; - } - - @Override - public String getComponentIdentifier() { - return componentId; + public StandardFlowFileEvent() { } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java index 8b960fc3cf1f..de972251d7e8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java @@ -16,13 +16,6 @@ */ package org.apache.nifi.controller.scheduling; -import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateManagerProvider; @@ -54,6 +47,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { private static final Logger logger = LoggerFactory.getLogger(EventDrivenSchedulingAgent.class); @@ -255,10 +255,10 @@ public void run() { } try { final long processingNanos = System.nanoTime() - startNanos; - final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(connectable.getIdentifier()); + final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(); procEvent.setProcessingNanos(processingNanos); procEvent.setInvocations(invocationCount); - context.getFlowFileEventRepository().updateRepository(procEvent); + context.getFlowFileEventRepository().updateRepository(procEvent, connectable.getIdentifier()); } catch (final IOException e) { logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable, e.toString()); logger.error("", e); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentDetails.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentDetails.java new file mode 100644 index 000000000000..5381b739ae77 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentDetails.java @@ -0,0 +1,107 @@ +package org.apache.nifi.controller.status.history; + +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; + +import java.util.HashMap; +import java.util.Map; + +public class ComponentDetails { + private final String componentId; + private final String groupId; + private final String componentName; + private final String componentType; + private final String sourceName; + private final String destinationName; + private final String targetUri; + + + public ComponentDetails(final String id, final String groupId, final String componentName, final String componentType, + final String sourceName, final String destinationName, final String remoteUri) { + this.componentId = id; + this.groupId = groupId; + this.componentName = componentName; + this.componentType = componentType; + this.sourceName = sourceName; + this.destinationName = destinationName; + this.targetUri = remoteUri; + } + + public static ComponentDetails forProcessor(final ProcessorStatus status) { + return forProcessor(status.getId(), status.getGroupId(), status.getName(), status.getType()); + } + + public static ComponentDetails forProcessor(final String id, final String groupId, final String processorName, final String processorType) { + return new ComponentDetails(id, groupId, processorName, processorType, null, null, null); + } + + public static ComponentDetails forConnection(final ConnectionStatus status) { + return forConnection(status.getId(), status.getGroupId(), status.getName(), status.getSourceName(), status.getDestinationName()); + } + + public static ComponentDetails forConnection(final String id, final String groupId, final String connectionName, final String sourceName, final String destinationName) { + return new ComponentDetails(id, groupId, connectionName, sourceName, destinationName, null, null); + } + + public static ComponentDetails forProcessGroup(final ProcessGroupStatus status) { + return forProcessGroup(status.getId(), status.getName()); + } + + public static ComponentDetails forProcessGroup(final String id, final String groupName) { + return new ComponentDetails(id,null, groupName, null, null, null, null); + } + + public static ComponentDetails forRemoteProcessGroup(final RemoteProcessGroupStatus status) { + return forRemoteProcessGroup(status.getId(), status.getGroupId(), status.getName(), status.getTargetUri()); + } + + public static ComponentDetails forRemoteProcessGroup(final String id, final String parentGroupId, final String rpgName, final String remoteUri) { + return new ComponentDetails(id, parentGroupId, rpgName, null, null, null, remoteUri); + } + + public String getComponentId() { + return componentId; + } + + public String getGroupId() { + return groupId; + } + + public String getComponentName() { + return componentName; + } + + public String getComponentType() { + return componentType; + } + + public String getSourceName() { + return sourceName; + } + + public String getDestinationName() { + return destinationName; + } + + public String getTargetUri() { + return targetUri; + } + + /** + * Returns a {@Link Map} whose keys are those values defined by {@link ComponentStatusRepository#COMPONENT_DETAIL_GROUP_ID ComponentStatusRepository.COMPONENT_DETAIL_*} + * and values are the values that are populated for this ComponentDetails object. + */ + public Map toMap() { + final Map map = new HashMap<>(); + map.put(ComponentStatusRepository.COMPONENT_DETAIL_ID, componentId); + map.put(ComponentStatusRepository.COMPONENT_DETAIL_GROUP_ID, groupId); + map.put(ComponentStatusRepository.COMPONENT_DETAIL_NAME, componentName); + map.put(ComponentStatusRepository.COMPONENT_DETAIL_TYPE, componentType); + map.put(ComponentStatusRepository.COMPONENT_DETAIL_SOURCE_NAME, sourceName); + map.put(ComponentStatusRepository.COMPONENT_DETAIL_DESTINATION_NAME, destinationName); + map.put(ComponentStatusRepository.COMPONENT_DETAIL_URI, targetUri); + return map; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusHistory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusHistory.java new file mode 100644 index 000000000000..5fb876881a1e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusHistory.java @@ -0,0 +1,37 @@ +package org.apache.nifi.controller.status.history; + +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class ComponentStatusHistory { + + private final MetricRollingBuffer snapshots; + private ComponentDetails componentDetails; + + public ComponentStatusHistory(final ComponentDetails details, final int maxCapacity) { + this.componentDetails = details; + snapshots = new MetricRollingBuffer(maxCapacity); + } + + public void expireBefore(final Date timestamp) { + snapshots.expireBefore(timestamp); + } + + public void update(final StatusSnapshot snapshot, final ComponentDetails details) { + if (snapshot == null) { + return; + } + + snapshots.update(snapshot); + componentDetails = details; + } + + public StatusHistory toStatusHistory(final List timestamps, final boolean includeCounters, final Set> defaultStatusMetrics) { + final Date dateGenerated = new Date(); + final Map componentDetailsMap = componentDetails.toMap(); + final List snapshotList = snapshots.getSnapshots(timestamps, includeCounters, defaultStatusMetrics); + return new StandardStatusHistory(snapshotList, componentDetailsMap, dateGenerated); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java index c298803bdfcb..ac738b3cac14 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java @@ -21,53 +21,55 @@ import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter; public enum ConnectionStatusDescriptor { - INPUT_BYTES(new StandardMetricDescriptor( + INPUT_BYTES( "inputBytes", "Bytes In (5 mins)", "The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.DATA_SIZE, - s -> s.getInputBytes())), + ConnectionStatus::getInputBytes), - INPUT_COUNT(new StandardMetricDescriptor( + INPUT_COUNT( "inputCount", "FlowFiles In (5 mins)", "The number of FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.COUNT, - s -> Long.valueOf(s.getInputCount()))), + s -> Long.valueOf(s.getInputCount())), - OUTPUT_BYTES(new StandardMetricDescriptor( + OUTPUT_BYTES( "outputBytes", "Bytes Out (5 mins)", "The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.DATA_SIZE, - s -> s.getOutputBytes())), + ConnectionStatus::getOutputBytes), - OUTPUT_COUNT(new StandardMetricDescriptor( + OUTPUT_COUNT( "outputCount", "FlowFiles Out (5 mins)", "The number of FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.COUNT, - s -> Long.valueOf(s.getOutputCount()))), + s -> Long.valueOf(s.getOutputCount())), - QUEUED_BYTES(new StandardMetricDescriptor( + QUEUED_BYTES( "queuedBytes", "Queued Bytes", "The number of Bytes queued in this Connection", Formatter.DATA_SIZE, - s -> s.getQueuedBytes())), + ConnectionStatus::getQueuedBytes), - QUEUED_COUNT(new StandardMetricDescriptor( + QUEUED_COUNT( "queuedCount", "Queued Count", "The number of FlowFiles queued in this Connection", Formatter.COUNT, - s -> Long.valueOf(s.getQueuedCount()))); + s -> Long.valueOf(s.getQueuedCount())); private MetricDescriptor descriptor; - private ConnectionStatusDescriptor(final MetricDescriptor descriptor) { - this.descriptor = descriptor; + ConnectionStatusDescriptor(final String field, final String label, final String description, + final MetricDescriptor.Formatter formatter, final ValueMapper valueFunction) { + + this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction); } public String getField() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmptyStatusSnapshot.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmptyStatusSnapshot.java new file mode 100644 index 000000000000..1981ba0baf54 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmptyStatusSnapshot.java @@ -0,0 +1,50 @@ +package org.apache.nifi.controller.status.history; + +import java.util.Date; +import java.util.List; +import java.util.Set; + +public class EmptyStatusSnapshot implements StatusSnapshot { + private static final ValueReducer VALUE_REDUCER = new EmptyValueReducer(); + private static final Long METRIC_VALUE = 0L; + + private final Date timestamp; + private final Set> metricsDescriptors; + + public EmptyStatusSnapshot(final Date timestamp, final Set> metricsDescriptors) { + this.timestamp = timestamp; + this.metricsDescriptors = metricsDescriptors; + } + + @Override + public Date getTimestamp() { + return timestamp; + } + + @Override + public Set> getMetricDescriptors() { + return metricsDescriptors; + } + + @Override + public Long getStatusMetric(final MetricDescriptor descriptor) { + return METRIC_VALUE; + } + + @Override + public StatusSnapshot withoutCounters() { + return this; + } + + @Override + public ValueReducer getValueReducer() { + return VALUE_REDUCER; + } + + private static class EmptyValueReducer implements ValueReducer { + @Override + public StatusSnapshot reduce(final List values) { + return (values == null || values.isEmpty()) ? null : values.get(0); + } + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/IndexableMetric.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/IndexableMetric.java new file mode 100644 index 000000000000..5cc6a6c9800f --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/IndexableMetric.java @@ -0,0 +1,5 @@ +package org.apache.nifi.controller.status.history; + +public interface IndexableMetric { + int getIndex(); +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/MetricRollingBuffer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/MetricRollingBuffer.java new file mode 100644 index 000000000000..82e33cf8b283 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/MetricRollingBuffer.java @@ -0,0 +1,180 @@ +package org.apache.nifi.controller.status.history; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Set; + +public class MetricRollingBuffer { + private final int capacity; + + private StatusSnapshot[] snapshots; + private int writeIndex = 0; + private int readIndex; + private boolean readExhausted; + private int count = 0; + + public MetricRollingBuffer(final int maxCapacity) { + this.capacity = maxCapacity; + } + + public void update(final StatusSnapshot snapshot) { + if (snapshot == null) { + return; + } + + if (snapshots == null) { + snapshots = new StatusSnapshot[Math.min(capacity, 16)]; + } + + if (snapshots[writeIndex] == null) { + count++; + } + + snapshots[writeIndex++] = snapshot; + + if (writeIndex >= snapshots.length) { + if (snapshots.length < capacity) { + grow(); + } else { + writeIndex = 0; + } + } + } + + public int size() { + return count; + } + + public void expireBefore(final Date date) { + if (snapshots == null) { + return; + } + + int readIndex = writeIndex; + for (int i=0; i < snapshots.length; i++) { + final StatusSnapshot snapshot = snapshots[readIndex]; + if (snapshot == null) { + readIndex++; + if (readIndex >= snapshots.length) { + readIndex = 0; + } + + continue; + } + + final Date snapshotTimestamp = snapshot.getTimestamp(); + if (snapshotTimestamp.after(date)) { + break; + } + + snapshots[readIndex] = null; + count--; + + readIndex++; + if (readIndex >= snapshots.length) { + readIndex = 0; + } + } + + if (count < snapshots.length / 4 || snapshots.length - count > 128) { + // If we're using less than 1/4 of the array or we have at least 128 null entries, compact. + compact(); + } + } + + private void grow() { + final int initialSize = snapshots.length; + final int newSize = Math.min(capacity, snapshots.length + 64); + final StatusSnapshot[] newArray = new StatusSnapshot[newSize]; + System.arraycopy(snapshots, 0, newArray, 0, snapshots.length); + snapshots = newArray; + writeIndex = initialSize; + } + + private void compact() { + final StatusSnapshot[] newArray = new StatusSnapshot[count + 1]; + int insertionIndex = 0; + + int readIndex = writeIndex; + for (int i=0; i < snapshots.length; i++) { + final StatusSnapshot snapshot = snapshots[readIndex]; + if (snapshot != null) { + newArray[insertionIndex++] = snapshot; + } + + readIndex++; + if (readIndex >= snapshots.length) { + readIndex = 0; + } + } + + snapshots = newArray; + writeIndex = count; + count = newArray.length - 1; + } + + public List getSnapshots(final List timestamps, final boolean includeCounters, final Set> defaultStatusMetrics) { + if (snapshots == null) { + return Collections.emptyList(); + } + + final List list = new ArrayList<>(snapshots.length); + + resetRead(); + + for (final Date timestamp : timestamps) { + final StatusSnapshot snapshot = getSnapshotForTimestamp(timestamp); + if (snapshot == null) { + list.add(new EmptyStatusSnapshot(timestamp, defaultStatusMetrics)); + } else { + list.add(includeCounters ? snapshot : snapshot.withoutCounters()); + } + } + + return list; + } + + private StatusSnapshot getSnapshotForTimestamp(final Date timestamp) { + while (!readExhausted) { + final StatusSnapshot snapshot = snapshots[readIndex]; + if (snapshot == null) { + advanceRead(); + continue; + } + + final Date snapshotTimestamp = snapshot.getTimestamp(); + if (snapshotTimestamp.before(timestamp)) { + advanceRead(); + continue; + } + + if (snapshotTimestamp.after(timestamp)) { + return null; + } + + advanceRead(); + return snapshot; + } + + return null; + } + + private void resetRead() { + readIndex = writeIndex; + readExhausted = false; + } + + private void advanceRead() { + readIndex++; + + if (readIndex >= snapshots.length) { + readIndex = 0; + } + + if (readIndex == writeIndex) { + readExhausted = true; + } + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java index 25b9dfc70c23..6bdb3c03b081 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java @@ -17,79 +17,90 @@ package org.apache.nifi.controller.status.history; -import java.util.concurrent.TimeUnit; - import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter; +import java.util.concurrent.TimeUnit; + public enum ProcessGroupStatusDescriptor { - BYTES_READ(new StandardMetricDescriptor( + BYTES_READ( "bytesRead", "Bytes Read (5 mins)", "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, - s -> s.getBytesRead())), + ProcessGroupStatus::getBytesRead), - BYTES_WRITTEN(new StandardMetricDescriptor("bytesWritten", + BYTES_WRITTEN( + "bytesWritten", "Bytes Written (5 mins)", "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, - s -> s.getBytesWritten())), + ProcessGroupStatus::getBytesWritten), - BYTES_TRANSFERRED(new StandardMetricDescriptor("bytesTransferred", + BYTES_TRANSFERRED( + "bytesTransferred", "Bytes Transferred (5 mins)", "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, - s -> s.getBytesRead() + s.getBytesWritten())), + s -> s.getBytesRead() + s.getBytesWritten()), - INPUT_BYTES(new StandardMetricDescriptor("inputBytes", + INPUT_BYTES("inputBytes", "Bytes In (5 mins)", "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", Formatter.DATA_SIZE, - s -> s.getInputContentSize())), + ProcessGroupStatus::getInputContentSize), - INPUT_COUNT(new StandardMetricDescriptor("inputCount", + INPUT_COUNT( + "inputCount", "FlowFiles In (5 mins)", "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", Formatter.COUNT, - s -> s.getInputCount().longValue())), + s -> s.getInputCount().longValue()), - OUTPUT_BYTES(new StandardMetricDescriptor("outputBytes", + OUTPUT_BYTES( + "outputBytes", "Bytes Out (5 mins)", "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", Formatter.DATA_SIZE, - s -> s.getOutputContentSize())), + ProcessGroupStatus::getOutputContentSize), - OUTPUT_COUNT(new StandardMetricDescriptor("outputCount", + OUTPUT_COUNT( + "outputCount", "FlowFiles Out (5 mins)", "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", Formatter.COUNT, - s -> s.getOutputCount().longValue())), + s -> s.getOutputCount().longValue()), - QUEUED_BYTES(new StandardMetricDescriptor("queuedBytes", + QUEUED_BYTES( + "queuedBytes", "Queued Bytes", "The cumulative size of all FlowFiles queued in all Connections of this Process Group", Formatter.DATA_SIZE, - s -> s.getQueuedContentSize())), + ProcessGroupStatus::getQueuedContentSize), - QUEUED_COUNT(new StandardMetricDescriptor("queuedCount", + QUEUED_COUNT( + "queuedCount", "Queued Count", "The number of FlowFiles queued in all Connections of this Process Group", Formatter.COUNT, - s -> s.getQueuedCount().longValue())), + s -> s.getQueuedCount().longValue()), - TASK_MILLIS(new StandardMetricDescriptor("taskMillis", + TASK_MILLIS( + "taskMillis", "Total Task Duration (5 mins)", "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes", Formatter.DURATION, - s -> calculateTaskMillis(s))); + ProcessGroupStatusDescriptor::calculateTaskMillis); + private MetricDescriptor descriptor; - private ProcessGroupStatusDescriptor(final MetricDescriptor descriptor) { - this.descriptor = descriptor; + ProcessGroupStatusDescriptor(final String field, final String label, final String description, + final MetricDescriptor.Formatter formatter, final ValueMapper valueFunction) { + + this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction); } public String getField() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java index f939273dac6d..c8f51267ecd9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java @@ -17,91 +17,92 @@ package org.apache.nifi.controller.status.history; -import java.util.List; -import java.util.concurrent.TimeUnit; - import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter; +import java.util.List; +import java.util.concurrent.TimeUnit; + public enum ProcessorStatusDescriptor { - BYTES_READ(new StandardMetricDescriptor( + BYTES_READ( "bytesRead", "Bytes Read (5 mins)", "The total number of bytes read from the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, - s -> s.getBytesRead())), + ProcessorStatus::getBytesRead), - BYTES_WRITTEN(new StandardMetricDescriptor( + BYTES_WRITTEN( "bytesWritten", "Bytes Written (5 mins)", "The total number of bytes written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, - s -> s.getBytesWritten())), + ProcessorStatus::getBytesWritten), - BYTES_TRANSFERRED(new StandardMetricDescriptor( + BYTES_TRANSFERRED( "bytesTransferred", "Bytes Transferred (5 mins)", "The total number of bytes read from or written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, - s -> s.getBytesRead() + s.getBytesWritten())), + s -> s.getBytesRead() + s.getBytesWritten()), - INPUT_BYTES(new StandardMetricDescriptor( + INPUT_BYTES( "inputBytes", "Bytes In (5 mins)", "The cumulative size of all FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.DATA_SIZE, - s -> s.getInputBytes())), + ProcessorStatus::getInputBytes), - INPUT_COUNT(new StandardMetricDescriptor( + INPUT_COUNT( "inputCount", "FlowFiles In (5 mins)", "The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.COUNT, - s -> Long.valueOf(s.getInputCount()))), + s -> Long.valueOf(s.getInputCount())), - OUTPUT_BYTES(new StandardMetricDescriptor( + OUTPUT_BYTES( "outputBytes", "Bytes Out (5 mins)", "The cumulative size of all FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.DATA_SIZE, - s -> s.getOutputBytes())), + ProcessorStatus::getOutputBytes), - OUTPUT_COUNT(new StandardMetricDescriptor( + OUTPUT_COUNT( "outputCount", "FlowFiles Out (5 mins)", "The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.COUNT, - s -> Long.valueOf(s.getOutputCount()))), + s -> Long.valueOf(s.getOutputCount())), - TASK_COUNT(new StandardMetricDescriptor( + TASK_COUNT( "taskCount", "Tasks (5 mins)", "The number of tasks that this Processor has completed in the past 5 minutes", Formatter.COUNT, - s -> Long.valueOf(s.getInvocations()))), + s -> Long.valueOf(s.getInvocations())), - TASK_MILLIS(new StandardMetricDescriptor( + TASK_MILLIS( "taskMillis", "Total Task Duration (5 mins)", "The total number of thread-milliseconds that the Processor has used to complete its tasks in the past 5 minutes", Formatter.DURATION, - s -> TimeUnit.MILLISECONDS.convert(s.getProcessingNanos(), TimeUnit.NANOSECONDS))), + s -> TimeUnit.MILLISECONDS.convert(s.getProcessingNanos(), TimeUnit.NANOSECONDS)), - TASK_NANOS(new StandardMetricDescriptor( + TASK_NANOS( "taskNanos", "Total Task Time (nanos)", "The total number of thread-nanoseconds that the Processor has used to complete its tasks in the past 5 minutes", Formatter.COUNT, - ProcessorStatus::getProcessingNanos), false), + ProcessorStatus::getProcessingNanos, + false), - FLOWFILES_REMOVED(new StandardMetricDescriptor( + FLOWFILES_REMOVED( "flowFilesRemoved", "FlowFiles Removed (5 mins)", "The total number of FlowFiles removed by this Processor in the last 5 minutes", Formatter.COUNT, - s -> Long.valueOf(s.getFlowFilesRemoved()))), + s -> Long.valueOf(s.getFlowFilesRemoved())), - AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor( + AVERAGE_LINEAGE_DURATION( "averageLineageDuration", "Average Lineage Duration (5 mins)", "The average amount of time that a FlowFile took to process (from receipt until this Processor finished processing it) in the past 5 minutes.", @@ -114,23 +115,24 @@ public Long reduce(final List values) { int count = 0; for (final StatusSnapshot snapshot : values) { - final long removed = snapshot.getStatusMetrics().get(FLOWFILES_REMOVED.getDescriptor()).longValue(); - final long outputCount = snapshot.getStatusMetrics().get(OUTPUT_COUNT.getDescriptor()).longValue(); + final long removed = snapshot.getStatusMetric(FLOWFILES_REMOVED.getDescriptor()).longValue(); + final long outputCount = snapshot.getStatusMetric(OUTPUT_COUNT.getDescriptor()).longValue(); final long processed = removed + outputCount; count += processed; - final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue(); + final long avgMillis = snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue(); final long totalMillis = avgMillis * processed; millis += totalMillis; } return count == 0 ? 0 : millis / count; } - } - )), + }, + true + ), - AVERAGE_TASK_NANOS(new StandardMetricDescriptor( + AVERAGE_TASK_NANOS( "averageTaskNanos", "Average Task Duration (nanoseconds)", "The average number of nanoseconds it took this Processor to complete a task, over the past 5 minutes", @@ -143,12 +145,12 @@ public Long reduce(final List values) { int invocations = 0; for (final StatusSnapshot snapshot : values) { - final Long taskNanos = snapshot.getStatusMetrics().get(TASK_NANOS.getDescriptor()); + final Long taskNanos = snapshot.getStatusMetric(TASK_NANOS.getDescriptor()); if (taskNanos != null) { procNanos += taskNanos.longValue(); } - final Long taskInvocations = snapshot.getStatusMetrics().get(TASK_COUNT.getDescriptor()); + final Long taskInvocations = snapshot.getStatusMetric(TASK_COUNT.getDescriptor()); if (taskInvocations != null) { invocations += taskInvocations.intValue(); } @@ -160,22 +162,38 @@ public Long reduce(final List values) { return procNanos / invocations; } - })); + }, + true + ); private final MetricDescriptor descriptor; private final boolean visible; - private ProcessorStatusDescriptor(final MetricDescriptor descriptor) { - this(descriptor, true); + ProcessorStatusDescriptor(final String field, final String label, final String description, + final MetricDescriptor.Formatter formatter, final ValueMapper valueFunction) { + + this(field, label, description, formatter, valueFunction, true); } - private ProcessorStatusDescriptor(final MetricDescriptor descriptor, final boolean visible) { - this.descriptor = descriptor; + ProcessorStatusDescriptor(final String field, final String label, final String description, + final MetricDescriptor.Formatter formatter, final ValueMapper valueFunction, final boolean visible) { + + this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction); this.visible = visible; } + ProcessorStatusDescriptor(final String field, final String label, final String description, + final MetricDescriptor.Formatter formatter, final ValueMapper valueFunction, + final ValueReducer reducer, final boolean visible) { + + this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction, reducer); + this.visible = visible; + } + + + public String getField() { return descriptor.getField(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java index 5875249ea5c8..6b131d205566 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java @@ -24,43 +24,49 @@ import java.util.concurrent.TimeUnit; public enum RemoteProcessGroupStatusDescriptor { - SENT_BYTES(new StandardMetricDescriptor("sentBytes", + SENT_BYTES( + "sentBytes", "Bytes Sent (5 mins)", "The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.DATA_SIZE, - s -> s.getSentContentSize())), + RemoteProcessGroupStatus::getSentContentSize), - SENT_COUNT(new StandardMetricDescriptor("sentCount", + SENT_COUNT( + "sentCount", "FlowFiles Sent (5 mins)", "The number of FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.COUNT, - s -> s.getSentCount().longValue())), + s -> s.getSentCount().longValue()), - RECEIVED_BYTES(new StandardMetricDescriptor("receivedBytes", + RECEIVED_BYTES( + "receivedBytes", "Bytes Received (5 mins)", "The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.DATA_SIZE, - s -> s.getReceivedContentSize())), + RemoteProcessGroupStatus::getReceivedContentSize), - RECEIVED_COUNT(new StandardMetricDescriptor("receivedCount", + RECEIVED_COUNT( + "receivedCount", "FlowFiles Received (5 mins)", "The number of FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.COUNT, - s -> s.getReceivedCount().longValue())), + s -> s.getReceivedCount().longValue()), - RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor("receivedBytesPerSecond", + RECEIVED_BYTES_PER_SECOND( + "receivedBytesPerSecond", "Received Bytes Per Second", "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, - s -> s.getReceivedContentSize().longValue() / 300L)), + s -> s.getReceivedContentSize().longValue() / 300L), - SENT_BYTES_PER_SECOND(new StandardMetricDescriptor("sentBytesPerSecond", + SENT_BYTES_PER_SECOND( + "sentBytesPerSecond", "Sent Bytes Per Second", "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, - s -> s.getSentContentSize().longValue() / 300L)), + s -> s.getSentContentSize().longValue() / 300L), - TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor("totalBytesPerSecond", + TOTAL_BYTES_PER_SECOND("totalBytesPerSecond", "Total Bytes Per Second", "The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, @@ -69,9 +75,9 @@ public enum RemoteProcessGroupStatusDescriptor { public Long getValue(final RemoteProcessGroupStatus status) { return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L); } - })), + }), - AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor( + AVERAGE_LINEAGE_DURATION( "averageLineageDuration", "Average Lineage Duration (5 mins)", "The average amount of time that a FlowFile took to process from receipt to drop in the past 5 minutes. For Processors that do not terminate FlowFiles, this value will be 0.", @@ -84,22 +90,29 @@ public Long reduce(final List values) { int count = 0; for (final StatusSnapshot snapshot : values) { - final long sent = snapshot.getStatusMetrics().get(SENT_COUNT.getDescriptor()).longValue(); + final long sent = snapshot.getStatusMetric(SENT_COUNT.getDescriptor()).longValue(); count += sent; - final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue(); + final long avgMillis = snapshot.getStatusMetric(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue(); final long totalMillis = avgMillis * sent; millis += totalMillis; } return count == 0 ? 0 : millis / count; } - })); + }); + private final MetricDescriptor descriptor; - private RemoteProcessGroupStatusDescriptor(final MetricDescriptor descriptor) { - this.descriptor = descriptor; + RemoteProcessGroupStatusDescriptor(final String field, final String label, final String description, + final MetricDescriptor.Formatter formatter, final ValueMapper valueFunction) { + this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction); + } + + RemoteProcessGroupStatusDescriptor(final String field, final String label, final String description, + final MetricDescriptor.Formatter formatter, final ValueMapper valueFunction, final ValueReducer reducer) { + this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction, reducer); } public String getField() { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java index 6970fce15e62..de5e0b55e19c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java @@ -20,6 +20,7 @@ public class StandardMetricDescriptor implements MetricDescriptor { + private final IndexableMetric indexableMetric; private final String field; private final String label; private final String description; @@ -27,12 +28,14 @@ public class StandardMetricDescriptor implements MetricDescriptor { private final ValueMapper valueMapper; private final ValueReducer reducer; - public StandardMetricDescriptor(final String field, final String label, final String description, final MetricDescriptor.Formatter formatter, final ValueMapper valueFunction) { - this(field, label, description, formatter, valueFunction, null); + public StandardMetricDescriptor(final IndexableMetric indexableMetric, final String field, final String label, final String description, + final MetricDescriptor.Formatter formatter, final ValueMapper valueFunction) { + this(indexableMetric, field, label, description, formatter, valueFunction, null); } - public StandardMetricDescriptor(final String field, final String label, final String description, - final MetricDescriptor.Formatter formatter, final ValueMapper valueFunction, final ValueReducer reducer) { + public StandardMetricDescriptor(final IndexableMetric indexableMetric, final String field, final String label, final String description, + final MetricDescriptor.Formatter formatter, final ValueMapper valueFunction, final ValueReducer reducer) { + this.indexableMetric = indexableMetric; this.field = field; this.label = label; this.description = description; @@ -41,6 +44,11 @@ public StandardMetricDescriptor(final String field, final String label, final St this.reducer = reducer == null ? new SumReducer() : reducer; } + @Override + public int getMetricIdentifier() { + return indexableMetric.getIndex(); + } + @Override public String getField() { return field; @@ -100,7 +108,7 @@ class SumReducer implements ValueReducer { public Long reduce(final List values) { long sum = 0; for (final StatusSnapshot snapshot : values) { - sum += snapshot.getStatusMetrics().get(StandardMetricDescriptor.this); + sum += snapshot.getStatusMetric(StandardMetricDescriptor.this); } return sum; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusHistory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusHistory.java index 9299b0d36f23..96b37599335f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusHistory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusHistory.java @@ -16,18 +16,25 @@ */ package org.apache.nifi.controller.status.history; -import java.util.ArrayList; -import java.util.Collections; import java.util.Date; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; public class StandardStatusHistory implements StatusHistory { + private final List snapshots; + private final Date generated; + private final Map componentDetails; + + public StandardStatusHistory(final List snapshots, final Map componentDetails, final Date generated) { + this.snapshots = snapshots; + this.generated = generated; + this.componentDetails = componentDetails; + } - private final List snapshots = new ArrayList<>(); - private final Date generated = new Date(); - private final Map componentDetails = new LinkedHashMap<>(); + @Override + public List getStatusSnapshots() { + return snapshots; + } @Override public Date getDateGenerated() { @@ -36,19 +43,6 @@ public Date getDateGenerated() { @Override public Map getComponentDetails() { - return Collections.unmodifiableMap(componentDetails); - } - - public void setComponentDetail(final String detailName, final String detailValue) { - componentDetails.put(detailName, detailValue); - } - - @Override - public List getStatusSnapshots() { - return Collections.unmodifiableList(snapshots); - } - - public void addStatusSnapshot(final StatusSnapshot snapshot) { - snapshots.add(snapshot); + return componentDetails; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java index abaf89931628..46627530bff9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardStatusSnapshot.java @@ -17,7 +17,7 @@ package org.apache.nifi.controller.status.history; import java.util.Date; -import java.util.LinkedHashMap; +import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -25,25 +25,63 @@ public class StandardStatusSnapshot implements StatusSnapshot { - private final Map, Long> metricValues = new LinkedHashMap<>(); + private final Set> metricDescriptors; + private final long[] values; + + private Map, Long> counterValues = null; private Date timestamp = new Date(); + + public StandardStatusSnapshot(final Set> metricDescriptors) { + this.metricDescriptors = metricDescriptors; + values = new long[metricDescriptors.size()]; + } + + private StandardStatusSnapshot(final Set> metricDescriptors, final long[] values) { + this.metricDescriptors = metricDescriptors; + this.values = values; + } + @Override public Date getTimestamp() { return timestamp; } - public void setTimestamp(final Date timestamp) { - this.timestamp = timestamp; + @Override + public Set> getMetricDescriptors() { + return metricDescriptors; } @Override - public Map, Long> getStatusMetrics() { - return metricValues; + public Long getStatusMetric(final MetricDescriptor descriptor) { + return values[descriptor.getMetricIdentifier()]; + } + + public void setTimestamp(final Date timestamp) { + this.timestamp = timestamp; } + public void addStatusMetric(final MetricDescriptor metric, final Long value) { - metricValues.put(metric, value); + values[metric.getMetricIdentifier()] = value; + } + + public void addCounterStatusMetric(final MetricDescriptor metric, final Long value) { + if (counterValues == null) { + counterValues = new HashMap<>(); + } + + counterValues.put(metric, value); + } + + public StandardStatusSnapshot withoutCounters() { + if (counterValues == null) { + return this; + } + + final StandardStatusSnapshot without = new StandardStatusSnapshot(metricDescriptors, values); + without.setTimestamp(timestamp); + return without; } @Override @@ -52,16 +90,16 @@ public ValueReducer getValueReducer() { @Override public StatusSnapshot reduce(final List values) { Date reducedTimestamp = null; - final Set> allDescriptors = new LinkedHashSet<>(metricValues.keySet()); + final Set> allDescriptors = new LinkedHashSet<>(metricDescriptors); for (final StatusSnapshot statusSnapshot : values) { if (reducedTimestamp == null) { reducedTimestamp = statusSnapshot.getTimestamp(); } - allDescriptors.addAll(statusSnapshot.getStatusMetrics().keySet()); + allDescriptors.addAll(statusSnapshot.getMetricDescriptors()); } - final StandardStatusSnapshot reduced = new StandardStatusSnapshot(); + final StandardStatusSnapshot reduced = new StandardStatusSnapshot(allDescriptors); if (reducedTimestamp != null) { reduced.setTimestamp(reducedTimestamp); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java index a1265893d48b..9ac3909a58cd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java @@ -16,6 +16,10 @@ */ package org.apache.nifi.controller.status.history; +import org.apache.nifi.web.api.dto.status.StatusDescriptorDTO; +import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; +import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; + import java.util.ArrayList; import java.util.Collection; import java.util.Date; @@ -27,10 +31,6 @@ import java.util.Map; import java.util.Set; -import org.apache.nifi.web.api.dto.status.StatusDescriptorDTO; -import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; -import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; - public class StatusHistoryUtil { public static StatusHistoryDTO createStatusHistoryDTO(final StatusHistory statusHistory) { @@ -43,7 +43,7 @@ public static StatusHistoryDTO createStatusHistoryDTO(final StatusHistory status final StatusSnapshotDTO snapshotDto = StatusHistoryUtil.createStatusSnapshotDto(snapshot); snapshotDtos.add(snapshotDto); metricNames.addAll(snapshotDto.getStatusMetrics().keySet()); - metricDescriptors.addAll(snapshot.getStatusMetrics().keySet()); + metricDescriptors.addAll(snapshot.getMetricDescriptors()); } // We need to ensure that the 'aggregate snapshot' has an entry for every metric, including counters. @@ -94,9 +94,7 @@ public static List createFieldDescriptorDtos(final StatusHi final Set> allDescriptors = new LinkedHashSet<>(); for (final StatusSnapshot statusSnapshot : statusHistory.getStatusSnapshots()) { - for (final MetricDescriptor metricDescriptor : statusSnapshot.getStatusMetrics().keySet()) { - allDescriptors.add(metricDescriptor); - } + allDescriptors.addAll(statusSnapshot.getMetricDescriptors()); } for (final MetricDescriptor metricDescriptor : allDescriptors) { @@ -111,8 +109,8 @@ public static StatusSnapshotDTO createStatusSnapshotDto(final StatusSnapshot sta dto.setTimestamp(statusSnapshot.getTimestamp()); final Map statusMetrics = new HashMap<>(); - for (final Map.Entry, Long> entry : statusSnapshot.getStatusMetrics().entrySet()) { - statusMetrics.put(entry.getKey().getField(), entry.getValue()); + for (final MetricDescriptor descriptor : statusSnapshot.getMetricDescriptors()) { + statusMetrics.put(descriptor.getField(), statusSnapshot.getStatusMetric(descriptor)); } dto.setStatusMetrics(statusMetrics); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java index 6b9003087a74..5336d17fbb91 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java @@ -16,43 +16,64 @@ */ package org.apache.nifi.controller.status.history; -import java.util.Date; -import java.util.List; -import java.util.Map; - import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.RemoteProcessGroupStatus; -import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter; -import org.apache.nifi.util.ComponentStatusReport; -import org.apache.nifi.util.ComponentStatusReport.ComponentType; +import org.apache.nifi.util.ComponentMetrics; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.RingBuffer; -import org.apache.nifi.util.RingBuffer.ForEachEvaluator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + public class VolatileComponentStatusRepository implements ComponentStatusRepository { + private static final Logger logger = LoggerFactory.getLogger(VolatileComponentStatusRepository.class); + + private static final Set> DEFAULT_PROCESSOR_METRICS = Arrays.stream(ProcessorStatusDescriptor.values()) + .map(ProcessorStatusDescriptor::getDescriptor) + .collect(Collectors.toSet()); + private static final Set> DEFAULT_CONNECTION_METRICS = Arrays.stream(ConnectionStatusDescriptor.values()) + .map(ConnectionStatusDescriptor::getDescriptor) + .collect(Collectors.toSet()); + private static final Set> DEFAULT_GROUP_METRICS = Arrays.stream(ProcessGroupStatusDescriptor.values()) + .map(ProcessGroupStatusDescriptor::getDescriptor) + .collect(Collectors.toSet()); + private static final Set> DEFAULT_RPG_METRICS = Arrays.stream(RemoteProcessGroupStatusDescriptor.values()) + .map(RemoteProcessGroupStatusDescriptor::getDescriptor) + .collect(Collectors.toSet()); + public static final String NUM_DATA_POINTS_PROPERTY = "nifi.components.status.repository.buffer.size"; public static final int DEFAULT_NUM_DATA_POINTS = 288; // 1 day worth of 5-minute snapshots - private final RingBuffer captures; - private final Logger logger = LoggerFactory.getLogger(VolatileComponentStatusRepository.class); + private final Map componentStatusHistories = new HashMap<>(); + private final RingBuffer timestamps; + private final RingBuffer> gcStatuses; + private final int numDataPoints; private volatile long lastCaptureTime = 0L; /** * Default no args constructor for service loading only */ - public VolatileComponentStatusRepository(){ - captures = null; + public VolatileComponentStatusRepository() { + numDataPoints = DEFAULT_NUM_DATA_POINTS; + gcStatuses = null; + timestamps = null; } public VolatileComponentStatusRepository(final NiFiProperties nifiProperties) { - final int numDataPoints = nifiProperties.getIntegerProperty(NUM_DATA_POINTS_PROPERTY, DEFAULT_NUM_DATA_POINTS); - captures = new RingBuffer<>(numDataPoints); + numDataPoints = nifiProperties.getIntegerProperty(NUM_DATA_POINTS_PROPERTY, DEFAULT_NUM_DATA_POINTS); + gcStatuses = new RingBuffer<>(numDataPoints); + timestamps = new RingBuffer<>(numDataPoints); } @Override @@ -62,214 +83,116 @@ public void capture(final ProcessGroupStatus rootGroupStatus, final List gcStatus, final Date timestamp) { - final ComponentStatusReport statusReport = ComponentStatusReport.fromProcessGroupStatus(rootGroupStatus, ComponentType.PROCESSOR, - ComponentType.CONNECTION, ComponentType.PROCESS_GROUP, ComponentType.REMOTE_PROCESS_GROUP); + final Date evicted = timestamps.add(timestamp); + if (evicted != null) { + componentStatusHistories.values().forEach(history -> history.expireBefore(evicted)); + } + + capture(rootGroupStatus, timestamp); + gcStatuses.add(gcStatus); - captures.add(new Capture(timestamp, statusReport, gcStatus)); logger.debug("Captured metrics for {}", this); lastCaptureTime = Math.max(lastCaptureTime, timestamp.getTime()); } - @Override - public Date getLastCaptureDate() { - return new Date(lastCaptureTime); - } - @Override - public StatusHistory getProcessorStatusHistory(final String processorId, final Date start, final Date end, final int preferredDataPoints, final boolean includeCounters) { - final StandardStatusHistory history = new StandardStatusHistory(); - history.setComponentDetail(COMPONENT_DETAIL_ID, processorId); - - captures.forEach(new ForEachEvaluator() { - @Override - public boolean evaluate(final Capture capture) { - final ComponentStatusReport statusReport = capture.getStatusReport(); - final ProcessorStatus status = statusReport.getProcessorStatus(processorId); - if (status == null) { - return true; - } + private void capture(final ProcessGroupStatus groupStatus, final Date timestamp) { + // Capture status for the ProcessGroup + final ComponentDetails groupDetails = ComponentDetails.forProcessGroup(groupStatus); + final StatusSnapshot groupSnapshot = ComponentMetrics.createSnapshot(groupStatus, timestamp); + updateStatusHistory(groupSnapshot, groupDetails, timestamp); - history.setComponentDetail(COMPONENT_DETAIL_GROUP_ID, status.getGroupId()); - history.setComponentDetail(COMPONENT_DETAIL_NAME, status.getName()); - history.setComponentDetail(COMPONENT_DETAIL_TYPE, status.getType()); + // Capture statuses for the Processors + for (final ProcessorStatus processorStatus : groupStatus.getProcessorStatus()) { + final ComponentDetails componentDetails = ComponentDetails.forProcessor(processorStatus); + final StatusSnapshot snapshot = ComponentMetrics.createSnapshot(processorStatus, timestamp); + updateStatusHistory(snapshot, componentDetails, timestamp); + } - final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(); - snapshot.setTimestamp(capture.getCaptureDate()); + // Capture statuses for the Connections + for (final ConnectionStatus connectionStatus : groupStatus.getConnectionStatus()) { + final ComponentDetails componentDetails = ComponentDetails.forConnection(connectionStatus); + final StatusSnapshot snapshot = ComponentMetrics.createSnapshot(connectionStatus, timestamp); + updateStatusHistory(snapshot, componentDetails, timestamp); + } - for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) { - if (descriptor.isVisible()) { - snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status)); - } - } + // Capture statuses for the RPG's + for (final RemoteProcessGroupStatus rpgStatus : groupStatus.getRemoteProcessGroupStatus()) { + final ComponentDetails componentDetails = ComponentDetails.forRemoteProcessGroup(rpgStatus); + final StatusSnapshot snapshot = ComponentMetrics.createSnapshot(rpgStatus, timestamp); + updateStatusHistory(snapshot, componentDetails, timestamp); + } - if (includeCounters) { - final Map counters = status.getCounters(); - if (counters != null) { - for (final Map.Entry entry : counters.entrySet()) { - final String counterName = entry.getKey(); + // Capture statuses for the child groups + for (final ProcessGroupStatus childStatus : groupStatus.getProcessGroupStatus()) { + capture(childStatus, timestamp); + } + } - final String label = entry.getKey() + " (5 mins)"; - final MetricDescriptor metricDescriptor = new StandardMetricDescriptor<>(entry.getKey(), label, label, Formatter.COUNT, - s -> s.getCounters() == null ? null : s.getCounters().get(counterName)); - snapshot.addStatusMetric(metricDescriptor, entry.getValue()); - } - } - } + private void updateStatusHistory(final StatusSnapshot statusSnapshot, final ComponentDetails componentDetails, final Date timestamp) { + final String componentId = componentDetails.getComponentId(); + final ComponentStatusHistory procHistory = componentStatusHistories.computeIfAbsent(componentId, id -> new ComponentStatusHistory(componentDetails, numDataPoints)); + procHistory.update(statusSnapshot, componentDetails); + } - history.addStatusSnapshot(snapshot); - return true; - } - }); - return history; + @Override + public Date getLastCaptureDate() { + return new Date(lastCaptureTime); } @Override - public StatusHistory getConnectionStatusHistory(final String connectionId, final Date start, final Date end, final int preferredDataPoints) { - final StandardStatusHistory history = new StandardStatusHistory(); - history.setComponentDetail(COMPONENT_DETAIL_ID, connectionId); - - captures.forEach(new ForEachEvaluator() { - @Override - public boolean evaluate(final Capture capture) { - final ComponentStatusReport statusReport = capture.getStatusReport(); - final ConnectionStatus status = statusReport.getConnectionStatus(connectionId); - if (status == null) { - return true; - } - - history.setComponentDetail(COMPONENT_DETAIL_GROUP_ID, status.getGroupId()); - history.setComponentDetail(COMPONENT_DETAIL_NAME, status.getName()); - history.setComponentDetail(COMPONENT_DETAIL_SOURCE_NAME, status.getSourceName()); - history.setComponentDetail(COMPONENT_DETAIL_DESTINATION_NAME, status.getDestinationName()); - - final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(); - snapshot.setTimestamp(capture.getCaptureDate()); - - for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) { - snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status)); - } - - history.addStatusSnapshot(snapshot); - return true; - } - }); + public StatusHistory getProcessorStatusHistory(final String processorId, final Date start, final Date end, final int preferredDataPoints, final boolean includeCounters) { + return getStatusHistory(processorId, includeCounters, DEFAULT_PROCESSOR_METRICS); + } - return history; + @Override + public StatusHistory getConnectionStatusHistory(final String connectionId, final Date start, final Date end, final int preferredDataPoints) { + return getStatusHistory(connectionId, true, DEFAULT_CONNECTION_METRICS); } @Override public StatusHistory getProcessGroupStatusHistory(final String processGroupId, final Date start, final Date end, final int preferredDataPoints) { - final StandardStatusHistory history = new StandardStatusHistory(); - history.setComponentDetail(COMPONENT_DETAIL_ID, processGroupId); - - captures.forEach(new ForEachEvaluator() { - @Override - public boolean evaluate(final Capture capture) { - final ComponentStatusReport statusReport = capture.getStatusReport(); - final ProcessGroupStatus status = statusReport.getProcessGroupStatus(processGroupId); - if (status == null) { - return true; - } - - history.setComponentDetail(COMPONENT_DETAIL_NAME, status.getName()); - - final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(); - snapshot.setTimestamp(capture.getCaptureDate()); - - for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) { - snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status)); - } - - history.addStatusSnapshot(snapshot); - return true; - } - }); - - return history; + return getStatusHistory(processGroupId, true, DEFAULT_GROUP_METRICS); } @Override public StatusHistory getRemoteProcessGroupStatusHistory(final String remoteGroupId, final Date start, final Date end, final int preferredDataPoints) { - final StandardStatusHistory history = new StandardStatusHistory(); - history.setComponentDetail(COMPONENT_DETAIL_ID, remoteGroupId); - - captures.forEach(new ForEachEvaluator() { - @Override - public boolean evaluate(final Capture capture) { - final ComponentStatusReport statusReport = capture.getStatusReport(); - final RemoteProcessGroupStatus status = statusReport.getRemoteProcessGroupStatus(remoteGroupId); - if (status == null) { - return true; - } - - history.setComponentDetail(COMPONENT_DETAIL_GROUP_ID, status.getGroupId()); - history.setComponentDetail(COMPONENT_DETAIL_NAME, status.getName()); - history.setComponentDetail(COMPONENT_DETAIL_URI, status.getTargetUri()); + return getStatusHistory(remoteGroupId, true, DEFAULT_RPG_METRICS); + } - final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(); - snapshot.setTimestamp(capture.getCaptureDate()); - for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) { - snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status)); - } - - history.addStatusSnapshot(snapshot); - return true; - } - }); + private synchronized StatusHistory getStatusHistory(final String componentId, final boolean includeCounters, final Set> defaultMetricDescriptors) { + final ComponentStatusHistory history = componentStatusHistories.get(componentId); + if (history == null) { + return null; + } - return history; + final List dates = timestamps.asList(); + return history.toStatusHistory(dates, includeCounters, defaultMetricDescriptors); } + @Override public GarbageCollectionHistory getGarbageCollectionHistory(final Date start, final Date end) { final StandardGarbageCollectionHistory history = new StandardGarbageCollectionHistory(); - captures.forEach(new ForEachEvaluator() { - @Override - public boolean evaluate(final Capture capture) { - if (capture.getCaptureDate().before(start)) { - return true; - } - if (capture.getCaptureDate().after(end)) { - return false; + gcStatuses.forEach(statusSet -> { + for (final GarbageCollectionStatus gcStatus : statusSet) { + if (gcStatus.getTimestamp().before(start)) { + continue; } - - final List statuses = capture.getGarbageCollectionStatus(); - if (statuses != null) { - statuses.stream().forEach(history::addGarbageCollectionStatus); + if (gcStatus.getTimestamp().after(end)) { + continue; } - return true; + history.addGarbageCollectionStatus(gcStatus); } + + return true; }); return history; } - - private static class Capture { - private final Date captureDate; - private final ComponentStatusReport statusReport; - private final List gcStatus; - - public Capture(final Date date, final ComponentStatusReport statusReport, final List gcStatus) { - this.captureDate = date; - this.statusReport = statusReport; - this.gcStatus = gcStatus; - } - - public Date getCaptureDate() { - return captureDate; - } - - public ComponentStatusReport getStatusReport() { - return statusReport; - } - - public List getGarbageCollectionStatus() { - return gcStatus; - } - } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java index 1a98ee3006fd..e68aba8e8b30 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java @@ -16,10 +16,6 @@ */ package org.apache.nifi.controller.tasks; -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.nifi.components.state.StateManager; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; @@ -35,8 +31,8 @@ import org.apache.nifi.controller.repository.WeakHashMapProcessSessionFactory; import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent; import org.apache.nifi.controller.scheduling.ConnectableProcessContext; -import org.apache.nifi.controller.scheduling.RepositoryContextFactory; import org.apache.nifi.controller.scheduling.LifecycleState; +import org.apache.nifi.controller.scheduling.RepositoryContextFactory; import org.apache.nifi.controller.scheduling.SchedulingAgent; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.logging.ComponentLog; @@ -51,6 +47,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + /** * Continually runs a {@link Connectable} component as long as the component has work to do. * {@link #invoke()} ()} will return {@link InvocationResult} telling if the component should be yielded. @@ -267,10 +267,10 @@ public InvocationResult invoke() { final long processingNanos = System.nanoTime() - startNanos; try { - final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(connectable.getIdentifier()); + final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(); procEvent.setProcessingNanos(processingNanos); procEvent.setInvocations(invocationCount); - repositoryContext.getFlowFileEventRepository().updateRepository(procEvent); + repositoryContext.getFlowFileEventRepository().updateRepository(procEvent, connectable.getIdentifier()); } catch (final IOException e) { logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable.getRunnableComponent(), e.toString()); logger.error("", e); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index edfa355fabb3..e173cc6cb311 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -957,7 +957,7 @@ public void run() { public Collection getProcessors() { readLock.lock(); try { - return processors.values(); + return new HashSet<>(processors.values()); } finally { readLock.unlock(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java new file mode 100644 index 000000000000..218017769ec0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java @@ -0,0 +1,172 @@ +package org.apache.nifi.util; + +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor; +import org.apache.nifi.controller.status.history.MetricDescriptor; +import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor; +import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor; +import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor; +import org.apache.nifi.controller.status.history.StandardMetricDescriptor; +import org.apache.nifi.controller.status.history.StandardStatusSnapshot; +import org.apache.nifi.controller.status.history.StatusSnapshot; + +import java.util.Date; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class ComponentMetrics { + private static final Set> PROCESSOR_METRICS; + private static final Set> CONNECTION_METRICS; + private static final Set> PROCESS_GROUP_METRICS; + private static final Set> RPG_METRICS; + + static { + PROCESSOR_METRICS = new HashSet<>(); + CONNECTION_METRICS = new HashSet<>(); + PROCESS_GROUP_METRICS = new HashSet<>(); + RPG_METRICS = new HashSet<>(); + + for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) { + PROCESSOR_METRICS.add(descriptor.getDescriptor()); + } + + for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) { + CONNECTION_METRICS.add(descriptor.getDescriptor()); + } + + for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) { + PROCESS_GROUP_METRICS.add(descriptor.getDescriptor()); + } + + for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) { + RPG_METRICS.add(descriptor.getDescriptor()); + } + } + + + public static StatusSnapshot createSnapshot(final ProcessorStatus status, final Date timestamp) { + if (isEmpty(status)) { + return null; + } + + final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(PROCESSOR_METRICS); + snapshot.setTimestamp(timestamp); + + for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) { + if (descriptor.isVisible()) { + snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status)); + } + } + + final Map counters = status.getCounters(); + if (counters != null) { + for (final Map.Entry entry : counters.entrySet()) { + final String counterName = entry.getKey(); + + final String label = entry.getKey() + " (5 mins)"; + final MetricDescriptor metricDescriptor = new StandardMetricDescriptor<>(() -> 0, entry.getKey(), label, label, MetricDescriptor.Formatter.COUNT, + s -> s.getCounters() == null ? null : s.getCounters().get(counterName)); + + snapshot.addCounterStatusMetric(metricDescriptor, entry.getValue()); + } + } + + return snapshot; + } + + public static boolean isEmpty(final ProcessorStatus status) { + for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) { + if (descriptor.isVisible()) { + final Long value = descriptor.getDescriptor().getValueFunction().getValue(status); + if (value != null && value > 0) { + return false; + } + } + } + + return true; + } + + + public static StatusSnapshot createSnapshot(final ConnectionStatus status, final Date timestamp) { + if (isEmpty(status)) { + return null; + } + + final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(CONNECTION_METRICS); + snapshot.setTimestamp(timestamp); + + for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) { + snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status)); + } + + return snapshot; + } + + public static boolean isEmpty(final ConnectionStatus status) { + for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) { + final Long value = descriptor.getDescriptor().getValueFunction().getValue(status); + if (value != null && value > 0) { + return false; + } + } + + return true; + } + + public static StatusSnapshot createSnapshot(final ProcessGroupStatus status, final Date timestamp) { + if (isEmpty(status)) { + return null; + } + + final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(PROCESS_GROUP_METRICS); + snapshot.setTimestamp(timestamp); + + for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) { + snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status)); + } + + return snapshot; + } + + private static boolean isEmpty(final ProcessGroupStatus status) { + for (final ProcessGroupStatusDescriptor descriptor : ProcessGroupStatusDescriptor.values()) { + final Long value = descriptor.getDescriptor().getValueFunction().getValue(status); + if (value != null && value > 0) { + return false; + } + } + + return true; + } + + public static StatusSnapshot createSnapshot(final RemoteProcessGroupStatus status, final Date timestamp) { + if (isEmpty(status)) { + return null; + } + + final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(RPG_METRICS); + snapshot.setTimestamp(timestamp); + + for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) { + snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status)); + } + + return snapshot; + } + + private static boolean isEmpty(final RemoteProcessGroupStatus status) { + for (final RemoteProcessGroupStatusDescriptor descriptor : RemoteProcessGroupStatusDescriptor.values()) { + final Long value = descriptor.getDescriptor().getValueFunction().getValue(status); + if (value != null && value > 0) { + return false; + } + } + + return true; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentStatusReport.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentStatusReport.java deleted file mode 100644 index ca31467d35ea..000000000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentStatusReport.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import org.apache.nifi.controller.status.ConnectionStatus; -import org.apache.nifi.controller.status.PortStatus; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.ProcessorStatus; -import org.apache.nifi.controller.status.RemoteProcessGroupStatus; - -/** - * ComponentStatusReport is a util class that can be used to "flatten" a ProcessGroupStatus into a collection of Map's so that retrieval of a Status for a particular component is very efficient - */ -public class ComponentStatusReport { - - private final Map groupMap = new HashMap<>(); - private final Map processorMap = new HashMap<>(); - private final Map connectionMap = new HashMap<>(); - private final Map remoteGroupMap = new HashMap<>(); - private final Map inputPortMap = new HashMap<>(); - private final Map outputPortMap = new HashMap<>(); - - private ComponentStatusReport() { - } - - public static ComponentStatusReport createEmpty() { - return new ComponentStatusReport(); - } - - public static ComponentStatusReport fromProcessGroupStatus(final ProcessGroupStatus status) { - return fromProcessGroupStatus(status, ComponentType.values()); - } - - public static ComponentStatusReport fromProcessGroupStatus(final ProcessGroupStatus status, final ComponentType... componentTypes) { - final Set componentTypeSet = new HashSet<>(); - for (final ComponentType type : componentTypes) { - componentTypeSet.add(type); - } - - final ComponentStatusReport report = new ComponentStatusReport(); - report.populate(status, componentTypeSet); - return report; - } - - private void populate(final ProcessGroupStatus status, final Set componentTypes) { - if (componentTypes.contains(ComponentType.PROCESS_GROUP)) { - groupMap.put(status.getId(), status); - } - - if (componentTypes.contains(ComponentType.PROCESSOR)) { - for (final ProcessorStatus procStatus : status.getProcessorStatus()) { - processorMap.put(procStatus.getId(), procStatus); - } - } - - if (componentTypes.contains(ComponentType.CONNECTION)) { - for (final ConnectionStatus connStatus : status.getConnectionStatus()) { - connectionMap.put(connStatus.getId(), connStatus); - } - } - - if (componentTypes.contains(ComponentType.REMOTE_PROCESS_GROUP)) { - for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) { - remoteGroupMap.put(rpgStatus.getId(), rpgStatus); - } - } - - if (componentTypes.contains(ComponentType.INPUT_PORT)) { - for (final PortStatus portStatus : status.getInputPortStatus()) { - inputPortMap.put(portStatus.getId(), portStatus); - } - } - - if (componentTypes.contains(ComponentType.OUTPUT_PORT)) { - for (final PortStatus portStatus : status.getOutputPortStatus()) { - outputPortMap.put(portStatus.getId(), portStatus); - } - } - - for (final ProcessGroupStatus childStatus : status.getProcessGroupStatus()) { - populate(childStatus, componentTypes); - } - } - - public ProcessGroupStatus getProcessGroupStatus(final String groupId) { - return groupMap.get(groupId); - } - - public ProcessorStatus getProcessorStatus(final String processorId) { - return processorMap.get(processorId); - } - - public ConnectionStatus getConnectionStatus(final String connectionId) { - return connectionMap.get(connectionId); - } - - public RemoteProcessGroupStatus getRemoteProcessGroupStatus(final String remoteGroupId) { - return remoteGroupMap.get(remoteGroupId); - } - - public PortStatus getInputPortStatus(final String portId) { - return inputPortMap.get(portId); - } - - public PortStatus getOutputPortStatus(final String portId) { - return outputPortMap.get(portId); - } - - public static enum ComponentType { - - PROCESSOR, - INPUT_PORT, - OUTPUT_PORT, - PROCESS_GROUP, - CONNECTION, - REMOTE_PROCESS_GROUP; - } -} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java index 2bc158fdcdcd..778efbe49cfb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java @@ -16,17 +16,15 @@ */ package org.apache.nifi.controller.repository; -import org.apache.nifi.controller.repository.StandardRepositoryStatusReport; import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository; -import org.apache.nifi.controller.repository.FlowFileEvent; +import org.junit.Test; +import org.testng.Assert; + import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.junit.Test; -import org.testng.Assert; - public class TestRingBufferEventRepository { @Test @@ -34,15 +32,15 @@ public void testAdd() throws IOException { final RingBufferEventRepository repo = new RingBufferEventRepository(5); long insertNanos = 0L; for (int i = 0; i < 1000000; i++) { - final FlowFileEvent event = generateEvent("ABC"); + final FlowFileEvent event = generateEvent(); final long insertStart = System.nanoTime(); - repo.updateRepository(event); + repo.updateRepository(event, "ABC"); insertNanos += System.nanoTime() - insertStart; } final long queryStart = System.nanoTime(); - final StandardRepositoryStatusReport report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000); + final StandardRepositoryStatusReport report = repo.reportTransferEvents(System.currentTimeMillis()); final long queryNanos = System.nanoTime() - queryStart; System.out.println(report); System.out.println("Insert: " + TimeUnit.MILLISECONDS.convert(insertNanos, TimeUnit.NANOSECONDS)); @@ -55,36 +53,31 @@ public void testPurge() throws IOException { final FlowFileEventRepository repo = new RingBufferEventRepository(5); String id1 = "component1"; String id2 = "component2"; - repo.updateRepository(generateEvent(id1)); - repo.updateRepository(generateEvent(id2)); - RepositoryStatusReport report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000); + repo.updateRepository(generateEvent(), id1); + repo.updateRepository(generateEvent(), id2); + RepositoryStatusReport report = repo.reportTransferEvents(System.currentTimeMillis()); FlowFileEvent entry = report.getReportEntry(id1); Assert.assertNotNull(entry); entry = report.getReportEntry(id2); Assert.assertNotNull(entry); repo.purgeTransferEvents(id1); - report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000); + report = repo.reportTransferEvents(System.currentTimeMillis()); entry = report.getReportEntry(id1); Assert.assertNull(entry); entry = report.getReportEntry(id2); Assert.assertNotNull(entry); repo.purgeTransferEvents(id2); - report = repo.reportTransferEvents(System.currentTimeMillis() - 2 * 60000); + report = repo.reportTransferEvents(System.currentTimeMillis()); entry = report.getReportEntry(id2); Assert.assertNull(entry); repo.close(); } - private FlowFileEvent generateEvent(final String id) { + private FlowFileEvent generateEvent() { return new FlowFileEvent() { - @Override - public String getComponentIdentifier() { - return id; - } - @Override public int getFlowFilesIn() { return 1; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/metrics/TestSecondPrecisionEventContainer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/metrics/TestSecondPrecisionEventContainer.java new file mode 100644 index 000000000000..682a56524071 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/metrics/TestSecondPrecisionEventContainer.java @@ -0,0 +1,119 @@ +package org.apache.nifi.controller.repository.metrics; + +import org.apache.nifi.controller.repository.FlowFileEvent; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestSecondPrecisionEventContainer { + + @Test + public void testUpdateOncePerSecond() { + final SecondPrecisionEventContainer container = new SecondPrecisionEventContainer(5); + final long startTime = System.currentTimeMillis(); + + final StandardFlowFileEvent event = new StandardFlowFileEvent(); + event.setBytesRead(100L); + event.setBytesWritten(100L); + + for (int i=0; i < 5; i++) { + for (int j=0; j < 300; j++) { + container.addEvent(event, startTime + (i * 300_000) + (j * 1000)); + } + + final long timestamp = startTime + 300_000 * i + 300_000; + final FlowFileEvent result = container.generateReport(timestamp); + assertEquals("Failure at i=" + i, 300 * 100, result.getBytesRead()); + assertEquals("Failure at i=" + i, 300 * 100, result.getBytesWritten()); + } + } + + @Test + public void testExpiresOnReportGeneration() { + final SecondPrecisionEventContainer container = new SecondPrecisionEventContainer(5); + final long startTime = System.currentTimeMillis(); + + final StandardFlowFileEvent event = new StandardFlowFileEvent(); + event.setBytesRead(100L); + event.setBytesWritten(100L); + + for (int j=0; j < 100; j++) { + container.addEvent(event, startTime + (j * 1000)); + } + + final FlowFileEvent resultAt5Mins = container.generateReport(startTime + 300_000); + assertEquals(100 * 100, resultAt5Mins.getBytesRead()); + assertEquals(100 * 100, resultAt5Mins.getBytesWritten()); + + final FlowFileEvent resultAt5MinsPlus50Seconds = container.generateReport(startTime + 350_000); + assertEquals(50 * 100, resultAt5MinsPlus50Seconds.getBytesRead()); + assertEquals(50 * 100, resultAt5MinsPlus50Seconds.getBytesWritten()); + + final FlowFileEvent resultAt5MinsPlus99Seconds = container.generateReport(startTime + 399_000); + assertEquals(100, resultAt5MinsPlus99Seconds.getBytesRead()); + assertEquals(100, resultAt5MinsPlus99Seconds.getBytesWritten()); + + final FlowFileEvent resultAt5MinsPlus100Seconds = container.generateReport(startTime + 400_000); + assertEquals(0, resultAt5MinsPlus100Seconds.getBytesRead()); + assertEquals(0, resultAt5MinsPlus100Seconds.getBytesWritten()); + + final FlowFileEvent resultAt5MinsPlus101Seconds = container.generateReport(startTime + 401_000); + assertEquals(0, resultAt5MinsPlus101Seconds.getBytesRead()); + assertEquals(0, resultAt5MinsPlus101Seconds.getBytesWritten()); + + final FlowFileEvent resultsAt5MinsPlus300seconds = container.generateReport(startTime + 600_000); + assertEquals(0, resultsAt5MinsPlus300seconds.getBytesRead()); + assertEquals(0, resultsAt5MinsPlus300seconds.getBytesWritten()); + + final FlowFileEvent resultsAt5MinsPlus600seconds = container.generateReport(startTime + 900_000); + assertEquals(0, resultsAt5MinsPlus600seconds.getBytesRead()); + assertEquals(0, resultsAt5MinsPlus600seconds.getBytesWritten()); + } + + @Test + public void testExpiresOnReportGenerationWithSkipsBetweenUpdates() { + final SecondPrecisionEventContainer container = new SecondPrecisionEventContainer(5); + final long startTime = System.currentTimeMillis(); + + final StandardFlowFileEvent event = new StandardFlowFileEvent(); + event.setBytesRead(100L); + event.setBytesWritten(100L); + + for (int j=0; j < 20; j++) { + container.addEvent(event, startTime + (j * 5000)); + } + + final FlowFileEvent resultAt5Mins = container.generateReport(startTime + 300_000); + assertEquals(20 * 100, resultAt5Mins.getBytesRead()); + assertEquals(20 * 100, resultAt5Mins.getBytesWritten()); + + final FlowFileEvent resultAt5MinsPlus50Seconds = container.generateReport(startTime + 350_000); + assertEquals(10 * 100, resultAt5MinsPlus50Seconds.getBytesRead()); + assertEquals(10 * 100, resultAt5MinsPlus50Seconds.getBytesWritten()); + + final FlowFileEvent resultAt5MinsPlus94Seconds = container.generateReport(startTime + 394_000); + assertEquals(100, resultAt5MinsPlus94Seconds.getBytesRead()); + assertEquals(100, resultAt5MinsPlus94Seconds.getBytesWritten()); + + final FlowFileEvent resultAt5MinsPlus95Seconds = container.generateReport(startTime + 395_000); + assertEquals(100, resultAt5MinsPlus95Seconds.getBytesRead()); + assertEquals(100, resultAt5MinsPlus95Seconds.getBytesWritten()); + + final FlowFileEvent resultAt5MinsPlus100Seconds = container.generateReport(startTime + 400_000); + assertEquals(0, resultAt5MinsPlus100Seconds.getBytesRead()); + assertEquals(0, resultAt5MinsPlus100Seconds.getBytesWritten()); + + final FlowFileEvent resultAt5MinsPlus101Seconds = container.generateReport(startTime + 401_000); + assertEquals(0, resultAt5MinsPlus101Seconds.getBytesRead()); + assertEquals(0, resultAt5MinsPlus101Seconds.getBytesWritten()); + + final FlowFileEvent resultsAt5MinsPlus300seconds = container.generateReport(startTime + 600_000); + assertEquals(0, resultsAt5MinsPlus300seconds.getBytesRead()); + assertEquals(0, resultsAt5MinsPlus300seconds.getBytesWritten()); + + final FlowFileEvent resultsAt5MinsPlus600seconds = container.generateReport(startTime + 900_000); + assertEquals(0, resultsAt5MinsPlus600seconds.getBytesRead()); + assertEquals(0, resultsAt5MinsPlus600seconds.getBytesWritten()); + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/TestMetricRollingBuffer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/TestMetricRollingBuffer.java new file mode 100644 index 000000000000..b2893dcdf0ea --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/TestMetricRollingBuffer.java @@ -0,0 +1,110 @@ +package org.apache.nifi.controller.status.history; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestMetricRollingBuffer { + private static final Set> PROCESSOR_METRICS = Arrays.stream(ProcessorStatusDescriptor.values()) + .map(ProcessorStatusDescriptor::getDescriptor) + .collect(Collectors.toSet()); + + @Test + public void testBufferGrows() { + final int bufferCapacity = 1000; + final MetricRollingBuffer buffer = new MetricRollingBuffer(bufferCapacity); + + final long startTime = System.currentTimeMillis(); + final List timestamps = new ArrayList<>(); + + int iterations = 1440; + for (int i=0; i < iterations; i++) { + final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(PROCESSOR_METRICS); + snapshot.setTimestamp(new Date(startTime + i * 1000)); + timestamps.add(snapshot.getTimestamp()); + + snapshot.addStatusMetric(ProcessorStatusDescriptor.BYTES_WRITTEN.getDescriptor(), Long.valueOf(i)); + + buffer.update(snapshot); + } + + assertEquals(bufferCapacity, buffer.size()); + + final List snapshots = buffer.getSnapshots(timestamps, true, PROCESSOR_METRICS); + assertEquals(iterations, snapshots.size()); + + final int expectedEmptyCount = iterations - bufferCapacity; + final long emptyCount = snapshots.stream().filter(snapshot -> snapshot instanceof EmptyStatusSnapshot).count(); + assertEquals(expectedEmptyCount, emptyCount); + + for (int i=0; i < iterations; i++) { + final StatusSnapshot snapshot = snapshots.get(i); + if (i < expectedEmptyCount) { + assertTrue("Snapshot at i=" + i + " is not an EmptyStatusSnapshot", snapshot instanceof EmptyStatusSnapshot); + } else { + assertEquals(Long.valueOf(i), snapshot.getStatusMetric(ProcessorStatusDescriptor.BYTES_WRITTEN.getDescriptor())); + assertFalse(snapshot instanceof EmptyStatusSnapshot); + } + } + } + + @Test + public void testBufferShrinks() { + // Cause buffer to grow + final int bufferCapacity = 1000; + final MetricRollingBuffer buffer = new MetricRollingBuffer(bufferCapacity); + + final long startTime = System.currentTimeMillis(); + + int iterations = 1440; + for (int i=0; i < iterations; i++) { + final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(PROCESSOR_METRICS); + snapshot.setTimestamp(new Date(startTime + i * 1000)); + + snapshot.addStatusMetric(ProcessorStatusDescriptor.BYTES_WRITTEN.getDescriptor(), Long.valueOf(i)); + buffer.update(snapshot); + } + + assertEquals(bufferCapacity, buffer.size()); + + // Expire data ensure that the buffer shrinks + final long lastTimestamp = startTime + 1440 * 1000; + buffer.expireBefore(new Date(lastTimestamp - 144_001L)); + assertEquals(144, buffer.size()); + + buffer.expireBefore(new Date(lastTimestamp - 16_001L)); + assertEquals(16, buffer.size()); + + buffer.expireBefore(new Date(lastTimestamp)); + assertEquals(0, buffer.size()); + + // Ensure that we can now properly add data again + long insertStart = lastTimestamp + 10_000L; + final List timestamps = new ArrayList<>(); + for (int i=0; i < 4; i++) { + final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(PROCESSOR_METRICS); + snapshot.setTimestamp(new Date(insertStart + i * 1000)); + timestamps.add(snapshot.getTimestamp()); + + snapshot.addStatusMetric(ProcessorStatusDescriptor.BYTES_WRITTEN.getDescriptor(), Long.valueOf(i)); + buffer.update(snapshot); + } + + assertEquals(4, buffer.size()); + final List snapshots = buffer.getSnapshots(timestamps, true, PROCESSOR_METRICS); + assertEquals(4, snapshots.size()); + for (int i=0; i < 4; i++) { + final StatusSnapshot snapshot = snapshots.get(i); + assertEquals(Long.valueOf(i), snapshot.getStatusMetric(ProcessorStatusDescriptor.BYTES_WRITTEN.getDescriptor())); + } + } +} From f687ae92dd283f2cbd94863ce555931f12e53e6c Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 21 Aug 2018 11:42:54 -0400 Subject: [PATCH 2/2] Added missing license headers --- .../repository/metrics/EmptyFlowFileEvent.java | 16 ++++++++++++++++ .../status/history/ComponentDetails.java | 16 ++++++++++++++++ .../status/history/ComponentStatusHistory.java | 16 ++++++++++++++++ .../status/history/EmptyStatusSnapshot.java | 16 ++++++++++++++++ .../status/history/IndexableMetric.java | 16 ++++++++++++++++ .../status/history/MetricRollingBuffer.java | 16 ++++++++++++++++ .../org/apache/nifi/util/ComponentMetrics.java | 16 ++++++++++++++++ .../TestSecondPrecisionEventContainer.java | 16 ++++++++++++++++ .../status/history/TestMetricRollingBuffer.java | 16 ++++++++++++++++ 9 files changed, 144 insertions(+) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java index 496a1e111466..3c141401e0ab 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.controller.repository.metrics; import org.apache.nifi.controller.repository.FlowFileEvent; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentDetails.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentDetails.java index 5381b739ae77..68a79b91c99f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentDetails.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentDetails.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.controller.status.history; import org.apache.nifi.controller.status.ConnectionStatus; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusHistory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusHistory.java index 5fb876881a1e..cd4b76b5b703 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusHistory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusHistory.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.controller.status.history; import java.util.Date; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmptyStatusSnapshot.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmptyStatusSnapshot.java index 1981ba0baf54..68c04caf8f57 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmptyStatusSnapshot.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmptyStatusSnapshot.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.controller.status.history; import java.util.Date; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/IndexableMetric.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/IndexableMetric.java index 5cc6a6c9800f..d5cbeaefc6e1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/IndexableMetric.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/IndexableMetric.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.controller.status.history; public interface IndexableMetric { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/MetricRollingBuffer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/MetricRollingBuffer.java index 82e33cf8b283..215e364dbe40 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/MetricRollingBuffer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/MetricRollingBuffer.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.controller.status.history; import java.util.ArrayList; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java index 218017769ec0..9c1505cd002b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/ComponentMetrics.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.util; import org.apache.nifi.controller.status.ConnectionStatus; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/metrics/TestSecondPrecisionEventContainer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/metrics/TestSecondPrecisionEventContainer.java index 682a56524071..f8b455941634 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/metrics/TestSecondPrecisionEventContainer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/metrics/TestSecondPrecisionEventContainer.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.controller.repository.metrics; import org.apache.nifi.controller.repository.FlowFileEvent; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/TestMetricRollingBuffer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/TestMetricRollingBuffer.java index b2893dcdf0ea..338fc625a608 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/TestMetricRollingBuffer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/TestMetricRollingBuffer.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.nifi.controller.status.history; import org.junit.Test;