Skip to content

Commit

Permalink
Rate limit (#778)
Browse files Browse the repository at this point in the history
Prioritize locally-generated DKG messages over rebroadcast messages
  • Loading branch information
nikkolasg committed Dec 10, 2020
1 parent d6a09ba commit ba36514
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 29 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
test: test-unit test-integration

test-unit:
GO111MODULE=on go test -race -v ./...
GO111MODULE=on go test -race -short -v ./...

test-unit-cover:
GO111MODULE=on go test -v -coverprofile=coverage.txt -covermode=count -coverpkg=all $(go list ./... | grep -v /demo/)
GO111MODULE=on go test -short -v -coverprofile=coverage.txt -covermode=count -coverpkg=all $(go list ./... | grep -v /demo/)

test-integration:
go test -v ./demo
Expand Down
74 changes: 52 additions & 22 deletions core/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (b *broadcast) PushDeals(bundle *dkg.DealBundle) {
defer b.Unlock()
h := hash(bundle.Hash())
b.l.Debug("broadcast", "push", "deal")
b.sendout(h, bundle)
b.sendout(h, bundle, true)
}

func (b *broadcast) PushResponses(bundle *dkg.ResponseBundle) {
Expand All @@ -83,7 +83,7 @@ func (b *broadcast) PushResponses(bundle *dkg.ResponseBundle) {
defer b.Unlock()
h := hash(bundle.Hash())
b.l.Debug("broadcast", "push", "response", bundle.String())
b.sendout(h, bundle)
b.sendout(h, bundle, true)
}

func (b *broadcast) PushJustifications(bundle *dkg.JustificationBundle) {
Expand All @@ -92,7 +92,7 @@ func (b *broadcast) PushJustifications(bundle *dkg.JustificationBundle) {
defer b.Unlock()
h := hash(bundle.Hash())
b.l.Debug("broadcast", "push", "justification")
b.sendout(h, bundle)
b.sendout(h, bundle, true)
}

func (b *broadcast) BroadcastDKG(c context.Context, p *drand.DKGPacket) (*drand.Empty, error) {
Expand All @@ -118,7 +118,7 @@ func (b *broadcast) BroadcastDKG(c context.Context, p *drand.DKGPacket) (*drand.
}

b.l.Debug("broadcast", "received new packet to broadcast", "from", addr, "type", fmt.Sprintf("%T", dkgPacket))
b.sendout(hash, dkgPacket)
b.sendout(hash, dkgPacket, false) // we're using the rate limiting
b.passToApplication(dkgPacket)
return new(drand.Empty), nil
}
Expand All @@ -138,8 +138,9 @@ func (b *broadcast) passToApplication(p packet) {

// sendout converts the packet to protobuf and pass the packet to the dispatcher
// so it is broadcasted out out to all nodes. sendout requires the broadcast
// lock.
func (b *broadcast) sendout(h []byte, p packet) {
// lock. If bypass is true, the message is directly sent to the peers, bypassing
// the rate limiting in place.
func (b *broadcast) sendout(h []byte, p packet, bypass bool) {
dkgproto, err := dkgPacketToProto(p)
if err != nil {
b.l.Error("broadcast", "can't send packet", "err", err)
Expand All @@ -150,7 +151,14 @@ func (b *broadcast) sendout(h []byte, p packet) {
proto := &drand.DKGPacket{
Dkg: dkgproto,
}
b.dispatcher.broadcast(proto)
if bypass {
// in a routine cause we don't want to block the processing of the DKG
// as well - that's ok since we are only expecting to send 3 packets out
// at the very least.
go b.dispatcher.broadcastDirect(proto)
} else {
b.dispatcher.broadcast(proto)
}
}

func (b *broadcast) IncomingDeal() <-chan dkg.DealBundle {
Expand Down Expand Up @@ -203,6 +211,19 @@ func (a *arraySet) exists(hash hash) bool {

type broadcastPacket = *drand.DKGPacket

// maxQueueSize is the maximum queue size we reserve for each destination of
// broadcast.
const maxQueueSize = 1000

// senderQueueSize returns a dynamic queue size depending on the number of nodes
// to contact.
func senderQueueSize(nodes int) int {
if nodes > maxQueueSize {
return maxQueueSize
}
return nodes
}

// dispatcher maintains a list of worker assigned one destination and pushes the
// message to send to the right worker
type dispatcher struct {
Expand All @@ -212,11 +233,12 @@ type dispatcher struct {

func newDispatcher(l log.Logger, client net.ProtocolClient, to []*key.Node, us string) *dispatcher {
var senders = make([]*sender, 0, len(to)-1)
queue := senderQueueSize(len(to))
for _, node := range to {
if node.Address() == us {
continue
}
sender := newSender(l, client, node)
sender := newSender(l, client, node, queue)
go sender.run()
senders = append(senders, sender)
}
Expand All @@ -225,37 +247,41 @@ func newDispatcher(l log.Logger, client net.ProtocolClient, to []*key.Node, us s
}
}

// broadcast uses the regular channel limitation for messages coming from other
// nodes.
func (d *dispatcher) broadcast(p broadcastPacket) {
for _, i := range rand.Perm(len(d.senders)) {
d.senders[i].sendPacket(p)
}
}

// broadcastDirect directly send to the other peers - it is used only for our
// own packets so we're not bound to congestion events.
func (d *dispatcher) broadcastDirect(p broadcastPacket) {
for _, i := range rand.Perm(len(d.senders)) {
d.senders[i].sendDirect(p)
}
}

func (d *dispatcher) stop() {
for _, sender := range d.senders {
sender.stop()
}
}

// size of the receiving queue for a sender channel
// XXX should it be dynamic ? Reason to fix it is that after some pushed packets
// if the connection still haven't sent them, that means there's probably
// something wrong with the connection and we shouldn't push more.
const senderQueueSize = 10

type sender struct {
l log.Logger
client net.ProtocolClient
to net.Peer
newCh chan broadcastPacket
}

func newSender(l log.Logger, client net.ProtocolClient, to net.Peer) *sender {
func newSender(l log.Logger, client net.ProtocolClient, to net.Peer, queueSize int) *sender {
return &sender{
l: l,
client: client,
to: to,
newCh: make(chan broadcastPacket, senderQueueSize),
newCh: make(chan broadcastPacket, queueSize),
}
}

Expand All @@ -269,12 +295,16 @@ func (s *sender) sendPacket(p broadcastPacket) {

func (s *sender) run() {
for newPacket := range s.newCh {
err := s.client.BroadcastDKG(context.Background(), s.to, newPacket)
if err != nil {
s.l.Debug("broadcast", "sending out", "error to", s.to.Address(), "err:", err)
} else {
s.l.Debug("broadcast", "sending out", "to", s.to.Address())
}
s.sendDirect(newPacket)
}
}

func (s *sender) sendDirect(newPacket broadcastPacket) {
err := s.client.BroadcastDKG(context.Background(), s.to, newPacket)
if err != nil {
s.l.Debug("broadcast", "sending out", "error to", s.to.Address(), "err:", err)
} else {
s.l.Debug("broadcast", "sending out", "to", s.to.Address())
}
}

Expand Down
30 changes: 28 additions & 2 deletions core/drand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,36 @@ import (
"github.com/stretchr/testify/require"
)

func setFDLimit() {
fdOpen := 2000
_, max, err := unixGetLimit()
if err != nil {
panic(err)
}
if err := unixSetLimit(uint64(fdOpen), max); err != nil {
panic(err)
}
}

// 1 second after end of dkg
var testBeaconOffset = 1
var testDkgTimeout = 2 * time.Second

func TestDrandLarge(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
setFDLimit()
n := 22
beaconPeriod := 5 * time.Second

dt := NewDrandTest2(t, n, key.DefaultThreshold(n), beaconPeriod)
defer dt.Cleanup()
dt.RunDKG()
time.Sleep(getSleepDuration())
fmt.Println(" --- DKG FINISHED ---")
}

func TestDrandDKGFresh(t *testing.T) {
n := 4
beaconPeriod := 1 * time.Second
Expand Down Expand Up @@ -62,8 +88,8 @@ func TestDrandDKGFresh(t *testing.T) {
}

func TestDrandDKGBroadcastDeny(t *testing.T) {
n := 5
thr := 4
n := 4
thr := 3
beaconPeriod := 1 * time.Second

dt := NewDrandTest2(t, n, thr, beaconPeriod)
Expand Down
19 changes: 16 additions & 3 deletions core/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/drand/drand/test"
clock "github.com/jonboulle/clockwork"
"github.com/stretchr/testify/require"
"golang.org/x/sys/unix"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -461,10 +462,8 @@ func (d *Drand) DenyBroadcastTo(addrs ...string) {

func (d *DenyClient) BroadcastDKG(c context.Context, p net.Peer, in *drand.DKGPacket, opts ...net.CallOption) error {
if !d.isAllowed(p) {
d := make(chan bool)
fmt.Printf("\nDENIAL BROADCAST DKG TO %s\n", p.Address())
<-d
return nil
return errors.New("dkg broadcast denied")
}
return d.ProtocolClient.BroadcastDKG(c, p, in)
}
Expand All @@ -477,3 +476,17 @@ func (d *DenyClient) isAllowed(p net.Peer) bool {
}
return true
}

func unixGetLimit() (curr, max uint64, err error) {
rlimit := unix.Rlimit{}
err = unix.Getrlimit(unix.RLIMIT_NOFILE, &rlimit)
return rlimit.Cur, rlimit.Max, err
}

func unixSetLimit(soft, max uint64) error {
rlimit := unix.Rlimit{
Cur: soft,
Max: max,
}
return unix.Setrlimit(unix.RLIMIT_NOFILE, &rlimit)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ require (
go.etcd.io/bbolt v1.3.4
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
golang.org/x/net v0.0.0-20200602114024-627f9648deb9
golang.org/x/sys v0.0.0-20200926100807-9d91bd62050c
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543
google.golang.org/genproto v0.0.0-20200608115520-7c474a2e3482 // indirect
google.golang.org/grpc v1.29.1
Expand Down

0 comments on commit ba36514

Please sign in to comment.