Skip to content

Commit

Permalink
Merge pull request #242 from FactomProject/FD-87
Browse files Browse the repository at this point in the history
Fd 87
  • Loading branch information
ThePiachu committed Apr 28, 2017
2 parents 710f993 + bd174fc commit 27e1722
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 52 deletions.
10 changes: 9 additions & 1 deletion common/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ const (
const NUM_MESSAGES = 29

const (
// Limits for keeping inputs from flooding our execution
INMSGQUEUE_HIGH = 1000
INMSGQUEUE_MED = 500
INMSGQUEUE_LOW = 100

DBSTATE_REQUEST_LIM_HIGH = 200
DBSTATE_REQUEST_LIM_MED = 50

// Replay
INTERNAL_REPLAY = 1
NETWORK_REPLAY = 2
Expand Down Expand Up @@ -97,7 +105,7 @@ const (
//NETWORK constants
//==================
VERSION_0 = byte(0)
FACTOMD_VERSION = 4001002
FACTOMD_VERSION = 4001003
MAIN_NETWORK_ID uint32 = 0xFA92E5A2
TEST_NETWORK_ID uint32 = 0xFA92E5A3
LOCAL_NETWORK_ID uint32 = 0xFA92E5A4
Expand Down
10 changes: 5 additions & 5 deletions common/messages/dbstateMissing.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,12 @@ func (m *DBStateMissing) send(dbheight uint32, state interfaces.IState) (msglen

func NewEnd(inLen int, start uint32, end uint32) (s uint32, e uint32) {
switch {
case inLen > 500:
case inLen > constants.INMSGQUEUE_HIGH:
return 0, 0
case inLen > 200 && end-start > 50:
end = start + 50
case end-start > 200:
end = start + 200
case inLen > constants.INMSGQUEUE_MED && end-start > constants.DBSTATE_REQUEST_LIM_MED:
end = start + constants.DBSTATE_REQUEST_LIM_MED
case end-start > constants.DBSTATE_REQUEST_LIM_HIGH:
end = start + constants.DBSTATE_REQUEST_LIM_HIGH
}
return start, end
}
Expand Down
56 changes: 28 additions & 28 deletions engine/MsgLogging.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,41 +25,41 @@ type msglist struct {
}

type MsgLog struct {
Enable bool
sem sync.Mutex
MsgList []*msglist
last interfaces.Timestamp
all bool
nodeCnt int
Enable bool
sem sync.Mutex
MsgList []*msglist
Last interfaces.Timestamp
all bool
nodeCnt int

start interfaces.Timestamp
msgCnt int
msgPerSec int
start interfaces.Timestamp
msgCnt int
msgPerSec int

// The last period (msg rate over the last period, so msg changes can be seen)
period int64
startp interfaces.Timestamp
msgCntp int
msgPerSecp int
Period int64
Startp interfaces.Timestamp
MsgCntp int
MsgPerSecp int
}

func (m *MsgLog) init(enable bool, nodecnt int) {
func (m *MsgLog) Init(enable bool, nodecnt int) {
m.Enable = enable
m.nodeCnt = nodecnt
if nodecnt == 0 {
m.nodeCnt = 1
}
}

func (m *MsgLog) add2(fnode *FactomNode, out bool, peer string, where string, valid bool, msg interfaces.IMsg) {
func (m *MsgLog) Add2(fnode *FactomNode, out bool, peer string, where string, valid bool, msg interfaces.IMsg) {
m.sem.Lock()
defer m.sem.Unlock()
now := fnode.State.GetTimestamp()
if m.start == nil {
m.start = fnode.State.GetTimestamp()
m.last = m.start // last is start
m.period = 2
m.startp = m.start
m.Last = m.start // last is start
m.Period = 2
m.Startp = m.start
}

nm := new(msglist)
Expand All @@ -80,18 +80,18 @@ func (m *MsgLog) add2(fnode *FactomNode, out bool, peer string, where string, va
if now.GetTimeSeconds()-m.start.GetTimeSeconds() > 1 {
m.msgPerSec = (m.msgCnt + len(m.MsgList)) / interval / m.nodeCnt
}
if now.GetTimeSeconds()-m.startp.GetTimeSeconds() >= m.period {
m.msgPerSecp = (m.msgCntp + len(m.MsgList)) / interval / m.nodeCnt
m.msgCntp = 0
m.startp = now // Reset timer
if now.GetTimeSeconds()-m.Startp.GetTimeSeconds() >= m.Period {
m.MsgPerSecp = (m.MsgCntp + len(m.MsgList)) / interval / m.nodeCnt
m.MsgCntp = 0
m.Startp = now // Reset timer
}
// If it has been 4 seconds and we are NOT printing, then toss.
// This gives us a second to get to print.
if now.GetTimeSeconds()-m.last.GetTimeSeconds() > 3 {
if now.GetTimeSeconds()-m.Last.GetTimeSeconds() > 3 {
m.msgCnt += len(m.MsgList) // Keep my counts
m.msgCntp += len(m.MsgList)
m.MsgCntp += len(m.MsgList)
m.MsgList = make([]*msglist, 0) // Clear the record.
m.last = now
m.Last = now
}

}
Expand Down Expand Up @@ -119,11 +119,11 @@ func (m *MsgLog) PrtMsgs(state interfaces.IState) {
}
}
now := state.GetTimestamp()
m.last = now
m.Last = now
m.msgCnt += len(m.MsgList) // Keep my counts
m.msgCntp += len(m.MsgList)
m.MsgCntp += len(m.MsgList)
m.MsgList = m.MsgList[0:0] // Once printed, clear the list

fmt.Println(fmt.Sprintf("*** %42s **** ", fmt.Sprintf("Length: %d Msgs/sec: T %d P %d", len(m.MsgList), m.msgPerSec, m.msgPerSecp)))
fmt.Println(fmt.Sprintf("*** %42s **** ", fmt.Sprintf("Length: %d Msgs/sec: T %d P %d", len(m.MsgList), m.msgPerSec, m.MsgPerSecp)))
fmt.Println("\n-----------------------------------------------------")
}
51 changes: 51 additions & 0 deletions engine/MsgLogging_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package engine_test

import (
"testing"

. "github.com/FactomProject/factomd/engine"
"github.com/FactomProject/factomd/state"
"github.com/FactomProject/factomd/common/messages"
"github.com/FactomProject/factomd/common/primitives"

"time"
)



func TestMessageLoging(t *testing.T) {
msgLog := new(MsgLog)
msgLog.Init(true, 0)

fnode := new(FactomNode)
fnode.State = new(state.State)
s := fnode.State

msg := new(messages.Bounce)
msg.Name = "bob"
msg.Timestamp = primitives.NewTimestampNow()
msg.Data = []byte("here is some data")
msg.Stamps = append(msg.Stamps, primitives.NewTimestampNow())

msgLog.PrtMsgs(s)

msgLog.Add2(fnode, true, "peer","where",true,msg)
msgLog.Add2(fnode, false, "peer","where",true,msg)

msgLog.Startp = primitives.NewTimestampFromMilliseconds(0)
msgLog.Add2(fnode, false, "peer","where",true,msg)

if len(msgLog.MsgList) != 3 {
t.Error("Should have three entries")
}
msgLog.PrtMsgs(s)

msgLog.Last.SetTimeSeconds(msgLog.Last.GetTimeSeconds()-6)
time.Sleep(10*time.Millisecond)
msgLog.Add2(fnode, false, "peer","where",true,msg)

if len(msgLog.MsgList) != 0 {
t.Error("Should have zero messages")
}
msgLog.PrtMsgs(s)
}
2 changes: 1 addition & 1 deletion engine/NetStart.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func NetStart(s *state.State) {
s.Init()
s.SetDropRate(droprate)

mLog.init(runtimeLog, cnt)
mLog.Init(runtimeLog, cnt)

setupFirstAuthority(s)

Expand Down
4 changes: 2 additions & 2 deletions engine/NetworkProcessorNet.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func NetworkOutputs(fnode *FactomNode) {
if p < 0 {
p = rand.Int() % len(fnode.Peers)
}
fnode.MLog.add2(fnode, true, fnode.Peers[p].GetNameTo(), "P2P out", true, msg)
fnode.MLog.Add2(fnode, true, fnode.Peers[p].GetNameTo(), "P2P out", true, msg)
if !fnode.State.GetNetStateOff() {
fnode.Peers[p].Send(msg)
if fnode.State.MessageTally {
Expand All @@ -168,7 +168,7 @@ func NetworkOutputs(fnode *FactomNode) {
// Don't resend to the node that sent it to you.
if i != p || wt > 1 {
bco := fmt.Sprintf("%s/%d/%d", "BCast", p, i)
fnode.MLog.add2(fnode, true, peer.GetNameTo(), bco, true, msg)
fnode.MLog.Add2(fnode, true, peer.GetNameTo(), bco, true, msg)
if !fnode.State.GetNetStateOff() {
peer.Send(msg)
if fnode.State.MessageTally {
Expand Down
3 changes: 2 additions & 1 deletion state/entrySyncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"math/rand"
"time"

"github.com/FactomProject/factomd/common/constants"
"github.com/FactomProject/factomd/common/interfaces"
"github.com/FactomProject/factomd/common/messages"
"github.com/FactomProject/factomd/database/databaseOverlay"
Expand Down Expand Up @@ -80,7 +81,7 @@ func (s *State) MakeMissingEntryRequests() {
}

sent := 0
if s.inMsgQueue.Length() < 500 {
if s.inMsgQueue.Length() < constants.INMSGQUEUE_MED {
// Make requests for entries we don't have.
for k := range MissingEntryMap {

Expand Down
2 changes: 1 addition & 1 deletion state/printState.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func PrintState(state *State) {
str = fmt.Sprintf("%s %35s = %+v\n", str, "NewEntryChains", state.NewEntryChains)
str = fmt.Sprintf("%s %35s = %+v\n", str, "NewEntries", state.NewEntries)
str = fmt.Sprintf("%s %35s = %+v\n", str, "LeaderTimestamp", state.LeaderTimestamp)
str = fmt.Sprintf("%s %35s = %+v\n", str, "resendHolding", state.resendHolding)
str = fmt.Sprintf("%s %35s = %+v\n", str, "resendHolding", state.ResendHolding)
//str = fmt.Sprintf("%s %35s = %+v\n", str, "Holding", state.Holding)
str = fmt.Sprintf("%s %35s = %+v\n", str, "XReview", state.XReview)
str = fmt.Sprintf("%s %35s = %+v\n", str, "Acks", state.Acks)
Expand Down
6 changes: 1 addition & 5 deletions state/processList.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ func (p *ProcessList) Ask(vmIndex int, height int, waitSeconds int64, tag int) i
return 0
}

if now-r.sent >= waitSeconds*1000+500 {
if now-r.sent >= waitSeconds*1000+500 && p.State.inMsgQueue.Length() < constants.INMSGQUEUE_MED {
missingMsgRequest := messages.NewMissingMsg(p.State, r.vmIndex, p.DBHeight, r.vmheight)

// The System (handling full faults) is a special VM. Let's guess it first.
Expand Down Expand Up @@ -939,10 +939,6 @@ func (p *ProcessList) AddToProcessList(ack *messages.Ack, m interfaces.IMsg) {
}
}

if _, ok := m.(*messages.MissingMsg); ok {
panic("This shouldn't happen")
}

toss := func(hint string) {
fmt.Println("dddd TOSS in Process List", p.State.FactomNodeName, hint)
fmt.Println("dddd TOSS in Process List", p.State.FactomNodeName, ack.String())
Expand Down
2 changes: 1 addition & 1 deletion state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ type State struct {
// Maps
// ====
// For Follower
resendHolding interfaces.Timestamp // Timestamp to gate resending holding to neighbors
ResendHolding interfaces.Timestamp // Timestamp to gate resending holding to neighbors
Holding map[[32]byte]interfaces.IMsg // Hold Messages
XReview []interfaces.IMsg // After the EOM, we must review the messages in Holding
Acks map[[32]byte]interfaces.IMsg // Hold Acknowledgemets
Expand Down
14 changes: 7 additions & 7 deletions state/stateConsensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,21 +240,21 @@ func (s *State) ReviewHolding() {
return
}

if s.inMsgQueue.Length() > 10 {
if s.inMsgQueue.Length() > constants.INMSGQUEUE_LOW {
return
}

now := s.GetTimestamp()
if s.resendHolding == nil {
s.resendHolding = now
if s.ResendHolding == nil {
s.ResendHolding = now
}
if now.GetTimeMilli()-s.resendHolding.GetTimeMilli() < 300 {
if now.GetTimeMilli()-s.ResendHolding.GetTimeMilli() < 300 {
return
}

s.DB.Trim()

s.resendHolding = now
s.ResendHolding = now
// Anything we are holding, we need to reprocess.
s.XReview = make([]interfaces.IMsg, 0)

Expand Down Expand Up @@ -570,7 +570,7 @@ func (s *State) FollowerExecuteDBState(msg interfaces.IMsg) {
func (s *State) FollowerExecuteMMR(m interfaces.IMsg) {

// Just ignore missing messages for a period after going off line or starting up.
if s.IgnoreMissing {
if s.IgnoreMissing || s.inMsgQueue.Length() > constants.INMSGQUEUE_HIGH {
return
}

Expand Down Expand Up @@ -694,7 +694,7 @@ func (s *State) FollowerExecuteDataResponse(m interfaces.IMsg) {

func (s *State) FollowerExecuteMissingMsg(msg interfaces.IMsg) {
// Don't respond to missing messages if we are behind.
if len(s.inMsgQueue) > 100 {
if s.inMsgQueue.Length() > constants.INMSGQUEUE_LOW {
return
}

Expand Down
12 changes: 12 additions & 0 deletions state/stateConsensus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright 2017 Factom Foundation
// Use of this source code is governed by the MIT
// license that can be found in the LICENSE file.

package state_test

import (

)



0 comments on commit 27e1722

Please sign in to comment.