diff --git a/docs/changelog/109850.yaml b/docs/changelog/109850.yaml new file mode 100644 index 0000000000000..0f11318765aea --- /dev/null +++ b/docs/changelog/109850.yaml @@ -0,0 +1,5 @@ +pr: 109850 +summary: Ensure tasks preserve versions in `MasterService` +area: Cluster Coordination +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java index 7f9720b64cca6..296acc30a83f5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/MasterService.java @@ -524,6 +524,24 @@ public Builder incrementVersion(ClusterState clusterState) { return ClusterState.builder(clusterState).incrementVersion(); } + private static boolean versionNumbersPreserved(ClusterState oldState, ClusterState newState) { + if (oldState.nodes().getMasterNodeId() == null && newState.nodes().getMasterNodeId() != null) { + return true; // NodeJoinExecutor is special, we trust it to do the right thing with versions + } + + if (oldState.version() != newState.version()) { + return false; + } + if (oldState.metadata().version() != newState.metadata().version()) { + return false; + } + if (oldState.routingTable().version() != newState.routingTable().version()) { + // GatewayService is special and for odd legacy reasons gets to do this: + return oldState.clusterRecovered() == false && newState.clusterRecovered() && newState.routingTable().version() == 0; + } + return true; + } + /** * Submits an unbatched cluster state update task. This method exists for legacy reasons but is deprecated and forbidden in new * production code because unbatched tasks are a source of performance and stability bugs. You should instead implement your update @@ -1035,6 +1053,8 @@ private static boolean assertAllTasksComple return true; } + static final String TEST_ONLY_EXECUTOR_MAY_CHANGE_VERSION_NUMBER_TRANSIENT_NAME = "test_only_executor_may_change_version_number"; + private static ClusterState innerExecuteTasks( ClusterState previousClusterState, List> executionResults, @@ -1047,13 +1067,23 @@ private static ClusterState innerExecuteTas // to avoid leaking headers in production that were missed by tests try { - return executor.execute( + final var updatedState = executor.execute( new ClusterStateTaskExecutor.BatchExecutionContext<>( previousClusterState, executionResults, threadContext::newStoredContext ) ); + if (versionNumbersPreserved(previousClusterState, updatedState) == false) { + // Shenanigans! Executors mustn't meddle with version numbers. Perhaps the executor based its update on the wrong + // initial state, potentially losing an intervening cluster state update. That'd be very bad! + final var exception = new IllegalStateException( + "cluster state update executor did not preserve version numbers: [" + summary.toString() + "]" + ); + assert threadContext.getTransient(TEST_ONLY_EXECUTOR_MAY_CHANGE_VERSION_NUMBER_TRANSIENT_NAME) != null : exception; + throw exception; + } + return updatedState; } catch (Exception e) { logger.trace( () -> format( diff --git a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java index 43f3943c9c041..26faa295cf727 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/MasterServiceTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.component.Lifecycle; @@ -77,6 +78,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; import static java.util.Collections.emptySet; @@ -93,6 +95,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.startsWith; public class MasterServiceTests extends ESTestCase { @@ -498,7 +501,7 @@ public void onFailure(Exception e) {} @Override public ClusterState execute(ClusterState currentState) { relativeTimeInMillis += TimeValue.timeValueSeconds(3).millis(); - return ClusterState.builder(currentState).incrementVersion().build(); + return ClusterState.builder(currentState).build(); } @Override @@ -1243,7 +1246,7 @@ public void onFailure(Exception e) { public ClusterState execute(ClusterState currentState) { relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis() + randomLongBetween(1, 1000000); - return ClusterState.builder(currentState).incrementVersion().build(); + return ClusterState.builder(currentState).build(); } @Override @@ -1277,7 +1280,7 @@ public void onFailure(Exception e) { masterService.submitUnbatchedStateUpdateTask("test5", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - return ClusterState.builder(currentState).incrementVersion().build(); + return ClusterState.builder(currentState).build(); } @Override @@ -1293,7 +1296,7 @@ public void onFailure(Exception e) { masterService.submitUnbatchedStateUpdateTask("test6", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - return ClusterState.builder(currentState).incrementVersion().build(); + return ClusterState.builder(currentState).build(); } @Override @@ -2592,6 +2595,69 @@ public void onFailure(Exception e) { } } + public void testVersionNumberProtection() { + runVersionNumberProtectionTest( + currentState -> ClusterState.builder(currentState) + .version(randomFrom(currentState.version() - 1, currentState.version() + 1)) + .build() + ); + + runVersionNumberProtectionTest( + currentState -> currentState.copyAndUpdateMetadata( + b -> b.version(randomFrom(currentState.metadata().version() - 1, currentState.metadata().version() + 1)) + ) + ); + + runVersionNumberProtectionTest( + currentState -> ClusterState.builder(currentState) + .routingTable( + RoutingTable.builder(currentState.routingTable()) + .version(randomFrom(currentState.routingTable().version() - 1, currentState.routingTable().version() + 1)) + .build() + ) + .build() + ); + } + + private void runVersionNumberProtectionTest(UnaryOperator updateOperator) { + final var deterministicTaskQueue = new DeterministicTaskQueue(); + final var threadPool = deterministicTaskQueue.getThreadPool(); + final var threadContext = threadPool.getThreadContext(); + final var failureCaught = new AtomicBoolean(); + + try ( + var masterService = createMasterService(true, null, threadPool, deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor()); + var ignored = threadContext.stashContext() + ) { + final var taskId = randomIdentifier(); + + masterService.submitUnbatchedStateUpdateTask(taskId, new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return updateOperator.apply(currentState); + } + + @Override + public void onFailure(Exception e) { + assertThat( + asInstanceOf(IllegalStateException.class, e).getMessage(), + allOf(startsWith("cluster state update executor did not preserve version numbers"), containsString(taskId)) + ); + assertTrue(failureCaught.compareAndSet(false, true)); + } + }); + + // suppress assertion errors to check production behaviour + threadContext.putTransient(MasterService.TEST_ONLY_EXECUTOR_MAY_CHANGE_VERSION_NUMBER_TRANSIENT_NAME, new Object()); + threadContext.markAsSystemContext(); + deterministicTaskQueue.runAllRunnableTasks(); + assertFalse(deterministicTaskQueue.hasRunnableTasks()); + assertFalse(deterministicTaskQueue.hasDeferredTasks()); + + assertTrue(failureCaught.get()); + } + } + /** * Returns the cluster state that the master service uses (and that is provided by the discovery layer) */ diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java index 2136b154480ff..0524412cff70b 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RerouteService; +import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRoutingRoleStrategy; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; @@ -494,12 +495,22 @@ public void onFailure(Exception e) { private MasterServiceTaskQueue createSetClusterStateTaskQueue(ClusterService clusterService) { return clusterService.createTaskQueue("set-cluster-state", Priority.NORMAL, batchExecutionContext -> { - ClusterState targetState = batchExecutionContext.initialState(); + final var initialState = batchExecutionContext.initialState(); + var targetState = initialState; for (var taskContext : batchExecutionContext.taskContexts()) { targetState = taskContext.getTask().clusterState(); taskContext.success(() -> {}); } - return targetState; + // fix up the version numbers + final var finalStateBuilder = ClusterState.builder(targetState) + .version(initialState.version()) + .metadata(Metadata.builder(targetState.metadata()).version(initialState.metadata().version())); + if (initialState.clusterRecovered() || targetState.clusterRecovered() == false) { + finalStateBuilder.routingTable( + RoutingTable.builder(targetState.routingTable()).version(initialState.routingTable().version()) + ); + } + return finalStateBuilder.build(); }); } } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportMigrateToDataTiersAction.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportMigrateToDataTiersAction.java index 8cc14a42eb5f3..472b9bdd0b800 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportMigrateToDataTiersAction.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportMigrateToDataTiersAction.java @@ -123,9 +123,9 @@ protected void masterOperation( final SetOnce migratedEntities = new SetOnce<>(); submitUnbatchedTask("migrate-to-data-tiers []", new ClusterStateUpdateTask(Priority.HIGH) { @Override - public ClusterState execute(ClusterState currentState) throws Exception { + public ClusterState execute(ClusterState currentState) { Tuple migratedEntitiesTuple = migrateToDataTiersRouting( - state, + currentState, request.getNodeAttributeName(), request.getLegacyTemplateToDelete(), xContentRegistry,