Skip to content

Commit

Permalink
Rework connection close to optionally close when the connection is re…
Browse files Browse the repository at this point in the history
…cycled
  • Loading branch information
vietj committed Nov 15, 2017
1 parent 55d1d9f commit b9869e6
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 48 deletions.
Expand Up @@ -398,7 +398,7 @@ void checkLifecycle() {
} else if (close) {
conn.close();
} else {
conn.listener.onRecycle();
conn.listener.onRecycle(1, true);
}
}
}
Expand Down
Expand Up @@ -209,9 +209,13 @@ void handleReset(long errorCode) {

@Override
void handleClose() {
// commented to be used later when we properly define the HTTP/2 connection expiration from the pool
// boolean disposable = conn.streams.isEmpty();
if (request instanceof HttpClientRequestImpl) {
conn.listener.onRecycle();
}
conn.listener.onRecycle(1, false);
} /* else {
conn.listener.onRecycle(0, dispable);
} */
if (!responseEnded) {
responseEnded = true;
if (conn.metrics != null) {
Expand Down
Expand Up @@ -58,9 +58,12 @@ void onConnectSuccess(C conn,
void onConcurrencyChange(long concurrency);

/**
* Signals the connection is recycled, this should not be called more than the connection has been borrowed.
* Signals the connection can recycled, it must not redeem more than it borrowed.
*
* @param capacity the to redeem
* @param disposable wether the connection can be disposed
*/
void onRecycle();
void onRecycle(int capacity, boolean disposable);

/**
* Signals the connection is closed.
Expand Down
57 changes: 23 additions & 34 deletions src/main/java/io/vertx/core/http/impl/pool/Pool.java
Expand Up @@ -25,7 +25,6 @@

import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
* The pool is a queue of waiters and a list of connections.
Expand Down Expand Up @@ -161,7 +160,7 @@ private void checkPending() {
boolean handled = deliverToWaiter(conn, waiter);
if (!handled) {
synchronized (Pool.this) {
recycleConnection(conn);
recycleConnection(conn, 1,false);
checkPending();
}
}
Expand Down Expand Up @@ -196,7 +195,7 @@ public void onConnectSuccess(C conn, long concurrency, Channel channel, ContextI
synchronized (Pool.this) {
if (!consumed) {
synchronized (this) {
recycleConnection(holder);
recycleConnection(holder, 1,false);
}
}
checkPending();
Expand Down Expand Up @@ -235,8 +234,11 @@ public void onConcurrencyChange(long concurrency) {
}
}
@Override
public void onRecycle() {
Pool.this.recycle(holder);
public void onRecycle(int capacity, boolean disposable) {
if (capacity < 0) {
throw new IllegalArgumentException("Illegal capacity");
}
Pool.this.recycle(holder, capacity, disposable);
}
@Override
public void onClose() {
Expand All @@ -247,8 +249,8 @@ public void onClose() {
return connector.connect(listener, metric, waiter.context, ssl, peerHost, host, port);
}

private synchronized void recycle(Holder<C> conn) {
recycleConnection(conn);
private synchronized void recycle(Holder<C> conn, int capacity, boolean closeable) {
recycleConnection(conn, capacity, closeable);
checkPending();
checkClose();
}
Expand All @@ -274,16 +276,24 @@ private boolean deliverToWaiter(Holder<C> conn, Waiter<C> waiter) {

// These methods assume to be called under synchronization

private void recycleConnection(Holder<C> conn) {
if (conn.capacity == conn.concurrency) {
private void recycleConnection(Holder<C> conn, int c, boolean closeable) {
long nc = conn.capacity + c;
if (nc > conn.concurrency) {
log.debug("Attempt to recycle a connection more than permitted");
return;
}
capacity++;
if (conn.capacity == 0) {
available.add(conn);
if (closeable && nc == conn.concurrency && waiters.isEmpty()) {
available.remove(conn);
capacity -= conn.concurrency;
conn.capacity = 0;
connector.close(conn.connection);
} else {
capacity += c;
if (conn.capacity == 0) {
available.add(conn);
}
conn.capacity = nc;
}
conn.capacity++;
}

private void closeConnection(Holder<C> holder) {
Expand Down Expand Up @@ -319,27 +329,6 @@ private boolean initConnection(Waiter<C> waiter, Holder<C> holder, ContextImpl c
}

private void checkClose() {
if (waiters.isEmpty()) {
if (available.size() > 0) {
List<Holder<C>> toClose = Collections.emptyList();
for (Holder<C> conn : available) {
if (conn.capacity == conn.concurrency) {
if (toClose.isEmpty()) {
toClose = new ArrayList<>();
}
toClose.add(conn);
}
}
if (toClose.size() > 0) {
for (Holder<C> conn : toClose) {
available.remove(conn);
capacity -= conn.concurrency;
conn.capacity = 0;
connector.close(conn.connection);
}
}
}
}
if (all.isEmpty()) {
// No waiters and no connections - remove the ConnQueue
if (metrics != null) {
Expand Down
8 changes: 2 additions & 6 deletions src/test/java/io/vertx/test/core/Http2ClientTest.java
Expand Up @@ -201,7 +201,6 @@ public void testInvalidSettings() throws Exception {

@Test
public void testServerSettings() throws Exception {
waitFor(2);
io.vertx.core.http.Http2Settings expectedSettings = TestUtils.randomHttp2Settings();
expectedSettings.setHeaderTableSize((int)io.vertx.core.http.Http2Settings.DEFAULT_HEADER_TABLE_SIZE);
server.close();
Expand All @@ -213,13 +212,10 @@ public void testServerSettings() throws Exception {
});
});
server.requestHandler(req -> {
req.response().end();
});
startServer();
AtomicInteger count = new AtomicInteger();
client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> {
complete();
}).connectionHandler(conn -> {
client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> fail()).connectionHandler(conn -> {
conn.remoteSettingsHandler(settings -> {
switch (count.getAndIncrement()) {
case 0:
Expand All @@ -229,7 +225,7 @@ public void testServerSettings() throws Exception {
assertEquals(expectedSettings.getMaxConcurrentStreams(), settings.getMaxConcurrentStreams());
assertEquals(expectedSettings.getHeaderTableSize(), settings.getHeaderTableSize());
assertEquals(expectedSettings.get('\u0007'), settings.get(7));
complete();
testComplete();
break;
}
});
Expand Down
32 changes: 29 additions & 3 deletions src/test/java/io/vertx/test/core/net/ConnectionManagerTest.java
Expand Up @@ -349,6 +349,24 @@ public void testInitialNoConcurrency() {
});
}

@Test
public void testRecycleWithoutDispose() {
FakeConnectionProvider connector = new FakeConnectionProvider();
FakeConnectionManager mgr = new FakeConnectionManager(-1, 1, connector);
FakeWaiter waiter1 = new FakeWaiter();
mgr.getConnection(waiter1);
FakeConnection conn = connector.assertRequest(TEST_ADDRESS);
conn.connect();
waitUntil(waiter1::isSuccess);
conn.recycle(false);
FakeWaiter waiter2 = new FakeWaiter();
mgr.getConnection(waiter2);
waitUntil(waiter1::isSuccess);
waiter2.assertSuccess(conn);
conn.recycle(true);
assertEquals(0, mgr.size());
}

@Test
public void testStress() {
int numActors = 16;
Expand Down Expand Up @@ -658,10 +676,18 @@ synchronized void close() {
listener.onClose();
}

synchronized long recycle(boolean dispose) {
return recycle(1, dispose);
}

synchronized long recycle() {
long i = inflight--;
listener.onRecycle();
return i;
return recycle(true);
}

synchronized long recycle(int capacity, boolean dispose) {
inflight -= capacity;
listener.onRecycle(capacity, dispose);
return inflight;
}

synchronized FakeConnection concurrency(long value) {
Expand Down

0 comments on commit b9869e6

Please sign in to comment.