Skip to content

Commit

Permalink
This commit refactors the ledger codes
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-602

The refactoring allows sharing of transaction execution and validation code across goleveldb and couchdb.
This refactoring also allows plugging-in different schemes for various aspects of transaction execution
such as maintianing isolation during simulation, validation and commit.

Change-Id: I0d3fb1cdff99c3f6f991e7b7469920173f194a87
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi committed Dec 14, 2016
1 parent 4c63856 commit 2f03cf3
Show file tree
Hide file tree
Showing 28 changed files with 1,546 additions and 922 deletions.
4 changes: 2 additions & 2 deletions core/ledger/history/couchdb_histmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"bytes"
"strconv"

"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwset"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
"github.com/hyperledger/fabric/protos/common"
putils "github.com/hyperledger/fabric/protos/utils"
Expand Down Expand Up @@ -72,7 +72,7 @@ func (histmgr *CouchDBHistMgr) Commit(block *common.Block) error {
}

//preparation for extracting RWSet from transaction
txRWSet := &txmgmt.TxReadWriteSet{}
txRWSet := &rwset.TxReadWriteSet{}

// Get the Result from the Action and then Unmarshal
// it into a TxReadWriteSet using custom unmarshalling
Expand Down
20 changes: 9 additions & 11 deletions core/ledger/kvledger/kv_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ import (
"github.com/hyperledger/fabric/core/ledger/blkstorage"
"github.com/hyperledger/fabric/core/ledger/blkstorage/fsblkstorage"
"github.com/hyperledger/fabric/core/ledger/history"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/couchdbtxmgmt"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/lockbasedtxmgmt"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/stateleveldb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"

logging "github.com/op/go-logging"

"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr"
"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
)
Expand Down Expand Up @@ -60,7 +61,7 @@ func NewConf(filesystemPath string, maxBlockfileSize int) *Conf {
// This implementation provides a key-value based data model
type KVLedger struct {
blockStore blkstorage.BlockStore
txtmgmt txmgmt.TxMgr
txtmgmt txmgr.TxMgr
historymgmt history.HistMgr
pendingBlockToCommit *common.Block
}
Expand All @@ -81,7 +82,7 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) {
blockStore := fsblkstorage.NewFsBlockStore(blockStorageConf, indexConfig)

//State and History database managers
var txmgmt txmgmt.TxMgr
var txmgmt txmgr.TxMgr
var historymgmt history.HistMgr

if ledgerconfig.IsCouchDBEnabled() == true {
Expand All @@ -97,8 +98,9 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) {
couchDBDef.Username, //enter couchDB id here
couchDBDef.Password) //enter couchDB pw here
} else {
// Fall back to using RocksDB lockbased transaction manager
txmgmt = lockbasedtxmgmt.NewLockBasedTxMgr(&lockbasedtxmgmt.Conf{DBPath: conf.txMgrDBPath})
// Fall back to using goleveldb lockbased transaction manager
db := stateleveldb.NewVersionedDBProvider(&stateleveldb.Conf{DBPath: conf.txMgrDBPath}).GetDBHandle("Default")
txmgmt = lockbasedtxmgr.NewLockBasedTxMgr(db)
}

if ledgerconfig.IsHistoryDBEnabled() == true {
Expand All @@ -112,13 +114,10 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) {
couchDBDef.Username, //enter couchDB id here
couchDBDef.Password) //enter couchDB pw here
}

l := &KVLedger{blockStore, txmgmt, historymgmt, nil}

if err := recoverStateDB(l); err != nil {
panic(fmt.Errorf(`Error during state DB recovery:%s`, err))
}

return l, nil
}

Expand All @@ -136,7 +135,7 @@ func recoverStateDB(l *KVLedger) error {
if savepointValue, err = l.txtmgmt.GetBlockNumFromSavepoint(); err != nil {
return err
}

logger.Debugf("savepointValue=%d, info.Height=%d", savepointValue, info.Height)
//Checking whether the savepointValue is in sync with block storage height
if savepointValue == info.Height {
return nil
Expand All @@ -159,7 +158,6 @@ func recoverStateDB(l *KVLedger) error {
}
}
l.pendingBlockToCommit = nil

return nil
}

Expand Down
24 changes: 12 additions & 12 deletions core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_tx_simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@ import (
"reflect"

"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwset"
logging "github.com/op/go-logging"
)

type kvReadCache struct {
kvRead *txmgmt.KVRead
kvRead *rwset.KVRead
cachedValue []byte
}

type nsRWs struct {
readMap map[string]*kvReadCache
writeMap map[string]*txmgmt.KVWrite
writeMap map[string]*rwset.KVWrite
}

func newNsRWs() *nsRWs {
return &nsRWs{make(map[string]*kvReadCache), make(map[string]*txmgmt.KVWrite)}
return &nsRWs{make(map[string]*kvReadCache), make(map[string]*rwset.KVWrite)}
}

// CouchDBTxSimulator is a transaction simulator used in `CouchDBTxMgr`
Expand Down Expand Up @@ -104,7 +104,7 @@ func (s *CouchDBTxSimulator) GetState(ns string, key string) ([]byte, error) {
}
}

nsRWs.readMap[key] = &kvReadCache{txmgmt.NewKVRead(key, version), value}
nsRWs.readMap[key] = &kvReadCache{rwset.NewKVRead(key, version), value}
logger.Debugf("===COUCHDB=== Exiting CouchDBTxSimulator.GetState()")
return value, nil
}
Expand All @@ -131,7 +131,7 @@ func (s *CouchDBTxSimulator) SetState(ns string, key string, value []byte) error
kvWrite.SetValue(value)
return nil
}
nsRWs.writeMap[key] = txmgmt.NewKVWrite(key, value)
nsRWs.writeMap[key] = rwset.NewKVWrite(key, value)
logger.Debugf("===COUCHDB=== Exiting CouchDBTxSimulator.SetState()")
return nil
}
Expand All @@ -147,26 +147,26 @@ func (s *CouchDBTxSimulator) Done() {
s.txmgr.commitRWLock.RUnlock()
}

func (s *CouchDBTxSimulator) getTxReadWriteSet() *txmgmt.TxReadWriteSet {
txRWSet := &txmgmt.TxReadWriteSet{}
func (s *CouchDBTxSimulator) getTxReadWriteSet() *rwset.TxReadWriteSet {
txRWSet := &rwset.TxReadWriteSet{}
sortedNamespaces := getSortedKeys(s.rwMap)
for _, ns := range sortedNamespaces {
//Get namespace specific read-writes
nsReadWriteMap := s.rwMap[ns]
//add read set
reads := []*txmgmt.KVRead{}
reads := []*rwset.KVRead{}
sortedReadKeys := getSortedKeys(nsReadWriteMap.readMap)
for _, key := range sortedReadKeys {
reads = append(reads, nsReadWriteMap.readMap[key].kvRead)
}

//add write set
writes := []*txmgmt.KVWrite{}
writes := []*rwset.KVWrite{}
sortedWriteKeys := getSortedKeys(nsReadWriteMap.writeMap)
for _, key := range sortedWriteKeys {
writes = append(writes, nsReadWriteMap.writeMap[key])
}
nsRWs := &txmgmt.NsReadWriteSet{NameSpace: ns, Reads: reads, Writes: writes}
nsRWs := &rwset.NsReadWriteSet{NameSpace: ns, Reads: reads, Writes: writes}
txRWSet.NsRWs = append(txRWSet.NsRWs, nsRWs)
}

Expand Down Expand Up @@ -239,7 +239,7 @@ func (itr *sKVItr) Next() (ledger.QueryResult, error) {
// Get existing cache for RW at the namespace of the result set if it exists. If none exists, then create it.
nsRWs := itr.simulator.getOrCreateNsRWHolder(itr.scanner.namespace)
nsRWs.readMap[committedKV.key] = &kvReadCache{
&txmgmt.KVRead{Key: committedKV.key, Version: committedKV.version}, committedKV.value}
&rwset.KVRead{Key: committedKV.key, Version: committedKV.version}, committedKV.value}

return &ledger.KV{Key: committedKV.key, Value: committedKV.value}, nil
}
Expand Down
37 changes: 19 additions & 18 deletions core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@ package couchdbtxmgmt
import (
"bytes"
"encoding/json"
"errors"
"sync"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
"github.com/hyperledger/fabric/core/ledger/util/db"
"github.com/op/go-logging"

"github.com/hyperledger/fabric/core/ledger/kvledger/version"
"fmt"

"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwset"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
putils "github.com/hyperledger/fabric/protos/utils"
Expand Down Expand Up @@ -153,7 +154,7 @@ func (txmgr *CouchDBTxMgr) ValidateAndPrepare(block *common.Block, doMVCCValidat
}

//preparation for extracting RWSet from transaction
txRWSet := &txmgmt.TxReadWriteSet{}
txRWSet := &rwset.TxReadWriteSet{}

// Get the Result from the Action
// and then Unmarshal it into a TxReadWriteSet using custom unmarshalling
Expand Down Expand Up @@ -201,7 +202,7 @@ func (txmgr *CouchDBTxMgr) Shutdown() {
txmgr.db.Close()
}

func (txmgr *CouchDBTxMgr) validateTx(txRWSet *txmgmt.TxReadWriteSet) (bool, error) {
func (txmgr *CouchDBTxMgr) validateTx(txRWSet *rwset.TxReadWriteSet) (bool, error) {

var err error
var currentVersion *version.Height
Expand All @@ -226,7 +227,7 @@ func (txmgr *CouchDBTxMgr) validateTx(txRWSet *txmgmt.TxReadWriteSet) (bool, err
return true, nil
}

func (txmgr *CouchDBTxMgr) addWriteSetToBatch(txRWSet *txmgmt.TxReadWriteSet, txHeight *version.Height) error {
func (txmgr *CouchDBTxMgr) addWriteSetToBatch(txRWSet *rwset.TxReadWriteSet, txHeight *version.Height) error {
if txmgr.updateSet == nil {
txmgr.updateSet = newUpdateSet()
}
Expand Down Expand Up @@ -259,7 +260,7 @@ func (txmgr *CouchDBTxMgr) Commit() error {
// SaveDoc using couchdb client and use JSON format
rev, err := txmgr.couchDB.SaveDoc(k, "", v.value, nil)
if err != nil {
logger.Errorf("===COUCHDB=== Error during Commit(): %s\n", err.Error())
logger.Debugf("===COUCHDB=== Error during Commit(): %s\n", err)
return err
}
if rev != "" {
Expand All @@ -280,7 +281,7 @@ func (txmgr *CouchDBTxMgr) Commit() error {
// SaveDoc using couchdb client and use attachment
rev, err := txmgr.couchDB.SaveDoc(k, "", nil, attachments)
if err != nil {
logger.Errorf("===COUCHDB=== Error during Commit(): %s\n", err.Error())
logger.Debugf("===COUCHDB=== Error during Commit(): %s\n", err)
return err
}
if rev != "" {
Expand All @@ -294,7 +295,7 @@ func (txmgr *CouchDBTxMgr) Commit() error {
// Record a savepoint
err := txmgr.recordSavepoint()
if err != nil {
logger.Errorf("===COUCHDB=== Error during recordSavepoint: %s\n", err.Error())
logger.Debugf("===COUCHDB=== Error during recordSavepoint: %s\n", err)
return err
}

Expand All @@ -312,37 +313,37 @@ func (txmgr *CouchDBTxMgr) recordSavepoint() error {
// ensure full commit to flush all changes until now to disk
dbResponse, err := txmgr.couchDB.EnsureFullCommit()
if err != nil || dbResponse.Ok != true {
logger.Errorf("====COUCHDB==== Failed to perform full commit\n")
return errors.New("Failed to perform full commit")
logger.Debugf("====COUCHDB==== Failed to perform full commit\n")
return fmt.Errorf("Failed to perform full commit. Err: %s", err)
}

// construct savepoint document
// UpdateSeq would be useful if we want to get all db changes since a logical savepoint
dbInfo, _, err := txmgr.couchDB.GetDatabaseInfo()
if err != nil {
logger.Errorf("====COUCHDB==== Failed to get DB info %s\n", err.Error())
logger.Debugf("====COUCHDB==== Failed to get DB info %s\n", err)
return err
}
savepointDoc.BlockNum = txmgr.blockNum
savepointDoc.UpdateSeq = dbInfo.UpdateSeq

savepointDocJSON, err := json.Marshal(savepointDoc)
if err != nil {
logger.Errorf("====COUCHDB==== Failed to create savepoint data %s\n", err.Error())
logger.Debugf("====COUCHDB==== Failed to create savepoint data %s\n", err)
return err
}

// SaveDoc using couchdb client and use JSON format
_, err = txmgr.couchDB.SaveDoc(savepointDocID, "", savepointDocJSON, nil)
if err != nil {
logger.Errorf("====CouchDB==== Failed to save the savepoint to DB %s\n", err.Error())
logger.Debugf("====CouchDB==== Failed to save the savepoint to DB %s\n", err)
}

// ensure full commit to flush savepoint to disk
dbResponse, err = txmgr.couchDB.EnsureFullCommit()
if err != nil || dbResponse.Ok != true {
logger.Errorf("====COUCHDB==== Failed to perform full commit\n")
return errors.New("Failed to perform full commit")
logger.Debugf("====COUCHDB==== Failed to perform full commit\n")
return fmt.Errorf("Failed to perform full commit. Err:%s", err)
}
return nil
}
Expand All @@ -354,14 +355,14 @@ func (txmgr *CouchDBTxMgr) GetBlockNumFromSavepoint() (uint64, error) {
savepointJSON, _, err := txmgr.couchDB.ReadDoc(savepointDocID)
if err != nil {
// TODO: differentiate between 404 and some other error code
logger.Errorf("====COUCHDB==== Failed to read savepoint data %s\n", err.Error())
logger.Debugf("====COUCHDB==== Failed to read savepoint data %s\n", err)
return 0, err
}

savepointDoc := &couchSavepointData{}
err = json.Unmarshal(savepointJSON, &savepointDoc)
if err != nil {
logger.Errorf("====COUCHDB==== Failed to read savepoint data %s\n", err.Error())
logger.Debugf("====COUCHDB==== Failed to read savepoint data %s\n", err)
return 0, err
}

Expand Down

0 comments on commit 2f03cf3

Please sign in to comment.