Skip to content

Commit

Permalink
changefeedccl: add logs for slow ExportRequests and frontier spans
Browse files Browse the repository at this point in the history
The metrics added in cockroachdb#32241 are great for monitoring the health of a
changefeed and for roughly debugging performance issues. Many of the
performance issues I've seen in testing were caused by one or more spans
falling behind, so add debug logging with details on the affected keys and
their lag, which allows for more targeted investigation.

Release note: None
  • Loading branch information
danhhz committed Nov 29, 2018
1 parent 142476f commit 7ae4ea1
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 45 deletions.
26 changes: 25 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -300,6 +301,8 @@ type changeFrontier struct {
freqEmitResolved time.Duration
// lastEmitResolved is the last time a resolved timestamp was emitted.
lastEmitResolved time.Time
// lastSlowSpanLog is the last time a slow span from `sf` was logged.
lastSlowSpanLog time.Time

// jobProgressedFn, if non-nil, is called to checkpoint the changefeed's
// progress in the corresponding system job entry.
Expand Down Expand Up @@ -497,7 +500,9 @@ func (cf *changeFrontier) noteResolvedSpan(d sqlbase.EncDatum) error {
if err := protoutil.Unmarshal([]byte(*raw), &resolved); err != nil {
return errors.Wrapf(err, `unmarshalling resolved span: %x`, raw)
}
if cf.sf.Forward(resolved.Span, resolved.Timestamp) {

frontierChanged := cf.sf.Forward(resolved.Span, resolved.Timestamp)
if frontierChanged {
newResolved := cf.sf.Frontier()
cf.metrics.mu.Lock()
if cf.metricsID != -1 {
Expand All @@ -517,6 +522,25 @@ func (cf *changeFrontier) noteResolvedSpan(d sqlbase.EncDatum) error {
cf.lastEmitResolved = newResolved.GoTime()
}
}

// Potentially log the most behind span in the frontier for debugging.
slownessThreshold := 10 * changefeedPollInterval.Get(&cf.flowCtx.Settings.SV)
frontier := cf.sf.Frontier()
now := timeutil.Now()
if resolvedBehind := now.Sub(frontier.GoTime()); resolvedBehind > slownessThreshold {
if frontierChanged {
log.Infof(cf.Ctx, "job %d new resolved timestamp %s is behind by %s",
cf.spec.JobID, frontier, resolvedBehind)
}
const slowSpanMaxFrequency = 10 * time.Second
if now.Sub(cf.lastSlowSpanLog) > slowSpanMaxFrequency {
cf.lastSlowSpanLog = now
s := cf.sf.peekFrontierSpan()
log.Infof(cf.Ctx, "job %d span [%s,%s) is behind by %s",
cf.spec.JobID, s.Key, s.EndKey, resolvedBehind)
}
}

return nil
}

Expand Down
99 changes: 55 additions & 44 deletions pkg/ccl/changefeedccl/poller.go
Expand Up @@ -385,8 +385,6 @@ func getSpansToProcess(
func (p *poller) exportSpansParallel(
ctx context.Context, spans []roachpb.Span, start, end hlc.Timestamp, isFullScan bool,
) error {
sender := p.db.NonTransactionalSender()

// Export requests for the various watched spans are executed in parallel,
// with a semaphore-enforced limit based on a cluster setting.
maxConcurrentExports := clusterNodeCount(p.gossip) *
Expand All @@ -409,58 +407,30 @@ func (p *poller) exportSpansParallel(

g.GoCtx(func(ctx context.Context) error {
defer func() { <-exportsSem }()
if log.V(2) {
log.Infof(ctx, `sending ExportRequest [%s,%s)`, span.Key, span.EndKey)
}

stopwatchStart := timeutil.Now()
exported, pErr := exportSpan(ctx, span, sender, start, end, isFullScan)
exportDuration := timeutil.Since(stopwatchStart)
err := p.exportSpan(ctx, span, start, end, isFullScan)
finished := atomic.AddInt64(&atomicFinished, 1)
if log.V(2) {
log.Infof(ctx, `finished ExportRequest [%s,%s) %d of %d took %s`,
span.Key, span.EndKey, finished, len(spans), timeutil.Since(stopwatchStart))
log.Infof(ctx, `exported %d of %d`, finished, len(spans))
}
if pErr != nil {
return errors.Wrapf(
pErr.GoError(), `fetching changes for [%s,%s)`, span.Key, span.EndKey,
)
}
p.metrics.PollRequestNanosHist.RecordValue(exportDuration.Nanoseconds())

// When outputting a full scan, we want to use the schema at the scan
// timestamp, not the schema at the value timestamp.
var schemaTimestamp hlc.Timestamp
if isFullScan {
schemaTimestamp = end
}
stopwatchStart = timeutil.Now()
for _, file := range exported.(*roachpb.ExportResponse).Files {
if err := p.slurpSST(ctx, file.SST, schemaTimestamp); err != nil {
return err
}
}
if err := p.buf.AddResolved(ctx, span, end); err != nil {
if err != nil {
return err
}

if log.V(2) {
log.Infof(ctx, `finished buffering [%s,%s) took %s`,
span.Key, span.EndKey, timeutil.Since(stopwatchStart))
}
return nil
})
}
return g.Wait()
}

func exportSpan(
ctx context.Context,
span roachpb.Span,
sender client.Sender,
start, end hlc.Timestamp,
fullScan bool,
) (roachpb.Response, *roachpb.Error) {
func (p *poller) exportSpan(
ctx context.Context, span roachpb.Span, start, end hlc.Timestamp, isFullScan bool,
) error {
sender := p.db.NonTransactionalSender()
if log.V(2) {
log.Infof(ctx, `sending ExportRequest [%s,%s) over (%s,%s]`,
span.Key, span.EndKey, start, end)
}

header := roachpb.Header{Timestamp: end}
req := &roachpb.ExportRequest{
RequestHeader: roachpb.RequestHeaderFromSpan(span),
Expand All @@ -469,11 +439,52 @@ func exportSpan(
ReturnSST: true,
OmitChecksum: true,
}
if fullScan {
if isFullScan {
req.MVCCFilter = roachpb.MVCCFilter_Latest
req.StartTime = hlc.Timestamp{}
}
return client.SendWrappedWith(ctx, sender, header, req)

stopwatchStart := timeutil.Now()
exported, pErr := client.SendWrappedWith(ctx, sender, header, req)
exportDuration := timeutil.Since(stopwatchStart)
if log.V(2) {
log.Infof(ctx, `finished ExportRequest [%s,%s) over (%s,%s] took %s`,
span.Key, span.EndKey, start, end, exportDuration)
}
slowExportThreshold := 10 * changefeedPollInterval.Get(&p.settings.SV)
if exportDuration > slowExportThreshold {
log.Infof(ctx, "finished ExportRequest [%s,%s) over (%s,%s] took %s behind by %s",
span.Key, span.EndKey, start, end, exportDuration, timeutil.Since(end.GoTime()))
}

if pErr != nil {
return errors.Wrapf(
pErr.GoError(), `fetching changes for [%s,%s)`, span.Key, span.EndKey,
)
}
p.metrics.PollRequestNanosHist.RecordValue(exportDuration.Nanoseconds())

// When outputting a full scan, we want to use the schema at the scan
// timestamp, not the schema at the value timestamp.
var schemaTimestamp hlc.Timestamp
if isFullScan {
schemaTimestamp = end
}
stopwatchStart = timeutil.Now()
for _, file := range exported.(*roachpb.ExportResponse).Files {
if err := p.slurpSST(ctx, file.SST, schemaTimestamp); err != nil {
return err
}
}
if err := p.buf.AddResolved(ctx, span, end); err != nil {
return err
}

if log.V(2) {
log.Infof(ctx, `finished buffering [%s,%s) took %s`,
span.Key, span.EndKey, timeutil.Since(stopwatchStart))
}
return nil
}

func (p *poller) updateTableHistory(ctx context.Context, endTS hlc.Timestamp) error {
Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/span_frontier.go
Expand Up @@ -130,6 +130,13 @@ func (s *spanFrontier) Frontier() hlc.Timestamp {
return s.minHeap[0].ts
}

// peekFrontierSpan returns the span whose timestamp currently holds the
// frontier back, i.e. the span at the top of the min-heap. If no spans are
// being tracked, the zero span is returned.
func (s *spanFrontier) peekFrontierSpan() roachpb.Span {
	if s.minHeap.Len() > 0 {
		return s.minHeap[0].span
	}
	return roachpb.Span{}
}

// Forward advances the timestamp for a span. Any part of the span that doesn't
// overlap the tracked span set will be ignored. True is returned if the
// frontier advanced as a result.
Expand Down

0 comments on commit 7ae4ea1

Please sign in to comment.