release-2.1: changefeed performance debugging #32872

Merged 2 commits on Dec 7, 2018

5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/buffer.go
@@ -10,10 +10,12 @@ package changefeedccl

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

type bufferEntry struct {
@@ -22,6 +24,8 @@ type bufferEntry struct {
// Timestamp of the schema that should be used to read this KV.
// If unset (zero-valued), the value's timestamp will be used instead.
schemaTimestamp hlc.Timestamp
// bufferGetTimestamp is the time this entry came out of the buffer.
bufferGetTimestamp time.Time
}

// buffer mediates between the changed data poller and the rest of the
@@ -67,6 +71,7 @@ func (b *buffer) Get(ctx context.Context) (bufferEntry, error) {
case <-ctx.Done():
return bufferEntry{}, ctx.Err()
case e := <-b.entriesCh:
e.bufferGetTimestamp = timeutil.Now()
return e, nil
}
}
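
The hunk above is the core instrumentation change: every bufferEntry is stamped with the wall-clock time at which it leaves the buffer. A minimal, self-contained sketch of that pattern follows (hypothetical `entry`/`buffer` types, standard-library `time` in place of `util/timeutil`), showing how a downstream stage can turn the stamp into a latency measurement.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type entry struct {
	payload string
	// bufferGetTimestamp is set when the entry is handed out by Get.
	bufferGetTimestamp time.Time
}

type buffer struct {
	entriesCh chan entry
}

// Get blocks for the next entry and stamps it with the time it left the buffer.
func (b *buffer) Get(ctx context.Context) (entry, error) {
	select {
	case <-ctx.Done():
		return entry{}, ctx.Err()
	case e := <-b.entriesCh:
		e.bufferGetTimestamp = time.Now()
		return e, nil
	}
}

func main() {
	b := &buffer{entriesCh: make(chan entry, 1)}
	b.entriesCh <- entry{payload: "kv change"}

	e, err := b.Get(context.Background())
	if err != nil {
		panic(err)
	}

	// Simulate the downstream work of turning the KV into a SQL row.
	time.Sleep(10 * time.Millisecond)

	// The emitting stage can now attribute the elapsed time to "processing".
	fmt.Printf("processed %q in %s\n", e.payload, time.Since(e.bufferGetTimestamp))
}
```

Stamping on Get rather than on Put means the clock starts when the entry leaves the buffer, so the downstream stages measure only their own work rather than time spent queued.
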
25 changes: 23 additions & 2 deletions pkg/ccl/changefeedccl/changefeed.go
@@ -61,6 +61,9 @@ type emitEntry struct {
// span that no previously unseen entries with a lower or equal updated
// timestamp will be emitted.
resolved *jobspb.ResolvedSpan

// bufferGetTimestamp is the time this entry came out of the buffer.
bufferGetTimestamp time.Time
}

// kvsToRows gets changed kvs from a closure and converts them into sql rows. It
@@ -76,6 +79,7 @@ func kvsToRows(
var kvs sqlbase.SpanKVFetcher
appendEmitEntryForKV := func(
ctx context.Context, output []emitEntry, kv roachpb.KeyValue, schemaTimestamp hlc.Timestamp,
bufferGetTimestamp time.Time,
) ([]emitEntry, error) {
// Reuse kvs to save allocations.
kvs.KVs = kvs.KVs[:0]
@@ -104,6 +108,7 @@ func kvsToRows(

for {
var r emitEntry
r.bufferGetTimestamp = bufferGetTimestamp
r.row.datums, r.row.tableDesc, _, err = rf.NextRow(ctx)
if err != nil {
return nil, err
@@ -140,13 +145,17 @@ func kvsToRows(
if input.schemaTimestamp != (hlc.Timestamp{}) {
schemaTimestamp = input.schemaTimestamp
}
output, err = appendEmitEntryForKV(ctx, output, input.kv, schemaTimestamp)
output, err = appendEmitEntryForKV(
ctx, output, input.kv, schemaTimestamp, input.bufferGetTimestamp)
if err != nil {
return nil, err
}
}
if input.resolved != nil {
output = append(output, emitEntry{resolved: input.resolved})
output = append(output, emitEntry{
resolved: input.resolved,
bufferGetTimestamp: input.bufferGetTimestamp,
})
}
if output != nil {
return output, nil
@@ -166,6 +175,7 @@ func emitEntries(
sink Sink,
inputFn func(context.Context) ([]emitEntry, error),
knobs TestingKnobs,
metrics *Metrics,
) func(context.Context) ([]jobspb.ResolvedSpan, error) {
var scratch bufalloc.ByteAllocator
emitRowFn := func(ctx context.Context, row emitRow) error {
@@ -210,6 +220,17 @@
return nil, err
}
for _, input := range inputs {
if input.bufferGetTimestamp == (time.Time{}) {
// We could gracefully handle this instead of panic'ing, but
// we'd really like to be able to reason about this data, so
// instead we're defensive. If this is ever seen in prod without
// breaking a unit test, then we have a pretty severe test
// coverage issue.
panic(`unreachable: bufferGetTimestamp is set by all codepaths`)
}
processingNanos := timeutil.Since(input.bufferGetTimestamp).Nanoseconds()
metrics.ProcessingNanos.Inc(processingNanos)

if input.row.datums != nil {
if err := emitRowFn(ctx, input.row); err != nil {
return nil, err
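
emitEntries is where that stamp becomes the changefeed.processing_nanos counter, and it deliberately panics on a zero timestamp rather than silently skewing the metric. A standalone sketch of that logic (a stand-in counter via sync/atomic instead of util/metric, and a hypothetical recordProcessing helper):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// processingNanos stands in for the Metrics.ProcessingNanos counter.
var processingNanos int64

// recordProcessing attributes the time since the entry left the buffer to the
// processing counter, refusing to proceed if the stamp was never set.
func recordProcessing(bufferGetTimestamp time.Time) {
	if bufferGetTimestamp.IsZero() {
		// Defensive: a zero value means some code path forgot to stamp the
		// entry, which would silently skew the metric, so fail loudly instead.
		panic("unreachable: bufferGetTimestamp is set by all codepaths")
	}
	atomic.AddInt64(&processingNanos, time.Since(bufferGetTimestamp).Nanoseconds())
}

func main() {
	// Pretend an entry left the buffer 5ms ago and has just been emitted.
	bufferGetTimestamp := time.Now().Add(-5 * time.Millisecond)
	recordProcessing(bufferGetTimestamp)
	fmt.Printf("processing nanos so far: %d\n", atomic.LoadInt64(&processingNanos))
}
```
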
30 changes: 27 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
)

@@ -150,15 +151,15 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
leaseMgr := ca.flowCtx.LeaseManager.(*sql.LeaseManager)
ca.poller = makePoller(
ca.flowCtx.Settings, ca.flowCtx.ClientDB, ca.flowCtx.ClientDB.Clock(), ca.flowCtx.Gossip,
spans, ca.spec.Feed, initialHighWater, buf, leaseMgr,
spans, ca.spec.Feed, initialHighWater, buf, leaseMgr, metrics,
)
rowsFn := kvsToRows(leaseMgr, ca.spec.Feed, buf.Get)

var knobs TestingKnobs
if cfKnobs, ok := ca.flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok {
knobs = *cfKnobs
}
ca.tickFn = emitEntries(ca.flowCtx.Settings, ca.spec.Feed, ca.encoder, ca.sink, rowsFn, knobs)
ca.tickFn = emitEntries(ca.flowCtx.Settings, ca.spec.Feed, ca.encoder, ca.sink, rowsFn, knobs, metrics)

// Give errCh enough buffer for both possible errors from supporting goroutines,
// but only the first one is ever used.
@@ -300,6 +301,8 @@ type changeFrontier struct {
freqEmitResolved time.Duration
// lastEmitResolved is the last time a resolved timestamp was emitted.
lastEmitResolved time.Time
// lastSlowSpanLog is the last time a slow span from `sf` was logged.
lastSlowSpanLog time.Time

// jobProgressedFn, if non-nil, is called to checkpoint the changefeed's
// progress in the corresponding system job entry.
@@ -497,7 +500,9 @@ func (cf *changeFrontier) noteResolvedSpan(d sqlbase.EncDatum) error {
if err := protoutil.Unmarshal([]byte(*raw), &resolved); err != nil {
return errors.Wrapf(err, `unmarshalling resolved span: %x`, raw)
}
if cf.sf.Forward(resolved.Span, resolved.Timestamp) {

frontierChanged := cf.sf.Forward(resolved.Span, resolved.Timestamp)
if frontierChanged {
newResolved := cf.sf.Frontier()
cf.metrics.mu.Lock()
if cf.metricsID != -1 {
@@ -517,6 +522,25 @@
cf.lastEmitResolved = newResolved.GoTime()
}
}

// Potentially log the most behind span in the frontier for debugging.
slownessThreshold := 10 * changefeedPollInterval.Get(&cf.flowCtx.Settings.SV)
frontier := cf.sf.Frontier()
now := timeutil.Now()
if resolvedBehind := now.Sub(frontier.GoTime()); resolvedBehind > slownessThreshold {
if frontierChanged {
log.Infof(cf.Ctx, "job %d new resolved timestamp %s is behind by %s",
cf.spec.JobID, frontier, resolvedBehind)
}
const slowSpanMaxFrequency = 10 * time.Second
if now.Sub(cf.lastSlowSpanLog) > slowSpanMaxFrequency {
cf.lastSlowSpanLog = now
s := cf.sf.peekFrontierSpan()
log.Infof(cf.Ctx, "job %d span [%s,%s) is behind by %s",
cf.spec.JobID, s.Key, s.EndKey, resolvedBehind)
}
}

return nil
}

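
The changeFrontier side adds throttled logging for whatever span is holding back the resolved timestamp: a frontier more than 10× the poll interval behind is considered slow, and the most-behind span is logged at most once every 10 seconds. A self-contained sketch of that throttling pattern (hypothetical slowSpanLogger type; the real code reads the poll interval from cluster settings and the span from the span frontier):

```go
package main

import (
	"fmt"
	"time"
)

// slowSpanLogger throttles "this span is behind" messages: it only fires when
// the frontier is far behind, and at most once per slowSpanMaxFrequency.
type slowSpanLogger struct {
	pollInterval    time.Duration // stands in for the changefeed poll interval setting
	lastSlowSpanLog time.Time
}

func (l *slowSpanLogger) maybeLog(frontier time.Time, span string) {
	const slowSpanMaxFrequency = 10 * time.Second
	slownessThreshold := 10 * l.pollInterval

	now := time.Now()
	resolvedBehind := now.Sub(frontier)
	if resolvedBehind <= slownessThreshold {
		return
	}
	if now.Sub(l.lastSlowSpanLog) > slowSpanMaxFrequency {
		l.lastSlowSpanLog = now
		fmt.Printf("span %s is behind by %s\n", span, resolvedBehind)
	}
}

func main() {
	l := &slowSpanLogger{pollInterval: time.Second}
	frontier := time.Now().Add(-30 * time.Second) // resolved timestamp stuck 30s in the past

	// Only the first call logs; the later calls are suppressed by the throttle.
	for i := 0; i < 3; i++ {
		l.maybeLog(frontier, "[/Table/53, /Table/54)")
	}
}
```
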
9 changes: 6 additions & 3 deletions pkg/ccl/changefeedccl/helpers_test.go
@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -124,11 +125,12 @@ func createBenchmarkChangefeed(
encoder := makeJSONEncoder(details.Opts)
sink := makeBenchSink()

metrics := MakeMetrics(server.DefaultHistogramWindowInterval).(*Metrics)
buf := makeBuffer()
leaseMgr := s.LeaseManager().(*sql.LeaseManager)
poller := makePoller(
s.ClusterSettings(), s.DB(), feedClock, s.Gossip(),
spans, details, initialHighWater, buf, leaseMgr,
s.ClusterSettings(), s.DB(), feedClock, s.Gossip(), spans, details, initialHighWater, buf,
leaseMgr, metrics,
)

th := makeTableHistory(func(context.Context, *sqlbase.TableDescriptor) error { return nil }, initialHighWater)
@@ -139,7 +141,8 @@
m: th,
}
rowsFn := kvsToRows(s.LeaseManager().(*sql.LeaseManager), details, buf.Get)
tickFn := emitEntries(s.ClusterSettings(), details, encoder, sink, rowsFn, TestingKnobs{})
tickFn := emitEntries(
s.ClusterSettings(), details, encoder, sink, rowsFn, TestingKnobs{}, metrics)

ctx, cancel := context.WithCancel(ctx)
go func() { _ = poller.Run(ctx) }()
82 changes: 62 additions & 20 deletions pkg/ccl/changefeedccl/metrics.go
@@ -11,6 +11,7 @@ package changefeedccl
import (
"context"
"math"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -81,21 +82,43 @@ var (
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaChangefeedEmitNanos = metric.Metadata{
Name: "changefeed.emit_nanos",
Help: "Total time spent emitting all feeds",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
// This is more naturally a histogram but that creates a lot of timeseries
// and it's not clear that the additional fidelity is worth it. Revisit if
// evidence suggests otherwise.
metaChangefeedFlushes = metric.Metadata{
Name: "changefeed.flushes",
Help: "Total flushes across all feeds",
Measurement: "Flushes",
Unit: metric.Unit_COUNT,
}
metaChangefeedSinkErrorRetries = metric.Metadata{
Name: "changefeed.sink_error_retries",
Help: "Total retryable errors encountered while emitting to sinks",
Measurement: "Errors",
Unit: metric.Unit_COUNT,
}

metaChangefeedPollRequestNanos = metric.Metadata{
Name: "changefeed.poll_request_nanos",
Help: "Time spent fetching changes",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaChangefeedProcessingNanos = metric.Metadata{
Name: "changefeed.processing_nanos",
Help: "Time spent processing KV changes into SQL rows",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaChangefeedTableMetadataNanos = metric.Metadata{
Name: "changefeed.table_metadata_nanos",
Help: "Time blocked while verifying table metadata histories",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaChangefeedEmitNanos = metric.Metadata{
Name: "changefeed.emit_nanos",
Help: "Total time spent emitting all feeds",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaChangefeedFlushNanos = metric.Metadata{
Name: "changefeed.flush_nanos",
Help: "Total time spent flushing all feeds",
@@ -113,25 +136,25 @@
Measurement: "Nanoseconds",
Unit: metric.Unit_TIMESTAMP_NS,
}
metaChangefeedSinkErrorRetries = metric.Metadata{
Name: "changefeed.sink_error_retries",
Help: "Total retryable errors encountered while emitting to sinks",
Measurement: "Errors",
Unit: metric.Unit_COUNT,
}
)

const noMinHighWaterSentinel = int64(math.MaxInt64)

const pollRequestNanosHistMaxLatency = time.Hour

// Metrics are for production monitoring of changefeeds.
type Metrics struct {
EmittedMessages *metric.Counter
EmittedBytes *metric.Counter
EmitNanos *metric.Counter
Flushes *metric.Counter
FlushNanos *metric.Counter
SinkErrorRetries *metric.Counter

PollRequestNanosHist *metric.Histogram
ProcessingNanos *metric.Counter
TableMetadataNanos *metric.Counter
EmitNanos *metric.Counter
FlushNanos *metric.Counter

mu struct {
syncutil.Mutex
id int
@@ -144,14 +167,33 @@
func (*Metrics) MetricStruct() {}

// MakeMetrics makes the metrics for changefeed monitoring.
func MakeMetrics() metric.Struct {
func MakeMetrics(histogramWindow time.Duration) metric.Struct {
m := &Metrics{
EmittedMessages: metric.NewCounter(metaChangefeedEmittedMessages),
EmittedBytes: metric.NewCounter(metaChangefeedEmittedBytes),
EmitNanos: metric.NewCounter(metaChangefeedEmitNanos),
Flushes: metric.NewCounter(metaChangefeedFlushes),
FlushNanos: metric.NewCounter(metaChangefeedFlushNanos),
SinkErrorRetries: metric.NewCounter(metaChangefeedSinkErrorRetries),

// Metrics for changefeed performance debugging:
// - PollRequestNanos and PollRequestNanosHist: things are first fetched
// with some limited concurrency. We're interested in both the total
// amount of time fetching as well as outliers, so we need both the
// counter and the histogram.
// - N/A: each change is put into a buffer. Right now nothing measures
// this since the buffer doesn't actually buffer and so it just tracks
// the poll sleep time.
// - ProcessingNanos: everything from the buffer until the SQL row is
// about to be emitted. This includes TableMetadataNanos, which is
// dependent on network calls, so it's also tracked in case it's ever
// the cause of a ProcessingNanos blowup.
// - EmitNanos and FlushNanos: all of our interactions with the sink.
PollRequestNanosHist: metric.NewHistogram(
metaChangefeedPollRequestNanos, histogramWindow,
pollRequestNanosHistMaxLatency.Nanoseconds(), 1),
ProcessingNanos: metric.NewCounter(metaChangefeedProcessingNanos),
TableMetadataNanos: metric.NewCounter(metaChangefeedTableMetadataNanos),
EmitNanos: metric.NewCounter(metaChangefeedEmitNanos),
FlushNanos: metric.NewCounter(metaChangefeedFlushNanos),
}
m.mu.resolved = make(map[int]hlc.Timestamp)
m.MinHighWater = metric.NewFunctionalGauge(metaChangefeedMinHighWater, func() int64 {
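
Since the point of these counters is performance debugging, it's worth noting how they are meant to be read: each *_nanos counter accumulates total busy time, so the change over a sampling interval divided by the elapsed wall time gives busy seconds per wall-clock second for that stage. A small worked example with made-up counter samples (not real data):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Two samples of the changefeed.processing_nanos counter, 10 seconds apart
	// (made-up numbers: 4.2s and 12.7s of accumulated processing time).
	prev := int64(4200000000)
	cur := int64(12700000000)
	interval := 10 * time.Second

	// Delta nanos divided by elapsed nanos = busy seconds per wall-clock second.
	busy := float64(cur-prev) / float64(interval.Nanoseconds())
	fmt.Printf("processing uses %.2f seconds of work per wall-clock second\n", busy)
	// Prints 0.85 seconds of processing work per second of wall time, which
	// points at processing (rather than emitting or flushing) as the hot stage.
}
```
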