Skip to content

Commit

Permalink
Makes testCcrRepositoryFetchesSnapshotShardSizeFromIndexShardStoreSta…
Browse files Browse the repository at this point in the history
…ts more robust (#64976) (#64989)

Today this test fails because the sizes of the snapshot 
shards are only kept in a very short period of time in 
the InternalSnapshotsInfoService and are not 
guaranteed to exist once the shards are correctly 
assigned.

closes #64167
  • Loading branch information
tlrx committed Nov 12, 2020
1 parent 5c5fd50 commit e40d7e0
Showing 1 changed file with 56 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand Down Expand Up @@ -63,13 +65,15 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.singletonMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -467,12 +471,12 @@ public void testFollowerMappingIsUpdated() throws IOException {

public void testCcrRepositoryFetchesSnapshotShardSizeFromIndexShardStoreStats() throws Exception {
final String leaderIndex = "leader";
final int numberOfShards = randomIntBetween(1, 2);
final int numberOfShards = randomIntBetween(1, 5);
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex)
.setSource(getIndexSettings(numberOfShards, 0, singletonMap(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(),
TimeValue.ZERO.getStringRep())), XContentType.JSON));

final int numDocs = scaledRandomIntBetween(0, 1_000);
final int numDocs = scaledRandomIntBetween(0, 500);
if (numDocs > 0) {
final BulkRequestBuilder bulkRequest = leaderClient().prepareBulk(leaderIndex, "_doc");
for (int i = 0; i < numDocs; i++) {
Expand Down Expand Up @@ -515,69 +519,62 @@ public void testCcrRepositoryFetchesSnapshotShardSizeFromIndexShardStoreStats()
equalTo(indexStats.getIndexShards().get(shardId).getPrimary().getStore().getSizeInBytes()));
}

final CountDownLatch blockCcrRestore = new CountDownLatch(1);

final List<MockTransportService> transportServices = new ArrayList<>();
for (TransportService transportService : getFollowerCluster().getDataOrMasterNodeInstances(TransportService.class)) {
final MockTransportService mockTransportService = (MockTransportService) transportService;
mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(PutCcrRestoreSessionAction.NAME)) {
try {
blockCcrRestore.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
final String followerIndex = "follower";
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
final SnapshotsInfoService snapshotsInfoService = getFollowerCluster().getCurrentMasterNodeInstance(SnapshotsInfoService.class);

final Map<Integer, Long> fetchedSnapshotShardSizes = new ConcurrentHashMap<>();

final PlainActionFuture<Void> waitForRestoreInProgress = PlainActionFuture.newFuture();
final ClusterStateListener listener = event -> {
RestoreInProgress restoreInProgress = event.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
if (restoreInProgress != null
&& restoreInProgress.isEmpty() == false
&& event.state().routingTable().hasIndex(followerIndex)) {
final IndexRoutingTable indexRoutingTable = event.state().routingTable().index(followerIndex);
for (ShardRouting shardRouting : indexRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED)) {
if (shardRouting.unassignedInfo().getLastAllocationStatus() == AllocationStatus.FETCHING_SHARD_DATA) {
try {
assertBusy(() -> {
final Long snapshotShardSize = snapshotsInfoService.snapshotShardSizes().getShardSize(shardRouting);
assertThat(snapshotShardSize, notNullValue());
fetchedSnapshotShardSizes.put(shardRouting.getId(), snapshotShardSize);
}, 30L, TimeUnit.SECONDS);
} catch (Exception e) {
throw new AssertionError("Failed to retrieve snapshot shard size for shard " + shardRouting, e);
}
}
}
connection.sendRequest(requestId, action, request, options);
});
transportServices.add(mockTransportService);
}

try {
final String followerIndex = "follower";
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);

final PlainActionFuture<IndexRoutingTable> waitForRestoreInProgress = PlainActionFuture.newFuture();
final ClusterStateListener listener = event -> {
RestoreInProgress restoreInProgress = event.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
if (restoreInProgress != null
&& restoreInProgress.isEmpty() == false
&& event.state().routingTable().hasIndex(followerIndex)) {
waitForRestoreInProgress.onResponse(event.state().routingTable().index(followerIndex));
}
};
clusterService.addListener(listener);

final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderCluster, CcrRepository.LATEST)
.indices(leaderIndex).indicesOptions(indicesOptions).renamePattern("^(.*)$")
.renameReplacement(followerIndex)
.masterNodeTimeout(TimeValue.MAX_VALUE)
.indexSettings(Settings.builder()
.put(IndexMetadata.SETTING_INDEX_PROVIDED_NAME, followerIndex)
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true));
restoreService.restoreSnapshot(restoreRequest, PlainActionFuture.newFuture());

final IndexRoutingTable indexRoutingTable = waitForRestoreInProgress.get(30L, TimeUnit.SECONDS);
clusterService.removeListener(listener);

final SnapshotsInfoService snapshotsInfoService = getFollowerCluster().getCurrentMasterNodeInstance(SnapshotsInfoService.class);
assertBusy(() -> {
SnapshotShardSizeInfo snapshotShardSizeInfo = snapshotsInfoService.snapshotShardSizes();
for (int shardId = 0; shardId < numberOfShards; shardId++) {
Long snapshotShardSize = snapshotShardSizeInfo.getShardSize(indexRoutingTable.shard(shardId).primaryShard());
assertThat(snapshotShardSize,
equalTo(indexStats.getIndexShards().get(shardId).getPrimary().getStore().getSizeInBytes()));
logger.info("--> [{}/{}] snapshot shard sizes fetched", fetchedSnapshotShardSizes.size(), numberOfShards);
if (fetchedSnapshotShardSizes.size() == numberOfShards) {
waitForRestoreInProgress.onResponse(null);
}
}, 60L, TimeUnit.SECONDS);
}
};
clusterService.addListener(listener);

blockCcrRestore.countDown();
ensureFollowerGreen(followerIndex);
final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderCluster, CcrRepository.LATEST)
.indices(leaderIndex).indicesOptions(indicesOptions).renamePattern("^(.*)$")
.renameReplacement(followerIndex)
.masterNodeTimeout(TimeValue.MAX_VALUE)
.indexSettings(Settings.builder()
.put(IndexMetadata.SETTING_INDEX_PROVIDED_NAME, followerIndex)
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true));
restoreService.restoreSnapshot(restoreRequest, PlainActionFuture.newFuture());

waitForRestoreInProgress.get(30L, TimeUnit.SECONDS);
clusterService.removeListener(listener);
ensureFollowerGreen(followerIndex);

assertAcked(followerClient().admin().indices().prepareDelete(followerIndex).setMasterNodeTimeout(TimeValue.MAX_VALUE));
} finally {
transportServices.forEach(MockTransportService::clearAllRules);
for (int shardId = 0; shardId < numberOfShards; shardId++) {
assertThat("Snapshot shard size fetched for follower shard [" + shardId + "] does not match leader store size",
fetchedSnapshotShardSizes.get(shardId),
equalTo(indexStats.getIndexShards().get(shardId).getPrimary().getStore().getSizeInBytes()));
}

assertHitCount(followerClient().prepareSearch(followerIndex).setSize(0).get(), numDocs);
assertAcked(followerClient().admin().indices().prepareDelete(followerIndex).setMasterNodeTimeout(TimeValue.MAX_VALUE));
}

public void testCcrRepositoryFailsToFetchSnapshotShardSizes() throws Exception {
Expand Down

0 comments on commit e40d7e0

Please sign in to comment.