Skip to content

Commit

Permalink
Batch execute template and pipeline cluster state operations (#86017)
Browse files Browse the repository at this point in the history
This commit changes the cluster state operations for templates (legacy, component, and composable) as well as ingest pipelines to be bulk executed. This means that they can be processed much faster when creating/updating many simultaneously.

Relates to #77505
  • Loading branch information
dakrone committed May 4, 2022
1 parent 2daf287 commit 9af8cf2
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 146 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/86017.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 86017
summary: Batch execute template and pipeline cluster state operations
area: "Indices APIs"
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ protected void masterOperation(
) {
indexTemplateService.removeTemplates(
new MetadataIndexTemplateService.RemoveRequest(request.name()).masterTimeout(request.masterNodeTimeout()),
new MetadataIndexTemplateService.RemoveListener() {
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse response) {
listener.onResponse(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ protected void masterOperation(
.masterTimeout(request.masterNodeTimeout())
.version(request.version()),

new MetadataIndexTemplateService.PutListener() {
new ActionListener<>() {
@Override
public void onResponse(MetadataIndexTemplateService.PutResponse response) {
listener.onResponse(AcknowledgedResponse.of(response.acknowledged()));
public void onResponse(AcknowledgedResponse response) {
listener.onResponse(response);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
Expand Down Expand Up @@ -121,6 +123,53 @@ public class MetadataIndexTemplateService {
private final SystemIndices systemIndices;
private final Set<IndexSettingProvider> indexSettingProviders;

/**
* This is the cluster state task executor for all template-based actions.
*/
private static final ClusterStateTaskExecutor<TemplateClusterStateUpdateTask> TEMPLATE_TASK_EXECUTOR = (currentState, taskContexts) -> {
ClusterState state = currentState;
for (final var taskContext : taskContexts) {
try {
final var task = taskContext.getTask();
state = task.execute(state);
taskContext.success(new ClusterStateTaskExecutor.LegacyClusterTaskResultActionListener(task, currentState));
} catch (Exception e) {
taskContext.onFailure(e);
}
}
return state;
};

/**
* A specialized cluster state update task that always takes a listener handling an
* AcknowledgedResponse, as all template actions have simple acknowledged yes/no responses.
*/
private abstract static class TemplateClusterStateUpdateTask extends ClusterStateUpdateTask {
private final ActionListener<AcknowledgedResponse> listener;

TemplateClusterStateUpdateTask(Priority priority, TimeValue timeout, ActionListener<AcknowledgedResponse> listener) {
super(priority, timeout);
this.listener = listener;
}

public abstract ClusterState doExecute(ClusterState currentState) throws Exception;

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return doExecute(currentState);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
listener.onResponse(AcknowledgedResponse.TRUE);
}
}

@Inject
public MetadataIndexTemplateService(
ClusterService clusterService,
Expand All @@ -145,18 +194,12 @@ private void submitUnbatchedTask(String source, ClusterStateUpdateTask task) {
clusterService.submitUnbatchedStateUpdateTask(source, task);
}

public void removeTemplates(final RemoveRequest request, final RemoveListener listener) {
submitUnbatchedTask(
public void removeTemplates(final RemoveRequest request, final ActionListener<AcknowledgedResponse> listener) {
clusterService.submitStateUpdateTask(
"remove-index-template [" + request.name + "]",
new ClusterStateUpdateTask(Priority.URGENT, request.masterTimeout) {

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

new TemplateClusterStateUpdateTask(Priority.URGENT, request.masterTimeout, listener) {
@Override
public ClusterState execute(ClusterState currentState) {
public ClusterState doExecute(ClusterState currentState) {
Set<String> templateNames = new HashSet<>();
for (Map.Entry<String, IndexTemplateMetadata> cursor : currentState.metadata().templates().entrySet()) {
String templateName = cursor.getKey();
Expand All @@ -179,12 +222,9 @@ public ClusterState execute(ClusterState currentState) {
}
return ClusterState.builder(currentState).metadata(metadata).build();
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
listener.onResponse(AcknowledgedResponse.TRUE);
}
}
},
ClusterStateTaskConfig.build(Priority.URGENT, request.masterTimeout),
TEMPLATE_TASK_EXECUTOR
);
}

Expand All @@ -200,25 +240,16 @@ public void putComponentTemplate(
final ComponentTemplate template,
final ActionListener<AcknowledgedResponse> listener
) {
submitUnbatchedTask(
clusterService.submitStateUpdateTask(
"create-component-template [" + name + "], cause [" + cause + "]",
new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {

new TemplateClusterStateUpdateTask(Priority.URGENT, masterTimeout, listener) {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
public ClusterState doExecute(ClusterState currentState) throws Exception {
return addComponentTemplate(currentState, create, name, template);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
listener.onResponse(AcknowledgedResponse.TRUE);
}
}
},
ClusterStateTaskConfig.build(Priority.URGENT, masterTimeout),
TEMPLATE_TASK_EXECUTOR
);
}

Expand Down Expand Up @@ -371,25 +402,16 @@ public void removeComponentTemplate(
final ActionListener<AcknowledgedResponse> listener
) {
validateNotInUse(state.metadata(), names);
submitUnbatchedTask(
clusterService.submitStateUpdateTask(
"remove-component-template [" + String.join(",", names) + "]",
new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {

new TemplateClusterStateUpdateTask(Priority.URGENT, masterTimeout, listener) {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public ClusterState execute(ClusterState currentState) {
public ClusterState doExecute(ClusterState currentState) {
return innerRemoveComponentTemplate(currentState, names);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
listener.onResponse(AcknowledgedResponse.TRUE);
}
}
},
ClusterStateTaskConfig.build(Priority.URGENT, masterTimeout),
TEMPLATE_TASK_EXECUTOR
);
}

Expand Down Expand Up @@ -487,25 +509,16 @@ public void putIndexTemplateV2(
final ActionListener<AcknowledgedResponse> listener
) {
validateV2TemplateRequest(clusterService.state().metadata(), name, template);
submitUnbatchedTask(
clusterService.submitStateUpdateTask(
"create-index-template-v2 [" + name + "], cause [" + cause + "]",
new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {

new TemplateClusterStateUpdateTask(Priority.URGENT, masterTimeout, listener) {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
public ClusterState doExecute(ClusterState currentState) throws Exception {
return addIndexTemplateV2(currentState, create, name, template);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
listener.onResponse(AcknowledgedResponse.TRUE);
}
}
},
ClusterStateTaskConfig.build(Priority.URGENT, masterTimeout),
TEMPLATE_TASK_EXECUTOR
);
}

Expand Down Expand Up @@ -880,25 +893,16 @@ public void removeIndexTemplateV2(
final TimeValue masterTimeout,
final ActionListener<AcknowledgedResponse> listener
) {
submitUnbatchedTask(
clusterService.submitStateUpdateTask(
"remove-index-template-v2 [" + String.join(",", names) + "]",
new ClusterStateUpdateTask(Priority.URGENT, masterTimeout) {

new TemplateClusterStateUpdateTask(Priority.URGENT, masterTimeout, listener) {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public ClusterState execute(ClusterState currentState) {
public ClusterState doExecute(ClusterState currentState) {
return innerRemoveIndexTemplateV2(currentState, names);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
listener.onResponse(AcknowledgedResponse.TRUE);
}
}
},
ClusterStateTaskConfig.build(Priority.URGENT, masterTimeout),
TEMPLATE_TASK_EXECUTOR
);
}

Expand Down Expand Up @@ -990,7 +994,7 @@ static Set<String> dataStreamsUsingTemplates(final ClusterState state, final Set
.collect(Collectors.toSet());
}

public void putTemplate(final PutRequest request, final PutListener listener) {
public void putTemplate(final PutRequest request, final ActionListener<AcknowledgedResponse> listener) {
Settings.Builder updatedSettingsBuilder = Settings.builder();
updatedSettingsBuilder.put(request.settings).normalizePrefix(IndexMetadata.INDEX_SETTING_PREFIX);
request.settings(updatedSettingsBuilder.build());
Expand All @@ -1013,26 +1017,17 @@ public void putTemplate(final PutRequest request, final PutListener listener) {

final IndexTemplateMetadata.Builder templateBuilder = IndexTemplateMetadata.builder(request.name);

submitUnbatchedTask(
clusterService.submitStateUpdateTask(
"create-index-template [" + request.name + "], cause [" + request.cause + "]",
new ClusterStateUpdateTask(Priority.URGENT, request.masterTimeout) {

new TemplateClusterStateUpdateTask(Priority.URGENT, request.masterTimeout, listener) {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
public ClusterState doExecute(ClusterState currentState) throws Exception {
validateTemplate(request.settings, request.mappings, indicesService);
return innerPutTemplate(currentState, request, templateBuilder);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
listener.onResponse(new PutResponse(true));
}
}
},
ClusterStateTaskConfig.build(Priority.URGENT, request.masterTimeout),
TEMPLATE_TASK_EXECUTOR
);
}

Expand Down Expand Up @@ -1480,7 +1475,7 @@ private static void validateCompositeTemplate(
try {
MapperService mapperService = tempIndexService.mapperService();
for (CompressedXContent mapping : mappings) {
mapperService.merge(MapperService.SINGLE_MAPPING_NAME, mapping, MergeReason.INDEX_TEMPLATE);
mapperService.merge(MapperService.SINGLE_MAPPING_NAME, mapping, MapperService.MergeReason.INDEX_TEMPLATE);
}

if (template.getDataStreamTemplate() != null) {
Expand Down Expand Up @@ -1550,12 +1545,7 @@ private void validate(String name, Template template, List<String> indexPatterns
name,
maybeTemplate.map(Template::settings).orElse(Settings.EMPTY),
indexPatterns,
maybeTemplate.map(Template::aliases)
.orElse(Collections.emptyMap())
.values()
.stream()
.map(MetadataIndexTemplateService::toAlias)
.toList()
maybeTemplate.map(Template::aliases).orElse(emptyMap()).values().stream().map(MetadataIndexTemplateService::toAlias).toList()
);
}

Expand Down Expand Up @@ -1655,13 +1645,6 @@ private void validate(String name, @Nullable Settings settings, List<String> ind
}
}

public interface PutListener {

void onResponse(PutResponse response);

void onFailure(Exception e);
}

public static class PutRequest {
final String name;
final String cause;
Expand Down Expand Up @@ -1721,18 +1704,6 @@ public PutRequest version(Integer version) {
}
}

public static class PutResponse {
private final boolean acknowledged;

public PutResponse(boolean acknowledged) {
this.acknowledged = acknowledged;
}

public boolean acknowledged() {
return acknowledged;
}
}

public static class RemoveRequest {
final String name;
TimeValue masterTimeout = MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT;
Expand All @@ -1746,11 +1717,4 @@ public RemoveRequest masterTimeout(TimeValue masterTimeout) {
return this;
}
}

public interface RemoveListener {

void onResponse(AcknowledgedResponse response);

void onFailure(Exception e);
}
}

0 comments on commit 9af8cf2

Please sign in to comment.