Skip to content

Commit

Permalink
Batch add index block cluster state updates (#84374)
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo committed Mar 8, 2022
1 parent efe308a commit f114cc9
Show file tree
Hide file tree
Showing 3 changed files with 382 additions and 83 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/84374.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 84374
summary: Batch add index block cluster state updates
area: Indices APIs
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
Expand All @@ -59,7 +58,6 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -123,6 +121,8 @@ public class MetadataIndexStateService {
private final ClusterStateTaskExecutor<OpenIndicesTask> opensExecutor;
private final ClusterStateTaskExecutor<AddBlocksToCloseTask> addBlocksToCloseExecutor;
private final ClusterStateTaskExecutor<CloseIndicesTask> closesExecutor;
private final ClusterStateTaskExecutor<AddBlocksTask> addBlocksExecutor;
private final ClusterStateTaskExecutor<FinalizeBlocksTask> finalizeBlocksExecutor;

@Inject
public MetadataIndexStateService(
Expand All @@ -145,6 +145,8 @@ public MetadataIndexStateService(
this.opensExecutor = new OpenIndicesExecutor();
this.addBlocksToCloseExecutor = new AddBlocksToCloseExecutor();
this.closesExecutor = new CloseIndicesExecutor();
this.addBlocksExecutor = new AddBlocksExecutor();
this.finalizeBlocksExecutor = new FinalizeBlocksExecutor();
}

/**
Expand Down Expand Up @@ -481,92 +483,125 @@ public void addIndexBlock(AddIndexBlockClusterStateUpdateRequest request, Action

clusterService.submitStateUpdateTask(
"add-index-block-[" + request.getBlock().name + "]-" + Arrays.toString(concreteIndices),
new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) {
new AddBlocksTask(request, listener),
ClusterStateTaskConfig.build(Priority.URGENT, request.masterNodeTimeout()),
addBlocksExecutor
);
}

private Map<Index, ClusterBlock> blockedIndices;
private class AddBlocksExecutor implements ClusterStateTaskExecutor<AddBlocksTask> {

@Override
public ClusterState execute(final ClusterState currentState) {
final Tuple<ClusterState, Map<Index, ClusterBlock>> tup = addIndexBlock(
concreteIndices,
currentState,
request.getBlock()
);
blockedIndices = tup.v2();
return tup.v1();
}
@Override
public ClusterState execute(ClusterState currentState, List<TaskContext<AddBlocksTask>> taskContexts) throws Exception {
ClusterState state = currentState;

@Override
public void clusterStateProcessed(final ClusterState oldState, final ClusterState newState) {
if (oldState == newState) {
assert blockedIndices.isEmpty() : "List of blocked indices is not empty but cluster state wasn't changed";
listener.onResponse(AddIndexBlockResponse.EMPTY);
} else {
assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed";
threadPool.executor(ThreadPool.Names.MANAGEMENT)
.execute(
new WaitForBlocksApplied(
blockedIndices,
request,
ActionListener.wrap(
verifyResults -> clusterService.submitStateUpdateTask(
"finalize-index-block-["
+ request.getBlock().name
+ "]-["
+ blockedIndices.keySet().stream().map(Index::getName).collect(Collectors.joining(", "))
+ "]",
new ClusterStateUpdateTask(Priority.URGENT) {
private final List<AddBlockResult> indices = new ArrayList<>();

@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
Tuple<ClusterState, List<AddBlockResult>> addBlockResult = finalizeBlock(
currentState,
blockedIndices,
verifyResults,
request.getBlock()
);
assert verifyResults.size() == addBlockResult.v2().size();
indices.addAll(addBlockResult.v2());
return addBlockResult.v1();
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(
final ClusterState oldState,
final ClusterState newState
) {

final boolean acknowledged = indices.stream().noneMatch(AddBlockResult::hasFailures);
listener.onResponse(new AddIndexBlockResponse(acknowledged, acknowledged, indices));
}
},
newExecutor()
),
listener::onFailure
for (final var taskContext : taskContexts) {
try {
final var task = taskContext.getTask();
final Tuple<ClusterState, Map<Index, ClusterBlock>> blockResult = addIndexBlock(
task.request.indices(),
state,
task.request.getBlock()
);
state = blockResult.v1();
final Map<Index, ClusterBlock> blockedIndices = blockResult.v2();
taskContext.success(task.listener.delegateFailure((delegate1, clusterState) -> {
if (blockedIndices.isEmpty()) {
delegate1.onResponse(AddIndexBlockResponse.EMPTY);
} else {
threadPool.executor(ThreadPool.Names.MANAGEMENT)
.execute(
new WaitForBlocksApplied(
blockedIndices,
task.request,
delegate1.delegateFailure((delegate2, verifyResults) -> {
clusterService.submitStateUpdateTask(
"finalize-index-block-["
+ task.request.getBlock().name
+ "]-["
+ blockedIndices.keySet().stream().map(Index::getName).collect(Collectors.joining(", "))
+ "]",
new FinalizeBlocksTask(task.request, blockedIndices, verifyResults, delegate2),
ClusterStateTaskConfig.build(Priority.URGENT),
finalizeBlocksExecutor
);
})
)
)
);
}
);
}
}));
} catch (Exception e) {
taskContext.onFailure(e);
}
}

@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
return state;
}
}

private record AddBlocksTask(AddIndexBlockClusterStateUpdateRequest request, ActionListener<AddIndexBlockResponse> listener)
implements
ClusterStateTaskListener {

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "not called";
}
}

private static class FinalizeBlocksExecutor implements ClusterStateTaskExecutor<FinalizeBlocksTask> {

@Override
public ClusterState execute(ClusterState currentState, List<TaskContext<FinalizeBlocksTask>> taskContexts) throws Exception {
ClusterState state = currentState;

for (final var taskContext : taskContexts) {
try {
final var task = taskContext.getTask();
final Tuple<ClusterState, List<AddBlockResult>> finalizeResult = finalizeBlock(
state,
task.blockedIndices,
task.verifyResults,
task.request.getBlock()
);
state = finalizeResult.v1();
final List<AddBlockResult> indices = finalizeResult.v2();
assert indices.size() == task.verifyResults.size();

taskContext.success(task.listener.delegateFailure((delegate, clusterState) -> {
final boolean acknowledged = indices.stream().noneMatch(AddBlockResult::hasFailures);
delegate.onResponse(new AddIndexBlockResponse(acknowledged, acknowledged, indices));
}));
} catch (Exception e) {
taskContext.onFailure(e);
}
},
newExecutor()
);
}

return state;
}
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
private static <T extends ClusterStateUpdateTask> ClusterStateTaskExecutor<T> newExecutor() {
return ClusterStateTaskExecutor.unbatched();
private record FinalizeBlocksTask(
AddIndexBlockClusterStateUpdateRequest request,
Map<Index, ClusterBlock> blockedIndices,
Map<Index, AddBlockResult> verifyResults,
ActionListener<AddIndexBlockResponse> listener
) implements ClusterStateTaskListener {

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
assert false : "not called";
}
}

/**
Expand Down Expand Up @@ -922,7 +957,7 @@ static Tuple<ClusterState, List<IndexResult>> closeRoutingTable(
}

public void openIndices(final OpenIndexClusterStateUpdateRequest request, final ActionListener<ShardsAcknowledgedResponse> listener) {
onlyOpenIndices(request, ActionListener.wrap(response -> {
onlyOpenIndices(request, listener.delegateFailure((delegate, response) -> {
if (response.isAcknowledged()) {
String[] indexNames = Arrays.stream(request.indices()).map(Index::getName).toArray(String[]::new);
activeShardsObserver.waitForActiveShards(
Expand All @@ -936,14 +971,14 @@ public void openIndices(final OpenIndexClusterStateUpdateRequest request, final
Arrays.toString(indexNames)
);
}
listener.onResponse(ShardsAcknowledgedResponse.of(true, shardsAcknowledged));
delegate.onResponse(ShardsAcknowledgedResponse.of(true, shardsAcknowledged));
},
listener::onFailure
);
} else {
listener.onResponse(ShardsAcknowledgedResponse.NOT_ACKNOWLEDGED);
delegate.onResponse(ShardsAcknowledgedResponse.NOT_ACKNOWLEDGED);
}
}, listener::onFailure));
}));
}

private void onlyOpenIndices(final OpenIndexClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) {
Expand Down

0 comments on commit f114cc9

Please sign in to comment.