Skip to content

Commit b558893

Browse files
committed
[FAB-16026] Implement PvtdataProvider
A PvtdataProvider takes numerous dependencies from gossip's coordinator required for retrieving private data. Ledger will use the provider at commit time to retrieve necessary private data for the queried ledger.TxPvtdataInfo and then purge txs from the transient store through invocation of the Purge() method on the RetrievedPvtdata Change-Id: I3c72e6dd1c0620eaee8d8dc9fae75cef7e2b7059 Signed-off-by: Danny Cao <dcao@us.ibm.com>
1 parent d55d200 commit b558893

File tree

8 files changed

+1402
-37
lines changed

8 files changed

+1402
-37
lines changed

core/ledger/ledger_interface.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,30 @@ type Hasher interface {
646646
Hash(msg []byte, opts bccsp.HashOpts) (hash []byte, err error)
647647
}
648648

649+
// TxPvtdataInfo captures information about the requested private data to be retrieved
650+
// and is populated by ledger during commit
651+
type TxPvtdataInfo struct {
652+
TxID string
653+
Invalid bool
654+
SeqInBlock uint64
655+
CollectionPvtdataInfo []*CollectionPvtdataInfo
656+
}
657+
658+
// CollectionPvtdataInfo contains information about the private data for a given collection
659+
type CollectionPvtdataInfo struct {
660+
Namespace, Collection string
661+
ExpectedHash []byte
662+
CollectionConfig *common.StaticCollectionConfig
663+
Endorsers []*peer.Endorsement
664+
}
665+
666+
// BlockPvtdata contains the retrieved private data as well as missing and ineligible
667+
// private data for use at commit time
668+
type BlockPvtdata struct {
669+
PvtData TxPvtDataMap
670+
MissingPvtData TxMissingPvtDataMap
671+
}
672+
649673
//go:generate counterfeiter -o mock/state_listener.go -fake-name StateListener . StateListener
650674
//go:generate counterfeiter -o mock/query_executor.go -fake-name QueryExecutor . QueryExecutor
651675
//go:generate counterfeiter -o mock/tx_simulator.go -fake-name TxSimulator . TxSimulator

gossip/privdata/coordinator.go

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa
260260
}
261261

262262
// populate missing RWSets for ineligible collections to be passed to the ledger
263-
for _, missingRWS := range privateInfo.missingRWSButIneligible {
263+
for missingRWS := range privateInfo.missingRWSButIneligible {
264264
blockAndPvtData.MissingPvtData.Add(missingRWS.seqInBlock, missingRWS.namespace, missingRWS.collection, false)
265265
}
266266

@@ -294,7 +294,7 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa
294294
return nil
295295
}
296296

297-
func (c *coordinator) fetchFromPeers(blockSeq uint64, ownedRWsets map[rwSetKey][]byte, privateInfo *privateDataInfo) {
297+
func (c *coordinator) fetchFromPeers(blockSeq uint64, ownedRWsets map[rwSetKey][]byte, privateInfo *pvtdataInfo) {
298298
dig2src := make(map[privdatacommon.DigKey][]*peer.Endorsement)
299299
privateInfo.missingKeys.foreach(func(k rwSetKey) {
300300
logger.Debug("Fetching", k, "from peers")
@@ -661,16 +661,16 @@ func endorsersFromOrgs(ns string, col string, endorsers []*peer.Endorsement, org
661661
return res
662662
}
663663

664-
type privateDataInfo struct {
664+
type pvtdataInfo struct {
665665
sources map[rwSetKey][]*peer.Endorsement
666666
missingKeysByTxIDs rwSetKeysByTxIDs
667667
missingKeys rwsetKeys
668668
txns txns
669-
missingRWSButIneligible []rwSetKey
669+
missingRWSButIneligible rwsetKeys
670670
}
671671

672672
// listMissingPrivateData identifies missing private write sets and attempts to retrieve them from local transient store
673-
func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets map[rwSetKey][]byte) (*privateDataInfo, error) {
673+
func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets map[rwSetKey][]byte) (*pvtdataInfo, error) {
674674
if block.Metadata == nil || len(block.Metadata.Metadata) <= int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) {
675675
return nil, errors.New("Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap")
676676
}
@@ -680,27 +680,29 @@ func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets ma
680680
}
681681

682682
sources := make(map[rwSetKey][]*peer.Endorsement)
683-
privateRWsetsInBlock := make(map[rwSetKey]struct{})
683+
requestedEligiblePrivateRWSets := make(map[rwSetKey]struct{})
684684
missing := make(rwSetKeysByTxIDs)
685+
missingButIneligible := make(rwSetKeysByTxIDs)
685686
data := blockData(block.Data.Data)
686687
bi := &transactionInspector{
687-
sources: sources,
688-
missingKeys: missing,
689-
ownedRWsets: ownedRWsets,
690-
privateRWsetsInBlock: privateRWsetsInBlock,
691-
coordinator: c,
688+
sources: sources,
689+
missingKeys: missing,
690+
ownedRWsets: ownedRWsets,
691+
requestedEligiblePrivateRWSets: requestedEligiblePrivateRWSets,
692+
coordinator: c,
693+
missingRWSButIneligible: missingButIneligible,
692694
}
693695
storePvtDataOfInvalidTx := c.Support.CapabilityProvider.Capabilities().StorePvtDataOfInvalidTx()
694696
txList, err := data.forEachTxn(storePvtDataOfInvalidTx, txsFilter, bi.inspectTransaction)
695697
if err != nil {
696698
return nil, err
697699
}
698700

699-
privateInfo := &privateDataInfo{
701+
privateInfo := &pvtdataInfo{
700702
sources: sources,
701703
missingKeysByTxIDs: missing,
704+
missingRWSButIneligible: bi.missingRWSButIneligible.flatten(),
702705
txns: txList,
703-
missingRWSButIneligible: bi.missingRWSButIneligible,
704706
}
705707

706708
logger.Debug("Retrieving private write sets for", len(privateInfo.missingKeysByTxIDs), "transactions from transient store")
@@ -709,9 +711,9 @@ func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets ma
709711
c.fetchMissingFromTransientStore(privateInfo.missingKeysByTxIDs, ownedRWsets)
710712

711713
// In the end, iterate over the ownedRWsets, and if the key doesn't exist in
712-
// the privateRWsetsInBlock - delete it from the ownedRWsets
714+
// the requestedEligiblePrivateRWSets - delete it from the ownedRWsets
713715
for k := range ownedRWsets {
714-
if _, exists := privateRWsetsInBlock[k]; !exists {
716+
if _, exists := requestedEligiblePrivateRWSets[k]; !exists {
715717
logger.Warning("Removed", k.namespace, k.collection, "hash", k.hash, "from the data passed to the ledger")
716718
delete(ownedRWsets, k)
717719
}
@@ -729,11 +731,11 @@ func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets ma
729731

730732
type transactionInspector struct {
731733
*coordinator
732-
privateRWsetsInBlock map[rwSetKey]struct{}
733-
missingKeys rwSetKeysByTxIDs
734-
sources map[rwSetKey][]*peer.Endorsement
735-
ownedRWsets map[rwSetKey][]byte
736-
missingRWSButIneligible []rwSetKey
734+
requestedEligiblePrivateRWSets rwsetKeys
735+
missingKeys rwSetKeysByTxIDs
736+
sources map[rwSetKey][]*peer.Endorsement
737+
ownedRWsets map[rwSetKey][]byte
738+
missingRWSButIneligible rwSetKeysByTxIDs
737739
}
738740

739741
func (bi *transactionInspector) inspectTransaction(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet, endorsers []*peer.Endorsement) error {
@@ -766,20 +768,20 @@ func (bi *transactionInspector) inspectTransaction(seqInBlock uint64, chdr *comm
766768
collection: hashedCollection.CollectionName,
767769
}
768770

771+
txAndSeq := txAndSeqInBlock{
772+
txID: chdr.TxId,
773+
seqInBlock: seqInBlock,
774+
}
769775
if !bi.isEligible(policy, ns.NameSpace, hashedCollection.CollectionName) {
770776
logger.Debugf("Peer is not eligible for collection, channel [%s], chaincode [%s], "+
771777
"collection name [%s], txID [%s] the policy is [%#v]. Skipping.",
772778
chdr.ChannelId, ns.NameSpace, hashedCollection.CollectionName, chdr.TxId, policy)
773-
bi.missingRWSButIneligible = append(bi.missingRWSButIneligible, key)
779+
bi.missingRWSButIneligible[txAndSeq] = append(bi.missingRWSButIneligible[txAndSeq], key)
774780
continue
775781
}
776782

777-
bi.privateRWsetsInBlock[key] = struct{}{}
783+
bi.requestedEligiblePrivateRWSets[key] = struct{}{}
778784
if _, exists := bi.ownedRWsets[key]; !exists {
779-
txAndSeq := txAndSeqInBlock{
780-
txID: chdr.TxId,
781-
seqInBlock: seqInBlock,
782-
}
783785
bi.missingKeys[txAndSeq] = append(bi.missingKeys[txAndSeq], key)
784786
bi.sources[key] = endorsersFromOrgs(ns.NameSpace, hashedCollection.CollectionName, endorsers, policy.MemberOrgs())
785787
}

gossip/privdata/coordinator_test.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"github.com/hyperledger/fabric-protos-go/msp"
2626
"github.com/hyperledger/fabric-protos-go/peer"
2727
tspb "github.com/hyperledger/fabric-protos-go/transientstore"
28-
"github.com/hyperledger/fabric/bccsp/factory"
2928
"github.com/hyperledger/fabric/common/metrics/disabled"
3029
util2 "github.com/hyperledger/fabric/common/util"
3130
"github.com/hyperledger/fabric/core/common/privdata"
@@ -43,10 +42,6 @@ import (
4342
"github.com/stretchr/testify/mock"
4443
)
4544

46-
func init() {
47-
factory.InitFactories(nil)
48-
}
49-
5045
var testConfig = CoordinatorConfig{
5146
PullRetryThreshold: time.Second * 3,
5247
TransientBlockRetention: 1000,
@@ -138,18 +133,18 @@ func (f *fetcherMock) On(methodName string, arguments ...interface{}) *fetchCall
138133
}
139134

140135
func (f *fetcherMock) fetch(dig2src dig2sources) (*privdatacommon.FetchedPvtDataContainer, error) {
136+
uniqueEndorsements := make(map[string]interface{})
141137
for _, endorsements := range dig2src {
142138
for _, endorsement := range endorsements {
143139
_, exists := f.expectedEndorsers[string(endorsement.Endorser)]
144140
if !exists {
145141
f.t.Fatalf("Encountered a non-expected endorser: %s", string(endorsement.Endorser))
146142
}
147-
// Else, it exists, so delete it so we will end up with an empty expected map at the end of the call
148-
delete(f.expectedEndorsers, string(endorsement.Endorser))
143+
uniqueEndorsements[string(endorsement.Endorser)] = struct{}{}
149144
}
150145
}
151-
assert.True(f.t, digests(dig2src.keys()).Equal(digests(f.expectedDigests)))
152-
assert.Empty(f.t, f.expectedEndorsers)
146+
assert.True(f.t, digests(f.expectedDigests).Equal(digests(dig2src.keys())))
147+
assert.Equal(f.t, len(f.expectedEndorsers), len(uniqueEndorsements))
153148
args := f.Called(dig2src)
154149
if args.Get(1) == nil {
155150
return args.Get(0).(*privdatacommon.FetchedPvtDataContainer), nil

gossip/privdata/mocks/sleeper.go

Lines changed: 72 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)