Skip to content

Commit 503a214

Browse files
committed
db: track memtable flush, manifest update durations in TableIngestInfo
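This patch adds two durations to TableIngestInfo: WaitFlushDuration, the time an ingest spends waiting for memtable flushes to complete when the ingested sstables overlap with memtables (reported in the log line for flushable ingests), and ManifestUpdateDuration, the time spent updating the MANIFEST. This gives us more observability into slow ingests. Informs: #4970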
1 parent c1d3686 commit 503a214

File tree

7 files changed

+41
-13
lines changed

7 files changed

+41
-13
lines changed

event.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,12 @@ type TableIngestInfo struct {
535535
// flushable.
536536
flushable bool
537537
Err error
538+
// WaitFlushDuration is the time spent waiting for memtable flushes to
539+
// complete, given that an overlap between ingesting sstables and memtables
540+
// exists.
541+
WaitFlushDuration time.Duration
542+
// ManifestUpdateDuration is the time spent updating the manifest.
543+
ManifestUpdateDuration time.Duration
538544
}
539545

540546
func (i TableIngestInfo) String() string {
@@ -549,7 +555,8 @@ func (i TableIngestInfo) SafeFormat(w redact.SafePrinter, _ rune) {
549555
}
550556

551557
if i.flushable {
552-
w.Printf("[JOB %d] ingested as flushable", redact.Safe(i.JobID))
558+
w.Printf("[JOB %d] ingested as flushable, memtable flushes took %.1fs:", redact.Safe(i.JobID),
559+
redact.Safe(i.WaitFlushDuration.Seconds()))
553560
} else {
554561
w.Printf("[JOB %d] ingested", redact.Safe(i.JobID))
555562
}
@@ -566,6 +573,7 @@ func (i TableIngestInfo) SafeFormat(w redact.SafePrinter, _ rune) {
566573
w.Printf(" %s%s (%s)", redact.Safe(levelStr), t.FileNum,
567574
redact.Safe(humanize.Bytes.Uint64(t.Size)))
568575
}
576+
w.Printf("; manifest update took %.1fs", redact.Safe(i.ManifestUpdateDuration.Seconds()))
569577
}
570578

571579
// TableStatsInfo contains the info for a table stats loaded event.

event_listener_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,13 @@ func TestEventListener(t *testing.T) {
5555
info.InputBytes = 100
5656
flushEnd(info)
5757
}
58+
tableIngested := lel.TableIngested
59+
lel.TableIngested = func(info TableIngestInfo) {
60+
// Make deterministic.
61+
info.WaitFlushDuration = 200 * time.Millisecond
62+
info.ManifestUpdateDuration = 100 * time.Millisecond
63+
tableIngested(info)
64+
}
5865
opts := &Options{
5966
// The table stats collector runs asynchronously and its
6067
// timing is less predictable. It increments nextJobID, which

ingest.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"sort"
1212
"time"
1313

14+
"github.com/cockroachdb/crlib/crtime"
1415
"github.com/cockroachdb/errors"
1516
"github.com/cockroachdb/pebble/internal/base"
1617
"github.com/cockroachdb/pebble/internal/cache"
@@ -1499,6 +1500,7 @@ func (d *DB) ingest(ctx context.Context, args ingestArgs) (IngestOperationStats,
14991500
var mut *memTable
15001501
// asFlushable indicates whether the sstable was ingested as a flushable.
15011502
var asFlushable bool
1503+
var waitFlushStart crtime.Mono
15021504
prepare := func(seqNum base.SeqNum) {
15031505
// Note that d.commit.mu is held by commitPipeline when calling prepare.
15041506

@@ -1607,7 +1609,6 @@ func (d *DB) ingest(ctx context.Context, args ingestArgs) (IngestOperationStats,
16071609
d.mu.log.manager.ElevateWriteStallThresholdForFailover()) &&
16081610
!d.opts.Experimental.DisableIngestAsFlushable() && !hasRemoteFiles &&
16091611
(!args.ExciseSpan.Valid() || d.FormatMajorVersion() >= FormatFlushableIngestExcises)
1610-
16111612
if !canIngestFlushable {
16121613
// We're not able to ingest as a flushable,
16131614
// so we must synchronously flush.
@@ -1628,6 +1629,7 @@ func (d *DB) ingest(ctx context.Context, args ingestArgs) (IngestOperationStats,
16281629
mut = d.mu.mem.mutable
16291630
mut.writerRef()
16301631
mem.flushForced = true
1632+
waitFlushStart = crtime.NowMono()
16311633
d.maybeScheduleFlush()
16321634
return
16331635
}
@@ -1642,6 +1644,8 @@ func (d *DB) ingest(ctx context.Context, args ingestArgs) (IngestOperationStats,
16421644
}
16431645

16441646
var ve *manifest.VersionEdit
1647+
var waitFlushDuration time.Duration
1648+
var manifestUpdateDuration time.Duration
16451649
apply := func(seqNum base.SeqNum) {
16461650
if err != nil || asFlushable {
16471651
// An error occurred during prepare.
@@ -1683,11 +1687,12 @@ func (d *DB) ingest(ctx context.Context, args ingestArgs) (IngestOperationStats,
16831687
// finish.
16841688
if mem != nil {
16851689
<-mem.flushed
1690+
waitFlushDuration = waitFlushStart.Elapsed()
16861691
}
16871692

16881693
// Assign the sstables to the correct level in the LSM and apply the
16891694
// version edit.
1690-
ve, err = d.ingestApply(ctx, jobID, loadResult, mut, args.ExciseSpan, args.ExciseBoundsPolicy, seqNum)
1695+
ve, manifestUpdateDuration, err = d.ingestApply(ctx, jobID, loadResult, mut, args.ExciseSpan, args.ExciseBoundsPolicy, seqNum)
16911696
}
16921697

16931698
// Only one ingest can occur at a time because if not, one would block waiting
@@ -1724,9 +1729,11 @@ func (d *DB) ingest(ctx context.Context, args ingestArgs) (IngestOperationStats,
17241729
var stats IngestOperationStats
17251730
if loadResult.fileCount() > 0 {
17261731
info := TableIngestInfo{
1727-
JobID: int(jobID),
1728-
Err: err,
1729-
flushable: asFlushable,
1732+
JobID: int(jobID),
1733+
Err: err,
1734+
flushable: asFlushable,
1735+
WaitFlushDuration: waitFlushDuration,
1736+
ManifestUpdateDuration: manifestUpdateDuration,
17301737
}
17311738
if len(loadResult.local) > 0 {
17321739
info.GlobalSeqNum = loadResult.local[0].SmallestSeqNum
@@ -1905,7 +1912,7 @@ func (d *DB) ingestApply(
19051912
exciseSpan KeyRange,
19061913
exciseBoundsPolicy exciseBoundsPolicy,
19071914
exciseSeqNum base.SeqNum,
1908-
) (*manifest.VersionEdit, error) {
1915+
) (*manifest.VersionEdit, time.Duration, error) {
19091916
d.mu.Lock()
19101917
defer d.mu.Unlock()
19111918

@@ -1917,6 +1924,7 @@ func (d *DB) ingestApply(
19171924
}
19181925
var metrics levelMetricsDelta
19191926

1927+
manifestUpdateStart := crtime.NowMono()
19201928
// Determine the target level inside UpdateVersionLocked. This prevents two
19211929
// concurrent ingestion jobs from using the same version to determine the
19221930
// target level, and also provides serialization with concurrent compaction
@@ -2164,8 +2172,9 @@ func (d *DB) ingestApply(
21642172
}, nil
21652173
})
21662174
if err != nil {
2167-
return nil, err
2175+
return nil, 0, err
21682176
}
2177+
manifestUpdateDuration := manifestUpdateStart.Elapsed()
21692178

21702179
// Check for any EventuallyFileOnlySnapshots that could be watching for
21712180
// an excise on this span. There should be none as the
@@ -2211,7 +2220,7 @@ func (d *DB) ingestApply(
22112220
}
22122221
}
22132222
d.maybeValidateSSTablesLocked(toValidate)
2214-
return ve, nil
2223+
return ve, manifestUpdateDuration, nil
22152224
}
22162225

22172226
// maybeValidateSSTablesLocked adds the slice of newTableEntrys to the pending

internal/strparse/strparse.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func (p *Parser) Uint32() uint32 {
175175
return uint32(x)
176176
}
177177

178-
// Uint64 parses the next token as a sequence number.
178+
// SeqNum parses the next token as a sequence number.
179179
func (p *Parser) SeqNum() base.SeqNum {
180180
return base.ParseSeqNum(p.Next())
181181
}

replay/testdata/collect/start_stop

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ ingest
1414
created src/000002.sst
1515
created src/000003.sst
1616
created src/000004.sst
17-
[JOB 0] ingested L0:000002 (10KB), L0:000003 (10KB), L0:000004 (10KB)
17+
[JOB 0] ingested L0:000002 (10KB), L0:000003 (10KB), L0:000004 (10KB); manifest update took 0.1s
1818

1919

2020
wait

replay/workload_capture_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ func TestWorkloadCollector(t *testing.T) {
128128
_, err = currentManifest.Write(randData(25))
129129
require.NoError(t, err)
130130
}
131+
// Override the default duration values for TableIngestInfo to
132+
// ensure deterministic output.
133+
ingestInfo.WaitFlushDuration = 200 * time.Millisecond
134+
ingestInfo.ManifestUpdateDuration = 100 * time.Millisecond
131135
fmt.Fprint(&buf, ingestInfo.String())
132136
c.onTableIngest(ingestInfo)
133137
return buf.String()

testdata/event_listener

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ sync: db
238238
remove: db/MANIFEST-000011
239239
[JOB 10] MANIFEST deleted 000011
240240
remove: ext/0
241-
[JOB 10] ingested L0:000015 (757B)
241+
[JOB 10] ingested L0:000015 (755B); manifest update took 0.1s
242242

243243
metrics
244244
----
@@ -350,7 +350,7 @@ sync: wal
350350
[JOB 13] WAL created 000020
351351
remove: ext/a
352352
remove: ext/b
353-
[JOB 11] ingested as flushable 000017 (757B), 000018 (757B)
353+
[JOB 11] ingested as flushable, memtable flushes took 0.2s: 000017 (755B), 000018 (755B); manifest update took 0.1s
354354
sync-data: wal/000020.log
355355
close: wal/000020.log
356356
create: wal/000021.log

0 commit comments

Comments
 (0)