Verify repository before cluster update #108531

Merged
merged 23 commits into from
May 31, 2024
Changes from 17 commits

@@ -304,7 +304,7 @@ public void testMetrics() throws Exception {
}

public void testRequestStatsWithOperationPurposes() throws IOException {
-final String repoName = createRepository(randomRepositoryName());
+final String repoName = createRepository(randomRepositoryName(), true);
final RepositoriesService repositoriesService = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class);
final BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(repoName);
final BlobStoreWrapper blobStore = asInstanceOf(BlobStoreWrapper.class, repository.blobStore());

@@ -302,7 +302,10 @@ public void testRepositoryConflict() throws Exception {
logger.info("--> try updating the repository, should fail because the deletion of the snapshot is in progress");
RepositoryConflictException e2 = expectThrows(
RepositoryConflictException.class,
-clusterAdmin().preparePutRepository(repo).setType("mock").setSettings(Settings.builder().put("location", randomRepoPath()))
+clusterAdmin().preparePutRepository(repo)
+.setVerify(false)
Contributor Author

Verifying here would deadlock on the snapshot thread pool: the test runs it with a single thread, which is busy at this point.

Member

Can we have this comment in the code please?

+.setType("mock")
+.setSettings(Settings.builder().put("location", randomRepoPath()))
);
assertThat(e2.status(), equalTo(RestStatus.CONFLICT));
assertThat(e2.getMessage(), containsString("trying to modify or unregister repository that is currently used"));

@@ -959,6 +959,7 @@ public void testDeleteRepositoryWhileSnapshotting() throws Exception {
client.admin()
.cluster()
.preparePutRepository("test-repo")
.setVerify(false)
.setType("fs")
.setSettings(Settings.builder().put("location", repositoryLocation.resolve("test")))
.get();

@@ -161,10 +161,7 @@ record RegisterRepositoryTaskResult(AcknowledgedResponse ackResponse, boolean ch
SubscribableListener

// Trying to create the new repository on master to make sure it works
-.<Void>newForked(validationStep -> ActionListener.completeWith(validationStep, () -> {
-validateRepositoryCanBeCreated(request);
-return null;
-}))
+.<Void>newForked(validationStep -> validatePutRepositoryRequest(request, validationStep))

// When publication has completed (and all acks received or timed out) then verify the repository.
// (if acks timed out then acknowledgementStep completes before the master processes this cluster state, hence why we have
@@ -353,6 +350,29 @@ public void validateRepositoryCanBeCreated(final PutRepositoryRequest request) {
closeRepository(createRepository(newRepositoryMetadata));
}

private void validatePutRepositoryRequest(final PutRepositoryRequest request, ActionListener<Void> resultListener) {
final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(request.name(), request.type(), request.settings());
try {
final var repository = createRepository(newRepositoryMetadata);
if (request.verify()) {
// verify repository on local node only, different from verifyRepository method that runs on other cluster nodes
threadPool.executor(ThreadPool.Names.SNAPSHOT)
.execute(ActionRunnable.run(ActionListener.runBefore(resultListener, () -> closeRepository(repository)), () -> {
final var token = repository.startVerification();
if (token != null) {
repository.verify(token, clusterService.localNode());
repository.endVerification(token);
}
}));
} else {
closeRepository(repository);
resultListener.onResponse(null);
}
} catch (Exception e) {
resultListener.onFailure(e);
}
}

private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
submitUnbatchedTask(clusterService, source, task);
}
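
The new `validatePutRepositoryRequest` wraps the result listener so that the temporary repository is closed before the caller is notified, on success and on failure alike. The following is a minimal, self-contained sketch of that "clean up before completing" pattern. The `Listener` interface and the `runBefore` helper here are simplified, hypothetical stand-ins written for illustration; they are not the Elasticsearch `ActionListener` API and may differ from it in detail.

```java
import java.util.ArrayList;
import java.util.List;

final class RunBeforeSketch {

    /** Hypothetical, simplified stand-in for an asynchronous result listener. */
    interface Listener<T> {
        void onResponse(T result);

        void onFailure(Exception e);
    }

    /** Wraps a listener so that cleanup runs before either callback is delivered. */
    static <T> Listener<T> runBefore(Listener<T> delegate, Runnable cleanup) {
        return new Listener<T>() {
            @Override
            public void onResponse(T result) {
                try {
                    cleanup.run();
                } catch (Exception e) {
                    // A failed cleanup turns the success into a failure.
                    delegate.onFailure(e);
                    return;
                }
                delegate.onResponse(result);
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    cleanup.run();
                } catch (Exception cleanupFailure) {
                    e.addSuppressed(cleanupFailure);
                }
                delegate.onFailure(e);
            }
        };
    }

    /** Records the order of events for a succeeding or failing verification. */
    static List<String> demo(boolean verificationFails) {
        List<String> events = new ArrayList<>();
        Listener<Void> resultListener = new Listener<Void>() {
            @Override
            public void onResponse(Void result) {
                events.add("response");
            }

            @Override
            public void onFailure(Exception e) {
                events.add("failure: " + e.getMessage());
            }
        };
        // Stand-in for closeRepository(repository).
        Listener<Void> wrapped = runBefore(resultListener, () -> events.add("close repository"));
        if (verificationFails) {
            wrapped.onFailure(new IllegalStateException("verification failed"));
        } else {
            wrapped.onResponse(null);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo(false));
        System.out.println(demo(true));
    }
}
```

In both cases "close repository" is recorded first, so the caller never observes a result while the temporary repository is still open.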

@@ -48,8 +48,6 @@
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.NamedXContentRegistry;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;

import java.util.Arrays;
import java.util.Collection;
@@ -65,28 +63,16 @@

public class RepositoriesServiceTests extends ESTestCase {

-private static ThreadPool threadPool;
-
private ClusterService clusterService;
private RepositoriesService repositoriesService;
-
-@BeforeClass
-public static void createThreadPool() {
-threadPool = new TestThreadPool(RepositoriesService.class.getName());
-}
-
-@AfterClass
-public static void terminateThreadPool() {
-if (threadPool != null) {
-threadPool.shutdownNow();
-threadPool = null;
-}
-}
+private ThreadPool threadPool;

@Override
public void setUp() throws Exception {
super.setUp();

+threadPool = new TestThreadPool(RepositoriesService.class.getName());
+
final TransportService transportService = new TransportService(
Settings.EMPTY,
mock(Transport.class),
@@ -113,6 +99,8 @@ public void setUp() throws Exception {
TestRepository::new,
UnstableRepository.TYPE,
UnstableRepository::new,
VerificationFailRepository.TYPE,
VerificationFailRepository::new,
MeteredRepositoryTypeA.TYPE,
metadata -> new MeteredRepositoryTypeA(metadata, clusterService),
MeteredRepositoryTypeB.TYPE,
@@ -135,6 +123,10 @@ public void setUp() throws Exception {
@Override
public void tearDown() throws Exception {
super.tearDown();
if (threadPool != null) {
threadPool.shutdownNow();
threadPool = null;
}
clusterService.stop();
repositoriesService.stop();
}
@@ -181,6 +173,26 @@ public void testRegisterRejectsInvalidRepositoryNames() {
}
}

public void testPutRepositoryVerificationFails() {
Member

Is it possible to also have a test that starts with an existing repository and demonstrates that a failed update request (due to verification) does not change the existing repo?

Contributor Author

yeah, sure

Contributor Author

added new test testPutRepositoryVerificationFailsOnExisting

var repoName = randomAlphaOfLengthBetween(10, 25);
var request = new PutRepositoryRequest().name(repoName).type(VerificationFailRepository.TYPE).verify(true);
var resultListener = new SubscribableListener<AcknowledgedResponse>();
repositoriesService.registerRepository(request, resultListener);
var failure = safeAwaitFailure(resultListener);
assertThat(failure, isA(RepositoryVerificationException.class));
// also make sure that cluster state does not include failed repo
assertThrows(RepositoryMissingException.class, () -> { repositoriesService.repository(repoName); });
}

public void testPutRepositorySkipVerification() {
var repoName = randomAlphaOfLengthBetween(10, 25);
var request = new PutRepositoryRequest().name(repoName).type(VerificationFailRepository.TYPE).verify(false);
var resultListener = new SubscribableListener<AcknowledgedResponse>();
repositoriesService.registerRepository(request, resultListener);
var ackResponse = safeAwait(resultListener);
assertTrue(ackResponse.isAcknowledged());
}
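
The behaviour these tests exercise (verify first, register only on success, and leave any existing registration untouched when an update fails) can be illustrated with a small stand-alone sketch. Everything in it is hypothetical: `VerifyBeforeUpdateSketch` is a plain in-memory map, not `RepositoriesService`, and the predicate stands in for repository verification. It only shows the ordering of "verify, then commit".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

final class VerifyBeforeUpdateSketch {

    // Hypothetical registry: repository name -> repository type.
    private final Map<String, String> repositories = new HashMap<>();
    // Hypothetical verifier: returns false for types that fail verification.
    private final Predicate<String> verifier;

    VerifyBeforeUpdateSketch(Predicate<String> verifier) {
        this.verifier = verifier;
    }

    /** Registers or updates a repository, verifying it first when requested. */
    void put(String name, String type, boolean verify) {
        if (verify && !verifier.test(type)) {
            // Fail before touching the registry, so existing state is preserved.
            throw new IllegalStateException("failed to verify repository [" + name + "]");
        }
        repositories.put(name, type);
    }

    String typeOf(String name) {
        return repositories.get(name);
    }

    private static VerifyBeforeUpdateSketch newRegistry() {
        return new VerifyBeforeUpdateSketch(type -> !type.equals("verify-fail"));
    }

    /** A failed, verified update must leave the existing entry unchanged. */
    static String failedUpdateKeepsExisting() {
        VerifyBeforeUpdateSketch registry = newRegistry();
        registry.put("repo", "fs", true);
        try {
            registry.put("repo", "verify-fail", true);
            throw new AssertionError("expected the update to fail verification");
        } catch (IllegalStateException expected) {
            // The update was rejected before the registry was modified.
        }
        return registry.typeOf("repo");
    }

    /** With verification disabled, even a failing type is registered. */
    static String skipVerificationRegisters() {
        VerifyBeforeUpdateSketch registry = newRegistry();
        registry.put("repo", "verify-fail", false);
        return registry.typeOf("repo");
    }

    public static void main(String[] args) {
        System.out.println(failedUpdateKeepsExisting());
        System.out.println(skipVerificationRegisters());
    }
}
```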

public void testRepositoriesStatsCanHaveTheSameNameAndDifferentTypeOverTime() {
String repoName = "name";
expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName));
@@ -502,6 +514,19 @@ private UnstableRepository(RepositoryMetadata metadata) {
}
}

private static class VerificationFailRepository extends TestRepository {
public static final String TYPE = "verify-fail";

private VerificationFailRepository(RepositoryMetadata metadata) {
super(metadata);
}

@Override
public String startVerification() {
throw new RepositoryVerificationException(TYPE, "failed to validate repository");
}
}

private static class MeteredRepositoryTypeA extends MeteredBlobStoreRepository {
private static final String TYPE = "type-a";
private static final RepositoryStats STATS = new RepositoryStats(Map.of("GET", 10L));

@@ -91,8 +91,13 @@ protected Settings repositorySettings(String repoName) {
return Settings.builder().put("compress", randomBoolean()).build();
}

/**
* Create a repository without verification, assuming the repository is perfectly valid.
* If the repository can be invalid, use another method
* {@link ESBlobStoreRepositoryIntegTestCase#createRepository(String repoName, boolean verify)}
*/
protected final String createRepository(final String name) {
-return createRepository(name, true);
+return createRepository(name, false);
Member

Why is this change necessary? Is it because we will reject invalid repos earlier with this change and we still want some invalid repos to be created in tests? That sounds like an exceptional use case. Are there many such usages? If not, I feel it might be better for clarity to have a separate method such as createRepositoryWithoutVerification for them. The following method, String createRepository(final String name, final boolean verify), is only used here and can probably be dropped.

Contributor Author

There are several tests that don't rely on verification: repository creation is part of the setup, but no assertions are made about it. In some tests we create a snapshot thread pool with a single thread; the new verification code also uses the snapshot thread pool, and that causes the test to hang. My understanding is that this happens when we intentionally block some repository calls through mocks. At least that's what I was able to reproduce with a debugger.

Also, I was not able to catch all of the failing tests locally; some of them fail only on CI, not even on my GCP instance.

I can try to refactor with createRepositoryWithoutVerification for clarity.

Member

> In some tests we create a snapshot thread pool with single thread, new verification code also uses snapshot thread and it causes test to hang.

The existing code also performs verification in the snapshot thread pool after the repo is added to the cluster state, so it is not immediately clear to me why the new verification step would hang.

Contributor Author

Let me reproduce these and post details here.

Contributor Author

I mixed things up a bit here: this change is not related to the hanging tests.

Another diff in this PR addresses the hanging test:
RepositoriesIT -> testRepositoryConflict

        blockMasterOnWriteIndexFile(repo);
        logger.info("--> start deletion of snapshot");
        ActionFuture<AcknowledgedResponse> future = clusterAdmin().prepareDeleteSnapshot(repo, snapshot1).execute();
        logger.info("--> waiting for block to kick in on node [{}]", blockedNode);
        waitForBlock(blockedNode, repo);

        ...

        logger.info("--> try updating the repository, should fail because the deletion of the snapshot is in progress");
        RepositoryConflictException e2 = expectThrows(
            RepositoryConflictException.class,
            clusterAdmin().preparePutRepository(repo)
                .setVerify(true) // >>> will hang on verification
                .setType("mock")
                .setSettings(Settings.builder().put("location", randomRepoPath()))
        );

        ...

        logger.info("--> unblocking blocked node [{}]", blockedNode);
        unblockNode(repo, blockedNode); // >>> release thread here

So here we use a thread pool with a single thread that is busy with the snapshot deletion; we then issue another call to the repository service, and it hangs on verification. The test unblocks the node only afterwards.
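
The hang described above can be reproduced outside Elasticsearch with a plain single-threaded executor. This is a minimal sketch under stated assumptions: `snapshotPool` stands in for a snapshot thread pool with one thread, the latch-blocked task stands in for the blocked snapshot deletion, and the second submitted task stands in for the local repository verification. None of the names come from the Elasticsearch code base.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SingleThreadPoolStarvation {

    /** Returns true if the second task could not run while the only worker was blocked. */
    static boolean secondTaskStarves() {
        // Stand-in for a snapshot thread pool configured with a single thread.
        ExecutorService snapshotPool = Executors.newSingleThreadExecutor();
        CountDownLatch block = new CountDownLatch(1);
        CountDownLatch deletionStarted = new CountDownLatch(1);
        try {
            // Stand-in for the blocked snapshot deletion: it occupies the only worker.
            snapshotPool.execute(() -> {
                deletionStarted.countDown();
                try {
                    block.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            deletionStarted.await();

            // Stand-in for repository verification: queued behind the blocked task.
            Future<String> verification = snapshotPool.submit(() -> "verified");

            boolean starved;
            try {
                verification.get(200, TimeUnit.MILLISECONDS);
                starved = false;
            } catch (TimeoutException e) {
                // Nothing else can run until the worker is released.
                starved = true;
            }

            // Stand-in for unblocking the node: once released, verification completes.
            block.countDown();
            verification.get(5, TimeUnit.SECONDS);
            return starved;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            block.countDown();
            snapshotPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("second task starved: " + secondTaskStarves());
    }
}
```

A caller that waits for the second task without a timeout, and only releases the block afterwards, would wait forever. That matches the ordering in the test, where the put-repository call is awaited before the node is unblocked.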

Contributor Author

@mhl-b, May 24, 2024

This change is related to metrics. We have tests that verify the exact repository metrics (get/put/list) across nodes. With the new change the master node will record a few more, and several tests fail in s3/gcp/azure, though not consistently.

https://gradle-enterprise.elastic.co/s/w4amqdgguyipy

The problem is that I cannot reproduce these locally. I tried to fix them based on the Gradle scan, then got a few more failures, so I decided on a blanket approach. @DaveCTurner suggested turning verification off, not the blanket approach :)

}

protected final String createRepository(final String name, final boolean verify) {

@@ -41,6 +41,7 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.RequestBuilder;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.bootstrap.BootstrapForTesting;
@@ -2178,6 +2179,14 @@ public static <T> T safeGet(Future<T> future) {
}
}

public static Exception safeAwaitFailure(SubscribableListener<?> listener) {
Member

Nit: I think we can have a variant of this method that takes PlainActionFuture in a future PR.

return safeAwait(
SubscribableListener.newForked(
exceptionListener -> listener.addListener(ActionTestUtils.assertNoSuccessListener(exceptionListener::onResponse))
)
);
}
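
The idea behind `safeAwaitFailure` (wait for an asynchronous operation, return its exception, and fail the test if it unexpectedly succeeds) can be sketched with the JDK's `CompletableFuture`. This is an analogous illustration only: `AwaitFailureSketch` and the 10-second timeout are assumptions made for the example, and the real helper works on `SubscribableListener` rather than on futures.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AwaitFailureSketch {

    /** Waits for the future to fail and returns the cause; throws AssertionError otherwise. */
    static Throwable awaitFailure(CompletableFuture<?> future) {
        try {
            Object result = future.get(10, TimeUnit.SECONDS);
            throw new AssertionError("expected a failure but got result: " + result);
        } catch (ExecutionException e) {
            // The future completed exceptionally; the cause is the original exception.
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("interrupted while waiting for a failure", e);
        } catch (TimeoutException e) {
            throw new AssertionError("timed out waiting for a failure", e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("failed to validate repository"));
        System.out.println(awaitFailure(future).getMessage());
    }
}
```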

public static void safeSleep(long millis) {
try {
Thread.sleep(millis);