Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RCI] Check blocks while having index shard permit in TransportReplicationAction #35332

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,39 @@ protected TransportRequestOptions transportOptions(Settings settings) {
return TransportRequestOptions.EMPTY;
}

/**
 * Determines the index name to use for the given request.
 *
 * @param state   cluster state handed to the index name expression resolver
 * @param request request whose index is being looked up
 * @return the name of the single concrete index resolved from {@code state} when
 *         {@code resolveIndex()} is true, otherwise {@code request.index()} unchanged
 */
private String concreteIndex(final ClusterState state, final ReplicationRequest request) {
    if (resolveIndex()) {
        return indexNameExpressionResolver.concreteSingleIndex(state, request).getName();
    }
    return request.index();
}

/**
 * Looks up the first cluster block that applies, checking the global level before the index level.
 * A level that is {@code null} is skipped entirely.
 *
 * @param state     cluster state whose blocks are consulted
 * @param indexName index name passed to the index-level block lookup
 * @return the global block exception if there is one, else the index block exception,
 *         else {@code null}
 */
private ClusterBlockException blockExceptions(final ClusterState state, final String indexName) {
    final ClusterBlockLevel globalLevel = globalBlockLevel();
    final ClusterBlockException globalBlock =
        globalLevel == null ? null : state.blocks().globalBlockedException(globalLevel);
    if (globalBlock != null) {
        return globalBlock;
    }
    // the index-level lookup only happens once the global level has produced nothing
    final ClusterBlockLevel indexLevel = indexBlockLevel();
    return indexLevel == null ? null : state.blocks().indexBlockedException(indexLevel, indexName);
}

protected boolean retryPrimaryException(final Throwable e) {
return e.getClass() == ReplicationOperation.RetryOnPrimaryException.class
|| TransportActions.isShardNotAvailableException(e);
|| TransportActions.isShardNotAvailableException(e)
|| isRetryableClusterBlockException(e);
}

/**
 * Checks whether a failure is a cluster block exception that reports itself as retryable.
 *
 * @param e the failure to inspect
 * @return {@code true} only if {@code e} is a {@code ClusterBlockException} and its
 *         {@code retryable()} returns true; {@code false} for any other throwable
 */
boolean isRetryableClusterBlockException(final Throwable e) {
    return e instanceof ClusterBlockException && ((ClusterBlockException) e).retryable();
}

protected class OperationTransportHandler implements TransportRequestHandler<Request> {
Expand Down Expand Up @@ -310,6 +340,15 @@ protected void doRun() throws Exception {
@Override
public void onResponse(PrimaryShardReference primaryShardReference) {
try {
final ClusterState clusterState = clusterService.state();
final IndexMetaData indexMetaData = clusterState.metaData().getIndexSafe(primaryShardReference.routingEntry().index());

final ClusterBlockException blockException = blockExceptions(clusterState, indexMetaData.getIndex().getName());
if (blockException != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this doesn't go well if the cluster block is retryable (i.e., we don't retry). How about dealing with this in retryPrimaryException? That would also let us remove the special handling in the reroute phase.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at this before submitting this pull request and it looked good to me...

The ClusterBlockException is thrown and caught by the surrounding try-catch block, which calls AsyncPrimaryAction.onFailure(). That sends the exception back to where the action was executed, i.e. ReroutePhase.performAction(), which already takes care, in handleException(), of retrying the request if the exception is retryable.

I agree that we could remove the special handling in the reroute phase and let only the AsyncPrimaryAction check for blocks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We talked via another channel and Boaz's suggestion makes perfect sense. In fact I thought that what was suggested was already implemented like this, but it wasn't.

logger.trace("cluster is blocked, action failed on primary", blockException);
throw blockException;
}

if (primaryShardReference.isRelocated()) {
primaryShardReference.close(); // release shard operation lock as soon as possible
setPhase(replicationTask, "primary_delegation");
Expand All @@ -323,7 +362,7 @@ public void onResponse(PrimaryShardReference primaryShardReference) {
response.readFrom(in);
return response;
};
DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
transportService.sendRequest(relocatingNode, transportPrimaryAction,
new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm),
transportOptions,
Expand Down Expand Up @@ -696,35 +735,42 @@ public void onFailure(Exception e) {
protected void doRun() {
setPhase(task, "routing");
final ClusterState state = observer.setAndGetObservedState();
if (handleBlockExceptions(state)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we still want to shortcut on blocks here rather than wait for the primary to acquire a permit? Otherwise we just end up sending all requests to the primary rather than terminating them on the coordinating node (and also make them acquire a permit).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course. I've been too quick on this.

return;
}

// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
final String concreteIndex = concreteIndex(state);
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
if (indexMetaData == null) {
retry(new IndexNotFoundException(concreteIndex));
return;
}
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
throw new IndexClosedException(indexMetaData.getIndex());
}
final String concreteIndex = concreteIndex(state, request);
final ClusterBlockException blockException = blockExceptions(state, concreteIndex);
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked, scheduling a retry", blockException);
retry(blockException);
} else {
finishAsFailed(blockException);
}
} else {
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
if (indexMetaData == null) {
retry(new IndexNotFoundException(concreteIndex));
return;
}
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
throw new IndexClosedException(indexMetaData.getIndex());
}

// resolve all derived request fields, so we can route and apply it
resolveRequest(indexMetaData, request);
assert request.shardId() != null : "request shardId must be set in resolveRequest";
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";
// resolve all derived request fields, so we can route and apply it
resolveRequest(indexMetaData, request);
assert request.shardId() != null : "request shardId must be set in resolveRequest";
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
"request waitForActiveShards must be set in resolveRequest";

final ShardRouting primary = primary(state);
if (retryIfUnavailable(state, primary)) {
return;
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
performLocalAction(state, primary, node, indexMetaData);
} else {
performRemoteAction(state, primary, node);
final ShardRouting primary = primary(state);
if (retryIfUnavailable(state, primary)) {
return;
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
performLocalAction(state, primary, node, indexMetaData);
} else {
performRemoteAction(state, primary, node);
}
}
}

Expand Down Expand Up @@ -776,44 +822,11 @@ private boolean retryIfUnavailable(ClusterState state, ShardRouting primary) {
return false;
}

/**
 * Determines the index name for this phase's {@code request}.
 *
 * @param state cluster state handed to the index name expression resolver
 * @return {@code request.index()} unchanged when {@code resolveIndex()} is false, otherwise the
 *         name of the single concrete index resolved from {@code state}
 */
private String concreteIndex(ClusterState state) {
    if (!resolveIndex()) {
        return request.index();
    }
    return indexNameExpressionResolver.concreteSingleIndex(state, request).getName();
}

/**
 * Fetches the primary shard routing for the request's shard id from the given state's routing table.
 *
 * @param state cluster state whose routing table is read
 * @return the primary shard entry of the shard routing table for {@code request.shardId()}
 */
private ShardRouting primary(ClusterState state) {
    return state.getRoutingTable().shardRoutingTable(request.shardId()).primaryShard();
}

/**
 * Checks the global block level and then the index block level, handing the first block
 * exception found to {@code handleBlockException}. A level that is {@code null} is skipped.
 *
 * @param state cluster state whose blocks are consulted
 * @return {@code true} if a block exception was found and handled, {@code false} if none applies
 */
private boolean handleBlockExceptions(ClusterState state) {
    final ClusterBlockLevel globalLevel = globalBlockLevel();
    final ClusterBlockException globalBlock =
        globalLevel == null ? null : state.blocks().globalBlockedException(globalLevel);
    if (globalBlock != null) {
        handleBlockException(globalBlock);
        return true;
    }

    // the concrete index is only resolved when an index block level is configured
    final ClusterBlockLevel indexLevel = indexBlockLevel();
    final ClusterBlockException indexBlock =
        indexLevel == null ? null : state.blocks().indexBlockedException(indexLevel, concreteIndex(state));
    if (indexBlock == null) {
        return false;
    }
    handleBlockException(indexBlock);
    return true;
}

/**
 * Reacts to a cluster block: a non-retryable block is passed to {@code finishAsFailed},
 * a retryable one is trace-logged and passed to {@code retry}.
 *
 * @param blockException the block exception to act on
 */
private void handleBlockException(ClusterBlockException blockException) {
    if (!blockException.retryable()) {
        finishAsFailed(blockException);
        return;
    }
    logger.trace("cluster is blocked, scheduling a retry", blockException);
    retry(blockException);
}

private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction,
final TransportRequest requestToPerform) {
transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {
Expand Down
Loading