Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* A reference-counted object can be retained for later use
Expand All @@ -39,12 +40,19 @@
* it must be released the same number of times.
* <p>
* - If the object has been retained and then completely released (i.e. the reference count becomes 0),
* it must not be retained/released/accessed anymore since it may have been allocated for other use.
* calling {@link #get()} will lead to an {@link IllegalStateException}.
* Depending on how the {@link ReferenceCountedObject} is built,
* calling {@link #retain()} may or may not be allowed;
* @see Builder#setValue(Object)
* @see Builder#setConstructor(Supplier)
*
* @param <T> The object type.
*/
public interface ReferenceCountedObject<T> {
/** @return the object. */
/**
* @return the object.
* @throws IllegalStateException when the object has not been retained.
*/
T get();

/**
Expand Down Expand Up @@ -74,7 +82,7 @@ default UncheckedAutoCloseableSupplier<T> retainAndReleaseOnClose() {
@Override
public T get() {
if (closed.get()) {
throw new IllegalStateException("Already closed");
throw new IllegalStateException("Failed to get: already closed");
}
return retained;
}
Expand All @@ -96,36 +104,148 @@ public void close() {
*/
boolean release();

/** The same as wrap(value, EMPTY, EMPTY), where EMPTY is an empty method. */
/**
* The same as newBuilder().setValue(value).build();
*
* @deprecated use {@link Builder}
*/
@Deprecated
static <V> ReferenceCountedObject<V> wrap(V value) {
return wrap(value, () -> {}, ignored -> {});
return ReferenceCountedObject.<V>newBuilder().setValue(value).build();
}

/**
* Wrap the given value as a {@link ReferenceCountedObject}.
* The same as newBuilder()
* .setValue(value)
* .setRetainMethod(retainMethod)
* .setBooleanReleaseMethod(releaseMethod)
* .build();
*
* @param value the value being wrapped.
* @param retainMethod a method to run when {@link #retain()} is invoked.
* @param releaseMethod a method to run when {@link #release()} is invoked,
* where the method takes a boolean which is the same as the one returned by {@link #release()}.
* @param <V> The value type.
* @return the wrapped reference-counted object.
* @deprecated use {@link Builder}
*/
@Deprecated
static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod, Consumer<Boolean> releaseMethod) {
Objects.requireNonNull(value, "value == null");
Objects.requireNonNull(retainMethod, "retainMethod == null");
Objects.requireNonNull(releaseMethod, "releaseMethod == null");
return ReferenceCountedObject.<V>newBuilder()
.setValue(value)
.setRetainMethod(retainMethod)
.setBooleanReleaseMethod(releaseMethod)
.build();
}

static <V> Builder<V> newBuilder() {
return new Builder<>();
}

/**
* To build {@link ReferenceCountedObject},
* where it may use either a fixed value or a constructor (but not both).
* @see Builder#setValue(Object)
* @see Builder#setConstructor(Supplier)
*
* @param <V> The type of the {@link ReferenceCountedObject} being built.
*/
class Builder<V> {
private V value = null;
private Supplier<V> constructor = null;
private Runnable retainMethod = () -> {};
private Consumer<V> releaseMethod = v -> {};

/**
* Set a fixed value for the {@link ReferenceCountedObject} being built.
* Once it has been completely released, calling {@link #retain()} is not allowed.
*
* @param value a fixed value.
*/
public Builder<V> setValue(V value) {
this.value = value;
return this;
}

/**
* Set a constructor for the {@link ReferenceCountedObject} being built.
* The value is constructed at the first {@link #retain()} call,
* After it has been completely released by {@link #release()},
* a new value will be constructed when {@link #retain()} is called again.
*
* @param constructor to construct the object.
*/
public Builder<V> setConstructor(Supplier<V> constructor) {
this.constructor = constructor;
return this;
}

/**
* @param retainMethod a method to run when {@link #retain()} is invoked.
*/
public Builder<V> setRetainMethod(Runnable retainMethod) {
this.retainMethod = retainMethod != null ? retainMethod : () -> {};
return this;
}

/**
* @param releaseMethod a method to run when {@link #release()} is invoked,
* where the method has a parameter,
* where the actual parameter is the value when it is completely released;
* otherwise, the actual parameter is null.
* The method may clean up the value when it is completely released.
*/
public Builder<V> setReleaseMethod(Consumer<V> releaseMethod) {
this.releaseMethod = releaseMethod != null ? releaseMethod : ignored -> {};
return this;
}

return new ReferenceCountedObject<V>() {
/**
* @param booleanReleaseMethod a method to run when {@link #release()} is invoked,
* where the method takes the boolean returned from {@link #release()}.
*/
public Builder<V> setBooleanReleaseMethod(Consumer<Boolean> booleanReleaseMethod) {
this.releaseMethod = booleanReleaseMethod != null ? v -> booleanReleaseMethod.accept(v != null) : ignored -> {};
return this;
}

/**
* @param releaseMethod a method to run when {@link #release()} is invoked,
*/
public Builder<V> setReleaseMethod(Runnable releaseMethod) {
this.releaseMethod = releaseMethod != null ? ignored -> releaseMethod.run() : ignored -> {};
return this;
}

public ReferenceCountedObject<V> build() {
if (value == null) {
Objects.requireNonNull(constructor, "Both value == null and constructor == null");
return new ConstructorWrapper<>(constructor, retainMethod, releaseMethod);
} else {
Preconditions.assertNull(constructor, "Both value != null and constructor != null");
return new ValueWrapper<>(value, retainMethod, releaseMethod);
}
}

/**
* Wrap a fixed value as a {@link ReferenceCountedObject}.
* When it is completely released, it cannot be retained again.
*
* @param <V> The value type.
*/
private static final class ValueWrapper<V> implements ReferenceCountedObject<V> {
private final V value;
private final Runnable retainMethod;
private final Consumer<V> releaseMethod;
private final AtomicInteger count = new AtomicInteger();

private ValueWrapper(V value, Runnable retainMethod, Consumer<V> releaseMethod) {
this.value = Objects.requireNonNull(value, "value == null");
this.retainMethod = Objects.requireNonNull(retainMethod, "retainMethod == null");
this.releaseMethod = Objects.requireNonNull(releaseMethod, "releaseMethod == null");
}

@Override
public V get() {
final int previous = count.get();
if (previous < 0) {
throw new IllegalStateException("Failed to get: object has already been completely released.");
throw new IllegalStateException("Failed to get: already completely released.");
} else if (previous == 0) {
throw new IllegalStateException("Failed to get: object has not yet been retained.");
throw new IllegalStateException("Failed to get: not yet retained.");
}
return value;
}
Expand All @@ -135,7 +255,7 @@ public V retain() {
// n < 0: exception
// n >= 0: n++
if (count.getAndUpdate(n -> n < 0? n : n + 1) < 0) {
throw new IllegalStateException("Failed to retain: object has already been completely released.");
throw new IllegalStateException("Failed to retain: already completely released.");
}

retainMethod.run();
Expand All @@ -149,19 +269,78 @@ public boolean release() {
// n == 1: n = -1
final int previous = count.getAndUpdate(n -> n <= 1? -1: n - 1);
if (previous < 0) {
throw new IllegalStateException("Failed to release: object has already been completely released.");
throw new IllegalStateException("Failed to release: already completely released.");
} else if (previous == 0) {
throw new IllegalStateException("Failed to release: object has not yet been retained.");
throw new IllegalStateException("Failed to release: not yet retained.");
}
final boolean completedReleased = previous == 1;
releaseMethod.accept(completedReleased);
return completedReleased;
final boolean completelyReleased = previous == 1;
releaseMethod.accept(completelyReleased ? value : null);
return completelyReleased;
}
};
}

/**
* Wrap a constructor as a {@link ReferenceCountedObject}.
*
* @see Builder#setConstructor(Supplier)
*/
private static final class ConstructorWrapper<V> implements ReferenceCountedObject<V> {
private final Supplier<V> constructor;
private final Runnable retainMethod;
private final Consumer<V> releaseMethod;
private ValueWrapper<V> valueWrapper;

private ConstructorWrapper(Supplier<V> constructor, Runnable retainMethod, Consumer<V> releaseMethod) {
this.constructor = Objects.requireNonNull(constructor, "constructor == null");
this.retainMethod = Objects.requireNonNull(retainMethod, "retainMethod == null");
this.releaseMethod = Objects.requireNonNull(releaseMethod, "releaseMethod == null");
}

@Override
public synchronized V get() {
if (valueWrapper == null) {
throw new IllegalStateException("Failed to get: not yet retained.");
}
return valueWrapper.get();
}

@Override
public synchronized V retain() {
if (valueWrapper == null) {
valueWrapper = new ValueWrapper<>(constructor.get(), retainMethod, releaseMethod);
}
return valueWrapper.retain();
}

@Override
public synchronized boolean release() {
if (valueWrapper == null) {
throw new IllegalStateException("Failed to release: not yet retained.");
}
if (valueWrapper.release()) {
valueWrapper = null;
return true;
}
return false;
}
}
}

/** The same as wrap(value, retainMethod, ignored -> releaseMethod.run()). */
/**
* The same as newBuilder()
* .setValue(value)
* .setRetainMethod(retainMethod)
* .setReleaseMethod(releaseMethod)
* .build();
*
* @deprecated use {@link Builder}
*/
@Deprecated
static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod, Runnable releaseMethod) {
return wrap(value, retainMethod, ignored -> releaseMethod.run());
return ReferenceCountedObject.<V>newBuilder()
.setValue(value)
.setRetainMethod(retainMethod)
.setReleaseMethod(releaseMethod)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedFunction;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.Preconditions;
Expand Down Expand Up @@ -84,36 +85,30 @@
public class NettyClientStreamRpc implements DataStreamClientRpc {
public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);

private static class WorkerGroupGetter implements Supplier<EventLoopGroup> {

private static final AtomicReference<CompletableFuture<ReferenceCountedObject<EventLoopGroup>>> SHARED_WORKER_GROUP
= new AtomicReference<>();
private static class WorkerGroupGetter {
private static final MemoizedFunction<RaftProperties, ReferenceCountedObject<EventLoopGroup>> SHARED
= MemoizedFunction.valueOf(properties -> ReferenceCountedObject.<EventLoopGroup>newBuilder()
.setConstructor(() -> newWorkerGroup(properties))
.setReleaseMethod(eventLoopGroup -> {
if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully();
}
})
.build());

static WorkerGroupGetter newInstance(RaftProperties properties) {
final boolean shared = NettyConfigKeys.DataStream.Client.workerGroupShare(properties);
if (shared) {
final CompletableFuture<ReferenceCountedObject<EventLoopGroup>> created = new CompletableFuture<>();
final CompletableFuture<ReferenceCountedObject<EventLoopGroup>> current
= SHARED_WORKER_GROUP.updateAndGet(g -> g != null ? g : created);
if (current == created) {
created.complete(ReferenceCountedObject.wrap(newWorkerGroup(properties)));
}
return new WorkerGroupGetter(current.join().retain()) {
@Override
void shutdownGracefully() {
final CompletableFuture<ReferenceCountedObject<EventLoopGroup>> returned
= SHARED_WORKER_GROUP.updateAndGet(previous -> {
Preconditions.assertSame(current, previous, "SHARED_WORKER_GROUP");
return previous.join().release() ? null : previous;
});
if (returned == null) {
get().shutdownGracefully();
}
}
};
} else {
if (!shared) {
return new WorkerGroupGetter(newWorkerGroup(properties));
}

final ReferenceCountedObject<EventLoopGroup> ref = SHARED.apply(properties);
return new WorkerGroupGetter(ref.retain()) {
@Override
void shutdownGracefully() {
ref.release();
}
};
}

static EventLoopGroup newWorkerGroup(RaftProperties properties) {
Expand All @@ -129,7 +124,6 @@ private WorkerGroupGetter(EventLoopGroup workerGroup) {
this.workerGroup = workerGroup;
}

@Override
public final EventLoopGroup get() {
return workerGroup;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,11 @@ static long writeTo(ByteBuf buf, Iterable<WriteOption> options,
if (buffer.remaining() == 0) {
continue;
}
final ReferenceCountedObject<ByteBuffer> wrapped = ReferenceCountedObject.wrap(
buffer, buf::retain, ignored -> buf.release());
final ReferenceCountedObject<ByteBuffer> wrapped = ReferenceCountedObject.<ByteBuffer>newBuilder()
.setValue(buffer)
.setRetainMethod(buf::retain)
.setReleaseMethod(ignored -> buf.release())
.build();
try(UncheckedAutoCloseable ignore = wrapped.retainAndReleaseOnClose()) {
byteWritten += channel.write(wrapped);
} catch (Throwable t) {
Expand Down
Loading
Loading