From 3cf6c32392ebe783e97888537ab34d5334723a3a Mon Sep 17 00:00:00 2001 From: Avi Deitcher Date: Mon, 18 Aug 2025 13:23:48 +0300 Subject: [PATCH] add support for parallelism Signed-off-by: Avi Deitcher --- Makefile | 2 +- cmd/dump.go | 11 +++++++ cmd/dump_test.go | 16 +++++++++++ docs/configuration.md | 2 ++ go.mod | 2 +- go.sum | 2 ++ pkg/core/dump.go | 2 ++ pkg/core/dumpoptions.go | 2 ++ pkg/database/dump.go | 63 ++++++++++++++++++++++++++++++----------- test/backup_test.go | 13 ++++++++- 10 files changed, 96 insertions(+), 19 deletions(-) diff --git a/Makefile b/Makefile index 1aa230d..3a8ba21 100644 --- a/Makefile +++ b/Makefile @@ -35,7 +35,7 @@ push: build docker push $(TARGET) integration_test: - TEST_INTEGRATION=true go test -v ./test + TEST_INTEGRATION=true go test -v ./test -test.timeout=20m integration_test_debug: TEST_INTEGRATION=true dlv --wd=./test test ./test diff --git a/cmd/dump.go b/cmd/dump.go index d9b0152..329dcf8 100644 --- a/cmd/dump.go +++ b/cmd/dump.go @@ -95,6 +95,13 @@ func dumpCmd(passedExecs execs, cmdConfig *cmdConfiguration) (*cobra.Command, er if len(exclude) == 0 { exclude = nil } + + // how many databases to back up in parallel + parallel := v.GetInt("parallelism") + if !v.IsSet("parallelism") && dumpConfig != nil && dumpConfig.Parallelism != nil { + parallel = *dumpConfig.Parallelism + } + preBackupScripts := v.GetString("pre-backup-scripts") if preBackupScripts == "" && scriptsConfig != nil && scriptsConfig.PreBackup != nil { preBackupScripts = *scriptsConfig.PreBackup @@ -256,6 +263,7 @@ func dumpCmd(passedExecs execs, cmdConfig *cmdConfiguration) (*cobra.Command, er MaxAllowedPacket: maxAllowedPacket, Run: uid, FilenamePattern: filenamePattern, + Parallelism: parallel, } _, err := executor.Dump(tracerCtx, dumpOpts) if err != nil { @@ -311,6 +319,9 @@ S3: If it is a URL of the format s3://bucketname/path then it will connect via S // once flags.Bool("once", false, "Override all other settings and run the dump once immediately and 
exit. Useful if you use an external scheduler (e.g. as part of an orchestration solution like Cattle or Docker Swarm or [kubernetes cron jobs](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/)) and don't want the container to do the scheduling internally.") + // parallelism - how many databases (and therefore connections) to back up at once + flags.Int("parallelism", 1, "How many databases to back up in parallel.") + // safechars flags.Bool("safechars", false, "The dump filename usually includes the character `:` in the date, to comply with RFC3339. Some systems and shells don't like that character. If true, will replace all `:` with `-`.") diff --git a/cmd/dump_test.go b/cmd/dump_test.go index fb79431..5ae3eba 100644 --- a/cmd/dump_test.go +++ b/cmd/dump_test.go @@ -39,6 +39,7 @@ func TestDumpCmd(t *testing.T) { Compressor: &compression.GzipCompressor{}, DBConn: &database.Connection{Host: "abc", Port: defaultPort}, FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}", + Parallelism: 1, }, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil}, {"file URL with pass-file", []string{"--server", "abc", "--target", "file:///foo/bar", "--pass-file", "testdata/password.txt"}, "", false, core.DumpOptions{ Targets: []storage.Storage{file.New(*fileTargetURL)}, @@ -46,6 +47,7 @@ func TestDumpCmd(t *testing.T) { Compressor: &compression.GzipCompressor{}, DBConn: &database.Connection{Host: "abc", Port: defaultPort, Pass: "testpassword"}, FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}", + Parallelism: 1, }, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil}, {"file URL with pass and pass-file (pass takes precedence)", []string{"--server", "abc", "--target", "file:///foo/bar", "--pass", "explicitpass", "--pass-file", "testdata/password.txt"}, "", false, core.DumpOptions{ Targets: []storage.Storage{file.New(*fileTargetURL)}, @@ -53,6 +55,7 @@ func TestDumpCmd(t *testing.T) { Compressor: 
&compression.GzipCompressor{}, DBConn: &database.Connection{Host: "abc", Port: defaultPort, Pass: "explicitpass"}, FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}", + Parallelism: 1, }, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil}, {"file URL with prune", []string{"--server", "abc", "--target", "file:///foo/bar", "--retention", "1h"}, "", false, core.DumpOptions{ Targets: []storage.Storage{file.New(*fileTargetURL)}, @@ -60,6 +63,7 @@ func TestDumpCmd(t *testing.T) { Compressor: &compression.GzipCompressor{}, DBConn: &database.Connection{Host: "abc", Port: defaultPort}, FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}", + Parallelism: 1, }, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, &core.PruneOptions{Targets: []storage.Storage{file.New(*fileTargetURL)}, Retention: "1h"}}, // database name and port @@ -69,6 +73,7 @@ func TestDumpCmd(t *testing.T) { Compressor: &compression.GzipCompressor{}, DBConn: &database.Connection{Host: "abc", Port: defaultPort}, FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}", + Parallelism: 1, }, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil}, {"database explicit name with explicit port", []string{"--server", "abc", "--port", "3307", "--target", "file:///foo/bar"}, "", false, core.DumpOptions{ Targets: []storage.Storage{file.New(*fileTargetURL)}, @@ -76,6 +81,7 @@ func TestDumpCmd(t *testing.T) { Compressor: &compression.GzipCompressor{}, DBConn: &database.Connection{Host: "abc", Port: 3307}, FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}", + Parallelism: 1, }, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil}, // config file @@ -85,6 +91,7 @@ func TestDumpCmd(t *testing.T) { Compressor: &compression.GzipCompressor{}, DBConn: &database.Connection{Host: "abcd", Port: 3306, User: "user2", Pass: "xxxx2"}, FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}", + Parallelism: 1, }, 
core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, &core.PruneOptions{Targets: []storage.Storage{file.New(*fileTargetURL)}, Retention: "1h"}}, {"config file with port override", []string{"--config-file", "testdata/config.yml", "--port", "3307"}, "", false, core.DumpOptions{ Targets: []storage.Storage{file.New(*fileTargetURL)}, @@ -92,6 +99,7 @@ func TestDumpCmd(t *testing.T) { Compressor: &compression.GzipCompressor{}, DBConn: &database.Connection{Host: "abcd", Port: 3307, User: "user2", Pass: "xxxx2"}, FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}", + Parallelism: 1, }, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, &core.PruneOptions{Targets: []storage.Storage{file.New(*fileTargetURL)}, Retention: "1h"}}, {"config file with filename pattern override", []string{"--config-file", "testdata/pattern.yml", "--port", "3307"}, "", false, core.DumpOptions{ Targets: []storage.Storage{file.New(*fileTargetURL)}, @@ -99,6 +107,7 @@ func TestDumpCmd(t *testing.T) { Compressor: &compression.GzipCompressor{}, DBConn: &database.Connection{Host: "abcd", Port: 3307, User: "user2", Pass: "xxxx2"}, FilenamePattern: "foo_{{ .now }}.{{ .compression }}", + Parallelism: 1, }, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, &core.PruneOptions{Targets: []storage.Storage{file.New(*fileTargetURL)}, Retention: "1h"}}, // timer options @@ -108,6 +117,7 @@ func TestDumpCmd(t *testing.T) { Compressor: &compression.GzipCompressor{}, DBConn: &database.Connection{Host: "abc", Port: defaultPort}, FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}", + Parallelism: 1, }, core.TimerOptions{Once: true, Frequency: defaultFrequency, Begin: defaultBegin}, nil}, {"cron flag", []string{"--server", "abc", "--target", "file:///foo/bar", "--cron", "0 0 * * *"}, "", false, core.DumpOptions{ Targets: []storage.Storage{file.New(*fileTargetURL)}, @@ -115,6 +125,7 @@ func TestDumpCmd(t *testing.T) { Compressor: 
&compression.GzipCompressor{}, DBConn: &database.Connection{Host: "abc", Port: defaultPort}, FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}", + Parallelism: 1, }, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin, Cron: "0 0 * * *"}, nil}, {"begin flag", []string{"--server", "abc", "--target", "file:///foo/bar", "--begin", "1234"}, "", false, core.DumpOptions{ Targets: []storage.Storage{file.New(*fileTargetURL)}, @@ -122,6 +133,7 @@ func TestDumpCmd(t *testing.T) { Compressor: &compression.GzipCompressor{}, DBConn: &database.Connection{Host: "abc", Port: defaultPort}, FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}", + Parallelism: 1, }, core.TimerOptions{Frequency: defaultFrequency, Begin: "1234"}, nil}, {"frequency flag", []string{"--server", "abc", "--target", "file:///foo/bar", "--frequency", "10"}, "", false, core.DumpOptions{ Targets: []storage.Storage{file.New(*fileTargetURL)}, @@ -129,6 +141,7 @@ func TestDumpCmd(t *testing.T) { Compressor: &compression.GzipCompressor{}, DBConn: &database.Connection{Host: "abc", Port: defaultPort}, FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}", + Parallelism: 1, }, core.TimerOptions{Frequency: 10, Begin: defaultBegin}, nil}, {"incompatible flags: once/cron", []string{"--server", "abc", "--target", "file:///foo/bar", "--once", "--cron", "0 0 * * *"}, "", true, core.DumpOptions{}, core.TimerOptions{}, nil}, {"incompatible flags: once/begin", []string{"--server", "abc", "--target", "file:///foo/bar", "--once", "--begin", "1234"}, "", true, core.DumpOptions{}, core.TimerOptions{}, nil}, @@ -146,6 +159,7 @@ func TestDumpCmd(t *testing.T) { DBConn: &database.Connection{Host: "abc", Port: defaultPort}, PreBackupScripts: "/prebackup", FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}", + Parallelism: 1, }, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil}, {"postbackup scripts", []string{"--server", "abc", "--target", "file:///foo/bar", 
"--post-backup-scripts", "/postbackup"}, "", false, core.DumpOptions{ Targets: []storage.Storage{file.New(*fileTargetURL)}, @@ -154,6 +168,7 @@ func TestDumpCmd(t *testing.T) { DBConn: &database.Connection{Host: "abc", Port: defaultPort}, PostBackupScripts: "/postbackup", FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}", + Parallelism: 1, }, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil}, {"prebackup and postbackup scripts", []string{"--server", "abc", "--target", "file:///foo/bar", "--post-backup-scripts", "/postbackup", "--pre-backup-scripts", "/prebackup"}, "", false, core.DumpOptions{ Targets: []storage.Storage{file.New(*fileTargetURL)}, @@ -163,6 +178,7 @@ func TestDumpCmd(t *testing.T) { PreBackupScripts: "/prebackup", PostBackupScripts: "/postbackup", FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}", + Parallelism: 1, }, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil}, } diff --git a/docs/configuration.md b/docs/configuration.md index e6d3c95..69d3584 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -81,6 +81,7 @@ The following are the environment variables, CLI flags and configuration file op | where to put the dump file; see [backup](./backup.md) | BP | `dump --target` | `DB_DUMP_TARGET` | `dump.targets` | | | where the restore file exists; see [restore](./restore.md) | R | `restore --target` | `DB_RESTORE_TARGET` | `restore.target` | | | replace any `:` in the dump filename with `-` | BP | `dump --safechars` | `DB_DUMP_SAFECHARS` | `database.safechars` | `false` | +| How many databases to back up in parallel, uses that number of threads and connections | B | `dump --parallelism` | `DB_DUMP_PARALLELISM` | `dump.parallelism` | `1` | | AWS access key ID, used only if a target does not have one | BRP | `aws-access-key-id` | `AWS_ACCESS_KEY_ID` | `dump.targets[s3-target].accessKeyID` | | | AWS secret access key, used only if a target does not have one | BRP | 
`aws-secret-access-key` | `AWS_SECRET_ACCESS_KEY` | `dump.targets[s3-target].secretAccessKey` | | | AWS default region, used only if a target does not have one | BRP | `aws-region` | `AWS_REGION` | `dump.targets[s3-target].region` | | @@ -144,6 +145,7 @@ for details of each. * `preBackup`: string, path to directory with pre-backup scripts * `postBackup`: string, path to directory with post-backup scripts * `targets`: strings, list of names of known targets, defined in the `targets` section, where to save the backup + * `parallelism`: int, how many databases to back up in parallel * `restore`: the restore configuration * `scripts`: * `preRestore`: string, path to directory with pre-restore scripts diff --git a/go.mod b/go.mod index b778051..c8c431c 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( require ( filippo.io/age v1.2.1 github.com/InfiniteLoopSpace/go_S-MIME v0.0.0-20181221134359-3f58f9a4b2b6 - github.com/databacker/api/go/api v0.0.0-20250423183243-7775066c265e + github.com/databacker/api/go/api v0.0.0-20250818102239-219c793f2151 github.com/google/go-cmp v0.7.0 go.opentelemetry.io/otel v1.31.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 diff --git a/go.sum b/go.sum index 8430ad6..053328f 100644 --- a/go.sum +++ b/go.sum @@ -78,6 +78,8 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/databacker/api/go/api v0.0.0-20250423183243-7775066c265e h1:5K7IbijS9p+dezx9m45CjFCR2Sf6BfT/tb540aEw66k= github.com/databacker/api/go/api v0.0.0-20250423183243-7775066c265e/go.mod h1:bQhbl71Lk1ATni0H+u249hjoQ8ShAdVNcNjnw6z+SbE= +github.com/databacker/api/go/api v0.0.0-20250818102239-219c793f2151 h1:WuQNmzJiLSR0d2IpeifwK0E6eOLZQDxzbuHWIEN2/9U= +github.com/databacker/api/go/api v0.0.0-20250818102239-219c793f2151/go.mod h1:bQhbl71Lk1ATni0H+u249hjoQ8ShAdVNcNjnw6z+SbE= github.com/davecgh/go-spew 
v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/pkg/core/dump.go b/pkg/core/dump.go index 90b7197..9c3f408 100644 --- a/pkg/core/dump.go +++ b/pkg/core/dump.go @@ -41,6 +41,7 @@ func (e *Executor) Dump(ctx context.Context, opts DumpOptions) (DumpResults, err suppressUseDatabase := opts.SuppressUseDatabase maxAllowedPacket := opts.MaxAllowedPacket filenamePattern := opts.FilenamePattern + parallelism := opts.Parallelism logger := e.Logger.WithField("run", opts.Run.String()) logger.Level = e.Logger.Level @@ -112,6 +113,7 @@ func (e *Executor) Dump(ctx context.Context, opts DumpOptions) (DumpResults, err SuppressUseDatabase: suppressUseDatabase, MaxAllowedPacket: maxAllowedPacket, PostDumpDelay: opts.PostDumpDelay, + Parallelism: parallelism, }, dw); err != nil { dbDumpSpan.SetStatus(codes.Error, err.Error()) dbDumpSpan.End() diff --git a/pkg/core/dumpoptions.go b/pkg/core/dumpoptions.go index 1025c09..d59414a 100644 --- a/pkg/core/dumpoptions.go +++ b/pkg/core/dumpoptions.go @@ -29,4 +29,6 @@ type DumpOptions struct { FilenamePattern string // PostDumpDelay inafter each dump is complete, while holding connection open. Do not use outside of tests. PostDumpDelay time.Duration + // Parallelism how many databases to back up at once, consuming that number of threads + Parallelism int } diff --git a/pkg/database/dump.go b/pkg/database/dump.go index e0d034b..8ecd09b 100644 --- a/pkg/database/dump.go +++ b/pkg/database/dump.go @@ -3,6 +3,7 @@ package database import ( "context" "fmt" + "sync" "time" "github.com/databacker/mysql-backup/pkg/database/mysql" @@ -16,6 +17,7 @@ type DumpOpts struct { MaxAllowedPacket int // PostDumpDelay after each dump is complete, while holding connection open. Do not use outside of tests. 
PostDumpDelay time.Duration + Parallelism int } func Dump(ctx context.Context, dbconn *Connection, opts DumpOpts, writers []DumpWriter) error { @@ -31,25 +33,54 @@ func Dump(ctx context.Context, dbconn *Connection, opts DumpOpts, writers []Dump if err != nil { return fmt.Errorf("failed to open connection to database: %v", err) } + + // limit to opts.Parallelism connections + // if none is provided, default to 1, i.e. serial + parallelism := opts.Parallelism + if parallelism == 0 { + parallelism = 1 + } + sem := make(chan struct{}, parallelism) + errCh := make(chan error, len(writers)) + var wg sync.WaitGroup for _, writer := range writers { - for _, schema := range writer.Schemas { - dumper := &mysql.Data{ - Out: writer.Writer, - Connection: db, - Schema: schema, - Host: dbconn.Host, - Compact: opts.Compact, - Triggers: opts.Triggers, - Routines: opts.Routines, - SuppressUseDatabase: opts.SuppressUseDatabase, - MaxAllowedPacket: opts.MaxAllowedPacket, - PostDumpDelay: opts.PostDumpDelay, + sem <- struct{}{} // acquire a slot + wg.Add(1) + go func(writer DumpWriter) { + defer wg.Done() + defer func() { <-sem }() + for _, schema := range writer.Schemas { + dumper := &mysql.Data{ + Out: writer.Writer, + Connection: db, + Schema: schema, + Host: dbconn.Host, + Compact: opts.Compact, + Triggers: opts.Triggers, + Routines: opts.Routines, + SuppressUseDatabase: opts.SuppressUseDatabase, + MaxAllowedPacket: opts.MaxAllowedPacket, + PostDumpDelay: opts.PostDumpDelay, + } + // return on any error + if err := dumper.Dump(); err != nil { + errCh <- fmt.Errorf("failed to dump database %s: %v", schema, err) + return + } } - if err := dumper.Dump(); err != nil { - return fmt.Errorf("failed to dump database %s: %v", schema, err) - } - } + }(writer) } + wg.Wait() + close(errCh) + var errs []error + for err := range errCh { + if err != nil { + errs = append(errs, err) + } + } + if len(errs) > 0 { + return fmt.Errorf("one or more errors occurred: %v", errs) + } return nil } diff 
--git a/test/backup_test.go b/test/backup_test.go index 3e3cd01..5f4c4fd 100644 --- a/test/backup_test.go +++ b/test/backup_test.go @@ -864,6 +864,7 @@ func TestIntegration(t *testing.T) { t.Fatalf("failed to get docker client: %v", err) } t.Run("parallel databases", func(t *testing.T) { + t.Parallel() base := t.TempDir() mysql, err := startDatabase(dc, base, mysqlImage, "mysql-parallel") defer func() { @@ -914,6 +915,7 @@ func TestIntegration(t *testing.T) { t.Fatalf("invalid target url: %v", err) } + parallelism := 4 dumpOptions := core.DumpOptions{ Compressor: &compression.GzipCompressor{}, DBConn: &database.Connection{ @@ -923,7 +925,8 @@ func TestIntegration(t *testing.T) { Port: mysql.port, }, Targets: []storage.Storage{store}, - PostDumpDelay: 5 * time.Second, // for testing only, make them delay 10 seconds + PostDumpDelay: 5 * time.Second, // for testing only, make them delay a few seconds + Parallelism: parallelism, // back up this many databases in parallel } ctx := context.Background() start := time.Now() @@ -964,12 +967,20 @@ func TestIntegration(t *testing.T) { t.Logf("[%s]\tthreads_running=%d\tthreads_connected=%d\topen_user=%d\tactive_user=%d\n", time.Now().Format("15:04:05"), tr, tc, uTotal, uActive) + // threads connected should not be more than our parallelism+1 + if tc > int64(parallelism)+1 { + t.Errorf("too many threads connected: %d (max %d)", tc, parallelism+1) + } + if tc < int64(parallelism) { + t.Errorf("too few threads connected: %d (min %d)", tc, parallelism) + } } } t.Logf("Dump completed at %s in %s", time.Now(), time.Since(start)) }) t.Run("dump", func(t *testing.T) { + t.Parallel() var ( err error smb, mysql containerPort