Skip to content

Commit

Permalink
Block stream across files
Browse files Browse the repository at this point in the history
This commit allows a block stream consumer to read across multiple files.

Change-Id: I1bd053e8d022bf1daf01f61f9e3e1ae1629e334b
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi committed Sep 26, 2016
1 parent 9c2ecfc commit 130ad7c
Show file tree
Hide file tree
Showing 4 changed files with 299 additions and 60 deletions.
158 changes: 158 additions & 0 deletions core/ledgernext/blkstorage/fsblkstorage/block_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fsblkstorage

import (
"bufio"
"errors"
"fmt"
"io"
"os"

"github.com/golang/protobuf/proto"
)

// ErrUnexpectedEndOfBlockfile is the error used to indicate an unexpected end of a file segment.
// This can happen mainly if a crash occurs while appending a block, so that only partial block
// contents get written towards the end of the file.
var ErrUnexpectedEndOfBlockfile = errors.New("unexpected end of blockfile")

// blockfileStream reads blocks sequentially from a single file.
// It starts from the given offset and can traverse till the end of the file.
// NOTE: newBlockfileStream builds this struct with a positional literal, so the
// order of the fields below must not be changed.
type blockfileStream struct {
// file is the underlying block file, opened read-only by newBlockfileStream.
file *os.File
// reader is a buffered reader layered over file; all block reads go through it.
reader *bufio.Reader
// currentOffset starts at the offset the stream was opened at and is advanced
// past each block (length prefix plus payload) that is read completely.
currentOffset int64
}

// blockStream reads blocks sequentially from multiple files.
// It starts from a given file offset and continues with the next
// file segment until the end of the last segment (`endFileNum`).
// NOTE: newBlockStream builds this struct with a positional literal, so the
// order of the fields below must not be changed.
type blockStream struct {
// rootDir is the directory passed to deriveBlockfilePath to locate each file segment.
rootDir string
// currentFileNum is the number of the file segment currently being read.
currentFileNum int
// endFileNum is the number of the last file segment this stream will read.
endFileNum int
// currentFileStream is the single-file stream opened on segment currentFileNum.
currentFileStream *blockfileStream
}

///////////////////////////////////
// blockfileStream functions
////////////////////////////////////
// newBlockfileStream opens the block file at filePath read-only and positions it
// at startOffset, measured from the beginning of the file.
//
// It returns an error if the file cannot be opened or if the seek fails; in the
// latter case the file is closed before returning so that the descriptor is not
// leaked. It panics if the seek reports success but lands on a position other
// than startOffset.
func newBlockfileStream(filePath string, startOffset int64) (*blockfileStream, error) {
	logger.Debugf("newBlockfileStream(): filePath=[%s], startOffset=[%d]", filePath, startOffset)
	file, err := os.OpenFile(filePath, os.O_RDONLY, 0600)
	if err != nil {
		return nil, err
	}
	// whence 0 means "relative to the start of the file".
	newPosition, err := file.Seek(startOffset, 0)
	if err != nil {
		// Seeking past the end of a file is not an error, but other failures
		// (e.g. a negative offset) are. The file is of no use to the caller at
		// this point, so release it; the seek error is the one worth reporting.
		_ = file.Close()
		return nil, err
	}
	if newPosition != startOffset {
		panic(fmt.Sprintf("Could not seek file [%s] to given startOffset [%d]. New position = [%d]",
			filePath, startOffset, newPosition))
	}
	s := &blockfileStream{file, bufio.NewReader(file), startOffset}
	return s, nil
}

// nextBlockBytes reads the next length-prefixed block from the file and returns
// its payload (without the varint length prefix).
//
// Return values:
//   - (bytes, nil) when a complete block was read; currentOffset is advanced
//     past the length prefix and the payload.
//   - (nil, nil) when the stream is positioned exactly at the end of the file.
//   - (nil, ErrUnexpectedEndOfBlockfile) when the file ends in the middle of a
//     length prefix or of a block payload (e.g. after a crash during append).
//   - (nil, err) for any other read error.
//
// It panics if eight bytes are available but do not contain a decodable varint.
func (s *blockfileStream) nextBlockBytes() ([]byte, error) {
	// A length prefix is decoded from at most 8 peeked bytes. Peek returns
	// io.EOF together with whatever is left when fewer than 8 bytes remain;
	// that alone does not mean the last record is truncated, since a small
	// record can legitimately be shorter than 8 bytes in total.
	lenBytes, err := s.reader.Peek(8)
	shortPeek := false
	if err != nil {
		if err != io.EOF {
			return nil, err
		}
		if len(lenBytes) == 0 {
			// Clean end of file: no more blocks.
			return nil, nil
		}
		shortPeek = true
	}
	blockLen, n := proto.DecodeVarint(lenBytes)
	if n == 0 {
		if shortPeek {
			// The file ends in the middle of the length prefix.
			return nil, ErrUnexpectedEndOfBlockfile
		}
		panic(fmt.Errorf("Error in decoding varint bytes"))
	}
	if shortPeek && uint64(len(lenBytes)-n) < blockLen {
		// lenBytes holds everything up to EOF, so the payload is known to be
		// incomplete. Report it without consuming anything from the reader.
		return nil, ErrUnexpectedEndOfBlockfile
	}
	if _, err = s.reader.Discard(n); err != nil {
		return nil, err
	}
	blockBytes := make([]byte, blockLen)
	if _, err = io.ReadFull(s.reader, blockBytes); err != nil {
		// io.ReadFull reports io.ErrUnexpectedEOF when it hits EOF after reading
		// some but not all bytes, and io.EOF when it could not read any byte of
		// a non-empty payload. Both mean the block was only partially written.
		if err == io.ErrUnexpectedEOF || err == io.EOF {
			return nil, ErrUnexpectedEndOfBlockfile
		}
		return nil, err
	}
	s.currentOffset += int64(n) + int64(blockLen)
	return blockBytes, nil
}

// close closes the file backing this stream and reports any error from doing so.
func (s *blockfileStream) close() error {
	closeErr := s.file.Close()
	return closeErr
}

///////////////////////////////////
// blockStream functions
////////////////////////////////////
// newBlockStream creates a multi-file stream rooted at rootDir. Reading begins in
// file segment startFileNum at startOffset and may continue through segment
// endFileNum. An error is returned if the first segment cannot be opened.
func newBlockStream(rootDir string, startFileNum int, startOffset int64, endFileNum int) (*blockStream, error) {
	firstStream, err := newBlockfileStream(deriveBlockfilePath(rootDir, startFileNum), startOffset)
	if err != nil {
		return nil, err
	}
	stream := &blockStream{
		rootDir:           rootDir,
		currentFileNum:    startFileNum,
		endFileNum:        endFileNum,
		currentFileStream: firstStream,
	}
	return stream, nil
}

// moveToNextBlockfileStream switches the stream to the beginning of the next
// file segment (currentFileNum+1).
//
// The next segment is opened before the current one is released, and the
// stream's fields are only updated once both steps have succeeded. On error the
// stream therefore still refers to the segment it was on, instead of being left
// with a nil currentFileStream that a later close() or nextBlockBytes() would
// dereference.
func (s *blockStream) moveToNextBlockfileStream() error {
	nextFileNum := s.currentFileNum + 1
	nextStream, err := newBlockfileStream(deriveBlockfilePath(s.rootDir, nextFileNum), 0)
	if err != nil {
		return err
	}
	if err = s.currentFileStream.close(); err != nil {
		// The switch is being abandoned; release the freshly opened segment so
		// it does not leak. The close error of the current segment is the one
		// reported to the caller.
		_ = nextStream.close()
		return err
	}
	s.currentFileNum = nextFileNum
	s.currentFileStream = nextStream
	return nil
}

// nextBlockBytes returns the bytes of the next block, transparently advancing to
// the following file segment whenever the current one is exhausted and it is not
// yet the last segment (endFileNum). It returns (nil, nil) once the last segment
// has no more blocks, and (nil, err) if reading or switching segments fails.
func (s *blockStream) nextBlockBytes() ([]byte, error) {
	for {
		data, err := s.currentFileStream.nextBlockBytes()
		if err != nil {
			logger.Debugf("current file [%d]", s.currentFileNum)
			logger.Debugf("blockbytes [%d]. Err:%s", len(data), err)
			return nil, err
		}
		logger.Debugf("blockbytes [%d] read from file [%d]", len(data), s.currentFileNum)
		// Either a block was read, or there is no further segment to try.
		if data != nil || s.currentFileNum >= s.endFileNum {
			return data, nil
		}
		logger.Debugf("current file [%d] exhausted. Moving to next file", s.currentFileNum)
		if err = s.moveToNextBlockfileStream(); err != nil {
			return nil, err
		}
	}
}

// close closes the file segment the stream is currently positioned on.
func (s *blockStream) close() error {
	current := s.currentFileStream
	return current.close()
}
133 changes: 133 additions & 0 deletions core/ledgernext/blkstorage/fsblkstorage/block_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fsblkstorage

import (
"testing"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/ledgernext/testutil"
)

// TestBlockfileStream exercises the single-file stream with an empty file, a file
// holding one block, and a file holding several blocks.
func TestBlockfileStream(t *testing.T) {
	for _, numBlocks := range []int{0, 1, 10} {
		testBlockfileStream(t, numBlocks)
	}
}

// testBlockfileStream writes numBlocks blocks through the test wrapper, reads
// file segment 0 back with a blockfileStream, and checks that exactly numBlocks
// blocks are returned and that an exhausted stream keeps returning (nil, nil).
func testBlockfileStream(t *testing.T, numBlocks int) {
	env := newTestEnv(t)
	defer env.Cleanup()
	w := newTestBlockfileWrapper(t, env)
	blockfileMgr := w.blockfileMgr

	blocks := testutil.ConstructTestBlocks(t, numBlocks)
	w.addBlocks(blocks)
	w.close()

	s, err := newBlockfileStream(deriveBlockfilePath(blockfileMgr.rootDir, 0), 0)
	testutil.AssertNoError(t, err, "Error in constructing blockfile stream")
	// Register the close only after the constructor result has been checked:
	// when construction fails s is nil and a deferred s.close() would
	// dereference it.
	defer s.close()

	blockCount := 0
	for {
		blockBytes, err := s.nextBlockBytes()
		testutil.AssertNoError(t, err, "Error in getting next block")
		if blockBytes == nil {
			break
		}
		blockCount++
	}
	// After the stream has been exhausted, both blockBytes and err should be nil
	blockBytes, err := s.nextBlockBytes()
	testutil.AssertNil(t, blockBytes)
	testutil.AssertNoError(t, err, "Error in getting next block after exhausting the file")
	testutil.AssertEquals(t, blockCount, numBlocks)
}

// TestBlockFileStreamUnexpectedEOF builds one length-prefixed record of 100 random
// bytes and appends increasingly long truncated prefixes of it to a block file,
// covering truncation at and after the length prefix.
func TestBlockFileStreamUnexpectedEOF(t *testing.T) {
	payload := testutil.ConstructRandomBytes(t, 100)
	record := append([]byte{}, proto.EncodeVarint(uint64(len(payload)))...)
	record = append(record, payload...)
	for _, cut := range []int{1, 2, 5, 20} {
		testBlockFileStreamUnexpectedEOF(t, 10, record[:cut])
	}
}

// testBlockFileStreamUnexpectedEOF writes numBlocks complete blocks followed by
// partialBlockBytes, then checks that the stream returns the complete blocks and
// reports ErrUnexpectedEndOfBlockfile for the trailing partial record.
func testBlockFileStreamUnexpectedEOF(t *testing.T, numBlocks int, partialBlockBytes []byte) {
	env := newTestEnv(t)
	defer env.Cleanup()
	w := newTestBlockfileWrapper(t, env)
	blockfileMgr := w.blockfileMgr
	blocks := testutil.ConstructTestBlocks(t, numBlocks)
	w.addBlocks(blocks)
	blockfileMgr.currentFileWriter.append(partialBlockBytes, true)
	w.close()
	s, err := newBlockfileStream(deriveBlockfilePath(blockfileMgr.rootDir, 0), 0)
	testutil.AssertNoError(t, err, "Error in constructing blockfile stream")
	// Register the close only after the constructor result has been checked:
	// when construction fails s is nil and a deferred s.close() would
	// dereference it.
	defer s.close()

	for i := 0; i < numBlocks; i++ {
		blockBytes, err := s.nextBlockBytes()
		testutil.AssertNotNil(t, blockBytes)
		testutil.AssertNoError(t, err, "Error in getting next block")
	}
	blockBytes, err := s.nextBlockBytes()
	testutil.AssertNil(t, blockBytes)
	testutil.AssertSame(t, err, ErrUnexpectedEndOfBlockfile)
}

// TestBlockStream exercises the multi-file stream across one, two, and ten file
// segments.
func TestBlockStream(t *testing.T) {
	for _, numFiles := range []int{1, 2, 10} {
		testBlockStream(t, numFiles)
	}
}

// testBlockStream writes ten blocks into each of numFiles file segments, reads
// them back through a single blockStream spanning segments 0..numFiles-1, and
// checks that every block is returned and that an exhausted stream keeps
// returning (nil, nil).
func testBlockStream(t *testing.T, numFiles int) {
	env := newTestEnv(t)
	defer env.Cleanup()
	w := newTestBlockfileWrapper(t, env)
	defer w.close()
	blockfileMgr := w.blockfileMgr

	numBlocksInEachFile := 10
	for i := 0; i < numFiles; i++ {
		blocks := testutil.ConstructTestBlocks(t, numBlocksInEachFile)
		w.addBlocks(blocks)
		blockfileMgr.moveToNextFile()
	}
	s, err := newBlockStream(blockfileMgr.rootDir, 0, 0, numFiles-1)
	testutil.AssertNoError(t, err, "Error in constructing new block stream")
	// Register the close only after the constructor result has been checked:
	// when construction fails s is nil and a deferred s.close() would
	// dereference it.
	defer s.close()
	blockCount := 0
	for {
		blockBytes, err := s.nextBlockBytes()
		testutil.AssertNoError(t, err, "Error in getting next block")
		if blockBytes == nil {
			break
		}
		blockCount++
	}
	// After the stream has been exhausted, both blockBytes and err should be nil
	blockBytes, err := s.nextBlockBytes()
	testutil.AssertNil(t, blockBytes)
	testutil.AssertNoError(t, err, "Error in getting next block after exhausting the file")
	testutil.AssertEquals(t, blockCount, numFiles*numBlocksInEachFile)
}
17 changes: 8 additions & 9 deletions core/ledgernext/blkstorage/fsblkstorage/blockfile_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,9 @@ func (mgr *blockfileMgr) retrieveBlocks(startNum uint64, endNum uint64) (*Blocks
if lp, err = mgr.index.getBlockLocByBlockNum(startNum); err != nil {
return nil, err
}
filePath := deriveBlockfilePath(mgr.rootDir, lp.fileSuffixNum)

var stream *blockStream
if stream, err = newBlockStream(filePath, int64(lp.offset)); err != nil {
if stream, err = newBlockStream(mgr.rootDir, lp.fileSuffixNum,
int64(lp.offset), mgr.cpInfo.latestFileChunkSuffixNum); err != nil {
return nil, err
}
return newBlockItr(stream, int(endNum-startNum)+1), nil
Expand Down Expand Up @@ -322,7 +321,7 @@ func (mgr *blockfileMgr) fetchTransaction(lp *fileLocPointer) (*protos.Transacti

func (mgr *blockfileMgr) fetchBlockBytes(lp *fileLocPointer) ([]byte, error) {
filePath := deriveBlockfilePath(mgr.rootDir, lp.fileSuffixNum)
stream, err := newBlockStream(filePath, int64(lp.offset))
stream, err := newBlockfileStream(filePath, int64(lp.offset))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -382,23 +381,23 @@ func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, flush bool) error {
func scanForLastCompleteBlock(filePath string, startingOffset int64) (int64, int, error) {
logger.Debugf("scanForLastCompleteBlock(): filePath=[%s], startingOffset=[%d]", filePath, startingOffset)
numBlocks := 0
blockStream, err := newBlockStream(filePath, startingOffset)
blockStream, err := newBlockfileStream(filePath, startingOffset)
if err != nil {
return 0, 0, err
}
defer blockStream.close()
for {
blockBytes, err := blockStream.nextBlockBytes()
if blockBytes == nil || err != nil {
if blockBytes == nil || err == ErrUnexpectedEndOfBlockfile {
logger.Debugf(`scanForLastCompleteBlock(): error=[%s].
This may happen if a crash has happened during block appending.
The error may happen if a crash has happened during block appending.
Returning current offset as a last complete block's end offset`, err)
break
}
numBlocks++
}
logger.Debugf("scanForLastCompleteBlock(): last complete block ends at offset=[%d]", blockStream.currentFileOffset)
return blockStream.currentFileOffset, numBlocks, err
logger.Debugf("scanForLastCompleteBlock(): last complete block ends at offset=[%d]", blockStream.currentOffset)
return blockStream.currentOffset, numBlocks, err
}

// checkpointInfo
Expand Down

0 comments on commit 130ad7c

Please sign in to comment.