
Commit 2c2ad4d

wal: deflake TestManagerFailover
The invocation of the new segmentClosed callback introduced in #5388 occurs asynchronously with respect to the manager and its progression through logical log numbers. This test was flaky in two ways. If the segmentClosed callback was invoked /before/ the writerClosed callback for the same WAL, writerClosed would append a second record for the same logical log, containing the set of WALs other than the one inserted by the segmentClosed callback. Conversely, if the segmentClosed callback was sufficiently delayed relative to the closing of the writer, it might not be invoked until after the test had listed the set of obsolete logs.

The first race is fixed by refactoring the segmentClosed and writerClosed callbacks to share the same logic for merging logs. The second race is fixed by using (datadriven.TestData).Retry to account for the nondeterminism.

Fix #5401.
1 parent db1725d

5 files changed, +63 -53 lines changed

wal/failover_manager.go

Lines changed: 25 additions & 21 deletions
@@ -440,6 +440,10 @@ type segmentWithSizeEtc struct {
 	synchronouslyClosed bool
 }
 
+func cmpSegmentWithSizeEtc(a, b segmentWithSizeEtc) int {
+	return cmp.Compare(a.segment.logNameIndex, b.segment.logNameIndex)
+}
+
 type failoverManager struct {
 	opts Options
 	// initialObsolete holds the set of DeletableLogs that formed the logs
@@ -655,41 +659,41 @@ func (wm *failoverManager) ElevateWriteStallThresholdForFailover() bool {
 	return wm.monitor.elevateWriteStallThresholdForFailover()
 }
 
+// writerClosed is called by the failoverWriter; see
+// failoverWriterOpts.writerClosed.
 func (wm *failoverManager) writerClosed(llse logicalLogWithSizesEtc) {
 	wm.monitor.noWriter()
 	wm.mu.Lock()
 	defer wm.mu.Unlock()
-	wm.mu.closedWALs = append(wm.mu.closedWALs, llse)
+	wm.recordClosedWALLocked(llse)
 	wm.mu.ww = nil
 }
 
 // segmentClosed is called by the failoverWriter; see
 // failoverWriterOpts.segmentClosed.
-func (wm *failoverManager) segmentClosed(num NumWAL, s segmentWithSizeEtc) {
+func (wm *failoverManager) segmentClosed(llse logicalLogWithSizesEtc) {
 	wm.mu.Lock()
 	defer wm.mu.Unlock()
+	wm.recordClosedWALLocked(llse)
+}
+
+func (wm *failoverManager) recordClosedWALLocked(llse logicalLogWithSizesEtc) {
 	// Find the closed WAL matching the logical WAL num, if one exists. If we
-	// find one, we append the segment to the list of segments if it's not
-	// already there.
-	i, found := slices.BinarySearchFunc(wm.mu.closedWALs, num, func(llse logicalLogWithSizesEtc, num NumWAL) int {
-		return cmp.Compare(llse.num, num)
-	})
-	if found {
-		segmentIndex, segmentFound := slices.BinarySearchFunc(wm.mu.closedWALs[i].segments, s.segment.logNameIndex,
-			func(s segmentWithSizeEtc, logNameIndex LogNameIndex) int {
-				return cmp.Compare(s.segment.logNameIndex, logNameIndex)
-			})
-		if !segmentFound {
-			wm.mu.closedWALs[i].segments = slices.Insert(wm.mu.closedWALs[i].segments, segmentIndex, s)
-		}
+	// find one, we merge the segments into the existing list.
+	i, found := slices.BinarySearchFunc(wm.mu.closedWALs, llse.num,
+		func(llse logicalLogWithSizesEtc, num NumWAL) int {
+			return cmp.Compare(llse.num, num)
+		})
+	if !found {
+		// If we didn't find an existing entry in closedWALs for the provided
+		// NumWAL, append a new entry.
+		wm.mu.closedWALs = slices.Insert(wm.mu.closedWALs, i, llse)
 		return
 	}
-	// If we didn't find an existing entry in closedWALs for the provided
-	// NumWAL, append a new entry.
-	wm.mu.closedWALs = slices.Insert(wm.mu.closedWALs, i, logicalLogWithSizesEtc{
-		num:      num,
-		segments: []segmentWithSizeEtc{s},
-	})
+	wm.mu.closedWALs[i].segments = append(wm.mu.closedWALs[i].segments, llse.segments...)
+	slices.SortFunc(wm.mu.closedWALs[i].segments, cmpSegmentWithSizeEtc)
+	wm.mu.closedWALs[i].segments = slices.CompactFunc(wm.mu.closedWALs[i].segments,
+		func(a, b segmentWithSizeEtc) bool { return a.logNameIndex == b.logNameIndex })
 }
 
 // Stats implements Manager.
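
The new recordClosedWALLocked helper merges whichever of the writerClosed/segmentClosed notifications arrives second into the entry recorded by the first, deduplicating segments by logNameIndex. Below is a minimal standalone sketch of that sort-and-compact merge pattern, using a simplified hypothetical seg type in place of Pebble's segmentWithSizeEtc:

package main

import (
	"cmp"
	"fmt"
	"slices"
)

// seg stands in for a closed WAL segment; only logNameIndex matters for merging.
type seg struct {
	logNameIndex int
	dir          string
}

// mergeSegments combines two segment lists, sorts by logNameIndex, and drops
// duplicates, mirroring the shared merge logic described in the commit message.
func mergeSegments(existing, incoming []seg) []seg {
	merged := append(append([]seg(nil), existing...), incoming...)
	slices.SortFunc(merged, func(a, b seg) int {
		return cmp.Compare(a.logNameIndex, b.logNameIndex)
	})
	return slices.CompactFunc(merged, func(a, b seg) bool {
		return a.logNameIndex == b.logNameIndex
	})
}

func main() {
	// Regardless of whether the writerClosed or segmentClosed record arrives
	// first, the merged result is the same.
	fromWriterClosed := []seg{{logNameIndex: 1, dir: "pri"}}
	fromSegmentClosed := []seg{{logNameIndex: 0, dir: "sec"}, {logNameIndex: 1, dir: "pri"}}
	fmt.Println(mergeSegments(fromWriterClosed, fromSegmentClosed))
	// Output: [{0 sec} {1 pri}]
}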

wal/failover_manager_test.go

Lines changed: 22 additions & 20 deletions
@@ -427,27 +427,29 @@ func TestManagerFailover(t *testing.T) {
 			return b.String()
 
 		case "list-and-stats":
-			logs := fm.List()
-			stats := fm.Stats()
-			var b strings.Builder
-			if len(logs) > 0 {
-				fmt.Fprintf(&b, "logs:\n")
-				for _, f := range logs {
-					fmt.Fprintf(&b, " %s\n", f.String())
+			return td.Retry(t, func() string {
+				logs := fm.List()
+				stats := fm.Stats()
+				var b strings.Builder
+				if len(logs) > 0 {
+					fmt.Fprintf(&b, "logs:\n")
+					for _, f := range logs {
+						fmt.Fprintf(&b, " %s\n", f.String())
+					}
 				}
-			}
-			fmt.Fprintf(&b, "stats:\n")
-			fmt.Fprintf(&b, " obsolete: count %d size %d\n", stats.ObsoleteFileCount, stats.ObsoleteFileSize)
-			fmt.Fprintf(&b, " live: count %d size %d\n", stats.LiveFileCount, stats.LiveFileSize)
-			fmt.Fprintf(&b, " failover: switches %d pri-dur %s sec-dur %s\n", stats.Failover.DirSwitchCount,
-				stats.Failover.PrimaryWriteDuration.String(), stats.Failover.SecondaryWriteDuration.String())
-			var latencyProto io_prometheus_client.Metric
-			stats.Failover.FailoverWriteAndSyncLatency.Write(&latencyProto)
-			latencySampleCount := *latencyProto.Histogram.SampleCount
-			if latencySampleCount > 0 {
-				fmt.Fprintf(&b, " latency sample count: %d\n", latencySampleCount)
-			}
-			return b.String()
+				fmt.Fprintf(&b, "stats:\n")
+				fmt.Fprintf(&b, " obsolete: count %d size %d\n", stats.ObsoleteFileCount, stats.ObsoleteFileSize)
+				fmt.Fprintf(&b, " live: count %d size %d\n", stats.LiveFileCount, stats.LiveFileSize)
+				fmt.Fprintf(&b, " failover: switches %d pri-dur %s sec-dur %s\n", stats.Failover.DirSwitchCount,
+					stats.Failover.PrimaryWriteDuration.String(), stats.Failover.SecondaryWriteDuration.String())
+				var latencyProto io_prometheus_client.Metric
+				stats.Failover.FailoverWriteAndSyncLatency.Write(&latencyProto)
+				latencySampleCount := *latencyProto.Histogram.SampleCount
+				if latencySampleCount > 0 {
+					fmt.Fprintf(&b, " latency sample count: %d\n", latencySampleCount)
+				}
+				return b.String()
+			})
 
 		case "write-record":
 			var value string
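
Wrapping the list-and-stats handler in (datadriven.TestData).Retry lets the test tolerate a segmentClosed callback that fires some time after the writer is closed. Below is a minimal sketch of the same pattern in isolation; the count directive, the testdata/retry_example file, and the inline description of Retry's behavior (re-evaluating the closure to absorb nondeterminism before comparing against the expected output) are my assumptions based on this commit, not a definitive account of the datadriven API:

package example

import (
	"fmt"
	"sync/atomic"
	"testing"
	"time"

	"github.com/cockroachdb/datadriven"
)

func TestRetryExample(t *testing.T) {
	var n atomic.Int64
	// Simulate an asynchronous effect, analogous to the segmentClosed callback
	// arriving some time after the writer is closed.
	go func() {
		time.Sleep(10 * time.Millisecond)
		n.Store(2)
	}()

	datadriven.RunTest(t, "testdata/retry_example", func(t *testing.T, td *datadriven.TestData) string {
		switch td.Cmd {
		case "count":
			// Without Retry, this output would depend on whether the goroutine
			// above has run yet; with Retry, the closure is re-evaluated to
			// mask that window of nondeterminism (per the commit message).
			return td.Retry(t, func() string {
				return fmt.Sprintf("count: %d\n", n.Load())
			})
		default:
			return fmt.Sprintf("unknown command: %s", td.Cmd)
		}
	})
}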

wal/failover_writer.go

Lines changed: 12 additions & 7 deletions
@@ -476,7 +476,7 @@ type failoverWriterOpts struct {
 	// invoked. It's used to ensure that we reclaim all physical segment files,
 	// including ones that did not complete creation before the Writer was
 	// closed.
-	segmentClosed func(NumWAL, segmentWithSizeEtc)
+	segmentClosed func(logicalLogWithSizesEtc)
 
 	writerCreatedForTest chan<- struct{}
 
@@ -702,13 +702,18 @@ func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error {
 			// there's an obsolete segment file we should clean up. Note
 			// that the file may be occupying non-negligible disk space even
 			// though we never wrote to it due to preallocation.
-			ww.opts.segmentClosed(ww.opts.wn, segmentWithSizeEtc{
-				segment: segment{
-					logNameIndex: LogNameIndex(writerIndex),
-					dir:          dir.Dir,
+			ww.opts.segmentClosed(logicalLogWithSizesEtc{
+				num: ww.opts.wn,
+				segments: []segmentWithSizeEtc{
+					{
+						segment: segment{
+							logNameIndex: LogNameIndex(writerIndex),
+							dir:          dir.Dir,
+						},
+						approxFileSize:      initialFileSize,
+						synchronouslyClosed: false,
+					},
 				},
-				approxFileSize:      initialFileSize,
-				synchronouslyClosed: false,
 			})
 		})
 	}

wal/failover_writer_test.go

Lines changed: 3 additions & 3 deletions
@@ -285,7 +285,7 @@ func TestFailoverWriter(t *testing.T) {
 				stopper:                     stopper,
 				failoverWriteAndSyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
 				writerClosed:                func(_ logicalLogWithSizesEtc) {},
-				segmentClosed:               func(_ NumWAL, _ segmentWithSizeEtc) {},
+				segmentClosed:               func(_ logicalLogWithSizesEtc) {},
 				writerCreatedForTest:        logWriterCreated,
 			}, testDirs[dirIndex])
 			require.NoError(t, err)
@@ -650,7 +650,7 @@ func TestConcurrentWritersWithManyRecords(t *testing.T) {
 			stopper:                     stopper,
 			failoverWriteAndSyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
 			writerClosed:                func(_ logicalLogWithSizesEtc) {},
-			segmentClosed:               func(_ NumWAL, _ segmentWithSizeEtc) {},
+			segmentClosed:               func(_ logicalLogWithSizesEtc) {},
 			writerCreatedForTest:        logWriterCreated,
 		}, dirs[dirIndex])
 		require.NoError(t, err)
@@ -753,7 +753,7 @@ func TestFailoverWriterManyRecords(t *testing.T) {
 			stopper:                     stopper,
 			failoverWriteAndSyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
 			writerClosed:                func(_ logicalLogWithSizesEtc) {},
-			segmentClosed:               func(_ NumWAL, _ segmentWithSizeEtc) {},
+			segmentClosed:               func(_ logicalLogWithSizesEtc) {},
 		}, dir)
 		require.NoError(t, err)
 		var buf [1]byte

wal/testdata/manager_failover

Lines changed: 1 addition & 2 deletions
@@ -290,8 +290,7 @@ ok
 list-and-stats
 ----
 logs:
-  000001: {(sec,001)}
-  000001: {(pri,002)}
+  000001: {(sec,001), (pri,002)}
 stats:
   obsolete: count 0 size 0
   live: count 2 size 18