-
Notifications
You must be signed in to change notification settings - Fork 3
/
ntfnchans.go
132 lines (114 loc) · 4.47 KB
/
ntfnchans.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright (c) 2017, Jonathan Chappelow
// See LICENSE for details.
package main
import (
"github.com/dcrdata/dcrdata/blockdata"
"github.com/dcrdata/dcrdata/dcrsqlite"
"github.com/dcrdata/dcrdata/stakedb"
"github.com/dcrdata/dcrdata/txhelpers"
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrutil"
)
const (
// blockConnChanBuffer is the size of the block connected channel buffer. It
// is used for every channel that signals a newly connected block, and also
// for the status height update channels.
blockConnChanBuffer = 64
// newTxChanBuffer is the size of the new transaction channel buffer, used
// for notifications of ANY transaction that is added into mempool.
newTxChanBuffer = 4096
// reorgBuffer is the size of the buffer of each reorg data channel.
reorgBuffer = 2
// relevantMempoolTxChanBuffer is the size of the new transaction channel
// buffer, for relevant transactions that are added into mempool. It is
// currently disabled (commented out).
//relevantMempoolTxChanBuffer = 2048
)
// ntfnChans groups the notification channels used across the package. The
// channels are package-level variables for simplicity. Every field is nil
// until makeNtfnChans assigns it.
var ntfnChans struct {
	// connectChan signals a newly connected block for block data collection.
	connectChan chan *chainhash.Hash

	// reorgChanBlockData carries reorg data for the blockdata package.
	reorgChanBlockData chan *blockdata.ReorgData

	// connectChanStkInf is like connectChan, but signals the stake info
	// monitor when a new block is connected.
	// NOTE(review): the int32 payload is presumably a block height - confirm.
	connectChanStkInf chan int32

	// connectChanWiredDB is the WiredDB channel for connecting new blocks.
	connectChanWiredDB chan *chainhash.Hash

	// reorgChanWiredDB carries reorg data for the dcrsqlite package.
	reorgChanWiredDB chan *dcrsqlite.ReorgData

	// connectChanStakeDB is the stake DB channel for connecting new blocks.
	connectChanStakeDB chan *chainhash.Hash

	// reorgChanStakeDB carries reorg data for the stakedb package.
	reorgChanStakeDB chan *stakedb.ReorgData

	// updateStatusNodeHeight and updateStatusDBHeight are used to update the
	// app status.
	updateStatusNodeHeight chan uint32
	updateStatusDBHeight   chan uint32

	// spendTxBlockChan and recvTxBlockChan come with connected blocks when
	// watching addresses. makeNtfnChans does not currently create them.
	spendTxBlockChan chan *txhelpers.BlockWatchedTx
	recvTxBlockChan  chan *txhelpers.BlockWatchedTx

	// relevantTxMempoolChan is for relevant transactions that are added into
	// mempool. makeNtfnChans does not currently create it.
	relevantTxMempoolChan chan *dcrutil.Tx

	// newTxChan is for ANY transaction that is added into mempool. It is only
	// created when mempool monitoring is enabled in the config.
	newTxChan chan *chainhash.Hash
}
// makeNtfnChans allocates the package-level notification channels held in
// ntfnChans.
//
// The block connected, reorg, and status channels are always created.
// newTxChan is created only when cfg.MonitorMempool is set; otherwise it is
// left nil. The watch address channels (recvTxBlockChan, spendTxBlockChan and
// relevantTxMempoolChan) are not created, since that code is disabled below.
//
// A channel left nil blocks on both send and receive, so users of a possibly
// nil channel need a default or quit case in their select to avoid blocking
// forever.
func makeNtfnChans(cfg *config) {
	chans := &ntfnChans

	// New block notifications: one channel each for block data, WiredDB, the
	// stake DB, and the stake info monitor.
	chans.connectChan = make(chan *chainhash.Hash, blockConnChanBuffer)
	//ntfnChans.stakeDiffChan = make(chan int64, blockConnChanBuffer)
	chans.connectChanWiredDB = make(chan *chainhash.Hash, blockConnChanBuffer)
	chans.connectChanStakeDB = make(chan *chainhash.Hash, blockConnChanBuffer)
	chans.connectChanStkInf = make(chan int32, blockConnChanBuffer)

	// Reorg data channels.
	chans.reorgChanBlockData = make(chan *blockdata.ReorgData, reorgBuffer)
	chans.reorgChanWiredDB = make(chan *dcrsqlite.ReorgData, reorgBuffer)
	chans.reorgChanStakeDB = make(chan *stakedb.ReorgData, reorgBuffer)

	// App status updates.
	chans.updateStatusNodeHeight = make(chan uint32, blockConnChanBuffer)
	chans.updateStatusDBHeight = make(chan uint32, blockConnChanBuffer)

	// watchaddress (disabled)
	// if len(cfg.WatchAddresses) > 0 {
	// 	// recv/spendTxBlockChan come with connected blocks
	// 	ntfnChans.recvTxBlockChan = make(chan *txhelpers.BlockWatchedTx, blockConnChanBuffer)
	// 	ntfnChans.spendTxBlockChan = make(chan *txhelpers.BlockWatchedTx, blockConnChanBuffer)
	// 	ntfnChans.relevantTxMempoolChan = make(chan *dcrutil.Tx, relevantMempoolTxChanBuffer)
	// }

	// The new transaction channel is only needed when monitoring mempool.
	if !cfg.MonitorMempool {
		return
	}
	chans.newTxChan = make(chan *chainhash.Hash, newTxChanBuffer)
}
// closeNtfnChans closes every channel in ntfnChans that is not nil. Channels
// that were never created are skipped, because closing a nil channel panics.
//
// The fields are not reset to nil after closing, so calling closeNtfnChans a
// second time would panic on the already closed channels.
func closeNtfnChans() {
	chans := &ntfnChans

	// if ntfnChans.stakeDiffChan != nil {
	// 	close(ntfnChans.stakeDiffChan)
	// }

	// Block connected channels.
	if ch := chans.connectChan; ch != nil {
		close(ch)
	}
	if ch := chans.connectChanWiredDB; ch != nil {
		close(ch)
	}
	if ch := chans.connectChanStakeDB; ch != nil {
		close(ch)
	}
	if ch := chans.connectChanStkInf; ch != nil {
		close(ch)
	}

	// Reorg data channels.
	if ch := chans.reorgChanBlockData; ch != nil {
		close(ch)
	}
	if ch := chans.reorgChanWiredDB; ch != nil {
		close(ch)
	}
	if ch := chans.reorgChanStakeDB; ch != nil {
		close(ch)
	}

	// App status channels.
	if ch := chans.updateStatusNodeHeight; ch != nil {
		close(ch)
	}
	if ch := chans.updateStatusDBHeight; ch != nil {
		close(ch)
	}

	// Mempool transaction channels.
	if ch := chans.newTxChan; ch != nil {
		close(ch)
	}
	if ch := chans.relevantTxMempoolChan; ch != nil {
		close(ch)
	}

	// Watched transaction channels that come with connected blocks.
	if ch := chans.spendTxBlockChan; ch != nil {
		close(ch)
	}
	if ch := chans.recvTxBlockChan; ch != nil {
		close(ch)
	}
}