Skip to content

Commit

Permalink
Fix Snapshots Recording Incorrect Max. Segment Counts (#74291) (#74302)
Browse files Browse the repository at this point in the history
If sequence numbers are equal across snapshots (and thus no new files are written to the repository),
the segment count taken from the index commit is unreliable: background merges may have changed it
since the previous snapshot. Fixed by always deriving the segment count from the file names recorded
in the snapshot instead.

closes #74249
  • Loading branch information
original-brownbear committed Jun 18, 2021
1 parent dcb7a01 commit c25c874
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,45 @@
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class BlobStoreIncrementalityIT extends AbstractSnapshotIntegTestCase {

// Extends the base class's node plugins with InternalSettingsPlugin,
// which this test class needs for its index-settings manipulation.
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    final Collection<Class<? extends Plugin>> basePlugins = super.nodePlugins();
    return CollectionUtils.appendToCopy(basePlugins, InternalSettingsPlugin.class);
}

public void testIncrementalBehaviorOnPrimaryFailover() throws InterruptedException, ExecutionException, IOException {
internalCluster().startMasterOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode();
Expand Down Expand Up @@ -159,6 +177,73 @@ public void testForceMergeCausesFullSnapshot() throws Exception {
assertThat(secondSnapshotShardStatus.getIncrementalFileCount(), greaterThan(0));
}

/**
 * Regression test for #74249: when no new files are written between two snapshots,
 * the per-shard max segment count recorded in the snapshot metadata must be derived
 * from the file names in the snapshot, not from the live index commit, because
 * background merges can change the commit's segment count between snapshots.
 */
public void testRecordCorrectSegmentCountsWithBackgroundMerges() throws Exception {
final String repoName = "test-repo";
createRepository(repoName, "fs");

final String indexName = "test";
// disable merges so the bulk indexing below accumulates many small segments
assertAcked(prepareCreate(indexName).setSettings(indexSettingsNoReplicas(1).put(MergePolicyConfig.INDEX_MERGE_ENABLED, "false")));

// create an empty snapshot so that later snapshots run as quickly as possible
createFullSnapshot(repoName, "empty");

// create a situation where we temporarily have a bunch of segments until the merges can catch up
long id = 0;
final int rounds = scaledRandomIntBetween(3, 5);
for (int i = 0; i < rounds; ++i) {
final int numDocs = scaledRandomIntBetween(100, 1000);
// IMMEDIATE refresh per bulk round produces at least one segment per round
BulkRequestBuilder request = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int j = 0; j < numDocs; ++j) {
request.add(
Requests.indexRequest(indexName)
.id(Long.toString(id++))
.source(jsonBuilder().startObject().field("l", randomLong()).endObject())
);
}
assertNoFailures(request.get());
}

// snapshot with a bunch of unmerged segments; remember the recorded max segment count
final SnapshotInfo before = createFullSnapshot(repoName, "snapshot-before");
final SnapshotInfo.IndexSnapshotDetails beforeIndexDetails = before.indexSnapshotDetails().get(indexName);
final long beforeSegmentCount = beforeIndexDetails.getMaxSegmentsPerShard();

// reactivate merges: close the index, re-enable merging with an aggressive
// segments-per-tier setting, reopen, and force-merge to kick off merging
assertAcked(admin().indices().prepareClose(indexName).get());
assertAcked(
admin().indices()
.prepareUpdateSettings(indexName)
.setSettings(
Settings.builder()
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, "true")
)
);
assertAcked(admin().indices().prepareOpen(indexName).get());
assertEquals(0, admin().indices().prepareForceMerge(indexName).setFlush(true).get().getFailedShards());

// wait until merges have actually reduced the live segment count below what the
// first snapshot recorded — this creates the commit-vs-snapshot mismatch under test
assertBusy(() -> {
IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).setSegments(true).get();
assertThat(stats.getIndex(indexName).getPrimaries().getSegments().getCount(), lessThan(beforeSegmentCount));
}, 30L, TimeUnit.SECONDS);

// second snapshot: no documents changed, so it must not upload any new files
final SnapshotInfo after = createFullSnapshot(repoName, "snapshot-after");
final int incrementalFileCount = clusterAdmin().prepareSnapshotStatus()
.setRepository(repoName)
.setSnapshots(after.snapshotId().getName())
.get()
.getSnapshots()
.get(0)
.getStats()
.getIncrementalFileCount();
assertEquals(0, incrementalFileCount);
logger.info("--> no files have changed between snapshots, asserting that segment counts are constant as well");
// since the snapshot reuses the exact same files, the recorded max segment count
// must equal the first snapshot's, regardless of the merged-down live commit
final SnapshotInfo.IndexSnapshotDetails afterIndexDetails = after.indexSnapshotDetails().get(indexName);
assertEquals(beforeSegmentCount, afterIndexDetails.getMaxSegmentsPerShard());
}

private void assertCountInIndexThenDelete(String index, long expectedCount) {
logger.info("--> asserting that index [{}] contains [{}] documents", index, expectedCount);
assertDocCount(index, expectedCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2315,9 +2315,10 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s
}
afterWriteSnapBlob.run();
final ShardSnapshotResult shardSnapshotResult = new ShardSnapshotResult(
indexGeneration,
ByteSizeValue.ofBytes(blobStoreIndexShardSnapshot.totalSize()),
snapshotIndexCommit.getSegmentCount());
indexGeneration,
ByteSizeValue.ofBytes(blobStoreIndexShardSnapshot.totalSize()),
getSegmentInfoFileCount(blobStoreIndexShardSnapshot.indexFiles())
);
snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), shardSnapshotResult);
listener.onResponse(shardSnapshotResult);
}, listener::onFailure);
Expand Down

0 comments on commit c25c874

Please sign in to comment.