From d61911f7b8c2ccf05d9e1e44a6b8bff56ad6e429 Mon Sep 17 00:00:00 2001 From: Peter Ansell Date: Mon, 5 Sep 2016 15:36:14 +1000 Subject: [PATCH] issue#301 : Work on tightening the contract for lucene sail components Note, this does not fix the issue which is still visible, by enforces more structural constraints to avoid future issues. Lock more Lucene methods. The internal Lucene API isn't stable for long term threaded access so external synchronisation is necessary. Add checks on calls after closure Signed-off-by: Peter Ansell --- .../sail/lucene/AbstractLuceneIndex.java | 4 +- .../sail/lucene/AbstractReaderMonitor.java | 33 ++-- .../eclipse/rdf4j/sail/lucene/LuceneSail.java | 53 ++++-- .../sail/lucene/LuceneSailConnection.java | 32 ++-- .../rdf4j/sail/lucene/LuceneIndex.java | 179 +++++++++++------- 5 files changed, 190 insertions(+), 111 deletions(-) diff --git a/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/AbstractLuceneIndex.java b/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/AbstractLuceneIndex.java index 1ed944ee6e5..fb47ccdd2e2 100644 --- a/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/AbstractLuceneIndex.java +++ b/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/AbstractLuceneIndex.java @@ -8,8 +8,8 @@ package org.eclipse.rdf4j.sail.lucene; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; public abstract class AbstractLuceneIndex extends AbstractSearchIndex { @@ -17,7 +17,7 @@ public abstract class AbstractLuceneIndex extends AbstractSearchIndex { * keep a lit of old monitors that are still iterating but not closed (open iterators), will be all closed * on shutdown items are removed from list by ReaderMnitor.endReading() when closing */ - protected final Collection oldmonitors = new LinkedList(); + protected final Collection oldmonitors = new ArrayList(); protected abstract AbstractReaderMonitor getCurrentMonitor(); diff --git a/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/AbstractReaderMonitor.java b/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/AbstractReaderMonitor.java index 322dcf89f40..ff2b8a089b6 100644 --- a/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/AbstractReaderMonitor.java +++ b/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/AbstractReaderMonitor.java @@ -33,14 +33,22 @@ protected AbstractReaderMonitor(AbstractLuceneIndex index) { this.index = index; } - public int getReadingCount() { + public final int getReadingCount() { return readingCount.get(); } /** * */ - public void beginReading() { + public final synchronized void beginReading() { + if (closed.get()) { + throw new IllegalStateException("Cannot begin reading as we have been closed."); + } + // We cannot allow any more readers to be open at this stage, as any + // decrements towards zero on readingCount could trigger closure/removal + if (doClose.get()) { + throw new IllegalStateException("Cannot begin reading as we have moved into closing stages."); + } readingCount.incrementAndGet(); } @@ -49,16 +57,17 @@ public void beginReading() { * * @throws IOException */ - public void endReading() + public final synchronized void endReading() throws IOException { - if (readingCount.decrementAndGet() == 0 && doClose.get()) { - // when endReading is called on CurrentMonitor and it should be closed, - // close it - close();// close Lucene index remove them self from Lucene index + if (readingCount.decrementAndGet() <= 0 && doClose.get()) { + // when endReading is called on CurrentMonitor and it should be + // closed, close it + close(); + // close Lucene index remove them self from Lucene index synchronized (index.oldmonitors) { - index.oldmonitors.remove(this); // if its not in the list, then this - // is a no-operation + // if its not in the list, then this is a no-operation + index.oldmonitors.remove(this); } } } @@ -69,7 +78,7 @@ public void endReading() * @return true if the close succeeded, false otherwise. * @throws IOException */ - public boolean closeWhenPossible() + public final synchronized boolean closeWhenPossible() throws IOException { doClose.set(true); @@ -79,10 +88,10 @@ public boolean closeWhenPossible() return closed.get(); } - public void close() + public final void close() throws IOException { - if (!closed.getAndSet(true)) { + if (closed.compareAndSet(false, true)) { handleClose(); } } diff --git a/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java b/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java index d258a029075..2d9f71492ab 100644 --- a/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java +++ b/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.rdf4j.model.IRI; import org.eclipse.rdf4j.model.Resource; @@ -253,13 +254,13 @@ public class LuceneSail extends NotifyingSailWrapper { /** * The LuceneIndex holding the indexed literals. */ - private SearchIndex luceneIndex; + private volatile SearchIndex luceneIndex; protected final Properties parameters = new Properties(); - private String reindexQuery = "SELECT ?s ?p ?o ?c WHERE {{?s ?p ?o} UNION {GRAPH ?c {?s ?p ?o.}}} ORDER BY ?s"; + private volatile String reindexQuery = "SELECT ?s ?p ?o ?c WHERE {{?s ?p ?o} UNION {GRAPH ?c {?s ?p ?o.}}} ORDER BY ?s"; - private boolean incompleteQueryFails = true; + private volatile boolean incompleteQueryFails = true; private Set indexedFields; @@ -267,6 +268,8 @@ public class LuceneSail extends NotifyingSailWrapper { private IndexableStatementFilter filter = null; + private final AtomicBoolean closed = new AtomicBoolean(false); + public void setLuceneIndex(SearchIndex luceneIndex) { this.luceneIndex = luceneIndex; } @@ -286,18 +289,22 @@ public NotifyingSailConnection getConnection() public void shutDown() throws SailException { - try { - if (luceneIndex != null) { - luceneIndex.shutDown(); + if(closed.compareAndSet(false, true)) { + try { + SearchIndex toShutDownLuceneIndex = luceneIndex; + luceneIndex = null; + if (toShutDownLuceneIndex != null) { + toShutDownLuceneIndex.shutDown(); + } + } + catch (IOException e) { + throw new SailException(e); + } + finally { + // ensure that super is also invoked when the LuceneIndex causes an + // IOException + super.shutDown(); } - } - catch (IOException e) { - throw new SailException(e); - } - finally { - // ensure that super is also invoked when the LuceneIndex causes an - // IOException - super.shutDown(); } } @@ -492,27 +499,33 @@ public void registerStatementFilter(IndexableStatementFilter filter) { } protected boolean acceptStatementToIndex(Statement s) { - return (filter != null) ? filter.accept(s) : true; + IndexableStatementFilter nextFilter = filter; + return (nextFilter != null) ? nextFilter.accept(s) : true; } public Statement mapStatement(Statement statement) { IRI p = statement.getPredicate(); boolean predicateChanged = false; - if (indexedFieldsMapping != null) { - IRI res = indexedFieldsMapping.get(p); + Map nextIndexedFieldsMapping = indexedFieldsMapping; + if (nextIndexedFieldsMapping != null) { + IRI res = nextIndexedFieldsMapping.get(p); if (res != null) { p = res; predicateChanged = true; } } - if (this.indexedFields != null && !this.indexedFields.contains(p)) + Set nextIndexedFields = indexedFields; + if (nextIndexedFields != null && !nextIndexedFields.contains(p)) { return null; + } - if (predicateChanged) + if (predicateChanged) { return getValueFactory().createStatement(statement.getSubject(), p, statement.getObject(), statement.getContext()); - else + } + else { return statement; + } } protected Collection getSearchQueryInterpreters() { diff --git a/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSailConnection.java b/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSailConnection.java index a7ef52df97b..782f6f9f173 100644 --- a/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSailConnection.java +++ b/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSailConnection.java @@ -14,6 +14,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.model.IRI; @@ -121,7 +122,7 @@ public void statementRemoved(Statement statement) { /** * To remember if the iterator was already closed and only free resources once */ - private boolean mustclose = false; + private final AtomicBoolean closed = new AtomicBoolean(false); public LuceneSailConnection(NotifyingSailConnection wrappedConnection, SearchIndex luceneIndex, LuceneSail sail) @@ -148,19 +149,21 @@ public synchronized void addStatement(Resource arg0, IRI arg1, Value arg2, Resou public void close() throws SailException { - // remember if you were closed before, some sloppy programmers - // may call close() twice. - if (mustclose) { - mustclose = false; + if (closed.compareAndSet(false, true)) { try { - luceneIndex.endReading(); + super.close(); } - catch (IOException e) { - logger.warn("could not close IndexReader or IndexSearcher " + e, e); + finally { + try { + luceneIndex.endReading(); + } + catch (IOException e) { + logger.warn("could not close IndexReader or IndexSearcher " + e, e); + } + // remember if you were closed before, some sloppy programmers + // may call close() twice. } } - - super.close(); } // //////////////////////////////// Methods related to indexing @@ -277,8 +280,8 @@ private void clearContexts(Resource... contexts) // //////////////////////////////// Methods related to querying @Override - public CloseableIteration evaluate(TupleExpr tupleExpr, - Dataset dataset, BindingSet bindings, boolean includeInferred) + public synchronized CloseableIteration evaluate( + TupleExpr tupleExpr, Dataset dataset, BindingSet bindings, boolean includeInferred) throws SailException { // Don't modify the original tuple expression @@ -319,6 +322,10 @@ private void evaluateLuceneQueries(Collection queries, Tup // - multiple different property constraints can be put into the lucene // query string (escape colons here) + if (closed.get()) { + throw new SailException("Sail has been closed already"); + } + // mark that reading is in progress try { this.luceneIndex.beginReading(); @@ -326,7 +333,6 @@ private void evaluateLuceneQueries(Collection queries, Tup catch (IOException e) { throw new SailException(e); } - this.mustclose = true; // evaluate queries, generate binding sets, and remove queries for (SearchQueryEvaluator query : queries) { diff --git a/core/sail/fts/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneIndex.java b/core/sail/fts/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneIndex.java index 48223374439..5990d69b1a2 100644 --- a/core/sail/fts/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneIndex.java +++ b/core/sail/fts/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneIndex.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.io.StringReader; +import java.lang.reflect.UndeclaredThrowableException; import java.nio.charset.StandardCharsets; import java.nio.file.Paths; import java.util.ArrayList; @@ -16,11 +17,13 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Hashtable; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.TokenStream; @@ -95,7 +98,8 @@ public class LuceneIndex extends AbstractLuceneIndex { static { - // do NOT set this to Integer.MAX_VALUE, because this breaks fuzzy queries + // do NOT set this to Integer.MAX_VALUE, because this breaks fuzzy + // queries BooleanQuery.setMaxClauseCount(1024 * 1024); } @@ -106,26 +110,28 @@ public class LuceneIndex extends AbstractLuceneIndex { /** * The Directory that holds the Lucene index files. */ - private Directory directory; + private volatile Directory directory; /** * The Analyzer used to tokenize strings and queries. */ - private Analyzer analyzer; + private volatile Analyzer analyzer; - private Analyzer queryAnalyzer; + private volatile Analyzer queryAnalyzer; /** * The IndexWriter that can be used to alter the index' contents. Created lazily. */ - private IndexWriter indexWriter; + private volatile IndexWriter indexWriter; /** * This holds IndexReader and IndexSearcher. */ - protected ReaderMonitor currentMonitor; + protected volatile ReaderMonitor currentMonitor; - private Function geoStrategyMapper; + private volatile Function geoStrategyMapper; + + private final AtomicBoolean closed = new AtomicBoolean(false); public LuceneIndex() { } @@ -151,7 +157,7 @@ public LuceneIndex(Directory directory, Analyzer analyzer) } @Override - public void initialize(Properties parameters) + public synchronized void initialize(Properties parameters) throws Exception { super.initialize(parameters); @@ -254,15 +260,21 @@ public Analyzer getAnalyzer() { // ReaderMonitor directly to be able to close the reader when they // are done. - public IndexReader getIndexReader() + public synchronized IndexReader getIndexReader() throws IOException { + if (closed.get()) { + throw new SailException("Index has been closed"); + } return getIndexSearcher().getIndexReader(); } - public IndexSearcher getIndexSearcher() + public synchronized IndexSearcher getIndexSearcher() throws IOException { + if (closed.get()) { + throw new SailException("Index has been closed"); + } return getCurrentMonitor().getIndexSearcher(); } @@ -270,16 +282,22 @@ public IndexSearcher getIndexSearcher() * Current monitor holds instance of IndexReader and IndexSearcher It is used to keep track of readers */ @Override - public ReaderMonitor getCurrentMonitor() { - if (currentMonitor == null) + public synchronized ReaderMonitor getCurrentMonitor() { + if (closed.get()) { + throw new SailException("Index has been closed"); + } + if (currentMonitor == null) { currentMonitor = new ReaderMonitor(this, directory); + } return currentMonitor; } - public IndexWriter getIndexWriter() + public synchronized IndexWriter getIndexWriter() throws IOException { - + if (closed.get()) { + throw new SailException("Index has been closed"); + } if (indexWriter == null) { IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer); indexWriter = new IndexWriter(directory, indexWriterConfig); @@ -294,37 +312,59 @@ public void shutDown() // try-finally setup ensures that closing of an instance is not skipped // when an earlier instance resulted in an IOException // FIXME: is there a more elegant way to ensure this? - - // This close oldMonitors which hold InderReader and IndexSeracher - // Monitor close IndexReader and IndexSearcher - if (currentMonitor != null) { - currentMonitor.close(); - currentMonitor = null; - } - if (oldmonitors.size() > 0) { - logger.warn( - "LuceneSail: On shutdown {} IndexReaders were not closed. This is due to non-closed Query Iterators, which must be closed!", - oldmonitors.size()); - } - for (AbstractReaderMonitor monitor : oldmonitors) { - monitor.close(); - } - oldmonitors.clear(); - - try { - if (indexWriter != null) { - indexWriter.close(); + if (closed.compareAndSet(false, true)) { + try { + // This close oldMonitors which hold InderReader and + // IndexSeracher + // Monitor close IndexReader and IndexSearcher + ReaderMonitor toCloseCurrentMonitor = currentMonitor; + currentMonitor = null; + if (toCloseCurrentMonitor != null) { + toCloseCurrentMonitor.close(); + } + } + finally { + List exceptions = new ArrayList<>(); + try { + synchronized (oldmonitors) { + if (oldmonitors.size() > 0) { + logger.warn( + "LuceneSail: On shutdown {} IndexReaders were not closed. This is due to non-closed Query Iterators, which must be closed!", + oldmonitors.size()); + } + for (AbstractReaderMonitor monitor : oldmonitors) { + try { + monitor.close(); + } + catch (Throwable e) { + exceptions.add(e); + } + } + oldmonitors.clear(); + } + } + finally { + try { + IndexWriter toCloseIndexWriter = indexWriter; + indexWriter = null; + if (toCloseIndexWriter != null) { + toCloseIndexWriter.close(); + } + } + finally { + if (!exceptions.isEmpty()) { + throw new UndeclaredThrowableException(exceptions.get(0)); + } + } + } } - } - finally { - indexWriter = null; } } // //////////////////////////////// Methods for updating the index @Override - protected SearchDocument getDocument(String id) + protected synchronized SearchDocument getDocument(String id) throws IOException { Document document = getDocument(idTerm(id)); @@ -332,7 +372,7 @@ protected SearchDocument getDocument(String id) } @Override - protected Iterable getDocuments(String resourceId) + protected synchronized Iterable getDocuments(String resourceId) throws IOException { List docs = getDocuments(new Term(SearchFields.URI_FIELD_NAME, resourceId)); @@ -346,12 +386,12 @@ public SearchDocument apply(Document doc) { } @Override - protected SearchDocument newDocument(String id, String resourceId, String context) { + protected synchronized SearchDocument newDocument(String id, String resourceId, String context) { return new LuceneDocument(id, resourceId, context, geoStrategyMapper); } @Override - protected SearchDocument copyDocument(SearchDocument doc) { + protected synchronized SearchDocument copyDocument(SearchDocument doc) { Document document = ((LuceneDocument)doc).getDocument(); Document newDocument = new Document(); @@ -363,28 +403,28 @@ protected SearchDocument copyDocument(SearchDocument doc) { } @Override - protected void addDocument(SearchDocument doc) + protected synchronized void addDocument(SearchDocument doc) throws IOException { getIndexWriter().addDocument(((LuceneDocument)doc).getDocument()); } @Override - protected void updateDocument(SearchDocument doc) + protected synchronized void updateDocument(SearchDocument doc) throws IOException { getIndexWriter().updateDocument(idTerm(doc.getId()), ((LuceneDocument)doc).getDocument()); } @Override - protected void deleteDocument(SearchDocument doc) + protected synchronized void deleteDocument(SearchDocument doc) throws IOException { getIndexWriter().deleteDocuments(idTerm(doc.getId())); } @Override - protected BulkUpdater newBulkUpdate() { + protected synchronized BulkUpdater newBulkUpdate() { return new SimpleBulkUpdater(this); } @@ -472,7 +512,7 @@ private static void addDocuments(LeafReader reader, Term term, Collection getDocuments(Resource subject) + public synchronized List getDocuments(Resource subject) throws IOException { String resourceId = SearchFields.getResourceID(subject); @@ -552,11 +592,14 @@ private void invalidateReaders() { synchronized (oldmonitors) { // Move current monitor to old monitors and set null - if (currentMonitor != null) - // we do NOT close it directly as it may be used by an open result + if (currentMonitor != null) { + // we do NOT close it directly as it may be used by an open + // result // iterator, hence moving it to the - // list of oldmonitors where it is handled as other older monitors + // list of oldmonitors where it is handled as other older + // monitors oldmonitors.add(currentMonitor); + } currentMonitor = null; // close all monitors if possible @@ -571,7 +614,8 @@ private void invalidateReaders() if (oldmonitors.isEmpty()) { logger.debug("Deleting unused files from Lucene index"); - // clean up unused files (marked as 'deletable' in Luke Filewalker) + // clean up unused files (marked as 'deletable' in Luke + // Filewalker) getIndexWriter().deleteUnusedFiles(); // logIndexStats(); @@ -612,9 +656,10 @@ private void logIndexStats() { } finally { - if (currentMonitor != null) { - currentMonitor.closeWhenPossible(); - currentMonitor = null; + ReaderMonitor toCloseCurrentMonitor = currentMonitor; + currentMonitor = null; + if (toCloseCurrentMonitor != null) { + toCloseCurrentMonitor.closeWhenPossible(); } } } @@ -625,7 +670,7 @@ private void logIndexStats() { } @Override - public void begin() + public synchronized void begin() throws IOException { // nothing to do @@ -637,7 +682,7 @@ public void begin() * LuceneSailConnection is committed/rollbacked. */ @Override - public void commit() + public synchronized void commit() throws IOException { getIndexWriter().commit(); @@ -646,7 +691,7 @@ public void commit() } @Override - public void rollback() + public synchronized void rollback() throws IOException { getIndexWriter().rollback(); @@ -823,7 +868,7 @@ else if (GEOF.EH_COVERS.stringValue().equals(relation)) { * the id of the document to return * @return the requested hit, or null if it fails */ - public Document getDocument(int docId, Set fieldsToLoad) { + public synchronized Document getDocument(int docId, Set fieldsToLoad) { try { return readDocument(getIndexReader(), docId, fieldsToLoad); } @@ -837,7 +882,7 @@ public Document getDocument(int docId, Set fieldsToLoad) { } } - public String getSnippet(String fieldName, String text, Highlighter highlighter) { + public synchronized String getSnippet(String fieldName, String text, Highlighter highlighter) { String snippet; try { TokenStream tokenStream = getAnalyzer().tokenStream(fieldName, new StringReader(text)); @@ -869,7 +914,7 @@ public String getSnippet(String fieldName, String text, Highlighter highlighter) /** * Evaluates the given query only for the given resource. */ - public TopDocs search(Resource resource, Query query) + public synchronized TopDocs search(Resource resource, Query query) throws IOException { // rewrite the query @@ -884,7 +929,7 @@ public TopDocs search(Resource resource, Query query) /** * Evaluates the given query and returns the results as a TopDocs instance. */ - public TopDocs search(Query query) + public synchronized TopDocs search(Query query) throws IOException { int nDocs; @@ -900,7 +945,8 @@ public TopDocs search(Query query) private QueryParser getQueryParser(URI propertyURI) { // check out which query parser to use, based on the given property URI if (propertyURI == null) - // if we have no property given, we create a default query parser which + // if we have no property given, we create a default query parser + // which // has the TEXT_FIELD_NAME as the default field return new QueryParser(SearchFields.TEXT_FIELD_NAME, this.queryAnalyzer); else @@ -925,7 +971,8 @@ public synchronized void clearContexts(Resource... contexts) logger.debug("deleting contexts: {}", Arrays.toString(contexts)); // these resources have to be read from the underlying rdf store - // and their triples have to be added to the luceneindex after deletion of + // and their triples have to be added to the luceneindex after deletion + // of // documents // HashSet resourcesToUpdate = new HashSet(); @@ -963,7 +1010,8 @@ public synchronized void clearContexts(Resource... contexts) // if (c.equals(otherContextOfDocument)) // isAlsoDeleted = true; // } - // // the otherContextOfDocument is now eihter marked for deletion or + // // the otherContextOfDocument is now eihter marked for deletion + // or // not // if (!isAlsoDeleted) { // // get ID of document @@ -1009,6 +1057,9 @@ public synchronized void clearContexts(Resource... contexts) public synchronized void clear() throws IOException { + if (closed.get()) { + throw new SailException("Index has been closed"); + } // clear // the old IndexReaders/Searchers are not outdated invalidateReaders();