Skip to content

Commit bd5df09

Browse files
committed
recon: commit pvtData of oldBlks tp StateDB
Commit the valid old pvtData to the stateDB FAB-11765 #done Change-Id: I5e4bd529329f39cbad204a85490f80889ba953c2 Signed-off-by: senthil <cendhu@gmail.com>
1 parent e00dfcb commit bd5df09

File tree

2 files changed

+273
-20
lines changed

2 files changed

+273
-20
lines changed

core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_txmgr.go

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type LockBasedTxMgr struct {
3838
stateListeners []ledger.StateListener
3939
ccInfoProvider ledger.DeployedChaincodeInfoProvider
4040
commitRWLock sync.RWMutex
41+
oldBlockCommit sync.Mutex
4142
current *current
4243
}
4344

@@ -115,10 +116,10 @@ func (txmgr *LockBasedTxMgr) ValidateAndPrepare(blockAndPvtdata *ledger.BlockAnd
115116
// RemoveStaleAndCommitPvtDataOfOldBlocks implements method in interface `txmgmt.TxMgr`
116117
// The following six operations are performed:
117118
// (1) contructs the unique pvt data from the passed blocksPvtData
118-
// (2) acquires the exclusive lock before checking for the stale pvtData
119+
// (2) acquire a lock on oldBlockCommit
119120
// (3) checks for stale pvtData by comparing [version, valueHash] and removes stale data
120121
// (4) creates update batch from the the non-stale pvtData
121-
// (5) update the BTL bookkeeping managed by the purge manager
122+
// (5) update the BTL bookkeeping managed by the purge manager and prepare expiring keys.
122123
// (6) commit the non-stale pvt data to the stateDB
123124
// This function assumes that the passed input contains only transactions that had been
124125
// marked "Valid". In the current design, this assumption holds true as we store
@@ -141,10 +142,13 @@ func (txmgr *LockBasedTxMgr) RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtDat
141142
return err
142143
}
143144

144-
// (2) acquire an exclusive lock on the stateDB as we cannot allow any regular block
145-
// commit to happen while validating and committing the pvtData of old blocks
146-
txmgr.commitRWLock.Lock()
147-
defer txmgr.commitRWLock.Unlock()
145+
// (2) acquire a lock on oldBlockCommit. If the regular block commit has already
146+
// acquired this lock, commit of old blocks' pvtData cannot proceed until the lock
147+
// is released. This is required as the PrepareForExpiringKeys() used in step (5)
148+
// of this function might affect the result of DeleteExpiredAndUpdateBookkeeping()
149+
// in Commit()
150+
txmgr.oldBlockCommit.Lock()
151+
defer txmgr.oldBlockCommit.Unlock()
148152

149153
// (3) remove the pvt data which does not matches the hashed
150154
// value stored in the public state
@@ -153,10 +157,24 @@ func (txmgr *LockBasedTxMgr) RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtDat
153157
}
154158

155159
// (4) create the update batch from the uniquePvtData
156-
_ = uniquePvtData.transformToUpdateBatch()
160+
batch := uniquePvtData.transformToUpdateBatch()
161+
162+
// (5) update booking in the purge manager and prepare expiring keys.
163+
// Though the expiring keys would have been loaded in memory during last
164+
// PrepareExpiringKeys from Commit but we rerun this here because,
165+
// RemoveStaleAndCommitPvtDataOfOldBlocks may have added new data which might be
166+
// eligible for expiry during the next regular block commit.
167+
txmgr.pvtdataPurgeMgr.UpdateBookkeepingForPvtDataOfOldBlocks(batch.PvtUpdates)
168+
nextBlockNumToBeCommitted, err := txmgr.getNextBlockNumberToBeCommitted()
169+
if err != nil {
170+
return err
171+
}
172+
txmgr.pvtdataPurgeMgr.PrepareForExpiringKeys(nextBlockNumToBeCommitted)
157173

158-
// (5) TODO: update booking in the purge manager
159-
// (6) TODO: commit the pvt data of old blocks to the sateDB
174+
// (6) commit the pvt data to the stateDB
175+
if err := txmgr.db.ApplyPrivacyAwareUpdates(batch, nil); err != nil {
176+
return err
177+
}
160178
return nil
161179
}
162180

@@ -333,6 +351,16 @@ func checkIfPvtWriteIsStale(hashedKey *privacyenabledstate.HashedCompositeKey,
333351
return true, nil
334352
}
335353

354+
// getNextBlockNumberToBeCommittedreturn the last committed block number + 1
355+
func (txmgr *LockBasedTxMgr) getNextBlockNumberToBeCommitted() (uint64, error) {
356+
lastCommittedBlk, err := txmgr.db.GetLatestSavePoint()
357+
if err != nil {
358+
return 0, err
359+
}
360+
361+
return lastCommittedBlk.BlockNum + 1, nil
362+
}
363+
336364
func (uniquePvtData uniquePvtDataMap) transformToUpdateBatch() *privacyenabledstate.UpdateBatch {
337365
batch := privacyenabledstate.NewUpdateBatch()
338366
for hashedCompositeKey, pvtWrite := range uniquePvtData {
@@ -390,6 +418,16 @@ func (txmgr *LockBasedTxMgr) Shutdown() {
390418

391419
// Commit implements method in interface `txmgmt.TxMgr`
392420
func (txmgr *LockBasedTxMgr) Commit() error {
421+
// we need to acquire a lock on oldBlockCommit. This is required because
422+
// the DeleteExpiredAndUpdateBookkeeping() would perform incorrect operation if
423+
// PrepareForExpiringKeys() in RemoveStaleAndCommitPvtDataOfOldBlocks() is allowed to
424+
// execute parallely. RemoveStaleAndCommitPvtDataOfOldBlocks computes the update
425+
// batch based on the current state and if we allow regular block commits at the
426+
// same time, the former may overwrite the newer versions of the data and we may
427+
// end up with an incorrect update batch.
428+
txmgr.oldBlockCommit.Lock()
429+
defer txmgr.oldBlockCommit.Unlock()
430+
393431
// When using the purge manager for the first block commit after peer start, the asynchronous function
394432
// 'PrepareForExpiringKeys' is invoked in-line. However, for the subsequent blocks commits, this function is invoked
395433
// in advance for the next block
@@ -413,24 +451,19 @@ func (txmgr *LockBasedTxMgr) Commit() error {
413451
return err
414452
}
415453

416-
// EXCLUSIVE LOCK STARTS
454+
commitHeight := version.NewHeight(txmgr.current.blockNum(), txmgr.current.maxTxNumber())
417455
txmgr.commitRWLock.Lock()
418456
logger.Debugf("Write lock acquired for committing updates to state database")
419-
commitHeight := version.NewHeight(txmgr.current.blockNum(), txmgr.current.maxTxNumber())
420457
if err := txmgr.db.ApplyPrivacyAwareUpdates(txmgr.current.batch, commitHeight); err != nil {
421458
txmgr.commitRWLock.Unlock()
422459
return err
423460
}
424-
// only during the exclusive lock duration, we should clear the cache as the cache is being
425-
// used by the old pvtData committer as well
426-
txmgr.clearCache() // note that we should clear the cache before calling
427-
// PrepareForExpiringKeys as it uses the cache as well. To be precise,
428-
// we should not clear the cache until PrepareForExpiringKeys completes
429-
// the task.
430-
logger.Debugf("cleared cached")
431461
txmgr.commitRWLock.Unlock()
432-
// EXCLUSIVE LOCK ENDS
433-
logger.Debugf("Updates committed to state database")
462+
// only while holding a lock on oldBlockCommit, we should clear the cache as the
463+
// cache is being used by the old pvtData committer to load the version of
464+
// hashedKeys. Also, note that the PrepareForExpiringKeys uses the cache.
465+
txmgr.clearCache()
466+
logger.Debugf("Updates committed to state database and the write lock is released")
434467

435468
// purge manager should be called (in this call the purge mgr removes the expiry entries from schedules) after committing to statedb
436469
if err := txmgr.pvtdataPurgeMgr.BlockCommitDone(); err != nil {

core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/txmgr_test.go

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,6 +1080,113 @@ func producePvtdata(t *testing.T, txNum uint64, nsColls []string, keys []string,
10801080
}
10811081
}
10821082

1083+
func TestRemoveStaleAndCommitPvtDataOfOldBlocks(t *testing.T) {
1084+
for _, testEnv := range testEnvs {
1085+
t.Logf("Running test for TestEnv = %s", testEnv.getName())
1086+
testLedgerID := "testvalidationandcommitofoldpvtdata"
1087+
testEnv.init(t, testLedgerID, nil)
1088+
testValidationAndCommitOfOldPvtData(t, testEnv)
1089+
testEnv.cleanup()
1090+
}
1091+
}
1092+
1093+
func testValidationAndCommitOfOldPvtData(t *testing.T, env testEnv) {
1094+
ledgerid := "testvalidationandcommitofoldpvtdata"
1095+
btlPolicy := btltestutil.SampleBTLPolicy(
1096+
map[[2]string]uint64{
1097+
{"ns1", "coll1"}: 0,
1098+
{"ns1", "coll2"}: 0,
1099+
},
1100+
)
1101+
env.init(t, ledgerid, btlPolicy)
1102+
txMgr := env.getTxMgr()
1103+
populateCollConfigForTest(t, txMgr.(*LockBasedTxMgr),
1104+
[]collConfigkey{
1105+
{"ns1", "coll1"},
1106+
{"ns1", "coll2"},
1107+
},
1108+
version.NewHeight(1, 1),
1109+
)
1110+
1111+
db := env.getVDB()
1112+
updateBatch := privacyenabledstate.NewUpdateBatch()
1113+
// all pvt data are missing
1114+
updateBatch.HashUpdates.Put("ns1", "coll1", util.ComputeStringHash("key1"), util.ComputeStringHash("value1"), version.NewHeight(1, 1)) // E1
1115+
updateBatch.HashUpdates.Put("ns1", "coll1", util.ComputeStringHash("key2"), util.ComputeStringHash("value2"), version.NewHeight(1, 2)) // E2
1116+
updateBatch.HashUpdates.Put("ns1", "coll2", util.ComputeStringHash("key3"), util.ComputeStringHash("value3"), version.NewHeight(1, 2)) // E3
1117+
updateBatch.HashUpdates.Put("ns1", "coll2", util.ComputeStringHash("key4"), util.ComputeStringHash("value4"), version.NewHeight(1, 3)) // E4
1118+
db.ApplyPrivacyAwareUpdates(updateBatch, version.NewHeight(1, 2))
1119+
1120+
updateBatch = privacyenabledstate.NewUpdateBatch()
1121+
updateBatch.HashUpdates.Put("ns1", "coll1", util.ComputeStringHash("key1"), util.ComputeStringHash("new-value1"), version.NewHeight(2, 1)) // E1 is updated
1122+
updateBatch.HashUpdates.Delete("ns1", "coll1", util.ComputeStringHash("key2"), version.NewHeight(2, 2)) // E2 is being deleted
1123+
db.ApplyPrivacyAwareUpdates(updateBatch, version.NewHeight(2, 2))
1124+
1125+
updateBatch = privacyenabledstate.NewUpdateBatch()
1126+
updateBatch.HashUpdates.Put("ns1", "coll1", util.ComputeStringHash("key1"), util.ComputeStringHash("another-new-value1"), version.NewHeight(3, 1)) // E1 is again updated
1127+
updateBatch.HashUpdates.Put("ns1", "coll2", util.ComputeStringHash("key3"), util.ComputeStringHash("value3"), version.NewHeight(3, 2)) // E3 gets only metadata update
1128+
db.ApplyPrivacyAwareUpdates(updateBatch, version.NewHeight(3, 2))
1129+
1130+
v1 := []byte("value1")
1131+
// ns1-coll1-key1 should be rejected as it is updated in the future by Blk2Tx1
1132+
pvtDataBlk1Tx1 := producePvtdata(t, 1, []string{"ns1:coll1"}, []string{"key1"}, [][]byte{v1})
1133+
// ns1-coll2-key3 should be accepted but ns1-coll1-key2 as it is updated in the future by Blk2Tx2
1134+
v2 := []byte("value2")
1135+
v3 := []byte("value3")
1136+
pvtDataBlk1Tx2 := producePvtdata(t, 2, []string{"ns1:coll1", "ns1:coll2"}, []string{"key2", "key3"}, [][]byte{v2, v3})
1137+
// ns1-coll2-key4 should be accepted
1138+
v4 := []byte("value4")
1139+
pvtDataBlk1Tx3 := producePvtdata(t, 3, []string{"ns1:coll2"}, []string{"key4"}, [][]byte{v4})
1140+
1141+
nv1 := []byte("new-value1")
1142+
// ns1-coll1-key1 should be rejected as it is updated in the future by Blk3Tx1
1143+
pvtDataBlk2Tx1 := producePvtdata(t, 1, []string{"ns1:coll1"}, []string{"key1"}, [][]byte{nv1})
1144+
// ns1-coll1-key2 should be accepted -- a delete operation
1145+
pvtDataBlk2Tx2 := producePvtdata(t, 2, []string{"ns1:coll1"}, []string{"key2"}, [][]byte{nil})
1146+
1147+
anv1 := []byte("another-new-value1")
1148+
// ns1-coll1-key1 should be accepted
1149+
pvtDataBlk3Tx1 := producePvtdata(t, 1, []string{"ns1:coll1"}, []string{"key1"}, [][]byte{anv1})
1150+
// ns1-coll2-key3 should be accepted -- assume that only metadata is being updated
1151+
pvtDataBlk3Tx2 := producePvtdata(t, 2, []string{"ns1:coll2"}, []string{"key3"}, [][]byte{v3})
1152+
1153+
blocksPvtData := map[uint64][]*ledger.TxPvtData{
1154+
1: {
1155+
pvtDataBlk1Tx1,
1156+
pvtDataBlk1Tx2,
1157+
pvtDataBlk1Tx3,
1158+
},
1159+
2: {
1160+
pvtDataBlk2Tx1,
1161+
pvtDataBlk2Tx2,
1162+
},
1163+
3: {
1164+
pvtDataBlk3Tx1,
1165+
pvtDataBlk3Tx2,
1166+
},
1167+
}
1168+
1169+
err := txMgr.RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtData)
1170+
assert.NoError(t, err)
1171+
1172+
vv, err := db.GetPrivateData("ns1", "coll1", "key1")
1173+
assert.NoError(t, err)
1174+
assert.Equal(t, anv1, vv.Value) // last updated value
1175+
1176+
vv, err = db.GetPrivateData("ns1", "coll1", "key2")
1177+
assert.NoError(t, err)
1178+
assert.Equal(t, nil, nil) // deleted
1179+
1180+
vv, err = db.GetPrivateData("ns1", "coll2", "key3")
1181+
assert.NoError(t, err)
1182+
assert.Equal(t, v3, vv.Value)
1183+
assert.Equal(t, version.NewHeight(3, 2), vv.Version) // though we passed with version {1,2}, we should get {3,2} due to metadata update
1184+
1185+
vv, err = db.GetPrivateData("ns1", "coll2", "key4")
1186+
assert.NoError(t, err)
1187+
assert.Equal(t, v4, vv.Value)
1188+
}
1189+
10831190
func TestTxSimulatorMissingPvtdata(t *testing.T) {
10841191
testEnv := testEnvs[0]
10851192
testEnv.init(t, "TestTxSimulatorUnsupportedTxQueries", nil)
@@ -1132,6 +1239,119 @@ func TestTxSimulatorMissingPvtdata(t *testing.T) {
11321239
assert.Nil(t, val)
11331240
}
11341241

1242+
func TestRemoveStaleAndCommitPvtDataOfOldBlocksWithExpiry(t *testing.T) {
1243+
ledgerid := "TestTxSimulatorMissingPvtdataExpiry"
1244+
btlPolicy := btltestutil.SampleBTLPolicy(
1245+
map[[2]string]uint64{
1246+
{"ns", "coll"}: 1,
1247+
},
1248+
)
1249+
testEnv := testEnvs[0]
1250+
testEnv.init(t, ledgerid, btlPolicy)
1251+
defer testEnv.cleanup()
1252+
1253+
txMgr := testEnv.getTxMgr()
1254+
populateCollConfigForTest(t, txMgr.(*LockBasedTxMgr),
1255+
[]collConfigkey{
1256+
{"ns", "coll"},
1257+
},
1258+
version.NewHeight(1, 1),
1259+
)
1260+
1261+
viper.Set(fmt.Sprintf("ledger.pvtdata.btlpolicy.%s.ns.coll", ledgerid), 1)
1262+
bg, _ := testutil.NewBlockGenerator(t, ledgerid, false)
1263+
1264+
// storing hashed data but the pvt key is missing
1265+
// stored pvt key would get expired and purged while committing block 3
1266+
blkAndPvtdata := prepareNextBlockForTest(t, txMgr, bg, "txid-1",
1267+
map[string]string{"pubkey1": "pub-value1"}, map[string]string{"pvtkey1": "pvt-value1"}, true)
1268+
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata, true))
1269+
// committing block 1
1270+
assert.NoError(t, txMgr.Commit())
1271+
1272+
// pvt data should not exist
1273+
simulator, _ := txMgr.NewTxSimulator("tx-tmp")
1274+
pvtval, err := simulator.GetPrivateData("ns", "coll", "pvtkey1")
1275+
_, ok := err.(*txmgr.ErrPvtdataNotAvailable)
1276+
assert.Equal(t, ok, true)
1277+
assert.Nil(t, pvtval)
1278+
simulator.Done()
1279+
1280+
// committing pvt data of block 1
1281+
v1 := []byte("pvt-value1")
1282+
pvtDataBlk1Tx1 := producePvtdata(t, 1, []string{"ns:coll"}, []string{"pvtkey1"}, [][]byte{v1})
1283+
blocksPvtData := map[uint64][]*ledger.TxPvtData{
1284+
1: {
1285+
pvtDataBlk1Tx1,
1286+
},
1287+
}
1288+
err = txMgr.RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtData)
1289+
assert.NoError(t, err)
1290+
1291+
// pvt data should exist
1292+
simulator, _ = txMgr.NewTxSimulator("tx-tmp")
1293+
pvtval, err = simulator.GetPrivateData("ns", "coll", "pvtkey1")
1294+
assert.Nil(t, err)
1295+
assert.Equal(t, pvtval, v1)
1296+
simulator.Done()
1297+
1298+
// storing hashed data but the pvt key is missing
1299+
// stored pvt key would get expired and purged while committing block 4
1300+
blkAndPvtdata = prepareNextBlockForTest(t, txMgr, bg, "txid-2",
1301+
map[string]string{"pubkey2": "pub-value2"}, map[string]string{"pvtkey2": "pvt-value2"}, true)
1302+
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata, true))
1303+
// committing block 2
1304+
assert.NoError(t, txMgr.Commit())
1305+
1306+
// pvt data should not exist
1307+
simulator, _ = txMgr.NewTxSimulator("tx-tmp")
1308+
pvtval, err = simulator.GetPrivateData("ns", "coll", "pvtkey2")
1309+
_, ok = err.(*txmgr.ErrPvtdataNotAvailable)
1310+
assert.Equal(t, ok, true)
1311+
assert.Nil(t, pvtval)
1312+
simulator.Done()
1313+
1314+
blkAndPvtdata = prepareNextBlockForTest(t, txMgr, bg, "txid-3",
1315+
map[string]string{"pubkey3": "pub-value3"}, nil, false)
1316+
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata, true))
1317+
// committing block 3
1318+
assert.NoError(t, txMgr.Commit())
1319+
1320+
// prepareForExpiringKey must have selected the pvtkey2 as it would
1321+
// get expired during next block commit
1322+
1323+
// committing pvt data of block 2
1324+
v2 := []byte("pvt-value2")
1325+
pvtDataBlk2Tx1 := producePvtdata(t, 1, []string{"ns:coll"}, []string{"pvtkey2"}, [][]byte{v2})
1326+
blocksPvtData = map[uint64][]*ledger.TxPvtData{
1327+
2: {
1328+
pvtDataBlk2Tx1,
1329+
},
1330+
}
1331+
1332+
err = txMgr.RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtData)
1333+
assert.NoError(t, err)
1334+
1335+
// pvt data should exist
1336+
simulator, _ = txMgr.NewTxSimulator("tx-tmp")
1337+
pvtval, err = simulator.GetPrivateData("ns", "coll", "pvtkey2")
1338+
assert.Nil(t, err)
1339+
assert.Equal(t, pvtval, v2)
1340+
simulator.Done()
1341+
1342+
blkAndPvtdata = prepareNextBlockForTest(t, txMgr, bg, "txid-4",
1343+
map[string]string{"pubkey4": "pub-value4"}, nil, false)
1344+
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata, true))
1345+
// committing block 4 and should purge pvtkey2
1346+
assert.NoError(t, txMgr.Commit())
1347+
1348+
simulator, _ = txMgr.NewTxSimulator("tx-tmp")
1349+
pvtval, err = simulator.GetPrivateData("ns", "coll", "pvtkey2")
1350+
assert.NoError(t, err)
1351+
assert.Nil(t, pvtval)
1352+
simulator.Done()
1353+
}
1354+
11351355
func TestDeleteOnCursor(t *testing.T) {
11361356
cID := "cid"
11371357
env := testEnvs[0]

0 commit comments

Comments
 (0)