From 5be0d83709ec31e3f6d6438a84110b59ea7ea81b Mon Sep 17 00:00:00 2001
From: David Turner
Date: Wed, 5 Feb 2020 19:36:28 +0000
Subject: [PATCH] Cache completion stats between refreshes

Computing the stats for completion fields may involve a significant amount
of work since it walks every field of every segment looking for completion
fields. Innocuous-looking APIs like `GET _stats` or `GET _cluster/stats` do
this for every shard in the cluster. This repeated work is unnecessary since
these stats do not change between refreshes; in many indices they remain
constant for a long time.

This commit introduces a cache for these stats which is invalidated on a
refresh, allowing most stats calls to bypass the work needed to compute
them on most shards.

Closes #51915
---
 .../indices.stats/40_updates_on_refresh.yml   |  66 +++++
 .../index/engine/CompletionStatsCache.java    | 115 +++++++++
 .../elasticsearch/index/engine/Engine.java    |  35 +--
 .../index/engine/InternalEngine.java          |   1 +
 .../elasticsearch/index/shard/IndexShard.java |   6 +-
 .../engine/CompletionStatsCacheTests.java     | 230 ++++++++++++++++++
 6 files changed, 418 insertions(+), 35 deletions(-)
 create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/40_updates_on_refresh.yml
 create mode 100644 server/src/main/java/org/elasticsearch/index/engine/CompletionStatsCache.java
 create mode 100644 server/src/test/java/org/elasticsearch/index/engine/CompletionStatsCacheTests.java

diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/40_updates_on_refresh.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/40_updates_on_refresh.yml
new file mode 100644
index 0000000000000..0750c8cc8c401
--- /dev/null
+++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/40_updates_on_refresh.yml
@@ -0,0 +1,66 @@
+---
+setup:
+
+  - do:
+      indices.create:
+        index: test1
+        wait_for_active_shards: all
+        body:
+          settings:
+            # Limit the number of shards so that shards are unlikely
+            # to be relocated or being initialized between the test
+            # set up and the test execution
+            index.number_of_shards: 3
+            index.number_of_replicas: 0
+          mappings:
+            properties:
+              bar:
+                type: text
+                fielddata: true
+                fields:
+                  completion:
+                    type: completion
+
+  - do:
+      cluster.health:
+        wait_for_no_relocating_shards: true
+
+  - do:
+      index:
+        index: test1
+        id: 1
+        body: { "bar": "bar" }
+
+  - do:
+      index:
+        index: test1
+        id: 2
+        body: { "bar": "foo" }
+
+  - do:
+      indices.refresh: {}
+
+---
+"Completion stats":
+  - do:
+      indices.stats: { completion_fields: "*" }
+
+  - match: { _shards.failed: 0}
+  - gt: { _all.total.completion.fields.bar\.completion.size_in_bytes: 0 }
+  - gt: { _all.total.completion.size_in_bytes: 0 }
+  - set: { _all.total.completion.size_in_bytes: original_size }
+
+  - do:
+      index:
+        index: test1
+        id: 3
+        body: { "bar": "foo", "baz": "foo" }
+
+  - do:
+      indices.refresh: {}
+
+  - do:
+      indices.stats: { completion_fields: "*" }
+
+  - match: { _shards.failed: 0}
+  - gt: { _all.total.completion.size_in_bytes: $original_size }
diff --git a/server/src/main/java/org/elasticsearch/index/engine/CompletionStatsCache.java b/server/src/main/java/org/elasticsearch/index/engine/CompletionStatsCache.java
new file mode 100644
index 0000000000000..3f917e05eb8ee
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/engine/CompletionStatsCache.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.engine;
+
+import com.carrotsearch.hppc.ObjectLongHashMap;
+import com.carrotsearch.hppc.cursors.ObjectLongCursor;
+import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.Terms;
+import org.apache.lucene.search.ReferenceManager;
+import org.apache.lucene.search.suggest.document.CompletionTerms;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.common.FieldMemoryStats;
+import org.elasticsearch.common.regex.Regex;
+import org.elasticsearch.search.suggest.completion.CompletionStats;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+class CompletionStatsCache implements ReferenceManager.RefreshListener {
+
+    private final Supplier<Engine.Searcher> searcherSupplier;
+
+    /**
+     * Contains a future (i.e. non-null) if another thread is already computing stats, in which case wait for this computation to
+     * complete. Contains null otherwise, in which case compute the stats ourselves and save them here for other threads to use.
+     * Futures are eventually completed with stats that include all fields, requiring further filtering (see
+     * {@link CompletionStatsCache#filterCompletionStatsByFieldName}).
+     */
+    private final AtomicReference<PlainActionFuture<CompletionStats>> completionStatsFutureRef = new AtomicReference<>();
+
+    CompletionStatsCache(Supplier<Engine.Searcher> searcherSupplier) {
+        this.searcherSupplier = searcherSupplier;
+    }
+
+    CompletionStats get(String... fieldNamePatterns) {
+        final PlainActionFuture<CompletionStats> newFuture = new PlainActionFuture<>();
+        final PlainActionFuture<CompletionStats> oldFuture = completionStatsFutureRef.compareAndExchange(null, newFuture);
+
+        if (oldFuture != null) {
+            // we lost the race, someone else is already computing stats, so we wait for that to finish
+            return filterCompletionStatsByFieldName(fieldNamePatterns, oldFuture.actionGet());
+        }
+
+        // we won the race, nobody else is already computing stats, so it's up to us
+        ActionListener.completeWith(newFuture, () -> {
+            long sizeInBytes = 0;
+            final ObjectLongHashMap<String> completionFields = new ObjectLongHashMap<>();
+
+            try (Engine.Searcher currentSearcher = searcherSupplier.get()) {
+                for (LeafReaderContext atomicReaderContext : currentSearcher.getIndexReader().leaves()) {
+                    LeafReader atomicReader = atomicReaderContext.reader();
+                    for (FieldInfo info : atomicReader.getFieldInfos()) {
+                        Terms terms = atomicReader.terms(info.name);
+                        if (terms instanceof CompletionTerms) {
+                            // TODO: currently we load up the suggester for reporting its size
+                            final long fstSize = ((CompletionTerms) terms).suggester().ramBytesUsed();
+                            completionFields.addTo(info.name, fstSize);
+                            sizeInBytes += fstSize;
+                        }
+                    }
+                }
+            }
+
+            return new CompletionStats(sizeInBytes, new FieldMemoryStats(completionFields));
+        });
+
+        return filterCompletionStatsByFieldName(fieldNamePatterns, newFuture.actionGet());
+    }
+
+    private static CompletionStats filterCompletionStatsByFieldName(String[] fieldNamePatterns, CompletionStats fullCompletionStats) {
+        final FieldMemoryStats fieldMemoryStats;
+        if (fieldNamePatterns != null && fieldNamePatterns.length > 0) {
+            final ObjectLongHashMap<String> completionFields = new ObjectLongHashMap<>(fieldNamePatterns.length);
+            for (ObjectLongCursor<String> fieldCursor : fullCompletionStats.getFields()) {
+                if (Regex.simpleMatch(fieldNamePatterns, fieldCursor.key)) {
+                    completionFields.addTo(fieldCursor.key, fieldCursor.value);
+                }
+            }
+            fieldMemoryStats = new FieldMemoryStats(completionFields);
+        } else {
+            fieldMemoryStats = null;
+        }
+        return new CompletionStats(fullCompletionStats.getSizeInBytes(), fieldMemoryStats);
+    }
+
+    @Override
+    public void beforeRefresh() {
+    }
+
+    @Override
+    public void afterRefresh(boolean didRefresh) {
+        if (didRefresh) {
+            completionStatsFutureRef.set(null);
+        }
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
index 4a7eb59469aac..e2da82f1d6ac5 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -19,27 +19,22 @@ package org.elasticsearch.index.engine;
 
-import com.carrotsearch.hppc.ObjectLongHashMap;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.index.DirectoryReader;
-import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SegmentCommitInfo;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.SegmentReader;
 import org.apache.lucene.index.Term;
-import org.apache.lucene.index.Terms;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.QueryCache;
 import org.apache.lucene.search.QueryCachingPolicy;
 import org.apache.lucene.search.ReferenceManager;
 import org.apache.lucene.search.similarities.Similarity;
-import org.apache.lucene.search.suggest.document.CompletionTerms;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
@@ -49,7 +44,6 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.CheckedRunnable;
-import org.elasticsearch.common.FieldMemoryStats;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -62,7 +56,6 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
 import org.elasticsearch.common.metrics.CounterMetric;
-import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.index.VersionType;
@@ -177,32 +170,14 @@ public MergeStats getMergeStats() {
 
     /** Returns how many bytes we are currently moving from heap to disk */
     public abstract long getWritingBytes();
+
+    final CompletionStatsCache completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
+
     /**
      * Returns the {@link CompletionStats} for this engine
      */
-    public CompletionStats completionStats(String... fieldNamePatterns) throws IOException {
-        try (Searcher currentSearcher = acquireSearcher("completion_stats", SearcherScope.INTERNAL)) {
-            long sizeInBytes = 0;
-            ObjectLongHashMap<String> completionFields = null;
-            if (fieldNamePatterns != null && fieldNamePatterns.length > 0) {
-                completionFields = new ObjectLongHashMap<>(fieldNamePatterns.length);
-            }
-            for (LeafReaderContext atomicReaderContext : currentSearcher.getIndexReader().leaves()) {
-                LeafReader atomicReader = atomicReaderContext.reader();
-                for (FieldInfo info : atomicReader.getFieldInfos()) {
-                    Terms terms = atomicReader.terms(info.name);
-                    if (terms instanceof CompletionTerms) {
-                        // TODO: currently we load up the suggester for reporting its size
-                        long fstSize = ((CompletionTerms) terms).suggester().ramBytesUsed();
-                        if (Regex.simpleMatch(fieldNamePatterns, info.name)) {
-                            completionFields.addTo(info.name, fstSize);
-                        }
-                        sizeInBytes += fstSize;
-                    }
-                }
-            }
-            return new CompletionStats(sizeInBytes, completionFields == null ? null : new FieldMemoryStats(completionFields));
-        }
+    public CompletionStats completionStats(String... fieldNamePatterns) {
+        return completionStatsCache.get(fieldNamePatterns);
     }
 
     /**
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index c73ecbc88144a..de831127a8f83 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -247,6 +247,7 @@ public InternalEngine(EngineConfig engineConfig) {
         }
         this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint());
         this.internalReaderManager.addListener(lastRefreshedCheckpointListener);
+        this.externalReaderManager.addListener(completionStatsCache);
         maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo()));
         if (localCheckpointTracker.getPersistedCheckpoint() < localCheckpointTracker.getMaxSeqNo()) {
             try (Searcher searcher =
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 759feb1d7c68e..44eb7516a3c58 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1026,11 +1026,7 @@ public TranslogStats translogStats() {
 
     public CompletionStats completionStats(String... fields) {
         readAllowed();
-        try {
-            return getEngine().completionStats(fields);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
+        return getEngine().completionStats(fields);
     }
 
     /**
diff --git a/server/src/test/java/org/elasticsearch/index/engine/CompletionStatsCacheTests.java b/server/src/test/java/org/elasticsearch/index/engine/CompletionStatsCacheTests.java
new file mode 100644
index 0000000000000..90645f0e6294d
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/index/engine/CompletionStatsCacheTests.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.codecs.PostingsFormat;
+import org.apache.lucene.codecs.lucene84.Lucene84Codec;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.QueryCachingPolicy;
+import org.apache.lucene.search.suggest.document.Completion84PostingsFormat;
+import org.apache.lucene.search.suggest.document.SuggestField;
+import org.apache.lucene.store.Directory;
+import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.search.suggest.completion.CompletionStats;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+
+public class CompletionStatsCacheTests extends ESTestCase {
+
+    public void testCompletionStatsCache() throws IOException, InterruptedException {
+        final IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
+        final PostingsFormat postingsFormat = new Completion84PostingsFormat();
+        indexWriterConfig.setCodec(new Lucene84Codec() {
+            @Override
+            public PostingsFormat getPostingsFormatForField(String field) {
+                return postingsFormat; // all fields are suggest fields
+            }
+        });
+
+        final QueryCachingPolicy queryCachingPolicy = new QueryCachingPolicy() {
+            @Override
+            public void onUse(Query query) {
+            }
+
+            @Override
+            public boolean shouldCache(Query query) {
+                return false;
+            }
+        };
+
+        try (Directory directory = newDirectory();
+             IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) {
+
+            final Document document = new Document();
+            document.add(new SuggestField("suggest1", "val", 1));
+            document.add(new SuggestField("suggest2", "val", 1));
+            document.add(new SuggestField("suggest2", "anotherval", 1));
+            document.add(new SuggestField("otherfield", "val", 1));
+            document.add(new SuggestField("otherfield", "anotherval", 1));
+            document.add(new SuggestField("otherfield", "yetmoreval", 1));
+            indexWriter.addDocument(document);
+
+            final OpenCloseCounter openCloseCounter = new OpenCloseCounter();
+            final CompletionStatsCache completionStatsCache = new CompletionStatsCache(() -> {
+                openCloseCounter.countOpened();
+                try {
+                    final DirectoryReader directoryReader = DirectoryReader.open(indexWriter);
+                    return new Engine.Searcher("test", directoryReader, null, null, queryCachingPolicy, () -> {
+                        openCloseCounter.countClosed();
+                        IOUtils.close(directoryReader);
+                    });
+                } catch (IOException e) {
+                    throw new AssertionError(e);
+                }
+            });
+
+            final int threadCount = 6;
+            final TestHarness testHarness = new TestHarness(completionStatsCache, threadCount);
+            final Thread[] threads = new Thread[threadCount];
+            threads[0] = new Thread(() -> testHarness.getStats(0, "*"));
+            threads[1] = new Thread(() -> testHarness.getStats(1, "suggest1", "suggest2"));
+            threads[2] = new Thread(() -> testHarness.getStats(2, "sug*"));
+            threads[3] = new Thread(() -> testHarness.getStats(3, "no match*"));
+            threads[4] = new Thread(() -> testHarness.getStats(4));
+            threads[5] = new Thread(() -> testHarness.getStats(5, (String[]) null));
+
+            for (Thread thread : threads) {
+                thread.start();
+            }
+
+            testHarness.start();
+
+            for (Thread thread : threads) {
+                thread.join();
+            }
+
+            // 0: "*" should match all fields:
+            final long suggest1Size = testHarness.getResult(0).getFields().get("suggest1");
+            final long suggest2Size = testHarness.getResult(0).getFields().get("suggest2");
+            final long otherFieldSize = testHarness.getResult(0).getFields().get("otherfield");
+            final long totalSizeInBytes = testHarness.getResult(0).getSizeInBytes();
+            assertThat(suggest1Size, greaterThan(0L));
+            assertThat(suggest2Size, greaterThan(0L));
+            assertThat(otherFieldSize, greaterThan(0L));
+            assertThat(totalSizeInBytes, equalTo(suggest1Size + suggest2Size + otherFieldSize));
+
+            // 1: enumerating fields omits the other ones
+            assertThat(testHarness.getResult(1).getSizeInBytes(), equalTo(totalSizeInBytes));
+            assertThat(testHarness.getResult(1).getFields().get("suggest1"), equalTo(suggest1Size));
+            assertThat(testHarness.getResult(1).getFields().get("suggest2"), equalTo(suggest2Size));
+            assertFalse(testHarness.getResult(1).getFields().containsField("otherfield"));
+
+            // 2: wildcards also exclude some fields
+            assertThat(testHarness.getResult(2).getSizeInBytes(), equalTo(totalSizeInBytes));
+            assertThat(testHarness.getResult(2).getFields().get("suggest1"), equalTo(suggest1Size));
+            assertThat(testHarness.getResult(2).getFields().get("suggest2"), equalTo(suggest2Size));
+            assertFalse(testHarness.getResult(2).getFields().containsField("otherfield"));
+
+            // 3: non-matching wildcard returns empty set of fields
+            assertThat(testHarness.getResult(3).getSizeInBytes(), equalTo(totalSizeInBytes));
+            assertFalse(testHarness.getResult(3).getFields().containsField("suggest1"));
+            assertFalse(testHarness.getResult(3).getFields().containsField("suggest2"));
+            assertFalse(testHarness.getResult(3).getFields().containsField("otherfield"));
+
+            // 4: no fields means per-fields stats is null
+            assertThat(testHarness.getResult(4).getSizeInBytes(), equalTo(totalSizeInBytes));
+            assertNull(testHarness.getResult(4).getFields());
+
+            // 5: null fields means per-fields stats is null
+            assertThat(testHarness.getResult(5).getSizeInBytes(), equalTo(totalSizeInBytes));
+            assertNull(testHarness.getResult(5).getFields());
+
+            // the stats were only computed once
+            openCloseCounter.assertCount(1);
+
+            // the stats are not recomputed on a refresh
+            completionStatsCache.afterRefresh(true);
+            openCloseCounter.assertCount(1);
+
+            // but they are recomputed on the next get
+            completionStatsCache.get();
+            openCloseCounter.assertCount(2);
+
+            // and they do update
+            final Document document2 = new Document();
+            document2.add(new SuggestField("suggest1", "foo", 1));
+            document2.add(new SuggestField("suggest2", "bar", 1));
+            document2.add(new SuggestField("otherfield", "baz", 1));
+            indexWriter.addDocument(document2);
+            completionStatsCache.afterRefresh(true);
+            final CompletionStats updatedStats = completionStatsCache.get();
+            assertThat(updatedStats.getSizeInBytes(), greaterThan(totalSizeInBytes));
+            openCloseCounter.assertCount(3);
+
+            // beforeRefresh does not invalidate the cache
+            completionStatsCache.beforeRefresh();
+            completionStatsCache.get();
+            openCloseCounter.assertCount(3);
+
+            // afterRefresh does not invalidate the cache if no refresh took place
+            completionStatsCache.afterRefresh(false);
+            completionStatsCache.get();
+            openCloseCounter.assertCount(3);
+        }
+    }
+
+    private static class OpenCloseCounter {
+        private final AtomicInteger openCount = new AtomicInteger();
+        private final AtomicInteger closeCount = new AtomicInteger();
+
+        void countOpened() {
+            openCount.incrementAndGet();
+        }
+
+        void countClosed() {
+            closeCount.incrementAndGet();
+        }
+
+        void assertCount(int expectedCount) {
+            assertThat(openCount.get(), equalTo(expectedCount));
+            assertThat(closeCount.get(), equalTo(expectedCount));
+        }
+    }
+
+    private static class TestHarness {
+        private final CompletionStatsCache completionStatsCache;
+        private final CyclicBarrier cyclicBarrier;
+        private final CompletionStats[] results;
+
+        TestHarness(CompletionStatsCache completionStatsCache, int resultCount) {
+            this.completionStatsCache = completionStatsCache;
+            results = new CompletionStats[resultCount];
+            cyclicBarrier = new CyclicBarrier(resultCount + 1);
+        }
+
+        void getStats(int threadIndex, String... fieldPatterns) {
+            start();
+            results[threadIndex] = completionStatsCache.get(fieldPatterns);
+        }
+
+        void start() {
+            try {
+                cyclicBarrier.await();
+            } catch (InterruptedException | BrokenBarrierException e) {
+                throw new AssertionError(e);
+            }
+        }
+
+        CompletionStats getResult(int index) {
+            return results[index];
+        }
+    }
+
+}
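
Editor's note (not part of the patch): CompletionStatsCache combines two ideas, "compute once and
share the in-flight result across concurrent callers" and "invalidate on refresh". The sketch below
is an illustrative reduction of that pattern, not the code added by this commit: the class name
RefreshCachedValue is hypothetical, and it uses the JDK's CompletableFuture in place of
Elasticsearch's PlainActionFuture and ActionListener.completeWith.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Supplier;

    // Minimal single-flight cache that a refresh clears (sketch only, hypothetical names).
    class RefreshCachedValue<T> {
        private final Supplier<T> compute;                                   // the expensive computation
        private final AtomicReference<CompletableFuture<T>> ref = new AtomicReference<>();

        RefreshCachedValue(Supplier<T> compute) {
            this.compute = compute;
        }

        T get() {
            final CompletableFuture<T> newFuture = new CompletableFuture<>();
            final CompletableFuture<T> witness = ref.compareAndExchange(null, newFuture);
            if (witness != null) {
                return witness.join();                                       // lost the race: wait for the winner's result
            }
            try {
                final T value = compute.get();                               // won the race: compute exactly once
                newFuture.complete(value);
                return value;
            } catch (RuntimeException e) {
                newFuture.completeExceptionally(e);                          // concurrent waiters observe the failure too
                throw e;
            }
        }

        void afterRefresh(boolean didRefresh) {
            if (didRefresh) {
                ref.set(null);                                               // next get() recomputes
            }
        }
    }

As in the patch, a result (including a failed one) stays cached until the next refresh clears the
reference, and the per-field filtering by name pattern is applied only after the shared result is
obtained, so all callers reuse a single walk over the segments.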