diff --git a/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/AbstractLuceneIndex.java b/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/AbstractLuceneIndex.java index 1ed944ee6e5..fb47ccdd2e2 100644 --- a/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/AbstractLuceneIndex.java +++ b/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/AbstractLuceneIndex.java @@ -8,8 +8,8 @@ package org.eclipse.rdf4j.sail.lucene; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; public abstract class AbstractLuceneIndex extends AbstractSearchIndex { @@ -17,7 +17,7 @@ public abstract class AbstractLuceneIndex extends AbstractSearchIndex { * keep a lit of old monitors that are still iterating but not closed (open iterators), will be all closed * on shutdown items are removed from list by ReaderMnitor.endReading() when closing */ - protected final Collection oldmonitors = new LinkedList(); + protected final Collection oldmonitors = new ArrayList(); protected abstract AbstractReaderMonitor getCurrentMonitor(); diff --git a/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/AbstractReaderMonitor.java b/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/AbstractReaderMonitor.java index 322dcf89f40..ff2b8a089b6 100644 --- a/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/AbstractReaderMonitor.java +++ b/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/AbstractReaderMonitor.java @@ -33,14 +33,22 @@ protected AbstractReaderMonitor(AbstractLuceneIndex index) { this.index = index; } - public int getReadingCount() { + public final int getReadingCount() { return readingCount.get(); } /** * */ - public void beginReading() { + public final synchronized void beginReading() { + if (closed.get()) { + throw new IllegalStateException("Cannot begin reading as we have been closed."); + } + // We cannot allow any more readers to be open at this stage, as any + // decrements towards zero on readingCount could trigger closure/removal + if (doClose.get()) { + throw new IllegalStateException("Cannot begin reading as we have moved into closing stages."); + } readingCount.incrementAndGet(); } @@ -49,16 +57,17 @@ public void beginReading() { * * @throws IOException */ - public void endReading() + public final synchronized void endReading() throws IOException { - if (readingCount.decrementAndGet() == 0 && doClose.get()) { - // when endReading is called on CurrentMonitor and it should be closed, - // close it - close();// close Lucene index remove them self from Lucene index + if (readingCount.decrementAndGet() <= 0 && doClose.get()) { + // when endReading is called on CurrentMonitor and it should be + // closed, close it + close(); + // close Lucene index remove them self from Lucene index synchronized (index.oldmonitors) { - index.oldmonitors.remove(this); // if its not in the list, then this - // is a no-operation + // if its not in the list, then this is a no-operation + index.oldmonitors.remove(this); } } } @@ -69,7 +78,7 @@ public void endReading() * @return true if the close succeeded, false otherwise. * @throws IOException */ - public boolean closeWhenPossible() + public final synchronized boolean closeWhenPossible() throws IOException { doClose.set(true); @@ -79,10 +88,10 @@ public boolean closeWhenPossible() return closed.get(); } - public void close() + public final void close() throws IOException { - if (!closed.getAndSet(true)) { + if (closed.compareAndSet(false, true)) { handleClose(); } } diff --git a/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java b/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java index d258a029075..2d9f71492ab 100644 --- a/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java +++ b/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSail.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.rdf4j.model.IRI; import org.eclipse.rdf4j.model.Resource; @@ -253,13 +254,13 @@ public class LuceneSail extends NotifyingSailWrapper { /** * The LuceneIndex holding the indexed literals. */ - private SearchIndex luceneIndex; + private volatile SearchIndex luceneIndex; protected final Properties parameters = new Properties(); - private String reindexQuery = "SELECT ?s ?p ?o ?c WHERE {{?s ?p ?o} UNION {GRAPH ?c {?s ?p ?o.}}} ORDER BY ?s"; + private volatile String reindexQuery = "SELECT ?s ?p ?o ?c WHERE {{?s ?p ?o} UNION {GRAPH ?c {?s ?p ?o.}}} ORDER BY ?s"; - private boolean incompleteQueryFails = true; + private volatile boolean incompleteQueryFails = true; private Set indexedFields; @@ -267,6 +268,8 @@ public class LuceneSail extends NotifyingSailWrapper { private IndexableStatementFilter filter = null; + private final AtomicBoolean closed = new AtomicBoolean(false); + public void setLuceneIndex(SearchIndex luceneIndex) { this.luceneIndex = luceneIndex; } @@ -286,18 +289,22 @@ public NotifyingSailConnection getConnection() public void shutDown() throws SailException { - try { - if (luceneIndex != null) { - luceneIndex.shutDown(); + if(closed.compareAndSet(false, true)) { + try { + SearchIndex toShutDownLuceneIndex = luceneIndex; + luceneIndex = null; + if (toShutDownLuceneIndex != null) { + toShutDownLuceneIndex.shutDown(); + } + } + catch (IOException e) { + throw new SailException(e); + } + finally { + // ensure that super is also invoked when the LuceneIndex causes an + // IOException + super.shutDown(); } - } - catch (IOException e) { - throw new SailException(e); - } - finally { - // ensure that super is also invoked when the LuceneIndex causes an - // IOException - super.shutDown(); } } @@ -492,27 +499,33 @@ public void registerStatementFilter(IndexableStatementFilter filter) { } protected boolean acceptStatementToIndex(Statement s) { - return (filter != null) ? filter.accept(s) : true; + IndexableStatementFilter nextFilter = filter; + return (nextFilter != null) ? nextFilter.accept(s) : true; } public Statement mapStatement(Statement statement) { IRI p = statement.getPredicate(); boolean predicateChanged = false; - if (indexedFieldsMapping != null) { - IRI res = indexedFieldsMapping.get(p); + Map nextIndexedFieldsMapping = indexedFieldsMapping; + if (nextIndexedFieldsMapping != null) { + IRI res = nextIndexedFieldsMapping.get(p); if (res != null) { p = res; predicateChanged = true; } } - if (this.indexedFields != null && !this.indexedFields.contains(p)) + Set nextIndexedFields = indexedFields; + if (nextIndexedFields != null && !nextIndexedFields.contains(p)) { return null; + } - if (predicateChanged) + if (predicateChanged) { return getValueFactory().createStatement(statement.getSubject(), p, statement.getObject(), statement.getContext()); - else + } + else { return statement; + } } protected Collection getSearchQueryInterpreters() { diff --git a/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSailConnection.java b/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSailConnection.java index a7ef52df97b..782f6f9f173 100644 --- a/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSailConnection.java +++ b/core/sail/fts/lucene-api/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneSailConnection.java @@ -14,6 +14,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.model.IRI; @@ -121,7 +122,7 @@ public void statementRemoved(Statement statement) { /** * To remember if the iterator was already closed and only free resources once */ - private boolean mustclose = false; + private final AtomicBoolean closed = new AtomicBoolean(false); public LuceneSailConnection(NotifyingSailConnection wrappedConnection, SearchIndex luceneIndex, LuceneSail sail) @@ -148,19 +149,21 @@ public synchronized void addStatement(Resource arg0, IRI arg1, Value arg2, Resou public void close() throws SailException { - // remember if you were closed before, some sloppy programmers - // may call close() twice. - if (mustclose) { - mustclose = false; + if (closed.compareAndSet(false, true)) { try { - luceneIndex.endReading(); + super.close(); } - catch (IOException e) { - logger.warn("could not close IndexReader or IndexSearcher " + e, e); + finally { + try { + luceneIndex.endReading(); + } + catch (IOException e) { + logger.warn("could not close IndexReader or IndexSearcher " + e, e); + } + // remember if you were closed before, some sloppy programmers + // may call close() twice. } } - - super.close(); } // //////////////////////////////// Methods related to indexing @@ -277,8 +280,8 @@ private void clearContexts(Resource... contexts) // //////////////////////////////// Methods related to querying @Override - public CloseableIteration evaluate(TupleExpr tupleExpr, - Dataset dataset, BindingSet bindings, boolean includeInferred) + public synchronized CloseableIteration evaluate( + TupleExpr tupleExpr, Dataset dataset, BindingSet bindings, boolean includeInferred) throws SailException { // Don't modify the original tuple expression @@ -319,6 +322,10 @@ private void evaluateLuceneQueries(Collection queries, Tup // - multiple different property constraints can be put into the lucene // query string (escape colons here) + if (closed.get()) { + throw new SailException("Sail has been closed already"); + } + // mark that reading is in progress try { this.luceneIndex.beginReading(); @@ -326,7 +333,6 @@ private void evaluateLuceneQueries(Collection queries, Tup catch (IOException e) { throw new SailException(e); } - this.mustclose = true; // evaluate queries, generate binding sets, and remove queries for (SearchQueryEvaluator query : queries) { diff --git a/core/sail/fts/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneIndex.java b/core/sail/fts/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneIndex.java index 48223374439..5990d69b1a2 100644 --- a/core/sail/fts/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneIndex.java +++ b/core/sail/fts/lucene/src/main/java/org/eclipse/rdf4j/sail/lucene/LuceneIndex.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.io.StringReader; +import java.lang.reflect.UndeclaredThrowableException; import java.nio.charset.StandardCharsets; import java.nio.file.Paths; import java.util.ArrayList; @@ -16,11 +17,13 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Hashtable; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.TokenStream; @@ -95,7 +98,8 @@ public class LuceneIndex extends AbstractLuceneIndex { static { - // do NOT set this to Integer.MAX_VALUE, because this breaks fuzzy queries + // do NOT set this to Integer.MAX_VALUE, because this breaks fuzzy + // queries BooleanQuery.setMaxClauseCount(1024 * 1024); } @@ -106,26 +110,28 @@ public class LuceneIndex extends AbstractLuceneIndex { /** * The Directory that holds the Lucene index files. */ - private Directory directory; + private volatile Directory directory; /** * The Analyzer used to tokenize strings and queries. */ - private Analyzer analyzer; + private volatile Analyzer analyzer; - private Analyzer queryAnalyzer; + private volatile Analyzer queryAnalyzer; /** * The IndexWriter that can be used to alter the index' contents. Created lazily. */ - private IndexWriter indexWriter; + private volatile IndexWriter indexWriter; /** * This holds IndexReader and IndexSearcher. */ - protected ReaderMonitor currentMonitor; + protected volatile ReaderMonitor currentMonitor; - private Function geoStrategyMapper; + private volatile Function geoStrategyMapper; + + private final AtomicBoolean closed = new AtomicBoolean(false); public LuceneIndex() { } @@ -151,7 +157,7 @@ public LuceneIndex(Directory directory, Analyzer analyzer) } @Override - public void initialize(Properties parameters) + public synchronized void initialize(Properties parameters) throws Exception { super.initialize(parameters); @@ -254,15 +260,21 @@ public Analyzer getAnalyzer() { // ReaderMonitor directly to be able to close the reader when they // are done. - public IndexReader getIndexReader() + public synchronized IndexReader getIndexReader() throws IOException { + if (closed.get()) { + throw new SailException("Index has been closed"); + } return getIndexSearcher().getIndexReader(); } - public IndexSearcher getIndexSearcher() + public synchronized IndexSearcher getIndexSearcher() throws IOException { + if (closed.get()) { + throw new SailException("Index has been closed"); + } return getCurrentMonitor().getIndexSearcher(); } @@ -270,16 +282,22 @@ public IndexSearcher getIndexSearcher() * Current monitor holds instance of IndexReader and IndexSearcher It is used to keep track of readers */ @Override - public ReaderMonitor getCurrentMonitor() { - if (currentMonitor == null) + public synchronized ReaderMonitor getCurrentMonitor() { + if (closed.get()) { + throw new SailException("Index has been closed"); + } + if (currentMonitor == null) { currentMonitor = new ReaderMonitor(this, directory); + } return currentMonitor; } - public IndexWriter getIndexWriter() + public synchronized IndexWriter getIndexWriter() throws IOException { - + if (closed.get()) { + throw new SailException("Index has been closed"); + } if (indexWriter == null) { IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer); indexWriter = new IndexWriter(directory, indexWriterConfig); @@ -294,37 +312,59 @@ public void shutDown() // try-finally setup ensures that closing of an instance is not skipped // when an earlier instance resulted in an IOException // FIXME: is there a more elegant way to ensure this? - - // This close oldMonitors which hold InderReader and IndexSeracher - // Monitor close IndexReader and IndexSearcher - if (currentMonitor != null) { - currentMonitor.close(); - currentMonitor = null; - } - if (oldmonitors.size() > 0) { - logger.warn( - "LuceneSail: On shutdown {} IndexReaders were not closed. This is due to non-closed Query Iterators, which must be closed!", - oldmonitors.size()); - } - for (AbstractReaderMonitor monitor : oldmonitors) { - monitor.close(); - } - oldmonitors.clear(); - - try { - if (indexWriter != null) { - indexWriter.close(); + if (closed.compareAndSet(false, true)) { + try { + // This close oldMonitors which hold InderReader and + // IndexSeracher + // Monitor close IndexReader and IndexSearcher + ReaderMonitor toCloseCurrentMonitor = currentMonitor; + currentMonitor = null; + if (toCloseCurrentMonitor != null) { + toCloseCurrentMonitor.close(); + } + } + finally { + List exceptions = new ArrayList<>(); + try { + synchronized (oldmonitors) { + if (oldmonitors.size() > 0) { + logger.warn( + "LuceneSail: On shutdown {} IndexReaders were not closed. This is due to non-closed Query Iterators, which must be closed!", + oldmonitors.size()); + } + for (AbstractReaderMonitor monitor : oldmonitors) { + try { + monitor.close(); + } + catch (Throwable e) { + exceptions.add(e); + } + } + oldmonitors.clear(); + } + } + finally { + try { + IndexWriter toCloseIndexWriter = indexWriter; + indexWriter = null; + if (toCloseIndexWriter != null) { + toCloseIndexWriter.close(); + } + } + finally { + if (!exceptions.isEmpty()) { + throw new UndeclaredThrowableException(exceptions.get(0)); + } + } + } } - } - finally { - indexWriter = null; } } // //////////////////////////////// Methods for updating the index @Override - protected SearchDocument getDocument(String id) + protected synchronized SearchDocument getDocument(String id) throws IOException { Document document = getDocument(idTerm(id)); @@ -332,7 +372,7 @@ protected SearchDocument getDocument(String id) } @Override - protected Iterable getDocuments(String resourceId) + protected synchronized Iterable getDocuments(String resourceId) throws IOException { List docs = getDocuments(new Term(SearchFields.URI_FIELD_NAME, resourceId)); @@ -346,12 +386,12 @@ public SearchDocument apply(Document doc) { } @Override - protected SearchDocument newDocument(String id, String resourceId, String context) { + protected synchronized SearchDocument newDocument(String id, String resourceId, String context) { return new LuceneDocument(id, resourceId, context, geoStrategyMapper); } @Override - protected SearchDocument copyDocument(SearchDocument doc) { + protected synchronized SearchDocument copyDocument(SearchDocument doc) { Document document = ((LuceneDocument)doc).getDocument(); Document newDocument = new Document(); @@ -363,28 +403,28 @@ protected SearchDocument copyDocument(SearchDocument doc) { } @Override - protected void addDocument(SearchDocument doc) + protected synchronized void addDocument(SearchDocument doc) throws IOException { getIndexWriter().addDocument(((LuceneDocument)doc).getDocument()); } @Override - protected void updateDocument(SearchDocument doc) + protected synchronized void updateDocument(SearchDocument doc) throws IOException { getIndexWriter().updateDocument(idTerm(doc.getId()), ((LuceneDocument)doc).getDocument()); } @Override - protected void deleteDocument(SearchDocument doc) + protected synchronized void deleteDocument(SearchDocument doc) throws IOException { getIndexWriter().deleteDocuments(idTerm(doc.getId())); } @Override - protected BulkUpdater newBulkUpdate() { + protected synchronized BulkUpdater newBulkUpdate() { return new SimpleBulkUpdater(this); } @@ -472,7 +512,7 @@ private static void addDocuments(LeafReader reader, Term term, Collection getDocuments(Resource subject) + public synchronized List getDocuments(Resource subject) throws IOException { String resourceId = SearchFields.getResourceID(subject); @@ -552,11 +592,14 @@ private void invalidateReaders() { synchronized (oldmonitors) { // Move current monitor to old monitors and set null - if (currentMonitor != null) - // we do NOT close it directly as it may be used by an open result + if (currentMonitor != null) { + // we do NOT close it directly as it may be used by an open + // result // iterator, hence moving it to the - // list of oldmonitors where it is handled as other older monitors + // list of oldmonitors where it is handled as other older + // monitors oldmonitors.add(currentMonitor); + } currentMonitor = null; // close all monitors if possible @@ -571,7 +614,8 @@ private void invalidateReaders() if (oldmonitors.isEmpty()) { logger.debug("Deleting unused files from Lucene index"); - // clean up unused files (marked as 'deletable' in Luke Filewalker) + // clean up unused files (marked as 'deletable' in Luke + // Filewalker) getIndexWriter().deleteUnusedFiles(); // logIndexStats(); @@ -612,9 +656,10 @@ private void logIndexStats() { } finally { - if (currentMonitor != null) { - currentMonitor.closeWhenPossible(); - currentMonitor = null; + ReaderMonitor toCloseCurrentMonitor = currentMonitor; + currentMonitor = null; + if (toCloseCurrentMonitor != null) { + toCloseCurrentMonitor.closeWhenPossible(); } } } @@ -625,7 +670,7 @@ private void logIndexStats() { } @Override - public void begin() + public synchronized void begin() throws IOException { // nothing to do @@ -637,7 +682,7 @@ public void begin() * LuceneSailConnection is committed/rollbacked. */ @Override - public void commit() + public synchronized void commit() throws IOException { getIndexWriter().commit(); @@ -646,7 +691,7 @@ public void commit() } @Override - public void rollback() + public synchronized void rollback() throws IOException { getIndexWriter().rollback(); @@ -823,7 +868,7 @@ else if (GEOF.EH_COVERS.stringValue().equals(relation)) { * the id of the document to return * @return the requested hit, or null if it fails */ - public Document getDocument(int docId, Set fieldsToLoad) { + public synchronized Document getDocument(int docId, Set fieldsToLoad) { try { return readDocument(getIndexReader(), docId, fieldsToLoad); } @@ -837,7 +882,7 @@ public Document getDocument(int docId, Set fieldsToLoad) { } } - public String getSnippet(String fieldName, String text, Highlighter highlighter) { + public synchronized String getSnippet(String fieldName, String text, Highlighter highlighter) { String snippet; try { TokenStream tokenStream = getAnalyzer().tokenStream(fieldName, new StringReader(text)); @@ -869,7 +914,7 @@ public String getSnippet(String fieldName, String text, Highlighter highlighter) /** * Evaluates the given query only for the given resource. */ - public TopDocs search(Resource resource, Query query) + public synchronized TopDocs search(Resource resource, Query query) throws IOException { // rewrite the query @@ -884,7 +929,7 @@ public TopDocs search(Resource resource, Query query) /** * Evaluates the given query and returns the results as a TopDocs instance. */ - public TopDocs search(Query query) + public synchronized TopDocs search(Query query) throws IOException { int nDocs; @@ -900,7 +945,8 @@ public TopDocs search(Query query) private QueryParser getQueryParser(URI propertyURI) { // check out which query parser to use, based on the given property URI if (propertyURI == null) - // if we have no property given, we create a default query parser which + // if we have no property given, we create a default query parser + // which // has the TEXT_FIELD_NAME as the default field return new QueryParser(SearchFields.TEXT_FIELD_NAME, this.queryAnalyzer); else @@ -925,7 +971,8 @@ public synchronized void clearContexts(Resource... contexts) logger.debug("deleting contexts: {}", Arrays.toString(contexts)); // these resources have to be read from the underlying rdf store - // and their triples have to be added to the luceneindex after deletion of + // and their triples have to be added to the luceneindex after deletion + // of // documents // HashSet resourcesToUpdate = new HashSet(); @@ -963,7 +1010,8 @@ public synchronized void clearContexts(Resource... contexts) // if (c.equals(otherContextOfDocument)) // isAlsoDeleted = true; // } - // // the otherContextOfDocument is now eihter marked for deletion or + // // the otherContextOfDocument is now eihter marked for deletion + // or // not // if (!isAlsoDeleted) { // // get ID of document @@ -1009,6 +1057,9 @@ public synchronized void clearContexts(Resource... contexts) public synchronized void clear() throws IOException { + if (closed.get()) { + throw new SailException("Index has been closed"); + } // clear // the old IndexReaders/Searchers are not outdated invalidateReaders();