various optimizations we are considering
PaulSnow committed May 18, 2019
1 parent df22c01 commit 03180f8
Showing 4 changed files with 70 additions and 38 deletions.
43 changes: 27 additions & 16 deletions engine/NetworkProcessorNet.go
@@ -127,6 +127,7 @@ func Peers(fnode *FactomNode) {

for i := 0; i < 100 && fnode.State.APIQueue().Length() > 0; i++ {
msg := fnode.State.APIQueue().Dequeue()
fnode.State.LogMessage("NetworkInputs", "from API, Dequeue", msg)

if globals.Params.FullHashesLog {
primitives.Loghash(msg.GetMsgHash())
@@ -182,16 +183,20 @@ func Peers(fnode *FactomNode) {
}

//fnode.MLog.add2(fnode, false, fnode.State.FactomNodeName, "API", true, msg)
if t := msg.Type(); t == constants.REVEAL_ENTRY_MSG || t == constants.COMMIT_CHAIN_MSG || t == constants.COMMIT_ENTRY_MSG {
fnode.State.LogMessage("NetworkInputs", "from API, Enqueue2", msg)
fnode.State.LogMessage("InMsgQueue2", "enqueue2", msg)
switch msg.Type() {
case constants.REVEAL_ENTRY_MSG, constants.COMMIT_CHAIN_MSG, constants.COMMIT_ENTRY_MSG:
fnode.State.LogMessage("InMsgQueue2", "APIenqueue2", msg)
fnode.State.InMsgQueue2().Enqueue(msg)
} else if t := msg.Type(); t == constants.EOM_MSG || t == constants.DIRECTORY_BLOCK_SIGNATURE_MSG {
fnode.State.LogMessage("eomQueue", "enqueue", msg)
case constants.EOM_MSG, constants.DIRECTORY_BLOCK_SIGNATURE_MSG,
constants.MISSING_MSG_RESPONSE, constants.MISSING_MSG,
constants.DBSTATE_MSG, constants.DBSTATE_MISSING_MSG:
fnode.State.LogMessage("eomQueue", "APIenqueue", msg)
fnode.State.EomQueue() <- msg
} else {
fnode.State.LogMessage("NetworkInputs", "from API, Enqueue", msg)
fnode.State.LogMessage("InMsgQueue", "enqueue", msg)
case constants.ACK_MSG:
fnode.State.LogMessage("ackQueue", "APIenqueue", msg)
fnode.State.AckQueue() <- msg
default:
fnode.State.LogMessage("InMsgQueue", "APIenqueue", msg)
fnode.State.InMsgQueue().Enqueue(msg)
}
} // for the api queue read up to 100 messages {...}
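The switch above (and its twin in the peer-input path further down) replaces the old if/else chain and fans incoming messages out to per-type queues, giving EOMs, DBSigs, and acks their own fast lanes instead of queuing behind entry commits. A minimal, self-contained sketch of the pattern; the message kinds and channel names here are hypothetical stand-ins, not factomd's real types:

```go
package main

import "fmt"

// MsgType stands in for factomd's constants.*_MSG values.
type MsgType int

const (
	RevealEntry MsgType = iota
	CommitChain
	EOM
	Ack
	Other
)

type Msg struct {
	Type MsgType
	Body string
}

// route fans a message out to one of several queues based on its type,
// the same shape as the switch this commit introduces in Peers.
func route(m Msg, inMsgQ2, eomQ, ackQ, inMsgQ chan Msg) {
	switch m.Type {
	case RevealEntry, CommitChain:
		inMsgQ2 <- m // entry/commit traffic gets its own queue
	case EOM:
		eomQ <- m // consensus-critical messages take the fast lane
	case Ack:
		ackQ <- m
	default:
		inMsgQ <- m // everything else goes to the general queue
	}
}

func main() {
	q2, eq, aq, q := make(chan Msg, 1), make(chan Msg, 1), make(chan Msg, 1), make(chan Msg, 1)
	route(Msg{Type: EOM, Body: "minute marker"}, q2, eq, aq, q)
	fmt.Println((<-eq).Body) // prints: minute marker
}
```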
@@ -216,6 +221,8 @@ func Peers(fnode *FactomNode) {
break // move to next peer
}

fnode.State.LogMessage("NetworkInputs", fromPeer+",denqueue", msg)

if globals.Params.FullHashesLog {
primitives.Loghash(msg.GetMsgHash())
primitives.Loghash(msg.GetHash())
@@ -311,16 +318,20 @@ func Peers(fnode *FactomNode) {
msg.SetNetwork(true)

if !crossBootIgnore(msg) {
if t := msg.Type(); t == constants.REVEAL_ENTRY_MSG || t == constants.COMMIT_CHAIN_MSG || t == constants.COMMIT_ENTRY_MSG {
fnode.State.LogMessage("NetworkInputs", fromPeer+", enqueue2", msg)
fnode.State.LogMessage("InMsgQueue2", fromPeer+", enqueue2", msg)
switch msg.Type() {
case constants.REVEAL_ENTRY_MSG, constants.COMMIT_CHAIN_MSG, constants.COMMIT_ENTRY_MSG:
fnode.State.LogMessage("InMsgQueue2", fromPeer+",enqueue", msg)
fnode.State.InMsgQueue2().Enqueue(msg)
} else if t := msg.Type(); t == constants.EOM_MSG || t == constants.DIRECTORY_BLOCK_SIGNATURE_MSG {
fnode.State.LogMessage("eomQueue", "enqueue", msg)
case constants.EOM_MSG, constants.DIRECTORY_BLOCK_SIGNATURE_MSG,
constants.MISSING_MSG_RESPONSE, constants.MISSING_MSG,
constants.DBSTATE_MSG, constants.DBSTATE_MISSING_MSG:
fnode.State.LogMessage("eomQueue", fromPeer+",enqueue", msg)
fnode.State.EomQueue() <- msg
} else {
fnode.State.LogMessage("NetworkInputs", fromPeer+", enqueue", msg)
fnode.State.LogMessage("InMsgQueue", fromPeer+", enqueue", msg)
case constants.ACK_MSG:
fnode.State.LogMessage("ackQueue", fromPeer+",enqueue", msg)
fnode.State.AckQueue() <- msg
default:
fnode.State.LogMessage("InMsgQueue", fromPeer+",enqueue", msg)
fnode.State.InMsgQueue().Enqueue(msg)
}
}
18 changes: 14 additions & 4 deletions state/processList.go
@@ -747,6 +747,11 @@ var extraDebug bool = false

// Process messages and update our state.
func (p *ProcessList) Process(s *State) (progress bool) {
if ValidationDebug {
s.LogPrintf("executeMsg", "start processlist.Process")
defer s.LogPrintf("executeMsg", "end processlist.Process")
}

dbht := s.GetHighestSavedBlk()
if dbht >= p.DBHeight {
//s.AddStatus(fmt.Sprintf("ProcessList.Process: VM Height is %d and Saved height is %d", dbht, s.GetHighestSavedBlk()))
@@ -756,7 +761,7 @@ func (p *ProcessList) Process(s *State) (progress bool) {
s.PLProcessHeight = p.DBHeight

now := s.GetTimestamp()

VMLoop:
for i := 0; i < len(p.FedServers); i++ {
vm := p.VMs[i]

@@ -804,6 +809,11 @@ func (p *ProcessList) Process(s *State) (progress bool) {
break VMListLoop
}

if t := vm.List[j].Type(); vm.Synced == true && t != constants.EOM_MSG && t != constants.DIRECTORY_BLOCK_SIGNATURE_MSG {
s.LogPrintf("process", "VM Synced %d/%d/%d", s.LLeaderHeight, i, j)
continue VMLoop
}

if extraDebug {
s.LogMessage("process", fmt.Sprintf("Consider %v/%v/%v", p.DBHeight, i, j), vm.List[j])
}
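The check added above lets a VM that has already synced this minute skip everything except end-of-minute messages, using a labeled continue to jump straight to the next VM. A toy sketch of that control flow, with simplified stand-in types:

```go
package main

import "fmt"

// VM is a minimal stand-in; factomd's real VM and message types are richer.
type VM struct {
	Synced bool
	List   []string // each entry names a message type
}

func process(vms []VM) {
VMLoop:
	for i := range vms {
		for j, t := range vms[i].List {
			// Once a VM is synced for the current minute, only
			// end-of-minute messages may still be processed; for
			// anything else, move on to the next VM.
			if vms[i].Synced && t != "EOM" && t != "DBSIG" {
				fmt.Printf("vm %d synced, skipping from item %d\n", i, j)
				continue VMLoop
			}
			fmt.Printf("vm %d processing %s\n", i, t)
		}
	}
}

func main() {
	process([]VM{
		{Synced: true, List: []string{"EOM", "COMMIT"}},
		{Synced: false, List: []string{"COMMIT"}},
	})
}
```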
@@ -1094,9 +1104,9 @@ func (p *ProcessList) AddToProcessList(s *State, ack *messages.Ack, m interfaces
}

s.LogMessage("processList", fmt.Sprintf("Added at %d/%d/%d by %s", ack.DBHeight, ack.VMIndex, ack.Height, atomic.WhereAmIString(1)), m)
if ack.IsLocal() {
for p.Process(s) {
}
if ack.IsLocal() { // If I am a leader, I know that this message can be processed, so calling p.Process stops thrashing
for p.Process(s) { // If I am a follower, I really don't know I can process this, so we will thrash if we call process
} // That's why we check for IsLocal (did this node create the ack).
}

}
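The comments above carry the reasoning: only the node that created the ack (a leader) knows the message is processable right now, so only it should drive Process in a tight loop; a follower doing the same would thrash. A minimal sketch of that gate, with illustrative types rather than factomd's real ProcessList API:

```go
package main

import "fmt"

type Ack struct{ Local bool }

func (a Ack) IsLocal() bool { return a.Local }

type ProcessList struct{ pending int }

// Process consumes one pending item and reports whether it made progress.
func (p *ProcessList) Process() bool {
	if p.pending == 0 {
		return false
	}
	p.pending--
	return true
}

// addToProcessList mirrors the commit's IsLocal gate: only the node that
// generated the ack drives Process immediately, because it knows the
// message can be processed; remote work waits for the main loop.
func addToProcessList(p *ProcessList, ack Ack) {
	p.pending++
	if ack.IsLocal() {
		for p.Process() {
		}
	}
}

func main() {
	p := &ProcessList{}
	addToProcessList(p, Ack{Local: true})
	fmt.Println("pending after local ack:", p.pending) // 0: processed immediately
	addToProcessList(p, Ack{Local: false})
	fmt.Println("pending after remote ack:", p.pending) // 1: left for the main loop
}
```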
45 changes: 28 additions & 17 deletions state/stateConsensus.go
@@ -143,6 +143,18 @@ func (s *State) Validate(msg interfaces.IMsg) (validToSend int, validToExec int)
}
}

defer func() {
if validToExec == 1 && s.LLeaderHeight > 1 && s.Acks[msg.GetHash().Fixed()] == nil {
switch msg.Type() {
case constants.REVEAL_ENTRY_MSG, constants.COMMIT_ENTRY_MSG, constants.COMMIT_CHAIN_MSG, constants.FACTOID_TRANSACTION_MSG:
pl := s.ProcessLists.Get(s.LLeaderHeight)
if pl.VMs[msg.GetVMIndex()].Synced {
validToExec = 0
}
}
}
}()

// Valid to send is a bit different from valid to execute. Check for valid to send here.
validToSend = msg.Validate(s)
if validToSend == 0 { // if the msg says hold then we hold...
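The deferred closure added to Validate leans on a Go idiom: validToExec is a named return value, so a single defer can downgrade the verdict on every return path at once. A self-contained sketch, with the VM-synced test reduced to a boolean parameter and hypothetical message-type strings:

```go
package main

import "fmt"

// validate shows the pattern the commit adds to State.Validate: a deferred
// closure adjusts the named return value just before the function returns,
// keeping the override in one place no matter which return fired.
func validate(msgType string, vmSynced bool) (validToExec int) {
	defer func() {
		// If the target VM is already synced for this minute, hold the
		// message instead of executing it now.
		if validToExec == 1 && vmSynced {
			validToExec = 0
		}
	}()

	switch msgType {
	case "REVEAL_ENTRY", "COMMIT_CHAIN":
		return 1 // normally executable...
	default:
		return -1
	}
}

func main() {
	fmt.Println(validate("REVEAL_ENTRY", false)) // 1: execute now
	fmt.Println(validate("REVEAL_ENTRY", true))  // 0: deferred downgrade to hold
}
```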
@@ -173,7 +185,8 @@ func (s *State) Validate(msg interfaces.IMsg) (validToSend int, validToExec int)
// If we are not the leader, or this isn't the VM we are responsible for ...
if !s.Leader || (s.LeaderVMIndex != vmIndex) {
switch msg.Type() {
case constants.COMMIT_ENTRY_MSG, constants.COMMIT_CHAIN_MSG, constants.REVEAL_ENTRY_MSG, constants.EOM_MSG, constants.DIRECTORY_BLOCK_SIGNATURE_MSG, constants.FACTOID_TRANSACTION_MSG:
case constants.COMMIT_ENTRY_MSG, constants.COMMIT_CHAIN_MSG, constants.REVEAL_ENTRY_MSG,
constants.EOM_MSG, constants.DIRECTORY_BLOCK_SIGNATURE_MSG, constants.FACTOID_TRANSACTION_MSG:
// don't need to check for a matching ack for ACKs or local messages
// for messages that get an ACK, make sure we can expect to process them
ack, _ := s.Acks[msg.GetMsgHash().Fixed()].(*messages.Ack)
@@ -272,7 +285,7 @@ func (s *State) executeMsg(msg interfaces.IMsg) (ret bool) {
s.Leader &&
!s.Saving && // if not between blocks
vm != nil && vmh == vml && // if we have processed to the end of the process list
(!s.Syncing || !vms) && // if not syncing or this VM is not yet synced
!vms && // This VM is not yet synced
(local || vmi == s.LeaderVMIndex) && // if it's a local message or it's a message for our VM
s.LeaderPL.DBHeight+1 >= hkb {
if vml == 0 { // if we have not generated a DBSig ...
@@ -414,6 +427,11 @@ func (s *State) Process() (progress bool) {
}
// Process inbound messages
preEmptyLoopTime := time.Now()

if ValidationDebug {
s.LogPrintf("executeMsg", "start messageSort")
}

emptyLoop:
for {
select {
@@ -460,7 +478,9 @@ ackLoop:
preProcessXReviewTime := time.Now()
// Reprocess any stalled messages, but not so much compared to inbound messages
// Process last first

if ValidationDebug {
s.LogPrintf("executeMsg", "start reviewHolding")
}
if s.RunLeader {
s.ReviewHolding()
for {
@@ -585,6 +605,10 @@ func (s *State) ReviewHolding() {

for k, v := range s.Holding {

if (time.Now().Add(-2 * time.Second)).After(preReviewHoldingTime) {
break
}

if int(highest)-int(saved) > 1000 {
TotalHoldingQueueOutputs.Inc()
//delete(s.Holding, k)
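The added break time-boxes the holding-map scan to roughly two seconds per pass, so one review cannot starve the rest of the processing loop. A runnable sketch of the same budget pattern, with hypothetical names:

```go
package main

import (
	"fmt"
	"time"
)

// reviewHolding shows the time-box the commit adds to ReviewHolding: stop
// scanning the holding map once a budget (2s in the commit) has elapsed,
// and let the next pass pick up whatever work remains.
func reviewHolding(holding map[string]string, budget time.Duration) int {
	start := time.Now()
	reviewed := 0
	for k := range holding {
		if time.Since(start) > budget {
			break // out of budget for this pass
		}
		_ = k // real code would revalidate and possibly resend the message
		reviewed++
	}
	return reviewed
}

func main() {
	h := map[string]string{"a": "msg1", "b": "msg2", "c": "msg3"}
	fmt.Println("reviewed:", reviewHolding(h, 2*time.Second))
}
```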
@@ -978,7 +1002,7 @@ func (s *State) repost(m interfaces.IMsg, delay int) {
}
//s.LogMessage("MsgQueue", fmt.Sprintf("enqueue_%s(%d)", whereAmI, len(s.msgQueue)), m)
s.LogMessage("MsgQueue", fmt.Sprintf("enqueue (%d)", len(s.msgQueue)), m)
s.msgQueue <- m // Goes in the "do this really fast" queue so we are prompt about EOM's while syncing
s.eomQueue <- m // Goes in the "do this really fast" queue so we are prompt about EOM's while syncing
}()
}
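Pointing repost at eomQueue instead of msgQueue moves these messages onto the "do this really fast" channel. One plausible way a consumer could prefer such a fast lane over the general queue, sketched with illustrative channel names (not factomd's actual drain loop):

```go
package main

import "fmt"

// drain prefers the fast lane: the general queue is only consulted when
// the fast lane is empty, so consensus-critical messages never wait
// behind ordinary traffic.
func drain(eomQueue, msgQueue chan string) {
	for {
		select {
		case m := <-eomQueue:
			fmt.Println("fast lane:", m)
		default:
			select {
			case m := <-eomQueue:
				fmt.Println("fast lane:", m)
			case m := <-msgQueue:
				fmt.Println("general:", m)
			default:
				return // both queues empty
			}
		}
	}
}

func main() {
	eq, mq := make(chan string, 4), make(chan string, 4)
	mq <- "commit"
	eq <- "EOM"
	drain(eq, mq) // prints the EOM before the commit
}
```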

@@ -1432,24 +1456,12 @@ func (s *State) FollowerExecuteMissingMsg(msg interfaces.IMsg) {
func (s *State) FollowerExecuteCommitChain(m interfaces.IMsg) {
FollowerExecutions.Inc()
s.FollowerExecuteMsg(m)
cc := m.(*messages.CommitChainMsg)
re := s.Holding[cc.CommitChain.EntryHash.Fixed()]
if re != nil {
re.FollowerExecute(s)
re.SendOut(s, re)
}
m.SendOut(s, m)
}

func (s *State) FollowerExecuteCommitEntry(m interfaces.IMsg) {
ce := m.(*messages.CommitEntryMsg)
FollowerExecutions.Inc()
s.FollowerExecuteMsg(m)
re := s.Holding[ce.CommitEntry.EntryHash.Fixed()]
if re != nil {
re.FollowerExecute(s)
re.SendOut(s, re)
}
m.SendOut(s, m)
}

@@ -2146,7 +2158,6 @@ func (s *State) ProcessEOM(dbheight uint32, msg interfaces.IMsg) bool {
s.EOMMinute = int(e.Minute)
s.EOMsyncing = true
//fmt.Println(fmt.Sprintf("SigType PROCESS: %10s vm %2d First SigType processed: return on s.SigType(%v) && int(e.Minute(%v)) > s.EOMMinute(%v)", s.FactomNodeName, e.VMIndex, s.SigType, e.Minute, s.EOMMinute))
return false
}

// What I do for each EOM
2 changes: 1 addition & 1 deletion state/validation.go
@@ -14,7 +14,7 @@ import (
"github.com/FactomProject/factomd/util/atomic"
)

var ValidationDebug bool = false
var ValidationDebug bool = true

// This is the thread with access to state. It does process and update state
func (s *State) DoProcessing() {
