Skip to content

Commit 0188943

Browse files
committed
db: move replayWAL into recovery.go
1 parent d928a2e commit 0188943

File tree

2 files changed

+229
-228
lines changed

2 files changed

+229
-228
lines changed

open.go

Lines changed: 0 additions & 228 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ import (
1919
"github.com/cockroachdb/crlib/crtime"
2020
"github.com/cockroachdb/errors"
2121
"github.com/cockroachdb/errors/oserror"
22-
"github.com/cockroachdb/pebble/batchrepr"
23-
"github.com/cockroachdb/pebble/internal/arenaskl"
2422
"github.com/cockroachdb/pebble/internal/base"
2523
"github.com/cockroachdb/pebble/internal/cache"
2624
"github.com/cockroachdb/pebble/internal/inflight"
@@ -30,7 +28,6 @@ import (
3028
"github.com/cockroachdb/pebble/internal/manual"
3129
"github.com/cockroachdb/pebble/objstorage"
3230
"github.com/cockroachdb/pebble/objstorage/remote"
33-
"github.com/cockroachdb/pebble/record"
3431
"github.com/cockroachdb/pebble/vfs"
3532
"github.com/cockroachdb/pebble/wal"
3633
"github.com/prometheus/client_golang/prometheus"
@@ -753,231 +750,6 @@ func (d *DB) replayIngestedFlushable(
753750
return d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan)
754751
}
755752

756-
// replayWAL replays the edits in the specified WAL. If the DB is in read
757-
// only mode, then the WALs are replayed into memtables and not flushed. If
758-
// the DB is not in read only mode, then the contents of the WAL are
759-
// guaranteed to be flushed when a flush is scheduled after this method is run.
760-
// Note that this flushing is very important for guaranteeing durability:
761-
// the application may have had a number of pending
762-
// fsyncs to the WAL before the process crashed, and those fsyncs may not have
763-
// happened but the corresponding data may now be readable from the WAL (while
764-
// sitting in write-back caches in the kernel or the storage device). By
765-
// reading the WAL (including the non-fsynced data) and then flushing all
766-
// these changes (flush does fsyncs), we are able to guarantee that the
767-
// initial state of the DB is durable.
768-
//
769-
// This method mutates d.mu.mem.queue and possibly d.mu.mem.mutable and replays
770-
// WALs into the flushable queue. Flushing of the queue is expected to be handled
771-
// by callers. A list of flushable ingests (but not memtables) replayed is returned.
772-
//
773-
// d.mu must be held when calling this, but the mutex may be dropped and
774-
// re-acquired during the course of this method.
775-
func (d *DB) replayWAL(
776-
jobID JobID, ll wal.LogicalLog, strictWALTail bool,
777-
) (flushableIngests []*ingestedFlushable, maxSeqNum base.SeqNum, err error) {
778-
rr := ll.OpenForRead()
779-
defer func() { _ = rr.Close() }()
780-
var (
781-
b Batch
782-
buf bytes.Buffer
783-
mem *memTable
784-
entry *flushableEntry
785-
offset wal.Offset
786-
lastFlushOffset int64
787-
keysReplayed int64 // number of keys replayed
788-
batchesReplayed int64 // number of batches replayed
789-
)
790-
791-
// TODO(jackson): This function is interspersed with panics, in addition to
792-
// corruption error propagation. Audit them to ensure we're truly only
793-
// panicking where the error points to Pebble bug and not user or
794-
// hardware-induced corruption.
795-
796-
// "Flushes" (ie. closes off) the current memtable, if not nil.
797-
flushMem := func() {
798-
if mem == nil {
799-
return
800-
}
801-
mem.writerUnref()
802-
if d.mu.mem.mutable == mem {
803-
d.mu.mem.mutable = nil
804-
}
805-
entry.flushForced = !d.opts.ReadOnly
806-
var logSize uint64
807-
mergedOffset := offset.Physical + offset.PreviousFilesBytes
808-
if mergedOffset >= lastFlushOffset {
809-
logSize = uint64(mergedOffset - lastFlushOffset)
810-
}
811-
// Else, this was the initial memtable in the read-only case which must have
812-
// been empty, but we need to flush it since we don't want to add to it later.
813-
lastFlushOffset = mergedOffset
814-
entry.logSize = logSize
815-
mem, entry = nil, nil
816-
}
817-
818-
mem = d.mu.mem.mutable
819-
if mem != nil {
820-
entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
821-
if !d.opts.ReadOnly {
822-
flushMem()
823-
}
824-
}
825-
826-
// Creates a new memtable if there is no current memtable.
827-
ensureMem := func(seqNum base.SeqNum) {
828-
if mem != nil {
829-
return
830-
}
831-
mem, entry = d.newMemTable(base.DiskFileNum(ll.Num), seqNum, 0 /* minSize */)
832-
d.mu.mem.mutable = mem
833-
d.mu.mem.queue = append(d.mu.mem.queue, entry)
834-
}
835-
836-
defer func() {
837-
if err != nil {
838-
err = errors.WithDetailf(err, "replaying wal %d, offset %s", ll.Num, offset)
839-
}
840-
}()
841-
842-
for {
843-
var r io.Reader
844-
var err error
845-
r, offset, err = rr.NextRecord()
846-
if err == nil {
847-
_, err = io.Copy(&buf, r)
848-
}
849-
if err != nil {
850-
// It is common to encounter a zeroed or invalid chunk due to WAL
851-
// preallocation and WAL recycling. However zeroed or invalid chunks
852-
// can also be a consequence of corruption / disk rot. When the log
853-
// reader encounters one of these cases, it attempts to disambiguate
854-
// by reading ahead looking for a future record. If a future chunk
855-
// indicates the chunk at the original offset should've been valid, it
856-
// surfaces record.ErrInvalidChunk or record.ErrZeroedChunk. These
857-
// errors are always indicative of corruption and data loss.
858-
//
859-
// Otherwise, the reader surfaces record.ErrUnexpectedEOF indicating
860-
// that the WAL terminated uncleanly and ambiguously. If the WAL is
861-
// the most recent logical WAL, the caller passes in
862-
// (strictWALTail=false), indicating we should tolerate the unclean
863-
// ending. If the WAL is an older WAL, the caller passes in
864-
// (strictWALTail=true), indicating that the WAL should have been
865-
// closed cleanly, and we should interpret the
866-
// `record.ErrUnexpectedEOF` as corruption and stop recovery.
867-
if errors.Is(err, io.EOF) {
868-
break
869-
} else if errors.Is(err, record.ErrUnexpectedEOF) && !strictWALTail {
870-
break
871-
} else if (errors.Is(err, record.ErrUnexpectedEOF) && strictWALTail) ||
872-
errors.Is(err, record.ErrInvalidChunk) || errors.Is(err, record.ErrZeroedChunk) {
873-
// If a read-ahead returns record.ErrInvalidChunk or
874-
// record.ErrZeroedChunk, then there's definitively corruption.
875-
//
876-
// If strictWALTail=true, then record.ErrUnexpectedEOF should
877-
// also be considered corruption because the strictWALTail
878-
// indicates we expect a clean end to the WAL.
879-
//
880-
// Other I/O related errors should not be marked with corruption
881-
// and simply returned.
882-
err = errors.Mark(err, ErrCorruption)
883-
}
884-
885-
return nil, 0, errors.Wrap(err, "pebble: error when replaying WAL")
886-
}
887-
888-
if buf.Len() < batchrepr.HeaderLen {
889-
return nil, 0, base.CorruptionErrorf("pebble: corrupt wal %s (offset %s)",
890-
errors.Safe(base.DiskFileNum(ll.Num)), offset)
891-
}
892-
893-
if d.opts.ErrorIfNotPristine {
894-
return nil, 0, errors.WithDetailf(ErrDBNotPristine, "location: %q", d.dirname)
895-
}
896-
897-
// Specify Batch.db so that Batch.SetRepr will compute Batch.memTableSize
898-
// which is used below.
899-
b = Batch{}
900-
b.db = d
901-
if err := b.SetRepr(buf.Bytes()); err != nil {
902-
return nil, 0, err
903-
}
904-
seqNum := b.SeqNum()
905-
maxSeqNum = seqNum + base.SeqNum(b.Count())
906-
keysReplayed += int64(b.Count())
907-
batchesReplayed++
908-
{
909-
br := b.Reader()
910-
if kind, _, _, ok, err := br.Next(); err != nil {
911-
return nil, 0, err
912-
} else if ok && (kind == InternalKeyKindIngestSST || kind == InternalKeyKindExcise) {
913-
// We're in the flushable ingests (+ possibly excises) case.
914-
//
915-
// Ingests require an up-to-date view of the LSM to determine the target
916-
// level of ingested sstables, and to accurately compute excises. Instead of
917-
// doing an ingest in this function, we just enqueue a flushable ingest
918-
// in the flushables queue and run a regular flush.
919-
flushMem()
920-
// mem is nil here.
921-
entry, err = d.replayIngestedFlushable(&b, base.DiskFileNum(ll.Num))
922-
if err != nil {
923-
return nil, 0, err
924-
}
925-
fi := entry.flushable.(*ingestedFlushable)
926-
flushableIngests = append(flushableIngests, fi)
927-
d.mu.mem.queue = append(d.mu.mem.queue, entry)
928-
// A flushable ingest is always followed by a WAL rotation.
929-
break
930-
}
931-
}
932-
933-
if b.memTableSize >= uint64(d.largeBatchThreshold) {
934-
flushMem()
935-
// Make a copy of the data slice since it is currently owned by buf and will
936-
// be reused in the next iteration.
937-
b.data = slices.Clone(b.data)
938-
b.flushable, err = newFlushableBatch(&b, d.opts.Comparer)
939-
if err != nil {
940-
return nil, 0, err
941-
}
942-
entry := d.newFlushableEntry(b.flushable, base.DiskFileNum(ll.Num), b.SeqNum())
943-
// Disable memory accounting by adding a reader ref that will never be
944-
// removed.
945-
entry.readerRefs.Add(1)
946-
d.mu.mem.queue = append(d.mu.mem.queue, entry)
947-
} else {
948-
ensureMem(seqNum)
949-
if err = mem.prepare(&b); err != nil && err != arenaskl.ErrArenaFull {
950-
return nil, 0, err
951-
}
952-
// We loop since DB.newMemTable() slowly grows the size of allocated memtables, so the
953-
// batch may not initially fit, but will eventually fit (since it is smaller than
954-
// largeBatchThreshold).
955-
for err == arenaskl.ErrArenaFull {
956-
flushMem()
957-
ensureMem(seqNum)
958-
err = mem.prepare(&b)
959-
if err != nil && err != arenaskl.ErrArenaFull {
960-
return nil, 0, err
961-
}
962-
}
963-
if err = mem.apply(&b, seqNum); err != nil {
964-
return nil, 0, err
965-
}
966-
mem.writerUnref()
967-
}
968-
buf.Reset()
969-
}
970-
971-
d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %s; replayed %d keys in %d batches",
972-
jobID, ll.String(), offset, keysReplayed, batchesReplayed)
973-
if !d.opts.ReadOnly {
974-
flushMem()
975-
}
976-
977-
// mem is nil here, if !ReadOnly.
978-
return flushableIngests, maxSeqNum, err
979-
}
980-
981753
func readOptionsFile(opts *Options, path string) (string, error) {
982754
f, err := opts.FS.Open(path)
983755
if err != nil {

0 commit comments

Comments
 (0)