@@ -21,6 +21,7 @@ import (
21
21
"github.com/cockroachdb/errors"
22
22
"github.com/cockroachdb/pebble/internal/base"
23
23
"github.com/cockroachdb/pebble/internal/cache"
24
+ "github.com/cockroachdb/pebble/internal/invariants"
24
25
"github.com/cockroachdb/pebble/internal/manifest"
25
26
"github.com/cockroachdb/pebble/internal/testkeys"
26
27
"github.com/cockroachdb/pebble/objstorage"
@@ -1381,3 +1382,102 @@ func (tl *catchFatalLogger) Errorf(format string, args ...interface{}) {}
1381
1382
func (tl * catchFatalLogger ) Fatalf (format string , args ... interface {}) {
1382
1383
tl .fatalMsgs = append (tl .fatalMsgs , fmt .Sprintf (format , args ... ))
1383
1384
}
1385
+
1386
+ func (d * DB ) checkVirtualBounds (m * manifest.TableMetadata ) {
1387
+ if ! invariants .Enabled {
1388
+ return
1389
+ }
1390
+
1391
+ objMeta , err := d .objProvider .Lookup (base .FileTypeTable , m .TableBacking .DiskFileNum )
1392
+ if err != nil {
1393
+ panic (err )
1394
+ }
1395
+ if objMeta .IsExternal () {
1396
+ // Nothing to do; bounds are expected to be loose.
1397
+ return
1398
+ }
1399
+
1400
+ iters , err := d .newIters (context .TODO (), m , nil , internalIterOpts {}, iterPointKeys | iterRangeDeletions | iterRangeKeys )
1401
+ if err != nil {
1402
+ panic (errors .Wrap (err , "pebble: error creating iterators" ))
1403
+ }
1404
+ defer func () { _ = iters .CloseAll () }()
1405
+
1406
+ if m .HasPointKeys {
1407
+ pointIter := iters .Point ()
1408
+ rangeDelIter := iters .RangeDeletion ()
1409
+
1410
+ // Check that the lower bound is tight.
1411
+ pointKV := pointIter .First ()
1412
+ rangeDel , err := rangeDelIter .First ()
1413
+ if err != nil {
1414
+ panic (err )
1415
+ }
1416
+ if (rangeDel == nil || d .cmp (rangeDel .SmallestKey ().UserKey , m .PointKeyBounds .Smallest ().UserKey ) != 0 ) &&
1417
+ (pointKV == nil || d .cmp (pointKV .K .UserKey , m .PointKeyBounds .Smallest ().UserKey ) != 0 ) {
1418
+ panic (errors .Newf ("pebble: virtual sstable %s lower point key bound is not tight" , m .TableNum ))
1419
+ }
1420
+
1421
+ // Check that the upper bound is tight.
1422
+ pointKV = pointIter .Last ()
1423
+ rangeDel , err = rangeDelIter .Last ()
1424
+ if err != nil {
1425
+ panic (err )
1426
+ }
1427
+ if (rangeDel == nil || d .cmp (rangeDel .LargestKey ().UserKey , m .PointKeyBounds .LargestUserKey ()) != 0 ) &&
1428
+ (pointKV == nil || d .cmp (pointKV .K .UserKey , m .PointKeyBounds .Largest ().UserKey ) != 0 ) {
1429
+ panic (errors .Newf ("pebble: virtual sstable %s upper point key bound is not tight" , m .TableNum ))
1430
+ }
1431
+
1432
+ // Check that iterator keys are within bounds.
1433
+ for kv := pointIter .First (); kv != nil ; kv = pointIter .Next () {
1434
+ if d .cmp (kv .K .UserKey , m .PointKeyBounds .Smallest ().UserKey ) < 0 || d .cmp (kv .K .UserKey , m .PointKeyBounds .LargestUserKey ()) > 0 {
1435
+ panic (errors .Newf ("pebble: virtual sstable %s point key %s is not within bounds" , m .TableNum , kv .K .UserKey ))
1436
+ }
1437
+ }
1438
+ s , err := rangeDelIter .First ()
1439
+ for ; s != nil ; s , err = rangeDelIter .Next () {
1440
+ if d .cmp (s .SmallestKey ().UserKey , m .PointKeyBounds .Smallest ().UserKey ) < 0 {
1441
+ panic (errors .Newf ("pebble: virtual sstable %s point key %s is not within bounds" , m .TableNum , s .SmallestKey ().UserKey ))
1442
+ }
1443
+ if d .cmp (s .LargestKey ().UserKey , m .PointKeyBounds .Largest ().UserKey ) > 0 {
1444
+ panic (errors .Newf ("pebble: virtual sstable %s point key %s is not within bounds" , m .TableNum , s .LargestKey ().UserKey ))
1445
+ }
1446
+ }
1447
+ if err != nil {
1448
+ panic (err )
1449
+ }
1450
+ }
1451
+
1452
+ if ! m .HasRangeKeys {
1453
+ return
1454
+ }
1455
+ rangeKeyIter := iters .RangeKey ()
1456
+
1457
+ // Check that the lower bound is tight.
1458
+ if s , err := rangeKeyIter .First (); err != nil {
1459
+ panic (err )
1460
+ } else if m .HasRangeKeys && d .cmp (s .SmallestKey ().UserKey , m .RangeKeyBounds .SmallestUserKey ()) != 0 {
1461
+ panic (errors .Newf ("pebble: virtual sstable %s lower range key bound is not tight" , m .TableNum ))
1462
+ }
1463
+
1464
+ // Check that upper bound is tight.
1465
+ if s , err := rangeKeyIter .Last (); err != nil {
1466
+ panic (err )
1467
+ } else if d .cmp (s .LargestKey ().UserKey , m .RangeKeyBounds .LargestUserKey ()) != 0 {
1468
+ panic (errors .Newf ("pebble: virtual sstable %s upper range key bound is not tight" , m .TableNum ))
1469
+ }
1470
+
1471
+ s , err := rangeKeyIter .First ()
1472
+ for ; s != nil ; s , err = rangeKeyIter .Next () {
1473
+ if d .cmp (s .SmallestKey ().UserKey , m .RangeKeyBounds .SmallestUserKey ()) < 0 {
1474
+ panic (errors .Newf ("pebble: virtual sstable %s point key %s is not within bounds" , m .TableNum , s .SmallestKey ().UserKey ))
1475
+ }
1476
+ if d .cmp (s .LargestKey ().UserKey , m .RangeKeyBounds .LargestUserKey ()) > 0 {
1477
+ panic (errors .Newf ("pebble: virtual sstable %s point key %s is not within bounds" , m .TableNum , s .LargestKey ().UserKey ))
1478
+ }
1479
+ }
1480
+ if err != nil {
1481
+ panic (err )
1482
+ }
1483
+ }
0 commit comments