[Close Index API] Mark unavailable shard copy as stale during verification (#36755)

This pull request modifies the TransportVerifyShardBeforeCloseAction so that it marks unavailable shard copies as stale.
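
At its core, the change overrides the action's replicas proxy so that an unavailable shard copy is reported as stale to the master. Condensed verbatim from the diff below:

@Override
public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final Runnable onSuccess,
                                         final Consumer<Exception> onPrimaryDemoted, final Consumer<Exception> onIgnoredFailure) {
    // Report the copy as failed to the master; the `true` argument requests
    // that the copy also be marked as stale.
    shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
        createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}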
tlrx committed Dec 18, 2018
1 parent cd3a1af commit 103c4d4
Showing 2 changed files with 233 additions and 6 deletions.
org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java
@@ -21,6 +21,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
@@ -39,6 +40,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.function.Consumer;

public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction<
TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> {

@@ -109,6 +112,30 @@ private void executeShardOperation(final IndexShard indexShard) {
logger.debug("{} shard is ready for closing", shardId);
}

@Override
protected ReplicationOperation.Replicas<ShardRequest> newReplicasProxy(final long primaryTerm) {
return new VerifyShardBeforeCloseActionReplicasProxy(primaryTerm);
}

/**
* A {@link ReplicasProxy} that marks shard copies as stale when they are unavailable during the
* verification and flush of the shard. This ensures that such copies cannot later be promoted to
* primary or reopened in an unverified state with potentially non-flushed translog operations.
*/
class VerifyShardBeforeCloseActionReplicasProxy extends ReplicasProxy {

VerifyShardBeforeCloseActionReplicasProxy(final long primaryTerm) {
super(primaryTerm);
}

@Override
public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final Runnable onSuccess,
final Consumer<Exception> onPrimaryDemoted, final Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
}

public static class ShardRequest extends ReplicationRequest<ShardRequest> {

ShardRequest() {
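The new test below drives this proxy through a full ReplicationOperation. Stripped to its essential wiring (all names come from the test itself; this is not a standalone snippet):

ReplicationOperation.Replicas<TransportVerifyShardBeforeCloseAction.ShardRequest> proxy = action.newReplicasProxy(primaryTerm);
new ReplicationOperation<>(request, createPrimary(primaryRouting, replicationGroup), listener, proxy, logger, "test").execute();
// Every unavailable in-sync copy should produce a captured shard-failed request to the
// master, while every available replica receives the regular verify-before-close request.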
org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java
@@ -18,27 +18,55 @@
*/
package org.elasticsearch.action.admin.indices.close;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -47,9 +75,17 @@

public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {

private static ThreadPool threadPool;

private IndexShard indexShard;
private TransportVerifyShardBeforeCloseAction action;
private ClusterService clusterService;
private CapturingTransport transport;

@BeforeClass
public static void beforeClass() {
threadPool = new TestThreadPool(getTestClass().getName());
}

@Override
@Before
@@ -64,13 +100,32 @@ public void setUp() throws Exception {
final ShardId shardId = new ShardId("index", "_na_", randomIntBetween(0, 3));
when(indexShard.shardId()).thenReturn(shardId);

- clusterService = mock(ClusterService.class);
- when(clusterService.state()).thenReturn(new ClusterState.Builder(new ClusterName("test"))
+ clusterService = createClusterService(threadPool);
+ setState(clusterService, new ClusterState.Builder(clusterService.state())
.blocks(ClusterBlocks.builder().addIndexBlock("index", INDEX_CLOSED_BLOCK).build()).build());

- action = new TransportVerifyShardBeforeCloseAction(Settings.EMPTY, mock(TransportService.class), clusterService,
- mock(IndicesService.class), mock(ThreadPool.class), mock(ShardStateAction.class), mock(ActionFilters.class),
- mock(IndexNameExpressionResolver.class));
+ transport = new CapturingTransport();
+ TransportService transportService = transport.createTransportService(Settings.EMPTY, threadPool,
+ TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
+ transportService.start();
+ transportService.acceptIncomingRequests();
+
+ ShardStateAction shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool);
+ action = new TransportVerifyShardBeforeCloseAction(Settings.EMPTY, transportService, clusterService, mock(IndicesService.class),
+ mock(ThreadPool.class), shardStateAction, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class));
}

@Override
@After
public void tearDown() throws Exception {
super.tearDown();
clusterService.close();
}

@AfterClass
public static void afterClass() {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
}

private void executeOnPrimaryOrReplica() throws Exception {
@@ -98,7 +153,7 @@ public void testOperationFailsWithOnGoingOps() {
}

public void testOperationFailsWithNoBlock() {
- when(clusterService.state()).thenReturn(new ClusterState.Builder(new ClusterName("test")).build());
+ setState(clusterService, new ClusterState.Builder(new ClusterName("test")).build());

IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
assertThat(exception.getMessage(),
@@ -119,4 +174,149 @@ public void testOperationFailsWithGlobalCheckpointNotCaughtUp() {
+ maxSeqNo + "] on index shard " + indexShard.shardId()));
verify(indexShard, times(0)).flush(any(FlushRequest.class));
}

public void testUnavailableShardsMarkedAsStale() throws Exception {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);

final int nbReplicas = randomIntBetween(1, 10);
final ShardRoutingState[] replicaStates = new ShardRoutingState[nbReplicas];
for (int i = 0; i < replicaStates.length; i++) {
replicaStates[i] = ShardRoutingState.STARTED;
}
final ClusterState clusterState = state(index, true, ShardRoutingState.STARTED, replicaStates);
setState(clusterService, clusterState);

IndexShardRoutingTable shardRoutingTable = clusterState.routingTable().index(index).shard(shardId.id());
final IndexMetaData indexMetaData = clusterState.getMetaData().index(index);
final ShardRouting primaryRouting = shardRoutingTable.primaryShard();
final long primaryTerm = indexMetaData.primaryTerm(0);

final Set<String> inSyncAllocationIds = indexMetaData.inSyncAllocationIds(0);
final Set<String> trackedShards = shardRoutingTable.getAllAllocationIds();

List<ShardRouting> unavailableShards = randomSubsetOf(randomIntBetween(1, nbReplicas), shardRoutingTable.replicaShards());
IndexShardRoutingTable.Builder shardRoutingTableBuilder = new IndexShardRoutingTable.Builder(shardRoutingTable);
unavailableShards.forEach(shardRoutingTableBuilder::removeShard);
shardRoutingTable = shardRoutingTableBuilder.build();

final ReplicationGroup replicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards);
assertThat(replicationGroup.getUnavailableInSyncShards().size(), greaterThan(0));

final PlainActionFuture<PrimaryResult> listener = new PlainActionFuture<>();
TransportVerifyShardBeforeCloseAction.ShardRequest request = new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId);
ReplicationOperation.Replicas<TransportVerifyShardBeforeCloseAction.ShardRequest> proxy = action.newReplicasProxy(primaryTerm);
ReplicationOperation<TransportVerifyShardBeforeCloseAction.ShardRequest,
TransportVerifyShardBeforeCloseAction.ShardRequest, PrimaryResult> operation =
new ReplicationOperation<>(request, createPrimary(primaryRouting, replicationGroup), listener, proxy, logger, "test");
operation.execute();

final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests.length, equalTo(nbReplicas));

for (CapturingTransport.CapturedRequest capturedRequest : capturedRequests) {
final String actionName = capturedRequest.action;
if (actionName.startsWith(ShardStateAction.SHARD_FAILED_ACTION_NAME)) {
assertThat(capturedRequest.request, instanceOf(ShardStateAction.FailedShardEntry.class));
String allocationId = ((ShardStateAction.FailedShardEntry) capturedRequest.request).getAllocationId();
assertTrue(unavailableShards.stream().anyMatch(shardRouting -> shardRouting.allocationId().getId().equals(allocationId)));
transport.handleResponse(capturedRequest.requestId, TransportResponse.Empty.INSTANCE);

} else if (actionName.startsWith(TransportVerifyShardBeforeCloseAction.NAME)) {
assertThat(capturedRequest.request, instanceOf(ConcreteShardRequest.class));
String allocationId = ((ConcreteShardRequest) capturedRequest.request).getTargetAllocationID();
assertFalse(unavailableShards.stream().anyMatch(shardRouting -> shardRouting.allocationId().getId().equals(allocationId)));
assertTrue(inSyncAllocationIds.stream().anyMatch(inSyncAllocationId -> inSyncAllocationId.equals(allocationId)));
transport.handleResponse(capturedRequest.requestId, new TransportReplicationAction.ReplicaResponse(0L, 0L));

} else {
fail("Test does not support action " + capturedRequest.action);
}
}

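// Stale copies must not surface as failures: only the primary and the replicas
// that remained available are counted, all of them successful.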
final ReplicationResponse.ShardInfo shardInfo = listener.get().getShardInfo();
assertThat(shardInfo.getFailed(), equalTo(0));
assertThat(shardInfo.getFailures(), arrayWithSize(0));
assertThat(shardInfo.getSuccessful(), equalTo(1 + nbReplicas - unavailableShards.size()));
}

private static ReplicationOperation.Primary<
TransportVerifyShardBeforeCloseAction.ShardRequest,
TransportVerifyShardBeforeCloseAction.ShardRequest,
PrimaryResult>
createPrimary(final ShardRouting primary, final ReplicationGroup replicationGroup) {
return new ReplicationOperation.Primary<
TransportVerifyShardBeforeCloseAction.ShardRequest,
TransportVerifyShardBeforeCloseAction.ShardRequest,
PrimaryResult>() {
@Override
public ShardRouting routingEntry() {
return primary;
}

@Override
public ReplicationGroup getReplicationGroup() {
return replicationGroup;
}

@Override
public PrimaryResult perform(TransportVerifyShardBeforeCloseAction.ShardRequest request) throws Exception {
return new PrimaryResult(request);
}

@Override
public void failShard(String message, Exception exception) {

}

@Override
public void updateLocalCheckpointForShard(String allocationId, long checkpoint) {
}

@Override
public void updateGlobalCheckpointForShard(String allocationId, long globalCheckpoint) {
}

@Override
public long localCheckpoint() {
return 0;
}

@Override
public long globalCheckpoint() {
return 0;
}

@Override
public long maxSeqNoOfUpdatesOrDeletes() {
return 0;
}
};
}

private static class PrimaryResult implements ReplicationOperation.PrimaryResult<TransportVerifyShardBeforeCloseAction.ShardRequest> {

private final TransportVerifyShardBeforeCloseAction.ShardRequest replicaRequest;
private final SetOnce<ReplicationResponse.ShardInfo> shardInfo;

private PrimaryResult(final TransportVerifyShardBeforeCloseAction.ShardRequest replicaRequest) {
this.replicaRequest = replicaRequest;
this.shardInfo = new SetOnce<>();
}

@Override
public TransportVerifyShardBeforeCloseAction.ShardRequest replicaRequest() {
return replicaRequest;
}

@Override
public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) {
this.shardInfo.set(shardInfo);
}

public ReplicationResponse.ShardInfo getShardInfo() {
return shardInfo.get();
}
}

}
