Skip to content

Commit

Permalink
Add Beacon ID support (#832)
Browse files Browse the repository at this point in the history
Add textual beacon 'ID' to provide an additional point of identification / disambiguation of beacons.
  • Loading branch information
emmanuelm41 committed Oct 20, 2021
1 parent 591ac2b commit f2ee959
Show file tree
Hide file tree
Showing 39 changed files with 502 additions and 251 deletions.
7 changes: 5 additions & 2 deletions chain/beacon/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ type partialCache struct {
rounds map[string]*roundCache
rcvd map[int][]string
l log.Logger
id string
}

func newPartialCache(l log.Logger) *partialCache {
func newPartialCache(l log.Logger, id string) *partialCache {
return &partialCache{
rounds: make(map[string]*roundCache),
rcvd: make(map[int][]string),
l: l,
id: id,
}
}

Expand Down Expand Up @@ -87,13 +89,14 @@ func (c *partialCache) getCache(id string, p *drand.PartialBeaconPacket) *roundC
if round, ok := c.rounds[id]; ok {
return round
}

idx, _ := key.Scheme.IndexOf(p.GetPartialSig())
if len(c.rcvd[idx]) >= MaxPartialsPerNode {
// this node has submitted too many partials - we evict the first entry (index 0) of its list
toEvict := c.rcvd[idx][0]
round, ok := c.rounds[toEvict]
if !ok {
c.l.Errorw("", "cache", "miss", "node", idx, "not_present_for", p.GetRound())
c.l.Errorw("", "beacon_id", id, "cache", "miss", "node", idx, "not_present_for", p.GetRound())
return nil
}
round.flushIndex(idx)
Expand Down
2 changes: 1 addition & 1 deletion chain/beacon/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestCacheRound(t *testing.T) {

func TestCachePartial(t *testing.T) {
l := log.DefaultLogger()
cache := newPartialCache(l)
cache := newPartialCache(l, "test_id")
var round uint64 = 64
prev := []byte("yesterday was another day")

Expand Down
31 changes: 19 additions & 12 deletions chain/beacon/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,13 @@ var partialCacheStoreLimit = 3
// runAggregator runs a continuous loop that tries to aggregate partial
// signatures when it can
func (c *chainStore) runAggregator() {
beaconID := c.conf.Group.ID
lastBeacon, err := c.Last()
if err != nil {
c.l.Fatalw("", "chain_aggregator", "loading", "last_beacon", err)
c.l.Fatalw("", "beacon_id", beaconID, "chain_aggregator", "loading", "last_beacon", err)
}

var cache = newPartialCache(c.l)
var cache = newPartialCache(c.l, beaconID)
for {
select {
case <-c.done:
Expand All @@ -116,7 +117,7 @@ func (c *chainStore) runAggregator() {
shouldStore := isNotInPast && isNotTooFar
// check if we can reconstruct
if !shouldStore {
c.l.Debugw("", "ignoring_partial", partial.p.GetRound(), "last_beacon_stored", lastBeacon.Round)
c.l.Debugw("", "beacon_id", beaconID, "ignoring_partial", partial.p.GetRound(), "last_beacon_stored", lastBeacon.Round)
break
}
// NOTE: This line means we can only verify partial signatures of
Expand All @@ -129,11 +130,12 @@ func (c *chainStore) runAggregator() {
cache.Append(partial.p)
roundCache := cache.GetRoundCache(partial.p.GetRound(), partial.p.GetPreviousSig())
if roundCache == nil {
c.l.Errorw("", "store_partial", partial.addr, "no_round_cache", partial.p.GetRound())
c.l.Errorw("", "beacon_id", beaconID, "store_partial", partial.addr, "no_round_cache", partial.p.GetRound())
break
}

c.l.Debugw("", "store_partial", partial.addr, "round", roundCache.round, "len_partials", fmt.Sprintf("%d/%d", roundCache.Len(), thr))
c.l.Debugw("", "beacon_id", beaconID, "store_partial", partial.addr,
"round", roundCache.round, "len_partials", fmt.Sprintf("%d/%d", roundCache.Len(), thr))
if roundCache.Len() < thr {
break
}
Expand All @@ -142,11 +144,11 @@ func (c *chainStore) runAggregator() {

finalSig, err := key.Scheme.Recover(c.crypto.GetPub(), msg, roundCache.Partials(), thr, n)
if err != nil {
c.l.Debugw("", "invalid_recovery", err, "round", pRound, "got", fmt.Sprintf("%d/%d", roundCache.Len(), n))
c.l.Debugw("", "beacon_id", beaconID, "invalid_recovery", err, "round", pRound, "got", fmt.Sprintf("%d/%d", roundCache.Len(), n))
break
}
if err := key.Scheme.VerifyRecovered(c.crypto.GetPub().Commit(), msg, finalSig); err != nil {
c.l.Errorw("", "invalid_sig", err, "round", pRound)
c.l.Errorw("", "beacon_id", beaconID, "invalid_sig", err, "round", pRound)
break
}
cache.FlushRounds(partial.p.GetRound())
Expand All @@ -155,21 +157,21 @@ func (c *chainStore) runAggregator() {
PreviousSig: roundCache.prev,
Signature: finalSig,
}
c.l.Infow("", "aggregated_beacon", newBeacon.Round)
c.l.Infow("", "beacon_id", beaconID, "aggregated_beacon", newBeacon.Round)
if c.tryAppend(lastBeacon, newBeacon) {
lastBeacon = newBeacon
break
}
// XXX store them for future usage if it's a later round than what
// we have
c.l.Debugw("", "new_aggregated", "not_appendable", "last", lastBeacon.String(), "new", newBeacon.String())
c.l.Debugw("", "beacon_id", beaconID, "new_aggregated", "not_appendable", "last", lastBeacon.String(), "new", newBeacon.String())
if c.shouldSync(lastBeacon, newBeacon) {
peers := toPeers(c.crypto.GetGroup().Nodes)
go func() {
// XXX Could do something smarter with context and cancellation
// if we got to the right round
if err := c.sync.Follow(context.Background(), newBeacon.Round, peers); err != nil {
c.l.Debugw("", "chain_store", "unable to follow", "err", err)
c.l.Debugw("", "beacon_id", beaconID, "chain_store", "unable to follow", "err", err)
}
}()
}
Expand All @@ -183,9 +185,11 @@ func (c *chainStore) tryAppend(last, newB *chain.Beacon) bool {
// quick check before trying to compare bytes
return false
}

beaconID := c.conf.Group.ID
if err := c.CallbackStore.Put(newB); err != nil {
// if round is ok but bytes are different, error will be raised
c.l.Errorw("", "chain_store", "error storing beacon", "err", err)
c.l.Errorw("", "beacon_id", beaconID, "chain_store", "error storing beacon", "err", err)
return false
}
select {
Expand All @@ -209,11 +213,14 @@ func (c *chainStore) shouldSync(last *chain.Beacon, newB likeBeacon) bool {
// 0, then it will follow the chain indefinitely. If peers is nil, it uses the
// peers of the current group.
func (c *chainStore) RunSync(ctx context.Context, upTo uint64, peers []net.Peer) {
// beaconID is the group's ID; it is attached to log entries under "beacon_id".
beaconID := c.conf.Group.ID

// No peers supplied: fall back to the nodes of the current group.
if peers == nil {
peers = toPeers(c.crypto.GetGroup().Nodes)
}

// An error from Follow is only logged at debug level; RunSync returns nothing
// to the caller.
if err := c.sync.Follow(ctx, upTo, peers); err != nil {
c.l.Debugw("", "chain_store", "follow_finished", "err", err)
c.l.Debugw("", "beacon_id", beaconID, "chain_store", "follow_finished", "err", err)
}
}

Expand Down
66 changes: 41 additions & 25 deletions chain/beacon/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ var errOutOfRound = "out-of-round beacon request"
// ProcessPartialBeacon receives a request for a beacon partial signature. It
// forwards it to the round manager if it is a valid beacon.
func (h *Handler) ProcessPartialBeacon(c context.Context, p *proto.PartialBeaconPacket) (*proto.Empty, error) {
beaconID := h.conf.Group.ID
addr := net.RemoteAddress(c)
h.l.Debugw("", "received", "request", "from", addr, "round", p.GetRound())
h.l.Debugw("", "beacon_id", beaconID, "received", "request", "from", addr, "round", p.GetRound())

nextRound, _ := chain.NextRound(h.conf.Clock.Now().Unix(), h.conf.Group.Period, h.conf.Group.GenesisTime)
currentRound := nextRound - 1
Expand All @@ -110,7 +111,7 @@ func (h *Handler) ProcessPartialBeacon(c context.Context, p *proto.PartialBeacon
// possible, if a node receives a packet very fast just before his local
// clock passed to the next round
if p.GetRound() > nextRound {
h.l.Errorw("", "process_partial", addr, "invalid_future_round", p.GetRound(), "current_round", currentRound)
h.l.Errorw("", "beacon_id", beaconID, "process_partial", addr, "invalid_future_round", p.GetRound(), "current_round", currentRound)
return nil, fmt.Errorf("invalid round: %d instead of %d", p.GetRound(), currentRound)
}

Expand All @@ -121,21 +122,24 @@ func (h *Handler) ProcessPartialBeacon(c context.Context, p *proto.PartialBeacon
shortPub := h.crypto.GetPub().Eval(1).V.String()[14:19]
// verify if request is valid
if err := key.Scheme.VerifyPartial(h.crypto.GetPub(), msg, p.GetPartialSig()); err != nil {
h.l.Errorw("", "process_partial", addr, "err", err,
h.l.Errorw("", "beacon_id", beaconID,
"process_partial", addr, "err", err,
"prev_sig", shortSigStr(p.GetPreviousSig()),
"curr_round", currentRound,
"msg_sign", shortSigStr(msg),
"short_pub", shortPub)
return nil, err
}
h.l.Debugw("", "process_partial", addr,
h.l.Debugw("", "beacon_id", beaconID,
"process_partial", addr,
"prev_sig", shortSigStr(p.GetPreviousSig()),
"curr_round", currentRound, "msg_sign",
shortSigStr(msg), "short_pub", shortPub,
"status", "OK")
idx, _ := key.Scheme.IndexOf(p.GetPartialSig())
if idx == h.crypto.Index() {
h.l.Errorw("", "process_partial", addr,
h.l.Errorw("", "beacon_id", beaconID,
"process_partial", addr,
"index_got", idx,
"index_our", h.crypto.Index(),
"advance_packet", p.GetRound(),
Expand All @@ -160,8 +164,9 @@ func (h *Handler) Store() chain.Store {
// Round 0 = genesis seed - fixed
// Round 1 starts at genesis time, and is signing over the genesis seed
func (h *Handler) Start() error {
beaconID := h.conf.Group.ID
if h.conf.Clock.Now().Unix() > h.conf.Group.GenesisTime {
h.l.Errorw("", "genesis_time", "past", "call", "catchup")
h.l.Errorw("", "beacon_id", beaconID, "genesis_time", "past", "call", "catchup")
return errors.New("beacon: genesis time already passed. Call Catchup()")
}

Expand All @@ -170,7 +175,7 @@ func (h *Handler) Start() error {
h.Unlock()

_, tTime := chain.NextRound(h.conf.Clock.Now().Unix(), h.conf.Group.Period, h.conf.Group.GenesisTime)
h.l.Infow("", "beacon", "start")
h.l.Infow("", "beacon_id", beaconID, "beacon", "start")
go h.run(tTime)

return nil
Expand All @@ -196,11 +201,12 @@ func (h *Handler) Catchup() {
// randomness. To sync, it contacts the nodes listed in the given previous
// group file.
func (h *Handler) Transition(prevGroup *key.Group) error {
beaconID := h.conf.Group.ID
targetTime := h.conf.Group.TransitionTime
tRound := chain.CurrentRound(targetTime, h.conf.Group.Period, h.conf.Group.GenesisTime)
tTime := chain.TimeOfRound(h.conf.Group.Period, h.conf.Group.GenesisTime, tRound)
if tTime != targetTime {
h.l.Fatalw("", "transition_time", "invalid_offset", "expected_time", tTime, "got_time", targetTime)
h.l.Fatalw("", "beacon_id", beaconID, "transition_time", "invalid_offset", "expected_time", tTime, "got_time", targetTime)
return nil
}

Expand All @@ -211,22 +217,23 @@ func (h *Handler) Transition(prevGroup *key.Group) error {
go h.run(targetTime)

// we run the sync up until (inclusive) one round before the transition
h.l.Debugw("", "new_node", "following chain", "to_round", tRound-1)
h.l.Debugw("", "beacon_id", beaconID, "new_node", "following chain", "to_round", tRound-1)
h.chain.RunSync(context.Background(), tRound-1, toPeers(prevGroup.Nodes))

return nil
}

// TransitionNewGroup prepares the node to transition to the new group
func (h *Handler) TransitionNewGroup(newShare *key.Share, newGroup *key.Group) {
beaconID := h.conf.Group.ID
targetTime := newGroup.TransitionTime
tRound := chain.CurrentRound(targetTime, h.conf.Group.Period, h.conf.Group.GenesisTime)
tTime := chain.TimeOfRound(h.conf.Group.Period, h.conf.Group.GenesisTime, tRound)
if tTime != targetTime {
h.l.Fatalw("", "transition_time", "invalid_offset", "expected_time", tTime, "got_time", targetTime)
h.l.Fatalw("", "beacon_id", beaconID, "transition_time", "invalid_offset", "expected_time", tTime, "got_time", targetTime)
return
}
h.l.Debugw("", "transition", "new_group", "at_round", tRound)
h.l.Debugw("", "beacon_id", beaconID, "transition", "new_group", "at_round", tRound)
// register a callback such that when the round happening just before the
// transition is stored, then it switches the current share to the new one
targetRound := tRound - 1
Expand Down Expand Up @@ -279,11 +286,12 @@ func (h *Handler) Reset() {

// run will wait until it is supposed to start
func (h *Handler) run(startTime int64) {
beaconID := h.conf.Group.ID
chanTick := h.ticker.ChannelAt(startTime)
h.l.Debugw("", "run_round", "wait", "until", startTime)
h.l.Debugw("", "beacon_id", beaconID, "run_round", "wait", "until", startTime)

var current roundInfo
setRunnig := sync.Once{}
setRunning := sync.Once{}

h.Lock()
h.running = true
Expand All @@ -293,18 +301,18 @@ func (h *Handler) run(startTime int64) {
select {
case current = <-chanTick:

setRunnig.Do(func() {
setRunning.Do(func() {
h.Lock()
h.serving = true
h.Unlock()
})

lastBeacon, err := h.chain.Last()
if err != nil {
h.l.Errorw("", "beacon_loop", "loading_last", "err", err)
h.l.Errorw("", "beacon_id", beaconID, "beacon_loop", "loading_last", "err", err)
break
}
h.l.Debugw("", "beacon_loop", "new_round", "round", current.round, "lastbeacon", lastBeacon.Round)
h.l.Debugw("", "beacon_id", beaconID, "beacon_loop", "new_round", "round", current.round, "lastbeacon", lastBeacon.Round)
h.broadcastNextPartial(current, lastBeacon)
// if the next round of the last beacon we generated is not the round we
// are now, that means there is a gap between the two rounds. In other
Expand All @@ -317,7 +325,7 @@ func (h *Handler) run(startTime int64) {
// network conditions allow for it.
// XXX find a way to start the catchup as soon as the runsync is
// done. Not critical but leads to faster network recovery.
h.l.Debugw("", "beacon_loop", "run_sync_catchup", "last_is", lastBeacon, "should_be", current.round)
h.l.Debugw("", "beacon_id", beaconID, "beacon_loop", "run_sync_catchup", "last_is", lastBeacon, "should_be", current.round)
go h.chain.RunSync(context.Background(), current.round, nil)
}
case b := <-h.chain.AppendedBeaconNoSync():
Expand All @@ -337,7 +345,7 @@ func (h *Handler) run(startTime int64) {
}(current, b)
}
case <-h.close:
h.l.Debugw("", "beacon_loop", "finished")
h.l.Debugw("", "beacon_id", beaconID, "beacon_loop", "finished")
return
}
}
Expand All @@ -347,6 +355,7 @@ func (h *Handler) broadcastNextPartial(current roundInfo, upon *chain.Beacon) {
ctx := context.Background()
previousSig := upon.Signature
round := upon.Round + 1
beaconID := h.conf.Group.ID
if current.round == upon.Round {
// we already have the beacon of the current round for some reason - on
// CI it happens due to time shifts -
Expand All @@ -361,29 +370,32 @@ func (h *Handler) broadcastNextPartial(current roundInfo, upon *chain.Beacon) {

currSig, err := h.crypto.SignPartial(msg)
if err != nil {
h.l.Fatal("beacon_round", "err creating signature", "err", err, "round", round)
h.l.Fatal("beacon_id", beaconID, "beacon_round", "err creating signature", "err", err, "round", round)
return
}
h.l.Debugw("", "broadcast_partial", round, "from_prev_sig", shortSigStr(previousSig), "msg_sign", shortSigStr(msg))
h.l.Debugw("", "beacon_id", beaconID, "broadcast_partial", round, "from_prev_sig", shortSigStr(previousSig), "msg_sign", shortSigStr(msg))
metadata := common.NewMetadata(h.version.ToProto())
metadata.BeaconID = beaconID

packet := &proto.PartialBeaconPacket{
Round: round,
PreviousSig: previousSig,
PartialSig: currSig,
Metadata: metadata,
}

h.chain.NewValidPartial(h.addr, packet)
for _, id := range h.crypto.GetGroup().Nodes {
if h.addr == id.Address() {
continue
}
go func(i *key.Identity) {
h.l.Debugw("", "beacon_round", round, "send_to", i.Address())
h.l.Debugw("", "beacon_id", beaconID, "beacon_round", round, "send_to", i.Address())
err := h.client.PartialBeacon(ctx, i, packet)
if err != nil {
h.l.Errorw("", "beacon_round", round, "err_request", err, "from", i.Address())
h.l.Errorw("", "beacon_id", beaconID, "beacon_round", round, "err_request", err, "from", i.Address())
if strings.Contains(err.Error(), errOutOfRound) {
h.l.Errorw("", "beacon_round", round, "node", i.Addr, "reply", "out-of-round")
h.l.Errorw("", "beacon_id", beaconID, "beacon_round", round, "node", i.Addr, "reply", "out-of-round")
}
return
}
Expand All @@ -404,21 +416,25 @@ func (h *Handler) Stop() {
h.chain.Stop()
h.ticker.Stop()

beaconID := h.conf.Group.ID
h.stopped = true
h.running = false
h.l.Infow("", "beacon", "stop")
h.l.Infow("", "beacon_id", beaconID, "beacon", "stop")
}

// StopAt will stop the handler at the given time. It is useful when
// transitioning for a resharing.
func (h *Handler) StopAt(stopTime int64) error {
now := h.conf.Clock.Now().Unix()
beaconID := h.conf.Group.ID

if stopTime <= now {
// actually we can stop in the present but with "Stop"
return errors.New("can't stop in the past or present")
}
duration := time.Duration(stopTime-now) * time.Second
h.l.Debugw("", "stop_at", stopTime, "sleep_for", duration.Seconds())

h.l.Debugw("", "beacon_id", beaconID, "stop_at", stopTime, "sleep_for", duration.Seconds())
h.conf.Clock.Sleep(duration)
h.Stop()
return nil
Expand Down
Loading

0 comments on commit f2ee959

Please sign in to comment.