Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use 'generate' instead of 'setgenerate' RPC #93

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ type TxOut struct {
// NewActor creates a new actor which runs its own wallet process connecting
// to the btcd node server specified by node, and listening for simulator
// websocket connections on the specified port.
func NewActor(node *Node, port uint16) (*Actor, error) {
func NewActor(port uint16) (*Actor, error) {
// Please don't run this as root.
if port < 1024 {
return nil, errors.New("invalid actor port")
}

// Set btcwallet node args
args, err := newBtcwalletArgs(port, node.Args.(*btcdArgs))
args, err := newBtcwalletArgs(port)
if err != nil {
return nil, err
}
Expand Down
18 changes: 11 additions & 7 deletions btcwallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,13 @@ type btcwalletArgs struct {
}

// newBtcwalletArgs returns a btcwalletArgs with all default values
func newBtcwalletArgs(port uint16, nodeArgs *btcdArgs) (*btcwalletArgs, error) {
func newBtcwalletArgs(port uint16) (*btcwalletArgs, error) {
a := &btcwalletArgs{
RPCListen: fmt.Sprintf("127.0.0.1:%d", port),
RPCConnect: "127.0.0.1:18556",
Username: "user",
Password: "pass",
Certificates: nodeArgs.certificates,
CAFile: CertFile,
RPCListen: fmt.Sprintf("127.0.0.1:%d", port),
RPCConnect: "127.0.0.1:18556",
Username: "user",
Password: "pass",
CAFile: CertFile,

prefix: fmt.Sprintf("actor-%d", port),
exe: "btcwallet",
Expand All @@ -83,6 +82,11 @@ func (a *btcwalletArgs) SetDefaults() error {
return err
}
a.LogDir = logdir
certs, err := ioutil.ReadFile(CertFile)
if err != nil {
return err
}
a.Certificates = certs
return nil
}

Expand Down
4 changes: 1 addition & 3 deletions btcwallet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ import (
)

func TestnewBtcwalletArgs(t *testing.T) {
btcdArgs, err := newBtcdArgs("node")
args, err := newBtcwalletArgs(18554, btcdArgs)
defer btcdArgs.Cleanup()
args, err := newBtcwalletArgs(18554)
defer args.Cleanup()
if err != nil {
t.Errorf("newBtcwalletArgs error: %v", err)
Expand Down
254 changes: 116 additions & 138 deletions comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ func (com *Communication) Start(actors []*Actor, node *Node, txCurve map[int32]*
com.wg.Add(1)
go com.Shutdown(miner, actors, node)

log.Printf("%s: Generating %v blocks...", miner, *startBlock)
if err := miner.Generate(uint32(*startBlock) - 1); err != nil {
return
}

return
}

Expand Down Expand Up @@ -428,159 +433,132 @@ func (com *Communication) estimateTpb(tpbChan chan<- int) {
func (com *Communication) Communicate(txCurve map[int32]*Row, miner *Miner, actors []*Actor) {
defer com.wg.Done()

for {
for h, row := range txCurve {
// wait until this block is processed
select {
case h := <-com.height:

// stop simulation if we're at the last block
if h > int32(*stopBlock) {
close(com.exit)
return
}

// disable mining until the required no. of tx are in mempool
if err := miner.StopMining(); err != nil {
close(com.exit)
return
}

// wait until this block is processed
select {
case <-com.blockQueue.processed:
case <-com.exit:
return
}

var wg sync.WaitGroup
// count the number of utxos available in total
var utxoCount int
for _, a := range actors {
utxoCount += len(a.utxoQueue.utxos)
}
case <-com.blockQueue.processed:
case <-com.exit:
return
}

// the required transactions are divided into two groups because we need some of them to
// contribute to the utxo count required for the next block and the rest to contribute to
// the tx count
//
// it is possible to keep dividing the same utxo until it's broken into the required
// number of pieces but we want to stay close to the real world scenario and maximize
// the number of utxos used
//
// E.g: Assume the following CSV
//
// block,utxos,tx
// 20000,40000,20000
// 20001,50000,25000
//
// at block 19999, we need to ensure that next block has 40K utxos
// we have 19999 - blockchain.CoinbaseMaturity = 19899 utxos
// we need to create 40K-19899 = 20101 utxos so in this case, so
// we create 20101 tx which give 1 net utxo output
//
// at block 20000, we need to ensure that next block has 50K utxos
// we already have 40K by the previous iteration, so we need 50-40 = 10K utxos
// we also need to generate 20K tx before the next block, so
// create 10000 tx which generate 1 net utxo plus 10000 tx without any net utxo
//
// since we cannot generate more tx than the no of available utxos, the no of tx
// that can be generated at any iteration is limited by the utxos available

// in case the next row doesn't exist, we initialize the required no of utxos to zero
// so we keep the utxoCount same as current count
next, ok := txCurve[h+2]
if !ok {
next = &Row{}
next.utxoCount = utxoCount
}
var wg sync.WaitGroup
// count the number of utxos available in total
var utxoCount int
for _, a := range actors {
utxoCount += len(a.utxoQueue.utxos)
}

// reqUtxoCount is the number of utxos required
reqUtxoCount := 0
if next.utxoCount > utxoCount {
reqUtxoCount = next.utxoCount - utxoCount
}
// the required transactions are divided into two groups because we need some of them to
// contribute to the utxo count required for the next block and the rest to contribute to
// the tx count
//
// it is possible to keep dividing the same utxo until it's broken into the required
// number of pieces but we want to stay close to the real world scenario and maximize
// the number of utxos used
//
// E.g: Assume the following CSV
//
// block,utxos,tx
// 20000,40000,20000
// 20001,50000,25000
//
// at block 19999, we need to ensure that next block has 40K utxos
// we have 19999 - blockchain.CoinbaseMaturity = 19899 utxos
// we need to create 40K-19899 = 20101 utxos so in this case, so
// we create 20101 tx which give 1 net utxo output
//
// at block 20000, we need to ensure that next block has 50K utxos
// we already have 40K by the previous iteration, so we need 50-40 = 10K utxos
// we also need to generate 20K tx before the next block, so
// create 10000 tx which generate 1 net utxo plus 10000 tx without any net utxo
//
// since we cannot generate more tx than the no of available utxos, the no of tx
// that can be generated at any iteration is limited by the utxos available

// in case the next row doesn't exist, we initialize the required no of utxos to zero
// so we keep the utxoCount same as current count
next, ok := txCurve[h+1]
if !ok {
next = &Row{}
next.utxoCount = utxoCount
}

// in case this row doesn't exist, we initialize the required no of tx to reqUtxoCount
// i.e one tx per utxo required
row, ok := txCurve[h+1]
if !ok {
row = &Row{}
row.txCount = reqUtxoCount
}
// reqUtxoCount is the number of utxos required
reqUtxoCount := 0
if next.utxoCount > utxoCount {
reqUtxoCount = next.utxoCount - utxoCount
}

// reqTxCount is the number of tx that will generate reqUtxoCount
// no of utxos
reqTxCount := row.txCount
if reqTxCount > utxoCount {
log.Printf("Warning: capping no of transactions at %v based on no of available utxos", utxoCount)
// cap the total no of tx at the no of available utxos
reqTxCount = utxoCount
}
// reqTxCount is the number of tx that will generate reqUtxoCount
// no of utxos
reqTxCount := row.txCount
if reqTxCount > utxoCount {
log.Printf("Warning: capping no of transactions at %v based on no of available utxos", utxoCount)
// cap the total no of tx at the no of available utxos
reqTxCount = utxoCount
}

var multiplier, totalUtxos, totalTx int
// skip if we already have more than the no of utxos required
if reqUtxoCount > 0 {
// e.g: if we need 18K utxos in 12K tx
// multiplier = [18000/12000] = [1.5] = 2
// totalUtxos = 18000/2 = 9000
// totalTx = 120000 - 9000 = 3000
multiplier = int(math.Ceil(float64(reqUtxoCount) / float64(reqTxCount)))
if multiplier > *maxSplit {
// cap maximum splits at maxSplit
multiplier = *maxSplit
}
totalUtxos = reqUtxoCount / multiplier
var multiplier, totalUtxos, totalTx int
// skip if we already have more than the no of utxos required
if reqUtxoCount > 0 {
// e.g: if we need 18K utxos in 12K tx
// multiplier = [18000/12000] = [1.5] = 2
// totalUtxos = 18000/2 = 9000
// totalTx = 120000 - 9000 = 3000
multiplier = int(math.Ceil(float64(reqUtxoCount) / float64(reqTxCount)))
if multiplier > *maxSplit {
// cap maximum splits at maxSplit
multiplier = *maxSplit
}
totalUtxos = reqUtxoCount / multiplier
}

// if we're not already covered by the utxo transactions, generate additional tx
if reqTxCount > totalUtxos {
totalTx = reqTxCount - totalUtxos
}
// if we're not already covered by the utxo transactions, generate additional tx
if reqTxCount > totalUtxos {
totalTx = reqTxCount - totalUtxos
}

if reqTxCount > 0 {
log.Printf("Generating %v transactions ...", reqTxCount)
}
if totalTx > 0 {
for i := 0; i < totalTx; i++ {
fmt.Printf("\r%d/%d", i+1, reqTxCount)
a := actors[rand.Int()%len(actors)]
addr := a.ownedAddresses[rand.Int()%len(a.ownedAddresses)]
select {
case com.downstream <- addr:
// For every address sent downstream (one transaction about to happen),
// spawn a goroutine to listen for an accepted transaction in the mempool
wg.Add(1)
go com.txPoolRecv(&wg)
case <-com.exit:
return
}
if reqTxCount > 0 {
log.Printf("Generating %v transactions ...", reqTxCount)
}
if totalTx > 0 {
for i := 0; i < totalTx; i++ {
fmt.Printf("\r%d/%d", i+1, reqTxCount)
a := actors[rand.Int()%len(actors)]
addr := a.ownedAddresses[rand.Int()%len(a.ownedAddresses)]
select {
case com.downstream <- addr:
// For every address sent downstream (one transaction about to happen),
// spawn a goroutine to listen for an accepted transaction in the mempool
wg.Add(1)
go com.txPoolRecv(&wg)
case <-com.exit:
return
}
}
}

if totalUtxos > 0 {
for i := 0; i < totalUtxos; i++ {
fmt.Printf("\r%d/%d", i+totalTx+1, reqTxCount)
select {
case com.split <- multiplier:
// For every address sent downstream (one transaction about to happen),
// spawn a goroutine to listen for an accepted transaction in the mempool
wg.Add(1)
go com.txPoolRecv(&wg)
case <-com.exit:
return
}
if totalUtxos > 0 {
for i := 0; i < totalUtxos; i++ {
fmt.Printf("\r%d/%d", i+totalTx+1, reqTxCount)
select {
case com.split <- multiplier:
// For every address sent downstream (one transaction about to happen),
// spawn a goroutine to listen for an accepted transaction in the mempool
wg.Add(1)
go com.txPoolRecv(&wg)
case <-com.exit:
return
}
}
}

fmt.Printf("\n")
log.Printf("Waiting for miner...")
wg.Wait()
// mine the above tx in the next block
if err := miner.StartMining(); err != nil {
close(com.exit)
return
}
case <-com.exit:
return
fmt.Printf("\n")
log.Printf("Waiting for miner...")
wg.Wait()
// mine the above tx in the next block
if err := miner.Generate(1); err != nil {
close(com.exit)
}
}
}
Expand Down
29 changes: 5 additions & 24 deletions miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,7 @@ func NewMiner(miningAddrs []btcutil.Address, exit chan struct{},
// send a signal to stop actors. This is used so main can break from
// select and call actor.Stop to stop actors.
OnBlockConnected: func(hash *wire.ShaHash, h int32) {
if h >= int32(*startBlock)-1 {
if height != nil {
height <- h
}
} else {
if h <= int32(*startBlock) {
fmt.Printf("\r%d/%d", h, *startBlock)
}
},
Expand Down Expand Up @@ -107,34 +103,19 @@ func NewMiner(miningAddrs []btcutil.Address, exit chan struct{},
return miner, err
}

// Use just one core for mining.
if err := miner.StartMining(); err != nil {
return miner, err
}

// Register for block notifications.
if err := miner.client.NotifyBlocks(); err != nil {
log.Printf("%s: Cannot register for block notifications: %v", miner, err)
return miner, err
}

log.Printf("%s: Generating %v blocks...", miner, *startBlock)
return miner, nil
}

// StartMining sets the cpu miner to mine coins
func (m *Miner) StartMining() error {
if err := m.client.SetGenerate(true, 1); err != nil {
log.Printf("%s: Cannot start mining: %v", m, err)
return err
}
return nil
}

// StopMining stops the cpu miner from mining coins
func (m *Miner) StopMining() error {
if err := m.client.SetGenerate(false, 0); err != nil {
log.Printf("%s: Cannot stop mining: %v", m, err)
// Generate makes the CPU miner mine the requested number of blocks
func (m *Miner) Generate(numBlocks uint32) error {
if _, err := m.client.Generate(numBlocks); err != nil {
log.Printf("%s: Cannot generate %d blocks: %v", m, numBlocks, err)
return err
}
return nil
Expand Down
Loading