Skip to content

Commit

Permalink
Remove debugging cruft from IndexShardOperationPermits (#95275)
Browse files Browse the repository at this point in the history
In #28567 we introduced into `IndexShardOperationPermits` the tracking
of extra information about the permits it has handed out, which would
help if a test failed due to a leaked permit. I don't think we've seen
any such test failures in a very long time, so this extra test-only code
is not really useful any more. This commit removes it.
  • Loading branch information
DaveCTurner committed Apr 18, 2023
1 parent b69271a commit 99771b1
Show file tree
Hide file tree
Showing 17 changed files with 102 additions and 296 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ protected abstract void shardOperationOnPrimary(

/**
* Execute the specified replica operation. This is done under a permit from
* {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)}.
* {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String)}.
*
* @param shardRequest the request to the replica shard
* @param replica the replica shard to perform the operation on
Expand Down Expand Up @@ -1057,7 +1057,7 @@ protected void acquirePrimaryOperationPermit(
final Request request,
final ActionListener<Releasable> onAcquired
) {
primary.acquirePrimaryOperationPermit(onAcquired, executor, request, forceExecutionOnPrimary);
primary.acquirePrimaryOperationPermit(onAcquired, executor, forceExecutionOnPrimary);
}

/**
Expand All @@ -1072,7 +1072,7 @@ protected void acquireReplicaOperationPermit(
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes
) {
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onAcquired, executor, request);
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onAcquired, executor);
}

class PrimaryShardReference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ private void sync(final Consumer<IndexShard> sync, final String source) {
&& e instanceof ShardNotInPrimaryModeException == false) {
logger.warn(() -> format("%s failed to execute %s sync", shard.shardId(), source), e);
}
}, ThreadPool.Names.SAME, source + " sync");
}, ThreadPool.Names.SAME);
} catch (final AlreadyClosedException | IndexShardClosedException e) {
// the shard was closed concurrently, continue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected void asyncShardOperation(T request, ShardId shardId, final ActionListe
try (Releasable ignore = releasable) {
doRetentionLeaseAction(indexShard, request, delegatedListener);
}
}), ThreadPool.Names.SAME, request);
}), ThreadPool.Names.SAME);
}

@Override
Expand Down
58 changes: 13 additions & 45 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2143,7 +2143,7 @@ private void ensureWriteAllowed(Engine.Operation.Origin origin) throws IllegalIn
} else {
assert origin == Engine.Operation.Origin.LOCAL_RESET;
assert getActiveOperationsCount() == OPERATIONS_BLOCKED
: "locally resetting without blocking operations, active operations are [" + getActiveOperations() + "]";
: "locally resetting without blocking operations, active operations [" + getActiveOperationsCount() + "]";
}
if (writeAllowedStates.contains(state) == false) {
throw new IllegalIndexShardStateException(
Expand Down Expand Up @@ -3301,29 +3301,15 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided
* ActionListener will then be called using the provided executor.
*
* @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are enabled
* the tracing will capture the supplied object's {@link Object#toString()} value. Otherwise the object
* isn't used
*/
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay, Object debugInfo) {
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, debugInfo, false);
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay) {
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, false);
}

public void acquirePrimaryOperationPermit(
ActionListener<Releasable> onPermitAcquired,
String executorOnDelay,
Object debugInfo,
boolean forceExecution
) {
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay, boolean forceExecution) {
verifyNotClosed();
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;

indexShardOperationPermits.acquire(
wrapPrimaryOperationPermitListener(onPermitAcquired),
executorOnDelay,
forceExecution,
debugInfo
);
indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, forceExecution);
}

/**
Expand Down Expand Up @@ -3374,29 +3360,23 @@ private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, l

/**
* Runs the specified runnable under a permit and otherwise calling back the specified failure callback. This method is really a
* convenience for {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)} where the listener equates to
* convenience for {@link #acquirePrimaryOperationPermit(ActionListener, String)} where the listener equates to
* try-with-resources closing the releasable after executing the runnable on successfully acquiring the permit, an otherwise calling
* back the failure callback.
*
* @param runnable the runnable to execute under permit
* @param onFailure the callback on failure
* @param executorOnDelay the executor to execute the runnable on if permit acquisition is blocked
* @param debugInfo debug info
*/
public void runUnderPrimaryPermit(
final Runnable runnable,
final Consumer<Exception> onFailure,
final String executorOnDelay,
final Object debugInfo
) {
public void runUnderPrimaryPermit(final Runnable runnable, final Consumer<Exception> onFailure, final String executorOnDelay) {
verifyNotClosed();
assert shardRouting.primary() : "runUnderPrimaryPermit should only be called on primary shard but was " + shardRouting;
final ActionListener<Releasable> onPermitAcquired = ActionListener.wrap(releasable -> {
try (Releasable ignore = releasable) {
runnable.run();
}
}, onFailure);
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, debugInfo);
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay);
}

private <E extends Exception> void bumpPrimaryTerm(
Expand Down Expand Up @@ -3461,7 +3441,7 @@ public void onResponse(final Releasable releasable) {

/**
* Acquire a replica operation permit whenever the shard is ready for indexing (see
* {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)}). If the given primary term is lower than then one in
* {@link #acquirePrimaryOperationPermit(ActionListener, String)}). If the given primary term is lower than then one in
* {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an
* {@link IllegalStateException}. If permit acquisition is delayed, the listener will be invoked on the executor with the specified
* name.
Expand All @@ -3472,25 +3452,21 @@ public void onResponse(final Releasable releasable) {
* after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()}
* @param onPermitAcquired the listener for permit acquisition
* @param executorOnDelay the name of the executor to invoke the listener on if permit acquisition is delayed
* @param debugInfo an extra information that can be useful when tracing an unreleased permit. When assertions are
* enabled the tracing will capture the supplied object's {@link Object#toString()} value.
* Otherwise the object isn't used
*/
public void acquireReplicaOperationPermit(
final long opPrimaryTerm,
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes,
final ActionListener<Releasable> onPermitAcquired,
final String executorOnDelay,
final Object debugInfo
final String executorOnDelay
) {
innerAcquireReplicaOperationPermit(
opPrimaryTerm,
globalCheckpoint,
maxSeqNoOfUpdatesOrDeletes,
onPermitAcquired,
false,
(listener) -> indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo)
(listener) -> indexShardOperationPermits.acquire(listener, executorOnDelay, true)
);
}

Expand Down Expand Up @@ -3621,14 +3597,6 @@ public int getActiveOperationsCount() {
return indexShardOperationPermits.getActiveOperationsCount();
}

/**
* @return a list of describing each permit that wasn't released yet. The description consist of the debugInfo supplied
* when the permit was acquired plus a stack traces that was captured when the permit was request.
*/
public List<String> getActiveOperations() {
return indexShardOperationPermits.getActiveOperations();
}

/**
* Syncs the given location with the underlying storage unless already synced. This method might return immediately without
* actually fsyncing the location until the sync listener is called. Yet, unless there is already another thread fsyncing
Expand Down Expand Up @@ -3978,7 +3946,7 @@ public void afterRefresh(boolean didRefresh) {
void resetEngineToGlobalCheckpoint() throws IOException {
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
assert getActiveOperationsCount() == OPERATIONS_BLOCKED
: "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']';
: "resetting engine without blocking operations; active operations are [" + getActiveOperationsCount() + ']';
sync(); // persist the global checkpoint to disk
final SeqNoStats seqNoStats = seqNoStats();
final TranslogStats translogStats = translogStats();
Expand Down Expand Up @@ -4085,7 +4053,7 @@ public long getMaxSeqNoOfUpdatesOrDeletes() {
* These transfers guarantee that every index/delete operation when executing on a replica engine will observe this marker a value
* which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary.
*
* @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)
* @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String)
* @see RecoveryTarget#indexTranslogOperations(List, int, long, long, RetentionLeases, long, ActionListener)
*/
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
Expand Down

0 comments on commit 99771b1

Please sign in to comment.