Skip to content

Commit

Permalink
[FABG-860] fix dial ctx error using same context
Browse files Browse the repository at this point in the history
	When a server is down and we do DialContext
	using the same context to dial multiple servers,
	the call to the failing server will cause
	a 'context deadline exceeded' for all subsquent
	calls using the same context.

	the solution to this problem is to create a new
	child context (with the original one as parent)
	having its own dial timeout for each connection
	and the original (parent) context should never timeout.

Change-Id: I31ad02077981dc4c8537293c107f1526cb76947d
Signed-off-by: Baha Shaaban <baha.shaaban@securekey.com>
  • Loading branch information
Baha Shaaban committed May 14, 2019
1 parent 48bb0d1 commit 4def0f9
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 77 deletions.
10 changes: 1 addition & 9 deletions pkg/fab/channel/transactor.go
Expand Up @@ -176,13 +176,5 @@ func (t *Transactor) CreateTransaction(request fab.TransactionRequest) (*fab.Tra

// SendTransaction send a transaction to the chain’s orderer service (one or more orderer endpoints) for consensus and committing to the ledger.
func (t *Transactor) SendTransaction(tx *fab.Transaction) (*fab.TransactionResponse, error) {
ctx, ok := contextImpl.RequestClientContext(t.reqCtx)
if !ok {
return nil, errors.New("failed get client context from reqContext for SendTransaction")
}

reqCtx, cancel := contextImpl.NewRequest(ctx, contextImpl.WithTimeoutType(fab.OrdererResponse), contextImpl.WithParent(t.reqCtx))
defer cancel()

return txn.Send(reqCtx, tx, t.orderers)
return txn.Send(t.reqCtx, tx, t.orderers)
}
2 changes: 1 addition & 1 deletion pkg/fab/comm/connector.go
Expand Up @@ -194,7 +194,7 @@ func (cc *CachingConnector) createConn(ctx context.Context, target string, opts
logger.Debugf("creating connection [%s]", target)
conn, err := grpc.DialContext(ctx, target, opts...)
if err != nil {
return nil, errors.WithMessage(err, "dialing peer failed")
return nil, errors.WithMessage(err, "dialing node failed")
}

logger.Debugf("storing connection [%s]", target)
Expand Down
97 changes: 96 additions & 1 deletion pkg/fab/mocks/mockorderer.go
Expand Up @@ -9,13 +9,19 @@ package mocks
import (
reqContext "context"
"fmt"
"net"
"sync"

"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/util/test"
"github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

// MockOrderer is a mock fabricclient.Orderer
// Nothe that calling broadcast doesn't deliver anythng. This implies
// Note that calling broadcast doesn't deliver anythng. This implies
// that the broadcast side and the deliver side are totally
// independent from the mocking point of view.
type MockOrderer struct {
Expand Down Expand Up @@ -127,3 +133,92 @@ func (o *MockOrderer) EnqueueForSendDeliver(value interface{}) {
panic(fmt.Sprintf("Value not *common.Block nor error: %+v", value))
}
}

// MockGrpcOrderer is a mock fabricclient.Orderer to test
// connectivity to the orderer. It only wraps a GRPC server.
// Note that calling broadcast doesn't deliver anythng.
// This implies that the broadcast side and the deliver side are totally
// independent from the mocking point of view.
type MockGrpcOrderer struct {
Creds credentials.TransportCredentials
srv *grpc.Server
wg sync.WaitGroup
OrdererURL string
}

// NewMockGrpcOrderer will create a new instance for the given url and TLS credentials (optional)
func NewMockGrpcOrderer(url string, tls credentials.TransportCredentials) *MockGrpcOrderer {
o := &MockGrpcOrderer{
OrdererURL: url,
Creds: tls,
}

return o
}

// Start with start the underlying GRPC server for this MockGrpcOrderer
// it updates the OrdererUrl with the address returned by the GRPC server
func (o *MockGrpcOrderer) Start() string {
// pass in TLS creds if present
if o.Creds != nil {
o.srv = grpc.NewServer(grpc.Creds(o.Creds))
} else {
o.srv = grpc.NewServer()
}
lis, err := net.Listen("tcp", o.OrdererURL)
if err != nil {
panic(fmt.Sprintf("Error starting GRPC Orderer %s", err))
}
addr := lis.Addr().String()

test.Logf("Starting MockGrpcOrderer [%s]", addr)
o.wg.Add(1)
go func() {
defer o.wg.Done()
if err := o.srv.Serve(lis); err != nil {
test.Logf("Start MockGrpcOrderer failed [%s]", err)
}
}()

o.OrdererURL = addr
return addr
}

// Stop the mock broadcast server and wait for completion.
func (o *MockGrpcOrderer) Stop() {
if o.srv == nil {
panic("MockGrpcOrderer not started")
}
test.Logf("Stopping MockGrpcOrderer [%s]", o.OrdererURL)
o.srv.Stop()
o.wg.Wait()
o.srv = nil
test.Logf("Stopped MockGrpcOrderer [%s]", o.OrdererURL)
}

// URL returns the URL of the mock GRPC Orderer
func (o *MockGrpcOrderer) URL() string {
return o.OrdererURL
}

// SendBroadcast accepts client broadcast calls and attempts connection to the grpc server
// it does not attempt to broadcast the envelope, it only tries to connect to the server
func (o *MockGrpcOrderer) SendBroadcast(ctx reqContext.Context, envelope *fab.SignedEnvelope) (*common.Status, error) {
test.Logf("creating connection [%s]", o.OrdererURL)
var err error
if o.Creds != nil {
_, err = grpc.DialContext(ctx, o.OrdererURL, grpc.WithTransportCredentials(o.Creds))
} else {
_, err = grpc.DialContext(ctx, o.OrdererURL, grpc.WithInsecure())
}
if err != nil {
return nil, errors.WithMessage(err, "dialing orderer failed")
}

return nil, nil
}

// SendDeliver is not used and can be implemented for special GRPC connectivity in the future
func (o *MockGrpcOrderer) SendDeliver(ctx reqContext.Context, envelope *fab.SignedEnvelope) (chan *common.Block, chan error) {
return nil, nil
}
22 changes: 17 additions & 5 deletions pkg/fab/txn/txn.go
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/protoutil"
"github.com/hyperledger/fabric-sdk-go/pkg/common/logging"
ctxprovider "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/context"
"github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common"
Expand Down Expand Up @@ -172,10 +173,16 @@ func broadcastEnvelope(reqCtx reqContext.Context, envelope *fab.SignedEnvelope,
randOrderers := []fab.Orderer{}
randOrderers = append(randOrderers, orderers...)

// get a context client instance to create child contexts with timeout read from the config in sendBroadcast()
ctxClient, ok := context.RequestClientContext(reqCtx)
if !ok {
return nil, errors.New("failed get client context from reqContext for SendTransaction")
}

// Iterate them in a random order and try broadcasting 1 by 1
var errResp error
for _, i := range rand.Perm(len(randOrderers)) {
resp, err := sendBroadcast(reqCtx, envelope, randOrderers[i])
resp, err := sendBroadcast(reqCtx, envelope, randOrderers[i], ctxClient)
if err != nil {
errResp = err
} else {
Expand All @@ -185,11 +192,16 @@ func broadcastEnvelope(reqCtx reqContext.Context, envelope *fab.SignedEnvelope,
return nil, errResp
}

func sendBroadcast(reqCtx reqContext.Context, envelope *fab.SignedEnvelope, orderer fab.Orderer) (*fab.TransactionResponse, error) {
logger.Debugf("Broadcasting envelope to orderer :%s\n", orderer.URL())
func sendBroadcast(reqCtx reqContext.Context, envelope *fab.SignedEnvelope, orderer fab.Orderer, client ctxprovider.Client) (*fab.TransactionResponse, error) {
logger.Debugf("Broadcasting envelope to orderer: %s\n", orderer.URL())
// create a childContext for this SendBroadcast orderer using the config's timeout value
// the parent context (reqCtx) should not have a timeout value
childCtx, cancel := context.NewRequest(client, context.WithTimeoutType(fab.OrdererResponse), context.WithParent(reqCtx))
defer cancel()

// Send request
if _, err := orderer.SendBroadcast(reqCtx, envelope); err != nil {
logger.Debugf("Receive Error Response from orderer :%s\n", err)
if _, err := orderer.SendBroadcast(childCtx, envelope); err != nil {
logger.Debugf("Receive Error Response from orderer: %s\n", err)
return nil, errors.Wrapf(err, "calling orderer '%s' failed", orderer.URL())
}

Expand Down

0 comments on commit 4def0f9

Please sign in to comment.