Skip to content

Commit

Permalink
Randomize selection of orderer nodes with retry (#2951)
Browse files Browse the repository at this point in the history
In the Submit() API method, the order in which the available orderers are tried is now randomized to improve load balancing.
Retry logic has also been added: if the selected orderer fails to return a success status, the next orderer in the randomized list is tried. If no orderer succeeds, an error containing the failure details from each orderer is returned to the client.

Signed-off-by: andrew-coleman <andrew_coleman@uk.ibm.com>
  • Loading branch information
andrew-coleman committed Sep 28, 2021
1 parent 210d20f commit 3a93662
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 20 deletions.
30 changes: 22 additions & 8 deletions internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package gateway
import (
"context"
"fmt"
"math/rand"
"sync"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -281,31 +282,44 @@ func (gs *Server) Submit(ctx context.Context, request *gp.SubmitRequest) (*gp.Su
return nil, status.Errorf(codes.Unavailable, "no orderer nodes available")
}

orderer := orderers[0] // send to first orderer for now
// try each orderer in random order
var errDetails []proto.Message
for _, index := range rand.Perm(len(orderers)) {
orderer := orderers[index]
err := gs.broadcast(ctx, orderer, txn)
if err == nil {
return &gp.SubmitResponse{}, nil
}
logger.Warnw("Error sending transaction to orderer", "TxID", request.TransactionId, "endpoint", orderer.address, "err", err)
errDetails = append(errDetails, endpointError(orderer.endpointConfig, err))
}

return nil, rpcError(codes.Aborted, "no orderers could successfully process transaction", errDetails...)
}

func (gs *Server) broadcast(ctx context.Context, orderer *orderer, txn *common.Envelope) error {
broadcast, err := orderer.client.Broadcast(ctx)
if err != nil {
return nil, wrappedRpcError(err, "failed to create BroadcastClient", endpointError(orderer.endpointConfig, err))
return fmt.Errorf("failed to create BroadcastClient: %w", err)
}
logger.Info("Submitting txn to orderer")
if err := broadcast.Send(txn); err != nil {
return nil, wrappedRpcError(err, "failed to send transaction to orderer", endpointError(orderer.endpointConfig, err))
return fmt.Errorf("failed to send transaction to orderer: %w", err)
}

response, err := broadcast.Recv()
if err != nil {
return nil, wrappedRpcError(err, "failed to receive response from orderer", endpointError(orderer.endpointConfig, err))
return fmt.Errorf("failed to receive response from orderer: %w", err)
}

if response == nil {
return nil, status.Error(codes.Aborted, "received nil response from orderer")
return fmt.Errorf("received nil response from orderer")
}

if response.Status != common.Status_SUCCESS {
return nil, status.Errorf(codes.Aborted, "received unsuccessful response from orderer: %s", common.Status_name[int32(response.Status)])
return fmt.Errorf("received unsuccessful response from orderer: %s", common.Status_name[int32(response.Status)])
}

return &gp.SubmitResponse{}, nil
return nil
}

// CommitStatus returns the validation code for a specific transaction on a specific channel. If the transaction is
Expand Down
122 changes: 110 additions & 12 deletions internal/pkg/gateway/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,11 +736,11 @@ func TestSubmit(t *testing.T) {
proposalResponseStatus: 200,
ordererBroadcastError: status.Error(codes.FailedPrecondition, "Orderer not listening!"),
},
errString: "rpc error: code = FailedPrecondition desc = failed to create BroadcastClient: Orderer not listening!",
errString: "rpc error: code = Aborted desc = no orderers could successfully process transaction",
errDetails: []*pb.EndpointError{{
Address: "orderer:7050",
MspId: "msp1",
Message: "rpc error: code = FailedPrecondition desc = Orderer not listening!",
Message: "failed to create BroadcastClient: rpc error: code = FailedPrecondition desc = Orderer not listening!",
}},
},
{
Expand All @@ -752,11 +752,11 @@ func TestSubmit(t *testing.T) {
proposalResponseStatus: 200,
ordererSendError: status.Error(codes.Internal, "Orderer says no!"),
},
errString: "rpc error: code = Internal desc = failed to send transaction to orderer: Orderer says no!",
errString: "rpc error: code = Aborted desc = no orderers could successfully process transaction",
errDetails: []*pb.EndpointError{{
Address: "orderer:7050",
MspId: "msp1",
Message: "rpc error: code = Internal desc = Orderer says no!",
Message: "failed to send transaction to orderer: rpc error: code = Internal desc = Orderer says no!",
}},
},
{
Expand All @@ -768,11 +768,11 @@ func TestSubmit(t *testing.T) {
proposalResponseStatus: 200,
ordererRecvError: status.Error(codes.FailedPrecondition, "Orderer not happy!"),
},
errString: "rpc error: code = FailedPrecondition desc = failed to receive response from orderer: Orderer not happy!",
errString: "rpc error: code = Aborted desc = no orderers could successfully process transaction",
errDetails: []*pb.EndpointError{{
Address: "orderer:7050",
MspId: "msp1",
Message: "rpc error: code = FailedPrecondition desc = Orderer not happy!",
Message: "failed to receive response from orderer: rpc error: code = FailedPrecondition desc = Orderer not happy!",
}},
},
{
Expand All @@ -789,7 +789,12 @@ func TestSubmit(t *testing.T) {
return abc
}
},
errString: "rpc error: code = Aborted desc = received nil response from orderer",
errString: "rpc error: code = Aborted desc = no orderers could successfully process transaction",
errDetails: []*pb.EndpointError{{
Address: "orderer:7050",
MspId: "msp1",
Message: "received nil response from orderer",
}},
},
{
name: "orderer returns unsuccessful response",
Expand All @@ -808,7 +813,12 @@ func TestSubmit(t *testing.T) {
return abc
}
},
errString: "rpc error: code = Aborted desc = received unsuccessful response from orderer: " + cp.Status_name[int32(cp.Status_BAD_REQUEST)],
errString: "rpc error: code = Aborted desc = no orderers could successfully process transaction",
errDetails: []*pb.EndpointError{{
Address: "orderer:7050",
MspId: "msp1",
Message: "received unsuccessful response from orderer: " + cp.Status_name[int32(cp.Status_BAD_REQUEST)],
}},
},
{
name: "dialing orderer endpoint fails",
Expand All @@ -825,6 +835,96 @@ func TestSubmit(t *testing.T) {
},
errString: "rpc error: code = Unavailable desc = no orderer nodes available",
},
{
name: "orderer retry",
plan: endorsementPlan{
"g1": {{endorser: localhostMock}},
},
config: &dp.ConfigResult{
Orderers: map[string]*dp.Endpoints{
"msp1": {
Endpoint: []*dp.Endpoint{
{Host: "orderer1", Port: 7050},
{Host: "orderer2", Port: 7050},
{Host: "orderer3", Port: 7050},
},
},
},
Msps: map[string]*msp.FabricMSPConfig{
"msp1": {
TlsRootCerts: [][]byte{},
},
},
},
postSetup: func(t *testing.T, def *preparedTest) {
abc := &mocks.ABClient{}
abbc := &mocks.ABBClient{}
abbc.SendReturnsOnCall(0, status.Error(codes.FailedPrecondition, "First orderer error"))
abbc.SendReturnsOnCall(1, status.Error(codes.FailedPrecondition, "Second orderer error"))
abbc.SendReturnsOnCall(2, nil) // third time lucky
abbc.RecvReturns(&ab.BroadcastResponse{
Info: "success",
Status: cp.Status(200),
}, nil)
abc.BroadcastReturns(abbc, nil)
def.server.registry.endpointFactory = &endpointFactory{
timeout: 5 * time.Second,
connectEndorser: func(conn *grpc.ClientConn) peer.EndorserClient {
return &mocks.EndorserClient{}
},
connectOrderer: func(_ *grpc.ClientConn) ab.AtomicBroadcastClient {
return abc
},
dialer: func(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return nil, nil
},
}
},
},
{
name: "multiple orderers all fail",
plan: endorsementPlan{
"g1": {{endorser: localhostMock}},
},
config: &dp.ConfigResult{
Orderers: map[string]*dp.Endpoints{
"msp1": {
Endpoint: []*dp.Endpoint{
{Host: "orderer1", Port: 7050},
{Host: "orderer2", Port: 7050},
{Host: "orderer3", Port: 7050},
},
},
},
Msps: map[string]*msp.FabricMSPConfig{
"msp1": {
TlsRootCerts: [][]byte{},
},
},
},
endpointDefinition: &endpointDef{
proposalResponseStatus: 200,
ordererBroadcastError: status.Error(codes.FailedPrecondition, "Orderer not listening!"),
},
errString: "rpc error: code = Aborted desc = no orderers could successfully process transaction",
errDetails: []*pb.EndpointError{
{
Address: "orderer1:7050",
MspId: "msp1",
Message: "failed to create BroadcastClient: rpc error: code = FailedPrecondition desc = Orderer not listening!",
},
{
Address: "orderer2:7050",
MspId: "msp1",
Message: "failed to create BroadcastClient: rpc error: code = FailedPrecondition desc = Orderer not listening!",
},
{
Address: "orderer3:7050",
MspId: "msp1",
Message: "failed to create BroadcastClient: rpc error: code = FailedPrecondition desc = Orderer not listening!",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -1506,10 +1606,8 @@ func checkError(t *testing.T, err error, errString string, details []*pb.Endpoin
s, ok := status.FromError(err)
require.True(t, ok, "Expected a gRPC status error")
require.Len(t, s.Details(), len(details))
for i, detail := range details {
require.Equal(t, detail.Message, s.Details()[i].(*pb.EndpointError).Message)
require.Equal(t, detail.MspId, s.Details()[i].(*pb.EndpointError).MspId)
require.Equal(t, detail.Address, s.Details()[i].(*pb.EndpointError).Address)
for _, detail := range s.Details() {
require.Contains(t, details, detail)
}
}

Expand Down

0 comments on commit 3a93662

Please sign in to comment.