Skip to content

Commit

Permalink
Reconnect Watch with Backoff (#2834)
Browse files Browse the repository at this point in the history
Implement simple exponential backoff to reconnect failed watch.
  • Loading branch information
martinmaly committed Feb 24, 2022
1 parent 5d7d156 commit 84e55ff
Showing 1 changed file with 51 additions and 4 deletions.
55 changes: 51 additions & 4 deletions porch/apiserver/pkg/registry/porch/background.go
Expand Up @@ -42,6 +42,11 @@ type background struct {
cache *cache.Cache
}

const (
minReconnectDelay = 100 * time.Millisecond
maxReconnectDelay = 10 * time.Second
)

// run will run until ctx is done
func (b *background) run(ctx context.Context) {
klog.Infof("Background routine starting ...")
Expand All @@ -56,13 +61,17 @@ func (b *background) run(ctx context.Context) {
}
}()

reconnect := newBackoffTimer(minReconnectDelay, maxReconnectDelay)
defer reconnect.Stop()

// Start ticker
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()

loop:
for {
if watcher == nil {
select {
case <-reconnect.channel():
var err error
klog.Infof("Starting watch ... ")
var obj configapi.RepositoryList
Expand All @@ -73,19 +82,22 @@ loop:
},
})
if err != nil {
klog.Errorf("Cannot start watch: %v", err)
klog.Errorf("Cannot start watch: %v; will retry", err)
reconnect.backoff()
} else {
klog.Infof("Watch successfully started.")
events = watcher.ResultChan()
}
}

select {
case event, eventOk := <-events:
if !eventOk {
klog.Errorf("Watch event stream closed. Will restart watch from bookmark %q", bookmark)
watcher.Stop()
events = nil
watcher = nil

// Initiate reconnect
reconnect.reset()
} else if repository, ok := event.Object.(*configapi.Repository); ok {
if event.Type == watch.Bookmark {
bookmark = repository.ResourceVersion
Expand Down Expand Up @@ -151,3 +163,38 @@ func (b *background) cacheRepository(ctx context.Context, repo *configapi.Reposi
}
return nil
}

type backoffTimer struct {
min, max, curr time.Duration
timer *time.Timer
}

func newBackoffTimer(min, max time.Duration) *backoffTimer {
return &backoffTimer{
min: min,
max: max,
timer: time.NewTimer(min),
}
}

func (t *backoffTimer) Stop() bool {
return t.timer.Stop()
}

func (t *backoffTimer) channel() <-chan time.Time {
return t.timer.C
}

func (t *backoffTimer) reset() bool {
t.curr = t.min
return t.timer.Reset(t.curr)
}

func (t *backoffTimer) backoff() bool {
curr := t.curr * 2
if curr > t.max {
curr = t.max
}
t.curr = curr
return t.timer.Reset(curr)
}

0 comments on commit 84e55ff

Please sign in to comment.