Skip to content

Commit

Permalink
Use pre-allocated protobufs during backups.
Browse files Browse the repository at this point in the history
Backups convert the values stored in disk into an intermediate format
(for backwards compatibility reasons). This creates a lot of
pb.PostingList and pb.BackupPostingList objects that are discarded
almost immediatly. Using preallocated protobufs (one per thread used by
the stream object) should avoid these allocations altogether.

Fixes DGRAPH-1312
  • Loading branch information
martinmr committed May 22, 2020
1 parent cfbcf2b commit 2396d9c
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 13 deletions.
43 changes: 39 additions & 4 deletions ee/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,40 @@ import (
"github.com/dgraph-io/dgraph/x"
)

const (
// backupNumGo is the number of go routines used by the backup stream writer.
backupNumGo = 16
)

// Processor handles the different stages of the backup process.
type Processor struct {
// DB is the Badger pstore managed by this node.
DB *badger.DB
// Request stores the backup request containing the parameters for this backup.
Request *pb.BackupRequest

// plList is an array of pre-allocated pb.PostingList objects.
plList []*pb.PostingList
// bplList is an array of pre-allocated pb.BackupPostingList objects.
bplList []*pb.BackupPostingList
}

func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *Processor {
bp := &Processor{
DB: db,
Request: req,
plList: make([]*pb.PostingList, backupNumGo),
bplList: make([]*pb.BackupPostingList, backupNumGo),
}

for i := range bp.plList {
bp.plList[i] = &pb.PostingList{}
}
for i := range bp.bplList {
bp.bplList[i] = &pb.BackupPostingList{}
}

return bp
}

// LoadResult holds the output of a Load operation.
Expand Down Expand Up @@ -141,6 +169,7 @@ func (pr *Processor) WriteBackup(ctx context.Context) (*pb.Status, error) {

stream := pr.DB.NewStreamAt(pr.Request.ReadTs)
stream.LogPrefix = "Dgraph.Backup"
stream.NumGo = backupNumGo
stream.KeyToList = pr.toBackupList
stream.ChooseKey = func(item *badger.Item) bool {
parsedKey, err := x.Parse(item.Key())
Expand Down Expand Up @@ -259,7 +288,7 @@ func (pr *Processor) toBackupList(key []byte, itr *badger.Iterator) (*bpb.KVList
}
kv.Key = backupKey

backupPl, err := toBackupPostingList(kv.Value)
backupPl, err := pr.toBackupPostingList(kv.Value, itr.ThreadId)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -304,12 +333,18 @@ func toBackupKey(key []byte) ([]byte, error) {
return backupKey, nil
}

func toBackupPostingList(val []byte) ([]byte, error) {
pl := &pb.PostingList{}
func (pr *Processor) toBackupPostingList(val []byte, threadNum int) ([]byte, error) {
pl := pr.plList[threadNum]
bpl := pr.bplList[threadNum]
pl.Reset()
bpl.Reset()

if err := pl.Unmarshal(val); err != nil {
return nil, errors.Wrapf(err, "while reading posting list")
}
backupVal, err := posting.ToBackupPostingList(pl).Marshal()
posting.ToBackupPostingList(pl, bpl)
backupVal, err := bpl.Marshal()

if err != nil {
return nil, errors.Wrapf(err, "while converting posting list for backup")
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f // indirect
github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d // indirect
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515210839-ef28ef36b592
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200522174526-6eaa5009af27
github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3
github.com/dgrijalva/jwt-go v3.2.0+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ github.com/d4l3k/messagediff v1.2.1/go.mod h1:Oozbb1TVXFac9FtSIxHBMnBCq2qeH/2KkE
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515210839-ef28ef36b592 h1:j6j3yunDbktI4H3tbj3grt2enO4EPbhstU6Tb8HwqdQ=
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200515210839-ef28ef36b592/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200522174526-6eaa5009af27 h1:2oSyH418QYuj2IhW5XWvSJqu5S2wT6czY2ZmPcLhgB4=
github.com/dgraph-io/badger/v2 v2.0.1-rc1.0.20200522174526-6eaa5009af27/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM=
github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6 h1:5leDFqGys055YO3TbghBhk/QdRPEwyLPdgsSJfiR20I=
github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6/go.mod h1:LJCkLxm5fUMcU+yb8gHFjHt7ChgNuz3YnQQ6MQkmscI=
github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU=
Expand Down
8 changes: 3 additions & 5 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -1486,17 +1486,15 @@ func (l *List) PartSplits() []uint64 {
}

// ToBackupPostingList converts a posting list into its representation used for storing backups.
func ToBackupPostingList(l *pb.PostingList) *pb.BackupPostingList {
bl := pb.BackupPostingList{}
if l == nil {
return &bl
func ToBackupPostingList(l *pb.PostingList, bl *pb.BackupPostingList) {
if l == nil || bl == nil {
return
}

bl.Uids = codec.Decode(l.Pack, 0)
bl.Postings = l.Postings
bl.CommitTs = l.CommitTs
bl.Splits = l.Splits
return &bl
}

// FromBackupPostingList converts a posting list in the format used for backups to a
Expand Down
2 changes: 1 addition & 1 deletion worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func backupCurrentGroup(ctx context.Context, req *pb.BackupRequest) (*pb.Status,
return nil, err
}

bp := &backup.Processor{DB: pstore, Request: req}
bp := backup.NewBackupProcessor(pstore, req)
return bp.WriteBackup(ctx)
}

Expand Down

0 comments on commit 2396d9c

Please sign in to comment.