release-22.2: kvcoord: Correctly handle stuck rangefeeds #92704

Merged · 1 commit · Nov 30, 2022
19 changes: 8 additions & 11 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -594,11 +594,6 @@ func (ds *DistSender) singleRangeFeed(

for {
stuckWatcher.stop() // if timer is running from previous iteration, stop it now
if catchupRes == nil {
// Already finished catch-up scan (in an earlier iteration of this loop),
// so start timer early, not on first event received.
stuckWatcher.ping()
}
if transport.IsExhausted() {
return args.Timestamp, newSendError(
fmt.Sprintf("sending to all %d replicas failed", len(replicas)))
@@ -639,19 +634,21 @@ func (ds *DistSender) singleRangeFeed(
}
}

var event *roachpb.RangeFeedEvent
for {
event, err := stream.Recv()
if err == io.EOF {
return args.Timestamp, nil
}
if err != nil {
if err := stuckWatcher.do(func() (err error) {
event, err = stream.Recv()
return err
}); err != nil {
if err == io.EOF {
return args.Timestamp, nil
}
if stuckWatcher.stuck() {
afterCatchUpScan := catchupRes == nil
return args.Timestamp, ds.handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold())
}
return args.Timestamp, err
}
stuckWatcher.ping() // starts timer on first event only

msg := RangeFeedMessage{RangeFeedEvent: event, RegisteredSpan: span}
switch t := event.GetValue().(type) {
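The change above drops the per-event ping() call and instead wraps the blocking stream.Recv() in stuckWatcher.do(), so the stuck timer only covers the time spent waiting on the stream. Below is a minimal sketch of that wrap-a-blocking-call pattern, using a hypothetical doWithWatchdog helper in place of the real canceler (which additionally reuses timers across calls and tracks state):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// doWithWatchdog is a simplified stand-in for stuckRangeFeedCanceler.do: it
// arms a timer only for the duration of cb and cancels the caller's context
// if cb runs longer than threshold. The real canceler also reuses timers
// across calls and ignores a timer that fires outside of do().
func doWithWatchdog(cancel context.CancelFunc, threshold time.Duration, cb func() error) error {
	t := time.AfterFunc(threshold, cancel)
	defer t.Stop()
	return cb()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	events := make(chan string) // nothing ever sends: simulates a stuck stream
	err := doWithWatchdog(cancel, 50*time.Millisecond, func() error {
		select {
		case ev := <-events:
			fmt.Println("received:", ev)
			return nil
		case <-ctx.Done():
			return ctx.Err() // the watchdog fired and canceled the context
		}
	})
	fmt.Println(err) // context canceled
}
```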
54 changes: 43 additions & 11 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler.go
@@ -49,14 +49,35 @@ type stuckRangeFeedCanceler struct {
resetTimerAfter time.Time
activeThreshold time.Duration

_stuck int32 // atomic
// _state manages canceler state transitions:
//
//                do()
//    inactive ---------> active
//        ^                 |  |
//        |       ok        |  | timeout
//        +-----------------+  v
//                            stuck
// If timeout occurs outside do(), it is ignored.
_state int32 // atomic

// A testing knob to notify when the timer triggers.
afterTimerTrigger func()
}

type state int32

const (
inactive state = iota
active
stuck
)

// stuck returns true if the stuck detection got triggered.
// If this returns true, the cancel function will be invoked
// shortly, if it hasn't already.
func (w *stuckRangeFeedCanceler) stuck() bool {
return atomic.LoadInt32(&w._stuck) != 0
return atomic.LoadInt32(&w._state) == int32(stuck)
}

// stop releases the active timer, if any. It should be invoked
@@ -69,13 +90,14 @@ func (w *stuckRangeFeedCanceler) stop() {
}
}

// ping notifies the canceler that the rangefeed has received an
// event, i.e. is making progress.
func (w *stuckRangeFeedCanceler) ping() {
// do invokes the callback cb, arranging for cancellation to happen if the
// callback takes too long to complete. Returns errRestartStuckRange if cb
// took an excessive amount of time.
func (w *stuckRangeFeedCanceler) do(cb func() error) error {
threshold := w.threshold()
if threshold == 0 {
w.stop()
return
return cb()
}

mkTimer := func() {
@@ -86,21 +108,31 @@
// ping() event arrives at 29.999s, the timer should only fire
// at 90s, not 60s.
w.t = time.AfterFunc(3*threshold/2, func() {
// NB: important to store _stuck before canceling, since we
// want the caller to be able to detect stuck() after ctx
// cancels.
atomic.StoreInt32(&w._stuck, 1)
w.cancel()
if w.afterTimerTrigger != nil {
defer w.afterTimerTrigger()
}

// NB: trigger cancellation only if currently active.
if atomic.CompareAndSwapInt32(&w._state, int32(active), int32(stuck)) {
w.cancel()
}
})
w.resetTimerAfter = timeutil.Now().Add(threshold / 2)
}

if !atomic.CompareAndSwapInt32(&w._state, int32(inactive), int32(active)) {
return errRestartStuckRange
}
defer atomic.CompareAndSwapInt32(&w._state, int32(active), int32(inactive))

if w.t == nil {
mkTimer()
} else if w.resetTimerAfter.Before(timeutil.Now()) || w.activeThreshold != threshold {
w.stop()
mkTimer()
}

return cb()
}

// newStuckRangeFeedCanceler sets up a canceler with the provided
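The canceler now encodes inactive/active/stuck in a single atomic and performs every transition with compare-and-swap, which is what makes a timer that fires outside do() harmless: the active→stuck swap fails, so cancel() is never called. Below is a small self-contained sketch of that state machine under the same assumptions (names here are illustrative, not the PR's API):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// States, encoded the same way as in the diff: inactive -> active inside
// do(), active -> inactive when the callback returns, active -> stuck on
// timeout.
const (
	inactive int32 = iota
	active
	stuck
)

type watcher struct {
	state int32 // accessed atomically
}

// enter marks a do() call as in flight; it fails if the watcher is already
// active or stuck.
func (w *watcher) enter() bool {
	return atomic.CompareAndSwapInt32(&w.state, inactive, active)
}

// leave returns the watcher to inactive unless the timer already marked it
// stuck, in which case the CAS fails and the stuck state is preserved.
func (w *watcher) leave() {
	atomic.CompareAndSwapInt32(&w.state, active, inactive)
}

// onTimeout is the timer callback: it escalates to stuck (and cancels) only
// if a call is currently in flight, so a late timer outside do() is a no-op.
func (w *watcher) onTimeout(cancel func()) {
	if atomic.CompareAndSwapInt32(&w.state, active, stuck) {
		cancel()
	}
}

func main() {
	var w watcher
	fmt.Println(w.enter()) // true: inactive -> active
	w.onTimeout(func() { fmt.Println("canceled") })
	w.leave()                                        // no-op: stuck wins over active -> inactive
	fmt.Println(atomic.LoadInt32(&w.state) == stuck) // true
}
```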
73 changes: 55 additions & 18 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_canceler_test.go
@@ -12,46 +12,46 @@
package kvcoord

import (
"context"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

type cancelRec int32

func (c *cancelRec) cancel() {
atomic.StoreInt32((*int32)(c), 1)
}

func (c *cancelRec) canceled() bool {
return atomic.LoadInt32((*int32)(c)) != 0
}

func TestStuckRangeFeedCanceler(t *testing.T) {
defer leaktest.AfterTest(t)()

_dur := int64(24 * time.Hour) // atomic
var cr cancelRec
c := newStuckRangeFeedCanceler(cr.cancel, func() time.Duration {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doNothing := func() error { return nil }
blockUntilCanceled := func() error {
<-ctx.Done()
return ctx.Err()
}

c := newStuckRangeFeedCanceler(cancel, func() time.Duration {
return time.Duration(atomic.LoadInt64(&_dur))
})
require.Nil(t, c.t) // not running upon creation
require.Nil(t, c.t) // not running upon creation.

for i := 0; i < 10; i++ {
time.Sleep(time.Millisecond)
require.False(t, c.stuck())
c.ping()
require.NoError(t, c.do(doNothing))
require.NotNil(t, c.t) // first call to ping sets off timer
}
atomic.StoreInt64(&_dur, int64(time.Nanosecond))
atomic.StoreInt64(&_dur, int64(10*time.Millisecond))
// Nothing has reset the timer yet, so we won't be stuck here.
// This isn't great but it is true, so documenting it.
require.False(t, c.stuck())
// Ping will update the timer, so it will fire very soon.
c.ping()
require.Eventually(t, cr.canceled, time.Second /* max */, 5*time.Nanosecond /* tick */)
require.True(t, errors.Is(c.do(blockUntilCanceled), context.Canceled))
require.True(t, c.stuck())

atomic.StoreInt64(&_dur, int64(24*time.Hour))
@@ -60,6 +60,43 @@ func TestStuckRangeFeedCanceler(t *testing.T) {
for i := 0; i < 10; i++ {
time.Sleep(time.Nanosecond)
require.True(t, c.stuck())
c.ping()
require.True(t, errors.Is(c.do(blockUntilCanceled), errRestartStuckRange))
}
}

// Ensure that the canceler monitors only the duration of the do()
// function, and not anything happening outside of it.
func TestStuckRangeFeedCancelerScope(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doNothing := func() error { return nil }

triggered := struct {
sync.Once
ch chan struct{}
}{ch: make(chan struct{})}

const duration = time.Second
c := newStuckRangeFeedCanceler(cancel, func() time.Duration {
return duration
})
c.afterTimerTrigger = func() {
triggered.Do(func() {
close(triggered.ch)
})
}

require.Nil(t, c.t) // not running upon creation.
require.False(t, c.stuck())
require.Nil(t, c.do(doNothing))

// Now, start waiting until timer triggers.
// Even though the timer triggered, the watcher is not canceled since
// the time expired outside do().
<-triggered.ch
require.Nil(t, ctx.Err())
require.False(t, c.stuck())
require.Nil(t, c.do(doNothing))
}
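The second test signals from the afterTimerTrigger knob via a sync.Once-guarded channel close, so a hook that may fire more than once notifies the waiting test exactly once. A tiny sketch of that one-shot notification pattern (the oneShot type is hypothetical, not part of the PR):

```go
package main

import (
	"fmt"
	"sync"
)

// oneShot closes its channel at most once, no matter how many times fire is
// called, mirroring the triggered struct in TestStuckRangeFeedCancelerScope.
type oneShot struct {
	once sync.Once
	ch   chan struct{}
}

func newOneShot() *oneShot { return &oneShot{ch: make(chan struct{})} }

// fire signals the waiter; repeated calls are safe because sync.Once guards
// the close.
func (o *oneShot) fire() { o.once.Do(func() { close(o.ch) }) }

func main() {
	sig := newOneShot()
	for i := 0; i < 3; i++ {
		go sig.fire() // e.g. called from a timer hook that may fire repeatedly
	}
	<-sig.ch
	fmt.Println("notified exactly once")
}
```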