Skip to content

Commit 6344314

Browse files
committed
db: pull timeNow into Options.private
Pull the timeNow func that permits mocking time into the private section of the Options struct so that callers may mock time within Open itself.
1 parent 83c41dc commit 6344314

File tree

10 files changed

+49
-45
lines changed

10 files changed

+49
-45
lines changed

blob_rewrite.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (c *pickedBlobFileCompaction) ConstructCompaction(
6161
// when it completes.
6262
c.vers.Ref()
6363
return &blobFileRewriteCompaction{
64-
beganAt: d.timeNow(),
64+
beganAt: d.opts.private.timeNow(),
6565
grantHandle: grantHandle,
6666
version: c.vers,
6767
input: c.file,
@@ -188,12 +188,12 @@ func (c *blobFileRewriteCompaction) Execute(jobID JobID, d *DB) error {
188188
},
189189
}
190190
d.opts.EventListener.BlobFileRewriteBegin(info)
191-
startTime := d.timeNow()
191+
startTime := d.opts.private.timeNow()
192192

193193
// Run the blob file rewrite.
194194
objMeta, ve, err := d.runBlobFileRewriteLocked(ctx, jobID, c)
195195

196-
info.Duration = d.timeNow().Sub(startTime)
196+
info.Duration = d.opts.private.timeNow().Sub(startTime)
197197

198198
// Update the version with the remapped blob file.
199199
if err == nil {
@@ -266,7 +266,7 @@ func (c *blobFileRewriteCompaction) Execute(jobID JobID, d *DB) error {
266266
}
267267

268268
// Notify the event listener that the compaction has ended.
269-
now := d.timeNow()
269+
now := d.opts.private.timeNow()
270270
info.TotalDuration = now.Sub(c.beganAt)
271271
info.Done = true
272272
info.Err = err
@@ -348,7 +348,7 @@ func (d *DB) runBlobFileRewriteLocked(
348348
FileNum: objMeta.DiskFileNum,
349349
Size: stats.FileLen,
350350
ValueSize: stats.UncompressedValueBytes,
351-
CreationTime: uint64(d.timeNow().Unix()),
351+
CreationTime: uint64(d.opts.private.timeNow().Unix()),
352352
}
353353
physical.PopulateProperties(&stats.Properties)
354354

compaction.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1431,7 +1431,7 @@ func (d *DB) maybeScheduleDelayedFlush(tbl *memTable, dur time.Duration) {
14311431
if mem == nil || mem.flushForced {
14321432
return
14331433
}
1434-
deadline := d.timeNow().Add(dur)
1434+
deadline := d.opts.private.timeNow().Add(dur)
14351435
if !mem.delayedFlushForcedAt.IsZero() && deadline.After(mem.delayedFlushForcedAt) {
14361436
// Already scheduled to flush sooner than within `dur`.
14371437
return
@@ -1720,7 +1720,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
17201720
d.mu.versions.latest.l0Organizer,
17211721
d.mu.versions.picker.getBaseLevel(),
17221722
d.mu.mem.queue[:n],
1723-
d.timeNow(),
1723+
d.opts.private.timeNow(),
17241724
d.shouldCreateShared(0),
17251725
d.determineCompactionValueSeparation,
17261726
)
@@ -1738,7 +1738,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
17381738
}
17391739
d.opts.EventListener.FlushBegin(info)
17401740

1741-
startTime := d.timeNow()
1741+
startTime := d.opts.private.timeNow()
17421742

17431743
var ve *manifest.VersionEdit
17441744
var stats compact.Stats
@@ -1758,7 +1758,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
17581758
if c.kind == compactionKindIngestedFlushable {
17591759
ve, err = d.runIngestFlush(c)
17601760
}
1761-
info.Duration = d.timeNow().Sub(startTime)
1761+
info.Duration = d.opts.private.timeNow().Sub(startTime)
17621762
if err != nil {
17631763
return versionUpdate{}, err
17641764
}
@@ -1885,7 +1885,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
18851885
info.Err = errEmptyTable
18861886
}
18871887
info.Done = true
1888-
info.TotalDuration = d.timeNow().Sub(startTime)
1888+
info.TotalDuration = d.opts.private.timeNow().Sub(startTime)
18891889
d.opts.EventListener.FlushEnd(info)
18901890

18911891
// The order of these operations matters here for ease of testing.
@@ -2280,7 +2280,7 @@ func (d *DB) tryScheduleDeleteOnlyCompaction() bool {
22802280
d.mu.compact.deletionHints = unresolvedHints
22812281

22822282
if len(inputs) > 0 {
2283-
c := newDeleteOnlyCompaction(d.opts, v, inputs, d.timeNow(), resolvedHints, exciseEnabled)
2283+
c := newDeleteOnlyCompaction(d.opts, v, inputs, d.opts.private.timeNow(), resolvedHints, exciseEnabled)
22842284
d.mu.compact.compactingCount++
22852285
d.mu.compact.compactProcesses++
22862286
c.AddInProgressLocked(d)
@@ -2635,7 +2635,7 @@ func (d *DB) compact(c compaction, errChannel chan error) {
26352635
// must be atomic with the above removal of c from
26362636
// d.mu.compact.InProgress to ensure Metrics.Compact.Duration does not
26372637
// miss or double count a completing compaction's duration.
2638-
d.mu.compact.duration += d.timeNow().Sub(c.BeganAt())
2638+
d.mu.compact.duration += d.opts.private.timeNow().Sub(c.BeganAt())
26392639
}()
26402640
// Done must not be called while holding any lock that needs to be
26412641
// acquired by Schedule. Also, it must be called after new Version has
@@ -2737,12 +2737,12 @@ func (d *DB) cleanupVersionEdit(ve *manifest.VersionEdit) {
27372737
func (d *DB) compact1(jobID JobID, c *tableCompaction) (err error) {
27382738
info := c.makeInfo(jobID)
27392739
d.opts.EventListener.CompactionBegin(info)
2740-
startTime := d.timeNow()
2740+
startTime := d.opts.private.timeNow()
27412741

27422742
ve, stats, outputBlobs, err := d.runCompaction(jobID, c)
27432743

27442744
info.Annotations = append(info.Annotations, c.annotations...)
2745-
info.Duration = d.timeNow().Sub(startTime)
2745+
info.Duration = d.opts.private.timeNow().Sub(startTime)
27462746
if err == nil {
27472747
validateVersionEdit(ve, d.opts.Comparer.ValidateKey, d.opts.Comparer.FormatKey, d.opts.Logger)
27482748
_, err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
@@ -2799,7 +2799,7 @@ func (d *DB) compact1(jobID JobID, c *tableCompaction) (err error) {
27992799
d.mu.versions.incrementCompactions(c.kind, c.extraLevels, c.metrics.bytesWritten.Load(), err)
28002800
d.mu.versions.incrementCompactionBytes(-c.metrics.bytesWritten.Load())
28012801

2802-
info.TotalDuration = d.timeNow().Sub(c.metrics.beganAt)
2802+
info.TotalDuration = d.opts.private.timeNow().Sub(c.metrics.beganAt)
28032803
d.opts.EventListener.CompactionEnd(info)
28042804

28052805
// Update the read state before deleting obsolete files because the

compaction_picker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ func (pc *pickedTableCompaction) ConstructCompaction(
245245
return newCompaction(
246246
pc,
247247
d.opts,
248-
d.timeNow(),
248+
d.opts.private.timeNow(),
249249
d.ObjProvider(),
250250
grantHandle,
251251
d.shouldCreateShared(pc.outputLevel.level),

compaction_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2890,6 +2890,11 @@ func TestMarkedForCompaction(t *testing.T) {
28902890
EventListener: &eventListener,
28912891
Logger: testutils.Logger{T: t},
28922892
}
2893+
t := time.Now()
2894+
opts.private.timeNow = func() time.Time {
2895+
t = t.Add(time.Second)
2896+
return t
2897+
}
28932898
opts.Experimental.CompactionScheduler = func() CompactionScheduler {
28942899
return NewConcurrencyLimitSchedulerWithNoPeriodicGrantingForTest()
28952900
}
@@ -2899,11 +2904,7 @@ func TestMarkedForCompaction(t *testing.T) {
28992904
}
29002905
d.mu.Lock()
29012906
defer d.mu.Unlock()
2902-
t := time.Now()
2903-
d.timeNow = func() time.Time {
2904-
t = t.Add(time.Second)
2905-
return t
2906-
}
2907+
29072908
s := d.mu.versions.currentVersion().DebugString()
29082909
return s
29092910

db.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -547,8 +547,6 @@ type DB struct {
547547
// quickly.
548548
problemSpans problemspans.ByLevel
549549

550-
// Normally equal to time.Now() but may be overridden in tests.
551-
timeNow func() time.Time
552550
// the time at database Open; may be used to compute metrics like effective
553551
// compaction concurrency
554552
openedAt time.Time
@@ -1901,7 +1899,7 @@ func (d *DB) Metrics() *Metrics {
19011899
metrics.Compact.Duration = d.mu.compact.duration
19021900
for c := range d.mu.compact.inProgress {
19031901
if !c.IsFlush() {
1904-
metrics.Compact.Duration += d.timeNow().Sub(c.BeganAt())
1902+
metrics.Compact.Duration += d.opts.private.timeNow().Sub(c.BeganAt())
19051903
}
19061904
}
19071905
metrics.Compact.NumProblemSpans = d.problemSpans.Len()
@@ -2026,7 +2024,7 @@ func (d *DB) Metrics() *Metrics {
20262024

20272025
metrics.SecondaryCacheMetrics = d.objProvider.Metrics()
20282026

2029-
metrics.Uptime = d.timeNow().Sub(d.openedAt)
2027+
metrics.Uptime = d.opts.private.timeNow().Sub(d.openedAt)
20302028

20312029
metrics.manualMemory = manual.GetMetrics()
20322030

download.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,12 @@ func (d *DB) Download(ctx context.Context, spans []DownloadSpan) error {
6767
JobID: int(d.newJobID()),
6868
Spans: spans,
6969
}
70-
startTime := d.timeNow()
70+
startTime := d.opts.private.timeNow()
7171
d.opts.EventListener.DownloadBegin(info)
7272

7373
for info.RestartCount = 0; ; info.RestartCount++ {
7474
tasks := d.createDownloadTasks(spans)
75-
info.Duration = d.timeNow().Sub(startTime)
75+
info.Duration = d.opts.private.timeNow().Sub(startTime)
7676
if len(tasks) == 0 {
7777
// We are done.
7878
info.Done = true
@@ -96,7 +96,7 @@ func (d *DB) Download(ctx context.Context, spans []DownloadSpan) error {
9696

9797
if err != nil {
9898
info.Err = err
99-
info.Duration = d.timeNow().Sub(startTime)
99+
info.Duration = d.opts.private.timeNow().Sub(startTime)
100100
d.opts.EventListener.DownloadEnd(info)
101101
return err
102102
}
@@ -446,7 +446,8 @@ func (d *DB) tryLaunchDownloadForFile(
446446

447447
download.numLaunchedDownloads++
448448
doneCh = make(chan error, 1)
449-
c := newCompaction(pc, d.opts, d.timeNow(), d.objProvider, noopGrantHandle{}, d.shouldCreateShared(pc.outputLevel.level), d.determineCompactionValueSeparation)
449+
c := newCompaction(pc, d.opts, d.opts.private.timeNow(), d.objProvider, noopGrantHandle{},
450+
d.shouldCreateShared(pc.outputLevel.level), d.determineCompactionValueSeparation)
450451
c.isDownload = true
451452
d.mu.compact.downloadingCount++
452453
c.AddInProgressLocked(d)

event_listener_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,16 @@ func TestEventListener(t *testing.T) {
7777
L0CompactionThreshold: 10,
7878
WALDir: "wal",
7979
}
80+
t := time.Now()
81+
opts.private.timeNow = func() time.Time {
82+
t = t.Add(time.Second)
83+
return t
84+
}
8085
var err error
8186
d, err = Open("db", opts)
8287
if err != nil {
8388
return err.Error()
8489
}
85-
t := time.Now()
86-
d.timeNow = func() time.Time {
87-
t = t.Add(time.Second)
88-
return t
89-
}
9090
d.opts.private.testingAlwaysWaitForCleanup = true
9191
return memLog.String()
9292

open.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"slices"
1616
"sync"
1717
"sync/atomic"
18-
"time"
1918

2019
"github.com/cockroachdb/crlib/crtime"
2120
"github.com/cockroachdb/errors"
@@ -253,17 +252,15 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
253252
d.mu.versions.logSeqNum.Store(base.SeqNumStart)
254253
d.mu.formatVers.vers.Store(uint64(formatVersion))
255254
d.mu.formatVers.marker = rs.fmvMarker
256-
257-
d.timeNow = time.Now
258-
d.openedAt = d.timeNow()
255+
d.openedAt = d.opts.private.timeNow()
259256

260257
d.mu.Lock()
261258
defer d.mu.Unlock()
262259

263260
jobID := d.newJobIDLocked()
264261

265262
blobRewriteHeuristic := manifest.BlobRewriteHeuristic{
266-
CurrentTime: d.timeNow,
263+
CurrentTime: d.opts.private.timeNow,
267264
MinimumAge: opts.Experimental.ValueSeparationPolicy().RewriteMinimumAge,
268265
}
269266

options.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,6 +1214,10 @@ type Options struct {
12141214
// before calling DB.ingestApply.
12151215
testingBeforeIngestApplyFunc func()
12161216

1217+
// timeNow returns the current time. It defaults to time.Now. It's
1218+
// configurable here so that tests can mock the current time.
1219+
timeNow func() time.Time
1220+
12171221
// fsCloser holds a closer that should be invoked after a DB using these
12181222
// Options is closed. This is used to automatically stop the
12191223
// long-running goroutine associated with the disk-health-checking FS.
@@ -1683,6 +1687,9 @@ func (o *Options) EnsureDefaults() {
16831687
if o.Experimental.VirtualTableRewriteUnreferencedFraction == nil {
16841688
o.Experimental.VirtualTableRewriteUnreferencedFraction = func() float64 { return defaultVirtualTableUnreferencedFraction }
16851689
}
1690+
if o.private.timeNow == nil {
1691+
o.private.timeNow = time.Now
1692+
}
16861693
// TODO(jackson): Enable value separation by default once we have confidence
16871694
// in a default policy.
16881695

testdata/event_listener

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ close: db/marker.manifest.000002.MANIFEST-000006
147147
remove: db/marker.manifest.000001.MANIFEST-000001
148148
sync: db
149149
[JOB 3] MANIFEST created 000006
150-
[JOB 3] flushed 1 memtable (100B) to L0 [000005] (657B), in 1.0s (2.0s total), output rate 657B/s
150+
[JOB 3] flushed 1 memtable (100B) to L0 [000005] (657B), in 1.0s (3.0s total), output rate 657B/s
151151

152152
compact
153153
----
@@ -172,7 +172,7 @@ close: db/marker.manifest.000003.MANIFEST-000009
172172
remove: db/marker.manifest.000002.MANIFEST-000006
173173
sync: db
174174
[JOB 5] MANIFEST created 000009
175-
[JOB 5] flushed 1 memtable (100B) to L0 [000008] (657B), in 1.0s (2.0s total), output rate 657B/s
175+
[JOB 5] flushed 1 memtable (100B) to L0 [000008] (657B), in 1.0s (3.0s total), output rate 657B/s
176176
remove: db/MANIFEST-000001
177177
[JOB 5] MANIFEST deleted 000001
178178
[JOB 6] compacting(default) L0 [000005 000008] (1.3KB) Score=0.00 + L6 [] (0B) Score=0.00; OverlappingRatio: Single 0.00, Multi 0.00
@@ -206,7 +206,7 @@ close: db/marker.manifest.000004.MANIFEST-000011
206206
remove: db/marker.manifest.000003.MANIFEST-000009
207207
sync: db
208208
[JOB 6] MANIFEST created 000011
209-
[JOB 6] compacted(default) L0 [000005 000008] (1.3KB) Score=0.00 + L6 [] (0B) Score=0.00 -> L6 [000010] (652B), in 1.0s (3.0s total), output rate 652B/s
209+
[JOB 6] compacted(default) L0 [000005 000008] (1.3KB) Score=0.00 + L6 [] (0B) Score=0.00 -> L6 [000010] (652B), in 1.0s (4.0s total), output rate 652B/s
210210
close: db/000005.sst
211211
close: db/000008.sst
212212
remove: db/MANIFEST-000006
@@ -242,7 +242,7 @@ close: db/marker.manifest.000005.MANIFEST-000014
242242
remove: db/marker.manifest.000004.MANIFEST-000011
243243
sync: db
244244
[JOB 8] MANIFEST created 000014
245-
[JOB 8] flushed 1 memtable (100B) to L0 [000013] (657B), in 1.0s (2.0s total), output rate 657B/s
245+
[JOB 8] flushed 1 memtable (100B) to L0 [000013] (657B), in 1.0s (3.0s total), output rate 657B/s
246246

247247
enable-file-deletions
248248
----
@@ -401,7 +401,7 @@ sync-data: db/000022.sst
401401
close: db/000022.sst
402402
sync: db
403403
sync: db/MANIFEST-000016
404-
[JOB 15] flushed 1 memtable (100B) to L0 [000022] (657B), in 1.0s (2.0s total), output rate 657B/s
404+
[JOB 15] flushed 1 memtable (100B) to L0 [000022] (657B), in 1.0s (3.0s total), output rate 657B/s
405405
[JOB 16] flushing 2 ingested tables
406406
create: db/MANIFEST-000023
407407
close: db/MANIFEST-000016
@@ -412,7 +412,7 @@ close: db/marker.manifest.000007.MANIFEST-000023
412412
remove: db/marker.manifest.000006.MANIFEST-000016
413413
sync: db
414414
[JOB 16] MANIFEST created 000023
415-
[JOB 16] flushed 2 ingested flushables L0:000017 (652B) + L6:000018 (652B) in 1.0s (2.0s total), output rate 1.3KB/s
415+
[JOB 16] flushed 2 ingested flushables L0:000017 (652B) + L6:000018 (652B) in 1.0s (3.0s total), output rate 1.3KB/s
416416
remove: db/MANIFEST-000014
417417
[JOB 16] MANIFEST deleted 000014
418418
[JOB 17] flushing 1 memtable (100B) to L0

0 commit comments

Comments
 (0)