Skip to content

Commit

Permalink
Refine Gateway gRPC error status codes (#3075)
Browse files Browse the repository at this point in the history
Signed-off-by: Mark S. Lewis <mark_lewis@uk.ibm.com>
  • Loading branch information
bestbeforetoday committed Nov 25, 2021
1 parent 40fea67 commit 6a7cdd0
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 119 deletions.
81 changes: 45 additions & 36 deletions internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package gateway
import (
"context"
"fmt"
"io"
"math/rand"
"strings"
"sync"
Expand Down Expand Up @@ -59,9 +60,9 @@ func (gs *Server) Evaluate(ctx context.Context, request *gp.EvaluateRequest) (*g
plan, err := gs.registry.evaluator(channel, chaincodeID, targetOrgs)
if err != nil {
if transientProtected {
return nil, status.Errorf(codes.Unavailable, "no endorsers found in the gateway's organization; retry specifying target organization(s) to protect transient data: %s", err)
return nil, status.Errorf(codes.FailedPrecondition, "no endorsers found in the gateway's organization; retry specifying target organization(s) to protect transient data: %s", err)
}
return nil, status.Errorf(codes.Unavailable, "%s", err)
return nil, status.Errorf(codes.FailedPrecondition, "%s", err)
}

endorser := plan.endorsers()[0]
Expand All @@ -76,8 +77,8 @@ func (gs *Server) Evaluate(ctx context.Context, request *gp.EvaluateRequest) (*g
ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout)
defer cancel()
pr, err := endorser.client.ProcessProposal(ctx, signedProposal)
success, message, retry, remove := gs.responseStatus(pr, err)
if success {
code, message, retry, remove := gs.responseStatus(pr, err)
if code == codes.OK {
response = pr.Response
// Prefer result from proposal response as Response.Payload is not required to be transaction result
if result, err := getResultFromProposalResponse(pr); err == nil {
Expand All @@ -94,10 +95,10 @@ func (gs *Server) Evaluate(ctx context.Context, request *gp.EvaluateRequest) (*g
if retry {
endorser = plan.nextPeerInGroup(endorser)
} else {
done <- newRpcError(codes.Aborted, "evaluate call to endorser returned error: "+message, errDetails...)
done <- newRpcError(code, "evaluate call to endorser returned error: "+message, errDetails...)
}
if endorser == nil {
done <- newRpcError(codes.Aborted, "failed to evaluate transaction, see attached details for more info", errDetails...)
done <- newRpcError(code, "failed to evaluate transaction, see attached details for more info", errDetails...)
}
}
}()
Expand Down Expand Up @@ -167,7 +168,7 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
// Otherwise, just let discovery pick one.
plan, err = gs.registry.endorsementPlan(channel, defaultInterest, nil)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%s", err)
return nil, status.Errorf(codes.FailedPrecondition, "%s", err)
}
}
firstEndorser := plan.endorsers()[0]
Expand All @@ -185,9 +186,9 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
ctx, cancel := context.WithTimeout(ctx, gs.options.EndorsementTimeout)
defer cancel()
firstResponse, err = firstEndorser.client.ProcessProposal(ctx, signedProposal)
success, message, _, remove := gs.responseStatus(firstResponse, err)
code, message, _, remove := gs.responseStatus(firstResponse, err)

if !success {
if code != codes.OK {
logger.Warnw("Endorse call to endorser failed", "channel", request.ChannelId, "txID", request.TransactionId, "endorserAddress", firstEndorser.endpointConfig.address, "endorserMspid", firstEndorser.endpointConfig.mspid, "error", message)
errDetails = append(errDetails, errorDetail(firstEndorser.endpointConfig, message))
if remove {
Expand Down Expand Up @@ -229,7 +230,7 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
// The preferred discovery layout will contain the firstEndorser's Org.
plan, err = gs.registry.endorsementPlan(channel, interest, firstEndorser)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%s", err)
return nil, status.Errorf(codes.FailedPrecondition, "%s", err)
}

// 6. Remove the gateway org's endorser, since we've already done that
Expand Down Expand Up @@ -262,8 +263,8 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
// Ignore the retry flag returned by the following responseStatus call. Endorse will retry until all endorsement layouts have been exhausted.
// It tries to get a successful endorsement from each org and minimise the chances of a rogue peer scuppering the transaction.
// If an org is behaving badly, it can move on to a different layout.
success, message, _, remove := gs.responseStatus(response, err)
if success {
code, message, _, remove := gs.responseStatus(response, err)
if code == codes.OK {
logger.Debugw("Endorse call to endorser returned success", "channel", request.ChannelId, "txID", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "status", response.Response.Status, "message", response.Response.Message)

responseMessage := response.GetResponse()
Expand Down Expand Up @@ -335,37 +336,37 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
// determines how the gateway should react (retry?, close connection?).
// Uses the grpc canonical status error codes and their recommended actions.
// Returns:
// - response successful (bool)
// - response status code, with codes.OK indicating success and other values indicating likely error type
// - error message extracted from the err or generated from 500 proposal response (string)
// - should the gateway retry (only the Evaluate() uses this) (bool)
// - should the gateway close the connection and remove the peer from its registry (bool)
func (gs *Server) responseStatus(response *peer.ProposalResponse, err error) (success bool, message string, retry bool, remove bool) {
func (gs *Server) responseStatus(response *peer.ProposalResponse, err error) (statusCode codes.Code, message string, retry bool, remove bool) {
if err != nil {
if response == nil {
// there is no ProposalResponse, so this must have been generated by grpc in response to an unavailable peer
// - close the connection and retry on another
return false, err.Error(), true, true
return codes.Unavailable, err.Error(), true, true
}
// there is a response and an err, so it must have been from the unpackProposal() or preProcess() stages
// preProcess does all the signature and ACL checking. In either case, no point retrying, or closing the connection (it's a client error)
return false, err.Error(), false, false
return codes.FailedPrecondition, err.Error(), false, false
}
if response.Response.Status < 200 || response.Response.Status >= 400 {
if response.Payload == nil && response.Response.Status == 500 {
// there's an error 500 response but no payload, so the response was generated in the peer rather than the chaincode
if strings.HasSuffix(response.Response.Message, chaincode.ErrorStreamTerminated) {
// chaincode container crashed probably. Close connection and retry on another peer
return false, response.Response.Message, true, true
return codes.Aborted, response.Response.Message, true, true
}
// some other error - retry on another peer
return false, response.Response.Message, true, false
return codes.Aborted, response.Response.Message, true, false
} else {
// otherwise it must be an error response generated by the chaincode
return false, fmt.Sprintf("chaincode response %d, %s", response.Response.Status, response.Response.Message), false, false
return codes.Unknown, fmt.Sprintf("chaincode response %d, %s", response.Response.Status, response.Response.Message), false, false
}
}
// anything else is a success
return true, "", false, false
return codes.OK, "", false, false
}

// Submit will send the signed transaction to the ordering service. The response indicates whether the transaction was
Expand All @@ -384,7 +385,7 @@ func (gs *Server) Submit(ctx context.Context, request *gp.SubmitRequest) (*gp.Su
}
orderers, err := gs.registry.orderers(request.ChannelId)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "%s", err)
return nil, status.Errorf(codes.FailedPrecondition, "%s", err)
}

if len(orderers) == 0 {
Expand All @@ -400,34 +401,38 @@ func (gs *Server) Submit(ctx context.Context, request *gp.SubmitRequest) (*gp.Su
if err == nil {
return &gp.SubmitResponse{}, nil
}

logger.Warnw("Error sending transaction to orderer", "txID", request.TransactionId, "endpoint", orderer.address, "err", err)
errDetails = append(errDetails, errorDetail(orderer.endpointConfig, err.Error()))

errStatus := toRpcStatus(err)
if errStatus.Code() != codes.Unavailable {
return nil, newRpcError(errStatus.Code(), errStatus.Message(), errDetails...)
}
}

return nil, newRpcError(codes.Aborted, "no orderers could successfully process transaction", errDetails...)
return nil, newRpcError(codes.Unavailable, "no orderers could successfully process transaction", errDetails...)
}

func (gs *Server) broadcast(ctx context.Context, orderer *orderer, txn *common.Envelope) error {
broadcast, err := orderer.client.Broadcast(ctx)
if err != nil {
return fmt.Errorf("failed to create BroadcastClient: %w", err)
return err
}

if err := broadcast.Send(txn); err != nil {
return fmt.Errorf("failed to send transaction to orderer: %w", err)
return err
}

response, err := broadcast.Recv()
if err != nil {
return fmt.Errorf("failed to receive response from orderer: %w", err)
return err
}

if response == nil {
return fmt.Errorf("received nil response from orderer")
if response.GetStatus() != common.Status_SUCCESS {
return status.Errorf(codes.Aborted, "received unsuccessful response from orderer: %s", common.Status_name[int32(response.GetStatus())])
}

if response.Status != common.Status_SUCCESS {
return fmt.Errorf("received unsuccessful response from orderer: %s", common.Status_name[int32(response.Status)])
}
return nil
}

Expand Down Expand Up @@ -458,7 +463,7 @@ func (gs *Server) CommitStatus(ctx context.Context, signedRequest *gp.SignedComm

txStatus, err := gs.commitFinder.TransactionStatus(ctx, request.ChannelId, request.TransactionId)
if err != nil {
return nil, toRpcError(err, codes.FailedPrecondition)
return nil, toRpcError(err, codes.Aborted)
}

response := &gp.CommitStatusResponse{
Expand Down Expand Up @@ -494,7 +499,7 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest

ledger, err := gs.ledgerProvider.Ledger(request.GetChannelId())
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
return status.Error(codes.NotFound, err.Error())
}

startBlock, err := startBlockFromLedgerPosition(ledger, request.GetStartPosition())
Expand All @@ -504,7 +509,7 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest

ledgerIter, err := ledger.GetBlocksIterator(startBlock)
if err != nil {
return status.Error(codes.Unavailable, err.Error())
return status.Error(codes.Aborted, err.Error())
}

eventsIter := event.NewChaincodeEventsIterator(ledgerIter)
Expand All @@ -513,7 +518,7 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest
for {
response, err := eventsIter.Next()
if err != nil {
return status.Error(codes.Unavailable, err.Error())
return status.Error(codes.Aborted, err.Error())
}

var matchingEvents []*peer.ChaincodeEvent
Expand All @@ -531,7 +536,11 @@ func (gs *Server) ChaincodeEvents(signedRequest *gp.SignedChaincodeEventsRequest
response.Events = matchingEvents

if err := stream.Send(response); err != nil {
return err // Likely stream closed by the client
if err == io.EOF {
// Stream closed by the client
return status.Error(codes.Canceled, err.Error())
}
return err
}
}
}
Expand All @@ -548,7 +557,7 @@ func startBlockFromLedgerPosition(ledger ledger.Ledger, position *ab.SeekPositio

ledgerInfo, err := ledger.GetBlockchainInfo()
if err != nil {
return 0, status.Error(codes.Unavailable, err.Error())
return 0, status.Error(codes.Aborted, err.Error())
}

return ledgerInfo.GetHeight(), nil
Expand Down

0 comments on commit 6a7cdd0

Please sign in to comment.