Skip to content
This repository has been archived by the owner on Jun 17, 2022. It is now read-only.

Gossip DB fault tolerance #344

Merged
merged 28 commits into from
Oct 9, 2019
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ce61db8
poset db tests stability
sfxdxdev Oct 3, 2019
4487cf6
Store.Commit() panics on error
sfxdxdev Oct 3, 2019
f11a47b
gossip.Store commit and dirty flag
sfxdxdev Oct 3, 2019
d1c780a
commit epoch store
sfxdxdev Oct 3, 2019
a46a259
gossip.Service.ApplyBlock -> onNewBlock
sfxdxdev Oct 3, 2019
6b43aae
wip
devintegral2 Oct 3, 2019
f88e92b
check dbs are synced
devintegral2 Oct 3, 2019
b437828
super db
devintegral2 Oct 3, 2019
efdde51
fixes
devintegral2 Oct 3, 2019
e49a514
sync
devintegral2 Oct 3, 2019
a0b5215
kvdb.SuperDb --> flushable.SyncedPool
sfxdxdev Oct 3, 2019
775de82
deadlock fix
sfxdxdev Oct 3, 2019
24f7a3c
kvdb.DbProducer makes real db, flushable.Flushable manages mem cache,…
sfxdxdev Oct 4, 2019
6ef9330
memorydb producer
sfxdxdev Oct 7, 2019
70466a5
flushable: lazy db creation
sfxdxdev Oct 7, 2019
836be6a
leveldb producer
sfxdxdev Oct 7, 2019
487370d
kvdb/fallible: no custom onClose, onDrop
sfxdxdev Oct 7, 2019
99cb467
flushable.SyncedPool
sfxdxdev Oct 7, 2019
5cf38a3
poset.Store uses flushable.SyncedPool
sfxdxdev Oct 7, 2019
d87338a
gossip.Store uses flushable.SyncedPool
sfxdxdev Oct 7, 2019
d6c4788
integration uses flushable.SyncedPool
sfxdxdev Oct 7, 2019
2dc9d53
fix flushable/iterator.Next()
sfxdxdev Oct 8, 2019
3c6b92d
fix flushable/SyncedPool callbacks
sfxdxdev Oct 9, 2019
82f1f03
flushable.SyncedPool: no GetLastDb()/GetDbByIndex()
sfxdxdev Oct 9, 2019
c620df3
poset.Store: flush if needed
sfxdxdev Oct 9, 2019
1af3142
uniq namespace if tests count > 1
sfxdxdev Oct 9, 2019
5154a58
poset.TestRestore temporary fix
sfxdxdev Oct 9, 2019
5c7ec30
golangcibot fixes
sfxdxdev Oct 9, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions cmd/db_fault_tolerance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package main

import (
"bytes"
"errors"
"github.com/Fantom-foundation/go-lachesis/hash"
"github.com/Fantom-foundation/go-lachesis/kvdb"
"github.com/Fantom-foundation/go-lachesis/kvdb/flushable"
"time"
)

type RegisteredDbs struct {
wrappers map[string]kvdb.FlushableKeyValueStore
bareDbs map[string]kvdb.KeyValueStore

queuedDrops map[string]struct{}

prevFlushTime time.Time
}

func (dbs *RegisteredDbs) Drop(name string) {
dbs.queuedDrops[name] = struct{}{}
}

func (dbs *RegisteredDbs) Register(name string, db kvdb.KeyValueStore) {
wrapper := flushable.New(db)
wrapper.SetDropper(func() {
dbs.Drop(name)
})

dbs.bareDbs[name] = db
dbs.wrappers[name] = wrapper
delete(dbs.queuedDrops, name)
}

func (dbs *RegisteredDbs) Flush(id hash.Event) error {
key := []byte("mark")

// dirty flag
for _, db := range dbs.bareDbs {
marker := bytes.NewBuffer(nil)
prev, err := db.Get(key)
if err != nil {
return err
}
if prev == nil {
return errors.New("not found prev flushed state marker")
}

marker.Write([]byte("dirty"))
marker.Write(prev)
marker.Write([]byte("->"))
marker.Write(id.Bytes())
err = db.Put(key, marker.Bytes())
if err != nil {
return err
}
}
// flush (along with clean flag)
for _, db := range dbs.wrappers {
db.Flush() // flush data
err := db.Put(key, id.Bytes())
if err != nil {
return err
}
db.Flush() // flush clean flag
}

// drop old DBs
for name := range dbs.queuedDrops {
db := dbs.bareDbs[name]
if db == nil {
// ???? we should register all the DBs at startup
continue
}
db.Drop()
}

dbs.prevFlushTime = time.Now()
return nil
}

func (dbs *RegisteredDbs) FlushIfNeeded(id hash.Event) bool {
if time.Since(dbs.prevFlushTime) > 10 * time.Minute {
dbs.Flush(id)
return true
}

totalNotFlushed := 0
for _, db := range dbs.wrappers {
totalNotFlushed += db.NotFlushedSizeEst()
}

if totalNotFlushed > 100 * 1024 * 1024 {
dbs.Flush(id)
return true
}
return false
}

// call on startup, after all dbs are registered
func (dbs *RegisteredDbs) CheckDbsSynced() error {
key := []byte("mark")
var prevId *hash.Event
for _, db := range dbs.bareDbs {
mark, err := db.Get(key)
if err != nil {
return err
}
if bytes.HasPrefix(mark, []byte("dirty")) {
return errors.New("dirty")
}
eventId := hash.BytesToEvent(mark)
if prevId == nil {
prevId = &eventId
}
if eventId != *prevId {
return errors.New("not synced")
}
}
return nil
}
4 changes: 4 additions & 0 deletions evm_core/apply_genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ func ApplyGenesis(db ethdb.Database, net *lachesis.Config) (*EvmBlock, error) {
}

writeBlockIndexes(db, blockNum, block.Hash)
err = statedb.Database().TrieDB().Cap(0)
if err != nil {
return nil, err
}

return block, nil
}
Expand Down
11 changes: 9 additions & 2 deletions gossip/apply_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,15 @@ import (
"github.com/Fantom-foundation/go-lachesis/inter/pos"
)

// ApplyBlock execs ordered txns on state.
func (s *Service) ApplyBlock(block *inter.Block, stateHash common.Hash, validators pos.Validators) (newStateHash common.Hash, newValidators pos.Validators) {
// onNewBlock execs ordered txns of new block on state.
func (s *Service) onNewBlock(
block *inter.Block,
stateHash common.Hash,
validators pos.Validators,
) (
newStateHash common.Hash,
newValidators pos.Validators,
) {
evmProcessor := evm_core.NewStateProcessor(params.AllEthashProtocolChanges, s.GetEvmStateReader())

// Assemble block data
Expand Down
3 changes: 3 additions & 0 deletions gossip/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ func (pm *ProtocolManager) makeFetcher() (*fetcher.Fetcher, *ordering.EventBuffe
if err != nil {
return err
}

pm.store.Commit(e.Epoch)

// If the event is indeed in our own graph, announce it
if atomic.LoadUint32(&pm.synced) != 0 { // announce only fresh events
pm.BroadcastEvent(e, false)
Expand Down
2 changes: 1 addition & 1 deletion gossip/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func NewService(ctx *node.ServiceContext, config Config, store *Store, engine Co
engine: engine,
processEvent: svc.processEvent,
}
svc.engine.Bootstrap(svc.ApplyBlock)
svc.engine.Bootstrap(svc.onNewBlock)

trustedNodes := []string{}
svc.serverPool = newServerPool(store.table.Peers, svc.done, &svc.wg, trustedNodes)
Expand Down
65 changes: 57 additions & 8 deletions gossip/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"

"github.com/Fantom-foundation/go-lachesis/inter/idx"
"github.com/Fantom-foundation/go-lachesis/kvdb"
"github.com/Fantom-foundation/go-lachesis/kvdb/flushable"
"github.com/Fantom-foundation/go-lachesis/kvdb/memorydb"
"github.com/Fantom-foundation/go-lachesis/kvdb/no_key_is_err"
"github.com/Fantom-foundation/go-lachesis/kvdb/table"
Expand All @@ -16,9 +18,10 @@ import (

// Store is a node persistent storage working over physical key-value database.
type Store struct {
persistentDB kvdb.KeyValueStore
bareDb kvdb.KeyValueStore

table struct {
mainDb *flushable.Flushable
table struct {
Peers kvdb.KeyValueStore `table:"peer_"`
Events kvdb.KeyValueStore `table:"event_"`
Blocks kvdb.KeyValueStore `table:"block_"`
Expand Down Expand Up @@ -47,14 +50,19 @@ type Store struct {
// NewStore creates store over key-value db.
func NewStore(db kvdb.KeyValueStore, makeDb func(name string) kvdb.KeyValueStore) *Store {
s := &Store{
persistentDB: db,
makeDb: makeDb,
Instance: logger.MakeInstance(),
bareDb: db,
mainDb: flushable.New(db),
makeDb: makeDb,
Instance: logger.MakeInstance(),
}

table.MigrateTables(&s.table, s.persistentDB)
table.MigrateTables(&s.table, s.mainDb)

evmTable := no_key_is_err.Wrap(table.New(s.persistentDB, []byte("evm_"))) // ETH expects that "not found" is an error
if s.isDirty() {
s.Log.Crit("Service DB is possible inconsistent. Recreate it.")
}

evmTable := no_key_is_err.Wrap(table.New(s.mainDb, []byte("evm_"))) // ETH expects that "not found" is an error
s.table.Evm = rawdb.NewDatabase(evmTable)
s.table.EvmState = state.NewDatabase(s.table.Evm)

Expand All @@ -74,7 +82,48 @@ func NewMemStore() *Store {
// Close leaves underlying database.
func (s *Store) Close() {
table.MigrateTables(&s.table, nil)
s.persistentDB.Close()
s.mainDb.Close()
}

// Commit changes.
func (s *Store) Commit(epoch idx.Epoch) {
s.setDirty(true)
defer s.setDirty(false)

err := s.commitEpochStore(epoch)
if err != nil {
s.Log.Crit("epoch DB commit", "err", err)
}

err = s.mainDb.Flush()
if err != nil {
s.Log.Crit("main DB commit", "err", err)
}
}

// setDirty sets dirty flag.
func (s *Store) setDirty(flag bool) {
key := []byte("is_dirty")
val := make([]byte, 1, 1)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

S1019: should use make([]byte, 1) instead (from gosimple)

if flag {
val[0] = 1
}

err := s.bareDb.Put(key, val)
if err != nil {
s.Log.Crit("Failed to put key-value", "err", err)
}
}

// isDirty gets dirty flag.
func (s *Store) isDirty() bool {
key := []byte("is_dirty")
val, err := s.bareDb.Get(key)
if err != nil {
s.Log.Crit("Failed to get value", "err", err)
}

return len(val) > 1 && val[0] != 0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

len(val) > 0 && val[0] ==1

}

// StateDB returns state database.
Expand Down
5 changes: 5 additions & 0 deletions gossip/store_epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ func (s *Store) getEpochStore(epoch idx.Epoch) *epochStore {
return tables.(*epochStore)
}

// commitEpochStore is not safe for concurrent use.
func (s *Store) commitEpochStore(epoch idx.Epoch) error {
return s.commitTmpDb("epoch", uint64(epoch))
}

// delEpochStore is not safe for concurrent use.
func (s *Store) delEpochStore(epoch idx.Epoch) {
s.delTmpDb("epoch", uint64(epoch))
Expand Down
21 changes: 19 additions & 2 deletions gossip/store_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (

"github.com/Fantom-foundation/go-lachesis/common/bigendian"
"github.com/Fantom-foundation/go-lachesis/kvdb"
"github.com/Fantom-foundation/go-lachesis/kvdb/flushable"
)

type (
tmpDb struct {
Db kvdb.KeyValueStore
Db *flushable.Flushable
Tables interface{}
}

Expand All @@ -23,6 +24,9 @@ type (
)

func (s *Store) initTmpDbs() {
s.tmpDbs.Lock()
defer s.tmpDbs.Unlock()

s.tmpDbs.min = make(map[string]uint64)
s.tmpDbs.seq = make(map[string]map[uint64]tmpDb)

Expand Down Expand Up @@ -61,7 +65,7 @@ func (s *Store) getTmpDb(name string, ver uint64, makeTables func(kvdb.KeyValueS
return tmp.Tables
}

db := s.makeDb(tmpDbName(name, ver))
db := flushable.New(s.makeDb(tmpDbName(name, ver)))
tables := makeTables(db)
s.tmpDbs.seq[name][ver] = tmpDb{
Db: db,
Expand All @@ -71,6 +75,19 @@ func (s *Store) getTmpDb(name string, ver uint64, makeTables func(kvdb.KeyValueS
return tables
}

func (s *Store) commitTmpDb(name string, ver uint64) error {
s.tmpDbs.Lock()
defer s.tmpDbs.Unlock()

min, ok := s.tmpDbs.min[name]
if !ok || ver < min {
return nil
}

tmp := s.tmpDbs.seq[name][ver]
return tmp.Db.Flush()
}

func (s *Store) delTmpDb(name string, ver uint64) {
s.tmpDbs.Lock()
defer s.tmpDbs.Unlock()
Expand Down
14 changes: 13 additions & 1 deletion kvdb/flushable/flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type Flushable struct {
sizeEstimation *int

lock *sync.Mutex // we have no guarantees that rbt.Tree works with concurrent reads, so we can't use MutexRW

dropper func()
}

// New Flushable wraps underlying DB. All the writes into the cache won't be written in parent until .Flush() is called.
Expand All @@ -38,6 +40,14 @@ func New(parent kvdb.KeyValueStore) *Flushable {
}
}

func (w *Flushable) SetDropper(dropper func()) {
w.dropper = dropper
}

func (w *Flushable) SetUnderlyingDB(parent kvdb.KeyValueStore) {
w.underlying = parent
}

// UnderlyingDB of Flushable.
func (w *Flushable) UnderlyingDB() kvdb.KeyValueStore {
return w.underlying
Expand Down Expand Up @@ -132,7 +142,9 @@ func (w *Flushable) Close() error {

// Drop whole database.
func (w *Flushable) Drop() {
if droper, ok := w.underlying.(kvdb.Droper); ok {
if w.dropper != nil {
w.dropper()
} else if droper, ok := w.underlying.(kvdb.Droper); ok {
droper.Drop()
}
w.underlying = nil
Expand Down
Loading