Skip to content

Commit d4618d5

Browse files
committed
db: periodically recheck the memtable write stall condition
The recheck period is 1s: a timer broadcasts on the condition variable at that interval, waking the stalled writer so that it re-evaluates the write stall predicate. A future PR will make Options.MemTableStopWritesThreshold dynamic.
1 parent d187376 commit d4618d5

File tree

2 files changed

+87
-4
lines changed

2 files changed

+87
-4
lines changed

db.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2341,6 +2341,7 @@ func (d *DB) maybeInduceWriteStall(b *Batch) {
23412341
// This function will call EventListener.WriteStallBegin at most once. If
23422342
// it does call it, it will call EventListener.WriteStallEnd once before
23432343
// returning.
2344+
var timer *time.Timer
23442345
for {
23452346
var size uint64
23462347
for i := range d.mu.mem.queue {
@@ -2357,7 +2358,18 @@ func (d *DB) maybeInduceWriteStall(b *Batch) {
23572358
})
23582359
}
23592360
beforeWait := crtime.NowMono()
2361+
// NB: In a rare case, we can start a write stall, and then the system
2362+
// may detect WAL failover, resulting in
2363+
// ElevateWriteStallThresholdForFailover returning true. So we want to
2364+
// recheck the predicate periodically, which we do by signaling the
2365+
// condition variable.
2366+
if timer == nil {
2367+
timer = time.AfterFunc(time.Second, d.mu.compact.cond.Broadcast)
2368+
} else {
2369+
timer.Reset(time.Second)
2370+
}
23602371
d.mu.compact.cond.Wait()
2372+
timer.Stop()
23612373
if b != nil {
23622374
b.commitStats.MemTableWriteStallDuration += beforeWait.Elapsed()
23632375
}

db_test.go

Lines changed: 75 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/cockroachdb/datadriven"
2626
"github.com/cockroachdb/errors"
2727
"github.com/cockroachdb/pebble/internal/base"
28+
"github.com/cockroachdb/pebble/internal/buildtags"
2829
"github.com/cockroachdb/pebble/internal/cache"
2930
"github.com/cockroachdb/pebble/internal/testkeys"
3031
"github.com/cockroachdb/pebble/internal/testutils"
@@ -2160,6 +2161,8 @@ func TestRecycleLogs(t *testing.T) {
21602161

21612162
type sstAndLogFileBlockingFS struct {
21622163
vfs.FS
2164+
blockWAL bool
2165+
blockSST bool
21632166
unblocker sync.WaitGroup
21642167
}
21652168

@@ -2168,7 +2171,8 @@ var _ vfs.FS = &sstAndLogFileBlockingFS{}
21682171
func (fs *sstAndLogFileBlockingFS) Create(
21692172
name string, category vfs.DiskWriteCategory,
21702173
) (vfs.File, error) {
2171-
if strings.HasSuffix(name, ".log") || strings.HasSuffix(name, ".sst") {
2174+
if (strings.HasSuffix(name, ".log") && fs.blockWAL) ||
2175+
(strings.HasSuffix(name, ".sst") && fs.blockSST) {
21722176
fs.unblocker.Wait()
21732177
}
21742178
return fs.FS.Create(name, category)
@@ -2178,16 +2182,16 @@ func (fs *sstAndLogFileBlockingFS) unblock() {
21782182
fs.unblocker.Done()
21792183
}
21802184

2181-
func newBlockingFS(fs vfs.FS) *sstAndLogFileBlockingFS {
2182-
lfbfs := &sstAndLogFileBlockingFS{FS: fs}
2185+
// newBlockingFS returns a sstAndLogFileBlockingFS wrapping fs. blockWAL and
// blockSST choose whether creation of ".log" and ".sst" files, respectively,
// blocks until unblock is called on the returned value.
func newBlockingFS(fs vfs.FS, blockWAL, blockSST bool) *sstAndLogFileBlockingFS {
	blocking := &sstAndLogFileBlockingFS{
		FS:       fs,
		blockWAL: blockWAL,
		blockSST: blockSST,
	}
	// Start with one outstanding count so that Create blocks until a single
	// unblock call releases it.
	blocking.unblocker.Add(1)
	return blocking
}
21862190

21872191
func TestWALFailoverAvoidsWriteStall(t *testing.T) {
21882192
mem := vfs.NewMem()
21892193
// All sst and log creation is blocked.
2190-
primaryFS := newBlockingFS(mem)
2194+
primaryFS := newBlockingFS(mem, true /*blockWAL*/, true /*blockSST*/)
21912195
// Secondary for WAL failover can do log creation.
21922196
secondary := wal.Dir{FS: mem, Dirname: "secondary"}
21932197
walFailover := &WALFailoverOptions{Secondary: secondary, FailoverOptions: wal.FailoverOptions{
@@ -2221,6 +2225,73 @@ func TestWALFailoverAvoidsWriteStall(t *testing.T) {
22212225
primaryFS.unblock()
22222226
}
22232227

2228+
type testLogManager struct {
2229+
wal.Manager
2230+
elevateWriteStallThreshold atomic.Bool
2231+
}
2232+
2233+
func (tlm *testLogManager) ElevateWriteStallThresholdForFailover() bool {
2234+
return tlm.elevateWriteStallThreshold.Load()
2235+
}
2236+
2237+
func TestElevateThresholdAfterWriteStallUnblocksStall(t *testing.T) {
2238+
mem := vfs.NewMem()
2239+
// All sst writes are blocked.
2240+
blockingFS := newBlockingFS(mem, false /*blockWAL*/, true /*blockSST*/)
2241+
writeStallBeginCh := make(chan struct{}, 1)
2242+
el := EventListener{
2243+
WriteStallBegin: func(_ WriteStallBeginInfo) {
2244+
writeStallBeginCh <- struct{}{}
2245+
},
2246+
}
2247+
o := &Options{
2248+
FS: blockingFS,
2249+
MemTableSize: 4 << 20,
2250+
MemTableStopWritesThreshold: 2,
2251+
Logger: testutils.Logger{T: t},
2252+
EventListener: &el,
2253+
}
2254+
d, err := Open("", o)
2255+
// Replace the log manager with one that can elevate the write stall threshold.
2256+
d.mu.Lock()
2257+
testWALManager := &testLogManager{Manager: d.mu.log.manager}
2258+
d.mu.log.manager = testWALManager
2259+
d.mu.Unlock()
2260+
require.NoError(t, err)
2261+
value := make([]byte, 1<<20)
2262+
for i := range value {
2263+
value[i] = byte(rand.Uint32())
2264+
}
2265+
go func() {
2266+
// Wait for write stall to begin.
2267+
<-writeStallBeginCh
2268+
t.Logf("write stall has begun")
2269+
testWALManager.elevateWriteStallThreshold.Store(true)
2270+
}()
2271+
done := make(chan struct{})
2272+
go func() {
2273+
// After ~8 writes, the default write stall threshold is exceeded.
2274+
// It is observed by the above goroutine, which removes the stall.
2275+
for i := 0; i < 200; i++ {
2276+
require.NoError(t, d.Set([]byte(fmt.Sprintf("%d", i)), value, nil))
2277+
}
2278+
done <- struct{}{}
2279+
}()
2280+
timeout := 15 * time.Second
2281+
if buildtags.SlowBuild {
2282+
timeout = time.Minute
2283+
}
2284+
select {
2285+
case <-time.After(timeout):
2286+
t.Fatalf("write stall did not terminate")
2287+
case <-done:
2288+
}
2289+
require.True(t, testWALManager.elevateWriteStallThreshold.Load())
2290+
// Unblock the writes to allow the DB to close.
2291+
blockingFS.unblock()
2292+
require.NoError(t, d.Close())
2293+
}
2294+
22242295
// TestDeterminism is a datadriven test intended to validate determinism of
22252296
// operations in the face of concurrency or randomizing of operations. The test
22262297
// data defines a sequence of commands run sequentially. Then the test may

0 commit comments

Comments
 (0)