Skip to content

Commit

Permalink
Replace RocksDB by goleveldb
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-788
RocksDB has a patent infringement license in it from Facebook
(https://github.com/facebook/rocksdb/blob/master/PATENTS)
Many users may not be comfortable with the term of the license.

The alternatives include
1. LevelDB (https://github.com/google/leveldb) with a go wrapper (https://github.com/jmhodges/levigo),
2. goleveldb (https://github.com/syndtr/goleveldb) - a porting of leveldb in golang
3. BoltDB (https://github.com/boltdb/bolt)

BoltDB is suitable for read heavy workloads (e.g., LDAP)
but has a relatively poor performance for read-write workloads.
Of the other two options, goleveldb is chosen because it is
implemented in golang and hence easy to intergate and maintain.
In addition, as a precedent, ethereum go implementation also
uses this package
https://github.com/ethereum/go-ethereum/blob/master/ethdb/database.go

Change-Id: Ia4fb5a6f9299e613d03d8b414a51bf479bfafd59
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi committed Nov 9, 2016
1 parent 8c9dcc9 commit 27088ac
Show file tree
Hide file tree
Showing 61 changed files with 14,347 additions and 157 deletions.
23 changes: 6 additions & 17 deletions core/ledger/blkstorage/fsblkstorage/blockfile_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/hyperledger/fabric/core/ledger/util/db"
"github.com/hyperledger/fabric/protos"
"github.com/op/go-logging"
"github.com/tecbot/gorocksdb"
)

var logger = logging.MustGetLogger("kvledger")
Expand All @@ -45,7 +44,6 @@ type blockfileMgr struct {
rootDir string
conf *Conf
db *db.DB
defaultCF *gorocksdb.ColumnFamilyHandle
index index
cpInfo *checkpointInfo
cpInfoCond *sync.Cond
Expand All @@ -60,7 +58,7 @@ func newBlockfileMgr(conf *Conf, indexConfig *blkstorage.IndexConfig) *blockfile
panic(fmt.Sprintf("Error: %s", err))
}
db := initDB(conf)
mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: db, defaultCF: db.GetDefaultCFHandle()}
mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: db}
cpInfo, err := mgr.loadCurrentInfo()
if err != nil {
panic(fmt.Sprintf("Could not get block file info for current block file from db: %s", err))
Expand All @@ -82,7 +80,7 @@ func newBlockfileMgr(conf *Conf, indexConfig *blkstorage.IndexConfig) *blockfile
panic(fmt.Sprintf("Could not truncate current file to known size in db: %s", err))
}

mgr.index = newBlockIndex(indexConfig, db, db.GetCFHandle(blockIndexCF))
mgr.index = newBlockIndex(indexConfig, db)
mgr.cpInfo = cpInfo
mgr.currentFileWriter = currentFileWriter
mgr.cpInfoCond = sync.NewCond(&sync.Mutex{})
Expand Down Expand Up @@ -115,10 +113,7 @@ func newBlockfileMgr(conf *Conf, indexConfig *blkstorage.IndexConfig) *blockfile

func initDB(conf *Conf) *db.DB {
dbInst := db.CreateDB(&db.Conf{
DBPath: conf.dbPath,
CFNames: []string{blockIndexCF},
DisableWAL: true})

DBPath: conf.dbPath})
dbInst.Open()
return dbInst
}
Expand Down Expand Up @@ -421,7 +416,7 @@ func (mgr *blockfileMgr) fetchRawBytes(lp *fileLocPointer) ([]byte, error) {
func (mgr *blockfileMgr) loadCurrentInfo() (*checkpointInfo, error) {
var b []byte
var err error
if b, err = mgr.db.Get(mgr.defaultCF, blkMgrInfoKey); b == nil || err != nil {
if b, err = mgr.db.Get(blkMgrInfoKey); b == nil || err != nil {
return nil, err
}
i := &checkpointInfo{}
Expand All @@ -432,20 +427,14 @@ func (mgr *blockfileMgr) loadCurrentInfo() (*checkpointInfo, error) {
return i, nil
}

func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, flush bool) error {
func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, sync bool) error {
b, err := i.marshal()
if err != nil {
return err
}
if err = mgr.db.Put(mgr.defaultCF, blkMgrInfoKey, b); err != nil {
if err = mgr.db.Put(blkMgrInfoKey, b, sync); err != nil {
return err
}
if flush {
if err = mgr.db.Flush(true); err != nil {
return err
}
logger.Debugf("saved checkpointInfo:%s", i)
}
return nil
}

Expand Down
29 changes: 13 additions & 16 deletions core/ledger/blkstorage/fsblkstorage/blockindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/hyperledger/fabric/core/ledger/blkstorage"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/core/ledger/util/db"
"github.com/tecbot/gorocksdb"
"github.com/syndtr/goleveldb/leveldb"
)

const (
Expand Down Expand Up @@ -53,24 +53,22 @@ type blockIdxInfo struct {
type blockIndex struct {
indexItemsMap map[blkstorage.IndexableAttr]bool
db *db.DB
blockIndexCF *gorocksdb.ColumnFamilyHandle
}

func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *db.DB,
indexCFHandle *gorocksdb.ColumnFamilyHandle) *blockIndex {
func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *db.DB) *blockIndex {
indexItems := indexConfig.AttrsToIndex
logger.Debugf("newBlockIndex() - indexItems:[%s]", indexItems)
indexItemsMap := make(map[blkstorage.IndexableAttr]bool)
for _, indexItem := range indexItems {
indexItemsMap[indexItem] = true
}
return &blockIndex{indexItemsMap, db, indexCFHandle}
return &blockIndex{indexItemsMap, db}
}

func (index *blockIndex) getLastBlockIndexed() (uint64, error) {
var blockNumBytes []byte
var err error
if blockNumBytes, err = index.db.Get(index.blockIndexCF, indexCheckpointKey); err != nil {
if blockNumBytes, err = index.db.Get(indexCheckpointKey); err != nil {
return 0, nil
}
return decodeBlockNum(blockNumBytes), nil
Expand All @@ -85,19 +83,18 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
logger.Debugf("Indexing block [%s]", blockIdxInfo)
flp := blockIdxInfo.flp
txOffsets := blockIdxInfo.txOffsets
batch := gorocksdb.NewWriteBatch()
defer batch.Destroy()
batch := &leveldb.Batch{}
flpBytes, err := flp.marshal()
if err != nil {
return err
}

if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; ok {
batch.PutCF(index.blockIndexCF, constructBlockHashKey(blockIdxInfo.blockHash), flpBytes)
batch.Put(constructBlockHashKey(blockIdxInfo.blockHash), flpBytes)
}

if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; ok {
batch.PutCF(index.blockIndexCF, constructBlockNumKey(blockIdxInfo.blockNum), flpBytes)
batch.Put(constructBlockNumKey(blockIdxInfo.blockNum), flpBytes)
}

if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; ok {
Expand All @@ -110,12 +107,12 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
if marshalErr != nil {
return marshalErr
}
batch.PutCF(index.blockIndexCF, constructTxIDKey(txID), txFlpBytes)
batch.Put(constructTxIDKey(txID), txFlpBytes)
}
}

batch.PutCF(index.blockIndexCF, indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum))
if err := index.db.WriteBatch(batch); err != nil {
batch.Put(indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum))
if err := index.db.WriteBatch(batch, false); err != nil {
return err
}
return nil
Expand All @@ -125,7 +122,7 @@ func (index *blockIndex) getBlockLocByHash(blockHash []byte) (*fileLocPointer, e
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; !ok {
return nil, blkstorage.ErrAttrNotIndexed
}
b, err := index.db.Get(index.blockIndexCF, constructBlockHashKey(blockHash))
b, err := index.db.Get(constructBlockHashKey(blockHash))
if err != nil {
return nil, err
}
Expand All @@ -141,7 +138,7 @@ func (index *blockIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; !ok {
return nil, blkstorage.ErrAttrNotIndexed
}
b, err := index.db.Get(index.blockIndexCF, constructBlockNumKey(blockNum))
b, err := index.db.Get(constructBlockNumKey(blockNum))
if err != nil {
return nil, err
}
Expand All @@ -157,7 +154,7 @@ func (index *blockIndex) getTxLoc(txID string) (*fileLocPointer, error) {
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; !ok {
return nil, blkstorage.ErrAttrNotIndexed
}
b, err := index.db.Get(index.blockIndexCF, constructTxIDKey(txID))
b, err := index.db.Get(constructTxIDKey(txID))
if err != nil {
return nil, err
}
Expand Down
6 changes: 2 additions & 4 deletions core/ledger/kvledger/txmgmt/couchdbtxmgmt/couchdb_txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/hyperledger/fabric/protos"
putils "github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"
"github.com/tecbot/gorocksdb"
)

var logger = logging.MustGetLogger("couchdbtxmgmt")
Expand Down Expand Up @@ -68,7 +67,6 @@ func (u *updateSet) get(compositeKey []byte) *versionedValue {
// This implementation uses a read-write lock to prevent conflicts between transaction simulation and committing
type CouchDBTxMgr struct {
db *db.DB
stateIndexCF *gorocksdb.ColumnFamilyHandle
updateSet *updateSet
commitRWLock sync.RWMutex
couchDB *couchdb.CouchDBConnectionDef // COUCHDB new properties for CouchDB
Expand All @@ -87,7 +85,7 @@ type CouchConnection struct {
func NewCouchDBTxMgr(conf *Conf, host string, port int, dbName string, id string, pw string) *CouchDBTxMgr {

// TODO cleanup this RocksDB handle
db := db.CreateDB(&db.Conf{DBPath: conf.DBPath, CFNames: []string{}})
db := db.CreateDB(&db.Conf{DBPath: conf.DBPath})
db.Open()

couchDB, err := couchdb.CreateConnectionDefinition(host,
Expand All @@ -106,7 +104,7 @@ func NewCouchDBTxMgr(conf *Conf, host string, port int, dbName string, id string
}

// db and stateIndexCF will not be used for CouchDB. TODO to cleanup
return &CouchDBTxMgr{db: db, stateIndexCF: db.GetDefaultCFHandle(), couchDB: couchDB}
return &CouchDBTxMgr{db: db, couchDB: couchDB}
}

// NewQueryExecutor implements method in interface `txmgmt.TxMgr`
Expand Down
16 changes: 7 additions & 9 deletions core/ledger/kvledger/txmgmt/lockbasedtxmgmt/lockbased_txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/hyperledger/fabric/protos"
putils "github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"
"github.com/tecbot/gorocksdb"
"github.com/syndtr/goleveldb/leveldb"
)

var logger = logging.MustGetLogger("lockbasedtxmgmt")
Expand Down Expand Up @@ -67,16 +67,15 @@ func (u *updateSet) get(compositeKey []byte) *versionedValue {
// This implementation uses a read-write lock to prevent conflicts between transaction simulation and committing
type LockBasedTxMgr struct {
db *db.DB
stateIndexCF *gorocksdb.ColumnFamilyHandle
updateSet *updateSet
commitRWLock sync.RWMutex
}

// NewLockBasedTxMgr constructs a `LockBasedTxMgr`
func NewLockBasedTxMgr(conf *Conf) *LockBasedTxMgr {
db := db.CreateDB(&db.Conf{DBPath: conf.DBPath, CFNames: []string{}})
db := db.CreateDB(&db.Conf{DBPath: conf.DBPath})
db.Open()
return &LockBasedTxMgr{db: db, stateIndexCF: db.GetDefaultCFHandle()}
return &LockBasedTxMgr{db: db}
}

// NewQueryExecutor implements method in interface `txmgmt.TxMgr`
Expand Down Expand Up @@ -226,18 +225,17 @@ func (txmgr *LockBasedTxMgr) addWriteSetToBatch(txRWSet *txmgmt.TxReadWriteSet)

// Commit implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) Commit() error {
batch := gorocksdb.NewWriteBatch()
batch := &leveldb.Batch{}
if txmgr.updateSet == nil {
panic("validateAndPrepare() method should have been called before calling commit()")
}
for k, v := range txmgr.updateSet.m {
batch.PutCF(txmgr.stateIndexCF, []byte(k), encodeValue(v.value, v.version))
batch.Put([]byte(k), encodeValue(v.value, v.version))
}
txmgr.commitRWLock.Lock()
defer txmgr.commitRWLock.Unlock()
defer func() { txmgr.updateSet = nil }()
defer batch.Destroy()
if err := txmgr.db.WriteBatch(batch); err != nil {
if err := txmgr.db.WriteBatch(batch, false); err != nil {
return err
}
return nil
Expand All @@ -261,7 +259,7 @@ func (txmgr *LockBasedTxMgr) getCommittedValueAndVersion(ns string, key string)
compositeKey := constructCompositeKey(ns, key)
var encodedValue []byte
var err error
if encodedValue, err = txmgr.db.Get(txmgr.stateIndexCF, compositeKey); err != nil {
if encodedValue, err = txmgr.db.Get(compositeKey); err != nil {
return nil, 0, err
}
if encodedValue == nil {
Expand Down

0 comments on commit 27088ac

Please sign in to comment.