Skip to content

Commit

Permalink
Restore peer node
Browse files Browse the repository at this point in the history
  • Loading branch information
tuxcanfly committed May 21, 2015
1 parent 1c8a768 commit 0789db0
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 36 deletions.
41 changes: 18 additions & 23 deletions comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,10 @@ func NewCommunication() *Communication {

// Start handles the main part of a simulation by starting
// all the necessary goroutines.
func (com *Communication) Start(actors []*Actor, txCurve map[int32]*Row) (tpsChan chan float64, tpbChan chan int) {
func (com *Communication) Start(actors []*Actor, node *Node, txCurve map[int32]*Row) (tpsChan chan float64, tpbChan chan int) {
tpsChan = make(chan float64, 1)
tpbChan = make(chan int, 1)

// Start a miner process to generate actor addrs
miner, err := NewMiner(nil, com.exit, com.height, com.txpool,
com.blockQueue.enqueue, com.timeReceived)
if err != nil {
close(com.exit)
com.wg.Add(1)
go com.Shutdown(miner, actors)
return
}

// Start actors
for _, a := range actors {
com.wg.Add(1)
Expand All @@ -111,6 +101,7 @@ func (com *Communication) Start(actors []*Actor, txCurve map[int32]*Row) (tpsCha
if err := a.Start(os.Stderr, os.Stdout, com); err != nil {
log.Printf("%s: Cannot start actor: %v", a, err)
a.Shutdown()
node.Shutdown()
}
}(a, com)
}
Expand All @@ -135,20 +126,20 @@ func (com *Communication) Start(actors []*Actor, txCurve map[int32]*Row) (tpsCha
}
}

// Re-start the miner with the mining addresses
if err := miner.Stop(); err != nil {
log.Printf("%s: Cannot stop miner: %v", miner, err)
return
}
miner, err = NewMiner(miningAddrs, com.exit, com.height, com.txpool,
com.blockQueue.enqueue, com.timeReceived)
// Start mining.
miner, err := NewMiner(miningAddrs, com.exit, com.height, com.txpool)
if err != nil {
close(com.exit)
close(tpsChan)
close(tpbChan)
com.wg.Add(1)
go com.Shutdown(miner, actors)
go com.Shutdown(miner, actors, node)
return
}

// Add mining node listen interface as a node
node.client.AddNode("localhost:18550", rpc.ANAdd)

// Start a goroutine to estimate tps
com.wg.Add(1)
go com.estimateTps(tpsChan, txCurve)
Expand All @@ -165,16 +156,17 @@ func (com *Communication) Start(actors []*Actor, txCurve map[int32]*Row) (tpsCha
go com.queueBlocks()

com.wg.Add(1)
go com.poolUtxos(miner.client, actors)
go com.poolUtxos(node.client, actors)

// Start a goroutine for shuting down the simulation when appropriate
com.wg.Add(1)
go com.Shutdown(miner, actors)
go com.Shutdown(miner, actors, node)

log.Printf("%s: Generating %v blocks...", miner, *startBlock)
if err := miner.Generate(uint32(*startBlock) - 1); err != nil {
return
}
log.Printf("%s: Generating %v blocks...", miner, *startBlock)

return
}

Expand Down Expand Up @@ -581,7 +573,7 @@ func (com *Communication) Communicate(txCurve map[int32]*Row, miner *Miner, acto

// Shutdown shuts down the simulation by killing the mining and the
// initial node processes and shuts down all actors.
func (com *Communication) Shutdown(miner *Miner, actors []*Actor) {
func (com *Communication) Shutdown(miner *Miner, actors []*Actor, node *Node) {
defer com.wg.Done()

<-com.exit
Expand All @@ -591,6 +583,9 @@ func (com *Communication) Shutdown(miner *Miner, actors []*Actor) {
for _, a := range actors {
a.Shutdown()
}
if node != nil {
node.Shutdown()
}
}

// WaitForShutdown waits until every goroutine inside com.Start
Expand Down
17 changes: 5 additions & 12 deletions miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package main
import (
"fmt"
"log"
"time"

"github.com/btcsuite/btcd/wire"
rpc "github.com/btcsuite/btcrpcclient"
Expand All @@ -35,22 +34,13 @@ type Miner struct {
// NewMiner starts a cpu-mining enabled btcd instane and returns an rpc client
// to control it.
func NewMiner(miningAddrs []btcutil.Address, exit chan struct{},
height chan<- int32, txpool chan<- struct{}, enqueueBlock chan<- *Block,
timeReceived chan<- time.Time) (*Miner, error) {
height chan<- int32, txpool chan<- struct{}) (*Miner, error) {

ntfnHandlers := &rpc.NotificationHandlers{
// When a block higher than stopBlock connects to the chain,
// send a signal to stop actors. This is used so main can break from
// select and call actor.Stop to stop actors.
OnBlockConnected: func(hash *wire.ShaHash, h int32) {
block := &Block{
hash: hash,
height: h,
}
select {
case enqueueBlock <- block:
case <-exit:
}
if h <= int32(*startBlock) {
fmt.Printf("\r%d/%d", h, *startBlock)
}
Expand All @@ -64,7 +54,6 @@ func NewMiner(miningAddrs []btcutil.Address, exit chan struct{},
// required no of tx and receiving all of them
txpool <- struct{}{}
}
timeReceived <- time.Now()
},
}

Expand All @@ -74,6 +63,10 @@ func NewMiner(miningAddrs []btcutil.Address, exit chan struct{},
return nil, err
}

// set miner args - it listens on a different port
// because a node is already running on the default port
args.Listen = "127.0.0.1:18550"
args.RPCListen = "127.0.0.1:18551"
// need to log mining details, so set debuglevel
args.DebugLevel = "MINR=trace"
// if passed, set blockmaxsize to allow mining large blocks
Expand Down
59 changes: 58 additions & 1 deletion simulation.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import (
"log"
"math"
"os"
"time"

"github.com/btcsuite/btcd/wire"
rpc "github.com/btcsuite/btcrpcclient"
"github.com/btcsuite/btcutil"
)

// MissingCertPairFile is raised when one of the cert pair files is missing
Expand Down Expand Up @@ -131,6 +136,58 @@ func (s *Simulation) Start() error {
}
}

ntfnHandlers := &rpc.NotificationHandlers{
OnBlockConnected: func(hash *wire.ShaHash, height int32) {
block := &Block{
hash: hash,
height: height,
}
select {
case s.com.blockQueue.enqueue <- block:
case <-s.com.exit:
}
},
OnTxAccepted: func(hash *wire.ShaHash, amount btcutil.Amount) {
s.com.timeReceived <- time.Now()
},
}

log.Println("Starting node on simnet...")
args, err := newBtcdArgs("node")
if err != nil {
log.Printf("Cannot create node args: %v", err)
return err
}
logFile, err := getLogFile(args.prefix)
if err != nil {
log.Printf("Cannot get log file, logging disabled: %v", err)
}
node, err := NewNodeFromArgs(args, ntfnHandlers, logFile)
if err != nil {
log.Printf("%s: Cannot create node: %v", node, err)
return err
}
if err := node.Start(); err != nil {
log.Printf("%s: Cannot start node: %v", node, err)
return err
}
if err := node.Connect(); err != nil {
log.Printf("%s: Cannot connect to node: %v", node, err)
return err
}

// Register for block notifications.
if err := node.client.NotifyBlocks(); err != nil {
log.Printf("%s: Cannot register for block notifications: %v", node, err)
return err
}

// Register for transaction notifications
if err := node.client.NotifyNewTransactions(false); err != nil {
log.Printf("%s: Cannot register for transactions notifications: %v", node, err)
return err
}

for i := 0; i < *numActors; i++ {
a, err := NewActor(uint16(18557 + i))
if err != nil {
Expand All @@ -146,7 +203,7 @@ func (s *Simulation) Start() error {
})

// Start simulation.
tpsChan, tpbChan := s.com.Start(s.actors, s.txCurve)
tpsChan, tpbChan := s.com.Start(s.actors, node, s.txCurve)
s.com.WaitForShutdown()

tps, ok := <-tpsChan
Expand Down

0 comments on commit 0789db0

Please sign in to comment.