Skip to content

Commit

Permalink
feat(Dgraph): add utility to export backup data. (#6550)
Browse files Browse the repository at this point in the history
This utility allows to take a single backup (full or incremental) and export the data
and the schema inside it to RDF. Encrypted backups are supported.

Fixes DGRAPH-2465
  • Loading branch information
martinmr committed Sep 28, 2020
1 parent adfb1d4 commit 369a5c1
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 7 deletions.
1 change: 1 addition & 0 deletions dgraph/cmd/root_ee.go
Expand Up @@ -22,6 +22,7 @@ func init() {
subcommands = append(subcommands,
&backup.Restore,
&backup.LsBackup,
&backup.ExportBackup,
&acl.CmdAcl,
)
}
49 changes: 46 additions & 3 deletions ee/backup/run.go
Expand Up @@ -33,15 +33,23 @@ var Restore x.SubCommand
// LsBackup is the sub-command used to list the backups in a folder.
var LsBackup x.SubCommand

var ExportBackup x.SubCommand

var opt struct {
backupId, location, pdir, zero string
key x.SensitiveByteSlice
forceZero bool
backupId string
location string
pdir string
zero string
key x.SensitiveByteSlice
forceZero bool
destination string
format string
}

func init() {
initRestore()
initBackupLs()
initExportBackup()
}

func initRestore() {
Expand Down Expand Up @@ -247,3 +255,38 @@ func runLsbackupCmd() error {

return nil
}

func initExportBackup() {
ExportBackup.Cmd = &cobra.Command{
Use: "export_backup",
Short: "Export data inside single full or incremental backup.",
Long: ``,
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
defer x.StartProfile(ExportBackup.Conf).Stop()
if err := runExportBackup(); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
},
}

flag := ExportBackup.Cmd.Flags()
flag.StringVarP(&opt.location, "location", "l", "",
"Sets the location of the backup. Only file URIs are supported for now.")
flag.StringVarP(&opt.destination, "destination", "d", "",
"The folder to which export the backups.")
flag.StringVarP(&opt.format, "format", "f", "rdf",
"The format of the export output. Accepts a value of either rdf or json")
enc.RegisterFlags(flag)
}

func runExportBackup() error {
var err error
if opt.key, err = enc.ReadKey(ExportBackup.Conf); err != nil {
return err
}

exporter := worker.BackupExporter{}
return exporter.ExportBackup(opt.location, opt.destination, opt.format, opt.key)
}
18 changes: 14 additions & 4 deletions worker/export.go
Expand Up @@ -544,8 +544,17 @@ func export(ctx context.Context, in *pb.ExportRequest) (ExportedFiles, error) {
}
glog.Infof("Running export for group %d at timestamp %d.", in.GroupId, in.ReadTs)

return exportInternal(ctx, in, pstore, false)
}

// exportInternal contains the core logic to export a Dgraph database. If skipZero is set to
// false, the parts of this method that require to talk to zero will be skipped. This is useful
// when exporting a p directory directly from disk without a running cluster.
func exportInternal(ctx context.Context, in *pb.ExportRequest, db *badger.DB,
skipZero bool) (ExportedFiles, error) {
uts := time.Unix(in.UnixTs, 0)
exportStorage, err := newExportStorage(in, fmt.Sprintf("dgraph.r%d.u%s", in.ReadTs, uts.UTC().Format("0102.1504")))
exportStorage, err := newExportStorage(in,
fmt.Sprintf("dgraph.r%d.u%s", in.ReadTs, uts.UTC().Format("0102.1504")))
if err != nil {
return nil, err
}
Expand All @@ -562,12 +571,13 @@ func export(ctx context.Context, in *pb.ExportRequest) (ExportedFiles, error) {
return nil, err
}

gqlSchemaWriter, err := exportStorage.openFile(fmt.Sprintf("g%02d%s", in.GroupId, ".gql_schema.gz"))
gqlSchemaWriter, err := exportStorage.openFile(
fmt.Sprintf("g%02d%s", in.GroupId, ".gql_schema.gz"))
if err != nil {
return nil, err
}

stream := pstore.NewStreamAt(in.ReadTs)
stream := db.NewStreamAt(in.ReadTs)
stream.LogPrefix = "Export"
stream.ChooseKey = func(item *badger.Item) bool {
// Skip exporting delete data including Schema and Types.
Expand All @@ -594,7 +604,7 @@ func export(ctx context.Context, in *pb.ExportRequest) (ExportedFiles, error) {
return false
}

if !pk.IsType() {
if !pk.IsType() && !skipZero {
if servesTablet, err := groups().ServesTablet(pk.Attr); err != nil || !servesTablet {
return false
}
Expand Down
147 changes: 147 additions & 0 deletions worker/file_handler.go
Expand Up @@ -13,15 +13,23 @@
package worker

import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math"
"net/url"
"os"
"path/filepath"
"sort"
"strings"
"time"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"

Expand All @@ -34,6 +42,12 @@ type fileHandler struct {
fp *os.File
}

// BackupExporter is an alias of fileHandler so that this struct can be used
// by the export_backup command.
type BackupExporter struct {
fileHandler
}

// readManifest reads a manifest file at path using the handler.
// Returns nil on success, otherwise an error.
func (h *fileHandler) readManifest(path string, m *Manifest) error {
Expand Down Expand Up @@ -240,3 +254,136 @@ func pathExist(path string) bool {
}
return !os.IsNotExist(err) && !os.IsPermission(err)
}

func (h *fileHandler) ExportBackup(backupDir, exportDir, format string,
key x.SensitiveByteSlice) error {
if format != "json" && format != "rdf" {
return errors.Errorf("invalid format %s", format)
}

// Create exportDir and temporary folder to store the restored backup.
var err error
exportDir, err = filepath.Abs(exportDir)
if err != nil {
return errors.Wrapf(err, "cannot convert path %s to absolute path", exportDir)
}
if err := os.MkdirAll(exportDir, 0755); err != nil {
return errors.Wrapf(err, "cannot create dir %s", exportDir)
}
tmpDir, err := ioutil.TempDir("", "export_backup")
if err != nil {
return errors.Wrapf(err, "cannot create temp dir")
}

// Function to load the a single backup file.
loadFn := func(r io.Reader, groupId uint32, preds predicateSet) (uint64, error) {
dir := filepath.Join(tmpDir, fmt.Sprintf("p%d", groupId))

r, err := enc.GetReader(key, r)
if err != nil {
return 0, err
}

gzReader, err := gzip.NewReader(r)
if err != nil {
if len(key) != 0 {
err = errors.Wrap(err,
"Unable to read the backup. Ensure the encryption key is correct.")
}
return 0, errors.Wrapf(err, "cannot create gzip reader")
}
// The badger DB should be opened only after creating the backup
// file reader and verifying the encryption in the backup file.
db, err := badger.OpenManaged(badger.DefaultOptions(dir).
WithSyncWrites(false).
WithTableLoadingMode(options.MemoryMap).
WithValueThreshold(1 << 10).
WithNumVersionsToKeep(math.MaxInt32).
WithEncryptionKey(key))

if err != nil {
return 0, errors.Wrapf(err, "cannot open DB at %s", dir)
}
defer db.Close()
_, err = loadFromBackup(db, gzReader, 0, preds)
if err != nil {
return 0, errors.Wrapf(err, "cannot load backup")
}
return 0, x.WriteGroupIdFile(dir, uint32(groupId))
}

// Read manifest from folder.
manifest := &Manifest{}
manifestPath := filepath.Join(backupDir, backupManifest)
if err := h.ReadManifest(manifestPath, manifest); err != nil {
return errors.Wrapf(err, "cannot read manifest at %s", manifestPath)
}
manifest.Path = manifestPath
if manifest.Since == 0 || len(manifest.Groups) == 0 {
return errors.Errorf("no data found in backup")
}

// Restore backup to disk.
for gid := range manifest.Groups {
file := filepath.Join(backupDir, backupName(manifest.Since, gid))
fp, err := os.Open(file)
if err != nil {
return errors.Wrapf(err, "cannot open backup file at %s", file)
}
defer fp.Close()

// Only restore the predicates that were assigned to this group at the time
// of the last backup.
predSet := manifest.getPredsInGroup(gid)

_, err = loadFn(fp, gid, predSet)
if err != nil {
return err
}
}

// Export the data from the p directories produced by the last step.
ch := make(chan error, len(manifest.Groups))
for gid := range manifest.Groups {
go func(group uint32) {
dir := filepath.Join(tmpDir, fmt.Sprintf("p%d", group))
db, err := badger.OpenManaged(badger.DefaultOptions(dir).
WithSyncWrites(false).
WithTableLoadingMode(options.MemoryMap).
WithValueThreshold(1 << 10).
WithNumVersionsToKeep(math.MaxInt32).
WithEncryptionKey(key))

if err != nil {
ch <- errors.Wrapf(err, "cannot open DB at %s", dir)
return
}
defer db.Close()

req := &pb.ExportRequest{
GroupId: group,
ReadTs: manifest.Since,
UnixTs: time.Now().Unix(),
Format: format,
Destination: exportDir,
}

_, err = exportInternal(context.Background(), req, db, true)
ch <- errors.Wrapf(err, "cannot export data inside DB at %s", dir)
}(gid)
}

for i := 0; i < len(manifest.Groups); i++ {
err := <-ch
if err != nil {
return err
}
}

// Clean up temporary directory.
if err := os.RemoveAll(tmpDir); err != nil {
return errors.Wrapf(err, "cannot remove temp directory at %s", tmpDir)
}

return nil
}

0 comments on commit 369a5c1

Please sign in to comment.