Skip to content

Commit

Permalink
Adding comments
Browse files Browse the repository at this point in the history
Change-Id: Id4969474c8ed4fcccc9332ea7a33de41cc85b7ec
Signed-off-by: Mari Wade <mariwade@us.ibm.com>
  • Loading branch information
mariwade committed Nov 14, 2016
1 parent 7e7e23c commit d01be60
Showing 1 changed file with 95 additions and 2 deletions.
97 changes: 95 additions & 2 deletions core/ledger/blkstorage/fsblkstorage/blockfile_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,47 +52,109 @@ type blockfileMgr struct {
bcInfo atomic.Value
}

/*
Creates a new manager that will manage the files used for block persistence.
This manager manages the file system FS including
-- the directory where the files are stored
-- the individual files where the blocks are stored
-- the checkpoint which tracks the latest file being persisted to
-- the index which tracks what block and transaction is in what file
When a new blockfile manager is started (i.e. only on start-up), it checks
if this start-up is the first time the system is coming up or is this a restart
of the system.
The blockfile manager stores blocks of data into a file system. That file
storage is done by creating sequentially numbered files of a configured size
i.e blockfile_000000, blockfile_000001, etc..
Each transcation in a block is stored with information about the number of
bytes in that transaction
Adding txLoc [fileSuffixNum=0, offset=3, bytesLength=104] for tx [1:0] to index
Adding txLoc [fileSuffixNum=0, offset=107, bytesLength=104] for tx [1:1] to index
Each block is stored with the total encoded length of that block as well as the
tx location offsets.
Remember that these steps are only done once at start-up of the system.
At start up a new manager:
*) Checks if the directory for storing files exists, if not creates the dir
*) Checks if the key value database exists, if not creates one
(will create a db dir)
*) Determines the checkpoint information (cpinfo) used for storage
-- Loads from db if exist, if not instantiate a new cpinfo
-- If cpinfo was loaded from db, compares to FS
-- If cpinfo and file system are not in sync, syncs cpInfo from FS
*) Starts a new file writer
-- truncates file per cpinfo to remove any excess past last block
*) Determines the index information used to find tx and blocks in
the file blkstorage
-- Instantiates a new blockIdxInfo
-- Loads the index from the db if exists
-- syncIndex comparing the last block indexed to what is in the FS
-- If index and file system are not in sync, syncs index from the FS
*) Updates blockchain info used by the APIs
*/
func newBlockfileMgr(conf *Conf, indexConfig *blkstorage.IndexConfig) *blockfileMgr {
//Determine the root directory for the blockfile storage, if it does not exist create it
rootDir := conf.blockfilesDir
_, err := util.CreateDirIfMissing(rootDir)
if err != nil {
panic(fmt.Sprintf("Error: %s", err))
}
//Determine the kev value db instance, if it does not exist, create the directory and instantiate the database.
db := initDB(conf)
// Instantiate the manager, i.e. blockFileMgr structure
mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: db}

// cp = checkpointInfo, retrieve from the database the file suffix or number of where blocks were stored.
// It also retrieves the current size of that file and the last block number that was written to that file.
// At init checkpointInfo:latestFileChunkSuffixNum=[0], latestFileChunksize=[0], lastBlockNumber=[0]
cpInfo, err := mgr.loadCurrentInfo()
if err != nil {
panic(fmt.Sprintf("Could not get block file info for current block file from db: %s", err))
}
if cpInfo == nil {
if cpInfo == nil { //if no cpInfo stored in db initiate to zero
cpInfo = &checkpointInfo{latestFileChunkSuffixNum: 0, latestFileChunksize: 0}
err = mgr.saveCurrentInfo(cpInfo, true)
if err != nil {
panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
}
}
//Verify that the checkpoint stored in db is accurate with what is actually stored in block file system
// If not the same, sync the cpInfo and the file system
syncCPInfoFromFS(conf, cpInfo)
//Open a writer to the file identified by the number and truncate it to only contain the latest block
// that was completely saved (file system, index, cpinfo, etc)
currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
if err != nil {
panic(fmt.Sprintf("Could not open writer to current file: %s", err))
}
//Truncate the file to remove excess past last block
err = currentFileWriter.truncateFile(cpInfo.latestFileChunksize)
if err != nil {
panic(fmt.Sprintf("Could not truncate current file to known size in db: %s", err))
}

// Create a new KeyValue store database handler for the blocks index in the keyvalue database
mgr.index = newBlockIndex(indexConfig, db)

// Update the manager with the checkpoint info and the file writer
mgr.cpInfo = cpInfo
mgr.currentFileWriter = currentFileWriter
// Create a checkpoint condition (event) variable, for the goroutine waiting for
// or announcing the occurrence of an event.
mgr.cpInfoCond = sync.NewCond(&sync.Mutex{})

// Verify that the index stored in db is accurate with what is actually stored in block file system
// If not the same, sync the index and the file system
mgr.syncIndex()

// init BlockchainInfo
// init BlockchainInfo for external API's
bcInfo := &pb.BlockchainInfo{
Height: 0,
CurrentBlockHash: nil,
PreviousBlockHash: nil}

//If start up is a restart of an existing storage, update BlockchainInfo for external API's
if cpInfo.lastBlockNumber > 0 {
lastBlock, err := mgr.retrieveSerBlockByNumber(cpInfo.lastBlockNumber)
if err != nil {
Expand All @@ -109,6 +171,7 @@ func newBlockfileMgr(conf *Conf, indexConfig *blkstorage.IndexConfig) *blockfile
PreviousBlockHash: previousBlockHash}
}
mgr.bcInfo.Store(bcInfo)
//return the new manager (blockfileMgr)
return mgr
}

Expand All @@ -119,24 +182,34 @@ func initDB(conf *Conf) *db.DB {
return dbInst
}

//cp = checkpointInfo, from the database gets the file suffix and the size of
// the file of where the last block was written. Also retrieves contains the
// last block number that was written. At init
//checkpointInfo:latestFileChunkSuffixNum=[0], latestFileChunksize=[0], lastBlockNumber=[0]
func syncCPInfoFromFS(conf *Conf, cpInfo *checkpointInfo) {
logger.Debugf("Starting checkpoint=%s", cpInfo)
//Checks if the file suffix of where the last block was written exists
rootDir := conf.blockfilesDir
filePath := deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum)
exists, size, err := util.FileExists(filePath)
if err != nil {
panic(fmt.Sprintf("Error in checking whether file [%s] exists: %s", filePath, err))
}
logger.Debugf("status of file [%s]: exists=[%t], size=[%d]", filePath, exists, size)
//Test is !exists because when file number is first used the file does not exist yet
//checks that the file exists and that the size of the file is what is stored in cpinfo
//status of file [/tmp/tests/ledger/blkstorage/fsblkstorage/blocks/blockfile_000000]: exists=[false], size=[0]
if !exists || int(size) == cpInfo.latestFileChunksize {
// check point info is in sync with the file on disk
return
}
//Scan the file system to verify that the checkpoint info stored in db is correct
endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(
rootDir, cpInfo.latestFileChunkSuffixNum, int64(cpInfo.latestFileChunksize))
if err != nil {
panic(fmt.Sprintf("Could not open current file for detecting last block in the file: %s", err))
}
//Updates the checkpoint info for the actual last block number stored and it's end location
cpInfo.lastBlockNumber += uint64(numBlocks)
cpInfo.latestFileChunksize = int(endOffsetLastBlock)
logger.Debugf("Checkpoint after updates by scanning the last file segment:%s", cpInfo)
Expand Down Expand Up @@ -183,6 +256,7 @@ func (mgr *blockfileMgr) addBlock(block *pb.Block2) error {
}
blockBytes := serBlock.GetBytes()
blockHash := serBlock.ComputeHash()
//Get the location / offset where each transaction starts in the block and where the block ends
txOffsets, err := serBlock.GetTxOffsets()
currentOffset := mgr.cpInfo.latestFileChunksize
if err != nil {
Expand All @@ -192,12 +266,16 @@ func (mgr *blockfileMgr) addBlock(block *pb.Block2) error {
blockBytesEncodedLen := proto.EncodeVarint(uint64(blockBytesLen))
totalBytesToAppend := blockBytesLen + len(blockBytesEncodedLen)

//Determine if we need to start a new file since the size of this block
//exceeds the amount of space left in the current file
if currentOffset+totalBytesToAppend > mgr.conf.maxBlockfileSize {
mgr.moveToNextFile()
currentOffset = 0
}
//append blockBytesEncodedLen to the file
err = mgr.currentFileWriter.append(blockBytesEncodedLen, false)
if err == nil {
//append the actual block bytes to the file
err = mgr.currentFileWriter.append(blockBytes, true)
}
if err != nil {
Expand All @@ -208,11 +286,13 @@ func (mgr *blockfileMgr) addBlock(block *pb.Block2) error {
return fmt.Errorf("Error while appending block to file: %s", err)
}

//Update the checkpoint info with the results of adding the new block
currentCPInfo := mgr.cpInfo
newCPInfo := &checkpointInfo{
latestFileChunkSuffixNum: currentCPInfo.latestFileChunkSuffixNum,
latestFileChunksize: currentCPInfo.latestFileChunksize + totalBytesToAppend,
lastBlockNumber: currentCPInfo.lastBlockNumber + 1}
//save the checkpoint information in the database
if err = mgr.saveCurrentInfo(newCPInfo, false); err != nil {
truncateErr := mgr.currentFileWriter.truncateFile(currentCPInfo.latestFileChunksize)
if truncateErr != nil {
Expand All @@ -221,16 +301,19 @@ func (mgr *blockfileMgr) addBlock(block *pb.Block2) error {
return fmt.Errorf("Error while saving current file info to db: %s", err)
}

//Index block file location pointer updated with file suffex and offset for the new block
blockFLP := &fileLocPointer{fileSuffixNum: newCPInfo.latestFileChunkSuffixNum}
blockFLP.offset = currentOffset
// shift the txoffset because we prepend length of bytes before block bytes
for i := 0; i < len(txOffsets); i++ {
txOffsets[i] += len(blockBytesEncodedLen)
}
//save the index in the database
mgr.index.indexBlock(&blockIdxInfo{
blockNum: newCPInfo.lastBlockNumber, blockHash: blockHash,
flp: blockFLP, txOffsets: txOffsets})

//update the checkpoint info (for storage) and the blockchain info (for APIs) in the manager
mgr.updateCheckpoint(newCPInfo)
mgr.updateBlockchainInfo(blockHash, block)
return nil
Expand All @@ -239,13 +322,17 @@ func (mgr *blockfileMgr) addBlock(block *pb.Block2) error {
func (mgr *blockfileMgr) syncIndex() error {
var lastBlockIndexed uint64
var err error
//from the database, get the last block that was indexed
if lastBlockIndexed, err = mgr.index.getLastBlockIndexed(); err != nil {
return err
}
//initialize index to file number:zero, offset:zero and block:1
startFileNum := 0
startOffset := 0
blockNum := uint64(1)
//get the last file that blocks were added to using the checkpoint info
endFileNum := mgr.cpInfo.latestFileChunkSuffixNum
//if the index stored in the db has value, update the index information with those values
if lastBlockIndexed != 0 {
var flp *fileLocPointer
if flp, err = mgr.index.getBlockLocByBlockNum(lastBlockIndexed); err != nil {
Expand All @@ -256,13 +343,16 @@ func (mgr *blockfileMgr) syncIndex() error {
blockNum = lastBlockIndexed
}

//open a blockstream to the file location that was stored in the index
var stream *blockStream
if stream, err = newBlockStream(mgr.rootDir, startFileNum, int64(startOffset), endFileNum); err != nil {
return err
}
var blockBytes []byte
var blockPlacementInfo *blockPlacementInfo

//Should be at the last block, but go ahead and loop looking for next blockBytes
//If there is another block, add it to the index
for {
if blockBytes, blockPlacementInfo, err = stream.nextBlockBytesAndPlacementInfo(); err != nil {
return err
Expand All @@ -278,6 +368,7 @@ func (mgr *blockfileMgr) syncIndex() error {
for i := 0; i < len(txOffsets); i++ {
txOffsets[i] += int(blockPlacementInfo.blockBytesOffset)
}
//Update the blockIndexInfo with what was actually stored in file system
blockIdxInfo := &blockIdxInfo{}
blockIdxInfo.blockHash = serBlock2.ComputeHash()
blockIdxInfo.blockNum = blockNum
Expand Down Expand Up @@ -414,6 +505,7 @@ func (mgr *blockfileMgr) fetchRawBytes(lp *fileLocPointer) ([]byte, error) {
return b, nil
}

//Get the current checkpoint information that is stored in the database
func (mgr *blockfileMgr) loadCurrentInfo() (*checkpointInfo, error) {
var b []byte
var err error
Expand Down Expand Up @@ -442,6 +534,7 @@ func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, sync bool) error {
// scanForLastCompleteBlock scan a given block file and detects the last offset in the file
// after which there may lie a block partially written (towards the end of the file in a crash scenario).
func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64) (int64, int, error) {
//scan the passed file number suffix starting from the passed offset to find the last completed block
numBlocks := 0
blockStream, errOpen := newBlockfileStream(rootDir, fileNum, startingOffset)
if errOpen != nil {
Expand Down

0 comments on commit d01be60

Please sign in to comment.