Skip to content

Commit

Permalink
[FAB-7508] Add retries to channel client
Browse files Browse the repository at this point in the history
Change-Id: I3bceb4913bfc4e2d539031ba46055a8d69edc304
Signed-off-by: Divyank Katira <Divyank.Katira@securekey.com>
  • Loading branch information
d1vyank committed Jan 30, 2018
1 parent e985ca4 commit 47e68c4
Show file tree
Hide file tree
Showing 9 changed files with 322 additions and 15 deletions.
10 changes: 10 additions & 0 deletions api/apitxn/opts.go
Expand Up @@ -8,6 +8,8 @@ package apitxn

import (
"time"

"github.com/hyperledger/fabric-sdk-go/pkg/retry"
)

//WithTimeout encapsulates time.Duration to Option
Expand All @@ -25,3 +27,11 @@ func WithProposalProcessor(proposalProcessors ...ProposalProcessor) Option {
return nil
}
}

// WithRetry option to configure retries
func WithRetry(opt retry.Opts) Option {
return func(opts *Opts) error {
opts.Retry = opt
return nil
}
}
2 changes: 2 additions & 0 deletions api/apitxn/txn.go
Expand Up @@ -9,6 +9,7 @@ package apitxn
import (
"time"

"github.com/hyperledger/fabric-sdk-go/pkg/retry"
pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
)

Expand All @@ -33,6 +34,7 @@ type Response struct {
type Opts struct {
ProposalProcessors []ProposalProcessor // targets
Timeout time.Duration
Retry retry.Opts
}

//Option func for each Opts argument
Expand Down
8 changes: 5 additions & 3 deletions api/apitxn/txnhandler/txnhandler.go
Expand Up @@ -9,6 +9,7 @@ package txnhandler
import (
"github.com/hyperledger/fabric-sdk-go/api/apifabclient"
"github.com/hyperledger/fabric-sdk-go/api/apitxn"
"github.com/hyperledger/fabric-sdk-go/pkg/retry"
)

//Handler for chaining transaction executions
Expand All @@ -26,7 +27,8 @@ type ClientContext struct {

//RequestContext contains request, opts, response parameters for handler execution
type RequestContext struct {
Request apitxn.Request
Opts apitxn.Opts
Response apitxn.Response
Request apitxn.Request
Opts apitxn.Opts
Response apitxn.Response
RetryHandler retry.Handler
}
20 changes: 11 additions & 9 deletions pkg/fabric-client/mocks/mockpeer.go
Expand Up @@ -16,15 +16,16 @@ import (

// MockPeer is a mock fabricsdk.Peer.
type MockPeer struct {
Error error
MockName string
MockURL string
MockRoles []string
MockCert *pem.Block
Payload []byte
ResponseMessage string
MockMSP string
Status int32
Error error
MockName string
MockURL string
MockRoles []string
MockCert *pem.Block
Payload []byte
ResponseMessage string
MockMSP string
Status int32
ProcessProposalCalls int
}

// NewMockPeer creates basic mock peer
Expand Down Expand Up @@ -80,6 +81,7 @@ func (p *MockPeer) URL() string {

// ProcessTransactionProposal does not send anything anywhere but returns an empty mock ProposalResponse
func (p *MockPeer) ProcessTransactionProposal(tp apitxn.TransactionProposal) (apitxn.TransactionProposalResult, error) {
p.ProcessProposalCalls++
return apitxn.TransactionProposalResult{
Endorser: p.MockURL,
Proposal: tp,
Expand Down
12 changes: 9 additions & 3 deletions pkg/fabric-txn/chclient/chclient.go
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/hyperledger/fabric-sdk-go/api/apitxn/txnhandler"
"github.com/hyperledger/fabric-sdk-go/pkg/errors"
txnHandlerImpl "github.com/hyperledger/fabric-sdk-go/pkg/fabric-txn/txnhandler"
"github.com/hyperledger/fabric-sdk-go/pkg/retry"
"github.com/hyperledger/fabric-sdk-go/pkg/status"
)

Expand Down Expand Up @@ -85,8 +86,12 @@ func (cc *ChannelClient) InvokeHandler(handler txnhandler.Handler, request apitx
complete := make(chan bool)

go func() {
handleInvoke:
//Perform action through handler
handler.Handle(requestContext, clientContext)
if requestContext.RetryHandler.Required(requestContext.Response.Error) {
goto handleInvoke
}
complete <- true
}()
select {
Expand All @@ -113,9 +118,10 @@ func (cc *ChannelClient) prepareHandlerContexts(request apitxn.Request, options
}

requestContext := &txnhandler.RequestContext{
Request: request,
Opts: options,
Response: apitxn.Response{},
Request: request,
Opts: options,
Response: apitxn.Response{},
RetryHandler: retry.New(options.Retry),
}

if requestContext.Opts.Timeout == 0 {
Expand Down
18 changes: 18 additions & 0 deletions pkg/fabric-txn/chclient/chclient_test.go
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/channel"
fcmocks "github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/mocks"
"github.com/hyperledger/fabric-sdk-go/pkg/fabric-client/peer"
"github.com/hyperledger/fabric-sdk-go/pkg/retry"

txnmocks "github.com/hyperledger/fabric-sdk-go/pkg/fabric-txn/mocks"
"github.com/hyperledger/fabric-sdk-go/pkg/status"
Expand Down Expand Up @@ -281,6 +282,23 @@ func TestTransactionValidationError(t *testing.T) {
assert.EqualValues(t, validationCode, status.ToTransactionValidationCode(statusError.Code))
}

func TestExecuteTxWithRetries(t *testing.T) {
testStatus := status.New(status.EndorserClientStatus, status.ConnectionFailed.ToInt32(), "test", nil)

testPeer1 := fcmocks.NewMockPeer("Peer1", "http://peer1.com")
testPeer1.Error = testStatus
chClient := setupChannelClient([]apifabclient.Peer{testPeer1}, t)
retryOpts := retry.DefaultOpts
retryOpts.RetryableCodes = retry.ChannelClientRetryableCodes

response := chClient.Query(apitxn.Request{ChaincodeID: "testCC", Fcn: "invoke", Args: [][]byte{[]byte("query"), []byte("b")}},
apitxn.WithRetry(retryOpts))
if response.Error == nil {
t.Fatalf("Should have failed for not success status")
}
assert.Equal(t, retry.DefaultOpts.Attempts, testPeer1.ProcessProposalCalls-1, "Expected peer to be called (retry attempts + 1) times")
}

func setupTestChannel() (*channel.Channel, error) {
ctx := setupTestContext()
return channel.New(ctx, "testChannel")
Expand Down
93 changes: 93 additions & 0 deletions pkg/retry/defaults.go
@@ -0,0 +1,93 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package retry

import (
"time"

"github.com/hyperledger/fabric-sdk-go/pkg/status"
"github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
grpcCodes "google.golang.org/grpc/codes"
)

const (
// DefaultAttempts number of retry attempts made by default
DefaultAttempts = 3
// DefaultInitialBackoff default initial backoff
DefaultInitialBackoff = 500 * time.Millisecond
// DefaultMaxBackoff default maximum backoff
DefaultMaxBackoff = 60 * time.Second
// DefaultBackoffFactor default backoff factor
DefaultBackoffFactor = 2.0
)

// DefaultOpts default retry options
var DefaultOpts = Opts{
Attempts: DefaultAttempts,
InitialBackoff: DefaultInitialBackoff,
MaxBackoff: DefaultMaxBackoff,
BackoffFactor: DefaultBackoffFactor,
RetryableCodes: DefaultRetryableCodes,
}

// DefaultRetryableCodes these are the error codes, grouped by source of error,
// that are considered to be transient error conditions by default
var DefaultRetryableCodes = map[status.Group][]status.Code{
status.EndorserClientStatus: []status.Code{
status.EndorsementMismatch,
},
status.EndorserServerStatus: []status.Code{
status.Code(common.Status_SERVICE_UNAVAILABLE),
status.Code(common.Status_INTERNAL_SERVER_ERROR),
},
status.OrdererServerStatus: []status.Code{
status.Code(common.Status_SERVICE_UNAVAILABLE),
status.Code(common.Status_INTERNAL_SERVER_ERROR),
},
status.EventServerStatus: []status.Code{
status.Code(pb.TxValidationCode_DUPLICATE_TXID),
status.Code(pb.TxValidationCode_ENDORSEMENT_POLICY_FAILURE),
status.Code(pb.TxValidationCode_MVCC_READ_CONFLICT),
status.Code(pb.TxValidationCode_PHANTOM_READ_CONFLICT),
},
// TODO: gRPC introduced retries in v1.8.0. This can be replaced with the
// gRPC fail fast option, once available
status.GRPCTransportStatus: []status.Code{
status.Code(grpcCodes.Unavailable),
},
}

// ChannelClientRetryableCodes are the suggested codes that should be treated as
// transient by fabric-sdk-go/api/apitxn.ChannelClient
var ChannelClientRetryableCodes = map[status.Group][]status.Code{
status.EndorserClientStatus: []status.Code{
status.ConnectionFailed, status.EndorsementMismatch,
},
status.EndorserServerStatus: []status.Code{
status.Code(common.Status_SERVICE_UNAVAILABLE),
status.Code(common.Status_INTERNAL_SERVER_ERROR),
},
status.OrdererClientStatus: []status.Code{
status.ConnectionFailed,
},
status.OrdererServerStatus: []status.Code{
status.Code(common.Status_SERVICE_UNAVAILABLE),
status.Code(common.Status_INTERNAL_SERVER_ERROR),
},
status.EventServerStatus: []status.Code{
status.Code(pb.TxValidationCode_DUPLICATE_TXID),
status.Code(pb.TxValidationCode_ENDORSEMENT_POLICY_FAILURE),
status.Code(pb.TxValidationCode_MVCC_READ_CONFLICT),
status.Code(pb.TxValidationCode_PHANTOM_READ_CONFLICT),
},
// TODO: gRPC introduced retries in v1.8.0. This can be replaced with the
// gRPC fail fast option, once available
status.GRPCTransportStatus: []status.Code{
status.Code(grpcCodes.Unavailable),
},
}
109 changes: 109 additions & 0 deletions pkg/retry/retry.go
@@ -0,0 +1,109 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

// Package retry provides retransmission capabilities to fabric-sdk-go
package retry

import (
"time"

"github.com/hyperledger/fabric-sdk-go/pkg/status"
)

// Opts defines the retry parameters
type Opts struct {
// Attempts the number retry attempts
Attempts int
// InitialBackoff the backoff interval for the first retry attempt
InitialBackoff time.Duration
// MaxBackoff the maximum backoff interval for any retry attempt
MaxBackoff time.Duration
// BackoffFactor the factor by which the InitialBackoff is exponentially
// incremented for consecutive retry attempts.
// For example, a backoff factor of 2.5 will result in a backoff of
// InitialBackoff * 2.5 * 2.5 on the second attempt.
BackoffFactor float64
// RetryableCodes defines the status codes, mapped by group, returned by fabric-sdk-go
// that warrant a retry. This will default to retry.DefaultRetryableCodes.
RetryableCodes map[status.Group][]status.Code
}

// Handler retry handler interface decides whether a retry is required for the given
// error
type Handler interface {
Required(err error) bool
}

// impl retry Handler implementation
type impl struct {
opts Opts
retries int
}

// New retry Handler with the given opts
func New(opts Opts) Handler {
if len(opts.RetryableCodes) == 0 {
opts.RetryableCodes = DefaultRetryableCodes
}
return &impl{opts: opts}
}

// WithDefaults new retry Handler with default opts
func WithDefaults() Handler {
return &impl{opts: DefaultOpts}
}

// WithAttempts new retry Handler with given attempts. Other opts are set to default.
func WithAttempts(attempts int) Handler {
opts := DefaultOpts
opts.Attempts = attempts
return &impl{opts: opts}
}

// Required determines if retry is required for the given error
// Note: backoffs are implemented behind this interface
func (i *impl) Required(err error) bool {
if i.retries == i.opts.Attempts {
return false
}

s, ok := status.FromError(err)
if ok && i.isRetryable(s.Group, s.Code) {
time.Sleep(i.backoffPeriod())
i.retries++
return true
}

return false
}

// backoffPeriod calculates the backoff duration based on the provided opts
func (i *impl) backoffPeriod() time.Duration {
backoff, max := float64(i.opts.InitialBackoff), float64(i.opts.MaxBackoff)
for j := 0; j < i.retries && backoff < max; j++ {
backoff *= i.opts.BackoffFactor
}
if backoff > max {
backoff = max
}

return time.Duration(backoff)
}

// isRetryable determines if the given status is configured to be retryable
func (i *impl) isRetryable(g status.Group, c int32) bool {
for group, codes := range i.opts.RetryableCodes {
if g != group {
continue
}
for _, code := range codes {
if status.Code(c) == code {
return true
}
}
}
return false
}

0 comments on commit 47e68c4

Please sign in to comment.