Skip to content

Commit

Permalink
Check invariant at the end of stress tests and fix issue where the in…
Browse files Browse the repository at this point in the history
…correct capacity was redeemed when a connection is closed by the pool
  • Loading branch information
vietj committed Dec 5, 2017
1 parent d7eee41 commit a71f8d5
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 10 deletions.
26 changes: 21 additions & 5 deletions src/main/java/io/vertx/core/http/impl/pool/Pool.java
Expand Up @@ -120,6 +120,22 @@ public Pool(ConnectionProvider<C> connector,
this.connectionRemoved = connectionRemoved; this.connectionRemoved = connectionRemoved;
} }


public synchronized int waitersInQueue() {
return waitersQueue.size();
}

public synchronized int waitersCount() {
return waitersCount;
}

public synchronized long weight() {
return weight;
}

public synchronized long capacity() {
return capacity;
}

/** /**
* Get a connection for a waiter asynchronously. * Get a connection for a waiter asynchronously.
* *
Expand Down Expand Up @@ -318,22 +334,22 @@ private boolean deliverToWaiter(Holder<C> conn, Waiter<C> waiter) {
// These methods assume to be called under synchronization // These methods assume to be called under synchronization


private void recycleConnection(Holder<C> conn, int c, boolean closeable) { private void recycleConnection(Holder<C> conn, int c, boolean closeable) {
long nc = conn.capacity + c; long newCapacity = conn.capacity + c;
if (nc > conn.concurrency) { if (newCapacity > conn.concurrency) {
log.debug("Attempt to recycle a connection more than permitted"); log.debug("Attempt to recycle a connection more than permitted");
return; return;
} }
if (closeable && nc == conn.concurrency && waitersQueue.isEmpty()) { if (closeable && newCapacity == conn.concurrency && waitersQueue.isEmpty()) {
available.remove(conn); available.remove(conn);
capacity -= conn.concurrency; capacity -= conn.capacity;
conn.capacity = 0; conn.capacity = 0;
connector.close(conn.connection); connector.close(conn.connection);
} else { } else {
capacity += c; capacity += c;
if (conn.capacity == 0) { if (conn.capacity == 0) {
available.add(conn); available.add(conn);
} }
conn.capacity = nc; conn.capacity = newCapacity;
} }
} }


Expand Down
16 changes: 11 additions & 5 deletions src/test/java/io/vertx/test/core/net/ConnectionPoolTest.java
Expand Up @@ -41,7 +41,6 @@ class FakeConnectionManager {
private final int queueMaxSize; private final int queueMaxSize;
private final int maxPoolSize; private final int maxPoolSize;
private Pool<FakeConnection> pool; private Pool<FakeConnection> pool;
private int size;
private Set<FakeConnection> active = new HashSet<>(); private Set<FakeConnection> active = new HashSet<>();
private boolean closed = true; private boolean closed = true;
private int seq; private int seq;
Expand All @@ -65,7 +64,7 @@ synchronized boolean contains(FakeConnection conn) {
} }


synchronized int size() { synchronized int size() {
return size; return active.size();
} }


synchronized Pool<FakeConnection> pool() { synchronized Pool<FakeConnection> pool() {
Expand All @@ -88,11 +87,9 @@ void getConnection(Waiter<FakeConnection> waiter) {
}, (channel, conn) -> { }, (channel, conn) -> {
synchronized (FakeConnectionManager.this) { synchronized (FakeConnectionManager.this) {
active.add(conn); active.add(conn);
size++;
} }
}, (channel, conn) -> { }, (channel, conn) -> {
synchronized (FakeConnectionManager.this) { synchronized (FakeConnectionManager.this) {
size--;
active.remove(conn); active.remove(conn);
} }
} }
Expand Down Expand Up @@ -421,12 +418,12 @@ public boolean handleConnection(ContextInternal ctx, FakeConnection conn) throws
throw new Exception(); throw new Exception();
} */ else { } */ else {
vertx.setTimer(10, id -> { vertx.setTimer(10, id -> {
latch.countDown();
if (action < 15) { if (action < 15) {
conn.close(); conn.close();
} else { } else {
conn.recycle(); conn.recycle();
} }
latch.countDown();
}); });
return true; return true;
} }
Expand All @@ -449,6 +446,15 @@ public boolean handleConnection(ContextInternal ctx, FakeConnection conn) throws
e.printStackTrace(); e.printStackTrace();
} }
} }

assertWaitUntil(() -> mgr.closed());

// Check state at the end
assertEquals(0, mgr.size());
assertEquals(0, mgr.pool.waitersCount());
assertEquals(0, mgr.pool.waitersInQueue());
assertEquals(0, mgr.pool.weight());
assertEquals(0, mgr.pool.capacity());
} }


class FakeWaiter extends Waiter<FakeConnection> { class FakeWaiter extends Waiter<FakeConnection> {
Expand Down

0 comments on commit a71f8d5

Please sign in to comment.