Skip to content

Commit

Permalink
Anonymize AbstractRefCounted (#77208)
Browse files Browse the repository at this point in the history
Today `AbstractRefCounted` has a `name` field which is only used to
construct the exception message when calling `incRef()` after it's been
closed. This isn't really necessary, the stack trace will identify the
reference in question and give loads more useful detail besides. It's
also slightly irksome to have to name every single implementation.

This commit drops the name and the constructor parameter, and also
introduces a handy factory method for use when there's no extra state
needed and you just want to run a method or lambda when all references
are released.
  • Loading branch information
DaveCTurner committed Sep 3, 2021
1 parent a479181 commit 7c513a7
Show file tree
Hide file tree
Showing 22 changed files with 89 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,13 @@
import java.util.concurrent.atomic.AtomicInteger;

/**
* A basic RefCounted implementation that is initialized with a
* ref count of 1 and calls {@link #closeInternal()} once it reaches
* a 0 ref count
* A basic {@link RefCounted} implementation that is initialized with a ref count of 1 and calls {@link #closeInternal()} once it reaches
* a 0 ref count.
*/
public abstract class AbstractRefCounted implements RefCounted {
private final AtomicInteger refCount = new AtomicInteger(1);
private final String name;
public static final String ALREADY_CLOSED_MESSAGE = "already closed, can't increment ref count";

public AbstractRefCounted(String name) {
this.name = name;
}
private final AtomicInteger refCount = new AtomicInteger(1);

@Override
public final void incRef() {
Expand Down Expand Up @@ -63,14 +59,16 @@ public final boolean decRef() {
}

/**
* Called whenever the ref count is incremented or decremented. Can be implemented by implementations to a record of access to the
* instance for debugging purposes.
* Called whenever the ref count is incremented or decremented. Can be overridden to record access to the instance for debugging
* purposes.
*/
protected void touch() {
}

protected void alreadyClosed() {
throw new IllegalStateException(name + " is already closed can't increment refCount current count [" + refCount.get() + "]");
final int currentRefCount = refCount.get();
assert currentRefCount == 0 : currentRefCount;
throw new IllegalStateException(ALREADY_CLOSED_MESSAGE);
}

/**
Expand All @@ -80,15 +78,21 @@ public int refCount() {
return this.refCount.get();
}


/** gets the name of this instance */
public String getName() {
return name;
}

/**
* Method that is invoked once the reference count reaches zero.
* Implementations of this method must handle all exceptions and may not throw any exceptions.
*/
protected abstract void closeInternal();

/**
* Construct an {@link AbstractRefCounted} which runs the given {@link Runnable} when all references are released.
*/
public static AbstractRefCounted of(Runnable onClose) {
return new AbstractRefCounted() {
@Override
protected void closeInternal() {
onClose.run();
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;

import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -20,7 +19,8 @@
import static org.hamcrest.Matchers.is;

public class RefCountedTests extends ESTestCase {
public void testRefCount() throws IOException {

public void testRefCount() {
MyRefCounted counted = new MyRefCounted();

int incs = randomIntBetween(1, 100);
Expand Down Expand Up @@ -56,12 +56,9 @@ public void testRefCount() throws IOException {

counted.decRef();
assertFalse(counted.tryIncRef());
try {
counted.incRef();
fail(" expected exception");
} catch (IllegalStateException ex) {
assertThat(ex.getMessage(), equalTo("test is already closed can't increment refCount current count [0]"));
}
assertThat(
expectThrows(IllegalStateException.class, counted::incRef).getMessage(),
equalTo(AbstractRefCounted.ALREADY_CLOSED_MESSAGE));

try {
counted.ensureOpen();
Expand All @@ -77,29 +74,26 @@ public void testMultiThreaded() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final CopyOnWriteArrayList<Exception> exceptions = new CopyOnWriteArrayList<>();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread() {
@Override
public void run() {
try {
latch.await();
for (int j = 0; j < 10000; j++) {
counted.incRef();
try {
counted.ensureOpen();
} finally {
counted.decRef();
}
threads[i] = new Thread(() -> {
try {
latch.await();
for (int j = 0; j < 10000; j++) {
counted.incRef();
try {
counted.ensureOpen();
} finally {
counted.decRef();
}
} catch (Exception e) {
exceptions.add(e);
}
} catch (Exception e) {
exceptions.add(e);
}
};
});
threads[i].start();
}
latch.countDown();
for (int i = 0; i < threads.length; i++) {
threads[i].join();
for (Thread thread : threads) {
thread.join();
}
counted.decRef();
try {
Expand All @@ -110,17 +104,12 @@ public void run() {
}
assertThat(counted.refCount(), is(0));
assertThat(exceptions, Matchers.emptyIterable());

}

private final class MyRefCounted extends AbstractRefCounted {
private static final class MyRefCounted extends AbstractRefCounted {

private final AtomicBoolean closed = new AtomicBoolean(false);

MyRefCounted() {
super("test");
}

@Override
protected void closeInternal() {
this.closed.set(true);
Expand Down
1 change: 0 additions & 1 deletion libs/nio/src/main/java/org/elasticsearch/nio/Page.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ private static class RefCountedCloseable extends AbstractRefCounted {
private final Releasable closeable;

private RefCountedCloseable(Releasable closeable) {
super("byte array page");
this.closeable = closeable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,9 @@ private SharedGroup getGenericGroup() {

private static class RefCountedGroup extends AbstractRefCounted {

public static final String NAME = "ref-counted-event-loop-group";
private final EventLoopGroup eventLoopGroup;

private RefCountedGroup(EventLoopGroup eventLoopGroup) {
super(NAME);
this.eventLoopGroup = eventLoopGroup;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public class AmazonEc2Reference extends AbstractRefCounted implements Releasable
private final AmazonEC2 client;

AmazonEc2Reference(AmazonEC2 client) {
super("AWS_EC2_CLIENT");
this.client = client;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public class AmazonS3Reference extends AbstractRefCounted implements Releasable
private final AmazonS3 client;

AmazonS3Reference(AmazonS3 client) {
super("AWS_S3_CLIENT");
this.client = client;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,9 @@ private void onException(Exception exception) {

private static class RefCountedNioGroup extends AbstractRefCounted implements NioGroup {

public static final String NAME = "ref-counted-nio-group";
private final NioSelectorGroup nioGroup;

private RefCountedNioGroup(NioSelectorGroup nioGroup) {
super(NAME);
this.nioGroup = nioGroup;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ private static final class RefCountedReleasable extends AbstractRefCounted {
private final Releasable releasable;

RefCountedReleasable(Releasable releasable) {
super("bytes-reference");
this.releasable = releasable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ private final class CachedItem extends AbstractRefCounted {
private final CancellationChecks cancellationChecks = new CancellationChecks();

CachedItem(Key key) {
super("cached item");
this.key = key;
incRef(); // start with a refcount of 2 so we're not closed while adding the first listener
this.future.addListener(new ActionListener<Value>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,7 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
private final AtomicLong totalChannelsAccepted = new AtomicLong();
private final Set<HttpChannel> httpChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final PlainActionFuture<Void> allClientsClosedListener = PlainActionFuture.newFuture();
private final RefCounted refCounted = new AbstractRefCounted("abstract-http-server-transport") {
@Override
protected void closeInternal() {
allClientsClosedListener.onResponse(null);
}
};
private final RefCounted refCounted = AbstractRefCounted.of(() -> allClientsClosedListener.onResponse(null));
private final Set<HttpServerChannel> httpServerChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final HttpClientStatsTracker httpClientStatsTracker;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
private final ShardLock shardLock;
private final OnClose onClose;

private final AbstractRefCounted refCounter = new AbstractRefCounted("store") {
@Override
protected void closeInternal() {
// close us once we are done
Store.this.closeInternal();
}
};
private final AbstractRefCounted refCounter = AbstractRefCounted.of(this::closeInternal); // close us once we are done

public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock) {
this(shardId, indexSettings, directory, shardLock, OnClose.EMPTY);
Expand Down
31 changes: 14 additions & 17 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -310,24 +310,21 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
// avoid closing these resources while ongoing requests are still being processed, we use a
// ref count which will only close them when both this service and all index services are
// actually closed
indicesRefCount = new AbstractRefCounted("indices") {
@Override
protected void closeInternal() {
try {
IOUtils.close(
analysisRegistry,
indexingMemoryController,
indicesFieldDataCache,
cacheCleaner,
indicesRequestCache,
indicesQueryCache);
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
closeLatch.countDown();
}
indicesRefCount = AbstractRefCounted.of(() -> {
try {
IOUtils.close(
analysisRegistry,
indexingMemoryController,
indicesFieldDataCache,
cacheCleaner,
indicesRequestCache,
indicesQueryCache);
} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
closeLatch.countDown();
}
};
});

final String nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));
nodeWriteDanglingIndicesInfo = WRITE_DANGLING_INDICES_INFO_SETTING.get(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
public class MultiFileWriter extends AbstractRefCounted implements Releasable {

public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) {
super("multi_file_writer");
this.store = store;
this.indexState = indexState;
this.tempFilePrefix = tempFilePrefix;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ public RecoveryTarget(IndexShard indexShard,
DiscoveryNode sourceNode,
SnapshotFilesProvider snapshotFilesProvider,
PeerRecoveryTargetService.RecoveryListener listener) {
super("recovery_status");
this.cancellableThreads = new CancellableThreads();
this.recoveryId = idGenerator.incrementAndGet();
this.listener = listener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,7 @@ public ReaderContext(ShardSearchContextId id,
this.singleSession = singleSession;
this.keepAlive = new AtomicLong(keepAliveInMillis);
this.lastAccessTime = new AtomicLong(nowInMillis());
this.refCounted = new AbstractRefCounted("reader_context") {
@Override
protected void closeInternal() {
doClose();
}
};
this.refCounted = AbstractRefCounted.of(this::doClose);
}

public void validate(TransportRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,8 @@ public class ClusterConnectionManager implements ConnectionManager {

private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<DiscoveryNode, ListenableFuture<Void>> pendingConnections = ConcurrentCollections.newConcurrentMap();
private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") {
@Override
protected void closeInternal() {
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
}
}
closeLatch.countDown();
}
};
private final AbstractRefCounted connectingRefCounter = AbstractRefCounted.of(this::pendingConnectionsComplete);

private final Transport transport;
private final ConnectionProfile defaultProfile;
private final AtomicBoolean closing = new AtomicBoolean(false);
Expand Down Expand Up @@ -237,6 +224,19 @@ private void internalClose(boolean waitForPendingConnections) {
}
}

private void pendingConnectionsComplete() {
final Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
final Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
}
}
closeLatch.countDown();
}

private void internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile,
ActionListener<Transport.Connection> listener) {
transport.openConnection(node, connectionProfile, listener.map(connection -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexModule;
Expand Down Expand Up @@ -350,7 +351,7 @@ public void onFailure(Exception e) {
} catch (AlreadyClosedException ex) {
throw ex;
} catch (IllegalStateException ex) {
assertEquals("reader_context is already closed can't increment refCount current count [0]", ex.getMessage());
assertEquals(AbstractRefCounted.ALREADY_CLOSED_MESSAGE, ex.getMessage());
} catch (SearchContextMissingException ex) {
// that's fine
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ private static final class LeakAwareRefCounted extends AbstractRefCounted {
private final Releasable releasable;

LeakAwareRefCounted(Releasable releasable) {
super("leak-aware-ref-counted");
this.releasable = releasable;
leak = LeakTracker.INSTANCE.track(releasable);
}
Expand Down

0 comments on commit 7c513a7

Please sign in to comment.