Include last-committed data in publication (#92277)
The cluster coordination consistency layer relies on a couple of fields
within `Metadata` which record the last _committed_ values on each node.
In contrast, the rest of the cluster state can only be changed at
_accept_ time.

In the past we would copy these fields over from the master on every
publication, but since #90101 we don't copy anything at all if the
`Metadata` is unchanged on the master. However, the master computes the
diff against the last _committed_ state, whereas the receiving nodes
apply the diff to the last _accepted_ state. This means that if the
master sends a no-op `Metadata` diff, the receiving node reverts its
last-committed values to the ones included in the state it last
accepted.

With this commit we include the last-committed values alongside the
cluster state diff so that they are always copied properly.
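
To illustrate the asymmetry, here is a minimal, self-contained sketch (the `Meta` record, method names and values are invented for illustration and are not the Elasticsearch API): the old behaviour applies a no-op metadata diff on top of the follower's last-accepted metadata and thereby loses the follower's last-committed values, while the new behaviour also ships the last-committed values alongside the diff and restores them afterwards, in the spirit of what `Metadata#withLastCommittedValues` does in the real change.

final class PublicationSketch {
    // Minimal stand-in for the two last-committed fields in Metadata (hypothetical type).
    record Meta(boolean clusterUuidCommitted, String lastCommittedConfig) { }

    // Old behaviour: a no-op metadata diff keeps whatever the follower last *accepted*.
    static Meta applyNoOpDiff(Meta lastAccepted) {
        return lastAccepted;
    }

    // New behaviour, in spirit: the publication also carries the last-committed values,
    // and the follower overwrites them after applying the diff.
    static Meta applyNoOpDiffWithCommittedData(Meta lastAccepted, boolean uuidCommitted, String committedConfig) {
        Meta applied = applyNoOpDiff(lastAccepted);
        if (applied.clusterUuidCommitted() == uuidCommitted && applied.lastCommittedConfig().equals(committedConfig)) {
            return applied; // already consistent, nothing to fix up
        }
        return new Meta(uuidCommitted, committedConfig);
    }

    public static void main(String[] args) {
        Meta lastAccepted = new Meta(false, "{}");        // what the follower last accepted
        Meta lastCommitted = new Meta(true, "{node-0}");  // what it later committed

        // Old path: the committed values are silently reverted to the accepted ones.
        System.out.println("old: " + applyNoOpDiff(lastAccepted));
        // New path: the committed values survive a no-op metadata diff.
        System.out.println("new: " + applyNoOpDiffWithCommittedData(
            lastAccepted, lastCommitted.clusterUuidCommitted(), lastCommitted.lastCommittedConfig()));
    }
}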

Closes #90158
Backport of #92259 to 8.6
DaveCTurner committed Dec 12, 2022
1 parent ab602ae commit 890f010
Showing 5 changed files with 251 additions and 4 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/92259.yaml
@@ -0,0 +1,6 @@
pr: 92259
summary: Include last-committed data in publication
area: Cluster Coordination
type: bug
issues:
- 90158
@@ -86,6 +86,8 @@ public class PublicationTransportHandler {
TransportRequestOptions.Type.STATE
);

public static final Version INCLUDES_LAST_COMMITTED_DATA_VERSION = Version.V_8_6_0;

private final SerializationStatsTracker serializationStatsTracker = new SerializationStatsTracker();

public PublicationTransportHandler(
@@ -131,6 +133,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
// Close early to release resources used by the de-compression as early as possible
try (StreamInput input = in) {
incomingState = ClusterState.readFrom(input, transportService.getLocalNode());
assert input.read() == -1;
} catch (Exception e) {
logger.warn("unexpected error while deserializing an incoming cluster state", e);
assert false : e;
@@ -151,11 +154,30 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
ClusterState incomingState;
try {
final Diff<ClusterState> diff;
final boolean includesLastCommittedData = request.version().onOrAfter(INCLUDES_LAST_COMMITTED_DATA_VERSION);
final boolean clusterUuidCommitted;
final CoordinationMetadata.VotingConfiguration lastCommittedConfiguration;

// Close stream early to release resources used by the de-compression as early as possible
try (StreamInput input = in) {
diff = ClusterState.readDiffFrom(input, lastSeen.nodes().getLocalNode());
if (includesLastCommittedData) {
clusterUuidCommitted = in.readBoolean();
lastCommittedConfiguration = new CoordinationMetadata.VotingConfiguration(in);
} else {
clusterUuidCommitted = false;
lastCommittedConfiguration = null;
}
assert input.read() == -1;
}
incomingState = diff.apply(lastSeen); // might throw IncompatibleClusterStateVersionException
if (includesLastCommittedData) {
final var adjustedMetadata = incomingState.metadata()
.withLastCommittedValues(clusterUuidCommitted, lastCommittedConfiguration);
if (adjustedMetadata != incomingState.metadata()) {
incomingState = ClusterState.builder(incomingState).metadata(adjustedMetadata).build();
}
}
} catch (IncompatibleClusterStateVersionException e) {
incompatibleClusterStateDiffReceivedCount.incrementAndGet();
throw e;
@@ -239,7 +261,8 @@ private ReleasableBytesReference serializeFullClusterState(ClusterState clusterS
}
}

private ReleasableBytesReference serializeDiffClusterState(long clusterStateVersion, Diff<ClusterState> diff, DiscoveryNode node) {
private ReleasableBytesReference serializeDiffClusterState(ClusterState newState, Diff<ClusterState> diff, DiscoveryNode node) {
final long clusterStateVersion = newState.version();
final Version nodeVersion = node.getVersion();
final RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream();
boolean success = false;
@@ -253,6 +276,10 @@
stream.setVersion(nodeVersion);
stream.writeBoolean(false);
diff.writeTo(stream);
if (nodeVersion.onOrAfter(INCLUDES_LAST_COMMITTED_DATA_VERSION)) {
stream.writeBoolean(newState.metadata().clusterUUIDCommitted());
newState.getLastCommittedConfiguration().writeTo(stream);
}
uncompressedBytes = stream.position();
} catch (IOException e) {
throw new ElasticsearchException("failed to serialize cluster state diff for publishing to node {}", e, node);
@@ -316,7 +343,7 @@ void buildDiffAndSerializeStates() {
} else {
serializedDiffs.computeIfAbsent(
node.getVersion(),
v -> serializeDiffClusterState(newState.version(), diffSupplier.getOrCompute(), node)
v -> serializeDiffClusterState(newState, diffSupplier.getOrCompute(), node)
);
}
}
@@ -22,6 +22,7 @@
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.coordination.CoordinationMetadata;
import org.elasticsearch.cluster.coordination.PublicationTransportHandler;
import org.elasticsearch.cluster.metadata.IndexAbstraction.ConcreteIndex;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Strings;
@@ -440,6 +441,42 @@ public Metadata withCoordinationMetadata(CoordinationMetadata coordinationMetada
);
}

public Metadata withLastCommittedValues(
boolean clusterUUIDCommitted,
CoordinationMetadata.VotingConfiguration lastCommittedConfiguration
) {
if (clusterUUIDCommitted == this.clusterUUIDCommitted
&& lastCommittedConfiguration.equals(this.coordinationMetadata.getLastCommittedConfiguration())) {
return this;
}
return new Metadata(
clusterUUID,
clusterUUIDCommitted,
version,
CoordinationMetadata.builder(coordinationMetadata).lastCommittedConfiguration(lastCommittedConfiguration).build(),
transientSettings,
persistentSettings,
settings,
hashesOfConsistentSettings,
totalNumberOfShards,
totalOpenIndexShards,
indices,
aliasedIndices,
templates,
customs,
allIndices,
visibleIndices,
allOpenIndices,
visibleOpenIndices,
allClosedIndices,
visibleClosedIndices,
indicesLookup,
mappingsByHash,
oldestIndexVersion,
reservedStateMetadata
);
}

/**
* Creates a copy of this instance updated with the given {@link IndexMetadata} that must only contain changes to primary terms
* and in-sync allocation ids relative to the existing entries. This method is only used by
@@ -1332,6 +1369,7 @@ public Map<String, MappingMetadata> getMappingsByHash() {
private static class MetadataDiff implements Diff<Metadata> {

private static final Version NOOP_METADATA_DIFF_VERSION = Version.V_8_5_0;
private static final Version NOOP_METADATA_DIFF_SAFE_VERSION = PublicationTransportHandler.INCLUDES_LAST_COMMITTED_DATA_VERSION;

private final long version;
private final String clusterUUID;
@@ -1418,12 +1456,15 @@ private MetadataDiff(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(NOOP_METADATA_DIFF_VERSION)) {
if (out.getVersion().onOrAfter(NOOP_METADATA_DIFF_SAFE_VERSION)) {
out.writeBoolean(empty);
if (empty) {
// noop diff
return;
}
} else if (out.getVersion().onOrAfter(NOOP_METADATA_DIFF_VERSION)) {
// noops are not safe with these versions, see #92259
out.writeBoolean(false);
}
out.writeString(clusterUUID);
out.writeBoolean(clusterUUIDCommitted);
@@ -566,7 +566,6 @@ public void testUnresponsiveLeaderDetectedEventually() {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/90158")
public void testUnhealthyLeaderIsReplaced() {
final AtomicReference<StatusInfo> nodeHealthServiceStatus = new AtomicReference<>(new StatusInfo(HEALTHY, "healthy-info"));
final int initialClusterSize = between(1, 3);
@@ -16,6 +16,7 @@
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
import org.elasticsearch.cluster.coordination.CoordinationMetadata.VotingConfiguration;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -30,6 +31,7 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.tasks.Task;
@@ -42,17 +44,23 @@
import org.elasticsearch.transport.BytesRefRecycler;
import org.elasticsearch.transport.BytesTransportRequest;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TestTransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.cluster.service.MasterService.STATE_UPDATE_ACTION_NAME;
@@ -330,4 +338,170 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

public void testIncludesLastCommittedFieldsInDiffSerialization() {
final var deterministicTaskQueue = new DeterministicTaskQueue();
final var threadPool = deterministicTaskQueue.getThreadPool();

final var transportsByNode = new HashMap<DiscoveryNode, MockTransport>();
final var transportHandlersByNode = new HashMap<DiscoveryNode, PublicationTransportHandler>();
final var transportServicesByNode = new HashMap<DiscoveryNode, TransportService>();
final var receivedStateRef = new AtomicReference<ClusterState>();
final var completed = new AtomicBoolean();

final var localNode = new DiscoveryNode("localNode", buildNewFakeTransportAddress(), Version.CURRENT);
final var otherNode = new DiscoveryNode(
"otherNode",
buildNewFakeTransportAddress(),
VersionUtils.randomCompatibleVersion(random(), Version.CURRENT)
);
for (final var discoveryNode : List.of(localNode, otherNode)) {
final var transport = new MockTransport() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
@SuppressWarnings("unchecked")
final var context = (ResponseContext<TransportResponse>) getResponseHandlers().remove(requestId);
try {
transportsByNode.get(node)
.getRequestHandlers()
.getHandler(action)
.getHandler()
.messageReceived(request, new TestTransportChannel(new ActionListener<>() {
@Override
public void onResponse(TransportResponse transportResponse) {
context.handler().handleResponse(transportResponse);
}

@Override
public void onFailure(Exception e) {
throw new AssertionError("unexpected", e);
}
}), new Task(randomNonNegativeLong(), "test", "test", "", TaskId.EMPTY_TASK_ID, Map.of()));
} catch (IncompatibleClusterStateVersionException e) {
context.handler().handleException(new RemoteTransportException("wrapped", e));
} catch (Exception e) {
throw new AssertionError("unexpected", e);
}
}
};
transportsByNode.put(discoveryNode, transport);

final var transportService = transport.createTransportService(
Settings.EMPTY,
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
ignored -> discoveryNode,
null,
Set.of()
);
transportServicesByNode.put(discoveryNode, transportService);

final var publicationTransportHandler = new PublicationTransportHandler(
transportService,
writableRegistry(),
publishRequest -> {
assertTrue(receivedStateRef.compareAndSet(null, publishRequest.getAcceptedState()));
return new PublishWithJoinResponse(
new PublishResponse(publishRequest.getAcceptedState().term(), publishRequest.getAcceptedState().version()),
Optional.empty()
);
}
);
transportHandlersByNode.put(discoveryNode, publicationTransportHandler);
}

for (final var transportService : transportServicesByNode.values()) {
transportService.start();
transportService.acceptIncomingRequests();
}

threadPool.getThreadContext().markAsSystemContext();

final var clusterState0 = ClusterState.builder(ClusterState.EMPTY_STATE)
.nodes(DiscoveryNodes.builder().add(localNode).add(otherNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()))
.metadata(
Metadata.builder()
.coordinationMetadata(
CoordinationMetadata.builder().lastAcceptedConfiguration(VotingConfiguration.of(localNode)).build()
)
.generateClusterUuidIfNeeded()
)
.build();

final ClusterState receivedState0;
var context0 = transportHandlersByNode.get(localNode)
.newPublicationContext(
new ClusterStatePublicationEvent(
new BatchSummary("test"),
clusterState0,
clusterState0,
new Task(randomNonNegativeLong(), "test", "test", "", TaskId.EMPTY_TASK_ID, Map.of()),
0L,
0L
)
);
try {
context0.sendPublishRequest(
otherNode,
new PublishRequest(clusterState0),
ActionListener.wrap(() -> assertTrue(completed.compareAndSet(false, true)))
);
assertTrue(completed.getAndSet(false));
receivedState0 = receivedStateRef.getAndSet(null);
assertEquals(clusterState0.stateUUID(), receivedState0.stateUUID());
assertEquals(otherNode, receivedState0.nodes().getLocalNode());
assertFalse(receivedState0.metadata().clusterUUIDCommitted());
assertEquals(VotingConfiguration.of(), receivedState0.getLastCommittedConfiguration());
final var receivedStateStats = transportHandlersByNode.get(otherNode).stats();
assertEquals(0, receivedStateStats.getCompatibleClusterStateDiffReceivedCount());
assertEquals(1, receivedStateStats.getIncompatibleClusterStateDiffReceivedCount());
assertEquals(1, receivedStateStats.getFullClusterStateReceivedCount());
} finally {
context0.decRef();
}

final var committedClusterState0 = ClusterState.builder(clusterState0)
.metadata(clusterState0.metadata().withLastCommittedValues(true, clusterState0.getLastAcceptedConfiguration()))
.build();
assertEquals(clusterState0.stateUUID(), committedClusterState0.stateUUID());
assertEquals(clusterState0.term(), committedClusterState0.term());
assertEquals(clusterState0.version(), committedClusterState0.version());

final var clusterState1 = ClusterState.builder(committedClusterState0).incrementVersion().build();
assertSame(committedClusterState0.metadata(), clusterState1.metadata());

var context1 = transportHandlersByNode.get(localNode)
.newPublicationContext(
new ClusterStatePublicationEvent(
new BatchSummary("test"),
committedClusterState0,
clusterState1,
new Task(randomNonNegativeLong(), "test", "test", "", TaskId.EMPTY_TASK_ID, Map.of()),
0L,
0L
)
);
try {
context1.sendPublishRequest(
otherNode,
new PublishRequest(clusterState1),
ActionListener.wrap(() -> assertTrue(completed.compareAndSet(false, true)))
);
assertTrue(completed.getAndSet(false));
var receivedState1 = receivedStateRef.getAndSet(null);
assertEquals(clusterState1.stateUUID(), receivedState1.stateUUID());
assertEquals(otherNode, receivedState1.nodes().getLocalNode());
assertSame(receivedState0.nodes(), receivedState1.nodes()); // it was a diff
assertTrue(receivedState1.metadata().clusterUUIDCommitted());
assertEquals(VotingConfiguration.of(localNode), receivedState1.getLastCommittedConfiguration());
final var receivedStateStats = transportHandlersByNode.get(otherNode).stats();
assertEquals(1, receivedStateStats.getCompatibleClusterStateDiffReceivedCount());
assertEquals(1, receivedStateStats.getIncompatibleClusterStateDiffReceivedCount());
assertEquals(1, receivedStateStats.getFullClusterStateReceivedCount());
} finally {
context1.decRef();
}

assertFalse(deterministicTaskQueue.hasRunnableTasks());
assertFalse(deterministicTaskQueue.hasDeferredTasks());
}
}
