Skip to content

Commit

Permalink
Remove close method in PageCacheRecycler/Recycler
Browse files Browse the repository at this point in the history
The changes in #39317 brought to light some concurrency issues in the
close method of Recyclers as we do not wait for threads running in the
threadpool to be finished prior to the closing of the PageCacheRecycler
and the Recyclers that are used internally. #41695 was opened to
address the concurrent close issues but upon review, the closing of
these classes is not really needed as the instances should be become
available for garbage collection once there is no longer a reference to
the closed node.

Closes #41683
  • Loading branch information
jaymode committed May 8, 2019
1 parent c7db902 commit 09b1446
Show file tree
Hide file tree
Showing 11 changed files with 2 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
resourcesToClose.add(circuitBreakerService);
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
resourcesToClose.add(pageCacheRecycler);
modules.add(settingsModule);
NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
Expand Down Expand Up @@ -376,7 +375,6 @@ public void close() {
closeables.add(plugin);
}
closeables.add(() -> ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS));
closeables.add(injector.getInstance(PageCacheRecycler.class));
IOUtils.closeWhileHandlingException(closeables);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,4 @@ protected AbstractRecycler(Recycler.C<T> c) {
this.c = c;
}

@Override
public void close() {
// no-op by default
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,6 @@ public ConcurrentDequeRecycler(C<T> c, int maxSize) {
this.size = new AtomicInteger();
}

@Override
public void close() {
assert deque.size() == size.get();
super.close();
size.set(0);
}

@Override
public V<T> obtain() {
final V<T> v = super.obtain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,6 @@ public DequeRecycler(C<T> c, Deque<T> queue, int maxSize) {
this.maxSize = maxSize;
}

@Override
public void close() {
// call destroy() for every cached object
for (T t : deque) {
c.destroy(t);
}
// finally get rid of all references
deque.clear();
}

@Override
public V<T> obtain() {
final T v = deque.pollFirst();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,4 @@ public Recycler.V<T> obtain() {
return wrap(getDelegate().obtain());
}

@Override
public void close() {
getDelegate().close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@ public V<T> obtain() {
return new NV<>(c.newInstance());
}

@Override
public void close() {
// no-op
}

public static class NV<T> implements Recycler.V<T> {

T value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* A recycled object, note, implementations should support calling obtain and then recycle
* on different threads.
*/
public interface Recycler<T> extends Releasable {
public interface Recycler<T> {

interface Factory<T> {
Recycler<T> build();
Expand Down Expand Up @@ -53,8 +53,6 @@ interface V<T> extends Releasable {

}

void close();

V<T> obtain();

}
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,6 @@ protected Recycler<T> getDelegate() {
return recyclers[slot()];
}

@Override
public void close() {
for (Recycler<T> recycler : recyclers) {
recycler.close();
}
}

};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
package org.elasticsearch.common.util;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.recycler.AbstractRecyclerC;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Setting;
Expand All @@ -39,7 +37,7 @@
import static org.elasticsearch.common.recycler.Recyclers.none;

/** A recycler of fixed-size pages. */
public class PageCacheRecycler implements Releasable {
public class PageCacheRecycler {

public static final Setting<Type> TYPE_SETTING =
new Setting<>("cache.recycler.page.type", Type.CONCURRENT.name(), Type::parse, Property.NodeScope);
Expand Down Expand Up @@ -73,11 +71,6 @@ public class PageCacheRecycler implements Releasable {
NON_RECYCLING_INSTANCE = new PageCacheRecycler(Settings.builder().put(LIMIT_HEAP_SETTING.getKey(), "0%").build());
}

@Override
public void close() {
Releasables.close(true, bytePage, intPage, longPage, objectPage);
}

public PageCacheRecycler(Settings settings) {
final Type type = TYPE_SETTING.get(settings);
final long limit = LIMIT_HEAP_SETTING.get(settings).getBytes();
Expand Down
2 changes: 0 additions & 2 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ protected Node(

PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
resourcesToClose.add(pageCacheRecycler);
modules.add(settingsModule);
List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
NetworkModule.getNamedWriteables().stream(),
Expand Down Expand Up @@ -844,7 +843,6 @@ public synchronized void close() throws IOException {
toClose.add(() -> stopWatch.stop());

toClose.add(injector.getInstance(NodeEnvironment.class));
toClose.add(injector.getInstance(PageCacheRecycler.class));

if (logger.isTraceEnabled()) {
logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ public void testReuse() {
assertNotSame(b1, b2);
}
o.close();
r.close();
}

public void testRecycle() {
Expand All @@ -111,7 +110,6 @@ public void testRecycle() {
o = r.obtain();
assertRecycled(o.v());
o.close();
r.close();
}

public void testDoubleRelease() {
Expand All @@ -128,7 +126,6 @@ public void testDoubleRelease() {
final Recycler.V<byte[]> v2 = r.obtain();
final Recycler.V<byte[]> v3 = r.obtain();
assertNotSame(v2.v(), v3.v());
r.close();
}

public void testDestroyWhenOverCapacity() {
Expand All @@ -152,9 +149,6 @@ public void testDestroyWhenOverCapacity() {
// release first ref, verify for destruction
o.close();
assertDead(data);

// close the rest
r.close();
}

public void testClose() {
Expand All @@ -171,10 +165,6 @@ public void testClose() {

// verify that recycle() ran
assertRecycled(data);

// closing the recycler should mark recycled instances via destroy()
r.close();
assertDead(data);
}

}

0 comments on commit 09b1446

Please sign in to comment.