Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
Expand All @@ -56,6 +57,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Service responsible for maintaining and providing access to snapshot repositories on nodes.
Expand Down Expand Up @@ -86,7 +88,9 @@ public RepositoriesService(Settings settings, ClusterService clusterService, Tra
// Doesn't make sense to maintain repositories on non-master and non-data nodes
// Nothing happens there anyway
if (DiscoveryNode.isDataNode(settings) || DiscoveryNode.isMasterNode(settings)) {
clusterService.addHighPriorityApplier(this);
if (isDedicatedVotingOnlyNode(DiscoveryNode.getRolesFromSettings(settings)) == false) {
clusterService.addHighPriorityApplier(this);
}
}
this.verifyAction = new VerifyNodeRepositoryAction(transportService, clusterService, this);
}
Expand Down Expand Up @@ -279,6 +283,10 @@ protected void doRun() {
});
}

static boolean isDedicatedVotingOnlyNode(Set<DiscoveryNodeRole> roles) {
return roles.contains(DiscoveryNodeRole.MASTER_ROLE) && roles.contains(DiscoveryNodeRole.DATA_ROLE) == false &&
roles.stream().anyMatch(role -> role.roleName().equals("voting_only"));
}

/**
* Checks if new repositories appeared in or disappeared from cluster metadata and updates current list of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ public void verify(String repository, String verificationToken, final ActionList
final List<DiscoveryNode> nodes = new ArrayList<>();
for (ObjectCursor<DiscoveryNode> cursor : masterAndDataNodes) {
DiscoveryNode node = cursor.value;
nodes.add(node);
if (RepositoriesService.isDedicatedVotingOnlyNode(node.getRoles()) == false) {
nodes.add(node);
}
}
final CopyOnWriteArrayList<VerificationFailure> errors = new CopyOnWriteArrayList<>();
final AtomicInteger counter = new AtomicInteger(nodes.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,47 @@
*/
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.hamcrest.Matchers;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.test.NodeRoles.addRoles;
import static org.elasticsearch.test.NodeRoles.onlyRole;
import static org.elasticsearch.test.NodeRoles.onlyRoles;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.nullValue;

Expand All @@ -29,7 +54,7 @@ public class VotingOnlyNodePluginTests extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singleton(LocalStateVotingOnlyNodePlugin.class);
return Arrays.asList(LocalStateVotingOnlyNodePlugin.class, RepositoryVerifyAccessPlugin.class);
}

public void testRequireVotingOnlyNodeToBeMasterEligible() {
Expand Down Expand Up @@ -115,4 +140,86 @@ public void testVotingOnlyNodesCannotBeMasterWithoutFullMasterNodes() throws Exc
final String newMasterId = client().admin().cluster().prepareState().get().getState().nodes().getMasterNodeId();
assertNotEquals(oldMasterId, newMasterId);
}

public void testBasicSnapshotRestoreWorkFlow() {
internalCluster().setBootstrapMasterNodeIndex(0);
internalCluster().startNodes(2);
// dedicated voting-only master node
final String dedicatedVotingOnlyNode = internalCluster().startNode(
onlyRoles(Set.of(DiscoveryNodeRole.MASTER_ROLE, VotingOnlyNodePlugin.VOTING_ONLY_NODE_ROLE)));
// voting-only master node that also has data
final String nonDedicatedVotingOnlyNode = internalCluster().startNode(
addRoles(Set.of(VotingOnlyNodePlugin.VOTING_ONLY_NODE_ROLE)));

assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("verifyaccess-fs").setSettings(Settings.builder().put("location", randomRepoPath())
.put("compress", randomBoolean())));
createIndex("test-idx-1");
createIndex("test-idx-2");
createIndex("test-idx-3");
ensureGreen();

VerifyRepositoryResponse verifyResponse = client().admin().cluster().prepareVerifyRepository("test-repo").get();
// only the da
assertEquals(3, verifyResponse.getNodes().size());
assertTrue(verifyResponse.getNodes().stream().noneMatch(nw -> nw.getName().equals(dedicatedVotingOnlyNode)));
assertTrue(verifyResponse.getNodes().stream().anyMatch(nw -> nw.getName().equals(nonDedicatedVotingOnlyNode)));

final String[] indicesToSnapshot = {"test-idx-*", "-test-idx-3"};

logger.info("--> snapshot");
Client client = client();
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).setIndices(indicesToSnapshot).get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
Matchers.equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

List<SnapshotInfo> snapshotInfos = client.admin().cluster().prepareGetSnapshots("test-repo")
.setSnapshots(randomFrom("test-snap", "_all", "*", "*-snap", "test*")).get().getSnapshots("test-repo");
assertThat(snapshotInfos.size(), Matchers.equalTo(1));
SnapshotInfo snapshotInfo = snapshotInfos.get(0);
assertThat(snapshotInfo.state(), Matchers.equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.version(), Matchers.equalTo(Version.CURRENT));

logger.info("--> close indices");
client.admin().indices().prepareClose("test-idx-1", "test-idx-2").get();

logger.info("--> restore all indices from the snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));

ensureGreen();
}

public static class RepositoryVerifyAccessPlugin extends org.elasticsearch.plugins.Plugin implements RepositoryPlugin {

@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService, RecoverySettings recoverySettings) {
return Collections.singletonMap("verifyaccess-fs", (metadata) ->
new AccessVerifyingRepo(metadata, env, namedXContentRegistry, clusterService, recoverySettings));
}

private static class AccessVerifyingRepo extends FsRepository {

private final ClusterService clusterService;

private AccessVerifyingRepo(RepositoryMetadata metadata, Environment environment, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService, RecoverySettings recoverySettings) {
super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings);
this.clusterService = clusterService;
}

@Override
protected BlobStore createBlobStore() throws Exception {
final DiscoveryNode localNode = clusterService.state().nodes().getLocalNode();
if (localNode.getRoles().contains(VotingOnlyNodePlugin.VOTING_ONLY_NODE_ROLE)) {
assertTrue(localNode.isDataNode());
}
return super.createBlobStore();
}
}
}
}