diff --git a/Utilities/CorrectChainHeads/main.go b/Utilities/CorrectChainHeads/main.go new file mode 100644 index 0000000000..488e0106a8 --- /dev/null +++ b/Utilities/CorrectChainHeads/main.go @@ -0,0 +1,312 @@ +package main + +import ( + "encoding/hex" + "flag" + "fmt" + "os" + "sync" + "sync/atomic" + "time" + + "github.com/FactomProject/factom" + "github.com/FactomProject/factomd/common/directoryBlock" + "github.com/FactomProject/factomd/common/interfaces" + "github.com/FactomProject/factomd/common/primitives" + "github.com/FactomProject/factomd/database/databaseOverlay" + "github.com/FactomProject/factomd/database/hybridDB" +) + +var CheckFloating bool +var UsingAPI bool +var FixIt bool + +const level string = "level" +const bolt string = "bolt" + +func main() { + var ( + useApi = flag.Bool("api", false, "Use API instead") + checkFloating = flag.Bool("floating", false, "Check Floating") + fix = flag.Bool("fix", false, "Actually fix head") + ) + + flag.Parse() + UsingAPI = *useApi + CheckFloating = *checkFloating + FixIt = *fix + + fmt.Println("Usage:") + fmt.Println("CorrectChainHeads level/bolt/api DBFileLocation") + fmt.Println("Program will fix chainheads") + + if len(flag.Args()) < 2 { + fmt.Println("\nNot enough arguments passed") + os.Exit(1) + } + if len(flag.Args()) > 2 { + fmt.Println("\nToo many arguments passed") + os.Exit(1) + } + + if flag.Args()[0] == "api" { + UsingAPI = true + } + + var reader Fetcher + + if UsingAPI { + reader = NewAPIReader(flag.Args()[1]) + } else { + levelBolt := flag.Args()[0] + + if levelBolt != level && levelBolt != bolt { + fmt.Println("\nFirst argument should be `level` or `bolt`") + os.Exit(1) + } + path := flag.Args()[1] + reader = NewDBReader(levelBolt, path) + } + + // dblock, err := reader.FetchDBlockHead() + + FindHeads(reader) +} + +func FindHeads(f Fetcher) { + chainHeads := make(map[string]interfaces.IHash) + + var allEblockLock sync.Mutex + allEblks := make(map[string]interfaces.IHash) + + var err error + var dblock interfaces.IDirectoryBlock + + head, err := f.FetchDBlockHead() + if err != nil { + panic(fmt.Sprintf("Error fetching head")) + } + + height := head.GetDatabaseHeight() + dblock = head + top := height + fmt.Println("Starting at", height) + errCount := 0 + waiting := new(int32) + done := new(int32) + total := 0 + + var wg sync.WaitGroup + allowedSimulataneous := 1000 + permission := make(chan bool, allowedSimulataneous) + for i := 0; i < allowedSimulataneous; i++ { + permission <- true + } + start := time.Now() + + doPrint := CheckFloating + go func() { + for { + if !doPrint { + return + } + time.Sleep(10 * time.Second) + v := atomic.LoadInt32(waiting) + d := atomic.LoadInt32(done) + fmt.Printf("%d are still waiting. %d Done. Permission: %d\n", v, d, len(permission)) + } + }() + + for ; height > 0; height-- { + v := atomic.LoadInt32(waiting) + for v > 50000 { + time.Sleep(1 * time.Second) + } + + dblock, err = f.FetchDBlockByHeight(height) + if err != nil { + fmt.Printf("Error fetching height %d: %s\n", height, err.Error()) + continue + } + + eblockEnts := dblock.GetEBlockDBEntries() + + total += len(eblockEnts) + if CheckFloating { + for i := 0; i < len(eblockEnts); i++ { + wg.Add(1) + atomic.AddInt32(waiting, 1) + func(eb interfaces.IDBEntry) { + defer wg.Done() + defer atomic.AddInt32(waiting, -1) + <-permission + defer func() { + permission <- true + atomic.AddInt32(done, 1) + }() + eblkF, err := f.FetchEBlock(eb.GetKeyMR()) + if err != nil { + fmt.Printf("Error getting eblock %s for %s\n", eb.GetKeyMR().String(), eb.GetChainID().String()) + return + } + kmr, err := eblkF.KeyMR() + if err != nil { + fmt.Printf("Error getting eblock keymr %s for %s\n", eb.GetKeyMR().String(), eb.GetChainID().String()) + return + } + + allEblockLock.Lock() + allEblks[kmr.String()] = eblkF.GetHeader().GetPrevKeyMR() + allEblockLock.Unlock() + }(eblockEnts[i]) + } + } + + for _, eblk := range eblockEnts { + if _, ok := chainHeads[eblk.GetChainID().String()]; ok { + // Chainhead already exists + continue + } + chainHeads[eblk.GetChainID().String()] = eblk.GetKeyMR() + ch, err := f.FetchHeadIndexByChainID(eblk.GetChainID()) + if err != nil { + fmt.Printf("Error getting chainhead for %s\n", eblk.GetChainID().String()) + } else { + if !ch.IsSameAs(eblk.GetKeyMR()) { + fmt.Printf("ERROR: Chainhead found: %s, Expected %s :: For Chain: %s at height %d\n", + ch.String(), eblk.GetKeyMR().String(), eblk.GetChainID().String(), height) + errCount++ + if FixIt { + f.SetChainHeads([]interfaces.IHash{eblk.GetKeyMR()}, []interfaces.IHash{eblk.GetChainID()}) + } + } + } + } + if height%500 == 0 { + d := atomic.LoadInt32(done) + ps := float64(top-height) / time.Since(start).Seconds() + fmt.Printf("Currently on %d out of %d at %.3fp/s. %d Eblocks, %d done. %d ChainHeads so far. %d Are bad\n", height, top, ps, total, d, len(chainHeads), errCount) + } + + var _ = dblock + } + + if CheckFloating { + wg.Wait() + } + doPrint = false + + fmt.Printf("%d Chains found in %f seconds", len(chainHeads), time.Since(start).Seconds()) + errCount = 0 + if CheckFloating { + fmt.Println("Checking all EBLK links") + for k, h := range chainHeads { + var prev interfaces.IHash + prev = h + for { + if prev.IsZero() { + break + } + p, ok := allEblks[prev.String()] + if !ok { + errCount++ + fmt.Printf("Error finding Eblock %s for chain %s\n", h.String(), k) + } + prev = p + } + } + } + fmt.Printf("%d Errors found checking for bad links\n", errCount) + +} + +type Fetcher interface { + FetchDBlockHead() (interfaces.IDirectoryBlock, error) + FetchDBlockByHeight(dBlockHeight uint32) (interfaces.IDirectoryBlock, error) + //FetchDBlock(hash interfaces.IHash) (interfaces.IDirectoryBlock, error) + FetchHeadIndexByChainID(chainID interfaces.IHash) (interfaces.IHash, error) + FetchEBlock(hash interfaces.IHash) (interfaces.IEntryBlock, error) + SetChainHeads(primaryIndexes, chainIDs []interfaces.IHash) error +} + +func NewDBReader(levelBolt string, path string) *databaseOverlay.Overlay { + var dbase *hybridDB.HybridDB + var err error + if levelBolt == bolt { + dbase = hybridDB.NewBoltMapHybridDB(nil, path) + } else { + dbase, err = hybridDB.NewLevelMapHybridDB(path, false) + if err != nil { + panic(err) + } + } + + dbo := databaseOverlay.NewOverlay(dbase) + return dbo +} + +type APIReader struct { + location string +} + +func NewAPIReader(loc string) *APIReader { + a := new(APIReader) + a.location = loc + factom.SetFactomdServer(loc) + + return a +} + +func (a *APIReader) SetChainHeads(primaryIndexes, chainIDs []interfaces.IHash) error { + return nil +} + +func (a *APIReader) FetchEBlock(hash interfaces.IHash) (interfaces.IEntryBlock, error) { + return nil, fmt.Errorf("Not implemented for api") +} + +func (a *APIReader) FetchDBlockHead() (interfaces.IDirectoryBlock, error) { + head, err := factom.GetDBlockHead() + if err != nil { + return nil, err + } + raw, err := factom.GetRaw(head) + if err != nil { + return nil, err + } + return rawBytesToblock(raw) +} + +func (a *APIReader) FetchDBlockByHeight(dBlockHeight uint32) (interfaces.IDirectoryBlock, error) { + raw, err := factom.GetBlockByHeightRaw("d", int64(dBlockHeight)) + if err != nil { + return nil, err + } + + return rawRespToBlock(raw.RawData) +} + +func (a *APIReader) FetchHeadIndexByChainID(chainID interfaces.IHash) (interfaces.IHash, error) { + resp, err := factom.GetChainHead(chainID.String()) + if err != nil { + return nil, err + } + return primitives.HexToHash(resp) +} + +func rawBytesToblock(raw []byte) (interfaces.IDirectoryBlock, error) { + dblock := directoryBlock.NewDirectoryBlock(nil) + err := dblock.UnmarshalBinary(raw) + if err != nil { + return nil, err + } + return dblock, nil +} + +func rawRespToBlock(raw string) (interfaces.IDirectoryBlock, error) { + by, err := hex.DecodeString(raw) + if err != nil { + return nil, err + } + return rawBytesToblock(by) +} diff --git a/common/messages/directoryBlockSignature.go b/common/messages/directoryBlockSignature.go index a9b6d68beb..ec4f1eb67c 100644 --- a/common/messages/directoryBlockSignature.go +++ b/common/messages/directoryBlockSignature.go @@ -31,9 +31,8 @@ type DirectoryBlockSignature struct { SysHash interfaces.IHash //Not marshalled - Matches bool - Processed bool - hash interfaces.IHash + Matches bool + hash interfaces.IHash } var _ interfaces.IMsg = (*DirectoryBlockSignature)(nil) diff --git a/common/messages/eom.go b/common/messages/eom.go index 4d570e8828..9fed1d157d 100644 --- a/common/messages/eom.go +++ b/common/messages/eom.go @@ -29,7 +29,6 @@ type EOM struct { FactoidVM bool //Not marshalled - Processed bool hash interfaces.IHash MarkerSent bool // If we have set EOM markers on blocks like Factoid blocks and such. } diff --git a/state/dbStateManager.go b/state/dbStateManager.go index de28be050d..df46dd33b8 100644 --- a/state/dbStateManager.go +++ b/state/dbStateManager.go @@ -970,7 +970,45 @@ func (list *DBStateList) FixupLinks(p *DBState, d *DBState) (progress bool) { } d.DirectoryBlock.BuildBodyMR() - d.DirectoryBlock.MarshalBinary() + dblockData, _ := d.DirectoryBlock.MarshalBinary() + + // If leader, write to file for debugging purposes + if list.State.IsLeader() { + // Print All to StdOut + f, err := os.OpenFile("lastblock.txt", os.O_CREATE|os.O_RDWR, 0777) + if err != nil { + fmt.Println("Could not open `lastblock.txt`:", err.Error()) + } else { + f.WriteString(fmt.Sprintf("--- Height %d ---\n", d.DirectoryBlock.GetDatabaseHeight())) + f.WriteString("\n--- Directory Block ---\n") + f.WriteString(hex.EncodeToString(dblockData)) + + ablkData, err := d.AdminBlock.MarshalBinary() + f.WriteString(fmt.Sprintf("\n--- Admin Block Err: %v---\n", err)) + f.WriteString(hex.EncodeToString(ablkData)) + + fblkData, err := d.FactoidBlock.MarshalBinary() + f.WriteString(fmt.Sprintf("\n--- Factoid Block Err: %v---\n", err)) + f.WriteString(hex.EncodeToString(fblkData)) + + ecblkData, err := d.EntryCreditBlock.MarshalBinary() + f.WriteString(fmt.Sprintf("\n--- ECBlock Block Err: %v---\n", err)) + f.WriteString(hex.EncodeToString(ecblkData)) + + for i, eb := range d.EntryBlocks { + ebData, err := eb.MarshalBinary() + f.WriteString(fmt.Sprintf("\n--- Eblock %d Block Err: %v---\n", i, err)) + f.WriteString(hex.EncodeToString(ebData)) + } + + for i, e := range d.Entries { + eData, err := e.MarshalBinary() + f.WriteString(fmt.Sprintf("\n--- Entry %d Block Err: %v---\n", i, err)) + f.WriteString(hex.EncodeToString(eData)) + } + + } + } progress = true d.IsNew = false diff --git a/state/dbstateCatchup.go b/state/dbstateCatchup.go index f329f8477e..5c0ddd5cc7 100644 --- a/state/dbstateCatchup.go +++ b/state/dbstateCatchup.go @@ -21,7 +21,12 @@ func (list *DBStateList) Catchup(justDoIt bool) { ask := func() { - if list.TimeToAsk != nil && hk-hs > 1 && now.GetTime().After(list.TimeToAsk.GetTime()) { + tolerance := 1 + if list.State.Leader { + tolerance = 2 + } + + if list.TimeToAsk != nil && hk-hs > tolerance && now.GetTime().After(list.TimeToAsk.GetTime()) { // Find the first dbstate we don't have. for i, v := range list.State.DBStatesReceived { diff --git a/state/stateConsensus.go b/state/stateConsensus.go index 1fb65ef869..d21cdc4a28 100644 --- a/state/stateConsensus.go +++ b/state/stateConsensus.go @@ -535,6 +535,12 @@ func (s *State) FollowerExecuteDBState(msg interfaces.IMsg) { return } + // If we have a process list at the height of a DBState, then get rid of any entry blocks we might have had. + pl := s.ProcessLists.Get(dbheight) + if pl != nil { + pl.NewEBlocks = make(map[[32]byte]interfaces.IEntryBlock, 0) + } + if dbstatemsg.IsInDB == false { //s.AddStatus(fmt.Sprintf("FollowerExecuteDBState(): dbstate added from network at ht %d", dbheight)) dbstate.ReadyToSave = true @@ -1255,13 +1261,12 @@ func (s *State) ProcessEOM(dbheight uint32, msg interfaces.IMsg) bool { } // What I do for each EOM - if !e.Processed { + if !vm.Synced { //fmt.Println(fmt.Sprintf("EOM PROCESS: %10s vm %2d Process Once: !e.Processed(%v) EOM: %s", s.FactomNodeName, e.VMIndex, e.Processed, e.String())) vm.LeaderMinute++ s.EOMProcessed++ //fmt.Println(fmt.Sprintf("EOM PROCESS: %10s vm %2d EOMProcessed++ (%2d)", s.FactomNodeName, e.VMIndex, s.EOMProcessed)) - e.Processed = true vm.Synced = true markNoFault(pl, msg.GetVMIndex()) if s.LeaderPL.SysHighest < int(e.SysHeight) { @@ -1468,7 +1473,7 @@ func (s *State) ProcessDBSig(dbheight uint32, msg interfaces.IMsg) bool { } // Put the stuff that executes per DBSignature here - if !dbs.Processed { + if !vm.Synced { if s.LLeaderHeight > 0 && s.GetHighestCompletedBlk()+1 < s.LLeaderHeight { @@ -1523,7 +1528,6 @@ func (s *State) ProcessDBSig(dbheight uint32, msg interfaces.IMsg) bool { dbs.Matches = true s.AddDBSig(dbheight, dbs.ServerIdentityChainID, dbs.DBSignature) - dbs.Processed = true s.DBSigProcessed++ //fmt.Println(fmt.Sprintf("Process DBSig %10s vm %2v DBSigProcessed++ (%2d)", s.FactomNodeName, dbs.VMIndex, s.DBSigProcessed)) vm.Synced = true