Skip to content

Commit

Permalink
[FAB-8571] request context background and timeouts
Browse files Browse the repository at this point in the history
- timeout opts are changed to key-value pair of
timeout type and value. Any existing config timeout
can be overidden for that particular function call by
using corresponding timeout type key in opts.
- a grpc context can be passed as a background for
an operation using 'Background' opts

Change-Id: Ia12854be457b6a52a57c5518cb34845eaf587cfa
Signed-off-by: Sudesh Shetty <sudesh.shetty@securekey.com>
  • Loading branch information
sudeshrshetty committed Mar 14, 2018
1 parent 96fd401 commit 9f6e5f6
Show file tree
Hide file tree
Showing 16 changed files with 255 additions and 93 deletions.
36 changes: 25 additions & 11 deletions pkg/client/channel/api.go
Expand Up @@ -7,9 +7,11 @@ SPDX-License-Identifier: Apache-2.0
package channel

import (
reqContext "context"
"time"

"github.com/hyperledger/fabric-sdk-go/pkg/common/context"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/core/config"
"github.com/hyperledger/fabric-sdk-go/pkg/errors/retry"
Expand All @@ -19,9 +21,10 @@ import (

// opts allows the user to specify more advanced options
type requestOptions struct {
Targets []fab.Peer // targets
Timeout time.Duration
Retry retry.Opts
Targets []fab.Peer // targets
Retry retry.Opts
Timeouts map[core.TimeoutType]time.Duration //timeout options for channel client operations
ParentContext reqContext.Context //parent grpc context for channel client operations (query, execute, invokehandler)
}

// RequestOption func for each Opts argument
Expand All @@ -44,14 +47,6 @@ type Response struct {
Responses []*fab.TransactionProposalResponse
}

//WithTimeout encapsulates time.Duration to Option
func WithTimeout(timeout time.Duration) RequestOption {
return func(ctx context.Client, o *requestOptions) error {
o.Timeout = timeout
return nil
}
}

//WithTargets encapsulates ProposalProcessors to Option
func WithTargets(targets ...fab.Peer) RequestOption {
return func(ctx context.Client, o *requestOptions) error {
Expand Down Expand Up @@ -94,3 +89,22 @@ func WithRetry(retryOpt retry.Opts) RequestOption {
return nil
}
}

//WithTimeout encapsulates key value pairs of timeout type, timeout duration to Options
func WithTimeout(timeoutType core.TimeoutType, timeout time.Duration) RequestOption {
return func(ctx context.Client, o *requestOptions) error {
if o.Timeouts == nil {
o.Timeouts = make(map[core.TimeoutType]time.Duration)
}
o.Timeouts[timeoutType] = timeout
return nil
}
}

//WithParentContext encapsulates grpc context parent to Options
func WithParentContext(parentContext reqContext.Context) RequestOption {
return func(ctx context.Client, o *requestOptions) error {
o.ParentContext = parentContext
return nil
}
}
24 changes: 24 additions & 0 deletions pkg/client/channel/api_test.go
Expand Up @@ -9,6 +9,8 @@ package channel
import (
"testing"

"time"

"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
fcmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -74,3 +76,25 @@ func setupMockTestContext(username string, mspID string) *fcmocks.MockContext {
ctx := fcmocks.NewMockContext(user)
return ctx
}

func TestTimeoutOptions(t *testing.T) {

opts := requestOptions{}

options := []RequestOption{WithTimeout(core.PeerResponse, 20*time.Second),
WithTimeout(core.ResMgmt, 25*time.Second), WithTimeout(core.OrdererResponse, 30*time.Second),
WithTimeout(core.EventHubConnection, 35*time.Second), WithTimeout(core.Execute, 40*time.Second),
WithTimeout(core.Query, 45*time.Second)}

for _, option := range options {
option(nil, &opts)
}

assert.True(t, opts.Timeouts[core.PeerResponse] == 20*time.Second, "timeout value by type didn't match with one supplied")
assert.True(t, opts.Timeouts[core.ResMgmt] == 25*time.Second, "timeout value by type didn't match with one supplied")
assert.True(t, opts.Timeouts[core.OrdererResponse] == 30*time.Second, "timeout value by type didn't match with one supplied")
assert.True(t, opts.Timeouts[core.EventHubConnection] == 35*time.Second, "timeout value by type didn't match with one supplied")
assert.True(t, opts.Timeouts[core.Execute] == 40*time.Second, "timeout value by type didn't match with one supplied")
assert.True(t, opts.Timeouts[core.Query] == 45*time.Second, "timeout value by type didn't match with one supplied")

}
32 changes: 17 additions & 15 deletions pkg/client/channel/chclient.go
Expand Up @@ -27,10 +27,6 @@ import (

var logger = logging.NewLogger("fabsdk/fab")

const (
defaultHandlerTimeout = time.Second * 180
)

// Client enables access to a channel on a Fabric network.
//
// A channel client instance provides a handler to interact with peers on specified channel.
Expand Down Expand Up @@ -151,7 +147,7 @@ func (cc *Client) InvokeHandler(handler invoke.Handler, request Request, options
select {
case <-complete:
return Response(requestContext.Response), requestContext.Error
case <-time.After(requestContext.Opts.Timeout):
case <-time.After(requestContext.Opts.Timeouts[core.Execute]):
return Response{}, status.New(status.ClientStatus, status.Timeout.ToInt32(),
"request timed out", nil)
}
Expand Down Expand Up @@ -181,16 +177,21 @@ func (cc *Client) resolveRetry(ctx *invoke.RequestContext, o requestOptions) boo
//createReqContext creates req context for invoke handler
func (cc *Client) createReqContext(txnOpts *requestOptions) (reqContext.Context, reqContext.CancelFunc) {

//Setting default timeouts when not provided
if txnOpts.Timeout == 0 {
txnOpts.Timeout = cc.context.Config().Timeout(core.Execute)
if txnOpts.Timeout == 0 {
//If still zero, then set default handler timeout
txnOpts.Timeout = defaultHandlerTimeout
}
if txnOpts.Timeouts == nil {
txnOpts.Timeouts = make(map[core.TimeoutType]time.Duration)
}

return contextImpl.NewRequest(cc.context, contextImpl.WithTimeout(txnOpts.Timeout))
//setting default timeouts when not provided
if txnOpts.Timeouts[core.Execute] == 0 {
txnOpts.Timeouts[core.Execute] = cc.context.Config().TimeoutOrDefault(core.Execute)
}

reqCtx, cancel := contextImpl.NewRequest(cc.context, contextImpl.WithTimeout(txnOpts.Timeouts[core.Execute]),
contextImpl.WithParent(txnOpts.ParentContext))
//Add timeout overrides here as a value so that it can be used by immediate child contexts (in handlers/transactors)
reqCtx = reqContext.WithValue(reqCtx, contextImpl.ReqContextTimeoutOverrides, txnOpts.Timeouts)

return reqCtx, cancel
}

//prepareHandlerContexts prepares context objects for handlers
Expand Down Expand Up @@ -243,8 +244,9 @@ func (cc *Client) addDefaultTimeout(ctx context.Client, timeOutType core.Timeout
option(ctx, &txnOpts)
}

if txnOpts.Timeout == 0 {
return append(options, WithTimeout(cc.context.Config().TimeoutOrDefault(timeOutType)))
if txnOpts.Timeouts[timeOutType] == 0 {
//InvokeHandler relies on Execute timeout
return append(options, WithTimeout(core.Execute, cc.context.Config().TimeoutOrDefault(timeOutType)))
}
return options
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/client/channel/invoke/api.go
Expand Up @@ -8,6 +8,7 @@ SPDX-License-Identifier: Apache-2.0
package invoke

import (
reqContext "context"
"time"

"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
Expand All @@ -18,9 +19,10 @@ import (

// Opts allows the user to specify more advanced options
type Opts struct {
Targets []fab.Peer // targets
Timeout time.Duration
Retry retry.Opts
Targets []fab.Peer // targets
Retry retry.Opts
Timeouts map[core.TimeoutType]time.Duration
ParentContext reqContext.Context //parent grpc context
}

// Request contains the parameters to execute transaction
Expand Down
3 changes: 2 additions & 1 deletion pkg/client/channel/invoke/txnhandler.go
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hyperledger/fabric-sdk-go/pkg/errors/status"
"github.com/pkg/errors"

"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/peer"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/txn"
Expand Down Expand Up @@ -163,7 +164,7 @@ func (c *CommitTxHandler) Handle(requestContext *RequestContext, clientContext *
requestContext.Error = status.New(status.EventServerStatus, int32(txStatus.TxValidationCode), "received invalid transaction", nil)
return
}
case <-time.After(requestContext.Opts.Timeout):
case <-time.After(requestContext.Opts.Timeouts[core.Execute]):
requestContext.Error = errors.New("Execute didn't receive block event")
return
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/client/channel/invoke/txnhandler_test.go
Expand Up @@ -16,6 +16,7 @@ import (

txnmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/mocks"
"github.com/hyperledger/fabric-sdk-go/pkg/common/context"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab"
fcmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
Expand Down Expand Up @@ -73,7 +74,7 @@ func TestExecuteTxHandlerSuccess(t *testing.T) {
select {
case txStatusReg := <-mockEventService.TxStatusRegCh:
txStatusReg.Eventch <- &fab.TxStatusEvent{TxID: txStatusReg.TxID, TxValidationCode: pb.TxValidationCode_VALID}
case <-time.After(requestContext.Opts.Timeout):
case <-time.After(requestContext.Opts.Timeouts[core.Execute]):
t.Fatal("Execute handler : time out not expected")
}
}()
Expand Down Expand Up @@ -212,7 +213,8 @@ func prepareRequestContext(request Request, opts Opts, t *testing.T) *RequestCon
Response: Response{},
}

requestContext.Opts.Timeout = testTimeOut
requestContext.Opts.Timeouts = make(map[core.TimeoutType]time.Duration)
requestContext.Opts.Timeouts[core.Execute] = testTimeOut

return requestContext
}
Expand Down
23 changes: 13 additions & 10 deletions pkg/client/ledger/ledger.go
Expand Up @@ -106,7 +106,7 @@ func (c *Client) QueryInfo(options ...RequestOption) (*fab.BlockchainInfoRespons
return nil, errors.WithMessage(err, "failed to determine target peers for QueryBlockByHash")
}

reqCtx, cancel := c.createRequestContext(opts)
reqCtx, cancel := c.createRequestContext(&opts)
defer cancel()

responses, err := c.ledger.QueryInfo(reqCtx, peersToTxnProcessors(targets))
Expand Down Expand Up @@ -152,7 +152,7 @@ func (c *Client) QueryBlockByHash(blockHash []byte, options ...RequestOption) (*
return nil, errors.WithMessage(err, "failed to determine target peers for QueryBlockByHash")
}

reqCtx, cancel := c.createRequestContext(opts)
reqCtx, cancel := c.createRequestContext(&opts)
defer cancel()

responses, err := c.ledger.QueryBlockByHash(reqCtx, blockHash, peersToTxnProcessors(targets))
Expand Down Expand Up @@ -196,7 +196,7 @@ func (c *Client) QueryBlock(blockNumber uint64, options ...RequestOption) (*comm
return nil, errors.WithMessage(err, "failed to determine target peers for QueryBlock")
}

reqCtx, cancel := c.createRequestContext(opts)
reqCtx, cancel := c.createRequestContext(&opts)
defer cancel()

responses, err := c.ledger.QueryBlock(reqCtx, blockNumber, peersToTxnProcessors(targets))
Expand Down Expand Up @@ -241,7 +241,7 @@ func (c *Client) QueryTransaction(transactionID fab.TransactionID, options ...Re
return nil, errors.WithMessage(err, "failed to determine target peers for QueryTransaction")
}

reqCtx, cancel := c.createRequestContext(opts)
reqCtx, cancel := c.createRequestContext(&opts)
defer cancel()

responses, err := c.ledger.QueryTransaction(reqCtx, transactionID, peersToTxnProcessors(targets))
Expand Down Expand Up @@ -289,7 +289,7 @@ func (c *Client) QueryConfig(options ...RequestOption) (fab.ChannelCfg, error) {
return nil, errors.WithMessage(err, "QueryConfig failed")
}

reqCtx, cancel := c.createRequestContext(opts)
reqCtx, cancel := c.createRequestContext(&opts)
defer cancel()

return channelConfig.Query(reqCtx)
Expand Down Expand Up @@ -371,14 +371,17 @@ func (c *Client) calculateTargets(opts requestOptions) ([]fab.Peer, error) {
}

//createRequestContext creates request context for grpc
func (c *Client) createRequestContext(opts requestOptions) (reqContext.Context, reqContext.CancelFunc) {
func (c *Client) createRequestContext(opts *requestOptions) (reqContext.Context, reqContext.CancelFunc) {

timeout := opts.Timeout
if timeout == 0 {
timeout = c.ctx.Config().TimeoutOrDefault(core.PeerResponse)
if opts.Timeouts == nil {
opts.Timeouts = make(map[core.TimeoutType]time.Duration)
}

return contextImpl.NewRequest(c.ctx, contextImpl.WithTimeout(timeout))
if opts.Timeouts[core.PeerResponse] == 0 {
opts.Timeouts[core.PeerResponse] = c.ctx.Config().TimeoutOrDefault(core.PeerResponse)
}

return contextImpl.NewRequest(c.ctx, contextImpl.WithTimeout(opts.Timeouts[core.PeerResponse]), contextImpl.WithParent(opts.ParentContext))
}

// filterTargets is helper method to filter peers
Expand Down
32 changes: 23 additions & 9 deletions pkg/client/ledger/opts.go
Expand Up @@ -7,9 +7,11 @@ SPDX-License-Identifier: Apache-2.0
package ledger

import (
reqContext "context"
"time"

"github.com/hyperledger/fabric-sdk-go/pkg/common/context"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/core/config"
"github.com/pkg/errors"
Expand Down Expand Up @@ -42,11 +44,12 @@ type TargetFilter interface {

//requestOptions contains options for operations performed by LedgerClient
type requestOptions struct {
Targets []fab.Peer // target peers
TargetFilter TargetFilter // target filter
MaxTargets int // maximum number of targets to select
MinTargets int // min number of targets that have to respond with no error (or agree on result)
Timeout time.Duration //timeout options for QueryInfo,QueryBlockByHash,QueryBlock,QueryTransaction,QueryConfig
Targets []fab.Peer // target peers
TargetFilter TargetFilter // target filter
MaxTargets int // maximum number of targets to select
MinTargets int // min number of targets that have to respond with no error (or agree on result)
Timeouts map[core.TimeoutType]time.Duration //timeout options for ledger query operations
ParentContext reqContext.Context //parent grpc context for ledger operations
}

//WithTargets encapsulates fab.Peer targets to ledger RequestOption
Expand Down Expand Up @@ -108,11 +111,22 @@ func WithMinTargets(minTargets int) RequestOption {
}
}

//WithTimeout encapsulates timeout to ledger RequestOption
//WithTimeout encapsulates key value pairs of timeout type, timeout duration to Options
//for QueryInfo,QueryBlockByHash,QueryBlock,QueryTransaction,QueryConfig functions
func WithTimeout(timeout time.Duration) RequestOption {
return func(ctx context.Client, opts *requestOptions) error {
opts.Timeout = timeout
func WithTimeout(timeoutType core.TimeoutType, timeout time.Duration) RequestOption {
return func(ctx context.Client, o *requestOptions) error {
if o.Timeouts == nil {
o.Timeouts = make(map[core.TimeoutType]time.Duration)
}
o.Timeouts[timeoutType] = timeout
return nil
}
}

//WithParentContext encapsulates grpc context parent to Options
func WithParentContext(parentContext reqContext.Context) RequestOption {
return func(ctx context.Client, o *requestOptions) error {
o.ParentContext = parentContext
return nil
}
}
23 changes: 23 additions & 0 deletions pkg/client/ledger/opts_test.go
Expand Up @@ -8,6 +8,7 @@ package ledger

import (
"testing"
"time"

"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
fcmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
Expand Down Expand Up @@ -74,3 +75,25 @@ func setupTestContext(username string, mspID string) *fcmocks.MockContext {
ctx := fcmocks.NewMockContext(user)
return ctx
}

func TestTimeoutOptions(t *testing.T) {

opts := requestOptions{}

options := []RequestOption{WithTimeout(core.PeerResponse, 20*time.Second),
WithTimeout(core.ResMgmt, 25*time.Second), WithTimeout(core.OrdererResponse, 30*time.Second),
WithTimeout(core.EventHubConnection, 35*time.Second), WithTimeout(core.Execute, 40*time.Second),
WithTimeout(core.Query, 45*time.Second)}

for _, option := range options {
option(nil, &opts)
}

assert.True(t, opts.Timeouts[core.PeerResponse] == 20*time.Second, "timeout value by type didn't match with one supplied")
assert.True(t, opts.Timeouts[core.ResMgmt] == 25*time.Second, "timeout value by type didn't match with one supplied")
assert.True(t, opts.Timeouts[core.OrdererResponse] == 30*time.Second, "timeout value by type didn't match with one supplied")
assert.True(t, opts.Timeouts[core.EventHubConnection] == 35*time.Second, "timeout value by type didn't match with one supplied")
assert.True(t, opts.Timeouts[core.Execute] == 40*time.Second, "timeout value by type didn't match with one supplied")
assert.True(t, opts.Timeouts[core.Query] == 45*time.Second, "timeout value by type didn't match with one supplied")

}

0 comments on commit 9f6e5f6

Please sign in to comment.