Skip to content

Commit

Permalink
Refactor shadow WAL to use segments
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Jul 23, 2021
1 parent fc897b4 commit 77274ab
Show file tree
Hide file tree
Showing 14 changed files with 1,021 additions and 832 deletions.
6 changes: 5 additions & 1 deletion cmd/litestream/main.go
Expand Up @@ -9,6 +9,7 @@ import (
"log"
"net/url"
"os"
"os/signal"
"os/user"
"path"
"path/filepath"
Expand Down Expand Up @@ -86,14 +87,17 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {

// Setup signal handler.
ctx, cancel := context.WithCancel(ctx)
signalCh := signalChan()
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, notifySignals...)

if err := c.Run(ctx); err != nil {
return err
}

// Wait for signal to stop program.
select {
case <-ctx.Done():
fmt.Println("context done, litestream shutting down")
case err = <-c.execCh:
cancel()
fmt.Println("subprocess exited, litestream shutting down")
Expand Down
7 changes: 1 addition & 6 deletions cmd/litestream/main_notwindows.go
Expand Up @@ -5,7 +5,6 @@ package main
import (
"context"
"os"
"os/signal"
"syscall"
)

Expand All @@ -19,8 +18,4 @@ func runWindowsService(ctx context.Context) error {
panic("cannot run windows service as unix process")
}

func signalChan() <-chan os.Signal {
ch := make(chan os.Signal, 2)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
return ch
}
var notifySignals = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
6 changes: 6 additions & 0 deletions cmd/litestream/main_test.go
Expand Up @@ -2,16 +2,22 @@ package main_test

import (
"io/ioutil"
"log"
"os"
"path/filepath"
"testing"

"github.com/benbjohnson/litestream"
main "github.com/benbjohnson/litestream/cmd/litestream"
"github.com/benbjohnson/litestream/file"
"github.com/benbjohnson/litestream/gcs"
"github.com/benbjohnson/litestream/s3"
)

func init() {
litestream.LogFlags = log.Lmsgprefix | log.Ldate | log.Ltime | log.Lmicroseconds | log.LUTC | log.Lshortfile
}

func TestReadConfigFile(t *testing.T) {
// Ensure global AWS settings are propagated down to replica configurations.
t.Run("PropagateGlobalSettings", func(t *testing.T) {
Expand Down
7 changes: 1 addition & 6 deletions cmd/litestream/main_windows.go
Expand Up @@ -7,7 +7,6 @@ import (
"io"
"log"
"os"
"os/signal"

"golang.org/x/sys/windows"
"golang.org/x/sys/windows/svc"
Expand Down Expand Up @@ -105,8 +104,4 @@ func (w *eventlogWriter) Write(p []byte) (n int, err error) {
return 0, elog.Info(1, string(p))
}

func signalChan() <-chan os.Signal {
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
return ch
}
var notifySignals = []os.Signal{os.Interrupt}
14 changes: 0 additions & 14 deletions cmd/litestream/replicate.go
Expand Up @@ -42,7 +42,6 @@ func NewReplicateCommand() *ReplicateCommand {
func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err error) {
fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError)
execFlag := fs.String("exec", "", "execute subcommand")
tracePath := fs.String("trace", "", "trace path")
configPath, noExpandEnv := registerConfigFlag(fs)
fs.Usage = c.Usage
if err := fs.Parse(args); err != nil {
Expand Down Expand Up @@ -80,16 +79,6 @@ func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err e
c.Config.Exec = *execFlag
}

// Enable trace logging.
if *tracePath != "" {
f, err := os.Create(*tracePath)
if err != nil {
return err
}
defer f.Close()
litestream.Tracef = log.New(f, "", log.LstdFlags|log.Lmicroseconds|log.LUTC|log.Lshortfile).Printf
}

return nil
}

Expand Down Expand Up @@ -215,8 +204,5 @@ Arguments:
-no-expand-env
Disables environment variable expansion in configuration file.
-trace PATH
Write verbose trace logging to PATH.
`[1:], DefaultConfigPath())
}
135 changes: 135 additions & 0 deletions cmd/litestream/replicate_test.go
@@ -0,0 +1,135 @@
package main_test

import (
"context"
"database/sql"
"errors"
"fmt"
"hash/crc64"
"io"
"os"
"path/filepath"
"runtime"
"testing"
"time"

main "github.com/benbjohnson/litestream/cmd/litestream"
"golang.org/x/sync/errgroup"
)

func TestReplicateCommand(t *testing.T) {
if testing.Short() {
t.Skip("long running test, skipping")
} else if runtime.GOOS != "linux" {
t.Skip("must run system tests on Linux, skipping")
}

const writeTime = 10 * time.Second

dir := t.TempDir()
configPath := filepath.Join(dir, "litestream.yml")
dbPath := filepath.Join(dir, "db")
restorePath := filepath.Join(dir, "restored")
replicaPath := filepath.Join(dir, "replica")

if err := os.WriteFile(configPath, []byte(`
dbs:
- path: `+dbPath+`
replicas:
- path: `+replicaPath+`
`), 0666); err != nil {
t.Fatal(err)
}

// Generate data into SQLite database from separate goroutine.
g, ctx := errgroup.WithContext(context.Background())
mainctx, cancel := context.WithCancel(ctx)
g.Go(func() error {
defer cancel()

db, err := sql.Open("sqlite3", dbPath)
if err != nil {
return err
}
defer db.Close()

if _, err := db.ExecContext(ctx, `PRAGMA journal_mode = WAL`); err != nil {
return fmt.Errorf("cannot enable wal: %w", err)
} else if _, err := db.ExecContext(ctx, `PRAGMA synchronous = NORMAL`); err != nil {
return fmt.Errorf("cannot enable wal: %w", err)
} else if _, err := db.ExecContext(ctx, `CREATE TABLE t (id INTEGER PRIMARY KEY)`); err != nil {
return fmt.Errorf("cannot create table: %w", err)
}

ticker := time.NewTicker(1 * time.Millisecond)
defer ticker.Stop()
timer := time.NewTimer(writeTime)
defer timer.Stop()

for i := 0; ; i++ {
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
case <-ticker.C:
if _, err := db.ExecContext(ctx, `INSERT INTO t (id) VALUES (?);`, i); err != nil {
return fmt.Errorf("cannot insert: i=%d err=%w", i, err)
}
}
}
})

// Replicate database unless the context is canceled.
g.Go(func() error {
return main.NewMain().Run(mainctx, []string{"replicate", "-config", configPath})
})

if err := g.Wait(); err != nil {
t.Fatal(err)
}

// Checkpoint database.
mustCheckpoint(t, dbPath)
chksum0 := mustChecksum(t, dbPath)

// Restore to another path.
if err := main.NewMain().Run(context.Background(), []string{"restore", "-config", configPath, "-o", restorePath, dbPath}); err != nil && !errors.Is(err, context.Canceled) {
t.Fatal(err)
}

// Verify contents match.
if chksum1 := mustChecksum(t, restorePath); chksum0 != chksum1 {
t.Fatal("restore mismatch")
}
}

func mustCheckpoint(tb testing.TB, path string) {
tb.Helper()

db, err := sql.Open("sqlite3", path)
if err != nil {
tb.Fatal(err)
}
defer db.Close()

if _, err := db.Exec(`PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
tb.Fatal(err)
}
}

func mustChecksum(tb testing.TB, path string) uint64 {
tb.Helper()

f, err := os.Open(path)
if err != nil {
tb.Fatal(err)
}
defer f.Close()

h := crc64.New(crc64.MakeTable(crc64.ISO))
if _, err := io.Copy(h, f); err != nil {
tb.Fatal(err)
}
return h.Sum64()
}

0 comments on commit 77274ab

Please sign in to comment.