Skip to content

Commit

Permalink
Clear cache: allow to invalidate specific filter cache keys
Browse files Browse the repository at this point in the history
closes #2653
  • Loading branch information
kimchy committed Feb 14, 2013
1 parent 4b6b189 commit c8aa0b7
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 50 deletions.
Expand Up @@ -34,8 +34,8 @@ public class ClearIndicesCacheRequest extends BroadcastOperationRequest<ClearInd
private boolean filterCache = false;
private boolean fieldDataCache = false;
private boolean idCache = false;
private boolean bloomCache = false;
private String[] fields = null;
private String[] filterKeys = null;

ClearIndicesCacheRequest() {
}
Expand Down Expand Up @@ -73,21 +73,21 @@ public String[] fields() {
return this.fields;
}

public boolean idCache() {
return this.idCache;
public ClearIndicesCacheRequest filterKeys(String... filterKeys) {
this.filterKeys = filterKeys;
return this;
}

public ClearIndicesCacheRequest idCache(boolean idCache) {
this.idCache = idCache;
return this;
public String[] filterKeys() {
return this.filterKeys;
}

public boolean bloomCache() {
return this.bloomCache;
public boolean idCache() {
return this.idCache;
}

public ClearIndicesCacheRequest bloomCache(boolean bloomCache) {
this.bloomCache = bloomCache;
public ClearIndicesCacheRequest idCache(boolean idCache) {
this.idCache = idCache;
return this;
}

Expand All @@ -96,29 +96,16 @@ public void readFrom(StreamInput in) throws IOException {
filterCache = in.readBoolean();
fieldDataCache = in.readBoolean();
idCache = in.readBoolean();
bloomCache = in.readBoolean();
int size = in.readVInt();
if (size > 0) {
fields = new String[size];
for (int i = 0; i < size; i++) {
fields[i] = in.readUTF();
}
}
fields = in.readStringArray();
filterKeys = in.readStringArray();
}

public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(filterCache);
out.writeBoolean(fieldDataCache);
out.writeBoolean(idCache);
out.writeBoolean(bloomCache);
if (fields == null) {
out.writeVInt(0);
} else {
out.writeVInt(fields.length);
for (String field : fields) {
out.writeUTF(field);
}
}
out.writeStringArrayNullable(fields);
out.writeStringArrayNullable(filterKeys);
}
}
Expand Up @@ -48,6 +48,11 @@ public ClearIndicesCacheRequestBuilder setFields(String... fields) {
return this;
}

public ClearIndicesCacheRequestBuilder setFilterKeys(String... filterKeys) {
request.filterKeys(filterKeys);
return this;
}

public ClearIndicesCacheRequestBuilder setIdCache(boolean idCache) {
request.idCache(idCache);
return this;
Expand Down
Expand Up @@ -33,8 +33,8 @@ class ShardClearIndicesCacheRequest extends BroadcastShardOperationRequest {
private boolean filterCache = false;
private boolean fieldDataCache = false;
private boolean idCache = false;
private boolean bloomCache = false;
private String[] fields = null;
private String[] filterKeys = null;

ShardClearIndicesCacheRequest() {
}
Expand All @@ -44,8 +44,8 @@ public ShardClearIndicesCacheRequest(String index, int shardId, ClearIndicesCach
filterCache = request.filterCache();
fieldDataCache = request.fieldDataCache();
idCache = request.idCache();
bloomCache = request.bloomCache();
fields = request.fields();
filterKeys = request.filterKeys();
}

public boolean filterCache() {
Expand All @@ -60,14 +60,14 @@ public boolean idCache() {
return this.idCache;
}

public boolean bloomCache() {
return this.bloomCache;
}

public String[] fields() {
return this.fields;
}

public String[] filterKeys() {
return this.filterKeys;
}

public ShardClearIndicesCacheRequest waitForOperations(boolean waitForOperations) {
this.filterCache = waitForOperations;
return this;
Expand All @@ -79,14 +79,8 @@ public void readFrom(StreamInput in) throws IOException {
filterCache = in.readBoolean();
fieldDataCache = in.readBoolean();
idCache = in.readBoolean();
bloomCache = in.readBoolean();
int size = in.readVInt();
if (size > 0) {
fields = new String[size];
for (int i = 0; i < size; i++) {
fields[i] = in.readUTF();
}
}
fields = in.readStringArray();
filterKeys = in.readStringArray();
}

@Override
Expand All @@ -95,14 +89,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(filterCache);
out.writeBoolean(fieldDataCache);
out.writeBoolean(idCache);
out.writeBoolean(bloomCache);
if (fields == null) {
out.writeVInt(0);
} else {
out.writeVInt(fields.length);
for (String field : fields) {
out.writeUTF(field);
}
}
out.writeStringArrayNullable(fields);
out.writeStringArrayNullable(filterKeys);
}
}
Expand Up @@ -124,6 +124,10 @@ protected ShardClearIndicesCacheResponse shardOperation(ShardClearIndicesCacheRe
clearedAtLeastOne = true;
service.cache().filter().clear("api");
}
if (request.filterKeys() != null && request.filterKeys().length > 0) {
clearedAtLeastOne = true;
service.cache().filter().clear("api", request.filterKeys());
}
if (request.fieldDataCache()) {
clearedAtLeastOne = true;
if (request.fields() == null || request.fields().length == 0) {
Expand Down
Expand Up @@ -49,6 +49,8 @@ public EntriesStats(long sizeInBytes, long count) {

void clear(String reason);

void clear(String reason, String[] keys);

EntriesStats entriesStats();

long evictions();
Expand Down
Expand Up @@ -64,6 +64,11 @@ public void clear(String reason) {
// nothing to do here
}

@Override
public void clear(String reason, String[] keys) {
// nothing to do there
}

@Override
public void clear(IndexReader reader) {
// nothing to do here
Expand Down
Expand Up @@ -85,6 +85,16 @@ public void clear(String reason) {
}
}

@Override
public void clear(String reason, String[] keys) {
logger.debug("clear keys [], reason [{}]", reason, keys);
for (String key : keys) {
for (Object readerKey : seenReaders.keySet()) {
indicesFilterCache.cache().invalidate(new FilterCacheKey(this, readerKey, new CacheKeyFilter.Key(key)));
}
}
}

@Override
public void onClose(SegmentReader owner) {
clear(owner);
Expand Down
Expand Up @@ -68,6 +68,7 @@ public void handleRequest(final RestRequest request, final RestChannel channel)
clearIndicesCacheRequest.idCache(request.paramAsBoolean("id", clearIndicesCacheRequest.idCache()));
clearIndicesCacheRequest.bloomCache(request.paramAsBoolean("bloom", clearIndicesCacheRequest.bloomCache()));
clearIndicesCacheRequest.fields(request.paramAsStringArray("fields", clearIndicesCacheRequest.fields()));
clearIndicesCacheRequest.filterKeys(request.paramAsStringArray("filter_keys", clearIndicesCacheRequest.filterKeys()));

BroadcastOperationThreading operationThreading = BroadcastOperationThreading.fromString(request.param("operationThreading"), BroadcastOperationThreading.SINGLE_THREAD);
if (operationThreading == BroadcastOperationThreading.NO_THREADS) {
Expand Down
@@ -0,0 +1,79 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.test.integration.indices.cache;

import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.elasticsearch.index.query.QueryBuilders.filteredQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

/**
*/
@Test
public class ClearCacheTests extends AbstractNodesTests {

private Client client;

@BeforeClass
public void createNodes() throws Exception {
startNode("node1", ImmutableSettings.settingsBuilder().put("index.cache.stats.refresh_interval", 0));
client = getClient();
}

@AfterClass
public void closeNodes() {
client.close();
closeAllNodes();
}

protected Client getClient() {
return client("node1");
}

@Test
public void testClearCacheFilterKeys() {
client.admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet();
client.prepareIndex("test", "type", "1").setSource("field", "value").execute().actionGet();
client.admin().indices().prepareRefresh().execute().actionGet();

NodesStatsResponse nodesStats = client.admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.nodes()[0].indices().getCache().filterSizeInBytes(), equalTo(0l));

SearchResponse searchResponse = client.prepareSearch().setQuery(filteredQuery(matchAllQuery(), FilterBuilders.termFilter("field", "value").cacheKey("test_key"))).execute().actionGet();
assertThat(searchResponse.hits().getHits().length, equalTo(1));
nodesStats = client.admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.nodes()[0].indices().getCache().filterSizeInBytes(), greaterThan(0l));

client.admin().indices().prepareClearCache().setFilterKeys("test_key").execute().actionGet();
nodesStats = client.admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.nodes()[0].indices().getCache().filterSizeInBytes(), equalTo(0l));
}
}

0 comments on commit c8aa0b7

Please sign in to comment.