/
txandblocknotifications.go
158 lines (132 loc) · 5.84 KB
/
txandblocknotifications.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package btc
import (
"sync/atomic"
"decred.org/dcrwallet/v3/errors"
sharedW "github.com/crypto-power/cryptopower/libwallet/assets/wallet"
"github.com/crypto-power/cryptopower/libwallet/utils"
)
func (asset *Asset) listenForTransactions() {
if !atomic.CompareAndSwapUint32(&asset.syncData.txlistening, stop, start) {
// sync listening in progress already.
return
}
log.Infof("Subscribing wallet (%s) for transaction notifications", asset.GetWalletName())
notify := asset.Internal().BTC.NtfnServer.TransactionNotifications()
notificationsLoop:
for {
select {
case n, ok := <-notify.C:
if !ok {
break notificationsLoop
}
txToCache := make([]*sharedW.Transaction, len(n.UnminedTransactions))
// handle txs hitting the mempool.
for i, tx := range n.UnminedTransactions {
log.Debugf("(%v) Incoming unmined tx with hash (%v)",
asset.GetWalletName(), tx.Hash.String())
// decodeTxs
txToCache[i] = asset.decodeTransactionWithTxSummary(sharedW.UnminedTxHeight, tx)
// publish mempool tx.
asset.mempoolTransactionNotification(txToCache[i])
}
if len(n.UnminedTransactions) > 0 {
// Since the tx cache receives a fresh update only when a new
// block is detected, update cache with the newly received mempool tx(s).
asset.txs.mu.Lock()
asset.txs.unminedTxs = append(txToCache, asset.txs.unminedTxs...)
asset.txs.mu.Unlock()
}
// Handle Historical, Connected blocks and newly mined Txs.
for _, block := range n.AttachedBlocks {
// When syncing historical data no tx are available.
// Txs are reported only when chain is synced and newly mined tx
// we discovered in the latest block.
for _, tx := range block.Transactions {
log.Debugf("(%v) Incoming mined tx with hash=%v block=%v",
asset.GetWalletName(), tx.Hash, block.Height)
// Publish the confirmed tx notification.
asset.publishTransactionConfirmed(tx.Hash.String(), block.Height)
}
asset.publishBlockAttached(block.Height)
}
case <-asset.syncCtx.Done():
notify.Done()
break notificationsLoop
}
}
// Signal that handleNotifications can be safely started next time its needed.
atomic.StoreUint32(&asset.syncData.syncstarted, stop)
// when done allow timer reset.
atomic.SwapUint32(&asset.syncData.txlistening, stop)
}
// AddTxAndBlockNotificationListener registers a set of functions to be invoked
// when a transaction or block update is processed by the asset. If async is
// true, the provided callback methods will be called from separate goroutines,
// allowing notification senders to continue their operation without waiting
// for the listener to complete processing the notification. This asyncrhonous
// handling is especially important for cases where the wallet process that
// sends the notification temporarily prevents access to other wallet features
// until all notification handlers finish processing the notification. If a
// notification handler were to try to access such features, it would result
// in a deadlock.
func (asset *Asset) AddTxAndBlockNotificationListener(txAndBlockNotificationListener *sharedW.TxAndBlockNotificationListener,
uniqueIdentifier string,
) error {
asset.notificationListenersMu.Lock()
defer asset.notificationListenersMu.Unlock()
if _, ok := asset.txAndBlockNotificationListeners[uniqueIdentifier]; ok {
return errors.New(utils.ErrListenerAlreadyExist)
}
asset.txAndBlockNotificationListeners[uniqueIdentifier] = &sharedW.TxAndBlockNotificationListener{
OnTransaction: func(walletID int, transaction *sharedW.Transaction) {
if txAndBlockNotificationListener.OnTransaction != nil {
go txAndBlockNotificationListener.OnTransaction(walletID, transaction)
}
},
OnBlockAttached: func(walletID int, blockHeight int32) {
if txAndBlockNotificationListener.OnBlockAttached != nil {
go txAndBlockNotificationListener.OnBlockAttached(walletID, blockHeight)
}
},
OnTransactionConfirmed: func(walletID int, hash string, blockHeight int32) {
if txAndBlockNotificationListener.OnTransactionConfirmed != nil {
txAndBlockNotificationListener.OnTransactionConfirmed(walletID, hash, blockHeight)
}
},
}
return nil
}
// RemoveTxAndBlockNotificationListener removes a previously registered
// transaction and block notification listener.
func (asset *Asset) RemoveTxAndBlockNotificationListener(uniqueIdentifier string) {
asset.notificationListenersMu.Lock()
defer asset.notificationListenersMu.Unlock()
delete(asset.txAndBlockNotificationListeners, uniqueIdentifier)
}
// mempoolTransactionNotification publishes the txs that hit the mempool for the first time.
func (asset *Asset) mempoolTransactionNotification(transaction *sharedW.Transaction) {
asset.notificationListenersMu.RLock()
defer asset.notificationListenersMu.RUnlock()
for _, txAndBlockNotificationListener := range asset.txAndBlockNotificationListeners {
txAndBlockNotificationListener.OnTransaction(asset.ID, transaction)
}
}
// publishTransactionConfirmed publishes all the relevant tx identified in a filtered
// block. A valid list of addresses associated with the current block need to
// be provided.
func (asset *Asset) publishTransactionConfirmed(txHash string, blockHeight int32) {
asset.notificationListenersMu.RLock()
defer asset.notificationListenersMu.RUnlock()
for _, txAndBlockNotificationListener := range asset.txAndBlockNotificationListeners {
txAndBlockNotificationListener.OnTransactionConfirmed(asset.ID, txHash, blockHeight)
}
}
// publishBlockAttached once the initial sync is complete all the new blocks received
// are published through this method.
func (asset *Asset) publishBlockAttached(blockHeight int32) {
asset.notificationListenersMu.RLock()
defer asset.notificationListenersMu.RUnlock()
for _, txAndBlockNotificationListener := range asset.txAndBlockNotificationListeners {
txAndBlockNotificationListener.OnBlockAttached(asset.ID, blockHeight)
}
}