Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

app: wire and test simnet core workflow #206

Merged
merged 4 commits into from
Mar 14, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 202 additions & 21 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import (
"crypto/ecdsa"
"net/http"
"net/http/pprof"
"time"

eth2v1 "github.com/attestantio/go-eth2-client/api/v1"
eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/dB2510/kryptology/pkg/signatures/bls/bls_sig"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -34,8 +38,20 @@ import (
"github.com/obolnetwork/charon/app/tracer"
"github.com/obolnetwork/charon/app/version"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/core"
"github.com/obolnetwork/charon/core/bcast"
"github.com/obolnetwork/charon/core/dutydb"
"github.com/obolnetwork/charon/core/fetcher"
"github.com/obolnetwork/charon/core/leadercast"
"github.com/obolnetwork/charon/core/parsigdb"
"github.com/obolnetwork/charon/core/parsigex"
"github.com/obolnetwork/charon/core/scheduler"
"github.com/obolnetwork/charon/core/sigagg"
"github.com/obolnetwork/charon/core/validatorapi"
"github.com/obolnetwork/charon/p2p"
"github.com/obolnetwork/charon/tbls/tblsconv"
"github.com/obolnetwork/charon/testutil/beaconmock"
"github.com/obolnetwork/charon/testutil/validatormock"
)

type Config struct {
Expand All @@ -56,8 +72,18 @@ type TestConfig struct {
Manifest *Manifest
// P2PKey provides the p2p privkey explicitly, skips loading from keystore on disk.
P2PKey *ecdsa.PrivateKey
// DisablePing disables the ping service.
DisablePing bool
// PingCallback is called when a ping was completed to a peer.
PingCallback func(peer.ID)
// ParSigExFunc provides an in-memory partial signature exchange.
ParSigExFunc func() core.ParSigEx
// LcastTransportFunc provides an in-memory leader cast transport.
LcastTransportFunc func() leadercast.Transport
// SimnetSecrets provides private key shares for the simnet validatormock signer.
SimnetSecrets []*bls_sig.SecretKey
// BroadcastCallback is called when a duty is completed and sent to the broadcast component.
BroadcastCallback func(context.Context, core.Duty, core.PubKey, core.AggSignedData) error
}

// Run is the entrypoint for running a charon DVC instance.
Expand Down Expand Up @@ -88,36 +114,45 @@ func Run(ctx context.Context, conf Config) (err error) {
return err
}

tcpNode, localEnode, index, err := wireP2P(ctx, life, conf, manifest)
tcpNode, localEnode, err := wireP2P(ctx, life, conf, manifest)
if err != nil {
return err
}

nodeIdx, err := manifest.NodeIdx(tcpNode.ID())
if err != nil {
return err
}

log.Info(ctx, "Manifest loaded",
z.Int("peers", len(manifest.Peers)),
z.Str("local_peer", p2p.ShortID(tcpNode.ID())),
z.Int("index", index))
z.Str("peer_id", p2p.ShortID(tcpNode.ID())),
z.Int("peer_index", nodeIdx.PeerIdx))

wireMonitoringAPI(life, conf.MonitoringAddr, localEnode)

if err := wireValidatorAPI(life, conf); err != nil {
return err
}

if err := wireSimNetCoreWorkflow(life, conf, manifest, nodeIdx, tcpNode); err != nil {
return err
}

// Run life cycle manager
return life.Run(ctx)
}

// wireP2P constructs the p2p tcp (libp2p) and udp (discv5) nodes and registers it with the life cycle manager.
func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config, manifest Manifest,
) (host.Host, *enode.LocalNode, int, error) {
) (host.Host, *enode.LocalNode, error) {
p2pKey := conf.TestConfig.P2PKey
if p2pKey == nil {
var err error
var loaded bool
p2pKey, loaded, err = p2p.LoadOrCreatePrivKey(conf.DataDir)
if err != nil {
return nil, nil, 0, errors.Wrap(err, "load or create peer ID")
return nil, nil, errors.Wrap(err, "load or create peer ID")
}

if loaded {
Expand All @@ -129,42 +164,162 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config, manifest

localEnode, peerDB, err := p2p.NewLocalEnode(conf.P2P, p2pKey)
if err != nil {
return nil, nil, 0, errors.Wrap(err, "create local enode")
return nil, nil, errors.Wrap(err, "create local enode")
}

udpNode, err := p2p.NewUDPNode(conf.P2P, localEnode, p2pKey, manifest.ENRs())
if err != nil {
return nil, nil, 0, errors.Wrap(err, "start discv5 listener")
return nil, nil, errors.Wrap(err, "start discv5 listener")
}

connGater, err := p2p.NewConnGater(manifest.PeerIDs())
if err != nil {
return nil, nil, 0, errors.Wrap(err, "connection gater")
return nil, nil, errors.Wrap(err, "connection gater")
}

tcpNode, err := p2p.NewTCPNode(conf.P2P, p2pKey, connGater, udpNode, manifest.Peers)
if err != nil {
return nil, nil, 0, errors.Wrap(err, "new p2p node", z.Str("allowlist", conf.P2P.Allowlist))
return nil, nil, errors.Wrap(err, "new p2p node", z.Str("allowlist", conf.P2P.Allowlist))
}

index := -1
for i, p := range manifest.PeerIDs() {
if tcpNode.ID() == p {
index = i
}
}
if index == -1 {
return nil, nil, 0, errors.New("privkey not in manifest peers")
}
if !conf.TestConfig.DisablePing {
startPing := p2p.NewPingService(tcpNode, manifest.PeerIDs(), conf.TestConfig.PingCallback)

startPing := p2p.NewPingService(tcpNode, manifest.PeerIDs(), conf.TestConfig.PingCallback)
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PPing, lifecycle.HookFuncCtx(startPing))
}

life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PPing, lifecycle.HookFuncCtx(startPing))
life.RegisterStop(lifecycle.StopP2PPeerDB, lifecycle.HookFuncMin(peerDB.Close))
life.RegisterStop(lifecycle.StopP2PTCPNode, lifecycle.HookFuncErr(tcpNode.Close))
life.RegisterStop(lifecycle.StopP2PUDPNode, lifecycle.HookFuncMin(udpNode.Close))

return tcpNode, localEnode, index, nil
return tcpNode, localEnode, nil
}

// wireSimNetCoreWorkflow wires a simnet core workflow including a beaconmock and validatormock.
func wireSimNetCoreWorkflow(life *lifecycle.Manager, conf Config, manifest Manifest, nodeIdx NodeIdx, tcpNode host.Host) error {
// Convert and prep public keys and public shares
var (
corePubkeys []core.PubKey
pubkeys []eth2p0.BLSPubKey
pubshares []eth2p0.BLSPubKey
pubSharesByKey = make(map[*bls_sig.PublicKey]*bls_sig.PublicKey)
threshold int
)
for _, dv := range manifest.DVs {
threshold = dv.Threshold()

corePubkey, err := tblsconv.KeyToCore(dv.PublicKey())
if err != nil {
return err
}

pubkey, err := tblsconv.KeyToETH2(dv.PublicKey())
if err != nil {
return err
}

pubShare, err := dv.PublicShare(nodeIdx.ShareIdx)
if err != nil {
return err
}

eth2Share, err := tblsconv.KeyToETH2(pubShare)
if err != nil {
return err
}
corePubkeys = append(corePubkeys, corePubkey)
pubkeys = append(pubkeys, pubkey)
pubSharesByKey[dv.PublicKey()] = pubShare
pubshares = append(pubshares, eth2Share)
}

// Configure the beacon mock.
bmock := beaconmock.New(
beaconmock.WithDefaultStaticProvider(), // Use mostly beacon chain config.
beaconmock.WithSlotsPerEpoch(len(pubshares)), // Except for slots per epoch, make that faster.
beaconmock.WithSlotDuration(time.Second), // Except for slots duration, make that faster as well.
beaconmock.WithDeterministicDuties(13), // This should result in pseudo random duties.
beaconmock.WithValidatorSet(createMockValidators(pubkeys)),
)

sched, err := scheduler.New(corePubkeys, bmock)
if err != nil {
return err
}

fetch, err := fetcher.New(bmock)
if err != nil {
return err
}

var lcastTransport leadercast.Transport
if conf.TestConfig.LcastTransportFunc != nil {
lcastTransport = conf.TestConfig.LcastTransportFunc()
} else {
lcastTransport = leadercast.NewP2PTransport(tcpNode, nodeIdx.PeerIdx, manifest.PeerIDs())
}

consensus := leadercast.New(lcastTransport, nodeIdx.PeerIdx, len(manifest.PeerIDs()))

dutyDB := dutydb.NewMemDB()

vapi, err := validatorapi.NewComponent(bmock, pubSharesByKey, nodeIdx.ShareIdx)
if err != nil {
return err
}

parSigDB := parsigdb.NewMemDB(threshold)

var parSigEx core.ParSigEx
if conf.TestConfig.ParSigExFunc != nil {
parSigEx = conf.TestConfig.ParSigExFunc()
} else {
// TODO(corver): Use p2p implementation here.
parSigEx = parsigex.NewMemExFunc()()
}

sigAgg := sigagg.New(threshold)

broadcaster, err := bcast.New(bmock)
if err != nil {
return err
}

core.Wire(sched, fetch, consensus, dutyDB, vapi, parSigDB, parSigEx, sigAgg, broadcaster)

err = wireValidatorMock(conf, pubshares, sched, vapi)
if err != nil {
return err
}

if conf.TestConfig.BroadcastCallback != nil {
sigAgg.Subscribe(conf.TestConfig.BroadcastCallback)
}

life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartLeaderCast, lifecycle.HookFunc(consensus.Run))
life.RegisterStart(lifecycle.AsyncBackground, lifecycle.StartScheduler, lifecycle.HookFuncErr(sched.Run))
life.RegisterStop(lifecycle.StopScheduler, lifecycle.HookFuncMin(sched.Stop))

return nil
}

// createMockValidators creates mock validators identified by their public shares.
func createMockValidators(pubkeys []eth2p0.BLSPubKey) beaconmock.ValidatorSet {
resp := make(beaconmock.ValidatorSet)
for i, pubkey := range pubkeys {
vIdx := eth2p0.ValidatorIndex(i)

resp[vIdx] = &eth2v1.Validator{
Index: vIdx,
Status: eth2v1.ValidatorStateActiveOngoing,
Validator: &eth2p0.Validator{
WithdrawalCredentials: []byte("12345678901234567890123456789012"),
PublicKey: pubkey,
},
}
}

return resp
}

// wireMonitoringAPI constructs the monitoring API and registers it with the life cycle manager.
Expand Down Expand Up @@ -240,3 +395,29 @@ func (h httpServeHook) Call(context.Context) error {

return nil
}

func wireValidatorMock(conf Config, pubshares []eth2p0.BLSPubKey, sched core.Scheduler, vapi *validatorapi.Component) error {
secrets := conf.TestConfig.SimnetSecrets
// if len(secrets) == 0 {
// // TODO(corver): Load simnet secret shares from conf.DataDir/simnetkey*)
//}

signer := validatormock.NewSigner(secrets...)

// Trigger validatormock when scheduler triggers new slot.
sched.Subscribe(func(ctx context.Context, duty core.Duty, _ core.FetchArgSet) error {
ctx = log.WithTopic(ctx, "vmock")
go func() {
err := validatormock.Attest(ctx, vapi, signer, eth2p0.Slot(duty.Slot), pubshares...)
if err != nil {
log.Warn(ctx, "attestation failed", z.Err(err))
} else {
log.Info(ctx, "attestation success", z.I64("slot", duty.Slot))
}
}()

return nil
})

return nil
}
2 changes: 2 additions & 0 deletions app/lifecycle/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ const (
StartP2PPing
StartLeaderCast
StartSimulator
StartScheduler

StopTracing OrderStop = iota
StopScheduler
StopP2PPeerDB
StopP2PTCPNode
StopP2PUDPNode
Expand Down
5 changes: 3 additions & 2 deletions app/lifecycle/orderstart_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 11 additions & 10 deletions app/lifecycle/orderstop_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions app/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ const manifestVersion = "obol/charon/manifest/0.0.1"

// NodeIdx represents the index of a node/peer/share in the cluster as defined in the manifest.
type NodeIdx struct {
// PeerID is ID of this peer.
PeerID peer.ID
// PeerIdx is the index of a peer in the peer list (it 0-indexed).
PeerIdx int
// ShareIdx is the tbls share identifier (it is 1-indexed).
Expand Down Expand Up @@ -84,9 +82,8 @@ func (m Manifest) NodeIdx(pID peer.ID) (NodeIdx, error) {
}

return NodeIdx{
PeerIdx: i,
PeerID: pID,
ShareIdx: i + 1,
PeerIdx: i, // 0-indexed
ShareIdx: i + 1, // 1-indexed
}, nil
}

Expand Down
Loading