Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch open-indices cluster state updates #83760

Merged
merged 13 commits into from Feb 10, 2022
Expand Up @@ -92,7 +92,7 @@ protected void masterOperation(
.indices(concreteIndices)
.waitForActiveShards(request.waitForActiveShards());

indexStateService.openIndex(updateRequest, new ActionListener<>() {
indexStateService.openIndices(updateRequest, new ActionListener<>() {

@Override
public void onResponse(ShardsAcknowledgedResponse response) {
Expand Down
Expand Up @@ -32,14 +32,17 @@
import org.elasticsearch.action.support.master.ShardsAcknowledgedResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateAckListener;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
Expand All @@ -55,6 +58,8 @@
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
Expand All @@ -76,6 +81,7 @@
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -111,12 +117,10 @@ public class MetadataIndexStateService {

private final ClusterService clusterService;
private final AllocationService allocationService;
private final IndexMetadataVerifier indexMetadataVerifier;
private final IndicesService indicesService;
private final ShardLimitValidator shardLimitValidator;
private final ThreadPool threadPool;
private final NodeClient client;
private final ThreadPool threadPool;
private final ActiveShardsObserver activeShardsObserver;
private final ClusterStateTaskExecutor<OpenIndicesTask> opensExecutor;

@Inject
public MetadataIndexStateService(
Expand All @@ -128,14 +132,12 @@ public MetadataIndexStateService(
NodeClient client,
ThreadPool threadPool
) {
this.indicesService = indicesService;
this.clusterService = clusterService;
this.allocationService = allocationService;
this.threadPool = threadPool;
this.client = client;
this.indexMetadataVerifier = indexMetadataVerifier;
this.shardLimitValidator = shardLimitValidator;
this.threadPool = threadPool;
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
this.opensExecutor = new OpenIndicesExecutor(allocationService, indexMetadataVerifier, indicesService, shardLimitValidator);
}

/**
Expand Down Expand Up @@ -343,7 +345,7 @@ static ClusterState addIndexClosedBlocks(
* @param block The type of block to add
* @return a tuple of the updated cluster state, as well as the blocks that got added
*/
static Tuple<ClusterState, Map<Index, ClusterBlock>> addIndexBlock(
private static Tuple<ClusterState, Map<Index, ClusterBlock>> addIndexBlock(
final Index[] indices,
final ClusterState currentState,
final APIBlock block
Expand Down Expand Up @@ -538,7 +540,7 @@ public void onFailure(final Exception e) {
* this action succeed then the shard is considered to be ready for closing. When all shards of a given index are ready for closing,
* the index is considered ready to be closed.
*/
class WaitForClosedBlocksApplied extends ActionRunnable<Map<Index, IndexResult>> {
private class WaitForClosedBlocksApplied extends ActionRunnable<Map<Index, IndexResult>> {

private final Map<Index, ClusterBlock> blockedIndices;
private final CloseIndexClusterStateUpdateRequest request;
Expand Down Expand Up @@ -670,7 +672,7 @@ private void sendVerifyShardBeforeCloseRequest(
* Helper class that coordinates with shards to ensure that blocks have been properly applied to all shards using
* {@link TransportVerifyShardIndexBlockAction}.
*/
class WaitForBlocksApplied extends ActionRunnable<Map<Index, AddBlockResult>> {
private class WaitForBlocksApplied extends ActionRunnable<Map<Index, AddBlockResult>> {

private final Map<Index, ClusterBlock> blockedIndices;
private final AddIndexBlockClusterStateUpdateRequest request;
Expand Down Expand Up @@ -884,8 +886,8 @@ static Tuple<ClusterState, Collection<IndexResult>> closeRoutingTable(
);
}

public void openIndex(final OpenIndexClusterStateUpdateRequest request, final ActionListener<ShardsAcknowledgedResponse> listener) {
onlyOpenIndex(request, ActionListener.wrap(response -> {
public void openIndices(final OpenIndexClusterStateUpdateRequest request, final ActionListener<ShardsAcknowledgedResponse> listener) {
onlyOpenIndices(request, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
String[] indexNames = Arrays.stream(request.indices()).map(Index::getName).toArray(String[]::new);
activeShardsObserver.waitForActiveShards(
Expand All @@ -895,7 +897,7 @@ public void openIndex(final OpenIndexClusterStateUpdateRequest request, final Ac
shardsAcknowledged -> {
if (shardsAcknowledged == false) {
logger.debug(
"[{}] indices opened, but the operation timed out while waiting for " + "enough shards to be started.",
"[{}] indices opened, but the operation timed out while waiting for enough shards to be started.",
Arrays.toString(indexNames)
);
}
Expand All @@ -909,90 +911,18 @@ public void openIndex(final OpenIndexClusterStateUpdateRequest request, final Ac
}, listener::onFailure));
}

private void onlyOpenIndex(final OpenIndexClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) {
private void onlyOpenIndices(final OpenIndexClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) {
if (request.indices() == null || request.indices().length == 0) {
throw new IllegalArgumentException("Index name is required");
}

final String indicesAsString = Arrays.toString(request.indices());
clusterService.submitStateUpdateTask(
"open-indices " + indicesAsString,
new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) {
@Override
public ClusterState execute(final ClusterState currentState) {
final ClusterState updatedState = openIndices(request.indices(), currentState);
// no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask
return allocationService.reroute(updatedState, "indices opened [" + indicesAsString + "]");
}
},
ClusterStateTaskExecutor.unbatched()
);
}

ClusterState openIndices(final Index[] indices, final ClusterState currentState) {
final List<IndexMetadata> indicesToOpen = new ArrayList<>();
for (Index index : indices) {
final IndexMetadata indexMetadata = currentState.metadata().getIndexSafe(index);
if (indexMetadata.getState() != IndexMetadata.State.OPEN) {
indicesToOpen.add(indexMetadata);
} else if (currentState.blocks().hasIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID)) {
indicesToOpen.add(indexMetadata);
}
}

shardLimitValidator.validateShardLimit(currentState, indices);
if (indicesToOpen.isEmpty()) {
return currentState;
}

logger.info(
() -> new ParameterizedMessage(
"opening indices [{}]",
String.join(",", indicesToOpen.stream().map(i -> (CharSequence) i.getIndex().toString())::iterator)
)
new OpenIndicesTask(request, listener),
ClusterStateTaskConfig.build(Priority.URGENT, request.masterNodeTimeout()),
this.opensExecutor
);

final Metadata.Builder metadata = Metadata.builder(currentState.metadata());
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion().minimumIndexCompatibilityVersion();

for (IndexMetadata indexMetadata : indicesToOpen) {
final Index index = indexMetadata.getIndex();
if (indexMetadata.getState() != IndexMetadata.State.OPEN) {
final Settings.Builder updatedSettings = Settings.builder().put(indexMetadata.getSettings());
updatedSettings.remove(VERIFIED_BEFORE_CLOSE_SETTING.getKey());

IndexMetadata newIndexMetadata = IndexMetadata.builder(indexMetadata)
.state(IndexMetadata.State.OPEN)
.settingsVersion(indexMetadata.getSettingsVersion() + 1)
.settings(updatedSettings)
.timestampRange(IndexLongFieldRange.NO_SHARDS)
.build();

// The index might be closed because we couldn't import it due to an old incompatible
// version, so we need to verify its compatibility.
newIndexMetadata = indexMetadataVerifier.verifyIndexMetadata(newIndexMetadata, minIndexCompatibilityVersion);
try {
indicesService.verifyIndexMetadata(newIndexMetadata, newIndexMetadata);
} catch (Exception e) {
throw new ElasticsearchException("Failed to verify index " + index, e);
}
metadata.put(newIndexMetadata, true);
}

// Always removes index closed blocks (note: this can fail on-going close index actions)
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID);
}

ClusterState updatedState = ClusterState.builder(currentState).metadata(metadata).blocks(blocks).build();

final RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
for (IndexMetadata previousIndexMetadata : indicesToOpen) {
if (previousIndexMetadata.getState() != IndexMetadata.State.OPEN) {
routingTable.addAsFromCloseToOpen(updatedState.metadata().getIndexSafe(previousIndexMetadata.getIndex()));
}
}
return ClusterState.builder(updatedState).routingTable(routingTable.build()).build();
}

/**
Expand All @@ -1003,7 +933,7 @@ ClusterState openIndices(final Index[] indices, final ClusterState currentState)
* @param block the full block to convert to
* @return the updated cluster state, as well as the (failed and successful) index-level results for adding the block
*/
static Tuple<ClusterState, Collection<AddBlockResult>> finalizeBlock(
private static Tuple<ClusterState, Collection<AddBlockResult>> finalizeBlock(
final ClusterState currentState,
final Map<Index, ClusterBlock> blockedIndices,
final Map<Index, AddBlockResult> verifyResult,
Expand Down Expand Up @@ -1079,7 +1009,7 @@ public static ClusterBlock createIndexClosingBlock() {
return new ClusterBlock(
INDEX_CLOSED_BLOCK_ID,
UUIDs.randomBase64UUID(),
"index preparing to close. Reopen the index to allow " + "writes again or retry closing the index to fully close the index.",
"index preparing to close. Reopen the index to allow writes again or retry closing the index to fully close the index.",
false,
false,
false,
Expand Down Expand Up @@ -1108,4 +1038,160 @@ public static ClusterBlock createUUIDBasedBlock(ClusterBlock clusterBlock) {
clusterBlock.levels()
);
}

public static class OpenIndicesExecutor implements ClusterStateTaskExecutor<OpenIndicesTask> {
joegallo marked this conversation as resolved.
Show resolved Hide resolved

private final AllocationService allocationService;
private final IndexMetadataVerifier indexMetadataVerifier;
private final IndicesService indicesService;
private final ShardLimitValidator shardLimitValidator;

public OpenIndicesExecutor(
AllocationService allocationService,
IndexMetadataVerifier indexMetadataVerifier,
IndicesService indicesService,
ShardLimitValidator shardLimitValidator
) {
this.allocationService = allocationService;
this.indexMetadataVerifier = indexMetadataVerifier;
this.indicesService = indicesService;
this.shardLimitValidator = shardLimitValidator;
}

@Override
public ClusterTasksResult<OpenIndicesTask> execute(ClusterState currentState, List<OpenIndicesTask> tasks) throws Exception {
ClusterTasksResult.Builder<OpenIndicesTask> builder = ClusterTasksResult.builder();
ClusterState state = currentState;

try {
// build an in-order de-duplicated array of all the indices to open
final Set<Index> indicesToOpen = new LinkedHashSet<>(tasks.size());
for (OpenIndicesTask task : tasks) {
for (Index index : task.request.indices()) {
indicesToOpen.add(index);
}
joegallo marked this conversation as resolved.
Show resolved Hide resolved
}
Index[] indices = indicesToOpen.toArray(new Index[0]);
joegallo marked this conversation as resolved.
Show resolved Hide resolved

// open them
state = openIndices(indices, state);

// do a final reroute
final String indicesAsString = Arrays.toString(indices);
joegallo marked this conversation as resolved.
Show resolved Hide resolved
state = allocationService.reroute(state, "indices opened [" + indicesAsString + "]");

for (OpenIndicesTask task : tasks) {
builder.success(task);
}
} catch (Exception e) {
for (OpenIndicesTask task : tasks) {
builder.failure(task, e);
}
}

return builder.build(state);
}

private ClusterState openIndices(final Index[] indices, final ClusterState currentState) {
final List<IndexMetadata> indicesToOpen = new ArrayList<>(indices.length);
for (Index index : indices) {
final IndexMetadata indexMetadata = currentState.metadata().getIndexSafe(index);
if (indexMetadata.getState() != IndexMetadata.State.OPEN) {
indicesToOpen.add(indexMetadata);
} else if (currentState.blocks().hasIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID)) {
indicesToOpen.add(indexMetadata);
}
}

shardLimitValidator.validateShardLimit(currentState, indices);
if (indicesToOpen.isEmpty()) {
return currentState;
}

logger.info(
() -> new ParameterizedMessage(
"opening indices [{}]",
String.join(",", indicesToOpen.stream().map(i -> (CharSequence) i.getIndex().toString())::iterator)
joegallo marked this conversation as resolved.
Show resolved Hide resolved
)
);

final Metadata.Builder metadata = Metadata.builder(currentState.metadata());
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion().minimumIndexCompatibilityVersion();

for (IndexMetadata indexMetadata : indicesToOpen) {
final Index index = indexMetadata.getIndex();
if (indexMetadata.getState() != IndexMetadata.State.OPEN) {
final Settings.Builder updatedSettings = Settings.builder().put(indexMetadata.getSettings());
updatedSettings.remove(VERIFIED_BEFORE_CLOSE_SETTING.getKey());

IndexMetadata newIndexMetadata = IndexMetadata.builder(indexMetadata)
.state(IndexMetadata.State.OPEN)
.settingsVersion(indexMetadata.getSettingsVersion() + 1)
.settings(updatedSettings)
.timestampRange(IndexLongFieldRange.NO_SHARDS)
.build();

// The index might be closed because we couldn't import it due to an old incompatible
// version, so we need to verify its compatibility.
newIndexMetadata = indexMetadataVerifier.verifyIndexMetadata(newIndexMetadata, minIndexCompatibilityVersion);
try {
indicesService.verifyIndexMetadata(newIndexMetadata, newIndexMetadata);
} catch (Exception e) {
throw new ElasticsearchException("Failed to verify index " + index, e);
}
metadata.put(newIndexMetadata, true);
}

// Always removes index closed blocks (note: this can fail on-going close index actions)
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID);
}

ClusterState updatedState = ClusterState.builder(currentState).metadata(metadata).blocks(blocks).build();

final RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
for (IndexMetadata previousIndexMetadata : indicesToOpen) {
if (previousIndexMetadata.getState() != IndexMetadata.State.OPEN) {
routingTable.addAsFromCloseToOpen(updatedState.metadata().getIndexSafe(previousIndexMetadata.getIndex()));
}
}
return ClusterState.builder(updatedState).routingTable(routingTable.build()).build();
}
}

public static class OpenIndicesTask implements ClusterStateTaskListener, ClusterStateAckListener {
joegallo marked this conversation as resolved.
Show resolved Hide resolved

private final OpenIndexClusterStateUpdateRequest request;
private final ActionListener<AcknowledgedResponse> listener;

public OpenIndicesTask(OpenIndexClusterStateUpdateRequest request, ActionListener<AcknowledgedResponse> listener) {
this.request = request;
this.listener = listener;
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}

@Override
public void onAllNodesAcked(@Nullable Exception e) {
listener.onResponse(AcknowledgedResponse.of(e == null));
}

@Override
public void onAckTimeout() {
listener.onResponse(AcknowledgedResponse.FALSE);
}

@Override
public TimeValue ackTimeout() {
return request.ackTimeout();
}
}
}