Skip to content

Commit

Permalink
[Discovery] do not use versions to optimize cluster state copying for…
Browse files Browse the repository at this point in the history
… a first update from a new master

We have an optimization which compares the routing/meta data versions of cluster states and tries to reuse the current object if the versions are equal. This can cause rare failures during recovery from a minimum_master_node breach when using the "new light rejoin" mechanism and simulated network disconnects. This happens when the current master updates its state, doesn't manage to broadcast it to other nodes due to the disconnect, and then steps down. The new master will start with a previous version and continue to update it. When the old master rejoins, the versions of its state can be equal but the content is different.

Also improved DiscoveryWithNetworkFailuresTests to simulate this failure (and other improvements)

Closes #6466
  • Loading branch information
bleskes committed Jul 21, 2014
1 parent 6ab3bc4 commit a79cc84
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 99 deletions.
Expand Up @@ -386,20 +386,6 @@ public void run() {
}
}
}
} else {
if (previousClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) && !newClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) {
// force an update, its a fresh update from the master as we transition from a start of not having a master to having one
// have a fresh instances of routing and metadata to remove the chance that version might be the same
Builder builder = ClusterState.builder(newClusterState);
builder.routingTable(RoutingTable.builder(newClusterState.routingTable()));
builder.metaData(MetaData.builder(newClusterState.metaData()));
newClusterState = builder.build();
logger.debug("got first state from fresh master [{}]", newClusterState.nodes().masterNodeId());
} else if (newClusterState.version() < previousClusterState.version()) {
// we got a cluster state with older version, when we are *not* the master, let it in since it might be valid
// we check on version where applicable, like at ZenDiscovery#handleNewClusterStateFromMaster
logger.debug("got smaller cluster state when not master [" + newClusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "]");
}
}

newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED);
Expand Down
Expand Up @@ -54,6 +54,7 @@ public DiscoverySettings(Settings settings, NodeSettingsService nodeSettingsServ
super(settings);
nodeSettingsService.addListener(new ApplySettings());
this.noMasterBlock = parseNoMasterBlock(settings.get(NO_MASTER_BLOCK, DEFAULT_NO_MASTER_BLOCK));
this.publishTimeout = settings.getAsTime(PUBLISH_TIMEOUT, publishTimeout);
}

/**
Expand Down
Expand Up @@ -58,6 +58,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem

private final TransportService transportService;
private final ClusterService clusterService;
private final DiscoveryService discoveryService;
private final DiscoveryNodeService discoveryNodeService;
private AllocationService allocationService;
private final ClusterName clusterName;
Expand All @@ -77,14 +78,15 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem

@Inject
public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) {
DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings, DiscoveryService discoveryService) {
super(settings);
this.clusterName = clusterName;
this.clusterService = clusterService;
this.transportService = transportService;
this.discoveryNodeService = discoveryNodeService;
this.version = version;
this.discoverySettings = discoverySettings;
this.discoveryService = discoveryService;
}

@Override
Expand Down Expand Up @@ -305,13 +307,22 @@ private void publish(LocalDiscovery[] members, ClusterState clusterState, final
nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
// ignore cluster state messages that do not include "me", not in the game yet...
if (nodeSpecificClusterState.nodes().localNode() != null) {
assert nodeSpecificClusterState.nodes().masterNode() != null : "received a cluster state without a master";
assert !nodeSpecificClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) : "received a cluster state with a master block";

discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (nodeSpecificClusterState.version() < currentState.version() && Objects.equal(nodeSpecificClusterState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) {
return currentState;
}

if (currentState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) {
// its a fresh update from the master as we transition from a start of not having a master to having one
logger.debug("got first state from fresh master [{}]", nodeSpecificClusterState.nodes().masterNodeId());
return nodeSpecificClusterState;
}

ClusterState.Builder builder = ClusterState.builder(nodeSpecificClusterState);
// if the routing table did not change, use the original one
if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) {
Expand Down
18 changes: 17 additions & 1 deletion src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
Expand Up @@ -83,6 +83,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private final ClusterService clusterService;
private AllocationService allocationService;
private final ClusterName clusterName;
private final DiscoveryService discoveryService;
private final DiscoveryNodeService discoveryNodeService;
private final DiscoverySettings discoverySettings;
private final ZenPingService pingService;
Expand Down Expand Up @@ -126,12 +127,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
@Inject
public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version, DiscoverySettings discoverySettings) {
DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version, DiscoverySettings discoverySettings,
DiscoveryService discoveryService) {
super(settings);
this.clusterName = clusterName;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.transportService = transportService;
this.discoveryService = discoveryService;
this.discoveryNodeService = discoveryNodeService;
this.discoverySettings = discoverySettings;
this.pingService = pingService;
Expand Down Expand Up @@ -639,6 +642,10 @@ public void onFailure(String source, Throwable t) {
final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed);
processNewClusterStates.add(processClusterState);


assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master";
assert !newClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) : "received a cluster state with a master block";

clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
Expand Down Expand Up @@ -699,7 +706,16 @@ public ClusterState execute(ClusterState currentState) {
masterFD.restart(latestDiscoNodes.masterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
}

if (currentState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) {
// its a fresh update from the master as we transition from a start of not having a master to having one
logger.debug("got first state from fresh master [{}]", updatedState.nodes().masterNodeId());
return updatedState;
}


// some optimizations to make sure we keep old objects where possible
ClusterState.Builder builder = ClusterState.builder(updatedState);

// if the routing table did not change, use the original one
if (updatedState.routingTable().version() == currentState.routingTable().version()) {
builder.routingTable(currentState.routingTable());
Expand Down

0 comments on commit a79cc84

Please sign in to comment.