Skip to content

Commit

Permalink
added object_disk_server_side_copy_concurrency with default value `…
Browse files Browse the repository at this point in the history
…32`, to avoid slow `create` or `restore` backup process which was restricted by `upload_concurrency` or `download_concurrency` options, fix #903
  • Loading branch information
Slach committed Apr 29, 2024
1 parent c5f8a65 commit 90059bf
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 57 deletions.
3 changes: 3 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# v2.5.5
IMPROVEMENTS
- added `object_disk_server_side_copy_concurrency` with default value `32`, to avoid a slow `create` or `restore` backup process that was previously restricted by the `upload_concurrency` or `download_concurrency` options, fix [903](https://github.com/Altinity/clickhouse-backup/issues/903)

BUG FIXES
- fixed `create --rbac` behavior when `/var/lib/clickhouse/access` does not exist and only the `replicated` `system.user_directories` is present, fix [904](https://github.com/Altinity/clickhouse-backup/issues/904)

Expand Down
3 changes: 3 additions & 0 deletions ReadMe.md
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,9 @@ general:
download_max_bytes_per_second: 0 # DOWNLOAD_MAX_BYTES_PER_SECOND, 0 means no throttling
upload_max_bytes_per_second: 0 # UPLOAD_MAX_BYTES_PER_SECOND, 0 means no throttling

# when table data is stored on a disk from system.disks with type=ObjectStorage, remote copy operations are executed inside the object storage service provider; this parameter restricts how many files are copied in parallel for each table
object_disk_server_side_copy_concurrency: 32

# RESTORE_SCHEMA_ON_CLUSTER, execute all schema related SQL queries with `ON CLUSTER` clause as Distributed DDL.
# Check `system.clusters` table for the correct cluster name, also `system.macros` can be used.
# This isn't applicable when `use_embedded_backup_restore: true`
Expand Down
4 changes: 2 additions & 2 deletions pkg/backup/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName, diffFromRe
var backupDataSize, backupMetadataSize uint64
var metaMutex sync.Mutex
createBackupWorkingGroup, createCtx := errgroup.WithContext(ctx)
createBackupWorkingGroup.SetLimit(int(b.cfg.General.UploadConcurrency))
createBackupWorkingGroup.SetLimit(b.cfg.ClickHouse.MaxConnections)

var tableMetas []metadata.TableTitle
for _, tableItem := range tables {
Expand Down Expand Up @@ -796,7 +796,7 @@ func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName string,
ctx, cancel := context.WithCancel(ctx)
defer cancel()
uploadObjectDiskPartsWorkingGroup, ctx := errgroup.WithContext(ctx)
uploadObjectDiskPartsWorkingGroup.SetLimit(int(b.cfg.General.UploadConcurrency * b.cfg.General.UploadConcurrency))
uploadObjectDiskPartsWorkingGroup.SetLimit(int(b.cfg.General.ObjectDiskServerSizeCopyConcurrency))
srcDiskConnection, exists := object_disk.DisksConnections.Load(disk.Name)
if !exists {
return 0, fmt.Errorf("uploadObjectDiskParts: %s not present in object_disk.DisksConnections", disk.Name)
Expand Down
4 changes: 2 additions & 2 deletions pkg/backup/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1186,7 +1186,7 @@ func (b *Backuper) restoreDataRegular(ctx context.Context, backupName string, ba
return fmt.Errorf("%s is not created. Restore schema first or create missing tables manually", strings.Join(missingTables, ", "))
}
restoreBackupWorkingGroup, restoreCtx := errgroup.WithContext(ctx)
restoreBackupWorkingGroup.SetLimit(int(b.cfg.General.DownloadConcurrency))
restoreBackupWorkingGroup.SetLimit(b.cfg.ClickHouse.MaxConnections)

for i := range tablesForRestore {
tableRestoreStartTime := time.Now()
Expand Down Expand Up @@ -1322,7 +1322,7 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
}
start := time.Now()
downloadObjectDiskPartsWorkingGroup, downloadCtx := errgroup.WithContext(ctx)
downloadObjectDiskPartsWorkingGroup.SetLimit(int(b.cfg.General.DownloadConcurrency))
downloadObjectDiskPartsWorkingGroup.SetLimit(int(b.cfg.General.ObjectDiskServerSizeCopyConcurrency))
for _, part := range parts {
dstDiskName := diskName
if part.RebalancedDisk != "" {
Expand Down
106 changes: 54 additions & 52 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,34 +38,35 @@ type Config struct {

// GeneralConfig - general setting section.
// Fields map 1:1 to the `general:` section of config.yml (yaml tag) and to
// environment variables (envconfig tag).
type GeneralConfig struct {
	RemoteStorage             string `yaml:"remote_storage" envconfig:"REMOTE_STORAGE"`
	MaxFileSize               int64  `yaml:"max_file_size" envconfig:"MAX_FILE_SIZE"`
	BackupsToKeepLocal        int    `yaml:"backups_to_keep_local" envconfig:"BACKUPS_TO_KEEP_LOCAL"`
	BackupsToKeepRemote       int    `yaml:"backups_to_keep_remote" envconfig:"BACKUPS_TO_KEEP_REMOTE"`
	LogLevel                  string `yaml:"log_level" envconfig:"LOG_LEVEL"`
	AllowEmptyBackups         bool   `yaml:"allow_empty_backups" envconfig:"ALLOW_EMPTY_BACKUPS"`
	DownloadConcurrency       uint8  `yaml:"download_concurrency" envconfig:"DOWNLOAD_CONCURRENCY"`
	UploadConcurrency         uint8  `yaml:"upload_concurrency" envconfig:"UPLOAD_CONCURRENCY"`
	UploadMaxBytesPerSecond   uint64 `yaml:"upload_max_bytes_per_second" envconfig:"UPLOAD_MAX_BYTES_PER_SECOND"`
	DownloadMaxBytesPerSecond uint64 `yaml:"download_max_bytes_per_second" envconfig:"DOWNLOAD_MAX_BYTES_PER_SECOND"`
	// Limits how many object-storage files are server-side copied in parallel
	// per table during create/restore of object disks (used as the errgroup
	// SetLimit in uploadObjectDiskParts / downloadObjectDiskParts).
	// NOTE(review): the identifier says "Size" but the yaml/env key says
	// "side" — presumably a typo for ObjectDiskServerSideCopyConcurrency.
	// Kept as-is because other files reference this name; confirm before renaming.
	ObjectDiskServerSizeCopyConcurrency uint8             `yaml:"object_disk_server_side_copy_concurrency" envconfig:"OBJECT_DISK_SERVER_SIDE_COPY_CONCURRENCY"`
	UseResumableState                   bool              `yaml:"use_resumable_state" envconfig:"USE_RESUMABLE_STATE"`
	RestoreSchemaOnCluster              string            `yaml:"restore_schema_on_cluster" envconfig:"RESTORE_SCHEMA_ON_CLUSTER"`
	UploadByPart                        bool              `yaml:"upload_by_part" envconfig:"UPLOAD_BY_PART"`
	DownloadByPart                      bool              `yaml:"download_by_part" envconfig:"DOWNLOAD_BY_PART"`
	RestoreDatabaseMapping              map[string]string `yaml:"restore_database_mapping" envconfig:"RESTORE_DATABASE_MAPPING"`
	RetriesOnFailure                    int               `yaml:"retries_on_failure" envconfig:"RETRIES_ON_FAILURE"`
	RetriesPause                        string            `yaml:"retries_pause" envconfig:"RETRIES_PAUSE"`
	WatchInterval                       string            `yaml:"watch_interval" envconfig:"WATCH_INTERVAL"`
	FullInterval                        string            `yaml:"full_interval" envconfig:"FULL_INTERVAL"`
	WatchBackupNameTemplate             string            `yaml:"watch_backup_name_template" envconfig:"WATCH_BACKUP_NAME_TEMPLATE"`
	ShardedOperationMode                string            `yaml:"sharded_operation_mode" envconfig:"SHARDED_OPERATION_MODE"`
	CPUNicePriority                     int               `yaml:"cpu_nice_priority" envconfig:"CPU_NICE_PRIORITY"`
	IONicePriority                      string            `yaml:"io_nice_priority" envconfig:"IO_NICE_PRIORITY"`
	RBACBackupAlways                    bool              `yaml:"rbac_backup_always" envconfig:"RBAC_BACKUP_ALWAYS"`
	RBACConflictResolution              string            `yaml:"rbac_conflict_resolution" envconfig:"RBAC_CONFLICT_RESOLUTION"`
	// Parsed/derived values (no yaml/env tags): populated from the string
	// interval fields above after config load.
	RetriesDuration time.Duration
	WatchDuration   time.Duration
	FullDuration    time.Duration
}

// GCSConfig - GCS settings section
Expand Down Expand Up @@ -508,30 +509,31 @@ func DefaultConfig() *Config {
}
return &Config{
General: GeneralConfig{
RemoteStorage: "none",
MaxFileSize: 0,
BackupsToKeepLocal: 0,
BackupsToKeepRemote: 0,
LogLevel: "info",
UploadConcurrency: uploadConcurrency,
DownloadConcurrency: downloadConcurrency,
RestoreSchemaOnCluster: "",
UploadByPart: true,
DownloadByPart: true,
UseResumableState: true,
RetriesOnFailure: 3,
RetriesPause: "30s",
RetriesDuration: 100 * time.Millisecond,
WatchInterval: "1h",
WatchDuration: 1 * time.Hour,
FullInterval: "24h",
FullDuration: 24 * time.Hour,
WatchBackupNameTemplate: "shard{shard}-{type}-{time:20060102150405}",
RestoreDatabaseMapping: make(map[string]string, 0),
IONicePriority: "idle",
CPUNicePriority: 15,
RBACBackupAlways: true,
RBACConflictResolution: "recreate",
RemoteStorage: "none",
MaxFileSize: 0,
BackupsToKeepLocal: 0,
BackupsToKeepRemote: 0,
LogLevel: "info",
UploadConcurrency: uploadConcurrency,
DownloadConcurrency: downloadConcurrency,
ObjectDiskServerSizeCopyConcurrency: 32,
RestoreSchemaOnCluster: "",
UploadByPart: true,
DownloadByPart: true,
UseResumableState: true,
RetriesOnFailure: 3,
RetriesPause: "30s",
RetriesDuration: 100 * time.Millisecond,
WatchInterval: "1h",
WatchDuration: 1 * time.Hour,
FullInterval: "24h",
FullDuration: 24 * time.Hour,
WatchBackupNameTemplate: "shard{shard}-{type}-{time:20060102150405}",
RestoreDatabaseMapping: make(map[string]string, 0),
IONicePriority: "idle",
CPUNicePriority: 15,
RBACBackupAlways: true,
RBACConflictResolution: "recreate",
},
ClickHouse: ClickHouseConfig{
Username: "default",
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func (s *S3) CopyObject(ctx context.Context, srcSize int64, srcBucket, srcKey, d
numParts := (srcSize + partSize - 1) / partSize

copyPartErrGroup, ctx := errgroup.WithContext(ctx)
copyPartErrGroup.SetLimit(s.Config.Concurrency)
copyPartErrGroup.SetLimit(s.Config.Concurrency * s.Config.Concurrency)

var mu sync.Mutex
var parts []s3types.CompletedPart
Expand Down

0 comments on commit 90059bf

Please sign in to comment.