Skip to content

Commit

Permalink
Use snapshot file creator in blockstorage
Browse files Browse the repository at this point in the history
This commit leverages the snapshot file creator
to generate snapshot files and computing hashes for TXIDs.

FAB-17870

Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi authored and denyeart committed May 12, 2020
1 parent e12530c commit 6d7a21b
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 47 deletions.
65 changes: 47 additions & 18 deletions common/ledger/blkstorage/blockindex.go
Expand Up @@ -8,14 +8,15 @@ package blkstorage

import (
"bytes"
"encoding/binary"
"fmt"
"io"
"hash"
"path"
"unicode/utf8"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/common/ledger/snapshot"
"github.com/hyperledger/fabric/common/ledger/util"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/internal/pkg/txflags"
Expand All @@ -28,7 +29,10 @@ const (
txIDIdxKeyPrefix = 't'
blockNumTranNumIdxKeyPrefix = 'a'
indexCheckpointKeyStr = "indexCheckpointKey"
exportFileFormatVersion = byte(1)

snapshotFileFormat = byte(1)
snapshotDataFileName = "txids.data"
snapshotMetadataFileName = "txids.metadata"
)

var indexCheckpointKey = []byte(indexCheckpointKeyStr)
Expand Down Expand Up @@ -252,42 +256,67 @@ func (index *blockIndex) getTXLocByBlockNumTranNum(blockNum uint64, tranNum uint
return txFLP, nil
}

func (index *blockIndex) exportUniqueTxIDs(writer io.Writer) error {
func (index *blockIndex) exportUniqueTxIDs(dir string, hasher hash.Hash) (map[string][]byte, error) {
if !index.isAttributeIndexed(IndexableAttrTxID) {
return ErrAttrNotIndexed
return nil, ErrAttrNotIndexed
}

// create the data file
dataFile, err := snapshot.CreateFile(path.Join(dir, snapshotDataFileName), snapshotFileFormat, hasher)
if err != nil {
return nil, err
}
writer.Write([]byte{exportFileFormatVersion})
defer dataFile.Close()

dbItr := index.db.GetIterator([]byte{txIDIdxKeyPrefix}, []byte{txIDIdxKeyPrefix + 1})
defer dbItr.Release()
if err := dbItr.Error(); err != nil {
return errors.Wrap(err, "internal leveldb error while obtaining db iterator")
return nil, errors.Wrap(err, "internal leveldb error while obtaining db iterator")
}

var previousTxID string
reusableBuf := make([]byte, binary.MaxVarintLen64)
var numTxIDs uint64 = 0
for dbItr.Next() {
if err := dbItr.Error(); err != nil {
return errors.Wrap(err, "internal leveldb error while iterating for txids")
return nil, errors.Wrap(err, "internal leveldb error while iterating for txids")
}
txID, err := retrieveTxID(dbItr.Key())
if err != nil {
return err
return nil, err
}
// duplicate TxID may be present in the index
if previousTxID == txID {
continue
}
previousTxID = txID

n := binary.PutUvarint(reusableBuf, uint64(len(txID)))
if _, err := writer.Write(reusableBuf[:n]); err != nil {
return err
}
if _, err := writer.Write([]byte(txID)); err != nil {
return err
if err := dataFile.EncodeString(txID); err != nil {
return nil, err
}
numTxIDs++
}
return nil

dataHash, err := dataFile.Done()
if err != nil {
return nil, err
}

// create the metadata file
hasher.Reset()
metadataFile, err := snapshot.CreateFile(path.Join(dir, snapshotMetadataFileName), snapshotFileFormat, hasher)
if err != nil {
return nil, err
}
defer metadataFile.Close()

if err = metadataFile.EncodeUVarint(numTxIDs); err != nil {
return nil, err
}
metadataHash, err := metadataFile.Done()

return map[string][]byte{
snapshotDataFileName: dataHash,
snapshotMetadataFileName: metadataHash,
}, nil
}

func constructBlockNumKey(blockNum uint64) []byte {
Expand Down
108 changes: 83 additions & 25 deletions common/ledger/blkstorage/blockindex_test.go
Expand Up @@ -7,19 +7,21 @@ SPDX-License-Identifier: Apache-2.0
package blkstorage

import (
"bytes"
"encoding/binary"
"crypto/sha256"
"fmt"
"io/ioutil"
"os"
"path"
"testing"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric/common/ledger/snapshot"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/common/ledger/util"
commonledgerutil "github.com/hyperledger/fabric/common/ledger/util"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/internal/pkg/txflags"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -264,13 +266,17 @@ func TestExportUniqueTxIDs(t *testing.T) {
bg, gb := testutil.NewBlockGenerator(t, "myChannel", false)
blkfileMgr.addBlock(gb)

testSnapshotDir := testPath()
defer os.RemoveAll(testSnapshotDir)

// add genesis block and test the exported bytes
configTxID, err := protoutil.GetOrComputeTxIDFromEnvelope(gb.Data.Data[0])
require.NoError(t, err)
testWriter := bytes.NewBuffer(nil)
err = blkfileMgr.index.exportUniqueTxIDs(testWriter)
fileHashes, err := blkfileMgr.index.exportUniqueTxIDs(testSnapshotDir, sha256.New())
require.NoError(t, err)
verifyExportedTxIDs(t, testWriter.Bytes(), configTxID)
verifyExportedTxIDs(t, testSnapshotDir, fileHashes, configTxID)
os.Remove(path.Join(testSnapshotDir, snapshotDataFileName))
os.Remove(path.Join(testSnapshotDir, snapshotMetadataFileName))

// add block-1 and test the exported bytes
block1 := bg.NextBlockWithTxid(
Expand All @@ -284,10 +290,11 @@ func TestExportUniqueTxIDs(t *testing.T) {
)
err = blkfileMgr.addBlock(block1)
require.NoError(t, err)
testWriter.Reset()
err = blkfileMgr.index.exportUniqueTxIDs(testWriter)
fileHashes, err = blkfileMgr.index.exportUniqueTxIDs(testSnapshotDir, sha256.New())
require.NoError(t, err)
verifyExportedTxIDs(t, testWriter.Bytes(), "txid-1", "txid-2", "txid-3", configTxID) //"txid-1" appears once, Txids appear in radix sort order
verifyExportedTxIDs(t, testSnapshotDir, fileHashes, "txid-1", "txid-2", "txid-3", configTxID) //"txid-1" appears once, Txids appear in radix sort order
os.Remove(path.Join(testSnapshotDir, snapshotDataFileName))
os.Remove(path.Join(testSnapshotDir, snapshotMetadataFileName))

// add block-2 and test the exported bytes
block2 := bg.NextBlockWithTxid(
Expand All @@ -300,10 +307,10 @@ func TestExportUniqueTxIDs(t *testing.T) {
)
blkfileMgr.addBlock(block2)
require.NoError(t, err)
testWriter.Reset()
err = blkfileMgr.index.exportUniqueTxIDs(testWriter)

fileHashes, err = blkfileMgr.index.exportUniqueTxIDs(testSnapshotDir, sha256.New())
require.NoError(t, err)
verifyExportedTxIDs(t, testWriter.Bytes(), "txid-1", "txid-2", "txid-3", "txid-4", "txid-0000000", configTxID) // "txid-1", and "txid-3 appears once and Txids appear in radix sort order
verifyExportedTxIDs(t, testSnapshotDir, fileHashes, "txid-1", "txid-2", "txid-3", "txid-4", "txid-0000000", configTxID) // "txid-1", and "txid-3 appears once and Txids appear in radix sort order
}

func TestExportUniqueTxIDsWhenTxIDsNotIndexed(t *testing.T) {
Expand All @@ -315,7 +322,9 @@ func TestExportUniqueTxIDsWhenTxIDsNotIndexed(t *testing.T) {
blocks := testutil.ConstructTestBlocks(t, 5)
blkfileMgrWrapper.addBlocks(blocks)

err := blkfileMgrWrapper.blockfileMgr.index.exportUniqueTxIDs(bytes.NewBuffer(nil))
testSnapshotDir := testPath()
defer os.RemoveAll(testSnapshotDir)
_, err := blkfileMgrWrapper.blockfileMgr.index.exportUniqueTxIDs(testSnapshotDir, sha256.New())
require.Equal(t, err, ErrAttrNotIndexed)
}

Expand All @@ -331,27 +340,76 @@ func TestExportUniqueTxIDsErrorCases(t *testing.T) {
blockfileMgr := blkfileMgrWrapper.blockfileMgr
index := blockfileMgr.index

err := index.exportUniqueTxIDs(&errorThrowingWriter{errors.New("always return error")})
require.EqualError(t, err, "always return error")
testSnapshotDir := testPath()
defer os.RemoveAll(testSnapshotDir)

// error during data file creation
dataFilePath := path.Join(testSnapshotDir, snapshotDataFileName)
_, err := os.Create(dataFilePath)
require.NoError(t, err)
_, err = blkfileMgrWrapper.blockfileMgr.index.exportUniqueTxIDs(testSnapshotDir, sha256.New())
require.Contains(t, err.Error(), "error while creating the snapshot file: "+dataFilePath)
os.RemoveAll(testSnapshotDir)

// error during metadata file creation
fmt.Printf("testSnapshotDir=%s", testSnapshotDir)
require.NoError(t, os.MkdirAll(testSnapshotDir, 0700))
metadataFilePath := path.Join(testSnapshotDir, snapshotMetadataFileName)
_, err = os.Create(metadataFilePath)
require.NoError(t, err)
_, err = blkfileMgrWrapper.blockfileMgr.index.exportUniqueTxIDs(testSnapshotDir, sha256.New())
require.Contains(t, err.Error(), "error while creating the snapshot file: "+metadataFilePath)
os.RemoveAll(testSnapshotDir)

// error while retrieving the txid key
require.NoError(t, os.MkdirAll(testSnapshotDir, 0700))
index.db.Put([]byte{txIDIdxKeyPrefix}, []byte("some junk value"), true)
err = index.exportUniqueTxIDs(bytes.NewBuffer(nil))
_, err = index.exportUniqueTxIDs(testSnapshotDir, sha256.New())
require.EqualError(t, err, "invalid txIDKey {74}: number of consumed bytes from DecodeVarint is invalid, expected 1, but got 0")
os.RemoveAll(testSnapshotDir)

// error while reading from leveldb
require.NoError(t, os.MkdirAll(testSnapshotDir, 0700))
env.provider.leveldbProvider.Close()
err = index.exportUniqueTxIDs(bytes.NewBuffer(nil))
_, err = index.exportUniqueTxIDs(testSnapshotDir, sha256.New())
require.EqualError(t, err, "internal leveldb error while obtaining db iterator: leveldb: closed")
os.RemoveAll(testSnapshotDir)
}

func verifyExportedTxIDs(t *testing.T, actual []byte, expectedTxIDs ...string) {
expectedBytes := []byte{exportFileFormatVersion}
buf := make([]byte, binary.MaxVarintLen64)
for _, txID := range expectedTxIDs {
n := binary.PutUvarint(buf, uint64(len(txID)))
expectedBytes = append(expectedBytes, buf[:n]...)
expectedBytes = append(expectedBytes, []byte(txID)...)
func verifyExportedTxIDs(t *testing.T, dir string, fileHashes map[string][]byte, expectedTxIDs ...string) {
require.Len(t, fileHashes, 2)
require.Contains(t, fileHashes, snapshotDataFileName)
require.Contains(t, fileHashes, snapshotMetadataFileName)

dataFile := path.Join(dir, snapshotDataFileName)
dataFileContent, err := ioutil.ReadFile(dataFile)
require.NoError(t, err)
dataFileHash := sha256.Sum256(dataFileContent)
require.Equal(t, dataFileHash[:], fileHashes[snapshotDataFileName])

metadataFile := path.Join(dir, snapshotMetadataFileName)
metadataFileContent, err := ioutil.ReadFile(metadataFile)
require.NoError(t, err)
metadataFileHash := sha256.Sum256(metadataFileContent)
require.Equal(t, metadataFileHash[:], fileHashes[snapshotMetadataFileName])

metadataReader, err := snapshot.OpenFile(metadataFile, snapshotFileFormat)
require.NoError(t, err)
defer metadataReader.Close()

dataReader, err := snapshot.OpenFile(dataFile, snapshotFileFormat)
require.NoError(t, err)
defer dataReader.Close()

numTxIDs, err := metadataReader.DecodeUVarInt()
require.NoError(t, err)
retrievedTxIDs := []string{}
for i := uint64(0); i < numTxIDs; i++ {
txID, err := dataReader.DecodeString()
require.NoError(t, err)
retrievedTxIDs = append(retrievedTxIDs, txID)
}
require.Equal(t, expectedBytes, actual)
require.Equal(t, expectedTxIDs, retrievedTxIDs)
}

func appendAllAndTrimLastByte(input ...[]byte) []byte {
Expand Down
10 changes: 6 additions & 4 deletions common/ledger/blkstorage/blockstore.go
Expand Up @@ -7,7 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package blkstorage

import (
"io"
"hash"
"time"

"github.com/hyperledger/fabric-protos-go/common"
Expand Down Expand Up @@ -89,10 +89,12 @@ func (store *BlockStore) RetrieveTxValidationCodeByTxID(txID string) (peer.TxVal
return store.fileMgr.retrieveTxValidationCodeByTxID(txID)
}

// ExportTxIds writes all the TxIDs to the writer. Technically, the TxIDs appear in the sort order of radix-sort/shortlex. However,
// ExportTxIds creates two files in the specified dir and returns a map that contains
// the mapping between the names of the files and their hashes.
// Technically, the TxIDs appear in the sort order of radix-sort/shortlex. However,
// since practically all the TxIDs are of same length, so the sort order would be the lexical sort order
func (store *BlockStore) ExportTxIds(writer io.Writer) error {
return store.fileMgr.index.exportUniqueTxIDs(writer)
func (store *BlockStore) ExportTxIds(dir string, hasher hash.Hash) (map[string][]byte, error) {
return store.fileMgr.index.exportUniqueTxIDs(dir, hasher)
}

// Shutdown shuts down the block store
Expand Down

0 comments on commit 6d7a21b

Please sign in to comment.