From f186634212560e8a8d7e817f8cdb94ce3d9fe14e Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 8 Aug 2016 15:01:32 +0000 Subject: [PATCH] NIFI-2495: Ensure that we always close Index Searchers when we're finished with them --- .../nifi/provenance/lucene/IndexManager.java | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java index b93d3b70b627..699517331fa8 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java @@ -21,11 +21,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -189,7 +187,7 @@ public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException } else { // keep track of any searchers that have been closed so that we can remove them // from our cache later. - final Set expired = new HashSet<>(); + final List expired = new ArrayList<>(); try { for ( final ActiveIndexSearcher searcher : currentlyCached ) { @@ -307,10 +305,12 @@ public void returnIndexSearcher(final File indexDirectory, final IndexSearcher s // Check if the given searcher is in our list. We use an Iterator to do this so that if we // find it we can call remove() on the iterator if need be. - final Iterator itr = currentlyCached.iterator(); + final Iterator itr = new ArrayList<>(currentlyCached).iterator(); + boolean activeSearcherFound = false; while (itr.hasNext()) { final ActiveIndexSearcher activeSearcher = itr.next(); if ( activeSearcher.getSearcher().equals(searcher) ) { + activeSearcherFound = true; if ( activeSearcher.isCache() ) { // if the searcher is poisoned, close it and remove from "pool". if ( activeSearcher.isPoisoned() ) { @@ -318,7 +318,10 @@ public void returnIndexSearcher(final File indexDirectory, final IndexSearcher s try { logger.debug("Closing Index Searcher for {} because it is poisoned", indexDirectory); - activeSearcher.close(); + final boolean allReferencesClosed = activeSearcher.close(); + if (!allReferencesClosed) { + currentlyCached.add(activeSearcher); + } } catch (final IOException ioe) { logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe); if ( logger.isDebugEnabled() ) { @@ -366,7 +369,10 @@ public void returnIndexSearcher(final File indexDirectory, final IndexSearcher s try { logger.debug("Closing Index Searcher for {}", indexDirectory); - activeSearcher.close(); + final boolean allReferencesClosed = activeSearcher.close(); + if (!allReferencesClosed) { + currentlyCached.add(activeSearcher); + } } catch (final IOException ioe) { logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe); if ( logger.isDebugEnabled() ) { @@ -376,6 +382,10 @@ public void returnIndexSearcher(final File indexDirectory, final IndexSearcher s } } } + + if (!activeSearcherFound) { + logger.error("Index Searcher {} was returned for {} but found no Active Searcher for it", searcher, indexDirectory); + } } finally { lock.unlock(); } @@ -448,7 +458,7 @@ private static void close(final Closeable... closeables) throws IOException { } - private static class ActiveIndexSearcher implements Closeable { + private static class ActiveIndexSearcher { private final IndexSearcher searcher; private final DirectoryReader directoryReader; private final File indexDirectory; @@ -490,14 +500,15 @@ public int decrementReferenceCount() { return referenceCount.decrementAndGet(); } - @Override - public void close() throws IOException { + public boolean close() throws IOException { final int updatedRefCount = referenceCount.decrementAndGet(); if (updatedRefCount <= 0) { logger.debug("Decremented Reference Count for {} to {}; closing underlying directory reader", this, updatedRefCount); IndexManager.close(directoryReader, directory); + return true; } else { logger.debug("Decremented Reference Count for {} to {}; leaving underlying directory reader open", this, updatedRefCount); + return false; } }