Skip to content

Commit

Permalink
gRPC streaming by default for all messages
Browse files Browse the repository at this point in the history
Signed-off-by: Anshul Pundir <anshul.pundir@docker.com>
  • Loading branch information
anshulpundir committed Nov 29, 2017
1 parent 32cb918 commit b7c27fa
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 35 deletions.
9 changes: 9 additions & 0 deletions manager/state/raft/transport/mock_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type mockRaft struct {

reportedUnreachables chan uint64
updatedNodes chan updateInfo

forceErrorStream bool
}

func newMockRaft() (*mockRaft, error) {
Expand Down Expand Up @@ -98,6 +100,9 @@ func (r *mockRaft) ProcessRaftMessage(ctx context.Context, req *api.ProcessRaftM

// RaftMessageStream is the mock server endpoint for streaming messages of type ProcessRaftMessageRequest.
func (r *mockRaft) RaftMessageStream(stream api.Raft_RaftMessageStreamServer) error {
if r.forceErrorStream {
return grpc.Errorf(codes.Unimplemented, "streaming not supported")
}
var recvdMsg, assembledMessage *api.ProcessRaftMessageRequest
var err error
for {
Expand All @@ -109,6 +114,10 @@ func (r *mockRaft) RaftMessageStream(stream api.Raft_RaftMessageStreamServer) er
return err
}

if r.removed[recvdMsg.Message.From] {
return status.Errorf(codes.NotFound, "%s", membership.ErrMemberRemoved.Error())
}

if assembledMessage == nil {
assembledMessage = recvdMsg
continue
Expand Down
81 changes: 46 additions & 35 deletions manager/state/raft/transport/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,14 @@ func raftMessageStructSize(m *raftpb.Message) int {
}

// Returns the max allowable payload based on MaxRaftMsgSize and
// the struct size for the given raftpb.Message and
// the struct size for the given raftpb.Message.
func raftMessagePayloadSize(m *raftpb.Message) int {
	// Whatever raftMessageStructSize reports for m is subtracted from
	// GrpcMaxMsgSize; the remainder is the room left for payload.
	overhead := raftMessageStructSize(m)
	return GrpcMaxMsgSize - overhead
}

// Split a large raft message into smaller messages.
// Currently this means splitting the []Snapshot.Data into chunks whose size
// is dictated by MaxRaftMsgSize.
// Currently this means splitting the []Snapshot.Data
// into chunks whose size is dictated by MaxRaftMsgSize.
func splitSnapshotData(ctx context.Context, m *raftpb.Message) []api.ProcessRaftMessageRequest {
var messages []api.ProcessRaftMessageRequest
if m.Type != raftpb.MsgSnap {
Expand All @@ -171,12 +171,9 @@ func splitSnapshotData(ctx context.Context, m *raftpb.Message) []api.ProcessRaft
chunkSize = payloadSize
}

// allocate a new slice for the snapshot data chunk
raftMsg := *m
//raftMsg.Snapshot.Data = make([]byte, chunkSize)
// sub-slice for this snapshot chunk.
raftMsg.Snapshot.Data = m.Snapshot.Data[snapDataIndex : snapDataIndex+chunkSize]
// copy snapshot data chunk
//copy(raftMsg.Snapshot.Data, m.Snapshot.Data[snapDataIndex:snapDataIndex+chunkSize])

snapDataIndex += chunkSize

Expand All @@ -188,45 +185,70 @@ func splitSnapshotData(ctx context.Context, m *raftpb.Message) []api.ProcessRaft
return messages
}

// needsSplitting reports whether m must be split into smaller messages
// before it is streamed. It returns true only when m is a MsgSnap and the
// ProcessRaftMessageRequest wrapping it reports a Size() larger than
// GrpcMaxMsgSize; every other message type yields false.
func needsSplitting(m *raftpb.Message) bool {
	if m.Type != raftpb.MsgSnap {
		return false
	}
	// The limit is compared against the size of the request wrapper,
	// not the size of the bare raft message.
	req := api.ProcessRaftMessageRequest{Message: m}
	return req.Size() > GrpcMaxMsgSize
}

func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error {
timeout := p.tr.config.SendTimeout
// if a snapshot is being sent, set timeout to LargeSendTimeout because
// sending snapshots can take more time than other messages sent between peers.
// The same applies to AppendEntries as well, where messages can get large.
// TODO(anshul) remove when streaming change ready to merge.
if m.Type == raftpb.MsgSnap || m.Type == raftpb.MsgApp {
timeout = p.tr.config.LargeSendTimeout
}
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

var err error
if needsStreaming(&m) {
// stream message if its bigger than MaxRaftMsgSize.
var stream api.Raft_RaftMessageStreamClient
stream, err = api.NewRaftClient(p.conn()).RaftMessageStream(ctx)
if err == nil {
// split message
msgs := splitSnapshotData(ctx, &m)
for _, msg := range msgs {
err = stream.Send(&msg)
if err != nil {
log.G(ctx).WithError(err).Error("failed to stream message")
}
var stream api.Raft_RaftMessageStreamClient
stream, err = api.NewRaftClient(p.conn()).RaftMessageStream(ctx)
if err == nil {
// Split the message if needed.
// Currently only supported for MsgSnap.
var msgs []api.ProcessRaftMessageRequest
if needsSplitting(&m) {
msgs = splitSnapshotData(ctx, &m)
} else {
raftMsg := api.ProcessRaftMessageRequest{Message: &m}
msgs = append(msgs, raftMsg)
}

// Stream
for _, msg := range msgs {
err = stream.Send(&msg)
if err != nil {
log.G(ctx).WithError(err).Error("error streaming message to peer")
stream.CloseAndRecv()
break
}
}

// Finished sending all the messages.
// Close and receive response.
if err == nil {
_, err = stream.CloseAndRecv()

// TODO(anshul) Check if Unimplemented and bubble up the error.
if err != nil {
log.G(ctx).WithError(err).Error("error receiving response on stream close")
return err
log.G(ctx).WithError(err).Error("error receiving response")
}
} else {
log.G(ctx).WithError(err).Error("failed to create stream!")
}
} else {
log.G(ctx).WithError(err).Error("error sending message to peer")
}

// Try doing a regular rpc if the receiver doesn't support streaming.
if grpc.Code(err) == codes.Unimplemented {
_, err = api.NewRaftClient(p.conn()).ProcessRaftMessage(ctx, &api.ProcessRaftMessageRequest{Message: &m})
}

// Handle errors.
if grpc.Code(err) == codes.NotFound && grpc.ErrorDesc(err) == membership.ErrMemberRemoved.Error() {
p.tr.config.NodeRemoved()
}
Expand Down Expand Up @@ -331,17 +353,6 @@ func (p *peer) handleAddressChange(ctx context.Context) error {
return nil
}

// needsStreaming reports whether m should be sent over the streaming
// endpoint. It returns true only when m is a MsgSnap whose Size() exceeds
// GrpcMaxMsgSize; all other messages (including MsgApp) yield false.
func needsStreaming(m *raftpb.Message) bool {
	return m.Type == raftpb.MsgSnap && m.Size() > GrpcMaxMsgSize
}

func (p *peer) run(ctx context.Context) {
defer func() {
p.mu.Lock()
Expand Down
8 changes: 8 additions & 0 deletions manager/state/raft/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ func TestSend(t *testing.T) {

t.Run("Send Message", testSend(ctx, c, 1, []uint64{2, 3}, raftpb.MsgHup))
t.Run("Send_Snapshot_Message", testSend(ctx, c, 1, []uint64{2, 3}, raftpb.MsgSnap))

// Return error on streaming.
for _, raft := range c.rafts {
raft.forceErrorStream = true
}

// Messages should still be delivered.
t.Run("Send Message", testSend(ctx, c, 1, []uint64{2, 3}, raftpb.MsgHup))
}

func TestSendRemoved(t *testing.T) {
Expand Down

0 comments on commit b7c27fa

Please sign in to comment.