Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch custom ShardsAllocators to pull based model #20071

Merged
merged 1 commit into from
Aug 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 26 additions & 14 deletions core/src/main/java/org/elasticsearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,42 +69,38 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* Configures classes and services that affect the entire cluster.
*/
public class ClusterModule extends AbstractModule {

public static final String EVEN_SHARD_COUNT_ALLOCATOR = "even_shard";
public static final String BALANCED_ALLOCATOR = "balanced"; // default
public static final Setting<String> SHARDS_ALLOCATOR_TYPE_SETTING =
new Setting<>("cluster.routing.allocation.type", BALANCED_ALLOCATOR, Function.identity(), Property.NodeScope);

private final Settings settings;
private final ExtensionPoint.SelectedType<ShardsAllocator> shardsAllocators = new ExtensionPoint.SelectedType<>("shards_allocator", ShardsAllocator.class);
private final ExtensionPoint.ClassSet<IndexTemplateFilter> indexTemplateFilters = new ExtensionPoint.ClassSet<>("index_template_filter", IndexTemplateFilter.class);
private final ClusterService clusterService;
private final IndexNameExpressionResolver indexNameExpressionResolver;
// pkg private for tests
final Collection<AllocationDecider> allocationDeciders;
final ShardsAllocator shardsAllocator;

// pkg private so tests can mock
Class<? extends ClusterInfoService> clusterInfoServiceImpl = InternalClusterInfoService.class;

public ClusterModule(Settings settings, ClusterService clusterService, List<ClusterPlugin> clusterPlugins) {
this.settings = settings;
this.allocationDeciders = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
registerShardsAllocator(ClusterModule.BALANCED_ALLOCATOR, BalancedShardsAllocator.class);
registerShardsAllocator(ClusterModule.EVEN_SHARD_COUNT_ALLOCATOR, BalancedShardsAllocator.class);
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
this.clusterService = clusterService;
indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
}

public void registerShardsAllocator(String name, Class<? extends ShardsAllocator> clazz) {
shardsAllocators.registerExtension(name, clazz);
}

public void registerIndexTemplateFilter(Class<? extends IndexTemplateFilter> indexTemplateFilter) {
indexTemplateFilters.registerExtension(indexTemplateFilter);
}
Expand Down Expand Up @@ -148,14 +144,29 @@ private static void addAllocationDecider(Map<Class, AllocationDecider> deciders,
}
}

private static ShardsAllocator createShardsAllocator(Settings settings, ClusterSettings clusterSettings,
List<ClusterPlugin> clusterPlugins) {
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(settings, clusterSettings));

for (ClusterPlugin plugin : clusterPlugins) {
plugin.getShardsAllocators(settings, clusterSettings).forEach((k, v) -> {
if (allocators.put(k, v) != null) {
throw new IllegalArgumentException("ShardsAllocator [" + k + "] already defined");
}
});
}
String allocatorName = SHARDS_ALLOCATOR_TYPE_SETTING.get(settings);
Supplier<ShardsAllocator> allocatorSupplier = allocators.get(allocatorName);
if (allocatorSupplier == null) {
throw new IllegalArgumentException("Unknown ShardsAllocator [" + allocatorName + "]");
}
return Objects.requireNonNull(allocatorSupplier.get(),
"ShardsAllocator factory for [" + allocatorName + "] returned null");
}

@Override
protected void configure() {
// bind ShardsAllocator
String shardsAllocatorType = shardsAllocators.bindType(binder(), settings, ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), ClusterModule.BALANCED_ALLOCATOR);
if (shardsAllocatorType.equals(ClusterModule.EVEN_SHARD_COUNT_ALLOCATOR)) {
final ESLogger logger = Loggers.getLogger(getClass(), settings);
logger.warn("{} allocator has been removed in 2.0 using {} instead", ClusterModule.EVEN_SHARD_COUNT_ALLOCATOR, ClusterModule.BALANCED_ALLOCATOR);
}
indexTemplateFilters.bind(binder());

bind(ClusterInfoService.class).to(clusterInfoServiceImpl).asEagerSingleton();
Expand All @@ -178,5 +189,6 @@ protected void configure() {
bind(MappingUpdatedAction.class).asEagerSingleton();
bind(TaskResultsService.class).asEagerSingleton();
bind(AllocationDeciders.class).toInstance(new AllocationDeciders(settings, allocationDeciders));
bind(ShardsAllocator.class).toInstance(shardsAllocator);
}
}
17 changes: 17 additions & 0 deletions core/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -41,4 +44,18 @@ public interface ClusterPlugin {
default Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
return Collections.emptyList();
}

/**
* Return {@link ShardsAllocator} implementations added by this plugin.
*
* The key of the returned {@link Map} is the name of the allocator, and the value
* is a function to construct the allocator.
*
* @param settings Settings for the node
* @param clusterSettings Settings for the cluster
* @return A map of allocator implementations
*/
default Map<String, Supplier<ShardsAllocator>> getShardsAllocators(Settings settings, ClusterSettings clusterSettings) {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class ClusterModuleTests extends ModuleTestCase {
private ClusterService clusterService = new ClusterService(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null);
Expand Down Expand Up @@ -126,33 +128,40 @@ public Collection<AllocationDecider> createAllocationDeciders(Settings settings,
assertTrue(module.allocationDeciders.stream().anyMatch(d -> d.getClass().equals(FakeAllocationDecider.class)));
}

private ClusterModule newClusterModuleWithShardsAllocator(Settings settings, String name, Supplier<ShardsAllocator> supplier) {
return new ClusterModule(settings, clusterService, Collections.singletonList(
new ClusterPlugin() {
@Override
public Map<String, Supplier<ShardsAllocator>> getShardsAllocators(Settings settings, ClusterSettings clusterSettings) {
return Collections.singletonMap(name, supplier);
}
}
));
}

public void testRegisterShardsAllocator() {
Settings settings = Settings.builder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), "custom").build();
ClusterModule module = new ClusterModule(settings, clusterService, Collections.emptyList());
module.registerShardsAllocator("custom", FakeShardsAllocator.class);
assertBinding(module, ShardsAllocator.class, FakeShardsAllocator.class);
ClusterModule module = newClusterModuleWithShardsAllocator(settings, "custom", FakeShardsAllocator::new);
assertEquals(FakeShardsAllocator.class, module.shardsAllocator.getClass());
}

public void testRegisterShardsAllocatorAlreadyRegistered() {
ClusterModule module = new ClusterModule(Settings.EMPTY, clusterService, Collections.emptyList());
try {
module.registerShardsAllocator(ClusterModule.BALANCED_ALLOCATOR, FakeShardsAllocator.class);
} catch (IllegalArgumentException e) {
assertEquals(e.getMessage(), "Can't register the same [shards_allocator] more than once for [balanced]");
}
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
newClusterModuleWithShardsAllocator(Settings.EMPTY, ClusterModule.BALANCED_ALLOCATOR, FakeShardsAllocator::new));
assertEquals("ShardsAllocator [" + ClusterModule.BALANCED_ALLOCATOR + "] already defined", e.getMessage());
}

public void testUnknownShardsAllocator() {
Settings settings = Settings.builder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), "dne").build();
ClusterModule module = new ClusterModule(settings, clusterService, Collections.emptyList());
assertBindingFailure(module, "Unknown [shards_allocator]");
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () ->
new ClusterModule(settings, clusterService, Collections.emptyList()));
assertEquals("Unknown ShardsAllocator [dne]", e.getMessage());
}

public void testEvenShardsAllocatorBackcompat() {
Settings settings = Settings.builder()
.put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), ClusterModule.EVEN_SHARD_COUNT_ALLOCATOR).build();
ClusterModule module = new ClusterModule(settings, clusterService, Collections.emptyList());
assertBinding(module, ShardsAllocator.class, BalancedShardsAllocator.class);
public void testShardsAllocatorFactoryNull() {
Settings settings = Settings.builder().put(ClusterModule.SHARDS_ALLOCATOR_TYPE_SETTING.getKey(), "bad").build();
NullPointerException e = expectThrows(NullPointerException.class, () ->
newClusterModuleWithShardsAllocator(settings, "bad", () -> null));
}

public void testRegisterIndexTemplateFilterDuplicate() {
Expand Down