Skip to content

Commit

Permalink
Optimize multiple cluster state processing on receiving nodes
Browse files Browse the repository at this point in the history
Nodes that receive the cluster state, and they have several of those pending, can optimize and try and process potentially only one of those.
closes #5139
  • Loading branch information
kimchy committed Feb 17, 2014
1 parent bf07693 commit ccfd7f8
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 13 deletions.
Expand Up @@ -369,9 +369,9 @@ public void run() {
newClusterState = builder.build();
logger.debug("got first state from fresh master [{}]", newClusterState.nodes().masterNodeId());
} else if (newClusterState.version() < previousClusterState.version()) {
// we got this cluster state from the master, filter out based on versions (don't call listeners)
logger.debug("got old cluster state [" + newClusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "], ignoring");
return;
// we got a cluster state with older version, when we are *not* the master, let it in since it might be valid
// we check on version where applicable, like at ZenDiscovery#handleNewClusterStateFromMaster
logger.debug("got smaller cluster state when not master [" + newClusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "]");
}
}

Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.discovery.local;

import com.google.common.base.Objects;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
Expand Down Expand Up @@ -306,6 +307,10 @@ private void publish(LocalDiscovery[] members, ClusterState clusterState, final
discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (nodeSpecificClusterState.version() < currentState.version() && Objects.equal(nodeSpecificClusterState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) {
return currentState;
}

ClusterState.Builder builder = ClusterState.builder(nodeSpecificClusterState);
// if the routing table did not change, use the original one
if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) {
Expand Down
83 changes: 73 additions & 10 deletions src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.discovery.zen;

import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticsearchException;
Expand All @@ -43,6 +44,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.discovery.DiscoverySettings;
Expand All @@ -64,6 +66,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -527,8 +530,22 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});
}

void handleNewClusterStateFromMaster(final ClusterState newState, final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
static class ProcessClusterState {
final ClusterState clusterState;
final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed;
volatile boolean processed;

ProcessClusterState(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
this.clusterState = clusterState;
this.newStateProcessed = newStateProcessed;
}
}

private final BlockingQueue<ProcessClusterState> processNewClusterStates = ConcurrentCollections.newBlockingQueue();

void handleNewClusterStateFromMaster(ClusterState newClusterState, final PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
if (master) {
final ClusterState newState = newClusterState;
clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
Expand Down Expand Up @@ -560,43 +577,89 @@ public void onFailure(String source, Throwable t) {

});
} else {
if (newState.nodes().localNode() == null) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", newState.nodes().masterNode());
if (newClusterState.nodes().localNode() == null) {
logger.warn("received a cluster state from [{}] and not part of the cluster, should not happen", newClusterState.nodes().masterNode());
newStateProcessed.onNewClusterStateFailed(new ElasticsearchIllegalStateException("received state from a node that is not part of the cluster"));
} else {
if (currentJoinThread != null) {
logger.debug("got a new state from master node, though we are already trying to rejoin the cluster");
}

clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed);
processNewClusterStates.add(processClusterState);

clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
// we already processed it in a previous event
if (processClusterState.processed) {
return currentState;
}

// TODO: once improvement that we can do is change the message structure to include version and masterNodeId
// at the start, this will allow us to keep the "compressed bytes" around, and only parse the first page
// to figure out if we need to use it or not, and only once we picked the latest one, parse the whole state


// try and get the state with the highest version out of all the ones with the same master node id
ProcessClusterState stateToProcess = processNewClusterStates.poll();
if (stateToProcess == null) {
return currentState;
}
stateToProcess.processed = true;
while (true) {
ProcessClusterState potentialState = processNewClusterStates.peek();
// nothing else in the queue, bail
if (potentialState == null) {
break;
}
// if its not from the same master, then bail
if (!Objects.equal(stateToProcess.clusterState.nodes().masterNodeId(), potentialState.clusterState.nodes().masterNodeId())) {
break;
}

// we are going to use it for sure, poll (remove) it
potentialState = processNewClusterStates.poll();
potentialState.processed = true;

if (potentialState.clusterState.version() > stateToProcess.clusterState.version()) {
// we found a new one
stateToProcess = potentialState;
}
}

ClusterState updatedState = stateToProcess.clusterState;

// if the new state has a smaller version, and it has the same master node, then no need to process it
if (updatedState.version() < currentState.version() && Objects.equal(updatedState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) {
return currentState;
}

// we don't need to do this, since we ping the master, and get notified when it has moved from being a master
// because it doesn't have enough master nodes...
//if (!electMaster.hasEnoughMasterNodes(newState.nodes())) {
// return disconnectFromCluster(newState, "not enough master nodes on new cluster state received from [" + newState.nodes().masterNode() + "]");
//}

latestDiscoNodes = newState.nodes();
latestDiscoNodes = updatedState.nodes();

// check to see that we monitor the correct master of the cluster
if (masterFD.masterNode() == null || !masterFD.masterNode().equals(latestDiscoNodes.masterNode())) {
masterFD.restart(latestDiscoNodes.masterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
}

ClusterState.Builder builder = ClusterState.builder(newState);
ClusterState.Builder builder = ClusterState.builder(updatedState);
// if the routing table did not change, use the original one
if (newState.routingTable().version() == currentState.routingTable().version()) {
if (updatedState.routingTable().version() == currentState.routingTable().version()) {
builder.routingTable(currentState.routingTable());
}
// same for metadata
if (newState.metaData().version() == currentState.metaData().version()) {
if (updatedState.metaData().version() == currentState.metaData().version()) {
builder.metaData(currentState.metaData());
} else {
// if its not the same version, only copy over new indices or ones that changed the version
MetaData.Builder metaDataBuilder = MetaData.builder(newState.metaData()).removeAllIndices();
for (IndexMetaData indexMetaData : newState.metaData()) {
MetaData.Builder metaDataBuilder = MetaData.builder(updatedState.metaData()).removeAllIndices();
for (IndexMetaData indexMetaData : updatedState.metaData()) {
IndexMetaData currentIndexMetaData = currentState.metaData().index(indexMetaData.index());
if (currentIndexMetaData == null || currentIndexMetaData.version() != indexMetaData.version()) {
metaDataBuilder.put(indexMetaData, false);
Expand Down

0 comments on commit ccfd7f8

Please sign in to comment.