Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 86 additions & 10 deletions internal/raftengine/etcd/wal_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package etcd

import (
"bytes"
"io"
"os"
"path/filepath"

Expand Down Expand Up @@ -106,24 +107,28 @@ func bootstrapWalState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, f
}

func loadWalState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, fsm StateMachine) (*diskState, error) {
// Scope the repair retry tightly to WAL-only reads: both
// loadPersistedSnapshot (scans WAL via wal.ValidSnapshotEntries)
// and openAndReadWAL's ReadAll can surface io.ErrUnexpectedEOF
// when the kernel OOM-killer SIGKILLed the process mid-WAL-write.
// wal.Repair truncates the partial trailing record once and is
// idempotent. FSM snapshot restore is kept out of this retry —
// a truncated .fsm payload surfacing ErrUnexpectedEOF is a
// different failure mode (the FSM snapshotter has its own
// on-disk CRC footer) and wal.Repair does not address it;
// running repair in that case would dirty a perfectly-good WAL.
snapshotter := snap.New(logger, snapDir)
snapshot, err := loadPersistedSnapshot(logger, walDir, snapshotter)
snapshot, err := loadPersistedSnapshotWithRepair(logger, walDir, snapshotter)
if err != nil {
return nil, err
}
if err := restoreSnapshotState(fsm, snapshot, fsmSnapDir); err != nil {
return nil, err
}

w, err := wal.Open(logger, walDir, walSnapshotFor(snapshot))
if err != nil {
return nil, errors.WithStack(err)
}

_, hardState, entries, err := w.ReadAll()
w, hardState, entries, err := openAndReadWALWithRepair(logger, walDir, walSnapshotFor(snapshot))
if err != nil {
_ = w.Close()
return nil, errors.WithStack(err)
return nil, err
}

storage, err := newMemoryStorage(persistedState{
Expand All @@ -132,7 +137,12 @@ func loadWalState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, fsm St
Entries: entries,
})
if err != nil {
_ = w.Close()
if closeErr := w.Close(); closeErr != nil {
logger.Warn("WAL close failed after storage init error",
zap.String("dir", walDir),
zap.Error(closeErr),
)
}
return nil, err
}

Expand All @@ -143,6 +153,72 @@ func loadWalState(logger *zap.Logger, walDir, snapDir, fsmSnapDir string, fsm St
}, nil
}

// loadPersistedSnapshotWithRepair wraps loadPersistedSnapshot with one
// wal.Repair attempt on io.ErrUnexpectedEOF. The caller passes in a
// shared snapshotter so loadWalState does not instantiate snap.New
// twice per open.
func loadPersistedSnapshotWithRepair(logger *zap.Logger, walDir string, snapshotter *snap.Snapshotter) (raftpb.Snapshot, error) {
snapshot, err := loadPersistedSnapshot(logger, walDir, snapshotter)
if err == nil || !errors.Is(err, io.ErrUnexpectedEOF) {
return snapshot, err
}
logger.Warn("WAL tail truncated during snapshot scan, repairing",
zap.String("dir", walDir),
zap.Error(err),
)
if !wal.Repair(logger, walDir) {
return raftpb.Snapshot{}, errors.Wrap(err, "WAL unrepairable")
}
snapshot, err = loadPersistedSnapshot(logger, walDir, snapshotter)
if err != nil {
return raftpb.Snapshot{}, errors.Wrap(err, "WAL unrepairable after repair")
}
return snapshot, nil
}

// openAndReadWALWithRepair wraps openAndReadWAL with one wal.Repair
// attempt on io.ErrUnexpectedEOF.
func openAndReadWALWithRepair(logger *zap.Logger, walDir string, walSnap walpb.Snapshot) (*wal.WAL, raftpb.HardState, []raftpb.Entry, error) {
w, hs, ents, err := openAndReadWAL(logger, walDir, walSnap)
if err == nil || !errors.Is(err, io.ErrUnexpectedEOF) {
return w, hs, ents, err
}
logger.Warn("WAL tail truncated during ReadAll, repairing",
zap.String("dir", walDir),
zap.Error(err),
)
if !wal.Repair(logger, walDir) {
return nil, raftpb.HardState{}, nil, errors.Wrap(err, "WAL unrepairable")
}
w, hs, ents, err = openAndReadWAL(logger, walDir, walSnap)
if err != nil {
return nil, raftpb.HardState{}, nil, errors.Wrap(err, "WAL unrepairable after repair")
}
return w, hs, ents, nil
}

// openAndReadWAL opens the WAL at walDir and runs ReadAll. io.ErrUnexpectedEOF
// and other errors propagate upward; the retry/repair is handled once at
// loadWalState so ValidSnapshotEntries and ReadAll share a single repair
// pass and the "WAL tail truncated" log is emitted at most once.
func openAndReadWAL(logger *zap.Logger, walDir string, walSnap walpb.Snapshot) (*wal.WAL, raftpb.HardState, []raftpb.Entry, error) {
w, err := wal.Open(logger, walDir, walSnap)
if err != nil {
return nil, raftpb.HardState{}, nil, errors.WithStack(err)
}
_, hardState, entries, err := w.ReadAll()
if err != nil {
if closeErr := w.Close(); closeErr != nil {
logger.Warn("WAL close failed after ReadAll error",
zap.String("dir", walDir),
zap.Error(closeErr),
)
}
return nil, raftpb.HardState{}, nil, errors.WithStack(err)
}
return w, hardState, entries, nil
}

func loadPersistedSnapshot(logger *zap.Logger, walDir string, snapshotter *snap.Snapshotter) (raftpb.Snapshot, error) {
walSnaps, err := wal.ValidSnapshotEntries(logger, walDir)
if err != nil {
Expand Down
165 changes: 165 additions & 0 deletions internal/raftengine/etcd/wal_store_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package etcd

import (
"bytes"
"context"
"os"
"path/filepath"
"sort"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -128,3 +133,163 @@ func TestRestoreSnapshotStateTokenFileNotFound(t *testing.T) {
require.ErrorIs(t, err, ErrFSMSnapshotNotFound)
require.Nil(t, fsm.restored)
}

// --- openAndReadWAL / WAL auto-repair tests ---
//
// These tests cover the OOM-SIGKILL → partial-trailing-record scenario:
// the kernel kills the process mid-WAL-write, leaving the last
// preallocated WAL segment with a torn trailing record. On restart,
// wal.ReadAll returns io.ErrUnexpectedEOF. openAndReadWAL should invoke
// wal.Repair to truncate the partial record and retry once. CRC
// mismatches (real corruption, not torn writes) must propagate.

// seedWAL bootstraps a fresh raft WAL with a few proposals, closes it
// cleanly, and returns the data dir.
func seedWAL(t *testing.T, proposals [][]byte) string {
t.Helper()
dir := t.TempDir()
fsm := &testStateMachine{}
engine, err := Open(context.Background(), OpenConfig{
NodeID: 1,
LocalID: "n1",
LocalAddress: "127.0.0.1:0",
DataDir: dir,
Bootstrap: true,
StateMachine: fsm,
})
require.NoError(t, err)
for _, p := range proposals {
_, err := engine.Propose(context.Background(), p)
require.NoError(t, err)
}
require.NoError(t, engine.Close())
return dir
}

// truncateInsideLastRecord scans walPath for the end of written record
// data (the first 8-byte aligned block of zeros in the preallocated tail)
// and truncates a few bytes before that boundary so the truncation lands
// inside framing rather than in the zero padding.
func truncateInsideLastRecord(t *testing.T, walPath string) {
t.Helper()
data, err := os.ReadFile(walPath)
require.NoError(t, err)
end := len(data)
// Walk backwards 8 bytes at a time, skipping zeros, until we hit a
// block that isn't all zeros — that's where real record framing ends.
zeros := make([]byte, 8)
for end >= 8 && bytes.Equal(data[end-8:end], zeros) {
end -= 8
}
require.Greater(t, end, 16, "WAL has no non-zero content; seedWAL likely didn't propose anything")
// Lop off the final 5 bytes of real framing — enough to corrupt the
// trailing record's length prefix or payload so wal.ReadAll surfaces
// io.ErrUnexpectedEOF instead of a clean EOF.
require.NoError(t, os.Truncate(walPath, int64(end-5)))
}

// lastWALFile returns the path of the lexicographically-last .wal in dir.
// etcd WAL filenames are seq-index padded, so lexicographic order matches
// sequence order.
func lastWALFile(t *testing.T, walDir string) string {
t.Helper()
entries, err := os.ReadDir(walDir)
require.NoError(t, err)
names := make([]string, 0, len(entries))
for _, e := range entries {
if filepath.Ext(e.Name()) == ".wal" {
names = append(names, e.Name())
}
}
require.NotEmpty(t, names, "no .wal files in %s", walDir)
sort.Strings(names)
return filepath.Join(walDir, names[len(names)-1])
}

func TestLoadWalStateRepairsTruncatedTail(t *testing.T) {
// Simulates the 2026-04-24 incident: OOM-SIGKILL mid-WAL-write left
// the last segment with a torn trailing record. Before this fix the
// process could not restart; now openAndReadWAL invokes wal.Repair
// to truncate the partial record and continues.
dir := seedWAL(t, [][]byte{[]byte("one"), []byte("two"), []byte("three")})

// Chop bytes off the tail to simulate a mid-record SIGKILL. etcd
// preallocates 64MiB per WAL with zero padding, so we must find the
// actual end of written records and truncate *inside* framing;
// truncating in the zero-padded region leaves valid records intact
// and the decoder stops cleanly at the zero length header (no
// ErrUnexpectedEOF → repair wouldn't trigger, test would pass for
// the wrong reason).
walPath := lastWALFile(t, filepath.Join(dir, walDirName))
truncateInsideLastRecord(t, walPath)

// Re-open: loadWalState → openAndReadWAL → repair → succeed.
fsm := &testStateMachine{}
engine, err := Open(context.Background(), OpenConfig{
NodeID: 1,
LocalID: "n1",
LocalAddress: "127.0.0.1:0",
DataDir: dir,
StateMachine: fsm,
})
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, engine.Close()) })

// Entries committed before the torn write must survive.
require.GreaterOrEqual(t, len(fsm.Applied()), 1,
"repair should preserve entries committed before the torn write")
}

func TestLoadWalStateUnrepairableCRCMismatchReturnsError(t *testing.T) {
// wal.Repair only fixes io.ErrUnexpectedEOF (torn trailing record).
// A flipped byte inside a persisted record surfaces as a CRC
// mismatch, which is genuine corruption — repair cannot help and
// the error must propagate rather than silently masking it.
dir := seedWAL(t, [][]byte{[]byte("one"), []byte("two"), []byte("three")})

walPath := lastWALFile(t, filepath.Join(dir, walDirName))
f, err := os.OpenFile(walPath, os.O_RDWR, 0)
require.NoError(t, err)
// Flip a byte ~200 bytes in — past the file header but inside a
// real record. etcd WAL preallocates zeroes, so we need to land
// in content not padding.
var one [1]byte
_, err = f.ReadAt(one[:], 200)
require.NoError(t, err)
one[0] ^= 0xff
_, err = f.WriteAt(one[:], 200)
require.NoError(t, err)
require.NoError(t, f.Close())

fsm := &testStateMachine{}
_, err = Open(context.Background(), OpenConfig{
NodeID: 1,
LocalID: "n1",
LocalAddress: "127.0.0.1:0",
DataDir: dir,
StateMachine: fsm,
})
require.Error(t, err, "CRC mismatch is not repairable; error must surface")
}

func TestOpenAndReadWALSucceedsWithoutRepair(t *testing.T) {
// Happy-path sanity check: a pristine WAL opens and ReadAll returns
// the expected entries, no repair invoked.
dir := seedWAL(t, [][]byte{[]byte("one"), []byte("two")})

fsm := &testStateMachine{}
engine, err := Open(context.Background(), OpenConfig{
NodeID: 1,
LocalID: "n1",
LocalAddress: "127.0.0.1:0",
DataDir: dir,
StateMachine: fsm,
})
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, engine.Close()) })

require.Equal(t,
[][]byte{[]byte("one"), []byte("two")},
fsm.Applied(),
)
}
Loading