Skip to content

Commit 14cbd89

Browse files
authored
Port Stream framework from Dgraph to Badger (#653)
This PR ports over a Stream framework from Dgraph to Badger. This framework allows users to concurrently iterate over Badger, converting data to key-value lists, which then get sent out serially. In Dgraph we use this framework for shipping snapshots from leader to followers, doing periodic delta merging of updates, moving predicates from one Alpha group to another, and so on. However, the framework is general enough that it could lie within Badger, and that's what this PR achieves. In addition, the Backup API of Badger now uses this framework to make taking backups faster. Changes: * Port Stream from Dgraph to Badger. * Switch Backup to use the new Stream framework. * Update godocs. * Remove a t.Logf data race. * Self-review
1 parent 7b9d6e3 commit 14cbd89

File tree

8 files changed

+879
-358
lines changed

8 files changed

+879
-358
lines changed

backup.go

Lines changed: 59 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package badger
1919
import (
2020
"bufio"
2121
"bytes"
22+
"context"
2223
"encoding/binary"
2324
"io"
2425
"sync"
@@ -27,18 +28,6 @@ import (
2728
"github.com/dgraph-io/badger/y"
2829
)
2930

30-
func writeTo(entry *protos.KVPair, w io.Writer) error {
31-
if err := binary.Write(w, binary.LittleEndian, uint64(entry.Size())); err != nil {
32-
return err
33-
}
34-
buf, err := entry.Marshal()
35-
if err != nil {
36-
return err
37-
}
38-
_, err = w.Write(buf)
39-
return err
40-
}
41-
4231
// Backup dumps a protobuf-encoded list of all entries in the database into the
4332
// given writer, that are newer than the specified version. It returns a
4433
// timestamp indicating when the entries were dumped which can be passed into a
@@ -47,22 +36,19 @@ func writeTo(entry *protos.KVPair, w io.Writer) error {
4736
//
4837
// This can be used to backup the data in a database at a given point in time.
4938
func (db *DB) Backup(w io.Writer, since uint64) (uint64, error) {
50-
var tsNew uint64
51-
var skipKey []byte
52-
err := db.View(func(txn *Txn) error {
53-
opts := DefaultIteratorOptions
54-
opts.AllVersions = true
55-
it := txn.NewIterator(opts)
56-
defer it.Close()
57-
58-
for it.Rewind(); it.Valid(); it.Next() {
59-
item := it.Item()
60-
if item.Version() < since || bytes.Equal(skipKey, item.Key()) {
39+
stream := db.NewStream()
40+
stream.KeyToKVList = func(key []byte, itr *Iterator) (*protos.KVList, error) {
41+
list := &protos.KVList{}
42+
for ; itr.Valid(); itr.Next() {
43+
item := itr.Item()
44+
if !bytes.Equal(item.Key(), key) {
45+
return list, nil
46+
}
47+
if item.Version() < since {
6148
// Ignore versions less than given timestamp, or skip older
62-
// versions of the given skipKey.
63-
continue
49+
// versions of the given key.
50+
return list, nil
6451
}
65-
skipKey = skipKey[:0]
6652

6753
var valCopy []byte
6854
if !item.IsDeletedOrExpired() {
@@ -72,44 +58,69 @@ func (db *DB) Backup(w io.Writer, since uint64) (uint64, error) {
7258
if err != nil {
7359
Errorf("Key [%x, %d]. Error while fetching value [%v]\n",
7460
item.Key(), item.Version(), err)
75-
continue
61+
return nil, err
7662
}
7763
}
7864

7965
// clear txn bits
8066
meta := item.meta &^ (bitTxn | bitFinTxn)
81-
82-
entry := &protos.KVPair{
83-
Key: y.Copy(item.Key()),
67+
kv := &protos.KVPair{
68+
Key: item.KeyCopy(nil),
8469
Value: valCopy,
8570
UserMeta: []byte{item.UserMeta()},
8671
Version: item.Version(),
8772
ExpiresAt: item.ExpiresAt(),
8873
Meta: []byte{meta},
8974
}
90-
if err := writeTo(entry, w); err != nil {
91-
return err
92-
}
75+
list.Kv = append(list.Kv, kv)
9376

9477
switch {
9578
case item.DiscardEarlierVersions():
9679
// If we need to discard earlier versions of this item, add a delete
9780
// marker just below the current version.
98-
entry.Version -= 1
99-
entry.Meta = []byte{bitDelete}
100-
if err := writeTo(entry, w); err != nil {
101-
return err
102-
}
103-
skipKey = item.KeyCopy(skipKey)
81+
list.Kv = append(list.Kv, &protos.KVPair{
82+
Key: item.KeyCopy(nil),
83+
Version: item.Version() - 1,
84+
Meta: []byte{bitDelete},
85+
})
86+
return list, nil
10487

10588
case item.IsDeletedOrExpired():
106-
skipKey = item.KeyCopy(skipKey)
89+
return list, nil
90+
}
91+
}
92+
return list, nil
93+
}
94+
95+
var maxVersion uint64
96+
stream.Send = func(list *protos.KVList) error {
97+
for _, kv := range list.Kv {
98+
if maxVersion < kv.Version {
99+
maxVersion = kv.Version
100+
}
101+
if err := writeTo(kv, w); err != nil {
102+
return err
107103
}
108104
}
109-
tsNew = txn.readTs
110105
return nil
111-
})
112-
return tsNew, err
106+
}
107+
108+
if err := stream.Orchestrate(context.Background(), 8, "DB.Backup"); err != nil {
109+
return 0, err
110+
}
111+
return maxVersion, nil
112+
}
113+
114+
func writeTo(entry *protos.KVPair, w io.Writer) error {
115+
if err := binary.Write(w, binary.LittleEndian, uint64(entry.Size())); err != nil {
116+
return err
117+
}
118+
buf, err := entry.Marshal()
119+
if err != nil {
120+
return err
121+
}
122+
_, err = w.Write(buf)
123+
return err
113124
}
114125

115126
// Load reads a protobuf-encoded list of all entries from a reader and writes
@@ -164,10 +175,14 @@ func (db *DB) Load(r io.Reader) error {
164175
if err = e.Unmarshal(unmarshalBuf[:sz]); err != nil {
165176
return err
166177
}
178+
var userMeta byte
179+
if len(e.UserMeta) > 0 {
180+
userMeta = e.UserMeta[0]
181+
}
167182
entries = append(entries, &Entry{
168183
Key: y.KeyWithTs(e.Key, e.Version),
169184
Value: e.Value,
170-
UserMeta: e.UserMeta[0],
185+
UserMeta: userMeta,
171186
ExpiresAt: e.ExpiresAt,
172187
meta: e.Meta[0],
173188
})

0 commit comments

Comments
 (0)