Skip to content

Commit

Permalink
feat(service/header): implement header ranges to cache headers rcvd f…
Browse files Browse the repository at this point in the history
…rom pubsub and massive todo list
  • Loading branch information
Wondertan committed Jan 12, 2022
1 parent e301f41 commit 2541ac2
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 24 deletions.
4 changes: 4 additions & 0 deletions service/header/p2p_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ func (ex *P2PExchange) RequestByHash(ctx context.Context, hash tmbytes.HexBytes)
}

func (ex *P2PExchange) performRequest(ctx context.Context, req *pb.ExtendedHeaderRequest) ([]*ExtendedHeader, error) {
if req.Amount == 0 {
return make([]*ExtendedHeader, 0), nil
}

select {
case <-ctx.Done():
return nil, ctx.Err()
Expand Down
225 changes: 201 additions & 24 deletions service/header/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ import (
tmbytes "github.com/tendermint/tendermint/libs/bytes"
)

// TODO:
// 1. Sync protocol for peers to exchange their local heads on connect
// 2. If we are far from peers but within an unbonding period - trigger sync automatically
// 3. If we are beyond the unbonding period - request Local Head + 1 header from trusted and hardcoded peer
// automatically and continue sync until know head.
// 4. Limit amount of requests on server side
// 5. Sync status and till sync is done.
// 6. Cache even unverifiable headers from the future, but don't resend them with ignore.
// 7. Retry requesting headers
// 8. Hardcode genesisHash

// Syncer implements simplest possible synchronization for headers.
type Syncer struct {
sub *P2PSubscriber
Expand All @@ -24,9 +35,8 @@ type Syncer struct {
inProgress uint64
triggerSync chan struct{}

// pending keeps valid headers rcvd from the network awaiting to be appended to store
pending map[uint64]*ExtendedHeader
pendingLk sync.Mutex
// pending keeps ranges of valid headers rcvd from the network awaiting to be appended to store
pending *ranges

ctx context.Context
cancel context.CancelFunc
Expand All @@ -39,8 +49,8 @@ func NewSyncer(exchange Exchange, store Store, sub *P2PSubscriber, trusted tmbyt
exchange: exchange,
store: store,
trusted: trusted,
triggerSync: make(chan struct{}), // no buffer needed
pending: make(map[uint64]*ExtendedHeader),
triggerSync: make(chan struct{}),
pending: newRanges(),
}
}

Expand All @@ -55,13 +65,21 @@ func (s *Syncer) Start(context.Context) error {

s.ctx, s.cancel = context.WithCancel(context.Background())
go s.syncLoop()
s.wantSync()
s.mustSync()
return nil
}

func (s *Syncer) wantSync() {
select {
case s.triggerSync <- struct{}{}:
default:
}
}

func (s *Syncer) mustSync() {
select {
case s.triggerSync <- struct{}{}:
case <-s.ctx.Done():
}
}

Expand All @@ -83,31 +101,30 @@ func (s *Syncer) sync(ctx context.Context) {
s.syncInProgress()
// when method returns, toggle inProgress off
defer s.finishSync()
// TODO(@Wondertan): Retry logic
for {
if ctx.Err() != nil {
return
}

localHead, err := s.getHead(ctx)
sbjHead, err := s.subjectiveHead(ctx)
if err != nil {
log.Errorw("getting head", "err", err)
log.Errorw("getting subjective head", "err", err)
return
}

netHead, err := s.exchange.RequestHead(ctx)
objHead, err := s.objectiveHead(ctx, sbjHead)
if err != nil {
log.Errorw("requesting network head", "err", err)
log.Errorw("getting objective head", "err", err)
return
}

if localHead.Height >= netHead.Height {
if sbjHead.Height >= objHead.Height {
// we are now synced
log.Info("synced headers")
return
}

err = s.syncDiff(ctx, localHead, netHead)
err = s.syncDiff(ctx, sbjHead, objHead)
if err != nil {
log.Errorw("syncing headers", "err", err)
return
Expand Down Expand Up @@ -161,13 +178,10 @@ func (s *Syncer) validate(ctx context.Context, p peer.ID, msg *pubsub.Message) p
}

if maybeHead.Height > localHead.Height+1 {
// we might be missing some headers, so trigger sync to catch-up
// we might be missing some headers, so save verified header to pending cache
s.pending.Add(maybeHead)
// and trigger sync to catch-up
s.wantSync()
// and save verified header for later
// TODO(@Wondertan): If we sync from scratch a year of headers, this may blow up memory
s.pendingLk.Lock()
s.pending[uint64(maybeHead.Height)] = maybeHead
s.pendingLk.Unlock()
// accepted
return pubsub.ValidationAccept
}
Expand All @@ -187,8 +201,8 @@ func (s *Syncer) validate(ctx context.Context, p peer.ID, msg *pubsub.Message) p
return pubsub.ValidationAccept
}

// getHead tries to get head locally and if not exists requests trusted hash.
func (s *Syncer) getHead(ctx context.Context) (*ExtendedHeader, error) {
// subjectiveHead tries to get head locally and if not exists requests by trusted hash.
func (s *Syncer) subjectiveHead(ctx context.Context) (*ExtendedHeader, error) {
head, err := s.store.Head(ctx)
switch err {
case nil:
Expand All @@ -213,9 +227,19 @@ func (s *Syncer) getHead(ctx context.Context) (*ExtendedHeader, error) {
return nil, err
}

// objectiveHead gets the objective network head.
func (s *Syncer) objectiveHead(ctx context.Context, sbj *ExtendedHeader) (*ExtendedHeader, error) {
phead := s.pending.Head()
if phead != nil && phead.Height >= sbj.Height {
return phead, nil
}

return s.exchange.RequestHead(ctx)
}

// TODO(@Wondertan): Number of headers that can be requested at once. Either make this configurable or,
// find a proper rationale for constant.
var requestSize uint64 = 128
// find a proper rationale for constant.
var requestSize uint64 = 256

// syncDiff requests headers from knownHead up to new head.
func (s *Syncer) syncDiff(ctx context.Context, knownHead, newHead *ExtendedHeader) error {
Expand All @@ -226,7 +250,7 @@ func (s *Syncer) syncDiff(ctx context.Context, knownHead, newHead *ExtendedHeade
amount = requestSize
}

headers, err := s.exchange.RequestHeaders(ctx, start, amount)
headers, err := s.getHeaders(ctx, start, amount)
if err != nil {
return err
}
Expand All @@ -241,3 +265,156 @@ func (s *Syncer) syncDiff(ctx context.Context, knownHead, newHead *ExtendedHeade

return s.store.Append(ctx, newHead)
}

// getHeaders gets headers from either remote peers or from local cached of headers rcvd by PubSub
func (s *Syncer) getHeaders(ctx context.Context, start, amount uint64) ([]*ExtendedHeader, error) {
// short-circuit if nothing in pending cache to avoid unnecessary allocation below
if _, ok := s.pending.NextWithin(start, start+amount); !ok {
return s.exchange.RequestHeaders(ctx, start, amount)
}

end, out := start+amount, make([]*ExtendedHeader, 0, amount)
for start < end {
// if we have some range cached - use it
if r, ok := s.pending.NextWithin(start, end); ok {
// first, request everything between start and found range
hs, err := s.exchange.RequestHeaders(ctx, start, r.Start-start)
if err != nil {
return nil, err
}
out = append(out, hs...)
start += uint64(len(hs))

// than, apply cached range
cached := r.Before(end)
out = append(out, cached...)
start += uint64(len(cached))

// repeat, as there might be multiple caches
continue
}

// fetch the leftovers
hs, err := s.exchange.RequestHeaders(ctx, start, start-end)
if err != nil {
return nil, err
}

return append(out, hs...), nil
}

return out, nil
}

type ranges struct {
head *ExtendedHeader
ranges []*_range
lk sync.Mutex // no need for RWMutex as there is only one reader
}

func newRanges() *ranges {
return new(ranges)
}

func (rs *ranges) Head() *ExtendedHeader {
rs.lk.Lock()
defer rs.lk.Unlock()
return rs.head
}

func (rs *ranges) Add(h *ExtendedHeader) {
rs.lk.Lock()
defer rs.lk.Unlock()

// short-circuit if header is from the past
if rs.head.Height >= h.Height {
// TODO(@Wondertan): Technically, we can still apply the header:
// * Headers here are verified, so we can trust them
// * PubSub does not guarantee the ordering of msgs
// * So there might be a case where ordering is broken
// * Even considering the delay(block time) with which new headers are generated
// * But rarely
// Would be still nice to implement
log.Warnf("rcvd headers in wrong order")
return
}

// if the new header is adjacent to head
if h.Height == rs.head.Height+1 {
// append it to the last known range
rs.ranges[len(rs.ranges)-1].Append(h)
} else {
// otherwise, start a new range
rs.ranges = append(rs.ranges, newRange(h))

// it is possible to miss a header or few from PubSub, due to quick disconnects or sleep
// once we start rcving them again we save those in new range
// so 'Syncer.getHeaders' can fetch what was missed
}

// we know the new header is higher than head, so it as a new head
rs.head = h
}

func (rs *ranges) NextWithin(start, end uint64) (*_range, bool) {
r, ok := rs.Next()
if !ok {
return nil, false
}

if r.Start >= start && r.Start < end {
return r, true
}

return nil, false
}

func (rs *ranges) Next() (*_range, bool) {
rs.lk.Lock()
defer rs.lk.Unlock()

for {
if len(rs.ranges) == 0 {
return nil, false
}

out := rs.ranges[0]
if !out.Empty() {
return out, true
}

rs.ranges = rs.ranges[1:]
}
}

type _range struct {
Start uint64

headers []*ExtendedHeader
}

func newRange(h *ExtendedHeader) *_range {
return &_range{
Start: uint64(h.Height),
headers: []*ExtendedHeader{h},
}
}

func (r *_range) Append(h ...*ExtendedHeader) {
r.headers = append(r.headers, h...)
}

func (r *_range) Empty() bool {
return len(r.headers) == 0
}

func (r *_range) Before(end uint64) []*ExtendedHeader {
amnt := uint64(len(r.headers))
if r.Start+amnt > end {
amnt = end - r.Start
}

out := r.headers[:amnt-1]
r.headers = r.headers[amnt:]
return out
}

0 comments on commit 2541ac2

Please sign in to comment.