Skip to content

Commit

Permalink
[FAB-11068] Resolve additional endorsers for pvt data
Browse files Browse the repository at this point in the history
Logic is added to the Endorsement Handler in the channel
client to automatically select additional endorsers based
on the hashed private data RWSets in the proposal response.
If additional private data collections are found, then the
Selection Service recalculates the required set of endorsers
and requests are sent to those additional endorsers.

Change-Id: Iff2fc3fa0dcfb2bd8536ff3889c7a8cb9e310ac2
Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed Jul 7, 2018
1 parent 4d8c403 commit 47c8f80
Show file tree
Hide file tree
Showing 15 changed files with 887 additions and 257 deletions.
191 changes: 138 additions & 53 deletions pkg/client/channel/invoke/selectendorsehandler.go
Expand Up @@ -8,6 +8,7 @@ package invoke

import (
selectopts "github.com/hyperledger/fabric-sdk-go/pkg/client/common/selection/options"
"github.com/hyperledger/fabric-sdk-go/pkg/common/errors/retry"
"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/peer"
Expand Down Expand Up @@ -64,23 +65,24 @@ func (e *SelectAndEndorseHandler) Handle(requestContext *RequestContext, clientC
if len(targets) == 0 && len(requestContext.Response.Responses) > 0 {
additionalEndorsers, err := getAdditionalEndorsers(requestContext, clientContext, ccCalls)
if err != nil {
requestContext.Error = errors.WithMessage(err, "error getting additional endorsers")
return
}

if len(additionalEndorsers) > 0 {
requestContext.Opts.Targets = additionalEndorsers
logger.Debugf("...getting additional endorsements from %d target(s)", len(additionalEndorsers))
additionalResponses, err := clientContext.Transactor.SendTransactionProposal(requestContext.Response.Proposal, peer.PeersToTxnProcessors(additionalEndorsers))
if err != nil {
requestContext.Error = errors.WithMessage(err, "error sending transaction proposal")
return
}

// Add the new endorsements to the list of responses
requestContext.Response.Responses = append(requestContext.Response.Responses, additionalResponses...)
// Log a warning. No need to fail the endorsement. Use the responses collected so far,
// which may be sufficient to satisfy the chaincode policy.
logger.Warnf("error getting additional endorsers: %s", err)
} else {
logger.Debugf("...no additional endorsements are required.")
if len(additionalEndorsers) > 0 {
requestContext.Opts.Targets = additionalEndorsers
logger.Debugf("...getting additional endorsements from %d target(s)", len(additionalEndorsers))
additionalResponses, err := clientContext.Transactor.SendTransactionProposal(requestContext.Response.Proposal, peer.PeersToTxnProcessors(additionalEndorsers))
if err != nil {
requestContext.Error = errors.WithMessage(err, "error sending transaction proposal")
return
}

// Add the new endorsements to the list of responses
requestContext.Response.Responses = append(requestContext.Response.Responses, additionalResponses...)
} else {
logger.Debugf("...no additional endorsements are required.")
}
}
}

Expand All @@ -103,43 +105,52 @@ func NewChainedCCFilter(filters ...CCFilter) CCFilter {
}
}

func getEndorsers(requestContext *RequestContext, clientContext *ClientContext) (ccCalls []*fab.ChaincodeCall, peers []fab.Peer, err error) {
func getEndorsers(requestContext *RequestContext, clientContext *ClientContext, opts ...options.Opt) ([]*fab.ChaincodeCall, []fab.Peer, error) {
var selectionOpts []options.Opt
selectionOpts = append(selectionOpts, opts...)
if requestContext.SelectionFilter != nil {
selectionOpts = append(selectionOpts, selectopts.WithPeerFilter(requestContext.SelectionFilter))
}

ccCalls = newChaincodeCalls(requestContext.Request)
peers, err = clientContext.Selection.GetEndorsersForChaincode(ccCalls, selectionOpts...)
return
ccCalls := newInvocationChain(requestContext)
peers, err := clientContext.Selection.GetEndorsersForChaincode(newInvocationChain(requestContext), selectionOpts...)
return ccCalls, peers, err
}

func getAdditionalEndorsers(requestContext *RequestContext, clientContext *ClientContext, ccCalls []*fab.ChaincodeCall) ([]fab.Peer, error) {
ccIDs, err := getChaincodes(requestContext.Response.Responses[0])
func getAdditionalEndorsers(requestContext *RequestContext, clientContext *ClientContext, invocationChain []*fab.ChaincodeCall) ([]fab.Peer, error) {
invocationChainFromResponse, err := getInvocationChainFromResponse(requestContext.Response.Responses[0])
if err != nil {
return nil, err
return nil, errors.WithMessage(err, "error getting invocation chain from proposal response")
}

additionalCalls := getAdditionalCalls(ccCalls, ccIDs, getCCFilter(requestContext))
if len(additionalCalls) == 0 {
invocationChain, foundAdditional := mergeInvocationChains(invocationChain, invocationChainFromResponse, getCCFilter(requestContext))
if !foundAdditional {
return nil, nil
}

logger.Debugf("Checking if additional endorsements are required...")
requestContext.Request.InvocationChain = append(requestContext.Request.InvocationChain, additionalCalls...)
requestContext.Request.InvocationChain = invocationChain

logger.Debugf("Found additional chaincodes/collections. Checking if additional endorsements are required...")

_, endorsers, err := getEndorsers(requestContext, clientContext)
// If using Fabric selection then disable retries. We don't want to keep retrying if the endorsement query returns an error.
// Also, add a priority selector that gives priority to peers from which we already have endorsements. This way, we don't
// unnecessarily get endorsements from other orgs.
_, endorsers, err := getEndorsers(
requestContext, clientContext,
selectopts.WithRetryOpts(retry.Opts{}),
selectopts.WithPrioritySelector(prioritizePeers(requestContext.Opts.Targets)))
if err != nil {
return nil, err
return nil, errors.WithMessage(err, "error getting additional endorsers")
}

var additionalEndorsers []fab.Peer
for _, endorser := range endorsers {
if !containsMSP(requestContext.Opts.Targets, endorser.MSPID()) {
logger.Debugf("Will ask for additional endorsement from [%s] in order to satisfy the chaincode policy", endorser.URL())
logger.Debugf("... will ask for additional endorsement from [%s] in order to satisfy the chaincode policy", endorser.URL())
additionalEndorsers = append(additionalEndorsers, endorser)
}
}

return additionalEndorsers, nil
}

Expand All @@ -159,12 +170,23 @@ func containsMSP(peers []fab.Peer, mspID string) bool {
return false
}

func getChaincodes(response *fab.TransactionProposalResponse) ([]string, error) {
func getInvocationChainFromResponse(response *fab.TransactionProposalResponse) ([]*fab.ChaincodeCall, error) {
rwSets, err := getRWSetsFromProposalResponse(response.ProposalResponse)
if err != nil {
return nil, err
}
return getNamespaces(rwSets), nil

invocationChain := make([]*fab.ChaincodeCall, len(rwSets))
for i, rwSet := range rwSets {
collections := make([]string, len(rwSet.CollHashedRwSets))
for j, collRWSet := range rwSet.CollHashedRwSets {
collections[j] = collRWSet.CollectionName
}
logger.Debugf("Found chaincode in RWSet [%s], Collections %v", rwSet.NameSpace, collections)
invocationChain[i] = &fab.ChaincodeCall{ID: rwSet.NameSpace, Collections: collections}
}

return invocationChain, nil
}

func getRWSetsFromProposalResponse(response *pb.ProposalResponse) ([]*rwsetutil.NsRwSet, error) {
Expand Down Expand Up @@ -196,37 +218,100 @@ func getRWSetsFromProposalResponse(response *pb.ProposalResponse) ([]*rwsetutil.
return txRWSet.NsRwSets, nil
}

func getNamespaces(rwSets []*rwsetutil.NsRwSet) []string {
namespaceMap := make(map[string]bool)
for _, rwSet := range rwSets {
namespaceMap[rwSet.NameSpace] = true
func mergeInvocationChains(invocChain []*fab.ChaincodeCall, respInvocChain []*fab.ChaincodeCall, filter CCFilter) ([]*fab.ChaincodeCall, bool) {
var mergedInvocChain []*fab.ChaincodeCall
var changed bool
for _, respCCCall := range respInvocChain {
if !filter(respCCCall.ID) {
logger.Debugf("Ignoring chaincode [%s] in the RW set since it was filtered out", respCCCall.ID)
continue
}
mergedCCCall, merged := mergeCCCall(invocChain, respCCCall)
if merged {
changed = true
}
mergedInvocChain = append(mergedInvocChain, mergedCCCall)
}
return mergedInvocChain, changed
}

var namespaces []string
for ns := range namespaceMap {
namespaces = append(namespaces, ns)
// mergeCCCall checks if the provided invocation chain contains the given Chaincode Call.
// - If the invocation chain does not contain the chaincode call then return (respCCCall,true)
// - If the invocation chain contains the chaincode call but the collection sets are different, then return (mergedCCCall,true)
// - If the invocation chain contains the chaincode call and the collection sets are the same, then return (respCCCall,false)
func mergeCCCall(invocChain []*fab.ChaincodeCall, respCCCall *fab.ChaincodeCall) (*fab.ChaincodeCall, bool) {
ccCall, ok := getCCCall(invocChain, respCCCall.ID)
if ok {
logger.Debugf("Already have chaincode [%s]. Checking to see if any private data collections were detected in the proposal response", respCCCall.ID)
c, merged := merge(ccCall, respCCCall)
if merged {
logger.Debugf("Modifying chaincode call for chaincode [%s] since additional private data collections were detected in the RW set", respCCCall.ID)
} else {
logger.Debugf("No additional private data collections were detected for chaincode [%s]", respCCCall.ID)
}
return c, merged
}
return namespaces

logger.Debugf("Detected chaincode [%s] in the RW set of the proposal response that was not part of the original invocation chain", respCCCall.ID)
return respCCCall, true
}

func getAdditionalCalls(ccCalls []*fab.ChaincodeCall, namespaces []string, filter CCFilter) []*fab.ChaincodeCall {
var additionalCalls []*fab.ChaincodeCall
for _, ccID := range namespaces {
if !filter(ccID) {
logger.Debugf("Ignoring chaincode [%s] in the RW set since it was filtered out", ccID)
continue
// getCC returns the ChaincodeCall from the invocation chain that matches the chaincode ID or
// returns nil if the ChaincodeCall is not found.
func getCCCall(invocChain []*fab.ChaincodeCall, ccID string) (*fab.ChaincodeCall, bool) {
for _, ccCall := range invocChain {
if ccCall.ID == ccID {
return ccCall, true
}
if !containsCC(ccCalls, ccID) {
logger.Debugf("Found additional chaincode [%s] in the RW set that was not part of the original invocation chain", ccID)
additionalCalls = append(additionalCalls, &fab.ChaincodeCall{ID: ccID})
}
return nil, false
}

// merge merges the collections from c1 and c2 and returns the resulting ChaincodeCall.
// true is returned if a merge was necessary; false is returned if the two ChaincodeCalls were the same.
func merge(c1 *fab.ChaincodeCall, c2 *fab.ChaincodeCall) (*fab.ChaincodeCall, bool) {
c := &fab.ChaincodeCall{ID: c1.ID, Collections: c1.Collections}
merged := false
for _, coll := range c2.Collections {
if !contains(c.Collections, coll) {
c.Collections = append(c.Collections, coll)
merged = true
}
}
return additionalCalls
return c, merged
}

func containsCC(ccCalls []*fab.ChaincodeCall, ccID string) bool {
for _, ccCall := range ccCalls {
if ccCall.ID == ccID {
func contains(values []string, value string) bool {
for _, val := range values {
if val == value {
return true
}
}
return false
}

// prioritizePeers is a priority selector that gives priority to the peers that are in the given set
func prioritizePeers(peers []fab.Peer) selectopts.PrioritySelector {
return func(peer1, peer2 fab.Peer) int {
hasPeer1 := containsPeer(peers, peer1)
hasPeer2 := containsPeer(peers, peer2)

if hasPeer1 && hasPeer2 {
return 0
}
if hasPeer1 {
return 1
}
if hasPeer2 {
return -1
}
return 0
}
}

func containsPeer(peers []fab.Peer, peer fab.Peer) bool {
for _, p := range peers {
if p.URL() == peer.URL() {
return true
}
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/client/channel/invoke/txnhandler.go
Expand Up @@ -71,7 +71,7 @@ func (h *ProposalProcessorHandler) Handle(requestContext *RequestContext, client
selectionOpts = append(selectionOpts, selectopts.WithPeerFilter(requestContext.SelectionFilter))
}

endorsers, err := clientContext.Selection.GetEndorsersForChaincode(newChaincodeCalls(requestContext.Request), selectionOpts...)
endorsers, err := clientContext.Selection.GetEndorsersForChaincode(newInvocationChain(requestContext), selectionOpts...)
if err != nil {
requestContext.Error = errors.WithMessage(err, "Failed to get endorsing peers")
return
Expand All @@ -85,16 +85,16 @@ func (h *ProposalProcessorHandler) Handle(requestContext *RequestContext, client
}
}

func newChaincodeCalls(request Request) []*fab.ChaincodeCall {
chaincodes := []*fab.ChaincodeCall{{ID: request.ChaincodeID}}
for _, ccCall := range request.InvocationChain {
if ccCall.ID == chaincodes[0].ID {
chaincodes[0].Collections = ccCall.Collections
func newInvocationChain(requestContext *RequestContext) []*fab.ChaincodeCall {
invocChain := []*fab.ChaincodeCall{{ID: requestContext.Request.ChaincodeID}}
for _, ccCall := range requestContext.Request.InvocationChain {
if ccCall.ID == invocChain[0].ID {
invocChain[0].Collections = ccCall.Collections
} else {
chaincodes = append(chaincodes, ccCall)
invocChain = append(invocChain, ccCall)
}
}
return chaincodes
return invocChain
}

//EndorsementValidationHandler for transaction proposal response filtering
Expand Down
55 changes: 52 additions & 3 deletions pkg/client/channel/invoke/txnhandler_test.go
Expand Up @@ -281,7 +281,7 @@ func TestProposalProcessorHandler(t *testing.T) {
}
}

func TestNewChaincodeCalls(t *testing.T) {
func TestNewInvocationChain(t *testing.T) {
ccID1 := "cc1"
ccID2 := "cc2"
col1 := "col1"
Expand All @@ -299,7 +299,7 @@ func TestNewChaincodeCalls(t *testing.T) {
},
}

ccCalls := newChaincodeCalls(request)
ccCalls := newInvocationChain(&RequestContext{Request: request})
require.Truef(t, len(ccCalls) == 2, "expecting 2 CC calls")
require.Equal(t, ccID1, ccCalls[0].ID)
require.Equal(t, ccID2, ccCalls[1].ID)
Expand All @@ -322,14 +322,63 @@ func TestNewChaincodeCalls(t *testing.T) {
},
}

ccCalls = newChaincodeCalls(request)
ccCalls = newInvocationChain(&RequestContext{Request: request})
require.Truef(t, len(ccCalls) == 2, "expecting 2 CC calls")
require.Equal(t, ccID1, ccCalls[0].ID)
require.Equal(t, ccID2, ccCalls[1].ID)
require.Truef(t, len(ccCalls[0].Collections) == 2, "expecting 2 collections for [%s]", ccID1)
require.Truef(t, len(ccCalls[1].Collections) == 1, "expecting 1 collection for [%s]", ccID2)
}

func TestMergeInvocationChains(t *testing.T) {
ccID1 := "cc1"
ccID2 := "cc2"
ccID3 := "cc3"
col1 := "col1"
col2 := "col2"
col3 := "col3"

ccCall1A := &fab.ChaincodeCall{ID: ccID1}
ccCall1B := &fab.ChaincodeCall{ID: ccID2, Collections: []string{col1, col3}}

ccCall2A := &fab.ChaincodeCall{ID: ccID1, Collections: []string{col1}}
ccCall2B := &fab.ChaincodeCall{ID: ccID2, Collections: []string{col1, col2}}
ccCall2C := &fab.ChaincodeCall{ID: ccID3}

acceptAllFilter := func(ccID string) bool { return true }

t.Run("No change to invocation chain", func(t *testing.T) {
invocChain, changed := mergeInvocationChains([]*fab.ChaincodeCall{ccCall1A}, []*fab.ChaincodeCall{ccCall1A}, acceptAllFilter)
assert.Falsef(t, changed, "Expecting invocation chain NOT to have changed")
require.NotEmptyf(t, invocChain, "Invocation chain is empty")
assert.Equalf(t, []*fab.ChaincodeCall{ccCall1A}, invocChain, "Expecting the invocation chain the be the same")
})

t.Run("Additional chaincodes and collections", func(t *testing.T) {
invocChain, changed := mergeInvocationChains([]*fab.ChaincodeCall{ccCall1A, ccCall1B}, []*fab.ChaincodeCall{ccCall2A, ccCall2B, ccCall2C}, acceptAllFilter)
assert.Truef(t, changed, "Expecting invocation chain to have changed")
require.NotEmptyf(t, invocChain, "Invocation chain is empty")
assert.Equalf(t, 3, len(invocChain), "Expecting 3 chaincode calls in the invocation chain")

assertContainsAll := func(t *testing.T, expectedColls []string, colls []string, ccID string) {
for _, coll := range expectedColls {
assert.Containsf(t, colls, coll, ccID+" does not contain all collections")
}
}

for _, ccCall := range invocChain {
switch ccCall.ID {
case ccID1:
assertContainsAll(t, []string{col1}, ccCall.Collections, ccID1)
case ccID2:
assertContainsAll(t, []string{col1, col2, col3}, ccCall.Collections, ccID2)
case ccID3:
assertContainsAll(t, nil, ccCall.Collections, ccID3)
}
}
})
}

//prepareHandlerContexts prepares context objects for handlers
func prepareRequestContext(request Request, opts Opts, t *testing.T) *RequestContext {
requestContext := &RequestContext{Request: request,
Expand Down

0 comments on commit 47c8f80

Please sign in to comment.