Skip to content

Commit

Permalink
add namespace aware restore functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
shivaji-dgraph committed Sep 5, 2023
1 parent 4c9448a commit c10fcf0
Show file tree
Hide file tree
Showing 15 changed files with 957 additions and 392 deletions.
93 changes: 87 additions & 6 deletions dgraphtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os/exec"
"strings"
Expand All @@ -43,6 +44,7 @@ type Cluster interface {
AlphasLogs() ([]string, error)
AssignUids(gc *dgo.Dgraph, num uint64) error
GetVersion() string
GetEncKeyPath() (string, error)
}

type GrpcClient struct {
Expand Down Expand Up @@ -374,8 +376,8 @@ func (hc *HTTPClient) WaitForTask(taskId string) error {
}

// Restore performs restore on Dgraph cluster from the given path to backup
func (hc *HTTPClient) Restore(c Cluster, backupPath string,
backupId string, incrFrom, backupNum int, encKey string) error {
func (hc *HTTPClient) Restore(c Cluster, backupPath string, backupId string,
incrFrom, backupNum int) error {

// incremental restore was introduced in commit 8b3712e93ed2435bea52d957f7b69976c6cfc55b
incrRestoreSupported, err := IsHigherVersion(c.GetVersion(), "8b3712e93ed2435bea52d957f7b69976c6cfc55b")
Expand All @@ -386,6 +388,14 @@ func (hc *HTTPClient) Restore(c Cluster, backupPath string,
return errors.New("incremental restore is not supported by the cluster")
}

if err != nil {
return errors.Wrapf(err, "error checking namespace aware restore support")
}
encKey, err := c.GetEncKeyPath()
if err != nil {
return errors.Wrapf(err, "error getting encryption key path")
}

var varPart, queryPart string
if incrRestoreSupported {
varPart = "$incrFrom: Int, "
Expand All @@ -399,8 +409,8 @@ func (hc *HTTPClient) Restore(c Cluster, backupPath string,
message
}
}`, varPart, queryPart)
vars := map[string]interface{}{"location": backupPath, "backupId": backupId,
"backupNum": backupNum, "encKey": encKey}
vars := map[string]interface{}{"location": backupPath, "backupId": backupId, "backupNum": backupNum,
"encKey": encKey}
if incrRestoreSupported {
vars["incrFrom"] = incrFrom
}
Expand Down Expand Up @@ -429,6 +439,71 @@ func (hc *HTTPClient) Restore(c Cluster, backupPath string,
return nil
}

// RestoreTenant restore specific namespace
func (hc *HTTPClient) RestoreTenant(c Cluster, backupPath string, backupId string,
incrFrom, backupNum int, isNamespaceAwareRestore bool, fromNamespace uint64) error {

// incremental restore was introduced in commit 8b3712e93ed2435bea52d957f7b69976c6cfc55b
incrRestoreSupported, err := IsHigherVersion(c.GetVersion(), "8b3712e93ed2435bea52d957f7b69976c6cfc55b")
if err != nil {
return errors.Wrapf(err, "error checking incremental restore support")
}
if !incrRestoreSupported && incrFrom != 0 {
return errors.New("incremental restore is not supported by the cluster")
}

if err != nil {
return errors.Wrapf(err, "error checking namespace aware restore support")
}
encKey, err := c.GetEncKeyPath()
if err != nil {
return errors.Wrapf(err, "error getting encryption key path")
}

var varPart, queryPart string
if incrRestoreSupported {
varPart = "$incrFrom: Int, "
queryPart = " incrementalFrom: $incrFrom,"
}

query := fmt.Sprintf(`mutation restoreTenant( $location: String!, $backupId: String,
%v$backupNum: Int, $encKey: String,$fromNamespace: Int! ) {
restoreTenant(input: {restoreInput: { location: $location, backupId: $backupId,%v backupNum: $backupNum,
encryptionKeyFile: $encKey },fromNamespace:$fromNamespace}) {
code
message
}
}`, varPart, queryPart)
vars := map[string]interface{}{"location": backupPath, "backupId": backupId, "backupNum": backupNum,
"encKey": encKey, "fromNamespace": fromNamespace}
if incrRestoreSupported {
vars["incrFrom"] = incrFrom
}

params := GraphQLParams{
Query: query,
Variables: vars,
}
resp, err := hc.RunGraphqlQuery(params, true)
if err != nil {
return err
}

var restoreResp struct {
RestoreTenant struct {
Code string
Message string
}
}
if err := json.Unmarshal(resp, &restoreResp); err != nil {
return errors.Wrap(err, "error unmarshalling restore response")
}
if restoreResp.RestoreTenant.Code != "Success" {
return fmt.Errorf("restore failed, response: %+v", restoreResp.RestoreTenant)
}
return nil
}

// WaitForRestore waits for restore to complete on all alphas
func WaitForRestore(c Cluster) error {
loop:
Expand Down Expand Up @@ -578,13 +653,19 @@ func (hc *HTTPClient) GetZeroState() (*LicenseResponse, error) {
if err != nil {
return nil, errors.Wrap(err, "error getting zero state http response")
}
defer func() {
if err := response.Body.Close(); err != nil {
log.Printf("[WARNING] error closing body: %v", err)
}
}()

body, err := io.ReadAll(response.Body)
if err != nil {
return nil, errors.New("error reading zero state response body")
return nil, errors.Wrapf(err, "error reading zero state response body")
}
var stateResponse LicenseResponse
if err := json.Unmarshal(body, &stateResponse); err != nil {
return nil, errors.New("error unmarshaling zero state response")
return nil, errors.Wrapf(err, "error unmarshaling zero state response")
}

return &stateResponse, nil
Expand Down
4 changes: 4 additions & 0 deletions dgraphtest/compose_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,7 @@ func (c *ComposeCluster) AssignUids(client *dgo.Dgraph, num uint64) error {
func (c *ComposeCluster) GetVersion() string {
return localVersion
}

func (c *ComposeCluster) GetEncKeyPath() (string, error) {
return "", errNotImplemented
}
6 changes: 3 additions & 3 deletions dgraphtest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type ClusterConfig struct {
aclAlg jwt.SigningMethod
encryption bool
version string
volumes map[string]string
Volumes map[string]string
refillInterval time.Duration
uidLease int
portOffset int // exposed port offset for grpc/http port for both alpha/zero
Expand All @@ -131,7 +131,7 @@ func NewClusterConfig() ClusterConfig {
replicas: 1,
verbosity: 2,
version: localVersion,
volumes: map[string]string{DefaultBackupDir: defaultBackupVol, DefaultExportDir: defaultExportVol},
Volumes: map[string]string{DefaultBackupDir: defaultBackupVol, DefaultExportDir: defaultExportVol},
refillInterval: 20 * time.Second,
uidLease: 50,
portOffset: -1,
Expand Down Expand Up @@ -192,7 +192,7 @@ func (cc ClusterConfig) WithVersion(version string) ClusterConfig {
// WithAlphaVolume allows creating a shared volumes across alphas with
// name volname and mount directory specified as dir inside the container
func (cc ClusterConfig) WithAlphaVolume(volname, dir string) ClusterConfig {
cc.volumes[dir] = volname
cc.Volumes[dir] = volname
return cc
}

Expand Down
2 changes: 1 addition & 1 deletion dgraphtest/dgraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (a *alpha) mounts(c *LocalCluster) ([]mount.Mount, error) {
})
}

for dir, vol := range c.conf.volumes {
for dir, vol := range c.conf.Volumes {
mounts = append(mounts, mount.Mount{
Type: mount.TypeVolume,
Source: vol,
Expand Down
21 changes: 14 additions & 7 deletions dgraphtest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (c *LocalCluster) init() error {
return errors.Wrap(err, "error while making binariesPath")
}

for _, vol := range c.conf.volumes {
for _, vol := range c.conf.Volumes {
if err := c.createVolume(vol); err != nil {
return err
}
Expand Down Expand Up @@ -299,7 +299,7 @@ func (c *LocalCluster) Cleanup(verbose bool) {

ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
defer cancel()
for _, vol := range c.conf.volumes {
for _, vol := range c.conf.Volumes {
if err := c.dcli.VolumeRemove(ctx, vol, true); err != nil {
log.Printf("[WARNING] error removing volume [%v]: %v", vol, err)
}
Expand Down Expand Up @@ -606,10 +606,6 @@ func (c *LocalCluster) Upgrade(version string, strategy UpgradeStrategy) error {
return err
}

var encPath string
if c.conf.encryption {
encPath = encKeyMountPath
}
hc, err = c.HTTPClient()
if err != nil {
return errors.Wrapf(err, "error creating HTTP client after upgrade")
Expand All @@ -619,7 +615,7 @@ func (c *LocalCluster) Upgrade(version string, strategy UpgradeStrategy) error {
return errors.Wrapf(err, "error during login after upgrade")
}
}
if err := hc.Restore(c, DefaultBackupDir, "", 0, 1, encPath); err != nil {
if err := hc.Restore(c, DefaultBackupDir, "", 0, 1); err != nil {
return errors.Wrap(err, "error doing restore during upgrade")
}
if err := WaitForRestore(c); err != nil {
Expand Down Expand Up @@ -850,6 +846,17 @@ func (c *LocalCluster) GetVersion() string {
return c.conf.version
}

// GetEncKeyPath returns the path to the encryption key file when encryption is enabled.
// It returns an empty string otherwise. The path to the encryption file is valid only
// inside the alpha container.
func (c *LocalCluster) GetEncKeyPath() (string, error) {
if c.conf.encryption {
return encKeyMountPath, nil
}

return "", nil
}

func (c *LocalCluster) printAllLogs() error {
log.Printf("[INFO] all logs for cluster with prefix [%v] are below!", c.conf.prefix)
var finalErr error
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ require (
golang.org/x/text v0.12.0
golang.org/x/tools v0.9.3
google.golang.org/grpc v1.56.2
google.golang.org/protobuf v1.31.0
gopkg.in/square/go-jose.v2 v2.3.1
gopkg.in/yaml.v2 v2.4.0
)
Expand Down Expand Up @@ -143,6 +142,7 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/grpc/examples v0.0.0-20230821201920-d51b3f41716d // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/DataDog/dd-trace-go.v1 v1.22.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ func newAdminResolverFactory() resolve.ResolverFactory {
"moveTablet": resolveMoveTablet,
"assign": resolveAssign,
"enterpriseLicense": resolveEnterpriseLicense,
"restoreTenant": resolveTenantRestore,
}

rf := resolverFactoryWithErrorMsg(errResolverNotFound).
Expand Down
14 changes: 14 additions & 0 deletions graphql/admin/endpoints_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ const adminTypes = `
taskId: String
}
input RestoreTenantInput {
restoreInput: RestoreInput
"""
fromNamespace is the namespace that will be restored into the namespace 0 of the cluster.
"""
fromNamespace: Int!
}
input RestoreInput {
"""
Expand Down Expand Up @@ -478,6 +487,11 @@ const adminMutations = `
"""
restore(input: RestoreInput!) : RestorePayload
"""
Restore given namespace into namespace 0 of the cluster
"""
restoreTenant(input: RestoreTenantInput!) : RestorePayload
"""
Login to Dgraph. Successful login results in a JWT that can be used in future requests.
If login is not successful an error is returned.
Expand Down
Loading

0 comments on commit c10fcf0

Please sign in to comment.