Skip to content

Commit

Permalink
Fix perf issue by key-endorsement to regular chaincode
Browse files Browse the repository at this point in the history
Key-level endorsement needs to access the metadata for each
of the keys in the write-set. Even the chaincodes that do not
use this feature will end up paying this additional penality.
Especially, in an environment that uses  couchdb, the performance
regression will be visible for the existing chaincodes.

The major reason is that the metadata is accessed one at a time
during validation and commit path whereas, couchdb is better
used if the data can be loaded in bulk.

Ideally, the data required by key-level endorsement for the entire
block should be loaded in a single bulk load call. Which would
benefit even the chaincodes that leverage this feature. However,
this would need some refactoring that can be taken up in a future
release.

This CR introduces a fix which would save the regular chaincodes
(that do not use key-level endorsement feature) from paying
the avobe mentioned penality. In this fix, we track the chaincodes
that do not use metadata and for such chaincodes, the GetStateMetadata
call simply returns nil without going to the underlying db.

Even when we start using the bulk-load for key-level endorsement,
this fix will still be relevant in the sense that the bulkload call
will be avoided for regular chaincodes

FAB-11700 #done

Change-Id: Icfc654b7d27f727f1811a5a300400e92b6e8be9d
Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi committed Sep 21, 2018
1 parent 861254d commit 945138e
Show file tree
Hide file tree
Showing 17 changed files with 1,238 additions and 73 deletions.
3 changes: 3 additions & 0 deletions common/ledger/util/leveldbhelper/leveldb_provider.go
Expand Up @@ -81,6 +81,9 @@ func (h *DBHandle) Delete(key []byte, sync bool) error {

// WriteBatch writes a batch in an atomic way
func (h *DBHandle) WriteBatch(batch *UpdateBatch, sync bool) error {
if len(batch.KVs) == 0 {
return nil
}
levelBatch := &leveldb.Batch{}
for k, v := range batch.KVs {
key := constructLevelKey(h.dbName, []byte(k))
Expand Down
2 changes: 2 additions & 0 deletions core/ledger/kvledger/bookkeeping/provider.go
Expand Up @@ -19,6 +19,8 @@ type Category int
const (
// PvtdataExpiry repersents the bookkeeping related to expiry of pvtdata because of BTL policy
PvtdataExpiry Category = iota
// MetadataPresenceIndicator maintains the bookkeeping about whether metadata is ever set for a namespace
MetadataPresenceIndicator
)

// Provider provides handle to different bookkeepers for the given ledger
Expand Down
2 changes: 1 addition & 1 deletion core/ledger/kvledger/bookkeeping/test_exports.go
Expand Up @@ -28,7 +28,7 @@ type TestEnv struct {
}

// NewTestEnv construct a TestEnv for testing
func NewTestEnv(t *testing.T) *TestEnv {
func NewTestEnv(t testing.TB) *TestEnv {
removePath(t)
provider := NewProvider()
return &TestEnv{t, provider}
Expand Down
4 changes: 2 additions & 2 deletions core/ledger/kvledger/kv_ledger_provider.go
Expand Up @@ -57,14 +57,14 @@ func NewProvider() (ledger.PeerLedgerProvider, error) {
// Initialize the ID store (inventory of chainIds/ledgerIds)
idStore := openIDStore(ledgerconfig.GetLedgerProviderPath())
ledgerStoreProvider := ledgerstorage.NewProvider()
bookkeepingProvider := bookkeeping.NewProvider()
// Initialize the versioned database (state database)
vdbProvider, err := privacyenabledstate.NewCommonStorageDBProvider()
vdbProvider, err := privacyenabledstate.NewCommonStorageDBProvider(bookkeepingProvider)
if err != nil {
return nil, err
}
// Initialize the history database (index for history of values by key)
historydbProvider := historyleveldb.NewHistoryDBProvider()
bookkeepingProvider := bookkeeping.NewProvider()
logger.Info("ledger provider Initialized")
provider := &Provider{idStore, ledgerStoreProvider,
vdbProvider, historydbProvider, nil, nil, bookkeepingProvider, nil}
Expand Down
Expand Up @@ -15,13 +15,13 @@ import (
"github.com/hyperledger/fabric/core/common/ccprovider"
"github.com/hyperledger/fabric/core/common/privdata"
"github.com/hyperledger/fabric/core/ledger/cceventmgmt"
"github.com/hyperledger/fabric/protos/common"

"github.com/hyperledger/fabric/core/ledger/kvledger/bookkeeping"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/statecouchdb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/stateleveldb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/protos/common"
"github.com/pkg/errors"
)

Expand All @@ -36,10 +36,11 @@ const (
// CommonStorageDBProvider implements interface DBProvider
type CommonStorageDBProvider struct {
statedb.VersionedDBProvider
bookkeepingProvider bookkeeping.Provider
}

// NewCommonStorageDBProvider constructs an instance of DBProvider
func NewCommonStorageDBProvider() (DBProvider, error) {
func NewCommonStorageDBProvider(bookkeeperProvider bookkeeping.Provider) (DBProvider, error) {
var vdbProvider statedb.VersionedDBProvider
var err error
if ledgerconfig.IsCouchDBEnabled() {
Expand All @@ -49,7 +50,7 @@ func NewCommonStorageDBProvider() (DBProvider, error) {
} else {
vdbProvider = stateleveldb.NewVersionedDBProvider()
}
return &CommonStorageDBProvider{vdbProvider}, nil
return &CommonStorageDBProvider{vdbProvider, bookkeeperProvider}, nil
}

// GetDBHandle implements function from interface DBProvider
Expand All @@ -58,7 +59,9 @@ func (p *CommonStorageDBProvider) GetDBHandle(id string) (DB, error) {
if err != nil {
return nil, err
}
return NewCommonStorageDB(vdb, id)
bookkeeper := p.bookkeepingProvider.GetDBHandle(id, bookkeeping.MetadataPresenceIndicator)
metadataHint := newMetadataHint(bookkeeper)
return NewCommonStorageDB(vdb, id, metadataHint)
}

// Close implements function from interface DBProvider
Expand All @@ -70,12 +73,13 @@ func (p *CommonStorageDBProvider) Close() {
// both the public and private data
type CommonStorageDB struct {
statedb.VersionedDB
metadataHint *metadataHint
}

// NewCommonStorageDB wraps a VersionedDB instance. The public data is managed directly by the wrapped versionedDB.
// For managing the hashed data and private data, this implementation creates separate namespaces in the wrapped db
func NewCommonStorageDB(vdb statedb.VersionedDB, ledgerid string) (DB, error) {
return &CommonStorageDB{VersionedDB: vdb}, nil
func NewCommonStorageDB(vdb statedb.VersionedDB, ledgerid string, metadataHint *metadataHint) (DB, error) {
return &CommonStorageDB{vdb, metadataHint}, nil
}

// IsBulkOptimizable implements corresponding function in interface DB
Expand Down Expand Up @@ -197,9 +201,39 @@ func (s *CommonStorageDB) ApplyPrivacyAwareUpdates(updates *UpdateBatch, height
combinedUpdates := updates.PubUpdates
addPvtUpdates(combinedUpdates, updates.PvtUpdates)
addHashedUpdates(combinedUpdates, updates.HashUpdates, !s.BytesKeySuppoted())
s.metadataHint.setMetadataUsedFlag(updates)
return s.VersionedDB.ApplyUpdates(combinedUpdates.UpdateBatch, height)
}

// GetStateMetadata implements corresponding function in interface DB. This implementation provides
// an optimization such that it keeps track if a namespaces has never stored metadata for any of
// its items, the value 'nil' is returned without going to the db. This is intented to be invoked
// in the validation and commit path. This saves the chaincodes from paying unnecessary performance
// penality if they do not use features that leverage metadata (such as key-level endorsement),
func (s *CommonStorageDB) GetStateMetadata(namespace, key string) ([]byte, error) {
if !s.metadataHint.metadataEverUsedFor(namespace) {
return nil, nil
}
vv, err := s.GetState(namespace, key)
if err != nil || vv == nil {
return nil, err
}
return vv.Metadata, nil
}

// GetPrivateDataMetadataByHash implements corresponding function in interface DB. For additional details, see
// decription of the similar function 'GetStateMetadata'
func (s *CommonStorageDB) GetPrivateDataMetadataByHash(namespace, collection string, keyHash []byte) ([]byte, error) {
if !s.metadataHint.metadataEverUsedFor(namespace) {
return nil, nil
}
vv, err := s.GetValueHash(namespace, collection, keyHash)
if err != nil || vv == nil {
return nil, err
}
return vv.Metadata, nil
}

func (s *CommonStorageDB) getCollectionConfigMap(chaincodeDefinition *cceventmgmt.ChaincodeDefinition) (map[string]bool, error) {
var collectionConfigsBytes []byte
collectionConfigsMap := make(map[string]bool)
Expand Down Expand Up @@ -254,17 +288,14 @@ func (s *CommonStorageDB) getCollectionConfigMap(chaincodeDefinition *cceventmgm
// is acceptable since peer can continue in the committing role without the indexes. However, executing chaincode queries
// may be affected, until a new chaincode with fixed indexes is installed and instantiated
func (s *CommonStorageDB) HandleChaincodeDeploy(chaincodeDefinition *cceventmgmt.ChaincodeDefinition, dbArtifactsTar []byte) error {

//Check to see if the interface for IndexCapable is implemented
indexCapable, ok := s.VersionedDB.(statedb.IndexCapable)
if !ok {
return nil
}

if chaincodeDefinition == nil {
return errors.New("chaincode definition not found while creating couchdb index")
}

dbArtifacts, err := ccprovider.ExtractFileEntries(dbArtifactsTar, indexCapable.GetDBType())
if err != nil {
logger.Errorf("Index creation: error extracting db artifacts from tar for chaincode [%s]: %s", chaincodeDefinition.Name, err)
Expand Down Expand Up @@ -296,7 +327,6 @@ func (s *CommonStorageDB) HandleChaincodeDeploy(chaincodeDefinition *cceventmgmt
if !ok {
logger.Errorf("Error processing index for chaincode [%s]: cannot create an index for an undefined collection=[%s]", chaincodeDefinition.Name, collectionName)
} else {

err := indexCapable.ProcessIndexesForChaincodeDeploy(derivePvtDataNs(chaincodeDefinition.Name, collectionName),
archiveDirectoryEntries)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions core/ledger/kvledger/txmgmt/privacyenabledstate/db.go
Expand Up @@ -35,6 +35,8 @@ type DB interface {
GetKeyHashVersion(namespace, collection string, keyHash []byte) (*version.Height, error)
GetPrivateDataMultipleKeys(namespace, collection string, keys []string) ([]*statedb.VersionedValue, error)
GetPrivateDataRangeScanIterator(namespace, collection, startKey, endKey string) (statedb.ResultsIterator, error)
GetStateMetadata(namespace, key string) ([]byte, error)
GetPrivateDataMetadataByHash(namespace, collection string, keyHash []byte) ([]byte, error)
ExecuteQueryOnPrivateData(namespace, collection, query string) (statedb.ResultsIterator, error)
ApplyPrivacyAwareUpdates(updates *UpdateBatch, height *version.Height) error
}
Expand Down
43 changes: 43 additions & 0 deletions core/ledger/kvledger/txmgmt/privacyenabledstate/db_test.go
Expand Up @@ -542,11 +542,54 @@ func testHandleChainCodeDeploy(t *testing.T, env TestEnv) {

}

func TestMetadataRetrieval(t *testing.T) {
for _, env := range testEnvs {
t.Run(env.GetName(), func(t *testing.T) {
testMetadataRetrieval(t, env)
})
}
}

func testMetadataRetrieval(t *testing.T, env TestEnv) {
env.Init(t)
defer env.Cleanup()
db := env.GetDBHandle("test-ledger-id")

updates := NewUpdateBatch()
updates.PubUpdates.PutValAndMetadata("ns1", "key1", []byte("value1"), []byte("metadata1"), version.NewHeight(1, 1))
updates.PubUpdates.PutValAndMetadata("ns1", "key2", []byte("value2"), nil, version.NewHeight(1, 2))
updates.PubUpdates.PutValAndMetadata("ns2", "key3", []byte("value3"), nil, version.NewHeight(1, 3))

putPvtUpdatesWithMetadata(t, updates, "ns1", "coll1", "key1", []byte("pvt_value1"), []byte("metadata1"), version.NewHeight(1, 4))
putPvtUpdatesWithMetadata(t, updates, "ns1", "coll1", "key2", []byte("pvt_value2"), nil, version.NewHeight(1, 5))
putPvtUpdatesWithMetadata(t, updates, "ns2", "coll1", "key3", []byte("pvt_value3"), nil, version.NewHeight(1, 6))
db.ApplyPrivacyAwareUpdates(updates, version.NewHeight(2, 6))

vm, _ := db.GetStateMetadata("ns1", "key1")
assert.Equal(t, vm, []byte("metadata1"))
vm, _ = db.GetStateMetadata("ns1", "key2")
assert.Nil(t, vm)
vm, _ = db.GetStateMetadata("ns2", "key3")
assert.Nil(t, vm)

vm, _ = db.GetPrivateDataMetadataByHash("ns1", "coll1", util.ComputeStringHash("key1"))
assert.Equal(t, vm, []byte("metadata1"))
vm, _ = db.GetPrivateDataMetadataByHash("ns1", "coll1", util.ComputeStringHash("key2"))
assert.Nil(t, vm)
vm, _ = db.GetPrivateDataMetadataByHash("ns2", "coll1", util.ComputeStringHash("key3"))
assert.Nil(t, vm)
}

func putPvtUpdates(t *testing.T, updates *UpdateBatch, ns, coll, key string, value []byte, ver *version.Height) {
updates.PvtUpdates.Put(ns, coll, key, value, ver)
updates.HashUpdates.Put(ns, coll, util.ComputeStringHash(key), util.ComputeHash(value), ver)
}

func putPvtUpdatesWithMetadata(t *testing.T, updates *UpdateBatch, ns, coll, key string, value []byte, metadata []byte, ver *version.Height) {
updates.PvtUpdates.Put(ns, coll, key, value, ver)
updates.HashUpdates.PutValHashAndMetadata(ns, coll, util.ComputeStringHash(key), util.ComputeHash(value), metadata, ver)
}

func deletePvtUpdates(t *testing.T, updates *UpdateBatch, ns, coll, key string, ver *version.Height) {
updates.PvtUpdates.Delete(ns, coll, key, ver)
updates.HashUpdates.Delete(ns, coll, util.ComputeStringHash(key), ver)
Expand Down
69 changes: 69 additions & 0 deletions core/ledger/kvledger/txmgmt/privacyenabledstate/optimization.go
@@ -0,0 +1,69 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package privacyenabledstate

import (
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
)

type metadataHint struct {
cache map[string]bool
bookkeeper *leveldbhelper.DBHandle
}

func newMetadataHint(bookkeeper *leveldbhelper.DBHandle) *metadataHint {
cache := map[string]bool{}
itr := bookkeeper.GetIterator(nil, nil)
defer itr.Release()
for itr.Next() {
namespace := string(itr.Key())
cache[namespace] = true
}
return &metadataHint{cache, bookkeeper}
}

func (h *metadataHint) metadataEverUsedFor(namespace string) bool {
return h.cache[namespace]
}

func (h *metadataHint) setMetadataUsedFlag(updates *UpdateBatch) {
batch := leveldbhelper.NewUpdateBatch()
for ns := range filterNamespacesThatHasMetadata(updates) {
if h.cache[ns] {
continue
}
h.cache[ns] = true
batch.Put([]byte(ns), []byte{})
}
h.bookkeeper.WriteBatch(batch, true)
}

func filterNamespacesThatHasMetadata(updates *UpdateBatch) map[string]bool {
namespaces := map[string]bool{}
pubUpdates, hashUpdates := updates.PubUpdates, updates.HashUpdates
// add ns for public data
for _, ns := range pubUpdates.GetUpdatedNamespaces() {
for _, vv := range updates.PubUpdates.GetUpdates(ns) {
if vv.Metadata == nil {
continue
}
namespaces[ns] = true
}
}
// add ns for private hashes
for ns, nsBatch := range hashUpdates.UpdateMap {
for _, coll := range nsBatch.GetCollectionNames() {
for _, vv := range nsBatch.GetUpdates(coll) {
if vv.Metadata == nil {
continue
}
namespaces[ns] = true
}
}
}
return namespaces
}
@@ -0,0 +1,80 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package privacyenabledstate

import (
"testing"

"github.com/hyperledger/fabric/core/ledger/kvledger/bookkeeping"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/mock"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/stretchr/testify/assert"
)

func TestMetadataHintCorrectness(t *testing.T) {
bookkeepingTestEnv := bookkeeping.NewTestEnv(t)
defer bookkeepingTestEnv.Cleanup()
bookkeeper := bookkeepingTestEnv.TestProvider.GetDBHandle("ledger1", bookkeeping.MetadataPresenceIndicator)

metadataHint := newMetadataHint(bookkeeper)
assert.False(t, metadataHint.metadataEverUsedFor("ns1"))

updates := NewUpdateBatch()
updates.PubUpdates.PutValAndMetadata("ns1", "key", []byte("value"), []byte("metadata"), version.NewHeight(1, 1))
updates.PubUpdates.PutValAndMetadata("ns2", "key", []byte("value"), []byte("metadata"), version.NewHeight(1, 2))
updates.PubUpdates.PutValAndMetadata("ns3", "key", []byte("value"), nil, version.NewHeight(1, 3))
updates.HashUpdates.PutValAndMetadata("ns1_pvt", "key", "coll", []byte("value"), []byte("metadata"), version.NewHeight(1, 1))
updates.HashUpdates.PutValAndMetadata("ns2_pvt", "key", "coll", []byte("value"), []byte("metadata"), version.NewHeight(1, 3))
updates.HashUpdates.PutValAndMetadata("ns3_pvt", "key", "coll", []byte("value"), nil, version.NewHeight(1, 3))
metadataHint.setMetadataUsedFlag(updates)

t.Run("MetadataAddedInCurrentSession", func(t *testing.T) {
assert.True(t, metadataHint.metadataEverUsedFor("ns1"))
assert.True(t, metadataHint.metadataEverUsedFor("ns2"))
assert.True(t, metadataHint.metadataEverUsedFor("ns1_pvt"))
assert.True(t, metadataHint.metadataEverUsedFor("ns2_pvt"))
assert.False(t, metadataHint.metadataEverUsedFor("ns3"))
assert.False(t, metadataHint.metadataEverUsedFor("ns4"))
})

t.Run("MetadataFromPersistence", func(t *testing.T) {
metadataHintFromPersistence := newMetadataHint(bookkeeper)
assert.True(t, metadataHintFromPersistence.metadataEverUsedFor("ns1"))
assert.True(t, metadataHintFromPersistence.metadataEverUsedFor("ns2"))
assert.True(t, metadataHintFromPersistence.metadataEverUsedFor("ns1_pvt"))
assert.True(t, metadataHintFromPersistence.metadataEverUsedFor("ns2_pvt"))
assert.False(t, metadataHintFromPersistence.metadataEverUsedFor("ns3"))
assert.False(t, metadataHintFromPersistence.metadataEverUsedFor("ns4"))
})
}

func TestMetadataHintOptimizationSkippingGoingToDB(t *testing.T) {
bookkeepingTestEnv := bookkeeping.NewTestEnv(t)
defer bookkeepingTestEnv.Cleanup()
bookkeeper := bookkeepingTestEnv.TestProvider.GetDBHandle("ledger1", bookkeeping.MetadataPresenceIndicator)

mockVersionedDB := &mock.VersionedDB{}
db, err := NewCommonStorageDB(mockVersionedDB, "testledger", newMetadataHint(bookkeeper))
assert.NoError(t, err)
updates := NewUpdateBatch()
updates.PubUpdates.PutValAndMetadata("ns1", "key", []byte("value"), []byte("metadata"), version.NewHeight(1, 1))
updates.PubUpdates.PutValAndMetadata("ns2", "key", []byte("value"), nil, version.NewHeight(1, 2))
db.ApplyPrivacyAwareUpdates(updates, version.NewHeight(1, 3))

db.GetStateMetadata("ns1", "randomkey")
assert.Equal(t, 1, mockVersionedDB.GetStateCallCount())
db.GetPrivateDataMetadataByHash("ns1", "randomColl", []byte("randomKeyhash"))
assert.Equal(t, 2, mockVersionedDB.GetStateCallCount())

db.GetStateMetadata("ns2", "randomkey")
db.GetPrivateDataMetadataByHash("ns2", "randomColl", []byte("randomKeyhash"))
assert.Equal(t, 2, mockVersionedDB.GetStateCallCount())

db.GetStateMetadata("randomeNs", "randomkey")
db.GetPrivateDataMetadataByHash("randomeNs", "randomColl", []byte("randomKeyhash"))
assert.Equal(t, 2, mockVersionedDB.GetStateCallCount())
}

0 comments on commit 945138e

Please sign in to comment.