Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execute actions under permit in primary mode only #42241

Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,18 @@ protected void doRun() throws Exception {
primaryRequest.getTargetAllocationID(), primaryRequest.getPrimaryTerm(), actualTerm);
}

acquirePrimaryOperationPermit(indexShard, primaryRequest.getRequest(), ActionListener.wrap(
releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
this::onFailure
));
acquirePrimaryOperationPermit(
indexShard,
primaryRequest.getRequest(),
ActionListener.wrap(
releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
e -> {
if (e instanceof IllegalStateException && e.getMessage().equals("shard is not in primary mode")) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should become a dedicated exception?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to a dedicated exception.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 799f731.

onFailure(new ReplicationOperation.RetryOnPrimaryException(shardId, "shard is not in primary mode", e));
} else {
onFailure(e);
}
}));
}

void runWithPrimaryShardReference(final PrimaryShardReference primaryShardReference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -45,7 +43,6 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
import java.util.function.Supplier;

Expand Down Expand Up @@ -88,14 +85,10 @@ abstract static class TransportRetentionLeaseAction<T extends Request<T>> extend

@Override
protected ShardsIterator shards(final ClusterState state, final InternalRequest request) {
final IndexShardRoutingTable shardRoutingTable = state
return state
.routingTable()
.shardRoutingTable(request.concreteIndex(), request.request().getShardId().id());
if (shardRoutingTable.primaryShard().active()) {
return shardRoutingTable.primaryShardIt();
} else {
return new PlainShardIterator(request.request().getShardId(), Collections.emptyList());
}
.shardRoutingTable(request.concreteIndex(), request.request().getShardId().id())
.primaryShardIt();
}

@Override
Expand Down Expand Up @@ -174,6 +167,7 @@ void doRetentionLeaseAction(final IndexShard indexShard, final AddRequest reques
protected Writeable.Reader<Response> getResponseReader() {
// Supplies the Response constructor reference as the reader for this action's responses.
return Response::new;
}

}

@Override
Expand Down Expand Up @@ -400,9 +394,10 @@ public static class Response extends ActionResponse {
// No-argument constructor with an empty body.
public Response() {
}

Response(StreamInput in) throws IOException {
Response(final StreamInput in) throws IOException {
super(in);
}

}

}
25 changes: 22 additions & 3 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.index.shard;

import com.carrotsearch.hppc.ObjectLongMap;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CheckIndex;
Expand Down Expand Up @@ -2496,7 +2495,7 @@ public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcq
verifyNotClosed();
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;

indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo);
indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, false, debugInfo);
}

/**
Expand All @@ -2507,7 +2506,27 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable>
verifyNotClosed();
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;

asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
asyncBlockOperations(wrapPrimaryOperationPermitListener(onPermitAcquired), timeout.duration(), timeout.timeUnit());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change verifies the primary mode after acquiring permits. I think we should verify the primary mode before acquiring permits?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to verify after acquiring the permit; otherwise, our check could be invalidated by the time we acquire the permit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I missed the relocation situation. Thanks for the explanation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem. 😄

LGTM to you then?

}

/**
 * Decorates a permit listener so that, once a permit has been handed out, the shard's primary mode is checked before the
 * permit is passed on. If the shard is no longer in primary mode the permit is released immediately and the listener is
 * failed with an {@link IllegalStateException}; otherwise the permit is forwarded unchanged. Failures to acquire the permit
 * are passed straight through to the wrapped listener.
 *
 * @param listener the listener to wrap
 * @return a listener that performs the primary mode check before delegating to {@code listener}
 */
private ActionListener<Releasable> wrapPrimaryOperationPermitListener(final ActionListener<Releasable> listener) {
    return ActionListener.delegateFailure(listener, (delegate, permit) -> {
        if (replicationTracker.isPrimaryMode()) {
            // still the primary: hand the permit over to the caller
            delegate.onResponse(permit);
            return;
        }
        // release the permit first so it is not leaked, then report the failure
        permit.close();
        delegate.onFailure(new IllegalStateException("shard is not in primary mode"));
    });
}

private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,43 @@ public void testNotStartedPrimary() {
assertIndexShardCounter(0);
}

/**
 * Verifies that running a primary action against a shard that is not in primary mode never executes the primary operation,
 * leaves no permit held, and fails the listener with a {@code RetryOnPrimaryException} caused by an
 * {@link IllegalStateException}.
 */
public void testShardNotInPrimaryMode() {
    final String indexName = "test";
    final ShardId shardId = new ShardId(indexName, "_na_", 0);
    final ClusterState clusterState = state(indexName, true, ShardRoutingState.RELOCATING);
    setState(clusterService, clusterState);
    final ReplicationTask task = maybeTask();
    final Request request = new Request(shardId);
    final PlainActionFuture<TestResponse> future = new PlainActionFuture<>();
    final AtomicBoolean operationRan = new AtomicBoolean();

    // build the concrete request addressed to the current primary allocation and term
    final ShardRouting primaryRouting = clusterState.getRoutingTable().shardRoutingTable(shardId).primaryShard();
    final long term = clusterState.metaData().index(indexName).primaryTerm(shardId.id());
    final String primaryAllocationId = primaryRouting.allocationId().getId();
    final TransportReplicationAction.ConcreteShardRequest<Request> primaryRequest =
        new TransportReplicationAction.ConcreteShardRequest<>(request, primaryAllocationId, term);

    // make the mocked shard refuse primary operation permits
    isPrimaryMode.set(false);

    final TestAction action =
        new TestAction(Settings.EMPTY, "internal:test-action", transportService, clusterService, shardStateAction, threadPool) {
            @Override
            protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary,
                                                   ActionListener<PrimaryResult<Request, TestResponse>> listener) {
                assertPhase(task, "primary");
                assertFalse(operationRan.getAndSet(true));
                super.shardOperationOnPrimary(shardRequest, primary, listener);
            }
        };
    action.new AsyncPrimaryAction(primaryRequest, future, task).run();

    assertFalse(operationRan.get());
    assertIndexShardCounter(0); // no permit should be held

    final ExecutionException executionException = expectThrows(ExecutionException.class, future::get);
    final Throwable retryFailure = executionException.getCause();
    assertThat(retryFailure, instanceOf(ReplicationOperation.RetryOnPrimaryException.class));
    assertThat(retryFailure, hasToString(containsString("shard is not in primary mode")));
    final Throwable rootFailure = retryFailure.getCause();
    assertThat(rootFailure, instanceOf(IllegalStateException.class));
    assertThat(rootFailure, hasToString(containsString("shard is not in primary mode")));
}

/**
* When relocating a primary shard, there is a cluster state update at the end of relocation where the active primary is switched from
* the relocation source to the relocation target. If relocation source receives and processes this cluster state
Expand Down Expand Up @@ -1126,6 +1163,8 @@ private void assertIndexShardCounter(int expected) {

private final AtomicBoolean isRelocated = new AtomicBoolean(false);

private final AtomicBoolean isPrimaryMode = new AtomicBoolean(true);

/**
* Sometimes build a ReplicationTask for tracking the phase of the
* TransportReplicationAction. Since TransportReplicationAction has to work
Expand Down Expand Up @@ -1273,8 +1312,13 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService
when(indexShard.shardId()).thenReturn(shardId);
doAnswer(invocation -> {
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[0];
count.incrementAndGet();
callback.onResponse(count::decrementAndGet);
if (isPrimaryMode.get()) {
count.incrementAndGet();
callback.onResponse(count::decrementAndGet);

} else {
callback.onFailure(new IllegalStateException("shard is not in primary mode"));
}
return null;
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject());
doAnswer(invocation -> {
Expand Down
128 changes: 101 additions & 27 deletions server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -636,11 +636,13 @@ public void testOperationPermitsOnPrimaryShards() throws Exception {
final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShard indexShard;

final boolean isPrimaryMode;
if (randomBoolean()) {
// relocation target
indexShard = newShard(newShardRouting(shardId, "local_node", "other node",
true, ShardRoutingState.INITIALIZING, AllocationId.newRelocation(AllocationId.newInitializing())));
assertEquals(0, indexShard.getActiveOperationsCount());
isPrimaryMode = false;
} else if (randomBoolean()) {
// simulate promotion
indexShard = newStartedShard(false);
Expand All @@ -660,21 +662,60 @@ public void testOperationPermitsOnPrimaryShards() throws Exception {
if (randomBoolean()) {
assertBusy(() -> assertEquals(0, indexShard.getActiveOperationsCount()));
}
isPrimaryMode = true;
} else {
indexShard = newStartedShard(true);
assertEquals(0, indexShard.getActiveOperationsCount());
isPrimaryMode = true;
}
final long primaryTerm = indexShard.getPendingPrimaryTerm();
Releasable operation1 = acquirePrimaryOperationPermitBlockingly(indexShard);
assertEquals(1, indexShard.getActiveOperationsCount());
Releasable operation2 = acquirePrimaryOperationPermitBlockingly(indexShard);
assertEquals(2, indexShard.getActiveOperationsCount());
assert indexShard.getReplicationTracker().isPrimaryMode() == isPrimaryMode;
final long pendingPrimaryTerm = indexShard.getPendingPrimaryTerm();
if (isPrimaryMode) {
Releasable operation1 = acquirePrimaryOperationPermitBlockingly(indexShard);
assertEquals(1, indexShard.getActiveOperationsCount());
Releasable operation2 = acquirePrimaryOperationPermitBlockingly(indexShard);
assertEquals(2, indexShard.getActiveOperationsCount());

Releasables.close(operation1, operation2);
assertEquals(0, indexShard.getActiveOperationsCount());
Releasables.close(operation1, operation2);
assertEquals(0, indexShard.getActiveOperationsCount());
} else {
indexShard.acquirePrimaryOperationPermit(
new ActionListener<>() {
@Override
public void onResponse(final Releasable releasable) {
throw new AssertionError();
}

@Override
public void onFailure(final Exception e) {
assertThat(e, instanceOf(IllegalStateException.class));
assertThat(e, hasToString(containsString("shard is not in primary mode")));
}
},
ThreadPool.Names.SAME,
"test");

final CountDownLatch latch = new CountDownLatch(1);
indexShard.acquireAllPrimaryOperationsPermits(
new ActionListener<>() {
@Override
public void onResponse(final Releasable releasable) {
throw new AssertionError();
}

@Override
public void onFailure(final Exception e) {
assertThat(e, instanceOf(IllegalStateException.class));
assertThat(e, hasToString(containsString("shard is not in primary mode")));
latch.countDown();
}
},
TimeValue.timeValueSeconds(30));
latch.await();
}

if (Assertions.ENABLED && indexShard.routingEntry().isRelocationTarget() == false) {
assertThat(expectThrows(AssertionError.class, () -> indexShard.acquireReplicaOperationPermit(primaryTerm,
assertThat(expectThrows(AssertionError.class, () -> indexShard.acquireReplicaOperationPermit(pendingPrimaryTerm,
indexShard.getGlobalCheckpoint(), indexShard.getMaxSeqNoOfUpdatesOrDeletes(), new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
Expand Down Expand Up @@ -1688,41 +1729,74 @@ public void testLockingBeforeAndAfterRelocated() throws Exception {
// recovery can be now finalized
recoveryThread.join();
assertTrue(shard.isRelocatedPrimary());
try (Releasable ignored = acquirePrimaryOperationPermitBlockingly(shard)) {
// lock can again be acquired
assertTrue(shard.isRelocatedPrimary());
}
final ExecutionException e = expectThrows(ExecutionException.class, () -> acquirePrimaryOperationPermitBlockingly(shard));
assertThat(e.getCause(), instanceOf(IllegalStateException.class));
assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode")));

closeShards(shard);
}

public void testDelayedOperationsBeforeAndAfterRelocated() throws Exception {
final IndexShard shard = newStartedShard(true);
IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
final CountDownLatch startRecovery = new CountDownLatch(1);
final CountDownLatch relocationStarted = new CountDownLatch(1);
Thread recoveryThread = new Thread(() -> {
try {
shard.relocated(primaryContext -> {});
startRecovery.await();
shard.relocated(primaryContext -> relocationStarted.countDown());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});

recoveryThread.start();
List<PlainActionFuture<Releasable>> onLockAcquiredActions = new ArrayList<>();
for (int i = 0; i < 10; i++) {
PlainActionFuture<Releasable> onLockAcquired = new PlainActionFuture<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
releasable.close();
super.onResponse(releasable);
}
};
shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.WRITE, "i_" + i);
onLockAcquiredActions.add(onLockAcquired);
}

for (PlainActionFuture<Releasable> onLockAcquired : onLockAcquiredActions) {
assertNotNull(onLockAcquired.get(30, TimeUnit.SECONDS));
final int numberOfAcquisitions = randomIntBetween(1, 10);
final int recoveryIndex = randomIntBetween(1, numberOfAcquisitions);

for (int i = 0; i < numberOfAcquisitions; i++) {

final PlainActionFuture<Releasable> onLockAcquired;
final Runnable assertion;
if (i < recoveryIndex) {
final AtomicBoolean invoked = new AtomicBoolean();
onLockAcquired = new PlainActionFuture<>() {

@Override
public void onResponse(Releasable releasable) {
invoked.set(true);
releasable.close();
super.onResponse(releasable);
}

@Override
public void onFailure(Exception e) {
throw new AssertionError();
}

};
assertion = () -> assertTrue(invoked.get());
} else if (recoveryIndex == i) {
startRecovery.countDown();
relocationStarted.await();
onLockAcquired = new PlainActionFuture<>();
assertion = () -> {
final ExecutionException e = expectThrows(ExecutionException.class, () -> onLockAcquired.get(30, TimeUnit.SECONDS));
assertThat(e.getCause(), instanceOf(IllegalStateException.class));
assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode")));
};
} else {
onLockAcquired = new PlainActionFuture<>();
assertion = () -> {
final ExecutionException e = expectThrows(ExecutionException.class, () -> onLockAcquired.get(30, TimeUnit.SECONDS));
assertThat(e.getCause(), instanceOf(IllegalStateException.class));
assertThat(e.getCause(), hasToString(containsString("shard is not in primary mode")));
};
}

shard.acquirePrimaryOperationPermit(onLockAcquired, ThreadPool.Names.WRITE, "i_" + i);
assertion.run();
}

recoveryThread.join();
Expand Down