Skip to content

Commit

Permalink
Cache completion stats between refreshes
Browse files Browse the repository at this point in the history
Computing the stats for completion fields may involve a significant amount of
work since it walks every field of every segment looking for completion fields.
Innocuous-looking APIs like `GET _stats` or `GET _cluster/stats` do this for
every shard in the cluster. This repeated work is unnecessary since these stats
do not change between refreshes; in many indices they remain constant for a
long time.

This commit introduces a cache for these stats which is invalidated on a
refresh, allowing most stats calls to bypass the work needed to compute them on
most shards.

Closes elastic#51915
  • Loading branch information
DaveCTurner committed Feb 6, 2020
1 parent 0582bb4 commit 5be0d83
Show file tree
Hide file tree
Showing 6 changed files with 418 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
---
# Shared fixture: creates index "test1" with a completion sub-field, indexes two documents and refreshes.
setup:

# Create the index, waiting for all of its shard copies to become active.
- do:
indices.create:
index: test1
wait_for_active_shards: all
body:
settings:
# Limit the number of shards so that shards are unlikely
# to be relocating or initializing between the test
# set up and the test execution
index.number_of_shards: 3
index.number_of_replicas: 0
mappings:
properties:
# "bar" is a text field with a sub-field named "completion" of type completion;
# the stats assertions in this file address it as "bar.completion".
bar:
type: text
fielddata: true
fields:
completion:
type: completion

# Do not proceed while any shard is still relocating.
- do:
cluster.health:
wait_for_no_relocating_shards: true

# Index two documents with distinct values for "bar".
- do:
index:
index: test1
id: 1
body: { "bar": "bar" }

- do:
index:
index: test1
id: 2
body: { "bar": "foo" }

# Refresh so that the documents indexed above are reflected in subsequent stats calls.
- do:
indices.refresh: {}

---
# Verifies that completion stats are reported and that they grow after a further document is indexed and refreshed.
"Completion stats":
# Request index stats, including per-field completion stats for every field matching "*".
- do:
indices.stats: { completion_fields: "*" }

# No shard may fail, and both the per-field and the total completion sizes must be non-zero.
- match: { _shards.failed: 0}
- gt: { _all.total.completion.fields.bar\.completion.size_in_bytes: 0 }
- gt: { _all.total.completion.size_in_bytes: 0 }
# Stash the total size as "original_size" for the comparison at the end of this test.
- set: { _all.total.completion.size_in_bytes: original_size }

# Index a third document.
- do:
index:
index: test1
id: 3
body: { "bar": "foo", "baz": "foo" }

# Refresh again: completion stats are cached between refreshes, so the cached value must be
# invalidated here for the next stats call to take the new document into account.
- do:
indices.refresh: {}

- do:
indices.stats: { completion_fields: "*" }

# The total completion size must now exceed the value stashed before the third document was indexed.
- match: { _shards.failed: 0}
- gt: { _all.total.completion.size_in_bytes: $original_size }
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;

import com.carrotsearch.hppc.ObjectLongHashMap;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Terms;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.suggest.document.CompletionTerms;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.FieldMemoryStats;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.search.suggest.completion.CompletionStats;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
 * Caches the completion stats computed from a searcher so that repeated stats requests do not have to walk every field of
 * every segment each time. The cached value is discarded by {@link #afterRefresh(boolean)} whenever a refresh actually
 * happened, and also when the computation fails so that a failure is never served from the cache.
 */
class CompletionStatsCache implements ReferenceManager.RefreshListener {

    private final Supplier<Engine.Searcher> searcherSupplier;

    /**
     * Holds the future for the current cache entry. If it is non-null then some thread has already started (and possibly
     * finished) computing the stats, so callers wait on that future rather than computing the stats again. If it is null
     * then there is no cache entry, and the first caller to install its own future here computes the stats on behalf of
     * everyone else. Futures are completed with stats that include all fields, requiring further filtering (see
     * {@link CompletionStatsCache#filterCompletionStatsByFieldName}).
     */
    private final AtomicReference<PlainActionFuture<CompletionStats>> completionStatsFutureRef = new AtomicReference<>();

    /**
     * @param searcherSupplier supplies the searcher whose segments are inspected; the searcher is closed once the stats
     *                         have been computed
     */
    CompletionStatsCache(Supplier<Engine.Searcher> searcherSupplier) {
        this.searcherSupplier = searcherSupplier;
    }

    /**
     * Returns the completion stats, computing them only if there is no cache entry yet.
     *
     * @param fieldNamePatterns patterns selecting the fields to report individually; if null or empty then no per-field
     *                          breakdown is returned
     * @return stats whose total size covers all completion fields and whose per-field breakdown is restricted to the
     *         fields matching {@code fieldNamePatterns}
     */
    CompletionStats get(String... fieldNamePatterns) {
        final PlainActionFuture<CompletionStats> newFuture = new PlainActionFuture<>();
        final PlainActionFuture<CompletionStats> oldFuture = completionStatsFutureRef.compareAndExchange(null, newFuture);

        if (oldFuture != null) {
            // we lost the race, someone else is already computing stats, so we wait for that to finish
            return filterCompletionStatsByFieldName(fieldNamePatterns, oldFuture.actionGet());
        }

        // we won the race, nobody else is already computing stats, so it's up to us
        boolean success = false;
        final CompletionStats completionStats;
        try {
            ActionListener.completeWith(newFuture, () -> {
                long sizeInBytes = 0;
                final ObjectLongHashMap<String> completionFields = new ObjectLongHashMap<>();

                try (Engine.Searcher currentSearcher = searcherSupplier.get()) {
                    for (LeafReaderContext atomicReaderContext : currentSearcher.getIndexReader().leaves()) {
                        LeafReader atomicReader = atomicReaderContext.reader();
                        for (FieldInfo info : atomicReader.getFieldInfos()) {
                            Terms terms = atomicReader.terms(info.name);
                            if (terms instanceof CompletionTerms) {
                                // TODO: currently we load up the suggester for reporting its size
                                final long fstSize = ((CompletionTerms) terms).suggester().ramBytesUsed();
                                completionFields.addTo(info.name, fstSize);
                                sizeInBytes += fstSize;
                            }
                        }
                    }
                }

                return new CompletionStats(sizeInBytes, new FieldMemoryStats(completionFields));
            });

            completionStats = newFuture.actionGet();
            success = true;
        } finally {
            if (success == false) {
                // Do not leave a failed computation in the cache, otherwise every call until the next refresh would see
                // the same failure instead of retrying. Only clear the reference if it still holds our own future: a
                // refresh may have cleared it in the meantime and another thread may have installed a newer one.
                completionStatsFutureRef.compareAndSet(newFuture, null);
            }
        }

        return filterCompletionStatsByFieldName(fieldNamePatterns, completionStats);
    }

    /**
     * Builds the stats to return to a caller from the cached stats for all fields.
     *
     * @param fieldNamePatterns   patterns selecting the fields to report individually, may be null or empty
     * @param fullCompletionStats the cached stats covering every completion field
     * @return stats with the same total size as {@code fullCompletionStats}; the per-field breakdown contains only the
     *         fields matching one of the patterns, or is null if no patterns were given
     */
    private static CompletionStats filterCompletionStatsByFieldName(String[] fieldNamePatterns, CompletionStats fullCompletionStats) {
        final FieldMemoryStats fieldMemoryStats;
        if (fieldNamePatterns != null && fieldNamePatterns.length > 0) {
            final ObjectLongHashMap<String> completionFields = new ObjectLongHashMap<>(fieldNamePatterns.length);
            for (ObjectLongCursor<String> fieldCursor : fullCompletionStats.getFields()) {
                if (Regex.simpleMatch(fieldNamePatterns, fieldCursor.key)) {
                    completionFields.addTo(fieldCursor.key, fieldCursor.value);
                }
            }
            fieldMemoryStats = new FieldMemoryStats(completionFields);
        } else {
            fieldMemoryStats = null;
        }
        return new CompletionStats(fullCompletionStats.getSizeInBytes(), fieldMemoryStats);
    }

    /** Nothing to do before a refresh; the cache is only invalidated once a refresh has happened. */
    @Override
    public void beforeRefresh() {
    }

    /**
     * Discards the cache entry if the refresh actually took place, so that the next call to {@link #get} recomputes the stats.
     *
     * @param didRefresh whether the refresh actually happened; the cache entry is kept if it did not
     */
    @Override
    public void afterRefresh(boolean didRefresh) {
        if (didRefresh) {
            completionStatsFutureRef.set(null);
        }
    }
}
35 changes: 5 additions & 30 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,22 @@

package org.elasticsearch.index.engine;

import com.carrotsearch.hppc.ObjectLongHashMap;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.search.suggest.document.CompletionTerms;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
Expand All @@ -49,7 +44,6 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.FieldMemoryStats;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
Expand All @@ -62,7 +56,6 @@
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.VersionType;
Expand Down Expand Up @@ -177,32 +170,14 @@ public MergeStats getMergeStats() {
/** Returns how many bytes we are currently moving from heap to disk */
public abstract long getWritingBytes();


final CompletionStatsCache completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));

/**
* Returns the {@link CompletionStats} for this engine
*/
public CompletionStats completionStats(String... fieldNamePatterns) throws IOException {
try (Searcher currentSearcher = acquireSearcher("completion_stats", SearcherScope.INTERNAL)) {
long sizeInBytes = 0;
ObjectLongHashMap<String> completionFields = null;
if (fieldNamePatterns != null && fieldNamePatterns.length > 0) {
completionFields = new ObjectLongHashMap<>(fieldNamePatterns.length);
}
for (LeafReaderContext atomicReaderContext : currentSearcher.getIndexReader().leaves()) {
LeafReader atomicReader = atomicReaderContext.reader();
for (FieldInfo info : atomicReader.getFieldInfos()) {
Terms terms = atomicReader.terms(info.name);
if (terms instanceof CompletionTerms) {
// TODO: currently we load up the suggester for reporting its size
long fstSize = ((CompletionTerms) terms).suggester().ramBytesUsed();
if (Regex.simpleMatch(fieldNamePatterns, info.name)) {
completionFields.addTo(info.name, fstSize);
}
sizeInBytes += fstSize;
}
}
}
return new CompletionStats(sizeInBytes, completionFields == null ? null : new FieldMemoryStats(completionFields));
}
public CompletionStats completionStats(String... fieldNamePatterns) {
return completionStatsCache.get(fieldNamePatterns);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ public InternalEngine(EngineConfig engineConfig) {
}
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint());
this.internalReaderManager.addListener(lastRefreshedCheckpointListener);
this.externalReaderManager.addListener(completionStatsCache);
maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo()));
if (localCheckpointTracker.getPersistedCheckpoint() < localCheckpointTracker.getMaxSeqNo()) {
try (Searcher searcher =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1026,11 +1026,7 @@ public TranslogStats translogStats() {

public CompletionStats completionStats(String... fields) {
readAllowed();
try {
return getEngine().completionStats(fields);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return getEngine().completionStats(fields);
}

/**
Expand Down
Loading

0 comments on commit 5be0d83

Please sign in to comment.