Skip to content

Commit

Permalink
issue#293 : Ensure consistent behaviour of AbstractCloseableIteration…
Browse files Browse the repository at this point in the history
….handleClose

This includes various changes related to try-finally while also including changes related to volatile variable dereferencing to ensure that we ourselves are null-safe inside of handleClose without synchronisation.

It also specifically fixes #293 by ensuring that NPE's generated by InputStream.close do not propagate, with the appropriate null guards to ensure we ourselves cannot be generating the NPE. Ie, we give up trying to fix Apache HTTPClient weakness locally, as HTTPClient has non-null safe programming due to dereferencing instance variables after null guards rather than before the null guard.

Signed-off-by: Peter Ansell <p_ansell@yahoo.com>
  • Loading branch information
ansell committed Sep 4, 2016
1 parent f794529 commit 4e49be2
Show file tree
Hide file tree
Showing 73 changed files with 2,291 additions and 942 deletions.
Expand Up @@ -13,8 +13,10 @@
import java.lang.reflect.UndeclaredThrowableException;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.rdf4j.common.iteration.IterationWrapper;
import org.eclipse.rdf4j.model.Statement;
Expand All @@ -34,21 +36,19 @@ public class BackgroundGraphResult extends IterationWrapper<Statement, QueryEval
implements GraphQueryResult, Runnable, RDFHandler
{

private volatile boolean closed;
private final RDFParser parser;

private RDFParser parser;
private final Charset charset;

private Charset charset;
private final InputStream in;

private InputStream in;
private final String baseURI;

private String baseURI;
private final CountDownLatch namespacesReady = new CountDownLatch(1);

private CountDownLatch namespacesReady = new CountDownLatch(1);
private final Map<String, String> namespaces = new ConcurrentHashMap<String, String>();

private Map<String, String> namespaces = new ConcurrentHashMap<String, String>();

private QueueCursor<Statement> queue;
private final QueueCursor<Statement> queue;

public BackgroundGraphResult(RDFParser parser, InputStream in, Charset charset, String baseURI) {
this(new QueueCursor<Statement>(10), parser, in, charset, baseURI);
Expand All @@ -65,21 +65,33 @@ public BackgroundGraphResult(QueueCursor<Statement> queue, RDFParser parser, Inp
this.baseURI = baseURI;
}

@Override
public boolean hasNext()
throws QueryEvaluationException
{
if (isClosed()) {
return false;
}
return queue.hasNext();
}

@Override
public Statement next()
throws QueryEvaluationException
{
if (isClosed()) {
throw new NoSuchElementException("The iteration has been closed.");
}
return queue.next();
}

@Override
public void remove()
throws QueryEvaluationException
{
if (isClosed()) {
throw new IllegalStateException("The iteration has been closed.");
}
queue.remove();
}

Expand All @@ -88,22 +100,32 @@ protected void handleClose()
throws QueryEvaluationException
{
try {
super.handleClose();
}
finally {
closed = true;
try {
in.close();
}
catch (IOException e) {
throw new QueryEvaluationException(e);
super.handleClose();
}
finally {
queue.close();
try {
// After checking that we ourselves cannot possibly be generating an NPE ourselves,
// attempt to close the input stream we were given
InputStream toClose = in;
if (toClose != null) {
toClose.close();
}
}
catch (NullPointerException e) {
// Swallow NullPointerException that Apache HTTPClient is hiding behind a NotThreadSafe annotation
}
}
}
catch (IOException e) {
throw new QueryEvaluationException(e);
}
finally {
queue.close();
}
}

@Override
public void run() {
try {
parser.setRDFHandler(this);
Expand All @@ -129,12 +151,14 @@ public void run() {
}
}

@Override
public void startRDF()
throws RDFHandlerException
{
// no-op
}

@Override
public Map<String, String> getNamespaces() {
try {
namespacesReady.await();
Expand All @@ -145,18 +169,21 @@ public Map<String, String> getNamespaces() {
}
}

@Override
public void handleComment(String comment)
throws RDFHandlerException
{
// ignore
}

@Override
public void handleNamespace(String prefix, String uri)
throws RDFHandlerException
{
namespaces.put(prefix, uri);
}

@Override
public void handleStatement(Statement st)
throws RDFHandlerException
{
Expand All @@ -167,10 +194,12 @@ public void handleStatement(Statement st)
catch (InterruptedException e) {
throw new RDFHandlerException(e);
}
if (closed)
if (isClosed()) {
throw new RDFHandlerException("Result closed");
}
}

@Override
public void endRDF()
throws RDFHandlerException
{
Expand Down
Expand Up @@ -10,9 +10,11 @@
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
Expand All @@ -32,17 +34,15 @@ public class BackgroundTupleResult extends IteratingTupleQueryResult
implements Runnable, TupleQueryResultHandler
{

private volatile boolean closed;
private final TupleQueryResultParser parser;

private TupleQueryResultParser parser;
private final InputStream in;

private InputStream in;
private final QueueCursor<BindingSet> queue;

private QueueCursor<BindingSet> queue;
private final List<String> bindingNames = new ArrayList<>();

private List<String> bindingNames;

private CountDownLatch bindingNamesReady = new CountDownLatch(1);
private final CountDownLatch bindingNamesReady = new CountDownLatch(1);

public BackgroundTupleResult(TupleQueryResultParser parser, InputStream in) {
this(new QueueCursor<BindingSet>(10), parser, in);
Expand All @@ -63,16 +63,28 @@ protected void handleClose()
{
try {
try {
closed = true;
super.handleClose();
}
finally {
in.close();
try {
// After checking that we ourselves cannot possibly be generating an NPE ourselves,
// attempt to close the input stream we were given
InputStream toClose = in;
if (toClose != null) {
toClose.close();
}
}
catch (NullPointerException e) {
// Swallow NullPointerException that Apache HTTPClient is hiding behind a NotThreadSafe annotation
}
}
}
catch (IOException e) {
throw new QueryEvaluationException(e);
}
finally {
queue.close();
}
}

@Override
Expand Down Expand Up @@ -115,7 +127,7 @@ public void run() {
public void startQueryResult(List<String> bindingNames)
throws TupleQueryResultHandlerException
{
this.bindingNames = bindingNames;
this.bindingNames.addAll(bindingNames);
bindingNamesReady.countDown();
}

Expand All @@ -129,8 +141,9 @@ public void handleSolution(BindingSet bindingSet)
catch (InterruptedException e) {
throw new TupleQueryResultHandlerException(e);
}
if (closed)
if (isClosed()) {
throw new TupleQueryResultHandlerException("Result closed");
}
}

@Override
Expand Down
Expand Up @@ -14,6 +14,7 @@
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.rdf4j.RDF4JException;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
Expand All @@ -27,13 +28,13 @@
*/
public class QueueCursor<E> extends LookAheadIteration<E, QueryEvaluationException> {

private volatile boolean done;
private final AtomicBoolean done = new AtomicBoolean(false);

private BlockingQueue<E> queue;
private final BlockingQueue<E> queue;

private E afterLast = createAfterLast();
private final E afterLast = createAfterLast();

private volatile Queue<Throwable> exceptions = new LinkedList<Throwable>();
private final Queue<Throwable> exceptions = new LinkedList<Throwable>();

/**
* Creates an <tt>QueueCursor</tt> with the given (fixed) capacity and default access policy.
Expand All @@ -55,6 +56,7 @@ public QueueCursor(int capacity) {
* in FIFO order; if <tt>false</tt> the access order is unspecified.
*/
public QueueCursor(int capacity, boolean fair) {
super();
this.queue = new ArrayBlockingQueue<E>(capacity, fair);
}

Expand All @@ -74,7 +76,7 @@ public void toss(Exception exception) {
public void put(E item)
throws InterruptedException
{
if (!done) {
if (!done.get()) {
queue.put(item);
}
}
Expand All @@ -83,7 +85,8 @@ public void put(E item)
* Indicates the method {@link #put(Object)} will not be called in the queue anymore.
*/
public void done() {
done = true;
// Lazily set here, and then come back in handleClose and use set if necessary
done.lazySet(true);
try {
queue.add(afterLast);
}
Expand All @@ -102,12 +105,12 @@ public E getNextElement()
try {
checkException();
E take;
if (done) {
if (done.get()) {
take = queue.poll();
}
else {
take = queue.take();
if (done) {
if (done.get()) {
done(); // in case the queue was full before
}
}
Expand All @@ -128,12 +131,17 @@ public E getNextElement()
public void handleClose()
throws QueryEvaluationException
{
done = true;
do {
queue.clear(); // ensure extra room is available
try {
super.handleClose();
}
finally {
done.set(true);
do {
queue.clear(); // ensure extra room is available
}
while (!queue.offer(afterLast));
checkException();
}
while (!queue.offer(afterLast));
checkException();
}

public void checkException()
Expand Down
Expand Up @@ -21,7 +21,9 @@
public interface GraphQueryResult extends QueryResult<Statement> {

/**
* Retrieves relevant namespaces from the query result.
* Retrieves relevant namespaces from the query result. <br/>
* The contents of the Map may be modified after it is returned, as the initial return may be performed
* when the first RDF Statement is encountered.
*
* @return a Map<String, String> object containing (prefix, namespace) pairs.
* @throws QueryEvaluationException
Expand Down

0 comments on commit 4e49be2

Please sign in to comment.