Skip to content

Commit

Permalink
Gateway endorsement retry logic
Browse files Browse the repository at this point in the history
Rather than selecting one layout from the discovery endorsement plan and failing if one of the endorsers fails, this commit attempts to create a set of endorsements by retrying the proposal on other endorsers until one of the layouts is satisfied.
Additionally, rather than connecting to all peers in a channel once on first usage and then never updating that cache, this commit adds support for later additions to and removals from the cache, and for closing stale connections to peers.

Signed-off-by: andrew-coleman <andrew_coleman@uk.ibm.com>
  • Loading branch information
andrew-coleman authored and manish-sethi committed Oct 26, 2021
1 parent b4c2731 commit 0f08904
Show file tree
Hide file tree
Showing 7 changed files with 788 additions and 227 deletions.
160 changes: 81 additions & 79 deletions internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
)

type endorserResponse struct {
pr *peer.ProposalResponse
err *gp.ErrorDetail
endorsementSet []*peer.ProposalResponse
err *gp.ErrorDetail
}

// Evaluate will invoke the transaction function as specified in the SignedProposal
Expand All @@ -41,7 +41,7 @@ func (gs *Server) Evaluate(ctx context.Context, request *gp.EvaluateRequest) (*g
return nil, status.Errorf(codes.InvalidArgument, "failed to unpack transaction proposal: %s", err)
}

err = gs.registry.registerChannel(channel)
err = gs.registry.connectChannelPeers(channel, false)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%s", err)
}
Expand Down Expand Up @@ -104,11 +104,6 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
return nil, status.Errorf(codes.InvalidArgument, "failed to unpack transaction proposal: %s", err)
}

err = gs.registry.registerChannel(channel)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%s", err)
}

ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout)
defer cancel()

Expand All @@ -118,47 +113,55 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
}},
}

var endorsers []*endorser
var responses []*peer.ProposalResponse
var plan *plan
var endorsements []*peer.ProposalResponse
if len(request.EndorsingOrganizations) > 0 {
// The client is specifying the endorsing orgs and taking responsibility for ensuring it meets the signature policy
endorsers, err = gs.registry.endorsersForOrgs(channel, chaincodeID, request.EndorsingOrganizations)
plan, err = gs.registry.planForOrgs(channel, chaincodeID, request.EndorsingOrganizations)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%s", err)
}
} else {
// The client is delegating choice of endorsers to the gateway.

// 1. Choose an endorser from the gateway's organization
var firstEndorser *endorser
es, ok := gs.registry.endorsersByOrg(channel, chaincodeID)[gs.registry.localEndorser.mspid]
if !ok {
plan, err = gs.registry.planForOrgs(channel, chaincodeID, []string{gs.registry.localEndorser.mspid})
if err != nil {
// No local org endorsers for this channel/chaincode. If transient data is involved, return error
if hasTransientData {
return nil, status.Error(codes.FailedPrecondition, "no endorsers found in the gateway's organization; retry specifying endorsing organization(s) to protect transient data")
}
// Otherwise, just let discovery pick one.
endorsers, err = gs.registry.endorsers(channel, defaultInterest, "")
plan, err = gs.registry.endorsementPlan(channel, defaultInterest, nil)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%s", err)
}
firstEndorser = endorsers[0]
} else {
firstEndorser = es[0].endorser
}
firstEndorser := plan.endorsers()[0]

gs.logger.Debugw("Sending to first endorser:", "channel", channel, "chaincode", chaincodeID, "MSPID", firstEndorser.mspid, "endpoint", firstEndorser.address)

// 2. Process the proposal on this endorser
firstResponse, err := firstEndorser.client.ProcessProposal(ctx, signedProposal)
if err != nil {
return nil, wrappedRpcError(err, "failed to endorse transaction", errorDetail(firstEndorser.endpointConfig, err))
var firstResponse *peer.ProposalResponse
var errDetails []proto.Message
for firstResponse == nil && firstEndorser != nil {
firstResponse, err = firstEndorser.client.ProcessProposal(ctx, signedProposal)
if err != nil {
logger.Debugw("Endorse call to endorser failed", "channel", request.ChannelId, "txid", request.TransactionId, "endorserAddress", firstEndorser.endpointConfig.address, "endorserMspid", firstEndorser.endpointConfig.mspid, "error", err)
errDetails = append(errDetails, errorDetail(firstEndorser.endpointConfig, err))
gs.registry.removeEndorser(firstEndorser)
firstEndorser = plan.retry(firstEndorser)
} else if firstResponse.Response.Status < 200 || firstResponse.Response.Status >= 400 {
logger.Debugw("Endorse call to endorser returned failure", "channel", request.ChannelId, "txid", request.TransactionId, "endorserAddress", firstEndorser.endpointConfig.address, "endorserMspid", firstEndorser.endpointConfig.mspid, "status", firstResponse.Response.Status, "message", firstResponse.Response.Message)
err := fmt.Errorf("error %d, %s", firstResponse.Response.Status, firstResponse.Response.Message)
endpointErr := errorDetail(firstEndorser.endpointConfig, err)
errDetails = append(errDetails, endpointErr)
firstEndorser = plan.retry(firstEndorser)
firstResponse = nil
}
}
if firstResponse.Response.Status < 200 || firstResponse.Response.Status >= 400 {
err := fmt.Errorf("error %d, %s", firstResponse.Response.Status, firstResponse.Response.Message)
endpointErr := errorDetail(firstEndorser.endpointConfig, err)
errorMessage := "failed to endorse transaction: " + detailsAsString(endpointErr)
return nil, rpcError(codes.Aborted, errorMessage, endpointErr)
if firstEndorser == nil || firstResponse == nil {
return nil, rpcError(codes.Aborted, "failed to endorse transaction, see attached details for more info", errDetails...)
}

// 3. Extract ChaincodeInterest and SBE policies
Expand All @@ -178,79 +181,78 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.

// 5. Get a set of endorsers from discovery via the registry
// The preferred discovery layout will contain the firstEndorser's Org.
endorsers, err = gs.registry.endorsers(channel, interest, firstEndorser.mspid)
plan, err = gs.registry.endorsementPlan(channel, interest, firstEndorser)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%s", err)
}

// 6. Remove the gateway org's endorser, since we've already done that
for i, e := range endorsers {
if e.mspid == firstEndorser.mspid {
endorsers = append(endorsers[:i], endorsers[i+1:]...)
responses = append(responses, firstResponse)
break
}
}
endorsements = plan.update(firstEndorser, firstResponse)
}

if len(endorsers) > 0 {
gs.logger.Infow("Seeking extra endorsements from:", func() []interface{} {
var es []interface{}
for _, e := range endorsers {
es = append(es, e.mspid)
es = append(es, e.address)
var errorDetails []proto.Message
for endorsements == nil {
// loop through the layouts until one gets satisfied
endorsers := plan.endorsers()
if endorsers == nil {
// no more layouts
break
}
var wg sync.WaitGroup
responseCh := make(chan *endorserResponse, plan.size)
// send to all the endorsers
for _, e := range endorsers {
wg.Add(1)
go func(e *endorser) {
defer wg.Done()
for e != nil {
gs.logger.Debugw("Sending to endorser:", "channel", channel, "chaincode", chaincodeID, "MSPID", e.mspid, "endpoint", e.address)
response, err := e.client.ProcessProposal(ctx, signedProposal)
switch {
case err != nil:
logger.Debugw("Endorse call to endorser failed", "channel", request.ChannelId, "txid", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "error", err)
responseCh <- &endorserResponse{err: errorDetail(e.endpointConfig, err)}
gs.registry.removeEndorser(e)
e = plan.retry(e)
case response.Response.Status < 200 || response.Response.Status >= 400:
// this is an error case and will be returned in the error details to the client
logger.Debugw("Endorse call to endorser returned failure", "channel", request.ChannelId, "txid", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "status", response.Response.Status, "message", response.Response.Message)
responseCh <- &endorserResponse{err: errorDetail(e.endpointConfig, fmt.Errorf("error %d, %s", response.Response.Status, response.Response.Message))}
e = plan.retry(e)
default:
logger.Debugw("Endorse call to endorser returned success", "channel", request.ChannelId, "txid", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "status", response.Response.Status, "message", response.Response.Message)
endorsements := plan.update(e, response)
responseCh <- &endorserResponse{endorsementSet: endorsements}
e = nil
}
}
return es
}()...)
}(e)
}
}
wg.Wait()
close(responseCh)

var wg sync.WaitGroup
responseCh := make(chan *endorserResponse, len(endorsers))
// send to all the endorsers
for _, e := range endorsers {
wg.Add(1)
go func(e *endorser) {
defer wg.Done()
gs.logger.Debugw("Sending to endorser:", "channel", channel, "chaincode", chaincodeID, "MSPID", e.mspid, "endpoint", e.address)
response, err := e.client.ProcessProposal(ctx, signedProposal)
switch {
case err != nil:
logger.Debugw("Endorse call to endorser failed", "channel", request.ChannelId, "txid", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "error", err)
responseCh <- &endorserResponse{err: errorDetail(e.endpointConfig, err)}
case response.Response.Status < 200 || response.Response.Status >= 400:
// this is an error case and will be returned in the error details to the client
logger.Debugw("Endorse call to endorser returned failure", "channel", request.ChannelId, "txid", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "status", response.Response.Status, "message", response.Response.Message)
responseCh <- &endorserResponse{err: errorDetail(e.endpointConfig, fmt.Errorf("error %d, %s", response.Response.Status, response.Response.Message))}
default:
logger.Debugw("Endorse call to endorser returned success", "channel", request.ChannelId, "txid", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "status", response.Response.Status, "message", response.Response.Message)
responseCh <- &endorserResponse{pr: response}
for response := range responseCh {
if response.endorsementSet != nil {
endorsements = response.endorsementSet
break
}
if response.err != nil {
errorDetails = append(errorDetails, response.err)
}
}(e)
}
wg.Wait()
close(responseCh)

var errorDetails []proto.Message
for response := range responseCh {
if response.err != nil {
errorDetails = append(errorDetails, response.err)
} else {
responses = append(responses, response.pr)
}
}

if len(errorDetails) != 0 {
errorMessage := "failed to endorse transaction: " + detailsAsString(errorDetails...)
return nil, rpcError(codes.Aborted, errorMessage, errorDetails...)
if endorsements == nil {
return nil, rpcError(codes.Aborted, "failed to endorse transaction, see attached details for more info", errorDetails...)
}

env, err := protoutil.CreateTx(proposal, responses...)
env, err := protoutil.CreateTx(proposal, endorsements...)
if err != nil {
return nil, status.Errorf(codes.Aborted, "failed to assemble transaction: %s", err)
}

endorseResponse := &gp.EndorseResponse{
Result: responses[0].GetResponse(),
Result: endorsements[0].GetResponse(),
PreparedTransaction: env,
}
return endorseResponse, nil
Expand Down

0 comments on commit 0f08904

Please sign in to comment.