Skip to content

Commit

Permalink
Close released unused connections to the pool when there are no waiters
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Nov 14, 2017
1 parent 8188819 commit 08ba4a6
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 10 deletions.
23 changes: 23 additions & 0 deletions src/main/java/io/vertx/core/http/impl/pool/Pool.java
Expand Up @@ -24,6 +24,7 @@
import io.vertx.core.spi.metrics.HttpClientMetrics; import io.vertx.core.spi.metrics.HttpClientMetrics;


import java.util.*; import java.util.*;
import java.util.stream.Stream;


/** /**
* The endpoint is a queue of waiters and it delegates to the connection pool, the pooling strategy. * The endpoint is a queue of waiters and it delegates to the connection pool, the pooling strategy.
Expand Down Expand Up @@ -201,6 +202,7 @@ public void onClose(C conn) {
private synchronized void recycle(ConnectionHolder<C> conn) { private synchronized void recycle(ConnectionHolder<C> conn) {
recycleConnection(conn); recycleConnection(conn);
checkPending(); checkPending();
checkClose();
} }


private void recycleConnection(ConnectionHolder<C> conn) { private void recycleConnection(ConnectionHolder<C> conn) {
Expand Down Expand Up @@ -263,6 +265,27 @@ private boolean deliverInternal(ConnectionHolder<C> conn, Waiter<C> waiter) {
} }


private void checkClose() { private void checkClose() {
if (waiters.isEmpty()) {
if (available.size() > 0) {
List<ConnectionHolder<C>> toClose = Collections.emptyList();
for (ConnectionHolder<C> conn : available) {
if (conn.inflight == 0) {
if (toClose.isEmpty()) {
toClose = new ArrayList<>();
}
toClose.add(conn);
}
}
if (toClose.size() > 0) {
for (ConnectionHolder<C> conn : toClose) {
available.remove(conn);
capacity -= conn.concurrency;
conn.inflight = (int) conn.concurrency;
connector.close(conn.connection);
}
}
}
}
if (all.isEmpty()) { if (all.isEmpty()) {
// No waiters and no connections - remove the ConnQueue // No waiters and no connections - remove the ConnQueue
if (metrics != null) { if (metrics != null) {
Expand Down
19 changes: 9 additions & 10 deletions src/test/java/io/vertx/test/core/net/ConnectionManagerTest.java
Expand Up @@ -112,6 +112,9 @@ public void testConnectPoolEmpty() {
assertWaitUntil(waiter::isComplete); assertWaitUntil(waiter::isComplete);
waiter.assertInitialized(conn); waiter.assertInitialized(conn);
waiter.assertSuccess(conn); waiter.assertSuccess(conn);
waiter.release();
assertEquals(0, mgr.size());
assertTrue(mgr.closed());
} }


@Test @Test
Expand Down Expand Up @@ -157,7 +160,7 @@ public void testRecycleConnection() {
FakeWaiter waiter2 = new FakeWaiter(); FakeWaiter waiter2 = new FakeWaiter();
mgr.getConnection(waiter2); mgr.getConnection(waiter2);
connector.assertRequests(TEST_ADDRESS, 0); connector.assertRequests(TEST_ADDRESS, 0);
waiter1.recycle(); waiter1.release();
assertWaitUntil(waiter2::isComplete); assertWaitUntil(waiter2::isComplete);
waiter2.assertSuccess(conn); waiter2.assertSuccess(conn);
} }
Expand Down Expand Up @@ -194,7 +197,7 @@ public void testRecycleInvalidConnection() {
FakeWaiter waiter2 = new FakeWaiter(); FakeWaiter waiter2 = new FakeWaiter();
mgr.getConnection(waiter2); mgr.getConnection(waiter2);
conn.invalidate(); conn.invalidate();
waiter1.recycle(); waiter1.release();
waitUntil(() -> connector.requests(TEST_ADDRESS) == 1); waitUntil(() -> connector.requests(TEST_ADDRESS) == 1);
assertFalse(mgr.closed()); assertFalse(mgr.closed());
FakeConnection conn2 = connector.assertRequest(TEST_ADDRESS); FakeConnection conn2 = connector.assertRequest(TEST_ADDRESS);
Expand Down Expand Up @@ -267,10 +270,7 @@ public void testInitialConcurrency() {
waiters.forEach(waiter -> { waiters.forEach(waiter -> {
waitUntil(waiter::isSuccess); waitUntil(waiter::isSuccess);
}); });
waiters.forEach(FakeWaiter::recycle); waiters.forEach(FakeWaiter::release);
FakeWaiter waiter = new FakeWaiter();
mgr.getConnection(waiter);
waitUntil(waiter::isComplete);
} }


@Test @Test
Expand All @@ -288,7 +288,7 @@ public void testInitialNoConcurrency() {
conn.concurrency(0).connect().awaitConnected(); conn.concurrency(0).connect().awaitConnected();
conn.concurrency(n - 1); conn.concurrency(n - 1);
waitUntil(() -> waiters.stream().filter(FakeWaiter::isSuccess).count() == n - 1); waitUntil(() -> waiters.stream().filter(FakeWaiter::isSuccess).count() == n - 1);
waiters.stream().filter(FakeWaiter::isSuccess).findFirst().get().recycle(); waiters.stream().filter(FakeWaiter::isSuccess).findFirst().get().release();
waiters.forEach(waiter -> { waiters.forEach(waiter -> {
waitUntil(waiter::isSuccess); waitUntil(waiter::isSuccess);
}); });
Expand Down Expand Up @@ -357,7 +357,6 @@ public boolean handleConnection(FakeConnection conn) throws Exception {
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
System.out.println("DONE");
}); });
actors[i].start(); actors[i].start();
} }
Expand Down Expand Up @@ -443,7 +442,7 @@ public synchronized boolean handleConnection(FakeConnection conn) throws Excepti
} }
} }


long recycle() { long release() {
FakeConnection conn = (FakeConnection) result; FakeConnection conn = (FakeConnection) result;
return conn.recycle(); return conn.recycle();
} }
Expand Down Expand Up @@ -701,7 +700,7 @@ public long connect(ConnectionListener<FakeConnection> listener,


@Override @Override
public void close(FakeConnection conn) { public void close(FakeConnection conn) {

conn.listener.onClose(conn);
} }
} }
} }

0 comments on commit 08ba4a6

Please sign in to comment.