From 9474f95967a4fa2dfc6b7e0f7b54017870871fd2 Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Tue, 4 Oct 2022 07:42:58 +0300 Subject: [PATCH] Add replicate flags to pump Litestream manually For some special setups it is sometimes useful to run Litestream manually instead of letting it replicate in the background. This commit implements the following flags for replicate: * -once for doing synchronous replication and then exit * -force-snapshot to force a snapshot during -once * -enforce-retention to enforce retention rules during -once Because running once does not respect the snapshot interval the caller is expected to use -force-snapshot and -enforce-retention regularly to ensure the replication targets stay clean. For this to work correctly with a live database it needs to be opened with auto checkpointing disabled and SQLITE_FCNTL_PERSIST_WAL. Other uses include only using -force-snapshot to create regular backups of the database instead of live replication. Fixes #486 --- cmd/litestream/main.go | 8 +-- cmd/litestream/replicate.go | 109 +++++++++++++++++++++++++++++++++--- db.go | 2 +- 3 files changed, 105 insertions(+), 14 deletions(-) diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index 38eb41a5..ba168a0a 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -92,19 +92,19 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) { // Wait for signal to stop program. select { - case err = <-c.execCh: + case err = <-c.runCh: slog.Info("subprocess exited, litestream shutting down") case sig := <-signalCh: slog.Info("signal received, litestream shutting down") - if c.cmd != nil { + if c.runSignal != nil { slog.Info("sending signal to exec process") - if err := c.cmd.Process.Signal(sig); err != nil { + if err := c.runSignal(sig); err != nil { return fmt.Errorf("cannot signal exec process: %w", err) } slog.Info("waiting for exec process to close") - if err := <-c.execCh; err != nil && !strings.HasPrefix(err.Error(), "signal:") { + if err := <-c.runCh; err != nil && !strings.HasPrefix(err.Error(), "signal:") { return fmt.Errorf("cannot wait for exec process: %w", err) } } diff --git a/cmd/litestream/replicate.go b/cmd/litestream/replicate.go index 898e79de..76ccfc00 100644 --- a/cmd/litestream/replicate.go +++ b/cmd/litestream/replicate.go @@ -23,8 +23,12 @@ import ( // ReplicateCommand represents a command that continuously replicates SQLite databases. type ReplicateCommand struct { - cmd *exec.Cmd // subcommand - execCh chan error // subcommand error channel + runSignal func(os.Signal) error // run cancel signaler + runCh chan error // run error channel + + once bool // replicate once and exit + forceSnapshot bool // force snapshot to all replicas + enforceRetention bool // enforce retention of old snapshots Config Config @@ -34,7 +38,7 @@ type ReplicateCommand struct { func NewReplicateCommand() *ReplicateCommand { return &ReplicateCommand{ - execCh: make(chan error), + runCh: make(chan error), } } @@ -42,6 +46,9 @@ func NewReplicateCommand() *ReplicateCommand { func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err error) { fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError) execFlag := fs.String("exec", "", "execute subcommand") + onceFlag := fs.Bool("once", false, "replicate once and exit") + forceSnapshotFlag := fs.Bool("force-snapshot", false, "force snapshot when replicating once") + enforceRetentionFlag := fs.Bool("enforce-retention", false, "enforce retention of old snapshots") configPath, noExpandEnv := registerConfigFlag(fs) fs.Usage = c.Usage if err := fs.Parse(args); err != nil { @@ -79,6 +86,22 @@ func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err e c.Config.Exec = *execFlag } + // Once is mutually exclusive with exec + c.once = *onceFlag + if c.once && c.Config.Exec != "" { + return fmt.Errorf("cannot specify -once flag with exec") + } + + c.forceSnapshot = *forceSnapshotFlag + if !c.once && c.forceSnapshot { + return fmt.Errorf("cannot specify -force-snapshot flag without -once") + } + + c.enforceRetention = *enforceRetentionFlag + if !c.once && c.enforceRetention { + return fmt.Errorf("cannot specify -enforce-retention flag without -once") + } + return nil } @@ -98,6 +121,14 @@ func (c *ReplicateCommand) Run() (err error) { return err } + // Disable monitors if we're running once. + if c.once { + db.MonitorInterval = 0 + for _, r := range db.Replicas { + r.MonitorEnabled = false + } + } + // Open database & attach to program. if err := db.Open(); err != nil { return err @@ -152,14 +183,65 @@ func (c *ReplicateCommand) Run() (err error) { return fmt.Errorf("cannot parse exec command: %w", err) } - c.cmd = exec.Command(execArgs[0], execArgs[1:]...) - c.cmd.Env = os.Environ() - c.cmd.Stdout = os.Stdout - c.cmd.Stderr = os.Stderr - if err := c.cmd.Start(); err != nil { + cmd := exec.Command(execArgs[0], execArgs[1:]...) + cmd.Env = os.Environ() + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + if err := cmd.Start(); err != nil { return fmt.Errorf("cannot start exec command: %w", err) } - go func() { c.execCh <- c.cmd.Wait() }() + c.runSignal = cmd.Process.Signal + go func() { c.runCh <- cmd.Wait() }() + } else if c.once { + // Run replication once for each replica with cancel. + ctx, cancel := context.WithCancel(context.Background()) + c.runSignal = func(s os.Signal) error { + cancel() + return nil + } + + go func() { + var err error + + defer func() { + cancel() + c.runCh <- err + }() + + for _, db := range c.DBs { + if c.forceSnapshot { + // Force next index with RESTART checkpoint. + db.MaxCheckpointPageN = 1 + } + + if err = db.Sync(ctx); err != nil { + return + } + + // Prevent checkpointing on Close() + db.MinCheckpointPageN = 0 + db.MaxCheckpointPageN = 0 + db.TruncatePageN = 0 + db.CheckpointInterval = 0 + + for _, r := range db.Replicas { + if c.forceSnapshot { + _, err = r.Snapshot(ctx) + } else { + err = r.Sync(ctx) + } + if err != nil { + return + } + + if c.enforceRetention { + if err = r.EnforceRetention(ctx); err != nil { + return + } + } + } + } + }() } return nil @@ -202,6 +284,15 @@ Arguments: Executes a subcommand. Litestream will exit when the child process exits. Useful for simple process management. + -once + Execute replication once and exit. + + -force-snapshot + When replicating once, force taking a snapshot to all replicas. + + -enforce-retention + When replicating once, enforce rentention of old snapshots. + -no-expand-env Disables environment variable expansion in configuration file. diff --git a/db.go b/db.go index ba914e17..ae9f4780 100644 --- a/db.go +++ b/db.go @@ -775,7 +775,7 @@ func (db *DB) Sync(ctx context.Context) (err error) { checkpoint, checkpointMode = true, CheckpointModeTruncate } else if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) { checkpoint, checkpointMode = true, CheckpointModeRestart - } else if newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) { + } else if db.MinCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) { checkpoint = true } else if db.CheckpointInterval > 0 && !info.dbModTime.IsZero() && time.Since(info.dbModTime) > db.CheckpointInterval && newWALSize > calcWALSize(db.pageSize, 1) { checkpoint = true