Commit 5dfa325
refactor to start/stop sims
stackdump committed Nov 27, 2018
1 parent 4bd5f67 commit 5dfa325
Showing 13 changed files with 227 additions and 67 deletions.
71 changes: 71 additions & 0 deletions LongTests/SaveStateRestore_test.go
@@ -0,0 +1,71 @@
package longtests

import (
	"fmt"
	"github.com/FactomProject/factomd/state"
	. "github.com/FactomProject/factomd/testHelper"
	"github.com/stretchr/testify/assert"
	"testing"
)

func TestFastBootSaveAndRestore(t *testing.T) {
	var saveRate = 4
	var state0 *state.State
	var fastBootFile string

	startSim := func(nodes string, maxHeight int) {
		state0 = SetupSim(
			nodes,
			map[string]string{"--debuglog": ".", "--fastsaverate": fmt.Sprintf("%v", saveRate)},
			maxHeight,
			0,
			0,
			t,
		)
	}

	stopSim := func() {
		WaitForAllNodes(state0)
		ShutDownEverything(t)
		state0 = nil
	}

	t.Run("after restart node should catch up", func(t *testing.T) {
		if RanSimTest {
			return
		}

		startSim("LF", 200)
		StopNode(1, 'F')
		WaitBlocks(state0, 5)
		StartNode(1, 'F')
		stopSim()
	})

	// FIXME:
	// this test currently fails which either means:
	// it can reproduce the issue w/ fastboot
	// or
	// we broke something else shoe-horning in the ability to start/stop a sim
	t.Run("run sim to create fastboot", func(t *testing.T) {
		if RanSimTest {
			return
		}

		startSim("LF", 20)
		WaitForBlock(state0, saveRate*2+2)
		StopNode(1, 'F')

		t.Run("reload follower with fastboot", func(t *testing.T) {
			fastBootFile = state.NetworkIDToFilename(state0.Network, state0.FastBootLocation)
			assert.FileExists(t, fastBootFile)

			s := GetNode(1).State
			err := s.StateSaverStruct.LoadDBStateListFromFile(s.DBStates, fastBootFile)
			assert.Nil(t, err)
		})

		StartNode(1, 'F')
		stopSim()
	})
}
2 changes: 2 additions & 0 deletions LongTests/longtests.go
@@ -0,0 +1,2 @@
// Package longtests contains simulation tests that may take longer than 10 min
package longtests
9 changes: 5 additions & 4 deletions elections/elections.go
@@ -418,7 +418,7 @@ func (e *Elections) ProcessWaiting() {
}

// Runs the main loop for elections for this instance of factomd
func Run(s *state.State) {
func ElectionWorker(s *state.State) func() error {
e := new(Elections)
s.Elections = e
e.State = s
@@ -432,28 +432,29 @@ func Run(s *state.State) {
e.Waiting = make(chan interfaces.IElectionMsg, 500)

// Actually run the elections
for {
return func() error {
msg := e.Input.BlockingDequeue().(interfaces.IElectionMsg)
e.LogMessage("election", fmt.Sprintf("exec %d", e.Electing), msg.(interfaces.IMsg))

valid := msg.ElectionValidate(e)
switch valid {
case -1:
// Do not process
continue
return nil
case 0:
// Drop the oldest message if at capacity
if len(e.Waiting) > 9*cap(e.Waiting)/10 {
<-e.Waiting
}
// Waiting will get drained when a new election begins, or we move forward
e.Waiting <- msg
continue
return nil
}
msg.ElectionProcess(s, e)

//if msg.(interfaces.IMsg).Type() != constants.INTERNALEOMSIG { // If it's not an EOM check the authority set
// CheckAuthSetsMatch("election.Run", e, s)
//}
return nil
}
}
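
The change above converts the old infinite loop into a closure that handles one message per call: setup runs once in ElectionWorker, and each former continue becomes return nil so the caller decides when to run the next iteration. A minimal sketch of that shape, using a plain string channel as a stand-in for the election message queue (the electionWorker and queue names are illustrative, not part of the commit):

package main

import "fmt"

// electionWorker mirrors the shape of elections.ElectionWorker: one-time
// setup happens in the outer function, and the returned closure handles
// exactly one queued message per call.
func electionWorker(queue chan string) func() error {
	// one-time setup (in factomd: allocating the Elections struct, channels, etc.)
	return func() error {
		msg := <-queue // blocking dequeue, one message per call
		fmt.Println("processed:", msg)
		return nil // replaces the old `continue`; the caller owns the loop
	}
}

func main() {
	queue := make(chan string, 2)
	queue <- "vote"
	queue <- "timeout"

	work := electionWorker(queue)
	for i := 0; i < 2; i++ { // the driving loop now lives in the caller
		if err := work(); err != nil {
			break
		}
	}
}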
104 changes: 82 additions & 22 deletions engine/NetStart.go
@@ -41,6 +41,8 @@ type FactomNode struct {
Peers []interfaces.IPeer
MLog *MsgLog
P2PIndex int
Running bool
workers []chan int
}

var fnodes []*FactomNode
@@ -311,7 +313,7 @@ func NetStart(s *state.State, p *FactomParams, listenToStdin bool) {
// Actually setup the Network
//************************************************

// Make p.cnt Factom nodes
fnodes = fnodes[:0]
for i := 0; i < p.Cnt; i++ {
makeServer(s) // We clone s to make all of our servers
}
@@ -557,7 +559,6 @@ func NetStart(s *state.State, p *FactomParams, listenToStdin bool) {
RegisterPrometheus()

go controlPanel.ServeControlPanel(fnodes[0].State.ControlPanelChannel, fnodes[0].State, connectionMetricsChannel, p2pNetwork, Build)

go SimControl(p.ListenTo, listenToStdin)

}
@@ -581,38 +582,97 @@ func printGraphData(filename string, period int) {
func makeServer(s *state.State) *FactomNode {
// All other states are clones of the first state. Which this routine
// gets passed to it.
newState := s

if len(fnodes) > 0 {
newState = s.Clone(len(fnodes)).(*state.State)
newState.EFactory = new(electionMsgs.ElectionsFactory) // not an elegant place but before we let the messages hit the state
time.Sleep(10 * time.Millisecond)
newState.Init()
newState.EFactory = new(electionMsgs.ElectionsFactory)
return AddFnode(s.Clone(len(fnodes)).(*state.State))
} else {
return AddFnode(s)
}
}

func AddFnode(s *state.State) *FactomNode {
fnode := new(FactomNode)
fnode.State = newState
fnodes = append(fnodes, fnode)
fnode.State = s
fnode.MLog = mLog

if len(fnodes) > 0 {
// not an elegant place but before we let the messages hit the state
fnode.State.EFactory = new(electionMsgs.ElectionsFactory)
time.Sleep(10 * time.Millisecond)
fnode.State.Init() // REVIEW: State.Init() is called again in StartFnode()
fnode.State.EFactory = new(electionMsgs.ElectionsFactory)
}

fnodes = append(fnodes, fnode)
return fnode
}

func startServers(load bool) {
for i, fnode := range fnodes {
if i > 0 {
fnode.State.Init()
func startServers(loadDB bool) {
for i := range fnodes {
StartFnode(i, loadDB)
}
}

// register workers with node
func (fnode *FactomNode) addWorker(doWork func() error) {
quit := make(chan int)
fnode.workers = append(fnode.workers, quit)
var err error
//var workerID = len(fnode.workers)

go func() {
for {
select {
case <-quit:
return
default:
//fmt.Printf("Fnode:%v worker:%v\n", fnode.Index, workerID)
err = doWork()
if err != nil {
panic(err)
}
}
}
go NetworkProcessorNet(fnode)
if load {
go state.LoadDatabase(fnode.State)
}()
}

func (fnode *FactomNode) stopWorker(i int) {
defer func() {
err := recover()
if err != nil {
fmt.Printf("stopWorker recovered: %v", err)
}
go fnode.State.GoSyncEntries()
go Timer(fnode.State)
go elections.Run(fnode.State)
go fnode.State.ValidatorLoop()
}()

fnode.workers[i] <- 0
}

func StartFnode(i int, loadDB bool) {
fnode := fnodes[i]

if loadDB {
state.LoadDatabase(fnode.State)
}

go func() {
fnode.Running = true
fmt.Printf("FNode%v: START\n", i)
fnode.State.ValidatorLoop() // blocks until there is a shutdown signal
for i := range fnode.workers {
go fnode.stopWorker(i) // workers exit when validator exits
}
fmt.Printf("FNode%v: STOP\n", i)
fnode.Running = false
}()

fnode.workers = fnode.workers[:0] // remove old channels

fnode.addWorker(PeersWorker(fnode))
fnode.addWorker(NetworkOutputWorker(fnode))
fnode.addWorker(InvalidOutputWorker(fnode))
fnode.addWorker(fnode.State.MissingEntryRequestWorker())
fnode.addWorker(fnode.State.SyncEntryWorker())
fnode.addWorker(Timer(fnode.State))
fnode.addWorker(elections.ElectionWorker(fnode.State))
}

func setupFirstAuthority(s *state.State) {
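
Taken together, addWorker and stopWorker give each FactomNode a uniform start/stop lifecycle: every long-running loop is registered as a func() error that performs one unit of work, a goroutine drives it until its quit channel fires, and stopping the node just signals each channel. A self-contained sketch of that lifecycle (the node type and stopAll helper are simplified stand-ins, not the actual FactomNode API):

package main

import (
	"fmt"
	"time"
)

// node keeps one quit channel per registered worker, mirroring
// FactomNode.workers in the diff above.
type node struct {
	workers []chan int
}

// addWorker runs doWork repeatedly on its own goroutine until the
// worker's quit channel receives a value.
func (n *node) addWorker(doWork func() error) {
	quit := make(chan int)
	n.workers = append(n.workers, quit)

	go func() {
		for {
			select {
			case <-quit:
				return
			default:
				if err := doWork(); err != nil {
					panic(err)
				}
			}
		}
	}()
}

// stopAll signals every registered worker to exit.
func (n *node) stopAll() {
	for _, quit := range n.workers {
		quit <- 0
	}
}

func main() {
	n := &node{}
	n.addWorker(func() error {
		fmt.Println("tick") // one unit of work per call
		time.Sleep(100 * time.Millisecond)
		return nil
	})

	time.Sleep(500 * time.Millisecond)
	n.stopAll()
}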
44 changes: 22 additions & 22 deletions engine/NetworkProcessorNet.go
@@ -19,13 +19,7 @@ import (
var _ = log.Printf
var _ = fmt.Print

func NetworkProcessorNet(fnode *FactomNode) {
go Peers(fnode)
go NetworkOutputs(fnode)
go InvalidOutputs(fnode)
}

func Peers(fnode *FactomNode) {
func PeersWorker(fnode *FactomNode) func() error {
saltReplayFilterOn := true

crossBootIgnore := func(amsg interfaces.IMsg) bool {
@@ -117,7 +111,7 @@ func Peers(fnode *FactomNode) {
return false
} // func ignoreMsg(){...}

for {
return func() error {
if primitives.NewTimestampNow().GetTimeSeconds()-fnode.State.BootTime > int64(constants.CROSSBOOT_SALT_REPLAY_DURATION.Seconds()) {
saltReplayFilterOn = false
}
@@ -318,11 +312,12 @@ func Peers(fnode *FactomNode) {
if cnt == 0 {
time.Sleep(50 * time.Millisecond) // handled no message, sleep a bit
}
return nil
} // forever {...}
}

func NetworkOutputs(fnode *FactomNode) {
for {
func NetworkOutputWorker(fnode *FactomNode) func() error {
return func() error {
// if len(fnode.State.NetworkOutMsgQueue()) > 500 {
// fmt.Print(fnode.State.GetFactomNodeName(), "-", len(fnode.State.NetworkOutMsgQueue()), " ")
// }
@@ -336,19 +331,18 @@ func NetworkOutputs(fnode *FactomNode) {
// by an updated version when the block is ready.
if msg.IsLocal() {
// todo: Should be a dead case. Add tracking code to see if it ever happens -- clay
fnode.State.LogMessage("NetworkOutputs", "drop, local", msg)
continue
fnode.State.LogMessage("NetworkOutputs", "Drop, local", msg)
return nil
}
// Don't do a rand int if drop rate is 0
if fnode.State.GetDropRate() > 0 && rand.Int()%1000 < fnode.State.GetDropRate() {
//drop the message, rather than processing it normally

fnode.State.LogMessage("NetworkOutputs", "drop, simCtrl", msg)
continue
fnode.State.LogMessage("NetworkOutputs", "Drop, simCtrl", msg)
return nil
}
if msg.GetRepeatHash() == nil {
fnode.State.LogMessage("NetworkOutputs", "drop, no repeat hash", msg)
continue
fnode.State.LogMessage("NetworkOutputs", "Drop, no repeat hash", msg)
return nil
}

//_, ok := msg.(*messages.Ack)
@@ -414,14 +408,20 @@
}
}
}
return nil
}
}

// Just throw away the trash
func InvalidOutputs(fnode *FactomNode) {
for {
time.Sleep(1 * time.Millisecond)
_ = <-fnode.State.NetworkInvalidMsgQueue()
func InvalidOutputWorker(fnode *FactomNode) func() error {
return func() error {
select {
case <-fnode.State.NetworkInvalidMsgQueue():
// Just throw away the trash
return nil
default:
time.Sleep(500 * time.Millisecond)
return nil
}
//fmt.Println(invalidMsg)

// The following code was giving a demerit for each instance of a message in the NetworkInvalidMsgQueue.
5 changes: 2 additions & 3 deletions engine/simWallet.go
@@ -80,7 +80,6 @@ func FundWalletTOFF(st *state.State, timeOffsetInMilliseconds int64, amt uint64)
return nil, fmt.Sprintf("%v", trans.GetTxID())
}


func SendTxn(s *state.State, amt uint64, userSecretIn string, userPubOut string, ecPrice uint64) (*factoid.Transaction, error) {
txn, _ := NewTransaction(amt, userSecretIn, userPubOut, ecPrice)
msg := new(messages.FactoidTransaction)
@@ -97,9 +96,9 @@ func GetBalance(s *state.State, userStr string) int64 {
func RandomFctAddressPair() (string, string) {
pkey := primitives.RandomPrivateKey()
privUserStr, _ := primitives.PrivateKeyStringToHumanReadableFactoidPrivateKey(pkey.PrivateKeyString())
_, _, pubUserStr,_ := factoid.PrivateKeyStringToEverythingString(pkey.PrivateKeyString())
_, _, pubUserAddr, _ := factoid.PrivateKeyStringToEverythingString(pkey.PrivateKeyString())

return privUserStr, pubUserStr
return privUserStr, pubUserAddr
}

// construct a new factoid transaction
6 changes: 4 additions & 2 deletions engine/timer.go
@@ -15,7 +15,7 @@ import (

var _ = (*s.State)(nil)

func Timer(state interfaces.IState) {
func Timer(state interfaces.IState) func() error {
time.Sleep(2 * time.Second)

billion := int64(1000000000)
@@ -34,7 +34,7 @@

time.Sleep(time.Duration(wait))

for {
return func() error {
for i := 0; i < 10; i++ {
// Don't stuff messages into the system if the
// Leader is behind.
@@ -67,6 +67,8 @@
tenthPeriod = period / 10

}

return nil
}
}
