Skip to content

Commit

Permalink
Make MasterService.patchVersions not Rebuild the Full CS (#79860) (#7…
Browse files Browse the repository at this point in the history
…9900)

This makes the method really just patch the version via a cheap
copy constructor. Moreover, it makes the cluster state builder smarter
when it comes to updating the routing nodes so they aren't rebuilt
so often as well.
  • Loading branch information
original-brownbear committed Oct 27, 2021
1 parent be29fe1 commit cb35978
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 46 deletions.
38 changes: 31 additions & 7 deletions server/src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.discovery.Discovery;
Expand Down Expand Up @@ -172,12 +173,21 @@ default boolean isPrivate() {

public ClusterState(long version, String stateUUID, ClusterState state) {
this(state.clusterName, version, stateUUID, state.metadata(), state.routingTable(), state.nodes(), state.blocks(),
state.customs(), -1, false);
}

public ClusterState(ClusterName clusterName, long version, String stateUUID, Metadata metadata, RoutingTable routingTable,
DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap<String, Custom> customs,
int minimumMasterNodesOnPublishingMaster, boolean wasReadFromDiff) {
state.customs(), -1, false, state.routingNodes);
}
public ClusterState(
ClusterName clusterName,
long version,
String stateUUID,
Metadata metadata,
RoutingTable routingTable,
DiscoveryNodes nodes,
ClusterBlocks blocks,
ImmutableOpenMap<String, Custom> customs,
int minimumMasterNodesOnPublishingMaster,
boolean wasReadFromDiff,
@Nullable RoutingNodes routingNodes
) {
this.version = version;
this.stateUUID = stateUUID;
this.clusterName = clusterName;
Expand All @@ -188,6 +198,9 @@ public ClusterState(ClusterName clusterName, long version, String stateUUID, Met
this.customs = customs;
this.minimumMasterNodesOnPublishingMaster = minimumMasterNodesOnPublishingMaster;
this.wasReadFromDiff = wasReadFromDiff;
this.routingNodes = routingNodes;
assert routingNodes == null || routingNodes.equals(new RoutingNodes(this)) :
"RoutingNodes [" + routingNodes + "] are not consistent with this cluster state [" + new RoutingNodes(this) + "]";
}

public long term() {
Expand Down Expand Up @@ -550,6 +563,8 @@ public static Builder builder(ClusterState state) {

public static class Builder {

private ClusterState previous;

private final ClusterName clusterName;
private long version = 0;
private String uuid = UNKNOWN_UUID;
Expand All @@ -562,6 +577,7 @@ public static class Builder {
private int minimumMasterNodesOnPublishingMaster = -1;

public Builder(ClusterState state) {
this.previous = state;
this.clusterName = state.clusterName;
this.version = state.version();
this.uuid = state.stateUUID();
Expand Down Expand Up @@ -661,8 +677,16 @@ public ClusterState build() {
if (UNKNOWN_UUID.equals(uuid)) {
uuid = UUIDs.randomBase64UUID();
}
final RoutingNodes routingNodes;
if (previous != null && routingTable.indicesRouting() == previous.routingTable.indicesRouting() && nodes == previous.nodes) {
// routing table contents and nodes haven't changed so we can try to reuse the previous state's routing nodes which are
// expensive to compute
routingNodes = previous.routingNodes;
} else {
routingNodes = null;
}
return new ClusterState(clusterName, version, uuid, metadata, routingTable, nodes, blocks, customs.build(),
minimumMasterNodesOnPublishingMaster, fromDiff);
minimumMasterNodesOnPublishingMaster, fromDiff, routingNodes);
}

public static byte[] toBytes(ClusterState state) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,34 +194,41 @@ public interface NonRestorableCustom extends Custom {

private SortedMap<String, IndexAbstraction> indicesLookup;

Metadata(String clusterUUID, boolean clusterUUIDCommitted, long version, CoordinationMetadata coordinationMetadata,
Settings transientSettings, Settings persistentSettings, DiffableStringMap hashesOfConsistentSettings,
ImmutableOpenMap<String, IndexMetadata> indices, ImmutableOpenMap<String, IndexTemplateMetadata> templates,
ImmutableOpenMap<String, Custom> customs, String[] allIndices, String[] visibleIndices, String[] allOpenIndices,
String[] visibleOpenIndices, String[] allClosedIndices, String[] visibleClosedIndices,
SortedMap<String, IndexAbstraction> indicesLookup) {
private Metadata(
String clusterUUID,
boolean clusterUUIDCommitted,
long version,
CoordinationMetadata coordinationMetadata,
Settings transientSettings,
Settings persistentSettings,
Settings settings,
DiffableStringMap hashesOfConsistentSettings,
int totalNumberOfShards,
int totalOpenIndexShards,
ImmutableOpenMap<String, IndexMetadata> indices,
ImmutableOpenMap<String, IndexTemplateMetadata> templates,
ImmutableOpenMap<String, Custom> customs,
String[] allIndices,
String[] visibleIndices,
String[] allOpenIndices,
String[] visibleOpenIndices,
String[] allClosedIndices,
String[] visibleClosedIndices,
SortedMap<String, IndexAbstraction> indicesLookup
) {
this.clusterUUID = clusterUUID;
this.clusterUUIDCommitted = clusterUUIDCommitted;
this.version = version;
this.coordinationMetadata = coordinationMetadata;
this.transientSettings = transientSettings;
this.persistentSettings = persistentSettings;
this.settings = Settings.builder().put(persistentSettings).put(transientSettings).build();
this.settings = settings;
this.hashesOfConsistentSettings = hashesOfConsistentSettings;
this.indices = indices;
this.customs = customs;
this.templates = templates;
int totalNumberOfShards = 0;
int totalOpenIndexShards = 0;
for (IndexMetadata indexMetadata : indices.values()) {
totalNumberOfShards += indexMetadata.getTotalNumberOfShards();
if (IndexMetadata.State.OPEN.equals(indexMetadata.getState())) {
totalOpenIndexShards += indexMetadata.getTotalNumberOfShards();
}
}
this.totalNumberOfShards = totalNumberOfShards;
this.totalOpenIndexShards = totalOpenIndexShards;

this.allIndices = allIndices;
this.visibleIndices = visibleIndices;
this.allOpenIndices = allOpenIndices;
Expand All @@ -231,6 +238,31 @@ public interface NonRestorableCustom extends Custom {
this.indicesLookup = indicesLookup;
}

public Metadata withIncrementedVersion() {
return new Metadata(
clusterUUID,
clusterUUIDCommitted,
version + 1,
coordinationMetadata,
transientSettings,
persistentSettings,
settings,
hashesOfConsistentSettings,
totalNumberOfShards,
totalOpenIndexShards,
indices,
templates,
customs,
allIndices,
visibleIndices,
allOpenIndices,
visibleOpenIndices,
allClosedIndices,
visibleClosedIndices,
indicesLookup
);
}

public long version() {
return this.version;
}
Expand Down Expand Up @@ -1740,9 +1772,37 @@ public Metadata build(boolean builtIndicesLookupEagerly) {
String[] allClosedIndicesArray = allClosedIndices.toArray(Strings.EMPTY_ARRAY);
String[] visibleClosedIndicesArray = visibleClosedIndices.toArray(Strings.EMPTY_ARRAY);

return new Metadata(clusterUUID, clusterUUIDCommitted, version, coordinationMetadata, transientSettings, persistentSettings,
hashesOfConsistentSettings, indices, templates.build(), customs.build(), allIndicesArray, visibleIndicesArray,
allOpenIndicesArray, visibleOpenIndicesArray, allClosedIndicesArray, visibleClosedIndicesArray, indicesLookup);
int totalNumberOfShards = 0;
int totalOpenIndexShards = 0;
for (IndexMetadata indexMetadata : indices.values()) {
totalNumberOfShards += indexMetadata.getTotalNumberOfShards();
if (IndexMetadata.State.OPEN.equals(indexMetadata.getState())) {
totalOpenIndexShards += indexMetadata.getTotalNumberOfShards();
}
}

return new Metadata(
clusterUUID,
clusterUUIDCommitted,
version,
coordinationMetadata,
transientSettings,
persistentSettings,
Settings.builder().put(persistentSettings).put(transientSettings).build(),
hashesOfConsistentSettings,
totalNumberOfShards,
totalOpenIndexShards,
indices,
templates.build(),
customs.build(),
allIndicesArray,
visibleIndicesArray,
allOpenIndicesArray,
visibleOpenIndicesArray,
allClosedIndicesArray,
visibleClosedIndicesArray,
indicesLookup
);
}

static SortedMap<String, IndexAbstraction> buildIndicesLookup(DataStreamMetadata dataStreamMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

Expand All @@ -33,6 +34,7 @@ public class RoutingNode implements Iterable<ShardRouting> {

private final String nodeId;

@Nullable
private final DiscoveryNode node;

private final LinkedHashMap<ShardId, ShardRouting> shards; // LinkedHashMap to preserve order
Expand All @@ -47,7 +49,7 @@ public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
this(nodeId, node, buildShardRoutingMap(shards));
}

RoutingNode(String nodeId, DiscoveryNode node, LinkedHashMap<ShardId, ShardRouting> shards) {
RoutingNode(String nodeId, @Nullable DiscoveryNode node, LinkedHashMap<ShardId, ShardRouting> shards) {
this.nodeId = nodeId;
this.node = node;
this.shards = shards;
Expand Down Expand Up @@ -87,6 +89,7 @@ public Iterator<ShardRouting> iterator() {
*
* @return discoveryNode of this node
*/
@Nullable
public DiscoveryNode node() {
return this.node;
}
Expand Down Expand Up @@ -297,13 +300,17 @@ public String prettyPrint() {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("routingNode ([");
sb.append(node.getName());
sb.append("][");
sb.append(node.getId());
sb.append("][");
sb.append(node.getHostName());
sb.append("][");
sb.append(node.getHostAddress());
if (node != null) {
sb.append(node.getName());
sb.append("][");
sb.append(node.getId());
sb.append("][");
sb.append(node.getHostName());
sb.append("][");
sb.append(node.getHostAddress());
} else {
sb.append("null");
}
sb.append("], [");
sb.append(shards.size());
sb.append(" assigned shards])");
Expand All @@ -319,7 +326,6 @@ public boolean isEmpty() {
}

private boolean invariant() {

// initializingShards must consistent with that in shards
Collection<ShardRouting> shardRoutingsInitializing =
shards.values().stream().filter(ShardRouting::initializing).collect(Collectors.toList());
Expand All @@ -338,4 +344,21 @@ private boolean invariant() {

return true;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
RoutingNode that = (RoutingNode) o;
return nodeId.equals(that.nodeId) && Objects.equals(node, that.node) && shards.equals(that.shards);
}

@Override
public int hashCode() {
return Objects.hash(nodeId, node, shards);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,46 @@ public int size() {
return nodesToShards.size();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
RoutingNodes that = (RoutingNodes) o;
return readOnly == that.readOnly
&& inactivePrimaryCount == that.inactivePrimaryCount
&& inactiveShardCount == that.inactiveShardCount
&& relocatingShards == that.relocatingShards
&& activeShardCount == that.activeShardCount
&& totalShardCount == that.totalShardCount
&& nodesToShards.equals(that.nodesToShards)
&& unassignedShards.equals(that.unassignedShards)
&& assignedShards.equals(that.assignedShards)
&& attributeValuesByAttribute.equals(that.attributeValuesByAttribute)
&& recoveriesPerNode.equals(that.recoveriesPerNode
);
}

@Override
public int hashCode() {
return Objects.hash(
nodesToShards,
unassignedShards,
assignedShards,
readOnly,
inactivePrimaryCount,
inactiveShardCount,
relocatingShards,
activeShardCount,
totalShardCount,
attributeValuesByAttribute,
recoveriesPerNode
);
}

public static final class UnassignedShards implements Iterable<ShardRouting> {

private final RoutingNodes nodes;
Expand Down Expand Up @@ -990,6 +1030,26 @@ public ShardRouting[] drain() {
primaries = 0;
return mutableShardRoutings;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UnassignedShards that = (UnassignedShards) o;
return primaries == that.primaries
&& ignoredPrimaries == that.ignoredPrimaries
&& unassigned.equals(that.unassigned)
&& ignored.equals(that.ignored);
}

@Override
public int hashCode() {
return Objects.hash(unassigned, ignored, primaries, ignoredPrimaries);
}
}


Expand Down Expand Up @@ -1183,5 +1243,22 @@ public static Recoveries getOrAdd(Map<String, Recoveries> map, String key) {
}
return recoveries;
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Recoveries that = (Recoveries) o;
return incoming == that.incoming && outgoing == that.outgoing;
}

@Override
public int hashCode() {
return Objects.hash(incoming, outgoing);
}
}
}

0 comments on commit cb35978

Please sign in to comment.