Skip to content
Permalink
Browse files

Implement SingleDelete

In Pebble, when a SingleDelete meets a Delete or Merge for the same key during compaction, the SingleDelete is converted to a Delete.
  • Loading branch information
hueypark committed Sep 1, 2019
1 parent 53f7531 commit ba4587b05b0a8aabba2b39332b59809923e26b02
Showing with 669 additions and 19 deletions.
  1. +53 −0 batch.go
  2. +10 −0 batch_test.go
  3. +50 −0 compaction_iter.go
  4. +28 −0 db.go
  5. +182 −0 db_test.go
  6. +1 −0 internal.go
  7. +16 −14 internal/base/internal.go
  8. +3 −3 iterator.go
  9. +4 −2 mem_table.go
  10. +199 −0 testdata/compaction_iter
  11. +121 −0 testdata/iterator
  12. +2 −0 tool/wal.go
@@ -526,6 +526,59 @@ func (b *Batch) DeleteDeferred(keyLen int, _ *WriteOptions) (*DeferredBatchOp, e
return &b.deferredOp, nil
}

// SingleDelete appends a single-delete operation for key to the batch. The
// semantics of the operation are described on Writer.SingleDelete.
//
// The key bytes are copied into the batch, so it is safe to modify the
// contents of the arguments after SingleDelete returns.
func (b *Batch) SingleDelete(key []byte, _ *WriteOptions) error {
	op, err := b.SingleDeleteDeferred(len(key), nil)
	if err != nil {
		return err
	}
	copy(op.Key, key)

	// TODO(peter): Manually inline DeferredBatchOp.Finish(). Mid-stack inlining
	// in go1.13 will remove the need for this.
	if b.index == nil {
		return nil
	}
	if addErr := b.index.Add(op.offset); addErr != nil {
		// We never add duplicate entries, so an error should never occur.
		panic(addErr)
	}
	return nil
}

// SingleDeleteDeferred reserves space in the batch for a single-delete
// operation whose key is keyLen bytes long. Unlike SingleDelete it takes only
// the key length: the caller fills in the Key slice of the returned
// DeferredBatchOp and then calls Finish() on it. A single delete carries no
// value, so the returned op's Value is nil.
//
// ErrInvalidBatch is returned when the batch cannot accept another entry.
func (b *Batch) SingleDeleteDeferred(keyLen int, _ *WriteOptions) (*DeferredBatchOp, error) {
	// The logic is duplicated from the Delete path rather than shared so that
	// the variant taking complete byte slices can preserve its fast path.
	if len(b.storage.data) == 0 {
		b.init(keyLen + binary.MaxVarintLen64 + batchHeaderLen)
	}
	if !b.increment() {
		return nil, ErrInvalidBatch
	}

	b.memTableSize += memTableEntrySize(keyLen, 0)

	op := &b.deferredOp
	start := len(b.storage.data)
	op.offset = uint32(start)

	// Reserve room for the kind byte, a worst-case key-length varint, and the
	// key itself.
	b.grow(1 + maxVarintLen32 + keyLen)
	b.storage.data[start] = byte(InternalKeyKindSingleDelete)
	lenSize := putUvarint32(b.storage.data[start+1:], uint32(keyLen))
	keyStart := start + 1 + lenSize

	op.Key = b.storage.data[keyStart : keyStart+keyLen]
	op.Value = nil

	// Give back the varint bytes that were reserved but not needed.
	b.storage.data = b.storage.data[:len(b.storage.data)-(maxVarintLen32-lenSize)]

	op.index = b.index
	return op, nil
}

// DeleteRange deletes all of the keys (and values) in the range [start,end)
// (inclusive on start, exclusive on end).
//
@@ -47,13 +47,16 @@ func TestBatch(t *testing.T) {
{InternalKeyKindSet, "roses", "red"},
{InternalKeyKindSet, "violets", "blue"},
{InternalKeyKindDelete, "roses", ""},
{InternalKeyKindSingleDelete, "roses", ""},
{InternalKeyKindSet, "", ""},
{InternalKeyKindSet, "", "non-empty"},
{InternalKeyKindDelete, "", ""},
{InternalKeyKindSingleDelete, "", ""},
{InternalKeyKindSet, "grass", "green"},
{InternalKeyKindSet, "grass", "greener"},
{InternalKeyKindSet, "eleventy", strings.Repeat("!!11!", 100)},
{InternalKeyKindDelete, "nosuchkey", ""},
{InternalKeyKindSingleDelete, "nosuchkey", ""},
{InternalKeyKindSet, "binarydata", "\x00"},
{InternalKeyKindSet, "binarydata", "\xff"},
{InternalKeyKindMerge, "merge", "mergedata"},
@@ -73,6 +76,8 @@ func TestBatch(t *testing.T) {
_ = b.Merge([]byte(tc.key), []byte(tc.value), nil)
case InternalKeyKindDelete:
_ = b.Delete([]byte(tc.key), nil)
case InternalKeyKindSingleDelete:
_ = b.SingleDelete([]byte(tc.key), nil)
case InternalKeyKindRangeDelete:
_ = b.DeleteRange([]byte(tc.key), []byte(tc.value), nil)
case InternalKeyKindLogData:
@@ -103,6 +108,11 @@ func TestBatch(t *testing.T) {
copy(d.Key, key)
copy(d.Value, value)
d.Finish()
case InternalKeyKindSingleDelete:
d, _ := b.SingleDeleteDeferred(len(key), nil)
copy(d.Key, key)
copy(d.Value, value)
d.Finish()
case InternalKeyKindRangeDelete:
d, _ := b.DeleteRangeDeferred(len(key), len(value), nil)
copy(d.Key, key)
@@ -222,6 +222,15 @@ func (i *compactionIter) Next() (*InternalKey, []byte) {
i.skip = true
return &i.key, i.value

case InternalKeyKindSingleDelete:
if i.rangeDelFrag.Deleted(i.key, i.curSnapshotSeqNum) {
i.saveKey()
i.skipStripe()
continue
}

return i.singleDeleteNext()

case InternalKeyKindRangeDelete:
i.key = i.cloneKey(i.key)
i.rangeDelFrag.Add(i.key, i.iterValue)
@@ -384,6 +393,47 @@ func (i *compactionIter) mergeNext() (*InternalKey, []byte) {
}
}

// singleDeleteNext is called by Next when the current entry is a SingleDelete.
// It saves that entry as the iterator's current key, advances with
// nextInStripe, and decides what to emit based on the kind of the entry that
// follows:
//
//   - no following entry: the SingleDelete is returned unchanged;
//   - Delete or Merge: the SingleDelete is rewritten to a Delete and returned;
//   - Set: nothing is returned for this pair and iteration continues via Next;
//   - SingleDelete: the saved SingleDelete is returned;
//   - RangeDelete: iteration continues via Next;
//   - anything else: i.err is set and (nil, nil) is returned.
func (i *compactionIter) singleDeleteNext() (*InternalKey, []byte) {
	// Save the current key.
	i.saveKey()
	i.valid = true

	if !i.nextInStripe() {
		i.skip = false
		return &i.key, i.value
	}

	switch kind := i.iterKey.Kind(); kind {
	case InternalKeyKindDelete, InternalKeyKindMerge:
		// We've hit a Delete or a Merge, transform the SingleDelete into a
		// full Delete.
		i.key.SetKind(InternalKeyKindDelete)
		i.nextInStripe()
		return &i.key, i.value

	case InternalKeyKindSet:
		i.nextInStripe()
		return i.Next()

	case InternalKeyKindSingleDelete:
		i.nextInStripe()
		return &i.key, i.value

	case InternalKeyKindRangeDelete:
		return i.Next()

	default:
		i.err = fmt.Errorf("invalid internal key kind: %d", kind)
		return nil, nil
	}
}

func (i *compactionIter) saveKey() {
i.keyBuf = append(i.keyBuf[:0], i.iterKey.UserKey...)
i.key.UserKey = i.keyBuf
28 db.go
@@ -89,6 +89,26 @@ type Writer interface {
// It is safe to modify the contents of the arguments after Delete returns.
Delete(key []byte, o *WriteOptions) error

// SingleDelete is similar to Delete in that it deletes the value for the given key. Like Delete,
// it is a blind operation that will succeed even if the given key does not exist.
//
// WARNING: Undefined (non-deterministic) behavior will result if a key is overwritten and
// then deleted using SingleDelete. The record may appear deleted immediately, but be
// resurrected at a later time after compactions have been performed. Or the record may
// be deleted permanently. A Delete operation lays down a "tombstone" which shadows all
// previous versions of a key. The SingleDelete operation is akin to "anti-matter" and will
// only delete the most recently written version for a key. These different semantics allow
// the DB to avoid propagating a SingleDelete operation during a compaction as soon as the
// corresponding Set operation is encountered. These semantics require extreme care to handle
// properly. Only use if you have a workload where the performance gain is critical and you
// can guarantee that a record is written once and then deleted once.
//
// SingleDelete is internally transformed into a Delete if the most recent record for a key is either
// a Merge or Delete record.
//
// It is safe to modify the contents of the arguments after SingleDelete returns.
SingleDelete(key []byte, o *WriteOptions) error

// DeleteRange deletes all of the keys (and values) in the range [start,end)
// (inclusive on start, exclusive on end).
//
@@ -328,6 +348,14 @@ func (d *DB) Delete(key []byte, opts *WriteOptions) error {
return d.Apply(b, opts)
}

// SingleDelete is part of Writer. It stages a single-delete of key in a fresh
// batch and applies that batch to the DB using opts.
//
// An error from staging the operation is returned to the caller rather than
// discarded, so a delete that never made it into the batch is not reported as
// a success. The batch is released on every return path.
func (d *DB) SingleDelete(key []byte, opts *WriteOptions) error {
	b := newBatch(d)
	defer b.release()
	if err := b.SingleDelete(key, opts); err != nil {
		return err
	}
	return d.Apply(b, opts)
}

// DeleteRange deletes all of the keys (and values) in the range [start,end)
// (inclusive on start, exclusive on end).
//
@@ -506,6 +506,109 @@ func TestGetMerge(t *testing.T) {
}
}

// TestSingleDeleteGet writes two keys to an in-memory DB, checks that both are
// readable, and then verifies that a key removed with SingleDelete is reported
// as not found by Get.
func TestSingleDeleteGet(t *testing.T) {
	d, err := Open("", &Options{
		FS: vfs.NewMem(),
	})
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := d.Close(); err != nil {
			t.Fatal(err)
		}
	}()

	// must aborts the test on any write error so that a failed write is not
	// misreported later as a wrong read result.
	must := func(err error) {
		t.Helper()
		if err != nil {
			t.Fatal(err)
		}
	}

	key := []byte("key")
	val := []byte("val")

	must(d.Set(key, val, nil))
	verifyGet(t, d, key, val)

	key2 := []byte("key2")
	val2 := []byte("val2")

	must(d.Set(key2, val2, nil))
	verifyGet(t, d, key2, val2)

	must(d.SingleDelete(key2, nil))
	verifyGetNotFound(t, d, key2)
}

// TestSingleDeleteFlush interleaves Set, SingleDelete and Delete operations on
// two keys with flushes in between, and verifies that after the final flush
// neither key is returned by Get.
func TestSingleDeleteFlush(t *testing.T) {
	d, err := Open("", &Options{
		FS: vfs.NewMem(),
	})
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := d.Close(); err != nil {
			t.Fatal(err)
		}
	}()

	// must aborts the test on any write or flush error so that a failed step
	// is not misreported later as a wrong read result.
	must := func(err error) {
		t.Helper()
		if err != nil {
			t.Fatal(err)
		}
	}

	key := []byte("key")
	valFirst := []byte("first")
	valSecond := []byte("second")
	key2 := []byte("key2")
	val2 := []byte("val2")

	must(d.Set(key, valFirst, nil))
	must(d.Set(key2, val2, nil))
	must(d.Flush())

	must(d.SingleDelete(key, nil))
	must(d.Set(key, valSecond, nil))
	must(d.Delete(key2, nil))
	must(d.Set(key2, val2, nil))
	must(d.Flush())

	must(d.SingleDelete(key, nil))
	must(d.Delete(key2, nil))
	must(d.Flush())

	verifyGetNotFound(t, d, key)
	verifyGetNotFound(t, d, key2)
}

// TestUnremovableSingleDelete takes a snapshot after the first Set of a key,
// then single-deletes and rewrites the key. It verifies, before and after
// flushing, that the snapshot keeps returning the first value while the live
// DB reflects the later writes.
func TestUnremovableSingleDelete(t *testing.T) {
	d, err := Open("", &Options{
		FS: vfs.NewMem(),
	})
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := d.Close(); err != nil {
			t.Fatal(err)
		}
	}()

	// must aborts the test on any write or flush error so that a failed step
	// is not misreported later as a wrong read result.
	must := func(err error) {
		t.Helper()
		if err != nil {
			t.Fatal(err)
		}
	}

	key := []byte("key")
	valFirst := []byte("valFirst")
	valSecond := []byte("valSecond")

	must(d.Set(key, valFirst, nil))
	ss := d.NewSnapshot()
	// Release the snapshot before the DB is closed. Deferred calls run in
	// LIFO order, so this runs ahead of the d.Close() registered above.
	defer func() {
		if err := ss.Close(); err != nil {
			t.Fatal(err)
		}
	}()
	must(d.SingleDelete(key, nil))
	must(d.Set(key, valSecond, nil))
	must(d.Flush())

	verifyGetSnapshot(t, ss, key, valFirst)
	verifyGet(t, d, key, valSecond)

	must(d.SingleDelete(key, nil))

	verifyGetSnapshot(t, ss, key, valFirst)
	verifyGetNotFound(t, d, key)

	must(d.Flush())

	verifyGetSnapshot(t, ss, key, valFirst)
	verifyGetNotFound(t, d, key)
}

func TestIterLeak(t *testing.T) {
for _, leak := range []bool{true, false} {
t.Run(fmt.Sprintf("leak=%t", leak), func(t *testing.T) {
@@ -771,3 +874,82 @@ func TestDBConcurrentCommitCompactFlush(t *testing.T) {
t.Fatal(err)
}
}

// BenchmarkDelete compares the cost of removing keyCount previously written
// keys with Delete versus SingleDelete on an in-memory DB.
//
// The helper receives the sub-benchmark's own *testing.B, so failures and
// timer control act on the benchmark that is actually running rather than on
// the parent. Opening the DB and populating it happen with the timer stopped;
// only the delete loop is measured.
func BenchmarkDelete(b *testing.B) {
	rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))
	const keyCount = 10000
	var keys [keyCount][]byte
	for i := 0; i < keyCount; i++ {
		keys[i] = []byte(strconv.Itoa(rng.Int()))
	}
	val := bytes.Repeat([]byte("x"), 1000)

	// benchmark runs one full populate-then-delete cycle against a fresh DB.
	benchmark := func(b *testing.B, useSingleDelete bool) {
		// Setup is not part of what is being measured.
		b.StopTimer()
		d, err := Open(
			"",
			&Options{
				FS: vfs.NewMem(),
			})
		if err != nil {
			b.Fatal(err)
		}
		defer func() {
			if err := d.Close(); err != nil {
				b.Fatal(err)
			}
		}()

		for i := range keys {
			if err := d.Set(keys[i], val, nil); err != nil {
				b.Fatal(err)
			}
		}

		b.StartTimer()
		for i := range keys {
			var err error
			if useSingleDelete {
				err = d.SingleDelete(keys[i], nil)
			} else {
				err = d.Delete(keys[i], nil)
			}
			if err != nil {
				b.Fatal(err)
			}
		}
		// Keep the deferred Close out of the measurement as well.
		b.StopTimer()
	}

	b.Run("delete", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			benchmark(b, false)
		}
	})

	b.Run("single-delete", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			benchmark(b, true)
		}
	})
}

// verifyGet fails the test unless d.Get(key) succeeds and returns bytes equal
// to expected. It is marked as a test helper so failures are attributed to the
// calling line.
func verifyGet(t *testing.T, d *DB, key, expected []byte) {
	t.Helper()

	val, err := d.Get(key)
	if err != nil {
		t.Fatal(err)
	}

	if !bytes.Equal(expected, val) {
		t.Fatalf("expected %s, but got %s", expected, val)
	}
}

// verifyGetNotFound fails the test unless d.Get(key) returns base.ErrNotFound.
// The failure message reports both the value and the error actually returned,
// so an unexpected error is distinguishable from an unexpectedly present key.
// It is marked as a test helper so failures are attributed to the calling line.
func verifyGetNotFound(t *testing.T, d *DB, key []byte) {
	t.Helper()

	val, err := d.Get(key)
	if err != base.ErrNotFound {
		t.Fatalf("expected %v for key %q, but got value %q, err %v", base.ErrNotFound, key, val, err)
	}
}

// verifyGetSnapshot fails the test unless ss.Get(key) succeeds and returns
// bytes equal to expected. It is marked as a test helper so failures are
// attributed to the calling line.
func verifyGetSnapshot(t *testing.T, ss *Snapshot, key, expected []byte) {
	t.Helper()

	val, err := ss.Get(key)
	if err != nil {
		t.Fatal(err)
	}

	if !bytes.Equal(expected, val) {
		t.Fatalf("expected %s, but got %s", expected, val)
	}
}

0 comments on commit ba4587b

Please sign in to comment.
You can’t perform that action at this time.