Skip to content

Commit b4df072

Browse files
committed
rollback: skip uncommittedPvtData of oldBlocks
As we are storing pvtData of both valid and invalid tx in the pvtdataStore, we do store missing info for both valid and invalid tx. When the reconciler fetches the missing pvtData, it passes to the ledger coordinator. As the pvtData contains data of both valid and invalid tx, we need to pass both to pvtdataStore and only valid tx's pvtData to the txmgr. Similarly, during failure and recovery, we fetch the pvtData of last updated old blocks and filter uncommitted tx pvtData while syncing the stateDB FAB-14861 #done Change-Id: I858939beba2c702df7f197aa6a6c555f4655a83b Signed-off-by: senthil <cendhu@gmail.com> (cherry picked from commit df3616a)
1 parent 3a9e33b commit b4df072

File tree

4 files changed

+151
-25
lines changed

4 files changed

+151
-25
lines changed

core/ledger/kvledger/hashcheck_pvtdata.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ import (
1717
"github.com/hyperledger/fabric/protoutil"
1818
)
1919

20-
// ConstructValidAndInvalidPvtData computes the valid pvt data and hash mismatch list
20+
// constructValidAndInvalidPvtData computes the valid pvt data and hash mismatch list
2121
// from a received pvt data list of old blocks.
22-
func ConstructValidAndInvalidPvtData(blocksPvtData []*ledger.BlockPvtData, blockStore *ledgerstorage.Store) (
22+
func constructValidAndInvalidPvtData(blocksPvtData []*ledger.BlockPvtData, blockStore *ledgerstorage.Store) (
2323
map[uint64][]*ledger.TxPvtData, []*ledger.PvtdataHashMismatch, error,
2424
) {
2525
// for each block, for each transaction, retrieve the txEnvelope to

core/ledger/kvledger/hashcheck_pvtdata_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func TestConstructValidInvalidBlocksPvtData(t *testing.T) {
104104
},
105105
}
106106

107-
blocksValidPvtData, hashMismatched, err := ConstructValidAndInvalidPvtData(blocksPvtData, lg.(*kvLedger).blockStore)
107+
blocksValidPvtData, hashMismatched, err := constructValidAndInvalidPvtData(blocksPvtData, lg.(*kvLedger).blockStore)
108108
assert.NoError(t, err)
109109
assert.Equal(t, len(expectedValidBlocksPvtData), len(blocksValidPvtData))
110110
assert.ElementsMatch(t, expectedValidBlocksPvtData[1], blocksValidPvtData[1])
@@ -133,7 +133,7 @@ func TestConstructValidInvalidBlocksPvtData(t *testing.T) {
133133
},
134134
}
135135

136-
blocksValidPvtData, hashMismatches, err := ConstructValidAndInvalidPvtData(blocksPvtData, lg.(*kvLedger).blockStore)
136+
blocksValidPvtData, hashMismatches, err := constructValidAndInvalidPvtData(blocksPvtData, lg.(*kvLedger).blockStore)
137137
assert.NoError(t, err)
138138
assert.Len(t, blocksValidPvtData, 0)
139139

core/ledger/kvledger/kv_ledger.go

Lines changed: 68 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr"
2525
"github.com/hyperledger/fabric/core/ledger/ledgerstorage"
2626
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
27+
lutil "github.com/hyperledger/fabric/core/ledger/util"
2728
"github.com/hyperledger/fabric/protos/common"
2829
"github.com/hyperledger/fabric/protos/peer"
2930
"github.com/hyperledger/fabric/protoutil"
@@ -245,22 +246,38 @@ func (l *kvLedger) syncStateDBWithPvtdatastore() error {
245246
// of state update should not lie with the source (i.e., pvtdatastorage). A potential fix
246247
// is mentioned in FAB-12731
247248

248-
// TODO: GetLastUpdatedOldBlocksPvtData would give the pvtData of both valid and
249-
// invalid transactions. We would need only the valid transaction's pvtData. FAB-14861
250249
blocksPvtData, err := l.blockStore.GetLastUpdatedOldBlocksPvtData()
251250
if err != nil {
252251
return err
253252
}
254-
if err := l.txtmgmt.RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtData); err != nil {
255-
return err
253+
254+
if l.blockStore.IsPvtStoreAheadOfBlockStore() {
255+
if err := l.filterYetToCommitBlocks(blocksPvtData); err != nil {
256+
return err
257+
}
256258
}
257-
if err := l.blockStore.ResetLastUpdatedOldBlocksList(); err != nil {
259+
260+
if err = l.applyValidTxPvtDataOfOldBlocks(blocksPvtData); err != nil {
258261
return err
259262
}
260263

261264
return nil
262265
}
263266

267+
func (l *kvLedger) filterYetToCommitBlocks(blocksPvtData map[uint64][]*ledger.TxPvtData) error {
268+
info, err := l.blockStore.GetBlockchainInfo()
269+
if err != nil {
270+
return err
271+
}
272+
for blkNum := range blocksPvtData {
273+
if blkNum > info.Height-1 {
274+
logger.Infof("found pvtdata associated with yet to be committed block [%d]", blkNum)
275+
delete(blocksPvtData, blkNum)
276+
}
277+
}
278+
return nil
279+
}
280+
264281
//recommitLostBlocks retrieves blocks in specified range and commit the write set to either
265282
//state DB or history DB or both
266283
func (l *kvLedger) recommitLostBlocks(firstBlockNum uint64, lastBlockNum uint64, recoverables ...recoverable) error {
@@ -537,35 +554,44 @@ func (l *kvLedger) GetConfigHistoryRetriever() (ledger.ConfigHistoryRetriever, e
537554
func (l *kvLedger) CommitPvtDataOfOldBlocks(pvtData []*ledger.BlockPvtData) ([]*ledger.PvtdataHashMismatch, error) {
538555
logger.Debugf("[%s:] Comparing pvtData of [%d] old blocks against the hashes in transaction's rwset to find valid and invalid data",
539556
l.ledgerID, len(pvtData))
540-
// TODO: validPvtData consists of pvtData that is matching the hashed pvtData present in the
541-
// block. Now, we store pvtData of both committed and failed transactions. Hence, we need to
542-
// separate validPvtData into pvtData associated with committed tx (i.e., valid tx) and
543-
// failed tx (i.e,. invalid tx). FAB-14861
544-
validPvtData, hashMismatches, err := ConstructValidAndInvalidPvtData(pvtData, l.blockStore)
557+
558+
hashVerifiedPvtData, hashMismatches, err := constructValidAndInvalidPvtData(pvtData, l.blockStore)
545559
if err != nil {
546560
return nil, err
547561
}
548562

549563
logger.Debugf("[%s:] Committing pvtData of [%d] old blocks to the pvtdatastore", l.ledgerID, len(pvtData))
550-
err = l.blockStore.CommitPvtDataOfOldBlocks(validPvtData)
564+
err = l.blockStore.CommitPvtDataOfOldBlocks(hashVerifiedPvtData)
551565
if err != nil {
552566
return nil, err
553567
}
554568

555-
logger.Debugf("[%s:] Committing pvtData of [%d] old blocks to the stateDB", l.ledgerID, len(pvtData))
556-
// TODO: As the validPvtData will be consisting of both committed and failed tx's pvtData,
557-
// we need to pass only the pvtData of committed tx. FAB-14861
558-
err = l.txtmgmt.RemoveStaleAndCommitPvtDataOfOldBlocks(validPvtData)
569+
err = l.applyValidTxPvtDataOfOldBlocks(hashVerifiedPvtData)
559570
if err != nil {
560571
return nil, err
561572
}
562573

574+
return hashMismatches, nil
575+
}
576+
577+
func (l *kvLedger) applyValidTxPvtDataOfOldBlocks(hashVerifiedPvtData map[uint64][]*ledger.TxPvtData) error {
578+
logger.Debugf("[%s:] Filtering pvtData of invalidation transactions", l.ledgerID)
579+
committedPvtData, err := filterPvtDataOfInvalidTx(hashVerifiedPvtData, l.blockStore)
580+
if err != nil {
581+
return err
582+
}
583+
584+
logger.Debugf("[%s:] Committing pvtData of [%d] old blocks to the stateDB", l.ledgerID, len(hashVerifiedPvtData))
585+
err = l.txtmgmt.RemoveStaleAndCommitPvtDataOfOldBlocks(committedPvtData)
586+
if err != nil {
587+
return err
588+
}
589+
563590
logger.Debugf("[%s:] Clearing the bookkeeping information from pvtdatastore", l.ledgerID)
564591
if err := l.blockStore.ResetLastUpdatedOldBlocksList(); err != nil {
565-
return nil, err
592+
return err
566593
}
567-
568-
return hashMismatches, nil
594+
return nil
569595
}
570596

571597
func (l *kvLedger) GetMissingPvtDataTracker() (ledger.MissingPvtDataTracker, error) {
@@ -630,3 +656,27 @@ func (a *ccEventListenerAdaptor) HandleChaincodeDeploy(chaincodeDefinition *ledg
630656
func (a *ccEventListenerAdaptor) ChaincodeDeployDone(succeeded bool) {
631657
a.legacyEventListener.ChaincodeDeployDone(succeeded)
632658
}
659+
660+
func filterPvtDataOfInvalidTx(hashVerifiedPvtData map[uint64][]*ledger.TxPvtData, blockStore *ledgerstorage.Store) (map[uint64][]*ledger.TxPvtData, error) {
661+
committedPvtData := make(map[uint64][]*ledger.TxPvtData)
662+
for blkNum, txsPvtData := range hashVerifiedPvtData {
663+
664+
// TODO: Instead of retrieving the whole block, we need to retireve only
665+
// the TxValidationFlags from the block metadata. For that, we would need
666+
// to add a new index for the block metadata. FAB- FAB-15808
667+
block, err := blockStore.RetrieveBlockByNumber(blkNum)
668+
if err != nil {
669+
return nil, err
670+
}
671+
blockValidationFlags := lutil.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
672+
673+
var blksPvtData []*ledger.TxPvtData
674+
for _, pvtData := range txsPvtData {
675+
if blockValidationFlags.IsValid(int(pvtData.SeqInBlock)) {
676+
blksPvtData = append(blksPvtData, pvtData)
677+
}
678+
}
679+
committedPvtData[blkNum] = blksPvtData
680+
}
681+
return committedPvtData, nil
682+
}

core/ledger/kvledger/kv_ledger_test.go

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -457,11 +457,9 @@ func testSyncStateDBWithPvtdatastore(t *testing.T) {
457457
map[string]uint64{"coll": 0},
458458
conf,
459459
)
460-
defer provider.Close()
461460
testLedgerid := "testLedger"
462461
bg, gb := testutil.NewBlockGenerator(t, testLedgerid, false)
463462
ledger, _ := provider.Create(gb)
464-
defer ledger.Close()
465463

466464
// create and commit two data block (both with missing pvtdata)
467465
blockAndPvtdata1, pvtdata1 := prepareNextBlockWithMissingPvtDataForTest(t, ledger, bg, "SimulateForBlk1",
@@ -476,28 +474,75 @@ func testSyncStateDBWithPvtdatastore(t *testing.T) {
476474

477475
assert.NoError(t, ledger.CommitWithPvtData(blockAndPvtdata2, &lgr.CommitOptions{}))
478476

477+
blockAndPvtdata3, pvtdata3 := prepareNextBlockWithMissingPvtDataForTest(t, ledger, bg, "SimulateForBlk3",
478+
map[string]string{"key1": "value1.3", "key2": "value2.3", "key3": "value3.3"},
479+
map[string]string{"key1": "pvtValue1.3", "key2": "pvtValue2.3", "key3": "pvtValue3.3"})
480+
481+
blockAndPvtdata3.Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER][0] = byte(peer.TxValidationCode_PHANTOM_READ_CONFLICT)
482+
assert.NoError(t, ledger.CommitWithPvtData(blockAndPvtdata3, &lgr.CommitOptions{}))
483+
484+
blockAndPvtdata4, pvtdata4 := prepareNextBlockWithMissingPvtDataForTest(t, ledger, bg, "SimulateForBlk4",
485+
map[string]string{"key4": "value4"},
486+
map[string]string{"key4": "pvtValue4"})
487+
488+
assert.NoError(t, ledger.CommitWithPvtData(blockAndPvtdata4, &lgr.CommitOptions{}))
489+
490+
blockAndPvtdata5, pvtdata5 := prepareNextBlockWithMissingPvtDataForTest(t, ledger, bg, "SimulateForBlk5",
491+
map[string]string{"key5": "value4"},
492+
map[string]string{"key5": "pvtValue5"})
493+
494+
assert.NoError(t, ledger.CommitWithPvtData(blockAndPvtdata5, &lgr.CommitOptions{}))
495+
479496
txSim, err := ledger.NewTxSimulator("test")
480497
assert.NoError(t, err)
481498
value, err := txSim.GetPrivateData("ns", "coll", "key1")
482499
_, ok := err.(*txmgr.ErrPvtdataNotAvailable)
483500
assert.True(t, ok)
484501
assert.Nil(t, value)
485502

503+
value, err = txSim.GetPrivateData("ns", "coll", "key4")
504+
_, ok = err.(*txmgr.ErrPvtdataNotAvailable)
505+
assert.True(t, ok)
506+
assert.Nil(t, value)
507+
508+
value, err = txSim.GetPrivateData("ns", "coll", "key5")
509+
_, ok = err.(*txmgr.ErrPvtdataNotAvailable)
510+
assert.True(t, ok)
511+
assert.Nil(t, value)
512+
486513
blocksPvtData := map[uint64][]*lgr.TxPvtData{
487514
1: {
488515
pvtdata1,
489516
},
490517
2: {
491518
pvtdata2,
492519
},
520+
3: {
521+
pvtdata3,
522+
},
523+
4: {
524+
pvtdata4,
525+
},
526+
5: {
527+
pvtdata5,
528+
},
493529
}
494530

495531
assert.NoError(t, ledger.(*kvLedger).blockStore.CommitPvtDataOfOldBlocks(blocksPvtData))
532+
// ensure that the pvtdata of the invalid transaction in block 3 got stored in the
533+
// pvtdataStore
534+
pvtdata, _ := ledger.GetPvtDataByNum(3, nil)
535+
assert.NotNil(t, pvtdata)
536+
assert.Equal(t, 1, len(pvtdata))
537+
assert.True(t, pvtdata[0].Has("ns", "coll"))
496538

497539
// Now, assume that peer fails here before committing the pvtData to stateDB
498540
ledger.Close()
499541
provider.Close()
500542

543+
// Rollback to block 3
544+
RollbackKVLedger(conf.RootFSPath, testLedgerid, 3)
545+
501546
// Here the peer comes online and calls NewKVLedger to get a handler for the ledger
502547
// StateDB and HistoryDB should be recovered before returning from NewKVLedger call
503548
provider = testutilNewProviderWithCollectionConfig(
@@ -506,13 +551,44 @@ func testSyncStateDBWithPvtdatastore(t *testing.T) {
506551
map[string]uint64{"coll": 0},
507552
conf,
508553
)
509-
ledger, _ = provider.Open(testLedgerid)
554+
defer provider.Close()
555+
556+
ledger, err = provider.Open(testLedgerid)
557+
assert.NoError(t, err)
558+
defer ledger.Close()
510559

511560
txSim, err = ledger.NewTxSimulator("test")
512561
assert.NoError(t, err)
513562
value, err = txSim.GetPrivateData("ns", "coll", "key1")
514563
assert.NoError(t, err)
564+
// the value should match the string provided in the block 2 (i.e., pvtValue1.2)
565+
// rather than block 3 (i.e., pvtValue1.3)
515566
assert.Equal(t, value, []byte("pvtValue1.2"))
567+
568+
// block 4 is not committed yet
569+
value, err = txSim.GetPrivateData("ns", "coll", "key4")
570+
assert.Nil(t, err)
571+
assert.Nil(t, value)
572+
573+
// block 5 is not committed yet
574+
value, err = txSim.GetPrivateData("ns", "coll", "key5")
575+
assert.Nil(t, err)
576+
assert.Nil(t, value)
577+
txSim.Done()
578+
579+
// recommit block 4 & 5
580+
assert.NoError(t, ledger.CommitWithPvtData(blockAndPvtdata4, &lgr.CommitOptions{FetchPvtDataFromLedger: true}))
581+
assert.NoError(t, ledger.CommitWithPvtData(blockAndPvtdata5, &lgr.CommitOptions{FetchPvtDataFromLedger: true}))
582+
583+
txSim, err = ledger.NewTxSimulator("test")
584+
assert.NoError(t, err)
585+
value, err = txSim.GetPrivateData("ns", "coll", "key4")
586+
assert.NoError(t, err)
587+
assert.Equal(t, value, []byte("pvtValue4"))
588+
589+
value, err = txSim.GetPrivateData("ns", "coll", "key5")
590+
assert.NoError(t, err)
591+
assert.Equal(t, value, []byte("pvtValue5"))
516592
}
517593

518594
func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {

0 commit comments

Comments
 (0)