Initialize sequence numbers on a shrunken index
Bringing together shards into a shrunken index means that we need to
address where history starts for the shrunken index. The problem is
that sequence numbers below the maximum of the maximum sequence numbers
on the source shards may have been assigned on more than one source
shard, so they can collide on the target shards of the shrunken index.
To address this, we set the maximum sequence number and the local
checkpoint on the target shards to this maximum of the maximum sequence
numbers. This preserves correct document-level semantics for documents
indexed before the shrink, and history on the shrunken index
effectively starts from this point.

Relates #25321
jasontedor committed Jun 21, 2017
1 parent 4bbb7e8 commit cc67d02
Showing 5 changed files with 62 additions and 22 deletions.
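
Before the diffs, a minimal, hypothetical sketch of the idea. The shard maximums 17, 42, and 9 and the class name ShrinkSeqNoSketch are made up for illustration; the IndexWriter#setLiveCommitData call and the SequenceNumbers keys are the same ones used in the StoreRecovery change below.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.elasticsearch.index.seqno.SequenceNumbers;

public class ShrinkSeqNoSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical maximum sequence numbers of three source shards being shrunk together.
        final long[] sourceMaxSeqNos = {17L, 42L, 9L};
        // History on the shrunken index must start at 42, the maximum of the maximums, so that
        // sequence numbers assigned after the shrink cannot collide with any already assigned
        // on a source shard.
        final long maxSeqNo = Arrays.stream(sourceMaxSeqNos).max().getAsLong();

        try (Directory target = new RAMDirectory();
             IndexWriter writer = new IndexWriter(target, new IndexWriterConfig(null).setCommitOnClose(false))) {
            // Seed the Lucene commit user data on the target with this value for both the
            // maximum sequence number and the local checkpoint, as addIndices does below.
            final Map<String, String> commitData = new HashMap<>(2);
            commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
            commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
            writer.setLiveCommitData(commitData.entrySet());
            writer.commit();
        }
    }
}

With this seed in place, the first commit of the shrunken shard already carries max_seq_no = local_checkpoint = 42, so sequence numbers assigned after the shrink start above anything used on the source shards.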
@@ -1325,7 +1325,7 @@ public static Set<ShardId> selectShrinkShards(int shardId, IndexMetaData sourceI
* @param sourceIndexMetadata the metadata of the source index
* @param targetNumberOfShards the total number of shards in the target index
* @return the routing factor for a shrunk index with the given number of target shards.
* @throws IllegalArgumentException if the number of source shards is greater than the number of target shards or if the source shards
* @throws IllegalArgumentException if the number of source shards is less than the number of target shards or if the source shards
* are not divisible by the number of target shards.
*/
public static int getRoutingFactor(IndexMetaData sourceIndexMetadata, int targetNumberOfShards) {
@@ -60,6 +60,10 @@ Index getIndex() {
return shard.indexSettings().getIndex();
}

long maxSeqNo() {
return shard.getEngine().seqNoService().getMaxSeqNo();
}

Directory getSnapshotDirectory() {
/* this directory will not be used for anything else but reading / copying files to another directory
* we prevent all write operations on this directory with UOE - nobody should close it either. */
@@ -41,6 +41,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
@@ -49,6 +50,8 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -115,9 +118,9 @@ boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdate
logger.debug("starting recovery from local shards {}", shards);
try {
final Directory directory = indexShard.store().directory(); // don't close this directory!!
addIndices(indexShard.recoveryState().getIndex(), directory, indexSort,
shards.stream().map(s -> s.getSnapshotDirectory())
.collect(Collectors.toList()).toArray(new Directory[shards.size()]));
final Directory[] sources = shards.stream().map(LocalShardSnapshot::getSnapshotDirectory).toArray(Directory[]::new);
final long maxSeqNo = shards.stream().mapToLong(LocalShardSnapshot::maxSeqNo).max().getAsLong();
addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo);
internalRecoverFromStore(indexShard);
// just trigger a merge to do housekeeping on the
// copied segments - we will also see them in stats etc.
@@ -131,8 +134,13 @@ boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdate
return false;
}

void addIndices(RecoveryState.Index indexRecoveryStats, Directory target, Sort indexSort, Directory... sources) throws IOException {
target = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
void addIndices(
final RecoveryState.Index indexRecoveryStats,
final Directory target,
final Sort indexSort,
final Directory[] sources,
final long maxSeqNo) throws IOException {
final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setCommitOnClose(false)
// we don't want merges to happen here - we call maybe merge on the engine
@@ -143,8 +151,19 @@ void addIndices(RecoveryState.Index indexRecoveryStats, Directory target, Sort i
if (indexSort != null) {
iwc.setIndexSort(indexSort);
}
try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(target, indexRecoveryStats), iwc)) {
try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(hardLinkOrCopyTarget, indexRecoveryStats), iwc)) {
writer.addIndexes(sources);
/*
* We set the maximum sequence number and the local checkpoint on the target to the maximum of the maximum sequence numbers on
* the source shards. This ensures that history after this maximum sequence number can advance and we have correct
* document-level semantics.
*/
writer.setLiveCommitData(() -> {
final HashMap<String, String> liveCommitData = new HashMap<>(2);
liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
return liveCommitData.entrySet().iterator();
});
writer.commit();
}
}
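
After addIndices commits, the seeded values live in the Lucene commit's user data. A small sketch of reading them back, assuming target is the directory addIndices just wrote to and CommitUserDataSketch is a made-up holder class; the updated StoreRecoveryTests further down asserts exactly these keys.

import java.io.IOException;
import java.util.Map;

import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.elasticsearch.index.seqno.SequenceNumbers;

final class CommitUserDataSketch {
    // Reads the maximum sequence number that addIndices seeded into the latest Lucene commit.
    static long readSeededMaxSeqNo(final Directory target) throws IOException {
        final Map<String, String> userData = SegmentInfos.readLatestCommit(target).getUserData();
        // Both keys were set to the same value, so either one gives the start of history.
        return Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO));
    }
}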
@@ -19,7 +19,6 @@

package org.elasticsearch.action.admin.indices.create;

import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedSetSelector;
@@ -29,17 +28,17 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -48,8 +47,8 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
@@ -58,15 +57,11 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -233,7 +228,8 @@ public void testCreateShrinkIndex() {
.put("number_of_shards", randomIntBetween(2, 7))
.put("index.version.created", version)
).get();
for (int i = 0; i < 20; i++) {
final int docs = randomIntBetween(0, 128);
for (int i = 0; i < docs; i++) {
client().prepareIndex("source", "type")
.setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON).get();
}
@@ -252,30 +248,43 @@
.put("index.routing.allocation.require._name", mergeNode)
.put("index.blocks.write", true)).get();
ensureGreen();

final IndicesStatsResponse sourceStats = client().admin().indices().prepareStats("source").get();
final long maxSeqNo =
Arrays.stream(sourceStats.getShards()).map(ShardStats::getSeqNoStats).mapToLong(SeqNoStats::getMaxSeqNo).max().getAsLong();
// now merge source into a single shard index

final boolean createWithReplicas = randomBoolean();
assertAcked(client().admin().indices().prepareShrinkIndex("source", "target")
.setSettings(Settings.builder().put("index.number_of_replicas", createWithReplicas ? 1 : 0).build()).get());
ensureGreen();
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);

final IndicesStatsResponse targetStats = client().admin().indices().prepareStats("target").get();
for (final ShardStats shardStats : targetStats.getShards()) {
final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
assertThat(seqNoStats.getMaxSeqNo(), equalTo(maxSeqNo));
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(maxSeqNo));
}

final int size = docs > 0 ? 2 * docs : 1;
assertHitCount(client().prepareSearch("target").setSize(size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs);

if (createWithReplicas == false) {
// bump replicas
client().admin().indices().prepareUpdateSettings("target")
.setSettings(Settings.builder()
.put("index.number_of_replicas", 1)).get();
ensureGreen();
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
assertHitCount(client().prepareSearch("target").setSize(size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs);
}

for (int i = 20; i < 40; i++) {
for (int i = docs; i < 2 * docs; i++) {
client().prepareIndex("target", "type")
.setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON).get();
}
flushAndRefresh();
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 40);
assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
assertHitCount(client().prepareSearch("target").setSize(2 * size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 2 * docs);
assertHitCount(client().prepareSearch("source").setSize(size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs);
GetSettingsResponse target = client().admin().indices().prepareGetSettings("target").get();
assertEquals(version, target.getIndexToSettings().get("target").getAsVersion("index.version.created", null));
}
@@ -37,6 +37,7 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ESTestCase;

@@ -46,8 +47,11 @@
import java.nio.file.attribute.BasicFileAttributes;
import java.security.AccessControlException;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Predicate;

import static org.hamcrest.CoreMatchers.equalTo;

public class StoreRecoveryTests extends ESTestCase {

public void testAddIndices() throws IOException {
@@ -82,7 +86,8 @@ public void testAddIndices() throws IOException {
StoreRecovery storeRecovery = new StoreRecovery(new ShardId("foo", "bar", 1), logger);
RecoveryState.Index indexStats = new RecoveryState.Index();
Directory target = newFSDirectory(createTempDir());
storeRecovery.addIndices(indexStats, target, indexSort, dirs);
final long maxSeqNo = randomNonNegativeLong();
storeRecovery.addIndices(indexStats, target, indexSort, dirs, maxSeqNo);
int numFiles = 0;
Predicate<String> filesFilter = (f) -> f.startsWith("segments") == false && f.equals("write.lock") == false
&& f.startsWith("extra") == false;
@@ -99,6 +104,9 @@
}
DirectoryReader reader = DirectoryReader.open(target);
SegmentInfos segmentCommitInfos = SegmentInfos.readLatestCommit(target);
final Map<String, String> userData = segmentCommitInfos.getUserData();
assertThat(userData.get(SequenceNumbers.MAX_SEQ_NO), equalTo(Long.toString(maxSeqNo)));
assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(Long.toString(maxSeqNo)));
for (SegmentCommitInfo info : segmentCommitInfos) { // check that we didn't merge
assertEquals("all sources must be flush",
info.info.getDiagnostics().get("source"), "flush");