Skip to content

Commit

Permalink
Merge pull request #4097 from bluekeyes/bkeyes/retry-delay
Browse files Browse the repository at this point in the history
tq: add exponential backoff for retries
  • Loading branch information
bk2204 committed Apr 16, 2020
2 parents 9ba8be2 + cf02216 commit 8282419
Show file tree
Hide file tree
Showing 38 changed files with 12,059 additions and 355 deletions.
12 changes: 12 additions & 0 deletions docs/man/git-lfs-config.5.ronn
Expand Up @@ -160,6 +160,18 @@ be scoped inside the configuration for a remote.
not an integer, is less than one, or is not given, a value of eight will be
used instead.

* `lfs.transfer.maxretrydelay`

Specifies the maximum time in seconds LFS will wait between each retry
attempt. LFS uses exponential backoff for retries, doubling the time between
each retry until reaching this limit. If a server requests a delay using the
`Retry-After` header, the header value overrides the exponential delay for
that attempt and is not limited by this option.

Must be an integer which is not negative. Use zero to disable delays between
retries unless requested by a server. If the value is not an integer, is
negative, or is not given, a value of ten will be used instead.

* `lfs.transfer.maxverifies`

Specifies how many verification requests LFS will attempt per OID before
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Expand Up @@ -17,14 +17,16 @@ require (
github.com/spf13/cobra v0.0.3
github.com/spf13/pflag v1.0.3 // indirect
github.com/ssgelm/cookiejarparser v1.0.1
github.com/stretchr/testify v1.2.2
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.5.1
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v0.0.0-20170210233622-6b67b3fab74d
golang.org/x/net v0.0.0-20191027093000-83d349e8ac1a
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4
golang.org/x/sys v0.0.0-20190412213103-97732733099d
gopkg.in/jcmturner/goidentity.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
)

go 1.11
13 changes: 13 additions & 0 deletions go.sum
Expand Up @@ -2,6 +2,7 @@ github.com/alexbrainman/sspi v0.0.0-20180125232955-4729b3d4d858 h1:OZQyEhf4Bviyd
github.com/alexbrainman/sspi v0.0.0-20180125232955-4729b3d4d858/go.mod h1:976q2ETgjT2snVCf2ZaBnyBbVoPERGjUz+0sofzEfro=
github.com/avast/retry-go v2.4.2+incompatible h1:+ZjCypQT/CyP0kyJO2EcU4d/ZEJWSbP8NENI578cPmA=
github.com/avast/retry-go v2.4.2+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dpotapov/go-spnego v0.0.0-20190506202455-c2c609116ad0 h1:Hhh7nu7CfFVlnBJqmDDUh+j1H5fqjLMzM4czZzNNJGM=
Expand Down Expand Up @@ -38,8 +39,15 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/ssgelm/cookiejarparser v1.0.1 h1:cRdXauUbOTFzTPJFaeiWbHnQ+tRGlpKKzvIK9PUekE4=
github.com/ssgelm/cookiejarparser v1.0.1/go.mod h1:DUfC0mpjIzlDN7DzKjXpHj0qMI5m9VrZuz3wSlI+OEI=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0=
Expand All @@ -62,6 +70,7 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw=
gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM=
Expand All @@ -72,3 +81,7 @@ gopkg.in/jcmturner/gokrb5.v5 v5.3.0 h1:RS1MYApX27Hx1Xw7NECs7XxGxxrm69/4OmaRuX9kw
gopkg.in/jcmturner/gokrb5.v5 v5.3.0/go.mod h1:oQz8Wc5GsctOTgCVyKad1Vw4TCWz5G6gfIQr88RPv4k=
gopkg.in/jcmturner/rpc.v0 v0.0.2 h1:wBTgrbL1qmLBUPsYVCqdJiI5aJgQhexmK+JkTHPUNJI=
gopkg.in/jcmturner/rpc.v0 v0.0.2/go.mod h1:NzMq6cRzR9lipgw7WxRBHNx5N8SifBuaCQsOT1kWY/E=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
2 changes: 1 addition & 1 deletion t/t-push.sh
Expand Up @@ -575,7 +575,7 @@ begin_test "push (retry with expired actions)"

GIT_TRACE=1 git push origin master 2>&1 | tee push.log

expected="enqueue retry #1 for \"$contents_oid\" (size: $contents_size): LFS: tq: action \"upload\" expires at"
expected="enqueue retry #1 after 0.25s for \"$contents_oid\" (size: $contents_size): LFS: tq: action \"upload\" expires at"

grep "$expected" push.log
grep "Uploading LFS objects: 100% (1/1), 21 B" push.log
Expand Down
15 changes: 14 additions & 1 deletion tq/manifest.go
Expand Up @@ -12,13 +12,16 @@ import (

const (
// defaultMaxRetries is the retry limit NewManifest falls back to when the
// configured value is less than one.
defaultMaxRetries = 8
// defaultMaxRetryDelay is the maximum delay, in seconds, between retry
// attempts that NewManifest falls back to when the configured value is less
// than one.
defaultMaxRetryDelay = 10
// defaultConcurrentTransfers is the number of concurrent transfers
// NewManifest falls back to when the configured value is less than one.
defaultConcurrentTransfers = 8
)

type Manifest struct {
// maxRetries is the maximum number of retries a single object can
// attempt to make before it will be dropped.
// attempt to make before it will be dropped. maxRetryDelay is the maximum
// time in seconds to wait between retry attempts when using backoff.
maxRetries int
maxRetryDelay int
concurrentTransfers int
basicTransfersOnly bool
standaloneTransferAgent string
Expand All @@ -39,6 +42,10 @@ func (m *Manifest) MaxRetries() int {
return m.maxRetries
}

// MaxRetryDelay returns the maximum time, in seconds, to wait between retry
// attempts when using backoff.
func (m *Manifest) MaxRetryDelay() int {
return m.maxRetryDelay
}

// ConcurrentTransfers returns the manifest's concurrentTransfers setting.
func (m *Manifest) ConcurrentTransfers() int {
return m.concurrentTransfers
}
Expand Down Expand Up @@ -77,6 +84,9 @@ func NewManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote s
if v := git.Int("lfs.transfer.maxretries", 0); v > 0 {
m.maxRetries = v
}
if v := git.Int("lfs.transfer.maxretrydelay", -1); v > -1 {
m.maxRetryDelay = v
}
if v := git.Int("lfs.concurrenttransfers", 0); v > 0 {
m.concurrentTransfers = v
}
Expand All @@ -91,6 +101,9 @@ func NewManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote s
if m.maxRetries < 1 {
m.maxRetries = defaultMaxRetries
}
if m.maxRetryDelay < 1 {
m.maxRetryDelay = defaultMaxRetryDelay
}

if m.concurrentTransfers < 1 {
m.concurrentTransfers = defaultConcurrentTransfers
Expand Down
80 changes: 50 additions & 30 deletions tq/transfer_queue.go
Expand Up @@ -17,37 +17,36 @@ import (

const (
// defaultBatchSize is presumably the batch size used when the queue is not
// given a positive one — TODO confirm against NewTransferQueue.
defaultBatchSize = 100
// baseRetryDelayMs is the backoff delay, in milliseconds, applied to the
// first retry of an object; retryCounter.ReadyTime doubles it for every
// further retry.
baseRetryDelayMs = 250
)

type retryCounter struct {
MaxRetries int `git:"lfs.transfer.maxretries"`
MaxRetries int
MaxRetryDelay int

// cmu guards count
cmu sync.Mutex
// count maps OIDs to number of retry attempts
count map[string]int
}

// newRetryCounter instantiates a new *retryCounter. It parses the gitconfig
// value: `lfs.transfer.maxretries`, and falls back to defaultMaxRetries if none
// was provided.
//
// If it encountered an error in Unmarshaling the *config.Configuration, it will
// be returned, otherwise nil.
// newRetryCounter instantiates a new *retryCounter.
func newRetryCounter() *retryCounter {
return &retryCounter{
MaxRetries: defaultMaxRetries,
count: make(map[string]int),
MaxRetries: defaultMaxRetries,
MaxRetryDelay: defaultMaxRetryDelay,
count: make(map[string]int),
}
}

// Increment increments the number of retries for a given OID. It is safe to
// call across multiple goroutines.
func (r *retryCounter) Increment(oid string) {
// Increment increments the number of retries for a given OID and returns the
// new value. It is safe to call across multiple goroutines.
func (r *retryCounter) Increment(oid string) int {
r.cmu.Lock()
defer r.cmu.Unlock()

r.count[oid]++
return r.count[oid]
}

// CountFor returns the current number of retries for a given OID. It is safe to
Expand All @@ -66,6 +65,22 @@ func (r *retryCounter) CanRetry(oid string) (int, bool) {
return count, count < r.MaxRetries
}

// ReadyTime reports the earliest moment at which the pending retry for oid may
// run. An OID that has not been retried yet yields the zero time.Time, meaning
// "no delay". Otherwise the wait starts at baseRetryDelayMs milliseconds and
// doubles with each recorded retry, capped at MaxRetryDelay seconds.
func (r *retryCounter) ReadyTime(oid string) time.Time {
	attempts := r.CountFor(oid)
	if attempts < 1 {
		return time.Time{}
	}

	capMs := uint64(r.MaxRetryDelay) * 1000

	// Shifting is the same as multiplying by a power of two. Once the shift
	// pushes every set bit out of the 64-bit value the result is zero, which
	// is treated below as "past the cap".
	waitMs := uint64(baseRetryDelayMs) << uint(attempts-1)
	if withinCap := waitMs != 0 && waitMs <= capMs; !withinCap {
		waitMs = capMs
	}

	wait := time.Duration(waitMs) * time.Millisecond
	return time.Now().Add(wait)
}

// batch implements the sort.Interface interface and enables sorting on a slice
// of `*Transfer`s by object size.
//
Expand Down Expand Up @@ -295,6 +310,7 @@ func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options
}

q.rc.MaxRetries = q.manifest.maxRetries
q.rc.MaxRetryDelay = q.manifest.maxRetryDelay
q.client.MaxRetries = q.manifest.maxRetries

if q.batchSize <= 0 {
Expand Down Expand Up @@ -506,6 +522,24 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
next := q.makeBatch()
tracerx.Printf("tq: sending batch of size %d", len(batch))

enqueueRetry := func(t *objectTuple, err error, readyTime *time.Time) {
count := q.rc.Increment(t.Oid)

if readyTime == nil {
t.ReadyTime = q.rc.ReadyTime(t.Oid)
} else {
t.ReadyTime = *readyTime
}
delay := time.Until(t.ReadyTime).Seconds()

var errMsg string
if err != nil {
errMsg = fmt.Sprintf(": %s", err)
}
tracerx.Printf("tq: enqueue retry #%d after %.2fs for %q (size: %d)%s", count, delay, t.Oid, t.Size, errMsg)
next = append(next, t)
}

q.meter.Pause()
var bRes *BatchResponse
if q.manifest.standaloneTransferAgent != "" {
Expand All @@ -530,14 +564,10 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
// retried, they will be marked as failed.
for _, t := range batch {
if q.canRetryObject(t.Oid, err) {
q.rc.Increment(t.Oid)

next = append(next, t)
enqueueRetry(t, err, nil)
} else if readyTime, canRetry := q.canRetryObjectLater(t.Oid, err); canRetry {
tracerx.Printf("tq: retrying object %s after %s seconds.", t.Oid, time.Until(readyTime).Seconds())
err = nil
t.ReadyTime = readyTime
next = append(next, t)
enqueueRetry(t, err, &readyTime)
} else {
q.wait.Done()
}
Expand Down Expand Up @@ -599,13 +629,8 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)
tr := newTransfer(o, objects.First().Name, objects.First().Path)

if a, err := tr.Rel(q.direction.String()); err != nil {
// XXX(taylor): duplication
if q.canRetryObject(tr.Oid, err) {
q.rc.Increment(tr.Oid)
count := q.rc.CountFor(tr.Oid)

tracerx.Printf("tq: enqueue retry #%d for %q (size: %d): %s", count, tr.Oid, tr.Size, err)
next = append(next, objects.First())
enqueueRetry(objects.First(), err, nil)
} else {
q.errorc <- errors.Errorf("[%v] %v", tr.Name, err)

Expand All @@ -624,12 +649,7 @@ func (q *TransferQueue) enqueueAndCollectRetriesFor(batch batch) (batch, error)

retries := q.addToAdapter(bRes.endpoint, toTransfer)
for t := range retries {
q.rc.Increment(t.Oid)
count := q.rc.CountFor(t.Oid)

tracerx.Printf("tq: enqueue retry #%d for %q (size: %d)", count, t.Oid, t.Size)

next = append(next, t)
enqueueRetry(t, nil, nil)
}

return next, nil
Expand Down
45 changes: 44 additions & 1 deletion tq/transfer_queue_test.go
Expand Up @@ -2,6 +2,7 @@ package tq

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -10,15 +11,27 @@ func TestManifestDefaultsToFixedRetries(t *testing.T) {
assert.Equal(t, 8, NewManifest(nil, nil, "", "").MaxRetries())
}

func TestManifestDefaultsToFixedRetryDelay(t *testing.T) {
assert.Equal(t, 10, NewManifest(nil, nil, "", "").MaxRetryDelay())
}

func TestRetryCounterDefaultsToFixedRetries(t *testing.T) {
rc := newRetryCounter()
assert.Equal(t, 8, rc.MaxRetries)
}

func TestRetryCounterDefaultsToFixedRetryDelay(t *testing.T) {
rc := newRetryCounter()
assert.Equal(t, 10, rc.MaxRetryDelay)
}

func TestRetryCounterIncrementsObjects(t *testing.T) {
rc := newRetryCounter()
rc.Increment("oid")
assert.Equal(t, 1, rc.Increment("oid"))
assert.Equal(t, 1, rc.CountFor("oid"))

assert.Equal(t, 2, rc.Increment("oid"))
assert.Equal(t, 2, rc.CountFor("oid"))
}

func TestRetryCounterCanNotRetryAfterExceedingRetryCount(t *testing.T) {
Expand All @@ -31,6 +44,36 @@ func TestRetryCounterCanNotRetryAfterExceedingRetryCount(t *testing.T) {
assert.False(t, canRetry)
}

// TestRetryCounterDoesNotDelayFirstAttempt checks that an OID which has never
// been incremented gets the zero time.Time as its ready time.
func TestRetryCounterDoesNotDelayFirstAttempt(t *testing.T) {
	var noDelay time.Time

	counter := newRetryCounter()
	assert.Equal(t, noDelay, counter.ReadyTime("oid"))
}

// TestRetryCounterDelaysExponentially checks that the ready time moves at
// least one base delay past the start after the first retry, and at least two
// base delays past the start after the second.
func TestRetryCounterDelaysExponentially(t *testing.T) {
	counter := newRetryCounter()
	began := time.Now()

	// Each entry is the minimum delay, in milliseconds, expected after one
	// more retry has been recorded for the OID.
	minimumsMs := []int64{baseRetryDelayMs, 2 * baseRetryDelayMs}
	for _, wantMs := range minimumsMs {
		counter.Increment("oid")
		ready := counter.ReadyTime("oid")

		gotMs := int64(ready.Sub(began) / time.Millisecond)
		assert.GreaterOrEqual(t, gotMs, wantMs)
	}
}

// TestRetryCounterLimitsDelay checks that the backoff is capped by
// MaxRetryDelay. After four retries the uncapped delay would be eight base
// delays (two seconds), so a one-second cap must keep the ready time within
// one second of now.
func TestRetryCounterLimitsDelay(t *testing.T) {
	counter := newRetryCounter()
	counter.MaxRetryDelay = 1

	for attempt := 1; attempt <= 4; attempt++ {
		counter.Increment("oid")
	}

	ready := counter.ReadyTime("oid")
	assert.WithinDuration(t, time.Now(), ready, time.Second)
}

func TestBatchSizeReturnsBatchSize(t *testing.T) {
q := NewTransferQueue(
Upload, NewManifest(nil, nil, "", ""), "origin", WithBatchSize(3))
Expand Down
35 changes: 17 additions & 18 deletions vendor/github.com/stretchr/testify/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 8282419

Please sign in to comment.