Skip to content

Commit

Permalink
Merge "integrate cache with LoadCommittedVersion"
Browse files Browse the repository at this point in the history
  • Loading branch information
denyeart authored and Gerrit Code Review committed Nov 20, 2019
2 parents e08ff81 + 29bd8c5 commit 9de6ec5
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 4 deletions.
27 changes: 23 additions & 4 deletions core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,16 +276,35 @@ func (vdb *VersionedDB) GetDBType() string {
// committedVersions cache will be used for state validation of readsets
// revisionNumbers cache will be used during commit phase for couchdb bulk updates
func (vdb *VersionedDB) LoadCommittedVersions(keys []*statedb.CompositeKey) error {
nsKeysMap := map[string][]string{}
missingKeys := map[string][]string{}
committedDataCache := newVersionCache()
for _, compositeKey := range keys {
ns, key := compositeKey.Namespace, compositeKey.Key
committedDataCache.setVerAndRev(ns, key, nil, "")
logger.Debugf("Load into version cache: %s~%s", ns, key)
nsKeysMap[ns] = append(nsKeysMap[ns], key)

if !vdb.cache.Enabled(ns) {
missingKeys[ns] = append(missingKeys[ns], key)
continue
}
cv, err := vdb.cache.GetState(vdb.chainName, ns, key)
if err != nil {
return err
}
if cv == nil {
missingKeys[ns] = append(missingKeys[ns], key)
continue
}
vv, err := constructVersionedValue(cv)
if err != nil {
return err
}
rev := string(cv.AdditionalInfo)
committedDataCache.setVerAndRev(ns, key, vv.Version, rev)
}
nsMetadataMap, err := vdb.retrieveMetadata(nsKeysMap)
logger.Debugf("nsKeysMap=%s", nsKeysMap)

nsMetadataMap, err := vdb.retrieveMetadata(missingKeys)
logger.Debugf("missingKeys=%s", missingKeys)
logger.Debugf("nsMetadataMap=%s", nsMetadataMap)
if err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1138,3 +1138,100 @@ func testExistInCache(t *testing.T, db *couchdb.CouchDatabase, cache *statedb.Ca
require.NoError(t, err)
require.Equal(t, metadata[0].Rev, string(cacheValue.AdditionalInfo))
}

func TestLoadCommittedVersion(t *testing.T) {
cache := statedb.NewCache(32, []string{"lscc"})

env := newTestVDBEnvWithCache(t, cache)
defer env.Cleanup()
chainID := "testloadcommittedversion"
db, err := env.DBProvider.GetDBHandle(chainID)
require.NoError(t, err)

// scenario: state cache has (ns1, key1), (ns1, key2),
// and (ns2, key1) but misses (ns2, key2). The
// LoadCommittedVersions will fetch the first
// three keys from the state cache and the remaining one from
// the db. To ensure that, the db contains only
// the missing key (ns2, key2).

// store (ns1, key1), (ns1, key2), (ns2, key1) in the state cache
cacheValue := &statedb.CacheValue{
Value: []byte("value1"),
Metadata: []byte("meta1"),
VersionBytes: version.NewHeight(1, 1).ToBytes(),
AdditionalInfo: []byte("rev1"),
}
require.NoError(t, cache.PutState(chainID, "ns1", "key1", cacheValue))

cacheValue = &statedb.CacheValue{
Value: []byte("value2"),
Metadata: []byte("meta2"),
VersionBytes: version.NewHeight(1, 2).ToBytes(),
AdditionalInfo: []byte("rev2"),
}
require.NoError(t, cache.PutState(chainID, "ns1", "key2", cacheValue))

cacheValue = &statedb.CacheValue{
Value: []byte("value3"),
Metadata: []byte("meta3"),
VersionBytes: version.NewHeight(1, 3).ToBytes(),
AdditionalInfo: []byte("rev3"),
}
require.NoError(t, cache.PutState(chainID, "ns2", "key1", cacheValue))

// store (ns2, key2) in the db
batch := statedb.NewUpdateBatch()
vv := &statedb.VersionedValue{Value: []byte("value4"), Metadata: []byte("meta4"), Version: version.NewHeight(1, 4)}
batch.PutValAndMetadata("ns2", "key2", vv.Value, vv.Metadata, vv.Version)
savePoint := version.NewHeight(2, 2)
db.ApplyUpdates(batch, savePoint)

// version cache should be empty
ver, ok := db.(*VersionedDB).GetCachedVersion("ns1", "key1")
require.Nil(t, ver)
require.False(t, ok)
ver, ok = db.(*VersionedDB).GetCachedVersion("ns1", "key2")
require.Nil(t, ver)
require.False(t, ok)
ver, ok = db.(*VersionedDB).GetCachedVersion("ns2", "key1")
require.Nil(t, ver)
require.False(t, ok)
ver, ok = db.(*VersionedDB).GetCachedVersion("ns2", "key2")
require.Nil(t, ver)
require.False(t, ok)

keys := []*statedb.CompositeKey{
{
Namespace: "ns1",
Key: "key1",
},
{
Namespace: "ns1",
Key: "key2",
},
{
Namespace: "ns2",
Key: "key1",
},
{
Namespace: "ns2",
Key: "key2",
},
}

require.NoError(t, db.(*VersionedDB).LoadCommittedVersions(keys))

ver, ok = db.(*VersionedDB).GetCachedVersion("ns1", "key1")
require.Equal(t, version.NewHeight(1, 1), ver)
require.True(t, ok)
ver, ok = db.(*VersionedDB).GetCachedVersion("ns1", "key2")
require.Equal(t, version.NewHeight(1, 2), ver)
require.True(t, ok)
ver, ok = db.(*VersionedDB).GetCachedVersion("ns2", "key1")
require.Equal(t, version.NewHeight(1, 3), ver)
require.True(t, ok)
ver, ok = db.(*VersionedDB).GetCachedVersion("ns2", "key2")
require.Equal(t, version.NewHeight(1, 4), ver)
require.True(t, ok)
}

0 comments on commit 9de6ec5

Please sign in to comment.