Skip to content

Commit

Permalink
Add ExportConfigHistory() API to confighistoryDB (#1288)
Browse files Browse the repository at this point in the history
This commit adds ExportConfigHistory() API to the
confighistoryDB retriever. It utilizes snapshot file
writer to dump keys & values stored in the
confighistoryDB to a snapshot file.

Signed-off-by: senthil <cendhu@gmail.com>
  • Loading branch information
cendhu committed May 20, 2020
1 parent 2e6860e commit 48768be
Show file tree
Hide file tree
Showing 5 changed files with 314 additions and 8 deletions.
9 changes: 9 additions & 0 deletions core/ledger/confighistory/db_helper.go
Expand Up @@ -18,6 +18,7 @@ import (
const (
keyPrefix = "s"
separatorByte = byte(0)
nsStopper = byte(1)
)

type compositeKey struct {
Expand Down Expand Up @@ -99,6 +100,14 @@ func (d *db) entryAt(blockNum uint64, ns, key string) (*compositeKV, error) {
return &compositeKV{k, v}, nil
}

func (d *db) getNamespaceIterator(ns string) *leveldbhelper.Iterator {
nsStartKey := []byte(keyPrefix + ns)
nsStartKey = append(nsStartKey, separatorByte)
nsEndKey := []byte(keyPrefix + ns)
nsEndKey = append(nsEndKey, nsStopper)
return d.GetIterator(nsStartKey, nsEndKey)
}

func encodeCompositeKey(ns, key string, blockNum uint64) []byte {
b := []byte(keyPrefix + ns)
b = append(b, separatorByte)
Expand Down
45 changes: 45 additions & 0 deletions core/ledger/confighistory/db_helper_test.go
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"testing"

"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -66,6 +67,50 @@ func TestQueries(t *testing.T) {
checkEntryAt(t, "testcase-query10", db, "ns1", "key1", 45, nil)
}

func TestGetNamespaceIterator(t *testing.T) {
testDBPath := "/tmp/fabric/core/ledger/confighistory"
provider, err := newDBProvider(testDBPath)
require.NoError(t, err)
defer deleteTestPath(t, testDBPath)

db := provider.getDB("ledger1")
nsItr1 := db.getNamespaceIterator("ns1")
require.NoError(t, nsItr1.Error())
defer nsItr1.Release()
verifyNsEntries(t, nsItr1, nil)

sampleData := []*compositeKV{
{&compositeKey{ns: "ns1", key: "key1", blockNum: 40}, []byte("val1_40")}, // index 0
{&compositeKey{ns: "ns1", key: "key1", blockNum: 30}, []byte("val1_30")}, // index 1
{&compositeKey{ns: "ns1", key: "key1", blockNum: 20}, []byte("val1_20")}, // index 2
{&compositeKey{ns: "ns2", key: "key1", blockNum: 50}, []byte("val1_50")}, // index 3
{&compositeKey{ns: "ns2", key: "key1", blockNum: 20}, []byte("val1_20")}, // index 4
{&compositeKey{ns: "ns2", key: "key1", blockNum: 10}, []byte("val1_10")}, // index 5
}
populateDBWithSampleData(t, db, sampleData)

nsItr2 := db.getNamespaceIterator("ns1")
require.NoError(t, nsItr2.Error())
defer nsItr2.Release()
verifyNsEntries(t, nsItr2, sampleData[:3])

nsItr3 := db.getNamespaceIterator("ns2")
require.NoError(t, nsItr3.Error())
defer nsItr3.Release()
verifyNsEntries(t, nsItr3, sampleData[3:])
}

func verifyNsEntries(t *testing.T, nsItr *leveldbhelper.Iterator, expectedEntries []*compositeKV) {
var retrievedEntries []*compositeKV
for nsItr.Next() {
require.NoError(t, nsItr.Error())
key := decodeCompositeKey(nsItr.Key())
val := make([]byte, len(nsItr.Value()))
copy(val, nsItr.Value())
retrievedEntries = append(retrievedEntries, &compositeKV{key, val})
}
require.Equal(t, expectedEntries, retrievedEntries)
}
func populateDBWithSampleData(t *testing.T, db *db, sampledata []*compositeKV) {
batch := newBatch()
for _, data := range sampledata {
Expand Down
80 changes: 73 additions & 7 deletions core/ledger/confighistory/mgr.go
Expand Up @@ -8,12 +8,15 @@ package confighistory

import (
"fmt"
"hash"
"path"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/ledger/rwset/kvrwset"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/snapshot"
"github.com/hyperledger/fabric/core/ledger"
"github.com/pkg/errors"
)
Expand All @@ -22,12 +25,15 @@ var logger = flogging.MustGetLogger("confighistory")

const (
collectionConfigNamespace = "lscc" // lscc namespace was introduced in version 1.2 and we continue to use this in order to be compatible with existing data
snapshotFileFormat = byte(1)
snapshotDataFileName = "confighistory.data"
snapshotMetadataFileName = "confighistory.metadata"
)

// Mgr should be registered as a state listener. The state listener builds the history and retriever helps in querying the history
type Mgr interface {
ledger.StateListener
GetRetriever(ledgerID string, ledgerInfoRetriever LedgerInfoRetriever) ledger.ConfigHistoryRetriever
GetRetriever(ledgerID string, ledgerInfoRetriever LedgerInfoRetriever) *Retriever
Close()
}

Expand Down Expand Up @@ -105,8 +111,8 @@ func (m *mgr) HandleStateUpdates(trigger *ledger.StateUpdateTrigger) error {
}

// GetRetriever returns an implementation of `ledger.ConfigHistoryRetriever` for the given ledger id.
func (m *mgr) GetRetriever(ledgerID string, ledgerInfoRetriever LedgerInfoRetriever) ledger.ConfigHistoryRetriever {
return &retriever{
func (m *mgr) GetRetriever(ledgerID string, ledgerInfoRetriever LedgerInfoRetriever) *Retriever {
return &Retriever{
ledgerInfoRetriever: ledgerInfoRetriever,
ledgerID: ledgerID,
deployedCCInfoProvider: m.ccInfoProvider,
Expand All @@ -119,15 +125,15 @@ func (m *mgr) Close() {
m.dbProvider.Close()
}

type retriever struct {
type Retriever struct {
ledgerInfoRetriever LedgerInfoRetriever
ledgerID string
deployedCCInfoProvider ledger.DeployedChaincodeInfoProvider
dbHandle *db
}

// MostRecentCollectionConfigBelow implements function from the interface ledger.ConfigHistoryRetriever
func (r *retriever) MostRecentCollectionConfigBelow(blockNum uint64, chaincodeName string) (*ledger.CollectionConfigInfo, error) {
func (r *Retriever) MostRecentCollectionConfigBelow(blockNum uint64, chaincodeName string) (*ledger.CollectionConfigInfo, error) {
compositeKV, err := r.dbHandle.mostRecentEntryBelow(blockNum, collectionConfigNamespace, constructCollectionConfigKey(chaincodeName))
if err != nil {
return nil, err
Expand All @@ -141,7 +147,7 @@ func (r *retriever) MostRecentCollectionConfigBelow(blockNum uint64, chaincodeNa
}

// CollectionConfigAt implements function from the interface ledger.ConfigHistoryRetriever
func (r *retriever) CollectionConfigAt(blockNum uint64, chaincodeName string) (*ledger.CollectionConfigInfo, error) {
func (r *Retriever) CollectionConfigAt(blockNum uint64, chaincodeName string) (*ledger.CollectionConfigInfo, error) {
info, err := r.ledgerInfoRetriever.GetBlockchainInfo()
if err != nil {
return nil, err
Expand All @@ -163,7 +169,67 @@ func (r *retriever) CollectionConfigAt(blockNum uint64, chaincodeName string) (*
return constructCollectionConfigInfo(compositeKV, implicitColls)
}

func (r *retriever) getImplicitCollection(chaincodeName string) ([]*peer.StaticCollectionConfig, error) {
// ExportConfigHistory exports configuration history from the confighistoryDB to
// a file. Currently, we store only one type of configuration in the db, i.e.,
// private data collection configuration.
// We write the full key and value stored in the database as is to the file.
// Though we could decode the key and write a proto message with exact ns, key,
// block number, and collection config, we store the full key and value to avoid
// unnecessary encoding and decoding of proto messages.
// The key format stored in db is "s" + ns + byte(0) + key + "~collection" + byte(0)
// + blockNum. As we store the key as is, we store 13 extra bytes. For a million
// records, it would add only 12 MB overhead. Note that the protobuf also adds some
// extra bytes. Further, the collection config namespace is not expected to have
// millions of entries.
func (r *Retriever) ExportConfigHistory(dir string, hasher hash.Hash) (map[string][]byte, error) {
dataFileWriter, err := snapshot.CreateFile(path.Join(dir, snapshotDataFileName), snapshotFileFormat, hasher)
if err != nil {
return nil, err
}
defer dataFileWriter.Close()

nsItr := r.dbHandle.getNamespaceIterator(collectionConfigNamespace)
if err := nsItr.Error(); err != nil {
return nil, errors.Wrap(err, "internal leveldb error while obtaining db iterator")

}
defer nsItr.Release()
var numCollectionConfigs uint64 = 0
for nsItr.Next() {
if err := nsItr.Error(); err != nil {
return nil, errors.Wrap(err, "internal leveldb error while iterating for collection config history")
}
if err := dataFileWriter.EncodeBytes(nsItr.Key()); err != nil {
return nil, err
}
if err := dataFileWriter.EncodeBytes(nsItr.Value()); err != nil {
return nil, err
}
numCollectionConfigs++
}
dataHash, err := dataFileWriter.Done()
if err != nil {
return nil, err
}

hasher.Reset()
metadataFileWriter, err := snapshot.CreateFile(path.Join(dir, snapshotMetadataFileName), snapshotFileFormat, hasher)
if err != nil {
return nil, err
}
defer metadataFileWriter.Close()
if err = metadataFileWriter.EncodeUVarint(numCollectionConfigs); err != nil {
return nil, err
}
metadataHash, err := metadataFileWriter.Done()

return map[string][]byte{
snapshotDataFileName: dataHash,
snapshotMetadataFileName: metadataHash,
}, nil
}

func (r *Retriever) getImplicitCollection(chaincodeName string) ([]*peer.StaticCollectionConfig, error) {
qe, err := r.ledgerInfoRetriever.NewQueryExecutor()
if err != nil {
return nil, err
Expand Down

0 comments on commit 48768be

Please sign in to comment.