Skip to content

Commit

Permalink
Reduce CPU&memory cost of collecting endorsements
Browse files Browse the repository at this point in the history
Based on performance analysis of the gateway, the following changes are made in this commit:
1. Instead of collecting and holding copies of all proposal responses in order to reuse the protoutil.CreateTx() function, just store a single response payload and all of the signed endorsement objects.  Check that all payloads are bitwise idential and endorsers are unique at collection time.
2. Eliminate the duplicate unmarshalling that was occuring as a result of reusing protoutil.CreateTx

Signed-off-by: andrew-coleman <andrew_coleman@uk.ibm.com>
(cherry picked from commit aaaec4f)
  • Loading branch information
andrew-coleman authored and denyeart committed Dec 15, 2021
1 parent fbfdc1d commit 3580b4e
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 151 deletions.
60 changes: 40 additions & 20 deletions internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

type endorserResponse struct {
endorsementSet []*peer.ProposalResponse
action *peer.ChaincodeEndorsedAction
err *gp.ErrorDetail
timeoutExpired bool
}
Expand Down Expand Up @@ -134,26 +134,42 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
}
proposal, err := protoutil.UnmarshalProposal(signedProposal.ProposalBytes)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to unpack transaction proposal: %s", err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}
channel, chaincodeID, hasTransientData, err := getChannelAndChaincodeFromSignedProposal(signedProposal)
header, err := protoutil.UnmarshalHeader(proposal.Header)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to unpack transaction proposal: %s", err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}
channelHeader, err := protoutil.UnmarshalChannelHeader(header.ChannelHeader)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
payload, err := protoutil.UnmarshalChaincodeProposalPayload(proposal.Payload)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
spec, err := protoutil.UnmarshalChaincodeInvocationSpec(payload.Input)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

channel := channelHeader.ChannelId
chaincodeID := spec.GetChaincodeSpec().GetChaincodeId().GetName()
hasTransientData := len(payload.GetTransientMap()) > 0

defaultInterest := &peer.ChaincodeInterest{
Chaincodes: []*peer.ChaincodeCall{{
Name: chaincodeID,
}},
}

var plan *plan
var endorsements []*peer.ProposalResponse
var action *peer.ChaincodeEndorsedAction
if len(request.EndorsingOrganizations) > 0 {
// The client is specifying the endorsing orgs and taking responsibility for ensuring it meets the signature policy
plan, err = gs.registry.planForOrgs(channel, chaincodeID, request.EndorsingOrganizations)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%s", err)
return nil, status.Error(codes.Unavailable, err.Error())
}
} else {
// The client is delegating choice of endorsers to the gateway.
Expand All @@ -168,7 +184,7 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
// Otherwise, just let discovery pick one.
plan, err = gs.registry.endorsementPlan(channel, defaultInterest, nil)
if err != nil {
return nil, status.Errorf(codes.FailedPrecondition, "%s", err)
return nil, status.Error(codes.FailedPrecondition, err.Error())
}
}
firstEndorser := plan.endorsers()[0]
Expand Down Expand Up @@ -230,15 +246,18 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
// The preferred discovery layout will contain the firstEndorser's Org.
plan, err = gs.registry.endorsementPlan(channel, interest, firstEndorser)
if err != nil {
return nil, status.Errorf(codes.FailedPrecondition, "%s", err)
return nil, status.Error(codes.FailedPrecondition, err.Error())
}

// 6. Remove the gateway org's endorser, since we've already done that
endorsements = plan.update(firstEndorser, firstResponse)
action, err = plan.processEndorsement(firstEndorser, firstResponse)
if err != nil {
return nil, status.Error(codes.Aborted, err.Error())
}
}

var errorDetails []proto.Message
for endorsements == nil {
for action == nil {
// loop through the layouts until one gets satisfied
endorsers := plan.endorsers()
if endorsers == nil {
Expand Down Expand Up @@ -272,8 +291,12 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
responseMessage.Payload = nil // Remove any duplicate response payload
}

endorsements := plan.update(e, response)
done <- &endorserResponse{endorsementSet: endorsements}
action, err := plan.processEndorsement(e, response)
if err != nil {
done <- &endorserResponse{err: errorDetail(e.endpointConfig, err.Error())}
return
}
done <- &endorserResponse{action: action}
} else {
logger.Warnw("Endorse call to endorser failed", "channel", request.ChannelId, "txID", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "error", message)
if remove {
Expand Down Expand Up @@ -307,8 +330,8 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
logger.Warnw("Endorse call timed out while collecting endorsements", "channel", request.ChannelId, "txID", request.TransactionId, "numEndorsers", len(endorsers))
return nil, newRpcError(codes.DeadlineExceeded, "endorsement timeout expired while collecting endorsements")
}
if response.endorsementSet != nil {
endorsements = response.endorsementSet
if response.action != nil {
action = response.action
break
}
if response.err != nil {
Expand All @@ -317,19 +340,16 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
}
}

if endorsements == nil {
if action == nil {
return nil, newRpcError(codes.Aborted, "failed to collect enough transaction endorsements, see attached details for more info", errorDetails...)
}

env, err := protoutil.CreateTx(proposal, endorsements...)
preparedTransaction, err := prepareTransaction(header, payload, action)
if err != nil {
return nil, status.Errorf(codes.Aborted, "failed to assemble transaction: %s", err)
}

endorseResponse := &gp.EndorseResponse{
PreparedTransaction: env,
}
return endorseResponse, nil
return &gp.EndorseResponse{PreparedTransaction: preparedTransaction}, nil
}

// responseStatus unpacks the proposal response and error values that are returned from ProcessProposal and
Expand Down
11 changes: 9 additions & 2 deletions internal/pkg/gateway/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,14 @@ func TestEndorse(t *testing.T) {
},
localResponse: "different_response",
errCode: codes.Aborted,
errString: "failed to assemble transaction: ProposalResponsePayloads do not match (base64): 'EhQaEgjIARoNbW9ja19yZXNwb25zZQ==' vs 'EhkaFwjIARoSZGlmZmVyZW50X3Jlc3BvbnNl'",
errString: "failed to collect enough transaction endorsements",
errDetails: []*pb.ErrorDetail{
{
Address: "peer2:9051",
MspId: "msp2",
Message: "ProposalResponsePayloads do not match",
},
},
},
{
name: "discovery fails",
Expand Down Expand Up @@ -1777,7 +1784,7 @@ func TestNilArgs(t *testing.T) {
require.ErrorIs(t, err, status.Error(codes.InvalidArgument, "the proposed transaction must contain a signed proposal"))

_, err = server.Endorse(ctx, &pb.EndorseRequest{ProposedTransaction: &peer.SignedProposal{ProposalBytes: []byte("jibberish")}})
require.ErrorContains(t, err, "rpc error: code = InvalidArgument desc = failed to unpack transaction proposal: error unmarshalling Proposal")
require.ErrorContains(t, err, "rpc error: code = InvalidArgument desc = error unmarshalling Proposal")

_, err = server.Submit(ctx, nil)
require.ErrorIs(t, err, status.Error(codes.InvalidArgument, "a submit request is required"))
Expand Down
34 changes: 29 additions & 5 deletions internal/pkg/gateway/apiutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
gp "github.com/hyperledger/fabric-protos-go/gateway"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/protoutil"
Expand Down Expand Up @@ -57,11 +58,6 @@ func newRpcError(code codes.Code, message string, details ...proto.Message) erro
return st.Err()
}

func wrappedRpcError(err error, message string, details ...proto.Message) error {
statusErr := status.Convert(err)
return newRpcError(statusErr.Code(), message+": "+statusErr.Message(), details...)
}

func toRpcError(err error, unknownCode codes.Code) error {
errStatus := toRpcStatus(err)
if errStatus.Code() != codes.Unknown {
Expand Down Expand Up @@ -101,3 +97,31 @@ func getResultFromProposalResponsePayload(responsePayload *peer.ProposalResponse

return chaincodeAction.GetResponse().GetPayload(), nil
}

func prepareTransaction(header *common.Header, payload *peer.ChaincodeProposalPayload, action *peer.ChaincodeEndorsedAction) (*common.Envelope, error) {
cppNoTransient := &peer.ChaincodeProposalPayload{Input: payload.Input, TransientMap: nil}
cppBytes, err := protoutil.GetBytesChaincodeProposalPayload(cppNoTransient)
if err != nil {
return nil, err
}

cap := &peer.ChaincodeActionPayload{ChaincodeProposalPayload: cppBytes, Action: action}
capBytes, err := protoutil.GetBytesChaincodeActionPayload(cap)
if err != nil {
return nil, err
}

tx := &peer.Transaction{Actions: []*peer.TransactionAction{{Header: header.SignatureHeader, Payload: capBytes}}}
txBytes, err := protoutil.GetBytesTransaction(tx)
if err != nil {
return nil, err
}

payl := &common.Payload{Header: header, Data: txBytes}
paylBytes, err := protoutil.GetBytesPayload(payl)
if err != nil {
return nil, err
}

return &common.Envelope{Payload: paylBytes}, nil
}
56 changes: 43 additions & 13 deletions internal/pkg/gateway/endorsement.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ package gateway

import (
"bytes"
b64 "encoding/base64"
"errors"
"sync"

"github.com/hyperledger/fabric-protos-go/peer"
)

type layout struct {
required map[string]int // group -> quantity
endorsements []*peer.ProposalResponse
endorsements []*peer.Endorsement
}

// The plan structure is initialised with an endorsement plan from discovery. It is used to manage the
Expand All @@ -24,12 +26,13 @@ type layout struct {
// Note that this structure and its methods assume that each endorsing peer is in one and only one group.
// This is a constraint of the current algorithm in the discovery service.
type plan struct {
layouts []*layout
groupEndorsers map[string][]*endorser // group -> endorsing peers
groupIds map[string]string // peer pkiid -> group
nextLayout int
size int
planLock sync.Mutex
layouts []*layout
groupEndorsers map[string][]*endorser // group -> endorsing peers
groupIds map[string]string // peer pkiid -> group
nextLayout int
size int
responsePayload []byte
planLock sync.Mutex
}

// construct and initialise an endorsement plan
Expand Down Expand Up @@ -83,10 +86,10 @@ func (p *plan) endorsers() []*endorser {
return endorsers
}

// Invoke update when an endorsement has been successfully received for the given endorser.
// Invoke processEndorsement when an endorsement has been successfully received for the given endorser.
// All layouts containing the group that contains this endorser are updated with the endorsement.
// Returns array of endorsements, if at least one layout in the plan has been satisfied, otherwise nil.
func (p *plan) update(endorser *endorser, endorsement *peer.ProposalResponse) []*peer.ProposalResponse {
// Returns a ChaincodeEndorsedAction, if at least one layout in the plan has been satisfied, otherwise nil.
func (p *plan) processEndorsement(endorser *endorser, response *peer.ProposalResponse) (*peer.ChaincodeEndorsedAction, error) {
p.planLock.Lock()
defer p.planLock.Unlock()

Expand All @@ -102,25 +105,35 @@ func (p *plan) update(endorser *endorser, endorsement *peer.ProposalResponse) []
}
}

// check the proposal responses are the same
if p.responsePayload == nil {
p.responsePayload = response.GetPayload()
} else {
if !bytes.Equal(p.responsePayload, response.GetPayload()) {
logger.Warnw("ProposalResponsePayloads do not match (base64)", "payload1", b64.StdEncoding.EncodeToString(p.responsePayload), "payload2", b64.StdEncoding.EncodeToString(response.GetPayload()))
return nil, errors.New("ProposalResponsePayloads do not match")
}
}

for i := p.nextLayout; i < len(p.layouts); i++ {
layout := p.layouts[i]
if layout == nil {
continue
}
if quantity, ok := layout.required[group]; ok {
layout.required[group] = quantity - 1
layout.endorsements = append(layout.endorsements, endorsement)
layout.endorsements = append(layout.endorsements, response.Endorsement)
if layout.required[group] == 0 {
// this group for this layout is complete - remove from map
delete(layout.required, group)
if len(layout.required) == 0 {
// no groups left - this layout is now satisfied
return layout.endorsements
return &peer.ChaincodeEndorsedAction{ProposalResponsePayload: p.responsePayload, Endorsements: uniqueEndorsements(layout.endorsements)}, nil
}
}
}
}
return nil
return nil, nil
}

// Invoke nextPeerInGroup if an endorsement fails for the given endorser.
Expand Down Expand Up @@ -154,3 +167,20 @@ func (p *plan) nextPeerInGroup(endorser *endorser) *endorser {

return nil
}

func uniqueEndorsements(endorsements []*peer.Endorsement) []*peer.Endorsement {
endorsersUsed := make(map[string]struct{})
var unique []*peer.Endorsement
for _, e := range endorsements {
if e == nil {
continue
}
key := string(e.Endorser)
if _, used := endorsersUsed[key]; used {
continue
}
unique = append(unique, e)
endorsersUsed[key] = struct{}{}
}
return unique
}

0 comments on commit 3580b4e

Please sign in to comment.