Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbeam committed Mar 27, 2017
2 parents 4276e2b + a740858 commit 89e3f97
Show file tree
Hide file tree
Showing 33 changed files with 890 additions and 469 deletions.
20 changes: 12 additions & 8 deletions common/directoryBlock/directoryBlock.go
Expand Up @@ -18,8 +18,9 @@ var _ = fmt.Print

type DirectoryBlock struct {
//Not Marshalized
DBHash interfaces.IHash
KeyMR interfaces.IHash
DBHash interfaces.IHash
KeyMR interfaces.IHash
keyMRset bool

//Marshalized
Header interfaces.IDirectoryBlockHeader
Expand Down Expand Up @@ -132,13 +133,15 @@ func (c *DirectoryBlock) GetEBlockDBEntries() []interfaces.IDBEntry {
}

func (c *DirectoryBlock) GetKeyMR() interfaces.IHash {
keyMR, err := c.BuildKeyMerkleRoot()
if err != nil {
panic("Failed to build the key MR")
}

c.KeyMR = keyMR
if !c.keyMRset {
keyMR, err := c.BuildKeyMerkleRoot()
if err != nil {
panic("Failed to build the key MR")
}

c.KeyMR = keyMR
c.keyMRset = true
}
return c.KeyMR
}

Expand Down Expand Up @@ -268,6 +271,7 @@ func (b *DirectoryBlock) BuildBodyMR() (interfaces.IHash, error) {
}

func (b *DirectoryBlock) HeaderHash() (interfaces.IHash, error) {
b.Header.SetBlockCount(uint32(len(b.GetDBEntries())))
binaryEBHeader, err := b.GetHeader().MarshalBinary()
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions common/directoryBlock/directoryBlock_test.go
Expand Up @@ -391,7 +391,7 @@ func TestBuildBlock(t *testing.T) {
h, _ = primitives.HexToHash("b926da5ea5840b34189c37c55db9eb482f6e370bd097a16d6e890bc000c10898")
db.SetEntryHash(h, c, 4)

k, _ = primitives.HexToHash("25212d6cb70ce4f109f09092985fd200da5d59668451601e21ba66e6ee0c9ebb")
k, _ = primitives.HexToHash("eadf05b85c7ad70390c72783a9a3a29ae253f4f7d45d36f176bbc56d56bab9cc")

if !k.IsSameAs(db.GetKeyMR()) {
fmt.Println(k)
Expand Down Expand Up @@ -427,7 +427,7 @@ func TestBuildBlock(t *testing.T) {
}

printout := db.String()
expectedString1 := ` KeyMR: eadf05b85c7ad70390c72783a9a3a29ae253f4f7d45d36f176bbc56d56bab9cc
expectedString1 := fmt.Sprintf(` KeyMR: %s
BodyMR: 01004ae2e96c0344a3c30a0704383c5c90ca2663921a9c1b8dc50658d52850a3
FullHash: 857d121b40c0763cd310c68963d23ebf6fa4241ef6ba26861d9b80aa71c9f3a9
Version: 0
Expand All @@ -436,7 +436,7 @@ func TestBuildBlock(t *testing.T) {
PrevKeyMR: 0000000000000000000000000000000000000000000000000000000000000000
PrevFullHash: 0000000000000000000000000000000000000000000000000000000000000000
Timestamp: 0
Timestamp Str: `
Timestamp Str: `, k.String()) // Use KeyMR from above
epoch := time.Unix(0, 0)
expectedString2 := epoch.Format("2006-01-02 15:04:05")

Expand Down
2 changes: 0 additions & 2 deletions common/interfaces/processList.go
Expand Up @@ -37,8 +37,6 @@ type IProcessList interface {
ResetDiffSigTally()
IncrementDiffSigTally()
CheckDiffSigTally() bool
//GetRequest(now int64, vmIndex int, height int, waitSeconds int64) IRequest
AskDBState(vmIndex int, height int) int
Ask(vmIndex int, height int, waitSeconds int64, tag int) int
TrimVMList(height uint32, vmIndex int)
//Process(state IState) (progress bool)
Expand Down
6 changes: 1 addition & 5 deletions common/messages/MessageBase.go
Expand Up @@ -64,12 +64,8 @@ func (m *MessageBase) SendOut(state interfaces.IState, msg interfaces.IMsg) {
go resend(state, msg, 2, 5)
case ServerFault:
go resend(state, msg, 2, 5)
case MissingMsg:
go resend(state, msg, 1, 1)
case DBStateMissing:
go resend(state, msg, 1, 1)
default:
go resend(state, msg, 1, 1)
go resend(state, msg, 1, 0)
}
}

Expand Down
15 changes: 11 additions & 4 deletions common/messages/dbstateMissing.go
Expand Up @@ -91,15 +91,15 @@ func (m *DBStateMissing) LeaderExecute(state interfaces.IState) {
}

// Only send the same block again after 15 seconds.
func (m *DBStateMissing) send(dbheight uint32, state interfaces.IState) {
func (m *DBStateMissing) send(dbheight uint32, state interfaces.IState) (msglen int) {
send := true

now := state.GetTimestamp()
sents := state.GetDBStatesSent()
var keeps []*interfaces.DBStateSent

for _, v := range sents {
if now.GetTimeSeconds()-v.Sent.GetTimeSeconds() < 1 {
if now.GetTimeSeconds()-v.Sent.GetTimeSeconds() < 10 {
if v.DBHeight == dbheight {
send = false
}
Expand All @@ -109,6 +109,11 @@ func (m *DBStateMissing) send(dbheight uint32, state interfaces.IState) {
if send {
msg, err := state.LoadDBState(dbheight)
if msg != nil && err == nil {
b, err := msg.MarshalBinary()
if err != nil {
return
}
msglen = len(b)
msg.SetOrigin(m.GetOrigin())
msg.SetNetworkOrigin(m.GetNetworkOrigin())
msg.SetNoResend(false)
Expand All @@ -121,6 +126,7 @@ func (m *DBStateMissing) send(dbheight uint32, state interfaces.IState) {
}
state.SetDBStatesSent(keeps)
}
return
}

func (m *DBStateMissing) FollowerExecute(state interfaces.IState) {
Expand All @@ -135,8 +141,9 @@ func (m *DBStateMissing) FollowerExecute(state interfaces.IState) {
if end-start > 200 {
end = start + 200
}
for dbs := start; dbs <= end; dbs++ {
m.send(dbs, state)
sent := 0
for dbs := start; dbs <= end && sent < 1024*1024; dbs++ {
sent += m.send(dbs, state)
}

return
Expand Down
9 changes: 2 additions & 7 deletions common/messages/directoryBlockSignature.go
Expand Up @@ -119,16 +119,11 @@ func (m *DirectoryBlockSignature) Type() byte {
// 0 -- Cannot tell if message is Valid
// 1 -- Message is valid
func (m *DirectoryBlockSignature) Validate(state interfaces.IState) int {
if m.DBHeight < state.GetLLeaderHeight() {
state.AddStatus(fmt.Sprintf("DirectoryBlockSignature: Fail dbht: %v %s", state.GetLLeaderHeight(), m.String()))
if m.DBHeight < state.GetHighestSavedBlk() {
state.AddStatus(fmt.Sprintf("DirectoryBlockSignature: Fail dbstate ht: %v < dbht: %v %s", m.DBHeight, state.GetHighestSavedBlk(), m.String()))
return -1
}

if m.DBHeight > state.GetLLeaderHeight() {
//state.AddStatus(fmt.Sprintf("DirectoryBlockSignature: Wait dbht: %v %s", state.GetLLeaderHeight(), m.String()))
return 0
}

found, _ := state.GetVirtualServers(m.DBHeight, 9, m.ServerIdentityChainID)

if found == false {
Expand Down
6 changes: 3 additions & 3 deletions common/primitives/hash.go
Expand Up @@ -74,8 +74,8 @@ func (h *Hash) UnmarshalText(b []byte) error {
return nil
}

func (h Hash) Fixed() [constants.HASH_LENGTH]byte {
return h
func (h *Hash) Fixed() [constants.HASH_LENGTH]byte {
return *h
}

func (h *Hash) Bytes() []byte {
Expand Down Expand Up @@ -183,7 +183,7 @@ func HexToHash(hexStr string) (h interfaces.IHash, err error) {
}

// Compare two Hashes
func (a Hash) IsSameAs(b interfaces.IHash) bool {
func (a *Hash) IsSameAs(b interfaces.IHash) bool {
if b == nil {
return false
}
Expand Down
5 changes: 5 additions & 0 deletions common/primitives/util.go
Expand Up @@ -247,21 +247,26 @@ func ConvertECPrivateToUserStr(addr interfaces.IAddress) string {
// Returns false if the checksum is wrong.
//
func validateUserStr(prefix []byte, userFAddr string) bool {

if len(userFAddr) != 52 {
return false
}

v := base58.Decode(userFAddr)
if len(v) < 3 {
return false
}

if bytes.Compare(prefix, v[:2]) != 0 {
return false

}

sha256d := Sha(Sha(v[:34]).Bytes()).Bytes()
if bytes.Compare(sha256d[:4], v[34:]) != 0 {
return false
}

return true
}

Expand Down
13 changes: 13 additions & 0 deletions engine/NetStart.go
Expand Up @@ -42,6 +42,7 @@ var logPort string

func NetStart(s *state.State) {
enablenetPtr := flag.Bool("enablenet", true, "Enable or disable networking")
waitEntriesPtr := flag.Bool("waitentries", false, "Wait for Entries to be validated prior to execution of messages")
listenToPtr := flag.Int("node", 0, "Node Number the simulator will set as the focus")
cntPtr := flag.Int("count", 1, "The number of nodes to generate")
netPtr := flag.String("net", "tree", "The default algorithm to build the network connections")
Expand Down Expand Up @@ -79,6 +80,7 @@ func NetStart(s *state.State) {
flag.Parse()

enableNet := *enablenetPtr
waitEntries := *waitEntriesPtr
listenTo := *listenToPtr
cnt := *cntPtr
net := *netPtr
Expand Down Expand Up @@ -123,6 +125,9 @@ func NetStart(s *state.State) {
s.StartDelayLimit = startDelay * 1000
s.Journaling = journaling

// Set the wait for entries flag
s.WaitForEntries = waitEntries

if 999 < portOverride { // The command line flag exists and seems reasonable.
s.SetPort(portOverride)
}
Expand Down Expand Up @@ -242,6 +247,7 @@ func NetStart(s *state.State) {
os.Stderr.WriteString(fmt.Sprintf("%20s %s\n", "Build", Build))
os.Stderr.WriteString(fmt.Sprintf("%20s %s\n", "FNode 0 Salt", s.Salt.String()[:16]))
os.Stderr.WriteString(fmt.Sprintf("%20s %v\n", "enablenet", enableNet))
os.Stderr.WriteString(fmt.Sprintf("%20s %v\n", "waitentries", waitEntries))
os.Stderr.WriteString(fmt.Sprintf("%20s %d\n", "node", listenTo))
os.Stderr.WriteString(fmt.Sprintf("%20s %s\n", "prefix", prefix))
os.Stderr.WriteString(fmt.Sprintf("%20s %d\n", "node count", cnt))
Expand Down Expand Up @@ -343,6 +349,7 @@ func NetStart(s *state.State) {
ConnectionMetricsChannel: connectionMetricsChannel,
}
p2pNetwork = new(p2p.Controller).Init(ci)
fnodes[0].State.NetworkControler = p2pNetwork
p2pNetwork.StartNetwork()
// Setup the proxy (Which translates from network parcels to factom messages, handling addressing for directed messages)
p2pProxy = new(P2PProxy).Init(fnodes[0].State.FactomNodeName, "P2P Network").(*P2PProxy)
Expand Down Expand Up @@ -478,6 +485,11 @@ func NetStart(s *state.State) {
// Start the webserver
go wsapi.Start(fnodes[0].State)

// Start prometheus on port
launchPrometheus(9876)
// Start Package's prometheus
state.RegisterPrometheus()

go controlPanel.ServeControlPanel(fnodes[0].State.ControlPanelChannel, fnodes[0].State, connectionMetricsChannel, p2pNetwork, Build)
// Listen for commands:
SimControl(listenTo)
Expand Down Expand Up @@ -515,6 +527,7 @@ func startServers(load bool) {
if load {
go state.LoadDatabase(fnode.State)
}
go fnode.State.GoSyncEntries()
go Timer(fnode.State)
go fnode.State.ValidatorLoop()
}
Expand Down
23 changes: 23 additions & 0 deletions engine/printSummary.go
Expand Up @@ -186,6 +186,26 @@ func printSummary(summary *int, value int, listenTo *int, wsapiNode *int) {
}
prt = prt + fmt.Sprintf(fmtstr, "NetworkInvalidMsgQueue", list)

prt = prt + "\n"

list = ""
for _, f := range pnodes {
list = list + fmt.Sprintf(" %3d", len(f.State.UpdateEntryHash))
}
prt = prt + fmt.Sprintf(fmtstr, "UpdateEntryHash", list)

list = ""
for _, f := range pnodes {
list = list + fmt.Sprintf(" %3d", len(f.State.MissingEntries))
}
prt = prt + fmt.Sprintf(fmtstr, "MissingEntries", list)

list = ""
for _, f := range pnodes {
list = list + fmt.Sprintf(" %3d", len(f.State.WriteEntry))
}
prt = prt + fmt.Sprintf(fmtstr, "WriteEntry", list)

if f.State.MessageTally {
prt = prt + "\nType:"
for i := 0; i < constants.NUM_MESSAGES; i++ {
Expand Down Expand Up @@ -270,6 +290,9 @@ func printSummary(summary *int, value int, listenTo *int, wsapiNode *int) {
func systemFaults(f *FactomNode) string {
dbheight := f.State.LLeaderHeight
pl := f.State.ProcessLists.Get(dbheight)
if pl == nil {
return ""
}
if len(pl.System.List) == 0 {
str := fmt.Sprintf("%5s %13s %6s Length: 0\n", "", "System List", f.State.FactomNodeName)
return str
Expand Down
8 changes: 8 additions & 0 deletions engine/profiler.go
Expand Up @@ -9,11 +9,19 @@ import (
"log"
"net/http"
_ "net/http/pprof"

"github.com/prometheus/client_golang/prometheus"
)

// StartProfiler runs the go pprof tool
// `go tool pprof http://localhost:6060/debug/pprof/profile`
// https://golang.org/pkg/net/http/pprof/
func StartProfiler() {
log.Println(http.ListenAndServe(fmt.Sprintf("localhost:%s", logPort), nil))
//runtime.SetBlockProfileRate(100000)
}

func launchPrometheus(port int) {
http.Handle("/metrics", prometheus.Handler())
go http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
}
24 changes: 24 additions & 0 deletions engine/simControl.go
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/FactomProject/factomd/controlPanel"
"github.com/FactomProject/factomd/p2p"
"github.com/FactomProject/factomd/wsapi"
"runtime"
)

var _ = fmt.Print
Expand Down Expand Up @@ -90,6 +91,19 @@ func SimControl(listenTo int) {
os.Stderr.WriteString("Reset Node: " + s.FactomNodeName + "\n")
s.Reset()

case 'b' == b[0]:
if len(b) == 1 {
os.Stderr.WriteString("specifivy how long a block will be recorded (in nanoseconds). 1 records all blocks.\n")
break
}
delay, err := strconv.Atoi(string(b[1:]))
if err != nil {
os.Stderr.WriteString("type bnnn where nnn is the number of nanoseconds of a block to record when profiling.\n")
break
}
runtime.SetBlockProfileRate(delay)
os.Stderr.WriteString(fmt.Sprintf("Recording delays due to blocked go routines longer than %d ns (%d ms)\n", delay, delay/1000000))

case 'g' == b[0]:
if len(b) > 1 {
if b[1] == 'c' {
Expand Down Expand Up @@ -152,6 +166,16 @@ func SimControl(listenTo int) {
wsapi.SetState(fnodes[wsapiNode].State)
os.Stderr.WriteString(fmt.Sprintf("--Listen to %s --\n", fnodes[wsapiNode].State.FactomNodeName))
}
case 'W' == b[0]:
if listenTo < 0 || listenTo > len(fnodes) {
break
}
fnodes[listenTo].State.WaitForEntries = !fnodes[listenTo].State.WaitForEntries
if fnodes[listenTo].State.WaitForEntries {
os.Stderr.WriteString("Wait for all Entries\n")
} else {
os.Stderr.WriteString("Don't wait for all Entries\n")
}
case 's' == b[0]:

if len(b) > 1 {
Expand Down

0 comments on commit 89e3f97

Please sign in to comment.