Skip to content

Commit

Permalink
[7.x] Add maintenance service to clean up unused docs in snapshot blo…
Browse files Browse the repository at this point in the history
…b cache (#78263)

* Add maintenance service to clean up unused docs in snapshot blob cache (#77686)

Today we use the system index .snapshot-blob-cache to 
store parts of blobs and to avoid to fetch them again from 
the snapshot repository when recovering a searchable 
snapshot shard. This index is never cleaned up though 
and because it's a system index users won't be able to
 clean up manually in the future.

This commit adds a BlobStoreCacheMaintenanceService 
which detects the deletion of searchable snapshot indices
 and triggers the deletion of associated documents in 
.snapshot-blob-cache.

* fixes
  • Loading branch information
tlrx committed Sep 23, 2021
1 parent dae177c commit 28fbaa7
Show file tree
Hide file tree
Showing 6 changed files with 572 additions and 25 deletions.
1 change: 1 addition & 0 deletions x-pack/plugin/searchable-snapshots/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies {
compileOnly project(path: xpackModule('core'))
implementation project(path: 'preallocate')
internalClusterTestImplementation(testArtifact(project(xpackModule('core'))))
internalClusterTestImplementation(project(path: ':modules:reindex'))
}

addQaCheckDependencies()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.searchablesnapshots.cache.blob;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
Expand All @@ -26,11 +27,14 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.SearchableSnapshotsSettings;
Expand Down Expand Up @@ -62,6 +66,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_BLOB_CACHE_INDEX;
import static org.elasticsearch.xpack.searchablesnapshots.cache.shared.SharedBytes.pageAligned;
import static org.elasticsearch.xpack.searchablesnapshots.store.SearchableSnapshotDirectory.unwrapDirectory;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

Expand Down Expand Up @@ -96,7 +101,10 @@ public static void tearDownCacheSettings() {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), WaitForSnapshotBlobCacheShardsActivePlugin.class);
final List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(WaitForSnapshotBlobCacheShardsActivePlugin.class);
plugins.add(ReindexPlugin.class);
return plugins;
}

@Override
Expand Down Expand Up @@ -163,7 +171,7 @@ public void testBlobStoreCache() throws Exception {
storage1,
blobCacheMaxLength.getStringRep()
);
final String restoredIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final String restoredIndex = "restored-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
mountSnapshot(
repositoryName,
snapshot.getName(),
Expand All @@ -178,17 +186,9 @@ public void testBlobStoreCache() throws Exception {
);
ensureGreen(restoredIndex);

// wait for all async cache fills to complete
assertBusy(() -> {
for (final SearchableSnapshotShardStats shardStats : client().execute(
SearchableSnapshotsStatsAction.INSTANCE,
new SearchableSnapshotsStatsRequest()
).actionGet().getStats()) {
for (final SearchableSnapshotShardStats.CacheIndexInputStats indexInputStats : shardStats.getStats()) {
assertThat(Strings.toString(indexInputStats), indexInputStats.getCurrentIndexCacheFills(), equalTo(0L));
}
}
});
assertRecoveryStats(restoredIndex, false);
assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
waitForBlobCacheFillsToComplete();

for (final SearchableSnapshotShardStats shardStats : client().execute(
SearchableSnapshotsStatsAction.INSTANCE,
Expand All @@ -213,11 +213,14 @@ public void testBlobStoreCache() throws Exception {
equalTo("data_content,data_hot")
);

refreshSystemIndex();

final long numberOfCachedBlobs = systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
.get()
.getHits()
.getTotalHits().value;

IndexingStats indexingStats = systemClient().admin()
.indices()
.prepareStats(SNAPSHOT_BLOB_CACHE_INDEX)
Expand All @@ -231,19 +234,24 @@ public void testBlobStoreCache() throws Exception {

logger.info("--> verifying number of documents in index [{}]", restoredIndex);
assertHitCount(client().prepareSearch(restoredIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs);
assertAcked(client().admin().indices().prepareDelete(restoredIndex));

assertBusy(() -> {
refreshSystemIndex();
assertThat(
systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX).setSize(0).get().getHits().getTotalHits().value,
greaterThan(0L)
);
});
for (IndicesService indicesService : internalCluster().getDataNodeInstances(IndicesService.class)) {
for (IndexService indexService : indicesService) {
if (indexService.index().getName().equals(restoredIndex)) {
for (IndexShard indexShard : indexService) {
try {
unwrapDirectory(indexShard.store().directory()).clearStats();
} catch (AlreadyClosedException ignore) {
// ok to ignore these
}
}
}
}
}

final Storage storage2 = randomFrom(Storage.values());
logger.info("--> mount snapshot [{}] as an index for the second time [storage={}]", snapshot, storage2);
final String restoredAgainIndex = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final String restoredAgainIndex = "restored-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
mountSnapshot(
repositoryName,
snapshot.getName(),
Expand All @@ -258,6 +266,10 @@ public void testBlobStoreCache() throws Exception {
);
ensureGreen(restoredAgainIndex);

assertRecoveryStats(restoredAgainIndex, false);
assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
waitForBlobCacheFillsToComplete();

logger.info("--> verifying shards of [{}] were started without using the blob store more than necessary", restoredAgainIndex);
checkNoBlobStoreAccess(useSoftDeletes);

Expand Down Expand Up @@ -293,6 +305,10 @@ public Settings onNodeStopped(String nodeName) throws Exception {
});
ensureGreen(restoredAgainIndex);

assertRecoveryStats(restoredAgainIndex, false);
assertExecutorIsIdle(SearchableSnapshots.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
waitForBlobCacheFillsToComplete();

logger.info("--> shards of [{}] should start without downloading bytes from the blob store", restoredAgainIndex);
checkNoBlobStoreAccess(useSoftDeletes);

Expand All @@ -315,8 +331,18 @@ public Settings onNodeStopped(String nodeName) throws Exception {
logger.info("--> verifying number of documents in index [{}]", restoredAgainIndex);
assertHitCount(client().prepareSearch(restoredAgainIndex).setSize(0).setTrackTotalHits(true).get(), numberOfDocs);

// TODO also test when the index is frozen
// TODO also test when prewarming is enabled
logger.info("--> deleting indices, maintenance service should clean up snapshot blob cache index");
assertAcked(client().admin().indices().prepareDelete("restored-*"));
assertBusy(() -> {
refreshSystemIndex();
assertHitCount(
systemClient().prepareSearch(SNAPSHOT_BLOB_CACHE_INDEX)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
.setSize(0)
.get(),
0L
);
});
}

private void checkNoBlobStoreAccess(boolean useSoftDeletes) {
Expand Down

0 comments on commit 28fbaa7

Please sign in to comment.