Skip to content

Commit

Permalink
Merge branch 'master' into release/v1.2
Browse files Browse the repository at this point in the history
  • Loading branch information
willscott committed Apr 14, 2021
2 parents 1d6486e + dbeff67 commit 277e1e9
Show file tree
Hide file tree
Showing 28 changed files with 542 additions and 212 deletions.
9 changes: 5 additions & 4 deletions .github/workflows/generate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ jobs:
name: Install Protoc
uses: arduino/setup-protoc@v1.1.2
with:
version: '3.12.3'
version: '3.14.0'
-
name: Install Protoc-gen-go
run: |
go get github.com/golang/protobuf/protoc-gen-go@v1.4.2
go get google.golang.org/grpc/cmd/protoc-gen-go-grpc@v0.0.0-20200617041141-9a465503579e
go get github.com/golang/protobuf/protoc-gen-go@v1.4.3
go get google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.1.0
- name: Generate
run: go generate ./...&& go mod tidy
- name: Check
Expand All @@ -40,10 +40,11 @@ jobs:
exit 0
else
OUTPUT=$(git status --porcelain)
git diff
for file in $OUTPUT
do
f=$(echo $file | sed -e 's/^.* //')
echo "::error file=$f,line=1,col=1::File not in sync with `go generate`"
echo "::error file=$f,line=1,col=1::File $f not in sync with `go generate`"
done
OUTPUT="${OUTPUT//'%'/'%25'}"
OUTPUT="${OUTPUT//$'\n'/'%0A'}"
Expand Down
2 changes: 2 additions & 0 deletions chain/beacon/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func (d *discrepancyStore) Put(b *chain.Beacon) error {
discrepancy := float64(actual-expected) / float64(time.Millisecond)
metrics.BeaconDiscrepancyLatency.Set(float64(actual-expected) / float64(time.Millisecond))
metrics.LastBeaconRound.Set(float64(b.GetRound()))
metrics.GroupSize.Set(float64(d.group.Len()))
metrics.GroupThreshold.Set(float64(d.group.Threshold))
d.l.Info("NEW_BEACON_STORED", b.String(), "time_discrepancy_ms", discrepancy)
return nil
}
Expand Down
9 changes: 9 additions & 0 deletions chain/boltdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package boltdb

import (
"errors"
"io"
"path"
"sync"

Expand Down Expand Up @@ -146,6 +147,14 @@ func (b *boltStore) Cursor(fn func(chain.Cursor)) {
}
}

// SaveTo saves the bolt database to an alternate file.
func (b *boltStore) SaveTo(w io.Writer) error {
return b.db.View(func(tx *bolt.Tx) error {
_, err := tx.WriteTo(w)
return err
})
}

type boltCursor struct {
*bolt.Cursor
}
Expand Down
2 changes: 2 additions & 0 deletions chain/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package chain
import (
"bytes"
"encoding/binary"
"io"
)

// store contains all the definitions and implementation of the logic that
Expand All @@ -19,6 +20,7 @@ type Store interface {
Cursor(func(Cursor))
Close()
Del(round uint64) error
SaveTo(w io.Writer) error
}

// Cursor iterates over items in sorted key order. This starts from the
Expand Down
10 changes: 5 additions & 5 deletions client/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ func (h *httpClient) String() string {
return fmt.Sprintf("HTTP(%q)", h.root)
}

// Implement textMarshaller
func (h *httpClient) MarshalText() ([]byte, error) {
return json.Marshal(h.String())
}

type httpInfoResponse struct {
chainInfo *chain.Info
err error
Expand Down Expand Up @@ -234,11 +239,6 @@ func (h *httpClient) FetchChainInfo(chainHash []byte) (*chain.Info, error) {
}
}

// Implement textMarshaller
func (h *httpClient) MarshalText() ([]byte, error) {
return json.Marshal(h)
}

type httpGetResponse struct {
result client.Result
err error
Expand Down
6 changes: 3 additions & 3 deletions client/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func PollingWatcher(ctx context.Context, c Client, chainInfo *chain.Info, l log.
r := c.RoundAt(time.Now())
val, err := c.Get(ctx, r)
if err != nil {
l.Error("polling_client", "failed to watch", "err", err)
l.Error("polling_client", "failed synchronous get", "from", c, "err", err)
close(ch)
return ch
}
Expand All @@ -36,7 +36,7 @@ func PollingWatcher(ctx context.Context, c Client, chainInfo *chain.Info, l log.
if err == nil {
ch <- r
} else {
l.Error("polling_client", "failed to watch", "err", err)
l.Error("polling_client", "failed first async get", "from", c, "err", err)
}

// Then tick each period.
Expand All @@ -49,7 +49,7 @@ func PollingWatcher(ctx context.Context, c Client, chainInfo *chain.Info, l log.
if err == nil {
ch <- r
} else {
l.Error("polling_client", "failed to watch", "err", err)
l.Error("polling_client", "failed subsequent watch poll", "from", c, "err", err)
}
// TODO: keep trying on errors?
case <-ctx.Done():
Expand Down
6 changes: 6 additions & 0 deletions cmd/drand-cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,12 @@ var appCommands = []*cli.Command{
Flags: toArray(folderFlag),
Action: selfSign,
},
{
Name: "backup",
Usage: "backs up the primary drand database to a secondary location.",
Flags: toArray(outFlag, controlFlag),
Action: backupDBCmd,
},
},
},
{
Expand Down
12 changes: 12 additions & 0 deletions cmd/drand-cli/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,18 @@ func showShareCmd(c *cli.Context) error {
return printJSON(resp)
}

func backupDBCmd(c *cli.Context) error {
client, err := controlClient(c)
if err != nil {
return err
}
err = client.BackupDB(c.String(outFlag.Name))
if err != nil {
return fmt.Errorf("could not back up: %s", err)
}
return nil
}

func controlPort(c *cli.Context) string {
port := c.String(controlFlag.Name)
if port == "" {
Expand Down
69 changes: 39 additions & 30 deletions core/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,18 @@ import (
"github.com/drand/kyber/share/dkg"
)

// broadcast implements a very simple broadcasting mechanism: for each new packet
// seen, rebroadcast it once. While this protocol is simple to implement, it
// does not guarantees anything about the timing of which nodes is going to
// Broadcast is an interface that represents the minimum functionality required
// by drand to both (1) be the interface between drand and the dkg logic and (2)
// implement the broadcasting mechanisn.
type Broadcast interface {
dkg.Board
BroadcastDKG(c context.Context, p *drand.DKGPacket) (*drand.Empty, error)
Stop()
}

// echoBroadcast implements a very simple broadcasting mechanism: for each new
// packet seen, rebroadcast it once. While this protocol is simple to implement,
// it does not guarantees anything about the timing of which nodes is going to
// accept packets, with Byzantine adversaries. However, an attacker that wants
// to split the nodes into two groups such that they accept different deals need
// to be able to reliably know the network topology and be able to send the
Expand All @@ -35,7 +44,7 @@ import (
// nodes can "accept" a packet right before the next phase starts and the rest
// of the node don't accept it because it's too late. Note that even though the
// DKG library allows to use fast sync the fast sync mode.
type broadcast struct {
type echoBroadcast struct {
sync.Mutex
l log.Logger
// responsible for sending out the messages
Expand All @@ -50,14 +59,14 @@ type broadcast struct {

type packet = dkg.Packet

var _ dkg.Board = (*broadcast)(nil)
var _ Broadcast = (*echoBroadcast)(nil)

// verifier is a type for a function that can verify the validity of a dkg
// Packet, namely that the signature is correct.
type verifier func(packet) error

func newBroadcast(l log.Logger, c net.ProtocolClient, own string, to []*key.Node, v verifier) *broadcast {
return &broadcast{
func newEchoBroadcast(l log.Logger, c net.ProtocolClient, own string, to []*key.Node, v verifier) *echoBroadcast {
return &echoBroadcast{
l: l,
dispatcher: newDispatcher(l, c, to, own),
dealCh: make(chan dkg.DealBundle, len(to)),
Expand All @@ -68,62 +77,62 @@ func newBroadcast(l log.Logger, c net.ProtocolClient, own string, to []*key.Node
}
}

func (b *broadcast) PushDeals(bundle *dkg.DealBundle) {
func (b *echoBroadcast) PushDeals(bundle *dkg.DealBundle) {
b.dealCh <- *bundle
b.Lock()
defer b.Unlock()
h := hash(bundle.Hash())
b.l.Debug("broadcast", "push", "deal")
b.l.Debug("echoBroadcast", "push", "deal")
b.sendout(h, bundle, true)
}

func (b *broadcast) PushResponses(bundle *dkg.ResponseBundle) {
func (b *echoBroadcast) PushResponses(bundle *dkg.ResponseBundle) {
b.respCh <- *bundle
b.Lock()
defer b.Unlock()
h := hash(bundle.Hash())
b.l.Debug("broadcast", "push", "response", bundle.String())
b.l.Debug("echoBroadcast", "push", "response", bundle.String())
b.sendout(h, bundle, true)
}

func (b *broadcast) PushJustifications(bundle *dkg.JustificationBundle) {
func (b *echoBroadcast) PushJustifications(bundle *dkg.JustificationBundle) {
b.justCh <- *bundle
b.Lock()
defer b.Unlock()
h := hash(bundle.Hash())
b.l.Debug("broadcast", "push", "justification")
b.l.Debug("echoBroadcast", "push", "justification")
b.sendout(h, bundle, true)
}

func (b *broadcast) BroadcastDKG(c context.Context, p *drand.DKGPacket) (*drand.Empty, error) {
func (b *echoBroadcast) BroadcastDKG(c context.Context, p *drand.DKGPacket) (*drand.Empty, error) {
b.Lock()
defer b.Unlock()
addr := net.RemoteAddress(c)
dkgPacket, err := protoToDKGPacket(p.GetDkg())
if err != nil {
b.l.Debug("broadcast", "received invalid packet", "from", addr, "err", err)
b.l.Debug("echoBroadcast", "received invalid packet", "from", addr, "err", err)
return nil, errors.New("invalid packet")
}

hash := hash(dkgPacket.Hash())
if b.hashes.exists(hash) {
// if we already seen this one, no need to verify even because that
// means we already broadcasted it
b.l.Debug("broadcast", "ignoring duplicate packet", "from", addr, "type", fmt.Sprintf("%T", dkgPacket))
b.l.Debug("echoBroadcast", "ignoring duplicate packet", "from", addr, "type", fmt.Sprintf("%T", dkgPacket))
return new(drand.Empty), nil
}
if err := b.verif(dkgPacket); err != nil {
b.l.Debug("broadcast", "received invalid signature", "from", addr)
b.l.Debug("echoBroadcast", "received invalid signature", "from", addr)
return nil, errors.New("invalid packet")
}

b.l.Debug("broadcast", "received new packet to broadcast", "from", addr, "type", fmt.Sprintf("%T", dkgPacket))
b.l.Debug("echoBroadcast", "received new packet to echoBroadcast", "from", addr, "type", fmt.Sprintf("%T", dkgPacket))
b.sendout(hash, dkgPacket, false) // we're using the rate limiting
b.passToApplication(dkgPacket)
return new(drand.Empty), nil
}

func (b *broadcast) passToApplication(p packet) {
func (b *echoBroadcast) passToApplication(p packet) {
switch pp := p.(type) {
case *dkg.DealBundle:
b.dealCh <- *pp
Expand All @@ -132,18 +141,18 @@ func (b *broadcast) passToApplication(p packet) {
case *dkg.JustificationBundle:
b.justCh <- *pp
default:
b.l.Error("broadcast", "application channel full")
b.l.Error("echoBroadcast", "application channel full")
}
}

// sendout converts the packet to protobuf and pass the packet to the dispatcher
// so it is broadcasted out out to all nodes. sendout requires the broadcast
// so it is broadcasted out out to all nodes. sendout requires the echoBroadcast
// lock. If bypass is true, the message is directly sent to the peers, bypassing
// the rate limiting in place.
func (b *broadcast) sendout(h []byte, p packet, bypass bool) {
func (b *echoBroadcast) sendout(h []byte, p packet, bypass bool) {
dkgproto, err := dkgPacketToProto(p)
if err != nil {
b.l.Error("broadcast", "can't send packet", "err", err)
b.l.Error("echoBroadcast", "can't send packet", "err", err)
return
}
// we register we saw that packet and we broadcast it
Expand All @@ -161,19 +170,19 @@ func (b *broadcast) sendout(h []byte, p packet, bypass bool) {
}
}

func (b *broadcast) IncomingDeal() <-chan dkg.DealBundle {
func (b *echoBroadcast) IncomingDeal() <-chan dkg.DealBundle {
return b.dealCh
}

func (b *broadcast) IncomingResponse() <-chan dkg.ResponseBundle {
func (b *echoBroadcast) IncomingResponse() <-chan dkg.ResponseBundle {
return b.respCh
}

func (b *broadcast) IncomingJustification() <-chan dkg.JustificationBundle {
func (b *echoBroadcast) IncomingJustification() <-chan dkg.JustificationBundle {
return b.justCh
}

func (b *broadcast) stop() {
func (b *echoBroadcast) Stop() {
b.dispatcher.stop()
}

Expand Down Expand Up @@ -289,7 +298,7 @@ func (s *sender) sendPacket(p broadcastPacket) {
select {
case s.newCh <- p:
default:
s.l.Debug("broadcast", "sender queue full", "endpoint", s.to.Address())
s.l.Debug("echoBroadcast", "sender queue full", "endpoint", s.to.Address())
}
}

Expand All @@ -302,9 +311,9 @@ func (s *sender) run() {
func (s *sender) sendDirect(newPacket broadcastPacket) {
err := s.client.BroadcastDKG(context.Background(), s.to, newPacket)
if err != nil {
s.l.Debug("broadcast", "sending out", "error to", s.to.Address(), "err:", err)
s.l.Debug("echoBroadcast", "sending out", "error to", s.to.Address(), "err:", err)
} else {
s.l.Debug("broadcast", "sending out", "to", s.to.Address())
s.l.Debug("echoBroadcast", "sending out", "to", s.to.Address())
}
}

Expand Down
Loading

0 comments on commit 277e1e9

Please sign in to comment.