Skip to content

Commit

Permalink
Merge pull request #322 from ansell/issues/#301-lucene-threading
Browse files Browse the repository at this point in the history
issue#301 : Work on tightening the contract for lucene sail components
  • Loading branch information
abrokenjester committed Sep 5, 2016
2 parents f794529 + d61911f commit 67092c5
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 111 deletions.
Expand Up @@ -8,16 +8,16 @@
package org.eclipse.rdf4j.sail.lucene;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;

public abstract class AbstractLuceneIndex extends AbstractSearchIndex {

/**
* keep a lit of old monitors that are still iterating but not closed (open iterators), will be all closed
* on shutdown items are removed from list by ReaderMnitor.endReading() when closing
*/
protected final Collection<AbstractReaderMonitor> oldmonitors = new LinkedList<AbstractReaderMonitor>();
protected final Collection<AbstractReaderMonitor> oldmonitors = new ArrayList<AbstractReaderMonitor>();

protected abstract AbstractReaderMonitor getCurrentMonitor();

Expand Down
Expand Up @@ -33,14 +33,22 @@ protected AbstractReaderMonitor(AbstractLuceneIndex index) {
this.index = index;
}

public int getReadingCount() {
public final int getReadingCount() {
return readingCount.get();
}

/**
*
*/
public void beginReading() {
public final synchronized void beginReading() {
if (closed.get()) {
throw new IllegalStateException("Cannot begin reading as we have been closed.");
}
// We cannot allow any more readers to be open at this stage, as any
// decrements towards zero on readingCount could trigger closure/removal
if (doClose.get()) {
throw new IllegalStateException("Cannot begin reading as we have moved into closing stages.");
}
readingCount.incrementAndGet();
}

Expand All @@ -49,16 +57,17 @@ public void beginReading() {
*
* @throws IOException
*/
public void endReading()
public final synchronized void endReading()
throws IOException
{
if (readingCount.decrementAndGet() == 0 && doClose.get()) {
// when endReading is called on CurrentMonitor and it should be closed,
// close it
close();// close Lucene index remove them self from Lucene index
if (readingCount.decrementAndGet() <= 0 && doClose.get()) {
// when endReading is called on CurrentMonitor and it should be
// closed, close it
close();
// close Lucene index remove them self from Lucene index
synchronized (index.oldmonitors) {
index.oldmonitors.remove(this); // if its not in the list, then this
// is a no-operation
// if its not in the list, then this is a no-operation
index.oldmonitors.remove(this);
}
}
}
Expand All @@ -69,7 +78,7 @@ public void endReading()
* @return <code>true</code> if the close succeeded, <code>false</code> otherwise.
* @throws IOException
*/
public boolean closeWhenPossible()
public final synchronized boolean closeWhenPossible()
throws IOException
{
doClose.set(true);
Expand All @@ -79,10 +88,10 @@ public boolean closeWhenPossible()
return closed.get();
}

public void close()
public final void close()
throws IOException
{
if (!closed.getAndSet(true)) {
if (closed.compareAndSet(false, true)) {
handleClose();
}
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Resource;
Expand Down Expand Up @@ -253,20 +254,22 @@ public class LuceneSail extends NotifyingSailWrapper {
/**
* The LuceneIndex holding the indexed literals.
*/
private SearchIndex luceneIndex;
private volatile SearchIndex luceneIndex;

protected final Properties parameters = new Properties();

private String reindexQuery = "SELECT ?s ?p ?o ?c WHERE {{?s ?p ?o} UNION {GRAPH ?c {?s ?p ?o.}}} ORDER BY ?s";
private volatile String reindexQuery = "SELECT ?s ?p ?o ?c WHERE {{?s ?p ?o} UNION {GRAPH ?c {?s ?p ?o.}}} ORDER BY ?s";

private boolean incompleteQueryFails = true;
private volatile boolean incompleteQueryFails = true;

private Set<IRI> indexedFields;

private Map<IRI, IRI> indexedFieldsMapping;

private IndexableStatementFilter filter = null;

private final AtomicBoolean closed = new AtomicBoolean(false);

public void setLuceneIndex(SearchIndex luceneIndex) {
this.luceneIndex = luceneIndex;
}
Expand All @@ -286,18 +289,22 @@ public NotifyingSailConnection getConnection()
public void shutDown()
throws SailException
{
try {
if (luceneIndex != null) {
luceneIndex.shutDown();
if(closed.compareAndSet(false, true)) {
try {
SearchIndex toShutDownLuceneIndex = luceneIndex;
luceneIndex = null;
if (toShutDownLuceneIndex != null) {
toShutDownLuceneIndex.shutDown();
}
}
catch (IOException e) {
throw new SailException(e);
}
finally {
// ensure that super is also invoked when the LuceneIndex causes an
// IOException
super.shutDown();
}
}
catch (IOException e) {
throw new SailException(e);
}
finally {
// ensure that super is also invoked when the LuceneIndex causes an
// IOException
super.shutDown();
}
}

Expand Down Expand Up @@ -492,27 +499,33 @@ public void registerStatementFilter(IndexableStatementFilter filter) {
}

protected boolean acceptStatementToIndex(Statement s) {
return (filter != null) ? filter.accept(s) : true;
IndexableStatementFilter nextFilter = filter;
return (nextFilter != null) ? nextFilter.accept(s) : true;
}

public Statement mapStatement(Statement statement) {
IRI p = statement.getPredicate();
boolean predicateChanged = false;
if (indexedFieldsMapping != null) {
IRI res = indexedFieldsMapping.get(p);
Map<IRI, IRI> nextIndexedFieldsMapping = indexedFieldsMapping;
if (nextIndexedFieldsMapping != null) {
IRI res = nextIndexedFieldsMapping.get(p);
if (res != null) {
p = res;
predicateChanged = true;
}
}
if (this.indexedFields != null && !this.indexedFields.contains(p))
Set<IRI> nextIndexedFields = indexedFields;
if (nextIndexedFields != null && !nextIndexedFields.contains(p)) {
return null;
}

if (predicateChanged)
if (predicateChanged) {
return getValueFactory().createStatement(statement.getSubject(), p, statement.getObject(),
statement.getContext());
else
}
else {
return statement;
}
}

protected Collection<SearchQueryInterpreter> getSearchQueryInterpreters() {
Expand Down
Expand Up @@ -14,6 +14,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.model.IRI;
Expand Down Expand Up @@ -121,7 +122,7 @@ public void statementRemoved(Statement statement) {
/**
* To remember if the iterator was already closed and only free resources once
*/
private boolean mustclose = false;
private final AtomicBoolean closed = new AtomicBoolean(false);

public LuceneSailConnection(NotifyingSailConnection wrappedConnection, SearchIndex luceneIndex,
LuceneSail sail)
Expand All @@ -148,19 +149,21 @@ public synchronized void addStatement(Resource arg0, IRI arg1, Value arg2, Resou
public void close()
throws SailException
{
// remember if you were closed before, some sloppy programmers
// may call close() twice.
if (mustclose) {
mustclose = false;
if (closed.compareAndSet(false, true)) {
try {
luceneIndex.endReading();
super.close();
}
catch (IOException e) {
logger.warn("could not close IndexReader or IndexSearcher " + e, e);
finally {
try {
luceneIndex.endReading();
}
catch (IOException e) {
logger.warn("could not close IndexReader or IndexSearcher " + e, e);
}
// remember if you were closed before, some sloppy programmers
// may call close() twice.
}
}

super.close();
}

// //////////////////////////////// Methods related to indexing
Expand Down Expand Up @@ -277,8 +280,8 @@ private void clearContexts(Resource... contexts)
// //////////////////////////////// Methods related to querying

@Override
public CloseableIteration<? extends BindingSet, QueryEvaluationException> evaluate(TupleExpr tupleExpr,
Dataset dataset, BindingSet bindings, boolean includeInferred)
public synchronized CloseableIteration<? extends BindingSet, QueryEvaluationException> evaluate(
TupleExpr tupleExpr, Dataset dataset, BindingSet bindings, boolean includeInferred)
throws SailException
{
// Don't modify the original tuple expression
Expand Down Expand Up @@ -319,14 +322,17 @@ private void evaluateLuceneQueries(Collection<SearchQueryEvaluator> queries, Tup
// - multiple different property constraints can be put into the lucene
// query string (escape colons here)

if (closed.get()) {
throw new SailException("Sail has been closed already");
}

// mark that reading is in progress
try {
this.luceneIndex.beginReading();
}
catch (IOException e) {
throw new SailException(e);
}
this.mustclose = true;

// evaluate queries, generate binding sets, and remove queries
for (SearchQueryEvaluator query : queries) {
Expand Down

0 comments on commit 67092c5

Please sign in to comment.