diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseIndexIT.java index 1a4573cda19b7..f0f08496937c3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -34,9 +34,9 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; @@ -51,6 +51,7 @@ import java.util.Arrays; import java.util.List; import java.util.Locale; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -480,10 +481,11 @@ public void testResyncPropagatePrimaryTerm() throws Exception { } } - public void testCommitIdInSearcher() throws Exception { + public void testSearcherId() throws Exception { final String indexName = "test_commit_id"; + final int numberOfShards = randomIntBetween(1, 5); createIndex(indexName, Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .build()); indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50)) @@ -492,19 +494,42 @@ public void testCommitIdInSearcher() throws Exception { assertAcked(client().admin().indices().prepareClose(indexName)); assertIndexIsClosed(indexName); ensureGreen(indexName); - final String nodeWithPrimary = Iterables.get(internalCluster().nodesInclude(indexName), 0); - IndexShard shard = internalCluster().getInstance(IndicesService.class, nodeWithPrimary) - .indexService(resolveIndex(indexName)).getShard(0); - final String commitId; - try (Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier(randomFrom(Engine.SearcherScope.values()))) { - assertNotNull(searcherSupplier.getCommitId()); - commitId = searcherSupplier.getCommitId(); + if (randomBoolean()) { + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))); + internalCluster().ensureAtLeastNumDataNodes(2); + ensureGreen(indexName); + } + String[] searcherIds = new String[numberOfShards]; + Set allocatedNodes = internalCluster().nodesInclude(indexName); + for (String node : allocatedNodes) { + IndexService indexService = internalCluster().getInstance(IndicesService.class, node).indexServiceSafe(resolveIndex(indexName)); + for (IndexShard shard : indexService) { + try (Engine.SearcherSupplier searcher = shard.acquireSearcherSupplier()) { + assertNotNull(searcher.getSearcherId()); + if (searcherIds[shard.shardId().id()] != null) { + assertThat(searcher.getSearcherId(), equalTo(searcherIds[shard.shardId().id()])); + } else { + searcherIds[shard.shardId().id()] = searcher.getSearcherId(); + } + } + } + } + for (String node : allocatedNodes) { + if (randomBoolean()) { + internalCluster().restartNode(node); + } } - internalCluster().restartNode(nodeWithPrimary); ensureGreen(indexName); - shard = internalCluster().getInstance(IndicesService.class, nodeWithPrimary).indexService(resolveIndex(indexName)).getShard(0); - try (Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier(randomFrom(Engine.SearcherScope.values()))) { - assertThat(searcherSupplier.getCommitId(), equalTo(commitId)); + allocatedNodes = internalCluster().nodesInclude(indexName); + for (String node : allocatedNodes) { + IndexService indexService = internalCluster().getInstance(IndicesService.class, node).indexServiceSafe(resolveIndex(indexName)); + for (IndexShard shard : indexService) { + try (Engine.SearcherSupplier searcher = shard.acquireSearcherSupplier()) { + assertNotNull(searcher.getSearcherId()); + assertThat(searcher.getSearcherId(), equalTo(searcherIds[shard.shardId().id()])); + } + } } } diff --git a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 2dd224b3e8ac8..81a4d5cd9e3d0 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -100,7 +100,6 @@ import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -831,13 +830,6 @@ public void delete() { } } - /** - * Returns a base64 encoded string of the commit id of the given {@link SegmentInfos} - */ - public static String getCommitId(SegmentInfos segmentInfos) { - return Base64.getEncoder().encodeToString(segmentInfos.getId()); - } - /** * Return a {@link Bits} view of the provided scorer. * NOTE: that the returned {@link Bits} instance MUST be consumed in order. diff --git a/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java b/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java index 5f936658090a4..7686c19bd12dc 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CommitStats.java @@ -44,7 +44,7 @@ public CommitStats(SegmentInfos segmentInfos) { userData = MapBuilder.newMapBuilder().putAll(segmentInfos.getUserData()).immutableMap(); // lucene calls the current generation, last generation. generation = segmentInfos.getLastGeneration(); - id = Lucene.getCommitId(segmentInfos); + id = Base64.getEncoder().encodeToString(segmentInfos.getId()); numDocs = Lucene.getNumDocs(segmentInfos); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index a111646153e21..7105170b89e33 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -1251,11 +1251,11 @@ public final void close() { protected abstract Searcher acquireSearcherInternal(String source); /** - * Returns a commit id associated with this searcher if it's opened from an index commit; otherwise, return null. Two searchers - * with the same commit id must have identical Lucene level indices (i.e., identical segments with same docs using same doc-ids). + * Returns an id associated with this searcher if exists. Two searchers with the same searcher id must have + * identical Lucene level indices (i.e., identical segments with same docs using same doc-ids). */ @Nullable - public String getCommitId() { + public String getSearcherId() { return null; } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 33bb53184f890..27cf30a3153ed 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -23,6 +23,7 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.PointValues; +import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; import org.apache.lucene.search.ReferenceManager; @@ -30,6 +31,7 @@ import org.apache.lucene.store.Lock; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; +import org.elasticsearch.common.hash.MessageDigests; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.util.concurrent.ReleasableLock; @@ -50,6 +52,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; +import java.security.MessageDigest; import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -112,7 +115,7 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats // yet this makes sure nobody else does. including some testing tools that try to be messy indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null; this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory); - this.commitId = Lucene.getCommitId(lastCommittedSegmentInfos); + this.commitId = generateSearcherId(lastCommittedSegmentInfos); if (seqNoStats == null) { seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos); ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats); @@ -140,6 +143,25 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats } } + /** + * Generate a searcher id using the ids of the underlying segments of an index commit. Here we can't use the commit id directly + * as the search id because the commit id changes whenever IndexWriter#commit is called although the segment files stay unchanged. + * Any recovery except the local recovery performs IndexWriter#commit to generate a new translog uuid or history_uuid. + */ + static String generateSearcherId(SegmentInfos sis) { + final MessageDigest md = MessageDigests.sha256(); + for (SegmentCommitInfo si : sis) { + final byte[] segmentId = si.getId(); + if (segmentId != null) { + md.update(segmentId); + } else { + // old segments do not have segment ids + return null; + } + } + return MessageDigests.toHexString(md.digest()); + } + protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(final SeqNoStats seqNoStats) { if (requireCompleteHistory == false) { return; @@ -585,7 +607,7 @@ protected Searcher acquireSearcherInternal(String source) { } @Override - public String getCommitId() { + public String getSearcherId() { return commitId; } }; diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index d5d1b9f73027b..7ba4a41d8ee2b 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -20,6 +20,7 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.common.UUIDs; @@ -41,6 +42,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; public class ReadOnlyEngineTests extends EngineTestCase { @@ -325,4 +327,40 @@ public void testTranslogStats() throws IOException { } } } + + public void testSearcherId() throws Exception { + IOUtils.close(engine, store); + AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + final EngineConfig config = + config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get); + String lastSearcherId; + try (InternalEngine engine = createEngine(config)) { + lastSearcherId = ReadOnlyEngine.generateSearcherId(engine.getLastCommittedSegmentInfos()); + assertNotNull(lastSearcherId); + int iterations = randomIntBetween(0, 10); + for (int i = 0; i < iterations; i++) { + assertThat(ReadOnlyEngine.generateSearcherId(engine.getLastCommittedSegmentInfos()), equalTo(lastSearcherId)); + final List operations = generateHistoryOnReplica(between(1, 100), + engine.getProcessedLocalCheckpoint() + 1L, false, randomBoolean(), randomBoolean()); + applyOperations(engine, operations); + engine.flush(randomBoolean(), true); + final String newCommitId = ReadOnlyEngine.generateSearcherId(engine.getLastCommittedSegmentInfos()); + assertThat(newCommitId, not(equalTo(lastSearcherId))); + if (randomBoolean()) { + engine.flush(true, true); + assertThat(ReadOnlyEngine.generateSearcherId(engine.getLastCommittedSegmentInfos()), equalTo(newCommitId)); + } + lastSearcherId = newCommitId; + } + globalCheckpoint.set(engine.getProcessedLocalCheckpoint()); + } + try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, Function.identity(), true)) { + try (Engine.SearcherSupplier searcher = + readOnlyEngine.acquireSearcherSupplier(Function.identity(), randomFrom(Engine.SearcherScope.values()))) { + assertThat(searcher.getSearcherId(), equalTo(lastSearcherId)); + } + } + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index fb95a729928be..80593f823dfb4 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -848,7 +848,12 @@ public static List generateSingleDocHistory(boolean forReplica public List generateHistoryOnReplica(int numOps, boolean allowGapInSeqNo, boolean allowDuplicate, boolean includeNestedDocs) throws Exception { - long seqNo = 0; + return generateHistoryOnReplica(numOps, 0L, allowGapInSeqNo, allowDuplicate, includeNestedDocs); + } + + public List generateHistoryOnReplica(int numOps, long startingSeqNo, boolean allowGapInSeqNo, boolean allowDuplicate, + boolean includeNestedDocs) throws Exception { + long seqNo = startingSeqNo; final int maxIdValue = randomInt(numOps * 2); final List operations = new ArrayList<>(numOps); CheckedBiFunction nestedParsedDocFactory = nestedParsedDocFactory(); diff --git a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 9f578fc89195e..c0bdb26886de9 100644 --- a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -195,7 +195,7 @@ protected void doClose() { } @Override - public String getCommitId() { + public String getSearcherId() { return commitId; } }; diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java new file mode 100644 index 0000000000000..c15c7776b2b60 --- /dev/null +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.searchablesnapshots; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.snapshots.SnapshotId; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Set; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +public class RetrySearchIntegTests extends BaseSearchableSnapshotsIntegTestCase { + + public void testSearcherId() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + final int numberOfShards = between(1, 5); + assertAcked( + client().admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards).build()) + .addMapping("{\"properties\":{\"created_date\":{\"type\": \"date\", \"format\": \"yyyy-MM-dd\"}}}") + ); + final List indexRequestBuilders = new ArrayList<>(); + final int docCount = between(0, 100); + for (int i = 0; i < docCount; i++) { + indexRequestBuilders.add(client().prepareIndex(indexName, "_doc").setSource("created_date", "2011-02-02")); + } + indexRandom(true, false, indexRequestBuilders); + assertThat( + client().admin().indices().prepareForceMerge(indexName).setOnlyExpungeDeletes(true).setFlush(true).get().getFailedShards(), + equalTo(0) + ); + refresh(indexName); + forceMerge(); + + final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createRepository(repositoryName, "fs"); + + final SnapshotId snapshotOne = createSnapshot(repositoryName, "snapshot-1", Collections.singletonList(indexName)).snapshotId(); + assertAcked(client().admin().indices().prepareDelete(indexName)); + + final int numberOfReplicas = between(0, 2); + final Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build(); + internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + 1); + mountSnapshot(repositoryName, snapshotOne.getName(), indexName, indexName, indexSettings); + ensureGreen(indexName); + + final String[] searcherIds = new String[numberOfShards]; + Set allocatedNodes = internalCluster().nodesInclude(indexName); + for (String node : allocatedNodes) { + IndexService indexService = internalCluster().getInstance(IndicesService.class, node).indexServiceSafe(resolveIndex(indexName)); + for (IndexShard indexShard : indexService) { + try (Engine.SearcherSupplier searcher = indexShard.acquireSearcherSupplier()) { + assertNotNull(searcher.getSearcherId()); + if (searcherIds[indexShard.shardId().id()] != null) { + assertThat(searcher.getSearcherId(), equalTo(searcherIds[indexShard.shardId().id()])); + } else { + searcherIds[indexShard.shardId().id()] = searcher.getSearcherId(); + } + } + } + } + + for (String allocatedNode : allocatedNodes) { + if (randomBoolean()) { + internalCluster().restartNode(allocatedNode); + } + } + ensureGreen(indexName); + allocatedNodes = internalCluster().nodesInclude(indexName); + for (String node : allocatedNodes) { + IndexService indexService = internalCluster().getInstance(IndicesService.class, node).indexServiceSafe(resolveIndex(indexName)); + for (IndexShard indexShard : indexService) { + try (Engine.SearcherSupplier searcher = indexShard.acquireSearcherSupplier()) { + assertNotNull(searcher.getSearcherId()); + assertThat(searcher.getSearcherId(), equalTo(searcherIds[indexShard.shardId().id()])); + } + } + } + } +}