Skip to content

Commit

Permalink
Added num_queries and memory_size stats to percolate stats.
Browse files Browse the repository at this point in the history
Closes #3883
  • Loading branch information
martijnvg committed Oct 15, 2013
1 parent 48656fd commit c1ec32a
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.index.mapper.DocumentTypeListener;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.internal.TypeFieldMapper;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.index.settings.IndexSettings;
Expand All @@ -32,6 +33,7 @@
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.percolator.PercolatorService;

import java.util.Map;
import java.util.concurrent.ConcurrentMap;

/**
Expand All @@ -51,6 +53,7 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent {
private final IndexFieldDataService indexFieldDataService;

private final ShardIndexingService indexingService;
private final ShardPercolateService shardPercolateService;

private final ConcurrentMap<HashedBytesRef, Query> percolateQueries = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
private final ShardLifecycleListener shardLifecycleListener = new ShardLifecycleListener();
Expand All @@ -62,14 +65,15 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent {
@Inject
public PercolatorQueriesRegistry(ShardId shardId, @IndexSettings Settings indexSettings, IndexQueryParserService queryParserService,
ShardIndexingService indexingService, IndicesLifecycle indicesLifecycle, MapperService mapperService,
IndexCache indexCache, IndexFieldDataService indexFieldDataService) {
IndexCache indexCache, IndexFieldDataService indexFieldDataService, ShardPercolateService shardPercolateService) {
super(shardId, indexSettings);
this.queryParserService = queryParserService;
this.mapperService = mapperService;
this.indicesLifecycle = indicesLifecycle;
this.indexingService = indexingService;
this.indexCache = indexCache;
this.indexFieldDataService = indexFieldDataService;
this.shardPercolateService = shardPercolateService;

indicesLifecycle.addListener(shardLifecycleListener);
mapperService.addTypeListener(percolateTypeListener);
Expand Down Expand Up @@ -105,12 +109,18 @@ void disableRealTimePercolator() {
}

/**
 * Parses the given source as a percolator query, registers it under the given id and
 * updates the shard level percolate stats (query count and estimated memory usage).
 *
 * @param idAsString The id of the percolator query
 * @param source     The source (query dsl) of the percolator query
 */
public void addPercolateQuery(String idAsString, BytesReference source) {
    Query newQuery = parsePercolatorDocument(idAsString, source);
    HashedBytesRef id = new HashedBytesRef(new BytesRef(idAsString));
    // put() returns the previously registered query (or null), so the stats service
    // can correct its memory estimate instead of double counting a replaced query.
    Query previousQuery = percolateQueries.put(id, newQuery);
    shardPercolateService.addedQuery(id, previousQuery, newQuery);
}

/**
 * Removes the percolator query with the given id from the registry and updates
 * the shard level percolate stats accordingly.
 *
 * @param idAsString The id of the percolator query to remove
 */
public void removePercolateQuery(String idAsString) {
    HashedBytesRef id = new HashedBytesRef(idAsString);
    Query query = percolateQueries.remove(id);
    // Only update the stats if a query was actually registered under this id.
    if (query != null) {
        shardPercolateService.removedQuery(id, query);
    }
}

Query parsePercolatorDocument(String id, BytesReference source) {
Expand Down Expand Up @@ -234,9 +244,13 @@ private void loadQueries(IndexShard shard) {
new TermFilter(new Term(TypeFieldMapper.NAME, PercolatorService.Constants.TYPE_NAME))
)
);
QueriesLoaderCollector queries = new QueriesLoaderCollector(PercolatorQueriesRegistry.this, logger, indexFieldDataService);
searcher.searcher().search(query, queries);
percolateQueries.putAll(queries.queries());
QueriesLoaderCollector queryCollector = new QueriesLoaderCollector(PercolatorQueriesRegistry.this, logger, indexFieldDataService);
searcher.searcher().search(query, queryCollector);
Map<HashedBytesRef, Query> queries = queryCollector.queries();
for (Map.Entry<HashedBytesRef, Query> entry : queries.entrySet()) {
Query previousQuery = percolateQueries.put(entry.getKey(), entry.getValue());
shardPercolateService.addedQuery(entry.getKey(), previousQuery, entry.getValue());
}
} finally {
searcher.release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand All @@ -11,44 +12,87 @@
import java.io.IOException;

/**
* Exposes percolator related statistics.
*/
public class PercolateStats implements Streamable, ToXContent {

private long percolateCount;
private long percolateTimeInMillis;
private long current;
private long memorySizeInBytes;
private long numQueries;

/**
 * No-arg constructor for serialization purposes; fields are populated via
 * {@link #readFrom(StreamInput)}.
 */
public PercolateStats() {
}

/**
 * Creates a stats instance from the given shard level metric values.
 * Package-private: instances are produced by {@code ShardPercolateService#stats()}
 * or deserialized via {@link #readPercolateStats(StreamInput)}.
 *
 * @param percolateCount        number of percolate api invocations
 * @param percolateTimeInMillis total time spent in the percolate api, in milliseconds
 * @param current               number of currently active percolate api invocations
 * @param memorySizeInBytes     estimated memory taken by the loaded percolate queries
 * @param numQueries            number of loaded percolate queries
 */
PercolateStats(long percolateCount, long percolateTimeInMillis, long current, long memorySizeInBytes, long numQueries) {
    this.percolateCount = percolateCount;
    this.percolateTimeInMillis = percolateTimeInMillis;
    this.current = current;
    this.memorySizeInBytes = memorySizeInBytes;
    this.numQueries = numQueries;
}

/**
 * @return The number of times the percolate api has been invoked on this shard/index
 */
public long getCount() {
    return percolateCount;
}

/**
 * @return The total amount of time spent in the percolate api, in milliseconds
 */
public long getTimeInMillis() {
    return percolateTimeInMillis;
}

/**
 * @return The total amount of time spent in the percolate api, wrapped as a {@link TimeValue}
 */
public TimeValue getTime() {
    return new TimeValue(percolateTimeInMillis);
}

/**
 * @return The number of percolate api invocations that are currently in flight
 */
public long getCurrent() {
    return current;
}

/**
 * @return The total number of loaded percolate queries
 */
public long getNumQueries() {
    return numQueries;
}

/**
 * @return The estimated size in bytes that the loaded percolate queries take in memory
 */
public long getMemorySizeInBytes() {
    return memorySizeInBytes;
}

/**
 * @return The estimated memory footprint of the loaded percolate queries,
 *         wrapped as a {@link ByteSizeValue}
 */
public ByteSizeValue getMemorySize() {
    return new ByteSizeValue(getMemorySizeInBytes());
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
    // Renders the "percolate" stats object. The field order here defines the
    // layout of the REST response, so keep new fields at the end.
    builder.startObject(Fields.PERCOLATE);
    builder.field(Fields.TOTAL, percolateCount);
    builder.timeValueField(Fields.TIME_IN_MILLIS, Fields.TIME, percolateTimeInMillis);
    builder.field(Fields.CURRENT, current);
    builder.field(Fields.MEMORY_SIZE_IN_BYTES, memorySizeInBytes);
    builder.field(Fields.MEMORY_SIZE, getMemorySize());
    builder.endObject();
    return builder;
}
Expand All @@ -61,6 +105,8 @@ public void add(PercolateStats percolate) {
percolateCount += percolate.getCount();
percolateTimeInMillis += percolate.getTimeInMillis();
current += percolate.getCurrent();
memorySizeInBytes += percolate.getMemorySizeInBytes();
numQueries += percolate.getNumQueries();
}

static final class Fields {
Expand All @@ -69,6 +115,8 @@ static final class Fields {
static final XContentBuilderString TIME = new XContentBuilderString("getTime");
static final XContentBuilderString TIME_IN_MILLIS = new XContentBuilderString("time_in_millis");
static final XContentBuilderString CURRENT = new XContentBuilderString("current");
static final XContentBuilderString MEMORY_SIZE_IN_BYTES = new XContentBuilderString("memory_size_in_bytes");
static final XContentBuilderString MEMORY_SIZE = new XContentBuilderString("memory_size");
}

public static PercolateStats readPercolateStats(StreamInput in) throws IOException {
Expand All @@ -82,12 +130,16 @@ public void readFrom(StreamInput in) throws IOException {
percolateCount = in.readVLong();
percolateTimeInMillis = in.readVLong();
current = in.readVLong();
memorySizeInBytes = in.readVLong();
numQueries = in.readVLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
    // Wire format: write order must stay exactly in sync with readFrom(StreamInput).
    out.writeVLong(percolateCount);
    out.writeVLong(percolateTimeInMillis);
    out.writeVLong(current);
    out.writeVLong(memorySizeInBytes);
    out.writeVLong(numQueries);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

package org.elasticsearch.index.percolator.stats;

import org.apache.lucene.search.Query;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.HashedBytesRef;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -30,6 +33,13 @@
import java.util.concurrent.TimeUnit;

/**
* Shard level percolator service that maintains percolator metrics:
* <ul>
* <li> total time spent in percolate api
* <li> the current number of percolate requests
* <li> number of registered percolate queries
* <li> the estimated amount of memory the registered queries take
* </ul>
*/
public class ShardPercolateService extends AbstractIndexShardComponent {

Expand All @@ -41,6 +51,9 @@ public ShardPercolateService(ShardId shardId, @IndexSettings Settings indexSetti
private final MeanMetric percolateMetric = new MeanMetric();
private final CounterMetric currentMetric = new CounterMetric();

private final CounterMetric numberOfQueries = new CounterMetric();
private final CounterMetric memorySizeInBytes = new CounterMetric();

/**
 * Called before a percolate request executes; bumps the in-flight counter.
 */
public void prePercolate() {
    currentMetric.inc();
}
Expand All @@ -50,8 +63,31 @@ public void postPercolate(long tookInNanos) {
percolateMetric.inc(tookInNanos);
}

/**
 * Updates the metrics for a percolate query that has been added to the registry.
 *
 * @param id            The id the query is registered under
 * @param previousQuery The query previously registered under this id, or {@code null}
 *                      if this id is new
 * @param newQuery      The query that has just been registered
 */
public void addedQuery(HashedBytesRef id, Query previousQuery, Query newQuery) {
    boolean isReplacement = previousQuery != null;
    if (isReplacement) {
        // The old entry's footprint is released; the query count stays the same.
        memorySizeInBytes.dec(computeSizeInMemory(id, previousQuery));
    } else {
        numberOfQueries.inc();
    }
    memorySizeInBytes.inc(computeSizeInMemory(id, newQuery));
}

/**
 * Updates the metrics for a percolate query that has been removed from the registry.
 *
 * @param id    The id the query was registered under
 * @param query The query that was removed
 */
public void removedQuery(HashedBytesRef id, Query query) {
    memorySizeInBytes.dec(computeSizeInMemory(id, query));
    numberOfQueries.dec();
}

/**
 * @return A snapshot of the current shard level percolate metrics
 */
public PercolateStats stats() {
    return new PercolateStats(
            percolateMetric.count(),
            TimeUnit.NANOSECONDS.toMillis(percolateMetric.sum()),
            currentMetric.count(),
            memorySizeInBytes.count(),
            numberOfQueries.count()
    );
}

/**
 * Estimates the memory footprint of a registered percolate query: the map entry
 * overhead plus the id's backing bytes, plus Lucene's estimate for the query itself.
 *
 * @param id    The id the query is registered under
 * @param query The parsed percolate query
 * @return The estimated size in bytes
 */
private static long computeSizeInMemory(HashedBytesRef id, Query query) {
    final long idOverhead = (3 * RamUsageEstimator.NUM_BYTES_INT)
            + RamUsageEstimator.NUM_BYTES_OBJECT_REF
            + RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
            + id.bytes.bytes.length;
    return idOverhead + RamUsageEstimator.sizeOf(query);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,8 @@ public void testPercolateStatistics() throws Exception {
IndicesStatsResponse indicesResponse = client().admin().indices().prepareStats("test").execute().actionGet();
assertThat(indicesResponse.getTotal().getPercolate().getCount(), equalTo(5l)); // We have 5 partitions
assertThat(indicesResponse.getTotal().getPercolate().getCurrent(), equalTo(0l));
assertThat(indicesResponse.getTotal().getPercolate().getNumQueries(), equalTo(2l)); // One primary and replica
assertThat(indicesResponse.getTotal().getPercolate().getMemorySizeInBytes(), greaterThan(0l));

NodesStatsResponse nodesResponse = client().admin().cluster().prepareNodesStats().execute().actionGet();
long percolateCount = 0;
Expand All @@ -567,6 +569,8 @@ public void testPercolateStatistics() throws Exception {
indicesResponse = client().admin().indices().prepareStats().setPercolate(true).execute().actionGet();
assertThat(indicesResponse.getTotal().getPercolate().getCount(), equalTo(10l));
assertThat(indicesResponse.getTotal().getPercolate().getCurrent(), equalTo(0l));
assertThat(indicesResponse.getTotal().getPercolate().getNumQueries(), equalTo(2l));
assertThat(indicesResponse.getTotal().getPercolate().getMemorySizeInBytes(), greaterThan(0l));

percolateCount = 0;
nodesResponse = client().admin().cluster().prepareNodesStats().execute().actionGet();
Expand Down

0 comments on commit c1ec32a

Please sign in to comment.