Disable WAL for block storage DB
(Rocks)DB's WAL adds overhead when the DB is used for saving checkpoints
for block storage. Avoiding the WAL writes turns the write workload
(appending blocks to the block file) into sequential disk writes.

This commit changes the way checkpoints are saved: checkpoints are
written to the DB as before, but since the WAL is disabled, a checkpoint
initially stays in memory only. The in-memory checkpoint is flushed
explicitly to disk (via a DB flush) when a new block file is created,
and is also flushed implicitly to disk when the DB shuts down. If a
crash occurs, however, the checkpoint available in the DB may lag behind
the actual state of the block file. To handle this crash scenario, this
commit also adds code that scans the block file beyond the last saved
checkpoint and updates the checkpoint accordingly.

Change-Id: Ie88646b225abaa50b595833f5e7ed8d4facae920
Signed-off-by: manish <manish.sethi@gmail.com>
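
In rough outline, the durability model described above behaves like the following minimal, self-contained Go simulation. This is a sketch, not the RocksDB-backed implementation: the kvStore type and its methods are invented stand-ins for the DB's memtable, explicit flush, and (now disabled) WAL. It shows why a crash can leave the persisted checkpoint behind the block file, which is exactly the gap the new startup scan closes.

```go
package main

import "fmt"

// kvStore is a toy stand-in for a WAL-less RocksDB instance: writes land in
// an in-memory memtable and reach "disk" only on an explicit Flush. The type
// and its methods are hypothetical, invented for this sketch.
type kvStore struct {
	memtable map[string]string // lost on crash
	disk     map[string]string // survives crash
}

func newKVStore() *kvStore {
	return &kvStore{memtable: map[string]string{}, disk: map[string]string{}}
}

// Put writes to the memtable only; with the WAL disabled nothing hits disk yet.
func (s *kvStore) Put(k, v string) { s.memtable[k] = v }

// Flush persists the memtable, mirroring mgr.db.Flush(true) in this commit.
func (s *kvStore) Flush() {
	for k, v := range s.memtable {
		s.disk[k] = v
	}
}

// crash discards the memtable, simulating a process crash before any flush.
func (s *kvStore) crash() { s.memtable = map[string]string{} }

func main() {
	db := newKVStore()
	db.Put("checkpoint", "block=5")
	db.Flush() // e.g. at block file rollover: this checkpoint is now durable
	db.Put("checkpoint", "block=8")
	db.crash() // checkpoints for blocks 6-8 were memtable-only and are gone

	// On restart the durable checkpoint lags the block file, so the startup
	// path (updateCPInfo in this commit) must scan the file past block 5
	// and advance the checkpoint to the last complete block it finds.
	fmt.Println("checkpoint after crash:", db.disk["checkpoint"]) // block=5
}
```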
manish-sethi committed Sep 20, 2016
1 parent 9ec4873 commit 0df6a8d
Showing 7 changed files with 219 additions and 89 deletions.
147 changes: 106 additions & 41 deletions core/ledgernext/blkstorage/fsblkstorage/blockfile_mgr.go
@@ -64,11 +64,12 @@ func newBlockfileMgr(conf *Conf) *blockfileMgr {
}
if cpInfo == nil {
cpInfo = &checkpointInfo{latestFileChunkSuffixNum: 0, latestFileChunksize: 0}
err = mgr.saveCurrentInfo(cpInfo)
err = mgr.saveCurrentInfo(cpInfo, true)
if err != nil {
panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
}
}
updateCPInfo(conf, cpInfo)
currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
if err != nil {
panic(fmt.Sprintf("Could not open writer to current file: %s", err))
@@ -81,9 +82,12 @@ func newBlockfileMgr(conf *Conf) *blockfileMgr {
mgr.index = newBlockIndex(db)
mgr.cpInfo = cpInfo
mgr.currentFileWriter = currentFileWriter

// init BlockchainInfo
bcInfo := &protos.BlockchainInfo{Height: 0, CurrentBlockHash: nil, PreviousBlockHash: nil}
bcInfo := &protos.BlockchainInfo{
Height: 0,
CurrentBlockHash: nil,
PreviousBlockHash: nil}

if cpInfo.lastBlockNumber > 0 {
lastBlock, err := mgr.retrieveSerBlockByNumber(cpInfo.lastBlockNumber)
if err != nil {
@@ -104,11 +108,37 @@ func newBlockfileMgr(conf *Conf) *blockfileMgr {
}

func initDB(conf *Conf) *db.DB {
dbInst := db.CreateDB(&db.Conf{DBPath: conf.dbPath, CFNames: []string{blockIndexCF}})
dbInst := db.CreateDB(&db.Conf{
DBPath: conf.dbPath,
CFNames: []string{blockIndexCF},
DisableWAL: true})

dbInst.Open()
return dbInst
}

func updateCPInfo(conf *Conf, cpInfo *checkpointInfo) {
logger.Debugf("Starting checkpoint=%s", cpInfo)
rootDir := conf.blockfilesDir
filePath := deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum)
exists, size, err := util.FileExists(filePath)
if err != nil {
panic(fmt.Sprintf("Error in checking whether file [%s] exists: %s", filePath, err))
}
logger.Debugf("status of file [%s]: exists=[%t], size=[%d]", filePath, exists, size)
if !exists || int(size) == cpInfo.latestFileChunksize {
// checkpoint info is in sync with the file on disk
return
}
endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(filePath, int64(cpInfo.latestFileChunksize))
if err != nil {
panic(fmt.Sprintf("Could not open current file for detecting last block in the file: %s", err))
}
cpInfo.lastBlockNumber += uint64(numBlocks)
cpInfo.latestFileChunksize = int(endOffsetLastBlock)
logger.Debugf("Checkpoint after updates by scanning the last file segment:%s", cpInfo)
}

func deriveBlockfilePath(rootDir string, suffixNum int) string {
return rootDir + "/" + blockfilePrefix + fmt.Sprintf("%06d", suffixNum)
}
@@ -123,13 +153,18 @@ func (mgr *blockfileMgr) close() {
}

func (mgr *blockfileMgr) moveToNextFile() {
nextFileInfo := &checkpointInfo{latestFileChunkSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum + 1, latestFileChunksize: 0}
nextFileWriter, err := newBlockfileWriter(deriveBlockfilePath(mgr.rootDir, nextFileInfo.latestFileChunkSuffixNum))
nextFileInfo := &checkpointInfo{
latestFileChunkSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum + 1,
latestFileChunksize: 0}

nextFileWriter, err := newBlockfileWriter(
deriveBlockfilePath(mgr.rootDir, nextFileInfo.latestFileChunkSuffixNum))

if err != nil {
panic(fmt.Sprintf("Could not open writer to next file: %s", err))
}
mgr.currentFileWriter.close()
err = mgr.saveCurrentInfo(nextFileInfo)
err = mgr.saveCurrentInfo(nextFileInfo, true)
if err != nil {
panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
}
@@ -145,49 +180,44 @@ func (mgr *blockfileMgr) addBlock(block *protos.Block2) error {
blockBytes := serBlock.GetBytes()
blockHash := serBlock.ComputeHash()
txOffsets, err := serBlock.GetTxOffsets()
currentOffset := mgr.cpInfo.latestFileChunksize
if err != nil {
return fmt.Errorf("Error while serializing block: %s", err)
}
currentOffset := mgr.cpInfo.latestFileChunksize
length := len(blockBytes)
encodedLen := proto.EncodeVarint(uint64(length))
totalLen := length + len(encodedLen)
if currentOffset+totalLen > mgr.conf.maxBlockfileSize {
blockBytesLen := len(blockBytes)
blockBytesEncodedLen := proto.EncodeVarint(uint64(blockBytesLen))
totalBytesToAppend := blockBytesLen + len(blockBytesEncodedLen)

if currentOffset+totalBytesToAppend > mgr.conf.maxBlockfileSize {
mgr.moveToNextFile()
currentOffset = 0
}
err = mgr.currentFileWriter.append(encodedLen)
if err != nil {
err1 := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
if err1 != nil {
panic(fmt.Sprintf("Could not truncate current file to known size after an error while appending a block: %s", err))
}
return fmt.Errorf("Error while appending block to file: %s", err)
err = mgr.currentFileWriter.append(blockBytesEncodedLen, false)
if err == nil {
err = mgr.currentFileWriter.append(blockBytes, true)
}

err = mgr.currentFileWriter.append(blockBytes)
if err != nil {
err1 := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
if err1 != nil {
panic(fmt.Sprintf("Could not truncate current file to known size after an error while appending a block: %s", err))
truncateErr := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
if truncateErr != nil {
panic(fmt.Sprintf("Could not truncate current file to known size after an error during block append: %s", err))
}
return fmt.Errorf("Error while appending block to file: %s", err)
}

mgr.cpInfo.latestFileChunksize += totalLen
mgr.cpInfo.latestFileChunksize += totalBytesToAppend
mgr.cpInfo.lastBlockNumber++
err = mgr.saveCurrentInfo(mgr.cpInfo)
err = mgr.saveCurrentInfo(mgr.cpInfo, false)
if err != nil {
mgr.cpInfo.latestFileChunksize -= totalLen
err1 := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
if err1 != nil {
panic(fmt.Sprintf("Could not truncate current file to known size after an error while appending a block: %s", err))
mgr.cpInfo.latestFileChunksize -= totalBytesToAppend
truncateErr := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
if truncateErr != nil {
panic(fmt.Sprintf("Error in truncating current file to known size after an error in saving checkpoint info: %s", err))
}
return fmt.Errorf("Error while saving current file info to db: %s", err)
}
blockFLP := &fileLocPointer{fileSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum}
blockFLP.offset = currentOffset
mgr.index.indexBlock(mgr.cpInfo.lastBlockNumber, blockHash, blockFLP, length, len(encodedLen), txOffsets)
mgr.index.indexBlock(mgr.cpInfo.lastBlockNumber, blockHash, blockFLP, blockBytesLen, len(blockBytesEncodedLen), txOffsets)
mgr.updateBlockchainInfo(blockHash, block)
return nil
}
@@ -198,7 +228,11 @@ func (mgr *blockfileMgr) getBlockchainInfo() *protos.BlockchainInfo {

func (mgr *blockfileMgr) updateBlockchainInfo(latestBlockHash []byte, latestBlock *protos.Block2) {
currentBCInfo := mgr.getBlockchainInfo()
newBCInfo := &protos.BlockchainInfo{Height: currentBCInfo.Height + 1, CurrentBlockHash: latestBlockHash, PreviousBlockHash: latestBlock.PreviousBlockHash}
newBCInfo := &protos.BlockchainInfo{
Height: currentBCInfo.Height + 1,
CurrentBlockHash: latestBlockHash,
PreviousBlockHash: latestBlock.PreviousBlockHash}

mgr.bcInfo.Store(newBCInfo)
}

@@ -315,33 +349,59 @@ func (mgr *blockfileMgr) fetchRawBytes(lp *fileLocPointer) ([]byte, error) {
}

func (mgr *blockfileMgr) loadCurrentInfo() (*checkpointInfo, error) {
b, err := mgr.db.Get(mgr.defaultCF, blkMgrInfoKey)
if err != nil {
return nil, err
}
if b == nil {
var b []byte
var err error
if b, err = mgr.db.Get(mgr.defaultCF, blkMgrInfoKey); b == nil || err != nil {
return nil, err
}
i := &checkpointInfo{}
if err = i.unmarshal(b); err != nil {
return nil, err
}
logger.Debugf("loaded checkpointInfo:%s", i)
return i, nil
}

func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo) error {
func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, flush bool) error {
b, err := i.marshal()
if err != nil {
return err
}
err = mgr.db.Put(mgr.defaultCF, blkMgrInfoKey, b)
if err != nil {
if err = mgr.db.Put(mgr.defaultCF, blkMgrInfoKey, b); err != nil {
return err
}
if flush {
if err = mgr.db.Flush(true); err != nil {
return err
}
logger.Debugf("saved checkpointInfo:%s", i)
}
return nil
}

// blkMgrInfo
func scanForLastCompleteBlock(filePath string, startingOffset int64) (int64, int, error) {
logger.Debugf("scanForLastCompleteBlock(): filePath=[%s], startingOffset=[%d]", filePath, startingOffset)
numBlocks := 0
blockStream, err := newBlockStream(filePath, startingOffset)
if err != nil {
return 0, 0, err
}
defer blockStream.close()
for {
blockBytes, err := blockStream.nextBlockBytes()
if blockBytes == nil || err != nil {
logger.Debugf(`scanForLastCompleteBlock(): error=[%s].
This may happen if a crash has happened during block appending.
Returning current offset as the last complete block's end offset`, err)
break
}
numBlocks++
}
logger.Debugf("scanForLastCompleteBlock(): last complete block ends at offset=[%d]", blockStream.currentFileOffset)
return blockStream.currentFileOffset, numBlocks, err
}

// checkpointInfo
type checkpointInfo struct {
latestFileChunkSuffixNum int
latestFileChunksize int
Expand Down Expand Up @@ -385,3 +445,8 @@ func (i *checkpointInfo) unmarshal(b []byte) error {

return nil
}

func (i *checkpointInfo) String() string {
return fmt.Sprintf("latestFileChunkSuffixNum=[%d], latestFileChunksize=[%d], lastBlockNumber=[%d]",
i.latestFileChunkSuffixNum, i.latestFileChunksize, i.lastBlockNumber)
}
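
The startup scan is possible because addBlock frames every block as a proto-varint length prefix followed by the block bytes, so a reader can walk the file record by record and stop at the first incomplete entry. The sketch below illustrates that framing and the scan using the standard library's encoding/binary varints, which produce the same wire encoding as proto.EncodeVarint; the record contents and helper names here are made up for the example.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// appendRecord writes a varint length prefix followed by the payload,
// matching the block file framing used by blockfileMgr.addBlock.
func appendRecord(buf *bytes.Buffer, payload []byte) {
	var lenBytes [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(lenBytes[:], uint64(len(payload)))
	buf.Write(lenBytes[:n])
	buf.Write(payload)
}

// scanForLastCompleteRecord mimics scanForLastCompleteBlock: it walks the
// data record by record and returns the end offset of the last record that
// could be read in full, plus the number of complete records seen.
func scanForLastCompleteRecord(data []byte) (int64, int) {
	r := bufio.NewReader(bytes.NewReader(data))
	var offset int64
	numRecords := 0
	for {
		length, err := binary.ReadUvarint(r)
		if err != nil {
			return offset, numRecords // clean EOF, or a crash cut the length prefix
		}
		payload := make([]byte, length)
		if _, err := io.ReadFull(r, payload); err != nil {
			return offset, numRecords // a crash cut the payload short
		}
		prefixLen := binary.PutUvarint(make([]byte, binary.MaxVarintLen64), length)
		offset += int64(prefixLen) + int64(length)
		numRecords++
	}
}

func main() {
	var file bytes.Buffer
	appendRecord(&file, []byte("block-1"))
	appendRecord(&file, []byte("block-2"))
	full := file.Len()
	appendRecord(&file, []byte("block-3"))

	// Simulate a crash that left block-3 half written on disk.
	crashed := file.Bytes()[:full+3]
	end, n := scanForLastCompleteRecord(crashed)
	fmt.Printf("last complete record ends at %d, complete records: %d\n", end, n)
}
```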
66 changes: 63 additions & 3 deletions core/ledgernext/blkstorage/fsblkstorage/blockfile_mgr_test.go
@@ -36,23 +36,83 @@ func TestBlockfileMgrBlockReadWrite(t *testing.T) {
blkfileMgrWrapper.testGetBlockByNumber(blocks, 1)
}

func TestBlockfileMgrCrashDuringWriting(t *testing.T) {
testBlockfileMgrCrashDuringWriting(t, 10, 2, 1000, 10)
testBlockfileMgrCrashDuringWriting(t, 10, 2, 1000, 1)
testBlockfileMgrCrashDuringWriting(t, 10, 2, 1000, 0)
testBlockfileMgrCrashDuringWriting(t, 0, 0, 1000, 10)
}

func testBlockfileMgrCrashDuringWriting(t *testing.T, numBlocksBeforeCheckpoint int,
numBlocksAfterCheckpoint int, numLastBlockBytes int, numPartialBytesToWrite int) {
env := newTestEnv(t)
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
blocksBeforeCP := testutil.ConstructTestBlocks(t, numBlocksBeforeCheckpoint)
blkfileMgrWrapper.addBlocks(blocksBeforeCP)
currentCPInfo := blkfileMgrWrapper.blockfileMgr.cpInfo
cpInfo1 := &checkpointInfo{
currentCPInfo.latestFileChunkSuffixNum,
currentCPInfo.latestFileChunksize,
currentCPInfo.lastBlockNumber}

blocksAfterCP := testutil.ConstructTestBlocks(t, numBlocksAfterCheckpoint)
blkfileMgrWrapper.addBlocks(blocksAfterCP)
cpInfo2 := blkfileMgrWrapper.blockfileMgr.cpInfo

// simulate a crash scenario
lastBlockBytes := []byte{}
encodedLen := proto.EncodeVarint(uint64(numLastBlockBytes))
randomBytes := testutil.ConstructRandomBytes(t, numLastBlockBytes)
lastBlockBytes = append(lastBlockBytes, encodedLen...)
lastBlockBytes = append(lastBlockBytes, randomBytes...)
partialBytes := lastBlockBytes[:numPartialBytesToWrite]
blkfileMgrWrapper.blockfileMgr.currentFileWriter.append(partialBytes, true)
blkfileMgrWrapper.blockfileMgr.saveCurrentInfo(cpInfo1, true)
blkfileMgrWrapper.close()

// simulate a start after a crash
blkfileMgrWrapper = newTestBlockfileWrapper(t, env)
defer blkfileMgrWrapper.close()
cpInfo3 := blkfileMgrWrapper.blockfileMgr.cpInfo
testutil.AssertEquals(t, cpInfo3, cpInfo2)

// add fresh blocks after restart
numBlocksAfterRestart := 2
blocksAfterRestart := testutil.ConstructTestBlocks(t, numBlocksAfterRestart)
blkfileMgrWrapper.addBlocks(blocksAfterRestart)

// iterate over all blocks
allBlocks := []*protos.Block2{}
allBlocks = append(allBlocks, blocksBeforeCP...)
allBlocks = append(allBlocks, blocksAfterCP...)
allBlocks = append(allBlocks, blocksAfterRestart...)
numTotalBlocks := len(allBlocks)
testBlockfileMgrBlockIterator(t, blkfileMgrWrapper.blockfileMgr, 1, numTotalBlocks, allBlocks)
}

func TestBlockfileMgrBlockIterator(t *testing.T) {
env := newTestEnv(t)
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
defer blkfileMgrWrapper.close()
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
itr, err := blkfileMgrWrapper.blockfileMgr.retrieveBlocks(1, 8)
testBlockfileMgrBlockIterator(t, blkfileMgrWrapper.blockfileMgr, 1, 8, blocks[0:8])
}

func testBlockfileMgrBlockIterator(t *testing.T, blockfileMgr *blockfileMgr,
firstBlockNum int, lastBlockNum int, expectedBlocks []*protos.Block2) {
itr, err := blockfileMgr.retrieveBlocks(uint64(firstBlockNum), uint64(lastBlockNum))
defer itr.Close()
testutil.AssertNoError(t, err, "Error while getting blocks iterator")
numBlocksItrated := 0
for ; itr.Next(); numBlocksItrated++ {
block, err := itr.Get()
testutil.AssertNoError(t, err, fmt.Sprintf("Error while getting block number [%d] from iterator", numBlocksItrated))
testutil.AssertEquals(t, block.(*BlockHolder).GetBlock(), blocks[numBlocksItrated])
testutil.AssertEquals(t, block.(*BlockHolder).GetBlock(), expectedBlocks[numBlocksItrated])
}
testutil.AssertEquals(t, numBlocksItrated, 8)
testutil.AssertEquals(t, numBlocksItrated, lastBlockNum-firstBlockNum+1)
}

func TestBlockfileMgrBlockchainInfo(t *testing.T) {
14 changes: 9 additions & 5 deletions core/ledgernext/blkstorage/fsblkstorage/blockfile_rw.go
@@ -47,12 +47,14 @@ func (w *blockfileWriter) truncateFile(targetSize int) error {
return nil
}

func (w *blockfileWriter) append(b []byte) error {
func (w *blockfileWriter) append(b []byte, sync bool) error {
_, err := w.file.Write(b)
if err != nil {
return err
}
w.file.Sync()
if sync {
return w.file.Sync()
}
return nil
}

@@ -97,8 +99,9 @@ func (r *blockfileReader) close() error {
}

type blockStream struct {
file *os.File
reader *bufio.Reader
file *os.File
reader *bufio.Reader
currentFileOffset int64
}

func newBlockStream(filePath string, offset int64) (*blockStream, error) {
@@ -115,7 +118,7 @@ func newBlockStream(filePath string, offset int64) (*blockStream, error) {
if newPosition != offset {
panic(fmt.Sprintf("Could not seek file [%s] to given offset [%d]. New position = [%d]", filePath, offset, newPosition))
}
s := &blockStream{file, bufio.NewReader(file)}
s := &blockStream{file, bufio.NewReader(file), offset}
return s, nil
}

@@ -133,6 +136,7 @@ func (s *blockStream) nextBlockBytes() ([]byte, error) {
if _, err = io.ReadAtLeast(s.reader, blockBytes, int(len)); err != nil {
return nil, err
}
s.currentFileOffset += int64(n) + int64(len)
return blockBytes, nil
}

