Skip to content

Commit

Permalink
minor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
mangalaman93 committed Aug 30, 2023
1 parent a43760a commit 83f03f3
Show file tree
Hide file tree
Showing 12 changed files with 578 additions and 588 deletions.
58 changes: 45 additions & 13 deletions dgraphtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,20 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os/exec"
"strings"
"testing"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/pkg/errors"

"github.com/dgraph-io/dgo/v230"
"github.com/dgraph-io/dgo/v230/protos/api"
"github.com/dgraph-io/dgraph/graphql/schema"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
)

Expand All @@ -43,7 +46,7 @@ type Cluster interface {
AlphasLogs() ([]string, error)
AssignUids(gc *dgo.Dgraph, num uint64) error
GetVersion() string
IsAclEnable() bool
GetEncKeyPath() (string, error)
}

type GrpcClient struct {
Expand Down Expand Up @@ -401,8 +404,9 @@ func (hc *HTTPClient) WaitForTask(taskId string) error {
}

// Restore performs restore on Dgraph cluster from the given path to backup
func (hc *HTTPClient) Restore(c Cluster, backupPath string,
backupId string, incrFrom, backupNum int, fromNamespace int64) error {
func (hc *HTTPClient) Restore(c Cluster, backupPath string, backupId string,
incrFrom, backupNum int, oneNamespaceOnly bool, fromNamespace uint64) error {

// incremental restore was introduced in commit 8b3712e93ed2435bea52d957f7b69976c6cfc55b
incrRestoreSupported, err := IsHigherVersion(c.GetVersion(), "8b3712e93ed2435bea52d957f7b69976c6cfc55b")
if err != nil {
Expand All @@ -412,9 +416,9 @@ func (hc *HTTPClient) Restore(c Cluster, backupPath string,
return errors.New("incremental restore is not supported by the cluster")
}

var encKey string
if c.IsAclEnable() {
encKey = encKeyMountPath
encKey, err := c.GetEncKeyPath()
if err != nil {
return errors.Wrapf(err, "error getting encryption key path")
}

var varPart, queryPart string
Expand All @@ -423,15 +427,15 @@ func (hc *HTTPClient) Restore(c Cluster, backupPath string,
queryPart = " incrementalFrom: $incrFrom,"
}
query := fmt.Sprintf(`mutation restore($location: String!, $backupId: String,
%v$backupNum: Int, $encKey: String , $namespace: Int) {
restore(input: {location: $location, backupId: $backupId,%v
backupNum: $backupNum, encryptionKeyFile: $encKey,namespace: $namespace}) {
%v$backupNum: Int, $encKey: String, $oneNamespaceOnly: Boolean, $fromNamespace: UInt64) {
restore(input: {location: $location, backupId: $backupId,%v backupNum: $backupNum,
encryptionKeyFile: $encKey, oneNamespaceOnly: $oneNamespaceOnly, fromNamespace: $fromNamespace}) {
code
message
}
}`, varPart, queryPart)
vars := map[string]interface{}{"location": backupPath, "backupId": backupId,
"backupNum": backupNum, "encKey": encKey, "namespace": fromNamespace}
vars := map[string]interface{}{"location": backupPath, "backupId": backupId, "backupNum": backupNum,
"encKey": encKey, "oneNamespaceOnly": oneNamespaceOnly, "fromNamespace": fromNamespace}
if incrRestoreSupported {
vars["incrFrom"] = incrFrom
}
Expand Down Expand Up @@ -609,18 +613,46 @@ func (hc *HTTPClient) GetZeroState() (*LicenseResponse, error) {
if err != nil {
return nil, errors.Wrap(err, "error getting zero state http response")
}
defer func() {
if err := response.Body.Close(); err != nil {
log.Printf("[WARNING] error closing body: %v", err)
}
}()

body, err := io.ReadAll(response.Body)
if err != nil {
return nil, errors.New("error reading zero state response body")
return nil, errors.Wrapf(err, "error reading zero state response body")
}
var stateResponse LicenseResponse
if err := json.Unmarshal(body, &stateResponse); err != nil {
return nil, errors.New("error unmarshaling zero state response")
return nil, errors.Wrapf(err, "error unmarshaling zero state response")
}

return &stateResponse, nil
}

func (hc *HTTPClient) GetState() (*pb.MembershipState, error) {
resp, err := http.Get(hc.stateURL)
if err != nil {
return nil, err
}
defer func() {
if err := resp.Body.Close(); err != nil {
log.Printf("[WARNING] error closing body: %v", err)
}
}()

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, errors.Wrapf(err, "error reading zero state response body")
}
var state pb.MembershipState
if err := jsonpb.Unmarshal(bytes.NewReader(body), &state); err != nil {
return nil, errors.Wrapf(err, "error unmarshalling zero state response")
}
return &state, nil
}

func (hc *HTTPClient) PostDqlQuery(query string) ([]byte, error) {
req, err := http.NewRequest(http.MethodPost, hc.dqlURL, bytes.NewBufferString(query))
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions dgraphtest/compose_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,6 @@ func (c *ComposeCluster) GetVersion() string {
return localVersion
}

func (c *ComposeCluster) IsAclEnable() bool {
panic(errNotImplemented)
func (c *ComposeCluster) GetEncKeyPath() (string, error) {
return "", errNotImplemented
}
41 changes: 9 additions & 32 deletions dgraphtest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package dgraphtest

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand All @@ -37,16 +36,13 @@ import (
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/api/types/volume"
docker "github.com/docker/docker/client"
"github.com/gogo/protobuf/jsonpb"
"github.com/golang-jwt/jwt/v5"
"github.com/golang/glog"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/dgraph-io/dgo/v230"
"github.com/dgraph-io/dgo/v230/protos/api"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
)

Expand Down Expand Up @@ -580,7 +576,7 @@ func (c *LocalCluster) Upgrade(version string, strategy UpgradeStrategy) error {
return errors.Wrapf(err, "error during login after upgrade")
}
}
if err := hc.Restore(c, DefaultBackupDir, "", 0, 1, -1); err != nil {
if err := hc.Restore(c, DefaultBackupDir, "", 0, 1, false, 0); err != nil {
return errors.Wrap(err, "error doing restore during upgrade")
}
if err := WaitForRestore(c); err != nil {
Expand Down Expand Up @@ -811,34 +807,15 @@ func (c *LocalCluster) GetVersion() string {
return c.conf.version
}

func (c *LocalCluster) IsAclEnable() bool {
return c.conf.acl
}

func (c *LocalCluster) GetState() (pb.MembershipState, error) {
var state pb.MembershipState
hc, err := c.HTTPClient()
if err != nil {
return state, err
}
resp, err := http.Get(hc.stateURL)
defer func() {
if err := resp.Body.Close(); err != nil {
glog.Warningf("error closing body: %v", err)
}
}()
if err != nil {
return state, err
}
stateRes, err := io.ReadAll(resp.Body)
if err != nil {
return state, err
}
err = jsonpb.Unmarshal(bytes.NewReader(stateRes), &state)
if err != nil {
return state, err
// GetEncKeyPath returns the path to the encryption key file when encryption is enabled.
// It returns an empty string otherwise. The path to the encryption file is valid only
// inside the alpha container.
func (c *LocalCluster) GetEncKeyPath() (string, error) {
if c.conf.encryption {
return encKeyMountPath, nil
}
return state, nil

return "", nil
}

func (c *LocalCluster) printAllLogs() error {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ require (
golang.org/x/text v0.12.0
golang.org/x/tools v0.9.3
google.golang.org/grpc v1.56.2
google.golang.org/protobuf v1.31.0
gopkg.in/square/go-jose.v2 v2.3.1
gopkg.in/yaml.v2 v2.4.0
)
Expand Down Expand Up @@ -142,7 +143,6 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/grpc/examples v0.0.0-20230821201920-d51b3f41716d // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/DataDog/dd-trace-go.v1 v1.22.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
14 changes: 11 additions & 3 deletions graphql/admin/endpoints_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,17 @@ const adminTypes = `
anonymous: Boolean
"""
Namespace for the restore
"""
namespace: Int
oneNamespaceOnly should be set if only one namespace is to be restored from
the backup. The namespace value should be set into the fromNamespace parameter.
The given namespace is always restored into namespace 0 in the cluster.
"""
oneNamespaceOnly: Boolean
"""
fromNamespace is the namespace that will be restored into the namespace 0 of the cluster.
If oneNamespaceOnly is set, only the fromNamespace will be restored into the new cluster.
"""
fromNamespace: UInt64
}
type RestorePayload {
Expand Down
14 changes: 4 additions & 10 deletions graphql/admin/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ type restoreInput struct {
VaultPath string
VaultField string
VaultFormat string
Namespace int64
OneNamespaceOnly bool
FromNamespace uint64
}

func resolveRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved, bool) {
Expand Down Expand Up @@ -76,7 +77,8 @@ func resolveRestore(ctx context.Context, m schema.Mutation) (*resolve.Resolved,
VaultPath: input.VaultPath,
VaultField: input.VaultField,
VaultFormat: input.VaultFormat,
Namespace: input.Namespace,
OneNamespaceOnly: input.OneNamespaceOnly,
FromNamespace: input.FromNamespace,
}

wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -123,13 +125,5 @@ func getRestoreInput(m schema.Mutation) (*restoreInput, error) {
return nil, schema.GQLWrapf(err, "couldn't get input argument")
}

var namespaceFromReq interface{}
if input, ok := inputArg.(map[string]interface{}); ok {
namespaceFromReq = input["namespace"]
}
if namespaceFromReq == nil {
input.Namespace = -1
}

return &input, nil
}
3 changes: 2 additions & 1 deletion protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ message RestoreRequest {
uint64 backup_num = 16;
uint64 incremental_from = 17;
bool is_partial = 18;
int64 namespace = 19;
bool one_namespace_only = 19;
uint64 from_namespace = 20;
}

message Proposal {
Expand Down
Loading

0 comments on commit 83f03f3

Please sign in to comment.