Skip to content

Commit 9cd628f

Browse files
manishrjaindanielmai
authored andcommitted
Expose Raft Comms
1 parent 6c68a70 commit 9cd628f

2 files changed

Lines changed: 24 additions & 4 deletions

File tree

conn/node.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -225,11 +225,21 @@ func (n *Node) SetPeer(pid uint64, addr string) {
225225
n.peers[pid] = addr
226226
}
227227

228-
func (n *Node) Send(m raftpb.Message) {
229-
x.AssertTruef(n.Id != m.To, "Sending message to itself")
230-
data, err := m.Marshal()
228+
func (n *Node) Send(msg raftpb.Message) {
229+
x.AssertTruef(n.Id != msg.To, "Sending message to itself")
230+
data, err := msg.Marshal()
231231
x.Check(err)
232232

233+
if glog.V(2) {
234+
switch msg.Type {
235+
case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp:
236+
case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp:
237+
case raftpb.MsgApp, raftpb.MsgAppResp:
238+
case raftpb.MsgProp:
239+
default:
240+
glog.Infof("RaftComm: [%#x] Sending message of type %s to %#x", msg.From, msg.Type, msg.To)
241+
}
242+
}
233243
// As long as leadership is stable, any attempted Propose() calls should be reflected in the
234244
// next raft.Ready.Messages. Leaders will send MsgApps to the followers; followers will send
235245
// MsgProp to the leader. It is up to the transport layer to get those messages to their
@@ -243,7 +253,7 @@ func (n *Node) Send(m raftpb.Message) {
243253
// node. But, we shouldn't take the liberty to do that here. It would take us more time to
244254
// repropose these dropped messages anyway, than to block here a bit waiting for the messages
245255
// channel to clear out.
246-
n.messages <- sendmsg{to: m.To, data: data}
256+
n.messages <- sendmsg{to: msg.To, data: data}
247257
}
248258

249259
func (n *Node) Snapshot() (raftpb.Snapshot, error) {

conn/raft_server.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,16 @@ func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error {
225225
// This should be done in order, and not via a goroutine.
226226
// Step can block forever. See: https://github.com/etcd-io/etcd/issues/10585
227227
// So, add a context with timeout to allow it to get out of the blockage.
228+
if glog.V(2) {
229+
switch msg.Type {
230+
case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp:
231+
case raftpb.MsgReadIndex, raftpb.MsgReadIndexResp:
232+
case raftpb.MsgApp, raftpb.MsgAppResp:
233+
case raftpb.MsgProp:
234+
default:
235+
glog.Infof("RaftComm: [%#x] Received msg of type: %s from %#x", msg.To, msg.Type, msg.From)
236+
}
237+
}
228238
if err := raft.Step(ctx, msg); err != nil {
229239
glog.Warningf("Error while raft.Step from %#x: %v. Closing RaftMessage stream.",
230240
rc.GetId(), err)

0 commit comments

Comments
 (0)