Skip to content

Commit

Permalink
Reroute shards automatically when high disk watermark is exceeded
Browse files Browse the repository at this point in the history
This adds a Listener interface to the ClusterInfoService, this is used
by the DiskThresholdDecider, which adds a listener to check for nodes
passing the high watermark. If a node is past the high watermark an
empty reroute is issued so shards can be reallocated if desired.

A reroute will only be issued once every
`cluster.routing.allocation.disk.reroute_interval`, which is "60s" by
default.

Refactors InternalClusterInfoService to delegate the nodes stats and
indices stats gathering into separate methods so they have be overriden
by extending classes. Each stat gathering method returns a
CountDownLatch that can be used to wait until processing for that part
is successful before calling the listeners.

Fixes #8146
  • Loading branch information
dakrone committed Oct 31, 2014
1 parent 1c7b5c6 commit 80b61b3
Show file tree
Hide file tree
Showing 12 changed files with 454 additions and 29 deletions.
55 changes: 55 additions & 0 deletions src/main/java/org/elasticsearch/action/LatchedActionListener.java
@@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action;

import java.util.concurrent.CountDownLatch;

/**
* An action listener that allows passing in a {@link CountDownLatch} that
* will be counted down after onResponse or onFailure is called
*/
public final class LatchedActionListener<T> implements ActionListener<T> {

private final ActionListener<T> delegate;
private final CountDownLatch latch;

public LatchedActionListener(ActionListener<T> delegate, CountDownLatch latch) {
this.delegate = delegate;
this.latch = latch;
}

@Override
public void onResponse(T t) {
try {
delegate.onResponse(t);
} finally {
latch.countDown();
}
}

@Override
public void onFailure(Throwable e) {
try {
delegate.onFailure(e);
} finally {
latch.countDown();
}
}
}
15 changes: 15 additions & 0 deletions src/main/java/org/elasticsearch/cluster/ClusterInfoService.java
Expand Up @@ -19,10 +19,25 @@

package org.elasticsearch.cluster;

/**
* Interface for a class used to gather information about a cluster at
* regular intervals
*/
public interface ClusterInfoService {

public static ClusterInfoService EMPTY = EmptyClusterInfoService.getInstance();

/** The latest cluster information */
public ClusterInfo getClusterInfo();

/** Add a listener that will be called every time new information is gathered */
public void addListener(Listener listener);

/**
* Interface for listeners to implement in order to perform actions when
* new information about the cluster has been gathered
*/
public interface Listener {
public void onNewInfo(ClusterInfo info);
}
}
3 changes: 2 additions & 1 deletion src/main/java/org/elasticsearch/cluster/ClusterModule.java
Expand Up @@ -47,6 +47,7 @@
public class ClusterModule extends AbstractModule implements SpawnModules {

private final Settings settings;
public static final String CLUSTER_SERVICE_IMPL = "cluster.info.service.type";

private Set<Class<? extends IndexTemplateFilter>> indexTemplateFilters = new HashSet<>();

Expand Down Expand Up @@ -87,7 +88,7 @@ protected void configure() {
bind(NodeMappingRefreshAction.class).asEagerSingleton();
bind(MappingUpdatedAction.class).asEagerSingleton();

bind(ClusterInfoService.class).to(InternalClusterInfoService.class).asEagerSingleton();
bind(ClusterInfoService.class).to(settings.getAsClass(CLUSTER_SERVICE_IMPL, InternalClusterInfoService.class)).asEagerSingleton();

Multibinder<IndexTemplateFilter> mbinder = Multibinder.newSetBinder(binder(), IndexTemplateFilter.class);
for (Class<? extends IndexTemplateFilter> indexTemplateFilter : indexTemplateFilters) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/elasticsearch/cluster/DiskUsage.java
Expand Up @@ -55,6 +55,6 @@ public long getUsedBytes() {
}

public String toString() {
return "[" + nodeId + "] free: " + getFreeBytes() + "[" + getFreeDiskAsPercentage() + "]";
return "[" + nodeId + "] free: " + getFreeBytes() + "[" + getFreeDiskAsPercentage() + "%]";
}
}
Expand Up @@ -46,4 +46,9 @@ public static EmptyClusterInfoService getInstance() {
public ClusterInfo getClusterInfo() {
return emptyClusterInfo;
}

@Override
public void addListener(Listener listener) {
// no-op, no new info is ever gathered, so adding listeners is useless
}
}
Expand Up @@ -21,6 +21,7 @@

import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
Expand All @@ -42,8 +43,9 @@
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.HashMap;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
* InternalClusterInfoService provides the ClusterInfoService interface,
Expand All @@ -56,7 +58,7 @@
* Every time the timer runs, gathers information about the disk usage and
* shard sizes across the cluster.
*/
public final class InternalClusterInfoService extends AbstractComponent implements ClusterInfoService, LocalNodeMasterListener, ClusterStateListener {
public class InternalClusterInfoService extends AbstractComponent implements ClusterInfoService, LocalNodeMasterListener, ClusterStateListener {

public static final String INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL = "cluster.info.update.interval";

Expand All @@ -70,6 +72,7 @@ public final class InternalClusterInfoService extends AbstractComponent implemen
private final TransportIndicesStatsAction transportIndicesStatsAction;
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final Set<Listener> listeners = Collections.synchronizedSet(new HashSet<Listener>());

@Inject
public InternalClusterInfoService(Settings settings, NodeSettingsService nodeSettingsService,
Expand Down Expand Up @@ -188,6 +191,11 @@ public ClusterInfo getClusterInfo() {
return new ClusterInfo(usages, shardSizes);
}

@Override
public void addListener(Listener listener) {
this.listeners.add(listener);
}

/**
* Class used to submit {@link ClusterInfoUpdateJob}s on the
* {@link InternalClusterInfoService} threadpool, these jobs will
Expand All @@ -210,6 +218,34 @@ public void run() {
}
}

/**
* Retrieve the latest nodes stats, calling the listener when complete
* @return a latch that can be used to wait for the nodes stats to complete if desired
*/
protected CountDownLatch updateNodeStats(final ActionListener<NodesStatsResponse> listener) {
final CountDownLatch latch = new CountDownLatch(1);
final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true");
nodesStatsRequest.clear();
nodesStatsRequest.fs(true);
nodesStatsRequest.timeout(TimeValue.timeValueSeconds(15));

transportNodesStatsAction.execute(nodesStatsRequest, new LatchedActionListener<>(listener, latch));
return latch;
}

/**
* Retrieve the latest indices stats, calling the listener when complete
* @return a latch that can be used to wait for the indices stats to complete if desired
*/
protected CountDownLatch updateIndicesStats(final ActionListener<IndicesStatsResponse> listener) {
final CountDownLatch latch = new CountDownLatch(1);
final IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.clear();
indicesStatsRequest.store(true);

transportIndicesStatsAction.execute(indicesStatsRequest, new LatchedActionListener<>(listener, latch));
return latch;
}

/**
* Runnable class that performs a {@Link NodesStatsRequest} to retrieve
Expand Down Expand Up @@ -252,12 +288,7 @@ public void run() {
return;
}

NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true");
nodesStatsRequest.clear();
nodesStatsRequest.fs(true);
nodesStatsRequest.timeout(TimeValue.timeValueSeconds(15));

transportNodesStatsAction.execute(nodesStatsRequest, new ActionListener<NodesStatsResponse>() {
CountDownLatch nodeLatch = updateNodeStats(new ActionListener<NodesStatsResponse>() {
@Override
public void onResponse(NodesStatsResponse nodeStatses) {
Map<String, DiskUsage> newUsages = new HashMap<>();
Expand Down Expand Up @@ -294,10 +325,7 @@ public void onFailure(Throwable e) {
}
});

IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.clear();
indicesStatsRequest.store(true);
transportIndicesStatsAction.execute(indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
CountDownLatch indicesLatch = updateIndicesStats(new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
ShardStats[] stats = indicesStatsResponse.getShards();
Expand Down Expand Up @@ -325,8 +353,24 @@ public void onFailure(Throwable e) {
}
});

if (logger.isTraceEnabled()) {
logger.trace("Finished ClusterInfoUpdateJob");
try {
nodeLatch.await(15, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.warn("Failed to update node information for ClusterInfoUpdateJob within 15s timeout");
}

try {
indicesLatch.await(15, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.warn("Failed to update shard information for ClusterInfoUpdateJob within 15s timeout");
}

for (Listener l : listeners) {
try {
l.onNewInfo(getClusterInfo());
} catch (Exception e) {
logger.info("Failed executing ClusterInfoService listener", e);
}
}
}
}
Expand Down
Expand Up @@ -53,10 +53,10 @@ public AllocationDecidersModule add(Class<? extends AllocationDecider> allocatio
protected void configure() {
Multibinder<AllocationDecider> allocationMultibinder = Multibinder.newSetBinder(binder(), AllocationDecider.class);
for (Class<? extends AllocationDecider> deciderClass : DEFAULT_ALLOCATION_DECIDERS) {
allocationMultibinder.addBinding().to(deciderClass);
allocationMultibinder.addBinding().to(deciderClass).asEagerSingleton();
}
for (Class<? extends AllocationDecider> allocation : allocations) {
allocationMultibinder.addBinding().to(allocation);
allocationMultibinder.addBinding().to(allocation).asEagerSingleton();
}

bind(AllocationDeciders.class).asEagerSingleton();
Expand Down

0 comments on commit 80b61b3

Please sign in to comment.