Skip to content

Commit

Permalink
fsync and do not generate empty files in snapshots (#1345)
Browse files Browse the repository at this point in the history
This commit fixes the following issues:
- Perform fsync on the snapshot files
- Do not generate empty files
- Use the filepath package instead of the path package

Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi committed May 28, 2020
1 parent 41f8b0a commit 251e989
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 95 deletions.
24 changes: 15 additions & 9 deletions common/ledger/blkstorage/blockindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ package blkstorage
import (
"bytes"
"fmt"
"path"
"path/filepath"
"unicode/utf8"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -260,13 +260,6 @@ func (index *blockIndex) exportUniqueTxIDs(dir string, newHashFunc snapshot.NewH
return nil, ErrAttrNotIndexed
}

// create the data file
dataFile, err := snapshot.CreateFile(path.Join(dir, snapshotDataFileName), snapshotFileFormat, newHashFunc)
if err != nil {
return nil, err
}
defer dataFile.Close()

dbItr := index.db.GetIterator([]byte{txIDIdxKeyPrefix}, []byte{txIDIdxKeyPrefix + 1})
defer dbItr.Release()
if err := dbItr.Error(); err != nil {
Expand All @@ -275,6 +268,8 @@ func (index *blockIndex) exportUniqueTxIDs(dir string, newHashFunc snapshot.NewH

var previousTxID string
var numTxIDs uint64 = 0
var dataFile *snapshot.FileWriter
var err error
for dbItr.Next() {
if err := dbItr.Error(); err != nil {
return nil, errors.Wrap(err, "internal leveldb error while iterating for txids")
Expand All @@ -288,19 +283,30 @@ func (index *blockIndex) exportUniqueTxIDs(dir string, newHashFunc snapshot.NewH
continue
}
previousTxID = txID
if numTxIDs == 0 { // first iteration, create the data file
dataFile, err = snapshot.CreateFile(filepath.Join(dir, snapshotDataFileName), snapshotFileFormat, newHashFunc)
if err != nil {
return nil, err
}
defer dataFile.Close()
}
if err := dataFile.EncodeString(txID); err != nil {
return nil, err
}
numTxIDs++
}

if dataFile == nil {
return nil, nil
}

dataHash, err := dataFile.Done()
if err != nil {
return nil, err
}

// create the metadata file
metadataFile, err := snapshot.CreateFile(path.Join(dir, snapshotMetadataFileName), snapshotFileFormat, newHashFunc)
metadataFile, err := snapshot.CreateFile(filepath.Join(dir, snapshotMetadataFileName), snapshotFileFormat, newHashFunc)
if err != nil {
return nil, err
}
Expand Down
33 changes: 20 additions & 13 deletions common/ledger/blkstorage/blockindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"hash"
"io/ioutil"
"os"
"path"
"path/filepath"
"testing"

"github.com/hyperledger/fabric-protos-go/common"
Expand Down Expand Up @@ -270,20 +270,27 @@ func TestExportUniqueTxIDs(t *testing.T) {
defer blkfileMgrWrapper.close()
blkfileMgr := blkfileMgrWrapper.blockfileMgr

bg, gb := testutil.NewBlockGenerator(t, "myChannel", false)
blkfileMgr.addBlock(gb)

testSnapshotDir := testPath()
defer os.RemoveAll(testSnapshotDir)

// empty store generates no output
fileHashes, err := blkfileMgr.index.exportUniqueTxIDs(testSnapshotDir, testNewHashFunc)
require.NoError(t, err)
require.Empty(t, fileHashes)
files, err := ioutil.ReadDir(testSnapshotDir)
require.NoError(t, err)
require.Len(t, files, 0)

// add genesis block and test the exported bytes
bg, gb := testutil.NewBlockGenerator(t, "myChannel", false)
blkfileMgr.addBlock(gb)
configTxID, err := protoutil.GetOrComputeTxIDFromEnvelope(gb.Data.Data[0])
require.NoError(t, err)
fileHashes, err := blkfileMgr.index.exportUniqueTxIDs(testSnapshotDir, testNewHashFunc)
fileHashes, err = blkfileMgr.index.exportUniqueTxIDs(testSnapshotDir, testNewHashFunc)
require.NoError(t, err)
verifyExportedTxIDs(t, testSnapshotDir, fileHashes, configTxID)
os.Remove(path.Join(testSnapshotDir, snapshotDataFileName))
os.Remove(path.Join(testSnapshotDir, snapshotMetadataFileName))
os.Remove(filepath.Join(testSnapshotDir, snapshotDataFileName))
os.Remove(filepath.Join(testSnapshotDir, snapshotMetadataFileName))

// add block-1 and test the exported bytes
block1 := bg.NextBlockWithTxid(
Expand All @@ -300,8 +307,8 @@ func TestExportUniqueTxIDs(t *testing.T) {
fileHashes, err = blkfileMgr.index.exportUniqueTxIDs(testSnapshotDir, testNewHashFunc)
require.NoError(t, err)
verifyExportedTxIDs(t, testSnapshotDir, fileHashes, "txid-1", "txid-2", "txid-3", configTxID) //"txid-1" appears once, Txids appear in radix sort order
os.Remove(path.Join(testSnapshotDir, snapshotDataFileName))
os.Remove(path.Join(testSnapshotDir, snapshotMetadataFileName))
os.Remove(filepath.Join(testSnapshotDir, snapshotDataFileName))
os.Remove(filepath.Join(testSnapshotDir, snapshotMetadataFileName))

// add block-2 and test the exported bytes
block2 := bg.NextBlockWithTxid(
Expand Down Expand Up @@ -351,7 +358,7 @@ func TestExportUniqueTxIDsErrorCases(t *testing.T) {
defer os.RemoveAll(testSnapshotDir)

// error during data file creation
dataFilePath := path.Join(testSnapshotDir, snapshotDataFileName)
dataFilePath := filepath.Join(testSnapshotDir, snapshotDataFileName)
_, err := os.Create(dataFilePath)
require.NoError(t, err)
_, err = blkfileMgrWrapper.blockfileMgr.index.exportUniqueTxIDs(testSnapshotDir, testNewHashFunc)
Expand All @@ -361,7 +368,7 @@ func TestExportUniqueTxIDsErrorCases(t *testing.T) {
// error during metadata file creation
fmt.Printf("testSnapshotDir=%s", testSnapshotDir)
require.NoError(t, os.MkdirAll(testSnapshotDir, 0700))
metadataFilePath := path.Join(testSnapshotDir, snapshotMetadataFileName)
metadataFilePath := filepath.Join(testSnapshotDir, snapshotMetadataFileName)
_, err = os.Create(metadataFilePath)
require.NoError(t, err)
_, err = blkfileMgrWrapper.blockfileMgr.index.exportUniqueTxIDs(testSnapshotDir, testNewHashFunc)
Expand All @@ -388,13 +395,13 @@ func verifyExportedTxIDs(t *testing.T, dir string, fileHashes map[string][]byte,
require.Contains(t, fileHashes, snapshotDataFileName)
require.Contains(t, fileHashes, snapshotMetadataFileName)

dataFile := path.Join(dir, snapshotDataFileName)
dataFile := filepath.Join(dir, snapshotDataFileName)
dataFileContent, err := ioutil.ReadFile(dataFile)
require.NoError(t, err)
dataFileHash := sha256.Sum256(dataFileContent)
require.Equal(t, dataFileHash[:], fileHashes[snapshotDataFileName])

metadataFile := path.Join(dir, snapshotMetadataFileName)
metadataFile := filepath.Join(dir, snapshotMetadataFileName)
metadataFileContent, err := ioutil.ReadFile(metadataFile)
require.NoError(t, err)
metadataFileHash := sha256.Sum256(metadataFileContent)
Expand Down
3 changes: 3 additions & 0 deletions common/ledger/snapshot/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ func (c *FileWriter) Done() ([]byte, error) {
if err := c.bufWriter.Flush(); err != nil {
return nil, errors.Wrapf(err, "error while flushing to the snapshot file: %s ", c.file.Name())
}
if err := c.file.Sync(); err != nil {
return nil, err
}
if err := c.file.Close(); err != nil {
return nil, errors.Wrapf(err, "error while closing the snapshot file: %s ", c.file.Name())
}
Expand Down
26 changes: 17 additions & 9 deletions core/ledger/confighistory/mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ package confighistory

import (
"fmt"
"path"
"path/filepath"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
Expand Down Expand Up @@ -181,23 +181,27 @@ func (r *Retriever) CollectionConfigAt(blockNum uint64, chaincodeName string) (*
// extra bytes. Further, the collection config namespace is not expected to have
// millions of entries.
func (r *Retriever) ExportConfigHistory(dir string, newHashFunc snapshot.NewHashFunc) (map[string][]byte, error) {
dataFileWriter, err := snapshot.CreateFile(path.Join(dir, snapshotDataFileName), snapshotFileFormat, newHashFunc)
if err != nil {
return nil, err
}
defer dataFileWriter.Close()

nsItr := r.dbHandle.getNamespaceIterator(collectionConfigNamespace)
if err := nsItr.Error(); err != nil {
return nil, errors.Wrap(err, "internal leveldb error while obtaining db iterator")

}
defer nsItr.Release()

var numCollectionConfigs uint64 = 0
var dataFileWriter *snapshot.FileWriter
var err error
for nsItr.Next() {
if err := nsItr.Error(); err != nil {
return nil, errors.Wrap(err, "internal leveldb error while iterating for collection config history")
}
if numCollectionConfigs == 0 { // first iteration, create the data file
dataFileWriter, err = snapshot.CreateFile(filepath.Join(dir, snapshotDataFileName), snapshotFileFormat, newHashFunc)
if err != nil {
return nil, err
}
defer dataFileWriter.Close()
}
if err := dataFileWriter.EncodeBytes(nsItr.Key()); err != nil {
return nil, err
}
Expand All @@ -206,12 +210,16 @@ func (r *Retriever) ExportConfigHistory(dir string, newHashFunc snapshot.NewHash
}
numCollectionConfigs++
}

if dataFileWriter == nil {
return nil, nil
}

dataHash, err := dataFileWriter.Done()
if err != nil {
return nil, err
}

metadataFileWriter, err := snapshot.CreateFile(path.Join(dir, snapshotMetadataFileName), snapshotFileFormat, newHashFunc)
metadataFileWriter, err := snapshot.CreateFile(filepath.Join(dir, snapshotMetadataFileName), snapshotFileFormat, newHashFunc)
if err != nil {
return nil, err
}
Expand Down
22 changes: 18 additions & 4 deletions core/ledger/confighistory/mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,10 @@ func TestExportConfigHistory(t *testing.T) {
// config history database is empty
fileHashes, err := env.retriever.ExportConfigHistory(env.testSnapshotDir, testNewHashFunc)
require.NoError(t, err)
verifyExportedConfigHistory(t, env.testSnapshotDir, fileHashes, nil)
os.Remove(path.Join(env.testSnapshotDir, snapshotDataFileName))
os.Remove(path.Join(env.testSnapshotDir, snapshotMetadataFileName))
require.Empty(t, fileHashes)
files, err := ioutil.ReadDir(env.testSnapshotDir)
require.NoError(t, err)
require.Len(t, files, 0)

// config history database has 3 chaincodes each with 1 collection config entry in the
// collectionConfigNamespace
Expand Down Expand Up @@ -426,10 +427,23 @@ func verifyExportedConfigHistory(t *testing.T, dir string, fileHashes map[string
func TestExportConfigHistoryErrorCase(t *testing.T) {
env := newTestEnvForSnapshot(t)
defer env.cleanup()

dbHandle := env.mgr.dbProvider.getDB("ledger1")
cc1collConfigPackage := testutilCreateCollConfigPkg([]string{"Explicit-cc1-coll-1", "Explicit-cc1-coll-2"})
batch, err := prepareDBBatch(
map[string]*peer.CollectionConfigPackage{
"chaincode1": cc1collConfigPackage,
},
50,
)
assert.NoError(t, err)
assert.NoError(t, dbHandle.writeBatch(batch, true))

// error during data file creation
dataFilePath := path.Join(env.testSnapshotDir, snapshotDataFileName)
_, err := os.Create(dataFilePath)
_, err = os.Create(dataFilePath)
require.NoError(t, err)

_, err = env.retriever.ExportConfigHistory(env.testSnapshotDir, testNewHashFunc)
require.Contains(t, err.Error(), "error while creating the snapshot file: "+dataFilePath)
os.RemoveAll(env.testSnapshotDir)
Expand Down
83 changes: 47 additions & 36 deletions core/ledger/kvledger/txmgmt/privacyenabledstate/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ package privacyenabledstate

import (
"hash"
"path"
"path/filepath"

"github.com/hyperledger/fabric/common/ledger/snapshot"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
Expand All @@ -33,28 +33,8 @@ func (s *DB) ExportPubStateAndPvtStateHashes(dir string, newHashFunc snapshot.Ne
}
defer itr.Close()

pubStateWriter, err := newSnapshotWriter(
path.Join(dir, pubStateDataFileName),
path.Join(dir, pubStateMetadataFileName),
dbValueFormat,
newHashFunc,
)
if err != nil {
return nil, err
}
defer pubStateWriter.close()

pvtStateHashesWriter, err := newSnapshotWriter(
path.Join(dir, pvtStateHashesFileName),
path.Join(dir, pvtStateHashesMetadataFileName),
dbValueFormat,
newHashFunc,
)
if err != nil {
return nil, err
}
defer pvtStateHashesWriter.close()

var pubStateWriter *snapshotWriter
var pvtStateHashesWriter *snapshotWriter
for {
compositeKey, dbValue, err := itr.Next()
if err != nil {
Expand All @@ -65,30 +45,61 @@ func (s *DB) ExportPubStateAndPvtStateHashes(dir string, newHashFunc snapshot.Ne
}
switch {
case isHashedDataNs(compositeKey.Namespace):
if pvtStateHashesWriter == nil { // encountered first time the pvt state hash element
pvtStateHashesWriter, err = newSnapshotWriter(
filepath.Join(dir, pvtStateHashesFileName),
filepath.Join(dir, pvtStateHashesMetadataFileName),
dbValueFormat,
newHashFunc,
)
if err != nil {
return nil, err
}
defer pvtStateHashesWriter.close()
}
if err := pvtStateHashesWriter.addData(compositeKey, dbValue); err != nil {
return nil, err
}
default:
if pubStateWriter == nil { // encountered first time the pub state element
pubStateWriter, err = newSnapshotWriter(
filepath.Join(dir, pubStateDataFileName),
filepath.Join(dir, pubStateMetadataFileName),
dbValueFormat,
newHashFunc,
)
if err != nil {
return nil, err
}
defer pubStateWriter.close()
}
if err := pubStateWriter.addData(compositeKey, dbValue); err != nil {
return nil, err
}
}
}
pubStateDataHash, pubStateMetadataHash, err := pubStateWriter.done()
if err != nil {
return nil, err

snapshotFilesInfo := map[string][]byte{}

if pubStateWriter != nil {
pubStateDataHash, pubStateMetadataHash, err := pubStateWriter.done()
if err != nil {
return nil, err
}
snapshotFilesInfo[pubStateDataFileName] = pubStateDataHash
snapshotFilesInfo[pubStateMetadataFileName] = pubStateMetadataHash
}
pvtStateHahshesDataHash, pvtStateHashesMetadataHash, err := pvtStateHashesWriter.done()
if err != nil {
return nil, err

if pvtStateHashesWriter != nil {
pvtStateHahshesDataHash, pvtStateHashesMetadataHash, err := pvtStateHashesWriter.done()
if err != nil {
return nil, err
}
snapshotFilesInfo[pvtStateHashesFileName] = pvtStateHahshesDataHash
snapshotFilesInfo[pvtStateHashesMetadataFileName] = pvtStateHashesMetadataHash
}
return map[string][]byte{
pubStateDataFileName: pubStateDataHash,
pubStateMetadataFileName: pubStateMetadataHash,
pvtStateHashesFileName: pvtStateHahshesDataHash,
pvtStateHashesMetadataFileName: pvtStateHashesMetadataHash,
},
nil

return snapshotFilesInfo, nil
}

// snapshotWriter generates two files, a data file and a metadata file. The datafile contains a series of tuples <key, dbValue>
Expand Down
Loading

0 comments on commit 251e989

Please sign in to comment.