/
ntfnchans.go
147 lines (123 loc) · 4.82 KB
/
ntfnchans.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright (c) 2018, The Decred developers
// Copyright (c) 2017, Jonathan Chappelow
// See LICENSE for details.
package notification
import (
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/dcrutil"
"github.com/decred/dcrdata/api/insight"
"github.com/decred/dcrdata/blockdata"
"github.com/decred/dcrdata/db/dcrsqlite"
"github.com/decred/dcrdata/explorer"
"github.com/decred/dcrdata/mempool"
"github.com/decred/dcrdata/stakedb"
"github.com/decred/dcrdata/txhelpers"
)
const (
// blockConnChanBuffer is the buffer size of the block connected channels
// (ConnectChan and ConnectChanWiredDB) and of the status height channels
// (UpdateStatusNodeHeight and UpdateStatusDBHeight).
blockConnChanBuffer = 64
// newTxChanBuffer is the buffer size of NewTxChan, the channel for ANY
// transaction that is added to mempool.
newTxChanBuffer = 48
// expNewTxChanBuffer is the buffer size of the explorer's new transaction
// channel (ExpNewTxChan). InsightNewTxChan is created with the same size.
expNewTxChanBuffer = 70
// relevantMempoolTxChanBuffer is the size of the new transaction channel
// buffer, for relevant transactions that are added into mempool. It is
// disabled, as is the watch-address channel setup that would use it.
//relevantMempoolTxChanBuffer = 2048
)
// NtfnChans collects the chain server notification channels. All fields are
// nil until MakeNtfnChans allocates them, and CloseNtfnChans closes whichever
// ones are non-nil.
var NtfnChans struct {
// ConnectChan carries block hashes for connected blocks. Buffered.
ConnectChan chan *chainhash.Hash
// ReorgChanBlockData carries reorg data for the blockdata package.
// Unbuffered.
ReorgChanBlockData chan *blockdata.ReorgData
// ConnectChanWiredDB carries block hashes for connecting new blocks to
// WiredDB. Buffered.
ConnectChanWiredDB chan *chainhash.Hash
// ReorgChanWiredDB carries reorg data for the dcrsqlite package.
// Unbuffered.
ReorgChanWiredDB chan *dcrsqlite.ReorgData
// ConnectChanStakeDB carries block hashes for connecting new blocks to the
// stake DB. Unbuffered, so a send blocks until it is received.
ConnectChanStakeDB chan *chainhash.Hash
// ReorgChanStakeDB carries reorg data for the stakedb package. Unbuffered.
ReorgChanStakeDB chan *stakedb.ReorgData
// UpdateStatusNodeHeight and UpdateStatusDBHeight carry heights used to
// update the app status. Buffered.
UpdateStatusNodeHeight chan uint32
UpdateStatusDBHeight chan uint32
// SpendTxBlockChan and RecvTxBlockChan are the watch-address channels that
// come with connected blocks. MakeNtfnChans does not currently allocate
// them (that code is commented out), so they stay nil unless set elsewhere.
SpendTxBlockChan, RecvTxBlockChan chan *txhelpers.BlockWatchedTx
// RelevantTxMempoolChan is the watch-address channel for relevant mempool
// transactions. Like the two channels above, MakeNtfnChans does not
// currently allocate it.
RelevantTxMempoolChan chan *dcrutil.Tx
// NewTxChan carries new mempool transactions. It is allocated only when
// MakeNtfnChans is called with monitorMempool set.
NewTxChan chan *mempool.NewTx
// ExpNewTxChan carries new mempool transactions for the explorer.
ExpNewTxChan chan *explorer.NewMempoolTx
// InsightNewTxChan carries new transactions for the insight API. It is
// allocated only when MakeNtfnChans is called with postgresEnabled set.
InsightNewTxChan chan *insight.NewTx
}
// MakeNtfnChans create notification channels based on config
func MakeNtfnChans(monitorMempool, postgresEnabled bool) {
// If we're monitoring for blocks OR collecting block data, these channels
// are necessary to handle new block notifications. Otherwise, leave them
// as nil so that both a send (below) blocks and a receive (in
// blockConnectedHandler) block. default case makes non-blocking below.
// quit channel case manages blockConnectedHandlers.
NtfnChans.ConnectChan = make(chan *chainhash.Hash, blockConnChanBuffer)
// WiredDB channel for connecting new blocks
NtfnChans.ConnectChanWiredDB = make(chan *chainhash.Hash, blockConnChanBuffer)
// Stake DB channel for connecting new blocks - BLOCKING!
NtfnChans.ConnectChanStakeDB = make(chan *chainhash.Hash)
// Reorg data channels
NtfnChans.ReorgChanBlockData = make(chan *blockdata.ReorgData)
NtfnChans.ReorgChanWiredDB = make(chan *dcrsqlite.ReorgData)
NtfnChans.ReorgChanStakeDB = make(chan *stakedb.ReorgData)
// To update app status
NtfnChans.UpdateStatusNodeHeight = make(chan uint32, blockConnChanBuffer)
NtfnChans.UpdateStatusDBHeight = make(chan uint32, blockConnChanBuffer)
// watchaddress
// if len(cfg.WatchAddresses) > 0 {
// // recv/SpendTxBlockChan come with connected blocks
// NtfnChans.RecvTxBlockChan = make(chan *txhelpers.BlockWatchedTx, blockConnChanBuffer)
// NtfnChans.SpendTxBlockChan = make(chan *txhelpers.BlockWatchedTx, blockConnChanBuffer)
// NtfnChans.RelevantTxMempoolChan = make(chan *dcrutil.Tx, relevantMempoolTxChanBuffer)
// }
if monitorMempool {
NtfnChans.NewTxChan = make(chan *mempool.NewTx, newTxChanBuffer)
}
// New mempool tx chan for explorer
NtfnChans.ExpNewTxChan = make(chan *explorer.NewMempoolTx, expNewTxChanBuffer)
if postgresEnabled {
NtfnChans.InsightNewTxChan = make(chan *insight.NewTx, expNewTxChanBuffer)
}
}
// CloseNtfnChans closes every channel in NtfnChans that is non-nil, in the
// order the fields are listed below. Channels that were never allocated are
// skipped.
//
// NOTE: the fields are not reset to nil after closing, so a second call
// without an intervening MakeNtfnChans would try to close channels that are
// already closed, which panics.
func CloseNtfnChans() {
	nc := &NtfnChans

	// Block connected channels.
	for _, ch := range []chan *chainhash.Hash{
		nc.ConnectChan,
		nc.ConnectChanWiredDB,
		nc.ConnectChanStakeDB,
	} {
		if ch != nil {
			close(ch)
		}
	}

	// Reorg data channels; each has its own element type.
	if ch := nc.ReorgChanBlockData; ch != nil {
		close(ch)
	}
	if ch := nc.ReorgChanWiredDB; ch != nil {
		close(ch)
	}
	if ch := nc.ReorgChanStakeDB; ch != nil {
		close(ch)
	}

	// Status height channels.
	for _, ch := range []chan uint32{
		nc.UpdateStatusNodeHeight,
		nc.UpdateStatusDBHeight,
	} {
		if ch != nil {
			close(ch)
		}
	}

	// Mempool transaction channels.
	if ch := nc.NewTxChan; ch != nil {
		close(ch)
	}
	if ch := nc.RelevantTxMempoolChan; ch != nil {
		close(ch)
	}

	// Watch-address block channels.
	for _, ch := range []chan *txhelpers.BlockWatchedTx{
		nc.SpendTxBlockChan,
		nc.RecvTxBlockChan,
	} {
		if ch != nil {
			close(ch)
		}
	}

	// Explorer and insight API channels.
	if ch := nc.ExpNewTxChan; ch != nil {
		close(ch)
	}
	if ch := nc.InsightNewTxChan; ch != nil {
		close(ch)
	}
}