Skip to content

Commit

Permalink
Page-based cache recycling.
Browse files Browse the repository at this point in the history
Refactor cache recycling so that it only caches large arrays (pages) that can
later be used to build more complex data-structures such as hash tables.

 - QueueRecycler now takes a limit like other non-trivial recyclers.
 - New PageCacheRecycler (inspired of CacheRecycler) has the ability to cache
   byte[], int[], long[], double[] or Object[] arrays using a fixed amount of
   memory (either globally or per-thread depending on the Recycler impl, eg.
   queue is global while thread_local is per-thread).
 - Paged arrays in o.e.common.util can now optionally take a PageCacheRecycler
   to reuse existing pages.
 - All aggregators' data-structures now use PageCacheRecycler:
   - for all arrays (counts, mins, maxes, ...)
   - LongHash can now take a PageCacheRecycler
   - there is a new BytesRefHash (inspired from Lucene but quite different,
     still; for instance it cheats on BytesRef comparisons by using Unsafe)
     that also takes a PageCacheRecycler

Close elastic#4557
  • Loading branch information
jpountz authored and brusic committed Jan 19, 2014
1 parent ff0d660 commit 88774df
Show file tree
Hide file tree
Showing 69 changed files with 2,272 additions and 353 deletions.
3 changes: 3 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,9 @@
<exclude>org/elasticsearch/bootstrap/Bootstrap.class</exclude>
<exclude>org/elasticsearch/Version.class</exclude>
<!-- end excludes for valid system-out -->
<!-- start excludes for Unsafe -->
<exclude>org/elasticsearch/common/util/UnsafeUtils.class</exclude>
<!-- end excludes for Unsafe -->
</excludes>
<bundledSignatures>
<!-- This will automatically choose the right signatures based on 'maven.compiler.target': -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand Down Expand Up @@ -66,12 +67,15 @@ public class TransportValidateQueryAction extends TransportBroadcastOperationAct

private final CacheRecycler cacheRecycler;

private final PageCacheRecycler pageCacheRecycler;

@Inject
public TransportValidateQueryAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, IndicesService indicesService, ScriptService scriptService, CacheRecycler cacheRecycler) {
public TransportValidateQueryAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, IndicesService indicesService, ScriptService scriptService, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler) {
super(settings, threadPool, clusterService, transportService);
this.indicesService = indicesService;
this.scriptService = scriptService;
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
}

@Override
Expand Down Expand Up @@ -180,7 +184,7 @@ protected ShardValidateQueryResponse shardOperation(ShardValidateQueryRequest re
SearchContext.setCurrent(new DefaultSearchContext(0,
new ShardSearchRequest().types(request.types()).nowInMillis(request.nowInMillis()),
null, indexShard.acquireSearcher("validate_query"), indexService, indexShard,
scriptService, cacheRecycler));
scriptService, cacheRecycler, pageCacheRecycler));
try {
ParsedQuery parsedQuery = queryParserService.parseQuery(request.source());
valid = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand Down Expand Up @@ -67,13 +68,16 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun

private final CacheRecycler cacheRecycler;

private final PageCacheRecycler pageCacheRecycler;

@Inject
public TransportCountAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, ScriptService scriptService, CacheRecycler cacheRecycler) {
IndicesService indicesService, ScriptService scriptService, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler) {
super(settings, threadPool, clusterService, transportService);
this.indicesService = indicesService;
this.scriptService = scriptService;
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
}

@Override
Expand Down Expand Up @@ -164,7 +168,7 @@ protected ShardCountResponse shardOperation(ShardCountRequest request) throws El
.filteringAliases(request.filteringAliases())
.nowInMillis(request.nowInMillis()),
shardTarget, indexShard.acquireSearcher("count"), indexService, indexShard,
scriptService, cacheRecycler);
scriptService, cacheRecycler, pageCacheRecycler);
SearchContext.setCurrent(context);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
Expand Down Expand Up @@ -52,14 +53,17 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication

private final ScriptService scriptService;
private final CacheRecycler cacheRecycler;
private final PageCacheRecycler pageCacheRecycler;

@Inject
public TransportShardDeleteByQueryAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ShardStateAction shardStateAction, ScriptService scriptService, CacheRecycler cacheRecycler) {
ShardStateAction shardStateAction, ScriptService scriptService, CacheRecycler cacheRecycler,
PageCacheRecycler pageCacheRecycler) {
super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction);
this.scriptService = scriptService;
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
}

@Override
Expand Down Expand Up @@ -109,7 +113,7 @@ protected PrimaryResponse<ShardDeleteByQueryResponse, ShardDeleteByQueryRequest>
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);

SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()), null,
indexShard.acquireSearcher("delete_by_query"), indexService, indexShard, scriptService, cacheRecycler));
indexShard.acquireSearcher("delete_by_query"), indexService, indexShard, scriptService, cacheRecycler, pageCacheRecycler));
try {
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), request.types())
.origin(Engine.Operation.Origin.PRIMARY);
Expand All @@ -131,7 +135,8 @@ protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);

SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest().types(request.types()), null,
indexShard.acquireSearcher("delete_by_query", IndexShard.Mode.WRITE), indexService, indexShard, scriptService, cacheRecycler));
indexShard.acquireSearcher("delete_by_query", IndexShard.Mode.WRITE), indexService, indexShard, scriptService,
cacheRecycler, pageCacheRecycler));
try {
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), request.types())
.origin(Engine.Operation.Origin.REPLICA);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.support.single.shard.TransportShardSingleOperationAction;
import org.elasticsearch.cache.recycler.CacheRecycler;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand Down Expand Up @@ -63,14 +64,17 @@ public class TransportExplainAction extends TransportShardSingleOperationAction<

private final CacheRecycler cacheRecycler;

private final PageCacheRecycler pageCacheRecycler;

@Inject
public TransportExplainAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService,
ScriptService scriptService, CacheRecycler cacheRecycler) {
ScriptService scriptService, CacheRecycler cacheRecycler, PageCacheRecycler pageCacheRecycler) {
super(settings, threadPool, clusterService, transportService);
this.indicesService = indicesService;
this.scriptService = scriptService;
this.cacheRecycler = cacheRecycler;
this.pageCacheRecycler = pageCacheRecycler;
}

@Override
Expand Down Expand Up @@ -114,7 +118,7 @@ protected ExplainResponse shardOperation(ExplainRequest request, int shardId) th
.filteringAliases(request.filteringAlias())
.nowInMillis(request.nowInMillis),
null, result.searcher(), indexService, indexShard,
scriptService, cacheRecycler
scriptService, cacheRecycler, pageCacheRecycler
);
SearchContext.setCurrent(context);

Expand Down
76 changes: 52 additions & 24 deletions src/main/java/org/elasticsearch/cache/recycler/CacheRecycler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.cache.recycler;

import com.carrotsearch.hppc.*;
import com.google.common.base.Strings;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -62,7 +63,7 @@ public void close() {
@Inject
public CacheRecycler(Settings settings) {
super(settings);
String type = settings.get("type", Type.SOFT_THREAD_LOCAL.name());
final Type type = Type.parse(settings.get("type"));
int limit = settings.getAsInt("limit", 10);
int smartSize = settings.getAsInt("smart_size", 1024);

Expand Down Expand Up @@ -252,27 +253,10 @@ static int size(int sizing) {
return sizing > 0 ? sizing : 256;
}

private <T> Recycler<T> build(String type, int limit, int smartSize, Recycler.C<T> c) {
private <T> Recycler<T> build(Type type, int limit, int smartSize, Recycler.C<T> c) {
Recycler<T> recycler;
try {
// default to soft_thread_local
final Type t = type == null ? Type.SOFT_THREAD_LOCAL : Type.valueOf(type.toUpperCase(Locale.ROOT));
switch (t) {
case SOFT_THREAD_LOCAL:
recycler = new SoftThreadLocalRecycler<T>(c, limit);
break;
case THREAD_LOCAL:
recycler = new ThreadLocalRecycler<T>(c, limit);
break;
case QUEUE:
recycler = new QueueRecycler<T>(c);
break;
case NONE:
recycler = new NoneRecycler<T>(c);
break;
default:
throw new ElasticSearchIllegalArgumentException("no type support [" + type + "] for recycler");
}
recycler = type.build(c, limit);
if (smartSize > 0) {
recycler = new Recycler.Sizing<T>(recycler, smartSize);
}
Expand All @@ -284,9 +268,53 @@ private <T> Recycler<T> build(String type, int limit, int smartSize, Recycler.C<
}

public static enum Type {
SOFT_THREAD_LOCAL,
THREAD_LOCAL,
QUEUE,
NONE;
SOFT_THREAD_LOCAL {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit) {
return new SoftThreadLocalRecycler<T>(c, limit);
}
@Override
boolean perThread() {
return true;
}
},
THREAD_LOCAL {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit) {
return new ThreadLocalRecycler<T>(c, limit);
}
@Override
boolean perThread() {
return true;
}
},
QUEUE {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit) {
return new QueueRecycler<T>(c, limit);
}
},
NONE {
@Override
<T> Recycler<T> build(Recycler.C<T> c, int limit) {
return new NoneRecycler<T>(c);
}
};

public static Type parse(String type) {
if (Strings.isNullOrEmpty(type)) {
return SOFT_THREAD_LOCAL;
}
try {
return Type.valueOf(type.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
throw new ElasticSearchIllegalArgumentException("no type support [" + type + "]");
}
}

abstract <T> Recycler<T> build(Recycler.C<T> c, int limit);
boolean perThread() {
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cache.recycler;

import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;

/**
*/
public class DefaultPageCacheRecyclerModule extends AbstractModule {

private final Settings settings;

public DefaultPageCacheRecyclerModule(Settings settings) {
this.settings = settings;
}

@Override
protected void configure() {
bind(PageCacheRecycler.class).asEagerSingleton();
}
}

0 comments on commit 88774df

Please sign in to comment.