
Snapshot streaming #2458

Merged: merged 3 commits into docker:master from anshulpundir:snap2 on Dec 1, 2017

Conversation

@anshulpundir (Author) commented Nov 20, 2017:

Based on proposal #2416

Signed-off-by: Anshul Pundir <anshul.pundir@docker.com>

@anshulpundir anshulpundir force-pushed the anshulpundir:snap2 branch from 769eb95 to 5eb7bc8 Nov 20, 2017
@GordonTheTurtle GordonTheTurtle removed the dco/no label Nov 20, 2017
@anshulpundir anshulpundir changed the title from "Snapshot streaming." to "Snapshot streaming" Nov 20, 2017
// RaftMessageStream returns a gRPC stream that can be used to stream raft messages
// to be processed on a raft member.
// It is called from the RaftMember willing to send a message to its destination ('To' field)
rpc RaftMessageStream(stream ProcessRaftMessageRequest) returns (ProcessRaftMessageResponse) {

@stevvooe (Contributor), Nov 20, 2017:

This needs a response type wrapper.

@stevvooe (Contributor), Nov 20, 2017:

This seems confusing. The doc says that it returns a stream, but it only seems to take a stream as input. Who is the caller and callee in this scenario?

@@ -487,7 +488,7 @@ func DefaultNodeConfig() *raft.Config {
func DefaultRaftConfig() api.RaftConfig {
return api.RaftConfig{
KeepOldSnapshots: 0,
SnapshotInterval: 10000,

@stevvooe (Contributor), Nov 20, 2017:

Why the big change? Was this for testing?


// Append the received snapshot data.
if recvdMsg.Message.Type == raftpb.MsgSnap {
assembledMessage.Message.Snapshot.Data = append(assembledMessage.Message.Snapshot.Data, recvdMsg.Message.Snapshot.Data...)

@stevvooe (Contributor), Nov 20, 2017:

Doesn't this arrive at the same problem? This buffers the whole thing into a single buffer. How does this address the issue?

@anshulpundir (Author), Nov 21, 2017:

The problem was being able to stream large snapshots without arbitrarily large gRPC message limits and timeouts. This addresses that problem. Memory constraint is not the problem we're trying to solve, at least yet.

@stevvooe (Contributor), Nov 21, 2017:

@anshulpundir Fair enough. :)

This could double (or more!) memory usage.

Here's how we can decide: Do we have to load the entire snapshot into memory to apply it? If yes, then this is fine, but if not, we should buffer this to disk (tmp is fine).

@anshulpundir (Author), Nov 21, 2017:

Buffering to disk won't be any better, would it? To apply the snapshot we'd still need to load it into memory before passing it to etcd/raft.

Also, I think MemoryStorage::ApplySnapshot() doesn't actually do a memory copy, so we should still need just 1x memory.

@stevvooe (Contributor), Nov 21, 2017:

There are three components:

  1. "The bytes" of the snapshot itself.
  2. The structure that gets applied to the store.
  3. The store itself (not raft, but the database).

With the current model, I think the minimum is a 2x bump. Ideally, we can stream it in and avoid 1 and 2. Buffering to disk avoids 1, if 2 is necessary.

Looking at the code, there are two components: the entries and the snapshot data, a []byte. From the looks of it, the snapshot is always applied as a []byte, so we can never avoid 2. If a snapshot is applied in this fashion, a second buffer of similar size may exist alongside the store, which can contribute to bloat.

So, I think I agree with you: this code is probably fine as is, as long as this doesn't get copied (and it doesn't look like it does). Following that, I don't think there is an easy way to avoid the 2x memory bump by applying a snapshot in a stream.

@dperny (Collaborator), Nov 27, 2017:

I might be mistaken, but I don't think this code is any worse off than the regular raft message RPC call, except for the allocations and copies required to expand the array from the successive appends.

@dperny (Collaborator), Nov 27, 2017:

That said, maybe it's advantageous to take the couple of extra bytes to send the total message length with each stream request, so that on the first message we can allocate an appropriately sized byte slice.

@anshulpundir (Author), Nov 27, 2017:

I did consider that, but streaming snapshots should not happen that frequently, so this is probably OK for now. We can always make the change to send the size later, if needed.
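
For illustration, a minimal sketch of that deferred idea on the receive side, assuming a hypothetical TotalSize field on the stream request (not part of this PR's proto):

// Hypothetical: each chunk advertises the total snapshot size, so the
// receiver can preallocate capacity once instead of growing the slice
// through successive appends.
if assembledMessage.Message.Snapshot.Data == nil {
    assembledMessage.Message.Snapshot.Data = make([]byte, 0, recvdMsg.TotalSize) // TotalSize is hypothetical
}
assembledMessage.Message.Snapshot.Data = append(assembledMessage.Message.Snapshot.Data, recvdMsg.Message.Snapshot.Data...)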

// Function to check if streaming is needed.
// Returns true if the message type is MsgSnap/MsgApp
// and size larger than MaxRaftMsgSize.
func needsStreaming(m *raftpb.Message) bool {

@stevvooe (Contributor), Nov 20, 2017:

I would advocate for always using streaming unless the other end doesn't support it.

@anshulpundir (Author), Nov 22, 2017:

Can you please elaborate on this?

I thought a bit more about this. If the message is smaller than GrpcMaxMsgSize, it's less efficient to first try to stream and then fall back to a basic send if unsupported.

Having said that, I do see the value in always streaming, and eventually deprecating the other endpoint, unless there is a performance overhead to streaming a single message.

Can you please also elaborate on this, @stevvooe?

@aaronlehmann (Collaborator), Nov 24, 2017:

I also like the idea of always using streaming. It gives the streaming version of the RPC more real-world coverage instead of only using it in unusual corner cases. It would let us eventually deprecate the non-streaming version.

I don't think the mixed-version scenario is important to optimize.

@anshulpundir (Author), Nov 27, 2017:

I agree with the point about giving this more real-world coverage. I'll make this change.

@stevvooe stevvooe changed the title from "Snapshot streaming" to "[WIP] Snapshot streaming" Nov 20, 2017
@stevvooe (Contributor) left a review:

Reviews inline.

This still seems to buffer the entire snapshot into the RSS of the swarm process. It would be best to buffer into a local file or apply the snapshot to a temp location as it comes in.

@anshulpundir anshulpundir force-pushed the anshulpundir:snap2 branch from 5eb7bc8 to 34088af Nov 20, 2017
@codecov (bot) commented Nov 20, 2017:

Codecov Report

Merging #2458 into master will decrease coverage by 2.09%.
The diff coverage is 93.67%.

@@            Coverage Diff            @@
##           master    #2458     +/-   ##
=========================================
- Coverage   63.74%   61.64%   -2.1%     
=========================================
  Files          64      128     +64     
  Lines       11793    21076   +9283     
=========================================
+ Hits         7517    12993   +5476     
- Misses       3664     6680   +3016     
- Partials      612     1403    +791
@@ -3,6 +3,8 @@ package raft_test
import (
"errors"
"fmt"
"github.com/coreos/etcd/raft/raftpb"
"github.com/docker/swarmkit/manager/state/raft/transport"

@aaronlehmann (Collaborator), Nov 24, 2017:

Can you put these below with the other non-stdlib imports?

@@ -949,3 +951,50 @@ func TestStress(t *testing.T) {
assert.True(t, find)
}
}

func GetSnapshotMessage(from, to uint64, size int) *raftpb.Message {

@aaronlehmann (Collaborator), Nov 24, 2017:

Put this in manager/state/raft/testutils instead of duplicating it in two places.

// Get the max payload size.
payloadSize := raftMessagePayloadSize(m)

// split the snpashot into smaller messages.

@aaronlehmann (Collaborator), Nov 24, 2017:

typo

// split the snpashot into smaller messages.
for snapDataIndex := 0; snapDataIndex < size; {
remainingSize := size - snapDataIndex
chunkSize := int(math.Min(float64(remainingSize), float64(payloadSize)))

@aaronlehmann (Collaborator), Nov 24, 2017:

Using floating point math here doesn't feel right.

@anshulpundir (Author), Nov 27, 2017:

holy shit I know right!

You'd prefer just doing if/else, @aaronlehmann?

@dperny (Collaborator), Nov 27, 2017:

instead of math.Min why not just do min by hand for integers, so there's no round trip through floats?

chunkSize = payloadSize
if remainingSize < payloadSize {
    chunkSize = remainingSize
}
for chunkIndex := 0; chunkIndex < chunkSize; chunkIndex++ {
raftMsg.Snapshot.Data[chunkIndex] = m.Snapshot.Data[snapDataIndex]
snapDataIndex++
}

@aaronlehmann (Collaborator), Nov 24, 2017:

Use the builtin function copy instead of this byte-by-byte loop.
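
For reference, a sketch of the same chunk write using the builtin, assuming raftMsg.Snapshot.Data was allocated with length chunkSize as in this diff:

// Copy the whole chunk in one call instead of byte-by-byte.
copy(raftMsg.Snapshot.Data, m.Snapshot.Data[snapDataIndex:snapDataIndex+chunkSize])
snapDataIndex += chunkSize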

@dperny (Collaborator), Nov 27, 2017:

I don't believe we're mutating the bytes, so there's probably a way to avoid copying and leverage the same underlying storage array for all of the slices.

This would need to be documented, so that someone doesn't come along later and break that assumption.

@dperny (Collaborator), Nov 27, 2017:

I think I misunderstood this code actually...

@anshulpundir (Author), Nov 27, 2017:

Avoiding copy here would be ideal, but I'm not sure if there is a way to do this. I'll make the change if it is.

@anshulpundir (Author), Nov 27, 2017:

I think it should be possible to avoid copy @dperny

@dperny (Collaborator), Nov 27, 2017:

Yeah, we don't need to do any copying at all. Instead of this loop, just do

raftMsg.Snapshot.Data = m.Snapshot.Data[snapDataIndex:(snapDataIndex+chunkSize)]

And then you can change the for loop above to be something like

var chunkSize int
for snapDataIndex := 0; snapDataIndex < size; snapDataIndex = snapDataIndex + chunkSize {
    // do the processing in here
}
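
Putting the two suggestions together (integer min, sub-slicing instead of copying), a sketch of the whole split under a simplified signature; the PR's actual function also takes a context and wraps each chunk in a request type:

// splitSnapshotData splits m.Snapshot.Data into payloadSize-sized chunks.
// Each chunk is a sub-slice aliasing the original array, so no bytes are
// copied; callers must not mutate the chunk data.
func splitSnapshotData(m *raftpb.Message, payloadSize int) []raftpb.Message {
    var msgs []raftpb.Message
    size := len(m.Snapshot.Data)
    var chunkSize int
    for snapDataIndex := 0; snapDataIndex < size; snapDataIndex += chunkSize {
        chunkSize = payloadSize
        if remaining := size - snapDataIndex; remaining < payloadSize {
            chunkSize = remaining
        }
        raftMsg := *m // shallow copy; chunks share m's metadata and data array
        raftMsg.Snapshot.Data = m.Snapshot.Data[snapDataIndex : snapDataIndex+chunkSize]
        msgs = append(msgs, raftMsg)
    }
    return msgs
}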

@anshulpundir (Author) commented Nov 27, 2017:

Also ping @dperny @nishanttotla for review

@dperny (Collaborator) left a review:

Lot of comments. Not all of these are blockers. I'll defer to your judgement on them.

if err == io.EOF {
break
} else if err != nil {
log.G(context.Background()).WithError(err).Error("error while reading from stream")

@dperny (Collaborator), Nov 27, 2017:

nit: stream.Context(), instead of context.Background(), unless there's some reason I'm not aware of to do so.

if recvdMsg.Message.Type == raftpb.MsgSnap {
assembledMessage.Message.Snapshot.Data = append(assembledMessage.Message.Snapshot.Data, recvdMsg.Message.Snapshot.Data...)
} else {
log.G(context.Background()).Errorf("Ignoring unexpected message type received on stream: %d", recvdMsg.Message.Type)

@dperny (Collaborator), Nov 27, 2017:

Same as above: use the stream context.

@@ -18,6 +19,11 @@ import (
"github.com/pkg/errors"
)

const (
// GrpcMaxMsgSize is the max allowed gRPC message size.
GrpcMaxMsgSize = 4 << 20

@dperny (Collaborator), Nov 27, 2017:

Why represent this as 4 << 20? Just curious.

@anshulpundir (Author), Nov 27, 2017:

Someone suggested it as being cleaner/easier to read than 4 * 1024 * 1024.
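
(For reference, 4 << 20 and 4 * 1024 * 1024 both evaluate to 4,194,304 bytes, i.e. 4 MiB.)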

@dperny (Collaborator), Nov 28, 2017:

lol, I disagree strongly with someone, but that's bikeshedding.

@dperny (Collaborator), Nov 28, 2017:

So this part looks fine to me.

@anshulpundir (Author), Nov 28, 2017:

It's a little shorter, but I think it's fine either way.

// Returns the max allowable payload based on MaxRaftMsgSize and
// the struct size for the given raftpb.Message and
func raftMessagePayloadSize(m *raftpb.Message) int {
return GrpcMaxMsgSize - raftMessageStructSize(m)

@dperny (Collaborator), Nov 27, 2017:

dumb question: is GrpcMaxMsgSize actually the optimal message size? Or would a smaller message perform better?

@anshulpundir (Author), Nov 27, 2017:

GrpcMaxMsgSize is probably fine for now. I can't think of any reason to treat streamed messages differently than the regular messages.

assert.Error(t, err, "Received unexpected error EOF")

_, err = stream.CloseAndRecv()
errStr := fmt.Sprintf("rpc error: code = Internal desc = grpc: received message length %d exceeding the max size %d", msg.Size(), transport.GrpcMaxMsgSize)

@dperny (Collaborator), Nov 27, 2017:

Instead of building an error string, you might want to use google.golang.org/grpc/status.FromError, and compare the code and message separately?

@anshulpundir (Author), Nov 27, 2017:

Sounds reasonable
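
A sketch of that comparison, assuming the gRPC version in use reports oversized messages with codes.Internal, as the error string above suggests:

import (
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

_, err = stream.CloseAndRecv()
st, ok := status.FromError(err)
require.True(t, ok, "expected a gRPC status error")
assert.Equal(t, codes.Internal, st.Code())
assert.Contains(t, st.Message(), "exceeding the max size")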


// We should have the complete snapshot. Verify and process.
if err == io.EOF {
_, err = n.ProcessRaftMessage(context.Background(), assembledMessage)

@dperny (Collaborator), Nov 27, 2017:

Unsure if I think we should use the stream context here or not.

@dperny (Collaborator), Nov 27, 2017:

Maybe it's better not to discard the first return value, and instead use it as the return value for stream.SendAndClose below?

@anshulpundir (Author), Nov 27, 2017:

Unsure if I think we should use the stream context here or not.

Yea, I'm unclear on this also. I'll think more about this.


@anshulpundir anshulpundir force-pushed the anshulpundir:snap2 branch from efbe770 to 8a73b5d Nov 28, 2017
"github.com/coreos/etcd/raft/raftpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// Build a snapshot message where each byte in the data is of the value (index % sizeof(byte))
func getSnapshotMessage(from uint64, to uint64) raftpb.Message {

@anshulpundir (Author), Nov 28, 2017:

Trying to use this from testutils causes a circular dependency, and moving this test to a transport_test package was just too problematic since unexported fields are used extensively in the tests. Leaving as-is for now; will find a way to address this in the future.

@dperny (Collaborator), Nov 28, 2017:

Why do you need to use this from testutils?

@anshulpundir (Author), Nov 28, 2017:

Keeping it in one place instead of duplicating it. It's also used in raft_test.go.

@anshulpundir (Author) commented Nov 28, 2017:

Working on making the change to always stream. Just to clarify: this change will only split snapshot messages larger than the max gRPC message size. Support for splitting other message types, e.g. append entries, may be added later.
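
A sketch of what the send path could look like under that rule, with every message going over the stream RPC but only oversized snapshots being split; names follow this diff, and the client wiring (api.NewRaftClient, splitSnapshotData's exact signature) is assumed:

stream, err := api.NewRaftClient(conn).RaftMessageStream(ctx)
if err != nil {
    return err
}
msgs := []raftpb.Message{m}
if m.Type == raftpb.MsgSnap && m.Size() > GrpcMaxMsgSize {
    // Only large snapshots get disassembled into chunks.
    msgs = splitSnapshotData(&m, raftMessagePayloadSize(&m))
}
for i := range msgs {
    if err := stream.Send(&api.ProcessRaftMessageRequest{Message: &msgs[i]}); err != nil {
        return err
    }
}
_, err = stream.CloseAndRecv()
return err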

@anshulpundir anshulpundir force-pushed the anshulpundir:snap2 branch 2 times, most recently from 2d95f3a to b7c27fa Nov 29, 2017
@anshulpundir (Author) commented Nov 29, 2017:

Addressed all review comments. ping @dperny @stevvooe @aaronlehmann for another pass

@dperny (Collaborator) approved these changes Nov 29, 2017:

Couple of new comments but no blockers. LGTM.

func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error {
timeout := p.tr.config.SendTimeout
// if a snapshot is being sent, set timeout to LargeSendTimeout because
// sending snapshots can take more time than other messages sent between peers.
// The same applies to AppendEntries as well, where messages can get large.
// TODO(anshul) remove when streaming change ready to merge.

@dperny (Collaborator), Nov 29, 2017:

what's this?

@anshulpundir (Author), Nov 29, 2017:

The large timeout/grpc message size is no longer needed once we have snapshot streaming. I guess that can be a separate PR.

chunkSize = payloadSize
}

raftMsg := *m

@dperny (Collaborator), Nov 29, 2017:

To be clear, you're dereferencing m to make a copy of it, yeah?

@anshulpundir (Author), Nov 29, 2017:

Yes, since I want to keep the entire message the same except for the fields to be split.

msgs := splitSnapshotData(ctx, &raftMsg)
assert.Equal(t, numMsgs, len(msgs), "unexpected number of messages")

raftMsg.Snapshot.Data = make([]byte, raftMessagePayloadSize)

@dperny (Collaborator), Nov 29, 2017:

Nit, not a blocker. Change if you want to, or don't; either way.

check := func(size, expectedMessages int) {
    raftMsg.Snapshot.Data = make([]byte, size)
    msgs := splitSnapshotData(ctx, &raftMsg)
    assert.Equal(t, expectedMessages, len(msgs), "unexpected number of raft messages")
}

numMsgs := int(math.Ceil(float64(snapshotSize) / float64(raftMessagePayloadSize)))
check(snapshotSize, numMsgs)
check(raftMessagePayloadSize-1, 1)
check(raftMessagePayloadSize*2, 2)
check(0, 0)
// RaftMessageStream accepts a stream of raft messages to be processed on a raft member,
// returning a ProcessRaftMessageResponse when processing of the streamed messages is complete.
// It is called from the Raft leader, which uses it to stream messages to a raft member.
rpc RaftMessageStream(stream ProcessRaftMessageRequest) returns (ProcessRaftMessageResponse) {

@stevvooe (Contributor), Nov 30, 2017:

These need to have RPC-specific types. You can't just reuse the existing request type.

@anshulpundir (Author), Nov 30, 2017:

Offline conversation with @stevvooe: this is generally good practice, to help separate out the two RPCs and enable them to evolve independently.

// GetSnapshotMessage creates and returns a raftpb.Message of type MsgSnap
// where the snapshot data is of the given size and the value of each byte
// is (index of the byte) % 256.
func GetSnapshotMessage(from, to uint64, size int) *raftpb.Message {

@stevvooe (Contributor), Nov 30, 2017:

This is not a getter. This should be NewSnapshotMessage.

@@ -18,6 +18,11 @@ import (
"github.com/pkg/errors"
)

const (
// GrpcMaxMsgSize is the max allowed gRPC message size.

@stevvooe (Contributor), Nov 30, 2017:

GRPCMaxMsgSize.

log.G(stream.Context()).WithError(err).Error("error while reading from stream")
return err
}

@stevvooe (Contributor), Nov 30, 2017:

Validate that each message belongs to the same snapshot index and is meant to be concatenated.
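
A sketch of such a check inside the Recv loop, comparing each incoming chunk's snapshot metadata against the message being assembled; field names are from raftpb, and the error handling is assumed:

// Reject a chunk whose snapshot metadata doesn't match the first chunk,
// so a corrupt or interleaved stream fails fast instead of concatenating
// bytes from different snapshots.
first := assembledMessage.Message.Snapshot.Metadata
got := recvdMsg.Message.Snapshot.Metadata
if got.Index != first.Index || got.Term != first.Term {
    return status.Errorf(codes.InvalidArgument,
        "snapshot chunk metadata mismatch: got index %d term %d, want index %d term %d",
        got.Index, got.Term, first.Index, first.Term)
}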

@@ -16,6 +16,13 @@ service Raft {
option (docker.protobuf.plugin.tls_authorization) = { roles: "swarm-manager" };
};

// RaftMessageStream accepts a stream of raft messages to be processed on a raft member,
// returning a ProcessRaftMessageResponse when processing of the streamed messages is complete.
// It is called from the Raft leader, which uses it to stream messages to a raft member.

@stevvooe (Contributor), Nov 30, 2017:

Explain here that a single stream call corresponds to a single, disassembled message.

@anshulpundir anshulpundir force-pushed the anshulpundir:snap2 branch 4 times, most recently from 614d598 to a6ceb09 Dec 1, 2017
Signed-off-by: Anshul Pundir <anshul.pundir@docker.com>
Signed-off-by: Anshul Pundir <anshul.pundir@docker.com>
Signed-off-by: Anshul Pundir <anshul.pundir@docker.com>
@anshulpundir anshulpundir force-pushed the anshulpundir:snap2 branch from a6ceb09 to 6a0cf8c Dec 1, 2017
@stevvooe (Contributor) commented Dec 1, 2017:

LGTM

@anshulpundir anshulpundir changed the title from "[WIP] Snapshot streaming" to "Snapshot streaming" Dec 1, 2017
@docker docker deleted a comment from GordonTheTurtle Dec 1, 2017
@stevvooe stevvooe merged commit 889f1a3 into docker:master Dec 1, 2017
3 checks passed
ci/circleci: Your tests passed on CircleCI!
codecov/project: 61.64% (target 0%)
dco-signed: All commits are signed
@cyli cyli mentioned this pull request May 29, 2018