Skip to content

Commit

Permalink
Execute actions under permit in primary mode only (elastic#42241)
Browse files Browse the repository at this point in the history
Today when executing an action on a primary shard under permit, we do
not enforce that the shard is in primary mode before executing the
action. This commit addresses this by wrapping actions to be executed
under permit in a check that the shard is in primary mode before
executing the action.
  • Loading branch information
jasontedor authored and Gurkan Kaymak committed May 27, 2019
1 parent e90d68d commit f59484e
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,12 @@ private enum ElasticsearchExceptionHandle {
org.elasticsearch.index.seqno.RetentionLeaseNotFoundException.class,
org.elasticsearch.index.seqno.RetentionLeaseNotFoundException::new,
154,
Version.V_6_7_0);
Version.V_6_7_0),
SHARD_NOT_IN_PRIMARY_MODE_EXCEPTION(
org.elasticsearch.index.shard.ShardNotInPrimaryModeException.class,
org.elasticsearch.index.shard.ShardNotInPrimaryModeException::new,
155,
Version.V_6_8_1);

final Class<? extends ElasticsearchException> exceptionClass;
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.ShardNotInPrimaryModeException;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
Expand Down Expand Up @@ -307,10 +308,18 @@ protected void doRun() throws Exception {
primaryRequest.getTargetAllocationID(), primaryRequest.getPrimaryTerm(), actualTerm);
}

acquirePrimaryOperationPermit(indexShard, primaryRequest.getRequest(), ActionListener.wrap(
releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
this::onFailure
));
acquirePrimaryOperationPermit(
indexShard,
primaryRequest.getRequest(),
ActionListener.wrap(
releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
e -> {
if (e instanceof ShardNotInPrimaryModeException) {
onFailure(new ReplicationOperation.RetryOnPrimaryException(shardId, "shard is not in primary mode", e));
} else {
onFailure(e);
}
}));
}

void runWithPrimaryShardReference(final PrimaryShardReference primaryShardReference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -45,7 +43,6 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
import java.util.function.Supplier;

Expand Down Expand Up @@ -88,14 +85,10 @@ abstract static class TransportRetentionLeaseAction<T extends Request<T>> extend

@Override
protected ShardsIterator shards(final ClusterState state, final InternalRequest request) {
final IndexShardRoutingTable shardRoutingTable = state
return state
.routingTable()
.shardRoutingTable(request.concreteIndex(), request.request().getShardId().id());
if (shardRoutingTable.primaryShard().active()) {
return shardRoutingTable.primaryShardIt();
} else {
return new PlainShardIterator(request.request().getShardId(), Collections.emptyList());
}
.shardRoutingTable(request.concreteIndex(), request.request().getShardId().id())
.primaryShardIt();
}

@Override
Expand Down Expand Up @@ -174,6 +167,7 @@ void doRetentionLeaseAction(final IndexShard indexShard, final AddRequest reques
protected Writeable.Reader<Response> getResponseReader() {
return Response::new;
}

}

@Override
Expand Down Expand Up @@ -400,9 +394,10 @@ public static class Response extends ActionResponse {
public Response() {
}

Response(StreamInput in) throws IOException {
Response(final StreamInput in) throws IOException {
super(in);
}

}

}
25 changes: 22 additions & 3 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.index.shard;

import com.carrotsearch.hppc.ObjectLongMap;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CheckIndex;
Expand Down Expand Up @@ -2496,7 +2495,7 @@ public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcq
verifyNotClosed();
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;

indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo);
indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, false, debugInfo);
}

/**
Expand All @@ -2507,7 +2506,27 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable>
verifyNotClosed();
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;

asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
asyncBlockOperations(wrapPrimaryOperationPermitListener(onPermitAcquired), timeout.duration(), timeout.timeUnit());
}

/**
* Wraps the action to run on a primary after acquiring permit. This wrapping is used to check if the shard is in primary mode before
* executing the action.
*
* @param listener the listener to wrap
* @return the wrapped listener
*/
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(final ActionListener<Releasable> listener) {
return ActionListener.delegateFailure(
listener,
(l, r) -> {
if (replicationTracker.isPrimaryMode()) {
l.onResponse(r);
} else {
r.close();
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
}
});
}

private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.shard;

import org.elasticsearch.common.io.stream.StreamInput;

import java.io.IOException;

public class ShardNotInPrimaryModeException extends IllegalIndexShardStateException {

public ShardNotInPrimaryModeException(final ShardId shardId, final IndexShardState currentState) {
super(shardId, currentState, "shard is not in primary mode");
}

public ShardNotInPrimaryModeException(final StreamInput in) throws IOException {
super(in);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotInPrimaryModeException;
import org.elasticsearch.indices.IndexTemplateMissingException;
import org.elasticsearch.indices.InvalidIndexTemplateException;
import org.elasticsearch.indices.recovery.RecoverFilesRecoveryException;
Expand Down Expand Up @@ -816,6 +817,7 @@ public void testIds() {
ids.put(152, NoSuchRemoteClusterException.class);
ids.put(153, RetentionLeaseAlreadyExistsException.class);
ids.put(154, RetentionLeaseNotFoundException.class);
ids.put(155, ShardNotInPrimaryModeException.class);

Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.ShardNotInPrimaryModeException;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
Expand Down Expand Up @@ -390,6 +392,43 @@ public void testNotStartedPrimary() {
assertIndexShardCounter(0);
}

public void testShardNotInPrimaryMode() {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
final ClusterState state = state(index, true, ShardRoutingState.RELOCATING);
setState(clusterService, state);
final ReplicationTask task = maybeTask();
final Request request = new Request(shardId);
PlainActionFuture<TestResponse> listener = new PlainActionFuture<>();
final AtomicBoolean executed = new AtomicBoolean();

final ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard();
final long primaryTerm = state.metaData().index(index).primaryTerm(shardId.id());
final TransportReplicationAction.ConcreteShardRequest<Request> primaryRequest
= new TransportReplicationAction.ConcreteShardRequest<>(request, primaryShard.allocationId().getId(), primaryTerm);

isPrimaryMode.set(false);

new TestAction(Settings.EMPTY, "internal:test-action", transportService, clusterService, shardStateAction, threadPool) {
@Override
protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary,
ActionListener<PrimaryResult<Request, TestResponse>> listener) {
assertPhase(task, "primary");
assertFalse(executed.getAndSet(true));
super.shardOperationOnPrimary(shardRequest, primary, listener);
}
}.new AsyncPrimaryAction(primaryRequest, listener, task).run();

assertFalse(executed.get());
assertIndexShardCounter(0); // no permit should be held

final ExecutionException e = expectThrows(ExecutionException.class, listener::get);
assertThat(e.getCause(), instanceOf(ReplicationOperation.RetryOnPrimaryException.class));
assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode")));
assertThat(e.getCause().getCause(), instanceOf(ShardNotInPrimaryModeException.class));
assertThat(e.getCause().getCause(), hasToString(containsString("shard is not in primary mode")));
}

/**
* When relocating a primary shard, there is a cluster state update at the end of relocation where the active primary is switched from
* the relocation source to the relocation target. If relocation source receives and processes this cluster state
Expand Down Expand Up @@ -1126,6 +1165,8 @@ private void assertIndexShardCounter(int expected) {

private final AtomicBoolean isRelocated = new AtomicBoolean(false);

private final AtomicBoolean isPrimaryMode = new AtomicBoolean(true);

/**
* Sometimes build a ReplicationTask for tracking the phase of the
* TransportReplicationAction. Since TransportReplicationAction has to work
Expand Down Expand Up @@ -1271,10 +1312,16 @@ private IndexService mockIndexService(final IndexMetaData indexMetaData, Cluster
private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService) {
final IndexShard indexShard = mock(IndexShard.class);
when(indexShard.shardId()).thenReturn(shardId);
when(indexShard.state()).thenReturn(IndexShardState.STARTED);
doAnswer(invocation -> {
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[0];
count.incrementAndGet();
callback.onResponse(count::decrementAndGet);
if (isPrimaryMode.get()) {
count.incrementAndGet();
callback.onResponse(count::decrementAndGet);

} else {
callback.onFailure(new ShardNotInPrimaryModeException(shardId, IndexShardState.STARTED));
}
return null;
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject());
doAnswer(invocation -> {
Expand Down

0 comments on commit f59484e

Please sign in to comment.