Navigation Menu

Skip to content

Commit

Permalink
Internal pool refactoring to prepare for using pool action to modify …
Browse files Browse the repository at this point in the history
…pool state
  • Loading branch information
vietj committed Mar 6, 2021
1 parent 694998c commit 39c58be
Showing 1 changed file with 83 additions and 72 deletions.
155 changes: 83 additions & 72 deletions src/main/java/io/vertx/core/net/impl/pool/SimpleConnectionPool.java
Expand Up @@ -52,77 +52,7 @@ public Slot(SimpleConnectionPool<C> pool, EventLoopContext context, int index, i

@Override
public void remove() {
pool.lock.lock();
int w = weight;
capacity = 0;
maxCapacity = 0;
connection = null;
weight = 0;
Waiter<C> waiter = pool.waiters.poll();
if (waiter != null) {
Slot<C> slot = new Slot<>(pool, waiter.context, index, waiter.weight);
pool.weight -= w;
pool.weight += waiter.weight;
pool.slots[index] = slot;
pool.lock.unlock();
slot.connect(waiter.handler);
} else if (pool.size > 1) {
Slot<C> tmp = pool.slots[pool.size - 1];
tmp.index = index;
pool.slots[index] = tmp;
pool.slots[pool.size - 1] = null;
pool.size--;
pool.weight -= w;
pool.lock.unlock();
} else {
pool.slots[0] = null;
pool.size--;
pool.weight -= w;
pool.lock.unlock();
}
}

public void connect(Handler<AsyncResult<Lease<C>>> handler) {
pool.connector.connect(context, this, ar -> {
if (ar.succeeded()) {
pool.lock.lock();
ConnectResult<C> result = ar.result();
int initialWeight = weight;
connection = result.connection();
maxCapacity = (int)result.concurrency();
weight = (int) result.weight();
capacity = maxCapacity;
pool.weight += (result.weight() - initialWeight);
if (pool.closed) {
pool.lock.unlock();
context.emit(Future.failedFuture("Closed"), handler);
} else {
int c = 1;
LeaseImpl<C>[] extra = null;
int m = Math.min(capacity - 1, pool.waiters.size());
if (m > 0) {
c += m;
extra = new LeaseImpl[m];
for (int i = 0;i < m;i++) {
extra[i] = new LeaseImpl<>(this, pool.waiters.poll().handler);
}
}
capacity -= c;
pool.lock.unlock();
new LeaseImpl<>(this, handler).emit();
if (extra != null) {
for (LeaseImpl<C> lease : extra) {
lease.emit();
}
}
}
this.result.complete(connection);
} else {
remove();
context.emit(Future.failedFuture(ar.cause()), handler);
result.fail(ar.cause());
}
});
pool.remove(this);
}
}

Expand Down Expand Up @@ -173,6 +103,87 @@ public int size() {
}
}

public void connect(Slot<C> slot, Handler<AsyncResult<Lease<C>>> handler) {
connector.connect(slot.context, slot, ar -> {
if (ar.succeeded()) {
connectSucceeded(slot, ar.result(), handler);
} else {
connectFailed(slot, ar.cause(), handler);
}
});
}

private void connectSucceeded(Slot<C> slot, ConnectResult<C> result, Handler<AsyncResult<Lease<C>>> handler) {
lock.lock();
int initialWeight = slot.weight;
slot.connection = result.connection();
slot.maxCapacity = (int)result.concurrency();
slot.weight = (int) result.weight();
slot.capacity = slot.maxCapacity;
weight += (result.weight() - initialWeight);
if (closed) {
lock.unlock();
slot.context.emit(Future.failedFuture("Closed"), handler);
} else {
int c = 1;
LeaseImpl<C>[] extra = null;
int m = Math.min(slot.capacity - 1, waiters.size());
if (m > 0) {
c += m;
extra = new LeaseImpl[m];
for (int i = 0;i < m;i++) {
extra[i] = new LeaseImpl<>(slot, waiters.poll().handler);
}
}
slot.capacity -= c;
lock.unlock();
new LeaseImpl<>(slot, handler).emit();
if (extra != null) {
for (LeaseImpl<C> lease : extra) {
lease.emit();
}
}
}
slot.result.complete(slot.connection);
}

private void connectFailed(Slot<C> slot, Throwable cause, Handler<AsyncResult<Lease<C>>> handler) {
remove(slot);
slot.context.emit(Future.failedFuture(cause), handler);
slot.result.fail(cause);
}

private void remove(Slot<C> removed) {
lock.lock();
int w = removed.weight;
removed.capacity = 0;
removed.maxCapacity = 0;
removed.connection = null;
removed.weight = 0;
Waiter<C> waiter = waiters.poll();
if (waiter != null) {
Slot<C> slot = new Slot<>(this, waiter.context, removed.index, waiter.weight);
weight -= w;
weight += waiter.weight;
slots[removed.index] = slot;
lock.unlock();
connect(slot, waiter.handler);
} else if (size > 1) {
Slot<C> tmp = slots[size - 1];
tmp.index = removed.index;
slots[removed.index] = tmp;
slots[size - 1] = null;
size--;
weight -= w;
lock.unlock();
} else {
slots[0] = null;
size--;
weight -= w;
lock.unlock();
}
}

@Override
public void evict(Predicate<C> predicate, Handler<AsyncResult<List<C>>> handler) {
lock.lock();
Expand Down Expand Up @@ -232,7 +243,7 @@ public void acquire(EventLoopContext context, int weight, Handler<AsyncResult<Le
Slot<C> slot = new Slot<>(this, context, size, weight);
slots[size++] = slot;
lock.unlock();
slot.connect(handler);
connect(slot, handler);
return;
} else {
throw new IllegalStateException();
Expand Down

0 comments on commit 39c58be

Please sign in to comment.