@@ -7,8 +7,11 @@ package pebble
77import (
88 "bytes"
99 "context"
10+ "encoding/binary"
1011 "fmt"
1112 "io"
13+ "math"
14+ "math/rand"
1215 "os"
1316 "path/filepath"
1417 "reflect"
@@ -18,12 +21,15 @@ import (
1821 "sort"
1922 "strconv"
2023 "strings"
24+ "sync"
2125 "sync/atomic"
2226 "syscall"
2327 "testing"
28+ "time"
2429
2530 "github.com/cockroachdb/datadriven"
2631 "github.com/cockroachdb/errors"
32+ "github.com/cockroachdb/metamorphic"
2733 "github.com/cockroachdb/pebble/internal/base"
2834 "github.com/cockroachdb/pebble/internal/cache"
2935 "github.com/cockroachdb/pebble/internal/manifest"
@@ -1525,3 +1531,167 @@ func TestMkdirAllAndSyncParents(t *testing.T) {
15251531 }
15261532 })
15271533}
1534+
1535+ // TestWALFailoverRandomized is a randomzied test exercising recovery in the
1536+ // presence of WAL failover. It repeatedly opens a database, writes a number of
1537+ // batches concurrently and simulates a hard crash using vfs.NewCrashableMem. It
1538+ // ensures that the resulting DB state opens successfully, and the contents of
1539+ // the DB match the expectations based on the keys written.
1540+ //
1541+ // This test is partially a regression test for #3865.
1542+ func TestWALFailoverRandomized (t * testing.T ) {
1543+ seed := time .Now ().UnixNano ()
1544+ t .Logf ("seed %d" , seed )
1545+ mem := vfs .NewCrashableMem ()
1546+ makeOptions := func (mem * vfs.MemFS ) * Options {
1547+ failoverOpts := WALFailoverOptions {
1548+ Secondary : wal.Dir {FS : mem , Dirname : "secondary" },
1549+ FailoverOptions : wal.FailoverOptions {
1550+ PrimaryDirProbeInterval : time .Microsecond ,
1551+ HealthyProbeLatencyThreshold : 20 * time .Microsecond ,
1552+ HealthyInterval : 10 * time .Microsecond ,
1553+ UnhealthySamplingInterval : time .Microsecond ,
1554+ UnhealthyOperationLatencyThreshold : func () (time.Duration , bool ) {
1555+ return 10 * time .Microsecond , true
1556+ },
1557+ ElevatedWriteStallThresholdLag : 50 * time .Microsecond ,
1558+ },
1559+ }
1560+
1561+ mean := time .Duration (rand .ExpFloat64 () * float64 (time .Microsecond ))
1562+ p := rand .Float64 ()
1563+ t .Logf ("Injecting mean %s of latency with p=%.3f" , mean , p )
1564+ fs := errorfs .Wrap (mem , errorfs .RandomLatency (errorfs .Randomly (p , seed ), mean , seed , time .Second ))
1565+ return & Options {
1566+ FS : fs ,
1567+ FormatMajorVersion : internalFormatNewest ,
1568+ Logger : testLogger {t },
1569+ MemTableSize : 128 << 10 , // 128 KiB
1570+ MemTableStopWritesThreshold : 4 ,
1571+ WALFailover : & failoverOpts ,
1572+ }
1573+ }
1574+
1575+ // KV state tracking.
1576+ //
1577+ // This test uses all uint16 big-endian integers as a keyspace. Values are
1578+ // randomly sized but always contain the key in the first two bytes. We
1579+ // track the state of all KVs throughout the test (whether they're
1580+ // definitely set, maybe set or definitely unset).
1581+ //
1582+ // Note that the test may wrap around to the beginning of the keyspace. This
1583+ // may cause KVs left at kvMaybeSet to be written and be definitively set
1584+ // the second time around.
1585+ type kvState int8
1586+ const (
1587+ kvUnset kvState = 0
1588+ kvMaybeSet kvState = 1
1589+ kvSet kvState = 2
1590+ )
1591+ const keyspaceSize = math .MaxUint16 + 1
1592+ var kvs struct {
1593+ sync.Mutex
1594+ states [keyspaceSize ]kvState
1595+ count uint64 // [0, math.MaxUint16]; INVARIANT: states[count:] all zeroes
1596+ crashing bool
1597+ }
1598+ setIsCrashing := func (crashing bool ) {
1599+ kvs .Lock ()
1600+ defer kvs .Unlock ()
1601+ kvs .crashing = crashing
1602+ }
1603+ // transitionState is called by goroutines responsible for committing
1604+ // batches to the engine. Note that 'i' is the index of the KV before
1605+ // wrapping around and needs to be modded by math.MaxUint16.
1606+ transitionState := func (i , count uint64 , state kvState ) {
1607+ kvs .Lock ()
1608+ defer kvs .Unlock ()
1609+ if kvs .crashing && state == kvSet {
1610+ // We're racing with a CrashClone call and it's indeterminate
1611+ // whether what we think we synced actually made the cut. Leave the
1612+ // kvs at the kvMaybeSet.
1613+ state = kvMaybeSet
1614+ }
1615+ for j := uint64 (0 ); j < count ; j ++ {
1616+ idx := (i + j ) % keyspaceSize
1617+ kvs .states [idx ] = max (kvs .states [idx ], state )
1618+ }
1619+ kvs .count = max (kvs .count , i + count , math .MaxUint16 )
1620+ }
1621+ // validateState is called on recovery to ensure that engine state agrees
1622+ // with the tracked KV state.
1623+ validateState := func (d * DB ) {
1624+ it , err := d .NewIter (nil )
1625+ require .NoError (t , err )
1626+ valid := it .First ()
1627+ for i := 0 ; i < int (kvs .count ); i ++ {
1628+ var kvIsSet bool
1629+ if valid {
1630+ require .Len (t , it .Key (), 2 )
1631+ require .Equal (t , it .Key (), it .Value ()[:2 ])
1632+ kvIsSet = binary .BigEndian .Uint16 (it .Key ()) == uint16 (i )
1633+ }
1634+ if kvIsSet && kvs .states [i ] == kvUnset {
1635+ t .Fatalf ("key %04x is set; state says it should be unset" , i )
1636+ } else if ! kvIsSet && kvs .states [i ] == kvSet {
1637+ t .Fatalf ("key %04x is unset; state says it should be set" , i )
1638+ }
1639+ if kvIsSet {
1640+ valid = it .Next ()
1641+ }
1642+ }
1643+ require .NoError (t , it .Close ())
1644+ }
1645+
1646+ d , err := Open ("primary" , makeOptions (mem ))
1647+ require .NoError (t , err )
1648+ rng := rand .New (rand .NewSource (seed ))
1649+ var wg sync.WaitGroup
1650+ var n uint64
1651+ randomOps := metamorphic.Weighted [func ()]{
1652+ {Weight : 1 , Item : func () {
1653+ time .Sleep (time .Microsecond * time .Duration (rand .Intn (30 )))
1654+ t .Log ("initiating hard crash" )
1655+ setIsCrashing (true )
1656+ // Take a crash-consistent clone of the filesystem and use that going forward.
1657+ mem = mem .CrashClone (vfs.CrashCloneCfg {UnsyncedDataPercent : 50 , RNG : rng })
1658+ wg .Wait () // Wait for outstanding batch commits to finish.
1659+ _ = d .Close ()
1660+ d , err = Open ("primary" , makeOptions (mem ))
1661+ require .NoError (t , err )
1662+ validateState (d )
1663+ setIsCrashing (false )
1664+ }},
1665+ {Weight : 20 , Item : func () {
1666+ count := rng .Intn (14 ) + 1
1667+ var k [2 ]byte
1668+ var v [4096 ]byte
1669+ b := d .NewBatch ()
1670+ for i := 0 ; i < count ; i ++ {
1671+ j := uint16 ((n + uint64 (i )) % keyspaceSize )
1672+ binary .BigEndian .PutUint16 (k [:], j )
1673+ vn := max (rng .Intn (cap (v )), 2 )
1674+ binary .BigEndian .PutUint16 (v [:], j )
1675+ require .NoError (t , b .Set (k [:], v [:vn ], nil ))
1676+ }
1677+ maybeSync := NoSync
1678+ if rng .Intn (2 ) == 1 {
1679+ maybeSync = Sync
1680+ }
1681+ wg .Add (1 )
1682+ go func (n , count uint64 ) {
1683+ defer wg .Done ()
1684+ transitionState (n , count , kvMaybeSet )
1685+ require .NoError (t , b .Commit (maybeSync ))
1686+ if maybeSync == Sync {
1687+ transitionState (n , count , kvSet )
1688+ }
1689+ }(n , uint64 (count ))
1690+ n += uint64 (count )
1691+ }},
1692+ }
1693+ nextRandomOp := randomOps .RandomDeck (rng )
1694+ for o := 0 ; o < 1000 ; o ++ {
1695+ nextRandomOp ()()
1696+ }
1697+ }
0 commit comments