Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use pre-allocated protobufs during backups. #5508

Merged
merged 2 commits into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 39 additions & 4 deletions ee/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,40 @@ import (
"github.com/dgraph-io/dgraph/x"
)

const (
// backupNumGo is the number of go routines used by the backup stream writer.
backupNumGo = 16
)

// Processor handles the different stages of the backup process.
type Processor struct {
// DB is the Badger pstore managed by this node.
DB *badger.DB
// Request stores the backup request containing the parameters for this backup.
Request *pb.BackupRequest

// plList is an array of pre-allocated pb.PostingList objects.
plList []*pb.PostingList
// bplList is an array of pre-allocated pb.BackupPostingList objects.
bplList []*pb.BackupPostingList
}

func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *Processor {
bp := &Processor{
DB: db,
Request: req,
plList: make([]*pb.PostingList, backupNumGo),
bplList: make([]*pb.BackupPostingList, backupNumGo),
}

for i := range bp.plList {
bp.plList[i] = &pb.PostingList{}
}
for i := range bp.bplList {
bp.bplList[i] = &pb.BackupPostingList{}
}

return bp
}

// LoadResult holds the output of a Load operation.
Expand Down Expand Up @@ -141,6 +169,7 @@ func (pr *Processor) WriteBackup(ctx context.Context) (*pb.Status, error) {

stream := pr.DB.NewStreamAt(pr.Request.ReadTs)
stream.LogPrefix = "Dgraph.Backup"
stream.NumGo = backupNumGo
stream.KeyToList = pr.toBackupList
stream.ChooseKey = func(item *badger.Item) bool {
parsedKey, err := x.Parse(item.Key())
Expand Down Expand Up @@ -259,7 +288,7 @@ func (pr *Processor) toBackupList(key []byte, itr *badger.Iterator) (*bpb.KVList
}
kv.Key = backupKey

backupPl, err := toBackupPostingList(kv.Value)
backupPl, err := pr.toBackupPostingList(kv.Value, itr.ThreadId)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -304,12 +333,18 @@ func toBackupKey(key []byte) ([]byte, error) {
return backupKey, nil
}

func toBackupPostingList(val []byte) ([]byte, error) {
pl := &pb.PostingList{}
func (pr *Processor) toBackupPostingList(val []byte, threadNum int) ([]byte, error) {
pl := pr.plList[threadNum]
bpl := pr.bplList[threadNum]
pl.Reset()
bpl.Reset()

if err := pl.Unmarshal(val); err != nil {
return nil, errors.Wrapf(err, "while reading posting list")
}
backupVal, err := posting.ToBackupPostingList(pl).Marshal()
posting.ToBackupPostingList(pl, bpl)
backupVal, err := bpl.Marshal()

if err != nil {
return nil, errors.Wrapf(err, "while converting posting list for backup")
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f // indirect
github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d // indirect
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515210839-ef28ef36b592
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200522174526-6eaa5009af27
github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3
github.com/dgrijalva/jwt-go v3.2.0+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ github.com/d4l3k/messagediff v1.2.1/go.mod h1:Oozbb1TVXFac9FtSIxHBMnBCq2qeH/2KkE
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515210839-ef28ef36b592 h1:j6j3yunDbktI4H3tbj3grt2enO4EPbhstU6Tb8HwqdQ=
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515210839-ef28ef36b592/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200522174526-6eaa5009af27 h1:2oSyH418QYuj2IhW5XWvSJqu5S2wT6czY2ZmPcLhgB4=
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200522174526-6eaa5009af27/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6 h1:5leDFqGys055YO3TbghBhk/QdRPEwyLPdgsSJfiR20I=
github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6/go.mod h1:LJCkLxm5fUMcU+yb8gHFjHt7ChgNuz3YnQQ6MQkmscI=
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU=
Expand Down
8 changes: 3 additions & 5 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -1486,17 +1486,15 @@ func (l *List) PartSplits() []uint64 {
}

// ToBackupPostingList converts a posting list into its representation used for storing backups.
func ToBackupPostingList(l *pb.PostingList) *pb.BackupPostingList {
bl := pb.BackupPostingList{}
if l == nil {
return &bl
func ToBackupPostingList(l *pb.PostingList, bl *pb.BackupPostingList) {
if l == nil || bl == nil {
return
}

bl.Uids = codec.Decode(l.Pack, 0)
bl.Postings = l.Postings
bl.CommitTs = l.CommitTs
bl.Splits = l.Splits
return &bl
}

// FromBackupPostingList converts a posting list in the format used for backups to a
Expand Down
2 changes: 1 addition & 1 deletion worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func backupCurrentGroup(ctx context.Context, req *pb.BackupRequest) (*pb.Status,
return nil, err
}

bp := &backup.Processor{DB: pstore, Request: req}
bp := backup.NewBackupProcessor(pstore, req)
return bp.WriteBackup(ctx)
}

Expand Down