-
Notifications
You must be signed in to change notification settings - Fork 900
/
heightsub.go
122 lines (107 loc) · 3.6 KB
/
heightsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package store
import (
"context"
"errors"
"sync"
"sync/atomic"
"github.com/celestiaorg/celestia-node/libs/header"
)
// errElapsedHeight is returned by heightSub.Sub when the requested height is not
// above the height currently tracked by heightSub, i.e. the header for it was
// already provided and has to be obtained elsewhere.
var errElapsedHeight = errors.New("elapsed height")
// heightSub provides a minimalistic mechanism to wait until the header for a given
// height becomes available.
//
// Callers block in Sub on a height above the current one; Pub delivers the
// published headers to the matching waiters and advances the current height.
type heightSub[H header.Header] struct {
	// height refers to the latest locally available header height
	// that has been fully verified and inserted into the subjective chain.
	// It is only read and written through sync/atomic (see Height and SetHeight).
	height uint64 // atomic
	// heightReqsLk guards heightReqs.
	heightReqsLk sync.Mutex
	// heightReqs maps a requested height to the response channels of all
	// subscribers currently waiting for the header at that height.
	heightReqs map[uint64][]chan H
}
// newHeightSub builds an empty heightSub whose request map is allocated and
// ready to register subscriptions.
func newHeightSub[H header.Header]() *heightSub[H] {
	hs := new(heightSub[H])
	hs.heightReqs = make(map[uint64][]chan H)
	return hs
}
// Height returns the height currently tracked by heightSub.
// The value is loaded atomically, so it is safe to call concurrently with SetHeight.
func (hs *heightSub[H]) Height() uint64 {
	current := atomic.LoadUint64(&hs.height)
	return current
}
// SetHeight atomically overwrites the head height tracked by heightSub
// with the given value.
func (hs *heightSub[H]) SetHeight(height uint64) {
	target := &hs.height
	atomic.StoreUint64(target, height)
}
// Sub subscribes for a header of a given height and blocks until that header
// is published via Pub or ctx is done.
//
// It returns errElapsedHeight if the requested height is not above the current
// height, which means the requested header was already provided and the caller
// should get it elsewhere. If ctx is done before the header arrives, the
// subscription is withdrawn and ctx.Err() is returned.
func (hs *heightSub[H]) Sub(ctx context.Context, height uint64) (H, error) {
	var zero H
	// Cheap lock-free check for the common case of an already elapsed height.
	if hs.Height() >= height {
		return zero, errElapsedHeight
	}

	hs.heightReqsLk.Lock()
	if hs.Height() >= height {
		// This is a rare case we have to account for.
		// The lock above can park a goroutine long enough for hs.height to change for a requested height,
		// leaving the request never fulfilled and the goroutine deadlocked.
		hs.heightReqsLk.Unlock()
		return zero, errElapsedHeight
	}
	// Buffered with capacity 1 so that Pub never blocks while delivering.
	resp := make(chan H, 1)
	hs.heightReqs[height] = append(hs.heightReqs[height], resp)
	hs.heightReqsLk.Unlock()

	select {
	case h := <-resp:
		return h, nil
	case <-ctx.Done():
		// No need to keep the request if the op is canceled. Only this caller's
		// channel is removed: other subscribers may be waiting on the same height
		// and must stay registered, otherwise Pub would never fulfill them.
		hs.heightReqsLk.Lock()
		reqs := hs.heightReqs[height]
		for i, req := range reqs {
			if req != resp {
				continue
			}
			// Shift the tail left and clear the vacated slot so the
			// channel is not retained by the backing array.
			copy(reqs[i:], reqs[i+1:])
			reqs[len(reqs)-1] = nil
			reqs = reqs[:len(reqs)-1]
			if len(reqs) == 0 {
				delete(hs.heightReqs, height)
			} else {
				hs.heightReqs[height] = reqs
			}
			break
		}
		hs.heightReqsLk.Unlock()
		return zero, ctx.Err()
	}
}
// Pub fulfills every outstanding subscription whose height matches one of the
// given headers and advances the tracked height to the height of the last header.
//
// Pub is only safe when called from one goroutine.
// For Pub to work correctly, heightSub has to be initialized with SetHeight
// so that the given headers are contiguous to the height on heightSub;
// otherwise a fatal log entry is emitted and nothing is published.
func (hs *heightSub[H]) Pub(headers ...H) {
	if len(headers) == 0 {
		return
	}

	current := hs.Height()
	first, last := headers[0], headers[len(headers)-1]
	from, to := uint64(first.Height()), uint64(last.Height())
	if current+1 != from {
		log.Fatal("PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order")
		return
	}
	hs.SetHeight(to)

	hs.heightReqsLk.Lock()
	defer hs.heightReqsLk.Unlock()

	// Publishing a single header is the common case: look its waiters up
	// directly rather than scanning every pending request.
	if len(headers) == 1 {
		// Ranging over a missing entry is a no-op, as is deleting it.
		for _, waiter := range hs.heightReqs[from] {
			waiter <- first // waiter channels are always buffered, so this won't block
		}
		delete(hs.heightReqs, from)
		return
	}

	// For a batch, walk the pending requests instead of the headers: there are
	// expected to be fewer requests than headers, so this avoids idle iterations.
	for reqHeight, waiters := range hs.heightReqs {
		if reqHeight < from || reqHeight > to {
			// request is outside of the published range
			continue
		}
		// headers are contiguous, so the offset from 'from' is the index
		hdr := headers[reqHeight-from]
		for _, waiter := range waiters {
			waiter <- hdr // waiter channels are always buffered, so this won't block
		}
		delete(hs.heightReqs, reqHeight)
	}
}