-
Notifications
You must be signed in to change notification settings - Fork 27
/
chainstate.go
501 lines (444 loc) · 14.9 KB
/
chainstate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
// Copyright (c) 2021 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package pool
import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
"github.com/decred/dcrd/blockchain/standalone/v2"
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/dcrutil/v3"
"github.com/decred/dcrd/wire"
errs "github.com/decred/dcrpool/errors"
)
// bufferSize is the capacity used for the buffered block notification
// channels.
const bufferSize = 128
// ChainStateConfig contains all of the configuration values which should be
// provided when creating a new instance of ChainState.
type ChainStateConfig struct {
	// db represents the pool database. The chain state uses it to fetch,
	// update and prune accepted work, jobs, hash data and payments.
	db Database
	// SoloPool represents the solo pool mining mode. When set, the chain
	// state neither relays payment signals nor generates payments.
	SoloPool bool
	// ProcessPayments relays payment signals for processing. The chain state
	// invokes it in a separate goroutine when a block is connected and the
	// pool is not in solo pool mode.
	ProcessPayments func(msg *paymentMsg)
	// GeneratePayments creates payments for participating accounts in pool
	// mining mode based on the configured payment scheme. The chain state
	// calls it with the height of the confirmed block, the payment source,
	// the amount read from the coinbase output and the CreatedOn value of
	// the accepted work.
	GeneratePayments func(uint32, *PaymentSource, dcrutil.Amount, int64) error
	// GetBlock fetches the block associated with the provided block hash.
	GetBlock func(context.Context, *chainhash.Hash) (*wire.MsgBlock, error)
	// GetBlockConfirmations fetches the block confirmations with the provided
	// block hash. The chain state treats a result of zero or less as an
	// orphaned block.
	GetBlockConfirmations func(context.Context, *chainhash.Hash) (int64, error)
	// Cancel represents the pool's context cancellation function. The chain
	// state invokes it when it encounters an error it treats as fatal.
	Cancel context.CancelFunc
	// SignalCache sends the provided cache update event to the gui cache.
	SignalCache func(event CacheUpdateEvent)
	// HubWg represents the hub's waitgroup. Done is called on it when the
	// chain update handler exits.
	HubWg *sync.WaitGroup
}
// blockNotification wraps a block header notification and a done channel.
type blockNotification struct {
	// Header is the serialized block header. The chain state decodes it
	// into a wire.BlockHeader via FromBytes.
	Header []byte
	// Done is closed by the chain state once the notification has been
	// handled, whether or not handling succeeded.
	Done chan bool
}
// ChainState represents the current state of the chain.
type ChainState struct {
	// lastWorkHeight is read and written exclusively through the sync/atomic
	// helpers fetchLastWorkHeight and setLastWorkHeight.
	lastWorkHeight uint32 // update atomically.

	// cfg is the configuration supplied to NewChainState.
	cfg *ChainStateConfig
	// connCh carries connected block notifications to handleChainUpdates.
	connCh chan *blockNotification
	// discCh carries disconnected block notifications to handleChainUpdates.
	discCh chan *blockNotification
	// currentWork is the value last stored by setCurrentWork. It is guarded
	// by currentWorkMtx.
	currentWork    string
	currentWorkMtx sync.RWMutex
}
// NewChainState creates a chain state backed by the provided configuration.
// The connected and disconnected block notification channels are buffered
// with a capacity of bufferSize.
func NewChainState(sCfg *ChainStateConfig) *ChainState {
	cs := new(ChainState)
	cs.cfg = sCfg
	cs.connCh = make(chan *blockNotification, bufferSize)
	cs.discCh = make(chan *blockNotification, bufferSize)
	return cs
}
// fetchLastWorkHeight atomically loads and returns the last work height.
func (cs *ChainState) fetchLastWorkHeight() uint32 {
	height := atomic.LoadUint32(&cs.lastWorkHeight)
	return height
}
// setLastWorkHeight atomically stores the provided height as the last work
// height.
func (cs *ChainState) setLastWorkHeight(height uint32) {
	addr := &cs.lastWorkHeight
	atomic.StoreUint32(addr, height)
}
// setCurrentWork replaces the current work with the provided value while
// holding the write lock.
func (cs *ChainState) setCurrentWork(headerE string) {
	cs.currentWorkMtx.Lock()
	defer cs.currentWorkMtx.Unlock()

	cs.currentWork = headerE
}
// fetchCurrentWork returns the current work, read under the read lock.
func (cs *ChainState) fetchCurrentWork() string {
	cs.currentWorkMtx.RLock()
	defer cs.currentWorkMtx.RUnlock()

	return cs.currentWork
}
// pruneAcceptedWork removes all accepted work not confirmed as mined work
// with heights less than the provided height. Candidates whose blocks turn
// out to have confirmations are marked as confirmed instead of being removed.
// The first error encountered is returned and stops processing.
func (cs *ChainState) pruneAcceptedWork(ctx context.Context, height uint32) error {
	candidates, err := cs.cfg.db.fetchUnconfirmedWork(height)
	if err != nil {
		return err
	}

	// Mined block confirmations can be missed when the pool is restarted,
	// so every pruning candidate is checked against the chain before a
	// decision is made about it.
	for _, work := range candidates {
		blockHash, err := chainhash.NewHashFromStr(work.BlockHash)
		if err != nil {
			return err
		}

		confs, err := cs.cfg.GetBlockConfirmations(ctx, blockHash)
		if err != nil {
			return err
		}

		// A block with confirmations is part of the chain: record the
		// accepted work as confirmed and keep it.
		if confs > 0 {
			work.Confirmed = true
			err = cs.cfg.db.updateAcceptedWork(work)
			if err != nil {
				return err
			}
			continue
		}

		// A block without confirmations at the current height is an
		// orphan, so its accepted work is pruned.
		err = cs.cfg.db.deleteAcceptedWork(work.UUID)
		if err != nil {
			return err
		}
	}

	return nil
}
// prunePayments removes all spendable payments sourcing from
// orphaned blocks at the provided height. The first error encountered is
// returned and stops processing.
func (cs *ChainState) prunePayments(ctx context.Context, height uint32) error {
	payments, err := cs.cfg.db.fetchPaymentsAtHeight(height)
	if err != nil {
		return err
	}

	for _, pmt := range payments {
		sourceHash, err := chainhash.NewHashFromStr(pmt.Source.BlockHash)
		if err != nil {
			return err
		}

		confs, err := cs.cfg.GetBlockConfirmations(ctx, sourceHash)
		if err != nil {
			return err
		}

		// Payments sourced from a block that has confirmations are kept.
		if confs > 0 {
			continue
		}

		// The source block has no confirmations at the current height,
		// making it an orphan. Delete the payment associated with it.
		err = cs.cfg.db.deletePayment(pmt.UUID)
		if err != nil {
			return err
		}
	}

	return nil
}
// isTreasuryActive reports whether the provided coinbase transaction
// indicates the treasury agenda is active: its version must be at least
// wire.TxVersionTreasury and it must pass the standalone coinbase check
// performed with the treasury flag enabled.
func isTreasuryActive(tx *wire.MsgTx) bool {
	return tx.Version >= wire.TxVersionTreasury &&
		standalone.IsCoinBaseTx(tx, true)
}
// handleChainUpdates processes connected and disconnected block
// notifications from the consensus daemon.
//
// It loops until the provided context is done, at which point it closes both
// notification channels, signals the hub waitgroup and returns. The Done
// channel of every received notification is closed once the notification has
// been handled, regardless of the outcome. Errors treated as fatal are
// logged and followed by a call to the configured Cancel function; the loop
// itself only exits once ctx is done.
//
// For a connected block the handler relays a payment signal (pool mode
// only), prunes stale jobs, hash data, accepted work and orphaned payments,
// and, if the parent of the connected block is unconfirmed accepted work of
// the pool, marks it confirmed and generates payments for it (pool mode
// only).
//
// For a disconnected block the handler unconfirms the parent if it is
// accepted work of the pool, and unconfirms the disconnected block itself if
// it is accepted work of the pool.
func (cs *ChainState) handleChainUpdates(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			close(cs.discCh)
			close(cs.connCh)
			cs.cfg.HubWg.Done()
			return

		case msg := <-cs.connCh:
			var header wire.BlockHeader
			err := header.FromBytes(msg.Header)
			if err != nil {
				// Errors generated parsing block notifications should not
				// terminate the chainstate process.
				log.Errorf("unable to create header from bytes: %v", err)
				close(msg.Done)
				continue
			}

			block, err := cs.cfg.GetBlock(ctx, &header.PrevBlock)
			if err != nil {
				// Errors generated fetching blocks of confirmed mined
				// work are currently fatal because payments are
				// sourced from coinbases. The chainstate process will be
				// terminated as a result.
				//
				// The hash is formatted with %s: the hash type is a
				// Stringer, so %x would hex-encode its already
				// hex-encoded string form.
				log.Errorf("unable to fetch block with hash %s: %v",
					header.PrevBlock, err)
				close(msg.Done)
				cs.cfg.Cancel()
				continue
			}

			coinbaseTx := block.Transactions[0]
			treasuryActive := isTreasuryActive(coinbaseTx)

			soloPool := cs.cfg.SoloPool
			if !soloPool {
				go cs.cfg.ProcessPayments(&paymentMsg{
					CurrentHeight:  header.Height,
					TreasuryActive: treasuryActive,
					Done:           make(chan bool),
				})
			}

			// Prune invalidated jobs and accepted work.
			if header.Height > MaxReorgLimit {
				pruneLimit := header.Height - MaxReorgLimit
				err := cs.cfg.db.deleteJobsBeforeHeight(pruneLimit)
				if err != nil {
					// Errors generated pruning invalidated jobs indicate an
					// underlying issue accessing the database. The chainstate
					// process will be terminated as a result.
					log.Errorf("unable to prune jobs to height %d: %v",
						pruneLimit, err)
					close(msg.Done)
					cs.cfg.Cancel()
					continue
				}

				// Prune all hash data not updated in the past ten minutes.
				// A connected client should have updated multiple times
				// by then. Only disconnected miners would not have
				// updated within the timeframe.
				tenMinutesAgo := time.Now().Add(-time.Minute * 10).UnixNano()
				err = cs.cfg.db.pruneHashData(tenMinutesAgo)
				if err != nil {
					// Errors generated pruning invalidated hash rate
					// indicate an underlying issue accessing the
					// database. The chainstate process will be
					// terminated as a result.
					log.Errorf("unable to prune hash data: %v", err)
					close(msg.Done)
					cs.cfg.Cancel()
					continue
				}

				err = cs.pruneAcceptedWork(ctx, pruneLimit)
				if err != nil {
					// Errors generated pruning invalidated accepted
					// work indicate an underlying issue accessing
					// the database. The chainstate process will be
					// terminated as a result.
					log.Errorf("unable to prune accepted work below "+
						"height #%d: %v", pruneLimit, err)
					close(msg.Done)
					cs.cfg.Cancel()
					continue
				}

				err = cs.prunePayments(ctx, header.Height)
				if err != nil {
					// Errors generated pruning invalidated payments
					// indicate an underlying issue accessing the
					// database. The chainstate process will be
					// terminated as a result.
					log.Errorf("unable to prune orphaned payments at "+
						"height #%d: %v", header.Height, err)
					close(msg.Done)
					cs.cfg.Cancel()
					continue
				}
			}

			// Check if the parent of the connected block is an accepted work
			// of the pool.
			parentHeight := header.Height - 1
			parentHash := header.PrevBlock.String()
			parentID := AcceptedWorkID(parentHash, parentHeight)
			work, err := cs.cfg.db.fetchAcceptedWork(parentID)
			if err != nil {
				// If the parent of the connected block is not an accepted
				// work of the pool, ignore it.
				if errors.Is(err, errs.ValueNotFound) {
					log.Tracef("Block #%d (%s) is not an accepted "+
						"work of the pool", parentHeight, parentHash)
					close(msg.Done)
					continue
				}

				// Errors generated, except for a value not found error,
				// looking up accepted work indicates an underlying issue
				// accessing the database. The chainstate process will be
				// terminated as a result.
				log.Errorf("unable to fetch accepted work for block #%d's "+
					"parent %s : %v", header.Height, parentHash, err)
				close(msg.Done)
				cs.cfg.Cancel()
				continue
			}

			// If the parent block is already confirmed as mined by the pool,
			// ignore it.
			if work.Confirmed {
				close(msg.Done)
				continue
			}

			// Update accepted work as confirmed mined.
			work.Confirmed = true
			err = cs.cfg.db.updateAcceptedWork(work)
			if err != nil {
				// Errors generated updating work state indicate an underlying
				// issue accessing the database. The chainstate process will
				// be terminated as a result.
				log.Errorf("unable to confirm accepted work for block "+
					"%s: %v", header.PrevBlock.String(), err)
				close(msg.Done)
				cs.cfg.Cancel()
				continue
			}
			log.Infof("Mined work %s confirmed by connected block #%d (%s)",
				header.PrevBlock.String(), header.Height,
				header.BlockHash().String())

			// Signal the gui cache of the confirmed mined work.
			cs.cfg.SignalCache(Confirmed)

			if !cs.cfg.SoloPool {
				count, err := cs.cfg.db.pendingPaymentsForBlockHash(parentHash)
				if err != nil {
					// Errors generated looking up pending payments
					// indicates an underlying issue accessing the database.
					// The chainstate process will be terminated as a result.
					log.Errorf("failed to fetch pending payments "+
						"at height #%d: %v", parentHeight, err)
					close(msg.Done)
					cs.cfg.Cancel()
					continue
				}

				// If the parent block already has payments generated for it
				// do not generate a new set of payments.
				if count > 0 {
					close(msg.Done)
					continue
				}

				// Generate payments for the confirmed block.
				source := &PaymentSource{
					BlockHash: block.BlockHash().String(),
					Coinbase:  coinbaseTx.TxHash().String(),
				}

				// The coinbase output prior to
				// [DCP0006](https://github.com/decred/dcps/pull/17)
				// activation is at the third index position and at
				// the second index position once DCP0006 is activated.
				amt := dcrutil.Amount(coinbaseTx.TxOut[1].Value)
				if !treasuryActive {
					amt = dcrutil.Amount(coinbaseTx.TxOut[2].Value)
				}

				err = cs.cfg.GeneratePayments(block.Header.Height, source,
					amt, work.CreatedOn)
				if err != nil {
					// Errors generated creating payments are fatal since it is
					// required to distribute payments to participating miners.
					// The chainstate process will be terminated as a result.
					log.Error(err)
					close(msg.Done)
					cs.cfg.Cancel()
					continue
				}
			}

			close(msg.Done)

		case msg := <-cs.discCh:
			var header wire.BlockHeader
			err := header.FromBytes(msg.Header)
			if err != nil {
				// Errors generated parsing block notifications should not
				// terminate the chainstate process.
				log.Errorf("unable to create header from bytes: %v", err)
				close(msg.Done)
				continue
			}

			// Check if the disconnected block confirms a mined block, if it
			// does unconfirm it.
			parentHeight := header.Height - 1
			parentHash := header.PrevBlock.String()
			parentID := AcceptedWorkID(parentHash, parentHeight)
			confirmedWork, err := cs.cfg.db.fetchAcceptedWork(parentID)
			if err != nil {
				// Errors generated, except for a value not found error,
				// looking up accepted work indicates an underlying issue
				// accessing the database. The chainstate process will be
				// terminated as a result.
				if !errors.Is(err, errs.ValueNotFound) {
					log.Errorf("unable to fetch accepted work for block "+
						"#%d's parent %s: %v", header.Height, parentHash, err)
					close(msg.Done)
					cs.cfg.Cancel()
					continue
				}

				// If the parent of the disconnected block is not an accepted
				// work of the pool, ignore it.
			}

			if confirmedWork != nil {
				confirmedWork.Confirmed = false
				err = cs.cfg.db.updateAcceptedWork(confirmedWork)
				if err != nil {
					// Errors generated updating work state indicate an underlying
					// issue accessing the database. The chainstate process will
					// be terminated as a result.
					log.Errorf("unable to unconfirm accepted work for block "+
						"%s: %v", parentHash, err)
					close(msg.Done)
					cs.cfg.Cancel()
					continue
				}

				log.Infof("Mined work %s unconfirmed via disconnected "+
					"block #%d", parentHash, header.Height)
			}

			// If the disconnected block is an accepted work of the pool
			// ensure it is not confirmed mined.
			blockHash := header.BlockHash().String()
			id := AcceptedWorkID(blockHash, header.Height)
			work, err := cs.cfg.db.fetchAcceptedWork(id)
			if err != nil {
				// If the disconnected block is not an accepted
				// work of the pool, ignore it.
				if errors.Is(err, errs.ValueNotFound) {
					close(msg.Done)
					continue
				}

				// Errors generated, except for a value not found error,
				// looking up accepted work indicates an underlying issue
				// accessing the database. The chainstate process will be
				// terminated as a result.
				log.Errorf("unable to fetch accepted work for block "+
					"#%d (%s): %v", header.Height, blockHash, err)
				close(msg.Done)
				cs.cfg.Cancel()
				continue
			}

			work.Confirmed = false
			err = cs.cfg.db.updateAcceptedWork(work)
			if err != nil {
				// Errors generated updating work state indicate an underlying
				// issue accessing the database. The chainstate process will
				// be terminated as a result.
				log.Errorf("unable to unconfirm mined work at "+
					"height #%d: %v", header.Height, err)
				close(msg.Done)
				cs.cfg.Cancel()
				continue
			}

			log.Infof("Disconnected mined work %s at height #%d",
				blockHash, header.Height)

			// Signal the gui cache of the unconfirmed (due to a reorg)
			// mined work.
			cs.cfg.SignalCache(Unconfirmed)

			close(msg.Done)
		}
	}
}