Supporting automatic release of index blocks. Closes elastic#39334
alex101101 committed Mar 22, 2019
1 parent d3152c2 commit 906dbe9
Showing 7 changed files with 393 additions and 14 deletions.
12 changes: 12 additions & 0 deletions docs/reference/modules/cluster/disk_allocator.asciidoc
@@ -101,3 +101,15 @@ PUT _cluster/settings
}
--------------------------------------------------
// CONSOLE

==== Automatic release of read-only (allow delete) blocked indices

When a node breaches the flood stage watermark, write operations are blocked for every
index that has one or more shards on that node. The block can be removed manually.
Alternatively, when the automatic release feature is enabled, the block is removed from
an index once all nodes holding its shards have free disk space that is both at least
5% above the flood stage threshold and above the high watermark threshold.

`cluster.routing.allocation.disk.auto_release_index_enabled`::
Enables automatic release of the read-only (allow delete) block on indices once there
is enough free disk space. Defaults to `false`.
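For example, assuming the setting is applied dynamically through the cluster settings API
like the other disk allocator settings above, a minimal sketch of enabling it, following the
console snippet convention used elsewhere in this file, looks like this:

[source,js]
--------------------------------------------------
PUT _cluster/settings
{
  "transient": {
    "cluster.routing.allocation.disk.auto_release_index_enabled": true
  }
}
--------------------------------------------------
// CONSOLE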
@@ -19,9 +19,12 @@

package org.elasticsearch.cluster.routing.allocation;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.carrotsearch.hppc.ObjectLookupContainer;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
@@ -43,7 +46,8 @@

/**
* Listens for a node to go over the high watermark and kicks off an empty
* reroute if it does. Also responsible for logging about nodes that have
* reroute if it does. Responsible for releasing the block on indices once the nodes holding their shards go back above the auto-release threshold.
* Also responsible for logging about nodes that have
* passed the disk watermarks
*/
public class DiskThresholdMonitor {
@@ -109,13 +113,23 @@ public void onNewInfo(ClusterInfo info) {
}
ClusterState state = clusterStateSupplier.get();
Set<String> indicesToMarkReadOnly = new HashSet<>();
Map<String, Boolean> autoReleaseEligibility = new HashMap<>();
for (ObjectObjectCursor<String, DiskUsage> entry : usages) {
String node = entry.key;
DiskUsage usage = entry.value;
warnAboutDiskIfNeeded(usage);

RoutingNode routingNode = state.getRoutingNodes().node(node);

// Only release the block on an index if every node that holds one of its shards is above the auto-release threshold
markIndicesAutoReleaseAvailability(routingNode, autoReleaseEligibility, nodeIndicesAvailableForAutoRelease(usage));

if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
RoutingNode routingNode = state.getRoutingNodes().node(node);
if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?!
for (ShardRouting routing : routingNode) {
indicesToMarkReadOnly.add(routing.index().getName());
@@ -159,17 +173,51 @@ public void onNewInfo(ClusterInfo info) {
logger.info("rerouting shards: [{}]", explanation);
reroute();
}

// Collect the indices that are eligible to be automatically unblocked, limited to those that currently have a write block
Set<String> indicesToDisableReadOnly = autoReleaseEligibility.entrySet().stream()
.filter(Map.Entry::getValue)
.map(Map.Entry::getKey)
.filter(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index))
.collect(Collectors.toCollection(HashSet::new));

if (indicesToDisableReadOnly.isEmpty() == false) {
logger.warn("releasing read-only (allow delete) block on indices: [{}]", indicesToDisableReadOnly);
updateIndicesReadOnly(indicesToDisableReadOnly, false);
}

indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index));
if (indicesToMarkReadOnly.isEmpty() == false) {
markIndicesReadOnly(indicesToMarkReadOnly);
updateIndicesReadOnly(indicesToMarkReadOnly, true);
}
}
}

// Returns true if this node has enough free disk space for the indices it holds to be eligible for auto-release
private boolean nodeIndicesAvailableForAutoRelease(DiskUsage usage) {
return diskThresholdSettings.isAutoReleaseIndexEnabled()
    && usage.getFreeBytes() >= diskThresholdSettings.getFreeBytesThresholdAutoReleaseStage().getBytes()
    && usage.getFreeDiskAsPercentage() >= diskThresholdSettings.getFreeDiskThresholdAutoReleaseStage();
}

// Tracks an index's eligibility across all nodes that hold its shards. If at least one node is below the
// threshold, the index is marked as ineligible for auto-release of the read-only (allow delete) block
private void markIndicesAutoReleaseAvailability(RoutingNode routingNode, Map<String, Boolean> autoReleaseEligibility,
boolean available) {
if (routingNode != null) {
for (ShardRouting routing : routingNode) {
String indexName = routing.index().getName();
boolean value = autoReleaseEligibility.getOrDefault(indexName, true);
autoReleaseEligibility.put(indexName, value && available);
}
}
}

protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
protected void updateIndicesReadOnly(Set<String> indicesToUpdate, boolean readOnly) {
// set read-only block but don't block on the response
client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY)).
setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build()).execute();
client.admin().indices().prepareUpdateSettings(indicesToUpdate.toArray(Strings.EMPTY_ARRAY)).
setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, readOnly).build()).execute();
}

protected void reroute() {
@@ -60,6 +60,10 @@ public class DiskThresholdSettings {
public static final Setting<TimeValue> CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING =
Setting.positiveTimeSetting("cluster.routing.allocation.disk.reroute_interval", TimeValue.timeValueSeconds(60),
Setting.Property.Dynamic, Setting.Property.NodeScope);
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_INDEX_AUTO_RELEASE_INDEX_ENABLED_SETTING =
Setting.boolSetting("cluster.routing.allocation.disk.auto_release_index_enabled", false,
Setting.Property.Dynamic, Setting.Property.NodeScope);
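// the auto-release threshold requires free disk space to exceed the flood stage threshold by this factor,
// i.e. 5% more free space than is needed to trigger the flood stage block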
private static final double FLOOD_STAGE_NODE_AUTO_RELEASE_FACTOR = 1.05;

private volatile String lowWatermarkRaw;
private volatile String highWatermarkRaw;
@@ -72,6 +76,9 @@ public class DiskThresholdSettings {
private volatile TimeValue rerouteInterval;
private volatile Double freeDiskThresholdFloodStage;
private volatile ByteSizeValue freeBytesThresholdFloodStage;
private volatile boolean autoReleaseIndexEnabled;
private volatile Double freeDiskThresholdAutoReleaseStage;
private volatile ByteSizeValue freeBytesThresholdAutoReleaseStage;

public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings) {
final String lowWatermark = CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.get(settings);
@@ -83,12 +90,15 @@ public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings)
this.includeRelocations = CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.get(settings);
this.rerouteInterval = CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(settings);
this.enabled = CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
this.autoReleaseIndexEnabled = CLUSTER_ROUTING_ALLOCATION_INDEX_AUTO_RELEASE_INDEX_ENABLED_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, this::setLowWatermark);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, this::setHighWatermark);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING, this::setFloodStage);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, this::setIncludeRelocations);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, this::setRerouteInterval);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_INDEX_AUTO_RELEASE_INDEX_ENABLED_SETTING,
this::setAutoReleaseIndexEnabled);
}

static final class LowDiskWatermarkValidator implements Setting.Validator<String> {
@@ -239,13 +249,38 @@ private void setHighWatermark(String highWatermark) {
this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark);
this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark,
CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey());
setAutoReleaseStage();
}

private void setFloodStage(String floodStageRaw) {
// Watermark is expressed in terms of used data, but we need "free" data watermark
this.freeDiskThresholdFloodStage = 100.0 - thresholdPercentageFromWatermark(floodStageRaw);
this.freeBytesThresholdFloodStage = thresholdBytesFromWatermark(floodStageRaw,
CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey());
setAutoReleaseStage();
}

private void setAutoReleaseIndexEnabled(boolean enabled) {
this.autoReleaseIndexEnabled = enabled;
}

/**
* Index Auto-Release threshold is derived from the flood stage and high watermark thresholds
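* (it is the greater of 1.05 times the flood stage free-space threshold and the high watermark free-space
* threshold; for example, if the flood stage is 95% used and the high watermark is 90% used, the
* auto-release stage works out to max(1.05 * 5%, 10%) = 10% free disk space)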
*/
private void setAutoReleaseStage() {
if (canSetAutoRelease() == false) {
    return;
}
Double freeDiskThresholdFloodStageFactor = FLOOD_STAGE_NODE_AUTO_RELEASE_FACTOR * this.freeDiskThresholdFloodStage;
ByteSizeValue freeBytesThresholdFloodStageFactor = new ByteSizeValue(
    (long) (FLOOD_STAGE_NODE_AUTO_RELEASE_FACTOR * this.freeBytesThresholdFloodStage.getBytes()));

this.freeDiskThresholdAutoReleaseStage = freeDiskThresholdFloodStageFactor > freeDiskThresholdHigh
? freeDiskThresholdFloodStageFactor : freeDiskThresholdHigh;
this.freeBytesThresholdAutoReleaseStage = freeBytesThresholdFloodStageFactor.getBytes() > freeBytesThresholdHigh.getBytes()
? freeBytesThresholdFloodStageFactor : freeBytesThresholdHigh;
}

private boolean canSetAutoRelease() {
return (this.freeBytesThresholdHigh != null && this.freeBytesThresholdFloodStage != null);
}

/**
@@ -286,6 +321,16 @@ public ByteSizeValue getFreeBytesThresholdFloodStage() {
return freeBytesThresholdFloodStage;
}

public boolean isAutoReleaseIndexEnabled() {
    return this.autoReleaseIndexEnabled;
}

public Double getFreeDiskThresholdAutoReleaseStage() {
return freeDiskThresholdAutoReleaseStage;
}

public ByteSizeValue getFreeBytesThresholdAutoReleaseStage() {
return freeBytesThresholdAutoReleaseStage;
}

public boolean includeRelocations() {
return includeRelocations;
}
@@ -220,6 +220,7 @@ public void apply(Settings value, Settings current, Settings previous) {
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INDEX_AUTO_RELEASE_INDEX_ENABLED_SETTING,
SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING,
InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING,
InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING,
@@ -44,11 +44,9 @@

public class DiskThresholdMonitorTests extends ESAllocationTestCase {


public void testMarkFloodStageIndicesReadOnly() {
private ClusterState startCluster() {
AllocationService allocation = createAllocationService(Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
Settings settings = Settings.EMPTY;
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)
.put("index.routing.allocation.require._id", "node2")).numberOfShards(1).numberOfReplicas(0))
@@ -70,10 +68,18 @@ public void testMarkFloodStageIndicesReadOnly() {
.add(newNode("node2"))).build();
clusterState = allocation.reroute(clusterState, "reroute");
logger.info("start primary shard");
clusterState = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
return allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
}


public void testMarkFloodStageIndicesReadOnly() {
Settings settings = Settings.EMPTY;
ClusterState clusterState = startCluster();

ClusterState finalState = clusterState;
AtomicBoolean reroute = new AtomicBoolean(false);
AtomicReference<Set<String>> indices = new AtomicReference<>();
AtomicReference<Set<String>> indicesToReEnable = new AtomicReference<>();
DiskThresholdMonitor monitor = new DiskThresholdMonitor(settings, () -> finalState,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) {
@Override
@@ -82,8 +88,12 @@ protected void reroute() {
}

@Override
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
protected void updateIndicesReadOnly(Set<String> indicesToUpdate, boolean readOnly) {
if (readOnly) {
assertTrue(indices.compareAndSet(null, indicesToUpdate));
} else {
assertTrue(indicesToReEnable.compareAndSet(null, indicesToUpdate));
}
}
};
ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
@@ -92,6 +102,7 @@ protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
assertFalse(reroute.get());
assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indices.get());
assertNull(indicesToReEnable.get());

indices.set(null);
builder = ImmutableOpenMap.builder();
@@ -100,6 +111,8 @@ protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
assertTrue(reroute.get());
assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indices.get());
assertNull(indicesToReEnable.get());

IndexMetaData indexMetaData = IndexMetaData.builder(clusterState.metaData().index("test_2")).settings(Settings.builder()
.put(clusterState.metaData()
.index("test_2").getSettings())
@@ -121,8 +134,12 @@ protected void reroute() {
}

@Override
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
protected void updateIndicesReadOnly(Set<String> indicesToUpdate, boolean readOnly) {
if (readOnly) {
assertTrue(indices.compareAndSet(null, indicesToUpdate));
} else {
assertTrue(indicesToReEnable.compareAndSet(null, indicesToUpdate));
}
}
};

@@ -134,5 +151,50 @@ protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
assertTrue(reroute.get());
assertEquals(new HashSet<>(Arrays.asList("test_1")), indices.get());
assertNull(indicesToReEnable.get());
}

public void testAutoReleaseIndices() {
Settings settings = Settings.builder()
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_INDEX_AUTO_RELEASE_INDEX_ENABLED_SETTING.getKey(), true)
.build();
ClusterState clusterState = startCluster();

AtomicReference<Set<String>> indices = new AtomicReference<>();
AtomicReference<Set<String>> indicesToDisableReadOnly = new AtomicReference<>();

// Change cluster state so that "test" index is blocked (read only)
IndexMetaData indexMetaData = IndexMetaData.builder(clusterState.metaData().index("test")).settings(Settings.builder()
.put(clusterState.metaData()
.index("test").getSettings())
.put(IndexMetaData.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.getKey(), true)).build();

final ClusterState anotherFinalClusterState = ClusterState.builder(clusterState).metaData(MetaData.builder(clusterState.metaData())
.put(clusterState.metaData().index("test_1"), false)
.put(clusterState.metaData().index("test_2"), false)
.put(indexMetaData, true).build())
.blocks(ClusterBlocks.builder().addBlocks(indexMetaData).build()).build();

assertTrue(anotherFinalClusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test"));

DiskThresholdMonitor monitor = new DiskThresholdMonitor(settings, () -> anotherFinalClusterState,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) {
@Override
protected void updateIndicesReadOnly(Set<String> indicesToUpdate, boolean readOnly) {
if (readOnly) {
assertTrue(indices.compareAndSet(null, indicesToUpdate));
} else {
assertTrue(indicesToDisableReadOnly.compareAndSet(null, indicesToUpdate));
}
}
};

// When free disk on node2 goes back above the threshold (the 10% high watermark in this test case), its blocked index should be added to the indicesToDisableReadOnly set
ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4));
builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 11));
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indices.get());
assertEquals(new HashSet<>(Arrays.asList("test")), indicesToDisableReadOnly.get());
}
}