Skip to content

Commit

Permalink
Enhance gateway error logic
Browse files Browse the repository at this point in the history
Add logic to determine the origin of the error returned by ProcessProposal, and whether the evaluate/endorse methods should close the connection to the peer and/or retry the proposal on another peer.

Signed-off-by: andrew-coleman <andrew_coleman@uk.ibm.com>
  • Loading branch information
andrew-coleman authored and denyeart committed Nov 18, 2021
1 parent 5014709 commit 5c2e358
Show file tree
Hide file tree
Showing 7 changed files with 182 additions and 71 deletions.
9 changes: 7 additions & 2 deletions core/chaincode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ type ApplicationConfigRetriever interface {
GetApplicationConfig(cid string) (channelconfig.Application, bool)
}

// Message texts of the errors that Handler.Execute creates when a transaction
// execution ends without a chaincode response. They are exported so that other
// packages can recognise these failures by their message text.
const (
// ErrorExecutionTimeout is the message of the error set by Handler.Execute
// when its execution timeout fires.
ErrorExecutionTimeout = "timeout expired while executing transaction"
// ErrorStreamTerminated is the message of the error set by Handler.Execute
// when the handler's stream-done channel fires.
ErrorStreamTerminated = "chaincode stream terminated"
)

// Handler implements the peer side of the chaincode stream.
type Handler struct {
// Keepalive specifies the interval at which keep-alive messages are sent.
Expand Down Expand Up @@ -1180,10 +1185,10 @@ func (h *Handler) Execute(txParams *ccprovider.TransactionParams, namespace stri
// response is sent to user or calling chaincode. ChaincodeMessage_ERROR
// are typically treated as error
case <-time.After(timeout):
err = errors.New("timeout expired while executing transaction")
err = errors.New(ErrorExecutionTimeout)
h.Metrics.ExecuteTimeouts.With("chaincode", h.chaincodeID).Add(1)
case <-h.streamDone():
err = errors.New("chaincode stream terminated")
err = errors.New(ErrorStreamTerminated)
}

return ccresp, err
Expand Down
123 changes: 78 additions & 45 deletions internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"fmt"
"math/rand"
"strings"
"sync"

"github.com/golang/protobuf/proto"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/core/aclmgmt/resources"
"github.com/hyperledger/fabric/core/chaincode"
"github.com/hyperledger/fabric/internal/pkg/gateway/event"
"github.com/hyperledger/fabric/protoutil"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -71,33 +73,29 @@ func (gs *Server) Evaluate(ctx context.Context, request *gp.EvaluateRequest) (*g
gs.logger.Debugw("Sending to peer:", "channel", channel, "chaincode", chaincodeID, "MSPID", endorser.mspid, "endpoint", endorser.address)

pr, err := endorser.client.ProcessProposal(ctx, signedProposal)
if err != nil {
logger.Debugw("Evaluate call to endorser failed", "chaincode", chaincodeID, "channel", channel, "txid", request.TransactionId, "endorserAddress", endorser.endpointConfig.address, "endorserMspid", endorser.endpointConfig.mspid, "error", err)
errDetails = append(errDetails, errorDetail(endorser.endpointConfig, err))
gs.registry.removeEndorser(endorser)
endorser = plan.retry(endorser)
if endorser == nil {
return nil, newRpcError(codes.Aborted, "failed to evaluate transaction, see attached details for more info", errDetails...)
}
}

response = pr.GetResponse()
if response != nil {
if response.Status < 200 || response.Status >= 400 {
logger.Debugw("Evaluate call to endorser returned a malformed or error response", "chaincode", chaincodeID, "channel", channel, "txid", request.TransactionId, "endorserAddress", endorser.endpointConfig.address, "endorserMspid", endorser.endpointConfig.mspid, "status", response.Status, "message", response.Message)
err = fmt.Errorf("error %d returned from chaincode %s on channel %s: %s", response.Status, chaincodeID, channel, response.Message)
endpointErr := errorDetail(endorser.endpointConfig, err)
errDetails = append(errDetails, endpointErr)
// this is a chaincode error response - don't retry
return nil, newRpcError(codes.Aborted, "evaluate call to endorser returned error: "+response.Message, errDetails...)
}

success, message, retry, remove := gs.responseStatus(pr, err)
if success {
response = pr.Response
// Prefer result from proposal response as Response.Payload is not required to be transaction result
if result, err := getResultFromProposalResponse(pr); err == nil {
response.Payload = result
} else {
logger.Warnw("Successful proposal response contained no transaction result", "error", err.Error(), "chaincode", chaincodeID, "channel", channel, "txid", request.TransactionId, "endorserAddress", endorser.endpointConfig.address, "endorserMspid", endorser.endpointConfig.mspid, "status", response.Status, "message", response.Message)
}
} else {
logger.Debugw("Evaluate call to endorser failed", "chaincode", chaincodeID, "channel", channel, "txid", request.TransactionId, "endorserAddress", endorser.endpointConfig.address, "endorserMspid", endorser.endpointConfig.mspid, "error", message)
errDetails = append(errDetails, errorDetail(endorser.endpointConfig, message))
if remove {
gs.registry.removeEndorser(endorser)
}
if retry {
endorser = plan.nextPeerInGroup(endorser)
} else {
return nil, newRpcError(codes.Aborted, "evaluate call to endorser returned error: "+message, errDetails...)
}
if endorser == nil {
return nil, newRpcError(codes.Aborted, "failed to evaluate transaction, see attached details for more info", errDetails...)
}
}
}

Expand Down Expand Up @@ -170,17 +168,15 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
var errDetails []proto.Message
for firstResponse == nil && firstEndorser != nil {
firstResponse, err = firstEndorser.client.ProcessProposal(ctx, signedProposal)
if err != nil {
logger.Debugw("Endorse call to endorser failed", "channel", request.ChannelId, "txid", request.TransactionId, "endorserAddress", firstEndorser.endpointConfig.address, "endorserMspid", firstEndorser.endpointConfig.mspid, "error", err)
errDetails = append(errDetails, errorDetail(firstEndorser.endpointConfig, err))
gs.registry.removeEndorser(firstEndorser)
firstEndorser = plan.retry(firstEndorser)
} else if firstResponse.Response.Status < 200 || firstResponse.Response.Status >= 400 {
logger.Debugw("Endorse call to endorser returned failure", "channel", request.ChannelId, "txid", request.TransactionId, "endorserAddress", firstEndorser.endpointConfig.address, "endorserMspid", firstEndorser.endpointConfig.mspid, "status", firstResponse.Response.Status, "message", firstResponse.Response.Message)
err := fmt.Errorf("error %d, %s", firstResponse.Response.Status, firstResponse.Response.Message)
endpointErr := errorDetail(firstEndorser.endpointConfig, err)
errDetails = append(errDetails, endpointErr)
firstEndorser = plan.retry(firstEndorser)
success, message, _, remove := gs.responseStatus(firstResponse, err)

if !success {
logger.Debugw("Endorse call to endorser failed", "channel", request.ChannelId, "txid", request.TransactionId, "endorserAddress", firstEndorser.endpointConfig.address, "endorserMspid", firstEndorser.endpointConfig.mspid, "error", message)
errDetails = append(errDetails, errorDetail(firstEndorser.endpointConfig, message))
if remove {
gs.registry.removeEndorser(firstEndorser)
}
firstEndorser = plan.nextPeerInGroup(firstEndorser)
firstResponse = nil
}
}
Expand Down Expand Up @@ -232,18 +228,11 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
for e != nil {
gs.logger.Debugw("Sending to endorser:", "channel", channel, "chaincode", chaincodeID, "MSPID", e.mspid, "endpoint", e.address)
response, err := e.client.ProcessProposal(ctx, signedProposal)
switch {
case err != nil:
logger.Debugw("Endorse call to endorser failed", "channel", request.ChannelId, "txid", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "error", err)
responseCh <- &endorserResponse{err: errorDetail(e.endpointConfig, err)}
gs.registry.removeEndorser(e)
e = plan.retry(e)
case response.Response.Status < 200 || response.Response.Status >= 400:
// this is an error case and will be returned in the error details to the client
logger.Debugw("Endorse call to endorser returned failure", "channel", request.ChannelId, "txid", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "status", response.Response.Status, "message", response.Response.Message)
responseCh <- &endorserResponse{err: errorDetail(e.endpointConfig, fmt.Errorf("error %d, %s", response.Response.Status, response.Response.Message))}
e = plan.retry(e)
default:
// Ignore the retry flag returned by the following responseStatus call. Endorse will retry until all endorsement layouts have been exhausted.
// It tries to get a successful endorsement from each org and minimise the chances of a rogue peer scuppering the transaction.
// If an org is behaving badly, it can move on to a different layout.
success, message, _, remove := gs.responseStatus(response, err)
if success {
logger.Debugw("Endorse call to endorser returned success", "channel", request.ChannelId, "txid", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "status", response.Response.Status, "message", response.Response.Message)

responseMessage := response.GetResponse()
Expand All @@ -254,6 +243,13 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
endorsements := plan.update(e, response)
responseCh <- &endorserResponse{endorsementSet: endorsements}
e = nil
} else {
logger.Debugw("Endorse call to endorser failed", "channel", request.ChannelId, "txid", request.TransactionId, "numEndorsers", len(endorsers), "endorserAddress", e.endpointConfig.address, "endorserMspid", e.endpointConfig.mspid, "error", message)
responseCh <- &endorserResponse{err: errorDetail(e.endpointConfig, message)}
if remove {
gs.registry.removeEndorser(e)
}
e = plan.nextPeerInGroup(e)
}
}
}(e)
Expand Down Expand Up @@ -287,6 +283,43 @@ func (gs *Server) Endorse(ctx context.Context, request *gp.EndorseRequest) (*gp.
return endorseResponse, nil
}

// responseStatus classifies the (response, err) pair returned by an endorser's
// ProcessProposal call and tells the caller how the gateway should react.
//
// Returns:
//   - success: true only when err is nil and the response status is in [200, 400)
//   - message: the error text (from err, or from the proposal response); empty on success
//   - retry: whether the proposal is worth retrying on another peer (only Evaluate honours this)
//   - remove: whether the gateway should close the connection and remove the peer from its registry
//
// A nil response, or a response with no embedded Response, is reported as a
// retryable failure instead of being dereferenced.
func (gs *Server) responseStatus(response *peer.ProposalResponse, err error) (success bool, message string, retry bool, remove bool) {
	if err != nil {
		if response == nil {
			// There is no ProposalResponse at all, so the error is assumed to come from the
			// transport (e.g. an unavailable peer): close the connection and retry on another peer.
			return false, err.Error(), true, true
		}
		// A response accompanied by an error is taken to be a rejection of the proposal itself
		// (presumably the endorser's unpack / signature / ACL checks), i.e. a client error:
		// there is no point retrying or closing the connection.
		return false, err.Error(), false, false
	}

	// GetResponse is nil-safe on a nil receiver, so this single check covers both a nil
	// ProposalResponse and one that carries no Response. Either is a malformed reply from
	// the peer rather than a chaincode result, so try another peer but keep the connection.
	status := response.GetResponse()
	if status == nil {
		return false, "proposal response did not contain a response status", true, false
	}

	if status.Status >= 200 && status.Status < 400 {
		return true, "", false, false
	}

	if response.Payload != nil || status.Status != 500 {
		// Anything other than a payload-less 500 is treated as an error response
		// generated by the chaincode: report it and do not retry.
		return false, fmt.Sprintf("chaincode response %d, %s", status.Status, status.Message), false, false
	}

	// There's an error 500 response but no payload, so the response was generated in the
	// peer rather than the chaincode.
	if strings.HasSuffix(status.Message, chaincode.ErrorStreamTerminated) {
		// The chaincode stream ended (the container probably crashed): close the
		// connection and retry on another peer.
		return false, status.Message, true, true
	}
	// Some other peer-side error: retry on another peer but keep the connection.
	return false, status.Message, true, false
}

// Submit will send the signed transaction to the ordering service. The response indicates whether the transaction was
// successfully received by the orderer. This does not imply successful commit of the transaction, only that it has
// been delivered to the orderer.
Expand Down Expand Up @@ -320,7 +353,7 @@ func (gs *Server) Submit(ctx context.Context, request *gp.SubmitRequest) (*gp.Su
return &gp.SubmitResponse{}, nil
}
logger.Warnw("Error sending transaction to orderer", "TxID", request.TransactionId, "endpoint", orderer.address, "err", err)
errDetails = append(errDetails, errorDetail(orderer.endpointConfig, err))
errDetails = append(errDetails, errorDetail(orderer.endpointConfig, err.Error()))
}

return nil, newRpcError(codes.Aborted, "no orderers could successfully process transaction", errDetails...)
Expand Down
96 changes: 83 additions & 13 deletions internal/pkg/gateway/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,11 @@ func TestEvaluate(t *testing.T) {
proposalResponseStatus: 400,
proposalResponseMessage: "Mock chaincode error",
},
errString: "rpc error: code = Aborted desc = evaluate call to endorser returned error: Mock chaincode error",
errString: "rpc error: code = Aborted desc = evaluate call to endorser returned error: chaincode response 400, Mock chaincode error",
errDetails: []*pb.ErrorDetail{{
Address: "peer1:8051",
MspId: "msp1",
Message: "error 400 returned from chaincode test_chaincode on channel test_channel: Mock chaincode error",
Message: "chaincode response 400, Mock chaincode error",
}},
},
{
Expand All @@ -338,8 +338,8 @@ func TestEvaluate(t *testing.T) {
"g3": {{endorser: peer4Mock, height: 5}}, // msp3
},
postSetup: func(t *testing.T, def *preparedTest) {
def.localEndorser.ProcessProposalReturns(nil, status.Error(codes.Aborted, "bad local endorser"))
peer1Mock.client.(*mocks.EndorserClient).ProcessProposalReturns(nil, status.Error(codes.Aborted, "bad peer1 endorser"))
def.localEndorser.ProcessProposalReturns(createErrorResponse(t, 500, "bad local endorser", nil), nil)
peer1Mock.client.(*mocks.EndorserClient).ProcessProposalReturns(createErrorResponse(t, 500, "bad peer1 endorser", nil), nil)
},
expectedEndorsers: []string{"peer4:11051"},
},
Expand All @@ -358,21 +358,81 @@ func TestEvaluate(t *testing.T) {
"g3": {{endorser: peer4Mock, height: 5}}, // msp3
},
postSetup: func(t *testing.T, def *preparedTest) {
def.localEndorser.ProcessProposalReturns(nil, status.Error(codes.Aborted, "bad local endorser"))
peer1Mock.client.(*mocks.EndorserClient).ProcessProposalReturns(nil, status.Error(codes.Aborted, "bad peer1 endorser"))
def.localEndorser.ProcessProposalReturns(createErrorResponse(t, 500, "bad local endorser", nil), nil)
peer1Mock.client.(*mocks.EndorserClient).ProcessProposalReturns(createErrorResponse(t, 500, "bad peer1 endorser", nil), nil)
},
endorsingOrgs: []string{"msp1"},
errString: "rpc error: code = Aborted desc = failed to evaluate transaction, see attached details for more info",
errDetails: []*pb.ErrorDetail{
{
Address: "localhost:7051",
MspId: "msp1",
Message: "bad local endorser",
},
{
Address: "peer1:8051",
MspId: "msp1",
Message: "bad peer1 endorser",
},
},
},
{
name: "fails due to invalid signature (pre-process check) - does not retry",
members: []networkMember{
{"id1", "localhost:7051", "msp1", 4},
{"id2", "peer1:8051", "msp1", 4},
{"id3", "peer2:9051", "msp2", 3},
{"id4", "peer3:10051", "msp2", 4},
{"id5", "peer4:11051", "msp3", 5},
},
plan: endorsementPlan{
"g1": {{endorser: localhostMock, height: 4}, {endorser: peer1Mock, height: 4}}, // msp1
"g2": {{endorser: peer2Mock, height: 3}, {endorser: peer3Mock, height: 4}}, // msp2
"g3": {{endorser: peer4Mock, height: 5}}, // msp3
},
postSetup: func(t *testing.T, def *preparedTest) {
def.localEndorser.ProcessProposalReturns(createErrorResponse(t, 500, "invalid signature", nil), fmt.Errorf("invalid signature"))
},
endorsingOrgs: []string{"msp1"},
errString: "rpc error: code = Aborted desc = evaluate call to endorser returned error: invalid signature",
errDetails: []*pb.ErrorDetail{
{
Address: "localhost:7051",
MspId: "msp1",
Message: "invalid signature",
},
},
},
{
name: "fails due to chaincode panic - retry on next peer",
members: []networkMember{
{"id1", "localhost:7051", "msp1", 4},
{"id2", "peer1:8051", "msp1", 4},
{"id3", "peer2:9051", "msp2", 3},
{"id4", "peer3:10051", "msp2", 4},
{"id5", "peer4:11051", "msp3", 5},
},
plan: endorsementPlan{
"g1": {{endorser: localhostMock, height: 4}, {endorser: peer1Mock, height: 4}}, // msp1
"g2": {{endorser: peer2Mock, height: 3}, {endorser: peer3Mock, height: 4}}, // msp2
"g3": {{endorser: peer4Mock, height: 5}}, // msp3
},
postSetup: func(t *testing.T, def *preparedTest) {
def.localEndorser.ProcessProposalReturns(createErrorResponse(t, 500, "error in simulation: chaincode stream terminated", nil), nil)
peer1Mock.client.(*mocks.EndorserClient).ProcessProposalReturns(createErrorResponse(t, 500, "error in simulation: chaincode stream terminated", nil), nil)
},
endorsingOrgs: []string{"msp1"},
errString: "rpc error: code = Aborted desc = failed to evaluate transaction, see attached details for more info",
errDetails: []*pb.ErrorDetail{
{
Address: "localhost:7051",
MspId: "msp1",
Message: "rpc error: code = Aborted desc = bad local endorser",
Message: "error in simulation: chaincode stream terminated",
},
{
Address: "peer1:8051",
MspId: "msp1",
Message: "rpc error: code = Aborted desc = bad peer1 endorser",
Message: "error in simulation: chaincode stream terminated",
},
},
},
Expand Down Expand Up @@ -779,12 +839,12 @@ func TestEndorse(t *testing.T) {
{
Address: "localhost:7051",
MspId: "msp1",
Message: "error 400, Mock chaincode error",
Message: "chaincode response 400, Mock chaincode error",
},
{
Address: "peer1:8051",
MspId: "msp1",
Message: "error 400, Mock chaincode error",
Message: "chaincode response 400, Mock chaincode error",
},
},
},
Expand All @@ -805,7 +865,7 @@ func TestEndorse(t *testing.T) {
errDetails: []*pb.ErrorDetail{{
Address: "peer4:11051",
MspId: "msp3",
Message: "error 400, Mock chaincode error",
Message: "chaincode response 400, Mock chaincode error",
}},
},
{
Expand Down Expand Up @@ -1653,15 +1713,15 @@ func prepareTest(t *testing.T, tt *testDef) *preparedTest {
epDef = defaultEndpointDef
}
if epDef.proposalError != nil {
localEndorser.ProcessProposalReturns(nil, epDef.proposalError)
localEndorser.ProcessProposalReturns(createErrorResponse(t, 500, epDef.proposalError.Error(), nil), nil)
} else {
localEndorser.ProcessProposalReturns(createProposalResponseWithInterest(t, localhostMock.address, localResponse, epDef.proposalResponseStatus, epDef.proposalResponseMessage, tt.interest), nil)
}

for _, e := range endorsers {
e.client = &mocks.EndorserClient{}
if epDef.proposalError != nil {
e.client.(*mocks.EndorserClient).ProcessProposalReturns(nil, epDef.proposalError)
e.client.(*mocks.EndorserClient).ProcessProposalReturns(createErrorResponse(t, 500, epDef.proposalError.Error(), nil), nil)
} else {
e.client.(*mocks.EndorserClient).ProcessProposalReturns(createProposalResponseWithInterest(t, e.address, epDef.proposalResponseValue, epDef.proposalResponseStatus, epDef.proposalResponseMessage, tt.interest), nil)
}
Expand Down Expand Up @@ -2003,6 +2063,16 @@ func createProposalResponseWithInterest(t *testing.T, endorser, value string, st
return response
}

// createErrorResponse is a test helper that builds a ProposalResponse whose
// embedded Response carries the given status code, message and payload.
// No other field of the ProposalResponse is populated; the t argument is unused.
func createErrorResponse(t *testing.T, status int32, errMessage string, payload []byte) *peer.ProposalResponse {
	inner := &peer.Response{
		Message: errMessage,
		Payload: payload,
		Status:  status,
	}
	return &peer.ProposalResponse{Response: inner}
}

func marshal(msg proto.Message, t *testing.T) []byte {
buf, err := proto.Marshal(msg)
require.NoError(t, err, "Failed to marshal message")
Expand Down

0 comments on commit 5c2e358

Please sign in to comment.