@@ -331,6 +331,7 @@ func NewL0Sublevels(
331331 }
332332
333333 s .calculateFlushSplitKeys (flushSplitMaxBytes )
334+ s .Check ()
334335 return s , nil
335336}
336337
@@ -441,8 +442,36 @@ func mergeIntervals(
441442func (s * L0Sublevels ) AddL0Files (
442443 files []* TableMetadata , flushSplitMaxBytes int64 , levelMetadata * LevelMetadata ,
443444) (* L0Sublevels , error ) {
444- if invariants .Enabled && s .addL0FilesCalled {
445- panic ("AddL0Files called twice on the same receiver" )
445+ if s .addL0FilesCalled {
446+ if invariants .Enabled {
447+ panic ("AddL0Files called twice on the same receiver" )
448+ }
449+ return nil , errInvalidL0SublevelsOpt
450+ }
451+ if s .levelMetadata .Len ()+ len (files ) != levelMetadata .Len () {
452+ if invariants .Enabled {
453+ panic ("levelMetadata mismatch" )
454+ }
455+ return nil , errInvalidL0SublevelsOpt
456+ }
457+ // AddL0Files only works when the files we are adding match exactly the last
458+ // files in the new levelMetadata (and the previous s.levelMetadata matches a
459+ // prefix of the new levelMetadata).
460+ {
461+ iterOld , iterNew := s .levelMetadata .Iter (), levelMetadata .Iter ()
462+ tOld , tNew := iterOld .First (), iterNew .First ()
463+ for i := 0 ; i < s .levelMetadata .Len (); i ++ {
464+ if tOld != tNew {
465+ return nil , errInvalidL0SublevelsOpt
466+ }
467+ tOld , tNew = iterOld .Next (), iterNew .Next ()
468+ }
469+ for i := range files {
470+ if files [i ] != tNew {
471+ return nil , errInvalidL0SublevelsOpt
472+ }
473+ tNew = iterNew .Next ()
474+ }
446475 }
447476 s .addL0FilesCalled = true
448477
@@ -630,6 +659,7 @@ func (s *L0Sublevels) AddL0Files(
630659
631660 newVal .flushSplitUserKeys = nil
632661 newVal .calculateFlushSplitKeys (flushSplitMaxBytes )
662+ newVal .Check ()
633663 return newVal , nil
634664}
635665
@@ -783,6 +813,33 @@ func (s *L0Sublevels) InitCompactingFileInfo(inProgress []L0Compaction) {
783813 }
784814}
785815
816+ // Check performs sanity checks on L0Sublevels in invariants mode.
817+ func (s * L0Sublevels ) Check () {
818+ if ! invariants .Enabled {
819+ return
820+ }
821+ iter := s .levelMetadata .Iter ()
822+ n := 0
823+ for t := iter .First (); t != nil ; n , t = n + 1 , iter .Next () {
824+ if t .L0Index != n {
825+ panic (fmt .Sprintf ("t.L0Index out of sync (%d vs %d)" , t .L0Index , n ))
826+ }
827+ }
828+ if len (s .Levels ) != len (s .levelFiles ) {
829+ panic ("Levels and levelFiles inconsistency" )
830+ }
831+ for i := range s .Levels {
832+ if s .Levels [i ].Len () != len (s .levelFiles [i ]) {
833+ panic ("Levels and levelFiles inconsistency" )
834+ }
835+ for _ , t := range s .levelFiles [i ] {
836+ if t .SubLevel != i {
837+ panic ("t.SubLevel out of sync" )
838+ }
839+ }
840+ }
841+ }
842+
786843// String produces a string containing useful debug information. Useful in test
787844// code and debugging.
788845func (s * L0Sublevels ) String () string {
0 commit comments