From 03180f83bfe7ece85c8bb5a22815504c208dddb3 Mon Sep 17 00:00:00 2001 From: Paul Snow Date: Sat, 18 May 2019 11:53:07 -0500 Subject: [PATCH] various optimizations we are considering --- engine/NetworkProcessorNet.go | 43 ++++++++++++++++++++------------- state/processList.go | 18 ++++++++++---- state/stateConsensus.go | 45 ++++++++++++++++++++++------------- state/validation.go | 2 +- 4 files changed, 70 insertions(+), 38 deletions(-) diff --git a/engine/NetworkProcessorNet.go b/engine/NetworkProcessorNet.go index 1612502aec..0816bd9eee 100644 --- a/engine/NetworkProcessorNet.go +++ b/engine/NetworkProcessorNet.go @@ -127,6 +127,7 @@ func Peers(fnode *FactomNode) { for i := 0; i < 100 && fnode.State.APIQueue().Length() > 0; i++ { msg := fnode.State.APIQueue().Dequeue() + fnode.State.LogMessage("NetworkInputs", "from API, Dequeue", msg) if globals.Params.FullHashesLog { primitives.Loghash(msg.GetMsgHash()) @@ -182,16 +183,20 @@ func Peers(fnode *FactomNode) { } //fnode.MLog.add2(fnode, false, fnode.State.FactomNodeName, "API", true, msg) - if t := msg.Type(); t == constants.REVEAL_ENTRY_MSG || t == constants.COMMIT_CHAIN_MSG || t == constants.COMMIT_ENTRY_MSG { - fnode.State.LogMessage("NetworkInputs", "from API, Enqueue2", msg) - fnode.State.LogMessage("InMsgQueue2", "enqueue2", msg) + switch msg.Type() { + case constants.REVEAL_ENTRY_MSG, constants.COMMIT_CHAIN_MSG, constants.COMMIT_ENTRY_MSG: + fnode.State.LogMessage("InMsgQueue2", "APIenqueue2", msg) fnode.State.InMsgQueue2().Enqueue(msg) - } else if t := msg.Type(); t == constants.EOM_MSG || t == constants.DIRECTORY_BLOCK_SIGNATURE_MSG { - fnode.State.LogMessage("eomQueue", "enqueue", msg) + case constants.EOM_MSG, constants.DIRECTORY_BLOCK_SIGNATURE_MSG, + constants.MISSING_MSG_RESPONSE, constants.MISSING_MSG, + constants.DBSTATE_MSG, constants.DBSTATE_MISSING_MSG: + fnode.State.LogMessage("eomQueue", "APIenqueue", msg) fnode.State.EomQueue() <- msg - } else { - fnode.State.LogMessage("NetworkInputs", "from API, Enqueue", msg) - fnode.State.LogMessage("InMsgQueue", "enqueue", msg) + case constants.ACK_MSG: + fnode.State.LogMessage("ackQueue", "APIenqueue", msg) + fnode.State.AckQueue() <- msg + default: + fnode.State.LogMessage("InMsgQueue", "APIenqueue", msg) fnode.State.InMsgQueue().Enqueue(msg) } } // for the api queue read up to 100 messages {...} @@ -216,6 +221,8 @@ func Peers(fnode *FactomNode) { break // move to next peer } + fnode.State.LogMessage("NetworkInputs", fromPeer+",denqueue", msg) + if globals.Params.FullHashesLog { primitives.Loghash(msg.GetMsgHash()) primitives.Loghash(msg.GetHash()) @@ -311,16 +318,20 @@ func Peers(fnode *FactomNode) { msg.SetNetwork(true) if !crossBootIgnore(msg) { - if t := msg.Type(); t == constants.REVEAL_ENTRY_MSG || t == constants.COMMIT_CHAIN_MSG || t == constants.COMMIT_ENTRY_MSG { - fnode.State.LogMessage("NetworkInputs", fromPeer+", enqueue2", msg) - fnode.State.LogMessage("InMsgQueue2", fromPeer+", enqueue2", msg) + switch msg.Type() { + case constants.REVEAL_ENTRY_MSG, constants.COMMIT_CHAIN_MSG, constants.COMMIT_ENTRY_MSG: + fnode.State.LogMessage("InMsgQueue2", fromPeer+",enqueue", msg) fnode.State.InMsgQueue2().Enqueue(msg) - } else if t := msg.Type(); t == constants.EOM_MSG || t == constants.DIRECTORY_BLOCK_SIGNATURE_MSG { - fnode.State.LogMessage("eomQueue", "enqueue", msg) + case constants.EOM_MSG, constants.DIRECTORY_BLOCK_SIGNATURE_MSG, + constants.MISSING_MSG_RESPONSE, constants.MISSING_MSG, + constants.DBSTATE_MSG, constants.DBSTATE_MISSING_MSG: + fnode.State.LogMessage("eomQueue", fromPeer+",enqueue", msg) fnode.State.EomQueue() <- msg - } else { - fnode.State.LogMessage("NetworkInputs", fromPeer+", enqueue", msg) - fnode.State.LogMessage("InMsgQueue", fromPeer+", enqueue", msg) + case constants.ACK_MSG: + fnode.State.LogMessage("ackQueue", fromPeer+",enqueue", msg) + fnode.State.AckQueue() <- msg + default: + fnode.State.LogMessage("InMsgQueue", fromPeer+",enqueue", msg) fnode.State.InMsgQueue().Enqueue(msg) } } diff --git a/state/processList.go b/state/processList.go index 38d4b0f6ba..2e176d0236 100644 --- a/state/processList.go +++ b/state/processList.go @@ -747,6 +747,11 @@ var extraDebug bool = false // Process messages and update our state. func (p *ProcessList) Process(s *State) (progress bool) { + if ValidationDebug { + s.LogPrintf("executeMsg", "start processlist.Process") + defer s.LogPrintf("executeMsg", "end processlist.Process") + } + dbht := s.GetHighestSavedBlk() if dbht >= p.DBHeight { //s.AddStatus(fmt.Sprintf("ProcessList.Process: VM Height is %d and Saved height is %d", dbht, s.GetHighestSavedBlk())) @@ -756,7 +761,7 @@ func (p *ProcessList) Process(s *State) (progress bool) { s.PLProcessHeight = p.DBHeight now := s.GetTimestamp() - +VMLoop: for i := 0; i < len(p.FedServers); i++ { vm := p.VMs[i] @@ -804,6 +809,11 @@ func (p *ProcessList) Process(s *State) (progress bool) { break VMListLoop } + if t := vm.List[j].Type(); vm.Synced == true && t != constants.EOM_MSG && t != constants.DIRECTORY_BLOCK_SIGNATURE_MSG { + s.LogPrintf("process", "VM Synced %d/%d/%d", s.LLeaderHeight, i, j) + continue VMLoop + } + if extraDebug { s.LogMessage("process", fmt.Sprintf("Consider %v/%v/%v", p.DBHeight, i, j), vm.List[j]) } @@ -1094,9 +1104,9 @@ func (p *ProcessList) AddToProcessList(s *State, ack *messages.Ack, m interfaces } s.LogMessage("processList", fmt.Sprintf("Added at %d/%d/%d by %s", ack.DBHeight, ack.VMIndex, ack.Height, atomic.WhereAmIString(1)), m) - if ack.IsLocal() { - for p.Process(s) { - } + if ack.IsLocal() { // If I am a leader, I know that this message can be processed, so calling p.Process stops thrashing + for p.Process(s) { // If I am a follower, I really don't know I can process this, so we will thrash if we call process + } // That's why we check for IsLocal (did this node create the ack). } } diff --git a/state/stateConsensus.go b/state/stateConsensus.go index 44661625c9..eb4cb12028 100644 --- a/state/stateConsensus.go +++ b/state/stateConsensus.go @@ -143,6 +143,18 @@ func (s *State) Validate(msg interfaces.IMsg) (validToSend int, validToExec int) } } + defer func() { + if validToExec == 1 && s.LLeaderHeight > 1 && s.Acks[msg.GetHash().Fixed()] == nil { + switch msg.Type() { + case constants.REVEAL_ENTRY_MSG, constants.COMMIT_ENTRY_MSG, constants.COMMIT_CHAIN_MSG, constants.FACTOID_TRANSACTION_MSG: + pl := s.ProcessLists.Get(s.LLeaderHeight) + if pl.VMs[msg.GetVMIndex()].Synced { + validToExec = 0 + } + } + } + }() + // Valid to send is a bit different from valid to execute. Check for valid to send here. validToSend = msg.Validate(s) if validToSend == 0 { // if the msg says hold then we hold... @@ -173,7 +185,8 @@ func (s *State) Validate(msg interfaces.IMsg) (validToSend int, validToExec int) // If we are not the leader, or this isn't the VM we are responsible for ... if !s.Leader || (s.LeaderVMIndex != vmIndex) { switch msg.Type() { - case constants.COMMIT_ENTRY_MSG, constants.COMMIT_CHAIN_MSG, constants.REVEAL_ENTRY_MSG, constants.EOM_MSG, constants.DIRECTORY_BLOCK_SIGNATURE_MSG, constants.FACTOID_TRANSACTION_MSG: + case constants.COMMIT_ENTRY_MSG, constants.COMMIT_CHAIN_MSG, constants.REVEAL_ENTRY_MSG, + constants.EOM_MSG, constants.DIRECTORY_BLOCK_SIGNATURE_MSG, constants.FACTOID_TRANSACTION_MSG: // don't need to check for a matching ack for ACKs or local messages // for messages that get ACK make sure we can expect to process them ack, _ := s.Acks[msg.GetMsgHash().Fixed()].(*messages.Ack) @@ -272,7 +285,7 @@ func (s *State) executeMsg(msg interfaces.IMsg) (ret bool) { s.Leader && !s.Saving && // if not between blocks vm != nil && vmh == vml && // if we have processed to the end of the process list - (!s.Syncing || !vms) && // if not syncing or this VM is not yet synced + !vms && // This VM is not yet synced (local || vmi == s.LeaderVMIndex) && // if it's a local message or it a message for our VM s.LeaderPL.DBHeight+1 >= hkb { if vml == 0 { // if we have not generated a DBSig ... @@ -414,6 +427,11 @@ func (s *State) Process() (progress bool) { } // Process inbound messages preEmptyLoopTime := time.Now() + + if ValidationDebug { + s.LogPrintf("executeMsg", "start messageSort") + } + emptyLoop: for { select { @@ -460,7 +478,9 @@ ackLoop: preProcessXReviewTime := time.Now() // Reprocess any stalled messages, but not so much compared inbound messages // Process last first - + if ValidationDebug { + s.LogPrintf("executeMsg", "start reviewHolding") + } if s.RunLeader { s.ReviewHolding() for { @@ -585,6 +605,10 @@ func (s *State) ReviewHolding() { for k, v := range s.Holding { + if (time.Now().Add(-2 * time.Second)).After(preReviewHoldingTime) { + break + } + if int(highest)-int(saved) > 1000 { TotalHoldingQueueOutputs.Inc() //delete(s.Holding, k) @@ -978,7 +1002,7 @@ func (s *State) repost(m interfaces.IMsg, delay int) { } //s.LogMessage("MsgQueue", fmt.Sprintf("enqueue_%s(%d)", whereAmI, len(s.msgQueue)), m) s.LogMessage("MsgQueue", fmt.Sprintf("enqueue (%d)", len(s.msgQueue)), m) - s.msgQueue <- m // Goes in the "do this really fast" queue so we are prompt about EOM's while syncing + s.eomQueue <- m // Goes in the "do this really fast" queue so we are prompt about EOM's while syncing }() } @@ -1432,24 +1456,12 @@ func (s *State) FollowerExecuteMissingMsg(msg interfaces.IMsg) { func (s *State) FollowerExecuteCommitChain(m interfaces.IMsg) { FollowerExecutions.Inc() s.FollowerExecuteMsg(m) - cc := m.(*messages.CommitChainMsg) - re := s.Holding[cc.CommitChain.EntryHash.Fixed()] - if re != nil { - re.FollowerExecute(s) - re.SendOut(s, re) - } m.SendOut(s, m) } func (s *State) FollowerExecuteCommitEntry(m interfaces.IMsg) { - ce := m.(*messages.CommitEntryMsg) FollowerExecutions.Inc() s.FollowerExecuteMsg(m) - re := s.Holding[ce.CommitEntry.EntryHash.Fixed()] - if re != nil { - re.FollowerExecute(s) - re.SendOut(s, re) - } m.SendOut(s, m) } @@ -2146,7 +2158,6 @@ func (s *State) ProcessEOM(dbheight uint32, msg interfaces.IMsg) bool { s.EOMMinute = int(e.Minute) s.EOMsyncing = true //fmt.Println(fmt.Sprintf("SigType PROCESS: %10s vm %2d First SigType processed: return on s.SigType(%v) && int(e.Minute(%v)) > s.EOMMinute(%v)", s.FactomNodeName, e.VMIndex, s.SigType, e.Minute, s.EOMMinute)) - return false } // What I do for each EOM diff --git a/state/validation.go b/state/validation.go index 2845778796..19c8491f4e 100644 --- a/state/validation.go +++ b/state/validation.go @@ -14,7 +14,7 @@ import ( "github.com/FactomProject/factomd/util/atomic" ) -var ValidationDebug bool = false +var ValidationDebug bool = true // This is the tread with access to state. It does process and update state func (s *State) DoProcessing() {