Skip to content

Commit

Permalink
Gateway should apply OrdererEndpointOverrides
Browse files Browse the repository at this point in the history
The peer local config allows the endpoint addresses and TLS root certs or ordering nodes to be overridden.
The gateway should apply these overrides when connecting to orderer nodes for transaction submit.

Signed-off-by: andrew-coleman <andrew_coleman@uk.ibm.com>
(cherry picked from commit aea8ea8)
  • Loading branch information
andrew-coleman authored and denyeart committed Jan 24, 2023
1 parent 60d26e1 commit 3fd9891
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 52 deletions.
6 changes: 3 additions & 3 deletions internal/pkg/gateway/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,11 +437,11 @@ func (gs *Server) Submit(ctx context.Context, request *gp.SubmitRequest) (*gp.Su
var errDetails []proto.Message
for _, index := range rand.Perm(len(orderers)) {
orderer := orderers[index]
logger.Infow("Sending transaction to orderer", "txID", request.TransactionId, "endpoint", orderer.address)
logger.Infow("Sending transaction to orderer", "txID", request.TransactionId, "endpoint", orderer.logAddress)
response, err := gs.broadcast(ctx, orderer, txn)
if err != nil {
errDetails = append(errDetails, errorDetail(orderer.endpointConfig, err.Error()))
logger.Warnw("Error sending transaction to orderer", "txID", request.TransactionId, "endpoint", orderer.address, "err", err)
logger.Warnw("Error sending transaction to orderer", "txID", request.TransactionId, "endpoint", orderer.logAddress, "err", err)
continue
}

Expand All @@ -450,7 +450,7 @@ func (gs *Server) Submit(ctx context.Context, request *gp.SubmitRequest) (*gp.Su
return &gp.SubmitResponse{}, nil
}

logger.Warnw("Unsuccessful response sending transaction to orderer", "txID", request.TransactionId, "endpoint", orderer.address, "status", status, "info", response.GetInfo())
logger.Warnw("Unsuccessful response sending transaction to orderer", "txID", request.TransactionId, "endpoint", orderer.logAddress, "status", status, "info", response.GetInfo())

if status >= 400 && status < 500 {
// client error - don't retry
Expand Down
134 changes: 103 additions & 31 deletions internal/pkg/gateway/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
ledgermocks "github.com/hyperledger/fabric/internal/pkg/gateway/ledger/mocks"
"github.com/hyperledger/fabric/internal/pkg/gateway/mocks"
idmocks "github.com/hyperledger/fabric/internal/pkg/identity/mocks"
"github.com/hyperledger/fabric/internal/pkg/peer/orderers"
"github.com/hyperledger/fabric/protoutil"
"github.com/pkg/errors"
"github.com/spf13/viper"
Expand Down Expand Up @@ -123,32 +124,33 @@ const (
)

type testDef struct {
name string
plan endorsementPlan
layouts []endorsementLayout
members []networkMember
config *dp.ConfigResult
identity []byte
localResponse string
errString string
errCode codes.Code
errDetails []*pb.ErrorDetail
endpointDefinition *endpointDef
endorsingOrgs []string
postSetup func(t *testing.T, def *preparedTest)
postTest func(t *testing.T, def *preparedTest)
expectedEndorsers []string
finderStatus *commit.Status
finderErr error
eventErr error
policyErr error
expectedResponse proto.Message
expectedResponses []proto.Message
transientData map[string][]byte
interest *peer.ChaincodeInterest
blocks []*cp.Block
startPosition *ab.SeekPosition
afterTxID string
name string
plan endorsementPlan
layouts []endorsementLayout
members []networkMember
config *dp.ConfigResult
identity []byte
localResponse string
errString string
errCode codes.Code
errDetails []*pb.ErrorDetail
endpointDefinition *endpointDef
endorsingOrgs []string
postSetup func(t *testing.T, def *preparedTest)
postTest func(t *testing.T, def *preparedTest)
expectedEndorsers []string
finderStatus *commit.Status
finderErr error
eventErr error
policyErr error
expectedResponse proto.Message
expectedResponses []proto.Message
transientData map[string][]byte
interest *peer.ChaincodeInterest
blocks []*cp.Block
startPosition *ab.SeekPosition
afterTxID string
ordererEndpointOverrides map[string]*orderers.Endpoint
}

type preparedTest struct {
Expand Down Expand Up @@ -1349,6 +1351,67 @@ func TestSubmit(t *testing.T) {
},
},
},
{
name: "orderer endpoint overrides",
plan: endorsementPlan{
"g1": {{endorser: localhostMock}},
},
ordererEndpointOverrides: map[string]*orderers.Endpoint{
"orderer1:7050": {Address: "override1:1234"},
"orderer3:7050": {Address: "override3:4321"},
},
config: &dp.ConfigResult{
Orderers: map[string]*dp.Endpoints{
"msp1": {
Endpoint: []*dp.Endpoint{
{Host: "orderer1", Port: 7050},
{Host: "orderer2", Port: 7050},
{Host: "orderer3", Port: 7050},
},
},
},
Msps: map[string]*msp.FabricMSPConfig{
"msp1": {
TlsRootCerts: [][]byte{},
},
},
},
endpointDefinition: &endpointDef{
proposalResponseStatus: 200,
ordererBroadcastError: status.Error(codes.Unavailable, "Orderer not listening!"),
},
errCode: codes.Unavailable,
errString: "no orderers could successfully process transaction",
errDetails: []*pb.ErrorDetail{
{
Address: "override1:1234 (mapped from orderer1:7050)",
MspId: "msp1",
Message: "rpc error: code = Unavailable desc = Orderer not listening!",
},
{
Address: "orderer2:7050",
MspId: "msp1",
Message: "rpc error: code = Unavailable desc = Orderer not listening!",
},
{
Address: "override3:4321 (mapped from orderer3:7050)",
MspId: "msp1",
Message: "rpc error: code = Unavailable desc = Orderer not listening!",
},
},
postTest: func(t *testing.T, def *preparedTest) {
var addresses []string
for i := 0; i < def.dialer.CallCount(); i++ {
_, address, _ := def.dialer.ArgsForCall(i)
addresses = append(addresses, address)
}
require.Contains(t, addresses, "override1:1234")
require.NotContains(t, addresses, "orderer1:7050")
require.Contains(t, addresses, "orderer2:7050")
require.Contains(t, addresses, "override3:4321")
require.NotContains(t, addresses, "orderer3:7050")
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -1368,11 +1431,18 @@ func TestSubmit(t *testing.T) {

if checkError(t, &tt, err) {
require.Nil(t, submitResponse, "response on error")
if tt.postTest != nil {
tt.postTest(t, test)
}
return
}

require.NoError(t, err)
require.True(t, proto.Equal(&pb.SubmitResponse{}, submitResponse), "Incorrect response")

if tt.postTest != nil {
tt.postTest(t, test)
}
})
}
}
Expand Down Expand Up @@ -2001,6 +2071,7 @@ func TestNilArgs(t *testing.T) {
&comm.SecureOptions{},
config.GetOptions(viper.New()),
nil,
nil,
)
ctx := context.Background()

Expand Down Expand Up @@ -2220,11 +2291,11 @@ func prepareTest(t *testing.T, tt *testDef) *preparedTest {
Endpoint: "localhost:7051",
}

server := newServer(localEndorser, disc, mockFinder, mockPolicy, mockLedgerProvider, member, "msp1", &comm.SecureOptions{}, options, nil)
server := newServer(localEndorser, disc, mockFinder, mockPolicy, mockLedgerProvider, member, "msp1", &comm.SecureOptions{}, options, nil, tt.ordererEndpointOverrides)

dialer := &mocks.Dialer{}
dialer.Returns(nil, nil)
server.registry.endpointFactory = createEndpointFactory(t, epDef, dialer.Spy)
server.registry.endpointFactory = createEndpointFactory(t, epDef, dialer.Spy, tt.ordererEndpointOverrides)

ctx := context.WithValue(context.Background(), contextKey("orange"), "apples")

Expand Down Expand Up @@ -2413,7 +2484,7 @@ func createMockPeer(t *testing.T, endorser *endorserState) *dp.Peer {
}
}

func createEndpointFactory(t *testing.T, definition *endpointDef, dialer dialer) *endpointFactory {
func createEndpointFactory(t *testing.T, definition *endpointDef, dialer dialer, ordererEndpointOverrides map[string]*orderers.Endpoint) *endpointFactory {
var endpoint string
ca, err := tlsgen.NewCA()
require.NoError(t, err, "failed to create CA")
Expand Down Expand Up @@ -2446,8 +2517,9 @@ func createEndpointFactory(t *testing.T, definition *endpointDef, dialer dialer)
endpoint = target
return dialer(ctx, target, opts...)
},
clientKey: pair.Key,
clientCert: pair.Cert,
clientKey: pair.Key,
clientCert: pair.Cert,
ordererEndpointOverrides: ordererEndpointOverrides,
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/gateway/apiutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func toRpcStatus(err error) *status.Status {
}

func errorDetail(e *endpointConfig, msg string) *gp.ErrorDetail {
return &gp.ErrorDetail{Address: e.address, MspId: e.mspid, Message: msg}
return &gp.ErrorDetail{Address: e.logAddress, MspId: e.mspid, Message: msg}
}

func getResultFromProposalResponse(proposalResponse *peer.ProposalResponse) ([]byte, error) {
Expand Down
29 changes: 20 additions & 9 deletions internal/pkg/gateway/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/internal/pkg/comm"
"github.com/hyperledger/fabric/internal/pkg/peer/orderers"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)
Expand All @@ -34,6 +35,7 @@ type orderer struct {
type endpointConfig struct {
pkiid common.PKIidType
address string
logAddress string
mspid string
tlsRootCerts [][]byte
}
Expand All @@ -47,12 +49,13 @@ type (
type dialer func(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error)

type endpointFactory struct {
timeout time.Duration
connectEndorser endorserConnector
connectOrderer ordererConnector
dialer dialer
clientCert []byte
clientKey []byte
timeout time.Duration
connectEndorser endorserConnector
connectOrderer ordererConnector
dialer dialer
clientCert []byte
clientKey []byte
ordererEndpointOverrides map[string]*orderers.Endpoint
}

func (ef *endpointFactory) newEndorser(pkiid common.PKIidType, address, mspid string, tlsRootCerts [][]byte) (*endorser, error) {
Expand All @@ -74,12 +77,20 @@ func (ef *endpointFactory) newEndorser(pkiid common.PKIidType, address, mspid st
return &endorser{
client: connectEndorser(conn),
closeConnection: close,
endpointConfig: &endpointConfig{pkiid: pkiid, address: address, mspid: mspid, tlsRootCerts: tlsRootCerts},
endpointConfig: &endpointConfig{pkiid: pkiid, address: address, logAddress: address, mspid: mspid, tlsRootCerts: tlsRootCerts},
}, nil
}

func (ef *endpointFactory) newOrderer(address, mspid string, tlsRootCerts [][]byte) (*orderer, error) {
conn, err := ef.newConnection(address, tlsRootCerts)
connAddress := address
logAddess := address
connCerts := tlsRootCerts
if override, ok := ef.ordererEndpointOverrides[address]; ok {
connAddress = override.Address
connCerts = override.RootCerts
logAddess = fmt.Sprintf("%s (mapped from %s)", connAddress, address)
}
conn, err := ef.newConnection(connAddress, connCerts)
if err != nil {
return nil, err
}
Expand All @@ -90,7 +101,7 @@ func (ef *endpointFactory) newOrderer(address, mspid string, tlsRootCerts [][]by
return &orderer{
client: connectOrderer(conn),
closeConnection: conn.Close,
endpointConfig: &endpointConfig{address: address, mspid: mspid, tlsRootCerts: tlsRootCerts},
endpointConfig: &endpointConfig{address: address, logAddress: logAddess, mspid: mspid, tlsRootCerts: tlsRootCerts},
}, nil
}

Expand Down
19 changes: 15 additions & 4 deletions internal/pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/hyperledger/fabric/internal/pkg/gateway/commit"
"github.com/hyperledger/fabric/internal/pkg/gateway/config"
"github.com/hyperledger/fabric/internal/pkg/gateway/ledger"
"github.com/hyperledger/fabric/internal/pkg/peer/orderers"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -77,6 +78,7 @@ func CreateServer(
secureOptions,
options,
systemChaincodes,
peerInstance.OrdererEndpointOverrides,
)

peerInstance.AddConfigCallbacks(server.registry.configUpdate)
Expand All @@ -94,13 +96,22 @@ func newServer(localEndorser peerproto.EndorserClient,
secureOptions *comm.SecureOptions,
options config.Options,
systemChaincodes scc.BuiltinSCCs,
ordererEndpointOverrides map[string]*orderers.Endpoint,
) *Server {
return &Server{
registry: &registry{
localEndorser: &endorser{client: localEndorser, endpointConfig: &endpointConfig{pkiid: localInfo.PKIid, address: localInfo.Endpoint, mspid: localMSPID}},
discovery: discovery,
logger: logger,
endpointFactory: &endpointFactory{timeout: options.DialTimeout, clientCert: secureOptions.Certificate, clientKey: secureOptions.Key},
localEndorser: &endorser{
client: localEndorser,
endpointConfig: &endpointConfig{pkiid: localInfo.PKIid, address: localInfo.Endpoint, logAddress: localInfo.Endpoint, mspid: localMSPID},
},
discovery: discovery,
logger: logger,
endpointFactory: &endpointFactory{
timeout: options.DialTimeout,
clientCert: secureOptions.Certificate,
clientKey: secureOptions.Key,
ordererEndpointOverrides: ordererEndpointOverrides,
},
remoteEndorsers: map[string]*endorser{},
channelInitialized: map[string]bool{},
systemChaincodes: systemChaincodes,
Expand Down
8 changes: 4 additions & 4 deletions internal/pkg/gateway/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (reg *registry) orderers(channel string) ([]*orderer, error) {
client, err := reg.endpointFactory.newOrderer(ep.address, ep.mspid, ep.tlsRootCerts)
if err != nil {
// Failed to connect to this orderer for some reason. Log the problem and skip to the next one.
reg.logger.Warnw("Failed to connect to orderer", "address", ep.address, "err", err)
reg.logger.Warnw("Failed to connect to orderer", "address", ep.logAddress, "err", err)
continue
}
var loaded bool
Expand All @@ -285,10 +285,10 @@ func (reg *registry) orderers(channel string) ([]*orderer, error) {
err = client.closeConnection()
if err != nil {
// Failed to close this new connection. Log the problem.
reg.logger.Warnw("Failed to close connection to orderer", "address", ep.address, "err", err)
reg.logger.Warnw("Failed to close connection to orderer", "address", ep.logAddress, "err", err)
}
} else {
reg.logger.Infow("Added orderer to registry", "address", ep.address)
reg.logger.Infow("Added orderer to registry", "address", ep.logAddress)
}
}
orderers = append(orderers, entry.(*orderer))
Expand Down Expand Up @@ -460,7 +460,7 @@ func (reg *registry) closeStaleOrdererConnections(channel string, channelOrderer
if found {
err := client.(*orderer).closeConnection()
if err != nil {
reg.logger.Errorw("Failed to close connection to orderer", "address", ep.address, "mspid", ep.mspid, "err", err)
reg.logger.Errorw("Failed to close connection to orderer", "address", ep.logAddress, "mspid", ep.mspid, "err", err)
}
}
}
Expand Down

0 comments on commit 3fd9891

Please sign in to comment.