Skip to content

Commit e013bfd

Browse files
damzIbrahim Jarif
andauthored
Rework DB.DropPrefix (#1381)
Fixes three issues with the current implementation: - It can generate compaction requests that break the invariant that bottom tables need to be consecutive (issue #1380). See #1380 (comment) - It performs the same level compactions in increasing order of levels (starting from L0) which leads to old versions of keys for the prefix re-surfacing to active transactions. - When you have to drop multiple prefixes, the API forces you to drop one prefix at a time and go through the whole expensive table rewriting multiple times. Fixes #1381 Co-authored-by: Ibrahim Jarif <ibrahim@dgraph.io>
1 parent 3042e37 commit e013bfd

File tree

4 files changed

+142
-59
lines changed

4 files changed

+142
-59
lines changed

db.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -973,7 +973,7 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte {
973973
defer b.Close()
974974
var vp valuePointer
975975
for iter.SeekToFirst(); iter.Valid(); iter.Next() {
976-
if len(ft.dropPrefix) > 0 && bytes.HasPrefix(iter.Key(), ft.dropPrefix) {
976+
if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) {
977977
continue
978978
}
979979
vs := iter.Value()
@@ -986,9 +986,9 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte {
986986
}
987987

988988
type flushTask struct {
989-
mt *skl.Skiplist
990-
vptr valuePointer
991-
dropPrefix []byte
989+
mt *skl.Skiplist
990+
vptr valuePointer
991+
dropPrefixes [][]byte
992992
}
993993

994994
// handleFlushTask must be run serially.
@@ -1617,7 +1617,7 @@ func (db *DB) dropAll() (func(), error) {
16171617
// - Compact L0->L1, skipping over Kp.
16181618
// - Compact rest of the levels, Li->Li, picking tables which have Kp.
16191619
// - Resume memtable flushes, compactions and writes.
1620-
func (db *DB) DropPrefix(prefix []byte) error {
1620+
func (db *DB) DropPrefix(prefixes ...[]byte) error {
16211621
db.opt.Infof("DropPrefix Called")
16221622
f, err := db.prepareToDrop()
16231623
if err != nil {
@@ -1637,8 +1637,8 @@ func (db *DB) DropPrefix(prefix []byte) error {
16371637
task := flushTask{
16381638
mt: memtable,
16391639
// Ensure that the head of value log gets persisted to disk.
1640-
vptr: db.vhead,
1641-
dropPrefix: prefix,
1640+
vptr: db.vhead,
1641+
dropPrefixes: prefixes,
16421642
}
16431643
db.opt.Debugf("Flushing memtable")
16441644
if err := db.handleFlushTask(task); err != nil {
@@ -1653,7 +1653,7 @@ func (db *DB) DropPrefix(prefix []byte) error {
16531653
db.mt = skl.NewSkiplist(arenaSize(db.opt))
16541654

16551655
// Drop prefixes from the levels.
1656-
if err := db.lc.dropPrefix(prefix); err != nil {
1656+
if err := db.lc.dropPrefixes(prefixes); err != nil {
16571657
return err
16581658
}
16591659
db.opt.Infof("DropPrefix done")

levels.go

Lines changed: 92 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -274,9 +274,25 @@ func (s *levelsController) dropTree() (int, error) {
274274
// tables who only have keys with this prefix are quickly dropped. The ones which have other keys
275275
// are run through MergeIterator and compacted to create new tables. All the mechanisms of
276276
// compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow.
277-
func (s *levelsController) dropPrefix(prefix []byte) error {
277+
func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
278+
// Internal move keys related to the given prefix should also be skipped.
279+
for _, prefix := range prefixes {
280+
key := make([]byte, 0, len(badgerMove)+len(prefix))
281+
key = append(key, badgerMove...)
282+
key = append(key, prefix...)
283+
prefixes = append(prefixes, key)
284+
}
285+
278286
opt := s.kv.opt
279-
for _, l := range s.levels {
287+
// Iterate levels in the reverse order because if we were to iterate from
288+
// lower level (say level 0) to a higher level (say level 3) we could have
289+
// a state in which level 0 is compacted and an older version of a key exists in lower level.
290+
// At this point, if someone creates an iterator, they would see an old
291+
// value for a key from lower levels. Iterating in reverse order ensures we
292+
// drop the oldest data first so that lookups never return stale data.
293+
for i := len(s.levels) - 1; i >= 0; i-- {
294+
l := s.levels[i]
295+
280296
l.RLock()
281297
if l.level == 0 {
282298
size := len(l.tables)
@@ -288,7 +304,7 @@ func (s *levelsController) dropPrefix(prefix []byte) error {
288304
score: 1.74,
289305
// A unique number greater than 1.0 does two things. Helps identify this
290306
// function in logs, and forces a compaction.
291-
dropPrefix: prefix,
307+
dropPrefixes: prefixes,
292308
}
293309
if err := s.doCompact(cp); err != nil {
294310
opt.Warningf("While compacting level 0: %v", err)
@@ -298,39 +314,49 @@ func (s *levelsController) dropPrefix(prefix []byte) error {
298314
continue
299315
}
300316

301-
var tables []*table.Table
302-
// Internal move keys related to the given prefix should also be skipped.
303-
moveKeyForPrefix := append(badgerMove, prefix...)
304-
prefixesToSkip := [][]byte{prefix, moveKeyForPrefix}
305-
for _, table := range l.tables {
306-
var absent bool
307-
switch {
308-
case hasAnyPrefixes(table.Smallest(), prefixesToSkip):
309-
case hasAnyPrefixes(table.Biggest(), prefixesToSkip):
310-
case containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixesToSkip):
311-
default:
312-
absent = true
317+
// Build a list of compaction tableGroups affecting all the prefixes we
318+
// need to drop. We need to build tableGroups that satisfy the invariant that
319+
// bottom tables are consecutive.
320+
// tableGroup contains groups of consecutive tables.
321+
var tableGroups [][]*table.Table
322+
var tableGroup []*table.Table
323+
324+
finishGroup := func() {
325+
if len(tableGroup) > 0 {
326+
tableGroups = append(tableGroups, tableGroup)
327+
tableGroup = nil
313328
}
314-
if !absent {
315-
tables = append(tables, table)
329+
}
330+
331+
for _, table := range l.tables {
332+
if containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixes) {
333+
tableGroup = append(tableGroup, table)
334+
} else {
335+
finishGroup()
316336
}
317337
}
338+
finishGroup()
339+
318340
l.RUnlock()
319-
if len(tables) == 0 {
341+
342+
if len(tableGroups) == 0 {
320343
continue
321344
}
322345

323-
cd := compactDef{
324-
elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"),
325-
thisLevel: l,
326-
nextLevel: l,
327-
top: []*table.Table{},
328-
bot: tables,
329-
dropPrefix: prefix,
330-
}
331-
if err := s.runCompactDef(l.level, cd); err != nil {
332-
opt.Warningf("While running compact def: %+v. Error: %v", cd, err)
333-
return err
346+
opt.Infof("Dropping prefix at level %d (%d tableGroups)", l.level, len(tableGroups))
347+
for _, operation := range tableGroups {
348+
cd := compactDef{
349+
elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"),
350+
thisLevel: l,
351+
nextLevel: l,
352+
top: nil,
353+
bot: operation,
354+
dropPrefixes: prefixes,
355+
}
356+
if err := s.runCompactDef(l.level, cd); err != nil {
357+
opt.Warningf("While running compact def: %+v. Error: %v", cd, err)
358+
return err
359+
}
334360
}
335361
}
336362
return nil
@@ -395,9 +421,9 @@ func (l *levelHandler) isCompactable(delSize int64) bool {
395421
}
396422

397423
type compactionPriority struct {
398-
level int
399-
score float64
400-
dropPrefix []byte
424+
level int
425+
score float64
426+
dropPrefixes [][]byte
401427
}
402428

403429
// pickCompactLevel determines which level to compact.
@@ -491,13 +517,19 @@ func (s *levelsController) compactBuildTables(
491517

492518
// Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap.
493519
var valid []*table.Table
520+
521+
nextTable:
494522
for _, table := range botTables {
495-
if len(cd.dropPrefix) > 0 &&
496-
bytes.HasPrefix(table.Smallest(), cd.dropPrefix) &&
497-
bytes.HasPrefix(table.Biggest(), cd.dropPrefix) {
498-
// All the keys in this table have the dropPrefix. So, this table does not need to be
499-
// in the iterator and can be dropped immediately.
500-
continue
523+
if len(cd.dropPrefixes) > 0 {
524+
for _, prefix := range cd.dropPrefixes {
525+
if bytes.HasPrefix(table.Smallest(), prefix) &&
526+
bytes.HasPrefix(table.Biggest(), prefix) {
527+
// All the keys in this table have the dropPrefix. So, this
528+
// table does not need to be in the iterator and can be
529+
// dropped immediately.
530+
continue nextTable
531+
}
532+
}
501533
}
502534
valid = append(valid, table)
503535
}
@@ -535,12 +567,9 @@ func (s *levelsController) compactBuildTables(
535567
bopts.BfCache = s.kv.bfCache
536568
builder := table.NewTableBuilder(bopts)
537569
var numKeys, numSkips uint64
538-
// Internal move keys related to the given prefix should also be skipped.
539-
moveKeyForPrefix := append(badgerMove, cd.dropPrefix...)
540-
prefixesToSkip := [][]byte{cd.dropPrefix, moveKeyForPrefix}
541570
for ; it.Valid(); it.Next() {
542571
// See if we need to skip the prefix.
543-
if len(cd.dropPrefix) > 0 && hasAnyPrefixes(it.Key(), prefixesToSkip) {
572+
if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) {
544573
numSkips++
545574
updateStats(it.Value())
546575
continue
@@ -719,10 +748,24 @@ func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool {
719748
return false
720749
}
721750

751+
func containsPrefix(smallValue, largeValue, prefix []byte) bool {
752+
if bytes.HasPrefix(smallValue, prefix) {
753+
return true
754+
}
755+
if bytes.HasPrefix(largeValue, prefix) {
756+
return true
757+
}
758+
if bytes.Compare(prefix, smallValue) > 0 &&
759+
bytes.Compare(prefix, largeValue) < 0 {
760+
return true
761+
}
762+
763+
return false
764+
}
765+
722766
func containsAnyPrefixes(smallValue, largeValue []byte, listOfPrefixes [][]byte) bool {
723767
for _, prefix := range listOfPrefixes {
724-
if bytes.Compare(prefix, smallValue) > 0 &&
725-
bytes.Compare(prefix, largeValue) < 0 {
768+
if containsPrefix(smallValue, largeValue, prefix) {
726769
return true
727770
}
728771
}
@@ -744,7 +787,7 @@ type compactDef struct {
744787

745788
thisSize int64
746789

747-
dropPrefix []byte
790+
dropPrefixes [][]byte
748791
}
749792

750793
func (cd *compactDef) lockLevels() {
@@ -918,10 +961,10 @@ func (s *levelsController) doCompact(p compactionPriority) error {
918961
y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.
919962

920963
cd := compactDef{
921-
elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"),
922-
thisLevel: s.levels[l],
923-
nextLevel: s.levels[l+1],
924-
dropPrefix: p.dropPrefix,
964+
elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"),
965+
thisLevel: s.levels[l],
966+
nextLevel: s.levels[l+1],
967+
dropPrefixes: p.dropPrefixes,
925968
}
926969
cd.elog.SetMaxEvents(100)
927970
defer cd.elog.Finish()

levels_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -778,3 +778,43 @@ func TestL0Stall(t *testing.T) {
778778
test(t, &opt)
779779
})
780780
}
781+
782+
// Regression test for https://github.com/dgraph-io/dgraph/issues/5573
783+
func TestDropPrefixMoveBug(t *testing.T) {
784+
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
785+
// l1 is used to verify that drop prefix actually drops move keys from all the levels.
786+
l1 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 0, 0}}
787+
createAndOpen(db, l1, 1)
788+
789+
// Mutiple levels can have the exact same move key with version.
790+
l2 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 0, 0}, {"A", "", 0, 0}}
791+
l21 := []keyValVersion{{"B", "", 0, 0}, {"C", "", 0, 0}}
792+
l22 := []keyValVersion{{"F", "", 0, 0}, {"G", "", 0, 0}}
793+
794+
// Level 2 has all the tables.
795+
createAndOpen(db, l2, 2)
796+
createAndOpen(db, l21, 2)
797+
createAndOpen(db, l22, 2)
798+
799+
require.NoError(t, db.lc.validate())
800+
require.NoError(t, db.DropPrefix([]byte("F")))
801+
802+
db.View(func(txn *Txn) error {
803+
iopt := DefaultIteratorOptions
804+
iopt.AllVersions = true
805+
806+
it := txn.NewIterator(iopt)
807+
defer it.Close()
808+
809+
specialKey := []byte("F")
810+
droppedPrefixes := [][]byte{specialKey, append(badgerMove, specialKey...)}
811+
for it.Rewind(); it.Valid(); it.Next() {
812+
key := it.Item().Key()
813+
// Ensure we don't have any "F" or "!badger!move!F" left
814+
require.False(t, hasAnyPrefixes(key, droppedPrefixes))
815+
}
816+
return nil
817+
})
818+
require.NoError(t, db.lc.validate())
819+
})
820+
}

util.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ func (s *levelHandler) validate() error {
6060

6161
if y.CompareKeys(s.tables[j].Smallest(), s.tables[j].Biggest()) > 0 {
6262
return errors.Errorf(
63-
"Intra: %q vs %q: level=%d j=%d numTables=%d",
64-
s.tables[j].Smallest(), s.tables[j].Biggest(), s.level, j, numTables)
63+
"Intra: \n%s\n vs \n%s\n: level=%d j=%d numTables=%d",
64+
hex.Dump(s.tables[j].Smallest()), hex.Dump(s.tables[j].Biggest()), s.level, j, numTables)
6565
}
6666
}
6767
return nil

0 commit comments

Comments
 (0)