
Count coordinating and primary bytes as write bytes #58575

Merged: Tim-Brooks merged 15 commits into elastic:master on Jul 1, 2020

Conversation

@Tim-Brooks (Contributor)

This is a follow-up to #57573. This commit combines coordinating and
primary bytes under the same "write" bucket. Double accounting is
prevented by only accounting the bytes at either the reroute phase or
the primary phase. TransportBulkAction calls execute directly, so the
operations handler is skipped and the bytes are not double accounted.
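
For illustration, a minimal sketch of the single "write" bucket idea, assuming a plain counter abstraction; the class and method names below are illustrative only and are not the actual WriteMemoryLimits API:

    import java.util.concurrent.atomic.AtomicLong;

    /**
     * Minimal sketch (illustrative only): bytes for a request are marked exactly once,
     * either at the coordinating/reroute phase or at the primary phase, never both.
     */
    class WriteBytesAccountingSketch {
        private final AtomicLong writeBytes = new AtomicLong();

        /** Mark bytes for an operation; the returned Runnable releases them when the operation completes. */
        Runnable markWriteOperationStarted(long bytes) {
            writeBytes.addAndGet(bytes);
            return () -> writeBytes.addAndGet(-bytes);
        }

        long currentWriteBytes() {
            return writeBytes.get();
        }
    }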

@Tim-Brooks added the >non-issue, :Distributed/CRUD, v8.0.0, and v7.9.0 labels on Jun 25, 2020
@elasticmachine (Collaborator)

Pinging @elastic/es-distributed (:Distributed/CRUD)

@elasticmachine added the Team:Distributed label on Jun 25, 2020
@Tim-Brooks changed the title from "Count coordinating and primary bytes as write byte" to "Count coordinating and primary bytes as write bytes" on Jun 25, 2020
@Tim-Brooks (Contributor, Author)

This demonstrates a way to avoid double accounting without using task ids or anything like that. Unfortunately, it is currently broken around CCR. If the coordinating node is the local primary, the bytes are not accounted. However, similar to TransportBulkAction, I could account for the bytes in ShardFollowTasksExecutor before TransportBulkShardOperationsAction is executed.

@ywelsch (Contributor) commented Jun 26, 2020

Let's adapt the CCR code as well then. Also, it would be good to add more tests that show the behavior of a coordinating node vs. a coordinating + primary node.

Can you also extend WriteMemoryLimitsIT to have a third node that does purely coordination?

@@ -64,6 +64,11 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran
writeMemoryLimits);
}

@Override
protected void doExecute(Task parentTask, ResyncReplicationRequest request, ActionListener<ResyncReplicationResponse> listener) {
assert false : "use TransportResyncReplicationAction#sync";
Contributor

Should we make this the default implementation for doExecute in TransportWriteAction when rerouteBypassed() is true? Maybe we can then also rename that method to supportsRerouteAction.

@Tim-Brooks (Author)

Okay. This is a little tricky because I have to expose reroute in TransportReplicationAction, but I did that and we can discuss.

@Tim-Brooks (Author)

Actually, I'm holding off on this for the moment because it gets kind of messy. We can talk more about it. I did rename the method.

@Tim-Brooks requested a review from ywelsch on June 29, 2020 at 14:52
@ywelsch (Contributor) left a comment

I've left a comment about a situation that this does not correctly cover. I'm also wondering if there are assertions we can put in place (e.g. asserting that the parent task id is the local node when not accounting for some request) to make sure that the accounting works correctly under all edge conditions, not only the one validated by the given test.

@Tim-Brooks requested a review from ywelsch on June 30, 2020 at 00:29
@ywelsch (Contributor) left a comment

I've commented on the naming (as we discussed), and would also like to see a test here that covers the situation we just addressed. This can be simulated with a primary relocation where the master is blocked from completing the relocation (i.e. moving the shard from relocated to started), or where the "start this primary relocation target" message (ShardStateAction) is blocked.

Can you think of a test for the CCR changes?

@Tim-Brooks (Contributor, Author)

I looked into a primary delegation IT today. It was hard. In fact, it looks to me like the primary delegation code path must be hit pretty rarely.

This happens in runWithPrimaryShardReference after the primary shard reference has been acquired.

                if (primaryShardReference.isRelocated()) {
                    primaryShardReference.close(); // release shard operation lock as soon as possible
                    setPhase(replicationTask, "primary_delegation");
                    // delegate primary phase to relocation target
                    // it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary
                    // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
                    final ShardRouting primary = primaryShardReference.routingEntry();
                    assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
                    final Writeable.Reader<Response> reader = TransportReplicationAction.this::newResponseInstance;
                    DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
                    transportService.sendRequest(relocatingNode, transportPrimaryAction,
                        new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(),
                            primaryRequest.getPrimaryTerm()),
                        transportOptions,
                        new ActionListenerResponseHandler<>(onCompletionListener, reader) {
                            @Override
                            public void handleResponse(Response response) {
                                setPhase(replicationTask, "finished");
                                super.handleResponse(response);
                            }

                            @Override
                            public void handleException(TransportException exp) {
                                setPhase(replicationTask, "finished");
                                super.handleException(exp);
                            }
                        });
                }

If you follow the primaryShardReference.isRelocated() method, it calls isRelocated() in ReplicationTracker.

    public boolean isRelocated() {
        return relocated;
    }

This boolean is set to true in ReplicationTracker#completeRelocationHandoff.

    public synchronized void completeRelocationHandoff() {
        assert invariant();
        assert primaryMode;
        assert handoffInProgress;
        assert relocated == false;
        primaryMode = false;
        handoffInProgress = false;
        relocated = true;
        // forget all checkpoint information
        checkpoints.forEach((key, cps) -> {
            cps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
            cps.globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
        });
        assert invariant();
    }

At the same time that relocated is set to true, primaryMode is set to false.

Immediately after acquiring the primary operation permit, we check that we are still in primary mode; otherwise we fail with a ShardNotInPrimaryModeException.

    private ActionListener<Releasable> wrapPrimaryOperationPermitListener(final ActionListener<Releasable> listener) {
        return ActionListener.delegateFailure(
                listener,
                (l, r) -> {
                    if (replicationTracker.isPrimaryMode()) {
                        l.onResponse(r);
                    } else {
                        r.close();
                        l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
                    }
                });
    }

That l.onResponse(r) call is a direct path to the if (primaryShardReference.isRelocated()) check. So it looks to me like the vast majority of the time, requests are simply going to be rejected after acquiring the permit with ShardNotInPrimaryModeException. Maybe a few will make it past that line, and in between, the ReplicationTracker will be updated with relocated = true. But I'm not sure how to implement a test that consistently makes this happen. All of my indexing requests failed when I blocked the ShardStateAction because they were still being routed to the initial primary, which failed them with ShardNotInPrimaryModeException.

In fact, that put the request into a loop: it would retry, but I was still blocking the ShardStateAction. That is a little contrived, though.
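
For illustration, here is a toy model of that narrow window (an assumed simplification, not the actual IndexShard/ReplicationTracker code):

    import java.util.concurrent.atomic.AtomicBoolean;

    /**
     * Toy model of the race described above (illustrative only): a request can pass the
     * primary-mode check and only afterwards observe the shard as relocated, which is
     * the rare path that reaches primary delegation.
     */
    class RelocationRaceSketch {
        private final AtomicBoolean primaryMode = new AtomicBoolean(true);
        private final AtomicBoolean relocated = new AtomicBoolean(false);

        /** Runs on the relocation/handoff thread. */
        void completeRelocationHandoff() {
            primaryMode.set(false);
            relocated.set(true);
        }

        /** Runs on the indexing thread after the permit has been acquired. */
        String handleRequest() {
            if (primaryMode.get() == false) {
                return "rejected with ShardNotInPrimaryModeException"; // the common case
            }
            // The handoff can complete right here, between the two checks.
            if (relocated.get()) {
                return "primary phase delegated to the relocation target"; // the rare case
            }
            return "primary phase executed locally";
        }
    }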

@Tim-Brooks requested a review from ywelsch on July 1, 2020 at 04:17
@Tim-Brooks (Contributor, Author)

I did implement a basic CCR test.

@ywelsch (Contributor) commented Jul 1, 2020

> I looked into a primary delegation IT today. It was hard. In fact, it looks to me like the primary delegation code path must be hit pretty rarely.

Thanks for the analysis, Tim. I think the relocation behavior was broken by #42241. Prior to that PR, it was up to the callers of acquirePrimaryOperationPermit to ensure that the shard was in a desirable state. The PR changed that to require the shard to not be relocated. It shows a gap in our testing as well (the desired behavior was covered by unit tests, but not integration tests).

@dnhatn can you look into this? One option I see is to provide an additional boolean parameter to acquirePrimaryOperationPermit that determines whether taking the permit on a relocated shard is allowed. Alternatively, we could throw an IndexShardRelocatedException and catch it in TransportReplicationAction, handling it accordingly.

@tbrooks8 Let's not block this PR on that, though; we can follow up with a test for this later.
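
For what it's worth, a toy sketch of the first option above (the extra boolean parameter) could look like the following; the names here are assumptions for illustration, not the actual IndexShard API:

    /**
     * Toy sketch of the "allow permit on relocated shard" option (assumed names): the caller
     * states whether acquiring a permit on an already-relocated shard is acceptable, so it
     * can delegate to the relocation target instead of failing.
     */
    final class PermitAcquisitionSketch {
        volatile boolean primaryMode = true;
        volatile boolean relocated = false;

        void acquirePrimaryOperationPermit(boolean allowOnRelocatedShard, Runnable onAcquired) {
            if (primaryMode) {
                onAcquired.run();                     // normal primary execution
            } else if (relocated && allowOnRelocatedShard) {
                onAcquired.run();                     // caller delegates to the relocation target
            } else {
                throw new IllegalStateException("shard is not in primary mode");
            }
        }
    }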

@ywelsch (Contributor) left a comment

LGTM. As discussed, I'm not sure about the name with which we should expose "coordinating + primary bytes" ("write" feels too general), but this is something we can follow up on when we expose the information in some stats APIs.


public ConcreteShardRequest(Writeable.Reader<R> requestReader, StreamInput in) throws IOException {
// sendFromLocalReroute
Contributor

remove this comment?

@Tim-Brooks merged commit 9a36d6e into elastic:master on Jul 1, 2020
Tim-Brooks added a commit to Tim-Brooks/elasticsearch that referenced this pull request Jul 2, 2020