Skip to content

Commit

Permalink
[RCI] Check blocks while having index shard permit in TransportReplic…
Browse files Browse the repository at this point in the history
…ationAction (#35332)

Today, the TransportReplicationAction checks the global level blocks and
the index level blocks before routing the operation to the primary, in the
ReroutePhase, and it happens at the very beginning of the transport
replication action execution. For the upcoming rework of the Close Index
API and in order to deal with primary relocation, we'll need to also check
for blocks before executing the operation on the primary (while holding a
permit) and before routing to the new primary.

This pull request changes the AsyncPrimaryAction so that it checks for
the replication action's blocks before executing the operation locally or before
routing the primary action to the new primary shard. The check is done
while holding a PrimaryShardReference.

Related to #33888
  • Loading branch information
tlrx committed Nov 14, 2018
1 parent cee022a commit 0c5e87f
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,39 @@ protected TransportRequestOptions transportOptions(Settings settings) {
return TransportRequestOptions.EMPTY;
}

/**
 * Resolves the name of the index targeted by the given request.
 *
 * When {@code resolveIndex()} returns true, the name is obtained by asking the
 * index name expression resolver for the single concrete index of the request
 * in the given cluster state; otherwise the request's own index is returned.
 *
 * @param state   the cluster state used for the resolution
 * @param request the replication request whose index is looked up
 * @return the index name to use for the request
 */
private String concreteIndex(final ClusterState state, final ReplicationRequest request) {
    if (resolveIndex()) {
        return indexNameExpressionResolver.concreteSingleIndex(state, request).getName();
    }
    return request.index();
}

/**
 * Looks up the first cluster block that applies to this action in the given state.
 *
 * The global-level check runs first and is skipped when {@code globalBlockLevel()}
 * is null. The index-level check runs only if no global block was found, and is
 * skipped when {@code indexBlockLevel()} is null.
 *
 * @param state     the cluster state whose blocks are inspected
 * @param indexName the index name used for the index-level check
 * @return the first block exception found, or {@code null} if there is none
 */
private ClusterBlockException blockExceptions(final ClusterState state, final String indexName) {
    final ClusterBlockLevel globalLevel = globalBlockLevel();
    if (globalLevel != null) {
        final ClusterBlockException globalBlock = state.blocks().globalBlockedException(globalLevel);
        if (globalBlock != null) {
            return globalBlock;
        }
    }
    final ClusterBlockLevel indexLevel = indexBlockLevel();
    if (indexLevel == null) {
        return null;
    }
    // indexBlockedException yields null when the index is not blocked, which is also our "no block" result
    return state.blocks().indexBlockedException(indexLevel, indexName);
}

/**
 * Tells whether a failure should lead to a retry of the primary operation.
 *
 * The conditions shown are: the exception class is exactly
 * {@code ReplicationOperation.RetryOnPrimaryException}, or
 * {@code TransportActions.isShardNotAvailableException(e)} holds, or
 * {@code isRetryableClusterBlockException(e)} holds.
 *
 * @param e the failure to classify
 * @return {@code true} if any of the conditions above matches
 */
// NOTE(review): this span comes from a rendered diff; the condition line ending in ';'
// appears to be the pre-change line, followed by its two-line replacement. Confirm
// against the actual file before relying on this text as compilable code.
protected boolean retryPrimaryException(final Throwable e) {
return e.getClass() == ReplicationOperation.RetryOnPrimaryException.class
|| TransportActions.isShardNotAvailableException(e);
|| TransportActions.isShardNotAvailableException(e)
|| isRetryableClusterBlockException(e);
}

/**
 * Tells whether the given failure is a cluster block exception flagged as retryable.
 *
 * @param e the failure to inspect
 * @return {@code true} only when {@code e} is a {@code ClusterBlockException}
 *         whose {@code retryable()} returns true; {@code false} otherwise
 */
boolean isRetryableClusterBlockException(final Throwable e) {
    return e instanceof ClusterBlockException && ((ClusterBlockException) e).retryable();
}

protected class OperationTransportHandler implements TransportRequestHandler<Request> {
Expand Down Expand Up @@ -321,6 +351,15 @@ protected void doRun() throws Exception {
@Override
public void onResponse(PrimaryShardReference primaryShardReference) {
try {
final ClusterState clusterState = clusterService.state();
final IndexMetaData indexMetaData = clusterState.metaData().getIndexSafe(primaryShardReference.routingEntry().index());

final ClusterBlockException blockException = blockExceptions(clusterState, indexMetaData.getIndex().getName());
if (blockException != null) {
logger.trace("cluster is blocked, action failed on primary", blockException);
throw blockException;
}

if (primaryShardReference.isRelocated()) {
primaryShardReference.close(); // release shard operation lock as soon as possible
setPhase(replicationTask, "primary_delegation");
Expand All @@ -334,7 +373,7 @@ public void onResponse(PrimaryShardReference primaryShardReference) {
response.readFrom(in);
return response;
};
DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
transportService.sendRequest(relocatingNode, transportPrimaryAction,
new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm),
transportOptions,
Expand Down Expand Up @@ -713,35 +752,42 @@ public void onFailure(Exception e) {
/*
 * Routing step of the action: sets the task phase to "routing", takes the cluster
 * state from the observer, checks cluster blocks, looks up the index metadata,
 * resolves the derived request fields, picks the primary shard routing and then
 * performs the action locally or remotely depending on whether the primary's
 * current node is the local node.
 *
 * Outcomes visible here: a retryable block or a missing index schedules a retry,
 * a non-retryable block finishes the request as failed, and a closed index
 * throws IndexClosedException.
 *
 * NOTE(review): this span comes from a rendered diff and appears to interleave the
 * pre-change body (handleBlockExceptions(state) / concreteIndex(state)) with the
 * post-change body (blockExceptions(state, concreteIndex) wrapped in if/else).
 * Several statements therefore appear twice and the braces do not balance as
 * shown; confirm against the actual file.
 */
protected void doRun() {
setPhase(task, "routing");
final ClusterState state = observer.setAndGetObservedState();
if (handleBlockExceptions(state)) {
return;
}

// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
final String concreteIndex = concreteIndex(state);
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
if (indexMetaData == null) {
retry(new IndexNotFoundException(concreteIndex));
return;
}
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
throw new IndexClosedException(indexMetaData.getIndex());
}
final String concreteIndex = concreteIndex(state, request);
final ClusterBlockException blockException = blockExceptions(state, concreteIndex);
if (blockException != null) {
if (blockException.retryable()) {
logger.trace("cluster is blocked, scheduling a retry", blockException);
retry(blockException);
} else {
finishAsFailed(blockException);
}
} else {
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
if (indexMetaData == null) {
retry(new IndexNotFoundException(concreteIndex));
return;
}
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
throw new IndexClosedException(indexMetaData.getIndex());
}

// resolve all derived request fields, so we can route and apply it
resolveRequest(indexMetaData, request);
assert request.shardId() != null : "request shardId must be set in resolveRequest";
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";
// resolve all derived request fields, so we can route and apply it
resolveRequest(indexMetaData, request);
assert request.shardId() != null : "request shardId must be set in resolveRequest";
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
"request waitForActiveShards must be set in resolveRequest";

final ShardRouting primary = primary(state);
if (retryIfUnavailable(state, primary)) {
return;
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
performLocalAction(state, primary, node, indexMetaData);
} else {
performRemoteAction(state, primary, node);
final ShardRouting primary = primary(state);
if (retryIfUnavailable(state, primary)) {
return;
}
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
performLocalAction(state, primary, node, indexMetaData);
} else {
performRemoteAction(state, primary, node);
}
}
}

Expand Down Expand Up @@ -793,44 +839,11 @@ private boolean retryIfUnavailable(ClusterState state, ShardRouting primary) {
return false;
}

/**
 * Resolves the name of the index targeted by the current request.
 *
 * When {@code resolveIndex()} returns true, the name comes from the index name
 * expression resolver applied to the given cluster state; otherwise the
 * request's own index is returned.
 *
 * @param state the cluster state used for the resolution
 * @return the index name to use for the request
 */
private String concreteIndex(ClusterState state) {
    if (resolveIndex()) {
        return indexNameExpressionResolver.concreteSingleIndex(state, request).getName();
    }
    return request.index();
}

/**
 * Returns the primary shard routing for the request's shard id, looked up in the
 * routing table of the given cluster state.
 *
 * @param state the cluster state providing the routing table
 * @return the primary shard of the request's shard routing table
 */
private ShardRouting primary(ClusterState state) {
    return state.getRoutingTable().shardRoutingTable(request.shardId()).primaryShard();
}

/**
 * Checks the given state for a global-level block and then an index-level block,
 * and hands the first one found to {@code handleBlockException}.
 *
 * Each check is skipped when its block level is null. The index-level check,
 * including the concrete index resolution, only runs if no global block was found.
 *
 * @param state the cluster state whose blocks are inspected
 * @return {@code true} if a block was found and handled, {@code false} otherwise
 */
private boolean handleBlockExceptions(ClusterState state) {
    final ClusterBlockLevel globalLevel = globalBlockLevel();
    ClusterBlockException blocked = globalLevel == null ? null : state.blocks().globalBlockedException(globalLevel);
    if (blocked == null) {
        // no global block: fall through to the index-level check
        final ClusterBlockLevel indexLevel = indexBlockLevel();
        if (indexLevel != null) {
            blocked = state.blocks().indexBlockedException(indexLevel, concreteIndex(state));
        }
    }
    if (blocked == null) {
        return false;
    }
    handleBlockException(blocked);
    return true;
}

/**
 * Reacts to a cluster block: a non-retryable block finishes the request as
 * failed, a retryable one is traced and a retry is scheduled.
 *
 * @param blockException the block that was found
 */
private void handleBlockException(ClusterBlockException blockException) {
    if (!blockException.retryable()) {
        finishAsFailed(blockException);
        return;
    }
    logger.trace("cluster is blocked, scheduling a retry", blockException);
    retry(blockException);
}

private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction,
final TransportRequest requestToPerform) {
transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {
Expand Down
Loading

0 comments on commit 0c5e87f

Please sign in to comment.