Commit

Query DSL: Allow to associate a custom cache key with a filter, closes
kimchy committed Jul 21, 2011
1 parent 3995e78 commit fb12a1f
Showing 51 changed files with 398 additions and 60 deletions.
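This commit threads an optional, user-supplied cache key through the filter cache: the parsers read a new _cache_key (or _cacheKey) field, the builders gain a matching cacheKey(String) setter, and the caches key cached DocSets on that value instead of on the Filter instance whenever it is present. A minimal usage sketch, assuming the varargs AndFilterBuilder constructor and the FilterBuilders.termFilter factory that live outside this diff (field names and the key string are illustrative):

    // Hedged sketch: name the cache entry explicitly so equivalent filters built elsewhere
    // can share it. cache(boolean) and cacheKey(String) are the setters added in this commit.
    AndFilterBuilder publishedIn2011 = new AndFilterBuilder(
            FilterBuilders.termFilter("status", "published"),
            FilterBuilders.termFilter("year", "2011"))
            .cache(true)
            .cacheKey("published-2011");
    // doXContent then emits ... "_cache" : true, "_cache_key" : "published-2011" ... inside the "and" object.
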
@@ -49,7 +49,7 @@ public class ExistsFieldQueryExtension implements FieldQueryExtension {
}

// we always cache this one, really does not change...
filter = parseContext.cacheFilter(filter);
filter = parseContext.cacheFilter(filter, null);

filter = wrapSmartNameFilter(filter, smartNameFieldMappers, parseContext);

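Call sites like the two extensions here now hand an explicit key, or null, to parseContext.cacheFilter. The body of that overload is not part of this excerpt; a plausible shape, inferred from the call sites and from the CacheKeyFilter.Wrapper class added further down, might look like this (illustrative reconstruction, not the actual QueryParseContext code):

    // Illustrative reconstruction only. A non-null key wraps the filter so the caches
    // below key the resulting DocSet on the key rather than on the Filter instance.
    public Filter cacheFilter(Filter filter, @Nullable CacheKeyFilter.Key cacheKey) {
        if (cacheKey != null) {
            filter = new CacheKeyFilter.Wrapper(filter, cacheKey);
        }
        return indexCache.filter().cache(filter);   // assumed delegation to the index-level FilterCache
    }
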
@@ -51,10 +51,10 @@ public class MissingFieldQueryExtension implements FieldQueryExtension {
}

// we always cache this one, really does not change... (exists)
filter = parseContext.cacheFilter(filter);
filter = parseContext.cacheFilter(filter, null);
filter = new NotFilter(filter);
// cache the not filter as well, so it will be faster
filter = parseContext.cacheFilter(filter);
filter = parseContext.cacheFilter(filter, null);

filter = wrapSmartNameFilter(filter, smartNameFieldMappers, parseContext);

@@ -68,7 +68,7 @@ public class ResidentFilterCache extends AbstractConcurrentMapFilterCache implem
super.close();
}

@Override protected ConcurrentMap<Filter, DocSet> buildFilterMap() {
@Override protected ConcurrentMap<Object, DocSet> buildFilterMap() {
MapMaker mapMaker = new MapMaker();
if (maxSize != -1) {
mapMaker.maximumSize(maxSize);
@@ -68,7 +68,7 @@ public class SoftFilterCache extends AbstractConcurrentMapFilterCache implements
super.close();
}

@Override protected ConcurrentMap<Filter, DocSet> buildFilterMap() {
@Override protected ConcurrentMap<Object, DocSet> buildFilterMap() {
// DocSet are not really stored with strong reference only when searching on them...
// Filter might be stored in query cache
MapMaker mapMaker = new MapMaker().softValues();
@@ -47,7 +47,7 @@
*/
public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComponent implements FilterCache, IndexReader.ReaderFinishedListener {

final ConcurrentMap<Object, FilterCacheValue<ConcurrentMap<Filter, DocSet>>> cache;
final ConcurrentMap<Object, FilterCacheValue<ConcurrentMap<Object, DocSet>>> cache;

final boolean labEnabled;
final ByteSizeValue labMaxAlloc;
@@ -75,11 +75,11 @@ protected AbstractConcurrentMapFilterCache(Index index, @IndexSettings Settings
this.labChunkSizeBytes = (int) (labChunkSize.bytes() / RamUsage.NUM_BYTES_LONG);
}

protected ConcurrentMap<Object, FilterCacheValue<ConcurrentMap<Filter, DocSet>>> buildCache() {
return new ConcurrentHashMap<Object, FilterCacheValue<ConcurrentMap<Filter, DocSet>>>();
protected ConcurrentMap<Object, FilterCacheValue<ConcurrentMap<Object, DocSet>>> buildCache() {
return new ConcurrentHashMap<Object, FilterCacheValue<ConcurrentMap<Object, DocSet>>>();
}

protected ConcurrentMap<Filter, DocSet> buildFilterMap() {
protected ConcurrentMap<Object, DocSet> buildFilterMap() {
return newConcurrentMap();
}

@@ -92,15 +92,15 @@ protected ConcurrentMap<Filter, DocSet> buildFilterMap() {
}

@Override public void finished(IndexReader reader) {
FilterCacheValue<ConcurrentMap<Filter, DocSet>> readerValue = cache.remove(reader.getCoreCacheKey());
FilterCacheValue<ConcurrentMap<Object, DocSet>> readerValue = cache.remove(reader.getCoreCacheKey());
// help soft/weak handling GC
if (readerValue != null) {
readerValue.value().clear();
}
}

@Override public void clear(IndexReader reader) {
FilterCacheValue<ConcurrentMap<Filter, DocSet>> readerValue = cache.remove(reader.getCoreCacheKey());
FilterCacheValue<ConcurrentMap<Object, DocSet>> readerValue = cache.remove(reader.getCoreCacheKey());
// help soft/weak handling GC
if (readerValue != null) {
readerValue.value().clear();
@@ -111,7 +111,7 @@ protected ConcurrentMap<Filter, DocSet> buildFilterMap() {
long sizeInBytes = 0;
long totalCount = 0;
int segmentsCount = 0;
for (FilterCacheValue<ConcurrentMap<Filter, DocSet>> readerValue : cache.values()) {
for (FilterCacheValue<ConcurrentMap<Object, DocSet>> readerValue : cache.values()) {
segmentsCount++;
for (DocSet docSet : readerValue.value().values()) {
sizeInBytes += docSet.sizeInBytes();
@@ -151,27 +151,32 @@ static class FilterCacheFilterWrapper extends Filter {
}

@Override public DocIdSet getDocIdSet(IndexReader reader) throws IOException {
FilterCacheValue<ConcurrentMap<Filter, DocSet>> cacheValue = cache.cache.get(reader.getCoreCacheKey());
FilterCacheValue<ConcurrentMap<Object, DocSet>> cacheValue = cache.cache.get(reader.getCoreCacheKey());
if (cacheValue == null) {
LongsLAB longsLAB = null;
if (cache.labEnabled) {
longsLAB = new LongsLAB(cache.labChunkSizeBytes, cache.labMaxAllocBytes);
}
cacheValue = new FilterCacheValue<ConcurrentMap<Filter, DocSet>>(cache.buildFilterMap(), longsLAB);
FilterCacheValue<ConcurrentMap<Filter, DocSet>> prev = cache.cache.putIfAbsent(reader.getCoreCacheKey(), cacheValue);
cacheValue = new FilterCacheValue<ConcurrentMap<Object, DocSet>>(cache.buildFilterMap(), longsLAB);
FilterCacheValue<ConcurrentMap<Object, DocSet>> prev = cache.cache.putIfAbsent(reader.getCoreCacheKey(), cacheValue);
if (prev != null) {
cacheValue = prev;
} else {
reader.addReaderFinishedListener(cache);
}
}
DocSet docSet = cacheValue.value().get(filter);
Object key = filter;
if (filter instanceof CacheKeyFilter) {
key = ((CacheKeyFilter) filter).cacheKey();
}

DocSet docSet = cacheValue.value().get(key);
if (docSet != null) {
return docSet;
}
DocIdSet docIdSet = filter.getDocIdSet(reader);
docSet = FilterCacheValue.cacheable(reader, cacheValue.longsLAB(), docIdSet);
DocSet prev = cacheValue.value().putIfAbsent(filter, docSet);
DocSet prev = cacheValue.value().putIfAbsent(key, docSet);
if (prev != null) {
docSet = prev;
}
@@ -164,7 +164,11 @@ static class FilterCacheFilterWrapper extends Filter {
}

@Override public DocIdSet getDocIdSet(IndexReader reader) throws IOException {
FilterCacheKey cacheKey = new FilterCacheKey(reader.getCoreCacheKey(), filter);
Object filterKey = filter;
if (filter instanceof CacheKeyFilter) {
filterKey = ((CacheKeyFilter) filter).cacheKey();
}
FilterCacheKey cacheKey = new FilterCacheKey(reader.getCoreCacheKey(), filterKey);
ConcurrentMap<FilterCacheKey, FilterCacheValue<DocSet>> innerCache = cache.cache();

FilterCacheValue<DocSet> cacheValue = innerCache.get(cacheKey);
@@ -223,30 +227,30 @@ public static class FilterCacheValueWeigher implements Weigher<FilterCacheValue<

public static class FilterCacheKey {
private final Object readerKey;
private final Filter filter;
private final Object filterKey;

public FilterCacheKey(Object readerKey, Filter filter) {
public FilterCacheKey(Object readerKey, Object filterKey) {
this.readerKey = readerKey;
this.filter = filter;
this.filterKey = filterKey;
}

public Object readerKey() {
return readerKey;
}

public Filter filter() {
return filter;
public Object filterKey() {
return filterKey;
}

@Override public boolean equals(Object o) {
if (this == o) return true;
// if (o == null || getClass() != o.getClass()) return false;
FilterCacheKey that = (FilterCacheKey) o;
return (readerKey == that.readerKey && filter.equals(that.filter));
return (readerKey == that.readerKey && filterKey.equals(that.filterKey));
}

@Override public int hashCode() {
return readerKey.hashCode() + 31 * filter().hashCode();
return readerKey.hashCode() + 31 * filterKey.hashCode();
}
}
}
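With this change FilterCacheKey pairs the reader's core cache key with an arbitrary filter key instead of the Filter itself, so two physically distinct filters carrying equal CacheKeyFilter.Key values (the class added in the new file below) resolve to the same entry for a given reader. A small sketch of the intended equality; the enclosing cache class is not named in this excerpt, so its nested classes are referenced directly:

    // Sketch: equal key strings from different filter instances map to one cached DocSet.
    Object readerKey = new Object();   // stands in for reader.getCoreCacheKey()
    FilterCacheKey a = new FilterCacheKey(readerKey, new CacheKeyFilter.Key("status:published"));
    FilterCacheKey b = new FilterCacheKey(readerKey, new CacheKeyFilter.Key("status:published"));
    assert a.equals(b) && a.hashCode() == b.hashCode();   // both land on the same cache slot
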
@@ -0,0 +1,103 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.cache.filter.support;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.Filter;
import org.elasticsearch.common.Unicode;

import java.io.IOException;
import java.util.Arrays;

public interface CacheKeyFilter {

public static class Key {

private final byte[] bytes;

// we pre-compute the hashCode for better performance (especially in IdCache)
private final int hashCode;

public Key(byte[] bytes) {
this.bytes = bytes;
this.hashCode = Arrays.hashCode(bytes);
}

public Key(String str) {
this(Unicode.fromStringAsBytes(str));
}

public byte[] bytes() {
return this.bytes;
}

public String utf8ToString() {
return Unicode.fromBytes(bytes);
}

@Override public boolean equals(Object o) {
if (this == o) return true;
if (o.getClass() != this.getClass()) {
return false;
}
Key bytesWrap = (Key) o;
return Arrays.equals(bytes, bytesWrap.bytes);
}

@Override public int hashCode() {
return hashCode;
}
}

public static class Wrapper extends Filter implements CacheKeyFilter {

private final Filter filter;

private final Key key;

public Wrapper(Filter filter, Key key) {
this.filter = filter;
this.key = key;
}

@Override public Key cacheKey() {
return key;
}

@Override public DocIdSet getDocIdSet(IndexReader reader) throws IOException {
return filter.getDocIdSet(reader);
}

@Override public int hashCode() {
return filter.hashCode();
}

@Override public boolean equals(Object obj) {
return filter.equals(obj);
}

@Override public String toString() {
return filter.toString();
}
}

Key cacheKey();
}
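Any Lucene filter can opt into key-based caching either by implementing CacheKeyFilter itself or by being wrapped: Wrapper delegates getDocIdSet to the inner filter while exposing the explicit key, and the cache wrappers above check instanceof CacheKeyFilter before picking the map key. A brief sketch using stock Lucene classes (the TermQuery/QueryWrapperFilter pairing is illustrative):

    // Sketch: two separately built but equivalent filters share one cache slot via the same key.
    Filter published = new QueryWrapperFilter(new TermQuery(new Term("status", "published")));
    Filter keyed = new CacheKeyFilter.Wrapper(published, new CacheKeyFilter.Key("status:published"));
    // FilterCacheFilterWrapper.getDocIdSet(...) above caches the resulting DocSet under the Key,
    // so any other Wrapper constructed with the same string reuses the cached set.
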
@@ -68,7 +68,7 @@ public class WeakFilterCache extends AbstractConcurrentMapFilterCache implements
super.close();
}

@Override protected ConcurrentMap<Filter, DocSet> buildFilterMap() {
@Override protected ConcurrentMap<Object, DocSet> buildFilterMap() {
MapMaker mapMaker = new MapMaker().weakValues();
if (maxSize != -1) {
mapMaker.maximumSize(maxSize);
@@ -125,7 +125,7 @@ public Term term(String value) {
}

@Override public Query fieldQuery(String value, QueryParseContext context) {
return new DeletionAwareConstantScoreQuery(context.cacheFilter(fieldFilter(value)));
return new DeletionAwareConstantScoreQuery(context.cacheFilter(fieldFilter(value), null));
}

@Override public boolean useFieldQueryWithQueryString() {
@@ -35,6 +35,7 @@ public class AndFilterBuilder extends BaseFilterBuilder {
private ArrayList<FilterBuilder> filters = Lists.newArrayList();

private Boolean cache;
private String cacheKey;

private String filterName;

@@ -60,6 +61,11 @@ public AndFilterBuilder cache(boolean cache) {
return this;
}

public AndFilterBuilder cacheKey(String cacheKey) {
this.cacheKey = cacheKey;
return this;
}

/**
* Sets the filter name for the filter that can be used when searching for matched_filters per hit.
*/
@@ -78,6 +84,9 @@ public AndFilterBuilder filterName(String filterName) {
if (cache != null) {
builder.field("_cache", cache);
}
if (cacheKey != null) {
builder.field("_cache_key", cacheKey);
}
if (filterName != null) {
builder.field("_name", filterName);
}
@@ -23,6 +23,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.search.AndFilter;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.cache.filter.support.CacheKeyFilter;

import java.io.IOException;
import java.util.ArrayList;
@@ -49,6 +50,7 @@ public class AndFilterParser implements FilterParser {
ArrayList<Filter> filters = newArrayList();

boolean cache = false;
CacheKeyFilter.Key cacheKey = null;

String filterName = null;
String currentFieldName = null;
@@ -76,6 +78,8 @@ public class AndFilterParser implements FilterParser {
cache = parser.booleanValue();
} else if ("_name".equals(currentFieldName)) {
filterName = parser.text();
} else if ("_cache_key".equals(currentFieldName) || "_cacheKey".equals(currentFieldName)) {
cacheKey = new CacheKeyFilter.Key(parser.text());
}
}
}
@@ -88,7 +92,7 @@ public class AndFilterParser implements FilterParser {
// no need to cache this one
Filter filter = new AndFilter(filters);
if (cache) {
filter = parseContext.cacheFilter(filter);
filter = parseContext.cacheFilter(filter, cacheKey);
}
if (filterName != null) {
parseContext.addNamedFilter(filterName, filter);
@@ -35,6 +35,7 @@ public class BoolFilterBuilder extends BaseFilterBuilder {
private ArrayList<Clause> clauses = new ArrayList<Clause>();

private Boolean cache;
private String cacheKey;

private String filterName;

@@ -80,6 +81,11 @@ public BoolFilterBuilder cache(boolean cache) {
return this;
}

public BoolFilterBuilder cacheKey(String cacheKey) {
this.cacheKey = cacheKey;
return this;
}

@Override protected void doXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("bool");
for (Clause clause : clauses) {
@@ -100,6 +106,9 @@ public BoolFilterBuilder cache(boolean cache) {
if (cache != null) {
builder.field("_cache", cache);
}
if (cacheKey != null) {
builder.field("_cache_key", cacheKey);
}
builder.endObject();
}

