Commit fc0be3c

*: use (*sync.WaitGroup).Go

Update uses of WaitGroup to use the new (Go 1.25+) Go method to prevent accidental misuse and simplify call sites.

1 parent 53346e5 · commit fc0be3c

29 files changed: +141 −248 lines
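For context before the per-file diffs: (*sync.WaitGroup).Go, added in Go 1.25, performs the Add(1) before starting the goroutine and the Done() when the function returns, so the counter can no longer drift from the number of goroutines actually started. A minimal before/after sketch of the pattern this commit applies throughout (illustrative code, not taken from any one file below):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	// Old pattern: the Add count must match the number of goroutines
	// spawned, and every goroutine must remember to call Done.
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("old-style worker")
	}()

	// New pattern (Go 1.25+): wg.Go handles Add and Done itself.
	wg.Go(func() {
		fmt.Println("wg.Go worker")
	})

	wg.Wait()
}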

checkpoint_test.go
Lines changed: 10 additions & 15 deletions

@@ -291,35 +291,31 @@ func TestCheckpointCompaction(t *testing.T) {
 	d, err := Open("", &Options{FS: fs, Logger: testutils.Logger{T: t}})
 	require.NoError(t, err)
 
-	ctx, cancel := context.WithCancel(context.Background())
+	ctx, cancel := context.WithCancel(t.Context())
 
 	var wg sync.WaitGroup
-	wg.Add(4)
-	go func() {
+	wg.Go(func() {
 		defer cancel()
-		defer wg.Done()
 		for i := 0; ctx.Err() == nil; i++ {
 			if err := d.Set([]byte(fmt.Sprintf("key%06d", i)), nil, nil); err != nil {
 				t.Error(err)
 				return
 			}
 		}
-	}()
-	go func() {
+	})
+	wg.Go(func() {
 		defer cancel()
-		defer wg.Done()
 		for ctx.Err() == nil {
-			if err := d.Compact(context.Background(), []byte("key"), []byte("key999999"), false); err != nil {
+			if err := d.Compact(t.Context(), []byte("key"), []byte("key999999"), false); err != nil {
				t.Error(err)
 				return
 			}
 		}
-	}()
+	})
 	check := make(chan string, 100)
-	go func() {
+	wg.Go(func() {
 		defer cancel()
 		defer close(check)
-		defer wg.Done()
 		for i := 0; ctx.Err() == nil && i < 200; i++ {
 			dir := fmt.Sprintf("checkpoint%06d", i)
 			if err := d.Checkpoint(dir); err != nil {
@@ -332,11 +328,10 @@ func TestCheckpointCompaction(t *testing.T) {
 			case check <- dir:
 			}
 		}
-	}()
-	go func() {
+	})
+	wg.Go(func() {
 		opts := &Options{FS: fs, Logger: testutils.Logger{T: t}}
 		defer cancel()
-		defer wg.Done()
 		for dir := range check {
 			d2, err := Open(dir, opts)
 			if err != nil {
@@ -362,7 +357,7 @@ func TestCheckpointCompaction(t *testing.T) {
 				return
 			}
 		}
-	}()
+	})
 	<-ctx.Done()
 	wg.Wait()
 	require.NoError(t, d.Close())
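Beyond the WaitGroup change, this diff also swaps context.Background() for t.Context() (added in Go 1.24), whose context is canceled automatically once the test is done, just before cleanup functions run. A minimal sketch of that pattern, using a hypothetical test not taken from this commit:

package example

import "testing"

func TestSketch(t *testing.T) {
	// t.Context() is canceled when the test finishes, so goroutines
	// watching ctx.Done() wind down without manual cancel plumbing
	// on every return path.
	ctx := t.Context()
	go func() {
		<-ctx.Done() // fires automatically at test completion
	}()
	// ... exercise the code under test with ctx ...
}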

cmd/pebble/fsbench.go
Lines changed: 2 additions & 5 deletions

@@ -612,13 +612,10 @@ func (bench *fsBench) init(wg *sync.WaitGroup) {
 	fmt.Println("Running benchmark:", bench.name)
 	fmt.Println("Description:", bench.description)
 
-	wg.Add(1)
-	go bench.execute(wg)
+	wg.Go(bench.execute)
 }
 
-func (bench *fsBench) execute(wg *sync.WaitGroup) {
-	defer wg.Done()
-
+func (bench *fsBench) execute() {
 	latencyHist := bench.reg.Register(bench.name)
 
 	for {
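Note the call site wg.Go(bench.execute): once execute drops its *sync.WaitGroup parameter it has type func(), so the method value can be handed to wg.Go directly, no wrapper closure required. A self-contained sketch with a hypothetical worker type:

package main

import (
	"fmt"
	"sync"
)

type worker struct{ name string }

// run takes no arguments and returns nothing, so the method value
// w.run satisfies wg.Go's func() parameter as-is.
func (w *worker) run() { fmt.Println(w.name, "running") }

func main() {
	var wg sync.WaitGroup
	w := &worker{name: "bench"}
	wg.Go(w.run) // no func() { w.run() } wrapper needed
	wg.Wait()
}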

cmd/pebble/queue.go
Lines changed: 2 additions & 5 deletions

@@ -62,10 +62,7 @@ func queueTest() (test, *atomic.Int64) {
 			}
 
 			limiter := maxOpsPerSec.newRateLimiter()
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-
+			wg.Go(func() {
 				for i := queueConfig.size; ; i++ {
 					idx := i % queueConfig.size
 
@@ -94,7 +91,7 @@ func queueTest() (test, *atomic.Int64) {
 					wait(limiter)
 					ops.Add(1)
 				}
-			}()
+			})
 		},
 		tick: func(elapsed time.Duration, i int) {
 			if i%20 == 0 {

cmd/pebble/scan.go
Lines changed: 3 additions & 6 deletions

@@ -90,11 +90,8 @@ func runScan(cmd *cobra.Command, args []string) {
 
 			limiter := maxOpsPerSec.newRateLimiter()
 
-			wg.Add(concurrency)
-			for i := 0; i < concurrency; i++ {
-				go func(i int) {
-					defer wg.Done()
-
+			for i := range concurrency {
+				wg.Go(func() {
 					rng := rand.New(rand.NewPCG(0, uint64(i)))
 					startKeyBuf := append(make([]byte, 0, 64), []byte("key-")...)
 					endKeyBuf := append(make([]byte, 0, 64), []byte("key-")...)
@@ -123,7 +120,7 @@ func runScan(cmd *cobra.Command, args []string) {
 					bytes.Add(nbytes)
 					scanned.Add(int64(count))
 				}
-				}(i)
+				})
 			}
 		},
 

cmd/pebble/sync.go
Lines changed: 4 additions & 7 deletions

@@ -61,12 +61,9 @@ func runSync(cmd *cobra.Command, args []string) {
 		init: func(d DB, wg *sync.WaitGroup) {
 			limiter := maxOpsPerSec.newRateLimiter()
 
-			wg.Add(concurrency)
-			for i := 0; i < concurrency; i++ {
+			for range concurrency {
 				latency := reg.Register("ops")
-				go func() {
-					defer wg.Done()
-
+				wg.Go(func() {
 					rand := rand.New(rand.NewPCG(0, uint64(time.Now().UnixNano())))
 					var raw []byte
 					var buf []byte
@@ -78,7 +75,7 @@ func runSync(cmd *cobra.Command, args []string) {
 						b := d.NewBatch()
 						var n uint64
 						count := int(batchDist.Uint64(rand))
-						for j := 0; j < count; j++ {
+						for range count {
 							block = syncConfig.values.Bytes(rand, block)
 
 							if syncConfig.walOnly {
@@ -101,7 +98,7 @@ func runSync(cmd *cobra.Command, args []string) {
 						latency.Record(time.Since(start))
 						bytes.Add(n)
 					}
-				}()
+				})
 			}
 		},

cmd/pebble/write_bench.go
Lines changed: 1 addition & 5 deletions

@@ -224,11 +224,7 @@ func runWriteBenchmark(_ *cobra.Command, args []string) error {
 			// take a material amount of time. Instead, pause the
 			// writers in parallel in the background, and wait for all
 			// to complete before continuing.
-			wg.Add(1)
-			go func(writer *pauseWriter) {
-				writer.pause()
-				wg.Done()
-			}(w)
+			wg.Go(func() { w.pause() })
 		}
 		wg.Wait()
 

cmd/pebble/ycsb.go
Lines changed: 5 additions & 12 deletions

@@ -365,23 +365,18 @@ func (y *ycsb) init(db DB, wg *sync.WaitGroup) {
 
 	y.limiter = maxOpsPerSec.newRateLimiter()
 
-	wg.Add(concurrency)
-
 	// If this workload doesn't produce reads, sample the worst case read-amp
 	// from Metrics() periodically.
 	if y.weights.get(ycsbRead) == 0 && y.weights.get(ycsbScan) == 0 && y.weights.get(ycsbReverseScan) == 0 {
-		wg.Add(1)
-		go y.sampleReadAmp(db, wg)
+		wg.Go(func() { y.sampleReadAmp(db) })
 	}
 
-	for i := 0; i < concurrency; i++ {
-		go y.run(db, wg)
+	for range concurrency {
+		wg.Go(func() { y.run(db) })
 	}
 }
 
-func (y *ycsb) run(db DB, wg *sync.WaitGroup) {
-	defer wg.Done()
-
+func (y *ycsb) run(db DB) {
 	var latency [ycsbNumOps]*namedHistogram
 	for name, op := range y.opsMap {
 		latency[op] = y.reg.Register(name)
@@ -418,9 +413,7 @@ func (y *ycsb) run(db DB, wg *sync.WaitGroup) {
 	}
 }
 
-func (y *ycsb) sampleReadAmp(db DB, wg *sync.WaitGroup) {
-	defer wg.Done()
-
+func (y *ycsb) sampleReadAmp(db DB) {
 	ticker := time.NewTicker(time.Second)
 	defer ticker.Stop()
 	for range ticker.C {

commit_test.go
Lines changed: 10 additions & 17 deletions

@@ -98,14 +98,12 @@ func TestCommitPipeline(t *testing.T) {
 	}
 
 	var wg sync.WaitGroup
-	wg.Add(n)
-	for i := 0; i < n; i++ {
-		go func(i int) {
-			defer wg.Done()
+	for i := range n {
+		wg.Go(func() {
 			var b Batch
 			_ = b.Set([]byte(fmt.Sprint(i)), nil, nil)
 			_ = p.Commit(&b, false, false)
-		}(i)
+		})
 	}
 	wg.Wait()
 
@@ -141,17 +139,15 @@ func TestCommitPipelineSync(t *testing.T) {
 			e.queueSemChan = p.logSyncQSem
 
 			var wg sync.WaitGroup
-			wg.Add(n)
-			for i := 0; i < n; i++ {
-				go func(i int) {
-					defer wg.Done()
+			for i := range n {
+				wg.Go(func() {
 					var b Batch
 					require.NoError(t, b.Set([]byte(fmt.Sprint(i)), nil, nil))
 					require.NoError(t, p.Commit(&b, true, noSyncWait))
 					if noSyncWait {
 						require.NoError(t, b.SyncWait())
 					}
-				}(i)
+				})
 			}
 			wg.Wait()
 			if s := e.writeCount.Load(); uint64(n) != s {
@@ -340,19 +336,16 @@ func TestCommitPipelineLogDataSeqNum(t *testing.T) {
 	p := newCommitPipeline(testEnv)
 
 	var wg sync.WaitGroup
-	wg.Add(2)
-	go func() {
-		defer wg.Done()
+	wg.Go(func() {
 		b := &Batch{}
 		require.NoError(t, b.Set([]byte("foo"), []byte("bar"), nil))
 		require.NoError(t, p.Commit(b, false /* sync */, false))
-	}()
-	go func() {
-		defer wg.Done()
+	})
+	wg.Go(func() {
 		b := &Batch{}
 		require.NoError(t, b.LogData([]byte("foo"), nil))
 		require.NoError(t, p.Commit(b, false /* sync */, false))
-	}()
+	})
 	wg.Wait()
 }
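A subtlety behind the test diffs above: the old code passed the loop variable as an argument (go func(i int) { ... }(i)) to dodge the classic capture bug. Since Go 1.22 each loop iteration binds a fresh i, so the closure handed to wg.Go can capture i directly. A minimal sketch (hypothetical, not from the diff):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	// Go 1.22+: i is a new variable on each iteration, so every
	// closure sees its own value rather than the final one.
	for i := range 3 {
		wg.Go(func() {
			fmt.Println("worker", i)
		})
	}
	wg.Wait()
}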

compaction_test.go
Lines changed: 2 additions & 4 deletions

@@ -3412,9 +3412,7 @@ func TestCompactionCorruption(t *testing.T) {
 	startWorkload := func(minKey, maxKey byte) (stop func()) {
 		var shouldStop atomic.Bool
 		var wg sync.WaitGroup
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
+		wg.Go(func() {
 			var valSeed [32]byte
 			for i := range valSeed {
 				valSeed[i] = byte(rand.Uint32())
@@ -3443,7 +3441,7 @@ func TestCompactionCorruption(t *testing.T) {
 					panic(err)
 				}
 			}
-		}()
+		})
 		return func() {
 			shouldStop.Store(true)
 			wg.Wait()

db_test.go
Lines changed: 10 additions & 16 deletions

@@ -1161,10 +1161,8 @@ func TestDBConcurrentCommitCompactFlush(t *testing.T) {
 	// those operations.
 	const n = 1000
 	var wg sync.WaitGroup
-	wg.Add(n)
-	for i := 0; i < n; i++ {
-		go func(i int) {
-			defer wg.Done()
+	for i := range n {
+		wg.Go(func() {
 			_ = d.Set([]byte(fmt.Sprint(i)), nil, nil)
 			var err error
 			switch i % 3 {
@@ -1176,7 +1174,7 @@ func TestDBConcurrentCommitCompactFlush(t *testing.T) {
 				_, err = d.AsyncFlush()
 			}
 			require.NoError(t, err)
-		}(i)
+		})
 	}
 	wg.Wait()
 
@@ -2495,12 +2493,10 @@ type parallel []orderingNode
 
 func (p parallel) visit(fn func(int)) {
 	var wg sync.WaitGroup
-	wg.Add(len(p))
 	for i := range p {
-		go func(i int) {
-			defer wg.Done()
+		wg.Go(func() {
 			p[i].visit(fn)
-		}(i)
+		})
 	}
 	wg.Wait()
 }
@@ -2617,7 +2613,7 @@ func TestLoadBlockSema(t *testing.T) {
 	}
 
 	// Read all regions to warm up the file cache.
-	for i := 0; i < numRegions; i++ {
+	for i := range numRegions {
 		val, closer, err := db.Get(key(i, 1))
 		require.NoError(t, err)
 		require.Equal(t, []byte("value"), val)
@@ -2633,12 +2629,10 @@ func TestLoadBlockSema(t *testing.T) {
 	var wg sync.WaitGroup
 	// Spin up workers that perform random reads.
 	const numWorkers = 20
-	for i := 0; i < numWorkers; i++ {
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
+	for range numWorkers {
+		wg.Go(func() {
 			const numQueries = 100
-			for i := 0; i < numQueries; i++ {
+			for range numQueries {
 				val, closer, err := db.Get(key(rand.IntN(numRegions), rand.IntN(numKeys)))
 				require.NoError(t, err)
 				require.Equal(t, []byte("value"), val)
@@ -2647,7 +2641,7 @@ func TestLoadBlockSema(t *testing.T) {
 				}
 				runtime.Gosched()
 			}
-		}()
+		})
 	}
 	wg.Wait()
 	// Verify the maximum read count did not exceed the limit.
