/
change_round.go
294 lines (259 loc) · 9.44 KB
/
change_round.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
package ibft
import (
"encoding/json"
"github.com/herumi/bls-eth-go-binary/bls"
"github.com/pkg/errors"
"math"
"time"
"go.uber.org/zap"
"github.com/bloxapp/ssv/ibft/pipeline"
"github.com/bloxapp/ssv/ibft/pipeline/auth"
"github.com/bloxapp/ssv/ibft/pipeline/changeround"
"github.com/bloxapp/ssv/ibft/proto"
)
// ChangeRoundMsgPipeline - the main change round msg pipeline
func (i *Instance) ChangeRoundMsgPipeline() pipeline.Pipeline {
return i.fork.ChangeRoundMsgPipeline()
}
// ChangeRoundMsgPipelineV0 - genesis version 0
func (i *Instance) ChangeRoundMsgPipelineV0() pipeline.Pipeline {
return pipeline.Combine(
i.ChangeRoundMsgValidationPipeline(),
pipeline.WrapFunc("add change round msg", func(signedMessage *proto.SignedMessage) error {
i.Logger.Info("received valid change round message for round",
zap.String("sender_ibft_id", signedMessage.SignersIDString()),
zap.Uint64("round", signedMessage.Message.Round))
i.ChangeRoundMessages.AddMessage(signedMessage)
return nil
}),
i.ChangeRoundPartialQuorumMsgPipeline(),
i.changeRoundFullQuorumMsgPipeline(),
)
}
// ChangeRoundMsgValidationPipeline - the main change round msg validation pipeline
func (i *Instance) ChangeRoundMsgValidationPipeline() pipeline.Pipeline {
return i.fork.ChangeRoundMsgValidationPipeline()
}
// ChangeRoundMsgValidationPipelineV0 - genesis version 0
func (i *Instance) ChangeRoundMsgValidationPipelineV0() pipeline.Pipeline {
return pipeline.Combine(
auth.BasicMsgValidation(),
auth.MsgTypeCheck(proto.RoundState_ChangeRound),
auth.ValidateLambdas(i.State().Lambda.Get()),
auth.ValidateSequenceNumber(i.State().SeqNumber.Get()),
auth.AuthorizeMsg(i.ValidatorShare),
changeround.Validate(i.ValidatorShare),
)
}
func (i *Instance) changeRoundFullQuorumMsgPipeline() pipeline.Pipeline {
return pipeline.IfFirstTrueContinueToSecond(
auth.ValidateRound(i.State().Round.Get()),
i.uponChangeRoundFullQuorum(),
)
}
/**
upon receiving a quorum Qrc of valid ⟨ROUND-CHANGE, λi, ri, −, −⟩ messages such that
leader(λi, ri) = pi ∧ JustifyRoundChange(Qrc) do
if HighestPrepared(Qrc) ̸= ⊥ then
let v such that (−, v) = HighestPrepared(Qrc))
else
let v such that v = inputValue i
broadcast ⟨PRE-PREPARE, λi, ri, v⟩
*/
func (i *Instance) uponChangeRoundFullQuorum() pipeline.Pipeline {
return pipeline.WrapFunc("upon change round full quorum", func(signedMessage *proto.SignedMessage) error {
var err error
quorum, msgsCount, committeeSize := i.changeRoundQuorum(signedMessage.Message.Round)
// change round if quorum reached
if !quorum {
i.Logger.Info("change round - quorum not reached",
zap.Uint64("round", signedMessage.Message.Round),
zap.Int("msgsCount", msgsCount),
zap.Int("committeeSize", committeeSize),
)
return nil
}
err = i.JustifyRoundChange(signedMessage.Message.Round)
if err != nil {
return errors.Wrap(err, "could not justify change round quorum")
}
i.processChangeRoundQuorumOnce.Do(func() {
i.ProcessStageChange(proto.RoundState_PrePrepare)
logger := i.Logger.With(zap.Uint64("round", signedMessage.Message.Round),
zap.Bool("is_leader", i.IsLeader()),
zap.Uint64("leader", i.ThisRoundLeader()),
zap.Bool("round_justified", true))
logger.Info("change round quorum received")
if !i.IsLeader() {
err = i.actOnExistingPrePrepare(signedMessage)
return
}
notPrepared, highest, e := i.HighestPrepared(signedMessage.Message.Round)
if e != nil {
err = e
return
}
var value []byte
if notPrepared {
value = i.State().InputValue.Get()
logger.Info("broadcasting pre-prepare as leader after round change with input value")
} else {
value = highest.PreparedValue
logger.Info("broadcasting pre-prepare as leader after round change with justified prepare value")
}
// send pre-prepare msg
broadcastMsg := i.generatePrePrepareMessage(value)
if e := i.SignAndBroadcast(broadcastMsg); e != nil {
logger.Error("could not broadcast pre-prepare message after round change", zap.Error(err))
err = e
}
})
return err
})
}
// actOnExistingPrePrepare will try to find exiting pre-prepare msg and run the UponPrePrepareMsg if found.
// We do this in case a future pre-prepare msg was sent before we reached change round quorum, this check is to prevent the instance to wait another round.
func (i *Instance) actOnExistingPrePrepare(signedMessage *proto.SignedMessage) error {
found, msg, err := i.checkExistingPrePrepare(signedMessage.Message.Round)
if err != nil {
return err
}
if !found {
return nil
}
return i.UponPrePrepareMsg().Run(msg)
}
func (i *Instance) changeRoundQuorum(round uint64) (quorum bool, t int, n int) {
// TODO - calculate quorum one way (for prepare, commit, change round and decided) and refactor
msgs := i.ChangeRoundMessages.ReadOnlyMessagesByRound(round)
quorum = len(msgs)*3 >= i.ValidatorShare.CommitteeSize()*2
return quorum, len(msgs), i.ValidatorShare.CommitteeSize()
}
func (i *Instance) roundChangeInputValue() ([]byte, error) {
// prepare justificationMsg and sig
var justificationMsg *proto.Message
var aggSig []byte
ids := make([]uint64, 0)
if i.isPrepared() {
_, msgs := i.PrepareMessages.QuorumAchieved(i.State().PreparedRound.Get(), i.State().PreparedValue.Get())
var aggregatedSig *bls.Sign
justificationMsg = msgs[0].Message
for _, msg := range msgs {
// add sig to aggregate
sig := &bls.Sign{}
if err := sig.Deserialize(msg.Signature); err != nil {
return nil, err
}
if aggregatedSig == nil {
aggregatedSig = sig
} else {
aggregatedSig.Add(sig)
}
// add id to list
ids = append(ids, msg.SignerIds...)
}
aggSig = aggregatedSig.Serialize()
}
data := &proto.ChangeRoundData{
PreparedRound: i.State().PreparedRound.Get(),
PreparedValue: i.State().PreparedValue.Get(),
JustificationMsg: justificationMsg,
JustificationSig: aggSig,
SignerIds: ids,
}
return json.Marshal(data)
}
func (i *Instance) uponChangeRoundTrigger() {
i.Logger.Info("round timeout, changing round", zap.Uint64("round", i.State().Round.Get()))
// bump round
i.BumpRound()
// mark stage
i.ProcessStageChange(proto.RoundState_ChangeRound)
// set time for next round change
i.resetRoundTimer()
// broadcast round change
if err := i.broadcastChangeRound(); err != nil {
i.Logger.Error("could not broadcast round change message", zap.Error(err))
}
}
func (i *Instance) broadcastChangeRound() error {
broadcastMsg, err := i.generateChangeRoundMessage()
if err != nil {
return err
}
if err := i.SignAndBroadcast(broadcastMsg); err != nil {
return err
}
i.Logger.Info("broadcasted change round", zap.Uint64("round", broadcastMsg.Round))
return nil
}
// JustifyRoundChange see below
func (i *Instance) JustifyRoundChange(round uint64) error {
// ### Algorithm 4 IBFTController pseudocode for process pi: message justification
// predicate JustifyRoundChange(Qrc) return
// ∀⟨ROUND-CHANGE, λi, ri, prj, pvj⟩ ∈ Qrc : prj = ⊥ ∧ pvj = ⊥
// ∨ received a quorum of valid ⟨PREPARE, λi, pr, pv⟩ messages such that:
// (pr, pv) = HighestPrepared(Qrc)
notPrepared, _, err := i.HighestPrepared(round)
if err != nil {
return err
}
if notPrepared && i.isPrepared() {
return errors.New("highest prepared doesn't match prepared state")
}
/**
IMPORTANT
Change round msgs are verified against their justifications as well in the pipline, a quorum of change round msgs
will not include un justified prepared round/ value indicated by a change round msg.
*/
return nil
}
// HighestPrepared is slightly changed to also include a returned flag to indicate if all change round messages have prj = ⊥ ∧ pvj = ⊥
func (i *Instance) HighestPrepared(round uint64) (notPrepared bool, highestPrepared *proto.ChangeRoundData, err error) {
/**
### Algorithm 4 IBFTController pseudocode for process pi: message justification
Helper function that returns a tuple (pr, pv) where pr and pv are, respectively,
the prepared round and the prepared value of the ROUND-CHANGE message in Qrc with the highest prepared round.
function HighestPrepared(Qrc)
return (pr, pv) such that:
∃⟨ROUND-CHANGE, λi, round, pr, pv⟩ ∈ Qrc :
∀⟨ROUND-CHANGE, λi, round, prj, pvj⟩ ∈ Qrc : prj = ⊥ ∨ pr ≥ prj
*/
notPrepared = true
for _, msg := range i.ChangeRoundMessages.ReadOnlyMessagesByRound(round) {
candidateChangeData := &proto.ChangeRoundData{}
err = json.Unmarshal(msg.Message.Value, candidateChangeData)
if err != nil {
return false, nil, err
}
// compare to highest found
if candidateChangeData.PreparedValue != nil {
notPrepared = false
if highestPrepared != nil {
if candidateChangeData.PreparedRound > highestPrepared.PreparedRound {
highestPrepared = candidateChangeData
}
} else {
highestPrepared = candidateChangeData
}
}
}
return notPrepared, highestPrepared, nil
}
func (i *Instance) generateChangeRoundMessage() (*proto.Message, error) {
data, err := i.roundChangeInputValue()
if err != nil {
return nil, errors.New("failed to create round change data for round")
}
return &proto.Message{
Type: proto.RoundState_ChangeRound,
Round: i.State().Round.Get(),
Lambda: i.State().Lambda.Get(),
SeqNumber: i.State().SeqNumber.Get(),
Value: data,
}, nil
}
func (i *Instance) roundTimeoutSeconds() time.Duration {
roundTimeout := math.Pow(float64(i.Config.RoundChangeDurationSeconds), float64(i.State().Round.Get()))
return time.Duration(float64(time.Second) * roundTimeout)
}