Skip to content

Commit

Permalink
Expose all permits acquisition in IndexShard and TransportReplication…
Browse files Browse the repository at this point in the history
…Action (#35540)

This pull request exposes two new methods in the IndexShard and
TransportReplicationAction classes in order to allow transport replication
actions to acquire all index shard operation permits for their execution.

It first adds the acquireAllPrimaryOperationPermits() and the
acquireAllReplicaOperationsPermits() methods to the IndexShard class
which allow to acquire all operations permits on a shard while exposing
a Releasable. It also refactors the TransportReplicationAction class to
expose two protected methods (acquirePrimaryOperationPermit() and
acquireReplicaOperationPermit()) that can be overridden when a transport
replication action requires the acquisition of all permits on primary and/or
replica shard during execution.

Finally, it adds a TransportReplicationAllPermitsAcquisitionTests which
 illustrates how a transport replication action can grab all permits before
adding a cluster block in the cluster state, making subsequent operations
that requires a single permit to fail).

Related to elastic #33888
  • Loading branch information
tlrx committed Nov 23, 2018
1 parent 419c9c9 commit 0761683
Show file tree
Hide file tree
Showing 5 changed files with 876 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public void messageReceived(ConcreteShardRequest<Request> request, TransportChan
}
}

class AsyncPrimaryAction extends AbstractRunnable implements ActionListener<PrimaryShardReference> {
class AsyncPrimaryAction extends AbstractRunnable {

private final Request request;
// targetAllocationID of the shard this request is meant for
Expand All @@ -345,11 +345,33 @@ class AsyncPrimaryAction extends AbstractRunnable implements ActionListener<Prim

@Override
protected void doRun() throws Exception {
acquirePrimaryShardReference(request.shardId(), targetAllocationID, primaryTerm, this, request);
final ShardId shardId = request.shardId();
final IndexShard indexShard = getIndexShard(shardId);
final ShardRouting shardRouting = indexShard.routingEntry();
// we may end up here if the cluster state used to route the primary is so stale that the underlying
// index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
// the replica will take over and a replica will be assigned to the first node.
if (shardRouting.primary() == false) {
throw new ReplicationOperation.RetryOnPrimaryException(shardId, "actual shard is not a primary " + shardRouting);
}
final String actualAllocationId = shardRouting.allocationId().getId();
if (actualAllocationId.equals(targetAllocationID) == false) {
throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]", targetAllocationID,
actualAllocationId);
}
final long actualTerm = indexShard.getPendingPrimaryTerm();
if (actualTerm != primaryTerm) {
throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]", targetAllocationID,
primaryTerm, actualTerm);
}

acquirePrimaryOperationPermit(indexShard, request, ActionListener.wrap(
releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
this::onFailure
));
}

@Override
public void onResponse(PrimaryShardReference primaryShardReference) {
void runWithPrimaryShardReference(final PrimaryShardReference primaryShardReference) {
try {
final ClusterState clusterState = clusterService.state();
final IndexMetaData indexMetaData = clusterState.metaData().getIndexSafe(primaryShardReference.routingEntry().index());
Expand Down Expand Up @@ -677,10 +699,10 @@ protected void doRun() throws Exception {
setPhase(task, "replica");
final String actualAllocationId = this.replica.routingEntry().allocationId().getId();
if (actualAllocationId.equals(targetAllocationID) == false) {
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
throw new ShardNotFoundException(this.replica.shardId(), "expected allocation id [{}] but found [{}]", targetAllocationID,
actualAllocationId);
}
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, this, executor, request);
acquireReplicaOperationPermit(replica, request, this, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
}

/**
Expand Down Expand Up @@ -714,7 +736,7 @@ public void onFailure(Exception e) {
}
}

protected IndexShard getIndexShard(ShardId shardId) {
protected IndexShard getIndexShard(final ShardId shardId) {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
return indexService.getShard(shardId.id());
}
Expand Down Expand Up @@ -955,42 +977,26 @@ void retryBecauseUnavailable(ShardId shardId, String message) {
}

/**
* Tries to acquire reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally
* and replication of the operation to all replica shards is completed / failed (see {@link ReplicationOperation}).
* Executes the logic for acquiring one or more operation permit on a primary shard. The default is to acquire a single permit but this
* method can be overridden to acquire more.
*/
private void acquirePrimaryShardReference(ShardId shardId, String allocationId, long primaryTerm,
ActionListener<PrimaryShardReference> onReferenceAcquired, Object debugInfo) {
IndexShard indexShard = getIndexShard(shardId);
// we may end up here if the cluster state used to route the primary is so stale that the underlying
// index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
// the replica will take over and a replica will be assigned to the first node.
if (indexShard.routingEntry().primary() == false) {
throw new ReplicationOperation.RetryOnPrimaryException(indexShard.shardId(),
"actual shard is not a primary " + indexShard.routingEntry());
}
final String actualAllocationId = indexShard.routingEntry().allocationId().getId();
if (actualAllocationId.equals(allocationId) == false) {
throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId);
}
final long actualTerm = indexShard.getPendingPrimaryTerm();
if (actualTerm != primaryTerm) {
throw new ShardNotFoundException(shardId, "expected aID [{}] with term [{}] but found [{}]", allocationId,
primaryTerm, actualTerm);
}

ActionListener<Releasable> onAcquired = new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
onReferenceAcquired.onResponse(new PrimaryShardReference(indexShard, releasable));
}

@Override
public void onFailure(Exception e) {
onReferenceAcquired.onFailure(e);
}
};
protected void acquirePrimaryOperationPermit(final IndexShard primary,
final Request request,
final ActionListener<Releasable> onAcquired) {
primary.acquirePrimaryOperationPermit(onAcquired, executor, request);
}

indexShard.acquirePrimaryOperationPermit(onAcquired, executor, debugInfo);
/**
* Executes the logic for acquiring one or more operation permit on a replica shard. The default is to acquire a single permit but this
* method can be overridden to acquire more.
*/
protected void acquireReplicaOperationPermit(final IndexShard replica,
final ReplicaRequest request,
final ActionListener<Releasable> onAcquired,
final long primaryTerm,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes) {
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onAcquired, executor, request);
}

class ShardReference implements Releasable {
Expand Down
130 changes: 84 additions & 46 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2348,7 +2348,18 @@ public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcq
indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo);
}

private <E extends Exception> void bumpPrimaryTerm(long newPrimaryTerm, final CheckedRunnable<E> onBlocked) {
/**
* Acquire all primary operation permits. Once all permits are acquired, the provided ActionListener is called.
* It is the responsibility of the caller to close the {@link Releasable}.
*/
public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
verifyNotClosed();
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;

indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
}

private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm, final CheckedRunnable<E> onBlocked) {
assert Thread.holdsLock(mutex);
assert newPrimaryTerm > pendingPrimaryTerm;
assert operationPrimaryTerm <= pendingPrimaryTerm;
Expand Down Expand Up @@ -2403,11 +2414,42 @@ public void onResponse(final Releasable releasable) {
public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay,
final Object debugInfo) {
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired,
(listener) -> indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo));
}

/**
* Acquire all replica operation permits whenever the shard is ready for indexing (see
* {@link #acquireAllPrimaryOperationsPermits(ActionListener, TimeValue)}. If the given primary term is lower than then one in
* {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an
* {@link IllegalStateException}.
*
* @param opPrimaryTerm the operation primary term
* @param globalCheckpoint the global checkpoint associated with the request
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary
* after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()}
* @param onPermitAcquired the listener for permit acquisition
* @param timeout the maximum time to wait for the in-flight operations block
*/
public void acquireAllReplicaOperationsPermits(final long opPrimaryTerm,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired,
final TimeValue timeout) {
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired,
(listener) -> indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit()));
}

private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired,
final Consumer<ActionListener<Releasable>> consumer) {
verifyNotClosed();
if (opPrimaryTerm > pendingPrimaryTerm) {
synchronized (mutex) {
if (opPrimaryTerm > pendingPrimaryTerm) {
IndexShardState shardState = state();
final IndexShardState shardState = state();
// only roll translog and update primary term if shard has made it past recovery
// Having a new primary term here means that the old primary failed and that there is a new primary, which again
// means that the master will fail this shard as all initializing shards are failed when a primary is selected
Expand All @@ -2419,58 +2461,54 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g

if (opPrimaryTerm > pendingPrimaryTerm) {
bumpPrimaryTerm(opPrimaryTerm, () -> {
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long maxSeqNo = seqNoStats().getMaxSeqNo();
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
if (currentGlobalCheckpoint < maxSeqNo) {
resetEngineToGlobalCheckpoint();
} else {
getEngine().rollTranslogGeneration();
}
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long maxSeqNo = seqNoStats().getMaxSeqNo();
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
if (currentGlobalCheckpoint < maxSeqNo) {
resetEngineToGlobalCheckpoint();
} else {
getEngine().rollTranslogGeneration();
}
});
}
}
}
}

assert opPrimaryTerm <= pendingPrimaryTerm
: "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]";
indexShardOperationPermits.acquire(
new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
if (opPrimaryTerm < operationPrimaryTerm) {
releasable.close();
final String message = String.format(
Locale.ROOT,
"%s operation primary term [%d] is too old (current [%d])",
shardId,
opPrimaryTerm,
operationPrimaryTerm);
onPermitAcquired.onFailure(new IllegalStateException(message));
} else {
assert assertReplicationTarget();
try {
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation");
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
} catch (Exception e) {
releasable.close();
onPermitAcquired.onFailure(e);
return;
}
onPermitAcquired.onResponse(releasable);
}
}

@Override
public void onFailure(final Exception e) {
: "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]";
consumer.accept(new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
if (opPrimaryTerm < operationPrimaryTerm) {
releasable.close();
final String message = String.format(
Locale.ROOT,
"%s operation primary term [%d] is too old (current [%d])",
shardId,
opPrimaryTerm,
operationPrimaryTerm);
onPermitAcquired.onFailure(new IllegalStateException(message));
} else {
assert assertReplicationTarget();
try {
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation");
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
} catch (Exception e) {
releasable.close();
onPermitAcquired.onFailure(e);
return;
}
},
executorOnDelay,
true, debugInfo);
onPermitAcquired.onResponse(releasable);
}
}

@Override
public void onFailure(final Exception e) {
onPermitAcquired.onFailure(e);
}
});
}

public int getActiveOperationsCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -953,11 +953,11 @@ action.new PrimaryOperationTransportHandler().messageReceived(
logger.debug("got exception:" , throwable);
assertTrue(throwable.getClass() + " is not a retry exception", action.retryPrimaryException(throwable));
if (wrongAllocationId) {
assertThat(throwable.getMessage(), containsString("expected aID [_not_a_valid_aid_] but found [" +
assertThat(throwable.getMessage(), containsString("expected allocation id [_not_a_valid_aid_] but found [" +
primary.allocationId().getId() + "]"));
} else {
assertThat(throwable.getMessage(), containsString("expected aID [" + primary.allocationId().getId() + "] with term [" +
requestTerm + "] but found [" + primaryTerm + "]"));
assertThat(throwable.getMessage(), containsString("expected allocation id [" + primary.allocationId().getId()
+ "] with term [" + requestTerm + "] but found [" + primaryTerm + "]"));
}
}
}
Expand Down

0 comments on commit 0761683

Please sign in to comment.