Skip to content

Commit

Permalink
Get repository metadata from the cluster state doesn't throw an excep…
Browse files Browse the repository at this point in the history
…tion if a repo is missing (#92914) (#93106)

Change the implementation of `TransportGetRepositoriesAction#getRepositories` to report the found and missing repositories instead of throwing an exception when a repository is missing. When the get-snapshots action was extended to work across repositories, it was designed to report failures on a repository-by-repository basis. A missing repository is just another per-repository failure, so we report it but it doesn't interrupt the normal flow of the method.

(cherry picked from commit 9cd0b38)

# Conflicts:
#	server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java
  • Loading branch information
gmarouli committed Jan 19, 2023
1 parent dc59ba6 commit 417d021
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 26 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/92914.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 92914
summary: Get repository metadata from the cluster state doesn't throw an exception
if a repo is missing
area: ILM+SLM
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -29,8 +30,10 @@

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;

public class GetSnapshotsIT extends AbstractSnapshotIntegTestCase {
Expand Down Expand Up @@ -578,6 +581,23 @@ public void testSortAfter() throws Exception {
assertThat(paginatedResponse2.totalCount(), is(3));
}

/**
 * Requests snapshots for one repository created by this test plus one name for which no repository is created, and
 * checks that the snapshots of the existing repository are returned while the other name is reported as a failure.
 */
public void testRetrievingSnapshotsWhenRepositoryIsMissing() throws Exception {
    final String existingRepo = "test-repo";
    final String unknownRepo = "missing";
    final Path existingRepoLocation = randomRepoPath();
    createRepository(existingRepo, "fs", existingRepoLocation);

    // The request sorts by name, so order the expected names the same way before comparing
    final List<String> expectedNames = createNSnapshots(existingRepo, randomIntBetween(1, 10));
    expectedNames.sort(String::compareTo);

    final GetSnapshotsResponse snapshotsResponse = clusterAdmin().prepareGetSnapshots(existingRepo, unknownRepo)
        .setSort(GetSnapshotsRequest.SortBy.NAME)
        .get();

    final List<String> returnedNames = snapshotsResponse.getSnapshots().stream().map(info -> info.snapshotId().getName()).toList();
    assertThat(returnedNames, equalTo(expectedNames));

    // The unknown repository must appear as a failure entry of the expected exception type
    assertTrue(snapshotsResponse.getFailures().containsKey(unknownRepo));
    assertThat(snapshotsResponse.getFailures().get(unknownRepo), instanceOf(RepositoryMissingException.class));
}

// Create a snapshot that is guaranteed to have a unique start time and duration for tests around ordering by either.
// Don't use this with more than 3 snapshots on platforms with low-resolution clocks as the durations could always collide there
// causing an infinite loop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,27 @@ protected void masterOperation(
ClusterState state,
final ActionListener<GetRepositoriesResponse> listener
) {
listener.onResponse(new GetRepositoriesResponse(new RepositoriesMetadata(getRepositories(state, request.repositories()))));
RepositoriesResult result = getRepositories(state, request.repositories());
if (result.hasMissingRepositories()) {
listener.onFailure(new RepositoryMissingException(String.join(", ", result.missing())));
} else {
listener.onResponse(new GetRepositoriesResponse(new RepositoriesMetadata(result.metadata)));
}
}

/**
* Get repository metadata for given repository names from given cluster state.
*
* @param state Cluster state
* @param repoNames Repository names or patterns to get metadata for
* @return list of repository metadata
* @return a result with the repository metadata that were found in the cluster state and the missing repositories
*/
public static List<RepositoryMetadata> getRepositories(ClusterState state, String[] repoNames) {
public static RepositoriesResult getRepositories(ClusterState state, String[] repoNames) {
RepositoriesMetadata repositories = state.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY);
if (isMatchAll(repoNames)) {
return repositories.repositories();
return new RepositoriesResult(repositories.repositories());
}
final List<String> missingRepositories = new ArrayList<>();
final List<String> includePatterns = new ArrayList<>();
final List<String> excludePatterns = new ArrayList<>();
boolean seenWildcard = false;
Expand All @@ -102,7 +108,7 @@ public static List<RepositoryMetadata> getRepositories(ClusterState state, Strin
seenWildcard = true;
} else {
if (repositories.repository(repositoryOrPattern) == null) {
throw new RepositoryMissingException(repositoryOrPattern);
missingRepositories.add(repositoryOrPattern);
}
}
includePatterns.add(repositoryOrPattern);
Expand All @@ -119,6 +125,20 @@ public static List<RepositoryMetadata> getRepositories(ClusterState state, Strin
}
}
}
return List.copyOf(repositoryListBuilder);
return new RepositoriesResult(List.copyOf(repositoryListBuilder), missingRepositories);
}

/**
 * A holder class that consists of the repository metadata and the names of the repositories that were not found in the cluster state.
 *
 * @param metadata the metadata of the repositories that were found
 * @param missing  the requested repository names for which no repository was found; kept as an unmodifiable copy
 */
public record RepositoriesResult(List<RepositoryMetadata> metadata, List<String> missing) {

    /**
     * Stores an unmodifiable copy of {@code missing} so that later changes to the list handed in by the caller
     * cannot alter this result.
     *
     * @throws NullPointerException if {@code missing} is {@code null} or contains a {@code null} element
     */
    public RepositoriesResult {
        missing = List.copyOf(missing);
    }

    /**
     * Creates a result in which no repository is missing.
     *
     * @param repositoryMetadata the metadata of the repositories that were found
     */
    RepositoriesResult(List<RepositoryMetadata> repositoryMetadata) {
        this(repositoryMetadata, List.of());
    }

    /**
     * @return {@code true} if at least one repository name is recorded as missing
     */
    boolean hasMissingRepositories() {
        return missing.isEmpty() == false;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public String next() {
}

/**
* Returns true if there is a least one failed response.
* Returns true if there is at least one failed response.
*/
public boolean isFailed() {
return failures.isEmpty() == false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import java.util.function.BiPredicate;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
Expand Down Expand Up @@ -112,12 +111,7 @@ protected void masterOperation(
getMultipleReposSnapshotInfo(
request.isSingleRepositoryRequest() == false,
state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY),
maybeFilterRepositories(
TransportGetRepositoriesAction.getRepositories(state, request.repositories()),
request.sort(),
request.order(),
request.fromSortValue()
),
TransportGetRepositoriesAction.getRepositories(state, request.repositories()),
request.snapshots(),
request.ignoreUnavailable(),
request.verbose(),
Expand All @@ -127,6 +121,7 @@ protected void masterOperation(
request.offset(),
request.size(),
request.order(),
request.fromSortValue(),
SnapshotPredicates.fromRequest(request),
request.includeIndexNames(),
listener
Expand Down Expand Up @@ -156,7 +151,7 @@ private static List<RepositoryMetadata> maybeFilterRepositories(
private void getMultipleReposSnapshotInfo(
boolean isMultiRepoRequest,
SnapshotsInProgress snapshotsInProgress,
List<RepositoryMetadata> repos,
TransportGetRepositoriesAction.RepositoriesResult repositoriesResult,
String[] snapshots,
boolean ignoreUnavailable,
boolean verbose,
Expand All @@ -166,27 +161,33 @@ private void getMultipleReposSnapshotInfo(
int offset,
int size,
SortOrder order,
String fromSortValue,
SnapshotPredicates predicates,
boolean indices,
ActionListener<GetSnapshotsResponse> listener
) {
// Process the missing repositories
final Map<String, ElasticsearchException> failures = new HashMap<>();
for (String missingRepo : repositoriesResult.missing()) {
failures.put(missingRepo, new RepositoryMissingException(missingRepo));
}

// short-circuit if there are no repos, because we can not create GroupedActionListener of size 0
if (repos.isEmpty()) {
listener.onResponse(new GetSnapshotsResponse(Collections.emptyList(), Collections.emptyMap(), null, 0, 0));
if (repositoriesResult.metadata().isEmpty()) {
listener.onResponse(new GetSnapshotsResponse(List.of(), failures, null, 0, 0));
return;
}
List<RepositoryMetadata> repositories = maybeFilterRepositories(repositoriesResult.metadata(), sortBy, order, fromSortValue);
final GroupedActionListener<Tuple<Tuple<String, ElasticsearchException>, SnapshotsInRepo>> groupedActionListener =
new GroupedActionListener<>(listener.map(responses -> {
assert repos.size() == responses.size();
assert repositories.size() == responses.size();
final List<SnapshotInfo> allSnapshots = responses.stream()
.map(Tuple::v2)
.filter(Objects::nonNull)
.flatMap(snapshotsInRepo -> snapshotsInRepo.snapshotInfos.stream())
.toList();
final Map<String, ElasticsearchException> failures = responses.stream()
.map(Tuple::v1)
.filter(Objects::nonNull)
.collect(Collectors.toMap(Tuple::v1, Tuple::v2));

responses.stream().map(Tuple::v1).filter(Objects::nonNull).forEach(tuple -> failures.put(tuple.v1(), tuple.v2()));
final SnapshotsInRepo snInfos = sortSnapshots(allSnapshots, sortBy, after, offset, size, order);
final List<SnapshotInfo> snapshotInfos = snInfos.snapshotInfos;
final int remaining = snInfos.remaining + responses.stream()
Expand All @@ -203,10 +204,10 @@ private void getMultipleReposSnapshotInfo(
responses.stream().map(Tuple::v2).filter(Objects::nonNull).mapToInt(s -> s.totalCount).sum(),
remaining
);
}), repos.size());
}), repositories.size());

for (final RepositoryMetadata repo : repos) {
final String repoName = repo.name();
for (final RepositoryMetadata repository : repositories) {
final String repoName = repository.name();
getSingleRepoSnapshotInfo(
snapshotsInProgress,
repoName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,74 @@ public void testSLMXpackUsage() throws Exception {
assertNotNull(slm.get("policy_stats"));
}

/**
 * Creates two policies on two repositories, deletes one of the repositories, and checks that executing retention still
 * removes the snapshot taken by the policy whose repository exists.
 */
public void testSnapshotRetentionWithMissingRepo() throws Exception {
    // Register both repositories up front; the second one is deleted again once its policy has been created
    final String liveRepo = "test-repo";
    final String deletedRepo = "missing-repo";
    initializeRepo(liveRepo);
    initializeRepo(deletedRepo);

    // One policy per repository, each with its own retention configuration built from a one-millisecond time value
    final String index = "test";
    final String livePolicy = "policy-1";
    final String orphanedPolicy = "policy-2";
    final SnapshotRetentionConfiguration liveRetention = new SnapshotRetentionConfiguration(TimeValue.timeValueMillis(1), null, null);
    createSnapshotPolicy(livePolicy, "snap", NEVER_EXECUTE_CRON_SCHEDULE, liveRepo, index, true, liveRetention);
    final SnapshotRetentionConfiguration orphanedRetention = new SnapshotRetentionConfiguration(
        TimeValue.timeValueMillis(1),
        null,
        null
    );
    createSnapshotPolicy(orphanedPolicy, "snap", NEVER_EXECUTE_CRON_SCHEDULE, deletedRepo, index, true, orphanedRetention);

    // Leave the second policy pointing at a repository that no longer exists
    deleteRepository(deletedRepo);

    // Trigger the policy whose repository is still present and remember where its snapshot can be fetched
    final String snapshotName = executePolicy(livePolicy);
    final String snapshotEndpoint = "/_snapshot/" + liveRepo + "/" + snapshotName;

    // Wait until the snapshot is visible, carries the policy name in its metadata, and the create operation is in the history
    assertBusy(() -> {
        try {
            final Response getResponse = client().performRequest(new Request("GET", snapshotEndpoint));
            final Map<String, Object> responseBody;
            try (InputStream content = getResponse.getEntity().getContent()) {
                responseBody = XContentHelper.convertToMap(XContentType.JSON.xContent(), content, true);
            }
            assertThat(responseBody.size(), greaterThan(0));
            final Map<String, Object> snapshotMetadata = extractMetadata(responseBody, snapshotName);
            assertNotNull(snapshotMetadata);
            assertThat(snapshotMetadata.get("policy"), equalTo(livePolicy));
            assertHistoryIsPresent(livePolicy, true, liveRepo, CREATE_OPERATION);
        } catch (ResponseException e) {
            fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity()));
        }
    }, 60, TimeUnit.SECONDS);

    execute_retention(client());

    // Wait until fetching the snapshot reports it as missing, whether the request succeeds or fails with an error response
    assertBusy(() -> {
        String responseText;
        try {
            final Response getResponse = client().performRequest(new Request("GET", snapshotEndpoint));
            responseText = EntityUtils.toString(getResponse.getEntity());
        } catch (ResponseException e) {
            responseText = EntityUtils.toString(e.getResponse().getEntity());
        }
        assertThat(responseText, containsString("snapshot_missing_exception"));
        assertHistoryIsPresent(livePolicy, true, liveRepo, DELETE_OPERATION);
    }, 60, TimeUnit.SECONDS);
}

public Map<String, Object> getLocation(String path) {
try {
Response executeRepsonse = client().performRequest(new Request("GET", path));
Expand Down Expand Up @@ -834,6 +902,11 @@ private static void index(RestClient client, String index, String id, Object...
assertOK(client.performRequest(request));
}

/**
 * Sends a {@code POST /_slm/_execute_retention} request through the given client and asserts that the response is OK.
 *
 * @param client the REST client used to perform the request
 * @throws IOException if performing the request fails
 */
private static void execute_retention(RestClient client) throws IOException {
    assertOK(client.performRequest(new Request("POST", "/_slm/_execute_retention")));
}

@SuppressWarnings("unchecked")
private static Map<String, Object> policyStatsAsMap(Map<String, Object> stats) {
return ((List<Map<String, Object>>) stats.get(SnapshotLifecycleStats.POLICY_STATS.getPreferredName())).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,14 @@ void getAllRetainableSnapshots(
snapshots.computeIfAbsent(info.repository(), repo -> new ArrayList<>()).add(info);
}
}
if (resp.isFailed()) {
for (String repo : resp.getFailures().keySet()) {
logger.debug(() -> "unable to retrieve snapshots for [" + repo + "] repositories: ", resp.getFailures().get(repo));
}
}
listener.onResponse(snapshots);
}, e -> {
logger.debug(() -> "unable to retrieve snapshots for [" + repositories + "] repositories", e);
logger.debug(() -> "unable to retrieve snapshots for [" + repositories + "] repositories: ", e);
listener.onFailure(e);
}));
}
Expand Down

0 comments on commit 417d021

Please sign in to comment.