From bfc762b01cc8ca239984560a5deec26c7b6556b9 Mon Sep 17 00:00:00 2001
From: Danny Cao
Date: Tue, 7 Jan 2020 04:32:41 -0500
Subject: [PATCH] [FAB-17184] Skip pulling from remote peers if all remaining
 missing rwsets are invalid when peer is configured to skip pulling invalid
 transactions (#365)

* Prior to this change, pvtdataprovider would enter the pull retry loop
  and still attempt to fetch from remote peers even if all of the
  missing rwsets were invalid.
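With this change, populateFromRemotePeers reports whether another pull
attempt is worthwhile, and RetrievePvtdata leaves the retry loop as soon
as it is not. A simplified sketch of the resulting control flow (info
abbreviates the pvtdataRetrievalInfo identifier; see the diff below for
the exact code):

    for len(info.eligibleMissingKeys) > 0 && time.Since(startPull) < retryThresh {
        // populateFromRemotePeers returns false when nothing was handed
        // to the fetcher (every remaining key was skipped as invalid) or
        // when no non-skipped keys remain missing, so sleeping and
        // retrying would accomplish nothing.
        if needToRetry := pdp.populateFromRemotePeers(pvtdata, info); !needToRetry {
            break
        }
        pdp.sleeper.Sleep(pullRetrySleepInterval)
    }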

Signed-off-by: Danny Cao
Signed-off-by: Jay Guo
---
 gossip/privdata/pvtdataprovider.go      |  20 +++--
 gossip/privdata/pvtdataprovider_test.go | 101 +++++++++++++++++++++++++
 2 files changed, 113 insertions(+), 8 deletions(-)

diff --git a/gossip/privdata/pvtdataprovider.go b/gossip/privdata/pvtdataprovider.go
index dca794a4dc2..875a1e8161c 100644
--- a/gossip/privdata/pvtdataprovider.go
+++ b/gossip/privdata/pvtdataprovider.go
@@ -137,7 +137,6 @@ func (ec *eligibilityComputer) computeEligibility(pvtdataToRetrieve []*ledger.Tx
 			eligibleMissingKeys[key] = rwsetInfo{
 				invalid: invalid,
 			}
-
 			sources[key] = endorsersFromEligibleOrgs(ns, col, endorsers, policy.MemberOrgs())
 		}
 	}
@@ -225,13 +224,9 @@ func (pdp *PvtdataProvider) RetrievePvtdata(pvtdataToRetrieve []*ledger.TxPvtdat
 	pdp.logger.Debugf("Fetching %d collection private write sets from remote peers for a maximum duration of %s", len(pvtdataRetrievalInfo.eligibleMissingKeys), retryThresh)
 	startPull := time.Now()
 	for len(pvtdataRetrievalInfo.eligibleMissingKeys) > 0 && time.Since(startPull) < retryThresh {
-		pdp.populateFromRemotePeers(pvtdata, pvtdataRetrievalInfo)
-
-		// If succeeded to fetch everything, break to skip sleep
-		if len(pvtdataRetrievalInfo.eligibleMissingKeys) == 0 {
+		if needToRetry := pdp.populateFromRemotePeers(pvtdata, pvtdataRetrievalInfo); !needToRetry {
 			break
 		}
-
 		// If there are still missing keys, sleep before retry
 		pdp.sleeper.Sleep(pullRetrySleepInterval)
 	}
@@ -344,13 +339,15 @@ func (pdp *PvtdataProvider) populateFromTransientStore(pvtdata rwsetByKeys, pvtd
 
 // populateFromRemotePeers populates pvtdata with data fetched from remote peers and updates
 // pvtdataRetrievalInfo by removing missing data that was fetched from remote peers
-func (pdp *PvtdataProvider) populateFromRemotePeers(pvtdata rwsetByKeys, pvtdataRetrievalInfo *pvtdataRetrievalInfo) {
+func (pdp *PvtdataProvider) populateFromRemotePeers(pvtdata rwsetByKeys, pvtdataRetrievalInfo *pvtdataRetrievalInfo) bool {
 	pdp.logger.Debugf("Attempting to retrieve %d private write sets from remote peers.", len(pvtdataRetrievalInfo.eligibleMissingKeys))
 
 	dig2src := make(map[pvtdatacommon.DigKey][]*peer.Endorsement)
+	var skipped int
 	for k, v := range pvtdataRetrievalInfo.eligibleMissingKeys {
 		if v.invalid && pdp.skipPullingInvalidTransactions {
 			pdp.logger.Debugf("Skipping invalid key [%v] because peer is configured to skip pulling rwsets of invalid transactions.", k)
+			skipped++
 			continue
 		}
 		pdp.logger.Debugf("Fetching [%v] from remote peers", k)
@@ -363,10 +360,15 @@ func (pdp *PvtdataProvider) populateFromRemotePeers(pvtdata rwsetByKeys, pvtdata
 		}
 		dig2src[dig] = pvtdataRetrievalInfo.sources[k]
 	}
+
+	if len(dig2src) == 0 {
+		return false
+	}
+
 	fetchedData, err := pdp.fetcher.fetch(dig2src)
 	if err != nil {
 		pdp.logger.Warningf("Failed fetching private data from remote peers for dig2src:[%v], err: %s", dig2src, err)
-		return
+		return true
 	}
 
 	// Iterate over data fetched from remote peers
@@ -409,6 +411,8 @@ func (pdp *PvtdataProvider) populateFromRemotePeers(pvtdata rwsetByKeys, pvtdata
 			}
 		}
 	}
+
+	return len(pvtdataRetrievalInfo.eligibleMissingKeys) > skipped
 }
 
 // prepareBlockPvtdata consolidates the fetched private data as well as ineligible and eligible
diff --git a/gossip/privdata/pvtdataprovider_test.go b/gossip/privdata/pvtdataprovider_test.go
index 15a36c7280d..6d6596b4437 100644
--- a/gossip/privdata/pvtdataprovider_test.go
+++ b/gossip/privdata/pvtdataprovider_test.go
@@ -984,6 +984,107 @@ func TestRetryFetchFromPeer(t *testing.T) {
 	assert.Equal(t, fakeSleeper.SleepArgsForCall(0), pullRetrySleepInterval)
 }
 
+func TestSkipPullingAllInvalidTransactions(t *testing.T) {
+	err := msptesttools.LoadMSPSetupForTesting()
+	require.NoError(t, err, fmt.Sprintf("Failed to setup local msp for testing, got err %s", err))
+
+	identity := mspmgmt.GetLocalSigningIdentityOrPanic(factory.GetDefault())
+	serializedID, err := identity.Serialize()
+	require.NoError(t, err, fmt.Sprintf("Serialize should have succeeded, got err %s", err))
+	data := []byte{1, 2, 3}
+	signature, err := identity.Sign(data)
+	require.NoError(t, err, fmt.Sprintf("Could not sign identity, got err %s", err))
+	peerSelfSignedData := protoutil.SignedData{
+		Identity:  serializedID,
+		Signature: signature,
+		Data:      data,
+	}
+	endorser := protoutil.MarshalOrPanic(&mspproto.SerializedIdentity{
+		Mspid:   identity.GetMSPIdentifier(),
+		IdBytes: []byte(fmt.Sprintf("p0%s", identity.GetMSPIdentifier())),
+	})
+
+	ts := testSupport{
+		preHash:            []byte("rws-pre-image"),
+		hash:               util2.ComputeSHA256([]byte("rws-pre-image")),
+		channelID:          "testchannelid",
+		blockNum:           uint64(1),
+		endorsers:          []string{identity.GetMSPIdentifier()},
+		peerSelfSignedData: peerSelfSignedData,
+	}
+
+	ns1c1 := collectionPvtdataInfoFromTemplate("ns1", "c1", identity.GetMSPIdentifier(), ts.hash, endorser, signature)
+	ns1c2 := collectionPvtdataInfoFromTemplate("ns1", "c2", identity.GetMSPIdentifier(), ts.hash, endorser, signature)
+
+	tempdir, err := ioutil.TempDir("", "ts")
+	require.NoError(t, err, fmt.Sprintf("Failed to create test directory, got err %s", err))
+	storeProvider, err := transientstore.NewStoreProvider(tempdir)
+	require.NoError(t, err, fmt.Sprintf("Failed to create store provider, got err %s", err))
+	store, err := storeProvider.OpenStore(ts.channelID)
+	require.NoError(t, err, fmt.Sprintf("Failed to open store, got err %s", err))
+
+	defer storeProvider.Close()
+	defer os.RemoveAll(tempdir)
+
+	storePvtdataOfInvalidTx := true
+	skipPullingInvalidTransactions := true
+	rwSetsInCache := []rwSet{}
+	rwSetsInTransientStore := []rwSet{}
+	rwSetsInPeer := []rwSet{}
+	expectedDigKeys := []privdatacommon.DigKey{}
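+	// Every missing key belongs to an invalid transaction and the peer is
+	// configured to skip pulling invalid transactions, so nothing should be
+	// fetched; both collections are expected to be reported as missing.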
+	expectedBlockPvtdata := &ledger.BlockPvtdata{
+		PvtData: ledger.TxPvtDataMap{},
+		MissingPvtData: ledger.TxMissingPvtDataMap{
+			1: []*ledger.MissingPvtData{
+				{
+					Namespace:  "ns1",
+					Collection: "c1",
+					IsEligible: true,
+				},
+				{
+					Namespace:  "ns1",
+					Collection: "c2",
+					IsEligible: true,
+				},
+			},
+		},
+	}
+	pvtdataToRetrieve := []*ledger.TxPvtdataInfo{
+		{
+			TxID:       "tx1",
+			Invalid:    true,
+			SeqInBlock: 1,
+			CollectionPvtdataInfo: []*ledger.CollectionPvtdataInfo{
+				ns1c1,
+				ns1c2,
+			},
+		},
+	}
+	pdp := setupPrivateDataProvider(t, ts, testConfig,
+		storePvtdataOfInvalidTx, skipPullingInvalidTransactions, store,
+		rwSetsInCache, rwSetsInTransientStore, rwSetsInPeer,
+		expectedDigKeys)
+	require.NotNil(t, pdp)
+
+	fakeSleeper := &mocks.Sleeper{}
+	SetSleeper(pdp, fakeSleeper)
+	newFetcher := &fetcherMock{t: t}
+	pdp.fetcher = newFetcher
+
+	retrievedPvtdata, err := pdp.RetrievePvtdata(pvtdataToRetrieve)
+	assert.NoError(t, err)
+
+	blockPvtdata := sortBlockPvtdata(retrievedPvtdata.GetBlockPvtdata())
+	assert.Equal(t, expectedBlockPvtdata, blockPvtdata)
+
+	// Check sleep and fetch were never called
+	assert.Equal(t, fakeSleeper.SleepCallCount(), 0)
+	assert.Len(t, newFetcher.Calls, 0)
+}
+
 func TestRetrievedPvtdataPurgeBelowHeight(t *testing.T) {
 	conf := testConfig
 	conf.TransientBlockRetention = 5