Skip to content

Commit

Permalink
Zen Discovery Cluster Events to have Priority.URGENT
Browse files Browse the repository at this point in the history
Master node cluster state events resulting in zen discovery (node gets added, removed, for example) should be processed with priority URGENT as its always better to process them as fast as possible, and not let other events get in the way.
closes elastic#3361
  • Loading branch information
kimchy committed Jul 21, 2013
1 parent b718fb7 commit d4b1ecf
Showing 1 changed file with 9 additions and 9 deletions.
18 changes: 9 additions & 9 deletions src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
Expand Up @@ -311,7 +311,7 @@ private void innerJoinCluster() {
if (localNode.equals(masterNode)) {
this.master = true;
nodesFD.start(); // start the nodes FD
clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
Expand Down Expand Up @@ -370,7 +370,7 @@ private void handleLeaveRequest(final DiscoveryNode node) {
return;
}
if (master) {
clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", Priority.HIGH, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", Priority.URGENT, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
Expand Down Expand Up @@ -401,7 +401,7 @@ private void handleNodeFailure(final DiscoveryNode node, String reason) {
// nothing to do here...
return;
}
clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder()
Expand Down Expand Up @@ -434,7 +434,7 @@ private void handleMinimumMasterNodesChanged(final int minimumMasterNodes) {
// nothing to do here...
return;
}
clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
final int prevMinimumMasterNode = ZenDiscovery.this.electMaster.minimumMasterNodes();
Expand Down Expand Up @@ -465,7 +465,7 @@ private void handleMasterGone(final DiscoveryNode masterNode, final String reaso

logger.info("master_left [{}], reason [{}]", masterNode, reason);

clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (!masterNode.id().equals(currentState.nodes().masterNodeId())) {
Expand Down Expand Up @@ -516,7 +516,7 @@ public void clusterStateProcessed(ClusterState clusterState) {

void handleNewClusterStateFromMaster(final ClusterState newState) {
if (master) {
clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-master_receive_cluster_state_from_another_master [" + newState.nodes().masterNode() + "]", Priority.URGENT, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (newState.version() > currentState.version()) {
Expand All @@ -542,7 +542,7 @@ public void handleException(TransportException exp) {
logger.debug("got a new state from master node, though we are already trying to rejoin the cluster");
}

clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newState.nodes().masterNode() + "])", new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {

Expand Down Expand Up @@ -611,7 +611,7 @@ private ClusterState handleJoinRequest(final DiscoveryNode node) {
// node calling the join request
membership.sendValidateJoinRequestBlocking(node, state, pingTimeout);

clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.URGENT, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (currentState.nodes().nodeExists(node.id())) {
Expand Down Expand Up @@ -820,7 +820,7 @@ public RejoinClusterRequest newInstance() {

@Override
public void messageReceived(final RejoinClusterRequest request, final TransportChannel channel) throws Exception {
clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("received a request to rejoin the cluster from [" + request.fromNodeId + "]", Priority.URGENT, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
try {
Expand Down

0 comments on commit d4b1ecf

Please sign in to comment.