Skip to content

Commit

Permalink
all DKG packets are now gossiped throughout the network (#1230)
Browse files Browse the repository at this point in the history
* DKG messages between participants are now signed and verified

* all DKG packets are now gossiped throughout the network
* DKG RPCs are split into `Command`s and `GossipPacket`s
* the flow of execution is now a lot simpler, with fewer
  higher-order-function shenanigans

* remove the unnecessary (racy) brokenbroadcaster

* updated with the rebased signatures branch

* additional nil checks

* added a DKG failed state for DKGs that don't hit the threshold

* use beaconID from metadata rather than passing it around

* added a timeout for the large DKG

* removed unnecessary grpc call options

* Update internal/dkg/actions.go

Co-authored-by: Yolan Romailler <AnomalRoil@users.noreply.github.com>

* use mock clock for determining all DKG timings

* fixed some references to clocks

* fixed an old test where multiple clocks were being created

---------

Co-authored-by: Yolan Romailler <AnomalRoil@users.noreply.github.com>
  • Loading branch information
CluEleSsUK and AnomalRoil committed Oct 31, 2023
1 parent bd4eb7c commit b0970d8
Show file tree
Hide file tree
Showing 52 changed files with 2,492 additions and 2,657 deletions.
1 change: 0 additions & 1 deletion client/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func TestClientClose(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
//nolint:revive // we drain the channel
for range c.Watch(context.Background()) {
}
wg.Done()
Expand Down
3 changes: 1 addition & 2 deletions client/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestHTTPWatch(t *testing.T) {
if len(first.Randomness()) == 0 {
t.Fatal("should get randomness from watching")
}
//nolint:revive // draining channel until the context expires

for range result {
}
_ = httpClient.Close()
Expand Down Expand Up @@ -186,7 +186,6 @@ func TestHTTPClientClose(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
//nolint:revive // draining channel
for range httpClient.Watch(context.Background()) {
}
wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion client/lp2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (c *Client) Watch(ctx context.Context) <-chan client.Result {
c.log.Debugw("client.Watch done")
end()
// drain leftover on innerCh
for range innerCh { //nolint:revive
for range innerCh {
}
c.log.Debugw("client.Watch finished draining the innerCh")
return
Expand Down
1 change: 0 additions & 1 deletion client/optimizing.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ LOOP:
stats = append(stats, rr.stat)
res = rr.result
if rr.err != nil && !errors.Is(rr.err, common.ErrEmptyClientUnsupportedGet) {
//nolint:errorlint
err = fmt.Errorf("%v - %w", err, rr.err)
} else if rr.err == nil {
err = nil
Expand Down
2 changes: 1 addition & 1 deletion common/key/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (g *Group) FromTOML(i interface{}) error {
}

if g.Threshold < dkg.MinimumT(len(gt.Nodes)) {
return errors.New("group file have threshold 0")
return errors.New("group file has threshold 0")
} else if g.Threshold > g.Len() {
return errors.New("group file threshold greater than number of participants")
}
Expand Down
1 change: 1 addition & 0 deletions demo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func main() {
if err != nil {
panic(err)
}

err = orch.RunDKG(1 * time.Minute)
if err != nil {
panic(err)
Expand Down
3 changes: 2 additions & 1 deletion demo/node/node_inprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"fmt"
clock "github.com/jonboulle/clockwork"
"os"
"os/exec"
"path"
Expand Down Expand Up @@ -88,7 +89,7 @@ func NewLocalNode(i int, bindAddr string, cfg cfg.Config) *LocalNode {
dbEngineType: cfg.DBEngineType,
pgDSN: cfg.PgDSN,
memDBSize: cfg.MemDBSize,
dkgRunner: &test.DKGRunner{BeaconID: cfg.BeaconID, Client: dkgClient},
dkgRunner: &test.DKGRunner{BeaconID: cfg.BeaconID, Client: dkgClient, Clock: clock.NewRealClock()},
}

var priv *key.Pair
Expand Down
4 changes: 3 additions & 1 deletion demo/node/node_subprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"fmt"
clock "github.com/jonboulle/clockwork"
"net"
"os"
"os/exec"
Expand Down Expand Up @@ -147,6 +148,7 @@ func (n *NodeProc) setup() {
n.dkgRunner = &test.DKGRunner{
BeaconID: n.beaconID,
Client: dkgClient,
Clock: clock.NewRealClock(),
}
// call drand binary
n.priv, err = key.NewKeyPair(n.privAddr, n.scheme)
Expand Down Expand Up @@ -347,7 +349,7 @@ func (n *NodeProc) StartLeaderReshare(thr int, transitionTime time.Time, _ int,
"--threshold", strconv.Itoa(thr),
"--transition-time", durationUntilTransitionTime.String(),
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
proposeCmd := exec.CommandContext(ctx, n.binary, proposeArgs...)
_ = runCommand(proposeCmd)
Expand Down
29 changes: 18 additions & 11 deletions internal/chain/beacon/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,12 @@ func (t *testBeaconServer) SyncChain(req *drand.SyncRequest, p drand.Protocol_Sy
return SyncChain(t.h.l, t.h.chain, req, p)
}

func (t *testBeaconServer) Migrate(context.Context, *drand.Empty) (*drand.Empty, error) {
return &drand.Empty{}, nil
func (t *testBeaconServer) Command(context.Context, *drand.DKGCommand) (*drand.EmptyDKGResponse, error) {
return &drand.EmptyDKGResponse{}, nil
}

func (t *testBeaconServer) Packet(context.Context, *drand.GossipPacket) (*drand.EmptyDKGResponse, error) {
return &drand.EmptyDKGResponse{}, nil
}

func dkgShares(t *testing.T, n, thr int, sch *crypto.Scheme) ([]*key.Share, []kyber.Point) {
Expand Down Expand Up @@ -129,7 +133,7 @@ type BeaconTest struct {
scheme *crypto.Scheme
}

func NewBeaconTest(ctx context.Context, t *testing.T, n, thr int, period time.Duration, genesisTime int64, beaconID string) *BeaconTest {
func NewBeaconTest(ctx context.Context, t *testing.T, clock clock.FakeClock, n, thr int, period time.Duration, genesisTime int64, beaconID string) *BeaconTest {
sch, err := crypto.GetSchemeFromEnv()
require.NoError(t, err)
prefix := t.TempDir()
Expand All @@ -154,7 +158,7 @@ func NewBeaconTest(ctx context.Context, t *testing.T, n, thr int, period time.Du
group: group,
dpublic: group.PublicKey.PubPoly(sch).Commit(),
nodes: make(map[int]*node),
time: clock.NewFakeClock(),
time: clock,
}

for i := 0; i < n; i++ {
Expand Down Expand Up @@ -386,10 +390,11 @@ func TestBeaconSync(t *testing.T) {
ctx := context.Background()

genesisOffset := 2 * time.Second
genesisTime := clock.NewFakeClock().Now().Add(genesisOffset).Unix()
fakeClock := clock.NewFakeClock()
genesisTime := fakeClock.Now().Add(genesisOffset).Unix()
beaconID := test.GetBeaconIDFromEnv()

bt := NewBeaconTest(ctx, t, n, thr, period, genesisTime, beaconID)
bt := NewBeaconTest(ctx, t, fakeClock, n, thr, period, genesisTime, beaconID)

var counter = &sync.WaitGroup{}
myCallBack := func(i int) CallbackFunc {
Expand Down Expand Up @@ -467,10 +472,11 @@ func TestBeaconSimple(t *testing.T) {
thr := n/2 + 1
period := 2 * time.Second

genesisTime := clock.NewFakeClock().Now().Unix() + 2
fakeClock := clock.NewFakeClock()
genesisTime := fakeClock.Now().Unix() + 2
beaconID := test.GetBeaconIDFromEnv()

bt := NewBeaconTest(ctx, t, n, thr, period, genesisTime, beaconID)
bt := NewBeaconTest(ctx, t, fakeClock, n, thr, period, genesisTime, beaconID)

var counter = &sync.WaitGroup{}
counter.Add(n)
Expand Down Expand Up @@ -530,10 +536,11 @@ func TestBeaconThreshold(t *testing.T) {
period := 2 * time.Second

offsetGenesis := 2 * time.Second
genesisTime := clock.NewFakeClock().Now().Add(offsetGenesis).Unix()
fakeClock := clock.NewFakeClock()
genesisTime := fakeClock.Now().Add(offsetGenesis).Unix()
beaconID := test.GetBeaconIDFromEnv()

bt := NewBeaconTest(ctx, t, n, thr, period, genesisTime, beaconID)
bt := NewBeaconTest(ctx, t, fakeClock, n, thr, period, genesisTime, beaconID)

currentRound := uint64(0)
var counter sync.WaitGroup
Expand Down Expand Up @@ -612,7 +619,7 @@ func TestBeaconThreshold(t *testing.T) {

func TestProcessingPartialBeaconWithNonExistentIndexDoesntSegfault(t *testing.T) {
ctx := context.Background()
bt := NewBeaconTest(ctx, t, 3, 2, 30*time.Second, 0, "default")
bt := NewBeaconTest(ctx, t, clock.NewFakeClock(), 3, 2, 30*time.Second, 0, "default")

packet := drand.PartialBeaconPacket{
Round: 1,
Expand Down
2 changes: 1 addition & 1 deletion internal/core/drand_beacon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestMemDBBeaconJoinsNetworkAfterDKG(t *testing.T) {
memDBNode := newNodes[0]

t.Log("running reshare")
newGroup, err := ts.RunReshare(t, ts.nodes, newNodes)
newGroup, err := ts.RunReshare(t, ts.clock.Now().Add(3*period), ts.nodes, newNodes)
require.NoError(t, err)
require.NotNil(t, newGroup)

Expand Down
107 changes: 19 additions & 88 deletions internal/core/drand_daemon_dkg_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,12 @@ package core

import (
"context"
"errors"
"fmt"

"github.com/drand/drand/protobuf/drand"
)

func (dd *DrandDaemon) StartNetwork(ctx context.Context, options *drand.FirstProposalOptions) (*drand.EmptyResponse, error) {
return dd.dkg.StartNetwork(ctx, options)
}

func (dd *DrandDaemon) StartProposal(ctx context.Context, options *drand.ProposalOptions) (*drand.EmptyResponse, error) {
return dd.dkg.StartProposal(ctx, options)
}

func (dd *DrandDaemon) StartAbort(ctx context.Context, options *drand.AbortOptions) (*drand.EmptyResponse, error) {
beaconID := options.BeaconID

if !dd.beaconExists(beaconID) {
return nil, fmt.Errorf("beacon with ID %s is not running on this daemon", beaconID)
}

return dd.dkg.StartAbort(ctx, options)
}

func (dd *DrandDaemon) StartExecute(ctx context.Context, options *drand.ExecutionOptions) (*drand.EmptyResponse, error) {
beaconID := options.BeaconID

if !dd.beaconExists(beaconID) {
return nil, fmt.Errorf("beacon with ID %s is not running on this daemon", beaconID)
}

return dd.dkg.StartExecute(ctx, options)
}

func (dd *DrandDaemon) StartJoin(ctx context.Context, options *drand.JoinOptions) (*drand.EmptyResponse, error) {
beaconID := options.BeaconID

if !dd.beaconExists(beaconID) {
return nil, fmt.Errorf("beacon with ID %s is not running on this daemon", beaconID)
}

return dd.dkg.StartJoin(ctx, options)
}

func (dd *DrandDaemon) StartAccept(ctx context.Context, options *drand.AcceptOptions) (*drand.EmptyResponse, error) {
beaconID := options.BeaconID

if !dd.beaconExists(beaconID) {
return nil, fmt.Errorf("beacon with ID %s is not running on this daemon", beaconID)
}

return dd.dkg.StartAccept(ctx, options)
}

func (dd *DrandDaemon) StartReject(ctx context.Context, options *drand.RejectOptions) (*drand.EmptyResponse, error) {
beaconID := options.BeaconID

if !dd.beaconExists(beaconID) {
return nil, fmt.Errorf("beacon with ID %s is not running on this daemon", beaconID)
}

return dd.dkg.StartReject(ctx, options)
}

func (dd *DrandDaemon) DKGStatus(ctx context.Context, request *drand.DKGStatusRequest) (*drand.DKGStatusResponse, error) {
beaconID := request.BeaconID

Expand All @@ -75,52 +18,40 @@ func (dd *DrandDaemon) DKGStatus(ctx context.Context, request *drand.DKGStatusRe
return dd.dkg.DKGStatus(ctx, request)
}

func (dd *DrandDaemon) Propose(ctx context.Context, terms *drand.ProposalTerms) (*drand.EmptyResponse, error) {
return dd.dkg.Propose(ctx, terms)
}

func (dd *DrandDaemon) Abort(ctx context.Context, abortDKG *drand.AbortDKG) (*drand.EmptyResponse, error) {
beaconID := abortDKG.Metadata.BeaconID

if !dd.beaconExists(beaconID) {
return nil, fmt.Errorf("beacon with ID %s is not running on this daemon", beaconID)
func (dd *DrandDaemon) Command(ctx context.Context, command *drand.DKGCommand) (*drand.EmptyDKGResponse, error) {
if command.Metadata == nil {
return nil, errors.New("could not find command metadata to read beaconID")
}

return dd.dkg.Abort(ctx, abortDKG)
}

func (dd *DrandDaemon) Execute(ctx context.Context, execution *drand.StartExecution) (*drand.EmptyResponse, error) {
beaconID := execution.Metadata.BeaconID
beaconID := command.Metadata.BeaconID

if !dd.beaconExists(beaconID) {
return nil, fmt.Errorf("beacon with ID %s is not running on this daemon", beaconID)
}

return dd.dkg.Execute(ctx, execution)
return dd.dkg.Command(ctx, command)
}

func (dd *DrandDaemon) Accept(ctx context.Context, proposal *drand.AcceptProposal) (*drand.EmptyResponse, error) {
beaconID := proposal.Metadata.BeaconID

if !dd.beaconExists(beaconID) {
return nil, fmt.Errorf("beacon with ID %s is not running on this daemon", beaconID)
func (dd *DrandDaemon) Packet(ctx context.Context, packet *drand.GossipPacket) (*drand.EmptyDKGResponse, error) {
if packet.Metadata == nil {
return nil, errors.New("could not find command metadata to read beaconID")
}

return dd.dkg.Accept(ctx, proposal)
}

func (dd *DrandDaemon) Reject(ctx context.Context, proposal *drand.RejectProposal) (*drand.EmptyResponse, error) {
beaconID := proposal.Metadata.BeaconID
beaconID := packet.Metadata.BeaconID

if !dd.beaconExists(beaconID) {
return nil, fmt.Errorf("beacon with ID %s is not running on this daemon", beaconID)
}

return dd.dkg.Reject(ctx, proposal)
return dd.dkg.Packet(ctx, packet)
}

func (dd *DrandDaemon) BroadcastDKG(ctx context.Context, packet *drand.DKGPacket) (*drand.EmptyResponse, error) {
beaconID := packet.Metadata.BeaconID
func (dd *DrandDaemon) BroadcastDKG(ctx context.Context, packet *drand.DKGPacket) (*drand.EmptyDKGResponse, error) {
if packet.GetDkg() == nil {
return nil, errors.New("DKG was missing from packet")
}
if packet.GetDkg().Metadata == nil {
return nil, errors.New("could not find packet metadata to read beaconID")
}
beaconID := packet.Dkg.Metadata.BeaconID

if !dd.beaconExists(beaconID) {
return nil, fmt.Errorf("beacon with ID %s is not running on this daemon", beaconID)
Expand Down
Loading

0 comments on commit b0970d8

Please sign in to comment.