Skip to content

Commit

Permalink
FAB-1233 State DB recovery
Browse files Browse the repository at this point in the history
We need to recover the state DB when
(i) the peer fails during commit (partial written state).
(ii) the database gets corrupted.

We introduce a new state in DB called savepoint, and update
it with the block height once all valid tx in the last/recent block is
committed.

Whenever peer starts (first boot up or after a failure), we compare the
savepoint in DB and block height to see whether the state DB is in
consistent state. If not, we execute the following steps:
(i) retrieve all required blocks (#blocks = block height - savepoint)
from block storage,
(ii) compute write set for valid tx, commits these values, & update the
savepoint.

Change-Id: I769d1391de511d3cdb55c40692beb829e2cc5c2f
Signed-off-by: senthil <cendhu@gmail.com>
  • Loading branch information
cendhu committed Dec 8, 2016
1 parent 87a0ce8 commit c0dc54b
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 24 deletions.
59 changes: 53 additions & 6 deletions core/ledger/kvledger/kv_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,25 +77,72 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) {
blockStorageConf := fsblkstorage.NewConf(conf.blockStorageDir, conf.maxBlockfileSize)
blockStore := fsblkstorage.NewFsBlockStore(blockStorageConf, indexConfig)

var txmgmt txmgmt.TxMgr
if ledgerconfig.IsCouchDBEnabled() == true {
//By default we can talk to CouchDB with empty id and pw (""), or you can add your own id and password to talk to a secured CouchDB
logger.Debugf("===COUCHDB=== NewKVLedger() Using CouchDB instead of RocksDB...hardcoding and passing connection config for now")

couchDBDef := ledgerconfig.GetCouchDBDefinition()

//create new transaction manager based on couchDB
txmgmt := couchdbtxmgmt.NewCouchDBTxMgr(&couchdbtxmgmt.Conf{DBPath: conf.txMgrDBPath},
txmgmt = couchdbtxmgmt.NewCouchDBTxMgr(&couchdbtxmgmt.Conf{DBPath: conf.txMgrDBPath},
couchDBDef.URL, //couchDB connection URL
"system", //couchDB db name matches ledger name, TODO for now use system ledger, eventually allow passing in subledger name
couchDBDef.Username, //enter couchDB id here
couchDBDef.Password) //enter couchDB pw here
return &KVLedger{blockStore, txmgmt, nil}, nil
} else {
// Fall back to using RocksDB lockbased transaction manager
txmgmt = lockbasedtxmgmt.NewLockBasedTxMgr(&lockbasedtxmgmt.Conf{DBPath: conf.txMgrDBPath})
}
l := &KVLedger{blockStore, txmgmt, nil}

// Fall back to using RocksDB lockbased transaction manager
txmgmt := lockbasedtxmgmt.NewLockBasedTxMgr(&lockbasedtxmgmt.Conf{DBPath: conf.txMgrDBPath})
return &KVLedger{blockStore, txmgmt, nil}, nil
if err := recoverStateDB(l); err != nil {
panic(fmt.Errorf(`Error during state DB recovery:%s`, err))
}

return l, nil

}

//Recover the state database by recommitting last valid blocks
func recoverStateDB(l *KVLedger) error {
//If there is no block in blockstorage, nothing to recover.
info, _ := l.blockStore.GetBlockchainInfo()
if info.Height == 0 {
return nil
}

//Getting savepointValue stored in the state DB
var err error
var savepointValue uint64
if savepointValue, err = l.txtmgmt.GetBlockNumFromSavepoint(); err != nil {
return err
}

//Checking whether the savepointValue is in sync with block storage height
if savepointValue == info.Height {
return nil
} else if savepointValue > info.Height {
return errors.New("BlockStorage height is behind savepoint by %d blocks. Recovery the BlockStore first")
}

//Compute updateSet for each missing savepoint and commit to state DB
for blockNumber := savepointValue + 1; blockNumber <= info.Height; blockNumber++ {
if l.pendingBlockToCommit, err = l.GetBlockByNumber(blockNumber); err != nil {
return err
}
logger.Debugf("Constructing updateSet for the block %d", blockNumber)
if _, _, err = l.txtmgmt.ValidateAndPrepare(l.pendingBlockToCommit, false); err != nil {
return err
}
logger.Debugf("Committing block %d to state database", blockNumber)
if err = l.txtmgmt.Commit(); err != nil {
return err
}
}
l.pendingBlockToCommit = nil

return nil
}

// GetTransactionByID retrieves a transaction by id
Expand Down Expand Up @@ -150,7 +197,7 @@ func (l *KVLedger) RemoveInvalidTransactionsAndPrepare(block *common.Block) (*co
var validBlock *common.Block
var invalidTxs []*pb.InvalidTransaction
var err error
validBlock, invalidTxs, err = l.txtmgmt.ValidateAndPrepare(block)
validBlock, invalidTxs, err = l.txtmgmt.ValidateAndPrepare(block, true)
if err == nil {
l.pendingBlockToCommit = validBlock
}
Expand Down
83 changes: 82 additions & 1 deletion core/ledger/kvledger/kv_ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ func TestKVLedgerBlockStorage(t *testing.T) {
defer env.cleanup()
ledger, _ := NewKVLedger(env.conf)
defer ledger.Close()

bcInfo, _ := ledger.GetBlockchainInfo()
testutil.AssertEquals(t, bcInfo, &pb.BlockchainInfo{
Height: 0, CurrentBlockHash: nil, PreviousBlockHash: nil})
Expand Down Expand Up @@ -76,3 +75,85 @@ func TestKVLedgerBlockStorage(t *testing.T) {
b2, _ = ledger.GetBlockByNumber(2)
testutil.AssertEquals(t, b2, block2)
}

func TestKVLedgerStateDBRecovery(t *testing.T) {
env := newTestEnv(t)
defer env.cleanup()
ledger, _ := NewKVLedger(env.conf)
defer ledger.Close()

bcInfo, _ := ledger.GetBlockchainInfo()
testutil.AssertEquals(t, bcInfo, &pb.BlockchainInfo{
Height: 0, CurrentBlockHash: nil, PreviousBlockHash: nil})

//creating and committing the first block
simulator, _ := ledger.NewTxSimulator()
//simulating a transaction
simulator.SetState("ns1", "key1", []byte("value1"))
simulator.SetState("ns1", "key2", []byte("value2"))
simulator.SetState("ns1", "key3", []byte("value3"))
simulator.Done()
simRes, _ := simulator.GetTxSimulationResults()
//generating a block based on the simulation result
bg := testutil.NewBlockGenerator(t)
block1 := bg.NextBlock([][]byte{simRes}, false)
//performing validation of read and write set to find valid transactions
ledger.RemoveInvalidTransactionsAndPrepare(block1)
//writing the validated block to block storage and committing the transaction to state DB
ledger.Commit()

bcInfo, _ = ledger.GetBlockchainInfo()
block1Hash := block1.Header.Hash()
testutil.AssertEquals(t, bcInfo, &pb.BlockchainInfo{
Height: 1, CurrentBlockHash: block1Hash, PreviousBlockHash: []byte{}})

//creating the second block but peer fails before committing the transaction to state DB
simulator, _ = ledger.NewTxSimulator()
//simulating transaction
simulator.SetState("ns1", "key1", []byte("value4"))
simulator.SetState("ns1", "key2", []byte("value5"))
simulator.SetState("ns1", "key3", []byte("value6"))
simulator.Done()
simRes, _ = simulator.GetTxSimulationResults()
//generating a block based on the simulation result
block2 := bg.NextBlock([][]byte{simRes}, false)
//performing validation of read and write set to find valid transactions
ledger.RemoveInvalidTransactionsAndPrepare(block2)
//writing the validated block to block storage but not committing the transaction to state DB
ledger.blockStore.AddBlock(ledger.pendingBlockToCommit)
//assume that peer fails here before committing the transaction

bcInfo, _ = ledger.GetBlockchainInfo()
block2Hash := block2.Header.Hash()
testutil.AssertEquals(t, bcInfo, &pb.BlockchainInfo{
Height: 2, CurrentBlockHash: block2Hash, PreviousBlockHash: block1.Header.Hash()})

simulator, _ = ledger.NewTxSimulator()
value, _ := simulator.GetState("ns1", "key1")
//value for 'key1' should be 'value1' as the last commit failed
testutil.AssertEquals(t, value, []byte("value1"))
value, _ = simulator.GetState("ns1", "key2")
//value for 'key2' should be 'value2' as the last commit failed
testutil.AssertEquals(t, value, []byte("value2"))
value, _ = simulator.GetState("ns1", "key3")
//value for 'key3' should be 'value3' as the last commit failed
testutil.AssertEquals(t, value, []byte("value3"))
simulator.Done()
ledger.Close()

//we assume here that the peer comes online and calls NewKVLedger to get a handler for the ledger
//State DB should be recovered before returning from NewKVLedger call
ledger, _ = NewKVLedger(env.conf)
simulator, _ = ledger.NewTxSimulator()
value, _ = simulator.GetState("ns1", "key1")
//value for 'key1' should be 'value4' after recovery
testutil.AssertEquals(t, value, []byte("value4"))
value, _ = simulator.GetState("ns1", "key2")
//value for 'key2' should be 'value5' after recovery
testutil.AssertEquals(t, value, []byte("value5"))
value, _ = simulator.GetState("ns1", "key3")
//value for 'key3' should be 'value6' after recovery
testutil.AssertEquals(t, value, []byte("value6"))
simulator.Done()
ledger.Close()
}
32 changes: 24 additions & 8 deletions core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,23 @@ func (txmgr *CouchDBTxMgr) NewTxSimulator() (ledger.TxSimulator, error) {
}

// ValidateAndPrepare implements method in interface `txmgmt.TxMgr`
func (txmgr *CouchDBTxMgr) ValidateAndPrepare(block *common.Block) (*common.Block, []*pb.InvalidTransaction, error) {
logger.Debugf("===COUCHDB=== Entering CouchDBTxMgr.ValidateAndPrepare()")
func (txmgr *CouchDBTxMgr) ValidateAndPrepare(block *common.Block, doMVCCValidation bool) (*common.Block, []*pb.InvalidTransaction, error) {
if doMVCCValidation == true {
logger.Debugf("===COUCHDB=== Entering CouchDBTxMgr.ValidateAndPrepare()")
logger.Debugf("Validating a block with [%d] transactions", len(block.Data.Data))
} else {
logger.Debugf("New block arrived for write set computation:%#v", block)
logger.Debugf("Computing write set for a block with [%d] transactions", len(block.Data.Data))
}
invalidTxs := []*pb.InvalidTransaction{}
var valid bool
txmgr.updateSet = newUpdateSet()
txmgr.blockNum = block.Header.Number
logger.Debugf("Validating a block with [%d] transactions", len(block.Data.Data))

for txIndex, envBytes := range block.Data.Data {
//TODO: Process valid txs bitmap in block.BlockMetadata.Metadata and skip
//this transaction if found invalid.

// extract actions from the envelope message
respPayload, err := putils.GetActionFromEnvelope(envBytes)
if err != nil {
Expand All @@ -152,15 +161,22 @@ func (txmgr *CouchDBTxMgr) ValidateAndPrepare(block *common.Block) (*common.Bloc
// trace the first 2000 characters of RWSet only, in case it is huge
if logger.IsEnabledFor(logging.DEBUG) {
txRWSetString := txRWSet.String()
operation := "validating"
if doMVCCValidation == false {
operation = "computing write set from"
}
if len(txRWSetString) < 2000 {
logger.Debugf("validating txRWSet:[%s]", txRWSetString)
logger.Debugf(operation+" txRWSet:[%s]", txRWSetString)
} else {
logger.Debugf("validating txRWSet:[%s...]", txRWSetString[0:2000])
logger.Debugf(operation+" txRWSet:[%s...]", txRWSetString[0:2000])
}
}

if valid, err = txmgr.validateTx(txRWSet); err != nil {
return nil, nil, err
if doMVCCValidation == true {
if valid, err = txmgr.validateTx(txRWSet); err != nil {
return nil, nil, err
}
} else {
valid = true
}

if valid {
Expand Down
33 changes: 25 additions & 8 deletions core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,23 @@ func (txmgr *LockBasedTxMgr) NewTxSimulator() (ledger.TxSimulator, error) {
}

// ValidateAndPrepare implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) ValidateAndPrepare(block *common.Block) (*common.Block, []*pb.InvalidTransaction, error) {
logger.Debugf("New block arrived for validation:%#v", block)
func (txmgr *LockBasedTxMgr) ValidateAndPrepare(block *common.Block, doMVCCValidation bool) (*common.Block, []*pb.InvalidTransaction, error) {
if doMVCCValidation == true {
logger.Debugf("New block arrived for validation:%#v", block)
logger.Debugf("Validating a block with [%d] transactions", len(block.Data.Data))
} else {
logger.Debugf("New block arrived for write set computation:%#v", block)
logger.Debugf("Computing write set for a block with [%d] transactions", len(block.Data.Data))
}
invalidTxs := []*pb.InvalidTransaction{}
var valid bool
txmgr.updateSet = newUpdateSet()
txmgr.blockNum = block.Header.Number
logger.Debugf("Validating a block with [%d] transactions", len(block.Data.Data))

for txIndex, envBytes := range block.Data.Data {
//TODO: Process valid txs bitmap in block.BlockMetadata.Metadata and skip
//this transaction if found invalid.

// extract actions from the envelope message
respPayload, err := putils.GetActionFromEnvelope(envBytes)
if err != nil {
Expand All @@ -129,16 +138,24 @@ func (txmgr *LockBasedTxMgr) ValidateAndPrepare(block *common.Block) (*common.Bl
// trace the first 2000 characters of RWSet only, in case it is huge
if logger.IsEnabledFor(logging.DEBUG) {
txRWSetString := txRWSet.String()
operation := "validating"
if doMVCCValidation == false {
operation = "computing write set from"
}
if len(txRWSetString) < 2000 {
logger.Debugf("validating txRWSet:[%s]", txRWSetString)
logger.Debugf(operation+" txRWSet:[%s]", txRWSetString)
} else {
logger.Debugf("validating txRWSet:[%s...]", txRWSetString[0:2000])
logger.Debugf(operation+" txRWSet:[%s...]", txRWSetString[0:2000])
}
}

if valid, err = txmgr.validateTx(txRWSet); err != nil {
return nil, nil, err
if doMVCCValidation == true {
if valid, err = txmgr.validateTx(txRWSet); err != nil {
return nil, nil, err
}
} else {
valid = true
}

//TODO add the validation info to the bitmap in the metadata of the block
if valid {
committingTxHeight := version.NewHeight(block.Header.Number, uint64(txIndex+1))
Expand Down
3 changes: 2 additions & 1 deletion core/ledger/kvledger/txmgmt/txmgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (
type TxMgr interface {
NewQueryExecutor() (ledger.QueryExecutor, error)
NewTxSimulator() (ledger.TxSimulator, error)
ValidateAndPrepare(block *common.Block) (*common.Block, []*pb.InvalidTransaction, error)
ValidateAndPrepare(block *common.Block, doMVCCValidation bool) (*common.Block, []*pb.InvalidTransaction, error)
GetBlockNumFromSavepoint() (uint64, error)
Commit() error
Rollback()
Shutdown()
Expand Down

0 comments on commit c0dc54b

Please sign in to comment.