Skip to content

Commit

Permalink
Merge pull request #267 from FactomProject/FD-156
Browse files Browse the repository at this point in the history
Fd 156
  • Loading branch information
Emyrk committed Jun 13, 2017
2 parents 154d593 + 15f28fc commit edbbf8e
Show file tree
Hide file tree
Showing 13 changed files with 109 additions and 40 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -10,7 +10,7 @@ install:
script:
- go build -v
- go test -v $(glide nv)
- $GOPATH/bin/goveralls -service=travis-ci -ignore=$(paste -sd, .coverignore)
- travis_wait 30 $GOPATH/bin/goveralls -service=travis-ci -ignore=$(paste -sd, .coverignore)
notifications:
slack:
secure: c8wuYqAHxEhIPBcCoM0IzoPQaBA5pqqi5jFRHsMu98VlHoKEiyjTGtNp2E7I5Y0jmxg1fEFZNsfRe0qC6X+XgcLuBCvsbFVeImtLP5sDZh3+FgfvQM+rh4VpgUENuIEpMCjXkJVzXAxEmWve3GLrhmdlP8PBxCbiwnUBe4kpbYeWZNcNXC4bZGxslNQBC/EHcrzW2zvRoMvBRhfPCbNea/XD/+6yK6tujOJ61HA9h3+Ys8FIAfyYy5XmNctKQKE6MOo6sh9Ou/OSlVM8JajG+FPDoYbk/MMnakAL41pQbZZjCYB9xI1y58zslTlv0FxmKsqml9qffb5veNpVYpdljOpegA3u4TGaLdTCpzJxibpuGVJWzUKHO2y59y54DPK275mOZCVL88SfKuUsFNER3Y6z0uZR3lLfsK5cmh5rPLyFCoPW9QvgT+nSUP/ueS629RgvyVWRXMpEin2P38v+4FqBrQMJ1fjAnecFxT9ztot0YyUsYqfzVnvaaQbXG6hAvCcc+iE5N0ObqqUhFGdiRa8IbsuOU1pL1uGeVSZXTvbb7Gh1TP1KPAb+vbeJygg6VYxHTJZIbF58yo7myqfprq6WBocYQh1C2/hhBfE4cE1su8vNZMex9cSk7fbK2WM9vYGe5g/rtWfx0EGK/16qOnOdCnrG7fnPq/R1REh8bvk=
1 change: 1 addition & 0 deletions common/messages/MessageBase.go
Expand Up @@ -24,6 +24,7 @@ type MessageBase struct {

LeaderChainID interfaces.IHash
MsgHash interfaces.IHash // Cache of the hash of a message
RepeatHash interfaces.IHash // Cache of the hash of a message
VMIndex int // The Index of the VM responsible for this message.
VMHash []byte // Basis for selecting a VMIndex
Minute byte
Expand Down
20 changes: 14 additions & 6 deletions common/messages/directoryBlockSignature.go
Expand Up @@ -86,20 +86,28 @@ func (e *DirectoryBlockSignature) Process(dbheight uint32, state interfaces.ISta
}

func (m *DirectoryBlockSignature) GetRepeatHash() interfaces.IHash {
return m.GetMsgHash()
if m.RepeatHash == nil {
data, err := m.MarshalBinary()
if err != nil {
return nil
}
m.RepeatHash = primitives.Sha(data)
}
return m.RepeatHash
}

func (m *DirectoryBlockSignature) GetHash() interfaces.IHash {
return m.GetMsgHash()
}

func (m *DirectoryBlockSignature) GetMsgHash() interfaces.IHash {
data, _ := m.MarshalBinary()
if data == nil {
return nil
if m.MsgHash == nil {
data, _ := m.MarshalForSignature()
if data == nil {
return nil
}
m.MsgHash = primitives.Sha(data)
}
m.MsgHash = primitives.Sha(data)

return m.MsgHash
}

Expand Down
9 changes: 8 additions & 1 deletion common/messages/eom.go
Expand Up @@ -78,7 +78,14 @@ func (e *EOM) Process(dbheight uint32, state interfaces.IState) bool {
}

func (m *EOM) GetRepeatHash() interfaces.IHash {
return m.GetMsgHash()
if m.RepeatHash == nil {
data, err := m.MarshalBinary()
if err != nil {
return nil
}
m.RepeatHash = primitives.Sha(data)
}
return m.RepeatHash
}

func (m *EOM) GetHash() interfaces.IHash {
Expand Down
3 changes: 2 additions & 1 deletion controlPanel/dataDump.go
Expand Up @@ -14,7 +14,7 @@ type DataDump struct {
}
DataDump2 struct {
RawDump string
NextDump string
PrevDump string
}
DataDump3 struct {
RawDump string
Expand All @@ -40,6 +40,7 @@ func GetDataDumps() []byte {
holder.DataDump1.RawDump = DsCopy.RawSummary

holder.DataDump2.RawDump = DsCopy.ProcessList
holder.DataDump2.PrevDump = DsCopy.ProcessList2

holder.DataDump3.RawDump = DsCopy.PrintMap

Expand Down
18 changes: 10 additions & 8 deletions state/inMsgQueue.go
Expand Up @@ -4,8 +4,6 @@ import (
"github.com/FactomProject/factomd/common/interfaces"
)

var inMsgQueueRateKeeper *RateCalculator

// InMsgQueueRatePrometheus is for setting the appropriate prometheus calls
type InMsgQueueRatePrometheus struct{}

Expand All @@ -24,9 +22,6 @@ type InMsgMSGQueue chan interfaces.IMsg

func NewInMsgQueue(capacity int) InMsgMSGQueue {
channel := make(chan interfaces.IMsg, capacity)
rc := NewRateCalculator(new(InMsgQueueRatePrometheus))
go rc.Start()
inMsgQueueRateKeeper = rc
return channel
}

Expand All @@ -42,7 +37,7 @@ func (q InMsgMSGQueue) Cap() int {

// Enqueue adds item to channel and instruments based on type
func (q InMsgMSGQueue) Enqueue(m interfaces.IMsg) {
inMsgQueueRateKeeper.Arrival()
//inMsgQueueRateKeeper.Arrival()
measureMessage(q, m, true)
q <- m
}
Expand All @@ -53,7 +48,7 @@ func (q InMsgMSGQueue) Dequeue() interfaces.IMsg {
select {
case v := <-q:
measureMessage(q, v, false)
inMsgQueueRateKeeper.Complete()
//inMsgQueueRateKeeper.Complete()
return v
default:
return nil
Expand All @@ -64,14 +59,21 @@ func (q InMsgMSGQueue) Dequeue() interfaces.IMsg {
func (q InMsgMSGQueue) BlockingDequeue() interfaces.IMsg {
v := <-q
measureMessage(q, v, false)
inMsgQueueRateKeeper.Complete()
//inMsgQueueRateKeeper.Complete()
return v
}

//
// A list of all possible messages and their prometheus incrementing/decrementing
//

func (q InMsgMSGQueue) General(increment bool) {
if !increment {
return
}
TotalMessageQueueInMsgGeneral.Inc()
}

func (q InMsgMSGQueue) EOM(increment bool) {
if !increment {
CurrentMessageQueueInMsgEOM.Dec()
Expand Down
11 changes: 11 additions & 0 deletions state/instrumentation.go
Expand Up @@ -79,6 +79,10 @@ var (

// Queues
// InMsg
TotalMessageQueueInMsgGeneral = prometheus.NewCounter(prometheus.CounterOpts{
Name: "factomd_state_queue_total_general_inmsg",
Help: "Instrumenting the netoutmsg queue",
})
CurrentMessageQueueInMsgEOM = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "factomd_state_queue_current_inmsg_eom",
Help: "Instrumenting the inmsg queue",
Expand Down Expand Up @@ -266,6 +270,10 @@ var (
})

// NetworkOutMsg
TotalMessageQueueNetOutMsgGeneral = prometheus.NewCounter(prometheus.CounterOpts{
Name: "factomd_state_queue_total_general_netoutmsg",
Help: "Instrumenting the netoutmsg queue",
})
TotalMessageQueueNetOutMsgEOM = prometheus.NewCounter(prometheus.CounterOpts{
Name: "factomd_state_queue_total_netoutmsg_eom",
Help: "Instrumenting the netoutmsg queue",
Expand Down Expand Up @@ -555,4 +563,7 @@ func RegisterPrometheus() {
prometheus.MustRegister(NetOutQueueBackupRate)
prometheus.MustRegister(NetOutMovingArrivalQueueRate)
prometheus.MustRegister(NetOutMovingCompleteQueueRate)

prometheus.MustRegister(TotalMessageQueueInMsgGeneral)
prometheus.MustRegister(TotalMessageQueueNetOutMsgGeneral)
}
16 changes: 7 additions & 9 deletions state/netOutMsgQueue.go
Expand Up @@ -4,8 +4,6 @@ import (
"github.com/FactomProject/factomd/common/interfaces"
)

var NetOutMsgQueueRateKeeper *RateCalculator

// NetOutQueueRatePrometheus is for setting the appropriate prometheus calls
type NetOutQueueRatePrometheus struct{}

Expand All @@ -26,10 +24,6 @@ type NetOutMsgQueue chan interfaces.IMsg

func NewNetOutMsgQueue(capacity int) NetOutMsgQueue {
channel := make(chan interfaces.IMsg, capacity)
rc := NewRateCalculator(new(NetOutQueueRatePrometheus))
go rc.Start()
NetOutMsgQueueRateKeeper = rc

return channel
}

Expand All @@ -45,7 +39,7 @@ func (q NetOutMsgQueue) Cap() int {

// Enqueue adds item to channel and instruments based on type
func (q NetOutMsgQueue) Enqueue(m interfaces.IMsg) {
NetOutMsgQueueRateKeeper.Arrival()
//NetOutMsgQueueRateKeeper.Arrival()
measureMessage(q, m, true)
q <- m
}
Expand All @@ -55,7 +49,7 @@ func (q NetOutMsgQueue) Enqueue(m interfaces.IMsg) {
func (q NetOutMsgQueue) Dequeue() interfaces.IMsg {
select {
case v := <-q:
NetOutMsgQueueRateKeeper.Complete()
//NetOutMsgQueueRateKeeper.Complete()
return v
default:
return nil
Expand All @@ -65,14 +59,18 @@ func (q NetOutMsgQueue) Dequeue() interfaces.IMsg {
// BlockingDequeue will block until it retrieves from queue
func (q NetOutMsgQueue) BlockingDequeue() interfaces.IMsg {
v := <-q
NetOutMsgQueueRateKeeper.Complete()
//NetOutMsgQueueRateKeeper.Complete()
return v
}

//
// A list of all possible messages and their prometheus incrementing/decrementing
//

func (q NetOutMsgQueue) General(increment bool) {
TotalMessageQueueNetOutMsgGeneral.Inc()
}

func (q NetOutMsgQueue) EOM(increment bool) {
TotalMessageQueueNetOutMsgEOM.Inc()
}
Expand Down
11 changes: 6 additions & 5 deletions state/processList.go
Expand Up @@ -128,11 +128,12 @@ type DBSig struct {
}

type VM struct {
List []interfaces.IMsg // Lists of acknowledged messages
ListAck []*messages.Ack // Acknowledgements
Height int // Height of messages that have been processed
LeaderMinute int // Where the leader is in acknowledging messages
Synced bool // Is this VM synced yet?
List []interfaces.IMsg // Lists of acknowledged messages
ListAck []*messages.Ack // Acknowledgements
Height int // Height of messages that have been processed
EomMinuteIssued int // Last Minute Issued on this VM (from the leader, when we are the leader)
LeaderMinute int // Where the leader is in acknowledging messages
Synced bool // Is this VM synced yet?
//faultingEOM int64 // Faulting for EOM because it is too late
heartBeat int64 // Just ping ever so often if we have heard nothing.
Signed bool // We have signed the previous block.
Expand Down
3 changes: 3 additions & 0 deletions state/queues.go
Expand Up @@ -57,6 +57,8 @@ func (q GeneralMSGQueue) BlockingDequeue() interfaces.IMsg {
//

type IPrometheusChannel interface {
General(increment bool)

EOM(increment bool)
ACK(increment bool)
AudFault(increment bool)
Expand Down Expand Up @@ -87,6 +89,7 @@ func measureMessage(channel IPrometheusChannel, msg interfaces.IMsg, increment b
if msg == nil {
return
}
channel.General(increment)
switch msg.Type() {
case constants.EOM_MSG: // 1
channel.EOM(increment)
Expand Down
2 changes: 2 additions & 0 deletions state/state.go
Expand Up @@ -61,6 +61,8 @@ type State struct {
ExportData bool
ExportDataSubpath string

LogBits int64 // Bit zero is for logging the Directory Block on DBSig [5]

DBStatesSent []*interfaces.DBStateSent
DBStatesReceivedBase int
DBStatesReceived []*messages.DBStateMsg
Expand Down
20 changes: 16 additions & 4 deletions state/stateConsensus.go
Expand Up @@ -842,13 +842,19 @@ func (s *State) LeaderExecuteEOM(m interfaces.IMsg) {
s.EOMMinute = int(s.CurrentMinute)
}

if vm.EomMinuteIssued >= s.CurrentMinute+1 {
//os.Stderr.WriteString(fmt.Sprintf("Bump detected %s minute %2d\n", s.FactomNodeName, s.CurrentMinute))
return
}

//_, vmindex := pl.GetVirtualServers(s.EOMMinute, s.IdentityChainID)

eom.DBHeight = s.LLeaderHeight
eom.VMIndex = s.LeaderVMIndex
// eom.Minute is zerobased, while LeaderMinute is 1 based. So
// a simple assignment works.
eom.Minute = byte(s.CurrentMinute)
vm.EomMinuteIssued = s.CurrentMinute + 1
eom.Sign(s)
eom.MsgHash = nil
ack := s.NewAck(m, nil).(*messages.Ack)
Expand Down Expand Up @@ -1166,8 +1172,6 @@ func (s *State) SendDBSig(dbheight uint32, vmIndex int) {
dbs.LeaderExecute(s)
vm.Signed = true
pl.DBSigAlreadySent = true
raw, _ := dbs.MarshalBinary()
s.Logf("info", "DirectoryBlockSignature SENT V: %d LDBHT: %d %s\n RAW: %x", dbs.Validate(s), s.GetLeaderHeight(), dbs.String(), raw)
} else {
pl.Ask(vmIndex, 0, 0, 5)
}
Expand Down Expand Up @@ -1206,9 +1210,11 @@ func (s *State) ProcessEOM(dbheight uint32, msg interfaces.IMsg) bool {
if s.EOMDone && s.EOMSys {
dbstate := s.GetDBState(dbheight - 1)
if dbstate == nil {
//s.AddStatus(fmt.Sprintf("EOM PROCESS: vm %2d DBState == nil: return on s.EOM(%v) && int(e.Minute(%v)) > s.EOMMinute(%v)", e.VMIndex, s.EOM, e.Minute, s.EOMMinute))
return false
}
if !dbstate.Saved {
//s.AddStatus(fmt.Sprintf("EOM PROCESS: vm %2d DBState not saved: return on s.EOM(%v) && int(e.Minute(%v)) > s.EOMMinute(%v)", e.VMIndex, s.EOM, e.Minute, s.EOMMinute))
return false
}

Expand Down Expand Up @@ -1241,6 +1247,7 @@ func (s *State) ProcessEOM(dbheight uint32, msg interfaces.IMsg) bool {
for _, vm := range pl.VMs {
vm.Synced = false
}
//s.AddStatus(fmt.Sprintf("EOM PROCESS: vm %2d First EOM processed: return on s.EOM(%v) && int(e.Minute(%v)) > s.EOMMinute(%v)", e.VMIndex, s.EOM, e.Minute, s.EOMMinute))
return false
}

Expand All @@ -1257,11 +1264,15 @@ func (s *State) ProcessEOM(dbheight uint32, msg interfaces.IMsg) bool {
if s.LeaderPL.SysHighest < int(e.SysHeight) {
s.LeaderPL.SysHighest = int(e.SysHeight)
}
//s.AddStatus(fmt.Sprintf("EOM PROCESS: vm %2d Process this EOM: return on s.EOM(%v) && int(e.Minute(%v)) > s.EOMMinute(%v)", e.VMIndex, s.EOM, e.Minute, s.EOMMinute))
return false
}

allfaults := s.LeaderPL.System.Height >= s.LeaderPL.SysHighest

//if !allfaults {
// os.Stderr.WriteString(fmt.Sprintf("%s dbht %d min %d Don't have all faults\n", s.FactomNodeName, e.DBHeight, e.Minute))
//}
// After all EOM markers are processed, Claim we are done. Now we can unwind
if allfaults && s.EOMProcessed == s.EOMLimit && !s.EOMDone {

Expand Down Expand Up @@ -1354,8 +1365,6 @@ func (s *State) ProcessEOM(dbheight uint32, msg interfaces.IMsg) bool {
pldbs.DBSigAlreadySent = true

dbs.LeaderExecute(s)
raw, _ := dbs.MarshalBinary()
s.Logf("info", "DirectoryBlockSignature SENT V: %d LDBHT: %d %s\n RAW: %x", dbs.Validate(s), s.GetLeaderHeight(), dbs.String(), raw)
}
s.Saving = true
}
Expand All @@ -1375,6 +1384,9 @@ func (s *State) ProcessEOM(dbheight uint32, msg interfaces.IMsg) bool {
delete(s.Acks, k)
}
}
//s.AddStatus(fmt.Sprintf("EOM PROCESS: vm %2d Saving: return on s.EOM(%v) && int(e.Minute(%v)) > s.EOMMinute(%v)", e.VMIndex, s.EOM, e.Minute, s.EOMMinute))
} else {
//s.AddStatus(fmt.Sprintf("EOM PROCESS: vm %2d Do nothing: return on s.EOM(%v) && int(e.Minute(%v)) > s.EOMMinute(%v)", e.VMIndex, s.EOM, e.Minute, s.EOMMinute))
}

return false
Expand Down

0 comments on commit edbbf8e

Please sign in to comment.