
Port Stream framework from Dgraph to Badger (#653)

This PR ports the Stream framework from Dgraph over to Badger. The framework lets users iterate over Badger concurrently, converting the data into key-value lists which are then sent out serially. In Dgraph we use this framework for shipping snapshots from leader to followers, doing periodic delta merging of updates, moving predicates from one Alpha group to another, and so on.

However, the framework is general enough to live within Badger itself, and that's what this PR achieves. In addition, Badger's Backup API now uses this framework, which makes taking backups faster.
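For illustration, here is a minimal sketch of how client code might drive the framework, using only the surface visible in the backup.go diff below (db.NewStream, stream.KeyToKVList, stream.Send, stream.Orchestrate) and assuming those identifiers are usable from outside the badger package; the Open/options boilerplate and the directory path are placeholders, not part of this PR:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/dgraph-io/badger"
	"github.com/dgraph-io/badger/protos"
)

func main() {
	opts := badger.DefaultOptions
	opts.Dir, opts.ValueDir = "/tmp/badger", "/tmp/badger"
	db, err := badger.Open(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	stream := db.NewStream()

	// KeyToKVList runs concurrently across iterators. It is handed a key and an
	// iterator positioned at that key, and returns the KVPairs to emit for it.
	stream.KeyToKVList = func(key []byte, itr *badger.Iterator) (*protos.KVList, error) {
		list := &protos.KVList{}
		if itr.Valid() {
			item := itr.Item()
			list.Kv = append(list.Kv, &protos.KVPair{
				Key:     item.KeyCopy(nil),
				Version: item.Version(),
			})
		}
		return list, nil
	}

	// Send receives the produced lists serially, so no locking is needed here.
	stream.Send = func(list *protos.KVList) error {
		for _, kv := range list.Kv {
			fmt.Printf("key=%s version=%d\n", kv.Key, kv.Version)
		}
		return nil
	}

	// Run with 16 concurrent goroutines; the last argument is a log prefix.
	if err := stream.Orchestrate(context.Background(), 16, "key-lister"); err != nil {
		log.Fatal(err)
	}
}

The new Backup implementation in the diff below follows the same shape: its KeyToKVList collects every version of a key newer than since, and its Send writes the pairs out with writeTo while tracking maxVersion.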

Changes:
* Port Stream from Dgraph to Badger.
* Switch Backup to use the new Stream framework.
* Update godocs.
* Remove a t.Logf data race.
* Self-review
manishrjain committed Dec 28, 2018
1 parent 7b9d6e3 commit 14cbd89eb9238d4f32828ccb0822da2ac2397306
Showing with 879 additions and 358 deletions.
  1. +59 −44 backup.go
  2. +0 −291 backup0_test.go
  3. +265 −2 backup_test.go
  4. +2 −1 managed_db_test.go
  5. +197 −14 protos/backup.pb.go
  6. +10 −6 protos/backup.proto
  7. +333 −0 stream.go
  8. +13 −0 y/y.go
103 backup.go
@@ -19,6 +19,7 @@ package badger
 import (
 	"bufio"
 	"bytes"
+	"context"
 	"encoding/binary"
 	"io"
 	"sync"
@@ -27,18 +28,6 @@ import (
"github.com/dgraph-io/badger/y"
)

func writeTo(entry *protos.KVPair, w io.Writer) error {
if err := binary.Write(w, binary.LittleEndian, uint64(entry.Size())); err != nil {
return err
}
buf, err := entry.Marshal()
if err != nil {
return err
}
_, err = w.Write(buf)
return err
}

// Backup dumps a protobuf-encoded list of all entries in the database into the
// given writer, that are newer than the specified version. It returns a
// timestamp indicating when the entries were dumped which can be passed into a
@@ -47,22 +36,19 @@ func writeTo(entry *protos.KVPair, w io.Writer) error {
 //
 // This can be used to backup the data in a database at a given point in time.
 func (db *DB) Backup(w io.Writer, since uint64) (uint64, error) {
-	var tsNew uint64
-	var skipKey []byte
-	err := db.View(func(txn *Txn) error {
-		opts := DefaultIteratorOptions
-		opts.AllVersions = true
-		it := txn.NewIterator(opts)
-		defer it.Close()
-
-		for it.Rewind(); it.Valid(); it.Next() {
-			item := it.Item()
-			if item.Version() < since || bytes.Equal(skipKey, item.Key()) {
+	stream := db.NewStream()
+	stream.KeyToKVList = func(key []byte, itr *Iterator) (*protos.KVList, error) {
+		list := &protos.KVList{}
+		for ; itr.Valid(); itr.Next() {
+			item := itr.Item()
+			if !bytes.Equal(item.Key(), key) {
+				return list, nil
+			}
+			if item.Version() < since {
 				// Ignore versions less than given timestamp, or skip older
-				// versions of the given skipKey.
-				continue
+				// versions of the given key.
+				return list, nil
 			}
-			skipKey = skipKey[:0]
 
 			var valCopy []byte
 			if !item.IsDeletedOrExpired() {
@@ -72,44 +58,69 @@ func (db *DB) Backup(w io.Writer, since uint64) (uint64, error) {
 				if err != nil {
 					Errorf("Key [%x, %d]. Error while fetching value [%v]\n",
 						item.Key(), item.Version(), err)
-					continue
+					return nil, err
 				}
 			}
 
 			// clear txn bits
 			meta := item.meta &^ (bitTxn | bitFinTxn)
 
-			entry := &protos.KVPair{
-				Key:       y.Copy(item.Key()),
+			kv := &protos.KVPair{
+				Key:       item.KeyCopy(nil),
 				Value:     valCopy,
 				UserMeta:  []byte{item.UserMeta()},
 				Version:   item.Version(),
 				ExpiresAt: item.ExpiresAt(),
 				Meta:      []byte{meta},
 			}
-			if err := writeTo(entry, w); err != nil {
-				return err
-			}
+			list.Kv = append(list.Kv, kv)
 
 			switch {
 			case item.DiscardEarlierVersions():
 				// If we need to discard earlier versions of this item, add a delete
 				// marker just below the current version.
-				entry.Version -= 1
-				entry.Meta = []byte{bitDelete}
-				if err := writeTo(entry, w); err != nil {
-					return err
-				}
-				skipKey = item.KeyCopy(skipKey)
+				list.Kv = append(list.Kv, &protos.KVPair{
+					Key:     item.KeyCopy(nil),
+					Version: item.Version() - 1,
+					Meta:    []byte{bitDelete},
+				})
+				return list, nil
 
 			case item.IsDeletedOrExpired():
-				skipKey = item.KeyCopy(skipKey)
+				return list, nil
 			}
+		}
+		return list, nil
+	}
+
+	var maxVersion uint64
+	stream.Send = func(list *protos.KVList) error {
+		for _, kv := range list.Kv {
+			if maxVersion < kv.Version {
+				maxVersion = kv.Version
+			}
+			if err := writeTo(kv, w); err != nil {
+				return err
+			}
 		}
-		tsNew = txn.readTs
 		return nil
-	})
-	return tsNew, err
+	}
+
+	if err := stream.Orchestrate(context.Background(), 8, "DB.Backup"); err != nil {
+		return 0, err
+	}
+	return maxVersion, nil
 }
 
+func writeTo(entry *protos.KVPair, w io.Writer) error {
+	if err := binary.Write(w, binary.LittleEndian, uint64(entry.Size())); err != nil {
+		return err
+	}
+	buf, err := entry.Marshal()
+	if err != nil {
+		return err
+	}
+	_, err = w.Write(buf)
+	return err
+}
+
 // Load reads a protobuf-encoded list of all entries from a reader and writes
@@ -164,10 +175,14 @@ func (db *DB) Load(r io.Reader) error {
 		if err = e.Unmarshal(unmarshalBuf[:sz]); err != nil {
 			return err
 		}
+		var userMeta byte
+		if len(e.UserMeta) > 0 {
+			userMeta = e.UserMeta[0]
+		}
 		entries = append(entries, &Entry{
 			Key:       y.KeyWithTs(e.Key, e.Version),
 			Value:     e.Value,
-			UserMeta:  e.UserMeta[0],
+			UserMeta:  userMeta,
 			ExpiresAt: e.ExpiresAt,
 			meta:      e.Meta[0],
 		})
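
The hunks above also pin down the backup stream format: writeTo emits each protos.KVPair as a little-endian uint64 length followed by the marshaled pair, and Load consumes the same records. Below is a hedged sketch of a standalone reader for that format; the readBackup helper and the file name are hypothetical, not part of this PR:

package main

import (
	"bufio"
	"encoding/binary"
	"fmt"
	"io"
	"log"
	"os"

	"github.com/dgraph-io/badger/protos"
)

// readBackup decodes the length-prefixed KVPair records produced by writeTo
// and calls fn for each decoded pair.
func readBackup(r io.Reader, fn func(kv *protos.KVPair) error) error {
	br := bufio.NewReader(r)
	buf := make([]byte, 1<<10)
	for {
		var sz uint64
		err := binary.Read(br, binary.LittleEndian, &sz)
		if err == io.EOF {
			return nil
		} else if err != nil {
			return err
		}
		if cap(buf) < int(sz) {
			buf = make([]byte, sz)
		}
		if _, err := io.ReadFull(br, buf[:sz]); err != nil {
			return err
		}
		kv := &protos.KVPair{}
		if err := kv.Unmarshal(buf[:sz]); err != nil {
			return err
		}
		if err := fn(kv); err != nil {
			return err
		}
	}
}

func main() {
	f, err := os.Open("badger.backup")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	err = readBackup(f, func(kv *protos.KVPair) error {
		fmt.Printf("key=%s version=%d value=%d bytes\n", kv.Key, kv.Version, len(kv.Value))
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}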