Skip to content

Commit

Permalink
Get repository metadata from the cluster state doesn't throw an excep…
Browse files Browse the repository at this point in the history
…tion if a repo is missing (#92914) (#93108)

Change the implementation of `TransportGetRepositoriesAction#getRepositories` to report the found and missing repositories instead of throwing an exception when a repository is missing. When the get-snapshots action was extended to work across repositories, it was designed to report failures on a repository-by-repository basis. A missing repository is just another per-repository failure, so we report it but it doesn't interrupt the normal flow of the method.

(cherry picked from commit 9cd0b38)

# Conflicts:
#	server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java
#	server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java
#	x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java
  • Loading branch information
gmarouli committed Jan 20, 2023
1 parent 1791af0 commit b314cab
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 24 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/92914.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 92914
summary: Get repository metadata from the cluster state doesn't throw an exception
if a repo is missing
area: ILM+SLM
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -25,11 +26,14 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;

public class GetSnapshotsIT extends AbstractSnapshotIntegTestCase {
Expand Down Expand Up @@ -579,6 +583,26 @@ public void testSortAfter() throws Exception {
assertThat(paginatedResponse2.totalCount(), is(3));
}

public void testRetrievingSnapshotsWhenRepositoryIsMissing() throws Exception {
final String repoName = "test-repo";
final Path repoPath = randomRepoPath();
createRepository(repoName, "fs", repoPath);
final String missingRepoName = "missing";

final List<String> snapshotNames = createNSnapshots(repoName, randomIntBetween(1, 10));
snapshotNames.sort(String::compareTo);

final GetSnapshotsResponse response = clusterAdmin().prepareGetSnapshots(repoName, missingRepoName)
.setSort(GetSnapshotsRequest.SortBy.NAME)
.get();
assertThat(
response.getSnapshots().stream().map(info -> info.snapshotId().getName()).collect(Collectors.toList()),
equalTo(snapshotNames)
);
assertTrue(response.getFailures().containsKey(missingRepoName));
assertThat(response.getFailures().get(missingRepoName), instanceOf(RepositoryMissingException.class));
}

// Create a snapshot that is guaranteed to have a unique start time and duration for tests around ordering by either.
// Don't use this with more than 3 snapshots on platforms with low-resolution clocks as the durations could always collide there
// causing an infinite loop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -74,21 +75,27 @@ protected void masterOperation(
ClusterState state,
final ActionListener<GetRepositoriesResponse> listener
) {
listener.onResponse(new GetRepositoriesResponse(new RepositoriesMetadata(getRepositories(state, request.repositories()))));
RepositoriesResult result = getRepositories(state, request.repositories());
if (result.hasMissingRepositories()) {
listener.onFailure(new RepositoryMissingException(String.join(", ", result.getMissing())));
} else {
listener.onResponse(new GetRepositoriesResponse(new RepositoriesMetadata(result.getMetadata())));
}
}

/**
* Get repository metadata for given repository names from given cluster state.
*
* @param state Cluster state
* @param repoNames Repository names or patterns to get metadata for
* @return list of repository metadata
* @return a result with the repository metadata that were found in the cluster state and the missing repositories
*/
public static List<RepositoryMetadata> getRepositories(ClusterState state, String[] repoNames) {
public static RepositoriesResult getRepositories(ClusterState state, String[] repoNames) {
RepositoriesMetadata repositories = state.metadata().custom(RepositoriesMetadata.TYPE, RepositoriesMetadata.EMPTY);
if (isMatchAll(repoNames)) {
return repositories.repositories();
return new RepositoriesResult(repositories.repositories());
}
final List<String> missingRepositories = new ArrayList<>();
final List<String> includePatterns = new ArrayList<>();
final List<String> excludePatterns = new ArrayList<>();
boolean seenWildcard = false;
Expand All @@ -100,7 +107,7 @@ public static List<RepositoryMetadata> getRepositories(ClusterState state, Strin
seenWildcard = true;
} else {
if (repositories.repository(repositoryOrPattern) == null) {
throw new RepositoryMissingException(repositoryOrPattern);
missingRepositories.add(repositoryOrPattern);
}
}
includePatterns.add(repositoryOrPattern);
Expand All @@ -117,6 +124,36 @@ public static List<RepositoryMetadata> getRepositories(ClusterState state, Strin
}
}
}
return org.elasticsearch.core.List.copyOf(repositoryListBuilder);
return new RepositoriesResult(org.elasticsearch.core.List.copyOf(repositoryListBuilder), missingRepositories);
}

/**
* A holder class that consists of the repository metadata and the names of the repositories that were not found in the cluster state.
*/
public static class RepositoriesResult {

private final List<RepositoryMetadata> metadata;
private final List<String> missing;

RepositoriesResult(List<RepositoryMetadata> metadata, List<String> missing) {
this.metadata = metadata;
this.missing = missing;
}

RepositoriesResult(List<RepositoryMetadata> repositoryMetadata) {
this(repositoryMetadata, Collections.emptyList());
}

boolean hasMissingRepositories() {
return missing.isEmpty() == false;
}

public List<RepositoryMetadata> getMetadata() {
return metadata;
}

public List<String> getMissing() {
return missing;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public String next() {
}

/**
* Returns true if there is a least one failed response.
* Returns true if there is at least one failed response.
*/
public boolean isFailed() {
return failures.isEmpty() == false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,7 @@ protected void masterOperation(
getMultipleReposSnapshotInfo(
request.isSingleRepositoryRequest() == false,
state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY),
maybeFilterRepositories(
TransportGetRepositoriesAction.getRepositories(state, request.repositories()),
request.sort(),
request.order(),
request.fromSortValue()
),
TransportGetRepositoriesAction.getRepositories(state, request.repositories()),
request.snapshots(),
request.ignoreUnavailable(),
request.verbose(),
Expand All @@ -133,6 +128,7 @@ protected void masterOperation(
request.offset(),
request.size(),
request.order(),
request.fromSortValue(),
SnapshotPredicates.fromRequest(request),
listener
);
Expand Down Expand Up @@ -161,7 +157,7 @@ private static List<RepositoryMetadata> maybeFilterRepositories(
private void getMultipleReposSnapshotInfo(
boolean isMultiRepoRequest,
SnapshotsInProgress snapshotsInProgress,
List<RepositoryMetadata> repos,
TransportGetRepositoriesAction.RepositoriesResult repositoriesResult,
String[] snapshots,
boolean ignoreUnavailable,
boolean verbose,
Expand All @@ -171,26 +167,32 @@ private void getMultipleReposSnapshotInfo(
int offset,
int size,
SortOrder order,
String fromSortValue,
SnapshotPredicates predicates,
ActionListener<GetSnapshotsResponse> listener
) {
// Process the missing repositories
final Map<String, ElasticsearchException> failures = new HashMap<>();
for (String missingRepo : repositoriesResult.getMissing()) {
failures.put(missingRepo, new RepositoryMissingException(missingRepo));
}

// short-circuit if there are no repos, because we can not create GroupedActionListener of size 0
if (repos.isEmpty()) {
listener.onResponse(new GetSnapshotsResponse(Collections.emptyList(), Collections.emptyMap(), null, 0, 0));
if (repositoriesResult.getMetadata().isEmpty()) {
listener.onResponse(new GetSnapshotsResponse(Collections.emptyList(), failures, null, 0, 0));
return;
}
List<RepositoryMetadata> repositories = maybeFilterRepositories(repositoriesResult.getMetadata(), sortBy, order, fromSortValue);
final GroupedActionListener<Tuple<Tuple<String, ElasticsearchException>, SnapshotsInRepo>> groupedActionListener =
new GroupedActionListener<>(listener.map(responses -> {
assert repos.size() == responses.size();
assert repositories.size() == responses.size();
final List<SnapshotInfo> allSnapshots = responses.stream()
.map(Tuple::v2)
.filter(Objects::nonNull)
.flatMap(snapshotsInRepo -> snapshotsInRepo.snapshotInfos.stream())
.collect(Collectors.toList());
final Map<String, ElasticsearchException> failures = responses.stream()
.map(Tuple::v1)
.filter(Objects::nonNull)
.collect(Collectors.toMap(Tuple::v1, Tuple::v2));

responses.stream().map(Tuple::v1).filter(Objects::nonNull).forEach(tuple -> failures.put(tuple.v1(), tuple.v2()));
final SnapshotsInRepo snInfos = sortSnapshots(allSnapshots, sortBy, after, offset, size, order);
final List<SnapshotInfo> snapshotInfos = snInfos.snapshotInfos;
final int remaining = snInfos.remaining + responses.stream()
Expand All @@ -207,10 +209,10 @@ private void getMultipleReposSnapshotInfo(
responses.stream().map(Tuple::v2).filter(Objects::nonNull).mapToInt(s -> s.totalCount).sum(),
remaining
);
}), repos.size());
}), repositories.size());

for (final RepositoryMetadata repo : repos) {
final String repoName = repo.name();
for (final RepositoryMetadata repository : repositories) {
final String repoName = repository.name();
getSingleRepoSnapshotInfo(
snapshotsInProgress,
repoName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,74 @@ public void testSLMXpackUsage() throws Exception {
assertNotNull(slm.get("policy_stats"));
}

public void testSnapshotRetentionWithMissingRepo() throws Exception {
// Create two snapshot repositories
String repo = "test-repo";
initializeRepo(repo);
String missingRepo = "missing-repo";
initializeRepo(missingRepo);

// Create a policy per repository
final String indexName = "test";
final String policyName = "policy-1";
createSnapshotPolicy(
policyName,
"snap",
NEVER_EXECUTE_CRON_SCHEDULE,
repo,
indexName,
true,
new SnapshotRetentionConfiguration(TimeValue.timeValueMillis(1), null, null)
);
final String policyWithMissingRepo = "policy-2";
createSnapshotPolicy(
policyWithMissingRepo,
"snap",
NEVER_EXECUTE_CRON_SCHEDULE,
missingRepo,
indexName,
true,
new SnapshotRetentionConfiguration(TimeValue.timeValueMillis(1), null, null)
);

// Delete the repo of one of the policies
deleteRepository(missingRepo);

// Manually create a snapshot based on the "correct" policy
final String snapshotName = executePolicy(policyName);

// Check that the executed snapshot is created
assertBusy(() -> {
try {
Response response = client().performRequest(new Request("GET", "/_snapshot/" + repo + "/" + snapshotName));
Map<String, Object> snapshotResponseMap;
try (InputStream is = response.getEntity().getContent()) {
snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
}
assertThat(snapshotResponseMap.size(), greaterThan(0));
final Map<String, Object> metadata = extractMetadata(snapshotResponseMap, snapshotName);
assertNotNull(metadata);
assertThat(metadata.get("policy"), equalTo(policyName));
assertHistoryIsPresent(policyName, true, repo, CREATE_OPERATION);
} catch (ResponseException e) {
fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity()));
}
}, 60, TimeUnit.SECONDS);

execute_retention(client());

// Check that the snapshot created by the policy has been removed by retention
assertBusy(() -> {
try {
Response response = client().performRequest(new Request("GET", "/_snapshot/" + repo + "/" + snapshotName));
assertThat(EntityUtils.toString(response.getEntity()), containsString("snapshot_missing_exception"));
} catch (ResponseException e) {
assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception"));
}
assertHistoryIsPresent(policyName, true, repo, DELETE_OPERATION);
}, 60, TimeUnit.SECONDS);
}

public Map<String, Object> getLocation(String path) {
try {
Response executeRepsonse = client().performRequest(new Request("GET", path));
Expand Down Expand Up @@ -846,6 +914,11 @@ private static void index(RestClient client, String index, String id, Object...
assertOK(client.performRequest(request));
}

private static void execute_retention(RestClient client) throws IOException {
final Request request = new Request("POST", "/_slm/_execute_retention");
assertOK(client.performRequest(request));
}

@SuppressWarnings("unchecked")
private static Map<String, Object> policyStatsAsMap(Map<String, Object> stats) {
return ((List<Map<String, Object>>) stats.get(SnapshotLifecycleStats.POLICY_STATS.getPreferredName())).stream()
Expand Down

0 comments on commit b314cab

Please sign in to comment.