kvcoord: Add observability for actively running range feeds.
Add a `crdb_internal.active_range_feeds` virtual table which lists
all currently running range feeds on a node.

The table lists the span for which the range feed
was started, along with all of the ranges making up that span.
For each range, the table records the node against which the
range feed is running, as well as the timestamp of the last
non-error event received from that range.

Release Notes: Improve range feed observability by adding a
`crdb_internal.active_range_feeds` virtual table which lists
all currently executing range feeds on the node.
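
For example, once a rangefeed-backed job such as a changefeed is running, the feeds can be inspected with an ordinary SQL query against the new table. A minimal sketch (the table ID 53 in the filter is illustrative, not taken from this commit):

  SELECT tags, range_start, range_end, node_id, last_event_utc
  FROM crdb_internal.active_range_feeds
  WHERE range_start LIKE '/Table/53/%';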
Yevgeniy Miretskiy committed Aug 18, 2021
1 parent b16c86d commit 57c7481
Showing 5 changed files with 211 additions and 17 deletions.
34 changes: 34 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -4394,3 +4394,37 @@ func TestChangefeedOnErrorOption(t *testing.T) {
t.Run(`kafka`, kafkaTest(testFn))
t.Run(`webhook`, webhookTest(testFn))
}

func TestDistSenderRangeFeedPopulatesVirtualTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
})
defer s.Stopper().Stop(context.Background())

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `
SET CLUSTER SETTING kv.rangefeed.enabled='true';
CREATE TABLE tbl (a INT, b STRING);
INSERT INTO tbl VALUES (1, 'one'), (2, 'two'), (3, 'three');
CREATE CHANGEFEED FOR tbl INTO 'null://';
`)

var tableID int
sqlDB.QueryRow(t, "SELECT table_id FROM crdb_internal.tables WHERE name='tbl'").Scan(&tableID)
numRangesQuery := fmt.Sprintf(
"SELECT count(*) FROM crdb_internal.active_range_feeds WHERE range_start LIKE $/Table/%d/%%",
tableID)
testutils.SucceedsSoon(t, func() error {
var numRanges int
sqlDB.QueryRow(t, numRangesQuery).Scan(&numRanges)
if numRanges == 0 {
return errors.New("still waiting for ranges to be registered")
}
return nil
})
}
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -15,6 +15,7 @@ import (
"fmt"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
@@ -289,6 +290,9 @@ type DistSender struct {
// the descriptor, instead of trying to reorder them by latency. The knob
// only applies to requests sent with the LEASEHOLDER routing policy.
dontReorderReplicas bool

// Currently executing range feeds.
activeRangeFeeds sync.Map
}

var _ kv.Sender = &DistSender{}
154 changes: 137 additions & 17 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -14,6 +14,8 @@ import (
"context"
"fmt"
"io"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
@@ -24,9 +26,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)

type singleRangeInfo struct {
@@ -57,6 +61,10 @@ func (ds *DistSender) RangeFeed(
return err
}

rr := newRangeFeedRegistry(ctx, span, ts, withDiff)
ds.activeRangeFeeds.Store(rr, nil)
defer ds.activeRangeFeeds.Delete(rr)

g := ctxgroup.WithContext(ctx)
// Goroutine that processes subdivided ranges and creates a rangefeed for
// each.
@@ -67,7 +75,7 @@
case sri := <-rangeCh:
// Spawn a child goroutine to process this feed.
g.GoCtx(func(ctx context.Context) error {
return ds.partialRangeFeed(ctx, &sri, withDiff, rangeCh, eventCh)
return rr.startPartialRangeFeed(ctx, ds, sri.rs, sri.ts, sri.token, withDiff, rangeCh, eventCh)
})
case <-ctx.Done():
return ctx.Err()
@@ -83,6 +91,110 @@
return g.Wait()
}

// RangeFeedContext is the structure containing arguments passed to a
// RangeFeed call. It functions as a kind of key for an active range feed.
type RangeFeedContext struct {
CtxTags string // context tags

// Span, timestamp and withDiff options passed to RangeFeed call.
Span roachpb.Span
TS hlc.Timestamp
WithDiff bool
}

// ActiveRangeFeed describes the state of a currently executing range feed.
type ActiveRangeFeed struct {
Span roachpb.Span
StartTS hlc.Timestamp
NodeID roachpb.NodeID
LastEvent time.Time
}

// ActiveRangeFeedIterFn is an iterator function which is passed an ActiveRangeFeed structure.
type ActiveRangeFeedIterFn func(rfCtx RangeFeedContext, feed ActiveRangeFeed) error

// ForEachActiveRangeFeed invokes the provided function for each active range feed.
func (ds *DistSender) ForEachActiveRangeFeed(fn ActiveRangeFeedIterFn) (iterErr error) {
const continueIter = true
const stopIter = false

ds.activeRangeFeeds.Range(func(k, v interface{}) bool {
r := k.(*rangeFeedRegistry)
r.ranges.Range(func(k, v interface{}) bool {
active := k.(*activeRangeFeed)
active.Lock()
defer active.Unlock()
if err := fn(r.RangeFeedContext, active.ActiveRangeFeed); err != nil {
iterErr = err
return stopIter
}
return continueIter
})
return iterErr == nil
})

return
}

// activeRangeFeed is a thread-safe ActiveRangeFeed.
type activeRangeFeed struct {
syncutil.Mutex
ActiveRangeFeed
}

func (a *activeRangeFeed) onRangeEvent(nodeID roachpb.NodeID) {
a.Lock()
defer a.Unlock()
a.LastEvent = timeutil.Now()
a.NodeID = nodeID
}

// rangeFeedRegistry is responsible for keeping track of currently executing
// range feeds.
type rangeFeedRegistry struct {
RangeFeedContext
// Map of ranges (activeRangeFeed* -> nil) started by this registry.
ranges sync.Map
}

func newRangeFeedRegistry(
ctx context.Context, span roachpb.Span, ts hlc.Timestamp, withDiff bool,
) *rangeFeedRegistry {
rr := &rangeFeedRegistry{
RangeFeedContext: RangeFeedContext{
Span: span,
TS: ts,
WithDiff: withDiff,
},
}
if b := logtags.FromContext(ctx); b != nil {
rr.CtxTags = b.String()
}
return rr
}

// startPartialRangeFeed starts execution of a partial rangefeed.
func (r *rangeFeedRegistry) startPartialRangeFeed(
ctx context.Context,
ds *DistSender,
rs roachpb.RSpan,
ts hlc.Timestamp,
token rangecache.EvictionToken,
withDiff bool,
rangeCh chan<- singleRangeInfo,
eventCh chan<- *roachpb.RangeFeedEvent,
) error {
active := &activeRangeFeed{}
active.ActiveRangeFeed = ActiveRangeFeed{
Span: rs.AsRawSpanWithNoLocals(),
StartTS: ts,
}
r.ranges.Store(active, nil)
defer r.ranges.Delete(active)
return ds.partialRangeFeed(
ctx, rs, ts, token, withDiff, rangeCh, eventCh, active.onRangeEvent)
}

func (ds *DistSender) divideAndSendRangeFeedToRanges(
ctx context.Context, rs roachpb.RSpan, ts hlc.Timestamp, rangeCh chan<- singleRangeInfo,
) error {
@@ -122,40 +234,42 @@
// this rangefeed, or subdividing the range further in the event of a split.
func (ds *DistSender) partialRangeFeed(
ctx context.Context,
rangeInfo *singleRangeInfo,
rs roachpb.RSpan,
ts hlc.Timestamp,
token rangecache.EvictionToken,
withDiff bool,
rangeCh chan<- singleRangeInfo,
eventCh chan<- *roachpb.RangeFeedEvent,
onRangeEvent onRangeEventCb,
) error {
// Bound the partial rangefeed to the partial span.
span := rangeInfo.rs.AsRawSpanWithNoLocals()
ts := rangeInfo.ts
span := rs.AsRawSpanWithNoLocals()

// Start a retry loop for sending the batch to the range.
for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); {
// If we've cleared the descriptor on a send failure, re-lookup.
if !rangeInfo.token.Valid() {
if !token.Valid() {
var err error
ri, err := ds.getRoutingInfo(ctx, rangeInfo.rs.Key, rangecache.EvictionToken{}, false)
ri, err := ds.getRoutingInfo(ctx, rs.Key, rangecache.EvictionToken{}, false)
if err != nil {
log.VErrEventf(ctx, 1, "range descriptor re-lookup failed: %s", err)
if !rangecache.IsRangeLookupErrorRetryable(err) {
return err
}
continue
}
rangeInfo.token = ri
token = ri
}

// Establish a RangeFeed for a single Range.
maxTS, err := ds.singleRangeFeed(ctx, span, ts, withDiff, rangeInfo.token.Desc(), eventCh)
maxTS, err := ds.singleRangeFeed(ctx, span, ts, withDiff, token.Desc(), eventCh, onRangeEvent)

// Forward the timestamp in case we end up sending it again.
ts.Forward(maxTS)

if err != nil {
if log.V(1) {
log.Infof(ctx, "RangeFeed %s disconnected with last checkpoint %s ago: %v",
log.Infof(ctx, "RangeFeed %s disconnected with last event %s ago: %v",
span, timeutil.Since(ts.GoTime()), err)
}
switch {
@@ -166,13 +280,13 @@
// retry.
case IsSendError(err), errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)):
// Evict the descriptor from the cache and reload on next attempt.
rangeInfo.token.Evict(ctx)
rangeInfo.token = rangecache.EvictionToken{}
token.Evict(ctx)
token = rangecache.EvictionToken{}
continue
case errors.HasType(err, (*roachpb.RangeKeyMismatchError)(nil)):
// Evict the descriptor from the cache.
rangeInfo.token.Evict(ctx)
return ds.divideAndSendRangeFeedToRanges(ctx, rangeInfo.rs, ts, rangeCh)
token.Evict(ctx)
return ds.divideAndSendRangeFeedToRanges(ctx, rs, ts, rangeCh)
case errors.HasType(err, (*roachpb.RangeFeedRetryError)(nil)):
var t *roachpb.RangeFeedRetryError
if ok := errors.As(err, &t); !ok {
@@ -190,8 +304,8 @@
roachpb.RangeFeedRetryError_REASON_RANGE_MERGED,
roachpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER:
// Evict the descriptor from the cache.
rangeInfo.token.Evict(ctx)
return ds.divideAndSendRangeFeedToRanges(ctx, rangeInfo.rs, ts, rangeCh)
token.Evict(ctx)
return ds.divideAndSendRangeFeedToRanges(ctx, rs, ts, rangeCh)
default:
return errors.AssertionFailedf("unrecognized retriable error type: %T", err)
}
@@ -203,11 +317,13 @@
return ctx.Err()
}

type onRangeEventCb func(nodeID roachpb.NodeID)

// singleRangeFeed gathers and rearranges the replicas, and makes a RangeFeed
// RPC call. Results will be sent on the provided channel. Returns the timestamp
// of the maximum rangefeed checkpoint seen, which can be used to re-establish
// of the maximum rangefeed event seen, which can be used to re-establish
// the rangefeed with a larger starting timestamp, reflecting the fact that all
// values up to the last checkpoint have already been observed. Returns the
// values up to the last event have already been observed. Returns the
// request's timestamp if no checkpoints are seen.
func (ds *DistSender) singleRangeFeed(
ctx context.Context,
@@ -216,6 +332,7 @@
withDiff bool,
desc *roachpb.RangeDescriptor,
eventCh chan<- *roachpb.RangeFeedEvent,
onRangeEvent onRangeEventCb,
) (hlc.Timestamp, error) {
args := roachpb.RangeFeedRequest{
Span: span,
@@ -283,6 +400,9 @@
log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError())
return args.Timestamp, t.Error.GoError()
}

onRangeEvent(args.Replica.NodeID)

select {
case eventCh <- event:
case <-ctx.Done():
1 change: 1 addition & 0 deletions pkg/sql/catalog/catconstants/constants.go
@@ -97,6 +97,7 @@ const (
CrdbInternalClusterInflightTracesTable
CrdbInternalRegionsTable
CrdbInternalDefaultPrivilegesTable
CrdbInternalActiveRangeFeedsTable
InformationSchemaID
InformationSchemaAdministrableRoleAuthorizationsID
InformationSchemaApplicableRolesID
35 changes: 35 additions & 0 deletions pkg/sql/crdb_internal.go
@@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
@@ -150,6 +151,7 @@ var crdbInternal = virtualSchema{
catconstants.CrdbInternalClusterInflightTracesTable: crdbInternalClusterInflightTracesTable,
catconstants.CrdbInternalRegionsTable: crdbInternalRegionsTable,
catconstants.CrdbInternalDefaultPrivilegesTable: crdbInternalDefaultPrivilegesTable,
catconstants.CrdbInternalActiveRangeFeedsTable: crdbInternalActiveRangeFeedsTable,
},
validWithNoDatabaseContext: true,
}
@@ -5042,3 +5044,36 @@ CREATE TABLE crdb_internal.statement_statistics (
return setupGenerator(ctx, worker, stopper)
},
}

var crdbInternalActiveRangeFeedsTable = virtualSchemaTable{
comment: `node-level table listing all currently running range feeds`,
schema: `
CREATE TABLE crdb_internal.active_range_feeds (
tags STRING,
range_start STRING,
range_end STRING,
startTS STRING,
diff BOOL,
node_id INT,
partial_range_start STRING,
partial_range_end STRING,
last_event_utc INT
);`,
populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
return p.extendedEvalCtx.DistSQLPlanner.distSender.ForEachActiveRangeFeed(
func(rfCtx kvcoord.RangeFeedContext, rf kvcoord.ActiveRangeFeed) error {
return addRow(
tree.NewDString(rfCtx.CtxTags),
tree.NewDString(keys.PrettyPrint(nil /* valDirs */, rfCtx.Span.Key)),
tree.NewDString(keys.PrettyPrint(nil /* valDirs */, rfCtx.Span.EndKey)),
tree.NewDString(rfCtx.TS.AsOfSystemTime()),
tree.MakeDBool(tree.DBool(rfCtx.WithDiff)),
tree.NewDInt(tree.DInt(rf.NodeID)),
tree.NewDString(keys.PrettyPrint(nil /* valDirs */, rf.Span.Key)),
tree.NewDString(keys.PrettyPrint(nil /* valDirs */, rf.Span.EndKey)),
tree.NewDInt(tree.DInt(rf.LastEvent.UTC().UnixNano())),
)
},
)
},
}
