-
Notifications
You must be signed in to change notification settings - Fork 128
/
blockgate.go
381 lines (329 loc) · 10.7 KB
/
blockgate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
// Copyright (c) 2017-2019, The dcrdata developers
// See LICENSE for details.
package rpcutils
import (
"fmt"
"sync"
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/dcrutil/v2"
)
// BlockGetter is an interface for requesting blocks. The per-method notes
// below describe how BlockGate, the implementation in this file, behaves.
type BlockGetter interface {
	// NodeHeight reports the chain height as seen by the node (BlockGate asks
	// its RPC client for the best block).
	NodeHeight() (int64, error)
	// BestBlockHeight reports the height of the best block held in the cache.
	BestBlockHeight() int64
	// BestBlockHash returns the hash and height of the best cached block.
	BestBlockHash() (chainhash.Hash, int64, error)
	// BestBlock returns the best cached block.
	BestBlock() (*dcrutil.Block, error)
	// Block returns the block with the given hash.
	Block(chainhash.Hash) (*dcrutil.Block, error)
	// WaitForHeight returns a channel on which the hash of the block at the
	// given height is delivered once that block is available.
	WaitForHeight(int64) chan chainhash.Hash
	// WaitForHash returns a channel on which the height of the block with the
	// given hash is delivered once that block is available.
	WaitForHash(chainhash.Hash) chan int64
	// GetChainWork returns the chain work for the given block hash as a string.
	GetChainWork(*chainhash.Hash) (string, error)
}
// MasterBlockGetter builds on BlockGetter, adding functions that fetch blocks
// directly from dcrd via RPC and subsequently update the internal block cache
// with the retrieved block.
type MasterBlockGetter interface {
	BlockGetter
	// UpdateToBestBlock fetches the node's current best block and caches it.
	UpdateToBestBlock() (*dcrutil.Block, error)
	// UpdateToNextBlock fetches the block one height above the best cached
	// block and caches it.
	UpdateToNextBlock() (*dcrutil.Block, error)
	// UpdateToBlock fetches the main chain block at the given height and
	// caches it.
	UpdateToBlock(height int64) (*dcrutil.Block, error)
}
// BlockGate is an implementation of MasterBlockGetter with cache. Blocks
// fetched through the update methods are stored in two maps (by height and by
// hash), and callers may register channels to be notified when a particular
// height or hash becomes available.
type BlockGate struct {
	// mtx is locked by the exported methods that read or modify the fields
	// below.
	mtx sync.RWMutex
	// client is the RPC client used to retrieve blocks.
	client BlockFetcher
	// height is the height of the block most recently stored by
	// updateToBlock; NewBlockGate initializes it to -1.
	height int64
	// fetchToHeight is the highest height for which WaitForHeight will fetch
	// a missing block immediately; NewBlockGate initializes it to -1.
	fetchToHeight int64
	// hashAtHeight maps a cached height to its block hash.
	hashAtHeight map[int64]chainhash.Hash
	// blockWithHash maps a cached block hash to the block itself.
	blockWithHash map[chainhash.Hash]*dcrutil.Block
	// heightWaiters holds the channels to signal when a height is stored.
	heightWaiters map[int64][]chan chainhash.Hash
	// hashWaiters holds the channels to signal when a hash is stored.
	hashWaiters map[chainhash.Hash][]chan int64
	// expireQueue records insertion order so rotateIn can evict the oldest
	// cached entry once the capacity is exceeded.
	expireQueue heightHashQueue
}
// heightHashQueue is the FIFO of cached height-hash pairs used for cache
// expiration. New pairs are appended to the end of q and the oldest is taken
// from the front.
type heightHashQueue struct {
	// q holds the queued pairs, oldest first.
	q []heightHashPair
	// cap is the maximum number of pairs kept before the oldest is dropped.
	cap int
}
// heightHashPair ties a block height to the hash of the block stored at that
// height. It is constructed with an unkeyed literal, so the field order
// (height, then hash) must be preserved.
type heightHashPair struct {
	height int64
	hash   chainhash.Hash
}
// hashInQueue reports whether any pair currently in the queue carries the given
// block hash.
func hashInQueue(q heightHashQueue, hash chainhash.Hash) bool {
	for _, pair := range q.q {
		if pair.hash == hash {
			return true
		}
	}
	return false
}
// heightInQueue reports whether any pair currently in the queue carries the
// given block height.
func heightInQueue(q heightHashQueue, height int64) bool {
	for _, pair := range q.q {
		if pair.height == height {
			return true
		}
	}
	return false
}
// Compile-time checks that *BlockGate implements both BlockGetter and
// MasterBlockGetter.
var (
	_ BlockGetter       = (*BlockGate)(nil)
	_ MasterBlockGetter = (*BlockGate)(nil)
)
// NewBlockGate builds an empty BlockGate around the given RPC client. The
// capacity is the number of blocks the cache keeps before the oldest entry is
// expired. Both the cached best height and the fetch-to height start at -1.
func NewBlockGate(client BlockFetcher, capacity int) *BlockGate {
	gate := new(BlockGate)
	gate.client = client

	// No block has been cached yet and no immediate fetching is configured.
	gate.height = -1
	gate.fetchToHeight = -1

	gate.hashAtHeight = make(map[int64]chainhash.Hash)
	gate.blockWithHash = make(map[chainhash.Hash]*dcrutil.Block)
	gate.heightWaiters = make(map[int64][]chan chainhash.Hash)
	gate.hashWaiters = make(map[chainhash.Hash][]chan int64)

	gate.expireQueue.cap = capacity
	return gate
}
// SetFetchToHeight sets the height up to and including which WaitForHeight
// will trigger an RPC to retrieve a missing block immediately. For heights
// above it, WaitForHeight will only return a notification channel.
//
// The write lock is required because this method modifies fetchToHeight, which
// WaitForHeight reads while holding the same mutex.
func (g *BlockGate) SetFetchToHeight(height int64) {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	g.fetchToHeight = height
}
// NodeHeight asks the RPC client for the node's best block and returns its
// height together with any error from that request. The cache is not consulted.
func (g *BlockGate) NodeHeight() (int64, error) {
	_, nodeHeight, rpcErr := g.client.GetBestBlock()
	return nodeHeight, rpcErr
}
// BestBlockHeight returns the height recorded for the best block in the block
// cache. The value is read under the read lock.
func (g *BlockGate) BestBlockHeight() int64 {
	g.mtx.RLock()
	best := g.height
	g.mtx.RUnlock()
	return best
}
// BestBlockHash returns the hash and height of the best block in cache. When
// no hash is cached for the current best height, the zero hash is returned
// along with the height and a non-nil error.
func (g *BlockGate) BestBlockHash() (chainhash.Hash, int64, error) {
	g.mtx.RLock()
	defer g.mtx.RUnlock()

	bestHash, found := g.hashAtHeight[g.height]
	if found {
		return bestHash, g.height, nil
	}
	// A failed map lookup leaves bestHash as the zero value.
	return bestHash, g.height, fmt.Errorf("hash of best block %d not found", g.height)
}
// BestBlock gets the best block in cache.
//
// It returns a nil block and an error when either the hash for the current
// best height or the block for that hash is missing from the cache. The two
// failures are reported separately: a missing hash is returned immediately
// rather than being overwritten by a second lookup with the zero hash.
func (g *BlockGate) BestBlock() (*dcrutil.Block, error) {
	g.mtx.RLock()
	defer g.mtx.RUnlock()

	hash, ok := g.hashAtHeight[g.height]
	if !ok {
		return nil, fmt.Errorf("hash of best block %d not found", g.height)
	}

	block, ok := g.blockWithHash[hash]
	if !ok {
		// %v, not %d: %d would print the hash as a list of decimal bytes.
		return nil, fmt.Errorf("block %v at height %d not found", hash, g.height)
	}
	return block, nil
}
// CachedBlock attempts to get the block with the specified hash from cache. It
// never contacts dcrd; on a cache miss it returns a nil block and an error
// naming the requested hash.
func (g *BlockGate) CachedBlock(hash chainhash.Hash) (*dcrutil.Block, error) {
	g.mtx.RLock()
	defer g.mtx.RUnlock()

	block, ok := g.blockWithHash[hash]
	if !ok {
		// %v, not %d: %d would print the hash as a list of decimal bytes.
		return nil, fmt.Errorf("block %v not found", hash)
	}
	return block, nil
}
// Block returns the block with the specified hash, preferring the cache. On a
// cache miss the block is requested from dcrd via RPC and a cache-miss notice
// is printed. The fetched block is returned to the caller but is not inserted
// into the cache by this method.
func (g *BlockGate) Block(hash chainhash.Hash) (*dcrutil.Block, error) {
	// Cache hit: nothing else to do.
	if cached, cacheErr := g.CachedBlock(hash); cacheErr == nil {
		return cached, nil
	}

	// Cache miss: go to the node.
	fetched, rpcErr := GetBlockByHash(&hash, g.client)
	if rpcErr != nil {
		return nil, fmt.Errorf("GetBlock (%v) failed: %v", hash, rpcErr)
	}

	// Snapshot the cache state under the read lock for the notice below.
	g.mtx.RLock()
	capacity, tip := g.expireQueue.cap, g.height
	g.mtx.RUnlock()

	fmt.Printf("Block cache miss: requested %d, cache capacity %d, tip %d.",
		fetched.Height(), capacity, tip)
	return fetched, nil
}
// UpdateToBestBlock gets the best block via RPC and updates the cache. It asks
// the RPC client for the node's best block height and then delegates to
// UpdateToBlock for that height. An error is returned if either step fails.
func (g *BlockGate) UpdateToBestBlock() (*dcrutil.Block, error) {
	_, height, err := g.client.GetBestBlock()
	if err != nil {
		// Name the call that actually failed.
		return nil, fmt.Errorf("GetBestBlock failed: %v", err)
	}
	return g.UpdateToBlock(height)
}
// UpdateToNextBlock gets the next block following the best in cache via RPC and
// updates the cache.
//
// The lock is held across both reading the current best height and fetching
// the block. Otherwise another goroutine could change the best height between
// the two steps, so the fetched block would no longer be the one following the
// best in cache, or two concurrent callers could both fetch the same height.
func (g *BlockGate) UpdateToNextBlock() (*dcrutil.Block, error) {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	return g.updateToBlock(g.height + 1)
}
// UpdateToBlock fetches the main chain block at the specified height from
// dcrd, stores it in the cache, and signals any waiters. It takes the write
// lock and delegates to updateToBlock, making it the thread-safe entry point.
func (g *BlockGate) UpdateToBlock(height int64) (*dcrutil.Block, error) {
	g.mtx.Lock()
	defer g.mtx.Unlock()

	block, err := g.updateToBlock(height)
	return block, err
}
// updateToBlock gets the block at the specified height on the main chain from
// dcrd, records it as the best block, caches it, and signals waiters. It is
// not thread-safe: it is wrapped by UpdateToBlock for thread-safety, and used
// directly by WaitForHeight, which locks the BlockGate itself.
//
// An error is returned, and nothing is cached, if the fetch fails or if the
// returned block's height or hash does not match what was requested/reported.
func (g *BlockGate) updateToBlock(height int64) (*dcrutil.Block, error) {
	blk, blkHash, err := GetBlock(height, g.client)
	if err != nil {
		return nil, fmt.Errorf("GetBlock (%d) failed: %v", height, err)
	}

	// Sanity check the block against the request and the reported hash.
	if got := blk.Height(); got != height {
		return nil, fmt.Errorf("GetBlock (%d) returned block at height %d",
			height, got)
	}
	if got := blk.Hash(); !blkHash.IsEqual(got) {
		return nil, fmt.Errorf("GetBlock (%s) returned block with hash %s",
			blkHash.String(), got.String())
	}

	// Record the block as the new best and index it by height and by hash.
	key := *blkHash
	g.height = height
	g.hashAtHeight[height] = key
	g.blockWithHash[key] = blk

	// Queue the block for expiration, evicting the oldest entry if the cache
	// is now over capacity.
	g.rotateIn(height, key)

	// Notify anyone waiting on this height or this hash.
	g.signalHeight(height, key)
	g.signalHash(key, height)

	return blk, nil
}
// rotateIn appends the input height-hash pair to the expiration queue. If that
// pushes the queue above capacity, the oldest pair is popped and its entries
// in the blockWithHash and hashAtHeight maps are deleted, unless another pair
// still in the queue shares the same hash or height respectively.
// TODO: possibly check for hash and height waiters before deleting items.
// However, since signalHeight and signalHash are run synchronously after
// rotateIn in updateToBlock (whose callers lock the BlockGate), there should
// be no such issues. At worst, there may be a cache miss when a client calls
// Block or CachedBlock.
func (g *BlockGate) rotateIn(height int64, hash chainhash.Hash) {
	queue := &g.expireQueue
	queue.q = append(queue.q, heightHashPair{height, hash})

	// Still within capacity: nothing to expire.
	if len(queue.q) <= queue.cap {
		return
	}

	// Pop the oldest pair off the front.
	evicted := queue.q[0]
	queue.q = queue.q[1:]

	// Only drop cache entries that no remaining queued pair refers to.
	if !hashInQueue(*queue, evicted.hash) {
		delete(g.blockWithHash, evicted.hash)
	}
	if !heightInQueue(*queue, evicted.height) {
		delete(g.hashAtHeight, evicted.height)
	}
}
// signalHash removes all channels registered for the given hash and, in a new
// goroutine, sends the block height to each of them. The sends are
// non-blocking; a channel that cannot accept the value causes a panic. The
// method does no locking of its own.
func (g *BlockGate) signalHash(hash chainhash.Hash, height int64) {
	// Take ownership of the waiters. Deleting an absent key is a no-op, and a
	// missing, nil, or empty list all mean there is nobody to notify.
	waiters := g.hashWaiters[hash]
	delete(g.hashWaiters, hash)
	if len(waiters) == 0 {
		return
	}

	go func(chans []chan int64) {
		for i := range chans {
			select {
			case chans[i] <- height:
			default:
				panic(fmt.Sprintf("unable to signal block with hash %s at height %d",
					hash, height))
			}
		}
	}(waiters)
}
// signalHeight removes all channels registered for the given height and, in a
// new goroutine, sends the block hash to each of them. The sends are
// non-blocking; a channel that cannot accept the value causes a panic. The
// method does no locking of its own.
func (g *BlockGate) signalHeight(height int64, hash chainhash.Hash) {
	// Take ownership of the waiters. Deleting an absent key is a no-op, and a
	// missing, nil, or empty list all mean there is nobody to notify.
	waiters := g.heightWaiters[height]
	delete(g.heightWaiters, height)
	if len(waiters) == 0 {
		return
	}

	go func(chans []chan chainhash.Hash) {
		for i := range chans {
			select {
			case chans[i] <- hash:
			default:
				panic(fmt.Sprintf("unable to signal block with hash %s at height %d",
					hash, height))
			}
		}
	}(waiters)
}
// WaitForHeight returns a notification channel (buffered, size 1) on which the
// hash of the block at the specified height is delivered once available.
//
// A negative height yields a nil channel. Otherwise the channel is registered
// and then exactly one of the following applies:
//   - the height is already cached: the waiters are signalled right away;
//   - the height is at or below the fetch-to height: the block is fetched now,
//     and if that fails nil is returned (the registered channel is left in
//     the waiter list);
//   - the height is below the current best height: a warning is printed,
//     since the block may never be delivered.
func (g *BlockGate) WaitForHeight(height int64) chan chainhash.Hash {
	g.mtx.Lock()
	defer g.mtx.Unlock()

	if height < 0 {
		return nil
	}

	// Register the waiter before deciding how it will be served.
	notify := make(chan chainhash.Hash, 1)
	g.heightWaiters[height] = append(g.heightWaiters[height], notify)

	cachedHash, cached := g.hashAtHeight[height]
	switch {
	case cached:
		g.signalHeight(height, cachedHash)
	case height <= g.fetchToHeight:
		// updateToBlock signals the waiters itself on success.
		if _, err := g.updateToBlock(height); err != nil {
			fmt.Printf("Failed to updateToBlock: %v", err)
			return nil
		}
	case height < g.height:
		fmt.Printf("WARNING: WaitForHeight(%d), but the best block is at %d. "+
			"You may wait forever for this block.",
			height, g.height)
	}

	return notify
}
// WaitForHash returns a notification channel (buffered, size 1) on which the
// height of the block with the specified hash is delivered once available. If
// the block is already cached, the waiters for that hash are signalled
// immediately.
func (g *BlockGate) WaitForHash(hash chainhash.Hash) chan int64 {
	g.mtx.Lock()
	defer g.mtx.Unlock()

	// Register the waiter first so an immediate signal includes it.
	notify := make(chan int64, 1)
	g.hashWaiters[hash] = append(g.hashWaiters[hash], notify)

	cached, found := g.blockWithHash[hash]
	if !found {
		return notify
	}
	g.signalHash(hash, cached.Height())
	return notify
}
// GetChainWork returns the chain work for the block with the given hash as a
// string. It delegates to the package-level GetChainWork using the BlockGate's
// RPC client.
func (g *BlockGate) GetChainWork(hash *chainhash.Hash) (string, error) {
	work, err := GetChainWork(g.client, hash)
	return work, err
}
// Client exposes the RPC client that the BlockGate was constructed with.
func (g *BlockGate) Client() BlockFetcher {
	fetcher := g.client
	return fetcher
}