Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ push: build
docker push $(TARGET)

integration_test:
TEST_INTEGRATION=true go test -v ./test
TEST_INTEGRATION=true go test -v ./test -test.timeout=20m

integration_test_debug:
TEST_INTEGRATION=true dlv --wd=./test test ./test
Expand Down
11 changes: 11 additions & 0 deletions cmd/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ func dumpCmd(passedExecs execs, cmdConfig *cmdConfiguration) (*cobra.Command, er
if len(exclude) == 0 {
exclude = nil
}

// how many databases to back up in parallel
parallel := v.GetInt("parallelism")
if !v.IsSet("parallelism") && dumpConfig != nil && dumpConfig.Parallelism != nil {
parallel = *dumpConfig.Parallelism
}

preBackupScripts := v.GetString("pre-backup-scripts")
if preBackupScripts == "" && scriptsConfig != nil && scriptsConfig.PreBackup != nil {
preBackupScripts = *scriptsConfig.PreBackup
Expand Down Expand Up @@ -256,6 +263,7 @@ func dumpCmd(passedExecs execs, cmdConfig *cmdConfiguration) (*cobra.Command, er
MaxAllowedPacket: maxAllowedPacket,
Run: uid,
FilenamePattern: filenamePattern,
Parallelism: parallel,
}
_, err := executor.Dump(tracerCtx, dumpOpts)
if err != nil {
Expand Down Expand Up @@ -311,6 +319,9 @@ S3: If it is a URL of the format s3://bucketname/path then it will connect via S
// once
flags.Bool("once", false, "Override all other settings and run the dump once immediately and exit. Useful if you use an external scheduler (e.g. as part of an orchestration solution like Cattle or Docker Swarm or [kubernetes cron jobs](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/)) and don't want the container to do the scheduling internally.")

// parallelism - how many databases (and therefore connections) to back up at once
flags.Int("parallelism", 1, "How many databases to back up in parallel.")

// safechars
flags.Bool("safechars", false, "The dump filename usually includes the character `:` in the date, to comply with RFC3339. Some systems and shells don't like that character. If true, will replace all `:` with `-`.")

Expand Down
16 changes: 16 additions & 0 deletions cmd/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,31 @@ func TestDumpCmd(t *testing.T) {
Compressor: &compression.GzipCompressor{},
DBConn: &database.Connection{Host: "abc", Port: defaultPort},
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
Parallelism: 1,
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil},
{"file URL with pass-file", []string{"--server", "abc", "--target", "file:///foo/bar", "--pass-file", "testdata/password.txt"}, "", false, core.DumpOptions{
Targets: []storage.Storage{file.New(*fileTargetURL)},
MaxAllowedPacket: defaultMaxAllowedPacket,
Compressor: &compression.GzipCompressor{},
DBConn: &database.Connection{Host: "abc", Port: defaultPort, Pass: "testpassword"},
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
Parallelism: 1,
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil},
{"file URL with pass and pass-file (pass takes precedence)", []string{"--server", "abc", "--target", "file:///foo/bar", "--pass", "explicitpass", "--pass-file", "testdata/password.txt"}, "", false, core.DumpOptions{
Targets: []storage.Storage{file.New(*fileTargetURL)},
MaxAllowedPacket: defaultMaxAllowedPacket,
Compressor: &compression.GzipCompressor{},
DBConn: &database.Connection{Host: "abc", Port: defaultPort, Pass: "explicitpass"},
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
Parallelism: 1,
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil},
{"file URL with prune", []string{"--server", "abc", "--target", "file:///foo/bar", "--retention", "1h"}, "", false, core.DumpOptions{
Targets: []storage.Storage{file.New(*fileTargetURL)},
MaxAllowedPacket: defaultMaxAllowedPacket,
Compressor: &compression.GzipCompressor{},
DBConn: &database.Connection{Host: "abc", Port: defaultPort},
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
Parallelism: 1,
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, &core.PruneOptions{Targets: []storage.Storage{file.New(*fileTargetURL)}, Retention: "1h"}},

// database name and port
Expand All @@ -69,13 +73,15 @@ func TestDumpCmd(t *testing.T) {
Compressor: &compression.GzipCompressor{},
DBConn: &database.Connection{Host: "abc", Port: defaultPort},
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
Parallelism: 1,
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil},
{"database explicit name with explicit port", []string{"--server", "abc", "--port", "3307", "--target", "file:///foo/bar"}, "", false, core.DumpOptions{
Targets: []storage.Storage{file.New(*fileTargetURL)},
MaxAllowedPacket: defaultMaxAllowedPacket,
Compressor: &compression.GzipCompressor{},
DBConn: &database.Connection{Host: "abc", Port: 3307},
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
Parallelism: 1,
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil},

// config file
Expand All @@ -85,20 +91,23 @@ func TestDumpCmd(t *testing.T) {
Compressor: &compression.GzipCompressor{},
DBConn: &database.Connection{Host: "abcd", Port: 3306, User: "user2", Pass: "xxxx2"},
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
Parallelism: 1,
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, &core.PruneOptions{Targets: []storage.Storage{file.New(*fileTargetURL)}, Retention: "1h"}},
{"config file with port override", []string{"--config-file", "testdata/config.yml", "--port", "3307"}, "", false, core.DumpOptions{
Targets: []storage.Storage{file.New(*fileTargetURL)},
MaxAllowedPacket: defaultMaxAllowedPacket,
Compressor: &compression.GzipCompressor{},
DBConn: &database.Connection{Host: "abcd", Port: 3307, User: "user2", Pass: "xxxx2"},
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
Parallelism: 1,
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, &core.PruneOptions{Targets: []storage.Storage{file.New(*fileTargetURL)}, Retention: "1h"}},
{"config file with filename pattern override", []string{"--config-file", "testdata/pattern.yml", "--port", "3307"}, "", false, core.DumpOptions{
Targets: []storage.Storage{file.New(*fileTargetURL)},
MaxAllowedPacket: defaultMaxAllowedPacket,
Compressor: &compression.GzipCompressor{},
DBConn: &database.Connection{Host: "abcd", Port: 3307, User: "user2", Pass: "xxxx2"},
FilenamePattern: "foo_{{ .now }}.{{ .compression }}",
Parallelism: 1,
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, &core.PruneOptions{Targets: []storage.Storage{file.New(*fileTargetURL)}, Retention: "1h"}},

// timer options
Expand All @@ -108,27 +117,31 @@ func TestDumpCmd(t *testing.T) {
Compressor: &compression.GzipCompressor{},
DBConn: &database.Connection{Host: "abc", Port: defaultPort},
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
Parallelism: 1,
}, core.TimerOptions{Once: true, Frequency: defaultFrequency, Begin: defaultBegin}, nil},
{"cron flag", []string{"--server", "abc", "--target", "file:///foo/bar", "--cron", "0 0 * * *"}, "", false, core.DumpOptions{
Targets: []storage.Storage{file.New(*fileTargetURL)},
MaxAllowedPacket: defaultMaxAllowedPacket,
Compressor: &compression.GzipCompressor{},
DBConn: &database.Connection{Host: "abc", Port: defaultPort},
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
Parallelism: 1,
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin, Cron: "0 0 * * *"}, nil},
{"begin flag", []string{"--server", "abc", "--target", "file:///foo/bar", "--begin", "1234"}, "", false, core.DumpOptions{
Targets: []storage.Storage{file.New(*fileTargetURL)},
MaxAllowedPacket: defaultMaxAllowedPacket,
Compressor: &compression.GzipCompressor{},
DBConn: &database.Connection{Host: "abc", Port: defaultPort},
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
Parallelism: 1,
}, core.TimerOptions{Frequency: defaultFrequency, Begin: "1234"}, nil},
{"frequency flag", []string{"--server", "abc", "--target", "file:///foo/bar", "--frequency", "10"}, "", false, core.DumpOptions{
Targets: []storage.Storage{file.New(*fileTargetURL)},
MaxAllowedPacket: defaultMaxAllowedPacket,
Compressor: &compression.GzipCompressor{},
DBConn: &database.Connection{Host: "abc", Port: defaultPort},
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
Parallelism: 1,
}, core.TimerOptions{Frequency: 10, Begin: defaultBegin}, nil},
{"incompatible flags: once/cron", []string{"--server", "abc", "--target", "file:///foo/bar", "--once", "--cron", "0 0 * * *"}, "", true, core.DumpOptions{}, core.TimerOptions{}, nil},
{"incompatible flags: once/begin", []string{"--server", "abc", "--target", "file:///foo/bar", "--once", "--begin", "1234"}, "", true, core.DumpOptions{}, core.TimerOptions{}, nil},
Expand All @@ -146,6 +159,7 @@ func TestDumpCmd(t *testing.T) {
DBConn: &database.Connection{Host: "abc", Port: defaultPort},
PreBackupScripts: "/prebackup",
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
Parallelism: 1,
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil},
{"postbackup scripts", []string{"--server", "abc", "--target", "file:///foo/bar", "--post-backup-scripts", "/postbackup"}, "", false, core.DumpOptions{
Targets: []storage.Storage{file.New(*fileTargetURL)},
Expand All @@ -154,6 +168,7 @@ func TestDumpCmd(t *testing.T) {
DBConn: &database.Connection{Host: "abc", Port: defaultPort},
PostBackupScripts: "/postbackup",
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
Parallelism: 1,
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil},
{"prebackup and postbackup scripts", []string{"--server", "abc", "--target", "file:///foo/bar", "--post-backup-scripts", "/postbackup", "--pre-backup-scripts", "/prebackup"}, "", false, core.DumpOptions{
Targets: []storage.Storage{file.New(*fileTargetURL)},
Expand All @@ -163,6 +178,7 @@ func TestDumpCmd(t *testing.T) {
PreBackupScripts: "/prebackup",
PostBackupScripts: "/postbackup",
FilenamePattern: "db_backup_{{ .now }}.{{ .compression }}",
Parallelism: 1,
}, core.TimerOptions{Frequency: defaultFrequency, Begin: defaultBegin}, nil},
}

Expand Down
2 changes: 2 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ The following are the environment variables, CLI flags and configuration file op
| where to put the dump file; see [backup](./backup.md) | BP | `dump --target` | `DB_DUMP_TARGET` | `dump.targets` | |
| where the restore file exists; see [restore](./restore.md) | R | `restore --target` | `DB_RESTORE_TARGET` | `restore.target` | |
| replace any `:` in the dump filename with `-` | BP | `dump --safechars` | `DB_DUMP_SAFECHARS` | `database.safechars` | `false` |
| how many databases to back up in parallel; uses that number of threads and connections | B | `dump --parallelism` | `DB_DUMP_PARALLELISM` | `dump.parallelism` | `1` |
| AWS access key ID, used only if a target does not have one | BRP | `aws-access-key-id` | `AWS_ACCESS_KEY_ID` | `dump.targets[s3-target].accessKeyID` | |
| AWS secret access key, used only if a target does not have one | BRP | `aws-secret-access-key` | `AWS_SECRET_ACCESS_KEY` | `dump.targets[s3-target].secretAccessKey` | |
| AWS default region, used only if a target does not have one | BRP | `aws-region` | `AWS_REGION` | `dump.targets[s3-target].region` | |
Expand Down Expand Up @@ -144,6 +145,7 @@ for details of each.
* `preBackup`: string, path to directory with pre-backup scripts
* `postBackup`: string, path to directory with post-backup scripts
* `targets`: strings, list of names of known targets, defined in the `targets` section, where to save the backup
* `parallelism`: int, how many databases to back up in parallel
* `restore`: the restore configuration
* `scripts`:
* `preRestore`: string, path to directory with pre-restore scripts
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
require (
filippo.io/age v1.2.1
github.com/InfiniteLoopSpace/go_S-MIME v0.0.0-20181221134359-3f58f9a4b2b6
github.com/databacker/api/go/api v0.0.0-20250423183243-7775066c265e
github.com/databacker/api/go/api v0.0.0-20250818102239-219c793f2151
github.com/google/go-cmp v0.7.0
go.opentelemetry.io/otel v1.31.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/databacker/api/go/api v0.0.0-20250423183243-7775066c265e h1:5K7IbijS9p+dezx9m45CjFCR2Sf6BfT/tb540aEw66k=
github.com/databacker/api/go/api v0.0.0-20250423183243-7775066c265e/go.mod h1:bQhbl71Lk1ATni0H+u249hjoQ8ShAdVNcNjnw6z+SbE=
github.com/databacker/api/go/api v0.0.0-20250818102239-219c793f2151 h1:WuQNmzJiLSR0d2IpeifwK0E6eOLZQDxzbuHWIEN2/9U=
github.com/databacker/api/go/api v0.0.0-20250818102239-219c793f2151/go.mod h1:bQhbl71Lk1ATni0H+u249hjoQ8ShAdVNcNjnw6z+SbE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
2 changes: 2 additions & 0 deletions pkg/core/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (e *Executor) Dump(ctx context.Context, opts DumpOptions) (DumpResults, err
suppressUseDatabase := opts.SuppressUseDatabase
maxAllowedPacket := opts.MaxAllowedPacket
filenamePattern := opts.FilenamePattern
parallelism := opts.Parallelism
logger := e.Logger.WithField("run", opts.Run.String())
logger.Level = e.Logger.Level

Expand Down Expand Up @@ -112,6 +113,7 @@ func (e *Executor) Dump(ctx context.Context, opts DumpOptions) (DumpResults, err
SuppressUseDatabase: suppressUseDatabase,
MaxAllowedPacket: maxAllowedPacket,
PostDumpDelay: opts.PostDumpDelay,
Parallelism: parallelism,
}, dw); err != nil {
dbDumpSpan.SetStatus(codes.Error, err.Error())
dbDumpSpan.End()
Expand Down
2 changes: 2 additions & 0 deletions pkg/core/dumpoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ type DumpOptions struct {
FilenamePattern string
// PostDumpDelay after each dump is complete, while holding connection open. Do not use outside of tests.
PostDumpDelay time.Duration
// Parallelism is how many databases to back up at once, consuming that number of threads
Parallelism int
}
63 changes: 47 additions & 16 deletions pkg/database/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package database
import (
"context"
"fmt"
"sync"
"time"

"github.com/databacker/mysql-backup/pkg/database/mysql"
Expand All @@ -16,6 +17,7 @@ type DumpOpts struct {
MaxAllowedPacket int
// PostDumpDelay after each dump is complete, while holding connection open. Do not use outside of tests.
PostDumpDelay time.Duration
Parallelism int
}

func Dump(ctx context.Context, dbconn *Connection, opts DumpOpts, writers []DumpWriter) error {
Expand All @@ -31,25 +33,54 @@ func Dump(ctx context.Context, dbconn *Connection, opts DumpOpts, writers []Dump
if err != nil {
return fmt.Errorf("failed to open connection to database: %v", err)
}

// limit to opts.Parallelism connections
// if none is provided, default to 1, i.e. serial
parallelism := opts.Parallelism
if parallelism == 0 {
parallelism = 1
}
sem := make(chan struct{}, parallelism)
errCh := make(chan error, len(writers))
var wg sync.WaitGroup
for _, writer := range writers {
for _, schema := range writer.Schemas {
dumper := &mysql.Data{
Out: writer.Writer,
Connection: db,
Schema: schema,
Host: dbconn.Host,
Compact: opts.Compact,
Triggers: opts.Triggers,
Routines: opts.Routines,
SuppressUseDatabase: opts.SuppressUseDatabase,
MaxAllowedPacket: opts.MaxAllowedPacket,
PostDumpDelay: opts.PostDumpDelay,
sem <- struct{}{} // acquire a slot
wg.Add(1)
go func(writer DumpWriter) {
defer wg.Done()
defer func() { <-sem }()
for _, schema := range writer.Schemas {
dumper := &mysql.Data{
Out: writer.Writer,
Connection: db,
Schema: schema,
Host: dbconn.Host,
Compact: opts.Compact,
Triggers: opts.Triggers,
Routines: opts.Routines,
SuppressUseDatabase: opts.SuppressUseDatabase,
MaxAllowedPacket: opts.MaxAllowedPacket,
PostDumpDelay: opts.PostDumpDelay,
}
// return on any error
if err := dumper.Dump(); err != nil {
errCh <- fmt.Errorf("failed to dump database %s: %v", schema, err)
return
}
}
if err := dumper.Dump(); err != nil {
return fmt.Errorf("failed to dump database %s: %v", schema, err)
}
}
}(writer)
}
wg.Wait()
close(errCh)

var errs []error
for err := range errCh {
if err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return fmt.Errorf("one or more errors occurred: %v", errs)
}
return nil
}
13 changes: 12 additions & 1 deletion test/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,7 @@ func TestIntegration(t *testing.T) {
t.Fatalf("failed to get docker client: %v", err)
}
t.Run("parallel databases", func(t *testing.T) {
t.Parallel()
base := t.TempDir()
mysql, err := startDatabase(dc, base, mysqlImage, "mysql-parallel")
defer func() {
Expand Down Expand Up @@ -914,6 +915,7 @@ func TestIntegration(t *testing.T) {
t.Fatalf("invalid target url: %v", err)
}

parallelism := 4
dumpOptions := core.DumpOptions{
Compressor: &compression.GzipCompressor{},
DBConn: &database.Connection{
Expand All @@ -923,7 +925,8 @@ func TestIntegration(t *testing.T) {
Port: mysql.port,
},
Targets: []storage.Storage{store},
PostDumpDelay: 5 * time.Second, // for testing only, make them delay 10 seconds
PostDumpDelay: 5 * time.Second, // for testing only, make them delay a few seconds
Parallelism: parallelism, // set
}
ctx := context.Background()
start := time.Now()
Expand Down Expand Up @@ -964,12 +967,20 @@ func TestIntegration(t *testing.T) {
t.Logf("[%s]\tthreads_running=%d\tthreads_connected=%d\topen_user=%d\tactive_user=%d\n",
time.Now().Format("15:04:05"),
tr, tc, uTotal, uActive)
// threads connected should not be more than our parallel+1
if tc > int64(parallelism)+1 {
t.Errorf("too many threads connected: %d (max %d)", tc, parallelism+1)
}
if tc < int64(parallelism) {
t.Errorf("too few threads connected: %d (min %d)", tc, parallelism)
}

}
}
t.Logf("Dump completed at %s in %s", time.Now(), time.Since(start))
})
t.Run("dump", func(t *testing.T) {
t.Parallel()
var (
err error
smb, mysql containerPort
Expand Down