Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(Backup): use validReadTs from manifest for backward compatibility (#7601) #7863

Merged
merged 2 commits into from May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions ee/backup/run.go
Expand Up @@ -90,6 +90,7 @@ func runLsbackupCmd() error {
type backupEntry struct {
Path string `json:"path"`
Since uint64 `json:"since"`
ReadTs uint64 `json:"read_ts"`
BackupId string `json:"backup_id"`
BackupNum uint64 `json:"backup_num"`
Encrypted bool `json:"encrypted"`
Expand All @@ -105,7 +106,8 @@ func runLsbackupCmd() error {

be := backupEntry{
Path: manifest.Path,
Since: manifest.Since,
Since: manifest.SinceTsDeprecated,
ReadTs: manifest.ReadTs,
BackupId: manifest.BackupId,
BackupNum: manifest.BackupNum,
Encrypted: manifest.Encrypted,
Expand Down Expand Up @@ -271,7 +273,7 @@ func runExportBackup() error {

in := &pb.ExportRequest{
GroupId: uint32(gid),
ReadTs: latestManifest.Since,
ReadTs: latestManifest.ValidReadTs(),
UnixTs: time.Now().Unix(),
Format: opt.format,
Destination: exportDir,
Expand Down
4 changes: 3 additions & 1 deletion graphql/admin/list_backups.go
Expand Up @@ -44,6 +44,7 @@ type group struct {
type manifest struct {
Type string `json:"type,omitempty"`
Since uint64 `json:"since,omitempty"`
ReadTs uint64 `json:"read_ts,omitempty"`
Groups []*group `json:"groups,omitempty"`
BackupId string `json:"backupId,omitempty"`
BackupNum uint64 `json:"backupNum,omitempty"`
Expand Down Expand Up @@ -107,7 +108,8 @@ func convertManifests(manifests []*worker.Manifest) []*manifest {
for i, m := range manifests {
res[i] = &manifest{
Type: m.Type,
Since: m.Since,
Since: m.SinceTsDeprecated,
ReadTs: m.ReadTs,
BackupId: m.BackupId,
BackupNum: m.BackupNum,
Path: m.Path,
Expand Down
22 changes: 16 additions & 6 deletions worker/backup.go
Expand Up @@ -32,17 +32,17 @@ import (
type predicateSet map[string]struct{}

// Manifest records backup details, these are values used during restore.
// Since is the timestamp from which the next incremental backup should start (it's set
// to the readTs of the current backup).
// ReadTs will be used to create the next incremental backup.
// Groups are the IDs of the groups involved.
type Manifest struct {
sync.Mutex
//Type is the type of backup, either full or incremental.
Type string `json:"type"`
// Since is the timestamp at which this backup was taken. It's called Since
// because it will become the timestamp from which to backup in the next
// incremental backup.
Since uint64 `json:"since"`
// SinceTsDeprecated is kept for backward compatibility. Use readTs instead of sinceTs.
SinceTsDeprecated uint64 `json:"since"`
// ReadTs is the timestamp at which this backup was taken. This would be
// the since timestamp for the next incremental backup.
ReadTs uint64 `json:"read_ts"`
// Groups is the map of valid groups to predicates at the time the backup was created.
Groups map[uint32][]string `json:"groups"`
// BackupId is a unique ID assigned to all the backups in the same series
Expand All @@ -68,6 +68,16 @@ type Manifest struct {
Compression string `json:"compression"`
}

// ValidReadTs function returns the valid read timestamp. The backup can have
// the readTs=0 if the backup was done on an older version of dgraph. The
// SinceTsDecprecated is kept for backward compatibility.
func (m *Manifest) ValidReadTs() uint64 {
if m.ReadTs == 0 {
return m.SinceTsDeprecated
}
return m.ReadTs
}

type MasterManifest struct {
Manifests []*Manifest
}
Expand Down
46 changes: 26 additions & 20 deletions worker/backup_ee.go
Expand Up @@ -152,8 +152,12 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error {
return err
}

req.SinceTs = latestManifest.Since
// Use the readTs as the sinceTs for the next backup. If not found, use the
// SinceTsDeprecated value from the latest manifest.
req.SinceTs = latestManifest.ValidReadTs()

if req.ForceFull {
// To force a full backup we'll set the sinceTs to zero.
req.SinceTs = 0
} else {
if x.WorkerConfig.EncryptionKey != nil {
Expand Down Expand Up @@ -196,30 +200,32 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

resCh := make(chan BackupRes, len(state.Groups))
for _, gid := range groups {
br := proto.Clone(req).(*pb.BackupRequest)
br.GroupId = gid
br.Predicates = predMap[gid]
go func(req *pb.BackupRequest) {
res, err := BackupGroup(ctx, req)
resCh <- BackupRes{res: res, err: err}
}(br)
}

var dropOperations []*pb.DropOperation
for range groups {
if backupRes := <-resCh; backupRes.err != nil {
glog.Errorf("Error received during backup: %v", backupRes.err)
return backupRes.err
} else {
{ // This is the code which sends out Backup requests and waits for them to finish.
resCh := make(chan BackupRes, len(state.Groups))
for _, gid := range groups {
br := proto.Clone(req).(*pb.BackupRequest)
br.GroupId = gid
br.Predicates = predMap[gid]
go func(req *pb.BackupRequest) {
res, err := BackupGroup(ctx, req)
resCh <- BackupRes{res: res, err: err}
}(br)
}

for range groups {
backupRes := <-resCh
if backupRes.err != nil {
glog.Errorf("Error received during backup: %v", backupRes.err)
return backupRes.err
}
dropOperations = append(dropOperations, backupRes.res.GetDropOperations()...)
}
}

dir := fmt.Sprintf(backupPathFmt, req.UnixTs)
m := Manifest{
Since: req.ReadTs,
ReadTs: req.ReadTs,
Groups: predMap,
Version: x.DgraphVersion,
DropOperations: dropOperations,
Expand Down Expand Up @@ -581,8 +587,8 @@ func (pr *BackupProcessor) CompleteBackup(ctx context.Context, m *Manifest) erro

// GoString implements the GoStringer interface for Manifest.
func (m *Manifest) GoString() string {
return fmt.Sprintf(`Manifest{Since: %d, Groups: %v, Encrypted: %v}`,
m.Since, m.Groups, m.Encrypted)
return fmt.Sprintf(`Manifest{Since: %d, ReadTs: %d, Groups: %v, Encrypted: %v}`,
m.SinceTsDeprecated, m.ReadTs, m.Groups, m.Encrypted)
}

func (tl *threadLocal) toBackupList(key []byte, itr *badger.Iterator) (
Expand Down
2 changes: 1 addition & 1 deletion worker/backup_manifest.go
Expand Up @@ -134,7 +134,7 @@ func getFilteredManifests(h x.UriHandler, manifests []*Manifest,
for _, m := range manifests {
missingFiles := false
for g := range m.Groups {
path := filepath.Join(m.Path, backupName(m.Since, g))
path := filepath.Join(m.Path, backupName(m.ValidReadTs(), g))
if !h.FileExists(path) {
missingFiles = true
break
Expand Down
2 changes: 1 addition & 1 deletion worker/online_restore.go
Expand Up @@ -538,5 +538,5 @@ func RunOfflineRestore(dir, location, backupId string, keyFile string,
}
}
// TODO: Fix this return value.
return LoadResult{Version: manifest.Since}
return LoadResult{Version: manifest.ValidReadTs()}
}
4 changes: 2 additions & 2 deletions worker/restore_map.go
Expand Up @@ -618,7 +618,7 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) (*mapResult, error) {
if dropAll {
break
}
if manifest.Since == 0 || len(manifest.Groups) == 0 {
if manifest.ValidReadTs() == 0 || len(manifest.Groups) == 0 {
continue
}
for gid := range manifest.Groups {
Expand All @@ -630,7 +630,7 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) (*mapResult, error) {

// Only restore the predicates that were assigned to this group at the time
// of the last backup.
file := filepath.Join(manifest.Path, backupName(manifest.Since, gid))
file := filepath.Join(manifest.Path, backupName(manifest.ValidReadTs(), gid))
br := readerFrom(h, file).WithEncryption(keys.EncKey).WithCompression(manifest.Compression)
if br.err != nil {
return nil, errors.Wrap(br.err, "newBackupReader")
Expand Down