Skip to content

Commit

Permalink
Encapsulate pool synchronization in a class
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 6, 2021
1 parent bcb32b7 commit 2353cb2
Show file tree
Hide file tree
Showing 7 changed files with 361 additions and 47 deletions.
28 changes: 28 additions & 0 deletions src/main/java/io/vertx/core/net/impl/pool/LockSynchronization.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.vertx.core.net.impl.pool;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockSynchronization<S> implements Synchronization<S> {

private final Lock lock = new ReentrantLock();
private final S state;

public LockSynchronization(S state) {
this.state = state;
}

@Override
public void execute(Action<S> action) {
lock.lock();
Runnable post = null;
try {
post = action.execute(state);
} finally {
lock.unlock();
if (post != null) {
post.run();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.vertx.core.net.impl.pool;

import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

public class NonBlockingSynchronization1<S> implements Synchronization<S> {

private final ConcurrentLinkedDeque<Action<S>> q = new ConcurrentLinkedDeque<>();
private final AtomicInteger s = new AtomicInteger();
private final S state;

public NonBlockingSynchronization1(S state) {
this.state = state;
}

@Override
public void execute(Action<S> action) {
q.add(action);
if (s.incrementAndGet() == 1) {
while (true) {
int cnt = 0;
Action<S> a;
while ((a = q.poll()) != null) {
cnt++;
Runnable post = a.execute(state);
if (post != null) {
post.run();
}
}
if (s.addAndGet(-cnt) == 0) {
break;
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.vertx.core.net.impl.pool;

import io.netty.util.internal.PlatformDependent;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

public class NonBlockingSynchronization2<S> implements Synchronization<S> {

private final Queue<Action<S>> q = PlatformDependent.newMpscQueue(Integer.MAX_VALUE);
private final AtomicInteger s = new AtomicInteger();
private final S state;

public NonBlockingSynchronization2(S state) {
this.state = state;
}

@Override
public void execute(Action<S> action) {
q.add(action);
while (true) {
int v = s.get();
if (v == 0) {
if (s.compareAndSet(0, 1)) {
Action<S> a;
while ((a = q.poll()) != null) {
Runnable post = a.execute(state);
if (post != null) {
post.run();
}
}
s.set(0);
}
if (q.size() == 0) {
break;
}
} else {
break;
}
}
}
}
79 changes: 32 additions & 47 deletions src/main/java/io/vertx/core/net/impl/pool/SimpleConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

public class SimpleConnectionPool<C> implements ConnectionPool<C> {
Expand Down Expand Up @@ -78,8 +76,7 @@ static class Waiter<C> {
private final int maxWeight;
private int weight;
private boolean closed;

private final Lock lock = new ReentrantLock();
private final Synchronization<SimpleConnectionPool<C>> sync;

SimpleConnectionPool(Connector<C> connector, int maxSize, int maxWeight) {
this(connector, maxSize, maxWeight, -1);
Expand All @@ -92,32 +89,20 @@ static class Waiter<C> {
this.maxWaiters = maxWaiters;
this.weight = 0;
this.maxWeight = maxWeight;
this.sync = new NonBlockingSynchronization2<>(this);
}

private static abstract class Action<C> {
abstract Runnable execute(SimpleConnectionPool<C> pool);
}

private void execute(Action<C> action) {
lock.lock();
Runnable post = null;
try {
post = action.execute(this);
} finally {
lock.unlock();
if (post != null) {
post.run();
}
}
private void execute(Synchronization.Action<SimpleConnectionPool<C>> action) {
sync.execute(action);
}

public int size() {
lock.lock();
try {
// lock.lock();
// try {
return size;
} finally {
lock.unlock();
}
// } finally {
// lock.unlock();
// }
}

public void connect(Slot<C> slot, Handler<AsyncResult<Lease<C>>> handler) {
Expand All @@ -130,7 +115,7 @@ public void connect(Slot<C> slot, Handler<AsyncResult<Lease<C>>> handler) {
});
}

private static class ConnectSuccess<C> extends Action<C> {
private static class ConnectSuccess<C> implements Synchronization.Action<SimpleConnectionPool<C>> {

private final Slot<C> slot;
private final ConnectResult<C> result;
Expand All @@ -143,7 +128,7 @@ private ConnectSuccess(Slot<C> slot, ConnectResult<C> result, Handler<AsyncResul
}

@Override
Runnable execute(SimpleConnectionPool<C> pool) {
public Runnable execute(SimpleConnectionPool<C> pool) {
int initialWeight = slot.weight;
slot.connection = result.connection();
slot.maxCapacity = (int)result.concurrency();
Expand Down Expand Up @@ -194,7 +179,7 @@ public ConnectFailed(Slot<C> removed, Throwable cause, Handler<AsyncResult<Lease
}

@Override
Runnable execute(SimpleConnectionPool<C> pool) {
public Runnable execute(SimpleConnectionPool<C> pool) {
Runnable res = super.execute(pool);
return () -> {
if (res != null) {
Expand All @@ -206,7 +191,7 @@ Runnable execute(SimpleConnectionPool<C> pool) {
}
}

private static class Remove<C> extends Action<C> {
private static class Remove<C> implements Synchronization.Action<SimpleConnectionPool<C>> {

protected final Slot<C> removed;

Expand All @@ -215,7 +200,7 @@ private Remove(Slot<C> removed) {
}

@Override
Runnable execute(SimpleConnectionPool<C> pool) {
public Runnable execute(SimpleConnectionPool<C> pool) {
int w = removed.weight;
removed.capacity = 0;
removed.maxCapacity = 0;
Expand Down Expand Up @@ -249,7 +234,7 @@ private void remove(Slot<C> removed) {
execute(new Remove<>(removed));
}

private static class Evict<C> extends Action<C> {
private static class Evict<C> implements Synchronization.Action<SimpleConnectionPool<C>> {

private final Predicate<C> predicate;
private final Handler<AsyncResult<List<C>>> handler;
Expand All @@ -260,7 +245,7 @@ public Evict(Predicate<C> predicate, Handler<AsyncResult<List<C>>> handler) {
}

@Override
Runnable execute(SimpleConnectionPool<C> pool) {
public Runnable execute(SimpleConnectionPool<C> pool) {
List<C> lst = new ArrayList<>();
for (int i = pool.size - 1;i >= 0;i--) {
Slot<C> slot = pool.slots[i];
Expand Down Expand Up @@ -289,7 +274,7 @@ public void evict(Predicate<C> predicate, Handler<AsyncResult<List<C>>> handler)
execute(new Evict<>(predicate, handler));
}

private static class Acquire<C> extends Action<C> {
private static class Acquire<C> implements Synchronization.Action<SimpleConnectionPool<C>> {

private final EventLoopContext context;
private final int weight;
Expand All @@ -302,7 +287,7 @@ public Acquire(EventLoopContext context, int weight, Handler<AsyncResult<Lease<C
}

@Override
Runnable execute(SimpleConnectionPool<C> pool) {
public Runnable execute(SimpleConnectionPool<C> pool) {
if (pool.closed) {
return () -> context.emit(Future.failedFuture("Closed"), handler);
}
Expand Down Expand Up @@ -378,7 +363,7 @@ void emit() {
}
}

private static class Recycle<C> extends Action<C> {
private static class Recycle<C> implements Synchronization.Action<SimpleConnectionPool<C>> {

private final Slot<C> slot;

Expand All @@ -387,7 +372,7 @@ public Recycle(Slot<C> slot) {
}

@Override
Runnable execute(SimpleConnectionPool<C> pool) {
public Runnable execute(SimpleConnectionPool<C> pool) {
if (slot.connection != null) {
if (pool.waiters.size() > 0) {
Waiter<C> waiter = pool.waiters.poll();
Expand All @@ -411,24 +396,24 @@ private void recycle(LeaseImpl<C> lease) {
}

public int waiters() {
lock.lock();
try {
// lock.lock();
// try {
return waiters.size();
} finally {
lock.unlock();
}
// } finally {
// lock.unlock();
// }
}

public int weight() {
lock.lock();
try {
// lock.lock();
// try {
return weight;
} finally {
lock.unlock();
}
// } finally {
// lock.unlock();
// }
}

private static class Close<C> extends Action<C> {
private static class Close<C> implements Synchronization.Action<SimpleConnectionPool<C>> {

private final Handler<AsyncResult<List<Future<C>>>> handler;

Expand All @@ -437,7 +422,7 @@ private Close(Handler<AsyncResult<List<Future<C>>>> handler) {
}

@Override
Runnable execute(SimpleConnectionPool<C> pool) {
public Runnable execute(SimpleConnectionPool<C> pool) {
List<Future<C>> list;
List<Waiter<C>> b;
if (pool.closed) {
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/io/vertx/core/net/impl/pool/Synchronization.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.vertx.core.net.impl.pool;

public interface Synchronization<S> {

interface Action<S> {
Runnable execute(S state);
}

void execute(Action<S> action);

}
Loading

0 comments on commit 2353cb2

Please sign in to comment.