Skip to content

Commit

Permalink
Merge pull request #1737 from GrapeBaBa/issue-1660
Browse files Browse the repository at this point in the history
Fix #1660 to use sync.once to protect db singleton
  • Loading branch information
srderson committed Jul 6, 2016
2 parents cd031cf + a03d10e commit 94b567b
Show file tree
Hide file tree
Showing 20 changed files with 140 additions and 176 deletions.
118 changes: 50 additions & 68 deletions core/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"path"
"strings"
"sync"

"github.com/op/go-logging"
"github.com/spf13/viper"
Expand All @@ -44,6 +45,13 @@ var columnfamilies = []string{
persistCF, // persistent per-peer state (consensus)
}

type dbState int32

const (
closed dbState = iota
opened
)

// OpenchainDB encapsulates rocksdb's structures
type OpenchainDB struct {
DB *gorocksdb.DB
Expand All @@ -52,59 +60,20 @@ type OpenchainDB struct {
StateDeltaCF *gorocksdb.ColumnFamilyHandle
IndexesCF *gorocksdb.ColumnFamilyHandle
PersistCF *gorocksdb.ColumnFamilyHandle
dbState dbState
mux sync.Mutex
}

var openchainDB *OpenchainDB
var isOpen bool
var openchainDB = Create()

// CreateDB creates a rocks db database
func CreateDB() error {
dbPath := getDBPath()
dbLogger.Debugf("Creating DB at [%s]", dbPath)
missing, err := dirMissingOrEmpty(dbPath)
if err != nil {
return err
}

if !missing {
return fmt.Errorf("db dir [%s] already exists", dbPath)
}
err = os.MkdirAll(path.Dir(dbPath), 0755)
if err != nil {
dbLogger.Errorf("Error calling os.MkdirAll for directory path [%s]: %s", dbPath, err)
return fmt.Errorf("Error making directory path [%s]: %s", dbPath, err)
}
opts := gorocksdb.NewDefaultOptions()
defer opts.Destroy()
opts.SetCreateIfMissing(true)

db, err := gorocksdb.OpenDb(opts, dbPath)
if err != nil {
return err
}

defer db.Close()

dbLogger.Debugf("DB created at [%s]", dbPath)
return nil
// Create create an openchainDB instance
func Create() *OpenchainDB {
return &OpenchainDB{dbState: closed}
}

// GetDBHandle returns a handle to OpenchainDB
// GetDBHandle get an opened openchainDB singleton
func GetDBHandle() *OpenchainDB {
var err error
if isOpen {
return openchainDB
}

err = createDBIfDBPathEmpty()
if err != nil {
panic(fmt.Sprintf("Error while trying to create DB: %s", err))
}

openchainDB, err = openDB()
if err != nil {
panic(fmt.Sprintf("Could not open openchain db error = [%s]", err))
}
openchainDB.Open()
return openchainDB
}

Expand Down Expand Up @@ -172,32 +141,34 @@ func getDBPath() string {
return dbPath + "db"
}

func createDBIfDBPathEmpty() error {
// Open open underlying rocksdb
func (openchainDB *OpenchainDB) Open() {
openchainDB.mux.Lock()
if openchainDB.dbState == opened {
openchainDB.mux.Unlock()
return
}

defer openchainDB.mux.Unlock()

dbPath := getDBPath()
missing, err := dirMissingOrEmpty(dbPath)
if err != nil {
return err
panic(fmt.Sprintf("Error while trying to open DB: %s", err))
}
dbLogger.Debugf("Is db path [%s] empty [%t]", dbPath, missing)

if missing {
err := CreateDB()
err = os.MkdirAll(path.Dir(dbPath), 0755)
if err != nil {
return nil
panic(fmt.Sprintf("Error making directory path [%s]: %s", dbPath, err))
}
}
return nil
}

func openDB() (*OpenchainDB, error) {
if isOpen {
return openchainDB, nil
}

dbPath := getDBPath()
opts := gorocksdb.NewDefaultOptions()
defer opts.Destroy()

opts.SetCreateIfMissing(false)
opts.SetCreateIfMissing(missing)
opts.SetCreateIfMissingColumnFamilies(true)

cfNames := []string{"default"}
Expand All @@ -210,23 +181,34 @@ func openDB() (*OpenchainDB, error) {
db, cfHandlers, err := gorocksdb.OpenDbColumnFamilies(opts, dbPath, cfNames, cfOpts)

if err != nil {
fmt.Println("Error opening DB", err)
return nil, err
panic(fmt.Sprintf("Error opening DB: %s", err))
}
isOpen = true
// XXX should we close cfHandlers[0]?
return &OpenchainDB{db, cfHandlers[1], cfHandlers[2], cfHandlers[3], cfHandlers[4], cfHandlers[5]}, nil

openchainDB.DB = db
openchainDB.BlockchainCF = cfHandlers[1]
openchainDB.StateCF = cfHandlers[2]
openchainDB.StateDeltaCF = cfHandlers[3]
openchainDB.IndexesCF = cfHandlers[4]
openchainDB.PersistCF = cfHandlers[5]
openchainDB.dbState = opened
}

// CloseDB releases all column family handles and closes rocksdb
func (openchainDB *OpenchainDB) CloseDB() {
// Close releases all column family handles and closes rocksdb
func (openchainDB *OpenchainDB) Close() {
openchainDB.mux.Lock()
if openchainDB.dbState == closed {
openchainDB.mux.Unlock()
return
}

defer openchainDB.mux.Unlock()
openchainDB.BlockchainCF.Destroy()
openchainDB.StateCF.Destroy()
openchainDB.StateDeltaCF.Destroy()
openchainDB.IndexesCF.Destroy()
openchainDB.PersistCF.Destroy()
openchainDB.DB.Close()
isOpen = false
openchainDB.dbState = closed
}

// DeleteState delets ALL state keys/values from the DB. This is generally
Expand Down
109 changes: 48 additions & 61 deletions core/db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,72 +44,74 @@ func TestGetDBPathEmptyPath(t *testing.T) {
GetDBHandle()
}

func TestCreateDB_DirDoesNotExist(t *testing.T) {
err := CreateDB()
if err != nil {
t.Fatalf("Failed to create DB: %s", err)
}
deleteTestDB()
func TestCreateDB(t *testing.T) {
openchainDB := Create()
openchainDB.Open()
defer deleteTestDBPath()
defer openchainDB.Close()
}

func TestCreateDB_NonEmptyDirExists(t *testing.T) {
createNonEmptyTestDBPath()
err := CreateDB()
if err == nil {
t.Fatal("Dir alrady exists. DB creation should throw error")
}
func TestOpenDB_DirDoesNotExist(t *testing.T) {
openchainDB := Create()
deleteTestDBPath()
}

func TestWriteAndRead(t *testing.T) {
createTestDB()
defer deleteTestDB()
performBasicReadWrite(t)
defer deleteTestDBPath()
defer openchainDB.Close()
defer func() {
if r := recover(); r != nil {
t.Fatalf("Failed to open DB: %s", r)
}
}()
openchainDB.Open()
}

func TestOpenDB_DirDoesNotExist(t *testing.T) {
func TestOpenDB_NonEmptyDirExists(t *testing.T) {
openchainDB := Create()
deleteTestDBPath()
defer deleteTestDB()
performBasicReadWrite(t)
createNonEmptyTestDBPath()

defer deleteTestDBPath()
defer openchainDB.Close()
defer func() {
if r := recover(); r == nil {
t.Fatalf("dbPath is already exists. DB open should throw error")
}
}()
openchainDB.Open()
}

func TestOpenDB_DirEmpty(t *testing.T) {
deleteTestDBPath()
createTestDBPath()
defer deleteTestDB()
performBasicReadWrite(t)
func TestWriteAndRead(t *testing.T) {
openchainDB := GetDBHandle()
defer deleteTestDBPath()
defer openchainDB.Close()
performBasicReadWrite(openchainDB, t)
}

// This test verifies that when a new column family is added to the DB
// users at an older level of the DB will still be able to open it with new code
func TestDBColumnUpgrade(t *testing.T) {
deleteTestDBPath()
createTestDBPath()
err := CreateDB()
if nil != err {
t.Fatalf("Error creating DB")
}
db, err := openDB()
if nil != err {
t.Fatalf("Error opening DB")
}
db.CloseDB()
openchainDB := GetDBHandle()
openchainDB.Close()

oldcfs := columnfamilies
columnfamilies = append([]string{"Testing"}, columnfamilies...)
defer func() {
columnfamilies = oldcfs
}()
db, err = openDB()
if nil != err {
t.Fatalf("Error re-opening DB with upgraded columnFamilies")
}
db.CloseDB()
openchainDB = GetDBHandle()

defer deleteTestDBPath()
defer openchainDB.Close()
defer func() {
if r := recover(); r != nil {
t.Fatalf("Error re-opening DB with upgraded columnFamilies")
}
}()
}

func TestDeleteState(t *testing.T) {
testDBWrapper := NewTestDBWrapper()
testDBWrapper.CreateFreshDB(t)
testDBWrapper.CleanDB(t)
openchainDB := GetDBHandle()
defer testDBWrapper.cleanup()
openchainDB.Put(openchainDB.StateCF, []byte("key1"), []byte("value1"))
Expand All @@ -134,7 +136,7 @@ func TestDeleteState(t *testing.T) {

func TestDBSnapshot(t *testing.T) {
testDBWrapper := NewTestDBWrapper()
testDBWrapper.CreateFreshDB(t)
testDBWrapper.CleanDB(t)
openchainDB := GetDBHandle()
defer testDBWrapper.cleanup()

Expand Down Expand Up @@ -187,7 +189,7 @@ func TestDBSnapshot(t *testing.T) {

func TestDBIteratorAndSnapshotIterator(t *testing.T) {
testDBWrapper := NewTestDBWrapper()
testDBWrapper.CreateFreshDB(t)
testDBWrapper.CleanDB(t)
openchainDB := GetDBHandle()
defer testDBWrapper.cleanup()

Expand Down Expand Up @@ -226,6 +228,7 @@ func TestDBIteratorAndSnapshotIterator(t *testing.T) {
testIterator(t, itr, map[string][]byte{"key6": []byte("value6"), "key7": []byte("value7")})
}

// db helper functions
func testIterator(t *testing.T, itr *gorocksdb.Iterator, expectedValues map[string][]byte) {
itrResults := make(map[string][]byte)
itr.SeekToFirst()
Expand All @@ -246,31 +249,16 @@ func testIterator(t *testing.T, itr *gorocksdb.Iterator, expectedValues map[stri
}
}

// db helper functions
func createTestDBPath() {
dbPath := viper.GetString("peer.fileSystemPath")
os.MkdirAll(dbPath, 0775)
}

func createNonEmptyTestDBPath() {
dbPath := viper.GetString("peer.fileSystemPath")
os.MkdirAll(dbPath+"/db/tmpFile", 0775)
}

func createTestDB() error {
return CreateDB()
}

func deleteTestDBPath() {
dbPath := viper.GetString("peer.fileSystemPath")
os.RemoveAll(dbPath)
}

func deleteTestDB() {
GetDBHandle().CloseDB()
deleteTestDBPath()
}

func setupTestConfig() {
tempDir, err := ioutil.TempDir("", "fabric-db-test")
if err != nil {
Expand All @@ -280,8 +268,7 @@ func setupTestConfig() {
deleteTestDBPath()
}

func performBasicReadWrite(t *testing.T) {
openchainDB := GetDBHandle()
func performBasicReadWrite(openchainDB *OpenchainDB, t *testing.T) {
opt := gorocksdb.NewDefaultWriteOptions()
defer opt.Destroy()
writeBatch := gorocksdb.NewWriteBatch()
Expand Down

0 comments on commit 94b567b

Please sign in to comment.