Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Made template filtering generic and extensible #7454

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportMessage;

import java.util.Map;
import java.util.Set;
Expand All @@ -38,8 +39,9 @@
*/
public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequest<CreateIndexClusterStateUpdateRequest> {

final String cause;
final String index;
private final TransportMessage originalMessage;
private final String cause;
private final String index;

private IndexMetaData.State state = IndexMetaData.State.OPEN;

Expand All @@ -54,7 +56,8 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ
private final Set<ClusterBlock> blocks = Sets.newHashSet();


CreateIndexClusterStateUpdateRequest(String cause, String index) {
CreateIndexClusterStateUpdateRequest(TransportMessage originalMessage, String cause, String index) {
this.originalMessage = originalMessage;
this.cause = cause;
this.index = index;
}
Expand Down Expand Up @@ -89,6 +92,10 @@ public CreateIndexClusterStateUpdateRequest state(IndexMetaData.State state) {
return this;
}

public TransportMessage originalMessage() {
return originalMessage;
}

public String cause() {
return cause;
}
Expand Down
Expand Up @@ -77,7 +77,7 @@ protected void masterOperation(final CreateIndexRequest request, final ClusterSt
cause = "api";
}

CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(cause, request.index())
final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, request.index())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings()).mappings(request.mappings())
.aliases(request.aliases()).customs(request.customs());
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -240,6 +241,15 @@ public PutIndexTemplateRequest mapping(String type, Map<String, Object> source)
}
}

/**
* A specialized simplified mapping source method, takes the form of simple properties definition:
* ("field1", "type=string,store=true").
*/
public PutIndexTemplateRequest mapping(String type, Object... source) {
mapping(type, PutMappingRequest.buildFromSimplifiedDef(type, source));
return this;
}

Map<String, String> mappings() {
return this.mappings;
}
Expand Down
Expand Up @@ -109,6 +109,15 @@ public PutIndexTemplateRequestBuilder addMapping(String type, String source) {
return this;
}

/**
* A specialized simplified mapping source method, takes the form of simple properties definition:
* ("field1", "type=string,store=true").
*/
public PutIndexTemplateRequestBuilder addMapping(String type, Object... source) {
request.mapping(type, source);
return this;
}

/**
* Sets the aliases that will be associated with the index when it gets created
*/
Expand Down
19 changes: 18 additions & 1 deletion src/main/java/org/elasticsearch/cluster/ClusterModule.java
Expand Up @@ -20,7 +20,9 @@
package org.elasticsearch.cluster;

import com.google.common.collect.ImmutableList;
import org.elasticsearch.cluster.action.index.*;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.*;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
Expand All @@ -32,20 +34,30 @@
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexDynamicSettingsModule;

import java.util.HashSet;
import java.util.Set;

/**
*
*/
public class ClusterModule extends AbstractModule implements SpawnModules {

private final Settings settings;

private Set<Class<? extends IndexTemplateFilter>> indexTemplateFilters = new HashSet<>();

public ClusterModule(Settings settings) {
this.settings = settings;
}

public void registerIndexTemplateFilter(Class<? extends IndexTemplateFilter> indexTemplateFilter) {
indexTemplateFilters.add(indexTemplateFilter);
}

@Override
public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(new AllocationModule(settings),
Expand Down Expand Up @@ -76,5 +88,10 @@ protected void configure() {
bind(MappingUpdatedAction.class).asEagerSingleton();

bind(ClusterInfoService.class).to(InternalClusterInfoService.class).asEagerSingleton();

Multibinder<IndexTemplateFilter> mbinder = Multibinder.newSetBinder(binder(), IndexTemplateFilter.class);
for (Class<? extends IndexTemplateFilter> indexTemplateFilter : indexTemplateFilters) {
mbinder.addBinding().to(indexTemplateFilter);
}
}
}
@@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;

/**
* Enables filtering the index templates that will be applied for an index, per create index request.
*/
public interface IndexTemplateFilter {

/**
* @return {@code true} if the given template should be applied on the newly created index,
* {@code false} otherwise.
*/
boolean apply(CreateIndexClusterStateUpdateRequest request, IndexTemplateMetaData template);

static class Compound implements IndexTemplateFilter {

private IndexTemplateFilter[] filters;

Compound(IndexTemplateFilter... filters) {
this.filters = filters;
}

@Override
public boolean apply(CreateIndexClusterStateUpdateRequest request, IndexTemplateMetaData template) {
for (IndexTemplateFilter filter : filters) {
if (!filter.apply(request, template)) {
return false;
}
}
return true;
}
}
}
Expand Up @@ -72,10 +72,7 @@
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

Expand All @@ -88,6 +85,7 @@
public class MetaDataCreateIndexService extends AbstractComponent {

public final static int MAX_INDEX_NAME_BYTES = 100;
private static final DefaultIndexTemplateFilter DEFAULT_INDEX_TEMPLATE_FILTER = new DefaultIndexTemplateFilter();

private final Environment environment;
private final ThreadPool threadPool;
Expand All @@ -98,11 +96,12 @@ public class MetaDataCreateIndexService extends AbstractComponent {
private final Version version;
private final String riverIndexName;
private final AliasValidator aliasValidator;
private final IndexTemplateFilter indexTemplateFilter;

@Inject
public MetaDataCreateIndexService(Settings settings, Environment environment, ThreadPool threadPool, ClusterService clusterService, IndicesService indicesService,
AllocationService allocationService, MetaDataService metaDataService, Version version, @RiverIndexName String riverIndexName,
AliasValidator aliasValidator) {
AliasValidator aliasValidator, Set<IndexTemplateFilter> indexTemplateFilters) {
super(settings);
this.environment = environment;
this.threadPool = threadPool;
Expand All @@ -113,18 +112,21 @@ public MetaDataCreateIndexService(Settings settings, Environment environment, Th
this.version = version;
this.riverIndexName = riverIndexName;
this.aliasValidator = aliasValidator;
}

public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
for (Map.Entry<String, String> entry : request.settings().getAsMap().entrySet()) {
if (!entry.getKey().startsWith("index.")) {
updatedSettingsBuilder.put("index." + entry.getKey(), entry.getValue());
} else {
updatedSettingsBuilder.put(entry.getKey(), entry.getValue());
if (indexTemplateFilters.isEmpty()) {
this.indexTemplateFilter = DEFAULT_INDEX_TEMPLATE_FILTER;
} else {
IndexTemplateFilter[] templateFilters = new IndexTemplateFilter[indexTemplateFilters.size() + 1];
templateFilters[0] = DEFAULT_INDEX_TEMPLATE_FILTER;
int i = 1;
for (IndexTemplateFilter indexTemplateFilter : indexTemplateFilters) {
templateFilters[i++] = indexTemplateFilter;
}
this.indexTemplateFilter = new IndexTemplateFilter.Compound(templateFilters);
}
request.settings(updatedSettingsBuilder.build());
}

public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {

// we lock here, and not within the cluster service callback since we don't want to
// block the whole cluster state handling
Expand Down Expand Up @@ -192,6 +194,17 @@ public void validateIndexName(String index, ClusterState state) throws Elasticse
}

private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener, final Semaphore mdLock) {

ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure why these lines were added? is there a test to show this functionality?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they weren't added, the github comparison makes things a bit hard to read here...

for (Map.Entry<String, String> entry : request.settings().getAsMap().entrySet()) {
if (!entry.getKey().startsWith("index.")) {
updatedSettingsBuilder.put("index." + entry.getKey(), entry.getValue());
} else {
updatedSettingsBuilder.put(entry.getKey(), entry.getValue());
}
}
request.settings(updatedSettingsBuilder.build());

clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener) {

@Override
Expand Down Expand Up @@ -230,7 +243,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {

// we only find a template when its an API call (a new index)
// find templates, highest order are better matching
List<IndexTemplateMetaData> templates = findTemplates(request, currentState);
List<IndexTemplateMetaData> templates = findTemplates(request, currentState, indexTemplateFilter);

Map<String, Custom> customs = Maps.newHashMap();

Expand Down Expand Up @@ -486,11 +499,11 @@ private void addMappings(Map<String, Map<String, Object>> mappings, File mapping
}
}

private List<IndexTemplateMetaData> findTemplates(CreateIndexClusterStateUpdateRequest request, ClusterState state) {
private List<IndexTemplateMetaData> findTemplates(CreateIndexClusterStateUpdateRequest request, ClusterState state, IndexTemplateFilter indexTemplateFilter) {
List<IndexTemplateMetaData> templates = Lists.newArrayList();
for (ObjectCursor<IndexTemplateMetaData> cursor : state.metaData().templates().values()) {
IndexTemplateMetaData template = cursor.value;
if (Regex.simpleMatch(template.template(), request.index())) {
if (indexTemplateFilter.apply(request, template)) {
templates.add(template);
}
}
Expand All @@ -506,7 +519,7 @@ private List<IndexTemplateMetaData> findTemplates(CreateIndexClusterStateUpdateR
byte[] templatesData = Streams.copyToByteArray(templatesFile);
parser = XContentHelper.createParser(templatesData, 0, templatesData.length);
IndexTemplateMetaData template = IndexTemplateMetaData.Builder.fromXContent(parser);
if (Regex.simpleMatch(template.template(), request.index())) {
if (indexTemplateFilter.apply(request, template)) {
templates.add(template);
}
} catch (Exception e) {
Expand All @@ -530,4 +543,11 @@ public int compare(IndexTemplateMetaData o1, IndexTemplateMetaData o2) {
private void validate(CreateIndexClusterStateUpdateRequest request, ClusterState state) throws ElasticsearchException {
validateIndexName(request.index(), state);
}

private static class DefaultIndexTemplateFilter implements IndexTemplateFilter {
@Override
public boolean apply(CreateIndexClusterStateUpdateRequest request, IndexTemplateMetaData template) {
return Regex.simpleMatch(template.template(), request.index());
}
}
}