Skip to content

Commit

Permalink
Refactor db package
Browse files Browse the repository at this point in the history
Currently db will open when GetDBHandle method is invoked first time,
so GetDBHandle method needs to use mutex.
This patch makes db open and close in its lifecycle methods Start and
Stop, invokes Start only when peer starts and invokes Stop when peer
stops.
After that, methods in db do not need to support concurrency.

Change-Id: Id8612f2a846c5d626bd42c5cd4ae482076f6a975
Signed-off-by: grapebaba <281165273@qq.com>
  • Loading branch information
GrapeBaBa committed Aug 1, 2016
1 parent fb7da0d commit 9544025
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 61 deletions.
17 changes: 17 additions & 0 deletions core/chaincode/exectransaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/hyperledger/fabric/core/container"
"github.com/hyperledger/fabric/core/container/ccintf"
"github.com/hyperledger/fabric/core/crypto"
"github.com/hyperledger/fabric/core/db"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/util"
"github.com/hyperledger/fabric/membersrvc/ca"
Expand All @@ -46,6 +47,8 @@ import (
// attributes to request in the batch of tcerts while deploying, invoking or querying
var attributes = []string{"company", "position"}

var testDBWrapper = db.NewTestDBWrapper()

func getNowMillis() int64 {
nanos := time.Now().UnixNano()
return nanos / 1000000
Expand Down Expand Up @@ -355,6 +358,7 @@ func executeDeployTransaction(t *testing.T, url string) {

// Test deploy of a transaction
func TestExecuteDeployTransaction(t *testing.T) {
testDBWrapper.CleanDB(t)
executeDeployTransaction(t, "github.com/hyperledger/fabric/examples/chaincode/go/chaincode_example01")
}

Expand All @@ -364,6 +368,7 @@ func TestGopathExecuteDeployTransaction(t *testing.T) {
// and a couple of elements - it doesn't matter what they are
os.Setenv("GOPATH", os.Getenv("GOPATH")+string(os.PathSeparator)+string(os.PathListSeparator)+"/tmp/foo"+string(os.PathListSeparator)+"/tmp/bar")
fmt.Printf("set GOPATH to: \"%s\"\n", os.Getenv("GOPATH"))
testDBWrapper.CleanDB(t)
executeDeployTransaction(t, "github.com/hyperledger/fabric/examples/chaincode/go/chaincode_example01")
}

Expand All @@ -372,6 +377,7 @@ func TestHTTPExecuteDeployTransaction(t *testing.T) {
// The chaincode used here cannot be from the fabric repo
// itself or it won't be downloaded because it will be found
// in GOPATH, which would defeat the test
testDBWrapper.CleanDB(t)
executeDeployTransaction(t, "http://github.com/hyperledger/fabric-test-resources/examples/chaincode/go/chaincode_example01")
}

Expand Down Expand Up @@ -465,6 +471,7 @@ func invokeExample02Transaction(ctxt context.Context, cID *pb.ChaincodeID, args
}

func TestExecuteInvokeTransaction(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption

//TLS is on by default. This is the ONLY test that does NOT use TLS
Expand Down Expand Up @@ -570,6 +577,7 @@ func exec(ctxt context.Context, chaincodeID string, numTrans int, numQueries int

// Test the execution of a query.
func TestExecuteQuery(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
if viper.GetBool("peer.tls.enabled") {
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
Expand Down Expand Up @@ -653,6 +661,7 @@ func TestExecuteQuery(t *testing.T) {

// Test the execution of an invalid transaction.
func TestExecuteInvokeInvalidTransaction(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
if viper.GetBool("peer.tls.enabled") {
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
Expand Down Expand Up @@ -714,6 +723,7 @@ func TestExecuteInvokeInvalidTransaction(t *testing.T) {

// Test the execution of an invalid query.
func TestExecuteInvalidQuery(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
if viper.GetBool("peer.tls.enabled") {
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
Expand Down Expand Up @@ -785,6 +795,7 @@ func TestExecuteInvalidQuery(t *testing.T) {

// Test the execution of a chaincode that invokes another chaincode.
func TestChaincodeInvokeChaincode(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
if viper.GetBool("peer.tls.enabled") {
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
Expand Down Expand Up @@ -898,6 +909,7 @@ func TestChaincodeInvokeChaincode(t *testing.T) {
// Test the execution of a chaincode that invokes another chaincode with wrong parameters. Should receive error from
// from the called chaincode
func TestChaincodeInvokeChaincodeErrorCase(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
if viper.GetBool("peer.tls.enabled") {
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
Expand Down Expand Up @@ -1098,6 +1110,7 @@ func chaincodeQueryChaincode(user string) error {

// Test the execution of a chaincode query that queries another chaincode without security enabled
func TestChaincodeQueryChaincode(t *testing.T) {
testDBWrapper.CleanDB(t)
var peerLis net.Listener
var err error
if peerLis, err = initPeer(); err != nil {
Expand All @@ -1119,6 +1132,7 @@ func TestChaincodeQueryChaincode(t *testing.T) {
// Test the execution of a chaincode that queries another chaincode with invalid parameter. Should receive error from
// from the called chaincode
func TestChaincodeQueryChaincodeErrorCase(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
if viper.GetBool("peer.tls.enabled") {
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
Expand Down Expand Up @@ -1229,6 +1243,7 @@ func TestChaincodeQueryChaincodeErrorCase(t *testing.T) {
// Test the execution of a chaincode query that queries another chaincode with security enabled
// NOTE: this really needs to be a behave test. Remove when we have support in behave for multiple chaincodes
func TestChaincodeQueryChaincodeWithSec(t *testing.T) {
testDBWrapper.CleanDB(t)
viper.Set("security.enabled", "true")

//Initialize crypto
Expand Down Expand Up @@ -1282,6 +1297,7 @@ func TestChaincodeQueryChaincodeWithSec(t *testing.T) {

// Test the invocation of a transaction.
func TestRangeQuery(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
if viper.GetBool("peer.tls.enabled") {
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
Expand Down Expand Up @@ -1352,6 +1368,7 @@ func TestRangeQuery(t *testing.T) {
}

func TestGetEvent(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
if viper.GetBool("peer.tls.enabled") {
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
Expand Down
50 changes: 16 additions & 34 deletions core/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"os"
"path"
"strings"
"sync"

"github.com/op/go-logging"
"github.com/spf13/viper"
Expand All @@ -45,13 +44,6 @@ var columnfamilies = []string{
persistCF, // persistent per-peer state (consensus)
}

type dbState int32

const (
closed dbState = iota
opened
)

// OpenchainDB encapsulates rocksdb's structures
type OpenchainDB struct {
DB *gorocksdb.DB
Expand All @@ -60,23 +52,30 @@ type OpenchainDB struct {
StateDeltaCF *gorocksdb.ColumnFamilyHandle
IndexesCF *gorocksdb.ColumnFamilyHandle
PersistCF *gorocksdb.ColumnFamilyHandle
dbState dbState
mux sync.Mutex
}

var openchainDB = Create()
var openchainDB = create()

// Create create an openchainDB instance
func Create() *OpenchainDB {
return &OpenchainDB{dbState: closed}
func create() *OpenchainDB {
return &OpenchainDB{}
}

// GetDBHandle get an opened openchainDB singleton
// GetDBHandle gets an opened openchainDB singleton. Note that method Start must always be invoked before this method.
func GetDBHandle() *OpenchainDB {
openchainDB.Open()
return openchainDB
}

// Start the db, init the openchainDB instance and open the db. Note this method has no guarantee correct behavior concurrent invocation.
func Start() {
openchainDB.open()
}

// Stop the db. Note this method has no guarantee correct behavior concurrent invocation.
func Stop() {
openchainDB.close()
}

// GetFromBlockchainCF get value for given key from column family - blockchainCF
func (openchainDB *OpenchainDB) GetFromBlockchainCF(key []byte) ([]byte, error) {
return openchainDB.Get(openchainDB.BlockchainCF, key)
Expand Down Expand Up @@ -142,15 +141,7 @@ func getDBPath() string {
}

// Open open underlying rocksdb
func (openchainDB *OpenchainDB) Open() {
openchainDB.mux.Lock()
if openchainDB.dbState == opened {
openchainDB.mux.Unlock()
return
}

defer openchainDB.mux.Unlock()

func (openchainDB *OpenchainDB) open() {
dbPath := getDBPath()
missing, err := dirMissingOrEmpty(dbPath)
if err != nil {
Expand Down Expand Up @@ -190,25 +181,16 @@ func (openchainDB *OpenchainDB) Open() {
openchainDB.StateDeltaCF = cfHandlers[3]
openchainDB.IndexesCF = cfHandlers[4]
openchainDB.PersistCF = cfHandlers[5]
openchainDB.dbState = opened
}

// Close releases all column family handles and closes rocksdb
func (openchainDB *OpenchainDB) Close() {
openchainDB.mux.Lock()
if openchainDB.dbState == closed {
openchainDB.mux.Unlock()
return
}

defer openchainDB.mux.Unlock()
func (openchainDB *OpenchainDB) close() {
openchainDB.BlockchainCF.Destroy()
openchainDB.StateCF.Destroy()
openchainDB.StateDeltaCF.Destroy()
openchainDB.IndexesCF.Destroy()
openchainDB.PersistCF.Destroy()
openchainDB.DB.Close()
openchainDB.dbState = closed
}

// DeleteState delets ALL state keys/values from the DB. This is generally
Expand Down
36 changes: 14 additions & 22 deletions core/db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,72 +41,64 @@ func TestGetDBPathEmptyPath(t *testing.T) {
}
}()
defer viper.Set("peer.fileSystemPath", originalSetting)
Start()
GetDBHandle()
}

func TestCreateDB(t *testing.T) {
openchainDB := Create()
openchainDB.Open()
defer deleteTestDBPath()
defer openchainDB.Close()
}

func TestOpenDB_DirDoesNotExist(t *testing.T) {
openchainDB := Create()
func TestStartDB_DirDoesNotExist(t *testing.T) {
deleteTestDBPath()

defer deleteTestDBPath()
defer openchainDB.Close()
defer Stop()
defer func() {
if r := recover(); r != nil {
t.Fatalf("Failed to open DB: %s", r)
}
}()
openchainDB.Open()
Start()
}

func TestOpenDB_NonEmptyDirExists(t *testing.T) {
openchainDB := Create()
func TestStartDB_NonEmptyDirExists(t *testing.T) {
deleteTestDBPath()
createNonEmptyTestDBPath()

defer deleteTestDBPath()
defer openchainDB.Close()
defer func() {
if r := recover(); r == nil {
t.Fatalf("dbPath is already exists. DB open should throw error")
}
}()
openchainDB.Open()
Start()
}

func TestWriteAndRead(t *testing.T) {
openchainDB := GetDBHandle()
deleteTestDBPath()
Start()
defer deleteTestDBPath()
defer openchainDB.Close()
defer Stop()
performBasicReadWrite(openchainDB, t)
}

// This test verifies that when a new column family is added to the DB
// users at an older level of the DB will still be able to open it with new code
func TestDBColumnUpgrade(t *testing.T) {
openchainDB := GetDBHandle()
openchainDB.Close()
deleteTestDBPath()
Start()
Stop()

oldcfs := columnfamilies
columnfamilies = append([]string{"Testing"}, columnfamilies...)
defer func() {
columnfamilies = oldcfs
}()
openchainDB = GetDBHandle()

defer deleteTestDBPath()
defer openchainDB.Close()
defer Stop()
defer func() {
if r := recover(); r != nil {
t.Fatalf("Error re-opening DB with upgraded columnFamilies")
}
}()
Start()
}

func TestDeleteState(t *testing.T) {
Expand Down
12 changes: 9 additions & 3 deletions core/db/db_test_exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (testDB *TestDBWrapper) CleanDB(t testing.TB) {
testDB.removeDBPath()
t.Logf("Creating testDB")

Start()
testDB.performCleanup = true
}

Expand All @@ -55,12 +56,13 @@ func (testDB *TestDBWrapper) CreateFreshDBGinkgo() {
// at the end of the test
testDB.cleanup()
testDB.removeDBPath()
Start()
testDB.performCleanup = true
}

func (testDB *TestDBWrapper) cleanup() {
if testDB.performCleanup {
GetDBHandle().Close()
Stop()
testDB.performCleanup = false
}
}
Expand Down Expand Up @@ -116,8 +118,12 @@ func (testDB *TestDBWrapper) GetFromStateDeltaCF(t testing.TB, key []byte) []byt

// CloseDB closes the db
func (testDB *TestDBWrapper) CloseDB(t testing.TB) {
openchainDB := GetDBHandle()
openchainDB.Close()
Stop()
}

// OpenDB opens the db
func (testDB *TestDBWrapper) OpenDB(t testing.TB) {
Start()
}

// GetEstimatedNumKeys returns estimated number of key-values in db. This is not accurate in all the cases
Expand Down
5 changes: 4 additions & 1 deletion core/ledger/blockchain_indexes_async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,11 @@ func TestIndexesAsync_IndexPendingBlocks(t *testing.T) {
t.Fatalf("Error populating block chain with sample data: %s", err)
}

// close the db and create new instance of blockchain (and the associated async indexer) - the indexer should index the pending blocks
// close the db
testDBWrapper.CloseDB(t)
// open the db again and create new instance of blockchain (and the associated async indexer)
// the indexer should index the pending blocks
testDBWrapper.OpenDB(t)
testBlockchainWrapper = newTestBlockchainWrapper(t)
defer chain.indexer.stop()

Expand Down
4 changes: 4 additions & 0 deletions core/system_chaincode/systemchaincode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/hyperledger/fabric/core/chaincode"
"github.com/hyperledger/fabric/core/db"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/system_chaincode/api"
"github.com/hyperledger/fabric/core/system_chaincode/samplesyscc"
Expand All @@ -34,6 +35,8 @@ import (
"google.golang.org/grpc"
)

var testDBWrapper = db.NewTestDBWrapper()

// Invoke or query a chaincode.
func invoke(ctx context.Context, spec *pb.ChaincodeSpec, typ pb.Transaction_Type) (*pb.ChaincodeEvent, string, []byte, error) {
chaincodeInvocationSpec := &pb.ChaincodeInvocationSpec{ChaincodeSpec: spec}
Expand Down Expand Up @@ -75,6 +78,7 @@ func closeListenerAndSleep(l net.Listener) {

// Test deploy of a transaction.
func TestExecuteDeploySysChaincode(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)
viper.Set("peer.fileSystemPath", "/var/hyperledger/test/tmpdb")
Expand Down

0 comments on commit 9544025

Please sign in to comment.