Skip to content

Commit

Permalink
HTTPCORE-567: fixed race condition that may cause a connection leak w…
Browse files Browse the repository at this point in the history
…hen the connection lease request is cancelled from another thread.
  • Loading branch information
ok2c committed Jan 26, 2019
1 parent 890179b commit 9b5b641
Showing 1 changed file with 40 additions and 26 deletions.
66 changes: 40 additions & 26 deletions httpcore/src/main/java/org/apache/http/pool/AbstractConnPool.java
Expand Up @@ -34,6 +34,7 @@
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -177,6 +178,10 @@ protected E createEntry(final C conn) {
return pool;
}

private static Exception operationAborted() {
return new CancellationException("Operation aborted");
}

/**
* {@inheritDoc}
* <p>
Expand All @@ -198,8 +203,8 @@ public Future<E> lease(final T route, final Object state, final FutureCallback<E

@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
if (cancelled.compareAndSet(false, true)) {
done.set(true);
if (done.compareAndSet(false, true)) {
cancelled.set(true);
lock.lock();
try {
condition.signalAll();
Expand Down Expand Up @@ -235,13 +240,16 @@ public E get() throws InterruptedException, ExecutionException {

@Override
public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
final E entry = entryRef.get();
if (entry != null) {
return entry;
}
synchronized (this) {
try {
for (;;) {
for (;;) {
synchronized (this) {
try {
final E entry = entryRef.get();
if (entry != null) {
return entry;
}
if (done.get()) {
throw new ExecutionException(operationAborted());
}
final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
if (validateAfterInactivity > 0) {
if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
Expand All @@ -252,20 +260,26 @@ public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedExce
}
}
}
entryRef.set(leasedEntry);
done.set(true);
onLease(leasedEntry);
if (callback != null) {
callback.completed(leasedEntry);
if (done.compareAndSet(false, true)) {
entryRef.set(leasedEntry);
done.set(true);
onLease(leasedEntry);
if (callback != null) {
callback.completed(leasedEntry);
}
return leasedEntry;
} else {
release(leasedEntry, true);
throw new ExecutionException(operationAborted());
}
return leasedEntry;
}
} catch (final IOException ex) {
done.set(true);
if (callback != null) {
callback.failed(ex);
} catch (final IOException ex) {
if (done.compareAndSet(false, true)) {
if (callback != null) {
callback.failed(ex);
}
}
throw new ExecutionException(ex);
}
throw new ExecutionException(ex);
}
}
}
Expand Down Expand Up @@ -296,7 +310,7 @@ public Future<E> lease(final T route, final Object state) {
private E getPoolEntryBlocking(
final T route, final Object state,
final long timeout, final TimeUnit timeUnit,
final Future<E> future) throws IOException, InterruptedException, TimeoutException {
final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {

Date deadline = null;
if (timeout > 0) {
Expand All @@ -308,6 +322,9 @@ private E getPoolEntryBlocking(
E entry;
for (;;) {
Asserts.check(!this.isShutDown, "Connection pool shut down");
if (future.isCancelled()) {
throw new ExecutionException(operationAborted());
}
for (;;) {
entry = pool.getFree(state);
if (entry == null) {
Expand Down Expand Up @@ -368,9 +385,6 @@ private E getPoolEntryBlocking(

boolean success = false;
try {
if (future.isCancelled()) {
throw new InterruptedException("Operation interrupted");
}
pool.queue(future);
this.pending.add(future);
if (deadline != null) {
Expand All @@ -380,7 +394,7 @@ private E getPoolEntryBlocking(
success = true;
}
if (future.isCancelled()) {
throw new InterruptedException("Operation interrupted");
throw new ExecutionException(operationAborted());
}
} finally {
// In case of 'success', we were woken up by the
Expand Down

0 comments on commit 9b5b641

Please sign in to comment.