Skip to content

Commit

Permalink
[7.x] Prevent deletion of repositories that are used by snapshot back…
Browse files Browse the repository at this point in the history
…ed indices (#73770)

This commit adds a simple check that prevents to delete
a repository that is required by an existing searchable
snapshot index within the same cluster.

Backport of #73714
  • Loading branch information
tlrx committed Jun 4, 2021
1 parent 6ecd80c commit 7bd6750
Show file tree
Hide file tree
Showing 7 changed files with 319 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,8 @@ public void testDeleteRepositoryWhileSnapshotting() throws Exception {
fail("shouldn't be able to delete in-use repository");
} catch (Exception ex) {
logger.info("--> in-use repository deletion failed");
assertThat(ex.getMessage(), containsString("trying to modify or unregister repository that is currently used"));
assertThat(ex.getMessage(),
equalTo("trying to modify or unregister repository [test-repo] that is currently used (snapshot is in progress)"));
}

logger.info("--> trying to move repository to another location");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
Expand All @@ -40,6 +41,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -51,11 +53,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;

/**
* Service responsible for maintaining and providing access to snapshot repositories on nodes.
Expand All @@ -70,6 +74,9 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
public static final Setting<Integer> REPOSITORIES_STATS_ARCHIVE_MAX_ARCHIVED_STATS =
Setting.intSetting("repositories.stats.archive.max_archived_stats", 100, 0, Setting.Property.NodeScope);

public static final String SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY = "index.store.snapshot.repository_name";
public static final String SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY = "index.store.snapshot.repository_uuid";

private final Map<String, Repository.Factory> typesRegistry;
private final Map<String, Repository.Factory> internalTypesRegistry;

Expand Down Expand Up @@ -148,18 +155,18 @@ public void registerRepository(final PutRepositoryRequest request, final ActionL
// When verification has completed, get the repository data for the first time
final StepListener<RepositoryData> getRepositoryDataStep = new StepListener<>();
verifyStep.whenComplete(ignored -> threadPool.generic().execute(
ActionRunnable.wrap(getRepositoryDataStep, l -> repository(request.name()).getRepositoryData(l))), listener::onFailure);
ActionRunnable.wrap(getRepositoryDataStep, l -> repository(request.name()).getRepositoryData(l))), listener::onFailure);

// When the repository metadata is ready, update the repository UUID stored in the cluster state, if available
final StepListener<Void> updateRepoUuidStep = new StepListener<>();
getRepositoryDataStep.whenComplete(
repositoryData -> updateRepositoryUuidInMetadata(clusterService, request.name(), repositoryData, updateRepoUuidStep),
listener::onFailure);
repositoryData -> updateRepositoryUuidInMetadata(clusterService, request.name(), repositoryData, updateRepoUuidStep),
listener::onFailure);

// Finally respond to the outer listener with the response from the original cluster state update
updateRepoUuidStep.whenComplete(
ignored -> acknowledgementStep.addListener(listener),
listener::onFailure);
ignored -> acknowledgementStep.addListener(listener),
listener::onFailure);

} else {
acknowledgementStep.addListener(listener);
Expand Down Expand Up @@ -234,10 +241,10 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
* thread
*/
public static void updateRepositoryUuidInMetadata(
ClusterService clusterService,
final String repositoryName,
RepositoryData repositoryData,
ActionListener<Void> listener) {
ClusterService clusterService,
final String repositoryName,
RepositoryData repositoryData,
ActionListener<Void> listener) {

final String repositoryUuid = repositoryData.getUuid();
if (repositoryUuid.equals(RepositoryData.MISSING_UUID)) {
Expand All @@ -246,41 +253,41 @@ public static void updateRepositoryUuidInMetadata(
}

final RepositoriesMetadata currentReposMetadata
= clusterService.state().metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY);
= clusterService.state().metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY);
final RepositoryMetadata repositoryMetadata = currentReposMetadata.repository(repositoryName);
if (repositoryMetadata == null || repositoryMetadata.uuid().equals(repositoryUuid)) {
listener.onResponse(null);
return;
}

clusterService.submitStateUpdateTask("update repository UUID [" + repositoryName + "] to [" + repositoryUuid + "]",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
final RepositoriesMetadata currentReposMetadata
= currentState.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY);

final RepositoryMetadata repositoryMetadata = currentReposMetadata.repository(repositoryName);
if (repositoryMetadata == null || repositoryMetadata.uuid().equals(repositoryUuid)) {
return currentState;
} else {
final RepositoriesMetadata newReposMetadata = currentReposMetadata.withUuid(repositoryName, repositoryUuid);
final Metadata.Builder metadata
= Metadata.builder(currentState.metadata()).putCustom(RepositoriesMetadata.TYPE, newReposMetadata);
return ClusterState.builder(currentState).metadata(metadata).build();
}
}
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
final RepositoriesMetadata currentReposMetadata
= currentState.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY);

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
final RepositoryMetadata repositoryMetadata = currentReposMetadata.repository(repositoryName);
if (repositoryMetadata == null || repositoryMetadata.uuid().equals(repositoryUuid)) {
return currentState;
} else {
final RepositoriesMetadata newReposMetadata = currentReposMetadata.withUuid(repositoryName, repositoryUuid);
final Metadata.Builder metadata
= Metadata.builder(currentState.metadata()).putCustom(RepositoriesMetadata.TYPE, newReposMetadata);
return ClusterState.builder(currentState).metadata(metadata).build();
}
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(null);
}
});
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(null);
}
});
}

/**
Expand Down Expand Up @@ -308,6 +315,7 @@ public ClusterState execute(ClusterState currentState) {
for (RepositoryMetadata repositoryMetadata : repositories.repositories()) {
if (Regex.simpleMatch(request.name(), repositoryMetadata.name())) {
ensureRepositoryNotInUse(currentState, repositoryMetadata.name());
ensureNoSearchableSnapshotsIndicesInUse(currentState, repositoryMetadata);
deletedRepositories.add(repositoryMetadata.name());
changed = true;
} else {
Expand Down Expand Up @@ -382,8 +390,8 @@ protected void doRun() {

public static boolean isDedicatedVotingOnlyNode(Set<DiscoveryNodeRole> roles) {
return roles.contains(DiscoveryNodeRole.MASTER_ROLE)
&& roles.stream().noneMatch(DiscoveryNodeRole::canContainData)
&& roles.stream().anyMatch(role -> role.roleName().equals("voting_only"));
&& roles.stream().noneMatch(DiscoveryNodeRole::canContainData)
&& roles.stream().anyMatch(role -> role.roleName().equals("voting_only"));
}

/**
Expand All @@ -397,7 +405,7 @@ public void applyClusterState(ClusterChangedEvent event) {
try {
final ClusterState state = event.state();
final RepositoriesMetadata oldMetadata =
event.previousState().getMetadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY);
event.previousState().getMetadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY);
final RepositoriesMetadata newMetadata = state.getMetadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY);

// Check if repositories got changed
Expand Down Expand Up @@ -444,7 +452,7 @@ public void applyClusterState(ClusterChangedEvent event) {
// TODO: this catch is bogus, it means the old repo is already closed,
// but we have nothing to replace it
logger.warn(() -> new ParameterizedMessage("failed to change repository [{}]",
repositoryMetadata.name()), ex);
repositoryMetadata.name()), ex);
}
}
} else {
Expand Down Expand Up @@ -556,7 +564,9 @@ public void unregisterInternalRepository(String name) {
}
}

/** Closes the given repository. */
/**
* Closes the given repository.
*/
private void closeRepository(Repository repository) {
logger.debug("closing repository [{}][{}]", repository.getMetadata().type(), repository.getMetadata().name());
repository.close();
Expand Down Expand Up @@ -608,45 +618,74 @@ private static void validate(final String repositoryName) {
}

private static void ensureRepositoryNotInUse(ClusterState clusterState, String repository) {
if (isRepositoryInUse(clusterState, repository)) {
throw new IllegalStateException("trying to modify or unregister repository that is currently used");
}
}

/**
* Checks if a repository is currently in use by one of the snapshots
*
* @param clusterState cluster state
* @param repository repository id
* @return true if repository is currently in use by one of the running snapshots
*/
private static boolean isRepositoryInUse(ClusterState clusterState, String repository) {
final SnapshotsInProgress snapshots = clusterState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
for (SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
if (repository.equals(snapshot.snapshot().getRepository())) {
return true;
throw newRepositoryInUseException(repository, "snapshot is in progress");
}
}
for (SnapshotDeletionsInProgress.Entry entry :
clusterState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).getEntries()) {
if (entry.repository().equals(repository)) {
return true;
throw newRepositoryInUseException(repository, "snapshot deletion is in progress");
}
}
for (RepositoryCleanupInProgress.Entry entry :
clusterState.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).entries()) {
if (entry.repository().equals(repository)) {
return true;
throw newRepositoryInUseException(repository, "repository clean up is in progress");
}
}
for (RestoreInProgress.Entry entry : clusterState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)) {
if (repository.equals(entry.snapshot().getRepository())) {
return true;
throw newRepositoryInUseException(repository, "snapshot restore is in progress");
}
}
}

private static void ensureNoSearchableSnapshotsIndicesInUse(ClusterState clusterState, RepositoryMetadata repositoryMetadata) {
long count = 0L;
List<Index> indices = null;
for (IndexMetadata indexMetadata : clusterState.metadata()) {
if (indexSettingsMatchRepositoryMetadata(indexMetadata.getSettings(), repositoryMetadata)) {
if (indices == null) {
indices = new ArrayList<>();
}
if (indices.size() < 5) {
indices.add(indexMetadata.getIndex());
}
count += 1L;
}
}
if (indices != null && indices.isEmpty() == false) {
throw newRepositoryInUseException(repositoryMetadata.name(), "found " + count
+ " searchable snapshots indices that use the repository: "
+ Strings.collectionToCommaDelimitedString(indices)
+ (count > indices.size() ? ",..." : "")
);
}
}

private static boolean indexSettingsMatchRepositoryMetadata(Settings indexSettings, RepositoryMetadata repositoryMetadata) {
if ("snapshot".equals(INDEX_STORE_TYPE_SETTING.get(indexSettings))) {
final String indexRepositoryUuid = indexSettings.get(SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY);
if (Strings.hasLength(indexRepositoryUuid)) {
return Objects.equals(repositoryMetadata.uuid(), indexRepositoryUuid);
} else {
return Objects.equals(repositoryMetadata.name(), indexSettings.get(SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY));
}
}
return false;
}

private static IllegalStateException newRepositoryInUseException(String repository, String reason) {
return new IllegalStateException("trying to modify or unregister repository ["
+ repository
+ "] that is currently used ("
+ reason
+ ')'
);
}

@Override
protected void doStart() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,16 @@ public void unblockNode(final String repository, final String node) {
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnNode(repository, node).unblock();
}

protected void createRepository(String repoName, String type, Settings.Builder settings) {
logger.info("--> creating repository [{}] [{}]", repoName, type);
protected void createRepository(String repoName, String type, Settings.Builder settings, boolean verify) {
logger.info("--> creating or updating repository [{}] [{}]", repoName, type);
assertAcked(clusterAdmin().preparePutRepository(repoName)
.setType(type).setSettings(settings));
.setVerify(verify)
.setType(type)
.setSettings(settings));
}

protected void createRepository(String repoName, String type, Settings.Builder settings) {
createRepository(repoName, type, settings, true);
}

protected void createRepository(String repoName, String type, Path location) {
Expand Down

0 comments on commit 7bd6750

Please sign in to comment.