Skip to content

Commit

Permalink
Porting the SyncManager code
Browse files Browse the repository at this point in the history
  • Loading branch information
AnomalRoil committed Mar 30, 2022
1 parent 2b96619 commit cfebfb3
Show file tree
Hide file tree
Showing 10 changed files with 375 additions and 304 deletions.
31 changes: 14 additions & 17 deletions chain/beacon/chain.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package beacon

import (
"context"
"fmt"

"github.com/drand/drand/chain"
Expand All @@ -24,7 +23,7 @@ type chainStore struct {
l log.Logger
conf *Config
client net.ProtocolClient
sync Syncer
syncm *SyncManager
verifier *chain.Verifier
crypto *cryptoStore
ticker *ticker
Expand All @@ -51,18 +50,24 @@ func newChainStore(l log.Logger, cf *Config, cl net.ProtocolClient, c *cryptoSto
// we can register callbacks on it
cbs := NewCallbackStore(ds)

// we give the final append store to the syncer
syncer := NewSyncer(l, cbs, c.chain, cl)
// we give the final append store to the sync manager
syncm := NewSyncManager(&SyncConfig{
Log: l,
Store: cbs,
Info: c.chain,
Client: cl,
Clock: cf.Clock,
})
go syncm.Run()

//
verifier := chain.NewVerifier(cf.Group.Scheme)

cs := &chainStore{
CallbackStore: cbs,
l: l,
conf: cf,
client: cl,
sync: syncer,
syncm: syncm,
verifier: verifier,
crypto: c,
ticker: t,
Expand Down Expand Up @@ -176,13 +181,7 @@ func (c *chainStore) runAggregator() {
c.l.Debugw("", "new_aggregated", "not_appendable", "last", lastBeacon.String(), "new", newBeacon.String())
if c.shouldSync(lastBeacon, newBeacon) {
peers := toPeers(c.crypto.GetGroup().Nodes)
go func() {
// XXX Could do something smarter with context and cancellation
// if we got to the right round
if err := c.sync.Follow(context.Background(), newBeacon.Round, peers); err != nil {
c.l.Debugw("", "chain_store", "unable to follow", "err", err)
}
}()
c.syncm.RequestSync(peers, newBeacon.Round)
}
break
}
Expand Down Expand Up @@ -220,14 +219,12 @@ func (c *chainStore) shouldSync(last *chain.Beacon, newB likeBeacon) bool {
// RunSync will sync up with other nodes and fill the store. If upTo is equal to
// 0, then it will follow the chain indefinitely. If peers is nil, it uses the
// peers of the current group.
func (c *chainStore) RunSync(ctx context.Context, upTo uint64, peers []net.Peer) {
func (c *chainStore) RunSync(upTo uint64, peers []net.Peer) {
if peers == nil {
peers = toPeers(c.crypto.GetGroup().Nodes)
}

if err := c.sync.Follow(ctx, upTo, peers); err != nil {
c.l.Debugw("", "chain_store", "follow_finished", "err", err)
}
c.syncm.RequestSync(peers, upTo)
}

func (c *chainStore) AppendedBeaconNoSync() chan *chain.Beacon {
Expand Down
13 changes: 4 additions & 9 deletions chain/beacon/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (h *Handler) ProcessPartialBeacon(c context.Context, p *proto.PartialBeacon
}

// Store returns the store associated with this beacon handler
func (h *Handler) Store() chain.Store {
func (h *Handler) Store() CallbackStore {
return h.chain
}

Expand Down Expand Up @@ -194,7 +194,7 @@ func (h *Handler) Catchup() {

nRound, tTime := chain.NextRound(h.conf.Clock.Now().Unix(), h.conf.Group.Period, h.conf.Group.GenesisTime)
go h.run(tTime)
h.chain.RunSync(context.Background(), nRound, nil)
h.chain.RunSync(nRound, nil)
}

// Transition makes this beacon continuously sync until the time written in the
Expand All @@ -218,7 +218,7 @@ func (h *Handler) Transition(prevGroup *key.Group) error {

// we run the sync up until (inclusive) one round before the transition
h.l.Debugw("", "new_node", "following chain", "to_round", tRound-1)
h.chain.RunSync(context.Background(), tRound-1, toPeers(prevGroup.Nodes))
h.chain.RunSync(tRound-1, toPeers(prevGroup.Nodes))

return nil
}
Expand Down Expand Up @@ -325,7 +325,7 @@ func (h *Handler) run(startTime int64) {
// XXX find a way to start the catchup as soon as the runsync is
// done. Not critical but leads to faster network recovery.
h.l.Debugw("", "beacon_loop", "run_sync_catchup", "last_is", lastBeacon, "should_be", current.round)
go h.chain.RunSync(context.Background(), current.round, nil)
h.chain.RunSync(current.round, nil)
}
case b := <-h.chain.AppendedBeaconNoSync():
if b.Round < current.round {
Expand Down Expand Up @@ -452,11 +452,6 @@ func (h *Handler) GetConfg() *Config {
return h.conf
}

// SyncChain is a proxy method to sync a chain
func (h *Handler) SyncChain(req *proto.SyncRequest, stream proto.Protocol_SyncChainServer) error {
return h.chain.sync.SyncChain(req, stream)
}

func shortSigStr(sig []byte) string {
max := 3
if len(sig) < max {
Expand Down
3 changes: 2 additions & 1 deletion chain/beacon/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func (t *testBeaconServer) SyncChain(req *drand.SyncRequest, p drand.Protocol_Sy
if t.disable {
return errors.New("disabled server")
}
return t.h.chain.sync.SyncChain(req, p)
SyncChain(t.h.l, t.h.chain, req, p)
return nil
}

func dkgShares(_ *testing.T, n, t int) ([]*key.Share, []kyber.Point) {
Expand Down
208 changes: 0 additions & 208 deletions chain/beacon/sync.go

This file was deleted.

Loading

0 comments on commit cfebfb3

Please sign in to comment.