Skip to content

Commit

Permalink
Merge pull request #15069 from ahrtr/last_wal_rec_corrupt_3.5_20230108
Browse files Browse the repository at this point in the history
[3.5] etcdserver: process the scenario of the last WAL record being partially synced to disk
  • Loading branch information
ahrtr committed Jan 11, 2023
2 parents 9e3966f + e1fc545 commit 816c2e2
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 12 deletions.
3 changes: 2 additions & 1 deletion server/etcdserver/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package etcdserver

import (
"errors"
"io"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
Expand Down Expand Up @@ -100,7 +101,7 @@ func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync b
if wmetadata, st, ents, err = w.ReadAll(); err != nil {
w.Close()
// we can only repair ErrUnexpectedEOF and we never repair twice.
if repaired || err != io.ErrUnexpectedEOF {
if repaired || !errors.Is(err, io.ErrUnexpectedEOF) {
lg.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
}
if !wal.Repair(lg, waldir) {
Expand Down
4 changes: 2 additions & 2 deletions server/wal/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error {
// The length of current WAL entry must be less than the remaining file size.
maxEntryLimit := fileBufReader.FileInfo().Size() - d.lastValidOff - padBytes
if recBytes > maxEntryLimit {
return fmt.Errorf("wal: max entry size limit exceeded, recBytes: %d, fileSize(%d) - offset(%d) - padBytes(%d) = entryLimit(%d)",
recBytes, fileBufReader.FileInfo().Size(), d.lastValidOff, padBytes, maxEntryLimit)
return fmt.Errorf("%w: [wal] max entry size limit exceeded when decoding %q, recBytes: %d, fileSize(%d) - offset(%d) - padBytes(%d) = entryLimit(%d)",
io.ErrUnexpectedEOF, fileBufReader.FileInfo().Name(), recBytes, fileBufReader.FileInfo().Size(), d.lastValidOff, padBytes, maxEntryLimit)
}

data := make([]byte, recBytes+padBytes)
Expand Down
9 changes: 5 additions & 4 deletions server/wal/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package wal

import (
"errors"
"io"
"os"
"path/filepath"
Expand Down Expand Up @@ -44,8 +45,8 @@ func Repair(lg *zap.Logger, dirpath string) bool {
for {
lastOffset := decoder.lastOffset()
err := decoder.decode(rec)
switch err {
case nil:
switch {
case err == nil:
// update crc of the decoder when necessary
switch rec.Type {
case crcType:
Expand All @@ -59,11 +60,11 @@ func Repair(lg *zap.Logger, dirpath string) bool {
}
continue

case io.EOF:
case errors.Is(err, io.EOF):
lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.EOF))
return true

case io.ErrUnexpectedEOF:
case errors.Is(err, io.ErrUnexpectedEOF):
bf, bferr := os.Create(f.Name() + ".broken")
if bferr != nil {
lg.Warn("failed to create backup file", zap.String("path", f.Name()+".broken"), zap.Error(bferr))
Expand Down
10 changes: 5 additions & 5 deletions server/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,13 +500,13 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
// We do not have to read out all entries in read mode.
// The last record may be a partially written one, so
// ErrUnexpectedEOF might be returned.
if err != io.EOF && err != io.ErrUnexpectedEOF {
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
state.Reset()
return nil, state, nil, err
}
default:
// We must read all of the entries if WAL is opened in write mode.
if err != io.EOF {
// We must read all the entries if WAL is opened in write mode.
if !errors.Is(err, io.EOF) {
state.Reset()
return nil, state, nil, err
}
Expand Down Expand Up @@ -598,7 +598,7 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro
}
// We do not have to read out all the WAL entries
// as the decoder is opened in read mode.
if err != io.EOF && err != io.ErrUnexpectedEOF {
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
return nil, err
}

Expand Down Expand Up @@ -688,7 +688,7 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta

// We do not have to read out all the WAL entries
// as the decoder is opened in read mode.
if err != io.EOF && err != io.ErrUnexpectedEOF {
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
return nil, err
}

Expand Down
77 changes: 77 additions & 0 deletions server/wal/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package wal
import (
"bytes"
"fmt"
"github.com/stretchr/testify/require"
"io"
"io/ioutil"
"math"
Expand Down Expand Up @@ -1155,3 +1156,79 @@ func TestValidSnapshotEntriesAfterPurgeWal(t *testing.T) {
t.Fatal(err)
}
}

// TestLastRecordLengthExceedFileEnd builds a WAL file from hand-crafted bytes
// whose last record declares a length (1000) that runs past the end of the
// file. It then checks that both the low-level decoder and WAL.ReadAll report
// an error that has io.ErrUnexpectedEOF in its chain.
func TestLastRecordLengthExceedFileEnd(t *testing.T) {
	/* The data below was generated by code something like below. The length
	 * of the last record was intentionally changed to 1000 in order to make
	 * sure it exceeds the end of the file.
	 *
	 *  for i := 0; i < 3; i++ {
	 *      es := []raftpb.Entry{{Index: uint64(i + 1), Data: []byte(fmt.Sprintf("waldata%d", i+1))}}
	 *      if err = w.Save(raftpb.HardState{}, es); err != nil {
	 *          t.Fatal(err)
	 *      }
	 *  }
	 *  ......
	 *  var sb strings.Builder
	 *  for _, ch := range buf {
	 *      sb.WriteString(fmt.Sprintf("\\x%02x", ch))
	 *  }
	 */
	// Generate WAL file
	t.Log("Generate a WAL file with the last record's length modified.")
	data := []byte("\x04\x00\x00\x00\x00\x00\x00\x84\x08\x04\x10\x00\x00" +
		"\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x84\x08\x01\x10\x00\x00" +
		"\x00\x00\x00\x0e\x00\x00\x00\x00\x00\x00\x82\x08\x05\x10\xa0\xb3" +
		"\x9b\x8f\x08\x1a\x04\x08\x00\x10\x00\x00\x00\x1a\x00\x00\x00\x00" +
		"\x00\x00\x86\x08\x02\x10\xba\x8b\xdc\x85\x0f\x1a\x10\x08\x00\x10" +
		"\x00\x18\x01\x22\x08\x77\x61\x6c\x64\x61\x74\x61\x31\x00\x00\x00" +
		"\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x86\x08\x02\x10\xa1\xe8" +
		"\xff\x9c\x02\x1a\x10\x08\x00\x10\x00\x18\x02\x22\x08\x77\x61\x6c" +
		"\x64\x61\x74\x61\x32\x00\x00\x00\x00\x00\x00\xe8\x03\x00\x00\x00" +
		"\x00\x00\x86\x08\x02\x10\xa1\x9c\xa1\xaa\x04\x1a\x10\x08\x00\x10" +
		"\x00\x18\x03\x22\x08\x77\x61\x6c\x64\x61\x74\x61\x33\x00\x00\x00" +
		"\x00\x00\x00")

	buf := bytes.NewBuffer(data)
	f, err := createFileWithData(t, buf)
	// Check the error before touching f: when creation fails the returned
	// handle may be nil, and calling f.Name() on it would panic and hide the
	// real failure.
	require.NoError(t, err)
	fileName := f.Name()
	t.Logf("fileName: %v", fileName)

	// Verify low-level decoder directly
	t.Log("Verify all records can be parsed correctly.")
	rec := &walpb.Record{}
	decoder := newDecoder(fileutil.NewFileReader(f))
	for {
		if err = decoder.decode(rec); err != nil {
			// The only acceptable way out of the loop is the truncated
			// last record being reported as an unexpected EOF.
			require.ErrorIs(t, err, io.ErrUnexpectedEOF)
			break
		}
		if rec.Type == entryType {
			e := mustUnmarshalEntry(rec.Data)
			t.Logf("Validating normal entry: %v", e)
			recData := fmt.Sprintf("waldata%d", e.Index)
			require.Equal(t, raftpb.EntryNormal, e.Type)
			require.Equal(t, recData, string(e.Data))
		}
		// Use a fresh record for every decode so no state carries over.
		rec = &walpb.Record{}
	}
	require.NoError(t, f.Close())

	// Verify w.ReadAll() returns io.ErrUnexpectedEOF in the error chain.
	t.Log("Verify the w.ReadAll returns io.ErrUnexpectedEOF in the error chain")
	// Give the file a WAL-style name in the same directory before opening
	// that directory as a WAL.
	newFileName := filepath.Join(filepath.Dir(fileName), "0000000000000000-0000000000000000.wal")
	require.NoError(t, os.Rename(fileName, newFileName))

	w, err := Open(zaptest.NewLogger(t), filepath.Dir(fileName), walpb.Snapshot{
		Index: 0,
		Term:  0,
	})
	require.NoError(t, err)
	defer w.Close()

	_, _, _, err = w.ReadAll()
	// Note: The wal file will be repaired automatically in production
	// environment, but only once.
	require.ErrorIs(t, err, io.ErrUnexpectedEOF)
}

0 comments on commit 816c2e2

Please sign in to comment.