Skip to content

Commit

Permalink
feat(Dgraph): Online restores allows to restore a specific backup. (#…
Browse files Browse the repository at this point in the history
…6411) (#6742)

Currently online restores will restore the entire backup series. This PR allows to
restore the backup series up to a specific backup.

Fixes DGRAPH-1648

Co-authored-by: Martin Martinez Rivera <martinmr@dgraph.io>
  • Loading branch information
akashjain971 and martinmr committed Oct 23, 2020
1 parent 7f63663 commit 5e753cc
Show file tree
Hide file tree
Showing 11 changed files with 515 additions and 374 deletions.
12 changes: 9 additions & 3 deletions graphql/admin/endpoints_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ const adminTypes = `
"""
backupId: String
"""
Number of the backup within the backup series to be restored. Backups with a greater value
will be ignored. If the value is missing, the entire series will be restored.
"""
backupNum: Int
"""
Path to the key file needed to decrypt the backup. This file should be accessible
by all alphas in the group. The backup will be written using the encryption key
Expand Down Expand Up @@ -114,17 +120,17 @@ const adminTypes = `
"""
Secret key credential for the destination.
"""
"""
secretKey: String
"""
AWS session token, if required.
"""
"""
sessionToken: String
"""
Set to true to allow backing up to S3 or Minio bucket that requires no credentials.
"""
"""
anonymous: Boolean
}
Expand Down
14 changes: 12 additions & 2 deletions graphql/admin/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import (
"github.com/dgraph-io/dgraph/graphql/schema"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/worker"
"github.com/pkg/errors"
)

type restoreInput struct {
Location string
BackupId string
BackupNum int
EncryptionKeyFile string
AccessKey string
SecretKey string
Expand All @@ -51,6 +53,7 @@ func resolveRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved,
req := pb.RestoreRequest{
Location: input.Location,
BackupId: input.BackupId,
BackupNum: uint64(input.BackupNum),
EncryptionKeyFile: input.EncryptionKeyFile,
AccessKey: input.AccessKey,
SecretKey: input.SecretKey,
Expand Down Expand Up @@ -93,6 +96,13 @@ func getRestoreInput(m schema.Mutation) (*restoreInput, error) {
}

var input restoreInput
err = json.Unmarshal(inputByts, &input)
return &input, schema.GQLWrapf(err, "couldn't get input argument")
if err := json.Unmarshal(inputByts, &input); err != nil {
return nil, schema.GQLWrapf(err, "couldn't get input argument")
}

if input.BackupNum < 0 {
err := errors.Errorf("backupNum value should be equal or greater than zero")
return nil, schema.GQLWrapf(err, "couldn't get input argument")
}
return &input, nil
}
2 changes: 2 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ message RestoreRequest {
string vault_path = 13;
string vault_field = 14;
string vault_format = 15;

uint64 backup_num = 16;
}

message Proposal {
Expand Down
624 changes: 331 additions & 293 deletions protos/pb/pb.pb.go

Large diffs are not rendered by default.

95 changes: 87 additions & 8 deletions systest/online-restore/online_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ import (
"github.com/dgraph-io/dgraph/testutil"
)

func sendRestoreRequest(t *testing.T, backupId string) int {
func sendRestoreRequest(t *testing.T, backupId string, backupNum int) int {
restoreRequest := fmt.Sprintf(`mutation restore() {
restore(input: {location: "/data/backup", backupId: "%s",
restore(input: {location: "/data/backup", backupId: "%s", backupNum: %d,
encryptionKeyFile: "/data/keys/enc_key"}) {
code
message
restoreId
}
}`, backupId)
}`, backupId, backupNum)

adminUrl := "http://localhost:8180/admin"
params := testutil.GraphQLParams{
Expand Down Expand Up @@ -171,7 +171,10 @@ func runQueries(t *testing.T, dg *dgo.Dgraph, shouldFail bool) {

resp, err := dg.NewTxn().Query(context.Background(), bodies[0])
if shouldFail {
require.Error(t, err)
if err != nil {
continue
}
require.False(t, testutil.EqualJSON(t, bodies[1], string(resp.GetJson()), "", true))
} else {
require.NoError(t, err)
require.True(t, testutil.EqualJSON(t, bodies[1], string(resp.GetJson()), "", true))
Expand Down Expand Up @@ -226,12 +229,88 @@ func TestBasicRestore(t *testing.T) {
ctx := context.Background()
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

restoreId := sendRestoreRequest(t, "youthful_rhodes3")
restoreId := sendRestoreRequest(t, "youthful_rhodes3", 0)
waitForRestore(t, restoreId, dg)
runQueries(t, dg, false)
runMutations(t, dg)
}

func TestRestoreBackupNum(t *testing.T) {
disableDraining(t)

conn, err := grpc.Dial(testutil.SockAddr, grpc.WithInsecure())
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))

ctx := context.Background()
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))
runQueries(t, dg, true)

restoreId := sendRestoreRequest(t, "youthful_rhodes3", 1)
waitForRestore(t, restoreId, dg)
runQueries(t, dg, true)
runMutations(t, dg)
}

func TestRestoreBackupNumInvalid(t *testing.T) {
disableDraining(t)

conn, err := grpc.Dial(testutil.SockAddr, grpc.WithInsecure())
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))

ctx := context.Background()
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))
runQueries(t, dg, true)

// Send a request with a backupNum greater than the number of manifests.
adminUrl := "http://localhost:8180/admin"
restoreRequest := fmt.Sprintf(`mutation restore() {
restore(input: {location: "/data/backup", backupId: "%s", backupNum: %d,
encryptionKeyFile: "/data/keys/enc_key"}) {
code
message
restoreId
}
}`, "youthful_rhodes3", 1000)

params := testutil.GraphQLParams{
Query: restoreRequest,
}
b, err := json.Marshal(params)
require.NoError(t, err)

resp, err := http.Post(adminUrl, "application/json", bytes.NewBuffer(b))
require.NoError(t, err)
buf, err := ioutil.ReadAll(resp.Body)
bufString := string(buf)
require.NoError(t, err)
require.Contains(t, bufString, "not enough backups")

// Send a request with a negative backupNum value.
restoreRequest = fmt.Sprintf(`mutation restore() {
restore(input: {location: "/data/backup", backupId: "%s", backupNum: %d,
encryptionKeyFile: "/data/keys/enc_key"}) {
code
message
restoreId
}
}`, "youthful_rhodes3", -1)

params = testutil.GraphQLParams{
Query: restoreRequest,
}
b, err = json.Marshal(params)
require.NoError(t, err)

resp, err = http.Post(adminUrl, "application/json", bytes.NewBuffer(b))
require.NoError(t, err)
buf, err = ioutil.ReadAll(resp.Body)
bufString = string(buf)
require.NoError(t, err)
require.Contains(t, bufString, "backupNum value should be equal or greater than zero")
}

func TestMoveTablets(t *testing.T) {
disableDraining(t)

Expand All @@ -242,13 +321,13 @@ func TestMoveTablets(t *testing.T) {
ctx := context.Background()
require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true}))

restoreId := sendRestoreRequest(t, "youthful_rhodes3")
restoreId := sendRestoreRequest(t, "youthful_rhodes3", 0)
waitForRestore(t, restoreId, dg)
runQueries(t, dg, false)

// Send another restore request with a different backup. This backup has some of the
// same predicates as the previous one but they are stored in different groups.
restoreId = sendRestoreRequest(t, "blissful_hermann1")
restoreId = sendRestoreRequest(t, "blissful_hermann1", 0)
waitForRestore(t, restoreId, dg)

resp, err := dg.NewTxn().Query(context.Background(), `{
Expand Down Expand Up @@ -304,7 +383,7 @@ func TestListBackups(t *testing.T) {
encrypted
groups {
groupId
predicates
predicates
}
path
since
Expand Down
6 changes: 4 additions & 2 deletions testutil/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ func DiffJSONMaps(t *testing.T, wantMap, gotMap map[string]interface{},
if err != nil {
t.Error("Could not marshal JSON:", err)
}
t.Errorf("Expected JSON and actual JSON differ:\n%s",
sdiffJSON(wantBuf, gotBuf, savepath, quiet))
if !quiet {
t.Errorf("Expected JSON and actual JSON differ:\n%s",
sdiffJSON(wantBuf, gotBuf, savepath, quiet))
}
return false
}

Expand Down
68 changes: 48 additions & 20 deletions worker/backup_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"fmt"
"io"
"net/url"
"sort"

"github.com/dgraph-io/dgraph/protos/pb"

Expand Down Expand Up @@ -59,9 +60,11 @@ type UriHandler interface {
// These function calls are used by both Create and Load.
io.WriteCloser

// GetManfiest returns the list of manfiests for the given backup series ID
// at the specified location.
GetManifests(*url.URL, string) ([]*Manifest, error)
// GetManifests returns the list of manfiests for the given backup series ID
// and backup number at the specified location. If backupNum is set to zero,
// all the manifests for the backup series will be returned. If it's greater
// than zero, manifests from one to backupNum will be returned.
GetManifests(*url.URL, string, uint64) ([]*Manifest, error)

// GetLatestManifest reads the manifests at the given URL and returns the
// latest manifest.
Expand All @@ -78,12 +81,12 @@ type UriHandler interface {
// created after will be ignored.
// Objects implementing this function will be used for retrieving (dowload) backup files
// and loading the data into a DB. The restore CLI command uses this call.
Load(*url.URL, string, loadFn) LoadResult
Load(*url.URL, string, uint64, loadFn) LoadResult

// Verify checks that the specified backup can be restored to a cluster with the
// given groups. The last manifest of that backup should have the same number of
// groups as given list of groups.
Verify(*url.URL, string, []uint32) error
Verify(*url.URL, *pb.RestoreRequest, []uint32) error

// ListManifests will scan the provided URI and return the paths to the manifests stored
// in that location.
Expand Down Expand Up @@ -148,7 +151,8 @@ type loadFn func(reader io.Reader, groupId uint32, preds predicateSet) (uint64,

// LoadBackup will scan location l for backup files in the given backup series and load them
// sequentially. Returns the maximum Since value on success, otherwise an error.
func LoadBackup(location, backupId string, creds *Credentials, fn loadFn) LoadResult {
func LoadBackup(location, backupId string, backupNum uint64, creds *Credentials,
fn loadFn) LoadResult {
uri, err := url.Parse(location)
if err != nil {
return LoadResult{0, 0, err}
Expand All @@ -159,13 +163,13 @@ func LoadBackup(location, backupId string, creds *Credentials, fn loadFn) LoadRe
return LoadResult{0, 0, errors.Errorf("Unsupported URI: %v", uri)}
}

return h.Load(uri, backupId, fn)
return h.Load(uri, backupId, backupNum, fn)
}

// VerifyBackup will access the backup location and verify that the specified backup can
// be restored to the cluster.
func VerifyBackup(location, backupId string, creds *Credentials, currentGroups []uint32) error {
uri, err := url.Parse(location)
func VerifyBackup(req *pb.RestoreRequest, creds *Credentials, currentGroups []uint32) error {
uri, err := url.Parse(req.GetLocation())
if err != nil {
return err
}
Expand All @@ -175,7 +179,7 @@ func VerifyBackup(location, backupId string, creds *Credentials, currentGroups [
return errors.Errorf("Unsupported URI: %v", uri)
}

return h.Verify(uri, backupId, currentGroups)
return h.Verify(uri, req, currentGroups)
}

// ListBackupManifests scans location l for backup files and returns the list of manifests.
Expand Down Expand Up @@ -272,18 +276,18 @@ func backupName(since uint64, groupId uint32) string {
return fmt.Sprintf(backupNameFmt, since, groupId)
}

// verifyGroupsInBackup checks that the groups in the last manifest match the groups in
// the current cluster. If they don't match, the backup cannot be restored to the cluster.
func verifyGroupsInBackup(manifests []*Manifest, currentGroups []uint32) error {
var maxBackupNum uint64
var lastManifest *Manifest
for _, manifest := range manifests {
if manifest.BackupNum > maxBackupNum {
lastManifest = manifest
maxBackupNum = manifest.BackupNum
}
// verifyRequest verifies the manifests satisfy the requirements to process the given
// restore request.
func verifyRequest(req *pb.RestoreRequest, manifests []*Manifest, currentGroups []uint32) error {
if len(manifests) == 0 {
return errors.Errorf("No backups with the specified backup ID %s", req.GetBackupId())
}

if err := verifyManifests(manifests); err != nil {
return err
}

lastManifest := manifests[len(manifests)-1]
if len(currentGroups) != len(lastManifest.Groups) {
return errors.Errorf("groups in cluster and latest backup manifest differ")
}
Expand All @@ -295,3 +299,27 @@ func verifyGroupsInBackup(manifests []*Manifest, currentGroups []uint32) error {
}
return nil
}

func getManifests(manifests []*Manifest, backupId string,
backupNum uint64) ([]*Manifest, error) {

manifests, err := filterManifests(manifests, backupId)
if err != nil {
return nil, err
}

// Sort manifests in the ascending order of their BackupNum so that the first
// manifest corresponds to the first full backup and so on.
sort.Slice(manifests, func(i, j int) bool {
return manifests[i].BackupNum < manifests[j].BackupNum
})

if backupNum > 0 {
if len(manifests) < int(backupNum) {
return nil, errors.Errorf("not enough backups to restore manifest with backupNum %d",
backupNum)
}
manifests = manifests[:backupNum]
}
return manifests, nil
}
Loading

0 comments on commit 5e753cc

Please sign in to comment.