From f1a86af3fe29e18d410bec948f0303db11d082c6 Mon Sep 17 00:00:00 2001 From: corverroos Date: Tue, 24 May 2022 14:54:23 +0200 Subject: [PATCH 1/2] core/consensus: integrate qbft --- app/app.go | 78 ++++++++++++----- app/lifecycle/order.go | 2 +- app/lifecycle/orderstart_string.go | 6 +- core/consensus/component.go | 38 ++++++-- core/consensus/component_test.go | 134 +++++++++++++++++++++++++++++ core/consensus/msg.go | 27 +++--- core/consensus/transport.go | 50 +++++++---- core/qbft/qbft.go | 6 +- core/qbft/qbft_internal_test.go | 2 +- 9 files changed, 274 insertions(+), 69 deletions(-) create mode 100644 core/consensus/component_test.go diff --git a/app/app.go b/app/app.go index 2b6874a56..e0f257212 100644 --- a/app/app.go +++ b/app/app.go @@ -50,6 +50,7 @@ import ( "github.com/obolnetwork/charon/core" "github.com/obolnetwork/charon/core/aggsigdb" "github.com/obolnetwork/charon/core/bcast" + "github.com/obolnetwork/charon/core/consensus" "github.com/obolnetwork/charon/core/dutydb" "github.com/obolnetwork/charon/core/fetcher" "github.com/obolnetwork/charon/core/leadercast" @@ -151,7 +152,16 @@ func Run(ctx context.Context, conf Config) (err error) { } lockHashHex := hex.EncodeToString(lockHash[:])[:7] - tcpNode, localEnode, err := wireP2P(ctx, life, conf, lock) + p2pKey := conf.TestConfig.P2PKey + if p2pKey == nil { + var err error + p2pKey, err = p2p.LoadPrivKey(conf.DataDir) + if err != nil { + return err + } + } + + tcpNode, localEnode, err := wireP2P(ctx, life, conf, lock, p2pKey) if err != nil { return err } @@ -182,7 +192,7 @@ func Run(ctx context.Context, conf Config) (err error) { wireMonitoringAPI(life, conf.MonitoringAddr, localEnode) - if err := wireCoreWorkflow(ctx, life, conf, lock, nodeIdx, tcpNode); err != nil { + if err := wireCoreWorkflow(ctx, life, conf, lock, nodeIdx, tcpNode, p2pKey); err != nil { return err } @@ -191,16 +201,8 @@ func Run(ctx context.Context, conf Config) (err error) { } // wireP2P constructs the p2p tcp (libp2p) and udp (discv5) nodes and registers it with the life cycle manager. -func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config, lock cluster.Lock, +func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config, lock cluster.Lock, p2pKey *ecdsa.PrivateKey, ) (host.Host, *enode.LocalNode, error) { - p2pKey := conf.TestConfig.P2PKey - if p2pKey == nil { - var err error - p2pKey, err = p2p.LoadPrivKey(conf.DataDir) - if err != nil { - return nil, nil, err - } - } peers, err := lock.Peers() if err != nil { @@ -260,7 +262,7 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config, lock clu // wireCoreWorkflow wires the core workflow components. func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, - lock cluster.Lock, nodeIdx cluster.NodeIdx, tcpNode host.Host, + lock cluster.Lock, nodeIdx cluster.NodeIdx, tcpNode host.Host, p2pKey *ecdsa.PrivateKey, ) error { // Convert and prep public keys and public shares var ( @@ -345,15 +347,6 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, return err } - var lcastTransport leadercast.Transport - if conf.TestConfig.LcastTransportFunc != nil { - lcastTransport = conf.TestConfig.LcastTransportFunc() - } else { - lcastTransport = leadercast.NewP2PTransport(tcpNode, nodeIdx.PeerIdx, peerIDs) - } - - consensus := leadercast.New(lcastTransport, nodeIdx.PeerIdx, len(peerIDs)) - dutyDB := dutydb.NewMemDB() vapi, err := validatorapi.NewComponent(eth2Cl, pubSharesByKey, nodeIdx.ShareIdx) @@ -388,7 +381,12 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, return err } - core.Wire(sched, fetch, consensus, dutyDB, vapi, + cons, startCons, err := newConsensus(conf, lock, tcpNode, p2pKey, nodeIdx) + if err != nil { + return err + } + + core.Wire(sched, fetch, cons, dutyDB, vapi, parSigDB, parSigEx, sigAgg, aggSigDB, broadcaster, core.WithTracing(), core.WithAsyncRetry(retryer), @@ -403,8 +401,8 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, sigAgg.Subscribe(conf.TestConfig.BroadcastCallback) } - life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartLeaderCast, lifecycle.HookFunc(consensus.Run)) life.RegisterStart(lifecycle.AsyncBackground, lifecycle.StartScheduler, lifecycle.HookFuncErr(sched.Run)) + life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PConsensus, startCons) life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartAggSigDB, lifecycle.HookFuncCtx(aggSigDB.Run)) life.RegisterStop(lifecycle.StopScheduler, lifecycle.HookFuncMin(sched.Stop)) life.RegisterStop(lifecycle.StopDutyDB, lifecycle.HookFuncMin(dutyDB.Shutdown)) @@ -413,6 +411,40 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config, return nil } +// newConsensus returns a new consensus component and its start lifecycle hook. +func newConsensus(conf Config, lock cluster.Lock, tcpNode host.Host, p2pKey *ecdsa.PrivateKey, + nodeIdx cluster.NodeIdx, +) (core.Consensus, lifecycle.IHookFunc, error) { + peers, err := lock.Peers() + if err != nil { + return nil, nil, err + } + peerIDs, err := lock.PeerIDs() + if err != nil { + return nil, nil, err + } + + if featureset.Enabled(featureset.QBFTConsensus) { + comp, err := consensus.NewComponent(tcpNode, peers, p2pKey) + if err != nil { + return nil, nil, err + } + + return comp, lifecycle.HookFuncCtx(comp.Start), nil + } + + var lcastTransport leadercast.Transport + if conf.TestConfig.LcastTransportFunc != nil { + lcastTransport = conf.TestConfig.LcastTransportFunc() + } else { + lcastTransport = leadercast.NewP2PTransport(tcpNode, nodeIdx.PeerIdx, peerIDs) + } + + lcast := leadercast.New(lcastTransport, nodeIdx.PeerIdx, len(peerIDs)) + + return lcast, lifecycle.HookFunc(lcast.Run), nil +} + // createMockValidators creates mock validators identified by their public shares. func createMockValidators(pubkeys []eth2p0.BLSPubKey) beaconmock.ValidatorSet { resp := make(beaconmock.ValidatorSet) diff --git a/app/lifecycle/order.go b/app/lifecycle/order.go index 70e82e7f9..54b674aa2 100644 --- a/app/lifecycle/order.go +++ b/app/lifecycle/order.go @@ -31,7 +31,7 @@ const ( StartMonitoringAPI StartValidatorAPI StartP2PPing - StartLeaderCast + StartP2PConsensus StartSimulator StartScheduler ) diff --git a/app/lifecycle/orderstart_string.go b/app/lifecycle/orderstart_string.go index b9463db6a..940d0078a 100644 --- a/app/lifecycle/orderstart_string.go +++ b/app/lifecycle/orderstart_string.go @@ -28,14 +28,14 @@ func _() { _ = x[StartMonitoringAPI-2] _ = x[StartValidatorAPI-3] _ = x[StartP2PPing-4] - _ = x[StartLeaderCast-5] + _ = x[StartP2PConsensus-5] _ = x[StartSimulator-6] _ = x[StartScheduler-7] } -const _OrderStart_name = "AggSigDBRelayMonitoringAPIValidatorAPIP2PPingLeaderCastSimulatorScheduler" +const _OrderStart_name = "AggSigDBRelayMonitoringAPIValidatorAPIP2PPingP2PConsensusSimulatorScheduler" -var _OrderStart_index = [...]uint8{0, 8, 13, 26, 38, 45, 55, 64, 73} +var _OrderStart_index = [...]uint8{0, 8, 13, 26, 38, 45, 57, 66, 75} func (i OrderStart) String() string { if i < 0 || i >= OrderStart(len(_OrderStart_index)-1) { diff --git a/core/consensus/component.go b/core/consensus/component.go index d358a79f8..bbf49fdd3 100644 --- a/core/consensus/component.go +++ b/core/consensus/component.go @@ -43,9 +43,7 @@ const ( ) // NewComponent returns a new consensus QBFT component. -func NewComponent(tcpNode host.Host, peers []p2p.Peer, - peerIdx int64, p2pKey *ecdsa.PrivateKey, -) (*Component, error) { +func NewComponent(tcpNode host.Host, peers []p2p.Peer, p2pKey *ecdsa.PrivateKey) (*Component, error) { // Extract peer pubkeys. keys := make(map[int64]*ecdsa.PublicKey) for i, p := range peers { @@ -72,7 +70,7 @@ func NewComponent(tcpNode host.Host, peers []p2p.Peer, // IsLeader is a deterministic leader election function. IsLeader: func(duty core.Duty, round, process int64) bool { mod := ((duty.Slot) + int64(duty.Type) + round) % int64(len(peers)) - return mod == peerIdx + return mod == process }, // Decide sends consensus output to subscribers. @@ -115,7 +113,6 @@ type Component struct { peers []p2p.Peer pubkeys map[int64]*ecdsa.PublicKey privkey *ecdsa.PrivateKey - peerIdx int64 def qbft.Definition[core.Duty, [32]byte] subs []func(ctx context.Context, duty core.Duty, set core.UnsignedDataSet) error @@ -130,7 +127,7 @@ func (c *Component) Subscribe(fn func(ctx context.Context, duty core.Duty, set c c.subs = append(c.subs, fn) } -// Start registers the libp2p receive handler. +// Start registers the libp2p receive handler. This should only be called once. func (c *Component) Start(ctx context.Context) { c.tcpNode.SetStreamHandler(protocolID, c.makeHandler(ctx)) } @@ -142,6 +139,8 @@ func (c *Component) Propose(ctx context.Context, duty core.Duty, data core.Unsig ctx, cancel := context.WithCancel(ctx) defer cancel() + log.Debug(ctx, "Starting qbft consensus instance", z.Any("duty", duty)) + // Hash the proposed data, since qbft ony supports simple comparable values. value := core.UnsignedDataSetToProto(data) hash, err := hashProto(value) @@ -166,12 +165,18 @@ func (c *Component) Propose(ctx context.Context, duty core.Duty, data core.Unsig Receive: t.recvBuffer, } + peerIdx, err := c.getPeerIdx() + if err != nil { + return err + } + // Run the algo, blocking until the context is cancelled. - return qbft.Run[core.Duty, [32]byte](ctx, c.def, qt, duty, c.peerIdx, hash) + return qbft.Run[core.Duty, [32]byte](ctx, c.def, qt, duty, peerIdx, hash) } // makeHandler returns a consensus libp2p handler. func (c *Component) makeHandler(ctx context.Context) func(s network.Stream) { + ctx = log.WithTopic(ctx, "qbft") return func(s network.Stream) { defer s.Close() @@ -188,13 +193,13 @@ func (c *Component) makeHandler(ctx context.Context) func(s network.Stream) { } if pbMsg.Msg == nil || pbMsg.Msg.Duty == nil { - log.Error(ctx, "Invalid consensus message", err) + log.Error(ctx, "Invalid consensus message", errors.New("nil msg")) return } duty := core.DutyFromProto(pbMsg.Msg.Duty) if !duty.Type.Valid() { - log.Error(ctx, "Invalid duty type", err) + log.Error(ctx, "Invalid duty type", errors.New("", z.Str("type", duty.Type.String()))) return } @@ -247,3 +252,18 @@ func (c *Component) deleteRecvChan(duty core.Duty) { delete(c.recvBuffers, duty) } + +// getPeerIdx returns the local peer index. +func (c *Component) getPeerIdx() (int64, error) { + peerIdx := int64(-1) + for i, p := range c.peers { + if c.tcpNode.ID() == p.ID { + peerIdx = int64(i) + } + } + if peerIdx == -1 { + return 0, errors.New("local libp2p host not in peer list") + } + + return peerIdx, nil +} diff --git a/core/consensus/component_test.go b/core/consensus/component_test.go new file mode 100644 index 000000000..1d93e71ee --- /dev/null +++ b/core/consensus/component_test.go @@ -0,0 +1,134 @@ +// Copyright © 2022 Obol Labs Inc. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU General Public License as published by the Free +// Software Foundation, either version 3 of the License, or (at your option) +// any later version. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . + +package consensus_test + +import ( + "context" + "fmt" + "testing" + + "github.com/libp2p/go-libp2p" + p2pcrypto "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" + + "github.com/obolnetwork/charon/app/log" + "github.com/obolnetwork/charon/app/z" + "github.com/obolnetwork/charon/cluster" + "github.com/obolnetwork/charon/core" + "github.com/obolnetwork/charon/core/consensus" + "github.com/obolnetwork/charon/p2p" + "github.com/obolnetwork/charon/testutil" +) + +func TestComponent(t *testing.T) { + const ( + nodes = 4 + ) + + lock, p2pkeys, _ := cluster.NewForT(t, 0, 0, nodes, 0) + + var ( + peers []p2p.Peer + hosts []host.Host + hostsInfo []peer.AddrInfo + components []*consensus.Component + results = make(chan core.UnsignedDataSet, nodes) + runErrs = make(chan error, nodes) + ctx, cancel = context.WithCancel(context.Background()) + ) + defer cancel() + + // Create hosts and enrs. + for i := 0; i < nodes; i++ { + addr := testutil.AvailableAddr(t) + mAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", addr.IP, addr.Port)) + require.NoError(t, err) + + privkey := (*p2pcrypto.Secp256k1PrivateKey)(p2pkeys[i]) + h, err := libp2p.New(libp2p.Identity(privkey), libp2p.ListenAddrs(mAddr)) + require.NoError(t, err) + + record, err := p2p.DecodeENR(lock.Operators[i].ENR) + require.NoError(t, err) + + p, err := p2p.NewPeer(record, i) + require.NoError(t, err) + + hostsInfo = append(hostsInfo, peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()}) + peers = append(peers, p) + hosts = append(hosts, h) + } + + // Connect each host with its peers + for i := 0; i < nodes; i++ { + for j := 0; j < nodes; j++ { + if i == j { + continue + } + hosts[i].Peerstore().AddAddrs(hostsInfo[j].ID, hostsInfo[j].Addrs, peerstore.PermanentAddrTTL) + } + + c, err := consensus.NewComponent(hosts[i], peers, p2pkeys[i]) + require.NoError(t, err) + c.Subscribe(func(_ context.Context, _ core.Duty, set core.UnsignedDataSet) error { + results <- set + return nil + }) + c.Start(log.WithCtx(ctx, z.Int("node", i))) + + components = append(components, c) + } + + pubkey := testutil.RandomCorePubKey(t) + + // Start all components. + for i, c := range components { + go func(ctx context.Context, i int, c *consensus.Component) { + runErrs <- c.Propose( + log.WithCtx(ctx, z.Int("node", i)), + core.Duty{Type: core.DutyAttester}, + core.UnsignedDataSet{pubkey: core.UnsignedData{byte(i)}}, + ) + }(ctx, i, c) + } + + var ( + count int + result core.UnsignedDataSet + ) + for { + select { + case err := <-runErrs: + log.Error(ctx, "", err) + require.NoError(t, err) + case res := <-results: + t.Logf("Got result: %#v", res) + if count == 0 { + result = res + } else { + require.EqualValues(t, result, res) + } + count++ + if count == nodes { + return + } + } + } +} diff --git a/core/consensus/msg.go b/core/consensus/msg.go index cd3a67d55..43b43fce3 100644 --- a/core/consensus/msg.go +++ b/core/consensus/msg.go @@ -31,15 +31,22 @@ import ( // newMsg returns a new msg. func newMsg(pbMsg *pbv1.QBFTMsg, justification []*pbv1.QBFTMsg) (msg, error) { // Do all possible error conversions first. - - valueHash, err := hashProto(pbMsg.Value) - if err != nil { - return msg{}, err + var ( + valueHash [32]byte + preparedValueHash [32]byte + err error + ) + if pbMsg.Value != nil { + valueHash, err = hashProto(pbMsg.Value) + if err != nil { + return msg{}, err + } } - - preparedValueHash, err := hashProto(pbMsg.PreparedValue) - if err != nil { - return msg{}, err + if pbMsg.PreparedValue != nil { + preparedValueHash, err = hashProto(pbMsg.PreparedValue) + if err != nil { + return msg{}, err + } } var justImpls []qbft.Msg[core.Duty, [32]byte] @@ -112,10 +119,6 @@ func (m msg) ToConsensusMsg() *pbv1.ConsensusMsg { // hashProto returns a deterministic ssz hash root of the proto message. func hashProto(msg proto.Message) ([32]byte, error) { - if msg == nil { - return [32]byte{}, nil - } - hh := ssz.DefaultHasherPool.Get() defer ssz.DefaultHasherPool.Put(hh) diff --git a/core/consensus/transport.go b/core/consensus/transport.go index 3c1d194c3..9f06ee5d1 100644 --- a/core/consensus/transport.go +++ b/core/consensus/transport.go @@ -48,8 +48,12 @@ func (t *transport) setValues(msg msg) { t.valueMu.Lock() defer t.valueMu.Unlock() - t.values[msg.Value()] = msg.msg.Value - t.values[msg.PreparedValue()] = msg.msg.PreparedValue + if msg.msg.Value != nil { + t.values[msg.Value()] = msg.msg.Value + } + if msg.msg.PreparedValue != nil { + t.values[msg.PreparedValue()] = msg.msg.PreparedValue + } } // getValue returns the value by its hash. @@ -70,14 +74,25 @@ func (t *transport) Broadcast(ctx context.Context, typ qbft.MsgType, duty core.D peerIdx int64, round int64, valueHash [32]byte, pr int64, pvHash [32]byte, justification []qbft.Msg[core.Duty, [32]byte], ) error { - // Get the values by their hashes. - value, err := t.getValue(valueHash) - if err != nil { - return err + // Get the values by their hashes if not zero. + var ( + value *pbv1.UnsignedDataSet + pv *pbv1.UnsignedDataSet + err error + ) + + if valueHash != [32]byte{} { + value, err = t.getValue(valueHash) + if err != nil { + return err + } } - pv, err := t.getValue(pvHash) - if err != nil { - return err + + if pvHash != [32]byte{} { + pv, err = t.getValue(pvHash) + if err != nil { + return err + } } // Make the message @@ -86,15 +101,16 @@ func (t *transport) Broadcast(ctx context.Context, typ qbft.MsgType, duty core.D return err } - // First send to self. - select { - case <-ctx.Done(): - return ctx.Err() - case t.recvBuffer <- msg: - } + // Send to self (async since buffer is blocking). + go func() { + select { + case <-ctx.Done(): + case t.recvBuffer <- msg: + } + }() - for i, p := range t.component.peers { - if int64(i) == t.component.peerIdx { + for _, p := range t.component.peers { + if p.ID == t.component.tcpNode.ID() { // Do not broadcast to self continue } diff --git a/core/qbft/qbft.go b/core/qbft/qbft.go index c8c9e963b..8ff4fbd64 100644 --- a/core/qbft/qbft.go +++ b/core/qbft/qbft.go @@ -418,7 +418,7 @@ func nextMinRound[I any, V comparable](d Definition[I, V], frc []Msg[I, V], roun func isJustified[I any, V comparable](d Definition[I, V], instance I, msg Msg[I, V]) bool { switch msg.Type() { case MsgPrePrepare: - return IsJustifiedPrePrepare(d, instance, msg) + return isJustifiedPrePrepare(d, instance, msg) case MsgPrepare: return true case MsgCommit: @@ -487,8 +487,8 @@ func isJustifiedDecided[I any, V comparable](d Definition[I, V], msg Msg[I, V]) return len(commits) >= d.Quorum() } -// IsJustifiedPrePrepare returns true if the PRE-PREPARE message is justified. -func IsJustifiedPrePrepare[I any, V comparable](d Definition[I, V], instance I, msg Msg[I, V]) bool { +// isJustifiedPrePrepare returns true if the PRE-PREPARE message is justified. +func isJustifiedPrePrepare[I any, V comparable](d Definition[I, V], instance I, msg Msg[I, V]) bool { if msg.Type() != MsgPrePrepare { panic("bug: not a preprepare message") } diff --git a/core/qbft/qbft_internal_test.go b/core/qbft/qbft_internal_test.go index 0481d2486..266954a5c 100644 --- a/core/qbft/qbft_internal_test.go +++ b/core/qbft/qbft_internal_test.go @@ -491,7 +491,7 @@ func TestIsJustifiedPrePrepare(t *testing.T) { Nodes: n, } - ok := IsJustifiedPrePrepare[int64, int64](def, instance, preprepare) + ok := isJustifiedPrePrepare[int64, int64](def, instance, preprepare) require.True(t, ok) } From 68bfeba75d4a9abf3b80334371257be915125bd7 Mon Sep 17 00:00:00 2001 From: corverroos Date: Tue, 24 May 2022 16:18:45 +0200 Subject: [PATCH 2/2] cleanup --- app/app.go | 2 +- core/consensus/component.go | 4 ++-- core/consensus/component_test.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/app/app.go b/app/app.go index e0f257212..d90e0db1d 100644 --- a/app/app.go +++ b/app/app.go @@ -425,7 +425,7 @@ func newConsensus(conf Config, lock cluster.Lock, tcpNode host.Host, p2pKey *ecd } if featureset.Enabled(featureset.QBFTConsensus) { - comp, err := consensus.NewComponent(tcpNode, peers, p2pKey) + comp, err := consensus.New(tcpNode, peers, p2pKey) if err != nil { return nil, nil, err } diff --git a/core/consensus/component.go b/core/consensus/component.go index bbf49fdd3..8fdcb9560 100644 --- a/core/consensus/component.go +++ b/core/consensus/component.go @@ -42,8 +42,8 @@ const ( protocolID = "/charon/consensus/qbft/1.0.0" ) -// NewComponent returns a new consensus QBFT component. -func NewComponent(tcpNode host.Host, peers []p2p.Peer, p2pKey *ecdsa.PrivateKey) (*Component, error) { +// New returns a new consensus QBFT component. +func New(tcpNode host.Host, peers []p2p.Peer, p2pKey *ecdsa.PrivateKey) (*Component, error) { // Extract peer pubkeys. keys := make(map[int64]*ecdsa.PublicKey) for i, p := range peers { diff --git a/core/consensus/component_test.go b/core/consensus/component_test.go index 1d93e71ee..47cc7acd2 100644 --- a/core/consensus/component_test.go +++ b/core/consensus/component_test.go @@ -85,7 +85,7 @@ func TestComponent(t *testing.T) { hosts[i].Peerstore().AddAddrs(hostsInfo[j].ID, hostsInfo[j].Addrs, peerstore.PermanentAddrTTL) } - c, err := consensus.NewComponent(hosts[i], peers, p2pkeys[i]) + c, err := consensus.New(hosts[i], peers, p2pkeys[i]) require.NoError(t, err) c.Subscribe(func(_ context.Context, _ core.Duty, set core.UnsignedDataSet) error { results <- set