Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve efficiency of triggering the ticket buyer. #480

Merged
merged 2 commits into from Dec 20, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion ticketbuyer.go
Expand Up @@ -219,7 +219,7 @@ func startTicketPurchase(w *wallet.Wallet, dcrdClient *dcrrpcclient.Client,
}
}
quit := make(chan struct{})
n := w.NtfnServer.TransactionNotifications()
n := w.NtfnServer.MainTipChangedNotifications()
pm := ticketbuyer.NewPurchaseManager(w, p, n.C, quit)
go pm.NotificationHandler()
go func() {
Expand Down
13 changes: 9 additions & 4 deletions ticketbuyer/purchase.go
Expand Up @@ -13,13 +13,13 @@ import (
type PurchaseManager struct {
w *wallet.Wallet
purchaser *TicketPurchaser
ntfnChan <-chan *wallet.TransactionNotifications
ntfnChan <-chan *wallet.MainTipChangedNotification
quit chan struct{}
}

// NewPurchaseManager creates a new PurchaseManager.
func NewPurchaseManager(w *wallet.Wallet, purchaser *TicketPurchaser,
ntfnChan <-chan *wallet.TransactionNotifications,
ntfnChan <-chan *wallet.MainTipChangedNotification,
quit chan struct{}) *PurchaseManager {
return &PurchaseManager{
w: w,
Expand Down Expand Up @@ -50,9 +50,14 @@ out:
case v := <-p.ntfnChan:
go func(s1, s2 chan struct{}) {
<-s1 // wait for previous worker to finish
for _, block := range v.AttachedBlocks {
p.purchase(int64(block.Height))

// Purchase tickets for each attached block, not just for the
// update to the main chain. This is probably not optimal but
// it matches how dcrticketbuyer worked.
for h := v.NewHeight - int32(len(v.AttachedBlocks)) + 1; h <= v.NewHeight; h++ {
p.purchase(int64(h))
}

close(s2) // unblock next worker
}(s1, s2)
s1, s2 = s2, make(chan struct{})
Expand Down
41 changes: 33 additions & 8 deletions wallet/chainntfns.go
Expand Up @@ -202,44 +202,60 @@ type sideChainBlock struct {

// switchToSideChain performs a chain switch, switching the main chain to the
// in-memory side chain. The old side chain becomes the new main chain.
func (w *Wallet) switchToSideChain(dbtx walletdb.ReadWriteTx) error {
func (w *Wallet) switchToSideChain(dbtx walletdb.ReadWriteTx) (*MainTipChangedNotification, error) {
addrmgrNs := dbtx.ReadBucket(waddrmgrNamespaceKey)
txmgrNs := dbtx.ReadWriteBucket(wtxmgrNamespaceKey)

sideChain := w.sideChain
if len(sideChain) == 0 {
return errors.New("no side chain to switch to")
return nil, errors.New("no side chain to switch to")
}

sideChainForkHeight := sideChain[0].headerData.SerializedHeader.Height()

// Notify detached blocks for each removed block, in reversed order.
_, tipHeight := w.TxStore.MainChainTip(txmgrNs)

chainTipChanges := &MainTipChangedNotification{
AttachedBlocks: make([]*chainhash.Hash, len(sideChain)),
DetachedBlocks: make([]*chainhash.Hash, tipHeight-sideChainForkHeight+1),
NewHeight: 0, // Must be set by caller before sending
}

// Find hashes of removed blocks for notifications.
for i := tipHeight; i >= sideChainForkHeight; i-- {
hash, err := w.TxStore.GetMainChainBlockHashForHeight(txmgrNs, i)
if err != nil {
return err
return nil, err
}

// DetachedBlocks contains block hashes in order of increasing heights.
chainTipChanges.DetachedBlocks[i-sideChainForkHeight] = &hash

// For transaction notifications, the blocks are notified in reverse
// height order.
w.NtfnServer.notifyDetachedBlock(&hash)
}

// Remove blocks on the current main chain that are at or above the
// height of the block that begins the side chain.
err := w.TxStore.Rollback(txmgrNs, addrmgrNs, sideChainForkHeight)
if err != nil {
return err
return nil, err
}

// Extend the main chain with each sidechain block.
for i := range sideChain {
scBlock := &sideChain[i]
err = w.extendMainChain(dbtx, &scBlock.headerData, scBlock.transactions)
if err != nil {
return err
return nil, err
}

// Add the block hash to the notification.
chainTipChanges.AttachedBlocks[i] = &scBlock.headerData.BlockHash
}

return nil
return chainTipChanges, nil
}

func copyHeaderSliceToArray(array *wtxmgr.RawBlockHeader, slice []byte) error {
Expand All @@ -264,6 +280,8 @@ func (w *Wallet) onBlockConnected(dbtx walletdb.ReadWriteTx, serializedBlockHead
return err
}

var chainTipChanges *MainTipChangedNotification

w.reorganizingLock.Lock()
reorg, reorgToHash := w.reorganizing, w.reorganizeToHash
w.reorganizingLock.Unlock()
Expand All @@ -283,7 +301,7 @@ func (w *Wallet) onBlockConnected(dbtx walletdb.ReadWriteTx, serializedBlockHead
return nil
}

err = w.switchToSideChain(dbtx)
chainTipChanges, err = w.switchToSideChain(dbtx)
if err != nil {
return err
}
Expand All @@ -298,9 +316,15 @@ func (w *Wallet) onBlockConnected(dbtx walletdb.ReadWriteTx, serializedBlockHead
if err != nil {
return err
}
chainTipChanges = &MainTipChangedNotification{
AttachedBlocks: []*chainhash.Hash{&block.BlockHash},
DetachedBlocks: nil,
NewHeight: 0, // set below
}
}

height := int32(blockHeader.Height)
chainTipChanges.NewHeight = height

// Handle automatic ticket purchasing if enabled. This function should
// not error due to an error purchasing tickets (several tickets may be
Expand All @@ -325,6 +349,7 @@ func (w *Wallet) onBlockConnected(dbtx walletdb.ReadWriteTx, serializedBlockHead
"connecting block height %v: %s", height, err.Error())
}

w.NtfnServer.notifyMainChainTipChanged(chainTipChanges)
w.NtfnServer.sendAttachedBlockNotification(dbtx)

return nil
Expand Down
76 changes: 71 additions & 5 deletions wallet/notifications.go
Expand Up @@ -36,11 +36,12 @@ type NotificationServer struct {
transactions []chan *TransactionNotifications
// Coalesce transaction notifications since wallet previously did not add
// mined txs together. Now it does and this can be rewritten.
currentTxNtfn *TransactionNotifications
spentness map[uint32][]chan *SpentnessNotifications
accountClients []chan *AccountNotification
mu sync.Mutex // Only protects registered client channels
wallet *Wallet // smells like hacks
currentTxNtfn *TransactionNotifications
spentness map[uint32][]chan *SpentnessNotifications
accountClients []chan *AccountNotification
tipChangedClients []chan *MainTipChangedNotification
mu sync.Mutex // Only protects registered client channels
wallet *Wallet // smells like hacks
}

func newNotificationServer(wallet *Wallet) *NotificationServer {
Expand Down Expand Up @@ -617,3 +618,68 @@ func (c *AccountNotificationsClient) Done() {
s.mu.Unlock()
}()
}

// MainTipChangedNotification describes processed changes to the main chain tip
// block. Attached and detached blocks are sorted by increasing heights.
//
// This is intended to be a lightweight alternative to TransactionNotifications
// when only info regarding the main chain tip block changing is needed.
type MainTipChangedNotification struct {
AttachedBlocks []*chainhash.Hash
DetachedBlocks []*chainhash.Hash
NewHeight int32
}

// MainTipChangedNotificationsClient receives
// AccountNoMainTipChangedNotifications over the channel C.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo.. fixing.

type MainTipChangedNotificationsClient struct {
C chan *MainTipChangedNotification
server *NotificationServer
}

// MainTipChangedNotifications returns a client for receiving
// MainTipChangedNotification over a channel. The channel is unbuffered. When
// finished, the client's Done method should be called to disassociate the
// client from the server.
func (s *NotificationServer) MainTipChangedNotifications() MainTipChangedNotificationsClient {
c := make(chan *MainTipChangedNotification)
s.mu.Lock()
s.tipChangedClients = append(s.tipChangedClients, c)
s.mu.Unlock()
return MainTipChangedNotificationsClient{
C: c,
server: s,
}
}

// Done deregisters the client from the server and drains any remaining
// messages. It must be called exactly once when the client is finished
// receiving notifications.
func (c *MainTipChangedNotificationsClient) Done() {
go func() {
for range c.C {
}
}()
go func() {
s := c.server
s.mu.Lock()
clients := s.tipChangedClients
for i, ch := range clients {
if c.C == ch {
clients[i] = clients[len(clients)-1]
s.tipChangedClients = clients[:len(clients)-1]
close(ch)
break
}
}
s.mu.Unlock()
}()
}

func (s *NotificationServer) notifyMainChainTipChanged(n *MainTipChangedNotification) {
s.mu.Lock()
for _, c := range s.tipChangedClients {
c <- n
}
s.mu.Unlock()
}