Skip to content

Commit

Permalink
[Close Index API] Add unique UUID to ClusterBlock (#36775)
Browse files Browse the repository at this point in the history
This commit adds a unique id to cluster blocks, so that they can be uniquely 
identified if needed. This is important for the Close Index API where multiple 
concurrent closing requests can be executed at the same time. By adding a 
UUID to the cluster block, we can generate unique "closing block" that can 
later be verified on shards and then checked again from the cluster state 
before closing the index. When the verification on shard is done, the closing 
block is replaced by the regular INDEX_CLOSED_BLOCK instance.

If something goes wrong, calling the Open Index API will remove the block.

Related to #33888
  • Loading branch information
tlrx committed Jan 7, 2019
1 parent f5af79b commit e149b08
Show file tree
Hide file tree
Showing 16 changed files with 773 additions and 250 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.test.rest;

import org.apache.http.util.EntityUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Request;
Expand All @@ -41,7 +40,6 @@
/**
* Tests that wait for refresh is fired if the index is closed.
*/
@LuceneTestCase.AwaitsFix(bugUrl = "to be created")
public class WaitForRefreshAndCloseIT extends ESRestTestCase {
@Before
public void setupIndex() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
Expand All @@ -41,13 +42,14 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Objects;
import java.util.function.Consumer;

public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction<
TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> {

public static final String NAME = CloseIndexAction.NAME + "[s]";
public static final ClusterBlock EXPECTED_BLOCK = MetaDataIndexStateService.INDEX_CLOSED_BLOCK;

@Inject
public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService,
Expand Down Expand Up @@ -83,25 +85,25 @@ protected void acquireReplicaOperationPermit(final IndexShard replica,
@Override
protected PrimaryResult<ShardRequest, ReplicationResponse> shardOperationOnPrimary(final ShardRequest shardRequest,
final IndexShard primary) throws Exception {
executeShardOperation(primary);
executeShardOperation(shardRequest, primary);
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
}

@Override
protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws Exception {
executeShardOperation(replica);
executeShardOperation(shardRequest, replica);
return new ReplicaResult();
}

private void executeShardOperation(final IndexShard indexShard) {
private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) {
final ShardId shardId = indexShard.shardId();
if (indexShard.getActiveOperationsCount() != 0) {
throw new IllegalStateException("On-going operations in progress while checking index shard " + shardId + " before closing");
}

final ClusterBlocks clusterBlocks = clusterService.state().blocks();
if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), EXPECTED_BLOCK) == false) {
throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + EXPECTED_BLOCK + " before closing");
if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) {
throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing");
}

final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
Expand Down Expand Up @@ -139,17 +141,36 @@ public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String all

public static class ShardRequest extends ReplicationRequest<ShardRequest> {

private ClusterBlock clusterBlock;

ShardRequest(){
}

public ShardRequest(final ShardId shardId, final TaskId parentTaskId) {
public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) {
super(shardId);
this.clusterBlock = Objects.requireNonNull(clusterBlock);
setParentTask(parentTaskId);
}

@Override
public String toString() {
return "verify shard before close {" + shardId + "}";
return "verify shard " + shardId + " before close with block " + clusterBlock;
}

@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
clusterBlock = ClusterBlock.readClusterBlock(in);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
clusterBlock.writeTo(out);
}

public ClusterBlock clusterBlock() {
return clusterBlock;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.cluster.block;

import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
Expand All @@ -30,29 +32,31 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Locale;
import java.util.Objects;

public class ClusterBlock implements Streamable, ToXContentFragment {

private int id;

private @Nullable String uuid;
private String description;

private EnumSet<ClusterBlockLevel> levels;

private boolean retryable;

private boolean disableStatePersistence = false;

private boolean allowReleaseResources;

private RestStatus status;

ClusterBlock() {
private ClusterBlock() {
}

public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence,
boolean allowReleaseResources, RestStatus status, EnumSet<ClusterBlockLevel> levels) {
this(id, null, description, retryable, disableStatePersistence, allowReleaseResources, status, levels);
}

public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence, boolean allowReleaseResources,
RestStatus status, EnumSet<ClusterBlockLevel> levels) {
public ClusterBlock(int id, String uuid, String description, boolean retryable, boolean disableStatePersistence,
boolean allowReleaseResources, RestStatus status, EnumSet<ClusterBlockLevel> levels) {
this.id = id;
this.uuid = uuid;
this.description = description;
this.retryable = retryable;
this.disableStatePersistence = disableStatePersistence;
Expand All @@ -65,6 +69,10 @@ public int id() {
return this.id;
}

public String uuid() {
return uuid;
}

public String description() {
return this.description;
}
Expand Down Expand Up @@ -104,6 +112,9 @@ public boolean disableStatePersistence() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Integer.toString(id));
if (uuid != null) {
builder.field("uuid", uuid);
}
builder.field("description", description);
builder.field("retryable", retryable);
if (disableStatePersistence) {
Expand All @@ -127,6 +138,11 @@ public static ClusterBlock readClusterBlock(StreamInput in) throws IOException {
@Override
public void readFrom(StreamInput in) throws IOException {
id = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
uuid = in.readOptionalString();
} else {
uuid = null;
}
description = in.readString();
final int len = in.readVInt();
ArrayList<ClusterBlockLevel> levels = new ArrayList<>(len);
Expand All @@ -143,6 +159,9 @@ public void readFrom(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeOptionalString(uuid);
}
out.writeString(description);
out.writeVInt(levels.size());
for (ClusterBlockLevel level : levels) {
Expand All @@ -157,7 +176,11 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(id).append(",").append(description).append(", blocks ");
sb.append(id).append(",");
if (uuid != null) {
sb.append(uuid).append(',');
}
sb.append(description).append(", blocks ");
String delimiter = "";
for (ClusterBlockLevel level : levels) {
sb.append(delimiter).append(level.name());
Expand All @@ -168,19 +191,19 @@ public String toString() {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ClusterBlock that = (ClusterBlock) o;

if (id != that.id) return false;

return true;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ClusterBlock that = (ClusterBlock) o;
return id == that.id && Objects.equals(uuid, that.uuid);
}

@Override
public int hashCode() {
return id;
return Objects.hash(id, uuid);
}

public boolean isAllowReleaseResources() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -147,6 +148,31 @@ public boolean hasIndexBlock(String index, ClusterBlock block) {
return indicesBlocks.containsKey(index) && indicesBlocks.get(index).contains(block);
}

public boolean hasIndexBlockWithId(String index, int blockId) {
final Set<ClusterBlock> clusterBlocks = indicesBlocks.get(index);
if (clusterBlocks != null) {
for (ClusterBlock clusterBlock : clusterBlocks) {
if (clusterBlock.id() == blockId) {
return true;
}
}
}
return false;
}

@Nullable
public ClusterBlock getIndexBlockWithId(final String index, final int blockId) {
final Set<ClusterBlock> clusterBlocks = indicesBlocks.get(index);
if (clusterBlocks != null) {
for (ClusterBlock clusterBlock : clusterBlocks) {
if (clusterBlock.id() == blockId) {
return clusterBlock;
}
}
}
return null;
}

public void globalBlockedRaiseException(ClusterBlockLevel level) throws ClusterBlockException {
ClusterBlockException blockException = globalBlockedException(level);
if (blockException != null) {
Expand Down Expand Up @@ -403,6 +429,18 @@ public Builder removeIndexBlock(String index, ClusterBlock block) {
return this;
}

public Builder removeIndexBlockWithId(String index, int blockId) {
final Set<ClusterBlock> indexBlocks = indices.get(index);
if (indexBlocks == null) {
return this;
}
indexBlocks.removeIf(block -> block.id() == blockId);
if (indexBlocks.isEmpty()) {
indices.remove(index);
}
return this;
}

public ClusterBlocks build() {
// We copy the block sets here in case of the builder is modified after build is called
ImmutableOpenMap.Builder<String, Set<ClusterBlock>> indicesBuilder = ImmutableOpenMap.builder(indices.size());
Expand Down
Loading

0 comments on commit e149b08

Please sign in to comment.