Skip to content

Commit

Permalink
[FAB-17184] Skip pulling from remote peers if all remaining missing rwsets are invalid when peer is configured to skip pulling invalid transactions (#365)
Browse files Browse the repository at this point in the history

* Prior to this change pvtdataprovider would enter the pull retry loop
and still attempt to fetch from peers even if all missing rwsets were
invalid

Signed-off-by: Danny Cao <dcao@us.ibm.com>
Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
  • Loading branch information
caod123 authored and C0rWin committed Jan 7, 2020
1 parent ff7c083 commit bfc762b
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 8 deletions.
20 changes: 12 additions & 8 deletions gossip/privdata/pvtdataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ func (ec *eligibilityComputer) computeEligibility(pvtdataToRetrieve []*ledger.Tx
eligibleMissingKeys[key] = rwsetInfo{
invalid: invalid,
}

sources[key] = endorsersFromEligibleOrgs(ns, col, endorsers, policy.MemberOrgs())
}
}
Expand Down Expand Up @@ -225,13 +224,9 @@ func (pdp *PvtdataProvider) RetrievePvtdata(pvtdataToRetrieve []*ledger.TxPvtdat
pdp.logger.Debugf("Fetching %d collection private write sets from remote peers for a maximum duration of %s", len(pvtdataRetrievalInfo.eligibleMissingKeys), retryThresh)
startPull := time.Now()
for len(pvtdataRetrievalInfo.eligibleMissingKeys) > 0 && time.Since(startPull) < retryThresh {
pdp.populateFromRemotePeers(pvtdata, pvtdataRetrievalInfo)

// If succeeded to fetch everything, break to skip sleep
if len(pvtdataRetrievalInfo.eligibleMissingKeys) == 0 {
if needToRetry := pdp.populateFromRemotePeers(pvtdata, pvtdataRetrievalInfo); !needToRetry {
break
}

// If there are still missing keys, sleep before retry
pdp.sleeper.Sleep(pullRetrySleepInterval)
}
Expand Down Expand Up @@ -344,13 +339,15 @@ func (pdp *PvtdataProvider) populateFromTransientStore(pvtdata rwsetByKeys, pvtd

// populateFromRemotePeers populates pvtdata with data fetched from remote peers and updates
// pvtdataRetrievalInfo by removing missing data that was fetched from remote peers
func (pdp *PvtdataProvider) populateFromRemotePeers(pvtdata rwsetByKeys, pvtdataRetrievalInfo *pvtdataRetrievalInfo) {
func (pdp *PvtdataProvider) populateFromRemotePeers(pvtdata rwsetByKeys, pvtdataRetrievalInfo *pvtdataRetrievalInfo) bool {
pdp.logger.Debugf("Attempting to retrieve %d private write sets from remote peers.", len(pvtdataRetrievalInfo.eligibleMissingKeys))

dig2src := make(map[pvtdatacommon.DigKey][]*peer.Endorsement)
var skipped int
for k, v := range pvtdataRetrievalInfo.eligibleMissingKeys {
if v.invalid && pdp.skipPullingInvalidTransactions {
pdp.logger.Debugf("Skipping invalid key [%v] because peer is configured to skip pulling rwsets of invalid transactions.", k)
skipped++
continue
}
pdp.logger.Debugf("Fetching [%v] from remote peers", k)
Expand All @@ -363,10 +360,15 @@ func (pdp *PvtdataProvider) populateFromRemotePeers(pvtdata rwsetByKeys, pvtdata
}
dig2src[dig] = pvtdataRetrievalInfo.sources[k]
}

if len(dig2src) == 0 {
return false
}

fetchedData, err := pdp.fetcher.fetch(dig2src)
if err != nil {
pdp.logger.Warningf("Failed fetching private data from remote peers for dig2src:[%v], err: %s", dig2src, err)
return
return true
}

// Iterate over data fetched from remote peers
Expand Down Expand Up @@ -409,6 +411,8 @@ func (pdp *PvtdataProvider) populateFromRemotePeers(pvtdata rwsetByKeys, pvtdata
}
}
}

return len(pvtdataRetrievalInfo.eligibleMissingKeys) > skipped
}

// prepareBlockPvtdata consolidates the fetched private data as well as ineligible and eligible
Expand Down
98 changes: 98 additions & 0 deletions gossip/privdata/pvtdataprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,104 @@ func TestRetryFetchFromPeer(t *testing.T) {
assert.Equal(t, fakeSleeper.SleepArgsForCall(0), pullRetrySleepInterval)
}

// TestSkipPullingAllInvalidTransactions verifies that when the peer is
// configured to skip pulling rwsets of invalid transactions and every
// remaining missing rwset belongs to an invalid transaction, RetrievePvtdata
// exits the retry loop immediately: it neither sleeps nor fetches from
// remote peers, and reports the collections as eligible-but-missing.
func TestSkipPullingAllInvalidTransactions(t *testing.T) {
	err := msptesttools.LoadMSPSetupForTesting()
	require.NoError(t, err, "Failed to setup local msp for testing, got err %s", err)

	// Build a self-signed identity so the peer passes its own eligibility check.
	identity := mspmgmt.GetLocalSigningIdentityOrPanic(factory.GetDefault())
	serializedID, err := identity.Serialize()
	require.NoError(t, err, "Serialize should have succeeded, got err %s", err)
	data := []byte{1, 2, 3}
	signature, err := identity.Sign(data)
	require.NoError(t, err, "Could not sign identity, got err %s", err)
	peerSelfSignedData := protoutil.SignedData{
		Identity:  serializedID,
		Signature: signature,
		Data:      data,
	}
	endorser := protoutil.MarshalOrPanic(&mspproto.SerializedIdentity{
		Mspid:   identity.GetMSPIdentifier(),
		IdBytes: []byte(fmt.Sprintf("p0%s", identity.GetMSPIdentifier())),
	})

	ts := testSupport{
		preHash:            []byte("rws-pre-image"),
		hash:               util2.ComputeSHA256([]byte("rws-pre-image")),
		channelID:          "testchannelid",
		blockNum:           uint64(1),
		endorsers:          []string{identity.GetMSPIdentifier()},
		peerSelfSignedData: peerSelfSignedData,
	}

	ns1c1 := collectionPvtdataInfoFromTemplate("ns1", "c1", identity.GetMSPIdentifier(), ts.hash, endorser, signature)
	ns1c2 := collectionPvtdataInfoFromTemplate("ns1", "c2", identity.GetMSPIdentifier(), ts.hash, endorser, signature)

	tempdir, err := ioutil.TempDir("", "ts")
	require.NoError(t, err, "Failed to create test directory, got err %s", err)
	storeProvider, err := transientstore.NewStoreProvider(tempdir)
	require.NoError(t, err, "Failed to create store provider, got err %s", err)
	store, err := storeProvider.OpenStore(ts.channelID)
	require.NoError(t, err, "Failed to open store, got err %s", err)

	// Deferred calls run LIFO: close the store provider first, then remove
	// the backing directory.
	defer os.RemoveAll(tempdir)
	defer storeProvider.Close()

	storePvtdataOfInvalidTx := true
	skipPullingInvalidTransactions := true
	// No private data anywhere (cache, transient store, or remote peers):
	// everything for the invalid tx must end up reported as missing.
	rwSetsInCache := []rwSet{}
	rwSetsInTransientStore := []rwSet{}
	rwSetsInPeer := []rwSet{}
	expectedDigKeys := []privdatacommon.DigKey{}
	expectedBlockPvtdata := &ledger.BlockPvtdata{
		PvtData: ledger.TxPvtDataMap{},
		MissingPvtData: ledger.TxMissingPvtDataMap{
			1: []*ledger.MissingPvtData{
				{
					Namespace:  "ns1",
					Collection: "c1",
					IsEligible: true,
				},
				{
					Namespace:  "ns1",
					Collection: "c2",
					IsEligible: true,
				},
			},
		},
	}
	// A single invalid transaction with two missing collections.
	pvtdataToRetrieve := []*ledger.TxPvtdataInfo{
		{
			TxID:       "tx1",
			Invalid:    true,
			SeqInBlock: 1,
			CollectionPvtdataInfo: []*ledger.CollectionPvtdataInfo{
				ns1c1,
				ns1c2,
			},
		},
	}
	pdp := setupPrivateDataProvider(t, ts, testConfig,
		storePvtdataOfInvalidTx, skipPullingInvalidTransactions, store,
		rwSetsInCache, rwSetsInTransientStore, rwSetsInPeer,
		expectedDigKeys)
	require.NotNil(t, pdp)

	// Instrument sleeper and fetcher so we can assert they are never used.
	fakeSleeper := &mocks.Sleeper{}
	SetSleeper(pdp, fakeSleeper)
	newFetcher := &fetcherMock{t: t}
	pdp.fetcher = newFetcher

	retrievedPvtdata, err := pdp.RetrievePvtdata(pvtdataToRetrieve)
	assert.NoError(t, err)

	blockPvtdata := sortBlockPvtdata(retrievedPvtdata.GetBlockPvtdata())
	assert.Equal(t, expectedBlockPvtdata, blockPvtdata)

	// Check sleep and fetch were never called (testify order: expected, actual).
	assert.Equal(t, 0, fakeSleeper.SleepCallCount())
	assert.Len(t, newFetcher.Calls, 0)
}

func TestRetrievedPvtdataPurgeBelowHeight(t *testing.T) {
conf := testConfig
conf.TransientBlockRetention = 5
Expand Down

0 comments on commit bfc762b

Please sign in to comment.