Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made possible to dynamically update discovery.zen.publish_timeout cluster setting #5068

Merged
merged 1 commit into from Feb 11, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.*;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService;
Expand Down Expand Up @@ -81,6 +82,7 @@ public ClusterDynamicSettingsModule() {
clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_MAX_BYTES_SETTING, Validator.MEMORY_SIZE);
clusterDynamicSettings.addDynamicSetting(InternalCircuitBreakerService.CIRCUIT_BREAKER_OVERHEAD_SETTING, Validator.NON_NEGATIVE_DOUBLE);
clusterDynamicSettings.addDynamicSetting(DestructiveOperations.REQUIRES_NAME);
clusterDynamicSettings.addDynamicSetting(DiscoverySettings.PUBLISH_TIMEOUT, Validator.TIME_NON_NEGATIVE);
}

public void addDynamicSettings(String... settings) {
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/org/elasticsearch/cluster/settings/Validator.java
Expand Up @@ -55,6 +55,24 @@ public String validate(String setting, String value) {
}
};

public static final Validator TIME_NON_NEGATIVE = new Validator() {
@Override
public String validate(String setting, String value) {
try {
TimeValue timeValue = TimeValue.parseTimeValue(value, null);
if (timeValue == null) {
return "cannot parse value [" + value + "] as time";
}
if (timeValue.millis() < 0) {
return "cannot parse value [" + value + "] as non negative time";
}
} catch (ElasticsearchParseException ex) {
return "cannot parse value [" + value + "] as time";
}
return null;
}
};

public static final Validator FLOAT = new Validator() {
@Override
public String validate(String setting, String value) {
Expand Down
3 changes: 0 additions & 3 deletions src/main/java/org/elasticsearch/discovery/Discovery.java
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.rest.RestStatus;

Expand All @@ -39,8 +38,6 @@ public interface Discovery extends LifecycleComponent<Discovery> {

final ClusterBlock NO_MASTER_BLOCK = new ClusterBlock(2, "no master", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);

public static final TimeValue DEFAULT_PUBLISH_TIMEOUT = TimeValue.timeValueSeconds(30);

DiscoveryNode localNode();

void addListener(InitialStateDiscoveryListener listener);
Expand Down
64 changes: 64 additions & 0 deletions src/main/java/org/elasticsearch/discovery/DiscoverySettings.java
@@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.discovery;

import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.settings.NodeSettingsService;

/**
* Exposes common discovery settings that may be supported by all the different discovery implementations
*/
public class DiscoverySettings extends AbstractComponent {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be better to put this into a common discovery class that both discovery impls inherit from?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to have it as a separate component. For instance if you look at ZenDiscovery, the real work is done by PublishClusterStateAction which needs to be notified whenever the setting changes. It's just easier to have a separate DiscoverySettings class that can be passed in as a constructor argument.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed


public static final String PUBLISH_TIMEOUT = "discovery.zen.publish_timeout";

private static final TimeValue DEFAULT_PUBLISH_TIMEOUT = TimeValue.timeValueSeconds(30);

private volatile TimeValue publishTimeout = DEFAULT_PUBLISH_TIMEOUT;

@Inject
public DiscoverySettings(Settings settings, NodeSettingsService nodeSettingsService) {
super(settings);
nodeSettingsService.addListener(new ApplySettings());
}

/**
* Returns the current publish timeout
*/
public TimeValue getPublishTimeout() {
return publishTimeout;
}

private class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
TimeValue newPublishTimeout = settings.getAsTime(PUBLISH_TIMEOUT, null);
if (newPublishTimeout != null) {
if (newPublishTimeout.millis() != publishTimeout.millis()) {
logger.info("updating [{}] from [{}] to [{}]", PUBLISH_TIMEOUT, publishTimeout, newPublishTimeout);
publishTimeout = newPublishTimeout;
}
}
}
}
}
Expand Up @@ -62,7 +62,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
private final ClusterName clusterName;
private final Version version;

private final TimeValue publishTimeout;
private final DiscoverySettings discoverySettings;

private DiscoveryNode localNode;

Expand All @@ -76,15 +76,14 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem

@Inject
public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
DiscoveryNodeService discoveryNodeService, Version version) {
DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) {
super(settings);
this.clusterName = clusterName;
this.clusterService = clusterService;
this.transportService = transportService;
this.discoveryNodeService = discoveryNodeService;
this.version = version;

this.publishTimeout = settings.getAsTime("discovery.zen.publish_timeout", DEFAULT_PUBLISH_TIMEOUT);
this.discoverySettings = discoverySettings;
}

@Override
Expand Down Expand Up @@ -336,6 +335,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
}

TimeValue publishTimeout = discoverySettings.getPublishTimeout();
if (publishTimeout.millis() > 0) {
try {
boolean awaited = publishResponseHandler.awaitAllNodes(publishTimeout);
Expand Down
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.InitialStateDiscoveryListener;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
Expand Down Expand Up @@ -118,7 +119,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
@Inject
public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version) {
DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version, DiscoverySettings discoverySettings) {
super(settings);
this.clusterName = clusterName;
this.threadPool = threadPool;
Expand Down Expand Up @@ -146,7 +147,7 @@ public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threa
this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService);
this.nodesFD.addListener(new NodeFailureListener());

this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener());
this.publishClusterState = new PublishClusterStateAction(settings, transportService, this, new NewClusterStateListener(), discoverySettings);
this.pingService.setNodesProvider(this);
this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());

Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.ClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
Expand All @@ -59,18 +60,15 @@ static interface NewStateProcessed {
private final TransportService transportService;
private final DiscoveryNodesProvider nodesProvider;
private final NewClusterStateListener listener;

private final TimeValue publishTimeout;
private final DiscoverySettings discoverySettings;

public PublishClusterStateAction(Settings settings, TransportService transportService, DiscoveryNodesProvider nodesProvider,
NewClusterStateListener listener) {
NewClusterStateListener listener, DiscoverySettings discoverySettings) {
super(settings);
this.transportService = transportService;
this.nodesProvider = nodesProvider;
this.listener = listener;

this.publishTimeout = settings.getAsTime("discovery.zen.publish_timeout", Discovery.DEFAULT_PUBLISH_TIMEOUT);

this.discoverySettings = discoverySettings;
transportService.registerHandler(PublishClusterStateRequestHandler.ACTION, new PublishClusterStateRequestHandler());
}

Expand Down Expand Up @@ -137,6 +135,7 @@ public void handleException(TransportException exp) {
}
}

TimeValue publishTimeout = discoverySettings.getPublishTimeout();
if (publishTimeout.millis() > 0) {
// only wait if the publish timeout is configured...
try {
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;

Expand All @@ -45,7 +46,7 @@ public class AckClusterUpdateSettingsTests extends ElasticsearchIntegrationTest
protected Settings nodeSettings(int nodeOrdinal) {
//to test that the acknowledgement mechanism is working we better disable the wait for publish
//otherwise the operation is most likely acknowledged even if it doesn't support ack
return ImmutableSettings.builder().put("discovery.zen.publish_timeout", 0).build();
return ImmutableSettings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT, 0).build();
}

@Test
Expand Down
3 changes: 2 additions & 1 deletion src/test/java/org/elasticsearch/cluster/ack/AckTests.java
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.warmer.IndexWarmersMetaData;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
Expand All @@ -61,7 +62,7 @@ public class AckTests extends ElasticsearchIntegrationTest {
protected Settings nodeSettings(int nodeOrdinal) {
//to test that the acknowledgement mechanism is working we better disable the wait for publish
//otherwise the operation is most likely acknowledged even if it doesn't support ack
return ImmutableSettings.builder().put("discovery.zen.publish_timeout", 0).build();
return ImmutableSettings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT, 0).build();
}

@Test
Expand Down
Expand Up @@ -23,12 +23,12 @@
import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.hamcrest.Matchers;
import org.junit.Test;

import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.*;

@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numNodes = 1)
public class ClusterSettingsTests extends ElasticsearchIntegrationTest {
Expand Down Expand Up @@ -99,4 +99,28 @@ public void clusterSettingsUpdateResponse() {
assertThat(response3.getPersistentSettings().get(key1), notNullValue());
assertThat(response3.getPersistentSettings().get(key2), notNullValue());
}

@Test
public void testUpdateDiscoveryPublishTimeout() {
ClusterUpdateSettingsResponse response = client().admin().cluster()
.prepareUpdateSettings()
.setTransientSettings(ImmutableSettings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT, "1s").build())
.get();

assertThat(response.getTransientSettings().getAsMap().get(DiscoverySettings.PUBLISH_TIMEOUT), equalTo("1s"));

response = client().admin().cluster()
.prepareUpdateSettings()
.setTransientSettings(ImmutableSettings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT, "whatever").build())
.get();

assertThat(response.getTransientSettings().getAsMap().entrySet(), Matchers.emptyIterable());

response = client().admin().cluster()
.prepareUpdateSettings()
.setTransientSettings(ImmutableSettings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT, -1).build())
.get();

assertThat(response.getTransientSettings().getAsMap().entrySet(), Matchers.emptyIterable());
}
}