Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new setting to disable persistent tasks allocations #29137

Merged
merged 4 commits into from
Mar 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions docs/reference/modules/cluster/misc.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,29 @@ PUT /_cluster/settings
}
-------------------------------
// CONSOLE


[[persistent-tasks-allocation]]
==== Persistent Tasks Allocations

Plugins can create a kind of tasks called persistent tasks. Those tasks are
usually long-live tasks and are stored in the cluster state, allowing the
tasks to be revived after a full cluster restart.

Every time a persistent task is created, the master nodes takes care of
assigning the task to a node of the cluster, and the assigned node will then
pick up the task and execute it locally. The process of assigning persistent
tasks to nodes is controlled by the following property, which can be updated
dynamically:

`cluster.persistent_tasks.allocation.enable`::
+
--
Enable or disable allocation for persistent tasks:

* `all` - (default) Allows persistent tasks to be assigned to nodes
* `none` - No allocations are allowed for any type of persistent task

This setting does not affect the persistent tasks that are already being executed.
Only newly created persistent tasks, or tasks that must be reassigned (after a node
left the cluster, for example), are impacted by this setting.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.elasticsearch.monitor.os.OsService;
import org.elasticsearch.monitor.process.ProcessService;
import org.elasticsearch.node.Node;
import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.rest.BaseRestHandler;
Expand Down Expand Up @@ -420,6 +421,7 @@ public void apply(Settings value, Settings current, Settings previous) {
FastVectorHighlighter.SETTING_TV_HIGHLIGHT_MULTI_VALUE,
Node.BREAKER_TYPE_KEY,
OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING,
IndexGraveyard.SETTING_MAX_TOMBSTONES
IndexGraveyard.SETTING_MAX_TOMBSTONES,
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING
)));
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.persistent.decider.AssignmentDecision;
import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
import org.elasticsearch.tasks.Task;

import java.util.Objects;
Expand All @@ -45,12 +47,14 @@ public class PersistentTasksClusterService extends AbstractComponent implements

private final ClusterService clusterService;
private final PersistentTasksExecutorRegistry registry;
private final EnableAssignmentDecider decider;

public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorRegistry registry, ClusterService clusterService) {
super(settings);
this.clusterService = clusterService;
clusterService.addListener(this);
this.registry = registry;
this.decider = new EnableAssignmentDecider(settings, clusterService.getClusterSettings());
}

/**
Expand Down Expand Up @@ -224,6 +228,12 @@ private <Params extends PersistentTaskParams> Assignment createAssignment(final
final @Nullable Params taskParams,
final ClusterState currentState) {
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);

AssignmentDecision decision = decider.canAssign();
if (decision.getType() == AssignmentDecision.Type.NO) {
return new Assignment(null, "persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]");
}

return persistentTasksExecutor.getAssignment(taskParams, currentState);
}

Expand All @@ -249,7 +259,8 @@ public void onFailure(String source, Exception e) {

/**
* Returns true if the cluster state change(s) require to reassign some persistent tasks. It can happen in the following
* situations: a node left or is added, the routing table changed, the master node changed or the persistent tasks changed.
* situations: a node left or is added, the routing table changed, the master node changed, the metadata changed or the
* persistent tasks changed.
*/
boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) {
final PersistentTasksCustomMetaData tasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
Expand All @@ -259,7 +270,12 @@ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) {

boolean masterChanged = event.previousState().nodes().isLocalNodeElectedMaster() == false;

if (persistentTasksChanged(event) || event.nodesChanged() || event.routingTableChanged() || masterChanged) {
if (persistentTasksChanged(event)
|| event.nodesChanged()
|| event.routingTableChanged()
|| event.metaDataChanged()
|| masterChanged) {

for (PersistentTask<?> task : tasks.tasks()) {
if (needsReassignment(task.getAssignment(), event.state().nodes())) {
Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), event.state());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.persistent.decider;

import java.util.Locale;
import java.util.Objects;

/**
* {@link AssignmentDecision} represents the decision made during the process of
* assigning a persistent task to a node of the cluster.
*
* @see EnableAssignmentDecider
*/
public final class AssignmentDecision {

public static final AssignmentDecision YES = new AssignmentDecision(Type.YES, "");

private final Type type;
private final String reason;

public AssignmentDecision(final Type type, final String reason) {
this.type = Objects.requireNonNull(type);
this.reason = Objects.requireNonNull(reason);
}

public Type getType() {
return type;
}

public String getReason() {
return reason;
}

@Override
public String toString() {
return "assignment decision [type=" + type + ", reason=" + reason + "]";
}

public enum Type {
NO(0), YES(1);

private final int id;

Type(int id) {
this.id = id;
}

public int getId() {
return id;
}

public static Type resolve(final String s) {
return Type.valueOf(s.toUpperCase(Locale.ROOT));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.persistent.decider;

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

import java.util.Locale;

import static org.elasticsearch.common.settings.Setting.Property.Dynamic;
import static org.elasticsearch.common.settings.Setting.Property.NodeScope;

/**
* {@link EnableAssignmentDecider} is used to allow/disallow the persistent tasks
* to be assigned to cluster nodes.
* <p>
* Allocation settings can have the following values (non-casesensitive):
* <ul>
* <li> <code>NONE</code> - no persistent tasks can be assigned
* <li> <code>ALL</code> - all persistent tasks can be assigned to nodes
* </ul>
*
* @see Allocation
*/
public class EnableAssignmentDecider {

public static final Setting<Allocation> CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING =
new Setting<>("cluster.persistent_tasks.allocation.enable", Allocation.ALL.toString(), Allocation::fromString, Dynamic, NodeScope);

private volatile Allocation enableAssignment;

public EnableAssignmentDecider(final Settings settings, final ClusterSettings clusterSettings) {
this.enableAssignment = CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, this::setEnableAssignment);
}

public void setEnableAssignment(final Allocation enableAssignment) {
this.enableAssignment = enableAssignment;
}

/**
* Returns a {@link AssignmentDecision} whether the given persistent task can be assigned
* to a node of the cluster. The decision depends on the current value of the setting
* {@link EnableAssignmentDecider#CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING}.
*
* @return the {@link AssignmentDecision}
*/
public AssignmentDecision canAssign() {
if (enableAssignment == Allocation.NONE) {
return new AssignmentDecision(AssignmentDecision.Type.NO, "no persistent task assignments are allowed due to cluster settings");
}
return AssignmentDecision.YES;
}

/**
* Allocation values or rather their string representation to be used used with
* {@link EnableAssignmentDecider#CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING}
* via cluster settings.
*/
public enum Allocation {

NONE,
ALL;

public static Allocation fromString(final String strValue) {
if (strValue == null) {
return null;
} else {
String value = strValue.toUpperCase(Locale.ROOT);
try {
return valueOf(value);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Illegal value [" + value + "] for ["
+ CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING.getKey() + "]");
}
}
}

@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* task.
* <p>
* 2. The master node updates the {@link org.elasticsearch.persistent.PersistentTasksCustomMetaData} in the cluster state to indicate
* that there is a new persistent task is running in the system.
* that there is a new persistent task running in the system.
* <p>
* 3. The {@link org.elasticsearch.persistent.PersistentTasksNodeService} running on every node in the cluster monitors changes in
* the cluster state and starts execution of all new tasks assigned to the node it is running on.
Expand Down
Loading