Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions worker/online_restore_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"context"
"io"
"net/url"
"strings"
"time"

"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/ee/enc"
Expand All @@ -30,6 +32,10 @@ import (
"github.com/spf13/viper"
)

const (
errRestoreProposal = "cannot propose restore request"
)

// ProcessRestoreRequest verifies the backup data and sends a restore proposal to each group.
func ProcessRestoreRequest(ctx context.Context, req *pb.RestoreRequest) error {
if req == nil {
Expand Down Expand Up @@ -69,7 +75,7 @@ func ProcessRestoreRequest(ctx context.Context, req *pb.RestoreRequest) error {
reqCopy.GroupId = gid

go func() {
errCh <- proposeRestoreOrSend(ctx, reqCopy)
errCh <- tryRestoreProposal(ctx, reqCopy)
}()
}

Expand Down Expand Up @@ -99,6 +105,40 @@ func proposeRestoreOrSend(ctx context.Context, req *pb.RestoreRequest) error {
return err
}

func retriableRestoreError(err error) bool {
switch {
case err == conn.ErrNoConnection:
// Try to recover from temporary connection issues.
return true
case strings.Contains(err.Error(), "Raft isn't initialized yet"):
// Try to recover if raft has not been initialized.
return true
case strings.Contains(err.Error(), errRestoreProposal):
// Do not try to recover from other errors when sending the proposal.
return false
default:
// Try to recover from other errors (e.g wrong group, waiting for timestamp, etc).
return true
}
}

func tryRestoreProposal(ctx context.Context, req *pb.RestoreRequest) error {
var err error
for i := 0; i < 10; i++ {
err = proposeRestoreOrSend(ctx, req)
if err == nil {
return nil
}

if retriableRestoreError(err) {
time.Sleep(time.Second)
continue
}
return err
}
return err
}

// Restore implements the Worker interface.
func (w *grpcWorker) Restore(ctx context.Context, req *pb.RestoreRequest) (*pb.Status, error) {
var emptyRes pb.Status
Expand All @@ -114,7 +154,7 @@ func (w *grpcWorker) Restore(ctx context.Context, req *pb.RestoreRequest) (*pb.S

err := groups().Node.proposeAndWait(ctx, &pb.Proposal{Restore: req})
if err != nil {
return &emptyRes, errors.Wrapf(err, "cannot propose restore request")
return &emptyRes, errors.Wrapf(err, errRestoreProposal)
}

return &emptyRes, nil
Expand Down