Skip to content

Commit

Permalink
Stream management (#798)
Browse files Browse the repository at this point in the history
* Porting the SyncManager code
* Properly stopping the SyncManager
* Skip our own node for sync
* Fixing Generate on our CI
* Updating protoc too

Co-authored-by: Yolan Romailler <yolan.romailler@protocol.ai>
  • Loading branch information
nikkolasg and AnomalRoil committed Apr 1, 2022
1 parent d823174 commit def4c1b
Show file tree
Hide file tree
Showing 22 changed files with 431 additions and 317 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/generate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ jobs:
- name: Install Protoc
uses: arduino/setup-protoc@v1.1.2
with:
version: '3.17.x'
version: '3.19.4'
- name: Install Protoc-gen-go
run: |
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.27.1
go get google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.1.0
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2.0
- name: Generate
run: go generate ./...&& go mod tidy
- name: Check
Expand Down
36 changes: 17 additions & 19 deletions chain/beacon/chain.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package beacon

import (
"context"
"fmt"

"github.com/drand/drand/chain"
Expand All @@ -24,7 +23,7 @@ type chainStore struct {
l log.Logger
conf *Config
client net.ProtocolClient
sync Syncer
syncm *SyncManager
verifier *chain.Verifier
crypto *cryptoStore
ticker *ticker
Expand All @@ -51,18 +50,25 @@ func newChainStore(l log.Logger, cf *Config, cl net.ProtocolClient, c *cryptoSto
// we can register callbacks on it
cbs := NewCallbackStore(ds)

// we give the final append store to the syncer
syncer := NewSyncer(l, cbs, c.chain, cl)
// we give the final append store to the sync manager
syncm := NewSyncManager(&SyncConfig{
Log: l,
Store: cbs,
Info: c.chain,
Client: cl,
Clock: cf.Clock,
NodeAddr: cf.Public.Address(),
})
go syncm.Run()

//
verifier := chain.NewVerifier(cf.Group.Scheme)

cs := &chainStore{
CallbackStore: cbs,
l: l,
conf: cf,
client: cl,
sync: syncer,
syncm: syncm,
verifier: verifier,
crypto: c,
ticker: t,
Expand All @@ -89,6 +95,7 @@ func (c *chainStore) NewValidPartial(addr string, p *drand.PartialBeaconPacket)
}

func (c *chainStore) Stop() {
c.syncm.Stop()
c.CallbackStore.Close()
close(c.done)
}
Expand Down Expand Up @@ -171,18 +178,11 @@ func (c *chainStore) runAggregator() {
lastBeacon = newBeacon
break
}
// XXX store them for lfutur usage if it's a later round than what
// we have
// XXX store them for future usage if it's a later round than what we have
c.l.Debugw("", "new_aggregated", "not_appendable", "last", lastBeacon.String(), "new", newBeacon.String())
if c.shouldSync(lastBeacon, newBeacon) {
peers := toPeers(c.crypto.GetGroup().Nodes)
go func() {
// XXX Could do something smarter with context and cancellation
// if we got to the right round
if err := c.sync.Follow(context.Background(), newBeacon.Round, peers); err != nil {
c.l.Debugw("", "chain_store", "unable to follow", "err", err)
}
}()
c.syncm.RequestSync(peers, newBeacon.Round)
}
break
}
Expand Down Expand Up @@ -221,14 +221,12 @@ func (c *chainStore) shouldSync(last *chain.Beacon, newB likeBeacon) bool {
// RunSync will sync up with other nodes and fill the store. If upTo is equal to
// 0, then it will follow the chain indefinitely. If peers is nil, it uses the
// peers of the current group.
func (c *chainStore) RunSync(ctx context.Context, upTo uint64, peers []net.Peer) {
func (c *chainStore) RunSync(upTo uint64, peers []net.Peer) {
if peers == nil {
peers = toPeers(c.crypto.GetGroup().Nodes)
}

if err := c.sync.Follow(ctx, upTo, peers); err != nil {
c.l.Debugw("", "chain_store", "follow_finished", "err", err)
}
c.syncm.RequestSync(peers, upTo)
}

func (c *chainStore) AppendedBeaconNoSync() chan *chain.Beacon {
Expand Down
13 changes: 4 additions & 9 deletions chain/beacon/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (h *Handler) ProcessPartialBeacon(c context.Context, p *proto.PartialBeacon
}

// Store returns the store associated with this beacon handler
func (h *Handler) Store() chain.Store {
func (h *Handler) Store() CallbackStore {
return h.chain
}

Expand Down Expand Up @@ -194,7 +194,7 @@ func (h *Handler) Catchup() {

nRound, tTime := chain.NextRound(h.conf.Clock.Now().Unix(), h.conf.Group.Period, h.conf.Group.GenesisTime)
go h.run(tTime)
h.chain.RunSync(context.Background(), nRound, nil)
h.chain.RunSync(nRound, nil)
}

// Transition makes this beacon continuously sync until the time written in the
Expand All @@ -218,7 +218,7 @@ func (h *Handler) Transition(prevGroup *key.Group) error {

// we run the sync up until (inclusive) one round before the transition
h.l.Debugw("", "new_node", "following chain", "to_round", tRound-1)
h.chain.RunSync(context.Background(), tRound-1, toPeers(prevGroup.Nodes))
h.chain.RunSync(tRound-1, toPeers(prevGroup.Nodes))

return nil
}
Expand Down Expand Up @@ -325,7 +325,7 @@ func (h *Handler) run(startTime int64) {
// XXX find a way to start the catchup as soon as the runsync is
// done. Not critical but leads to faster network recovery.
h.l.Debugw("", "beacon_loop", "run_sync_catchup", "last_is", lastBeacon, "should_be", current.round)
go h.chain.RunSync(context.Background(), current.round, nil)
h.chain.RunSync(current.round, nil)
}
case b := <-h.chain.AppendedBeaconNoSync():
h.l.Debugw("", "beacon_loop", "catchupmode", "last_is", b.Round, "current", current.round, "catchup_launch", b.Round < current.round)
Expand Down Expand Up @@ -455,11 +455,6 @@ func (h *Handler) GetConfg() *Config {
return h.conf
}

// SyncChain is a proxy method to sync a chain
func (h *Handler) SyncChain(req *proto.SyncRequest, stream proto.Protocol_SyncChainServer) error {
return h.chain.sync.SyncChain(req, stream)
}

func shortSigStr(sig []byte) string {
max := 3
if len(sig) < max {
Expand Down
3 changes: 2 additions & 1 deletion chain/beacon/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func (t *testBeaconServer) SyncChain(req *drand.SyncRequest, p drand.Protocol_Sy
if t.disable {
return errors.New("disabled server")
}
return t.h.chain.sync.SyncChain(req, p)
SyncChain(t.h.l, t.h.chain, req, p)
return nil
}

func dkgShares(_ *testing.T, n, t int) ([]*key.Share, []kyber.Point) {
Expand Down
208 changes: 0 additions & 208 deletions chain/beacon/sync.go

This file was deleted.

Loading

0 comments on commit def4c1b

Please sign in to comment.