Skip to content

Commit

Permalink
Backport the Close Index API refactoring to 6.x (#37359)
Browse files Browse the repository at this point in the history
This commit backports to 6.x of the Close Index API refactoring.

It cherry-picks the following commits from master:
3ca885e [Close Index API] Add TransportShardCloseAction for pre-closing verifications (#36249)
8e5dd20 [Close Index API] Refactor MetaDataIndexStateService (#36354)
7372529 [Tests] Reduce randomization in CloseWhileRelocatingShardsIT (#36694)
103c4d4 [Close Index API] Mark unavailable shard copy as stale during verification (#36755)
1959388 [Close Index API] Propagate tasks ids between Freeze, Close and Verify(#36630)
e149b08 [Close Index API] Add unique UUID to ClusterBlock (#36775)
dc371ef [Tests] Fix ReopenWhileClosingIT with correct min num shards

The following two commits were needed to adapt the change to 6.x:
ef6ae69 [Close Index API] Adapt MetaDataIndexStateServiceTests after merge
21b7653 [Tests] Adapt CloseIndexIT tests for 6.x

Related to #33888
  • Loading branch information
tlrx committed Jan 14, 2019
1 parent f2c9051 commit cedf204
Show file tree
Hide file tree
Showing 23 changed files with 2,282 additions and 274 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.Request;
import org.junit.After;
import org.junit.Before;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@
*/
public class CloseIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<CloseIndexClusterStateUpdateRequest> {

CloseIndexClusterStateUpdateRequest() {
private final long taskId;

public CloseIndexClusterStateUpdateRequest(final long taskId) {
this.taskId = taskId;
}

public long taskId() {
return taskId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -100,24 +99,32 @@ protected ClusterBlockException checkBlock(CloseIndexRequest request, ClusterSta
@Override
protected void masterOperation(final CloseIndexRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
throw new UnsupportedOperationException("The task parameter is required");
}

@Override
protected void masterOperation(final Task task, final CloseIndexRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) throws Exception {
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
if (concreteIndices == null || concreteIndices.length == 0) {
listener.onResponse(new AcknowledgedResponse(true));
return;
}
CloseIndexClusterStateUpdateRequest updateRequest = new CloseIndexClusterStateUpdateRequest()
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices);

indexStateService.closeIndices(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices);

indexStateService.closeIndices(closeRequest, new ActionListener<AcknowledgedResponse>() {

@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new AcknowledgedResponse(response.isAcknowledged()));
public void onResponse(final AcknowledgedResponse response) {
listener.onResponse(response);
}

@Override
public void onFailure(Exception t) {
public void onFailure(final Exception t) {
logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t);
listener.onFailure(t);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.close;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Objects;
import java.util.function.Consumer;

public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction<
TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> {

public static final String NAME = CloseIndexAction.NAME + "[s]";

@Inject
public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService,
final ClusterService clusterService, final IndicesService indicesService,
final ThreadPool threadPool, final ShardStateAction stateAction,
final ActionFilters actionFilters, final IndexNameExpressionResolver resolver) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters, resolver,
ShardRequest::new, ShardRequest::new, ThreadPool.Names.MANAGEMENT);
}

@Override
protected ReplicationResponse newResponseInstance() {
return new ReplicationResponse();
}

@Override
protected void acquirePrimaryOperationPermit(final IndexShard primary,
final ShardRequest request,
final ActionListener<Releasable> onAcquired) {
primary.acquireAllPrimaryOperationsPermits(onAcquired, request.timeout());
}

@Override
protected void acquireReplicaOperationPermit(final IndexShard replica,
final ShardRequest request,
final ActionListener<Releasable> onAcquired,
final long primaryTerm,
final long globalCheckpoint,
final long maxSeqNoOfUpdateOrDeletes) {
replica.acquireAllReplicaOperationsPermits(primaryTerm, globalCheckpoint, maxSeqNoOfUpdateOrDeletes, onAcquired, request.timeout());
}

@Override
protected PrimaryResult<ShardRequest, ReplicationResponse> shardOperationOnPrimary(final ShardRequest shardRequest,
final IndexShard primary) throws Exception {
executeShardOperation(shardRequest, primary);
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
}

@Override
protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws Exception {
executeShardOperation(shardRequest, replica);
return new ReplicaResult();
}

private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) {
final ShardId shardId = indexShard.shardId();
if (indexShard.getActiveOperationsCount() != 0) {
throw new IllegalStateException("On-going operations in progress while checking index shard " + shardId + " before closing");
}

final ClusterBlocks clusterBlocks = clusterService.state().blocks();
if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) {
throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing");
}

final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
if (indexShard.getGlobalCheckpoint() != maxSeqNo) {
throw new IllegalStateException("Global checkpoint [" + indexShard.getGlobalCheckpoint()
+ "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId);
}
indexShard.flush(new FlushRequest());
logger.debug("{} shard is ready for closing", shardId);
}

@Override
protected ReplicationOperation.Replicas<ShardRequest> newReplicasProxy(final long primaryTerm) {
return new VerifyShardBeforeCloseActionReplicasProxy(primaryTerm);
}

/**
* A {@link ReplicasProxy} that marks as stale the shards that are unavailable during the verification
* and the flush of the shard. This is done to ensure that such shards won't be later promoted as primary
* or reopened in an unverified state with potential non flushed translog operations.
*/
class VerifyShardBeforeCloseActionReplicasProxy extends ReplicasProxy {

VerifyShardBeforeCloseActionReplicasProxy(final long primaryTerm) {
super(primaryTerm);
}

@Override
public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final Runnable onSuccess,
final Consumer<Exception> onPrimaryDemoted, final Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
}

public static class ShardRequest extends ReplicationRequest<ShardRequest> {

private ClusterBlock clusterBlock;

ShardRequest(){
}

public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) {
super(shardId);
this.clusterBlock = Objects.requireNonNull(clusterBlock);
setParentTask(parentTaskId);
}

@Override
public String toString() {
return "verify shard " + shardId + " before close with block " + clusterBlock;
}

@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
clusterBlock = ClusterBlock.readClusterBlock(in);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
clusterBlock.writeTo(out);
}

public ClusterBlock clusterBlock() {
return clusterBlock;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.cluster.block;

import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
Expand All @@ -31,29 +32,31 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Locale;
import java.util.Objects;

public class ClusterBlock implements Streamable, ToXContentFragment {

private int id;

private @Nullable String uuid;
private String description;

private EnumSet<ClusterBlockLevel> levels;

private boolean retryable;

private boolean disableStatePersistence = false;

private boolean allowReleaseResources;

private RestStatus status;

ClusterBlock() {
private ClusterBlock() {
}

public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence,
boolean allowReleaseResources, RestStatus status, EnumSet<ClusterBlockLevel> levels) {
this(id, null, description, retryable, disableStatePersistence, allowReleaseResources, status, levels);
}

public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence, boolean allowReleaseResources,
RestStatus status, EnumSet<ClusterBlockLevel> levels) {
public ClusterBlock(int id, String uuid, String description, boolean retryable, boolean disableStatePersistence,
boolean allowReleaseResources, RestStatus status, EnumSet<ClusterBlockLevel> levels) {
this.id = id;
this.uuid = uuid;
this.description = description;
this.retryable = retryable;
this.disableStatePersistence = disableStatePersistence;
Expand All @@ -66,6 +69,10 @@ public int id() {
return this.id;
}

public String uuid() {
return uuid;
}

public String description() {
return this.description;
}
Expand Down Expand Up @@ -105,6 +112,9 @@ public boolean disableStatePersistence() {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Integer.toString(id));
if (uuid != null) {
builder.field("uuid", uuid);
}
builder.field("description", description);
builder.field("retryable", retryable);
if (disableStatePersistence) {
Expand All @@ -128,6 +138,11 @@ public static ClusterBlock readClusterBlock(StreamInput in) throws IOException {
@Override
public void readFrom(StreamInput in) throws IOException {
id = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
uuid = in.readOptionalString();
} else {
uuid = null;
}
description = in.readString();
final int len = in.readVInt();
ArrayList<ClusterBlockLevel> levels = new ArrayList<>(len);
Expand All @@ -148,6 +163,9 @@ public void readFrom(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
out.writeOptionalString(uuid);
}
out.writeString(description);
out.writeVInt(levels.size());
for (ClusterBlockLevel level : levels) {
Expand All @@ -164,7 +182,11 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(id).append(",").append(description).append(", blocks ");
sb.append(id).append(",");
if (uuid != null) {
sb.append(uuid).append(',');
}
sb.append(description).append(", blocks ");
String delimiter = "";
for (ClusterBlockLevel level : levels) {
sb.append(delimiter).append(level.name());
Expand All @@ -175,19 +197,19 @@ public String toString() {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ClusterBlock that = (ClusterBlock) o;

if (id != that.id) return false;

return true;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ClusterBlock that = (ClusterBlock) o;
return id == that.id && Objects.equals(uuid, that.uuid);
}

@Override
public int hashCode() {
return id;
return Objects.hash(id, uuid);
}

public boolean isAllowReleaseResources() {
Expand Down
Loading

0 comments on commit cedf204

Please sign in to comment.