Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify repository before cluster update #108531

Merged
merged 23 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public void testRequestStats() throws Exception {
}

public void testAbortRequestStats() throws Exception {
final String repository = createRepository(randomRepositoryName());
final String repository = createRepository(randomRepositoryName(), false);

final String index = "index-no-merges";
createIndex(index, 1, 0);
Expand Down Expand Up @@ -228,7 +228,7 @@ public void testAbortRequestStats() throws Exception {
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/101608")
public void testMetrics() throws Exception {
// Create the repository and perform some activities
final String repository = createRepository(randomRepositoryName());
final String repository = createRepository(randomRepositoryName(), false);
final String index = "index-no-merges";
createIndex(index, 1, 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,11 @@ public void testRepositoryConflict() throws Exception {
logger.info("--> try updating the repository, should fail because the deletion of the snapshot is in progress");
RepositoryConflictException e2 = expectThrows(
RepositoryConflictException.class,
clusterAdmin().preparePutRepository(repo).setType("mock").setSettings(Settings.builder().put("location", randomRepoPath()))
clusterAdmin().preparePutRepository(repo)
// if "true" will deadlock on snapshot thread pool, we are running with single thread which is busy at the moment
.setVerify(false)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will deadlock on snapshot thread pool, we are running with single thread which is busy at the moment

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have this comment in the code please?

.setType("mock")
.setSettings(Settings.builder().put("location", randomRepoPath()))
);
assertThat(e2.status(), equalTo(RestStatus.CONFLICT));
assertThat(e2.getMessage(), containsString("trying to modify or unregister repository that is currently used"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,7 @@ public void testDeleteRepositoryWhileSnapshotting() throws Exception {
client.admin()
.cluster()
.preparePutRepository("test-repo")
.setVerify(false)
.setType("fs")
.setSettings(Settings.builder().put("location", repositoryLocation.resolve("test")))
.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,7 @@ record RegisterRepositoryTaskResult(AcknowledgedResponse ackResponse, boolean ch
SubscribableListener

// Trying to create the new repository on master to make sure it works
.<Void>newForked(validationStep -> ActionListener.completeWith(validationStep, () -> {
validateRepositoryCanBeCreated(request);
return null;
}))
.<Void>newForked(validationStep -> validatePutRepositoryRequest(request, validationStep))

// When publication has completed (and all acks received or timed out) then verify the repository.
// (if acks timed out then acknowledgementStep completes before the master processes this cluster state, hence why we have
Expand Down Expand Up @@ -353,6 +350,29 @@ public void validateRepositoryCanBeCreated(final PutRepositoryRequest request) {
closeRepository(createRepository(newRepositoryMetadata));
}

private void validatePutRepositoryRequest(final PutRepositoryRequest request, ActionListener<Void> resultListener) {
final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(request.name(), request.type(), request.settings());
try {
final var repository = createRepository(newRepositoryMetadata);
if (request.verify()) {
// verify repository on local node only, different from verifyRepository method that runs on other cluster nodes
threadPool.executor(ThreadPool.Names.SNAPSHOT)
.execute(ActionRunnable.run(ActionListener.runBefore(resultListener, () -> closeRepository(repository)), () -> {
final var token = repository.startVerification();
if (token != null) {
repository.verify(token, clusterService.localNode());
repository.endVerification(token);
}
}));
} else {
closeRepository(repository);
resultListener.onResponse(null);
}
} catch (Exception e) {
resultListener.onFailure(e);
}
}

private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
submitUnbatchedTask(clusterService, source, task);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -65,28 +63,16 @@

public class RepositoriesServiceTests extends ESTestCase {

private static ThreadPool threadPool;

private ClusterService clusterService;
private RepositoriesService repositoriesService;

@BeforeClass
public static void createThreadPool() {
threadPool = new TestThreadPool(RepositoriesService.class.getName());
}

@AfterClass
public static void terminateThreadPool() {
if (threadPool != null) {
threadPool.shutdownNow();
threadPool = null;
}
}
private ThreadPool threadPool;

@Override
public void setUp() throws Exception {
super.setUp();

threadPool = new TestThreadPool(RepositoriesService.class.getName());

final TransportService transportService = new TransportService(
Settings.EMPTY,
mock(Transport.class),
Expand All @@ -113,6 +99,8 @@ public void setUp() throws Exception {
TestRepository::new,
UnstableRepository.TYPE,
UnstableRepository::new,
VerificationFailRepository.TYPE,
VerificationFailRepository::new,
MeteredRepositoryTypeA.TYPE,
metadata -> new MeteredRepositoryTypeA(metadata, clusterService),
MeteredRepositoryTypeB.TYPE,
Expand All @@ -135,6 +123,10 @@ public void setUp() throws Exception {
@Override
public void tearDown() throws Exception {
super.tearDown();
if (threadPool != null) {
threadPool.shutdownNow();
threadPool = null;
}
clusterService.stop();
repositoriesService.stop();
}
Expand Down Expand Up @@ -181,6 +173,44 @@ public void testRegisterRejectsInvalidRepositoryNames() {
}
}

public void testPutRepositoryVerificationFails() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to also have a test that starts with an existing repository and demostrate a failed update (due to verfication) request does not change the existing repo?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, sure

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added new test testPutRepositoryVerificationFailsOnExisting

var repoName = randomAlphaOfLengthBetween(10, 25);
var request = new PutRepositoryRequest().name(repoName).type(VerificationFailRepository.TYPE).verify(true);
var resultListener = new SubscribableListener<AcknowledgedResponse>();
repositoriesService.registerRepository(request, resultListener);
var failure = safeAwaitFailure(resultListener);
assertThat(failure, isA(RepositoryVerificationException.class));
// also make sure that cluster state does not include failed repo
assertThrows(RepositoryMissingException.class, () -> { repositoriesService.repository(repoName); });
}

public void testPutRepositoryVerificationFailsOnExisting() {
var repoName = randomAlphaOfLengthBetween(10, 25);
var request = new PutRepositoryRequest().name(repoName).type(TestRepository.TYPE).verify(true);
var resultListener = new SubscribableListener<AcknowledgedResponse>();
repositoriesService.registerRepository(request, resultListener);
var ackResponse = safeAwait(resultListener);
assertTrue(ackResponse.isAcknowledged());
Comment on lines +190 to +193
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think we can directly use PlainActionFuture instead of going through safeAwait(SubscribableListener) which internally uses PlainActionFuture, e.g.:

Suggested change
var resultListener = new SubscribableListener<AcknowledgedResponse>();
repositoriesService.registerRepository(request, resultListener);
var ackResponse = safeAwait(resultListener);
assertTrue(ackResponse.isAcknowledged());
var future = new PlainActionFuture<AcknowledgedResponse>();
repositoriesService.registerRepository(request, future);
assertTrue(safeGet(future).isAcknowledged());


// try to update existing repository with faulty repo and make sure it is not applied
request = new PutRepositoryRequest().name(repoName).type(VerificationFailRepository.TYPE).verify(true);
resultListener = new SubscribableListener<>();
repositoriesService.registerRepository(request, resultListener);
var failure = safeAwaitFailure(resultListener);
assertThat(failure, isA(RepositoryVerificationException.class));
var repository = repositoriesService.repository(repoName);
assertEquals(repository.getMetadata().type(), TestRepository.TYPE);
}

public void testPutRepositorySkipVerification() {
var repoName = randomAlphaOfLengthBetween(10, 25);
var request = new PutRepositoryRequest().name(repoName).type(VerificationFailRepository.TYPE).verify(false);
var resultListener = new SubscribableListener<AcknowledgedResponse>();
repositoriesService.registerRepository(request, resultListener);
var ackResponse = safeAwait(resultListener);
assertTrue(ackResponse.isAcknowledged());
}

public void testRepositoriesStatsCanHaveTheSameNameAndDifferentTypeOverTime() {
String repoName = "name";
expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName));
Expand Down Expand Up @@ -502,6 +532,19 @@ private UnstableRepository(RepositoryMetadata metadata) {
}
}

private static class VerificationFailRepository extends TestRepository {
public static final String TYPE = "verify-fail";

private VerificationFailRepository(RepositoryMetadata metadata) {
super(metadata);
}

@Override
public String startVerification() {
throw new RepositoryVerificationException(TYPE, "failed to validate repository");
}
}

private static class MeteredRepositoryTypeA extends MeteredBlobStoreRepository {
private static final String TYPE = "type-a";
private static final RepositoryStats STATS = new RepositoryStats(Map.of("GET", 10L));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ public final void testSnapshotWithLargeSegmentFiles() throws Exception {
}

public void testRequestStats() throws Exception {
final String repository = createRepository(randomRepositoryName());
// need to use verify=false, because the verification process on master makes extra calls on placeholder repo
// hence impacting http metrics and failing test
final String repository = createRepository(randomRepositoryName(), false);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additional verification adds few more invocations on master node, failing all related tests in s3/gcp/azure.
Here test asserts that we have exact metrics across all nodes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here test asserts that we have exact metrics across all nodes.

This is not the case. The test failed because the repository master node uses for verification is different from the actual repository that gets created afterwards. The initial repository is closed and not counted towards sdkRequestCounts. Thus making it have lower values compared to the records on the HTTP server side. Can we add a comment to this line to explain why verify needs to be false?

final String index = "index-no-merges";
createIndex(index, 1, 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.RequestBuilder;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.bootstrap.BootstrapForTesting;
Expand Down Expand Up @@ -2187,6 +2188,14 @@ public static <T> T safeGet(Future<T> future) {
}
}

public static Exception safeAwaitFailure(SubscribableListener<?> listener) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think we can have a variant of this method that takes PlainActionFuture in a future PR.

return safeAwait(
SubscribableListener.newForked(
exceptionListener -> listener.addListener(ActionTestUtils.assertNoSuccessListener(exceptionListener::onResponse))
)
);
}

public static void safeSleep(TimeValue timeValue) {
safeSleep(timeValue.millis());
}
Expand Down