Skip to content

Commit

Permalink
improve re-balance disk during download if disk not exists in system.…
Browse files Browse the repository at this point in the history
…disks. Use least used for `local` disks and `random` for object disks, fix #561
  • Loading branch information
Slach committed Feb 4, 2024
1 parent e23e5f1 commit cf4dcd7
Show file tree
Hide file tree
Showing 17 changed files with 685 additions and 241 deletions.
4 changes: 4 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# v2.5.0
IMPROVEMENTS
- improve disk re-balancing during download when a disk does not exist in `system.disks`: use `least_used` for `local` disks and `random` for object disks, fix [561](https://github.com/Altinity/clickhouse-backup/issues/561)

# v2.4.25
BUG FIXES
- fix `--restore-table-mapping` corner cases when the destination database contains special characters, fix [820](https://github.com/Altinity/clickhouse-backup/issues/820)
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/pkg/sftp v1.13.6
github.com/prometheus/client_golang v1.18.0
github.com/puzpuzpuz/xsync v1.5.2
github.com/ricochet2200/go-disk-usage/du v0.0.0-20210707232629-ac9918953285
github.com/stretchr/testify v1.8.4
github.com/tencentyun/cos-go-sdk-v5 v0.7.45
github.com/urfave/cli v1.22.14
Expand Down Expand Up @@ -101,7 +102,6 @@ require (
github.com/kr/fs v0.1.0 // indirect
github.com/mattn/go-ieproxy v0.0.11 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mozillazg/go-httpheader v0.4.0 // indirect
github.com/nwaples/rardecode/v2 v2.0.0-beta.2 // indirect
Expand Down Expand Up @@ -129,7 +129,6 @@ require (
golang.org/x/sys v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect
Expand Down
122 changes: 7 additions & 115 deletions go.sum

Large diffs are not rendered by default.

206 changes: 187 additions & 19 deletions pkg/backup/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/eapache/go-resiliency/retrier"
"io"
"io/fs"
"math/rand"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -210,13 +211,8 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
return fmt.Errorf("one of Download Metadata go-routine return error: %v", err)
}
if !schemaOnly {
for _, t := range tableMetadataAfterDownload {
for disk := range t.Parts {
if _, diskExists := b.DiskToPathMap[disk]; !diskExists && disk != b.cfg.ClickHouse.EmbeddedBackupDisk {
b.DiskToPathMap[disk] = b.DiskToPathMap["default"]
log.Warnf("table '%s.%s' require disk '%s' that not found in clickhouse table system.disks, you can add nonexistent disks to `disk_mapping` in `clickhouse` config section, data will download to %s", t.Database, t.Table, disk, b.DiskToPathMap["default"])
}
}
if reBalanceErr := b.reBalanceTablesMetadataIfDiskNotExists(tableMetadataAfterDownload, disks, remoteBackup, log); reBalanceErr != nil {
return reBalanceErr
}
log.Debugf("prepare table SHADOW concurrent semaphore with concurrency=%d len(tableMetadataAfterDownload)=%d", b.cfg.General.DownloadConcurrency, len(tableMetadataAfterDownload))
dataGroup, dataCtx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -304,6 +300,103 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
return nil
}

// reBalanceTablesMetadataIfDiskNotExists walks the downloaded table metadata and, for every
// disk referenced in `Parts` that is neither present in b.DiskToPathMap nor the embedded
// backup disk, assigns each part a replacement download disk (stored in part.RebalancedDisk).
// Archive files belonging to a re-balanced part are recorded in RebalancedFiles.
// Metadata of every table that got at least one re-balanced part is saved to its LocalFile.
//
// Candidate disks are taken from `disks`, grouped by disk type and storage policy; the type
// of the missing disk comes from remoteBackup.DiskTypes and the policy is extracted from the
// table's CREATE query. An error is returned when the missing disk has no known type, when no
// candidate disk matches, when a non-empty `files` section has no entry for the missing disk,
// or when saving the metadata fails.
func (b *Backuper) reBalanceTablesMetadataIfDiskNotExists(tableMetadataAfterDownload []metadata.TableMetadata, disks []clickhouse.Disk, remoteBackup storage.Backup, log *apexLog.Entry) error {
	// built lazily on first missing disk, indexed as [diskType][storagePolicy]
	var disksByStoragePolicyAndType map[string]map[string][]clickhouse.Disk

	filterDisksByTypeAndStoragePolicies := func(disk, diskType, tableQuery string) (string, []clickhouse.Disk, error) {
		if _, ok := remoteBackup.DiskTypes[disk]; !ok {
			return "", nil, fmt.Errorf("disk: %s not found in disk_types section %#v in %s/metadata.json", disk, remoteBackup.DiskTypes, remoteBackup.BackupName)
		}
		storagePolicy := b.ch.ExtractStoragePolicy(tableQuery)
		if disksByStoragePolicyAndType == nil {
			disksByStoragePolicyAndType = b.splitDisksByTypeAndStoragePolicy(disks)
		}
		disksByPolicy, isTypeExists := disksByStoragePolicyAndType[diskType]
		if !isTypeExists {
			return "", nil, fmt.Errorf("disk: %s, diskType: %s not found in system.disks", disk, diskType)
		}
		filteredDisks, isPolicyExists := disksByPolicy[storagePolicy]
		if !isPolicyExists || len(filteredDisks) == 0 {
			return "", nil, fmt.Errorf("storagePolicy: %s with diskType: %s not found in system.disks", storagePolicy, diskType)
		}
		return storagePolicy, filteredDisks, nil
	}

	// keeps the remaining free space up to date so following parts see the space already reserved
	updateDiskFreeSize := func(downloadDisk, diskType, storagePolicy string, newFreeSpace uint64) {
		candidates := disksByStoragePolicyAndType[diskType][storagePolicy]
		for dIdx := range candidates {
			if candidates[dIdx].Name == downloadDisk {
				candidates[dIdx].FreeSpace = newFreeSpace
			}
		}
	}

	for i := range tableMetadataAfterDownload {
		// work on the slice element itself (not a range copy), otherwise a freshly created
		// RebalancedFiles map would be missing from the metadata saved below
		t := &tableMetadataAfterDownload[i]
		if t.TotalBytes == 0 {
			continue
		}
		totalFiles := 0
		for disk := range t.Files {
			totalFiles += len(t.Files[disk])
		}
		totalParts := 0
		for disk := range t.Parts {
			totalParts += len(t.Parts[disk])
		}
		if totalFiles == 0 && totalParts == 0 {
			continue
		}
		// average part size; a table may have files but no parts, avoid division by zero
		var partSize uint64
		if totalParts > 0 {
			partSize = t.TotalBytes / uint64(totalParts)
		}
		isRebalanced := false
		//re-balance parts
		for disk := range t.Parts {
			if _, diskExists := b.DiskToPathMap[disk]; diskExists || disk == b.cfg.ClickHouse.EmbeddedBackupDisk {
				continue
			}
			diskType := remoteBackup.DiskTypes[disk]
			storagePolicy, filteredDisks, err := filterDisksByTypeAndStoragePolicies(disk, diskType, t.Query)
			if err != nil {
				return err
			}
			rebalancedDisks := common.EmptyMap{}
			for j := range t.Parts[disk] {
				isObjectDisk, downloadDisk, newFreeSpace, reBalanceErr := b.getDownloadDiskForNonExistsDisk(diskType, filteredDisks, partSize)
				if reBalanceErr != nil {
					return reBalanceErr
				}
				rebalancedDisks[downloadDisk] = struct{}{}
				t.Parts[disk][j].RebalancedDisk = downloadDisk
				isRebalanced = true
				if !isObjectDisk {
					updateDiskFreeSize(downloadDisk, diskType, storagePolicy, newFreeSpace)
				}
				//re-balance file depend on part
				if len(t.Files) == 0 {
					continue
				}
				if len(t.Files[disk]) == 0 {
					return fmt.Errorf("table: `%s`.`%s` part.Name: %s, part.RebalancedDisk: %s, non empty `files` can't find disk: %s", t.Database, t.Table, t.Parts[disk][j].Name, t.Parts[disk][j].RebalancedDisk, disk)
				}
				// archive names look like <disk>_<part name>.<ext>
				archivePrefix := disk + "_" + t.Parts[disk][j].Name + "."
				for _, fileName := range t.Files[disk] {
					if !strings.HasPrefix(fileName, archivePrefix) {
						continue
					}
					if t.RebalancedFiles == nil {
						t.RebalancedFiles = map[string]string{}
					}
					t.RebalancedFiles[fileName] = downloadDisk
				}
			}
			// render the set as "[disk1 disk2]" instead of "map[disk1:{} disk2:{}]"
			rebalancedDisksStr := strings.TrimPrefix(
				strings.Replace(fmt.Sprintf("%v", rebalancedDisks), ":{}", "", -1), "map",
			)
			log.Warnf("table '%s.%s' require disk '%s' that not found in system.disks, you can add nonexistent disks to `disk_mapping` in `clickhouse` config section, data will download to %v", t.Database, t.Table, disk, rebalancedDisksStr)
		}
		if isRebalanced {
			if _, saveErr := t.Save(t.LocalFile, false); saveErr != nil {
				return saveErr
			}
		}
	}

	return nil
}

func (b *Backuper) downloadTableMetadataIfNotExists(ctx context.Context, backupName string, log *apexLog.Entry, tableTitle metadata.TableTitle) (*metadata.TableMetadata, error) {
metadataLocalFile := path.Join(b.DefaultDataPath, "backup", backupName, "metadata", common.TablePathEncode(tableTitle.Database), fmt.Sprintf("%s.json", common.TablePathEncode(tableTitle.Table)))
tm := &metadata.TableMetadata{}
Expand Down Expand Up @@ -340,6 +433,7 @@ func (b *Backuper) downloadTableMetadata(ctx context.Context, backupName string,
}
partitionsIdMap, _ = partition.ConvertPartitionsToIdsMapAndNamesList(ctx, b.ch, nil, []metadata.TableMetadata{tableMetadata}, partitions)
filterPartsAndFilesByPartitionsFilter(tableMetadata, partitionsIdMap[metadata.TableTitle{Database: tableMetadata.Database, Table: tableMetadata.Table}])
tableMetadata.LocalFile = localMetadataFile
}
if isProcessed {
size += uint64(processedSize)
Expand Down Expand Up @@ -394,6 +488,7 @@ func (b *Backuper) downloadTableMetadata(ctx context.Context, backupName string,
}
written = int64(jsonSize)
size += jsonSize
tableMetadata.LocalFile = localMetadataFile
}
if b.resume {
b.resumableState.AppendToState(localMetadataFile, written)
Expand Down Expand Up @@ -497,7 +592,12 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
continue
}
archiveFile := table.Files[disk][downloadOffset[disk]]
tableLocalDir := b.getLocalBackupDataPathForTable(remoteBackup.BackupName, disk, dbAndTableDir)
diskName := disk
isRebalanced := false
if diskName, isRebalanced = table.RebalancedFiles[archiveFile]; !isRebalanced {
diskName = disk
}
tableLocalDir := b.getLocalBackupDataPathForTable(remoteBackup.BackupName, diskName, dbAndTableDir)
downloadOffset[disk] += 1
tableRemoteFile := path.Join(remoteBackup.BackupName, "shadow", common.TablePathEncode(table.Database), common.TablePathEncode(table.Table), archiveFile)
dataGroup.Go(func() error {
Expand Down Expand Up @@ -529,7 +629,7 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.

for disk, parts := range table.Parts {
tableRemotePath := path.Join(remoteBackup.BackupName, "shadow", dbAndTableDir, disk)
diskPath := b.DiskToPathMap[disk]
diskPath, diskExists := b.DiskToPathMap[disk]
tableLocalPath := path.Join(diskPath, "backup", remoteBackup.BackupName, "shadow", dbAndTableDir, disk)
if b.isEmbedded {
tableLocalPath = path.Join(diskPath, remoteBackup.BackupName, "data", dbAndTableDir)
Expand All @@ -538,6 +638,12 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
if part.Required {
continue
}
if !diskExists {
diskPath, diskExists = b.DiskToPathMap[part.RebalancedDisk]
if !diskExists {
return fmt.Errorf("downloadTableData: table: `%s`.`%s`, disk: %s, part.Name: %s, part.RebalancedDisk: %s not rebalanced", table.Table, table.Database, disk, part.Name, part.RebalancedDisk)
}
}
partRemotePath := path.Join(tableRemotePath, part.Name)
partLocalPath := path.Join(tableLocalPath, part.Name)
dataGroup.Go(func() error {
Expand Down Expand Up @@ -584,8 +690,19 @@ func (b *Backuper) downloadDiffParts(ctx context.Context, remoteBackup metadata.
diffRemoteFilesLock := &sync.Mutex{}

for disk, parts := range table.Parts {
diskPath, diskExists := b.DiskToPathMap[disk]
for _, part := range parts {
newPath := path.Join(b.DiskToPathMap[disk], "backup", remoteBackup.BackupName, "shadow", dbAndTableDir, disk, part.Name)
if !diskExists && part.RebalancedDisk == "" {
return fmt.Errorf("downloadDiffParts: table: `%s`.`%s`, disk: %s, part.Name: %s, part.RebalancedDisk: `%s` not rebalanced", table.Table, table.Database, disk, part.Name, part.RebalancedDisk)
}
if !diskExists {
diskPath, diskExists = b.DiskToPathMap[part.RebalancedDisk]
if !diskExists {
return fmt.Errorf("downloadDiffParts: table: `%s`.`%s`, disk: %s, part.Name: %s, part.RebalancedDisk: `%s` not rebalanced", table.Table, table.Database, disk, part.Name, part.RebalancedDisk)
}
disk = part.RebalancedDisk
}
newPath := path.Join(diskPath, "backup", remoteBackup.BackupName, "shadow", dbAndTableDir, disk, part.Name)
if err := b.checkNewPath(newPath, part); err != nil {
return err
}
Expand All @@ -600,6 +717,9 @@ func (b *Backuper) downloadDiffParts(ctx context.Context, remoteBackup metadata.
if err != nil && os.IsNotExist(err) {
partForDownload := part
diskForDownload := disk
if !diskExists {
diskForDownload = part.RebalancedDisk
}
downloadDiffGroup.Go(func() error {
tableRemoteFiles, err := b.findDiffBackupFilesRemote(downloadDiffCtx, remoteBackup, table, diskForDownload, partForDownload, log)
if err != nil {
Expand Down Expand Up @@ -830,17 +950,22 @@ func (b *Backuper) findDiffFileExist(ctx context.Context, requiredBackup *metada
log.WithFields(apexLog.Fields{"tableRemoteFile": tableRemoteFile, "tableRemotePath": tableRemotePath, "part": part.Name}).Debugf("findDiffFileExist not found")
return "", "", err
}
if tableLocalDir, diskExists := b.DiskToPathMap[localDisk]; !diskExists {
return "", "", fmt.Errorf("`%s` is not found in system.disks", localDisk)
} else {
if path.Ext(tableRemoteFile) == ".txt" {
tableLocalDir = path.Join(tableLocalDir, "backup", requiredBackup.BackupName, "shadow", dbAndTableDir, localDisk, part.Name)
} else {
tableLocalDir = path.Join(tableLocalDir, "backup", requiredBackup.BackupName, "shadow", dbAndTableDir, localDisk)
tableLocalDir, diskExists := b.DiskToPathMap[localDisk]
if !diskExists {
tableLocalDir, diskExists = b.DiskToPathMap[part.RebalancedDisk]
if !diskExists {
return "", "", fmt.Errorf("localDisk:%s, part.Name: %s, part.RebalancedDisk: %s is not found in system.disks", localDisk, part.Name, part.RebalancedDisk)
}
log.WithFields(apexLog.Fields{"tableRemoteFile": tableRemoteFile, "tableRemotePath": tableRemotePath, "part": part.Name}).Debugf("findDiffFileExist found")
return tableRemotePath, tableLocalDir, nil
localDisk = part.RebalancedDisk
}

if path.Ext(tableRemoteFile) == ".txt" {
tableLocalDir = path.Join(tableLocalDir, "backup", requiredBackup.BackupName, "shadow", dbAndTableDir, localDisk, part.Name)
} else {
tableLocalDir = path.Join(tableLocalDir, "backup", requiredBackup.BackupName, "shadow", dbAndTableDir, localDisk)
}
log.WithFields(apexLog.Fields{"tableRemoteFile": tableRemoteFile, "tableRemotePath": tableRemotePath, "part": part.Name}).Debugf("findDiffFileExist found")
return tableRemotePath, tableLocalDir, nil
}

func (b *Backuper) ReadBackupMetadataRemote(ctx context.Context, backupName string) (*metadata.BackupMetadata, error) {
Expand Down Expand Up @@ -937,3 +1062,46 @@ func (b *Backuper) downloadSingleBackupFile(ctx context.Context, remoteFile stri
}
return nil
}

// splitDisksByTypeAndStoragePolicy - https://github.com/Altinity/clickhouse-backup/issues/561
// groups all non-backup disks into a two-level index: disk type -> storage policy -> disks.
// A disk is listed once under every storage policy it belongs to; a disk without storage
// policies still registers its type with an empty policy map.
func (b *Backuper) splitDisksByTypeAndStoragePolicy(disks []clickhouse.Disk) map[string]map[string][]clickhouse.Disk {
	grouped := make(map[string]map[string][]clickhouse.Disk)
	for _, disk := range disks {
		if disk.IsBackup {
			continue
		}
		byPolicy, typeKnown := grouped[disk.Type]
		if !typeKnown {
			byPolicy = make(map[string][]clickhouse.Disk)
			grouped[disk.Type] = byPolicy
		}
		for _, policyName := range disk.StoragePolicies {
			byPolicy[policyName] = append(byPolicy[policyName], disk)
		}
	}
	return grouped
}

// getDownloadDiskForNonExistsDisk - https://github.com/Altinity/clickhouse-backup/issues/561
// allows restoring to a new server with a different storage policy or a different disk count.
// It picks the disk from filteredDisks that will receive a part whose original disk is absent:
// `random` for every disk type other than `local`, `least_used` (largest FreeSpace) for `local`.
//
// Returns, in order: whether the missing disk is treated as an object disk (type is not
// `local`), the name of the chosen disk, the chosen disk's FreeSpace minus partSize (always 0
// for object disks), and an error when filteredDisks is empty for an object disk or when no
// `local` disk has more than partSize bytes free.
func (b *Backuper) getDownloadDiskForNonExistsDisk(notExistsDiskType string, filteredDisks []clickhouse.Disk, partSize uint64) (bool, string, uint64, error) {
	// random for non-local disks
	if notExistsDiskType != "local" {
		// rand.Intn panics when its argument is <= 0, so reject an empty candidate list explicitly
		if len(filteredDisks) == 0 {
			return false, "", 0, fmt.Errorf("no disks with `%s` type found in system.disks", notExistsDiskType)
		}
		randomIdx := rand.Intn(len(filteredDisks))
		return true, filteredDisks[randomIdx].Name, 0, nil
	}
	// least_used for local: the disk with the most free space, which must exceed partSize
	freeSpace := partSize
	leastUsedIdx := -1
	for idx := range filteredDisks {
		if filteredDisks[idx].FreeSpace > freeSpace {
			freeSpace = filteredDisks[idx].FreeSpace
			leastUsedIdx = idx
		}
	}
	if leastUsedIdx < 0 {
		return false, "", 0, fmt.Errorf("%s free space, not found in system.disks with `local` type", utils.FormatBytes(partSize))
	}
	return false, filteredDisks[leastUsedIdx].Name, filteredDisks[leastUsedIdx].FreeSpace - partSize, nil
}
Loading

0 comments on commit cf4dcd7

Please sign in to comment.