Skip to content

Commit eb0864c

Browse files
author
Simon Stone
committed
[FAB-15897] Improve CLI perf w/ multiple endorsers
If you specify multiple peers to "peer chaincode invoke", currently these endorsements are run one at a time, in order specified on the CLI. This is slow, which might not be seen as a problem for the CLI, however in the current version of chaincode lifecycle this command is used to trigger the building of chaincode containers. This may be causing CI failures when chaincode needs to be built in a single transaction across multiple peers. This change runs the endorsements concurrently, which should trigger the chaincode to be built across all peers simultaneously. Also, this is how the various SDKs behave today. Signed-off-by: Simon Stone <sstone1@uk.ibm.com> Change-Id: Ib9f69c7bbd7f4f1197f2959599e45d0a2545d366
1 parent 277f77e commit eb0864c

File tree

2 files changed

+77
-7
lines changed

2 files changed

+77
-7
lines changed

internal/peer/chaincode/common.go

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,36 @@ func InitCmdFactory(cmdName string, isEndorserRequired, isOrdererRequired bool)
446446
}, nil
447447
}
448448

449+
// processProposals sends a signed proposal to a set of peers, and gathers all the responses.
450+
func processProposals(endorserClients []pb.EndorserClient, signedProposal *pb.SignedProposal) ([]*pb.ProposalResponse, error) {
451+
responsesCh := make(chan *pb.ProposalResponse, len(endorserClients))
452+
errorCh := make(chan error, len(endorserClients))
453+
wg := sync.WaitGroup{}
454+
for _, endorser := range endorserClients {
455+
wg.Add(1)
456+
go func(endorser pb.EndorserClient) {
457+
defer wg.Done()
458+
proposalResp, err := endorser.ProcessProposal(context.Background(), signedProposal)
459+
if err != nil {
460+
errorCh <- err
461+
return
462+
}
463+
responsesCh <- proposalResp
464+
}(endorser)
465+
}
466+
wg.Wait()
467+
close(responsesCh)
468+
close(errorCh)
469+
for err := range errorCh {
470+
return nil, err
471+
}
472+
var responses []*pb.ProposalResponse
473+
for response := range responsesCh {
474+
responses = append(responses, response)
475+
}
476+
return responses, nil
477+
}
478+
449479
// ChaincodeInvokeOrQuery invokes or queries the chaincode. If successful, the
450480
// INVOKE form prints the ProposalResponse to STDOUT, and the QUERY form prints
451481
// the query result on STDOUT. A command-line flag (-r, --raw) determines
@@ -496,13 +526,10 @@ func ChaincodeInvokeOrQuery(
496526
if err != nil {
497527
return nil, errors.WithMessagef(err, "error creating signed proposal for %s", funcName)
498528
}
499-
var responses []*pb.ProposalResponse
500-
for _, endorser := range endorserClients {
501-
proposalResp, err := endorser.ProcessProposal(context.Background(), signedProp)
502-
if err != nil {
503-
return nil, errors.WithMessagef(err, "error endorsing %s", funcName)
504-
}
505-
responses = append(responses, proposalResp)
529+
530+
responses, err := processProposals(endorserClients, signedProp)
531+
if err != nil {
532+
return nil, errors.WithMessagef(err, "error endorsing %s", funcName)
506533
}
507534

508535
if len(responses) == 0 {

internal/peer/chaincode/common_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"encoding/json"
1414
"errors"
1515
"fmt"
16+
"sort"
1617
"testing"
1718
"time"
1819

@@ -695,3 +696,45 @@ func TestChaincodeInvokeOrQuery_waitForEvent(t *testing.T) {
695696
close(delayChan)
696697
})
697698
}
699+
700+
func TestProcessProposals(t *testing.T) {
701+
// Build clients that return a range of status codes (for verifying each client is called).
702+
mockClients := []pb.EndorserClient{}
703+
for i := 2; i <= 5; i++ {
704+
response := &pb.ProposalResponse{
705+
Response: &pb.Response{Status: int32(i * 100)},
706+
Endorsement: &pb.Endorsement{},
707+
}
708+
mockClients = append(mockClients, common.GetMockEndorserClient(response, nil))
709+
}
710+
mockErrorClient := common.GetMockEndorserClient(nil, errors.New("failed to call endorser"))
711+
signedProposal := &pb.SignedProposal{}
712+
t.Run("should process a proposal for a single peer", func(t *testing.T) {
713+
responses, err := processProposals([]pb.EndorserClient{mockClients[0]}, signedProposal)
714+
assert.NoError(t, err)
715+
assert.Len(t, responses, 1)
716+
assert.Equal(t, responses[0].Response.Status, int32(200))
717+
})
718+
t.Run("should process a proposal for multiple peers", func(t *testing.T) {
719+
responses, err := processProposals(mockClients, signedProposal)
720+
assert.NoError(t, err)
721+
assert.Len(t, responses, 4)
722+
// Sort the statuses (as they may turn up in different order) before comparing.
723+
statuses := []int32{}
724+
for _, response := range responses {
725+
statuses = append(statuses, response.Response.Status)
726+
}
727+
sort.Slice(statuses, func(i, j int) bool { return statuses[i] < statuses[j] })
728+
assert.EqualValues(t, []int32{200, 300, 400, 500}, statuses)
729+
})
730+
t.Run("should return an error from processing a proposal for a single peer", func(t *testing.T) {
731+
responses, err := processProposals([]pb.EndorserClient{mockErrorClient}, signedProposal)
732+
assert.EqualError(t, err, "failed to call endorser")
733+
assert.Nil(t, responses)
734+
})
735+
t.Run("should return an error from processing a proposal for a single peer within multiple peers", func(t *testing.T) {
736+
responses, err := processProposals([]pb.EndorserClient{mockClients[0], mockErrorClient, mockClients[1]}, signedProposal)
737+
assert.EqualError(t, err, "failed to call endorser")
738+
assert.Nil(t, responses)
739+
})
740+
}

0 commit comments

Comments
 (0)