notification: refactor dcrd notification pipeline
The block, tx, reorg, etc. updates from dcrd were being routed
through a series of channels that made the code a bit tedious.
These changes are a simplification, but should reproduce the
established behavior exactly. The second-level channels were dropped
in favor of registered handler functions. There is still a queue loop
that handles top-level synchronization, but packages' handlers no
longer require an additional goroutine or explicit synchronization
measures.

Handlers are registered in groups. The groups themselves are run
sequentially, but the functions within a group are run concurrently,
allowing a high degree of control over synchronization.
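
To illustrate the dispatch pattern described above, here is a minimal, self-contained Go sketch. The names and types are illustrative only and are not the notification package's actual API: groups are processed in order, while the handlers inside each group run concurrently and are joined before the next group begins.

// Illustrative sketch of group dispatch; not the notification package API.
package main

import (
	"fmt"
	"sync"
)

// blockHandler is an illustrative handler signature; the real handlers take
// richer arguments (block headers, raw transactions, reorg data, etc.).
type blockHandler func(height uint32) error

// dispatch runs each group of handlers in order. Handlers within a group run
// concurrently and are all joined before the next group begins.
func dispatch(groups [][]blockHandler, height uint32) {
	for _, group := range groups {
		var wg sync.WaitGroup
		for _, h := range group {
			wg.Add(1)
			go func(h blockHandler) {
				defer wg.Done()
				if err := h(height); err != nil {
					fmt.Printf("handler error: %v\n", err)
				}
			}(h)
		}
		wg.Wait() // this group finishes before the next group starts
	}
}

func main() {
	stakeDB := func(h uint32) error { fmt.Println("stakedb stores", h); return nil }
	explorer := func(h uint32) error { fmt.Println("explorer renders", h); return nil }
	status := func(h uint32) error { fmt.Println("status height set to", h); return nil }
	// stakeDB runs (and completes) before the explorer and status handlers,
	// which run concurrently with each other.
	dispatch([][]blockHandler{{stakeDB}, {explorer, status}}, 356425)
}
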
buck54321 authored and chappjc committed Jun 13, 2019
1 parent 6b3a6a5 commit 856d046
Showing 22 changed files with 1,083 additions and 1,276 deletions.
21 changes: 10 additions & 11 deletions api/apiroutes.go
@@ -33,7 +33,6 @@ import (
"github.com/decred/dcrdata/gov/agendas"
m "github.com/decred/dcrdata/middleware"
"github.com/decred/dcrdata/txhelpers"
notify "github.com/decred/dcrdata/v5/notification"
appver "github.com/decred/dcrdata/v5/version"
)

@@ -91,6 +90,7 @@ type DataSourceLite interface {
SendRawTransaction(txhex string) (string, error)
GetExplorerAddress(address string, count, offset int64) (*dbtypes.AddressInfo, txhelpers.AddressType, txhelpers.AddressError)
GetMempoolPriceCountTime() *apitypes.PriceCountTime
UpdateChan() chan uint32
}

// DataSourceAux specifies an interface for advanced data collection using the
@@ -211,9 +211,16 @@ func (c *appContext) updateNodeConnections() error {
return nil
}

// UpdateNodeHeight updates the Status height. This method satisfies
// notification.BlockHandlerLite.
func (c *appContext) UpdateNodeHeight(height uint32, _ string) error {
c.Status.SetHeight(height)
return nil
}

// StatusNtfnHandler keeps the appContext's Status up-to-date with changes in
// node and DB status.
func (c *appContext) StatusNtfnHandler(ctx context.Context, wg *sync.WaitGroup) {
func (c *appContext) StatusNtfnHandler(ctx context.Context, wg *sync.WaitGroup, wireHeightChan chan uint32) {
defer wg.Done()
// Check the node connection count periodically.
rpcCheckTicker := time.NewTicker(5 * time.Second)
@@ -227,15 +234,7 @@ out:
break keepon
}

case height, ok := <-notify.NtfnChans.UpdateStatusNodeHeight:
if !ok {
log.Warnf("Block connected channel closed.")
break out
}

c.Status.SetHeight(height)

case height, ok := <-notify.NtfnChans.UpdateStatusDBHeight:
case height, ok := <-wireHeightChan:
if !ok {
log.Warnf("Block connected channel closed.")
break out
72 changes: 36 additions & 36 deletions api/insight/socket.io.go
@@ -67,7 +67,7 @@ type NewTx struct {

// NewSocketServer constructs a new SocketServer, registering handlers for the
// "connection", "disconnection", and "subscribe" events.
func NewSocketServer(newTxChan chan *NewTx, params *chaincfg.Params) (*SocketServer, error) {
func NewSocketServer(params *chaincfg.Params) (*SocketServer, error) {
server, err := socketio.NewServer(nil)
if err != nil {
apiLog.Errorf("Could not create socket.io server: %v", err)
@@ -133,7 +133,6 @@ func NewSocketServer(newTxChan chan *NewTx, params *chaincfg.Params) (*SocketSer
params: params,
watchedAddresses: addrs,
}
go sockServ.sendNewTx(newTxChan)
return &sockServ, nil
}

@@ -174,42 +173,43 @@ func (soc *SocketServer) Store(blockData *blockdata.BlockData, msgBlock *wire.Ms
return nil
}

func (soc *SocketServer) sendNewTx(newTxChan chan *NewTx) {
for {
ntx, ok := <-newTxChan
if !ok {
break
}
msgTx, err := txhelpers.MsgTxFromHex(ntx.Hex)
if err != nil {
continue
}
hash := msgTx.TxHash().String()
var vouts []InsightSocketVout
var total int64
for i, v := range msgTx.TxOut {
total += v.Value
if len(ntx.Vouts[i].ScriptPubKey.Addresses) != 0 {
soc.watchedAddresses.RLock()
for _, address := range ntx.Vouts[i].ScriptPubKey.Addresses {
if _, ok := soc.watchedAddresses.c[address]; ok {
soc.BroadcastTo(address, address, hash)
}
vouts = append(vouts, InsightSocketVout{
Address: address,
Value: v.Value,
})
// SendNewTx prepares a dcrd mempool tx for broadcast. This method satisfies
// notification.TxHandler and is registered as a handler in main.go.
func (soc *SocketServer) SendNewTx(rawTx *dcrjson.TxRawResult) error {
ntx := NewTx{
Hex: rawTx.Hex,
Vouts: rawTx.Vout,
}
msgTx, err := txhelpers.MsgTxFromHex(rawTx.Hex)
if err != nil {
return err
}
hash := msgTx.TxHash().String()
var vouts []InsightSocketVout
var total int64
for i, v := range msgTx.TxOut {
total += v.Value
if len(ntx.Vouts[i].ScriptPubKey.Addresses) != 0 {
soc.watchedAddresses.RLock()
for _, address := range ntx.Vouts[i].ScriptPubKey.Addresses {
if _, ok := soc.watchedAddresses.c[address]; ok {
soc.BroadcastTo(address, address, hash)
}
soc.watchedAddresses.RUnlock()
vouts = append(vouts, InsightSocketVout{
Address: address,
Value: v.Value,
})
}
soc.watchedAddresses.RUnlock()
}
tx := WebSocketTx{
Hash: hash,
Size: len(ntx.Hex) / 2,
TotalOut: total,
Vouts: vouts,
}
apiLog.Tracef("Sending new websocket tx %s", hash)
soc.BroadcastTo("inv", "tx", tx)
}
tx := WebSocketTx{
Hash: hash,
Size: len(ntx.Hex) / 2,
TotalOut: total,
Vouts: vouts,
}
apiLog.Tracef("Sending new websocket tx %s", hash)
soc.BroadcastTo("inv", "tx", tx)
return nil
}
163 changes: 35 additions & 128 deletions blockdata/chainmonitor.go
@@ -22,27 +22,18 @@ type chainMonitor struct {
collector *Collector
dataSavers []BlockDataSaver
reorgDataSavers []BlockDataSaver
wg *sync.WaitGroup
watchaddrs map[string]txhelpers.TxAction
blockChan chan *chainhash.Hash
recvTxBlockChan chan *txhelpers.BlockWatchedTx
reorgChan chan *txhelpers.ReorgData
reorgLock sync.Mutex
}

// NewChainMonitor creates a new chainMonitor.
func NewChainMonitor(ctx context.Context, collector *Collector, savers []BlockDataSaver,
reorgSavers []BlockDataSaver, wg *sync.WaitGroup, addrs map[string]txhelpers.TxAction,
recvTxBlockChan chan *txhelpers.BlockWatchedTx, reorgChan chan *txhelpers.ReorgData) *chainMonitor {
reorgSavers []BlockDataSaver) *chainMonitor {

return &chainMonitor{
ctx: ctx,
collector: collector,
dataSavers: savers,
reorgDataSavers: reorgSavers,
wg: wg,
watchaddrs: addrs,
recvTxBlockChan: recvTxBlockChan,
reorgChan: reorgChan,
}
}

@@ -56,22 +47,6 @@ func (p *chainMonitor) collect(hash *chainhash.Hash) (*wire.MsgBlock, *BlockData
height := block.Height()
log.Infof("Block height %v connected. Collecting data...", height)

if len(p.watchaddrs) > 0 {
// txsForOutpoints := blockConsumesOutpointWithAddresses(block, p.watchaddrs,
// p.collector.dcrdChainSvr)
// if len(txsForOutpoints) > 0 {
// p.spendTxBlockChan <- &BlockWatchedTx{height, txsForOutpoints}
// }

txsForAddrs := txhelpers.BlockReceivesToAddresses(block,
p.watchaddrs, p.collector.netParams)
if len(txsForAddrs) > 0 {
p.recvTxBlockChan <- &txhelpers.BlockWatchedTx{
BlockHeight: height,
TxsForAddress: txsForAddrs}
}
}

// Get node's best block height to see if the block for which we are
// collecting data is the best block.
chainHeight, err := p.collector.dcrdChainSvr.GetBlockCount()
@@ -102,14 +77,17 @@ }
}

// ConnectBlock is a synchronous version of BlockConnectedHandler that collects
// and stores data for a block specified by the given hash.
func (p *chainMonitor) ConnectBlock(hash *chainhash.Hash) error {
// and stores data for a block specified by the given hash. ConnectBlock
// satisfies notification.BlockHandler, and is registered as a handler in
// main.go.
func (p *chainMonitor) ConnectBlock(header *wire.BlockHeader) error {
// Do not handle reorg and block connects simultaneously.
hash := header.BlockHash()
p.reorgLock.Lock()
defer p.reorgLock.Unlock()

// Collect block data.
msgBlock, blockData, err := p.collect(hash)
msgBlock, blockData, err := p.collect(&hash)
if err != nil {
return err
}
Expand All @@ -127,110 +105,39 @@ func (p *chainMonitor) ConnectBlock(hash *chainhash.Hash) error {
return err
}

// SetNewBlockChan specifies the new-block channel to be used by
// BlockConnectedHandler. Note that BlockConnectedHandler is not required if
// using a direct call to ConnectBlock.
func (p *chainMonitor) SetNewBlockChan(blockChan chan *chainhash.Hash) {
p.blockChan = blockChan
}

// BlockConnectedHandler handles block connected notifications, which trigger
// data collection and storage.
func (p *chainMonitor) BlockConnectedHandler() {
defer p.wg.Done()
out:
for {
keepon:
select {
case hash, ok := <-p.blockChan:
if !ok {
log.Warnf("Block connected channel closed.")
break out
}

// Do not handle reorg and block connects simultaneously.
p.reorgLock.Lock()
// Collect block data.
msgBlock, blockData, err := p.collect(hash)
p.reorgLock.Unlock()
if err != nil {
log.Errorf("Failed to collect data for block %v: %v", hash, err)
break keepon
}

// Store block data with each saver.
for _, s := range p.dataSavers {
if s != nil {
// Save data to wherever the saver wants to put it.
if err = s.Store(blockData, msgBlock); err != nil {
log.Errorf("(%v).Store failed: %v", reflect.TypeOf(s), err)
}
}
}

case <-p.ctx.Done():
log.Debugf("Got quit signal. Exiting block connected handler.")
break out
}
// ReorgHandler processes a chain reorg. A reorg is handled in blockdata by
// simply collecting data for the new best block, and storing it in the
// *reorgDataSavers*.
func (p *chainMonitor) ReorgHandler(reorg *txhelpers.ReorgData) error {
if reorg == nil {
return fmt.Errorf("nil reorg data received!")
}

}

// ReorgHandler receives notification of a chain reorganization. A reorg is
// handled in blockdata by simply collecting data for the new best block, and
// storing it in the *reorgDataSavers*.
func (p *chainMonitor) ReorgHandler() {
defer p.wg.Done()
out:
for {
keepon:
select {
case reorgData, ok := <-p.reorgChan:
if !ok {
log.Warnf("Reorg channel closed.")
break out
}
if reorgData == nil {
log.Warnf("nil reorg data received!")
break keepon
}

newHeight := reorgData.NewChainHeight
newHash := reorgData.NewChainHead

// Do not handle reorg and block connects simultaneously.
p.reorgLock.Lock()
newHeight := reorg.NewChainHeight
newHash := reorg.NewChainHead

log.Infof("Reorganize signaled to blockdata. "+
"Collecting data for NEW head block %v at height %d.",
newHash, newHeight)
// Do not handle reorg and block connects simultaneously.
p.reorgLock.Lock()
defer p.reorgLock.Unlock()
log.Infof("Reorganize signaled to blockdata. "+
"Collecting data for NEW head block %v at height %d.",
newHash, newHeight)

// Collect data for the new best block.
msgBlock, blockData, err := p.collect(&newHash)
if err != nil {
log.Errorf("ReorgHandler: Failed to collect data for block %v: %v", newHash, err)
p.reorgLock.Unlock()
reorgData.WG.Done()
break keepon
}
// Collect data for the new best block.
msgBlock, blockData, err := p.collect(&newHash)
if err != nil {
reorg.WG.Done()
return fmt.Errorf("ReorgHandler: Failed to collect data for block %v: %v", newHash, err)
}

// Store block data with each REORG saver.
for _, s := range p.reorgDataSavers {
if s != nil {
// Save data to wherever the saver wants to put it.
if err := s.Store(blockData, msgBlock); err != nil {
log.Errorf("(%v).Store failed: %v", reflect.TypeOf(s), err)
}
}
// Store block data with each REORG saver.
for _, s := range p.reorgDataSavers {
if s != nil {
// Save data to wherever the saver wants to put it.
if err := s.Store(blockData, msgBlock); err != nil {
return fmt.Errorf("(%v).Store failed: %v", reflect.TypeOf(s), err)
}

p.reorgLock.Unlock()

reorgData.WG.Done()

case <-p.ctx.Done():
log.Debugf("Got quit signal. Exiting reorg notification handler.")
break out
}
}
return nil
}
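
A small, self-contained sketch of the locking pattern used by the refactored chainMonitor above (illustrative names; not the actual dcrdata types). Because ConnectBlock and ReorgHandler are now plain methods that the notification dispatcher may invoke from concurrent handler goroutines, the reorgLock mutex, rather than a dedicated consumer goroutine, ensures block connections and reorgs are never processed simultaneously.

// Illustrative sketch; monitor stands in for chainMonitor.
package main

import (
	"fmt"
	"sync"
	"time"
)

type monitor struct {
	reorgLock sync.Mutex
}

// ConnectBlock collects and stores data for a newly connected block.
func (m *monitor) ConnectBlock(height int64) error {
	m.reorgLock.Lock()
	defer m.reorgLock.Unlock()
	fmt.Println("collect + store block", height)
	time.Sleep(10 * time.Millisecond) // simulate collection work
	return nil
}

// ReorgHandler collects and stores data for the new chain head after a reorg.
func (m *monitor) ReorgHandler(newHeight int64) error {
	m.reorgLock.Lock()
	defer m.reorgLock.Unlock()
	fmt.Println("collect + store NEW chain head", newHeight)
	return nil
}

func main() {
	m := &monitor{}
	var wg sync.WaitGroup
	wg.Add(2)
	// Even if the dispatcher runs these from separate goroutines, the mutex
	// guarantees only one executes at a time.
	go func() { defer wg.Done(); _ = m.ConnectBlock(356425) }()
	go func() { defer wg.Done(); _ = m.ReorgHandler(356426) }()
	wg.Wait()
}
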
2 changes: 1 addition & 1 deletion cmd/rebuilddb/rebuilddb.go
Expand Up @@ -96,7 +96,7 @@ func mainCore() int {

// Sqlite output
dbInfo := dcrsqlite.DBInfo{FileName: cfg.DBFileName}
sqliteDB, err := dcrsqlite.InitWiredDB(&dbInfo, stakeDB, nil, client,
sqliteDB, err := dcrsqlite.InitWiredDB(&dbInfo, stakeDB, client,
activeChain, func() {})
if err != nil {
log.Errorf("Unable to initialize SQLite database: %v", err)