diff --git a/comm.go b/comm.go index 5709f03..076186b 100644 --- a/comm.go +++ b/comm.go @@ -89,20 +89,10 @@ func NewCommunication() *Communication { // Start handles the main part of a simulation by starting // all the necessary goroutines. -func (com *Communication) Start(actors []*Actor, txCurve map[int32]*Row) (tpsChan chan float64, tpbChan chan int) { +func (com *Communication) Start(actors []*Actor, node *Node, txCurve map[int32]*Row) (tpsChan chan float64, tpbChan chan int) { tpsChan = make(chan float64, 1) tpbChan = make(chan int, 1) - // Start a miner process to generate actor addrs - miner, err := NewMiner(nil, com.exit, com.height, com.txpool, - com.blockQueue.enqueue, com.timeReceived) - if err != nil { - close(com.exit) - com.wg.Add(1) - go com.Shutdown(miner, actors) - return - } - // Start actors for _, a := range actors { com.wg.Add(1) @@ -111,6 +101,7 @@ func (com *Communication) Start(actors []*Actor, txCurve map[int32]*Row) (tpsCha if err := a.Start(os.Stderr, os.Stdout, com); err != nil { log.Printf("%s: Cannot start actor: %v", a, err) a.Shutdown() + node.Shutdown() } }(a, com) } @@ -135,20 +126,20 @@ func (com *Communication) Start(actors []*Actor, txCurve map[int32]*Row) (tpsCha } } - // Re-start the miner with the mining addresses - if err := miner.Stop(); err != nil { - log.Printf("%s: Cannot stop miner: %v", miner, err) - return - } - miner, err = NewMiner(miningAddrs, com.exit, com.height, com.txpool, - com.blockQueue.enqueue, com.timeReceived) + // Start mining. + miner, err := NewMiner(miningAddrs, com.exit, com.height, com.txpool) if err != nil { close(com.exit) + close(tpsChan) + close(tpbChan) com.wg.Add(1) - go com.Shutdown(miner, actors) + go com.Shutdown(miner, actors, node) return } + // Add mining node listen interface as a node + node.client.AddNode("localhost:18550", rpc.ANAdd) + // Start a goroutine to estimate tps com.wg.Add(1) go com.estimateTps(tpsChan, txCurve) @@ -165,16 +156,17 @@ func (com *Communication) Start(actors []*Actor, txCurve map[int32]*Row) (tpsCha go com.queueBlocks() com.wg.Add(1) - go com.poolUtxos(miner.client, actors) + go com.poolUtxos(node.client, actors) // Start a goroutine for shuting down the simulation when appropriate com.wg.Add(1) - go com.Shutdown(miner, actors) + go com.Shutdown(miner, actors, node) + log.Printf("%s: Generating %v blocks...", miner, *startBlock) if err := miner.Generate(uint32(*startBlock) - 1); err != nil { return } - log.Printf("%s: Generating %v blocks...", miner, *startBlock) + return } @@ -581,7 +573,7 @@ func (com *Communication) Communicate(txCurve map[int32]*Row, miner *Miner, acto // Shutdown shuts down the simulation by killing the mining and the // initial node processes and shuts down all actors. -func (com *Communication) Shutdown(miner *Miner, actors []*Actor) { +func (com *Communication) Shutdown(miner *Miner, actors []*Actor, node *Node) { defer com.wg.Done() <-com.exit @@ -591,6 +583,9 @@ func (com *Communication) Shutdown(miner *Miner, actors []*Actor) { for _, a := range actors { a.Shutdown() } + if node != nil { + node.Shutdown() + } } // WaitForShutdown waits until every goroutine inside com.Start diff --git a/miner.go b/miner.go index d8e234b..447318d 100644 --- a/miner.go +++ b/miner.go @@ -19,7 +19,6 @@ package main import ( "fmt" "log" - "time" "github.com/btcsuite/btcd/wire" rpc "github.com/btcsuite/btcrpcclient" @@ -35,22 +34,13 @@ type Miner struct { // NewMiner starts a cpu-mining enabled btcd instane and returns an rpc client // to control it. func NewMiner(miningAddrs []btcutil.Address, exit chan struct{}, - height chan<- int32, txpool chan<- struct{}, enqueueBlock chan<- *Block, - timeReceived chan<- time.Time) (*Miner, error) { + height chan<- int32, txpool chan<- struct{}) (*Miner, error) { ntfnHandlers := &rpc.NotificationHandlers{ // When a block higher than stopBlock connects to the chain, // send a signal to stop actors. This is used so main can break from // select and call actor.Stop to stop actors. OnBlockConnected: func(hash *wire.ShaHash, h int32) { - block := &Block{ - hash: hash, - height: h, - } - select { - case enqueueBlock <- block: - case <-exit: - } if h <= int32(*startBlock) { fmt.Printf("\r%d/%d", h, *startBlock) } @@ -64,7 +54,6 @@ func NewMiner(miningAddrs []btcutil.Address, exit chan struct{}, // required no of tx and receiving all of them txpool <- struct{}{} } - timeReceived <- time.Now() }, } @@ -74,6 +63,10 @@ func NewMiner(miningAddrs []btcutil.Address, exit chan struct{}, return nil, err } + // set miner args - it listens on a different port + // because a node is already running on the default port + args.Listen = "127.0.0.1:18550" + args.RPCListen = "127.0.0.1:18551" // need to log mining details, so set debuglevel args.DebugLevel = "MINR=trace" // if passed, set blockmaxsize to allow mining large blocks diff --git a/simulation.go b/simulation.go index df53680..d2209e4 100644 --- a/simulation.go +++ b/simulation.go @@ -21,6 +21,11 @@ import ( "log" "math" "os" + "time" + + "github.com/btcsuite/btcd/wire" + rpc "github.com/btcsuite/btcrpcclient" + "github.com/btcsuite/btcutil" ) // MissingCertPairFile is raised when one of the cert pair files is missing @@ -131,6 +136,58 @@ func (s *Simulation) Start() error { } } + ntfnHandlers := &rpc.NotificationHandlers{ + OnBlockConnected: func(hash *wire.ShaHash, height int32) { + block := &Block{ + hash: hash, + height: height, + } + select { + case s.com.blockQueue.enqueue <- block: + case <-s.com.exit: + } + }, + OnTxAccepted: func(hash *wire.ShaHash, amount btcutil.Amount) { + s.com.timeReceived <- time.Now() + }, + } + + log.Println("Starting node on simnet...") + args, err := newBtcdArgs("node") + if err != nil { + log.Printf("Cannot create node args: %v", err) + return err + } + logFile, err := getLogFile(args.prefix) + if err != nil { + log.Printf("Cannot get log file, logging disabled: %v", err) + } + node, err := NewNodeFromArgs(args, ntfnHandlers, logFile) + if err != nil { + log.Printf("%s: Cannot create node: %v", node, err) + return err + } + if err := node.Start(); err != nil { + log.Printf("%s: Cannot start node: %v", node, err) + return err + } + if err := node.Connect(); err != nil { + log.Printf("%s: Cannot connect to node: %v", node, err) + return err + } + + // Register for block notifications. + if err := node.client.NotifyBlocks(); err != nil { + log.Printf("%s: Cannot register for block notifications: %v", node, err) + return err + } + + // Register for transaction notifications + if err := node.client.NotifyNewTransactions(false); err != nil { + log.Printf("%s: Cannot register for transactions notifications: %v", node, err) + return err + } + for i := 0; i < *numActors; i++ { a, err := NewActor(uint16(18557 + i)) if err != nil { @@ -146,7 +203,7 @@ func (s *Simulation) Start() error { }) // Start simulation. - tpsChan, tpbChan := s.com.Start(s.actors, s.txCurve) + tpsChan, tpbChan := s.com.Start(s.actors, node, s.txCurve) s.com.WaitForShutdown() tps, ok := <-tpsChan