Skip to content

Commit

Permalink
FAB-1425: Record savepoint for historyDB
Browse files Browse the repository at this point in the history
At time of commit() record a savepoint in historydb to store and
help in recovery. Removed extra fsync. Returning err or SaveDoc error

Change-Id: I93a2ab101c6bcde58b5e3050f86c621e9d13157d
Signed-off-by: Balaji Viswanathan <balaji.viswanathan@gmail.com>
  • Loading branch information
bviswana101 committed Jan 10, 2017
1 parent 01de0e4 commit c497883
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 1 deletion.
79 changes: 78 additions & 1 deletion core/ledger/history/couchdb_histmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package history

import (
"bytes"
"encoding/json"
"fmt"
"strconv"

"github.com/hyperledger/fabric/core/ledger"
Expand All @@ -28,6 +30,15 @@ import (
logging "github.com/op/go-logging"
)

// Savepoint docid (key) for couchdb
const savepointDocID = "histdb_savepoint"

// Savepoint data for couchdb
type couchSavepointData struct {
BlockNum uint64 `json:"BlockNum"`
UpdateSeq string `json:"UpdateSeq"`
}

var logger = logging.MustGetLogger("history")

var compositeKeySep = []byte{0x00}
Expand Down Expand Up @@ -125,10 +136,55 @@ func (histmgr *CouchDBHistMgr) Commit(block *common.Block) error {
if rev != "" {
logger.Debugf("===HISTORYDB=== Saved document revision number: %s\n", rev)
}

}
}
}

// Record a savepoint
err := histmgr.recordSavepoint(blockNo)
if err != nil {
logger.Debugf("===COUCHDB=== Error during recordSavepoint: %s\n", err)
return err
}

return nil
}

// recordSavepoint Record a savepoint in historydb.
// Couch parallelizes writes in cluster or sharded setup and ordering is not guaranteed.
// Hence we need to fence the savepoint with sync. So ensure_full_commit is called before AND after writing savepoint document
// TODO: Optimization - merge 2nd ensure_full_commit with savepoint by using X-Couch-Full-Commit header
func (txmgr *CouchDBHistMgr) recordSavepoint(blockNo uint64) error {
var err error
var savepointDoc couchSavepointData
// ensure full commit to flush all changes until now to disk
dbResponse, err := txmgr.couchDB.EnsureFullCommit()
if err != nil || dbResponse.Ok != true {
logger.Debugf("====COUCHDB==== Failed to perform full commit\n")
return fmt.Errorf("Failed to perform full commit. Err: %s", err)
}

// construct savepoint document
// UpdateSeq would be useful if we want to get all db changes since a logical savepoint
dbInfo, _, err := txmgr.couchDB.GetDatabaseInfo()
if err != nil {
logger.Debugf("====COUCHDB==== Failed to get DB info %s\n", err)
return err
}
savepointDoc.BlockNum = blockNo
savepointDoc.UpdateSeq = dbInfo.UpdateSeq

savepointDocJSON, err := json.Marshal(savepointDoc)
if err != nil {
logger.Debugf("====COUCHDB==== Failed to create savepoint data %s\n", err)
return err
}

// SaveDoc using couchdb client and use JSON format
_, err = txmgr.couchDB.SaveDoc(savepointDocID, "", savepointDocJSON, nil)
if err != nil {
logger.Debugf("====CouchDB==== Failed to save the savepoint to DB %s\n", err)
return err
}
return nil
}
Expand All @@ -149,6 +205,27 @@ func (histmgr *CouchDBHistMgr) getTransactionsForNsKey(namespace string, key str
return newHistScanner(compositeStartKey, *queryResult), nil
}

// GetBlockNumFromSavepoint Reads the savepoint from database and returns the corresponding block number.
// If no savepoint is found, it returns 0
func (txmgr *CouchDBHistMgr) GetBlockNumFromSavepoint() (uint64, error) {
var err error
savepointJSON, _, err := txmgr.couchDB.ReadDoc(savepointDocID)
if err != nil {
// TODO: differentiate between 404 and some other error code
logger.Debugf("====COUCHDB==== Failed to read savepoint data %s\n", err)
return 0, err
}

savepointDoc := &couchSavepointData{}
err = json.Unmarshal(savepointJSON, &savepointDoc)
if err != nil {
logger.Debugf("====COUCHDB==== Failed to read savepoint data %s\n", err)
return 0, err
}

return savepointDoc.BlockNum, nil
}

func constructCompositeKey(ns string, key string, blocknum uint64, trannum uint64) string {
//History Key is: "namespace key blocknum trannum"", with namespace being the chaincode id

Expand Down
34 changes: 34 additions & 0 deletions core/ledger/history/couchdb_histmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,40 @@ func TestConstructCompositeKey(t *testing.T) {
testutil.AssertEquals(t, compositeKey, "ns1"+strKeySep+"key1"+strKeySep+"1"+strKeySep+"1")
}

//TestSavepoint tests the recordSavepoint and GetBlockNumfromSavepoint methods for recording and reading a savepoint document
func TestSavepoint(t *testing.T) {

if ledgerconfig.IsHistoryDBEnabled() == true {

env := newTestEnvHistoryCouchDB(t, "history-test")
env.cleanup() //cleanup at the beginning to ensure the database doesn't exist already
defer env.cleanup() //and cleanup at the end

logger.Debugf("===HISTORYDB=== env.couchDBAddress: %v , env.couchDatabaseName: %v env.couchUsername: %v env.couchPassword: %v\n",
env.couchDBAddress, env.couchDatabaseName, env.couchUsername, env.couchPassword)

histMgr := NewCouchDBHistMgr(
env.couchDBAddress, //couchDB Address
env.couchDatabaseName, //couchDB db name
env.couchUsername, //enter couchDB id
env.couchPassword) //enter couchDB pw

// read the savepoint
blockNum, err := histMgr.GetBlockNumFromSavepoint()
testutil.AssertEquals(t, blockNum, 0)

// record savepoint
blockNo := uint64(5)
err = histMgr.recordSavepoint(blockNo)
testutil.AssertNoError(t, err, fmt.Sprintf("Error when saving recordpoint data"))

// read the savepoint
blockNum, err = histMgr.GetBlockNumFromSavepoint()
testutil.AssertNoError(t, err, fmt.Sprintf("Error when saving recordpoint data"))
testutil.AssertEquals(t, blockNo, blockNum)
}
}

//History Database commit and read is being tested with kv_ledger_test.go.
//This test will push some of the testing down into history itself
func TestHistoryDatabaseCommit(t *testing.T) {
Expand Down

0 comments on commit c497883

Please sign in to comment.