Skip to content

Commit

Permalink
client/http, retry: support backoffer in HTTP client (tikv#7680)
Browse files Browse the repository at this point in the history
ref tikv#7300

Support backoffer in HTTP client.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Jan 25, 2024
1 parent 6dfc9dd commit b94e74e
Show file tree
Hide file tree
Showing 9 changed files with 268 additions and 65 deletions.
75 changes: 48 additions & 27 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/retry"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -140,42 +141,53 @@ func (ci *clientInner) execDuration(name string, duration time.Duration) {

// requestWithRetry will first try to send the request to the PD leader, if it fails, it will try to send
// the request to the other PD followers to gain a better availability.
// TODO: support custom retry logic, e.g. retry with customizable backoffer.
func (ci *clientInner) requestWithRetry(
ctx context.Context,
reqInfo *requestInfo,
headerOpts ...HeaderOption,
) error {
var (
statusCode int
err error
addr string
pdAddrs, leaderAddrIdx = ci.getPDAddrs()
statusCode int
err error
)
// Try to send the request to the PD leader first.
if leaderAddrIdx != -1 {
addr = pdAddrs[leaderAddrIdx]
statusCode, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
return err
execFunc := func() error {
var (
addr string
pdAddrs, leaderAddrIdx = ci.getPDAddrs()
)
// Try to send the request to the PD leader first.
if leaderAddrIdx != -1 {
addr = pdAddrs[leaderAddrIdx]
statusCode, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
return err
}
log.Debug("[pd] request leader addr failed",
zap.String("source", ci.source), zap.Int("leader-idx", leaderAddrIdx), zap.String("addr", addr), zap.Error(err))
}
log.Debug("[pd] request leader addr failed",
zap.String("source", ci.source), zap.Int("leader-idx", leaderAddrIdx), zap.String("addr", addr), zap.Error(err))
}
// Try to send the request to the other PD followers.
for idx := 0; idx < len(pdAddrs); idx++ {
if idx == leaderAddrIdx {
continue
// Try to send the request to the other PD followers.
for idx := 0; idx < len(pdAddrs); idx++ {
if idx == leaderAddrIdx {
continue
}
addr = ci.pdAddrs[idx]
statusCode, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
break
}
log.Debug("[pd] request follower addr failed",
zap.String("source", ci.source), zap.Int("idx", idx), zap.String("addr", addr), zap.Error(err))
}
addr = ci.pdAddrs[idx]
_, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
break
}
log.Debug("[pd] request follower addr failed",
zap.String("source", ci.source), zap.Int("idx", idx), zap.String("addr", addr), zap.Error(err))
return err
}
if reqInfo.bo == nil {
return execFunc()
}
return err
// Backoffer also needs to check the status code to determine whether to retry.
reqInfo.bo.SetRetryableChecker(func(err error) bool {
return err != nil && !noNeedRetry(statusCode)
})
return reqInfo.bo.Exec(ctx, execFunc)
}

func noNeedRetry(statusCode int) bool {
Expand Down Expand Up @@ -336,6 +348,7 @@ type client struct {

callerID string
respHandler respHandleFunc
bo *retry.Backoffer
}

// ClientOption configures the HTTP client.
Expand Down Expand Up @@ -420,6 +433,13 @@ func (c *client) WithRespHandler(
return &newClient
}

// WithBackoffer sets and returns a new client with the given backoffer.
func (c *client) WithBackoffer(bo *retry.Backoffer) Client {
newClient := *c
newClient.bo = bo
return &newClient
}

// Header key definition constants.
const (
pdAllowFollowerHandleKey = "PD-Allow-Follower-Handle"
Expand All @@ -439,7 +459,8 @@ func WithAllowFollowerHandle() HeaderOption {
func (c *client) request(ctx context.Context, reqInfo *requestInfo, headerOpts ...HeaderOption) error {
return c.inner.requestWithRetry(ctx, reqInfo.
WithCallerID(c.callerID).
WithRespHandler(c.respHandler),
WithRespHandler(c.respHandler).
WithBackoffer(c.bo),
headerOpts...)
}

Expand Down
28 changes: 28 additions & 0 deletions client/http/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"net/http"
"strings"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/retry"
"go.uber.org/atomic"
)

Expand Down Expand Up @@ -169,3 +171,29 @@ func TestRedirectWithMetrics(t *testing.T) {
re.Equal(float64(4), out.Counter.GetValue())
c.Close()
}

func TestWithBackoffer(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := NewClient("test-with-backoffer", []string{"http://127.0.0.1"})

base := 100 * time.Millisecond
max := 500 * time.Millisecond
total := time.Second
bo := retry.InitialBackoffer(base, max, total)
// Test the time cost of the backoff.
start := time.Now()
_, err := c.WithBackoffer(bo).GetPDVersion(ctx)
re.InDelta(total, time.Since(start), float64(250*time.Millisecond))
re.Error(err)
// Test if the infinite retry works.
bo = retry.InitialBackoffer(base, max, 0)
timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
start = time.Now()
_, err = c.WithBackoffer(bo).GetPDVersion(timeoutCtx)
re.InDelta(3*time.Second, time.Since(start), float64(250*time.Millisecond))
re.ErrorIs(err, context.DeadlineExceeded)
c.Close()
}
3 changes: 3 additions & 0 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/client/retry"
)

// Client is a PD (Placement Driver) HTTP client.
Expand Down Expand Up @@ -92,6 +93,8 @@ type Client interface {
// Additionally, it is important for the caller to handle the content of the response body properly
// in order to ensure that it can be read and marshaled correctly into `res`.
WithRespHandler(func(resp *http.Response, res interface{}) error) Client
// WithBackoffer sets and returns a new client with the given backoffer.
WithBackoffer(*retry.Backoffer) Client
// Close gracefully closes the HTTP client.
Close()
}
Expand Down
13 changes: 12 additions & 1 deletion client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

package http

import "fmt"
import (
"fmt"

"github.com/tikv/pd/client/retry"
)

// The following constants are the names of the requests.
const (
Expand Down Expand Up @@ -75,6 +79,7 @@ type requestInfo struct {
body []byte
res interface{}
respHandler respHandleFunc
bo *retry.Backoffer
}

// newRequestInfo creates a new request info.
Expand Down Expand Up @@ -124,6 +129,12 @@ func (ri *requestInfo) WithRespHandler(respHandler respHandleFunc) *requestInfo
return ri
}

// WithBackoffer sets the backoffer of the request.
func (ri *requestInfo) WithBackoffer(bo *retry.Backoffer) *requestInfo {
ri.bo = bo
return ri
}

func (ri *requestInfo) getURL(addr string) string {
return fmt.Sprintf("%s%s", addr, ri.uri)
}
2 changes: 1 addition & 1 deletion client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (c *pdServiceDiscovery) updateMemberLoop() {
ticker := time.NewTicker(memberUpdateInterval)
defer ticker.Stop()

bo := retry.InitialBackOffer(updateMemberBackOffBaseTime, updateMemberTimeout)
bo := retry.InitialBackoffer(updateMemberBackOffBaseTime, updateMemberTimeout, updateMemberBackOffBaseTime)
for {
select {
case <-ctx.Done():
Expand Down
106 changes: 85 additions & 21 deletions client/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,65 +18,129 @@ import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
)

// BackOffer is a backoff policy for retrying operations.
type BackOffer struct {
max time.Duration
next time.Duration
// Backoffer is a backoff policy for retrying operations.
type Backoffer struct {
// base defines the initial time interval to wait before each retry.
base time.Duration
// max defines the max time interval to wait before each retry.
max time.Duration
// total defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success.
total time.Duration
// retryableChecker is used to check if the error is retryable.
// By default, all errors are retryable.
retryableChecker func(err error) bool

next time.Duration
currentTotal time.Duration
}

// Exec is a helper function to exec backoff.
func (bo *BackOffer) Exec(
func (bo *Backoffer) Exec(
ctx context.Context,
fn func() error,
) error {
if err := fn(); err != nil {
after := time.NewTimer(bo.nextInterval())
defer after.Stop()
defer bo.resetBackoff()
var (
err error
after *time.Timer
)
for {
err = fn()
if !bo.isRetryable(err) {
break
}
currentInterval := bo.nextInterval()
if after == nil {
after = time.NewTimer(currentInterval)
} else {
after.Reset(currentInterval)
}
select {
case <-ctx.Done():
after.Stop()
return errors.Trace(ctx.Err())
case <-after.C:
failpoint.Inject("backOffExecute", func() {
testBackOffExecuteFlag = true
})
}
return err
after.Stop()
// If the current total time exceeds the maximum total time, return the last error.
if bo.total > 0 {
bo.currentTotal += currentInterval
if bo.currentTotal >= bo.total {
break
}
}
}
return err
}

// InitialBackoffer make the initial state for retrying.
// - `base` defines the initial time interval to wait before each retry.
// - `max` defines the max time interval to wait before each retry.
// - `total` defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success.
func InitialBackoffer(base, max, total time.Duration) *Backoffer {
// Make sure the base is less than or equal to the max.
if base > max {
base = max
}
// Make sure the total is not less than the base.
if total > 0 && total < base {
total = base
}
return &Backoffer{
base: base,
max: max,
total: total,
retryableChecker: func(err error) bool {
return err != nil
},
next: base,
currentTotal: 0,
}
// reset backoff when fn() succeed.
bo.resetBackoff()
return nil
}

// InitialBackOffer make the initial state for retrying.
func InitialBackOffer(base, max time.Duration) BackOffer {
return BackOffer{
max: max,
base: base,
next: base,
// SetRetryableChecker sets the retryable checker.
func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool) {
bo.retryableChecker = checker
}

func (bo *Backoffer) isRetryable(err error) bool {
if bo.retryableChecker == nil {
return true
}
return bo.retryableChecker(err)
}

// nextInterval for now use the `exponentialInterval`.
func (bo *BackOffer) nextInterval() time.Duration {
func (bo *Backoffer) nextInterval() time.Duration {
return bo.exponentialInterval()
}

// exponentialInterval returns the exponential backoff duration.
func (bo *BackOffer) exponentialInterval() time.Duration {
func (bo *Backoffer) exponentialInterval() time.Duration {
backoffInterval := bo.next
// Make sure the total backoff time is less than the total.
if bo.total > 0 && bo.currentTotal+backoffInterval > bo.total {
backoffInterval = bo.total - bo.currentTotal
}
bo.next *= 2
// Make sure the next backoff time is less than the max.
if bo.next > bo.max {
bo.next = bo.max
}
return backoffInterval
}

// resetBackoff resets the backoff to initial state.
func (bo *BackOffer) resetBackoff() {
func (bo *Backoffer) resetBackoff() {
bo.next = bo.base
bo.currentTotal = 0
}

// Only used for test.
Expand Down
Loading

0 comments on commit b94e74e

Please sign in to comment.