Skip to content

Commit

Permalink
Ensure tasks preserve versions in MasterService (#109850)
Browse files Browse the repository at this point in the history
`ClusterState#version`, `Metadata#version` and `RoutingTable#version`
are all managed solely by the `MasterService`, in the sense that it's a
definite bug for the cluster state update task executor to meddle with
them. Today, if we encounter such a bug, we try to publish the
resulting state anyway, which hopefully fails (triggering a master
election) but it may in theory succeed (potentially reverting older
cluster state updates). Neither is a particularly good outcome.

With this commit we add a check for consistency of these version numbers
during the cluster state computation and fail the state update without a
master failover if a discrepancy is found.

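It also fixes a super-subtle bug in `TransportMigrateToDataTiersAction`
that can muck up these version numbers: its update task computed the new
state from the `state` captured outside the task rather than from the
`currentState` passed to `execute`.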
  • Loading branch information
DaveCTurner committed Jun 19, 2024
1 parent c8da581 commit 5aa9f44
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 9 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/109850.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 109850
summary: Ensure tasks preserve versions in `MasterService`
area: Cluster Coordination
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,24 @@ public Builder incrementVersion(ClusterState clusterState) {
return ClusterState.builder(clusterState).incrementVersion();
}

/**
 * Checks that an executor left the version numbers of the cluster state alone.
 * <p>
 * The cluster state version, the metadata version and the routing table version of {@code newState} must each equal the
 * corresponding version in {@code oldState}, with two exceptions which are carved out below.
 *
 * @param oldState the state the executor was given
 * @param newState the state the executor returned
 * @return {@code true} if the versions are unchanged or one of the permitted exceptions applies, {@code false} otherwise
 */
private static boolean versionNumbersPreserved(ClusterState oldState, ClusterState newState) {
    final boolean masterWasElected = oldState.nodes().getMasterNodeId() == null && newState.nodes().getMasterNodeId() != null;
    if (masterWasElected) {
        // NodeJoinExecutor is special, we trust it to do the right thing with versions
        return true;
    }

    final boolean topLevelVersionsMatch = oldState.version() == newState.version()
        && oldState.metadata().version() == newState.metadata().version();
    if (topLevelVersionsMatch == false) {
        return false;
    }

    if (oldState.routingTable().version() == newState.routingTable().version()) {
        return true;
    }

    // GatewayService is special and for odd legacy reasons gets to change the routing table version, but only on the transition
    // to a recovered cluster and only if the resulting routing table version is zero
    return oldState.clusterRecovered() == false && newState.clusterRecovered() && newState.routingTable().version() == 0;
}

/**
* Submits an unbatched cluster state update task. This method exists for legacy reasons but is deprecated and forbidden in new
* production code because unbatched tasks are a source of performance and stability bugs. You should instead implement your update
Expand Down Expand Up @@ -1035,6 +1053,8 @@ private static <T extends ClusterStateTaskListener> boolean assertAllTasksComple
return true;
}

/**
 * Name of a thread-context transient consulted only by an {@code assert} in the version-number check. If an executor changes
 * a version number and this transient is absent then the assertion trips (when assertions are enabled); tests set it to any
 * non-null value so that they can observe the {@link IllegalStateException} thrown in production instead.
 */
static final String TEST_ONLY_EXECUTOR_MAY_CHANGE_VERSION_NUMBER_TRANSIENT_NAME = "test_only_executor_may_change_version_number";

private static <T extends ClusterStateTaskListener> ClusterState innerExecuteTasks(
ClusterState previousClusterState,
List<ExecutionResult<T>> executionResults,
Expand All @@ -1047,13 +1067,23 @@ private static <T extends ClusterStateTaskListener> ClusterState innerExecuteTas
// to avoid leaking headers in production that were missed by tests

try {
return executor.execute(
final var updatedState = executor.execute(
new ClusterStateTaskExecutor.BatchExecutionContext<>(
previousClusterState,
executionResults,
threadContext::newStoredContext
)
);
if (versionNumbersPreserved(previousClusterState, updatedState) == false) {
// Shenanigans! Executors mustn't meddle with version numbers. Perhaps the executor based its update on the wrong
// initial state, potentially losing an intervening cluster state update. That'd be very bad!
final var exception = new IllegalStateException(
"cluster state update executor did not preserve version numbers: [" + summary.toString() + "]"
);
assert threadContext.getTransient(TEST_ONLY_EXECUTOR_MAY_CHANGE_VERSION_NUMBER_TRANSIENT_NAME) != null : exception;
throw exception;
}
return updatedState;
} catch (Exception e) {
logger.trace(
() -> format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.component.Lifecycle;
Expand Down Expand Up @@ -77,6 +78,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static java.util.Collections.emptySet;
Expand All @@ -93,6 +95,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.startsWith;

public class MasterServiceTests extends ESTestCase {

Expand Down Expand Up @@ -498,7 +501,7 @@ public void onFailure(Exception e) {}
@Override
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += TimeValue.timeValueSeconds(3).millis();
return ClusterState.builder(currentState).incrementVersion().build();
return ClusterState.builder(currentState).build();
}

@Override
Expand Down Expand Up @@ -1243,7 +1246,7 @@ public void onFailure(Exception e) {
public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis()
+ randomLongBetween(1, 1000000);
return ClusterState.builder(currentState).incrementVersion().build();
return ClusterState.builder(currentState).build();
}

@Override
Expand Down Expand Up @@ -1277,7 +1280,7 @@ public void onFailure(Exception e) {
masterService.submitUnbatchedStateUpdateTask("test5", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).incrementVersion().build();
return ClusterState.builder(currentState).build();
}

@Override
Expand All @@ -1293,7 +1296,7 @@ public void onFailure(Exception e) {
masterService.submitUnbatchedStateUpdateTask("test6", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).incrementVersion().build();
return ClusterState.builder(currentState).build();
}

@Override
Expand Down Expand Up @@ -2592,6 +2595,69 @@ public void onFailure(Exception e) {
}
}

/**
 * Verifies that an update task which moves any one of the three protected version numbers (cluster state, metadata, routing
 * table) up or down by one is rejected.
 */
public void testVersionNumberProtection() {
    // meddle with the top-level cluster state version
    runVersionNumberProtectionTest(currentState -> {
        final long stateVersion = currentState.version();
        return ClusterState.builder(currentState).version(randomFrom(stateVersion - 1, stateVersion + 1)).build();
    });

    // meddle with the metadata version
    runVersionNumberProtectionTest(currentState -> {
        final long metadataVersion = currentState.metadata().version();
        return currentState.copyAndUpdateMetadata(b -> b.version(randomFrom(metadataVersion - 1, metadataVersion + 1)));
    });

    // meddle with the routing table version
    runVersionNumberProtectionTest(currentState -> {
        final long routingTableVersion = currentState.routingTable().version();
        final var meddledRoutingTable = RoutingTable.builder(currentState.routingTable())
            .version(randomFrom(routingTableVersion - 1, routingTableVersion + 1))
            .build();
        return ClusterState.builder(currentState).routingTable(meddledRoutingTable).build();
    });
}

/**
 * Submits a single unbatched update task whose result is computed by {@code updateOperator}, runs the master service to
 * quiescence, and asserts that the task failed exactly once with the "did not preserve version numbers" exception whose
 * message mentions the task's source.
 *
 * @param updateOperator computes the (version-meddling) state which the task returns from the state it is given
 */
private void runVersionNumberProtectionTest(UnaryOperator<ClusterState> updateOperator) {
    final var taskQueue = new DeterministicTaskQueue();
    final var testThreadPool = taskQueue.getThreadPool();
    final var context = testThreadPool.getThreadContext();
    final var rejectionSeen = new AtomicBoolean();

    try (
        var masterService = createMasterService(true, null, testThreadPool, taskQueue.getPrioritizedEsThreadPoolExecutor());
        var ignored = context.stashContext()
    ) {
        final var source = randomIdentifier();

        final var meddlingTask = new ClusterStateUpdateTask() {
            @Override
            public ClusterState execute(ClusterState currentState) {
                return updateOperator.apply(currentState);
            }

            @Override
            public void onFailure(Exception e) {
                final var message = asInstanceOf(IllegalStateException.class, e).getMessage();
                assertThat(
                    message,
                    allOf(startsWith("cluster state update executor did not preserve version numbers"), containsString(source))
                );
                // also verifies that the failure is reported no more than once
                assertTrue(rejectionSeen.compareAndSet(false, true));
            }
        };
        masterService.submitUnbatchedStateUpdateTask(source, meddlingTask);

        // suppress assertion errors to check production behaviour
        context.putTransient(MasterService.TEST_ONLY_EXECUTOR_MAY_CHANGE_VERSION_NUMBER_TRANSIENT_NAME, new Object());
        context.markAsSystemContext();
        taskQueue.runAllRunnableTasks();

        // nothing should be left over once the queue has drained
        assertFalse(taskQueue.hasRunnableTasks());
        assertFalse(taskQueue.hasDeferredTasks());

        assertTrue(rejectionSeen.get());
    }
}

/**
* Returns the cluster state that the master service uses (and that is provided by the discovery layer)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingRoleStrategy;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -494,12 +495,22 @@ public void onFailure(Exception e) {

private MasterServiceTaskQueue<SetClusterStateTask> createSetClusterStateTaskQueue(ClusterService clusterService) {
return clusterService.createTaskQueue("set-cluster-state", Priority.NORMAL, batchExecutionContext -> {
ClusterState targetState = batchExecutionContext.initialState();
final var initialState = batchExecutionContext.initialState();
var targetState = initialState;
for (var taskContext : batchExecutionContext.taskContexts()) {
targetState = taskContext.getTask().clusterState();
taskContext.success(() -> {});
}
return targetState;
// fix up the version numbers
final var finalStateBuilder = ClusterState.builder(targetState)
.version(initialState.version())
.metadata(Metadata.builder(targetState.metadata()).version(initialState.metadata().version()));
if (initialState.clusterRecovered() || targetState.clusterRecovered() == false) {
finalStateBuilder.routingTable(
RoutingTable.builder(targetState.routingTable()).version(initialState.routingTable().version())
);
}
return finalStateBuilder.build();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ protected void masterOperation(
final SetOnce<MigratedEntities> migratedEntities = new SetOnce<>();
submitUnbatchedTask("migrate-to-data-tiers []", new ClusterStateUpdateTask(Priority.HIGH) {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
public ClusterState execute(ClusterState currentState) {
Tuple<ClusterState, MigratedEntities> migratedEntitiesTuple = migrateToDataTiersRouting(
state,
currentState,
request.getNodeAttributeName(),
request.getLegacyTemplateToDelete(),
xContentRegistry,
Expand Down

0 comments on commit 5aa9f44

Please sign in to comment.