Skip to content

Commit

Permalink
dkg: wait for peers to connect (#485)
Browse files Browse the repository at this point in the history
Waits for all peers to connect before starting Frost DKG. This isn't required for keycast since it has retries built-in.

Also:
 - Improve logging
 - Make configuration files read-only
 - Add `DKGAlgo` to `charon create dkg` command.
 - Don't error when making folders (since this doesn't work in docker)

category: feature
ticket: #478 
feature_set: stable
  • Loading branch information
corverroos committed May 6, 2022
1 parent 56272ca commit 47e89c4
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 11 deletions.
21 changes: 12 additions & 9 deletions cmd/createdkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type createDKGConfig struct {
FeeRecipient string
WithdrawalAddress string
ForkVersion string
DKGAlgo string
OperatorENRs []string
}

Expand All @@ -63,10 +64,11 @@ func bindCreateDKGFlags(flags *pflag.FlagSet, config *createDKGConfig) {
flags.StringVar(&config.OutputDir, "output-dir", ".", "The folder to write the output cluster_definition.json file to.")
flags.IntVar(&config.NumValidators, "num-validators", 1, "The number of distributed validators the cluster will manage (32ETH staked for each).")
flags.IntVarP(&config.Threshold, "threshold", "t", 3, "The threshold required for signature reconstruction. Minimum is n-(ceil(n/3)-1).")
flags.StringVar(&config.FeeRecipient, "fee_recipient_address", "", "Optional Ethereum address of the fee recipient")
flags.StringVar(&config.WithdrawalAddress, "withdrawal_address", "", "Withdrawal Ethereum address")
flags.StringVar(&config.ForkVersion, "fork_version", "", "Optional hex fork version identifying the target network/chain")
flags.StringSliceVar(&config.OperatorENRs, "operator_enrs", nil, "Comma-separated list of each operator's Charon ENR address")
flags.StringVar(&config.FeeRecipient, "fee-recipient-address", "", "Optional Ethereum address of the fee recipient")
flags.StringVar(&config.WithdrawalAddress, "withdrawal-address", "", "Withdrawal Ethereum address")
flags.StringVar(&config.ForkVersion, "fork-version", "", "Optional hex fork version identifying the target network/chain")
flags.StringVar(&config.DKGAlgo, "dkg-algorithm", "default", "DKG algorithm to use; default, keycast, frost")
flags.StringSliceVar(&config.OperatorENRs, "operator-enrs", nil, "Comma-separated list of each operator's Charon ENR address")
}

func runCreateDKG(_ context.Context, conf createDKGConfig) error {
Expand All @@ -80,16 +82,17 @@ func runCreateDKG(_ context.Context, conf createDKGConfig) error {
def := cluster.NewDefinition(conf.Name, conf.NumValidators, conf.Threshold, conf.FeeRecipient, conf.WithdrawalAddress,
conf.ForkVersion, operators, crand.Reader)

b, err := json.Marshal(def)
def.DKGAlgorithm = conf.DKGAlgo

b, err := json.MarshalIndent(def, "", " ")
if err != nil {
return errors.Wrap(err, "marshal definition")
}

if err := os.MkdirAll(conf.OutputDir, 0o755); err != nil {
return errors.Wrap(err, "create output dir")
}
// Best effort creation of output dir, but error when writing the file.
_ = os.MkdirAll(conf.OutputDir, 0o755)

if err := os.WriteFile(path.Join(conf.OutputDir, "cluster_definition.json"), b, 0o400); err != nil {
if err := os.WriteFile(path.Join(conf.OutputDir, "cluster_definition.json"), b, 0o444); err != nil {
return errors.Wrap(err, "write definition")
}

Expand Down
4 changes: 2 additions & 2 deletions dkg/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ func writeKeystores(datadir string, shares []share) error {

// writeLock writes the lock file to disk.
func writeLock(datadir string, lock cluster.Lock) error {
b, err := json.Marshal(lock)
b, err := json.MarshalIndent(lock, "", " ")
if err != nil {
return errors.Wrap(err, "marshal lock")
}

err = os.WriteFile(path.Join(datadir, "cluster_lock.json"), b, 0o600)
err = os.WriteFile(path.Join(datadir, "cluster_lock.json"), b, 0o444) // Read-only
if err != nil {
return errors.Wrap(err, "write lock")
}
Expand Down
80 changes: 80 additions & 0 deletions dkg/dkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ import (
"context"
crand "crypto/rand"
"fmt"
"time"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/cluster"
"github.com/obolnetwork/charon/p2p"
)
Expand Down Expand Up @@ -96,8 +99,16 @@ func Run(ctx context.Context, conf Config) error {
}
peerMap[uint32(nodeIdx.ShareIdx)] = p.ID
}

tp := newFrostP2P(ctx, tcpNode, peerMap, clusterID)

err := waitPeers(ctx, tcpNode, peers)
if err != nil {
return err
}

log.Info(ctx, "Starting Frost DKG ceremony")

shares, err = runFrostParallel(ctx, tp, uint32(def.NumValidators), uint32(len(peerMap)),
uint32(def.Threshold), uint32(nodeIdx.ShareIdx), clusterID)
if err != nil {
Expand All @@ -107,6 +118,8 @@ func Run(ctx context.Context, conf Config) error {
return errors.New("unsupported dkg algorithm")
}

log.Info(ctx, "Successfully completed DKG ceremony, writing output")

if err := writeKeystores(conf.DataDir, shares); err != nil {
return err
}
Expand Down Expand Up @@ -158,6 +171,9 @@ func setupP2P(ctx context.Context, datadir string, p2pConf p2p.Config, peers []p
return nil, nil, errors.Wrap(err, "")
}

// Register ping service handler
_ = ping.NewPingService(tcpNode)

return tcpNode, func() {
db.Close()
udpNode.Close()
Expand Down Expand Up @@ -188,3 +204,67 @@ func dvsFromShares(shares []share) ([]cluster.DistValidator, error) {

return dvs, nil
}

// waitPeers blocks until all peers are connected or the context is cancelled.
func waitPeers(ctx context.Context, tcpNode host.Host, peers []p2p.Peer) error {
// TODO(corver): This can be improved by returning a context that is
// cancelled as soon as the connection to a single peer is lost.

type tuple struct {
Peer peer.ID
RTT time.Duration
}

var (
tuples = make(chan tuple, len(peers))
total int
)
for _, p := range peers {
if tcpNode.ID() == p.ID {
continue // Do not connect to self.
}
total++
go func(pID peer.ID) {
rtt := waitConnect(ctx, tcpNode, pID)
if ctx.Err() == nil {
tuples <- tuple{Peer: pID, RTT: rtt}
}
}(p.ID)
}

var i int
for {
select {
case <-ctx.Done():
return ctx.Err()
case tuple := <-tuples:
i++
log.Info(ctx, fmt.Sprintf("Connected to peer %d of %d", i, total),
z.Str("peer", p2p.ShortID(tuple.Peer)),
z.Str("rtt", tuple.RTT.String()),
)
if i == total {
return nil
}
}
}
}

// waitConnect blocks until a libp2p connection (ping) is established with the peer or the context is cancelled.
func waitConnect(ctx context.Context, tcpNode host.Host, p peer.ID) time.Duration {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

for result := range ping.Ping(ctx, tcpNode, p) {
if result.Error == nil {
return result.RTT
} else if ctx.Err() != nil {
return 0
}

log.Warn(ctx, "Failed connecting to peer (will retry)", result.Error, z.Str("peer", p2p.ShortID(p)))
time.Sleep(time.Second * 5) // TODO(corver): Improve backoff.
}

return 0
}

0 comments on commit 47e89c4

Please sign in to comment.