From 1b4d2fd09fceba98d6c739296eb416ad508e6241 Mon Sep 17 00:00:00 2001
From: Joe Skora
Date: Thu, 17 Nov 2016 15:30:52 +0000
Subject: [PATCH] NIFI-3039 Provenance Repository - Fix PurgeOldEvent and Rollover Size Limits

* Added low water mark for purgeOldEvents() to prevent thrashing on event
  cleanup.
* Adjusted rollover high water mark to match purgeOldEvents() to prevent
  overrunning "nifi.provenance.repository.max.storage.size".
* Moved high/low water marks to constants.
* Adjusted looping logic in mergeJournals() to use ".firstKey()" instead of
  ".entrySet().iterator().next()" to avoid unnecessary object creation.
---
Reviewer notes with illustrative sketches of the water-mark hysteresis and
the firstKey() change follow the diff.

 .../PersistentProvenanceRepository.java | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 8b971b5eda73..e24ebfa7a340 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -116,6 +116,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
     public static final int MAX_UNDELETED_QUERY_RESULTS = 10;
     public static final int MAX_INDEXING_FAILURE_COUNT = 5; // how many indexing failures we will tolerate before skipping indexing for a prov file
 
+    private static final float PURGE_OLD_EVENTS_HIGH_WATER = 0.9f;
+    private static final float PURGE_OLD_EVENTS_LOW_WATER = 0.7f;
+    private static final float ROLLOVER_HIGH_WATER = 0.9f;
+
     private static final Logger logger = LoggerFactory.getLogger(PersistentProvenanceRepository.class);
 
     private final long maxPartitionMillis;
@@ -878,13 +882,13 @@ public int compare(final File o1, final File o2) {
         };
 
         // If we have too much data (at least 90% of our max capacity), start aging it off
-        if (bytesUsed > configuration.getMaxStorageCapacity() * 0.9) {
+        if (bytesUsed > configuration.getMaxStorageCapacity() * PURGE_OLD_EVENTS_HIGH_WATER) {
             Collections.sort(sortedByBasename, sortByBasenameComparator);
 
             for (final File file : sortedByBasename) {
                 toPurge.add(file);
                 bytesUsed -= file.length();
-                if (bytesUsed < configuration.getMaxStorageCapacity()) {
+                if (bytesUsed < configuration.getMaxStorageCapacity() * PURGE_OLD_EVENTS_LOW_WATER) {
                     // we've shrunk the repo size down enough to stop
                     break;
                 }
@@ -1286,7 +1290,7 @@ public void run() {
                     int journalFileCount = getJournalCount();
                     long repoSize = getSize(getLogFiles(), 0L);
                     final int journalCountThreshold = configuration.getJournalCount() * 5;
-                    final long sizeThreshold = (long) (configuration.getMaxStorageCapacity() * 1.1D); // do not go over 10% of max capacity
+                    final long sizeThreshold = (long) (configuration.getMaxStorageCapacity() * ROLLOVER_HIGH_WATER);
 
                     // check if we need to apply backpressure.
                     // If we have too many journal files, or if the repo becomes too large, backpressure is necessary. Without it,
@@ -1664,9 +1668,8 @@ public Object call() throws IOException {
 
                 boolean indexEvents = true;
                 while (!recordToReaderMap.isEmpty()) {
-                    final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
-                    final StandardProvenanceEventRecord record = entry.getKey();
-                    final RecordReader reader = entry.getValue();
+                    final StandardProvenanceEventRecord record = recordToReaderMap.firstKey();
+                    final RecordReader reader = recordToReaderMap.get(record);
 
                     writer.writeRecord(record, record.getEventId());
                     final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
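
Note on the purge change: purgeOldEvents() previously started deleting at
90% of max capacity but stopped as soon as usage dipped just below 100%, so
a busy repository would re-trigger the purge almost immediately. With the
high/low water mark pair, cleanup still starts above 90% but keeps going
until usage falls below 70%, leaving headroom between passes. A minimal
standalone sketch of that hysteresis; the names here (RepoFile,
selectFilesToPurge) are hypothetical and not part of the NiFi API:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    class WaterMarkPurgeSketch {
        private static final float HIGH_WATER = 0.9f; // start purging above 90% of capacity
        private static final float LOW_WATER = 0.7f;  // stop purging once below 70% of capacity

        record RepoFile(String name, long length) {}

        // Returns the files to purge, oldest (by basename) first.
        static List<RepoFile> selectFilesToPurge(final List<RepoFile> files, long bytesUsed, final long maxCapacity) {
            final List<RepoFile> toPurge = new ArrayList<>();
            if (bytesUsed <= (long) (maxCapacity * HIGH_WATER)) {
                return toPurge; // below the high water mark: nothing to do yet
            }
            files.sort(Comparator.comparing(RepoFile::name));
            for (final RepoFile file : files) {
                toPurge.add(file);
                bytesUsed -= file.length();
                // Stopping at the LOW water mark (not just under capacity) is what
                // keeps the next few writes from immediately re-triggering a purge.
                if (bytesUsed < (long) (maxCapacity * LOW_WATER)) {
                    break;
                }
            }
            return toPurge;
        }

        public static void main(String[] args) {
            final List<RepoFile> files = new ArrayList<>(List.of(
                    new RepoFile("0001.prov", 30), new RepoFile("0002.prov", 30),
                    new RepoFile("0003.prov", 30), new RepoFile("0004.prov", 10)));
            // 100 bytes used of 100 capacity: above the 90-byte high water mark.
            for (final RepoFile f : selectFilesToPurge(files, 100, 100)) {
                System.out.println("purge " + f.name()); // purges 0001 and 0002 (100 -> 70 -> 40 bytes)
            }
        }
    }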
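
Note on the mergeJournals() change: firstKey() compiles only because
recordToReaderMap is a SortedMap, presumably ordered so the lowest event
comes first, so the loop still drains records in order. The win is
allocation: entrySet().iterator().next() builds a fresh iterator on every
pass of the loop, while firstKey() walks directly to the first tree node at
the cost of one extra get() lookup. A small sketch of the two access
patterns, using stand-in types (Long event ids and String reader names, not
the NiFi classes):

    import java.util.Map;
    import java.util.TreeMap;

    class FirstKeySketch {
        public static void main(String[] args) {
            final TreeMap<Long, String> recordToReaderMap = new TreeMap<>(
                    Map.of(3L, "reader-3", 1L, "reader-1", 2L, "reader-2"));

            // Before: allocates an entry-set iterator on every loop pass.
            final Map.Entry<Long, String> entry = recordToReaderMap.entrySet().iterator().next();

            // After: navigates straight to the first tree node; one extra get(),
            // but no per-pass iterator object.
            final Long first = recordToReaderMap.firstKey();
            final String reader = recordToReaderMap.get(first);

            System.out.println(entry.getKey() + " == " + first + " -> " + reader);
        }
    }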