Skip to content

Commit

Permalink
Made possible to dynamically update discovery.zen.publish_timeout c…
Browse files Browse the repository at this point in the history
…luster setting

`discovery.zen.publish_timeout` controls how long the master node is going to try and wait for all the nodes to respond to a cluster state publish before going ahead with the following updates in the queue (default 30s). Up until now changing the settings required restarting each node. The setting is now dynamic and can be changed through the cluster update settings api.

Closes #5063
  • Loading branch information
javanna committed Feb 11, 2014
1 parent ae1acc0 commit 9e867f7
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 19 deletions.
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.*;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService;
Expand Down Expand Up @@ -81,6 +82,7 @@ public ClusterDynamicSettingsModule() {
clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, Validator.MEMORY_SIZE);
clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
clusterDynamicSettings.addDynamicSetting(DestructiveOperations.REQUIRES_NAME);
clusterDynamicSettings.addDynamicSetting(DiscoverySettings.PUBLISH_TIMEOUT, Validator.TIME_NON_NEGATIVE);
}

public void addDynamicSettings(String... settings) {
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/org/elasticsearch/cluster/settings/Validator.java
Expand Up @@ -55,6 +55,24 @@ public String validate(String setting, String value) {
}
};

public static final Validator TIME_NON_NEGATIVE = new Validator() {
@Override
public String validate(String setting, String value) {
try {
TimeValue timeValue = TimeValue.parseTimeValue(value, null);
if (timeValue == null) {
return "cannot parse value [" + value + "] as time";
}
if (timeValue.millis() < 0) {
return "cannot parse value [" + value + "] as non negative time";
}
} catch (ElasticsearchParseException ex) {
return "cannot parse value [" + value + "] as time";
}
return null;
}
};

public static final Validator FLOAT = new Validator() {
@Override
public String validate(String setting, String value) {
Expand Down
3 changes: 0 additions & 3 deletions src/main/java/org/elasticsearch/discovery/Discovery.java
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.rest.RestStatus;

Expand All @@ -39,8 +38,6 @@ public interface Discovery extends LifecycleComponent<Discovery> {

final ClusterBlock NO_MASTER_BLOCK = new ClusterBlock(2, "no master", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);

public static final TimeValue DEFAULT_PUBLISH_TIMEOUT = TimeValue.timeValueSeconds(30);

DiscoveryNode localNode();

void addListener(InitialStateDiscoveryListener listener);
Expand Down
64 changes: 64 additions & 0 deletions src/main/java/org/elasticsearch/discovery/DiscoverySettings.java
@@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery;

import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.settings.NodeSettingsService;

/**
* Exposes common discovery settings that may be supported by all the different discovery implementations
*/
public class DiscoverySettings extends AbstractComponent {

public static final String PUBLISH_TIMEOUT = "discovery.zen.publish_timeout";

private static final TimeValue DEFAULT_PUBLISH_TIMEOUT = TimeValue.timeValueSeconds(30);

private volatile TimeValue publishTimeout = DEFAULT_PUBLISH_TIMEOUT;

@Inject
public DiscoverySettings(Settings settings, NodeSettingsService nodeSettingsService) {
super(settings);
nodeSettingsService.addListener(new ApplySettings());
}

/**
* Returns the current publish timeout
*/
public TimeValue getPublishTimeout() {
return publishTimeout;
}

private class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
TimeValue newPublishTimeout = settings.getAsTime(PUBLISH_TIMEOUT, null);
if (newPublishTimeout != null) {
if (newPublishTimeout.millis() != publishTimeout.millis()) {
logger.info("updating [{}] from [{}] to [{}]", PUBLISH_TIMEOUT, publishTimeout, newPublishTimeout);
publishTimeout = newPublishTimeout;
}
}
}
}
}
Expand Up @@ -62,7 +62,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
private final ClusterName clusterName;
private final Version version;

private final TimeValue publishTimeout;
private final DiscoverySettings discoverySettings;

private DiscoveryNode localNode;

Expand All @@ -76,15 +76,14 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem

@Inject
public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
DiscoveryNodeService discoveryNodeService, Version version) {
DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) {
super(settings);
this.clusterName = clusterName;
this.clusterService = clusterService;
this.transportService = transportService;
this.discoveryNodeService = discoveryNodeService;
this.version = version;

this.publishTimeout = settings.getAsTime("discovery.zen.publish_timeout", DEFAULT_PUBLISH_TIMEOUT);
this.discoverySettings = discoverySettings;
}

@Override
Expand Down Expand Up @@ -336,6 +335,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
}

TimeValue publishTimeout = discoverySettings.getPublishTimeout();
if (publishTimeout.millis() > 0) {
try {
boolean awaited = publishResponseHandler.awaitAllNodes(publishTimeout);
Expand Down
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.InitialStateDiscoveryListener;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
Expand Down Expand Up @@ -118,7 +119,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
@Inject
public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version) {
DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version, DiscoverySettings discoverySettings) {
super(settings);
this.clusterName = clusterName;
this.threadPool = threadPool;
Expand Down Expand Up @@ -146,7 +147,7 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService);
this.nodesFD.addListener(new NodeFailureListener());

this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener());
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings);
this.pingService.setNodesProvider(this);
this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());

Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.ClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
Expand All @@ -59,18 +60,15 @@ static interface NewStateProcessed {
private final TransportService transportService;
private final DiscoveryNodesProvider nodesProvider;
private final NewClusterStateListener listener;

private final TimeValue publishTimeout;
private final DiscoverySettings discoverySettings;

public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider,
NewClusterStateListener listener) {
NewClusterStateListener listener, DiscoverySettings discoverySettings) {
super(settings);
this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.listener = listener;

this.publishTimeout = settings.getAsTime("discovery.zen.publish_timeout", Discovery.DEFAULT_PUBLISH_TIMEOUT);

this.discoverySettings = discoverySettings;
transportService.registerHandler(PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequestHandler());
}

Expand Down Expand Up @@ -137,6 +135,7 @@ public void handleException(TransportException exp) {
}
}

TimeValue publishTimeout = discoverySettings.getPublishTimeout();
if (publishTimeout.millis() > 0) {
// only wait if the publish timeout is configured...
try {
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;

Expand All @@ -45,7 +46,7 @@ public class AckClusterUpdateSettingsTests extends ElasticsearchIntegrationTest
protected Settings nodeSettings(int nodeOrdinal) {
//to test that the acknowledgement mechanism is working we better disable the wait for publish
//otherwise the operation is most likely acknowledged even if it doesn't support ack
return ImmutableSettings.builder().put("discovery.zen.publish_timeout", 0).build();
return ImmutableSettings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT, 0).build();
}

@Test
Expand Down
3 changes: 2 additions & 1 deletion src/test/java/org/elasticsearch/cluster/ack/AckTests.java
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.warmer.IndexWarmersMetaData;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
Expand All @@ -61,7 +62,7 @@ public class AckTests extends ElasticsearchIntegrationTest {
protected Settings nodeSettings(int nodeOrdinal) {
//to test that the acknowledgement mechanism is working we better disable the wait for publish
//otherwise the operation is most likely acknowledged even if it doesn't support ack
return ImmutableSettings.builder().put("discovery.zen.publish_timeout", 0).build();
return ImmutableSettings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT, 0).build();
}

@Test
Expand Down
Expand Up @@ -23,12 +23,12 @@
import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.hamcrest.Matchers;
import org.junit.Test;

import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.*;

@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numNodes = 1)
public class ClusterSettingsTests extends ElasticsearchIntegrationTest {
Expand Down Expand Up @@ -99,4 +99,28 @@ public void clusterSettingsUpdateResponse() {
assertThat(response3.getPersistentSettings().get(key1), notNullValue());
assertThat(response3.getPersistentSettings().get(key2), notNullValue());
}

@Test
public void testUpdateDiscoveryPublishTimeout() {
ClusterUpdateSettingsResponse response = client().admin().cluster()
.prepareUpdateSettings()
.setTransientSettings(ImmutableSettings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT, "1s").build())
.get();

assertThat(response.getTransientSettings().getAsMap().get(DiscoverySettings.PUBLISH_TIMEOUT), equalTo("1s"));

response = client().admin().cluster()
.prepareUpdateSettings()
.setTransientSettings(ImmutableSettings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT, "whatever").build())
.get();

assertThat(response.getTransientSettings().getAsMap().entrySet(), Matchers.emptyIterable());

response = client().admin().cluster()
.prepareUpdateSettings()
.setTransientSettings(ImmutableSettings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT, -1).build())
.get();

assertThat(response.getTransientSettings().getAsMap().entrySet(), Matchers.emptyIterable());
}
}

0 comments on commit 9e867f7

Please sign in to comment.