Skip to content

Commit

Permalink
fix: improve sending vote messages and neighbor messages
Browse files Browse the repository at this point in the history
  • Loading branch information
EclesioMeloJunior committed Jul 22, 2022
1 parent 70181cd commit af18a24
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 49 deletions.
8 changes: 7 additions & 1 deletion dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,13 @@ func (s *Service) createNotificationsMessageHandler(
return fmt.Errorf("could not check if message was seen before: %w", err)
}

if hasSeen {
// sometimes substrate can send prevote/precommit messages already seen by
// gossamer but those messages are related to another round, for example to
// finalize a block Y substrate sends prevotes and precommits to Y in round r
// and in the round r + 1 it is possible to receive prevotes for block Y again, this
// is not a problem and we can improve the gossamer behavior implementing Polite GRANDPA #2505
_, isConsensusMsg := msg.(*ConsensusMessage)
if hasSeen && !isConsensusMsg {
// report peer if we get duplicate gossip message.
s.host.cm.peerSetHandler.ReportPeer(peerset.ReputationChange{
Value: peerset.DuplicateGossipValue,
Expand Down
88 changes: 66 additions & 22 deletions lib/grandpa/grandpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,6 @@ func (s *Service) Start() error {
}
}()

go s.sendNeighbourMessage()

return nil
}

Expand Down Expand Up @@ -344,6 +342,13 @@ func (s *Service) initiateRound() error {
s.precommits = new(sync.Map)
s.pvEquivocations = make(map[ed25519.PublicKeyBytes][]*SignedVote)
s.pcEquivocations = make(map[ed25519.PublicKeyBytes][]*SignedVote)

s.sendNeighborMessage(&NeighbourMessage{
Version: 1,
Round: s.state.round,
SetID: s.state.setID,
Number: uint32(s.head.Number),
})
s.roundLock.Unlock()

best, err := s.blockState.BestBlockHeader()
Expand Down Expand Up @@ -501,15 +506,8 @@ func (s *Service) playGrandpaRound() error {
s.prevotes.Store(s.publicKeyBytes(), spv)
}

logger.Debugf("sending pre-vote message %s...", pv)
roundComplete := make(chan struct{})
// roundComplete is a signal channel which is closed when the round completes
// (will receive the default value of channel's type), so we don't need to
// explicitly send a value.
defer close(roundComplete)

// continue to send prevote messages until round is done
go s.sendVoteMessage(prevote, vm, roundComplete)
prevoteRoundComplete := make(chan struct{})
go s.sendPrevoteMessage(vm, prevoteRoundComplete)

logger.Debug("receiving pre-commit messages...")
// through goroutine s.receiveVoteMessages(ctx)
Expand All @@ -519,7 +517,8 @@ func (s *Service) playGrandpaRound() error {
return ErrServicePaused
}

// broadcast pre-commit
// determine and broadcast pre-commit only after seen prevote messages
<-prevoteRoundComplete
pc, err := s.determinePreCommit()
if err != nil {
return err
Expand All @@ -531,12 +530,14 @@ func (s *Service) playGrandpaRound() error {
}

s.precommits.Store(s.publicKeyBytes(), spc)
logger.Debugf("sending pre-commit message %s...", pc)

// continue to send precommit messages until round is done
go s.sendVoteMessage(precommit, pcm, roundComplete)
precommitDoneCh := make(chan struct{})
defer close(precommitDoneCh)
go s.sendPrecommitMessage(pcm, precommitDoneCh)

if err = s.attemptToFinalize(); err != nil {
err = s.attemptToFinalize(precommitDoneCh)
if err != nil {
logger.Errorf("failed to finalise: %s", err)
return err
}
Expand All @@ -545,33 +546,73 @@ func (s *Service) playGrandpaRound() error {
return nil
}

func (s *Service) sendVoteMessage(stage Subround, msg *VoteMessage, roundComplete <-chan struct{}) {
func (s *Service) sendPrecommitMessage(vm *VoteMessage, done <-chan struct{}) {
logger.Debugf("sending pre-commit message %s...", vm.Message)

ticker := time.NewTicker(s.interval * 4)
defer ticker.Stop()

threshold := s.state.threshold()

// Though this looks like we are sending messages multiple times,
// caching would make sure that they are being sent only once.
for {
if s.paused.Load().(bool) {
return
}

if err := s.sendMessage(msg); err != nil {
logger.Warnf("could not send message for stage %s: %s", stage, err)
if err := s.sendMessage(vm); err != nil {
logger.Warnf("could not send message for stage %s: %s", precommit, err)
} else {
logger.Tracef("sent vote message for stage %s: %s", stage, msg.Message)
logger.Warnf("sent vote message for stage %s: %s", precommit, vm.Message)
}

select {
case <-roundComplete:
if uint64(s.lenVotes(precommit)) >= threshold {
<-done
return
}

select {
case <-ticker.C:
case <-done:
return
}
}
}

func (s *Service) sendPrevoteMessage(vm *VoteMessage, done chan<- struct{}) {
logger.Debugf("sending pre-vote message %s...", vm)
defer close(done)

ticker := time.NewTicker(s.interval * 4)
defer ticker.Stop()

threshold := s.state.threshold()

// Though this looks like we are sending messages multiple times,
// caching would make sure that they are being sent only once.
for {
// stop sending prevote messages once we see a precommit vote
if uint64(s.lenVotes(precommit)) > 0 {
return
}

if err := s.sendMessage(vm); err != nil {
logger.Warnf("could not send message for stage %s: %s", prevote, err)
} else {
logger.Warnf("sent vote message for stage %s: %s", prevote, vm.Message)
}

if s.paused.Load().(bool) || uint64(s.lenVotes(prevote)) >= threshold {
return
}

<-ticker.C
}
}

// attemptToFinalize loops until the round is finalisable
func (s *Service) attemptToFinalize() error {
func (s *Service) attemptToFinalize(precommitDoneCh chan<- struct{}) error {
ticker := time.NewTicker(s.interval / 100)

for {
Expand Down Expand Up @@ -618,6 +659,9 @@ func (s *Service) attemptToFinalize() error {
continue
}

// once we reach the threshold we should stop sending precommit messages to other peers
precommitDoneCh <- struct{}{}

if err = s.finalise(); err != nil {
return err
}
Expand Down
46 changes: 25 additions & 21 deletions lib/grandpa/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,39 +175,43 @@ func (s *Service) sendMessage(msg GrandpaMessage) error {
return nil
}

func (s *Service) sendNeighbourMessage() {
t := time.NewTicker(neighbourMessageInterval)
// notifyNeighbor will gossip a NeighbourMessage every 5 minutes, however we reset the ticker
// whenever a finalization occur meaning that a neighbour message already was sent by s.initiateRound()
func (s *Service) notifyNeighbor(interval time.Duration) {
t := time.NewTicker(interval)
defer t.Stop()

for {
select {
case <-s.ctx.Done():
return
case <-t.C:
if s.neighbourMessage == nil {
continue
}
case info, ok := <-s.finalisedCh:
if !ok {
// channel was closed
return
}

s.neighbourMessage = &NeighbourMessage{
s.roundLock.Lock()
nm := &NeighbourMessage{
Version: 1,
Round: info.Round,
SetID: info.SetID,
Number: uint32(info.Header.Number),
Round: s.state.round,
SetID: s.state.setID,
Number: uint32(s.head.Number),
}
}
s.roundLock.Unlock()

cm, err := s.neighbourMessage.ToConsensusMessage()
if err != nil {
logger.Warnf("failed to convert NeighbourMessage to network message: %s", err)
continue
s.sendNeighborMessage(nm)
case <-s.finalisedCh:
t = time.NewTicker(interval)
}
}
}

func (s *Service) sendNeighborMessage(nm *NeighbourMessage) {
logger.Tracef("send neighbour message: %v", nm)

s.network.GossipMessage(cm)
cm, err := nm.ToConsensusMessage()
if err != nil {
logger.Warnf("failed to convert NeighbourMessage to network message: %s", err)
return
}

s.network.GossipMessage(cm)
}

// decodeMessage decodes a network-level consensus message into a GRANDPA VoteMessage or CommitMessage
Expand Down
6 changes: 1 addition & 5 deletions lib/grandpa/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,7 @@ func TestHandleNetworkMessage(t *testing.T) {

func TestSendNeighbourMessage(t *testing.T) {
gs, st := newTestService(t)
neighbourMessageInterval = time.Second
defer func() {
neighbourMessageInterval = time.Minute * 5
}()
go gs.sendNeighbourMessage()
go gs.notifyNeighbor(time.Second)

digest := types.NewDigest()
prd, err := types.NewBabeSecondaryPlainPreDigest(0, 1).ToPreRuntimeDigest()
Expand Down

0 comments on commit af18a24

Please sign in to comment.