@@ -2843,38 +2843,48 @@ func TestCompactionErrorStats(t *testing.T) {
28432843func TestCompactionCorruption (t * testing.T ) {
28442844 mem := vfs .NewMem ()
28452845 var numFinishedCompactions atomic.Int32
2846+ var once sync.Once
28462847 opts := & Options {
28472848 FS : mem ,
28482849 FormatMajorVersion : FormatNewest ,
28492850 EventListener : & EventListener {
2851+ BackgroundError : func (error ) {},
28502852 DataCorruption : func (info DataCorruptionInfo ) {
28512853 if testing .Verbose () {
2852- fmt .Printf ("got expected data corruption: %s\n " , info .Path )
2854+ once .Do (func () { fmt .Printf ("got expected data corruption: %s\n " , info .Path ) })
2855+ }
2856+ },
2857+ CompactionBegin : func (info CompactionInfo ) {
2858+ if testing .Verbose () {
2859+ fmt .Printf ("%d: compaction begin (L%d)\n " , info .JobID , info .Output .Level )
28532860 }
28542861 },
28552862 CompactionEnd : func (info CompactionInfo ) {
2863+ if testing .Verbose () {
2864+ fmt .Printf ("%d: compaction end (L%d)\n " , info .JobID , info .Output .Level )
2865+ }
28562866 if info .Err == nil {
28572867 numFinishedCompactions .Add (1 )
28582868 }
28592869 },
28602870 },
2871+ L0CompactionThreshold : 1 ,
2872+ L0CompactionFileThreshold : 10 ,
28612873 }
28622874 opts .WithFSDefaults ()
28632875 remoteStorage := remote .NewInMem ()
28642876 opts .Experimental .RemoteStorage = remote .MakeSimpleFactory (map [remote.Locator ]remote.Storage {
28652877 "external-locator" : remoteStorage ,
28662878 })
2879+ opts .EnsureDefaults ()
2880+ opts .Levels [0 ].TargetFileSize = 8192
28672881 d , err := Open ("" , opts )
28682882 require .NoError (t , err )
28692883
28702884 var now crtime.AtomicMono
28712885 now .Store (1 )
28722886 d .problemSpans .InitForTesting (manifest .NumLevels , d .cmp , func () crtime.Mono { return now .Load () })
28732887
2874- randKey := func () []byte {
2875- return []byte {'a' + byte (rand .IntN (26 ))}
2876- }
2877-
28782888 var workloadWG sync.WaitGroup
28792889 var stopWorkload atomic.Bool
28802890 defer stopWorkload .Store (true )
@@ -2885,12 +2895,14 @@ func TestCompactionCorruption(t *testing.T) {
28852895 defer workloadWG .Done ()
28862896 for ! stopWorkload .Load () {
28872897 b := d .NewBatch ()
2888- v := make ([]byte , 100 + rand .IntN (1000 ))
2889- for i := range v {
2890- v [i ] = byte (rand .Uint32 ())
2891- }
2898+ // Write some random keys of the form a012345.
28922899 for i := 0 ; i < 100 ; i ++ {
2893- if err := b .Set (randKey (), v , nil ); err != nil {
2900+ v := make ([]byte , 100 + rand .IntN (100 ))
2901+ for i := range v {
2902+ v [i ] = byte (rand .Uint32 ())
2903+ }
2904+ key := fmt .Sprintf ("%c%06d" , 'a' + byte (rand .IntN (int ('z' - 'a' + 1 ))), rand .IntN (1000000 ))
2905+ if err := b .Set ([]byte (key ), v , nil ); err != nil {
28942906 panic (err )
28952907 }
28962908 }
@@ -2900,12 +2912,24 @@ func TestCompactionCorruption(t *testing.T) {
29002912 if err := d .Flush (); err != nil {
29012913 panic (err )
29022914 }
2903- time .Sleep (10 * time .Microsecond )
2915+ time .Sleep (10 * time .Millisecond )
29042916 }
29052917 }()
29062918 }
29072919
29082920 datadriven .RunTest (t , "testdata/compaction_corruption" , func (t * testing.T , td * datadriven.TestData ) string {
2921+ // wait until fn() returns true.
2922+ wait := func (what string , fn func () bool ) {
2923+ const timeout = 2 * time .Minute
2924+ start := time .Now ()
2925+ for ! fn () {
2926+ if time .Since (start ) > timeout {
2927+ td .Fatalf (t , "timeout waiting for %s\n %s\n " , what , d .DebugString ())
2928+ }
2929+ time .Sleep (10 * time .Millisecond )
2930+ }
2931+ }
2932+
29092933 switch td .Cmd {
29102934 case "build-remote" :
29112935 require .NoError (t , runBuildRemoteCmd (td , d , remoteStorage ))
@@ -2942,37 +2966,26 @@ func TestCompactionCorruption(t *testing.T) {
29422966 workloadWG .Wait ()
29432967
29442968 case "wait-for-problem-span" :
2945- timeout := time .Now ().Add (100 * time .Second )
2946- for d .problemSpans .IsEmpty () {
2947- if timeout .Before (time .Now ()) {
2948- td .Fatalf (t , "timeout waiting for problem span" )
2949- }
2950- time .Sleep (10 * time .Millisecond )
2951- }
2969+ wait ("problem span" , func () bool {
2970+ return ! d .problemSpans .IsEmpty ()
2971+ })
29522972 if testing .Verbose () {
29532973 fmt .Printf ("%s: wait-for-problem-span:\n %s" , td .Pos , d .problemSpans .String ())
29542974 }
29552975
29562976 case "wait-for-compactions" :
29572977 target := numFinishedCompactions .Load () + 5
2958- timeout := time .Now ().Add (10 * time .Second )
2959- for numFinishedCompactions .Load () < target {
2960- if timeout .Before (time .Now ()) {
2961- td .Fatalf (t , "timeout waiting for compactions" )
2962- }
2963- time .Sleep (10 * time .Millisecond )
2964- }
2978+ wait ("compactions" , func () bool {
2979+ return numFinishedCompactions .Load () >= target
2980+ })
29652981
29662982 case "expire-spans" :
29672983 now .Store (now .Load () + crtime .Mono (30 * time .Minute ))
29682984
29692985 case "wait-for-no-external-files" :
2970- timeout := time .Now ().Add (10 * time .Second )
2971- for hasExternalFiles (d ) {
2972- if timeout .Before (time .Now ()) {
2973- td .Fatalf (t , "timeout waiting for compactions" )
2974- }
2975- }
2986+ wait ("no external files" , func () bool {
2987+ return ! hasExternalFiles (d )
2988+ })
29762989
29772990 default :
29782991 return fmt .Sprintf ("unknown command: %s" , td .Cmd )
0 commit comments