From 910e496c0cf7e47397e63696cf4b5e1775d82182 Mon Sep 17 00:00:00 2001 From: manish Date: Tue, 20 Sep 2016 16:22:04 +0530 Subject: [PATCH] Sync block index with block storage This commit adds the functionality of checkpointing block index progress and sync-ing (updating) the index during start of the block storage system Change-Id: Ib1a325add455bce47e510ccfc7af052db51117e6 Signed-off-by: manish --- .../blkstorage/fsblkstorage/block_stream.go | 59 +++++++--- .../fsblkstorage/block_stream_test.go | 4 +- .../blkstorage/fsblkstorage/blockfile_mgr.go | 81 ++++++++++++-- .../blkstorage/fsblkstorage/blockindex.go | 103 ++++++++++++++---- .../fsblkstorage/blockindex_test.go | 99 +++++++++++++++++ 5 files changed, 296 insertions(+), 50 deletions(-) create mode 100644 core/ledgernext/blkstorage/fsblkstorage/blockindex_test.go diff --git a/core/ledgernext/blkstorage/fsblkstorage/block_stream.go b/core/ledgernext/blkstorage/fsblkstorage/block_stream.go index b1cfb1ff7c8..69226024b2d 100644 --- a/core/ledgernext/blkstorage/fsblkstorage/block_stream.go +++ b/core/ledgernext/blkstorage/fsblkstorage/block_stream.go @@ -34,6 +34,7 @@ var ErrUnexpectedEndOfBlockfile = errors.New("unexpected end of blockfile") // blockfileStream reads blocks sequentially from a single file. // It starts from the given offset and can traverse till the end of the file type blockfileStream struct { + fileNum int file *os.File reader *bufio.Reader currentOffset int64 @@ -49,10 +50,19 @@ type blockStream struct { currentFileStream *blockfileStream } +// blockPlacementInfo captures the information related +// to block's placement in the file. +type blockPlacementInfo struct { + fileNum int + blockStartOffset int64 + blockBytesOffset int64 +} + /////////////////////////////////// // blockfileStream functions //////////////////////////////////// -func newBlockfileStream(filePath string, startOffset int64) (*blockfileStream, error) { +func newBlockfileStream(rootDir string, fileNum int, startOffset int64) (*blockfileStream, error) { + filePath := deriveBlockfilePath(rootDir, fileNum) logger.Debugf("newBlockfileStream(): filePath=[%s], startOffset=[%d]", filePath, startOffset) var file *os.File var err error @@ -68,41 +78,50 @@ func newBlockfileStream(filePath string, startOffset int64) (*blockfileStream, e panic(fmt.Sprintf("Could not seek file [%s] to given startOffset [%d]. 
New position = [%d]", filePath, startOffset, newPosition)) } - s := &blockfileStream{file, bufio.NewReader(file), startOffset} + s := &blockfileStream{fileNum, file, bufio.NewReader(file), startOffset} return s, nil } func (s *blockfileStream) nextBlockBytes() ([]byte, error) { + blockBytes, _, err := s.nextBlockBytesAndPlacementInfo() + return blockBytes, err +} + +func (s *blockfileStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) { var lenBytes []byte var err error if lenBytes, err = s.reader.Peek(8); err != nil { // reader.Peek raises io.EOF error if enough bytes not available if err == io.EOF { if len(lenBytes) > 0 { - return nil, ErrUnexpectedEndOfBlockfile + return nil, nil, ErrUnexpectedEndOfBlockfile } - return nil, nil + return nil, nil, nil } - return nil, err + return nil, nil, err } len, n := proto.DecodeVarint(lenBytes) if n == 0 { panic(fmt.Errorf("Error in decoding varint bytes")) } if _, err = s.reader.Discard(n); err != nil { - return nil, err + return nil, nil, err } blockBytes := make([]byte, len) if _, err = io.ReadAtLeast(s.reader, blockBytes, int(len)); err != nil { // io.ReadAtLeast raises io.ErrUnexpectedEOF error if it is able to // read a fewer (non-zero) bytes and io.EOF is encountered if err == io.ErrUnexpectedEOF { - return nil, ErrUnexpectedEndOfBlockfile + return nil, nil, ErrUnexpectedEndOfBlockfile } - return nil, err + return nil, nil, err } + blockPlacementInfo := &blockPlacementInfo{ + fileNum: s.fileNum, + blockStartOffset: s.currentOffset, + blockBytesOffset: s.currentOffset + int64(n)} s.currentOffset += int64(n) + int64(len) - return blockBytes, nil + return blockBytes, blockPlacementInfo, nil } func (s *blockfileStream) close() error { @@ -113,8 +132,7 @@ func (s *blockfileStream) close() error { // blockStream functions //////////////////////////////////// func newBlockStream(rootDir string, startFileNum int, startOffset int64, endFileNum int) (*blockStream, error) { - startFile := deriveBlockfilePath(rootDir, startFileNum) - startFileStream, err := newBlockfileStream(startFile, startOffset) + startFileStream, err := newBlockfileStream(rootDir, startFileNum, startOffset) if err != nil { return nil, err } @@ -127,30 +145,35 @@ func (s *blockStream) moveToNextBlockfileStream() error { return err } s.currentFileNum++ - nextFile := deriveBlockfilePath(s.rootDir, s.currentFileNum) - if s.currentFileStream, err = newBlockfileStream(nextFile, 0); err != nil { + if s.currentFileStream, err = newBlockfileStream(s.rootDir, s.currentFileNum, 0); err != nil { return err } return nil } func (s *blockStream) nextBlockBytes() ([]byte, error) { + blockBytes, _, err := s.nextBlockBytesAndPlacementInfo() + return blockBytes, err +} + +func (s *blockStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) { var blockBytes []byte + var blockPlacementInfo *blockPlacementInfo var err error - if blockBytes, err = s.currentFileStream.nextBlockBytes(); err != nil { + if blockBytes, blockPlacementInfo, err = s.currentFileStream.nextBlockBytesAndPlacementInfo(); err != nil { logger.Debugf("current file [%d]", s.currentFileNum) logger.Debugf("blockbytes [%d]. Err:%s", len(blockBytes), err) - return nil, err + return nil, nil, err } logger.Debugf("blockbytes [%d] read from file [%d]", len(blockBytes), s.currentFileNum) if blockBytes == nil && s.currentFileNum < s.endFileNum { logger.Debugf("current file [%d] exhausted. 
Moving to next file", s.currentFileNum) if err = s.moveToNextBlockfileStream(); err != nil { - return nil, err + return nil, nil, err } - return s.nextBlockBytes() + return s.nextBlockBytesAndPlacementInfo() } - return blockBytes, nil + return blockBytes, blockPlacementInfo, nil } func (s *blockStream) close() error { diff --git a/core/ledgernext/blkstorage/fsblkstorage/block_stream_test.go b/core/ledgernext/blkstorage/fsblkstorage/block_stream_test.go index 2f8626d04e5..d8ad17e7bb6 100644 --- a/core/ledgernext/blkstorage/fsblkstorage/block_stream_test.go +++ b/core/ledgernext/blkstorage/fsblkstorage/block_stream_test.go @@ -39,7 +39,7 @@ func testBlockfileStream(t *testing.T, numBlocks int) { w.addBlocks(blocks) w.close() - s, err := newBlockfileStream(deriveBlockfilePath(blockfileMgr.rootDir, 0), 0) + s, err := newBlockfileStream(blockfileMgr.rootDir, 0, 0) defer s.close() testutil.AssertNoError(t, err, "Error in constructing blockfile stream") @@ -80,7 +80,7 @@ func testBlockFileStreamUnexpectedEOF(t *testing.T, numBlocks int, partialBlockB w.addBlocks(blocks) blockfileMgr.currentFileWriter.append(partialBlockBytes, true) w.close() - s, err := newBlockfileStream(deriveBlockfilePath(blockfileMgr.rootDir, 0), 0) + s, err := newBlockfileStream(blockfileMgr.rootDir, 0, 0) defer s.close() testutil.AssertNoError(t, err, "Error in constructing blockfile stream") diff --git a/core/ledgernext/blkstorage/fsblkstorage/blockfile_mgr.go b/core/ledgernext/blkstorage/fsblkstorage/blockfile_mgr.go index c58046a1917..25a1447a55a 100644 --- a/core/ledgernext/blkstorage/fsblkstorage/blockfile_mgr.go +++ b/core/ledgernext/blkstorage/fsblkstorage/blockfile_mgr.go @@ -44,7 +44,7 @@ type blockfileMgr struct { conf *Conf db *db.DB defaultCF *gorocksdb.ColumnFamilyHandle - index *blockIndex + index index cpInfo *checkpointInfo currentFileWriter *blockfileWriter bcInfo atomic.Value @@ -79,9 +79,11 @@ func newBlockfileMgr(conf *Conf) *blockfileMgr { panic(fmt.Sprintf("Could not truncate current file to known size in db: %s", err)) } - mgr.index = newBlockIndex(db) + mgr.index = newBlockIndex(db, db.GetCFHandle(blockIndexCF)) mgr.cpInfo = cpInfo mgr.currentFileWriter = currentFileWriter + mgr.syncIndex() + // init BlockchainInfo bcInfo := &protos.BlockchainInfo{ Height: 0, @@ -130,7 +132,8 @@ func updateCPInfo(conf *Conf, cpInfo *checkpointInfo) { // check point info is in sync with the file on disk return } - endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(filePath, int64(cpInfo.latestFileChunksize)) + endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock( + rootDir, cpInfo.latestFileChunkSuffixNum, int64(cpInfo.latestFileChunksize)) if err != nil { panic(fmt.Sprintf("Could not open current file for detecting last block in the file: %s", err)) } @@ -217,11 +220,73 @@ func (mgr *blockfileMgr) addBlock(block *protos.Block2) error { } blockFLP := &fileLocPointer{fileSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum} blockFLP.offset = currentOffset - mgr.index.indexBlock(mgr.cpInfo.lastBlockNumber, blockHash, blockFLP, blockBytesLen, len(blockBytesEncodedLen), txOffsets) + // shift the txoffset because we prepend length of bytes before block bytes + for i := 0; i < len(txOffsets); i++ { + txOffsets[i] += len(blockBytesEncodedLen) + } + mgr.index.indexBlock(&blockIdxInfo{ + blockNum: mgr.cpInfo.lastBlockNumber, blockHash: blockHash, + flp: blockFLP, txOffsets: txOffsets}) mgr.updateBlockchainInfo(blockHash, block) return nil } +func (mgr *blockfileMgr) syncIndex() error { + var lastBlockIndexed 
uint64 + var err error + if lastBlockIndexed, err = mgr.index.getLastBlockIndexed(); err != nil { + return err + } + startFileNum := 0 + startOffset := 0 + blockNum := uint64(1) + endFileNum := mgr.cpInfo.latestFileChunkSuffixNum + if lastBlockIndexed != 0 { + var flp *fileLocPointer + if flp, err = mgr.index.getBlockLocByBlockNum(lastBlockIndexed); err != nil { + return err + } + startFileNum = flp.fileSuffixNum + startOffset = flp.locPointer.offset + blockNum = lastBlockIndexed + } + + var stream *blockStream + if stream, err = newBlockStream(mgr.rootDir, startFileNum, int64(startOffset), endFileNum); err != nil { + return err + } + var blockBytes []byte + var blockPlacementInfo *blockPlacementInfo + + for { + if blockBytes, blockPlacementInfo, err = stream.nextBlockBytesAndPlacementInfo(); err != nil { + return err + } + if blockBytes == nil { + break + } + serBlock2 := protos.NewSerBlock2(blockBytes) + var txOffsets []int + if txOffsets, err = serBlock2.GetTxOffsets(); err != nil { + return err + } + for i := 0; i < len(txOffsets); i++ { + txOffsets[i] += int(blockPlacementInfo.blockBytesOffset) + } + blockIdxInfo := &blockIdxInfo{} + blockIdxInfo.blockHash = serBlock2.ComputeHash() + blockIdxInfo.blockNum = blockNum + blockIdxInfo.flp = &fileLocPointer{fileSuffixNum: blockPlacementInfo.fileNum, + locPointer: locPointer{offset: int(blockPlacementInfo.blockStartOffset)}} + blockIdxInfo.txOffsets = txOffsets + if err = mgr.index.indexBlock(blockIdxInfo); err != nil { + return err + } + blockNum++ + } + return nil +} + func (mgr *blockfileMgr) getBlockchainInfo() *protos.BlockchainInfo { return mgr.bcInfo.Load().(*protos.BlockchainInfo) } @@ -320,8 +385,7 @@ func (mgr *blockfileMgr) fetchTransaction(lp *fileLocPointer) (*protos.Transacti } func (mgr *blockfileMgr) fetchBlockBytes(lp *fileLocPointer) ([]byte, error) { - filePath := deriveBlockfilePath(mgr.rootDir, lp.fileSuffixNum) - stream, err := newBlockfileStream(filePath, int64(lp.offset)) + stream, err := newBlockfileStream(mgr.rootDir, lp.fileSuffixNum, int64(lp.offset)) if err != nil { return nil, err } @@ -378,10 +442,9 @@ func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, flush bool) error { return nil } -func scanForLastCompleteBlock(filePath string, startingOffset int64) (int64, int, error) { - logger.Debugf("scanForLastCompleteBlock(): filePath=[%s], startingOffset=[%d]", filePath, startingOffset) +func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64) (int64, int, error) { numBlocks := 0 - blockStream, err := newBlockfileStream(filePath, startingOffset) + blockStream, err := newBlockfileStream(rootDir, fileNum, startingOffset) if err != nil { return 0, 0, err } diff --git a/core/ledgernext/blkstorage/fsblkstorage/blockindex.go b/core/ledgernext/blkstorage/fsblkstorage/blockindex.go index 6f5b73eb1ae..97978ee2ddc 100644 --- a/core/ledgernext/blkstorage/fsblkstorage/blockindex.go +++ b/core/ledgernext/blkstorage/fsblkstorage/blockindex.go @@ -17,6 +17,7 @@ limitations under the License. 
package fsblkstorage import ( + "errors" "fmt" "github.com/golang/protobuf/proto" @@ -25,37 +26,75 @@ import ( "github.com/tecbot/gorocksdb" ) +const ( + blockNumIdxKeyPrefix = 'n' + blockHashIdxKeyPrefix = 'h' + txIDIdxKeyPrefix = 't' + indexCheckpointKeyStr = "indexCheckpointKey" +) + +var indexCheckpointKey = []byte(indexCheckpointKeyStr) + +type index interface { + getLastBlockIndexed() (uint64, error) + indexBlock(blockIdxInfo *blockIdxInfo) error + getBlockLocByHash(blockHash []byte) (*fileLocPointer, error) + getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error) + getTxLoc(txID string) (*fileLocPointer, error) +} + +type blockIdxInfo struct { + blockNum uint64 + blockHash []byte + flp *fileLocPointer + txOffsets []int +} + +// ErrNotFoundInIndex is used to indicate missing entry in the index +var ErrNotFoundInIndex = errors.New("Entry not found in index") + type blockIndex struct { db *db.DB blockIndexCF *gorocksdb.ColumnFamilyHandle } -func newBlockIndex(db *db.DB) *blockIndex { - //TODO during init make sure that the index is in sync with block strorage - return &blockIndex{db, db.GetCFHandle(blockIndexCF)} +func newBlockIndex(db *db.DB, indexCFHandle *gorocksdb.ColumnFamilyHandle) *blockIndex { + return &blockIndex{db, indexCFHandle} +} + +func (index *blockIndex) getLastBlockIndexed() (uint64, error) { + var blockNumBytes []byte + var err error + if blockNumBytes, err = index.db.Get(index.blockIndexCF, indexCheckpointKey); err != nil { + return 0, nil + } + return decodeBlockNum(blockNumBytes), nil } -func (index *blockIndex) indexBlock(blockNum uint64, blockHash []byte, flp *fileLocPointer, blockLen int, skip int, txOffsets []int) error { - logger.Debugf("Adding blockLoc [%s] to index", flp) +func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error { + logger.Debugf("Indexing block [%s]", blockIdxInfo) + flp := blockIdxInfo.flp + txOffsets := blockIdxInfo.txOffsets batch := gorocksdb.NewWriteBatch() defer batch.Destroy() flpBytes, err := flp.marshal() if err != nil { return err } - batch.PutCF(index.blockIndexCF, index.constructBlockHashKey(blockHash), flpBytes) - batch.PutCF(index.blockIndexCF, index.constructBlockNumKey(blockNum), flpBytes) + batch.PutCF(index.blockIndexCF, constructBlockHashKey(blockIdxInfo.blockHash), flpBytes) + batch.PutCF(index.blockIndexCF, constructBlockNumKey(blockIdxInfo.blockNum), flpBytes) for i := 0; i < len(txOffsets)-1; i++ { - txID := constructTxID(blockNum, i) + txID := constructTxID(blockIdxInfo.blockNum, i) txBytesLength := txOffsets[i+1] - txOffsets[i] - txFLP := newFileLocationPointer(flp.fileSuffixNum, flp.offset+skip, &locPointer{txOffsets[i], txBytesLength}) - logger.Debugf("Adding txLoc [%s] for tx [%s] to index", txFLP, txID) - txFLPBytes, marshalErr := txFLP.marshal() + txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, &locPointer{txOffsets[i], txBytesLength}) + logger.Debugf("Adding txLoc [%s] for tx [%s] to index", txFlp, txID) + txFlpBytes, marshalErr := txFlp.marshal() if marshalErr != nil { return marshalErr } - batch.PutCF(index.blockIndexCF, index.constructTxIDKey(txID), txFLPBytes) + batch.PutCF(index.blockIndexCF, constructTxIDKey(txID), txFlpBytes) } + batch.PutCF(index.blockIndexCF, indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum)) if err := index.db.WriteBatch(batch); err != nil { return err } @@ -63,52 +102,70 @@ func (index *blockIndex) indexBlock(blockNum uint64, blockHash []byte, flp *file } func (index *blockIndex) getBlockLocByHash(blockHash []byte) (*fileLocPointer, 
error) { - b, err := index.db.Get(index.blockIndexCF, index.constructBlockHashKey(blockHash)) + b, err := index.db.Get(index.blockIndexCF, constructBlockHashKey(blockHash)) if err != nil { return nil, err } + if b == nil { + return nil, ErrNotFoundInIndex + } blkLoc := &fileLocPointer{} blkLoc.unmarshal(b) return blkLoc, nil } func (index *blockIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error) { - b, err := index.db.Get(index.blockIndexCF, index.constructBlockNumKey(blockNum)) + b, err := index.db.Get(index.blockIndexCF, constructBlockNumKey(blockNum)) if err != nil { return nil, err } + if b == nil { + return nil, ErrNotFoundInIndex + } blkLoc := &fileLocPointer{} blkLoc.unmarshal(b) return blkLoc, nil } func (index *blockIndex) getTxLoc(txID string) (*fileLocPointer, error) { - b, err := index.db.Get(index.blockIndexCF, index.constructTxIDKey(txID)) + b, err := index.db.Get(index.blockIndexCF, constructTxIDKey(txID)) if err != nil { return nil, err } + if b == nil { + return nil, ErrNotFoundInIndex + } txFLP := &fileLocPointer{} txFLP.unmarshal(b) return txFLP, nil } -func (index *blockIndex) constructBlockNumKey(blockNum uint64) []byte { +func constructBlockNumKey(blockNum uint64) []byte { blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum) - return append([]byte{'n'}, blkNumBytes...) + return append([]byte{blockNumIdxKeyPrefix}, blkNumBytes...) } -func (index *blockIndex) constructBlockHashKey(blockHash []byte) []byte { - return append([]byte{'b'}, blockHash...) +func constructBlockHashKey(blockHash []byte) []byte { + return append([]byte{blockHashIdxKeyPrefix}, blockHash...) } -func (index *blockIndex) constructTxIDKey(txID string) []byte { - return append([]byte{'t'}, []byte(txID)...) +func constructTxIDKey(txID string) []byte { + return append([]byte{txIDIdxKeyPrefix}, []byte(txID)...) } func constructTxID(blockNum uint64, txNum int) string { return fmt.Sprintf("%d:%d", blockNum, txNum) } +func encodeBlockNum(blockNum uint64) []byte { + return proto.EncodeVarint(blockNum) +} + +func decodeBlockNum(blockNumBytes []byte) uint64 { + blockNum, _ := proto.DecodeVarint(blockNumBytes) + return blockNum +} + type locPointer struct { offset int bytesLength int @@ -173,3 +230,7 @@ func (flp *fileLocPointer) unmarshal(b []byte) error { func (flp *fileLocPointer) String() string { return fmt.Sprintf("fileSuffixNum=%d, %s", flp.fileSuffixNum, flp.locPointer.String()) } + +func (blockIdxInfo *blockIdxInfo) String() string { + return fmt.Sprintf("blockNum=%d, blockHash=%#v", blockIdxInfo.blockNum, blockIdxInfo.blockHash) +} diff --git a/core/ledgernext/blkstorage/fsblkstorage/blockindex_test.go b/core/ledgernext/blkstorage/fsblkstorage/blockindex_test.go new file mode 100644 index 00000000000..bae867cfb0e --- /dev/null +++ b/core/ledgernext/blkstorage/fsblkstorage/blockindex_test.go @@ -0,0 +1,99 @@ +/* +Copyright IBM Corp. 2016 All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package fsblkstorage + +import ( + "fmt" + "testing" + + "github.com/hyperledger/fabric/core/ledgernext/testutil" +) + +type noopIndex struct { +} + +func (i *noopIndex) getLastBlockIndexed() (uint64, error) { + return 0, nil +} +func (i *noopIndex) indexBlock(blockIdxInfo *blockIdxInfo) error { + return nil +} +func (i *noopIndex) getBlockLocByHash(blockHash []byte) (*fileLocPointer, error) { + return nil, nil +} +func (i *noopIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error) { + return nil, nil +} +func (i *noopIndex) getTxLoc(txID string) (*fileLocPointer, error) { + return nil, nil +} + +func TestBlockIndexSync(t *testing.T) { + testBlockIndexSync(t, 10, 5, false) + testBlockIndexSync(t, 10, 5, true) + testBlockIndexSync(t, 10, 0, true) + testBlockIndexSync(t, 10, 10, true) +} + +func testBlockIndexSync(t *testing.T, numBlocks int, numBlocksToIndex int, syncByRestart bool) { + env := newTestEnv(t) + defer env.Cleanup() + blkfileMgrWrapper := newTestBlockfileWrapper(t, env) + defer blkfileMgrWrapper.close() + blkfileMgr := blkfileMgrWrapper.blockfileMgr + origIndex := blkfileMgr.index + // construct blocks for testing + blocks := testutil.ConstructTestBlocks(t, numBlocks) + // add a few blocks + blkfileMgrWrapper.addBlocks(blocks[:numBlocksToIndex]) + + // Plug-in a noop index and add remaining blocks + blkfileMgr.index = &noopIndex{} + blkfileMgrWrapper.addBlocks(blocks[numBlocksToIndex:]) + + // Plug-in back the original index + blkfileMgr.index = origIndex + // The first set of blocks should be present in the orginal index + for i := 1; i <= numBlocksToIndex; i++ { + block, err := blkfileMgr.retrieveBlockByNumber(uint64(i)) + testutil.AssertNoError(t, err, fmt.Sprintf("block [%d] should have been present in the index", i)) + testutil.AssertEquals(t, block, blocks[i-1]) + } + + // The last set of blocks should not be present in the original index + for i := numBlocksToIndex + 1; i <= numBlocks; i++ { + _, err := blkfileMgr.retrieveBlockByNumber(uint64(i)) + testutil.AssertSame(t, err, ErrNotFoundInIndex) + } + + // perform index sync + if syncByRestart { + blkfileMgrWrapper.close() + blkfileMgrWrapper = newTestBlockfileWrapper(t, env) + defer blkfileMgrWrapper.close() + blkfileMgr = blkfileMgrWrapper.blockfileMgr + } else { + blkfileMgr.syncIndex() + } + + // Now, last set of blocks should also be present in original index + for i := numBlocksToIndex + 1; i <= numBlocks; i++ { + block, err := blkfileMgr.retrieveBlockByNumber(uint64(i)) + testutil.AssertNoError(t, err, fmt.Sprintf("block [%d] should have been present in the index", i)) + testutil.AssertEquals(t, block, blocks[i-1]) + } +}
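
The heart of the checkpointing scheme added by this patch is that indexBlock writes a varint-encoded block number under the reserved indexCheckpointKey in the block-index column family, in the same WriteBatch as the block's location entries, and getLastBlockIndexed reads it back at startup. The sketch below is illustrative only and not part of the patch: a plain in-memory map stands in for the RocksDB column family and the key layout is simplified, but the encode/decode helpers are the same proto varint calls used by encodeBlockNum and decodeBlockNum above.

// Illustrative sketch, not part of the patch: checkpoint written together
// with the index entries, so the checkpoint never runs ahead of them.
package main

import (
	"fmt"

	"github.com/golang/protobuf/proto"
)

const indexCheckpointKey = "indexCheckpointKey"

// sketchIndex stands in for the RocksDB block-index column family.
type sketchIndex map[string][]byte

// indexBlock stores the (placeholder) location entries and the checkpoint together.
func (idx sketchIndex) indexBlock(blockNum uint64, entries map[string][]byte) {
	for k, v := range entries {
		idx[k] = v
	}
	idx[indexCheckpointKey] = proto.EncodeVarint(blockNum)
}

// lastBlockIndexed reads the checkpoint back; zero means nothing has been indexed yet.
func (idx sketchIndex) lastBlockIndexed() uint64 {
	b, ok := idx[indexCheckpointKey]
	if !ok {
		return 0
	}
	n, _ := proto.DecodeVarint(b)
	return n
}

func main() {
	idx := sketchIndex{}
	// "n-7" is a placeholder key; the real code uses a prefixed, order-preserving encoding.
	idx.indexBlock(7, map[string][]byte{"n-7": []byte("serialized fileLocPointer")})
	fmt.Println(idx.lastBlockIndexed()) // 7
}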
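
A second detail worth calling out is the block file layout that blockPlacementInfo and the tx-offset shift in addBlock rely on: each block is appended to the block file as a varint length prefix followed by the block bytes, so transaction offsets reported relative to the block bytes must be moved past that prefix before being stored against the block's start position (the `txOffsets[i] += len(blockBytesEncodedLen)` loop in addBlock). The following self-contained illustration, with made-up block content and offsets, is not part of the patch:

// Illustrative sketch, not part of the patch: why tx offsets are shifted past
// the varint length prefix that precedes the block bytes in the block file.
package main

import (
	"fmt"

	"github.com/golang/protobuf/proto"
)

func main() {
	blockBytes := []byte("serialized-block-with-two-txs")
	txOffsets := []int{0, 10} // offsets of each tx within blockBytes (made up)

	lenPrefix := proto.EncodeVarint(uint64(len(blockBytes)))
	fileEntry := append(lenPrefix, blockBytes...) // what actually lands in the block file

	// Shift each tx offset past the length prefix, analogous to
	// "txOffsets[i] += len(blockBytesEncodedLen)" in addBlock.
	for i := range txOffsets {
		txOffsets[i] += len(lenPrefix)
	}

	// Each shifted offset now addresses the tx bytes inside the file entry.
	fmt.Println(string(fileEntry[txOffsets[0]:txOffsets[1]])) // "serialized"
}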