-
Notifications
You must be signed in to change notification settings - Fork 16
/
sync_head.go
210 lines (188 loc) · 7.54 KB
/
sync_head.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
package sync
import (
"context"
"errors"
"time"
"github.com/celestiaorg/go-header"
)
// headRequestTimeout is the amount of time the syncer is willing to wait for
// the exchange to request the head of the chain from the network.
var headRequestTimeout = time.Second * 2
// Head returns the Network Head.
//
// Known subjective head is considered network head if it is recent enough(now-timestamp<=blocktime)
// Otherwise, we attempt to request recent network head from a trusted peer and
// set as the new subjective head, assuming that trusted peer is always fully synced.
//
// The request is limited with 2 seconds and otherwise potentially unrecent header is returned.
func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, error) {
sbjHead, err := s.subjectiveHead(ctx)
if err != nil {
return sbjHead, err
}
// if subjective header is recent enough (relative to the network's block time) - just use it
if isRecent(sbjHead, s.Params.blockTime, s.Params.recencyThreshold) {
return sbjHead, nil
}
// single-flight protection ensure only one Head is requested at the time
if !s.getter.Lock() {
// means that other routine held the lock and set the subjective head for us,
// so just recursively get it
return s.Head(ctx)
}
defer s.getter.Unlock()
s.metrics.outdatedHead(s.ctx)
reqCtx, cancel := context.WithTimeout(ctx, headRequestTimeout)
defer cancel()
netHead, err := s.getter.Head(reqCtx, header.WithTrustedHead[H](sbjHead))
if err != nil {
log.Warnw("failed to get recent head, returning current subjective", "sbjHead", sbjHead.Height(), "err", err)
return s.subjectiveHead(ctx)
}
// process and validate netHead fetched from trusted peers
// NOTE: We could trust the netHead like we do during 'automatic subjective initialization'
// but in this case our subjective head is not expired, so we should verify netHead
// and only if it is valid, set it as new head
_ = s.incomingNetworkHead(ctx, netHead)
// netHead was either accepted or rejected as the new subjective
// anyway return most current known subjective head
return s.subjectiveHead(ctx)
}
// subjectiveHead returns the latest known local header that is not expired(within trusting period).
// If the header is expired, it is retrieved from a trusted peer without validation;
// in other words, an automatic subjective initialization is performed.
func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) {
// pending head is the latest known subjective head and sync target, so try to get it
// NOTES:
// * Empty when no sync is in progress
// * Pending cannot be expired, guaranteed
pendHead := s.pending.Head()
if !pendHead.IsZero() {
return pendHead, nil
}
// if pending is empty - get the latest stored/synced head
storeHead, err := s.store.Head(ctx)
if err != nil {
return storeHead, err
}
// check if the stored header is not expired and use it
if !isExpired(storeHead, s.Params.TrustingPeriod) {
return storeHead, nil
}
// otherwise, request head from a trusted peer
log.Infow("stored head header expired", "height", storeHead.Height())
// single-flight protection
// ensure only one Head is requested at the time
if !s.getter.Lock() {
// means that other routine held the lock and set the subjective head for us,
// so just recursively get it
return s.subjectiveHead(ctx)
}
defer s.getter.Unlock()
trustHead, err := s.getter.Head(ctx)
if err != nil {
return trustHead, err
}
s.metrics.subjectiveInitialization(s.ctx)
// and set it as the new subjective head without validation,
// or, in other words, do 'automatic subjective initialization'
// NOTE: we avoid validation as the head expired to prevent possibility of the Long-Range Attack
s.setSubjectiveHead(ctx, trustHead)
switch {
default:
log.Infow("subjective initialization finished", "height", trustHead.Height())
return trustHead, nil
case isExpired(trustHead, s.Params.TrustingPeriod):
log.Warnw("subjective initialization with an expired header", "height", trustHead.Height())
case !isRecent(trustHead, s.Params.blockTime, s.Params.recencyThreshold):
log.Warnw("subjective initialization with an old header", "height", trustHead.Height())
}
log.Warn("trusted peer is out of sync")
s.metrics.trustedPeersOutOufSync(s.ctx)
return trustHead, nil
}
// setSubjectiveHead takes already validated head and sets it as the new sync target.
func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) {
// TODO(@Wondertan): Right now, we can only store adjacent headers, instead we should:
// * Allow storing any valid header here in Store
// * Remove ErrNonAdjacent
// * Remove writeHead from the canonical store implementation
err := s.store.Append(ctx, netHead)
var nonAdj *errNonAdjacent
if err != nil && !errors.As(err, &nonAdj) {
// might be a storage error or something else, but we can still try to continue processing netHead
log.Errorw("storing new network header",
"height", netHead.Height(),
"hash", netHead.Hash().String(),
"err", err)
}
s.metrics.newSubjectiveHead(s.ctx, netHead.Height(), netHead.Time())
storeHead, err := s.store.Head(ctx)
if err == nil && storeHead.Height() >= netHead.Height() {
// we already synced it up - do nothing
return
}
// and if valid, set it as new subjective head
s.pending.Add(netHead)
s.wantSync()
log.Infow("new network head", "height", netHead.Height(), "hash", netHead.Hash())
}
// incomingNetworkHead processes new potential network headers.
// If the header valid, sets as new subjective header.
func (s *Syncer[H]) incomingNetworkHead(ctx context.Context, head H) error {
// ensure there is no racing between network head candidates
s.incomingMu.Lock()
defer s.incomingMu.Unlock()
softFailure, err := s.verify(ctx, head)
if err != nil && !softFailure {
return err
}
// TODO(@Wondertan):
// Implement setSyncTarget and use it for soft failures
s.setSubjectiveHead(ctx, head)
return err
}
// verify verifies given network head candidate.
func (s *Syncer[H]) verify(ctx context.Context, newHead H) (bool, error) {
sbjHead, err := s.subjectiveHead(ctx)
if err != nil {
log.Errorw("getting subjective head during validation", "err", err)
// local error, so soft
return true, &header.VerifyError{Reason: err, SoftFailure: true}
}
var heightThreshold uint64
if s.Params.TrustingPeriod != 0 && s.Params.blockTime != 0 {
buffer := time.Hour * 48 / s.Params.blockTime // generous buffer to account for variable block time
heightThreshold = uint64(s.Params.TrustingPeriod/s.Params.blockTime + buffer)
}
err = header.Verify(sbjHead, newHead, heightThreshold)
if err == nil {
return false, nil
}
var verErr *header.VerifyError
if errors.As(err, &verErr) && !verErr.SoftFailure {
logF := log.Warnw
if errors.Is(err, header.ErrKnownHeader) {
logF = log.Debugw
}
logF("invalid network header",
"height_of_invalid", newHead.Height(),
"hash_of_invalid", newHead.Hash(),
"height_of_subjective", sbjHead.Height(),
"hash_of_subjective", sbjHead.Hash(),
"reason", verErr.Reason)
}
return verErr.SoftFailure, err
}
// isExpired checks if header is expired against trusting period.
func isExpired[H header.Header[H]](header H, period time.Duration) bool {
expirationTime := header.Time().Add(period)
return expirationTime.Before(time.Now())
}
// isRecent checks if header is recent against the given recency threshold.
func isRecent[H header.Header[H]](header H, blockTime, recencyThreshold time.Duration) bool {
if recencyThreshold == 0 {
recencyThreshold = blockTime * 2 // allow some drift by adding additional buffer of 2 blocks
}
return time.Since(header.Time()) <= recencyThreshold
}