Skip to content

Commit

Permalink
Test for broadcast based on callbacks instead of timeouts (#797)
Browse files Browse the repository at this point in the history
* callbacks based broadcast test
* linting made happy
  • Loading branch information
nikkolasg committed Apr 12, 2021
1 parent 5323ece commit b79fc75
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 79 deletions.
69 changes: 39 additions & 30 deletions core/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,18 @@ import (
"github.com/drand/kyber/share/dkg"
)

// broadcast implements a very simple broadcasting mechanism: for each new packet
// seen, rebroadcast it once. While this protocol is simple to implement, it
// does not guarantees anything about the timing of which nodes is going to
// Broadcast is an interface that represents the minimum functionality required
// by drand to both (1) be the interface between drand and the dkg logic and (2)
// implement the broadcasting mechanisn.
type Broadcast interface {
dkg.Board
BroadcastDKG(c context.Context, p *drand.DKGPacket) (*drand.Empty, error)
Stop()
}

// echoBroadcast implements a very simple broadcasting mechanism: for each new
// packet seen, rebroadcast it once. While this protocol is simple to implement,
// it does not guarantees anything about the timing of which nodes is going to
// accept packets, with Byzantine adversaries. However, an attacker that wants
// to split the nodes into two groups such that they accept different deals need
// to be able to reliably know the network topology and be able to send the
Expand All @@ -35,7 +44,7 @@ import (
// nodes can "accept" a packet right before the next phase starts and the rest
// of the node don't accept it because it's too late. Note that even though the
// DKG library allows to use fast sync the fast sync mode.
type broadcast struct {
type echoBroadcast struct {
sync.Mutex
l log.Logger
// responsible for sending out the messages
Expand All @@ -50,14 +59,14 @@ type broadcast struct {

type packet = dkg.Packet

var _ dkg.Board = (*broadcast)(nil)
var _ Broadcast = (*echoBroadcast)(nil)

// verifier is a type for a function that can verify the validity of a dkg
// Packet, namely that the signature is correct.
type verifier func(packet) error

func newBroadcast(l log.Logger, c net.ProtocolClient, own string, to []*key.Node, v verifier) *broadcast {
return &broadcast{
func newEchoBroadcast(l log.Logger, c net.ProtocolClient, own string, to []*key.Node, v verifier) *echoBroadcast {
return &echoBroadcast{
l: l,
dispatcher: newDispatcher(l, c, to, own),
dealCh: make(chan dkg.DealBundle, len(to)),
Expand All @@ -68,62 +77,62 @@ func newBroadcast(l log.Logger, c net.ProtocolClient, own string, to []*key.Node
}
}

func (b *broadcast) PushDeals(bundle *dkg.DealBundle) {
func (b *echoBroadcast) PushDeals(bundle *dkg.DealBundle) {
b.dealCh <- *bundle
b.Lock()
defer b.Unlock()
h := hash(bundle.Hash())
b.l.Debug("broadcast", "push", "deal")
b.l.Debug("echoBroadcast", "push", "deal")
b.sendout(h, bundle, true)
}

func (b *broadcast) PushResponses(bundle *dkg.ResponseBundle) {
func (b *echoBroadcast) PushResponses(bundle *dkg.ResponseBundle) {
b.respCh <- *bundle
b.Lock()
defer b.Unlock()
h := hash(bundle.Hash())
b.l.Debug("broadcast", "push", "response", bundle.String())
b.l.Debug("echoBroadcast", "push", "response", bundle.String())
b.sendout(h, bundle, true)
}

func (b *broadcast) PushJustifications(bundle *dkg.JustificationBundle) {
func (b *echoBroadcast) PushJustifications(bundle *dkg.JustificationBundle) {
b.justCh <- *bundle
b.Lock()
defer b.Unlock()
h := hash(bundle.Hash())
b.l.Debug("broadcast", "push", "justification")
b.l.Debug("echoBroadcast", "push", "justification")
b.sendout(h, bundle, true)
}

func (b *broadcast) BroadcastDKG(c context.Context, p *drand.DKGPacket) (*drand.Empty, error) {
func (b *echoBroadcast) BroadcastDKG(c context.Context, p *drand.DKGPacket) (*drand.Empty, error) {
b.Lock()
defer b.Unlock()
addr := net.RemoteAddress(c)
dkgPacket, err := protoToDKGPacket(p.GetDkg())
if err != nil {
b.l.Debug("broadcast", "received invalid packet", "from", addr, "err", err)
b.l.Debug("echoBroadcast", "received invalid packet", "from", addr, "err", err)
return nil, errors.New("invalid packet")
}

hash := hash(dkgPacket.Hash())
if b.hashes.exists(hash) {
// if we already seen this one, no need to verify even because that
// means we already broadcasted it
b.l.Debug("broadcast", "ignoring duplicate packet", "from", addr, "type", fmt.Sprintf("%T", dkgPacket))
b.l.Debug("echoBroadcast", "ignoring duplicate packet", "from", addr, "type", fmt.Sprintf("%T", dkgPacket))
return new(drand.Empty), nil
}
if err := b.verif(dkgPacket); err != nil {
b.l.Debug("broadcast", "received invalid signature", "from", addr)
b.l.Debug("echoBroadcast", "received invalid signature", "from", addr)
return nil, errors.New("invalid packet")
}

b.l.Debug("broadcast", "received new packet to broadcast", "from", addr, "type", fmt.Sprintf("%T", dkgPacket))
b.l.Debug("echoBroadcast", "received new packet to echoBroadcast", "from", addr, "type", fmt.Sprintf("%T", dkgPacket))
b.sendout(hash, dkgPacket, false) // we're using the rate limiting
b.passToApplication(dkgPacket)
return new(drand.Empty), nil
}

func (b *broadcast) passToApplication(p packet) {
func (b *echoBroadcast) passToApplication(p packet) {
switch pp := p.(type) {
case *dkg.DealBundle:
b.dealCh <- *pp
Expand All @@ -132,18 +141,18 @@ func (b *broadcast) passToApplication(p packet) {
case *dkg.JustificationBundle:
b.justCh <- *pp
default:
b.l.Error("broadcast", "application channel full")
b.l.Error("echoBroadcast", "application channel full")
}
}

// sendout converts the packet to protobuf and pass the packet to the dispatcher
// so it is broadcasted out out to all nodes. sendout requires the broadcast
// so it is broadcasted out out to all nodes. sendout requires the echoBroadcast
// lock. If bypass is true, the message is directly sent to the peers, bypassing
// the rate limiting in place.
func (b *broadcast) sendout(h []byte, p packet, bypass bool) {
func (b *echoBroadcast) sendout(h []byte, p packet, bypass bool) {
dkgproto, err := dkgPacketToProto(p)
if err != nil {
b.l.Error("broadcast", "can't send packet", "err", err)
b.l.Error("echoBroadcast", "can't send packet", "err", err)
return
}
// we register we saw that packet and we broadcast it
Expand All @@ -161,19 +170,19 @@ func (b *broadcast) sendout(h []byte, p packet, bypass bool) {
}
}

func (b *broadcast) IncomingDeal() <-chan dkg.DealBundle {
func (b *echoBroadcast) IncomingDeal() <-chan dkg.DealBundle {
return b.dealCh
}

func (b *broadcast) IncomingResponse() <-chan dkg.ResponseBundle {
func (b *echoBroadcast) IncomingResponse() <-chan dkg.ResponseBundle {
return b.respCh
}

func (b *broadcast) IncomingJustification() <-chan dkg.JustificationBundle {
func (b *echoBroadcast) IncomingJustification() <-chan dkg.JustificationBundle {
return b.justCh
}

func (b *broadcast) stop() {
func (b *echoBroadcast) Stop() {
b.dispatcher.stop()
}

Expand Down Expand Up @@ -289,7 +298,7 @@ func (s *sender) sendPacket(p broadcastPacket) {
select {
case s.newCh <- p:
default:
s.l.Debug("broadcast", "sender queue full", "endpoint", s.to.Address())
s.l.Debug("echoBroadcast", "sender queue full", "endpoint", s.to.Address())
}
}

Expand All @@ -302,9 +311,9 @@ func (s *sender) run() {
func (s *sender) sendDirect(newPacket broadcastPacket) {
err := s.client.BroadcastDKG(context.Background(), s.to, newPacket)
if err != nil {
s.l.Debug("broadcast", "sending out", "error to", s.to.Address(), "err:", err)
s.l.Debug("echoBroadcast", "sending out", "error to", s.to.Address(), "err:", err)
} else {
s.l.Debug("broadcast", "sending out", "to", s.to.Address())
s.l.Debug("echoBroadcast", "sending out", "to", s.to.Address())
}
}

Expand Down
143 changes: 99 additions & 44 deletions core/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,37 @@ import (
"github.com/stretchr/testify/require"
)

type packInfo struct {
id string
self *echoBroadcast
p *drand.DKGPacket
}

type callback func(*packInfo)

type callbackBroadcast struct {
*echoBroadcast
id string
cb callback
}

func withCallback(id string, b *echoBroadcast, cb callback) Broadcast {
return &callbackBroadcast{
id: id,
echoBroadcast: b,
cb: cb,
}
}

func (cb *callbackBroadcast) BroadcastDKG(c context.Context, p *drand.DKGPacket) (*drand.Empty, error) {
r, err := cb.echoBroadcast.BroadcastDKG(c, p)
if err != nil {
return r, err
}
cb.cb(&packInfo{id: cb.id, self: cb.echoBroadcast, p: p})
return r, err
}

func TestBroadcastSet(t *testing.T) {
aset := new(arraySet)
h1 := []byte("Hello")
Expand All @@ -35,67 +66,70 @@ func TestBroadcast(t *testing.T) {
defer os.RemoveAll(dir)
defer CloseAllDrands(drands)

broads := make([]*broadcast, 0, n)
// channel that will receive all broadcasted packets
incPackets := make(chan *packInfo)
// callback that all nodes execute when they receive a "successful" packet
callback := func(p *packInfo) {
incPackets <- p
}
broads := make([]*echoBroadcast, 0, n)
ids := make([]string, 0, n)
for _, d := range drands {
b := newBroadcast(d.log, d.privGateway.ProtocolClient, d.priv.Public.Address(), group.Nodes, func(dkg.Packet) error { return nil })
id := d.priv.Public.Address()
b := newEchoBroadcast(d.log, d.privGateway.ProtocolClient, id, group.Nodes, func(dkg.Packet) error { return nil })

d.dkgInfo = &dkgInfo{
board: b,
board: withCallback(id, b, callback),
started: true,
}
broads = append(broads, b)
ids = append(ids, id)
}

deal := fakeDeal()
dealProto, err := dkgPacketToProto(deal)
require.NoError(t, err)
_, err = broads[0].BroadcastDKG(context.Background(), &drand.DKGPacket{Dkg: dealProto})
require.NoError(t, err)
// leave some time so other get it
time.Sleep(100 * time.Millisecond)
dealPacket, hash := sendNewDeal(t, broads[0])
waitForAll := func(exp int) {
received := make(map[string]bool)
for i := 0; i < exp; i++ {
select {
case info := <-incPackets:
received[info.id] = true
case <-time.After(5 * time.Second):
require.True(t, false, "test failed to continue")
}
}
for _, id := range ids {
require.True(t, received[id])
}
}
exp := n * (n - 1)
waitForAll(exp)
for _, b := range broads {
b.Lock()
require.True(t, b.hashes.exists(deal.Hash()))
require.True(t, b.hashes.exists(hash))
require.True(t, len(b.dealCh) == 1, "len of channel is %d", len(b.dealCh))
// drain the channel
<-b.dealCh
b.Unlock()
drain(t, b.dealCh)
}

// try again to broadcast but it shouldn't actually do it
broads[1].Lock()
broads[1].hashes = new(arraySet)
broads[1].Unlock()
_, err = broads[0].BroadcastDKG(context.Background(), &drand.DKGPacket{Dkg: dealProto})
// try again to broadcast but it shouldn't actually do it because the first
// node (the one we ask to send first) already has the hash registered.
_, err := broads[0].BroadcastDKG(context.Background(), dealPacket)
require.NoError(t, err)
time.Sleep(500 * time.Millisecond)
broads[1].Lock()
select {
case <-broads[1].dealCh:
require.False(t, true, "deal shouldn't be passed down to application")
case <-time.After(500 * time.Millisecond):
// all good - the message is not supposed to be passed down since it was
// already sent in the first round, so this broadcast instance should
// never have received it, because broads[0] should never have sent it
// in the first place
}
// put it again
broads[1].hashes.put(deal.Hash())
broads[1].Unlock()
checkEmpty(t, incPackets)
require.Len(t, broads[0].dealCh, 0)

// let's make everyone broadcast a different packet
hashes := make([][]byte, 0, n-1)
for _, b := range broads[1:] {
deal := fakeDeal()
dealProto, err := dkgPacketToProto(deal)
require.NoError(t, err)
_, err = b.BroadcastDKG(context.Background(), &drand.DKGPacket{
Dkg: dealProto,
})
require.NoError(t, err)
_, hash := sendNewDeal(t, b)
hashes = append(hashes, hash)
}

time.Sleep(100 * time.Millisecond)
for i, b := range broads {
require.Equal(t, drain(t, b.dealCh), n-1, "node %d failed", i)
exp *= (n - 1)
waitForAll(exp)
// check if they all have all hashes
for _, broad := range broads {
for _, hash := range hashes {
require.True(t, broad.hashes.exists(hash))
}
}

// check that it dispatches to the correct channel
Expand All @@ -105,6 +139,27 @@ func TestBroadcast(t *testing.T) {
require.True(t, len(broads[0].justCh) == 1)
}

func sendNewDeal(t *testing.T, b *echoBroadcast) (packet *drand.DKGPacket, hash []byte) {
deal := fakeDeal()
dealProto, err := dkgPacketToProto(deal)
require.NoError(t, err)
packet = &drand.DKGPacket{
Dkg: dealProto,
}
_, err = b.BroadcastDKG(context.Background(), packet)
require.NoError(t, err)
hash = deal.Hash()
return
}

func checkEmpty(t *testing.T, ch chan *packInfo) {
select {
case <-ch:
require.False(t, true, "deal shouldn't be passed down to application")
case <-time.After(500 * time.Millisecond):
}
}

func drain(t *testing.T, ch chan dkg.DealBundle) int {
t.Helper()
var howMany int
Expand Down
4 changes: 2 additions & 2 deletions core/drand.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (d *Drand) WaitDKG() (*key.Group, error) {
return nil, err
}
d.opts.applyDkgCallback(d.share)
d.dkgInfo.board.stop()
d.dkgInfo.board.Stop()
d.dkgInfo = nil
return d.group, nil
}
Expand Down Expand Up @@ -358,7 +358,7 @@ func checkGroup(l log.Logger, group *key.Group) {
// necessary during the DKG protocol.
type dkgInfo struct {
target *key.Group
board *broadcast
board Broadcast
phaser *dkg.TimePhaser
conf *dkg.Config
proto *dkg.Protocol
Expand Down
Loading

0 comments on commit b79fc75

Please sign in to comment.