Skip to content

Commit

Permalink
StartedShardUpdateTask task listener (#82854)
Browse files Browse the repository at this point in the history
Today node removal tasks executed by the master have a separate
ClusterStateTaskListener to feed back the result to the requestor.
It'd be preferable to use the task itself as the listener.
  • Loading branch information
idegtiarenko committed Jan 24, 2022
1 parent 7d0d409 commit 6959203
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.MessageSupplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -616,43 +615,20 @@ public void messageReceived(StartedShardEntry request, TransportChannel channel,
SHARD_STARTED_ACTION_NAME,
request
);

var update = new StartedShardUpdateTask(request, listener);

clusterService.submitStateUpdateTask(
"shard-started " + request,
request,
update,
ClusterStateTaskConfig.build(Priority.URGENT),
shardStartedClusterStateTaskExecutor,
new ClusterStateTaskListener() {
@Override
public void onFailure(Exception e) {
final MessageSupplier msg = () -> new ParameterizedMessage(
"{} unexpected failure while starting shard [{}]",
request.shardId,
request
);
if (e instanceof FailedToCommitClusterStateException) {
logger.debug(msg, e);
} else {
logger.error(msg, e);
}
listener.onFailure(e);
}

@Override
public void onNoLongerMaster() {
logger.debug("{} no longer master while starting shard [{}]", request.shardId, request);
listener.onFailure(new NotMasterException("shard-started"));
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
listener.onResponse(TransportResponse.Empty.INSTANCE);
}
}
update
);
}
}

public static class ShardStartedClusterStateTaskExecutor implements ClusterStateTaskExecutor<StartedShardEntry> {
public static class ShardStartedClusterStateTaskExecutor implements ClusterStateTaskExecutor<StartedShardUpdateTask> {
private final AllocationService allocationService;
private final RerouteService rerouteService;

Expand All @@ -662,52 +638,54 @@ public ShardStartedClusterStateTaskExecutor(AllocationService allocationService,
}

@Override
public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState, List<StartedShardEntry> tasks) throws Exception {
ClusterTasksResult.Builder<StartedShardEntry> builder = ClusterTasksResult.builder();
List<StartedShardEntry> tasksToBeApplied = new ArrayList<>();
public ClusterTasksResult<StartedShardUpdateTask> execute(ClusterState currentState, List<StartedShardUpdateTask> tasks)
throws Exception {
ClusterTasksResult.Builder<StartedShardUpdateTask> builder = ClusterTasksResult.builder();
List<StartedShardUpdateTask> tasksToBeApplied = new ArrayList<>();
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates
final Map<Index, IndexLongFieldRange> updatedTimestampRanges = new HashMap<>();
for (StartedShardEntry task : tasks) {
final ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId);
for (StartedShardUpdateTask task : tasks) {
StartedShardEntry entry = task.getEntry();
final ShardRouting matched = currentState.getRoutingTable().getByAllocationId(entry.shardId, entry.allocationId);
if (matched == null) {
// tasks that correspond to non-existent shards are marked as successful. The reason is that we resend shard started
// events on every cluster state publishing that does not contain the shard as started yet. This means that old stale
// requests might still be in flight even after the shard has already been started or failed on the master. We just
// ignore these requests for now.
logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", task.shardId, task);
logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", entry.shardId, entry);
builder.success(task);
} else {
if (matched.primary() && task.primaryTerm > 0) {
final IndexMetadata indexMetadata = currentState.metadata().index(task.shardId.getIndex());
if (matched.primary() && entry.primaryTerm > 0) {
final IndexMetadata indexMetadata = currentState.metadata().index(entry.shardId.getIndex());
assert indexMetadata != null;
final long currentPrimaryTerm = indexMetadata.primaryTerm(task.shardId.id());
if (currentPrimaryTerm != task.primaryTerm) {
assert currentPrimaryTerm > task.primaryTerm
final long currentPrimaryTerm = indexMetadata.primaryTerm(entry.shardId.id());
if (currentPrimaryTerm != entry.primaryTerm) {
assert currentPrimaryTerm > entry.primaryTerm
: "received a primary term with a higher term than in the "
+ "current cluster state (received ["
+ task.primaryTerm
+ entry.primaryTerm
+ "] but current is ["
+ currentPrimaryTerm
+ "])";
logger.debug(
"{} ignoring shard started task [{}] (primary term {} does not match current term {})",
task.shardId,
task,
task.primaryTerm,
entry.shardId,
entry,
entry.primaryTerm,
currentPrimaryTerm
);
builder.success(task);
continue;
}
}
if (matched.initializing() == false) {
assert matched.active() : "expected active shard routing for task " + task + " but found " + matched;
assert matched.active() : "expected active shard routing for task " + entry + " but found " + matched;
// same as above, this might have been a stale in-flight request, so we just ignore.
logger.debug(
"{} ignoring shard started task [{}] (shard exists but is not initializing: {})",
task.shardId,
task,
entry.shardId,
entry,
matched
);
builder.success(task);
Expand All @@ -716,29 +694,29 @@ public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState,
if (seenShardRoutings.contains(matched)) {
logger.trace(
"{} ignoring shard started task [{}] (already scheduled to start {})",
task.shardId,
task,
entry.shardId,
entry,
matched
);
tasksToBeApplied.add(task);
} else {
logger.debug("{} starting shard {} (shard started task: [{}])", task.shardId, matched, task);
logger.debug("{} starting shard {} (shard started task: [{}])", entry.shardId, matched, entry);
tasksToBeApplied.add(task);
shardRoutingsToBeApplied.add(matched);
seenShardRoutings.add(matched);

// expand the timestamp range recorded in the index metadata if needed
final Index index = task.shardId.getIndex();
final Index index = entry.shardId.getIndex();
IndexLongFieldRange currentTimestampMillisRange = updatedTimestampRanges.get(index);
final IndexMetadata indexMetadata = currentState.metadata().index(index);
if (currentTimestampMillisRange == null) {
currentTimestampMillisRange = indexMetadata.getTimestampRange();
}
final IndexLongFieldRange newTimestampMillisRange;
newTimestampMillisRange = currentTimestampMillisRange.extendWithShardRange(
task.shardId.id(),
entry.shardId.id(),
indexMetadata.getNumberOfShards(),
task.timestampRange
entry.timestampRange
);
if (newTimestampMillisRange != currentTimestampMillisRange) {
updatedTimestampRanges.put(index, newTimestampMillisRange);
Expand Down Expand Up @@ -872,6 +850,43 @@ public int hashCode() {
}
}

public static class StartedShardUpdateTask implements ClusterStateTaskListener {

private final StartedShardEntry entry;
private final ActionListener<TransportResponse.Empty> listener;

public StartedShardUpdateTask(StartedShardEntry entry, ActionListener<TransportResponse.Empty> listener) {
this.entry = entry;
this.listener = listener;
}

public StartedShardEntry getEntry() {
return entry;
}

@Override
public void onFailure(Exception e) {
if (e instanceof NotMasterException) {
logger.debug(() -> new ParameterizedMessage("{} no longer master while starting shard [{}]", entry.shardId, entry));
} else if (e instanceof FailedToCommitClusterStateException) {
logger.debug(() -> new ParameterizedMessage("{} unexpected failure while starting shard [{}]", entry.shardId, entry), e);
} else {
logger.error(() -> new ParameterizedMessage("{} unexpected failure while starting shard [{}]", entry.shardId, entry), e);
}
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
listener.onResponse(TransportResponse.Empty.INSTANCE);
}

@Override
public String toString() {
return "StartedShardUpdateTask{entry=" + entry + ", listener=" + listener + "}";
}
}

public static class NoLongerPrimaryShardException extends ElasticsearchException {

public NoLongerPrimaryShardException(ShardId shardId, String msg) {
Expand Down

0 comments on commit 6959203

Please sign in to comment.