From d2be022769d0ff4dac5ad6f17dbfcffd679b4478 Mon Sep 17 00:00:00 2001
From: Tanguy Leroux
Date: Wed, 26 Sep 2018 14:14:05 +0200
Subject: [PATCH] [RCI] Add NoOpEngine for closed indices (#33903)

This commit adds a new NoOpEngine implementation based on the current
ReadOnlyEngine. This new implementation uses an empty DirectoryReader with
no segments readers and will always return 0 docs.

The NoOpEngine is the default Engine created for IndexShards of closed
indices. It expects an empty translog when it is instantiated.

Relates to #33888
---
 .../index/engine/NoOpEngine.java              | 149 ++++++++++++
 .../index/engine/ReadOnlyEngine.java          |  29 ++-
 .../elasticsearch/indices/IndicesService.java |   7 +
 .../index/engine/NoOpEngineTests.java         | 221 ++++++++++++++++++
 .../index/shard/IndexShardIT.java             |  38 ++-
 .../indices/IndicesServiceTests.java          |   2 +-
 .../index/engine/EngineTestCase.java          |   8 +
 7 files changed, 448 insertions(+), 6 deletions(-)
 create mode 100644 server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java
 create mode 100644 server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java

diff --git a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java
new file mode 100644
index 0000000000000..8e857d2606d9b
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.LeafReader;
+import org.apache.lucene.store.Directory;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.index.translog.TranslogConfig;
+import org.elasticsearch.index.translog.TranslogCorruptedException;
+import org.elasticsearch.index.translog.TranslogDeletionPolicy;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.LongSupplier;
+import java.util.stream.Stream;
+
+/**
+ * NoOpEngine is an engine implementation that does nothing but the bare minimum
+ * required in order to have an engine. All attempts to do something (search,
+ * index, get), throw {@link UnsupportedOperationException}. This does maintain
+ * a translog with a deletion policy so that when flushing, no translog is
+ * retained on disk (setting a retention size and age of 0).
+ *
+ * It's also important to notice that this does list the commits of the Store's
+ * Directory so that the last commit's user data can be read for the historyUUID
+ * and last committed segment info.
+ */
+public final class NoOpEngine extends ReadOnlyEngine {
+
+    public NoOpEngine(EngineConfig engineConfig) {
+        super(engineConfig, null, null, true, directoryReader -> directoryReader);
+        boolean success = false;
+        try {
+            // The deletion policy for the translog should not keep any translogs around, so the min age/size is set to -1
+            final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1);
+
+            // The translog is opened and closed to validate that the translog UUID from lucene is the same as the one in the translog
+            try (Translog translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier())) {
+                final int nbOperations = translog.totalOperations();
+                if (nbOperations != 0) {
+                    throw new IllegalArgumentException("Expected 0 translog operations but there were " + nbOperations);
+                }
+            }
+            success = true;
+        } catch (IOException | TranslogCorruptedException e) {
+            throw new EngineCreationFailureException(shardId, "failed to create engine", e);
+        } finally {
+            if (success == false) {
+                IOUtils.closeWhileHandlingException(this);
+            }
+        }
+    }
+
+    @Override
+    protected DirectoryReader open(final Directory directory) throws IOException {
+        final List<IndexCommit> indexCommits = DirectoryReader.listCommits(directory);
+        assert indexCommits.size() == 1 : "expected only one commit point";
+        IndexCommit indexCommit = indexCommits.get(indexCommits.size() - 1);
+        return new DirectoryReader(directory, new LeafReader[0]) {
+            @Override
+            protected DirectoryReader doOpenIfChanged() throws IOException {
+                return null;
+            }
+
+            @Override
+            protected DirectoryReader doOpenIfChanged(IndexCommit commit) throws IOException {
+                return null;
+            }
+
+            @Override
+            protected DirectoryReader doOpenIfChanged(IndexWriter writer, boolean applyAllDeletes) throws IOException {
+                return null;
+            }
+
+            @Override
+            public long getVersion() {
+                return 0;
+            }
+
+            @Override
+            public boolean isCurrent() throws IOException {
+                return true;
+            }
+
+            @Override
+            public IndexCommit getIndexCommit() throws IOException {
+                return indexCommit;
+            }
+
+            @Override
+            protected void doClose() throws IOException {
+            }
+
+            @Override
+            public CacheHelper getReaderCacheHelper() {
+                return null;
+            }
+        };
+    }
+
+    private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy,
+                                  LongSupplier globalCheckpointSupplier) throws IOException {
+        final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
+        final String translogUUID = loadTranslogUUIDFromLastCommit();
+        // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
+        return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier,
+            engineConfig.getPrimaryTermSupplier());
+    }
+
+    /**
+     * Reads the current stored translog ID from the last commit data.
+     */
+    @Nullable
+    private String loadTranslogUUIDFromLastCommit() {
+        final Map<String, String> commitUserData = getLastCommittedSegmentInfos().getUserData();
+        if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
+            throw new IllegalStateException("Commit doesn't contain translog generation id");
+        }
+        return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
+    }
+
+    @Override
+    public boolean ensureTranslogSynced(Stream<Translog.Location> locations) {
+        throw new UnsupportedOperationException("Translog synchronization should never be needed");
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
index 7848921b67e2e..e9415df169713 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
@@ -21,6 +21,7 @@
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.SegmentCommitInfo;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
 import org.apache.lucene.search.IndexSearcher;
@@ -56,7 +57,7 @@
  *
  * @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function)
  */
-public final class ReadOnlyEngine extends Engine {
+public class ReadOnlyEngine extends Engine {
 
     private final SegmentInfos lastCommittedSegmentInfos;
     private final SeqNoStats seqNoStats;
@@ -95,7 +96,7 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
                 this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
                 this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
                 this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats;
-                reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(directory), config.getShardId());
+                reader = ElasticsearchDirectoryReader.wrap(open(directory), config.getShardId());
                 if (config.getIndexSettings().isSoftDeleteEnabled()) {
                     reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
                 }
@@ -103,7 +104,7 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
                 this.indexCommit = reader.getIndexCommit();
                 this.searcherManager = new SearcherManager(reader,
                     new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
-                this.docsStats = docsStats(reader);
+                this.docsStats = docsStats(lastCommittedSegmentInfos);
                 this.indexWriterLock = indexWriterLock;
                 success = true;
             } finally {
@@ -116,6 +117,10 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
         }
     }
 
+    protected DirectoryReader open(final Directory directory) throws IOException {
+        return DirectoryReader.open(directory);
+    }
+
     @Override
     protected void closeNoLock(String reason, CountDownLatch closedLatch) {
         if (isClosed.compareAndSet(false, true)) {
@@ -129,6 +134,24 @@ protected void closeNoLock(String reason, CountDownLatch closedLatch) {
         }
     }
 
+    private DocsStats docsStats(final SegmentInfos lastCommittedSegmentInfos) {
+        long numDocs = 0;
+        long numDeletedDocs = 0;
+        long sizeInBytes = 0;
+        if (lastCommittedSegmentInfos != null) {
+            for (SegmentCommitInfo segmentCommitInfo : lastCommittedSegmentInfos) {
+                numDocs += segmentCommitInfo.info.maxDoc() - segmentCommitInfo.getDelCount() - segmentCommitInfo.getSoftDelCount();
+                numDeletedDocs += segmentCommitInfo.getDelCount() + segmentCommitInfo.getSoftDelCount();
+                try {
+                    sizeInBytes += segmentCommitInfo.sizeInBytes();
+                } catch (IOException e) {
+                    throw new UncheckedIOException("Failed to get size for [" + segmentCommitInfo.info.name + "]", e);
+                }
+            }
+        }
+        return new DocsStats(numDocs, numDeletedDocs, sizeInBytes);
+    }
+
     public static SeqNoStats buildSeqNoStats(SegmentInfos infos) {
         final SequenceNumbers.CommitInfo seqNoStats =
             SequenceNumbers.loadSeqNoInfoFromLuceneCommit(infos.userData.entrySet());
diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java
index e9f674e14a501..a5fb68b9be370 100644
--- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java
+++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java
@@ -82,6 +82,7 @@
 import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.engine.InternalEngineFactory;
+import org.elasticsearch.index.engine.NoOpEngine;
 import org.elasticsearch.index.fielddata.IndexFieldDataCache;
 import org.elasticsearch.index.flush.FlushStats;
 import org.elasticsearch.index.get.GetStats;
@@ -500,6 +501,12 @@ private synchronized IndexService createIndexService(final String reason,
     }
 
     private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
+        final IndexMetaData indexMetaData = idxSettings.getIndexMetaData();
+        if (indexMetaData != null && indexMetaData.getState() == IndexMetaData.State.CLOSE) {
+            // NoOpEngine takes precedence as long as the index is closed
+            return NoOpEngine::new;
+        }
+
         final List<Optional<EngineFactory>> engineFactories =
                 engineFactoryProviders
                         .stream()
diff --git a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java
new file mode 100644
index 0000000000000..ee76e44e97593
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.NoMergePolicy;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.seqno.ReplicationTracker;
+import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.index.shard.DocsStats;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.index.translog.TranslogCorruptedException;
+import org.elasticsearch.index.translog.TranslogDeletionPolicy;
+import org.elasticsearch.test.IndexSettingsModule;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+
+public class NoOpEngineTests extends EngineTestCase {
+    private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY);
+
+    public void testNoopEngine() throws IOException {
+        engine.close();
+        final NoOpEngine engine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir));
+        expectThrows(UnsupportedOperationException.class, () -> engine.syncFlush(null, null));
+        expectThrows(UnsupportedOperationException.class, () -> engine.ensureTranslogSynced(null));
+        assertThat(engine.refreshNeeded(), equalTo(false));
+        assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
+        engine.close();
+    }
+
+    public void testTwoNoopEngines() throws IOException {
+        engine.close();
+        // Ensure that we can't open two noop engines for the same store
+        final EngineConfig engineConfig = noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir);
+        try (NoOpEngine ignored = new NoOpEngine(engineConfig)) {
+            UncheckedIOException e = expectThrows(UncheckedIOException.class, () -> new NoOpEngine(engineConfig));
+            assertThat(e.getCause(), instanceOf(LockObtainFailedException.class));
+        }
+    }
+
+    public void testNoopAfterRegularEngine() throws IOException {
+        int docs = randomIntBetween(1, 10);
+        ReplicationTracker tracker = (ReplicationTracker) engine.config().getGlobalCheckpointSupplier();
+        ShardRouting routing = TestShardRouting.newShardRouting("test", shardId.id(), "node",
+            null, true, ShardRoutingState.STARTED, allocationId);
+        IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(shardId).addShard(routing).build();
+        tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table, Collections.emptySet());
+        tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
+        for (int i = 0; i < docs; i++) {
+            ParsedDocument doc = testParsedDocument("" + i, null, testDocumentWithTextField(), B_1, null);
+            engine.index(indexForDoc(doc));
+            tracker.updateLocalCheckpoint(allocationId.getId(), i);
+        }
+
+        flushAndTrimTranslog(engine);
+
+        long localCheckpoint = engine.getLocalCheckpoint();
+        long maxSeqNo = engine.getSeqNoStats(100L).getMaxSeqNo();
+        engine.close();
+
+        final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker));
+        assertThat(noOpEngine.getLocalCheckpoint(), equalTo(localCheckpoint));
+        assertThat(noOpEngine.getSeqNoStats(100L).getMaxSeqNo(), equalTo(maxSeqNo));
+        try (Engine.IndexCommitRef ref = noOpEngine.acquireLastIndexCommit(false)) {
+            try (IndexReader reader = DirectoryReader.open(ref.getIndexCommit())) {
+                assertThat(reader.numDocs(), equalTo(docs));
+            }
+        }
+        noOpEngine.close();
+    }
+
+    public void testNoopEngineWithInvalidTranslogUUID() throws IOException {
+        IOUtils.close(engine, store);
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        try (Store store = createStore()) {
+            EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
+            int numDocs = scaledRandomIntBetween(10, 100);
+            try (InternalEngine engine = createEngine(config)) {
+                for (int i = 0; i < numDocs; i++) {
+                    ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
+                    engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
+                        System.nanoTime(), -1, false));
+                    if (rarely()) {
+                        engine.flush();
+                    }
+                    globalCheckpoint.set(engine.getLocalCheckpoint());
+                }
+                flushAndTrimTranslog(engine);
+            }
+
+            final Path newTranslogDir = createTempDir();
+            // A new translog will have a different UUID than the existing store/noOp engine does
+            Translog newTranslog = createTranslog(newTranslogDir, () -> 1L);
+            newTranslog.close();
+
+            EngineCreationFailureException e = expectThrows(EngineCreationFailureException.class,
+                () -> new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, newTranslogDir)));
+            assertThat(e.getCause(), instanceOf(TranslogCorruptedException.class));
+        }
+    }
+
+    public void testNoopEngineWithNonZeroTranslogOperations() throws IOException {
+        IOUtils.close(engine, store);
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        try (Store store = createStore()) {
+            final MergePolicy mergePolicy = NoMergePolicy.INSTANCE;
+            EngineConfig config = config(defaultSettings, store, createTempDir(), mergePolicy, null, null, globalCheckpoint::get);
+            int numDocs = scaledRandomIntBetween(10, 100);
+            try (InternalEngine engine = createEngine(config)) {
+                for (int i = 0; i < numDocs; i++) {
+                    ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
+                    engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
+                        System.nanoTime(), -1, false));
+                    if (rarely()) {
+                        engine.flush();
+                    }
+                    globalCheckpoint.set(engine.getLocalCheckpoint());
+                }
+                engine.syncTranslog();
+                engine.flushAndClose();
+                engine.close();
+
+                IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new NoOpEngine(engine.engineConfig));
+                assertThat(e.getMessage(), is("Expected 0 translog operations but there were " + numDocs));
+            }
+        }
+    }
+
+    public void testNoOpEngineDocStats() throws Exception {
+        IOUtils.close(engine, store);
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        try (Store store = createStore()) {
+            EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
+            final int numDocs = scaledRandomIntBetween(10, 3000);
+            int deletions = 0;
+            try (InternalEngine engine = createEngine(config)) {
+                for (int i = 0; i < numDocs; i++) {
+                    engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null)));
+                    if (rarely()) {
+                        engine.flush();
+                    }
+                    globalCheckpoint.set(engine.getLocalCheckpoint());
+                }
+
+                for (int i = 0; i < numDocs; i++) {
+                    if (randomBoolean()) {
+                        String delId = Integer.toString(i);
+                        Engine.DeleteResult result = engine.delete(new Engine.Delete("test", delId, newUid(delId), primaryTerm.get()));
+                        assertTrue(result.isFound());
+                        globalCheckpoint.set(engine.getLocalCheckpoint());
+                        deletions += 1;
+                    }
+                }
+                engine.waitForOpsToComplete(numDocs + deletions - 1);
+                flushAndTrimTranslog(engine);
+                engine.close();
+            }
+
+            final DocsStats expectedDocStats;
+            try (InternalEngine engine = createEngine(config)) {
+                expectedDocStats = engine.docStats();
+            }
+
+            try (NoOpEngine noOpEngine = new NoOpEngine(config)) {
+                assertEquals(expectedDocStats.getCount(), noOpEngine.docStats().getCount());
+                assertEquals(expectedDocStats.getDeleted(), noOpEngine.docStats().getDeleted());
+                assertEquals(expectedDocStats.getTotalSizeInBytes(), noOpEngine.docStats().getTotalSizeInBytes());
+                assertEquals(expectedDocStats.getAverageSizeInBytes(), noOpEngine.docStats().getAverageSizeInBytes());
+            } catch (AssertionError e) {
+                logger.error(config.getMergePolicy());
+                throw e;
+            }
+        }
+    }
+
+    private void flushAndTrimTranslog(final InternalEngine engine) {
+        engine.flush(true, true);
+        final TranslogDeletionPolicy deletionPolicy = engine.getTranslog().getDeletionPolicy();
+        deletionPolicy.setRetentionSizeInBytes(-1);
+        deletionPolicy.setRetentionAgeInMillis(-1);
+        deletionPolicy.setMinTranslogGenerationForRecovery(engine.getTranslog().getGeneration().translogFileGeneration);
+        engine.flush(true, true);
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java
index 56a14da845fff..6c4e1cb4382ee 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java
@@ -37,7 +37,6 @@
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
-import org.elasticsearch.cluster.routing.TestShardRouting;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.CheckedRunnable;
@@ -59,6 +58,7 @@
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.engine.NoOpEngine;
 import org.elasticsearch.index.engine.SegmentsStats;
 import org.elasticsearch.index.flush.FlushStats;
 import org.elasticsearch.index.mapper.SourceToParse;
@@ -102,6 +102,7 @@
 import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.NONE;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
+import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
 import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
@@ -636,7 +637,7 @@ public static final IndexShard newIndexShard(IndexService indexService, IndexSha
     }
 
     private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) {
-        ShardRouting shardRouting = TestShardRouting.newShardRouting(existingShardRouting.shardId(),
+        ShardRouting shardRouting = newShardRouting(existingShardRouting.shardId(),
             existingShardRouting.currentNodeId(), null, existingShardRouting.primary(), ShardRoutingState.INITIALIZING,
             existingShardRouting.allocationId());
         shardRouting = shardRouting.updateUnassigned(new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, "fake recovery"),
@@ -809,4 +810,37 @@ public void testGlobalCheckpointListenerTimeout() throws InterruptedException {
         assertTrue(notified.get());
     }
 
+    /**
+     * Test that the {@link org.elasticsearch.index.engine.NoOpEngine} takes precedence over other
+     * engine factories if the index is closed.
+     */
+    public void testNoOpEngineFactoryTakesPrecedence() throws IOException {
+        final String indexName = "closed-index";
+        createIndex(indexName, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build());
+        ensureGreen();
+
+        client().admin().indices().prepareClose(indexName).get();
+
+        final ClusterService clusterService = getInstanceFromNode(ClusterService.class);
+        final ClusterState clusterState = clusterService.state();
+
+        IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
+        final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+
+        final ShardId shardId = new ShardId(indexMetaData.getIndex(), 0);
+        final DiscoveryNode node = clusterService.localNode();
+        final ShardRouting routing =
+            newShardRouting(shardId, node.getId(), true, ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
+
+        final IndexService indexService = indicesService.createIndex(indexMetaData, Collections.emptyList());
+        try {
+            final IndexShard indexShard = indexService.createShard(routing, id -> {
+            });
+            indexShard.markAsRecovering("store", new RecoveryState(indexShard.routingEntry(), node, null));
+            indexShard.recoverFromStore();
+            assertThat(indexShard.getEngine(), instanceOf(NoOpEngine.class));
+        } finally {
+            indexService.close("test terminated", true);
+        }
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java
index b4e98775d97ac..e5ef406e1ab2e 100644
--- a/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java
@@ -544,7 +544,7 @@ public void testGetEngineFactory() throws IOException {
         }
     }
 
-    public void testConflictingEngineFactories() throws IOException {
+    public void testConflictingEngineFactories() {
         final String indexName = "foobar";
         final Index index = new Index(indexName, UUIDs.randomBase64UUID());
         final Settings settings = Settings.builder()
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
index 1bbfb6fa73de3..3d847ed389806 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
@@ -566,6 +566,14 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
         return config;
     }
 
+    protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath) {
+        return noOpConfig(indexSettings, store, translogPath, null);
+    }
+
+    protected EngineConfig noOpConfig(IndexSettings indexSettings, Store store, Path translogPath, LongSupplier globalCheckpointSupplier) {
+        return config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier);
+    }
+
     protected static final BytesReference B_1 = new BytesArray(new byte[]{1});
     protected static final BytesReference B_2 = new BytesArray(new byte[]{2});
     protected static final BytesReference B_3 = new BytesArray(new byte[]{3});