Skip to content

Commit

Permalink
dkg/sync: refactor and integrate sync protocol (#758)
Browse files Browse the repository at this point in the history
Slight refactor and cleanup of sync protocol implementation.

category: refactor
ticket: #751
  • Loading branch information
corverroos committed Jul 1, 2022
1 parent f6b5361 commit a2e36f6
Show file tree
Hide file tree
Showing 5 changed files with 509 additions and 562 deletions.
219 changes: 98 additions & 121 deletions dkg/dkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,24 @@ package dkg

import (
"context"
"crypto/ecdsa"
crand "crypto/rand"
"encoding/base64"
"fmt"
"time"

"github.com/coinbase/kryptology/pkg/signatures/bls/bls_sig"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
libp2pcrypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/cluster"
"github.com/obolnetwork/charon/core"
"github.com/obolnetwork/charon/dkg/sync"
"github.com/obolnetwork/charon/eth2util/deposit"
"github.com/obolnetwork/charon/p2p"
"github.com/obolnetwork/charon/tbls"
Expand Down Expand Up @@ -84,7 +86,12 @@ func Run(ctx context.Context, conf Config) (err error) {
return err
}

tcpNode, shutdown, err := setupP2P(ctx, conf.DataDir, conf.P2P, peers)
key, err := p2p.LoadPrivKey(conf.DataDir)
if err != nil {
return err
}

tcpNode, shutdown, err := setupP2P(ctx, key, conf.P2P, peers)
if err != nil {
return err
}
Expand All @@ -108,6 +115,26 @@ func Run(ctx context.Context, conf Config) (err error) {

ex := newExchanger(tcpNode, nodeIdx.PeerIdx, peerIds, def.NumValidators)

// Register Frost libp2p handlers
peerMap := make(map[uint32]peer.ID)
for _, p := range peers {
nodeIdx, err := def.NodeIdx(p.ID)
if err != nil {
return err
}
peerMap[uint32(nodeIdx.ShareIdx)] = p.ID
}
tp := newFrostP2P(ctx, tcpNode, peerMap, clusterID)

log.Info(ctx, "Connecting to peers...", z.Str("definition_hash", clusterID))

stopSync, err := startSyncProtocol(ctx, tcpNode, key, defHash, peerIds, cancel)
if err != nil {
return err
}

log.Info(ctx, "Starting DKG ceremony")

var shares []share
switch def.DKGAlgorithm {
case "default", "keycast":
Expand All @@ -122,28 +149,6 @@ func Run(ctx context.Context, conf Config) (err error) {
return err
}
case "frost":
// Construct peer map
peerMap := make(map[uint32]peer.ID)
for _, p := range peers {
nodeIdx, err := def.NodeIdx(p.ID)
if err != nil {
return err
}
peerMap[uint32(nodeIdx.ShareIdx)] = p.ID
}

tp := newFrostP2P(ctx, tcpNode, peerMap, clusterID)

log.Info(ctx, "Connecting to peers...", z.Str("definition_hash", clusterID))

ctx, cancel, err = waitPeers(ctx, tcpNode, peers)
if err != nil {
return err
}
defer cancel()

log.Info(ctx, "Starting Frost DKG ceremony")

shares, err = runFrostParallel(ctx, tp, uint32(def.NumValidators), uint32(len(peerMap)),
uint32(def.Threshold), uint32(nodeIdx.ShareIdx), clusterID)
if err != nil {
Expand All @@ -167,6 +172,10 @@ func Run(ctx context.Context, conf Config) (err error) {
}
log.Debug(ctx, "Aggregated deposit data signatures")

if err = stopSync(ctx); err != nil {
return errors.Wrap(err, "stop sync")
}

// Write keystores, deposit data and cluster lock files after exchange of partial signatures in order
// to prevent partial data writes in case of peer connection lost

Expand All @@ -191,12 +200,7 @@ func Run(ctx context.Context, conf Config) (err error) {
}

// setupP2P returns a started libp2p tcp node and a shutdown function.
func setupP2P(ctx context.Context, datadir string, p2pConf p2p.Config, peers []p2p.Peer) (host.Host, func(), error) {
key, err := p2p.LoadPrivKey(datadir)
if err != nil {
return nil, nil, err
}

func setupP2P(ctx context.Context, key *ecdsa.PrivateKey, p2pConf p2p.Config, peers []p2p.Peer) (host.Host, func(), error) {
localEnode, db, err := p2p.NewLocalEnode(p2pConf, key)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to open enode")
Expand Down Expand Up @@ -231,16 +235,76 @@ func setupP2P(ctx context.Context, datadir string, p2pConf p2p.Config, peers []p
}(relay)
}

// Register ping service handler
_ = ping.NewPingService(tcpNode)

return tcpNode, func() {
db.Close()
udpNode.Close()
_ = tcpNode.Close()
}, nil
}

// startSyncProtocol sets up a sync protocol server and clients for each peer and returns a shutdown function
// when all peers are connected.
func startSyncProtocol(ctx context.Context, tcpNode host.Host, key *ecdsa.PrivateKey, defHash [32]byte, peerIDs []peer.ID,
onFailure func(),
) (func(context.Context) error, error) {
// Sign definition hash with charon-enr-private-key
priv, err := libp2pcrypto.UnmarshalSecp256k1PrivateKey(crypto.FromECDSA(key))
if err != nil {
return nil, errors.Wrap(err, "convert key")
}

hashSig, err := priv.Sign(defHash[:])
if err != nil {
return nil, errors.Wrap(err, "sign definition hash")
}

server := sync.NewServer(tcpNode, len(peerIDs)-1, defHash[:])
server.Start(ctx)

var clients []*sync.Client
for _, pID := range peerIDs {
if tcpNode.ID() == pID {
continue
}

ctx := log.WithCtx(ctx, z.Str("peer", p2p.PeerName(pID)))
client := sync.NewClient(tcpNode, pID, hashSig)
clients = append(clients, client)

go func() {
err := client.Run(ctx)
if err != nil {
log.Error(ctx, "Sync failed to peer", err)
onFailure()
}
}()
}

for _, client := range clients {
err := client.AwaitConnected(ctx)
if err != nil {
return nil, err
}
}

err = server.AwaitAllConnected(ctx)
if err != nil {
return nil, err
}

// Shutdown function stops all clients and server
return func(ctx context.Context) error {
for _, client := range clients {
err := client.Shutdown(ctx)
if err != nil {
return err
}
}

return server.AwaitAllShutdown(ctx)
}, nil
}

// signAndAggLockHash returns cluster lock file with aggregated signature after signing, exchange and aggregation of partial signatures.
func signAndAggLockHash(ctx context.Context, shares []share, def cluster.Definition, nodeIdx cluster.NodeIdx, ex *exchanger) (cluster.Lock, error) {
dvs, err := dvsFromShares(shares)
Expand Down Expand Up @@ -504,93 +568,6 @@ func dvsFromShares(shares []share) ([]cluster.DistValidator, error) {
return dvs, nil
}

// waitPeers blocks until all peers are connected and returns a context that is cancelled when
// any connection is lost afterwards or when the parent context is cancelled.
func waitPeers(ctx context.Context, tcpNode host.Host, peers []p2p.Peer) (context.Context, context.CancelFunc, error) {
ctx, cancel := context.WithCancel(ctx)

type tuple struct {
Peer peer.ID
RTT time.Duration
}

var (
tuples = make(chan tuple, len(peers))
total int
)
for _, p := range peers {
if tcpNode.ID() == p.ID {
continue // Do not connect to self.
}
total++
go func(pID peer.ID) {
for {
results, rtt, ok := waitConnect(ctx, tcpNode, pID)
if ctx.Err() != nil {
return
} else if !ok {
continue
}

// We are connected
tuples <- tuple{Peer: pID, RTT: rtt}

// Wait for disconnect and cancel the context.
var err error
for result := range results {
if result.Error != nil {
err = result.Error
break
}
}

if ctx.Err() == nil {
log.Error(ctx, "Peer connection lost", err, z.Str("peer", p2p.PeerName(pID)))
cancel()
}

return
}
}(p.ID)
}

var i int
for {
select {
case <-ctx.Done():
return ctx, cancel, ctx.Err()
case <-time.After(time.Second * 30):
log.Info(ctx, fmt.Sprintf("Connected to %d of %d peers", i, total))
case tuple := <-tuples:
i++
log.Info(ctx, fmt.Sprintf("Connected to peer %d of %d", i, total),
z.Str("peer", p2p.PeerName(tuple.Peer)),
z.Str("rtt", tuple.RTT.String()),
)
if i == total {
return ctx, cancel, nil
}
}
}
}

// waitConnect blocks until a libp2p connection (ping) is established returning the ping result chan, with the peer or the context is cancelled.
func waitConnect(ctx context.Context, tcpNode host.Host, p peer.ID) (<-chan ping.Result, time.Duration, bool) {
resp := ping.Ping(ctx, tcpNode, p)
for result := range resp {
if result.Error == nil {
return resp, result.RTT, true
} else if ctx.Err() != nil {
return nil, 0, false
}

log.Debug(ctx, "Failed connecting to peer (will retry)", z.Str("peer", p2p.PeerName(p)), z.Err(result.Error))
time.Sleep(time.Second * 5) // TODO(corver): Improve backoff.
}

return nil, 0, false
}

func forkVersionToNetwork(forkVersion string) (string, error) {
switch forkVersion {
case "0x00001020":
Expand Down
Loading

0 comments on commit a2e36f6

Please sign in to comment.