Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow ClusterState.Custom to be created on initial cluster states #26144

Merged
merged 5 commits into from Aug 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 17 additions & 6 deletions core/src/main/java/org/elasticsearch/cluster/ClusterModule.java
Expand Up @@ -73,6 +73,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -94,7 +95,6 @@ public class ClusterModule extends AbstractModule {
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final AllocationDeciders allocationDeciders;
private final AllocationService allocationService;
private final Runnable onStarted;
// pkg private for tests
final Collection<AllocationDecider> deciderList;
final ShardsAllocator shardsAllocator;
Expand All @@ -107,9 +107,24 @@ public ClusterModule(Settings settings, ClusterService clusterService, List<Clus
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
this.allocationService = new AllocationService(settings, allocationDeciders, shardsAllocator, clusterInfoService);
this.onStarted = () -> clusterPlugins.forEach(plugin -> plugin.onNodeStarted());
}

public static Map<String, Supplier<ClusterState.Custom>> getClusterStateCustomSuppliers(List<ClusterPlugin> clusterPlugins) {
final Map<String, Supplier<ClusterState.Custom>> customSupplier = new HashMap<>();
customSupplier.put(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress::new);
customSupplier.put(RestoreInProgress.TYPE, RestoreInProgress::new);
customSupplier.put(SnapshotsInProgress.TYPE, SnapshotsInProgress::new);
for (ClusterPlugin plugin : clusterPlugins) {
Map<String, Supplier<ClusterState.Custom>> initialCustomSupplier = plugin.getInitialClusterStateCustomSupplier();
for (String key : initialCustomSupplier.keySet()) {
if (customSupplier.containsKey(key)) {
throw new IllegalStateException("custom supplier key [" + key + "] is registered more than once");
}
}
customSupplier.putAll(initialCustomSupplier);
}
return Collections.unmodifiableMap(customSupplier);
}

public static List<Entry> getNamedWriteables() {
List<Entry> entries = new ArrayList<>();
Expand Down Expand Up @@ -243,8 +258,4 @@ protected void configure() {
bind(AllocationDeciders.class).toInstance(allocationDeciders);
bind(ShardsAllocator.class).toInstance(shardsAllocator);
}

public Runnable onStarted() {
return onStarted;
}
}
Expand Up @@ -45,15 +45,6 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements

private final List<Entry> entries;

/**
* Constructs new restore metadata
*
* @param entries list of currently running restore processes
*/
public RestoreInProgress(List<Entry> entries) {
this.entries = entries;
}

/**
* Constructs new restore metadata
*
Expand Down
Expand Up @@ -45,6 +45,10 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
// the list of snapshot deletion request entries
private final List<Entry> entries;

public SnapshotDeletionsInProgress() {
this(Collections.emptyList());
}

private SnapshotDeletionsInProgress(List<Entry> entries) {
this.entries = Collections.unmodifiableList(entries);
}
Expand Down
Expand Up @@ -67,7 +67,7 @@ private Decision canMove(ShardRouting shardRouting, RoutingAllocation allocation
// Only primary shards are snapshotted

SnapshotsInProgress snapshotsInProgress = allocation.custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress == null) {
if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) {
// Snapshots are not running
return allocation.decision(Decision.YES, NAME, "no snapshots are currently running");
}
Expand Down
Expand Up @@ -39,4 +39,10 @@ public interface ClusterApplier {
* @param listener callback that is invoked after cluster state is applied
*/
void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterStateTaskListener listener);

/**
* Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
*/
ClusterState.Builder newClusterStateBuilder();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe initalClusterStateBuilder? It's a bit confusing since this "new" is very different from the "new" in the previous method.


}
Expand Up @@ -97,14 +97,17 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
private final AtomicReference<ClusterState> state; // last applied state

private NodeConnectionsService nodeConnectionsService;
private Supplier<ClusterState.Builder> stateBuilderSupplier;

public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, Supplier<ClusterState
.Builder> stateBuilderSupplier) {
super(settings);
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
this.state = new AtomicReference<>();
this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
this.localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
this.stateBuilderSupplier = stateBuilderSupplier;
}

public void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
Expand Down Expand Up @@ -653,4 +656,9 @@ public void run() {
protected long currentTimeInNanos() {
return System.nanoTime();
}

@Override
public ClusterState.Builder newClusterStateBuilder() {
return stateBuilderSupplier.get();
}
}
Expand Up @@ -42,6 +42,7 @@

import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

public class ClusterService extends AbstractLifecycleComponent {

Expand All @@ -58,16 +59,30 @@ public class ClusterService extends AbstractLifecycleComponent {
private final OperationRouting operationRouting;

private final ClusterSettings clusterSettings;
private final Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms) {
super(settings);
this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool);
this.masterService = new MasterService(settings, threadPool);
this.operationRouting = new OperationRouting(settings, clusterSettings);
this.clusterSettings = clusterSettings;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
this::setSlowTaskLoggingThreshold);
this.initialClusterStateCustoms = initialClusterStateCustoms;
this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool, this::newClusterStateBuilder);
}

/**
* Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
*/
public ClusterState.Builder newClusterStateBuilder() {
ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
for (Map.Entry<String, Supplier<ClusterState.Custom>> entry : initialClusterStateCustoms.entrySet()) {
builder.putCustom(entry.getKey(), entry.getValue().get());
}
return builder;
}

private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
Expand Down
Expand Up @@ -117,8 +117,8 @@ protected synchronized void doStart() {
}

protected ClusterState createInitialState(DiscoveryNode localNode) {
return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
.nodes(DiscoveryNodes.builder().add(localNode)
ClusterState.Builder builder = clusterApplier.newClusterStateBuilder();
return builder.nodes(DiscoveryNodes.builder().add(localNode)
.localNodeId(localNode.getId())
.masterNodeId(localNode.getId())
.build())
Expand Down
Expand Up @@ -113,7 +113,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover

private final TransportService transportService;
private final MasterService masterService;
private final ClusterName clusterName;
private final DiscoverySettings discoverySettings;
protected final ZenPing zenPing; // protected to allow tests access
private final MasterFaultDetection masterFD;
Expand Down Expand Up @@ -169,7 +168,7 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
this.maxPingsFromAnotherMaster = MAX_PINGS_FROM_ANOTHER_MASTER_SETTING.get(settings);
this.sendLeaveRequest = SEND_LEAVE_REQUEST_SETTING.get(settings);
this.threadPool = threadPool;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.committedState = new AtomicReference<>();

this.masterElectionIgnoreNonMasters = MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING.get(settings);
Expand Down Expand Up @@ -238,7 +237,8 @@ protected void doStart() {
// set initial state
assert committedState.get() == null;
assert localNode != null;
ClusterState initialState = ClusterState.builder(clusterName)
ClusterState.Builder builder = clusterApplier.newClusterStateBuilder();
ClusterState initialState = builder
.blocks(ClusterBlocks.builder()
.addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
.addGlobalBlock(discoverySettings.getNoMasterBlock()))
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/elasticsearch/gateway/Gateway.java
Expand Up @@ -155,7 +155,7 @@ public void performStateRecovery(final GatewayStateRecoveredListener listener) t
metaDataBuilder.transientSettings(),
e -> logUnknownSetting("transient", e),
(e, ex) -> logInvalidSetting("transient", e, ex)));
ClusterState.Builder builder = ClusterState.builder(clusterService.getClusterName());
ClusterState.Builder builder = clusterService.newClusterStateBuilder();
builder.metaData(metaDataBuilder);
listener.onSuccess(builder.build());
}
Expand Down
8 changes: 5 additions & 3 deletions core/src/main/java/org/elasticsearch/node/Node.java
Expand Up @@ -329,7 +329,10 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
resourcesToClose.add(resourceWatcherService);
final NetworkService networkService = new NetworkService(
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);

List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,
ClusterModule.getClusterStateCustomSuppliers(clusterPlugins));
clusterService.addListener(scriptModule.getScriptService());
resourcesToClose.add(clusterService);
final IngestService ingestService = new IngestService(settings, threadPool, this.environment,
Expand All @@ -346,8 +349,7 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
modules.add(pluginModule);
}
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);
ClusterModule clusterModule = new ClusterModule(settings, clusterService,
pluginsService.filterPlugins(ClusterPlugin.class), clusterInfoService);
ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);
modules.add(clusterModule);
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
modules.add(indicesModule);
Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.function.Supplier;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -63,6 +64,12 @@ default Map<String, Supplier<ShardsAllocator>> getShardsAllocators(Settings sett
* Called when the node is started
*/
default void onNodeStarted() {

}

/**
* Returns a map of {@link ClusterState.Custom} supplier that should be invoked to initialize the initial clusterstate.
* This allows custom clusterstate extensions to be always present and prevents invariants where clusterstates are published
* but customs are not initialized.
*/
default Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() { return Collections.emptyMap(); }
}
Expand Up @@ -59,7 +59,7 @@
public class ClusterModuleTests extends ModuleTestCase {
private ClusterInfoService clusterInfoService = EmptyClusterInfoService.INSTANCE;
private ClusterService clusterService = new ClusterService(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null);
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, Collections.emptyMap());
static class FakeAllocationDecider extends AllocationDecider {
protected FakeAllocationDecider(Settings settings) {
super(settings);
Expand Down Expand Up @@ -196,4 +196,48 @@ public void testAllocationDeciderOrder() {
assertSame(decider.getClass(), expectedDeciders.get(idx++));
}
}

public void testCustomSuppliers() {
Map<String, Supplier<ClusterState.Custom>> customSuppliers = ClusterModule.getClusterStateCustomSuppliers(Collections.emptyList());
assertEquals(3, customSuppliers.size());
assertTrue(customSuppliers.containsKey(SnapshotsInProgress.TYPE));
assertTrue(customSuppliers.containsKey(SnapshotDeletionsInProgress.TYPE));
assertTrue(customSuppliers.containsKey(RestoreInProgress.TYPE));

customSuppliers = ClusterModule.getClusterStateCustomSuppliers(Collections.singletonList(new ClusterPlugin() {
@Override
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
return Collections.singletonMap("foo", () -> null);
}
}));
assertEquals(4, customSuppliers.size());
assertTrue(customSuppliers.containsKey(SnapshotsInProgress.TYPE));
assertTrue(customSuppliers.containsKey(SnapshotDeletionsInProgress.TYPE));
assertTrue(customSuppliers.containsKey(RestoreInProgress.TYPE));
assertTrue(customSuppliers.containsKey("foo"));


IllegalStateException ise = expectThrows(IllegalStateException.class,
() -> ClusterModule.getClusterStateCustomSuppliers(Collections.singletonList(new ClusterPlugin() {
@Override
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
return Collections.singletonMap(SnapshotsInProgress.TYPE, () -> null);
}
})));
assertEquals(ise.getMessage(), "custom supplier key [snapshots] is registered more than once");

ise = expectThrows(IllegalStateException.class,
() -> ClusterModule.getClusterStateCustomSuppliers(Arrays.asList(new ClusterPlugin() {
@Override
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
return Collections.singletonMap("foo", () -> null);
}
}, new ClusterPlugin() {
@Override
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
return Collections.singletonMap("foo", () -> null);
}
})));
assertEquals(ise.getMessage(), "custom supplier key [foo] is registered more than once");
}
}
Expand Up @@ -74,7 +74,7 @@
public class TemplateUpgradeServiceTests extends ESTestCase {

private final ClusterService clusterService = new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null);
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, Collections.emptyMap());

public void testCalculateChangesAddChangeAndDelete() {

Expand Down
Expand Up @@ -413,7 +413,7 @@ static class TimedClusterApplierService extends ClusterApplierService {
public volatile Long currentTimeOverride = null;

TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings, clusterSettings, threadPool);
super(settings, clusterSettings, threadPool, () -> ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)));
}

@Override
Expand Down
@@ -0,0 +1,62 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.service;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.Collections;

public class ClusterSerivceTests extends ESTestCase {

public void testNewBuilderContainsCustoms() {
ClusterState.Custom custom = new ClusterState.Custom() {
@Override
public Diff<ClusterState.Custom> diff(ClusterState.Custom previousState) {
return null;
}

@Override
public String getWriteableName() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {

}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return null;
}
};
ClusterService service = new ClusterService(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, Collections.singletonMap("foo", () ->
custom));
ClusterState.Builder builder = service.newClusterStateBuilder();
assertSame(builder.build().custom("foo"), custom);
}
}