@@ -3156,15 +3156,13 @@ func TestCompactionCorruption(t *testing.T) {
3156
3156
now .Store (1 )
3157
3157
d .problemSpans .InitForTesting (manifest .NumLevels , d .cmp , func () crtime.Mono { return now .Load () })
3158
3158
3159
- var workloadWG sync.WaitGroup
3160
- var stopWorkload atomic.Bool
3161
- defer stopWorkload .Store (true )
3162
- startWorkload := func () {
3163
- stopWorkload .Store (false )
3164
- workloadWG .Add (1 )
3159
+ startWorkload := func (minKey , maxKey byte ) (stop func ()) {
3160
+ var shouldStop atomic.Bool
3161
+ var wg sync.WaitGroup
3162
+ wg .Add (1 )
3165
3163
go func () {
3166
- defer workloadWG .Done ()
3167
- for ! stopWorkload .Load () {
3164
+ defer wg .Done ()
3165
+ for ! shouldStop .Load () {
3168
3166
b := d .NewBatch ()
3169
3167
// Write a random key of the form a012345 and flush it. This will result
3170
3168
// in (mostly) non-overlapping tables in L0.
@@ -3174,7 +3172,7 @@ func TestCompactionCorruption(t *testing.T) {
3174
3172
}
3175
3173
v := make ([]byte , 1024 + rand .IntN (10240 ))
3176
3174
_ , _ = rand .NewChaCha8 (valSeed ).Read (v )
3177
- key := fmt .Sprintf ("%c%06d" , 'a' + byte (rand .IntN (int ('z' - 'a' + 1 ))), rand .IntN (1000000 ))
3175
+ key := fmt .Sprintf ("%c%06d" , minKey + byte (rand .IntN (int (maxKey - minKey + 1 ))), rand .IntN (1000000 ))
3178
3176
if err := b .Set ([]byte (key ), v , nil ); err != nil {
3179
3177
panic (err )
3180
3178
}
@@ -3187,9 +3185,19 @@ func TestCompactionCorruption(t *testing.T) {
3187
3185
time .Sleep (10 * time .Millisecond )
3188
3186
}
3189
3187
}()
3188
+ return func () {
3189
+ shouldStop .Store (true )
3190
+ wg .Wait ()
3191
+ }
3190
3192
}
3191
3193
3192
3194
datadriven .RunTest (t , "testdata/compaction_corruption" , func (t * testing.T , td * datadriven.TestData ) string {
3195
+ if arg , ok := td .Arg ("workload" ); ok {
3196
+ if len (arg .Vals ) != 2 || len (arg .Vals [0 ]) != 1 || len (arg .Vals [1 ]) != 1 {
3197
+ td .Fatalf (t , "workload argument must be of the form (a,z)" )
3198
+ }
3199
+ defer startWorkload (arg .Vals [0 ][0 ], arg .Vals [1 ][0 ])()
3200
+ }
3193
3201
// wait until fn() returns true.
3194
3202
wait := func (what string , fn func () bool ) {
3195
3203
const timeout = 2 * time .Minute
@@ -3230,13 +3238,6 @@ func TestCompactionCorruption(t *testing.T) {
3230
3238
require .NoError (t , writer .Close ())
3231
3239
return fmt .Sprintf ("%s -> %s" , before , after )
3232
3240
3233
- case "start-workload" :
3234
- startWorkload ()
3235
-
3236
- case "stop-workload" :
3237
- stopWorkload .Store (true )
3238
- workloadWG .Wait ()
3239
-
3240
3241
case "wait-for-problem-span" :
3241
3242
wait ("problem span" , func () bool {
3242
3243
return ! d .problemSpans .IsEmpty ()
0 commit comments