Skip to content

Commit

Permalink
get messages via method in kaspanet#2027
Browse files Browse the repository at this point in the history
  • Loading branch information
D-Stacks committed May 11, 2022
1 parent 69f4059 commit 501be3d
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 60 deletions.
2 changes: 1 addition & 1 deletion app/rpc/rpccontext/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewContext(cfg *config.Config,
UTXOIndex: utxoIndex,
ShutDownChan: shutDownChan,
}
context.NotificationManager = NewNotificationManager()
context.NotificationManager = NewNotificationManager(cfg.ActiveNetParams)

return context
}
41 changes: 37 additions & 4 deletions app/rpc/rpccontext/notificationmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package rpccontext
import (
"sync"

"github.com/kaspanet/kaspad/domain/dagconfig"

"github.com/kaspanet/kaspad/domain/consensus/utils/txscript"

"github.com/kaspanet/kaspad/app/appmessage"
"github.com/kaspanet/kaspad/domain/utxoindex"
routerpkg "github.com/kaspanet/kaspad/infrastructure/network/netadapter/router"
Expand All @@ -13,6 +17,7 @@ import (
type NotificationManager struct {
sync.RWMutex
listeners map[*routerpkg.Router]*NotificationListener
params *dagconfig.Params
}

// UTXOsChangedNotificationAddress represents a kaspad address.
Expand All @@ -24,6 +29,8 @@ type UTXOsChangedNotificationAddress struct {

// NotificationListener represents a registered RPC notification listener
type NotificationListener struct {
params *dagconfig.Params

propagateBlockAddedNotifications bool
propagateVirtualSelectedParentChainChangedNotifications bool
propagateFinalityConflictNotifications bool
Expand All @@ -39,8 +46,9 @@ type NotificationListener struct {
}

// NewNotificationManager creates a new NotificationManager
func NewNotificationManager() *NotificationManager {
func NewNotificationManager(params *dagconfig.Params) *NotificationManager {
return &NotificationManager{
params: params,
listeners: make(map[*routerpkg.Router]*NotificationListener),
}
}
Expand All @@ -50,7 +58,7 @@ func (nm *NotificationManager) AddListener(router *routerpkg.Router) {
nm.Lock()
defer nm.Unlock()

listener := newNotificationListener()
listener := newNotificationListener(nm.params)
nm.listeners[router] = listener
}

Expand Down Expand Up @@ -265,8 +273,10 @@ func (nm *NotificationManager) NotifyPruningPointUTXOSetOverride() error {
return nil
}

func newNotificationListener() *NotificationListener {
func newNotificationListener(params *dagconfig.Params) *NotificationListener {
return &NotificationListener{
params: params,

propagateBlockAddedNotifications: false,
propagateVirtualSelectedParentChainChangedNotifications: false,
propagateFinalityConflictNotifications: false,
Expand Down Expand Up @@ -360,7 +370,7 @@ func (nl *NotificationListener) convertUTXOChangesToUTXOsChangedNotification(
notification.Removed = append(notification.Removed, utxosByAddressesEntries...)
}
}
} else {
} else if addressesSize > 0 {
for _, listenerAddress := range nl.propagateUTXOsChangedNotificationAddresses {
listenerScriptPublicKeyString := listenerAddress.ScriptPublicKeyString
if addedPairs, ok := utxoChanges.Added[listenerScriptPublicKeyString]; ok {
Expand All @@ -372,11 +382,34 @@ func (nl *NotificationListener) convertUTXOChangesToUTXOsChangedNotification(
notification.Removed = append(notification.Removed, utxosByAddressesEntries...)
}
}
} else {
for scriptPublicKeyString, addedPairs := range utxoChanges.Added {
addressString := nl.scriptPubKeyStringToAddressString(scriptPublicKeyString)
utxosByAddressesEntries := ConvertUTXOOutpointEntryPairsToUTXOsByAddressesEntries(addressString, addedPairs)
notification.Added = append(notification.Added, utxosByAddressesEntries...)
}
for scriptPublicKeyString, removedOutpoints := range utxoChanges.Removed {
addressString := nl.scriptPubKeyStringToAddressString(scriptPublicKeyString)
utxosByAddressesEntries := convertUTXOOutpointsToUTXOsByAddressesEntries(addressString, removedOutpoints)
notification.Removed = append(notification.Removed, utxosByAddressesEntries...)
}

}

return notification
}

func (nl *NotificationListener) scriptPubKeyStringToAddressString(scriptPublicKeyString utxoindex.ScriptPublicKeyString) string {
scriptPubKey := utxoindex.ConvertStringToScriptPublicKey(scriptPublicKeyString)
_, address, err := txscript.ExtractScriptPubKeyAddress(scriptPubKey, nl.params)
var addressString string
if err != nil { // This just means the script is not standard
addressString = ""
}
addressString = address.String()
return addressString
}

// PropagateVirtualSelectedParentBlueScoreChangedNotifications instructs the listener to send
// virtual selected parent blue score notifications to the remote listener
func (nl *NotificationListener) PropagateVirtualSelectedParentBlueScoreChangedNotifications() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/kaspawallet/daemon/server/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (s *server) GetBalance(_ context.Context, _ *pb.GetBalanceRequest) (*pb.Get
maturity := s.params.BlockCoinbaseMaturity

balancesMap := make(balancesMapType, 0)
for _, entry := range s.utxosSortedByAmount {
for _, entry := range s.utxosSortedByAmount() {
amount := entry.UTXOEntry.Amount()
address := entry.address
balances, ok := balancesMap[address]
Expand Down
10 changes: 5 additions & 5 deletions cmd/kaspawallet/daemon/server/create_unsigned_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ func (s *server) createUnsignedTransactions(address string, amount uint64, fromA
return nil, errors.New("server is not synced")
}

err := s.refreshUTXOs()
if err != nil {
return nil, err
}
//err := s.refreshUTXOs()
//if err != nil {
// return nil, err
//}

toAddress, err := util.DecodeAddress(address, s.params.Prefix)
if err != nil {
Expand Down Expand Up @@ -97,7 +97,7 @@ func (s *server) selectUTXOs(spendAmount uint64, feePerInput uint64, fromAddress
return nil, 0, err
}

for _, utxo := range s.availableUtxosSortedByAmount {
for _, utxo := range s.availableUtxosSortedByAmount() {
if (fromAddresses != nil && !slices.Contains(fromAddresses, utxo.address)) ||
!isUTXOSpendable(utxo, dagInfo.VirtualDAAScore, s.params.BlockCoinbaseMaturity) {
continue
Expand Down
36 changes: 17 additions & 19 deletions cmd/kaspawallet/daemon/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@ type server struct {
rpcClient *rpcclient.RPCClient
params *dagconfig.Params

lock sync.RWMutex
utxosSortedByAmount []*walletUTXO
availableUtxosSortedByAmount []*walletUTXO
nextSyncStartIndex uint32
keysFile *keys.File
shutdown chan struct{}
addressSet walletAddressSet
tracker *Tracker
txMassCalculator *txmass.Calculator
lock sync.RWMutex
utxoSet walletUTXOSet
nextSyncStartIndex uint32
keysFile *keys.File
shutdown chan struct{}
addressSet walletAddressSet
tracker *Tracker
txMassCalculator *txmass.Calculator
}

// Start starts the kaspawalletd server
Expand Down Expand Up @@ -67,16 +66,15 @@ func Start(params *dagconfig.Params, listen, rpcServer string, keysFilePath stri
}

serverInstance := &server{
rpcClient: rpcClient,
params: params,
utxosSortedByAmount: []*walletUTXO{},
availableUtxosSortedByAmount: []*walletUTXO{},
nextSyncStartIndex: 0,
keysFile: keysFile,
shutdown: make(chan struct{}),
addressSet: make(walletAddressSet),
tracker: NewTracker(),
txMassCalculator: txmass.NewCalculator(params.MassPerTxByte, params.MassPerScriptPubKeyByte, params.MassPerSigOp),
rpcClient: rpcClient,
params: params,
utxoSet: make(walletUTXOSet),
nextSyncStartIndex: 0,
keysFile: keysFile,
shutdown: make(chan struct{}),
addressSet: make(walletAddressSet),
tracker: NewTracker(),
txMassCalculator: txmass.NewCalculator(params.MassPerTxByte, params.MassPerScriptPubKeyByte, params.MassPerSigOp),
}

spawn("serverInstance.sync", func() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/kaspawallet/daemon/server/split_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (s *server) moreUTXOsForMergeTransaction(alreadySelectedUTXOs []*libkaspawa
alreadySelectedUTXOsMap[*alreadySelectedUTXO.Outpoint] = struct{}{}
}

for _, utxo := range s.availableUtxosSortedByAmount {
for _, utxo := range s.availableUtxosSortedByAmount() {
if _, ok := alreadySelectedUTXOsMap[*utxo.Outpoint]; ok {
continue
}
Expand Down
109 changes: 80 additions & 29 deletions cmd/kaspawallet/daemon/server/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/kaspanet/kaspad/cmd/kaspawallet/libkaspawallet"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"

"github.com/kaspanet/kaspad/app/appmessage"
"github.com/pkg/errors"
Expand All @@ -14,6 +15,31 @@ import (
var keyChains = []uint8{libkaspawallet.ExternalKeychain, libkaspawallet.InternalKeychain}

type walletAddressSet map[string]*walletAddress
type walletUTXOSet map[externalapi.DomainOutpoint]*walletUTXO

func (s *server) utxosSortedByAmount() []*walletUTXO {
utxos := make([]*walletUTXO, len(s.utxoSet))
i := 0
for _, walletUtxo := range s.utxoSet {
utxos[i] = walletUtxo
i = i + 1
}

sort.Slice(utxos, func(i, j int) bool { return utxos[i].UTXOEntry.Amount() > utxos[j].UTXOEntry.Amount() })
return utxos
}

func (s *server) availableUtxosSortedByAmount() []*walletUTXO {
utxos := make([]*walletUTXO, 0)
for _, walletUtxo := range s.utxoSet {
if s.tracker.isOutpointAvailable(walletUtxo.Outpoint) {
utxos = append(utxos, walletUtxo)
}
}

sort.Slice(utxos, func(i, j int) bool { return utxos[i].UTXOEntry.Amount() > utxos[j].UTXOEntry.Amount() })
return utxos
}

func (was walletAddressSet) strings() []string {
addresses := make([]string, 0, len(was))
Expand All @@ -36,6 +62,45 @@ func (s *server) onChainChanged(notification *appmessage.VirtualSelectedParentCh
}
}

func (s *server) onUtxoChange(notification *appmessage.UTXOsChangedNotificationMessage) {
s.lock.Lock()
defer s.lock.Unlock()
fmt.Println("chain changed", len(notification.Added))
for _, utxoRemoved := range notification.Removed {
_, found := s.addressSet[utxoRemoved.Address]
if found {
outpoint, err := appmessage.RPCOutpointToDomainOutpoint(utxoRemoved.Outpoint)
if err != nil {
log.Warn(err)
}

delete(s.utxoSet, *outpoint)
}

}
for _, utxoAdded := range notification.Added {
address, ok := s.addressSet[utxoAdded.Address]
if !ok {
continue
}
outpoint, err := appmessage.RPCOutpointToDomainOutpoint(utxoAdded.Outpoint)
if err != nil {
log.Warn(err)
}

utxoEntry, err := appmessage.RPCUTXOEntryToUTXOEntry(utxoAdded.UTXOEntry)
if err != nil {
log.Warn(err)
}

s.utxoSet[*outpoint] = &walletUTXO{
Outpoint: outpoint,
UTXOEntry: utxoEntry,
address: address,
}
}
}

func (s *server) intialize() error {
err := s.collectRecentAddresses()
if err != nil {
Expand All @@ -52,6 +117,13 @@ func (s *server) intialize() error {
return err
}

emptyStringSlice := []string{}

err = s.rpcClient.RegisterForUTXOsChangedNotifications(emptyStringSlice, s.onUtxoChange)
if err != nil {
return err
}

err = s.intializeMempool()
if err != nil {
return err
Expand Down Expand Up @@ -240,12 +312,10 @@ func (s *server) refreshExistingUTXOsWithLock() error {

// updateUTXOSet clears the current UTXO set, and re-fills it with the given entries
func (s *server) updateUTXOSet(entries []*appmessage.UTXOsByAddressesEntry) error {
utxos := make([]*walletUTXO, len(entries))

s.tracker.untrackExpiredOutpointsAsReserved() //untrack all stale reserved outpoints, before comparing in loop
availableUtxos := make([]*walletUTXO, 0)
newWalletUTXOSet := make(walletUTXOSet)

for i, entry := range entries {
for _, entry := range entries {
outpoint, err := appmessage.RPCOutpointToDomainOutpoint(entry.Outpoint)
if err != nil {
return err
Expand All @@ -260,37 +330,18 @@ func (s *server) updateUTXOSet(entries []*appmessage.UTXOsByAddressesEntry) erro
if !ok {
return errors.Errorf("Got result from address %s even though it wasn't requested", entry.Address)
}
utxos[i] = &walletUTXO{
newWalletUTXOSet[*outpoint] = &walletUTXO{
Outpoint: outpoint,
UTXOEntry: utxoEntry,
address: address,
}
if s.tracker.isOutpointAvailable(outpoint) {
availableUtxos = append(availableUtxos, &walletUTXO{
Outpoint: outpoint,
UTXOEntry: utxoEntry,
address: address,
})
}
}
s.utxoSet = newWalletUTXOSet

sort.Slice(utxos, func(i, j int) bool { return utxos[i].UTXOEntry.Amount() > utxos[j].UTXOEntry.Amount() })

s.utxosSortedByAmount = utxos

sort.Slice(availableUtxos, func(i, j int) bool {
return availableUtxos[i].UTXOEntry.Amount() > availableUtxos[j].UTXOEntry.Amount()
})

s.availableUtxosSortedByAmount = availableUtxos

fmt.Println("utxos total", len(s.utxosSortedByAmount))
fmt.Println("utxos available", len(s.availableUtxosSortedByAmount))
fmt.Println("utxos reserved", len(s.tracker.reservedOutpoints))
fmt.Println("transactions", len(s.tracker.sentTransactions))
fmt.Println("utxos in mempool", s.tracker.countOutpointsInmempool())

//s.tracker.untrackOutpointDifferenceViaWalletUTXOs(utxos) //clean up reserved tracker
fmt.Println("total", len(s.utxoSet))
fmt.Println("reserved", len(s.tracker.reservedOutpoints))
fmt.Println("followed txIDS", len(s.tracker.sentTransactions))
fmt.Println("UtxosSent", s.tracker.countOutpointsInmempool())

return nil
}
Expand Down

0 comments on commit 501be3d

Please sign in to comment.