From 0822d4d0e2728621b888722fa6bee2e6bf6f35c2 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Wed, 17 Jan 2024 10:16:21 +0530 Subject: [PATCH 1/4] Introducing mixed mode support for remote store migration Signed-off-by: Gaurav Bafna --- .../RemoteStoreMigrationTestCase.java | 109 ++++++++++++++++++ .../coordination/JoinTaskExecutor.java | 71 ++++++++---- .../common/settings/ClusterSettings.java | 1 + .../common/settings/FeatureFlagSettings.java | 3 +- .../opensearch/common/util/FeatureFlags.java | 11 ++ .../remotestore/RemoteStoreNodeService.java | 60 +++++++++- .../coordination/JoinTaskExecutorTests.java | 61 ++++++++++ .../opensearch/test/InternalTestCluster.java | 26 ++--- 8 files changed, 305 insertions(+), 37 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreMigrationTestCase.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreMigrationTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreMigrationTestCase.java new file mode 100644 index 0000000000000..2fef81f0bb274 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreMigrationTestCase.java @@ -0,0 +1,109 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.client.Client; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; + +import static org.opensearch.node.remotestore.RemoteStoreNodeService.DIRECTION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; +import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) +public class RemoteStoreMigrationTestCase extends OpenSearchIntegTestCase { + protected static final String REPOSITORY_NAME = "test-remote-store-repo"; + protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2"; + + protected Path segmentRepoPath; + protected Path translogRepoPath; + + static boolean addRemote = false; + + protected Settings nodeSettings(int nodeOrdinal) { + if (segmentRepoPath == null || translogRepoPath == null) { + segmentRepoPath = randomRepoPath().toAbsolutePath(); + translogRepoPath = randomRepoPath().toAbsolutePath(); + } + if (addRemote) { + logger.info("Adding remote store node"); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath)) + .put("discovery.initial_state_timeout", "500ms") + .build(); + } else { + logger.info("Adding docrep node"); + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put("discovery.initial_state_timeout", "500ms").build(); + } + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); + } + + public void testAddRemoteNode() throws IOException { + internalCluster().setBootstrapClusterManagerNodeIndex(0); + List cmNodes = internalCluster().startNodes(1); + Client client = internalCluster().client(cmNodes.get(0)); + addRemote = true; + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + internalCluster().startNode(); + internalCluster().startNode(); + internalCluster().validateClusterFormed(); + + // add incompatible remote node in remote mixed cluster + Settings.Builder badSettings = Settings.builder() + .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, "REPOSITORY_2_NAME", translogRepoPath)) + .put("discovery.initial_state_timeout", "500ms"); + String badNode = internalCluster().startNode(badSettings); + assertTrue(client.admin().cluster().prepareClusterStats().get().getNodes().size() < internalCluster().getNodeNames().length); + internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(badNode)); + + // add remote node in docrep cluster + updateSettingsRequest.persistentSettings(Settings.builder().put(DIRECTION_SETTING.getKey(), "docrep")); + assertAcked(client.admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + String wrongNode = internalCluster().startNode(); + assertTrue(client.admin().cluster().prepareClusterStats().get().getNodes().size() < internalCluster().getNodeNames().length); + internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(wrongNode)); + } + + public void testAddDocRepNode() throws Exception { + internalCluster().setBootstrapClusterManagerNodeIndex(0); + List cmNodes = internalCluster().startNodes(1); + addRemote = false; + Client client = internalCluster().client(cmNodes.get(0)); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings( + Settings.builder() + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") + .put(DIRECTION_SETTING.getKey(), "remote_store") + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + String wrongNode = internalCluster().startNode(); + String[] allNodes = internalCluster().getNodeNames(); + assertTrue(client.admin().cluster().prepareClusterStats().get().getNodes().size() < allNodes.length); + + updateSettingsRequest.persistentSettings(Settings.builder().put(DIRECTION_SETTING.getKey(), "docrep")); + assertAcked(client.admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + assertBusy(() -> assertTrue(client.admin().cluster().prepareClusterStats().get().getNodes().size() == allNodes.length)); + internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(wrongNode)); + } + +} diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index f701a2f52277d..119e6c23c715b 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -58,6 +58,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -176,12 +177,18 @@ public ClusterTasksResult execute(ClusterState currentState, List jo DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes()); - // TODO: We are using one of the existing node to build the repository metadata, this will need to be updated - // once we start supporting mixed compatibility mode. An optimization can be done as this will get invoked + // An optimization can be done as this will get invoked // for every set of node join task which we can optimize to not compute if cluster state already has // repository information. + Optional remoteDN = newState.nodes() + .getNodes() + .values() + .stream() + .filter(DiscoveryNode::isRemoteStoreNode) + .findFirst(); + DiscoveryNode dn = remoteDN.orElseGet(() -> (currentNodes.getNodes().values()).stream().findFirst().get()); RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( - (currentNodes.getNodes().values()).stream().findFirst().get(), + dn, currentState.getMetadata().custom(RepositoriesMetadata.TYPE) ); @@ -212,6 +219,16 @@ public ClusterTasksResult execute(ClusterState currentState, List jo // would guarantee that a decommissioned node would never be able to join the cluster and ensures correctness ensureNodeCommissioned(node, currentState.metadata()); nodesBuilder.add(node); + + if (remoteDN.isEmpty()) { + // This is hit only on cases where we encounter first remote node in remote store migration + logger.info("Updating system repository now for remote store migration"); + repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( + node, + currentState.getMetadata().custom(RepositoriesMetadata.TYPE) + ); + } + nodesChanged = true; minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion()); maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion()); @@ -495,36 +512,46 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod assert existingNodes.isEmpty() == false; - // TODO: The below check is valid till we don't support migration, once we start supporting migration a remote - // store node will be able to join a non remote store cluster and vice versa. #7986 CompatibilityMode remoteStoreCompatibilityMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(metadata.settings()); if (STRICT.equals(remoteStoreCompatibilityMode)) { + DiscoveryNode existingNode = existingNodes.get(0); if (joiningNode.isRemoteStoreNode()) { + ensureRemoteStoreNodesCompatibility(joiningNode, existingNode); + } else { if (existingNode.isRemoteStoreNode()) { - RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode); - RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode); - if (existingRemoteStoreNodeAttribute.equals(joiningRemoteStoreNodeAttribute) == false) { - throw new IllegalStateException( - "a remote store node [" - + joiningNode - + "] is trying to join a remote store cluster with incompatible node attributes in " - + "comparison with existing node [" - + existingNode - + "]" - ); - } - } else { throw new IllegalStateException( - "a remote store node [" + joiningNode + "] is trying to join a non remote store cluster" + "a non remote store node [" + joiningNode + "] is trying to join a remote store cluster" ); } - } else { - if (existingNode.isRemoteStoreNode()) { + } + } else { + if (remoteStoreCompatibilityMode == CompatibilityMode.MIXED) { + if (joiningNode.isRemoteStoreNode()) { + Optional remoteDN = existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst(); + remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode)); + } + } + } + } + + private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode) { + if (joiningNode.isRemoteStoreNode()) { + if (existingNode.isRemoteStoreNode()) { + RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode); + RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode); + if (existingRemoteStoreNodeAttribute.equals(joiningRemoteStoreNodeAttribute) == false) { throw new IllegalStateException( - "a non remote store node [" + joiningNode + "] is trying to join a remote store cluster" + "a remote store node [" + + joiningNode + + "] is trying to join a remote store cluster with incompatible node attributes in " + + "comparison with existing node [" + + existingNode + + "]" ); } + } else { + throw new IllegalStateException("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster"); } } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 0c97d62c44a5e..c6371be3d86ee 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -698,6 +698,7 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, + RemoteStoreNodeService.DIRECTION_SETTING, IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java index e6d7ba0c60772..47da53b52c325 100644 --- a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -35,6 +35,7 @@ protected FeatureFlagSettings( FeatureFlags.TELEMETRY_SETTING, FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING, FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING, - FeatureFlags.DOC_ID_FUZZY_SET_SETTING + FeatureFlags.DOC_ID_FUZZY_SET_SETTING, + FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING ); } diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index 075dc9934e130..b51efeab21254 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -20,6 +20,11 @@ * @opensearch.internal */ public class FeatureFlags { + /** + * Gates the visibility of the remote store migration support from docrep . + */ + public static final String REMOTE_STORE_MIGRATION_EXPERIMENTAL = "opensearch.experimental.feature.remote_store.migration.enabled"; + /** * Gates the ability for Searchable Snapshots to read snapshots that are older than the * guaranteed backward compatibility for OpenSearch (one prior major version) on a best effort basis. @@ -98,6 +103,12 @@ public static boolean isEnabled(Setting featureFlag) { } } + public static final Setting REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING = Setting.boolSetting( + REMOTE_STORE_MIGRATION_EXPERIMENTAL, + false, + Property.NodeScope + ); + public static final Setting EXTENSIONS_SETTING = Setting.boolSetting(EXTENSIONS, false, Property.NodeScope); public static final Setting IDENTITY_SETTING = Setting.boolSetting(IDENTITY, false, Property.NodeScope); diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java index ca2413a057a6b..c683166ba050c 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java @@ -15,6 +15,7 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Setting; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryException; @@ -27,6 +28,8 @@ import java.util.Map; import java.util.function.Supplier; +import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL; + /** * Contains all the method needed for a remote store backed node lifecycle. */ @@ -39,6 +42,33 @@ public class RemoteStoreNodeService { "remote_store.compatibility_mode", CompatibilityMode.STRICT.name(), CompatibilityMode::parseString, + value -> { + if (value == CompatibilityMode.MIXED + && FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING) == false) { + throw new IllegalArgumentException( + " mixed mode is under an experimental feature and can be activated only by enabling " + + REMOTE_STORE_MIGRATION_EXPERIMENTAL + + " feature flag in the JVM options " + ); + } + }, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting DIRECTION_SETTING = new Setting<>( + "direction", + Direction.NONE.name(), + Direction::parseString, + value -> { + if (value != Direction.NONE && FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING) == false) { + throw new IllegalArgumentException( + " direction is under an experimental feature and can be activated only by enabling " + + REMOTE_STORE_MIGRATION_EXPERIMENTAL + + " feature flag in the JVM options " + ); + } + }, Setting.Property.Dynamic, Setting.Property.NodeScope ); @@ -49,7 +79,8 @@ public class RemoteStoreNodeService { * @opensearch.internal */ public enum CompatibilityMode { - STRICT("strict"); + STRICT("strict"), + MIXED("mixed"); public final String mode; @@ -73,6 +104,33 @@ public static CompatibilityMode parseString(String compatibilityMode) { } } + public enum Direction { + REMOTE_STORE("remote_store"), + NONE("none"), + DOCREP("docrep"); + + public final String direction; + + Direction(String d) { + this.direction = d; + } + + public static Direction parseString(String direction) { + try { + return Direction.valueOf(direction.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + "[" + + direction + + "] direction is not supported. " + + "supported modes are [" + + CompatibilityMode.values().toString() + + "]" + ); + } + } + } + public RemoteStoreNodeService(Supplier repositoriesService, ThreadPool threadPool) { this.repositoriesService = repositoriesService; this.threadPool = threadPool; diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java index 5952cc1bcaac2..ba90045daeb55 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java @@ -52,6 +52,7 @@ import org.opensearch.common.SetOnce; import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -67,11 +68,14 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.DIRECTION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.test.VersionUtils.allVersions; import static org.opensearch.test.VersionUtils.maxCompatibleVersion; import static org.opensearch.test.VersionUtils.randomCompatibleVersion; @@ -393,6 +397,7 @@ public void testJoinClusterWithNonRemoteStoreNodeJoiningNonRemoteStoreCluster() } public void testPreventJoinClusterWithRemoteStoreNodeJoiningNonRemoteStoreCluster() { + final DiscoveryNode existingNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) .nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build()) @@ -406,6 +411,62 @@ public void testPreventJoinClusterWithRemoteStoreNodeJoiningNonRemoteStoreCluste assertTrue(e.getMessage().equals("a remote store node [" + joiningNode + "] is trying to join a non remote " + "store cluster")); } + public void testRemoteStoreNodeJoiningNonRemoteStoreClusterMixedMode() { + final DiscoveryNode existingNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); + final Settings settings = Settings.builder() + .put(DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE) + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") + .build(); + final Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); + Metadata metadata = Metadata.builder().persistentSettings(settings).build(); + ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(existingNode).localNodeId(existingNode.getId()).build()) + .metadata(metadata) + .build(); + + DiscoveryNode joiningNode = newDiscoveryNode(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO)); + JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata()); + } + + public void testAllTypesNodeJoiningRemoteStoreClusterMixedMode() { + final DiscoveryNode docrepNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode remoteNode = newDiscoveryNode(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO)); + final Settings settings = Settings.builder() + .put(DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE) + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") + .build(); + final Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); + Metadata metadata = Metadata.builder().persistentSettings(settings).build(); + ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT) + .nodes( + DiscoveryNodes.builder() + .add(docrepNode) + .localNodeId(docrepNode.getId()) + .add(remoteNode) + .localNodeId(remoteNode.getId()) + .build() + ) + .metadata(metadata) + .build(); + + // compatible remote node should not be able to join a mixed mode having a remote node + DiscoveryNode goodRemoteNode = newDiscoveryNode(remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO)); + JoinTaskExecutor.ensureNodesCompatibility(goodRemoteNode, currentState.getNodes(), currentState.metadata()); + + // incompatible node should not be able to join a mixed mode + DiscoveryNode badRemoteNode = newDiscoveryNode(remoteStoreNodeAttributes(TRANSLOG_REPO, TRANSLOG_REPO)); + assertThrows( + IllegalStateException.class, + () -> JoinTaskExecutor.ensureNodesCompatibility(badRemoteNode, currentState.getNodes(), currentState.metadata()) + ); + + // DocRep node should be able to join a mixed mode + DiscoveryNode docrepNode2 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); + JoinTaskExecutor.ensureNodesCompatibility(docrepNode2, currentState.getNodes(), currentState.metadata()); + } + public void testJoinClusterWithRemoteStoreNodeJoiningRemoteStoreCluster() { final DiscoveryNode existingNode = new DiscoveryNode( UUIDs.base64UUID(), diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index c2b964aa96212..92419494d6616 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -36,7 +36,6 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomStrings; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.store.AlreadyClosedException; @@ -153,6 +152,18 @@ import java.util.stream.IntStream; import java.util.stream.Stream; +import static org.apache.lucene.tests.util.LuceneTestCase.TEST_NIGHTLY; +import static org.apache.lucene.tests.util.LuceneTestCase.rarely; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.opensearch.cluster.coordination.ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING; import static org.opensearch.common.unit.TimeValue.timeValueMillis; import static org.opensearch.common.unit.TimeValue.timeValueSeconds; @@ -166,18 +177,6 @@ import static org.opensearch.test.NodeRoles.removeRoles; import static org.opensearch.test.OpenSearchTestCase.assertBusy; import static org.opensearch.test.OpenSearchTestCase.randomFrom; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.apache.lucene.tests.util.LuceneTestCase.TEST_NIGHTLY; -import static org.apache.lucene.tests.util.LuceneTestCase.rarely; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * InternalTestCluster manages a set of JVM private nodes and allows convenient access to them. @@ -1319,6 +1318,7 @@ public synchronized void validateClusterFormed() { assertTrue("Expected node to exist: " + expectedNode + debugString, discoveryNodes.nodeExists(expectedNode)); } }); + //ToDo Fix me states.forEach(cs -> { if (cs.nodes().getNodes().values().stream().findFirst().get().isRemoteStoreNode()) { RepositoriesMetadata repositoriesMetadata = cs.metadata().custom(RepositoriesMetadata.TYPE); From f2ce306f72b076bf2656f5f164d62fab001647ea Mon Sep 17 00:00:00 2001 From: Gaurav Bafna Date: Thu, 8 Feb 2024 09:39:29 +0530 Subject: [PATCH 2/4] Adding changelog Signed-off-by: Gaurav Bafna --- CHANGELOG.md | 1 + .../opensearch/test/InternalTestCluster.java | 26 +++++++++---------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ad0932a86a07..636c599ae474a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625)) - [Admission Control] Integrate CPU AC with ResourceUsageCollector and add CPU AC stats to nodes/stats ([#10887](https://github.com/opensearch-project/OpenSearch/pull/10887)) - [S3 Repository] Add setting to control connection count for sync client ([#12028](https://github.com/opensearch-project/OpenSearch/pull/12028)) +- Add Remote Store Migration Experimental flag and allow mixed mode clusters under same ([#11986](https://github.com/opensearch-project/OpenSearch/pull/11986)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 92419494d6616..c2b964aa96212 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -36,6 +36,7 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomStrings; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.store.AlreadyClosedException; @@ -152,18 +153,6 @@ import java.util.stream.IntStream; import java.util.stream.Stream; -import static org.apache.lucene.tests.util.LuceneTestCase.TEST_NIGHTLY; -import static org.apache.lucene.tests.util.LuceneTestCase.rarely; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.opensearch.cluster.coordination.ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING; import static org.opensearch.common.unit.TimeValue.timeValueMillis; import static org.opensearch.common.unit.TimeValue.timeValueSeconds; @@ -177,6 +166,18 @@ import static org.opensearch.test.NodeRoles.removeRoles; import static org.opensearch.test.OpenSearchTestCase.assertBusy; import static org.opensearch.test.OpenSearchTestCase.randomFrom; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.apache.lucene.tests.util.LuceneTestCase.TEST_NIGHTLY; +import static org.apache.lucene.tests.util.LuceneTestCase.rarely; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * InternalTestCluster manages a set of JVM private nodes and allows convenient access to them. @@ -1318,7 +1319,6 @@ public synchronized void validateClusterFormed() { assertTrue("Expected node to exist: " + expectedNode + debugString, discoveryNodes.nodeExists(expectedNode)); } }); - //ToDo Fix me states.forEach(cs -> { if (cs.nodes().getNodes().values().stream().findFirst().get().isRemoteStoreNode()) { RepositoriesMetadata repositoriesMetadata = cs.metadata().custom(RepositoriesMetadata.TYPE); From b71aea442cf96c9dfde098776d1547d67fcd247c Mon Sep 17 00:00:00 2001 From: Lakshya Taragi Date: Tue, 13 Feb 2024 14:27:42 +0530 Subject: [PATCH 3/4] Allocation of new shards on remote store nodes --- .../org/opensearch/cluster/ClusterModule.java | 2 + .../decider/RemoteStoreAllocationDecider.java | 110 +++++ .../cluster/ClusterModuleTests.java | 4 +- .../RemoteStoreAllocationDeciderTests.java | 434 ++++++++++++++++++ 4 files changed, 549 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/cluster/routing/allocation/decider/RemoteStoreAllocationDecider.java create mode 100644 server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreAllocationDeciderTests.java diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index bad881f8bda76..050645dc9663d 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -67,6 +67,7 @@ import org.opensearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.RemoteStoreAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ResizeAllocationDecider; @@ -370,6 +371,7 @@ public static Collection createAllocationDeciders( addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new NodeLoadAwareAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new TargetPoolAllocationDecider()); + addAllocationDecider(deciders, new RemoteStoreAllocationDecider(settings, clusterSettings)); clusterPlugins.stream() .flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream()) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/RemoteStoreAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/RemoteStoreAllocationDecider.java new file mode 100644 index 0000000000000..236038f93bab4 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/RemoteStoreAllocationDecider.java @@ -0,0 +1,110 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.cluster.routing.allocation.decider; + +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.node.remotestore.RemoteStoreNodeService; +import org.opensearch.node.remotestore.RemoteStoreNodeService.Direction; +import org.opensearch.cluster.node.DiscoveryNode; + +/** + * An allocation decider to oversee shard allocation or relocation to remote-store enabled + * nodes. If Direction is set as "REMOTE_STORE", new primary shards can only go to remote-store enabled + * nodes. Replica shards can go to remote nodes only if corresponding primary also exists on a remote node. + * + * @opensearch.internal + */ +public class RemoteStoreAllocationDecider extends AllocationDecider { + public static final String NAME = "remote_store_enabled_cluster"; + + private Direction direction; + + public RemoteStoreAllocationDecider(Settings settings, ClusterSettings clusterSettings) { + this.direction = RemoteStoreNodeService.DIRECTION_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(RemoteStoreNodeService.DIRECTION_SETTING, this::setDirection); + } + + private void setDirection (Direction direction) { + this.direction = direction; + } + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + DiscoveryNode targetNode = node.node(); + if (!shardRouting.assignedToNode()){ + if (shardRouting.primary()) { + if (direction.equals(Direction.REMOTE_STORE) && !targetNode.isRemoteStoreNode()) { + return allocation.decision(Decision.NO, NAME, + "for REMOTE_STORE direction, new primary shards can not be allocated to non-remote nodes"); + } + } + else { + ShardRouting primaryShardRouting = allocation.routingNodes().activePrimary(shardRouting.shardId()); + if (primaryShardRouting == null) { + // ReplicaAfterPrimaryActiveAllocationDecider should prevent this case from occurring + return allocation.decision(Decision.NO, NAME, "no active primary shard yet"); + } + + DiscoveryNode primaryShardNode = allocation.routingNodes() + .stream() + .filter(nd -> nd.nodeId().equals(primaryShardRouting.currentNodeId())) + .findFirst().get().node(); + + if (direction.equals(Direction.REMOTE_STORE)) { + if (!primaryShardNode.isRemoteStoreNode() && targetNode.isRemoteStoreNode()) { + return allocation.decision(Decision.NO, NAME, + "can not allocate replica shard on a remote node when primary shard is not already active on some remote node"); + } + } + } + return allocation.decision( + Decision.YES, + NAME, + "for %s direction, allocation of a %s shard is allowed on a %s", + direction, + (shardRouting.primary() ? "primary" : "replica"), + isRemoteStoreEnabled(targetNode) + ); + } + return allocation.decision(Decision.YES, NAME, "it is a relocation, returning YES for now"); + } + + private static String isRemoteStoreEnabled (DiscoveryNode node) { + return (node.isRemoteStoreNode() ? "remote_store_enabled" : "non_remote_store_enabled") + " node"; + } + +} diff --git a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java index 535444cd866b8..7e391f96ba909 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java @@ -51,6 +51,7 @@ import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.RemoteStoreAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ResizeAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider; @@ -240,7 +241,8 @@ public void testAllocationDeciderOrder() { ShardsLimitAllocationDecider.class, AwarenessAllocationDecider.class, NodeLoadAwareAllocationDecider.class, - TargetPoolAllocationDecider.class + TargetPoolAllocationDecider.class, + RemoteStoreAllocationDecider.class ); Collection deciders = ClusterModule.createAllocationDeciders( Settings.EMPTY, diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreAllocationDeciderTests.java new file mode 100644 index 0000000000000..c528a246836e8 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreAllocationDeciderTests.java @@ -0,0 +1,434 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.cluster.routing.allocation; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.Version; +import org.opensearch.cluster.*; +import org.opensearch.cluster.routing.allocation.decider.RemoteStoreAllocationDecider; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.cluster.routing.allocation.decider.*; +import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.node.remotestore.RemoteStoreNodeService; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.DIRECTION_SETTING; +import static org.hamcrest.core.Is.is; + +public class RemoteStoreAllocationDeciderTests extends OpenSearchAllocationTestCase { + private final Logger logger = LogManager.getLogger(RemoteStoreAllocationDeciderTests.class); + private final String TEST_INDEX = "TEST_INDEX"; + + private final Settings directionEnabledNodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); + + private final Settings remoteStoreDirectionSettings = Settings.builder() + .put(DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE) + .build(); + + private final ClusterSettings remoteStoreDirectionClusterSettings = new ClusterSettings(remoteStoreDirectionSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + private ClusterState getInitialClusterState(Settings settings) { + Metadata metadata = Metadata.builder() + .persistentSettings(settings) + .put(IndexMetadata.builder(TEST_INDEX).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(metadata.index(TEST_INDEX)).build(); + + return ClusterState.builder(ClusterName.DEFAULT) + .metadata(metadata) + .routingTable(routingTable) + .build(); + } + + private DiscoveryNode getNonRemoteNode () { + return new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); + } + + public DiscoveryNode getRemoteNode () { + Map attributes = new HashMap<>(); + attributes.put(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, "REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_VALUE"); + return new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + attributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + } + public void testDontAllocateNewPrimaryShardOnNonRemoteNodeForRemoteStoreDirection () { + FeatureFlags.initializeFeatureFlags(directionEnabledNodeSettings); + ClusterState clusterState = getInitialClusterState(remoteStoreDirectionSettings); + DiscoveryNode nonRemoteNode = getNonRemoteNode(); + assertFalse(nonRemoteNode.isRemoteStoreNode()); + + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(nonRemoteNode) + .localNodeId(nonRemoteNode.getId()) + .build() + ) + .build(); + + ShardRouting primaryShardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).primaryShard(); + RoutingNode nonRemoteRoutingNode = clusterState.getRoutingNodes().node(nonRemoteNode.getId()); + + RemoteStoreAllocationDecider remoteStoreClusterAllocationDecider = new RemoteStoreAllocationDecider(remoteStoreDirectionSettings, remoteStoreDirectionClusterSettings); + + RoutingAllocation routingAllocation = new RoutingAllocation( + new AllocationDeciders(Collections.singleton(remoteStoreClusterAllocationDecider)), + clusterState.getRoutingNodes(), + clusterState, + null, + null, + 0L + ); + routingAllocation.debugDecision(true); + + Decision decision = remoteStoreClusterAllocationDecider.canAllocate(primaryShardRouting, nonRemoteRoutingNode, routingAllocation); + assertThat(decision.type(), is(Decision.Type.NO)); + assertThat(decision.getExplanation(), is("for REMOTE_STORE direction, new primary shards can not be allocated to non-remote nodes")); + } + + public void testAllocateNewPrimaryShardOnRemoteNodeForRemoteStoreDirection () { + FeatureFlags.initializeFeatureFlags(directionEnabledNodeSettings); + ClusterState clusterState = getInitialClusterState(remoteStoreDirectionSettings); + + DiscoveryNode remoteNode = getRemoteNode(); + assertTrue(remoteNode.isRemoteStoreNode()); + + clusterState = ClusterState.builder(clusterState) + .nodes( + DiscoveryNodes.builder() + .add(remoteNode) + .localNodeId(remoteNode.getId()) + .build() + ) + .build(); + + ShardRouting primaryShardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).primaryShard(); + RoutingNode remoteRoutingNode = clusterState.getRoutingNodes().node(remoteNode.getId()); + + RemoteStoreAllocationDecider remoteStoreClusterAllocationDecider = new RemoteStoreAllocationDecider(remoteStoreDirectionSettings, remoteStoreDirectionClusterSettings); + + RoutingAllocation routingAllocation = new RoutingAllocation( + new AllocationDeciders(Collections.singleton(remoteStoreClusterAllocationDecider)), + clusterState.getRoutingNodes(), + clusterState, + null, + null, + 0L + ); + routingAllocation.debugDecision(true); + + Decision decision = remoteStoreClusterAllocationDecider.canAllocate(primaryShardRouting, remoteRoutingNode, routingAllocation); + assertThat(decision.type(), is(Decision.Type.YES)); + assertThat(decision.getExplanation(), is("for REMOTE_STORE direction, allocation of a primary shard is allowed on a remote_store_enabled node")); + } + + public void testDontAllocateNewReplicaShardOnRemoteNodeIfPrimaryShardOnNonRemoteNodeForRemoteStoreDirection () { + FeatureFlags.initializeFeatureFlags(directionEnabledNodeSettings); + + ShardId shardId = new ShardId(TEST_INDEX, "_na_", 0); + + DiscoveryNode nonRemoteNode = getNonRemoteNode(); + assertFalse(nonRemoteNode.isRemoteStoreNode()); + DiscoveryNode remoteNode = getRemoteNode(); + assertTrue(remoteNode.isRemoteStoreNode()); + + + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(shardId.getIndexName()) + .settings(settings(Version.CURRENT).put(remoteStoreDirectionSettings).put(directionEnabledNodeSettings)) + .numberOfShards(1) + .numberOfReplicas(1) + ) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .add( + IndexRoutingTable.builder(shardId.getIndex()) + .addIndexShard( + new IndexShardRoutingTable.Builder(shardId) + .addShard( + // primary on non-remote node + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + nonRemoteNode.getId(), + true, + ShardRoutingState.STARTED + ) + ) + .addShard( + // new replica's allocation + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + null, + false, + ShardRoutingState.UNASSIGNED + ) + ) + .build() + ) + ) + .build(); + + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() + .add(nonRemoteNode) + .localNodeId(nonRemoteNode.getId()) + .add(remoteNode) + .localNodeId(remoteNode.getId()) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(metadata) + .routingTable(routingTable) + .nodes(discoveryNodes) + .build(); + + assertEquals(2, clusterState.getRoutingTable().allShards().size()); + ShardRouting replicaShardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).replicaShards().get(0); + RoutingNode remoteRoutingNode = clusterState.getRoutingNodes().node(remoteNode.getId()); + + RemoteStoreAllocationDecider remoteStoreClusterAllocationDecider = new RemoteStoreAllocationDecider(remoteStoreDirectionSettings, remoteStoreDirectionClusterSettings); + + RoutingAllocation routingAllocation = new RoutingAllocation( + new AllocationDeciders(Collections.singleton(remoteStoreClusterAllocationDecider)), + clusterState.getRoutingNodes(), + clusterState, + null, + null, + 0L + ); + routingAllocation.debugDecision(true); + + Decision decision = remoteStoreClusterAllocationDecider.canAllocate(replicaShardRouting, remoteRoutingNode, routingAllocation); + assertThat(decision.type(), is(Decision.Type.NO)); + assertThat(decision.getExplanation(), is("can not allocate replica shard on a remote node when primary shard is not already active on some remote node")); + + } + + public void testAllocateNewReplicaShardOnRemoteNodeIfPrimaryShardOnRemoteNodeForRemoteStoreDirection () { + FeatureFlags.initializeFeatureFlags(directionEnabledNodeSettings); + + ShardId shardId = new ShardId(TEST_INDEX, "_na_", 0); + + DiscoveryNode remoteNode1 = getRemoteNode(); + assertTrue(remoteNode1.isRemoteStoreNode()); + DiscoveryNode remoteNode2 = getRemoteNode(); + assertTrue(remoteNode2.isRemoteStoreNode()); + + + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(shardId.getIndexName()) + .settings(settings(Version.CURRENT).put(remoteStoreDirectionSettings).put(directionEnabledNodeSettings)) + .numberOfShards(1) + .numberOfReplicas(1) + ) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .add( + IndexRoutingTable.builder(shardId.getIndex()) + .addIndexShard( + new IndexShardRoutingTable.Builder(shardId) + .addShard( + // primary on remote node + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + remoteNode1.getId(), + true, + ShardRoutingState.STARTED + ) + ) + .addShard( + // new replica's allocation + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + null, + false, + ShardRoutingState.UNASSIGNED + ) + ) + .build() + ) + ) + .build(); + + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() + .add(remoteNode1) + .localNodeId(remoteNode1.getId()) + .add(remoteNode2) + .localNodeId(remoteNode2.getId()) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(metadata) + .routingTable(routingTable) + .nodes(discoveryNodes) + .build(); + + assertEquals(2, clusterState.getRoutingTable().allShards().size()); + ShardRouting replicaShardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).replicaShards().get(0); + RoutingNode remoteRoutingNode = clusterState.getRoutingNodes().node(remoteNode2.getId()); + + RemoteStoreAllocationDecider remoteStoreClusterAllocationDecider = new RemoteStoreAllocationDecider(remoteStoreDirectionSettings, remoteStoreDirectionClusterSettings); + + RoutingAllocation routingAllocation = new RoutingAllocation( + new AllocationDeciders(Collections.singleton(remoteStoreClusterAllocationDecider)), + clusterState.getRoutingNodes(), + clusterState, + null, + null, + 0L + ); + routingAllocation.debugDecision(true); + + Decision decision = remoteStoreClusterAllocationDecider.canAllocate(replicaShardRouting, remoteRoutingNode, routingAllocation); + assertThat(decision.type(), is(Decision.Type.YES)); + assertThat(decision.getExplanation(), is("for REMOTE_STORE direction, allocation of a replica shard is allowed on a remote_store_enabled node")); + } + + public void testAllocateNewReplicaShardOnNonRemoteNodeForRemoteStoreDirection () { + FeatureFlags.initializeFeatureFlags(directionEnabledNodeSettings); + ShardId shardId = new ShardId(TEST_INDEX, "_na_", 0); + + DiscoveryNode nonRemoteNode1 = getNonRemoteNode(); + assertFalse(nonRemoteNode1.isRemoteStoreNode()); + DiscoveryNode nonRemoteNode2 = getNonRemoteNode(); + assertFalse(nonRemoteNode2.isRemoteStoreNode()); + + + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(shardId.getIndexName()) + .settings(settings(Version.CURRENT).put(remoteStoreDirectionSettings).put(directionEnabledNodeSettings)) + .numberOfShards(1) + .numberOfReplicas(1) + ) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .add( + IndexRoutingTable.builder(shardId.getIndex()) + .addIndexShard( + new IndexShardRoutingTable.Builder(shardId) + .addShard( + // primary shard on non-remote node + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + nonRemoteNode1.getId(), + true, + ShardRoutingState.STARTED + ) + ) + .addShard( + // new replica's allocation + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + null, + false, + ShardRoutingState.UNASSIGNED + ) + ) + .build() + ) + ) + .build(); + + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() + .add(nonRemoteNode1) + .localNodeId(nonRemoteNode1.getId()) + .add(nonRemoteNode2) + .localNodeId(nonRemoteNode2.getId()) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(metadata) + .routingTable(routingTable) + .nodes(discoveryNodes) + .build(); + + assertEquals(2, clusterState.getRoutingTable().allShards().size()); + + ShardRouting replicaShardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).replicaShards().get(0); + RoutingNode nonRemoteRoutingNode = clusterState.getRoutingNodes().node(nonRemoteNode2.getId()); + + RemoteStoreAllocationDecider remoteStoreClusterAllocationDecider = new RemoteStoreAllocationDecider(remoteStoreDirectionSettings, remoteStoreDirectionClusterSettings); + + RoutingAllocation routingAllocation = new RoutingAllocation( + new AllocationDeciders(Collections.singleton(remoteStoreClusterAllocationDecider)), + clusterState.getRoutingNodes(), + clusterState, + null, + null, + 0L + ); + routingAllocation.debugDecision(true); + + Decision decision = remoteStoreClusterAllocationDecider.canAllocate(replicaShardRouting, nonRemoteRoutingNode, routingAllocation); + assertThat(decision.type(), is(Decision.Type.YES)); + assertThat(decision.getExplanation(), is("for REMOTE_STORE direction, allocation of a replica shard is allowed on a non_remote_store_enabled node")); + } + +} From 48bb53b92ba9bf8b83ba858672880a1d297fb69f Mon Sep 17 00:00:00 2001 From: Lakshya Taragi Date: Thu, 15 Feb 2024 14:21:13 +0530 Subject: [PATCH 4/4] Add Integration Tests for new shard allocation on remote nodes --- .../RemoteStoreShardAllocationIT.java | 463 ++++++++++++++++++ .../RemoteStoreAllocationDeciderTests.java | 115 ++++- 2 files changed, 562 insertions(+), 16 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreShardAllocationIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreShardAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreShardAllocationIT.java new file mode 100644 index 0000000000000..90b60daf08147 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreShardAllocationIT.java @@ -0,0 +1,463 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.support.ActiveShardCount; +import org.opensearch.client.Client; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.test.OpenSearchIntegTestCase; + +import static org.opensearch.node.remotestore.RemoteStoreNodeService.DIRECTION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; +import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +import java.nio.file.Path; +import java.util.List; +import java.util.Map; + + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) +public class RemoteStoreShardAllocationIT extends OpenSearchIntegTestCase { + + private static final String TEST_INDEX = "test_index"; + + protected static final String REPOSITORY_NAME = "test-remote-store-repo"; + protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2"; + + protected Path segmentRepoPath; + protected Path translogRepoPath; + + static boolean addRemote = false; + private String cmNodeName = null; + private final ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + private Client client; + + protected Settings nodeSettings (int nodeOrdinal) { + if (segmentRepoPath == null || translogRepoPath == null) { + segmentRepoPath = randomRepoPath().toAbsolutePath(); + translogRepoPath = randomRepoPath().toAbsolutePath(); + } + if (addRemote) { + logger.info("Adding remote_store_enabled node"); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath)) + .put("discovery.initial_state_timeout", "500ms") + .build(); + } else { + logger.info("Adding non_remote_store_enabled node"); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put("discovery.initial_state_timeout", "500ms") + .build(); + } + } + + @Override + protected Settings featureFlagSettings () { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); + } + + private void initializeCluster () { + internalCluster().setBootstrapClusterManagerNodeIndex(0); + List cmNodes = internalCluster().startNodes(1); + cmNodeName = cmNodes.get(0); + client = internalCluster().client(); + } + + private void setClusterMode (String mode) { + updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), mode)); + assertAcked(client.admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + } + + private void setDirection (String direction) { + updateSettingsRequest.persistentSettings(Settings.builder().put(DIRECTION_SETTING.getKey(), direction)); + assertAcked(client.admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + } + + private DiscoveryNode assertNodeInCluster (String nodeName) { + Map nodes = client.admin().cluster().prepareState().get().getState().nodes().getNodes(); + DiscoveryNode discoveryNode = null; + for (Map.Entry entry : nodes.entrySet()) { + DiscoveryNode node = entry.getValue(); + if (node.getName().equals(nodeName)) { + discoveryNode = node; + break; + } + } + assertNotNull(discoveryNode); + return discoveryNode; + } + + public void testDontAllocateNewPrimaryShardOnNonRemoteNodeForRemoteStoreDirection () throws Exception { + logger.info(" --> initialize cluster"); + initializeCluster(); + + logger.info(" --> add non-remote node"); + addRemote = false; + String nonRemoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode nonRemoteNode = assertNodeInCluster(nonRemoteNodeName); + assertFalse(nonRemoteNode.isRemoteStoreNode()); + + logger.info(" --> set remote_store direction"); + setDirection("remote_store"); + + logger.info(" --> allocate primary shard"); + client.admin() + .indices() + .prepareCreate(TEST_INDEX) + .setSettings( + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.routing.allocation.include._name", nonRemoteNodeName) + .put("index.routing.allocation.exclude._name", cmNodeName) + ) + .execute() + .actionGet(); + + logger.info(" --> verify non-allocation of primary shard"); + RoutingTable routingTable = client.admin().cluster().prepareState().execute().actionGet().getState().getRoutingTable(); + ShardRouting primaryShardRouting = routingTable.index(TEST_INDEX).shard(0).primaryShard(); + assertFalse(primaryShardRouting.active()); + assertNull(primaryShardRouting.currentNodeId()); + assertEquals(primaryShardRouting.state(), ShardRoutingState.UNASSIGNED); + + logger.info("New primary shard can not be allocated to a non-remote node for remote_store direction"); + } + + public void testAllocateNewPrimaryShardOnRemoteNodeForRemoteStoreDirection () throws Exception { + logger.info(" --> initialize cluster"); + initializeCluster(); + + logger.info(" --> set mixed cluster compatibility mode"); + setClusterMode("mixed"); + + logger.info(" --> add remote node"); + addRemote = true; + String remoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode remoteNode = assertNodeInCluster(remoteNodeName); + assertTrue(remoteNode.isRemoteStoreNode()); + + logger.info(" --> set remote_store direction"); + setDirection("remote_store"); + + logger.info(" --> allocate primary shard"); + client.admin() + .indices() + .prepareCreate(TEST_INDEX) + .setSettings( + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.routing.allocation.include._name", remoteNodeName) + .put("index.routing.allocation.exclude._name", cmNodeName) + ) + .execute() + .actionGet(); + + ensureGreen(TEST_INDEX); + + logger.info(" --> verify allocation of primary shard"); + RoutingTable routingTable = client.admin().cluster().prepareState().execute().actionGet().getState().getRoutingTable(); + ShardRouting primaryShardRouting = routingTable.index(TEST_INDEX).shard(0).primaryShard(); + assertTrue(primaryShardRouting.active()); + assertNotNull(primaryShardRouting.currentNodeId()); + assertEquals(primaryShardRouting.currentNodeId(), remoteNode.getId()); + + logger.info("--> New primary shard can only be allocated to a remote node for remote_store direction"); + } + + public void testDontAllocateNewReplicaShardOnRemoteNodeIfPrimaryShardOnNonRemoteNodeForRemoteStoreDirection () throws Exception { + logger.info(" --> initialize cluster"); + initializeCluster(); + + logger.info(" --> set mixed cluster compatibility mode"); + setClusterMode("mixed"); + + logger.info(" --> add remote and non-remote nodes"); + addRemote = false; + String nonRemoteNodeName = internalCluster().startNode(); + addRemote = true; + String remoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode nonRemoteNode = assertNodeInCluster(nonRemoteNodeName); + DiscoveryNode remoteNode = assertNodeInCluster(remoteNodeName); + assertFalse(nonRemoteNode.isRemoteStoreNode()); + assertTrue(remoteNode.isRemoteStoreNode()); + + logger.info(" --> allocate primary shard on non-remote node"); + client.admin() + .indices() + .prepareCreate(TEST_INDEX) + .setSettings( + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put("index.routing.allocation.include._name", nonRemoteNodeName) + .put("index.routing.allocation.exclude._name", cmNodeName + "," + remoteNodeName) + ) + .setWaitForActiveShards(ActiveShardCount.ONE) + .execute() + .actionGet(); + + logger.info(" --> verify allocation of primary shard"); + RoutingTable routingTable = client.admin().cluster().prepareState().execute().actionGet().getState().getRoutingTable(); + ShardRouting primaryShardRouting = routingTable.index(TEST_INDEX).shard(0).primaryShard(); + assertTrue(primaryShardRouting.active()); + assertNotNull(primaryShardRouting.currentNodeId()); + assertEquals(primaryShardRouting.currentNodeId(), nonRemoteNode.getId()); + + logger.info(" --> verify non-allocation of replica shard"); + ShardRouting replicaShardRouting = routingTable.index(TEST_INDEX).shard(0).replicaShards().get(0); + assertFalse(replicaShardRouting.active()); + assertNull(replicaShardRouting.currentNodeId()); + assertEquals(replicaShardRouting.state(), ShardRoutingState.UNASSIGNED); + + logger.info(" --> set remote_store direction"); + setDirection("remote_store"); + + logger.info(" --> allocate replica shard on remote node"); + client.admin() + .indices() + .prepareUpdateSettings(TEST_INDEX) + .setSettings( + Settings.builder() + .put("index.routing.allocation.include._name", remoteNodeName) + .put("index.routing.allocation.exclude._name", cmNodeName + "," + nonRemoteNodeName) + ) + .execute() + .actionGet(); + + logger.info(" --> verify non-allocation of replica shard"); + routingTable = client.admin().cluster().prepareState().execute().actionGet().getState().getRoutingTable(); + replicaShardRouting = routingTable.index(TEST_INDEX).shard(0).replicaShards().get(0); + assertFalse(replicaShardRouting.active()); + assertNull(replicaShardRouting.currentNodeId()); + assertEquals(replicaShardRouting.state(), ShardRoutingState.UNASSIGNED); + } + + public void testAllocateNewReplicaShardOnRemoteNodeIfPrimaryShardOnRemoteNodeForRemoteStoreDirection () throws Exception { + logger.info(" --> initialize cluster"); + initializeCluster(); + + logger.info(" --> set mixed cluster compatibility mode"); + setClusterMode("mixed"); + + logger.info(" --> add remote nodes"); + addRemote = true; + String remoteNodeName1 = internalCluster().startNode(); + String remoteNodeName2 = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode remoteNode1 = assertNodeInCluster(remoteNodeName1); + DiscoveryNode remoteNode2 = assertNodeInCluster(remoteNodeName2); + assertTrue(remoteNode1.isRemoteStoreNode()); + assertTrue(remoteNode2.isRemoteStoreNode()); + + logger.info(" --> set remote_store direction"); + setDirection("remote_store"); + + logger.info(" --> allocate primary shard on remote node"); + client.admin() + .indices() + .prepareCreate(TEST_INDEX) + .setSettings( + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put("index.routing.allocation.include._name", remoteNodeName1) + .put("index.routing.allocation.exclude._name", cmNodeName + "," + remoteNodeName2) + ) + .setWaitForActiveShards(ActiveShardCount.ONE) + .execute() + .actionGet(); + + logger.info(" --> verify allocation of primary shard"); + RoutingTable routingTable = client.admin().cluster().prepareState().execute().actionGet().getState().getRoutingTable(); + ShardRouting primaryShardRouting = routingTable.index(TEST_INDEX).shard(0).primaryShard(); + assertTrue(primaryShardRouting.active()); + assertNotNull(primaryShardRouting.currentNodeId()); + assertEquals(primaryShardRouting.currentNodeId(), remoteNode1.getId()); + + logger.info(" --> verify non-allocation of replica shard"); + ShardRouting replicaShardRouting = routingTable.index(TEST_INDEX).shard(0).replicaShards().get(0); + assertFalse(replicaShardRouting.active()); + assertNull(replicaShardRouting.currentNodeId()); + assertEquals(replicaShardRouting.state(), ShardRoutingState.UNASSIGNED); + + logger.info(" --> allocate replica shard on the other remote node"); + client.admin() + .indices() + .prepareUpdateSettings(TEST_INDEX) + .setSettings( + Settings.builder() + .put("index.routing.allocation.include._name", remoteNodeName2) + .put("index.routing.allocation.exclude._name", cmNodeName + "," + remoteNodeName1) + ) + .execute() + .actionGet(); + + ensureGreen(TEST_INDEX); + + logger.info(" --> verify allocation of replica shard"); + routingTable = client.admin().cluster().prepareState().execute().actionGet().getState().getRoutingTable(); + replicaShardRouting = routingTable.index(TEST_INDEX).shard(0).replicaShards().get(0); + assertTrue(replicaShardRouting.active()); + assertNotNull(replicaShardRouting.currentNodeId()); + assertEquals(replicaShardRouting.currentNodeId(), remoteNode2.getId()); + } + + public void testAllocateNewReplicaShardOnNonRemoteNodeIfPrimaryShardOnNonRemoteNodeForRemoteStoreDirection () throws Exception { + logger.info(" --> initialize cluster"); + initializeCluster(); + + logger.info(" --> add non-remote nodes"); + addRemote = false; + String nonRemoteNodeName1 = internalCluster().startNode(); + String nonRemoteNodeName2 = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode nonRemoteNode1 = assertNodeInCluster(nonRemoteNodeName1); + DiscoveryNode nonRemoteNode2 = assertNodeInCluster(nonRemoteNodeName2); + assertFalse(nonRemoteNode1.isRemoteStoreNode()); + assertFalse(nonRemoteNode2.isRemoteStoreNode()); + + logger.info(" --> allocate primary shard on non-remote node"); + client.admin() + .indices() + .prepareCreate(TEST_INDEX) + .setSettings( + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put("index.routing.allocation.include._name", nonRemoteNodeName1) + .put("index.routing.allocation.exclude._name", cmNodeName + "," + nonRemoteNodeName2) + ) + .setWaitForActiveShards(ActiveShardCount.ONE) + .execute() + .actionGet(); + + logger.info(" --> verify allocation of primary shard"); + RoutingTable routingTable = client.admin().cluster().prepareState().execute().actionGet().getState().getRoutingTable(); + ShardRouting primaryShardRouting = routingTable.index(TEST_INDEX).shard(0).primaryShard(); + assertTrue(primaryShardRouting.active()); + assertNotNull(primaryShardRouting.currentNodeId()); + assertEquals(primaryShardRouting.currentNodeId(), nonRemoteNode1.getId()); + + logger.info(" --> verify non-allocation of replica shard"); + ShardRouting replicaShardRouting = routingTable.index(TEST_INDEX).shard(0).replicaShards().get(0); + assertFalse(replicaShardRouting.active()); + assertNull(replicaShardRouting.currentNodeId()); + assertEquals(replicaShardRouting.state(), ShardRoutingState.UNASSIGNED); + + logger.info(" --> set remote_store direction"); + setDirection("remote_store"); + + logger.info(" --> allocate replica shard on the other non-remote node"); + client.admin() + .indices() + .prepareUpdateSettings(TEST_INDEX) + .setSettings( + Settings.builder() + .put("index.routing.allocation.include._name", nonRemoteNodeName2) + .put("index.routing.allocation.exclude._name", cmNodeName + "," + nonRemoteNodeName1) + ) + .execute() + .actionGet(); + + ensureGreen(TEST_INDEX); + + logger.info(" --> verify allocation of replica shad"); + routingTable = client.admin().cluster().prepareState().execute().actionGet().getState().getRoutingTable(); + replicaShardRouting = routingTable.index(TEST_INDEX).shard(0).replicaShards().get(0); + assertTrue(replicaShardRouting.active()); + assertNotNull(replicaShardRouting.currentNodeId()); + assertEquals(replicaShardRouting.currentNodeId(), nonRemoteNode2.getId()); + } + + public void testAllocateNewReplicaShardOnNonRemoteNodeIfPrimaryShardOnRemoteNodeForRemoteStoreDirection () throws Exception { + logger.info(" --> initialize cluster"); + initializeCluster(); + + logger.info(" --> set mixed cluster compatibility mode"); + setClusterMode("mixed"); + + logger.info(" --> add remote and non-remote nodes"); + addRemote = false; + String nonRemoteNodeName = internalCluster().startNode(); + addRemote = true; + String remoteNodeName = internalCluster().startNode(); + internalCluster().validateClusterFormed(); + DiscoveryNode nonRemoteNode = assertNodeInCluster(nonRemoteNodeName); + DiscoveryNode remoteNode = assertNodeInCluster(remoteNodeName); + assertFalse(nonRemoteNode.isRemoteStoreNode()); + assertTrue(remoteNode.isRemoteStoreNode()); + + logger.info(" --> set remote_store direction"); + setDirection("remote_store"); + + logger.info(" --> allocate primary on remote node"); + client.admin() + .indices() + .prepareCreate(TEST_INDEX) + .setSettings( + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put("index.routing.allocation.include._name", remoteNodeName) + .put("index.routing.allocation.exclude._name", cmNodeName + "," + nonRemoteNodeName) + ) + .setWaitForActiveShards(ActiveShardCount.ONE) + .execute() + .actionGet(); + + logger.info(" --> verify allocation of primary shard"); + RoutingTable routingTable = client.admin().cluster().prepareState().execute().actionGet().getState().getRoutingTable(); + ShardRouting primaryShardRouting = routingTable.index(TEST_INDEX).shard(0).primaryShard(); + assertTrue(primaryShardRouting.active()); + assertNotNull(primaryShardRouting.currentNodeId()); + assertEquals(primaryShardRouting.currentNodeId(), remoteNode.getId()); + + logger.info(" --> verify non-allocation of replica shard"); + ShardRouting replicaShardRouting = routingTable.index(TEST_INDEX).shard(0).replicaShards().get(0); + assertFalse(replicaShardRouting.active()); + assertNull(replicaShardRouting.currentNodeId()); + assertEquals(replicaShardRouting.state(), ShardRoutingState.UNASSIGNED); + + logger.info(" --> allocate replica shard on non-remote node"); + client.admin() + .indices() + .prepareUpdateSettings(TEST_INDEX) + .setSettings( + Settings.builder() + .put("index.routing.allocation.include._name", nonRemoteNodeName) + .put("index.routing.allocation.exclude._name", cmNodeName + "," + remoteNodeName) + ) + .execute() + .actionGet(); + + ensureGreen(TEST_INDEX); + + logger.info(" --> verify allocation of replica shard"); + routingTable = client.admin().cluster().prepareState().execute().actionGet().getState().getRoutingTable(); + replicaShardRouting = routingTable.index(TEST_INDEX).shard(0).replicaShards().get(0); + assertTrue(replicaShardRouting.active()); + assertNotNull(replicaShardRouting.currentNodeId()); + assertEquals(replicaShardRouting.currentNodeId(), nonRemoteNode.getId()); + } + + +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreAllocationDeciderTests.java index c528a246836e8..f02f004a98034 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreAllocationDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreAllocationDeciderTests.java @@ -125,10 +125,10 @@ public void testDontAllocateNewPrimaryShardOnNonRemoteNodeForRemoteStoreDirectio ShardRouting primaryShardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).primaryShard(); RoutingNode nonRemoteRoutingNode = clusterState.getRoutingNodes().node(nonRemoteNode.getId()); - RemoteStoreAllocationDecider remoteStoreClusterAllocationDecider = new RemoteStoreAllocationDecider(remoteStoreDirectionSettings, remoteStoreDirectionClusterSettings); + RemoteStoreAllocationDecider remoteStoreAllocationDecider = new RemoteStoreAllocationDecider(remoteStoreDirectionSettings, remoteStoreDirectionClusterSettings); RoutingAllocation routingAllocation = new RoutingAllocation( - new AllocationDeciders(Collections.singleton(remoteStoreClusterAllocationDecider)), + new AllocationDeciders(Collections.singleton(remoteStoreAllocationDecider)), clusterState.getRoutingNodes(), clusterState, null, @@ -137,7 +137,7 @@ public void testDontAllocateNewPrimaryShardOnNonRemoteNodeForRemoteStoreDirectio ); routingAllocation.debugDecision(true); - Decision decision = remoteStoreClusterAllocationDecider.canAllocate(primaryShardRouting, nonRemoteRoutingNode, routingAllocation); + Decision decision = remoteStoreAllocationDecider.canAllocate(primaryShardRouting, nonRemoteRoutingNode, routingAllocation); assertThat(decision.type(), is(Decision.Type.NO)); assertThat(decision.getExplanation(), is("for REMOTE_STORE direction, new primary shards can not be allocated to non-remote nodes")); } @@ -161,10 +161,10 @@ public void testAllocateNewPrimaryShardOnRemoteNodeForRemoteStoreDirection () { ShardRouting primaryShardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).primaryShard(); RoutingNode remoteRoutingNode = clusterState.getRoutingNodes().node(remoteNode.getId()); - RemoteStoreAllocationDecider remoteStoreClusterAllocationDecider = new RemoteStoreAllocationDecider(remoteStoreDirectionSettings, remoteStoreDirectionClusterSettings); + RemoteStoreAllocationDecider remoteStoreAllocationDecider = new RemoteStoreAllocationDecider(remoteStoreDirectionSettings, remoteStoreDirectionClusterSettings); RoutingAllocation routingAllocation = new RoutingAllocation( - new AllocationDeciders(Collections.singleton(remoteStoreClusterAllocationDecider)), + new AllocationDeciders(Collections.singleton(remoteStoreAllocationDecider)), clusterState.getRoutingNodes(), clusterState, null, @@ -173,7 +173,7 @@ public void testAllocateNewPrimaryShardOnRemoteNodeForRemoteStoreDirection () { ); routingAllocation.debugDecision(true); - Decision decision = remoteStoreClusterAllocationDecider.canAllocate(primaryShardRouting, remoteRoutingNode, routingAllocation); + Decision decision = remoteStoreAllocationDecider.canAllocate(primaryShardRouting, remoteRoutingNode, routingAllocation); assertThat(decision.type(), is(Decision.Type.YES)); assertThat(decision.getExplanation(), is("for REMOTE_STORE direction, allocation of a primary shard is allowed on a remote_store_enabled node")); } @@ -245,10 +245,10 @@ public void testDontAllocateNewReplicaShardOnRemoteNodeIfPrimaryShardOnNonRemote ShardRouting replicaShardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).replicaShards().get(0); RoutingNode remoteRoutingNode = clusterState.getRoutingNodes().node(remoteNode.getId()); - RemoteStoreAllocationDecider remoteStoreClusterAllocationDecider = new RemoteStoreAllocationDecider(remoteStoreDirectionSettings, remoteStoreDirectionClusterSettings); + RemoteStoreAllocationDecider remoteStoreAllocationDecider = new RemoteStoreAllocationDecider(remoteStoreDirectionSettings, remoteStoreDirectionClusterSettings); RoutingAllocation routingAllocation = new RoutingAllocation( - new AllocationDeciders(Collections.singleton(remoteStoreClusterAllocationDecider)), + new AllocationDeciders(Collections.singleton(remoteStoreAllocationDecider)), clusterState.getRoutingNodes(), clusterState, null, @@ -257,7 +257,7 @@ public void testDontAllocateNewReplicaShardOnRemoteNodeIfPrimaryShardOnNonRemote ); routingAllocation.debugDecision(true); - Decision decision = remoteStoreClusterAllocationDecider.canAllocate(replicaShardRouting, remoteRoutingNode, routingAllocation); + Decision decision = remoteStoreAllocationDecider.canAllocate(replicaShardRouting, remoteRoutingNode, routingAllocation); assertThat(decision.type(), is(Decision.Type.NO)); assertThat(decision.getExplanation(), is("can not allocate replica shard on a remote node when primary shard is not already active on some remote node")); @@ -330,10 +330,10 @@ public void testAllocateNewReplicaShardOnRemoteNodeIfPrimaryShardOnRemoteNodeFor ShardRouting replicaShardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).replicaShards().get(0); RoutingNode remoteRoutingNode = clusterState.getRoutingNodes().node(remoteNode2.getId()); - RemoteStoreAllocationDecider remoteStoreClusterAllocationDecider = new RemoteStoreAllocationDecider(remoteStoreDirectionSettings, remoteStoreDirectionClusterSettings); + RemoteStoreAllocationDecider remoteStoreAllocationDecider = new RemoteStoreAllocationDecider(remoteStoreDirectionSettings, remoteStoreDirectionClusterSettings); RoutingAllocation routingAllocation = new RoutingAllocation( - new AllocationDeciders(Collections.singleton(remoteStoreClusterAllocationDecider)), + new AllocationDeciders(Collections.singleton(remoteStoreAllocationDecider)), clusterState.getRoutingNodes(), clusterState, null, @@ -342,12 +342,12 @@ public void testAllocateNewReplicaShardOnRemoteNodeIfPrimaryShardOnRemoteNodeFor ); routingAllocation.debugDecision(true); - Decision decision = remoteStoreClusterAllocationDecider.canAllocate(replicaShardRouting, remoteRoutingNode, routingAllocation); + Decision decision = remoteStoreAllocationDecider.canAllocate(replicaShardRouting, remoteRoutingNode, routingAllocation); assertThat(decision.type(), is(Decision.Type.YES)); assertThat(decision.getExplanation(), is("for REMOTE_STORE direction, allocation of a replica shard is allowed on a remote_store_enabled node")); } - public void testAllocateNewReplicaShardOnNonRemoteNodeForRemoteStoreDirection () { + public void testAllocateNewReplicaShardOnNonRemoteNodeIfPrimaryShardOnNonRemoteNodeForRemoteStoreDirection () { FeatureFlags.initializeFeatureFlags(directionEnabledNodeSettings); ShardId shardId = new ShardId(TEST_INDEX, "_na_", 0); @@ -414,10 +414,10 @@ public void testAllocateNewReplicaShardOnNonRemoteNodeForRemoteStoreDirection () ShardRouting replicaShardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).replicaShards().get(0); RoutingNode nonRemoteRoutingNode = clusterState.getRoutingNodes().node(nonRemoteNode2.getId()); - RemoteStoreAllocationDecider remoteStoreClusterAllocationDecider = new RemoteStoreAllocationDecider(remoteStoreDirectionSettings, remoteStoreDirectionClusterSettings); + RemoteStoreAllocationDecider remoteStoreAllocationDecider = new RemoteStoreAllocationDecider(remoteStoreDirectionSettings, remoteStoreDirectionClusterSettings); RoutingAllocation routingAllocation = new RoutingAllocation( - new AllocationDeciders(Collections.singleton(remoteStoreClusterAllocationDecider)), + new AllocationDeciders(Collections.singleton(remoteStoreAllocationDecider)), clusterState.getRoutingNodes(), clusterState, null, @@ -426,7 +426,90 @@ public void testAllocateNewReplicaShardOnNonRemoteNodeForRemoteStoreDirection () ); routingAllocation.debugDecision(true); - Decision decision = remoteStoreClusterAllocationDecider.canAllocate(replicaShardRouting, nonRemoteRoutingNode, routingAllocation); + Decision decision = remoteStoreAllocationDecider.canAllocate(replicaShardRouting, nonRemoteRoutingNode, routingAllocation); + assertThat(decision.type(), is(Decision.Type.YES)); + assertThat(decision.getExplanation(), is("for REMOTE_STORE direction, allocation of a replica shard is allowed on a non_remote_store_enabled node")); + } + + public void testAllocateNewReplicaShardOnNonRemoteNodeIfPrimaryShardOnRemoteNodeForRemoteStoreDirection () { + FeatureFlags.initializeFeatureFlags(directionEnabledNodeSettings); + + ShardId shardId = new ShardId(TEST_INDEX, "_na_", 0); + + DiscoveryNode nonRemoteNode = getNonRemoteNode(); + assertFalse(nonRemoteNode.isRemoteStoreNode()); + DiscoveryNode remoteNode = getRemoteNode(); + assertTrue(remoteNode.isRemoteStoreNode()); + + Metadata metadata = Metadata.builder() + .put( + IndexMetadata.builder(shardId.getIndexName()) + .settings(settings(Version.CURRENT).put(remoteStoreDirectionSettings).put(directionEnabledNodeSettings)) + .numberOfShards(1) + .numberOfReplicas(1) + ) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .add( + IndexRoutingTable.builder(shardId.getIndex()) + .addIndexShard( + new IndexShardRoutingTable.Builder(shardId).addShard( + // primary shard on non-remote node + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + remoteNode.getId(), + true, + ShardRoutingState.STARTED + ) + ) + .addShard( + // new replica's allocation + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + null, + false, + ShardRoutingState.UNASSIGNED + ) + ) + .build() + ) + ) + .build(); + + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() + .add(nonRemoteNode) + .localNodeId(nonRemoteNode.getId()) + .add(remoteNode) + .localNodeId(remoteNode.getId()) + .build(); + + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(metadata) + .routingTable(routingTable) + .nodes(discoveryNodes) + .build(); + + assertEquals(2, clusterState.getRoutingTable().allShards().size()); + + ShardRouting replicaShardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).replicaShards().get(0); + RoutingNode nonRemoteRoutingNode = clusterState.getRoutingNodes().node(nonRemoteNode.getId()); + + RemoteStoreAllocationDecider remoteStoreAllocationDecider = new RemoteStoreAllocationDecider(remoteStoreDirectionSettings, remoteStoreDirectionClusterSettings); + + RoutingAllocation routingAllocation = new RoutingAllocation( + new AllocationDeciders(Collections.singleton(remoteStoreAllocationDecider)), + clusterState.getRoutingNodes(), + clusterState, + null, + null, + 0L + ); + routingAllocation.debugDecision(true); + + Decision decision = remoteStoreAllocationDecider.canAllocate(replicaShardRouting, nonRemoteRoutingNode, routingAllocation); assertThat(decision.type(), is(Decision.Type.YES)); assertThat(decision.getExplanation(), is("for REMOTE_STORE direction, allocation of a replica shard is allowed on a non_remote_store_enabled node")); }