Skip to content

Commit 29bd8c5

Browse files
committed
integrate cache with LoadCommittedVersion
FAB-17063 #done Change-Id: If68356f59e3bdbef6f9aa360c2378b942bc6f32b Signed-off-by: senthil <cendhu@gmail.com> Signed-off-by: manish <manish.sethi@gmail.com>
1 parent e9be627 commit 29bd8c5

File tree

2 files changed

+120
-4
lines changed

2 files changed

+120
-4
lines changed

core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -276,16 +276,35 @@ func (vdb *VersionedDB) GetDBType() string {
276276
// committedVersions cache will be used for state validation of readsets
277277
// revisionNumbers cache will be used during commit phase for couchdb bulk updates
278278
func (vdb *VersionedDB) LoadCommittedVersions(keys []*statedb.CompositeKey) error {
279-
nsKeysMap := map[string][]string{}
279+
missingKeys := map[string][]string{}
280280
committedDataCache := newVersionCache()
281281
for _, compositeKey := range keys {
282282
ns, key := compositeKey.Namespace, compositeKey.Key
283283
committedDataCache.setVerAndRev(ns, key, nil, "")
284284
logger.Debugf("Load into version cache: %s~%s", ns, key)
285-
nsKeysMap[ns] = append(nsKeysMap[ns], key)
285+
286+
if !vdb.cache.Enabled(ns) {
287+
missingKeys[ns] = append(missingKeys[ns], key)
288+
continue
289+
}
290+
cv, err := vdb.cache.GetState(vdb.chainName, ns, key)
291+
if err != nil {
292+
return err
293+
}
294+
if cv == nil {
295+
missingKeys[ns] = append(missingKeys[ns], key)
296+
continue
297+
}
298+
vv, err := constructVersionedValue(cv)
299+
if err != nil {
300+
return err
301+
}
302+
rev := string(cv.AdditionalInfo)
303+
committedDataCache.setVerAndRev(ns, key, vv.Version, rev)
286304
}
287-
nsMetadataMap, err := vdb.retrieveMetadata(nsKeysMap)
288-
logger.Debugf("nsKeysMap=%s", nsKeysMap)
305+
306+
nsMetadataMap, err := vdb.retrieveMetadata(missingKeys)
307+
logger.Debugf("missingKeys=%s", missingKeys)
289308
logger.Debugf("nsMetadataMap=%s", nsMetadataMap)
290309
if err != nil {
291310
return err

core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1138,3 +1138,100 @@ func testExistInCache(t *testing.T, db *couchdb.CouchDatabase, cache *statedb.Ca
11381138
require.NoError(t, err)
11391139
require.Equal(t, metadata[0].Rev, string(cacheValue.AdditionalInfo))
11401140
}
1141+
1142+
func TestLoadCommittedVersion(t *testing.T) {
1143+
cache := statedb.NewCache(32, []string{"lscc"})
1144+
1145+
env := newTestVDBEnvWithCache(t, cache)
1146+
defer env.Cleanup()
1147+
chainID := "testloadcommittedversion"
1148+
db, err := env.DBProvider.GetDBHandle(chainID)
1149+
require.NoError(t, err)
1150+
1151+
// scenario: state cache has (ns1, key1), (ns1, key2),
1152+
// and (ns2, key1) but misses (ns2, key2). The
1153+
// LoadCommittedVersions will fetch the first
1154+
// three keys from the state cache and the remaining one from
1155+
// the db. To ensure that, the db contains only
1156+
// the missing key (ns2, key2).
1157+
1158+
// store (ns1, key1), (ns1, key2), (ns2, key1) in the state cache
1159+
cacheValue := &statedb.CacheValue{
1160+
Value: []byte("value1"),
1161+
Metadata: []byte("meta1"),
1162+
VersionBytes: version.NewHeight(1, 1).ToBytes(),
1163+
AdditionalInfo: []byte("rev1"),
1164+
}
1165+
require.NoError(t, cache.PutState(chainID, "ns1", "key1", cacheValue))
1166+
1167+
cacheValue = &statedb.CacheValue{
1168+
Value: []byte("value2"),
1169+
Metadata: []byte("meta2"),
1170+
VersionBytes: version.NewHeight(1, 2).ToBytes(),
1171+
AdditionalInfo: []byte("rev2"),
1172+
}
1173+
require.NoError(t, cache.PutState(chainID, "ns1", "key2", cacheValue))
1174+
1175+
cacheValue = &statedb.CacheValue{
1176+
Value: []byte("value3"),
1177+
Metadata: []byte("meta3"),
1178+
VersionBytes: version.NewHeight(1, 3).ToBytes(),
1179+
AdditionalInfo: []byte("rev3"),
1180+
}
1181+
require.NoError(t, cache.PutState(chainID, "ns2", "key1", cacheValue))
1182+
1183+
// store (ns2, key2) in the db
1184+
batch := statedb.NewUpdateBatch()
1185+
vv := &statedb.VersionedValue{Value: []byte("value4"), Metadata: []byte("meta4"), Version: version.NewHeight(1, 4)}
1186+
batch.PutValAndMetadata("ns2", "key2", vv.Value, vv.Metadata, vv.Version)
1187+
savePoint := version.NewHeight(2, 2)
1188+
db.ApplyUpdates(batch, savePoint)
1189+
1190+
// version cache should be empty
1191+
ver, ok := db.(*VersionedDB).GetCachedVersion("ns1", "key1")
1192+
require.Nil(t, ver)
1193+
require.False(t, ok)
1194+
ver, ok = db.(*VersionedDB).GetCachedVersion("ns1", "key2")
1195+
require.Nil(t, ver)
1196+
require.False(t, ok)
1197+
ver, ok = db.(*VersionedDB).GetCachedVersion("ns2", "key1")
1198+
require.Nil(t, ver)
1199+
require.False(t, ok)
1200+
ver, ok = db.(*VersionedDB).GetCachedVersion("ns2", "key2")
1201+
require.Nil(t, ver)
1202+
require.False(t, ok)
1203+
1204+
keys := []*statedb.CompositeKey{
1205+
{
1206+
Namespace: "ns1",
1207+
Key: "key1",
1208+
},
1209+
{
1210+
Namespace: "ns1",
1211+
Key: "key2",
1212+
},
1213+
{
1214+
Namespace: "ns2",
1215+
Key: "key1",
1216+
},
1217+
{
1218+
Namespace: "ns2",
1219+
Key: "key2",
1220+
},
1221+
}
1222+
1223+
require.NoError(t, db.(*VersionedDB).LoadCommittedVersions(keys))
1224+
1225+
ver, ok = db.(*VersionedDB).GetCachedVersion("ns1", "key1")
1226+
require.Equal(t, version.NewHeight(1, 1), ver)
1227+
require.True(t, ok)
1228+
ver, ok = db.(*VersionedDB).GetCachedVersion("ns1", "key2")
1229+
require.Equal(t, version.NewHeight(1, 2), ver)
1230+
require.True(t, ok)
1231+
ver, ok = db.(*VersionedDB).GetCachedVersion("ns2", "key1")
1232+
require.Equal(t, version.NewHeight(1, 3), ver)
1233+
require.True(t, ok)
1234+
ver, ok = db.(*VersionedDB).GetCachedVersion("ns2", "key2")
1235+
require.Equal(t, version.NewHeight(1, 4), ver)
1236+
require.True(t, ok)
1237+
}

0 commit comments

Comments
 (0)