-
Notifications
You must be signed in to change notification settings - Fork 2
/
consensus_msg_sender.go
144 lines (127 loc) · 4.92 KB
/
consensus_msg_sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package consensus
import (
"sync"
"sync/atomic"
"time"
msg_pb "github.com/PositionExchange/posichain/api/proto/message"
nodeconfig "github.com/PositionExchange/posichain/internal/configs/node"
"github.com/PositionExchange/posichain/internal/utils"
"github.com/PositionExchange/posichain/p2p"
)
const (
// RetryIntervalInSec is the interval for message retry
RetryIntervalInSec = 7
)
// MessageSender is the wrapper object that controls how a consensus message is sent
type MessageSender struct {
blockNum uint64 // The current block number at consensus
messagesToRetry sync.Map
// The p2p host used to send/receive p2p messages
host p2p.Host
// RetryTimes is number of retry attempts
retryTimes int
}
// MessageRetry controls the message that can be retried
type MessageRetry struct {
blockNum uint64 // The block number this message is for
groups []nodeconfig.GroupID
p2pMsg []byte
msgType msg_pb.MessageType
retryCount int
isActive uint32 // 0 is false, != 0 is true, in order to use atomic.Load/Store functions
}
// NewMessageSender initializes the consensus message sender.
func NewMessageSender(host p2p.Host) *MessageSender {
return &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec}
}
// Reset resets the sender's state for new block
func (sender *MessageSender) Reset(blockNum uint64) {
atomic.StoreUint64(&sender.blockNum, blockNum)
sender.StopAllRetriesExceptCommitted()
sender.messagesToRetry.Range(func(key interface{}, value interface{}) bool {
if msgRetry, ok := value.(*MessageRetry); ok {
if msgRetry.msgType != msg_pb.MessageType_COMMITTED {
sender.messagesToRetry.Delete(key)
}
}
return true
})
}
// SendWithRetry sends message with retry logic.
func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []nodeconfig.GroupID, p2pMsg []byte) error {
if sender.retryTimes != 0 {
msgRetry := MessageRetry{blockNum: blockNum, groups: groups, p2pMsg: p2pMsg, msgType: msgType, retryCount: 0}
atomic.StoreUint32(&msgRetry.isActive, 1)
// First stop the old one
sender.StopRetry(msgType)
sender.messagesToRetry.Store(msgType, &msgRetry)
go func() {
sender.Retry(&msgRetry)
}()
}
return sender.host.SendMessageToGroups(groups, p2pMsg)
}
// DelayedSendWithRetry is similar to SendWithRetry but without the initial message sending but only retries.
func (sender *MessageSender) DelayedSendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []nodeconfig.GroupID, p2pMsg []byte) {
if sender.retryTimes != 0 {
msgRetry := MessageRetry{blockNum: blockNum, groups: groups, p2pMsg: p2pMsg, msgType: msgType, retryCount: 0}
atomic.StoreUint32(&msgRetry.isActive, 1)
// First stop the old one
sender.StopRetry(msgType)
sender.messagesToRetry.Store(msgType, &msgRetry)
go func() {
sender.Retry(&msgRetry)
}()
}
}
// SendWithoutRetry sends message without retry logic.
func (sender *MessageSender) SendWithoutRetry(groups []nodeconfig.GroupID, p2pMsg []byte) error {
return sender.host.SendMessageToGroups(groups, p2pMsg)
}
// Retry will retry the consensus message for <RetryTimes> times.
func (sender *MessageSender) Retry(msgRetry *MessageRetry) {
for {
time.Sleep(RetryIntervalInSec * time.Second)
if msgRetry.retryCount >= sender.retryTimes {
// Retried enough times
return
}
isActive := atomic.LoadUint32(&msgRetry.isActive)
if isActive == 0 {
// Retry is stopped
return
}
if msgRetry.msgType != msg_pb.MessageType_COMMITTED {
senderBlockNum := atomic.LoadUint64(&sender.blockNum)
if msgRetry.blockNum < senderBlockNum {
// Block already moved ahead, no need to retry old block's messages
return
}
}
msgRetry.retryCount++
if err := sender.host.SendMessageToGroups(msgRetry.groups, msgRetry.p2pMsg); err != nil {
utils.Logger().Warn().Str("groupID[0]", msgRetry.groups[0].String()).Uint64("blockNum", msgRetry.blockNum).Str("MsgType", msgRetry.msgType.String()).Int("RetryCount", msgRetry.retryCount).Msg("[Retry] Failed re-sending consensus message")
} else {
utils.Logger().Info().Str("groupID[0]", msgRetry.groups[0].String()).Uint64("blockNum", msgRetry.blockNum).Str("MsgType", msgRetry.msgType.String()).Int("RetryCount", msgRetry.retryCount).Msg("[Retry] Successfully resent consensus message")
}
}
}
// StopRetry stops the retry.
func (sender *MessageSender) StopRetry(msgType msg_pb.MessageType) {
data, ok := sender.messagesToRetry.Load(msgType)
if ok {
msgRetry := data.(*MessageRetry)
atomic.StoreUint32(&msgRetry.isActive, 0)
}
}
// StopAllRetriesExceptCommitted stops all the existing retries except committed message (which lives across consensus).
func (sender *MessageSender) StopAllRetriesExceptCommitted() {
sender.messagesToRetry.Range(func(k, v interface{}) bool {
if msgRetry, ok := v.(*MessageRetry); ok {
if msgRetry.msgType != msg_pb.MessageType_COMMITTED {
atomic.StoreUint32(&msgRetry.isActive, 0)
}
}
return true
})
}