-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
coalescer.go
213 lines (172 loc) · 5.2 KB
/
coalescer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package store
import (
"context"
"time"
"github.com/filecoin-project/lotus/chain/types"
)
// WrapHeadChangeCoalescer wraps a ReorgNotifee with a head change coalescer.
// minDelay is the minimum coalesce delay; when a head change is first received, the coalescer will
// wait for that long to coalesce more head changes.
// maxDelay is the maximum coalesce delay; the coalescer will not delay delivery of a head change
// more than that.
// mergeInterval is the interval that triggers additional coalesce delay; if the last head change was
// within the merge interval when the coalesce timer fires, then the coalesce time is extended
// by min delay and up to max delay total.
func WrapHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) ReorgNotifee {
c := NewHeadChangeCoalescer(fn, minDelay, maxDelay, mergeInterval)
return c.HeadChange
}
// HeadChangeCoalescer is a stateful reorg notifee which coalesces incoming head changes
// with pending head changes to reduce state computations from head change notifications.
type HeadChangeCoalescer struct {
notify ReorgNotifee
ctx context.Context
cancel func()
eventq chan headChange
revert []*types.TipSet
apply []*types.TipSet
}
type headChange struct {
revert, apply []*types.TipSet
}
// NewHeadChangeCoalescer creates a HeadChangeCoalescer.
func NewHeadChangeCoalescer(fn ReorgNotifee, minDelay, maxDelay, mergeInterval time.Duration) *HeadChangeCoalescer {
ctx, cancel := context.WithCancel(context.Background())
c := &HeadChangeCoalescer{
notify: fn,
ctx: ctx,
cancel: cancel,
eventq: make(chan headChange),
}
go c.background(minDelay, maxDelay, mergeInterval)
return c
}
// HeadChange is the ReorgNotifee callback for the stateful coalescer; it receives an incoming
// head change and schedules dispatch of a coalesced head change in the background.
func (c *HeadChangeCoalescer) HeadChange(revert, apply []*types.TipSet) error {
select {
case c.eventq <- headChange{revert: revert, apply: apply}:
return nil
case <-c.ctx.Done():
return c.ctx.Err()
}
}
// Close closes the coalescer and cancels the background dispatch goroutine.
// Any further notification will result in an error.
func (c *HeadChangeCoalescer) Close() error {
select {
case <-c.ctx.Done():
default:
c.cancel()
}
return nil
}
// Implementation details
func (c *HeadChangeCoalescer) background(minDelay, maxDelay, mergeInterval time.Duration) {
var timerC <-chan time.Time
var first, last time.Time
for {
select {
case evt := <-c.eventq:
c.coalesce(evt.revert, evt.apply)
now := time.Now()
last = now
if first.IsZero() {
first = now
}
if timerC == nil {
timerC = time.After(minDelay)
}
case now := <-timerC:
sinceFirst := now.Sub(first)
sinceLast := now.Sub(last)
if sinceLast < mergeInterval && sinceFirst < maxDelay {
// coalesce some more
maxWait := maxDelay - sinceFirst
wait := minDelay
if maxWait < wait {
wait = maxWait
}
timerC = time.After(wait)
} else {
// dispatch
c.dispatch()
first = time.Time{}
last = time.Time{}
timerC = nil
}
case <-c.ctx.Done():
if c.revert != nil || c.apply != nil {
c.dispatch()
}
return
}
}
}
func (c *HeadChangeCoalescer) coalesce(revert, apply []*types.TipSet) {
// newly reverted tipsets cancel out with pending applys.
// similarly, newly applied tipsets cancel out with pending reverts.
// pending tipsets
pendRevert := make(map[types.TipSetKey]struct{}, len(c.revert))
for _, ts := range c.revert {
pendRevert[ts.Key()] = struct{}{}
}
pendApply := make(map[types.TipSetKey]struct{}, len(c.apply))
for _, ts := range c.apply {
pendApply[ts.Key()] = struct{}{}
}
// incoming tipsets
reverting := make(map[types.TipSetKey]struct{}, len(revert))
for _, ts := range revert {
reverting[ts.Key()] = struct{}{}
}
applying := make(map[types.TipSetKey]struct{}, len(apply))
for _, ts := range apply {
applying[ts.Key()] = struct{}{}
}
// coalesced revert set
// - pending reverts are cancelled by incoming applys
// - incoming reverts are cancelled by pending applys
newRevert := c.merge(c.revert, revert, pendApply, applying)
// coalesced apply set
// - pending applys are cancelled by incoming reverts
// - incoming applys are cancelled by pending reverts
newApply := c.merge(c.apply, apply, pendRevert, reverting)
// commit the coalesced sets
c.revert = newRevert
c.apply = newApply
}
func (c *HeadChangeCoalescer) merge(pend, incoming []*types.TipSet, cancel1, cancel2 map[types.TipSetKey]struct{}) []*types.TipSet {
result := make([]*types.TipSet, 0, len(pend)+len(incoming))
for _, ts := range pend {
_, cancel := cancel1[ts.Key()]
if cancel {
continue
}
_, cancel = cancel2[ts.Key()]
if cancel {
continue
}
result = append(result, ts)
}
for _, ts := range incoming {
_, cancel := cancel1[ts.Key()]
if cancel {
continue
}
_, cancel = cancel2[ts.Key()]
if cancel {
continue
}
result = append(result, ts)
}
return result
}
func (c *HeadChangeCoalescer) dispatch() {
err := c.notify(c.revert, c.apply)
if err != nil {
log.Errorf("error dispatching coalesced head change notification: %s", err)
}
c.revert = nil
c.apply = nil
}