diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
index 0dd378dc01..ffcbf51bdc 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
@@ -23,6 +23,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import java.util.function.Supplier;
/**
* A reference-counted object can be retained for later use
@@ -39,12 +40,19 @@
* it must be released the same number of times.
*
* - If the object has been retained and then completely released (i.e. the reference count becomes 0),
- * it must not be retained/released/accessed anymore since it may have been allocated for other use.
+ * calling {@link #get()} will lead to an {@link IllegalStateException}.
+ * Depending on how the {@link ReferenceCountedObject} is built,
+ * calling {@link #retain()} may or may not be allowed;
+ * @see Builder#setValue(Object)
+ * @see Builder#setConstructor(Supplier)
*
* @param The object type.
*/
public interface ReferenceCountedObject {
- /** @return the object. */
+ /**
+ * @return the object.
+ * @throws IllegalStateException when the object has not been retained.
+ */
T get();
/**
@@ -74,7 +82,7 @@ default UncheckedAutoCloseableSupplier retainAndReleaseOnClose() {
@Override
public T get() {
if (closed.get()) {
- throw new IllegalStateException("Already closed");
+ throw new IllegalStateException("Failed to get: already closed");
}
return retained;
}
@@ -96,36 +104,148 @@ public void close() {
*/
boolean release();
- /** The same as wrap(value, EMPTY, EMPTY), where EMPTY is an empty method. */
+ /**
+ * The same as newBuilder().setValue(value).build();
+ *
+ * @deprecated use {@link Builder}
+ */
+ @Deprecated
static ReferenceCountedObject wrap(V value) {
- return wrap(value, () -> {}, ignored -> {});
+ return ReferenceCountedObject.newBuilder().setValue(value).build();
}
/**
- * Wrap the given value as a {@link ReferenceCountedObject}.
+ * The same as newBuilder()
+ * .setValue(value)
+ * .setRetainMethod(retainMethod)
+ * .setBooleanReleaseMethod(releaseMethod)
+ * .build();
*
- * @param value the value being wrapped.
- * @param retainMethod a method to run when {@link #retain()} is invoked.
- * @param releaseMethod a method to run when {@link #release()} is invoked,
- * where the method takes a boolean which is the same as the one returned by {@link #release()}.
- * @param The value type.
- * @return the wrapped reference-counted object.
+ * @deprecated use {@link Builder}
*/
+ @Deprecated
static ReferenceCountedObject wrap(V value, Runnable retainMethod, Consumer releaseMethod) {
- Objects.requireNonNull(value, "value == null");
- Objects.requireNonNull(retainMethod, "retainMethod == null");
- Objects.requireNonNull(releaseMethod, "releaseMethod == null");
+ return ReferenceCountedObject.newBuilder()
+ .setValue(value)
+ .setRetainMethod(retainMethod)
+ .setBooleanReleaseMethod(releaseMethod)
+ .build();
+ }
+
+ static Builder newBuilder() {
+ return new Builder<>();
+ }
+
+ /**
+ * To build {@link ReferenceCountedObject},
+ * where it may use either a fixed value or a constructor (but not both).
+ * @see Builder#setValue(Object)
+ * @see Builder#setConstructor(Supplier)
+ *
+ * @param The type of the {@link ReferenceCountedObject} being built.
+ */
+ class Builder {
+ private V value = null;
+ private Supplier constructor = null;
+ private Runnable retainMethod = () -> {};
+ private Consumer releaseMethod = v -> {};
+
+ /**
+ * Set a fixed value for the {@link ReferenceCountedObject} being built.
+ * Once it has been completely released, calling {@link #retain()} is not allowed.
+ *
+ * @param value a fixed value.
+ */
+ public Builder setValue(V value) {
+ this.value = value;
+ return this;
+ }
+
+ /**
+ * Set a constructor for the {@link ReferenceCountedObject} being built.
+ * The value is constructed at the first {@link #retain()} call,
+ * After it has been completely released by {@link #release()},
+ * a new value will be constructed when {@link #retain()} is called again.
+ *
+ * @param constructor to construct the object.
+ */
+ public Builder setConstructor(Supplier constructor) {
+ this.constructor = constructor;
+ return this;
+ }
+
+ /**
+ * @param retainMethod a method to run when {@link #retain()} is invoked.
+ */
+ public Builder setRetainMethod(Runnable retainMethod) {
+ this.retainMethod = retainMethod != null ? retainMethod : () -> {};
+ return this;
+ }
+
+ /**
+ * @param releaseMethod a method to run when {@link #release()} is invoked,
+ * where the method has a parameter,
+ * where the actual parameter is the value when it is completely released;
+ * otherwise, the actual parameter is null.
+ * The method may clean up the value when it is completely released.
+ */
+ public Builder setReleaseMethod(Consumer releaseMethod) {
+ this.releaseMethod = releaseMethod != null ? releaseMethod : ignored -> {};
+ return this;
+ }
- return new ReferenceCountedObject() {
+ /**
+ * @param booleanReleaseMethod a method to run when {@link #release()} is invoked,
+ * where the method takes the boolean returned from {@link #release()}.
+ */
+ public Builder setBooleanReleaseMethod(Consumer booleanReleaseMethod) {
+ this.releaseMethod = booleanReleaseMethod != null ? v -> booleanReleaseMethod.accept(v != null) : ignored -> {};
+ return this;
+ }
+
+ /**
+ * @param releaseMethod a method to run when {@link #release()} is invoked,
+ */
+ public Builder setReleaseMethod(Runnable releaseMethod) {
+ this.releaseMethod = releaseMethod != null ? ignored -> releaseMethod.run() : ignored -> {};
+ return this;
+ }
+
+ public ReferenceCountedObject build() {
+ if (value == null) {
+ Objects.requireNonNull(constructor, "Both value == null and constructor == null");
+ return new ConstructorWrapper<>(constructor, retainMethod, releaseMethod);
+ } else {
+ Preconditions.assertNull(constructor, "Both value != null and constructor != null");
+ return new ValueWrapper<>(value, retainMethod, releaseMethod);
+ }
+ }
+
+ /**
+ * Wrap a fixed value as a {@link ReferenceCountedObject}.
+ * When it is completely released, it cannot be retained again.
+ *
+ * @param The value type.
+ */
+ private static final class ValueWrapper implements ReferenceCountedObject {
+ private final V value;
+ private final Runnable retainMethod;
+ private final Consumer releaseMethod;
private final AtomicInteger count = new AtomicInteger();
+ private ValueWrapper(V value, Runnable retainMethod, Consumer releaseMethod) {
+ this.value = Objects.requireNonNull(value, "value == null");
+ this.retainMethod = Objects.requireNonNull(retainMethod, "retainMethod == null");
+ this.releaseMethod = Objects.requireNonNull(releaseMethod, "releaseMethod == null");
+ }
+
@Override
public V get() {
final int previous = count.get();
if (previous < 0) {
- throw new IllegalStateException("Failed to get: object has already been completely released.");
+ throw new IllegalStateException("Failed to get: already completely released.");
} else if (previous == 0) {
- throw new IllegalStateException("Failed to get: object has not yet been retained.");
+ throw new IllegalStateException("Failed to get: not yet retained.");
}
return value;
}
@@ -135,7 +255,7 @@ public V retain() {
// n < 0: exception
// n >= 0: n++
if (count.getAndUpdate(n -> n < 0? n : n + 1) < 0) {
- throw new IllegalStateException("Failed to retain: object has already been completely released.");
+ throw new IllegalStateException("Failed to retain: already completely released.");
}
retainMethod.run();
@@ -149,19 +269,78 @@ public boolean release() {
// n == 1: n = -1
final int previous = count.getAndUpdate(n -> n <= 1? -1: n - 1);
if (previous < 0) {
- throw new IllegalStateException("Failed to release: object has already been completely released.");
+ throw new IllegalStateException("Failed to release: already completely released.");
} else if (previous == 0) {
- throw new IllegalStateException("Failed to release: object has not yet been retained.");
+ throw new IllegalStateException("Failed to release: not yet retained.");
}
- final boolean completedReleased = previous == 1;
- releaseMethod.accept(completedReleased);
- return completedReleased;
+ final boolean completelyReleased = previous == 1;
+ releaseMethod.accept(completelyReleased ? value : null);
+ return completelyReleased;
}
- };
+ }
+
+ /**
+ * Wrap a constructor as a {@link ReferenceCountedObject}.
+ *
+ * @see Builder#setConstructor(Supplier)
+ */
+ private static final class ConstructorWrapper implements ReferenceCountedObject {
+ private final Supplier constructor;
+ private final Runnable retainMethod;
+ private final Consumer releaseMethod;
+ private ValueWrapper valueWrapper;
+
+ private ConstructorWrapper(Supplier constructor, Runnable retainMethod, Consumer releaseMethod) {
+ this.constructor = Objects.requireNonNull(constructor, "constructor == null");
+ this.retainMethod = Objects.requireNonNull(retainMethod, "retainMethod == null");
+ this.releaseMethod = Objects.requireNonNull(releaseMethod, "releaseMethod == null");
+ }
+
+ @Override
+ public synchronized V get() {
+ if (valueWrapper == null) {
+ throw new IllegalStateException("Failed to get: not yet retained.");
+ }
+ return valueWrapper.get();
+ }
+
+ @Override
+ public synchronized V retain() {
+ if (valueWrapper == null) {
+ valueWrapper = new ValueWrapper<>(constructor.get(), retainMethod, releaseMethod);
+ }
+ return valueWrapper.retain();
+ }
+
+ @Override
+ public synchronized boolean release() {
+ if (valueWrapper == null) {
+ throw new IllegalStateException("Failed to release: not yet retained.");
+ }
+ if (valueWrapper.release()) {
+ valueWrapper = null;
+ return true;
+ }
+ return false;
+ }
+ }
}
- /** The same as wrap(value, retainMethod, ignored -> releaseMethod.run()). */
+ /**
+ * The same as newBuilder()
+ * .setValue(value)
+ * .setRetainMethod(retainMethod)
+ * .setReleaseMethod(releaseMethod)
+ * .build();
+ *
+ * @deprecated use {@link Builder}
+ */
+ @Deprecated
static ReferenceCountedObject wrap(V value, Runnable retainMethod, Runnable releaseMethod) {
- return wrap(value, retainMethod, ignored -> releaseMethod.run());
+ return ReferenceCountedObject.newBuilder()
+ .setValue(value)
+ .setRetainMethod(retainMethod)
+ .setReleaseMethod(releaseMethod)
+ .build();
}
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 4970d244a1..2bfeea31e1 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -56,6 +56,7 @@
import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedFunction;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.Preconditions;
@@ -84,36 +85,30 @@
public class NettyClientStreamRpc implements DataStreamClientRpc {
public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);
- private static class WorkerGroupGetter implements Supplier {
-
- private static final AtomicReference>> SHARED_WORKER_GROUP
- = new AtomicReference<>();
+ private static class WorkerGroupGetter {
+ private static final MemoizedFunction> SHARED
+ = MemoizedFunction.valueOf(properties -> ReferenceCountedObject.newBuilder()
+ .setConstructor(() -> newWorkerGroup(properties))
+ .setReleaseMethod(eventLoopGroup -> {
+ if (eventLoopGroup != null) {
+ eventLoopGroup.shutdownGracefully();
+ }
+ })
+ .build());
static WorkerGroupGetter newInstance(RaftProperties properties) {
final boolean shared = NettyConfigKeys.DataStream.Client.workerGroupShare(properties);
- if (shared) {
- final CompletableFuture> created = new CompletableFuture<>();
- final CompletableFuture> current
- = SHARED_WORKER_GROUP.updateAndGet(g -> g != null ? g : created);
- if (current == created) {
- created.complete(ReferenceCountedObject.wrap(newWorkerGroup(properties)));
- }
- return new WorkerGroupGetter(current.join().retain()) {
- @Override
- void shutdownGracefully() {
- final CompletableFuture> returned
- = SHARED_WORKER_GROUP.updateAndGet(previous -> {
- Preconditions.assertSame(current, previous, "SHARED_WORKER_GROUP");
- return previous.join().release() ? null : previous;
- });
- if (returned == null) {
- get().shutdownGracefully();
- }
- }
- };
- } else {
+ if (!shared) {
return new WorkerGroupGetter(newWorkerGroup(properties));
}
+
+ final ReferenceCountedObject ref = SHARED.apply(properties);
+ return new WorkerGroupGetter(ref.retain()) {
+ @Override
+ void shutdownGracefully() {
+ ref.release();
+ }
+ };
}
static EventLoopGroup newWorkerGroup(RaftProperties properties) {
@@ -129,7 +124,6 @@ private WorkerGroupGetter(EventLoopGroup workerGroup) {
this.workerGroup = workerGroup;
}
- @Override
public final EventLoopGroup get() {
return workerGroup;
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 0e10b0f4dc..f3b98054d0 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -311,8 +311,11 @@ static long writeTo(ByteBuf buf, Iterable options,
if (buffer.remaining() == 0) {
continue;
}
- final ReferenceCountedObject wrapped = ReferenceCountedObject.wrap(
- buffer, buf::retain, ignored -> buf.release());
+ final ReferenceCountedObject wrapped = ReferenceCountedObject.newBuilder()
+ .setValue(buffer)
+ .setRetainMethod(buf::retain)
+ .setReleaseMethod(ignored -> buf.release())
+ .build();
try(UncheckedAutoCloseable ignore = wrapped.retainAndReleaseOnClose()) {
byteWritten += channel.write(wrapped);
} catch (Throwable t) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java b/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
index 2436310141..c1d7a3bfa2 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java
@@ -24,6 +24,8 @@
import java.util.concurrent.atomic.AtomicInteger;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
public class TestReferenceCountedObject {
static void assertValues(
AtomicInteger retained, int expectedRetained,
@@ -35,20 +37,13 @@ static void assertValues(
static void assertRelease(ReferenceCountedObject> ref,
AtomicInteger retained, int expectedRetained,
AtomicInteger released, int expectedReleased) {
- final boolean returned = ref.release();
+ final boolean completelyReleased = ref.release();
assertValues(retained, expectedRetained, released, expectedReleased);
- Assertions.assertEquals(expectedRetained == expectedReleased, returned);
+ Assertions.assertEquals(expectedRetained == expectedReleased, completelyReleased);
}
- @Test
- @Timeout(value = 1)
- public void testWrap() {
- final String value = "testWrap";
- final AtomicInteger retained = new AtomicInteger();
- final AtomicInteger released = new AtomicInteger();
- final ReferenceCountedObject ref = ReferenceCountedObject.wrap(
- value, retained::getAndIncrement, released::getAndIncrement);
-
+ static void runTestWrapper(String value, ReferenceCountedObject ref,
+ AtomicInteger retained, AtomicInteger released) {
assertValues(retained, 0, released, 0);
try {
ref.get();
@@ -94,6 +89,24 @@ public void testWrap() {
e.printStackTrace(System.out);
}
+ assertValues(retained, 4, released, 4);
+ }
+
+ @Test
+ @Timeout(value = 1)
+ public void testValueWrapper() {
+ final String value = "testValue";
+ final AtomicInteger retained = new AtomicInteger();
+ final AtomicInteger released = new AtomicInteger();
+ final ReferenceCountedObject ref = ReferenceCountedObject.newBuilder()
+ .setValue(value)
+ .setRetainMethod(retained::getAndIncrement)
+ .setReleaseMethod(released::getAndIncrement)
+ .build();
+
+ runTestWrapper(value, ref, retained, released);
+
+ // for the ValueWrapper, it cannot be retained/released after it is completely released
try {
ref.retain();
Assertions.fail();
@@ -115,10 +128,46 @@ public void testWrap() {
}
}
+ @Test
+ @Timeout(value = 1)
+ public void testConstructorWrapper() {
+ final String prefix = "constructor";
+ final AtomicInteger valueCount = new AtomicInteger();
+ final AtomicInteger retained = new AtomicInteger();
+ final AtomicInteger released = new AtomicInteger();
+ final ReferenceCountedObject ref = ReferenceCountedObject.newBuilder()
+ .setConstructor(() -> prefix + valueCount.getAndIncrement())
+ .setRetainMethod(retained::getAndIncrement)
+ .setReleaseMethod(released::getAndIncrement)
+ .build();
+ Assertions.assertEquals(0, valueCount.get());
+ runTestWrapper(prefix + valueCount, ref, retained, released);
+ Assertions.assertEquals(1, valueCount.get());
+
+ // for the ConstructorWrapper, it can be retained/released after it is completely released
+ retained.set(0);
+ released.set(0);
+ runTestWrapper(prefix + valueCount, ref, retained, released);
+ Assertions.assertEquals(2, valueCount.get());
+ }
+
+ @Test
+ @Timeout(value = 1)
+ public void testBuilder() {
+ // Do not set value and constructor
+ assertThrows(NullPointerException.class, () -> ReferenceCountedObject.newBuilder().build());
+
+ // Set both value and constructor
+ assertThrows(IllegalStateException.class, () -> ReferenceCountedObject.newBuilder()
+ .setValue("")
+ .setConstructor(() -> "")
+ .build());
+ }
+
@Test
@Timeout(value = 1)
public void testReleaseWithoutRetaining() {
- final ReferenceCountedObject ref = ReferenceCountedObject.wrap("");
+ final ReferenceCountedObject