Skip to content

Commit

Permalink
Merge branch 'communitynet-m3' of github.com:FactomProject/factomd in…
Browse files Browse the repository at this point in the history
…to communitynet-m3
  • Loading branch information
Emyrk committed Apr 25, 2018
2 parents f4aeca8 + 2db1907 commit 2c0fca5
Show file tree
Hide file tree
Showing 14 changed files with 100 additions and 104 deletions.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.4.2.21
5.0.0
6 changes: 3 additions & 3 deletions common/constants/constants.go
Expand Up @@ -183,9 +183,9 @@ var (

const (
// Limits for keeping inputs from flooding our execution
INMSGQUEUE_HIGH = 1000
INMSGQUEUE_MED = 500
INMSGQUEUE_LOW = 100
INMSGQUEUE_HIGH = 100000
INMSGQUEUE_MED = 5000
INMSGQUEUE_LOW = 1000

DBSTATE_REQUEST_LIM_HIGH = 200
DBSTATE_REQUEST_LIM_MED = 50
Expand Down
22 changes: 15 additions & 7 deletions common/messages/directoryBlockSignature.go
Expand Up @@ -40,6 +40,7 @@ type DirectoryBlockSignature struct {
Matches bool
hash interfaces.IHash
marshalCache []byte
dbsHash interfaces.IHash
}

var _ interfaces.IMsg = (*DirectoryBlockSignature)(nil)
Expand Down Expand Up @@ -387,23 +388,30 @@ func (m *DirectoryBlockSignature) MarshalBinary() (data []byte, err error) {
}

func (m *DirectoryBlockSignature) String() string {
return fmt.Sprintf("%6s-VM%3d: DBHt:%5d -- Signer[:3]=%x PrevDBKeyMR[:3]=%x hash[:3]=%x",
b, err := m.DirectoryBlockHeader.MarshalBinary()
if b != nil && err != nil {
h := primitives.Sha(b)
m.dbsHash = h
} else {
m.dbsHash = primitives.NewHash(constants.ZERO)
}
return fmt.Sprintf("%6s-VM%3d: DBHt:%5d -- Signer[:3]=%x Directory Hash[:3]=%x hash[:3]=%x",
"DBSig",
m.VMIndex,
m.DBHeight,
m.ServerIdentityChainID.Bytes()[2:6],
m.DirectoryBlockHeader.GetPrevKeyMR().Bytes()[:3],
m.dbsHash.Bytes()[:5],
m.GetHash().Bytes()[:3])

}

func (m *DirectoryBlockSignature) LogFields() log.Fields {
return log.Fields{"category": "message", "messagetype": "dbsig",
"dbheight": m.DBHeight,
"vm": m.VMIndex,
"server": m.ServerIdentityChainID.String(),
"prevkeymr": m.DirectoryBlockHeader.GetPrevKeyMR().String(),
"hash": m.GetHash().String()}
"dbheight": m.DBHeight,
"vm": m.VMIndex,
"server": m.ServerIdentityChainID.String(),
"dbhash": m.DirectoryBlockHeader.GetPrevFullHash(),
"hash": m.GetHash().String()}
}

func (e *DirectoryBlockSignature) JSONByte() ([]byte, error) {
Expand Down
6 changes: 3 additions & 3 deletions engine/NetworkProcessorNet.go
Expand Up @@ -102,7 +102,7 @@ func Peers(fnode *FactomNode) {
case constants.MISSING_DATA:
if !fnode.State.DBFinished {
return true
} else if fnode.State.InMsgQueue().Length() > 4000 {
} else if fnode.State.InMsgQueue().Length() > constants.INMSGQUEUE_HIGH {
// If > 4000, we won't get to this in time anyway. Just drop it since we are behind
return true
}
Expand Down Expand Up @@ -136,7 +136,7 @@ func Peers(fnode *FactomNode) {
continue // Toss any inputs from API
}

if fnode.State.InMsgQueue().Length() > fnode.State.InMsgQueue().Cap()*9/10 {
if fnode.State.InMsgQueue().Length() > constants.INMSGQUEUE_HIGH {
fnode.State.LogMessage("NetworkInputs", "API Drop, Too Full", msg)
continue
}
Expand Down Expand Up @@ -214,7 +214,7 @@ func Peers(fnode *FactomNode) {
continue // Toss any inputs from this peer
}

if fnode.State.InMsgQueue().Length() > fnode.State.InMsgQueue().Cap()*9/10 {
if fnode.State.InMsgQueue().Length() > constants.INMSGQUEUE_HIGH {
fnode.State.LogMessage("NetworkInputs", fromPeer+" Drop, Too Full", msg)
continue
}
Expand Down
7 changes: 4 additions & 3 deletions engine/loadJournal.go
Expand Up @@ -12,6 +12,7 @@ import (
"strings"
"time"

"github.com/FactomProject/factomd/common/constants"
"github.com/FactomProject/factomd/common/interfaces"
"github.com/FactomProject/factomd/common/messages/msgsupport"
)
Expand Down Expand Up @@ -84,8 +85,8 @@ func LoadJournalFromReader(s interfaces.IState, r *bufio.Reader) {
// Process the message.
s.InMsgQueue().Enqueue(msg)
p++
if s.InMsgQueue().Length() > 200 {
for s.InMsgQueue().Length() > 50 {
if s.InMsgQueue().Length() > constants.INMSGQUEUE_MED {
for s.InMsgQueue().Length() > constants.INMSGQUEUE_LOW {
time.Sleep(time.Millisecond * 10)
}
time.Sleep(time.Millisecond * 100)
Expand All @@ -94,7 +95,7 @@ func LoadJournalFromReader(s interfaces.IState, r *bufio.Reader) {

//Waiting for state to process the message queue
//before we disable "IsDoneReplaying"
for s.InMsgQueue().Length() > 0 {
for s.InMsgQueue().Length() > constants.INMSGQUEUE_LOW {
time.Sleep(time.Millisecond * 100)
}
}
3 changes: 2 additions & 1 deletion engine/timer.go
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"time"

"github.com/FactomProject/factomd/common/constants"
"github.com/FactomProject/factomd/common/interfaces"
s "github.com/FactomProject/factomd/state"
)
Expand Down Expand Up @@ -53,7 +54,7 @@ func Timer(state interfaces.IState) {
next += tenthPeriod
}
time.Sleep(time.Duration(wait))
for state.InMsgQueue().Length() > 5000 {
for state.InMsgQueue().Length() > constants.INMSGQUEUE_HIGH {
time.Sleep(100 * time.Millisecond)
}

Expand Down
30 changes: 28 additions & 2 deletions p2p/controller.go
Expand Up @@ -12,14 +12,16 @@ package p2p

import (
"fmt"
"math/rand"

"net"
"strings"
"time"
"unicode"

"github.com/FactomProject/factomd/common/primitives"

"math/rand"

log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -742,7 +744,7 @@ func (c *Controller) broadcast(parcel Parcel) {

// perform a shuffle on the connection peers, so that we can obtain a random sample
// by getting items from the begiining of the shuffled slice
rand.Shuffle(len(regularPeers), func(i, j int) {
Shuffle(len(regularPeers), func(i, j int) {
regularPeers[i], regularPeers[j] = regularPeers[j], regularPeers[i]
})

Expand All @@ -755,6 +757,30 @@ func (c *Controller) broadcast(parcel Parcel) {
SentToPeers.Set(float64(numSent))
}

// This is the implementation of Shuffle with go 1.10, but included here to allow go 1.9 to
// still compile our code.
func Shuffle(n int, swap func(i, j int)) {
if n < 0 {
panic("invalid argument to Shuffle")
}

// Fisher-Yates shuffle: https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
// Shuffle really ought not be called with n that doesn't fit in 32 bits.
// Not only will it take a very long time, but with 2³¹! possible permutations,
// there's no way that any PRNG can have a big enough internal state to
// generate even a minuscule percentage of the possible permutations.
// Nevertheless, the right API signature accepts an int n, so handle it as best we can.
i := n - 1
for ; i > 1<<31-1-1; i-- {
j := int(rand.Int63n(int64(i + 1)))
swap(i, j)
}
for ; i > 0; i-- {
j := int(rand.Int31n(int32(i + 1)))
swap(i, j)
}
}

func min(x, y int) int {
if x < y {
return x
Expand Down
4 changes: 2 additions & 2 deletions state/loadDatabase.go
Expand Up @@ -61,8 +61,8 @@ func LoadDatabase(s *State) {
}
s.InMsgQueue().Enqueue(msg)
msg.SetLocal(true)
if s.InMsgQueue().Length() > 500 {
for s.InMsgQueue().Length() > 100 {
if s.InMsgQueue().Length() > constants.INMSGQUEUE_MED {
for s.InMsgQueue().Length() > constants.INMSGQUEUE_LOW {
time.Sleep(10 * time.Millisecond)
}
}
Expand Down
6 changes: 6 additions & 0 deletions state/processList.go
Expand Up @@ -864,6 +864,12 @@ func (p *ProcessList) Process(state *State) (progress bool) {
}

if msg.Process(p.DBHeight, state) { // Try and Process this entry

if msg.Type() == constants.REVEAL_ENTRY_MSG {
delete(p.State.Holding, msg.GetHash().Fixed()) // We successfully executed the message, so take it out of holding if it is there.
p.State.Commits.Delete(msg.GetHash().Fixed())
}

p.State.LogMessage("processList", "done", msg)
vm.heartBeat = 0
vm.Height = j + 1 // Don't process it again if the process worked.
Expand Down
14 changes: 3 additions & 11 deletions state/processListManager.go
Expand Up @@ -15,10 +15,9 @@ var _ = fmt.Print
var _ = log.Print

type ProcessLists struct {
State *State // Pointer to the state object
DBHeightBase uint32 // Height of the first Process List in this structure.
Lists []*ProcessList // Pointer to the ProcessList structure for each DBHeight under construction
Holding [64]map[[32]byte]interfaces.IMsg // Messages that don't have Acks
State *State // Pointer to the state object
DBHeightBase uint32 // Height of the first Process List in this structure.
Lists []*ProcessList // Pointer to the ProcessList structure for each DBHeight under construction
SetString bool
Str string
}
Expand All @@ -27,13 +26,6 @@ func (lists *ProcessLists) LastList() *ProcessList {
return lists.Lists[len(lists.Lists)-1]
}

func (lists *ProcessLists) GetHolding(vmi int) map[[32]byte]interfaces.IMsg {
if lists.Holding[vmi] == nil {
lists.Holding[vmi] = make(map[[32]byte]interfaces.IMsg)
}
return lists.Holding[vmi]
}

// UpdateState is executed from a Follower's perspective. So the block we are building
// is always the block above the HighestRecordedBlock, but we only care about messages that
// are at the highest known block, as long as that is above the highest recorded block.
Expand Down
20 changes: 10 additions & 10 deletions state/state.go
Expand Up @@ -825,16 +825,16 @@ func (s *State) Init() {
s.TimeOffset = new(primitives.Timestamp) //interfaces.Timestamp(int64(rand.Int63() % int64(time.Microsecond*10)))
s.networkInvalidMsgQueue = make(chan interfaces.IMsg, 100) //incoming message queue from the network messages
s.InvalidMessages = make(map[[32]byte]interfaces.IMsg, 0)
s.networkOutMsgQueue = NewNetOutMsgQueue(1000) //Messages to be broadcast to the network
s.inMsgQueue = NewInMsgQueue(10000) //incoming message queue for Factom application messages
s.electionsQueue = NewElectionQueue(10000) //incoming message queue for Factom application messages
s.apiQueue = NewAPIQueue(100) //incoming message queue from the API
s.ackQueue = make(chan interfaces.IMsg, 100) //queue of Leadership messages
s.msgQueue = make(chan interfaces.IMsg, 400) //queue of Follower messages
s.ShutdownChan = make(chan int, 1) //Channel to gracefully shut down.
s.MissingEntries = make(chan *MissingEntry, 1000) //Entries I discover are missing from the database
s.UpdateEntryHash = make(chan *EntryUpdate, 10000) //Handles entry hashes and updating Commit maps.
s.WriteEntry = make(chan interfaces.IEBEntry, 3000) //Entries to be written to the database
s.networkOutMsgQueue = NewNetOutMsgQueue(1000) //Messages to be broadcast to the network
s.inMsgQueue = NewInMsgQueue(constants.INMSGQUEUE_HIGH + 10) //incoming message queue for Factom application messages
s.electionsQueue = NewElectionQueue(10000) //incoming message queue for Factom application messages
s.apiQueue = NewAPIQueue(100) //incoming message queue from the API
s.ackQueue = make(chan interfaces.IMsg, 100) //queue of Leadership messages
s.msgQueue = make(chan interfaces.IMsg, 400) //queue of Follower messages
s.ShutdownChan = make(chan int, 1) //Channel to gracefully shut down.
s.MissingEntries = make(chan *MissingEntry, 1000) //Entries I discover are missing from the database
s.UpdateEntryHash = make(chan *EntryUpdate, 10000) //Handles entry hashes and updating Commit maps.
s.WriteEntry = make(chan interfaces.IEBEntry, 3000) //Entries to be written to the database

if s.Journaling {
f, err := os.Create(s.JournalFile)
Expand Down

0 comments on commit 2c0fca5

Please sign in to comment.