Remove query-cache serialization optimization. #9500

Closed
src/main/java/org/elasticsearch/indices/cache/query/IndicesQueryCache.java
@@ -32,37 +32,26 @@
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.PagedBytesReference;
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.MemorySizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
@@ -217,11 +206,12 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) {
}

/**
- * Loads the cache result, computing it if needed by executing the query phase. The combination of load + compute allows
+ * Loads the cache result, computing it if needed by executing the query phase and otherwise deserializing the cached
+ * value into the {@link SearchContext#queryResult() context's query result}. The combination of load + compute allows
* to have a single load operation that will cause other requests with the same key to wait till it is loaded and reuse
* the same cache.
*/
- public QuerySearchResultProvider load(final ShardSearchRequest request, final SearchContext context, final QueryPhase queryPhase) throws Exception {
+ public void loadIntoContext(final ShardSearchRequest request, final SearchContext context, final QueryPhase queryPhase) throws Exception {
assert canCache(request, context);
Key key = buildKey(request, context);
Loader loader = new Loader(queryPhase, context, key);
@@ -238,10 +228,11 @@ public QuerySearchResultProvider load(final ShardSearchRequest request, final Se
}
} else {
key.shard.queryCache().onHit();
+ // restore the cached query result into the context
+ final QuerySearchResult result = context.queryResult();
+ result.readFromWithId(context.id(), value.reference.streamInput());
+ result.shardTarget(context.shardTarget());
}

- // try and be smart, and reuse an already loaded and constructed QueryResult of in VM execution
- return new BytesQuerySearchResult(context.id(), context.shardTarget(), value.reference, loader.isLoaded() ? context.queryResult() : null);
}
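To illustrate the load + compute semantics described in the javadoc above, here is a minimal, self-contained sketch, not the actual Elasticsearch cache: concurrent callers with an equal key share one computation and then reuse its result. The SingleFlightCache name and the FutureTask-based approach are illustrative assumptions.

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical sketch: the first caller for a key runs the loader (e.g. the
// query phase); concurrent callers for the same key block until it finishes.
class SingleFlightCache<K, V> {
    private final ConcurrentHashMap<K, FutureTask<V>> cache = new ConcurrentHashMap<>();

    V load(K key, Callable<V> loader) throws ExecutionException, InterruptedException {
        FutureTask<V> task = cache.get(key);
        if (task == null) {
            FutureTask<V> created = new FutureTask<>(loader);
            task = cache.putIfAbsent(key, created);
            if (task == null) {
                task = created;
                task.run(); // this caller computes the value exactly once
            }
        }
        return task.get(); // other callers wait for, then reuse, the result
    }
}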

private static class Loader implements Callable<Value> {
@@ -278,7 +269,6 @@ public Value call() throws Exception {
// for now, keep the paged data structure, which might have unused bytes to fill a page, but better to keep
// the memory properly paged instead of having varied sized bytes
final BytesReference reference = out.bytes();
- assert verifyCacheSerializationSameAsQueryResult(reference, context, context.queryResult());
loaded = true;
Value value = new Value(reference, out.ramBytesUsed());
key.shard.queryCache().onCached(key, value);
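The paging trade-off noted in the comment above can be made concrete with a small, hypothetical calculation; the page size and helper below are illustrative assumptions, not values taken from BigArrays.

// Back-of-the-envelope sketch (made-up numbers): rounding each cached value up
// to whole pages wastes the tail of the last page, but keeps all cache memory
// in uniform, recyclable blocks instead of varied-size byte arrays.
public class PagedSizeDemo {
    static long pagedSizeInBytes(long rawBytes, long pageSize) {
        long pages = (rawBytes + pageSize - 1) / pageSize; // round up to whole pages
        return pages * pageSize;
    }

    public static void main(String[] args) {
        // A 10 KB serialized result with 16 KB pages accounts for one full page.
        System.out.println(pagedSizeInBytes(10 * 1024, 16 * 1024)); // 16384
    }
}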
@@ -459,89 +449,11 @@ synchronized void reap() {
}
}

- private static boolean verifyCacheSerializationSameAsQueryResult(BytesReference cacheData, SearchContext context, QuerySearchResult result) throws Exception {
- BytesStreamOutput out1 = new BytesStreamOutput();
- new BytesQuerySearchResult(context.id(), context.shardTarget(), cacheData).writeTo(out1);
- BytesStreamOutput out2 = new BytesStreamOutput();
- result.writeTo(out2);
- return out1.bytes().equals(out2.bytes());
- }

private static Key buildKey(ShardSearchRequest request, SearchContext context) throws Exception {
// TODO: for now, this will create different keys for different JSON order
// TODO: tricky to get around this, need to parse and order all, which can be expensive
return new Key(context.indexShard(),
((DirectoryReader) context.searcher().getIndexReader()).getVersion(),
request.cacheKey());
}
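A quick hypothetical illustration of the TODO above: because the key is built from the raw request bytes, two requests that are semantically identical but order their JSON keys differently occupy separate cache entries. The class name and JSON bodies below are made up for illustration.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class JsonOrderCacheKeyDemo {
    public static void main(String[] args) {
        byte[] a = "{\"query\":{\"match_all\":{}},\"size\":0}".getBytes(StandardCharsets.UTF_8);
        byte[] b = "{\"size\":0,\"query\":{\"match_all\":{}}}".getBytes(StandardCharsets.UTF_8);
        // Same semantics, different bytes -> different cache keys.
        System.out.println(Arrays.equals(a, b)); // false
    }
}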

- /**
- * this class aim is to just provide an on the wire *write* format that is the same as {@link QuerySearchResult}
- * and also provide a nice wrapper for in node communication for an already constructed {@link QuerySearchResult}.
- */
- private static class BytesQuerySearchResult extends QuerySearchResultProvider {
-
- private long id;
- private SearchShardTarget shardTarget;
- private BytesReference data;
-
- private transient QuerySearchResult result;
-
- private BytesQuerySearchResult(long id, SearchShardTarget shardTarget, BytesReference data) {
- this(id, shardTarget, data, null);
- }
-
- private BytesQuerySearchResult(long id, SearchShardTarget shardTarget, BytesReference data, QuerySearchResult result) {
- this.id = id;
- this.shardTarget = shardTarget;
- this.data = data;
- this.result = result;
- }
-
- @Override
- public boolean includeFetch() {
- return false;
- }
-
- @Override
- public QuerySearchResult queryResult() {
- if (result == null) {
- result = new QuerySearchResult(id, shardTarget);
- try {
- result.readFromWithId(id, data.streamInput());
- } catch (Exception e) {
- throw new ElasticsearchParseException("failed to parse a cached query", e);
- }
- }
- return result;
- }
-
- @Override
- public long id() {
- return id;
- }
-
- @Override
- public SearchShardTarget shardTarget() {
- return shardTarget;
- }
-
- @Override
- public void shardTarget(SearchShardTarget shardTarget) {
- this.shardTarget = shardTarget;
- }
-
- @Override
- public void readFrom(StreamInput in) throws IOException {
- throw new ElasticsearchIllegalStateException("readFrom should not be called");
- }
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- super.writeTo(out);
- out.writeLong(id);
- // shardTarget.writeTo(out); not needed
- data.writeTo(out); // we need to write teh bytes as is, to be the same as QuerySearchResult
- }
- }
}
31 changes: 17 additions & 14 deletions src/main/java/org/elasticsearch/search/SearchService.java
@@ -24,6 +24,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;

import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
@@ -271,21 +272,27 @@ public ScrollQueryFetchSearchResult executeScan(InternalScrollSearchRequest requ
}
}

+ /**
+ * Try to load the query results from the cache or execute the query phase directly if the cache cannot be used.
+ */
+ private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context,
+ final QueryPhase queryPhase) throws Exception {
+ final boolean canCache = indicesQueryCache.canCache(request, context);
+ if (canCache) {
+ indicesQueryCache.loadIntoContext(request, context, queryPhase);
+ } else {
+ queryPhase.execute(context);
+ }
+ }

public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws ElasticsearchException {
final SearchContext context = createAndPutContext(request);
try {
context.indexShard().searchService().onPreQueryPhase(context);
long time = System.nanoTime();
contextProcessing(context);

- QuerySearchResultProvider result;
- boolean canCache = indicesQueryCache.canCache(request, context);
- if (canCache) {
- result = indicesQueryCache.load(request, context, queryPhase);
- } else {
- queryPhase.execute(context);
- result = context.queryResult();
- }
+ loadOrExecuteQueryPhase(request, context, queryPhase);

if (context.searchType() == SearchType.COUNT) {
freeContext(context.id());
@@ -294,7 +301,7 @@ public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) t
}
context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time);

- return result;
+ return context.queryResult();
} catch (Throwable e) {
// execution exception can happen while loading the cache, strip it
if (e instanceof ExecutionException) {
@@ -989,11 +996,7 @@ public void run() {
if (canCache != top) {
return;
}
- if (canCache) {
- indicesQueryCache.load(request, context, queryPhase);
- } else {
- queryPhase.execute(context);
- }
+ loadOrExecuteQueryPhase(request, context, queryPhase);
long took = System.nanoTime() - now;
if (indexShard.warmerService().logger().isTraceEnabled()) {
indexShard.warmerService().logger().trace("warmed [{}], took [{}]", entry.name(), TimeValue.timeValueNanos(took));
69 changes: 69 additions & 0 deletions src/test/java/org/elasticsearch/indices/stats/IndicesQueryCacheTests.java
@@ -0,0 +1,69 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.indices.stats;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.test.ElasticsearchIntegrationTest;

import java.util.List;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.hamcrest.Matchers.greaterThan;

public class IndicesQueryCacheTests extends ElasticsearchIntegrationTest {

// One of the primary purposes of the query cache is to cache aggs results
public void testCacheAggs() throws Exception {
assertAcked(client().admin().indices().prepareCreate("index").setSettings(IndicesQueryCache.INDEX_CACHE_QUERY_ENABLED, true).get());
indexRandom(true,
client().prepareIndex("index", "type").setSource("f", 4),
client().prepareIndex("index", "type").setSource("f", 6),
client().prepareIndex("index", "type").setSource("f", 7));

final SearchResponse r1 = client().prepareSearch("index").setSearchType(SearchType.COUNT)
.addAggregation(histogram("histo").field("f").interval(2)).get();

// Check that the cache is actually used
assertThat(client().admin().indices().prepareStats("index").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), greaterThan(0L));

for (int i = 0; i < 10; ++i) {
final SearchResponse r2 = client().prepareSearch("index").setSearchType(SearchType.COUNT)
.addAggregation(histogram("histo").field("f").interval(2)).get();
Histogram h1 = r1.getAggregations().get("histo");
Histogram h2 = r2.getAggregations().get("histo");
final List<? extends Bucket> buckets1 = h1.getBuckets();
final List<? extends Bucket> buckets2 = h2.getBuckets();
assertEquals(buckets1.size(), buckets2.size());
for (int j = 0; j < buckets1.size(); ++j) {
final Bucket b1 = buckets1.get(j);
final Bucket b2 = buckets2.get(j);
assertEquals(b1.getKey(), b2.getKey());
assertEquals(b1.getDocCount(), b2.getDocCount());
}
}
}

}