Skip to content

Commit

Permalink
unregister repository if validation fails
Browse files Browse the repository at this point in the history
  • Loading branch information
mhl-b committed May 15, 2024
1 parent c527daa commit 0474f8c
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -54,7 +53,7 @@ public TransportPutRepositoryAction(
actionFilters,
PutRepositoryRequest::new,
indexNameExpressionResolver,
EsExecutors.DIRECT_EXECUTOR_SERVICE
threadPool.executor(ThreadPool.Names.SNAPSHOT)
);
this.repositoriesService = repositoriesService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,25 +170,53 @@ public void registerRepository(final PutRepositoryRequest request, final ActionL
// When publication has completed (and all acks received or timed out) then verify the repository.
// (if acks timed out then acknowledgementStep completes before the master processes this cluster state, hence why we have
// to wait for the publication to be complete too)
final ListenableFuture<List<DiscoveryNode>> verifyStep = new ListenableFuture<>();
final ListenableFuture<List<DiscoveryNode>> doVerifyStep = new ListenableFuture<>();
publicationStep.addListener(
listener.delegateFailureAndWrap(
(delegate, changed) -> acknowledgementStep.addListener(
delegate.delegateFailureAndWrap((l, clusterStateUpdateResponse) -> {
if (clusterStateUpdateResponse.isAcknowledged() && changed) {
// The response was acknowledged - all nodes should know about the new repository, let's verify them
verifyRepository(request.name(), verifyStep);
verifyRepository(request.name(), doVerifyStep);
} else {
verifyStep.onResponse(null);
doVerifyStep.onResponse(null);
}
})
)
)
);

// Verification starts when repository commited to cluster state.
// When verification fails, we need to remove the repository.
// Here we issue a cluster state roll-back - unregister repository.
final ListenableFuture<Boolean> verifiedStep = new ListenableFuture<>();
doVerifyStep.addListener(new ActionListener<>() {
@Override
public void onResponse(List<DiscoveryNode> ignored) {
verifiedStep.onResponse(true);
}

@Override
public void onFailure(Exception verificationException) {
final var unregisterRequest = new DeleteRepositoryRequest().name(request.name());
unregisterRepository(unregisterRequest, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
listener.onFailure(verificationException);
}

@Override
public void onFailure(Exception e) {
logger.warn("failed to unregister repository after unsuccessful validation, {}", request, e);
listener.onFailure(verificationException);
}
});
}
});

// When verification has completed, get the repository data for the first time
final ListenableFuture<RepositoryData> getRepositoryDataStep = new ListenableFuture<>();
verifyStep.addListener(
verifiedStep.addListener(
listener.delegateFailureAndWrap(
(l, ignored) -> threadPool.generic()
.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public void setUp() throws Exception {
TestRepository::new,
UnstableRepository.TYPE,
UnstableRepository::new,
VerificationFailRepository.TYPE,
VerificationFailRepository::new,
MeteredRepositoryTypeA.TYPE,
metadata -> new MeteredRepositoryTypeA(metadata, clusterService),
MeteredRepositoryTypeB.TYPE,
Expand Down Expand Up @@ -181,6 +183,26 @@ public void testRegisterRejectsInvalidRepositoryNames() {
}
}

public void testRegisterRejectsUnverifiedRepositoryWithVerifyTrue() {
var repoName = randomAlphaOfLengthBetween(10, 25);
var request = new PutRepositoryRequest().name(repoName).type(VerificationFailRepository.TYPE).verify(true);
var resultListener = new SubscribableListener<AcknowledgedResponse>();
repositoriesService.registerRepository(request, resultListener);
var failure = safeAwaitFailure(resultListener);
assertThat(failure, isA(RepositoryVerificationException.class));
// also make sure that cluster state does not include failed repo
assertThrows(RepositoryMissingException.class, () -> { repositoriesService.repository(repoName); });
}

public void testAcceptUnverifiedRepositoryWithVerifyFalse() {
var repoName = randomAlphaOfLengthBetween(10, 25);
var request = new PutRepositoryRequest().name(repoName).type(VerificationFailRepository.TYPE).verify(false);
var resultListener = new SubscribableListener<AcknowledgedResponse>();
repositoriesService.registerRepository(request, resultListener);
var ackResponse = safeAwait(resultListener);
assertTrue(ackResponse.isAcknowledged());
}

public void testRepositoriesStatsCanHaveTheSameNameAndDifferentTypeOverTime() {
String repoName = "name";
expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName));
Expand Down Expand Up @@ -502,6 +524,19 @@ private UnstableRepository(RepositoryMetadata metadata) {
}
}

private static class VerificationFailRepository extends TestRepository {
public static final String TYPE = "invalid";

private VerificationFailRepository(RepositoryMetadata metadata) {
super(metadata);
}

@Override
public String startVerification() {
throw new RepositoryVerificationException(TYPE, "failed to validate repository");
}
}

private static class MeteredRepositoryTypeA extends MeteredBlobStoreRepository {
private static final String TYPE = "type-a";
private static final RepositoryStats STATS = new RepositoryStats(Map.of("GET", 10L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.RequestBuilder;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.bootstrap.BootstrapForTesting;
Expand Down Expand Up @@ -2166,6 +2167,12 @@ public static <T> T safeAwait(SubscribableListener<T> listener) {
}
}

public static <T> Exception safeAwaitFailure(SubscribableListener<T> listener) {
return safeAwait(SubscribableListener.newForked(exceptionListener -> {
listener.addListener(ActionTestUtils.assertNoSuccessListener(exceptionListener::onResponse));
}));
}

public static void safeSleep(long millis) {
try {
Thread.sleep(millis);
Expand Down

0 comments on commit 0474f8c

Please sign in to comment.