Skip to content

Commit

Permalink
improve deleting local backup and deleting remote object disk backup…
Browse files Browse the repository at this point in the history
… by prefix instead of reading metadata files, applying macros for `object_disks_path`, related to #561
  • Loading branch information
Slach committed Feb 5, 2024
1 parent cf4dcd7 commit c74420b
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 110 deletions.
151 changes: 42 additions & 109 deletions pkg/backup/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"os"
"path"
"path/filepath"
"regexp"
"strings"
"time"

Expand Down Expand Up @@ -127,8 +126,8 @@ func (b *Backuper) RemoveBackupLocal(ctx context.Context, backupName string, dis
if err != nil {
return err
}

if b.hasObjectDisks(backupList, backupName, disks) {
hasObjectDisks := b.hasObjectDisks(backupList, backupName, disks)
if hasObjectDisks {
bd, err := storage.NewBackupDestination(ctx, b.cfg, b.ch, false, backupName)
if err != nil {
return err
Expand All @@ -154,21 +153,20 @@ func (b *Backuper) RemoveBackupLocal(ctx context.Context, backupName string, dis
}
if !skip && strings.Contains(backup.Tags, "embedded") {
if err = b.cleanLocalEmbedded(ctx, backup, disks); err != nil {
log.Warnf("b.cleanRemoteEmbedded return error: %v", err)
log.Warnf("b.cleanLocalEmbedded return error: %v", err)
return err
}
}
if !skip && hasObjectDisks {
if err = b.cleanBackupObjectDisks(ctx, backupName); err != nil {
return err
}
}

for _, disk := range disks {
backupPath := path.Join(disk.Path, "backup", backupName)
if disk.IsBackup {
backupPath = path.Join(disk.Path, backupName)
}
if !skip && !disk.IsBackup && (disk.Type == "s3" || disk.Type == "azure_blob_storage") && !strings.Contains(backup.Tags, "embedded") {
if err = b.cleanLocalBackupObjectDisk(ctx, backupName, backupPath, disk.Name); err != nil {
return err
}
}
log.Debugf("remove '%s'", backupPath)
if err = os.RemoveAll(backupPath); err != nil {
return err
Expand Down Expand Up @@ -201,34 +199,6 @@ func (b *Backuper) hasObjectDisks(backupList []LocalBackup, backupName string, d
return false
}

// cleanLocalBackupObjectDisk walks the local backup directory for one object
// disk and deletes, from the remote object-disk backup, every storage object
// referenced by the metadata files found there. A missing backupPath is not
// an error: there is simply nothing to clean.
func (b *Backuper) cleanLocalBackupObjectDisk(ctx context.Context, backupName string, backupPath, diskName string) error {
	if _, statErr := os.Stat(backupPath); os.IsNotExist(statErr) {
		// no local data for this disk — nothing to delete remotely
		return nil
	} else if statErr != nil {
		return statErr
	}
	// Every regular file under backupPath is treated as an object-disk
	// metadata file; each one lists the remote objects backing a local part.
	return filepath.Walk(backupPath, func(filePath string, info os.FileInfo, walkErr error) error {
		if walkErr != nil {
			return walkErr
		}
		if info.IsDir() {
			return nil
		}
		meta, readErr := object_disk.ReadMetadataFromFile(filePath)
		if readErr != nil {
			return readErr
		}
		for _, obj := range meta.StorageObjects {
			remoteKey := path.Join(backupName, diskName, obj.ObjectRelativePath)
			if delErr := b.dst.DeleteFileFromObjectDiskBackup(ctx, remoteKey); delErr != nil {
				return delErr
			}
		}
		return nil
	})
}

func (b *Backuper) cleanLocalEmbedded(ctx context.Context, backup LocalBackup, disks []clickhouse.Disk) error {
for _, disk := range disks {
if disk.Name == b.cfg.ClickHouse.EmbeddedBackupDisk {
Expand Down Expand Up @@ -326,8 +296,8 @@ func (b *Backuper) RemoveBackupRemote(ctx context.Context, backupName string) er
log.Warnf("b.cleanRemoteEmbedded return error: %v", err)
return err
}
} else if err = b.cleanRemoteBackupObjectDisks(ctx, backup); err != nil {
log.Warnf("b.cleanRemoteBackupObjectDisks return error: %v", err)
} else if err = b.cleanBackupObjectDisks(ctx, backup.BackupName); err != nil {
log.Warnf("b.cleanBackupObjectDisks return error: %v", err)
return err
}
}
Expand All @@ -348,75 +318,6 @@ func (b *Backuper) RemoveBackupRemote(ctx context.Context, backupName string) er
return fmt.Errorf("'%s' is not found on remote storage", backupName)
}

// cleanRemoteBackupObjectDisks removes the object-storage objects referenced by a
// remote backup's object-disk metadata, for s3/azblob/gcs destinations only.
// It walks the remote backup, and for every file under <backupName>/shadow/
// matching an object-disk name it reads the object-disk metadata (downloading and
// unpacking it first when the part is stored compressed) and deletes each
// referenced storage object from the object-disk backup.
func (b *Backuper) cleanRemoteBackupObjectDisks(ctx context.Context, backup storage.Backup) error {
// Only these destination kinds can hold object-disk data; anything else is a no-op.
if b.dst.Kind() != "azblob" && b.dst.Kind() != "s3" && b.dst.Kind() != "gcs" {
return nil
}
// Sanity check: every disk should have a recorded disk type; fewer types than
// disks means the backup metadata is inconsistent.
if !backup.Legacy && len(backup.Disks) > 0 && backup.DiskTypes != nil && len(backup.DiskTypes) < len(backup.Disks) {
return fmt.Errorf("RemoveRemoteBackupObjectDisks: invalid backup.DiskTypes=%#v, not correlated with backup.Disks=%#v", backup.DiskTypes, backup.Disks)
}
return b.dst.Walk(ctx, backup.BackupName+"/", true, func(ctx context.Context, f storage.RemoteFile) error {
fName := path.Join(backup.BackupName, f.Name())
// Only files under the backup's shadow/ tree can be object-disk parts.
if !strings.HasPrefix(fName, path.Join(backup.BackupName, "/shadow/")) {
return nil
}
for diskName, diskType := range backup.DiskTypes {
// Only s3 and azure_blob_storage disk types store data as object-disk metadata.
if diskType == "s3" || diskType == "azure_blob_storage" {
// Matches /shadow/<db>/<table>/<diskName>_<something> — a compressed archive
// of an object-disk part; capture group 1 is "<db>/<table>".
compressedRE := regexp.MustCompile(`/shadow/([^/]+/[^/]+)/` + diskName + `_[^/]+$`)
if matches := compressedRE.FindStringSubmatch(fName); len(matches) > 0 {
// compressed remote object disk part
localPath := path.Join(backup.Disks[diskName], "backup", backup.BackupName, "shadow", matches[1], diskName)
// Download and unpack the archive locally so its metadata files can be read.
if err := b.dst.DownloadCompressedStream(ctx, fName, localPath); err != nil {
return err
}
// Read every unpacked metadata file and delete each storage object it references.
walkErr := filepath.Walk(localPath, func(fPath string, fInfo fs.FileInfo, err error) error {
if err != nil {
return err
}
if fInfo.IsDir() {
return nil
}
objMeta, err := object_disk.ReadMetadataFromFile(fPath)
if err != nil {
return err
}
for _, storageObject := range objMeta.StorageObjects {
err = b.dst.DeleteFileFromObjectDiskBackup(ctx, path.Join(backup.BackupName, diskName, storageObject.ObjectRelativePath))
if err != nil {
return err
}
}
return nil
})
// A walk failure is logged but deliberately not fatal; cleanup of the
// temporary localPath below still runs and its failure IS fatal.
if walkErr != nil {
b.log.Warnf("filepath.Walk(%s) return error: %v", localPath, walkErr)
}
if err := os.RemoveAll(localPath); err != nil {
return err
}
} else if regexp.MustCompile(`/shadow/[^/]+/[^/]+/` + diskName + `/.+$`).MatchString(fName) {
// non compressed remote object disk part
// The remote file itself is an object-disk metadata file; stream it directly.
objMetaReader, err := b.dst.GetFileReader(ctx, fName)
if err != nil {
return err
}
objMeta, err := object_disk.ReadMetadataFromReader(objMetaReader, fName)
if err != nil {
return err
}
for _, storageObject := range objMeta.StorageObjects {
err = b.dst.DeleteFileFromObjectDiskBackup(ctx, path.Join(backup.BackupName, diskName, storageObject.ObjectRelativePath))
if err != nil {
return err
}
}
}
}
}
return nil
})
}

func (b *Backuper) cleanRemoteEmbedded(ctx context.Context, backup storage.Backup, bd *storage.BackupDestination) error {
if err := object_disk.InitCredentialsAndConnections(ctx, b.ch, b.cfg, b.cfg.ClickHouse.EmbeddedBackupDisk); err != nil {
return err
Expand All @@ -442,6 +343,38 @@ func (b *Backuper) cleanRemoteEmbedded(ctx context.Context, backup storage.Backu
})
}

// cleanBackupObjectDisks recursively deletes <object_disks_path>/<backupName>
// from the remote destination by walking the prefix and removing each entry.
func (b *Backuper) cleanBackupObjectDisks(ctx context.Context, backupName string) error {
	backupPath, err := b.getObjectDiskBackupPath(backupName)
	if err != nil {
		// fixed typo ("oject") and wrap with %w so callers can unwrap the cause
		return fmt.Errorf("cleanBackupObjectDisks: %s contains object disks but b.getObjectDiskBackupPath return error: %w", backupName, err)
	}
	return b.dst.Walk(ctx, backupPath, true, func(ctx context.Context, f storage.RemoteFile) error {
		// azblob listings can yield virtual (directory-marker) entries that have
		// zero size and a zero LastModified; those are not deletable objects.
		if b.dst.Kind() == "azblob" && f.Size() == 0 && f.LastModified().IsZero() {
			return nil
		}
		return b.dst.DeleteFile(ctx, path.Join(backupPath, f.Name()))
	})
}

// getObjectDiskBackupPath returns <object_disks_path>/<backupName> for the
// configured remote storage, or an error when the remote storage kind does
// not support object disks (only s3, azblob and gcs define ObjectDiskPath).
func (b *Backuper) getObjectDiskBackupPath(backupName string) (string, error) {
	// switch over the single discriminant is the idiomatic form of the
	// previous if/else-if chain; behavior is unchanged.
	var objectDiskPath string
	switch b.cfg.General.RemoteStorage {
	case "s3":
		objectDiskPath = b.cfg.S3.ObjectDiskPath
	case "azblob":
		objectDiskPath = b.cfg.AzureBlob.ObjectDiskPath
	case "gcs":
		objectDiskPath = b.cfg.GCS.ObjectDiskPath
	default:
		return "", fmt.Errorf("unsupported remote_storage: %s", b.cfg.General.RemoteStorage)
	}
	return path.Join(objectDiskPath, backupName), nil
}

func (b *Backuper) skipIfSameLocalBackupPresent(ctx context.Context, backupName, tags string) (bool, error) {
if localList, _, err := b.GetLocalBackups(ctx, nil); err != nil {
return true, err
Expand Down
10 changes: 10 additions & 0 deletions pkg/backup/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,16 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
return err
}
needToDownloadObjectDisk = true
if b.cfg.General.RemoteStorage == "s3" {
b.cfg.S3.ObjectDiskPath, err = b.ch.ApplyMacros(ctx, b.cfg.S3.ObjectDiskPath)
} else if b.cfg.General.RemoteStorage == "gcs" {
b.cfg.GCS.ObjectDiskPath, err = b.ch.ApplyMacros(ctx, b.cfg.GCS.ObjectDiskPath)
} else if b.cfg.General.RemoteStorage == "azblob" {
b.cfg.AzureBlob.ObjectDiskPath, err = b.ch.ApplyMacros(ctx, b.cfg.AzureBlob.ObjectDiskPath)
}
if err != nil {
return err
}
break
}
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/storage/general.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,10 @@ func NewBackupDestination(ctx context.Context, cfg *config.Config, ch *clickhous
if err != nil {
return nil, err
}
azblobStorage.Config.ObjectDiskPath, err = ch.ApplyMacros(ctx, azblobStorage.Config.ObjectDiskPath)
if err != nil {
return nil, err
}

bufferSize := azblobStorage.Config.BufferSize
// https://github.com/Altinity/clickhouse-backup/issues/317
Expand Down Expand Up @@ -677,6 +681,10 @@ func NewBackupDestination(ctx context.Context, cfg *config.Config, ch *clickhous
if err != nil {
return nil, err
}
s3Storage.Config.ObjectDiskPath, err = ch.ApplyMacros(ctx, s3Storage.Config.ObjectDiskPath)
if err != nil {
return nil, err
}
// https://github.com/Altinity/clickhouse-backup/issues/588
if len(s3Storage.Config.ObjectLabels) > 0 && backupName != "" {
objectLabels := s3Storage.Config.ObjectLabels
Expand All @@ -699,6 +707,10 @@ func NewBackupDestination(ctx context.Context, cfg *config.Config, ch *clickhous
if err != nil {
return nil, err
}
googleCloudStorage.Config.ObjectDiskPath, err = ch.ApplyMacros(ctx, googleCloudStorage.Config.ObjectDiskPath)
if err != nil {
return nil, err
}
// https://github.com/Altinity/clickhouse-backup/issues/588
if len(googleCloudStorage.Config.ObjectLabels) > 0 && backupName != "" {
objectLabels := googleCloudStorage.Config.ObjectLabels
Expand Down
2 changes: 1 addition & 1 deletion test/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2054,7 +2054,7 @@ func replaceStorageDiskNameForReBalance(r *require.Assertions, ch *TestClickHous
}
ch.chbackend.Close()
r.NoError(utils.ExecCmd(context.Background(), 180*time.Second, "docker-compose", "-f", os.Getenv("COMPOSE_FILE"), "restart", "clickhouse"))
ch.connectWithWait(r, 3*time.Second, 1*time.Second)
ch.connectWithWait(r, 3*time.Second, 1*time.Minute)
}

func testBackupSpecifiedPartitions(t *testing.T, r *require.Assertions, ch *TestClickHouse, remoteStorageType string, backupConfig string) {
Expand Down

0 comments on commit c74420b

Please sign in to comment.