Skip to content

Commit 9dfa343

Browse files
committed
db: fix WAL offset in replayWAL errors
If an error occurs while replaying the WALs during Open, the error is annotated with the log number and the offset within the log. Previously, the offset was always zero. This also caused the queued flushables to be created with a zero logSize. This didn't matter in practice because the queued flushables would be flushed before Open returned. This commit adds a PreviousFilesBytes total to the wal.Offset type to describe the number of bytes replayed from previous WAL segment files.
1 parent 69e7ab4 commit 9dfa343

File tree

4 files changed

+43
-30
lines changed

4 files changed

+43
-30
lines changed

open.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -766,7 +766,7 @@ func (d *DB) replayWAL(
766766
buf bytes.Buffer
767767
mem *memTable
768768
entry *flushableEntry
769-
offset int64 // byte offset in rr
769+
offset wal.Offset
770770
lastFlushOffset int64
771771
keysReplayed int64 // number of keys replayed
772772
batchesReplayed int64 // number of batches replayed
@@ -792,12 +792,13 @@ func (d *DB) replayWAL(
792792
return
793793
}
794794
var logSize uint64
795-
if offset >= lastFlushOffset {
796-
logSize = uint64(offset - lastFlushOffset)
795+
mergedOffset := offset.Physical + offset.PreviousFilesBytes
796+
if mergedOffset >= lastFlushOffset {
797+
logSize = uint64(mergedOffset - lastFlushOffset)
797798
}
798799
// Else, this was the initial memtable in the read-only case which must have
799800
// been empty, but we need to flush it since we don't want to add to it later.
800-
lastFlushOffset = offset
801+
lastFlushOffset = mergedOffset
801802
entry.logSize = logSize
802803
if !d.opts.ReadOnly {
803804
toFlush = append(toFlush, entry)
@@ -836,12 +837,14 @@ func (d *DB) replayWAL(
836837
}
837838
defer func() {
838839
if err != nil {
839-
err = errors.WithDetailf(err, "replaying wal %d, offset %d", ll.Num, offset)
840+
err = errors.WithDetailf(err, "replaying wal %d, offset %s", ll.Num, offset)
840841
}
841842
}()
842843

843844
for {
844-
r, offset, err := rr.NextRecord()
845+
var r io.Reader
846+
var err error
847+
r, offset, err = rr.NextRecord()
845848
if err == nil {
846849
_, err = io.Copy(&buf, r)
847850
}
@@ -1048,7 +1051,7 @@ func (d *DB) replayWAL(
10481051
buf.Reset()
10491052
}
10501053

1051-
d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %d; replayed %d keys in %d batches",
1054+
d.opts.Logger.Infof("[JOB %d] WAL %s stopped reading at offset: %s; replayed %d keys in %d batches",
10521055
jobID, base.DiskFileNum(ll.Num).String(), offset, keysReplayed, batchesReplayed)
10531056
flushMem()
10541057

wal/reader.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,7 @@ func (r *virtualWALReader) nextFile() error {
360360
}
361361

362362
fs, path := r.LogicalLog.SegmentLocation(r.currIndex)
363+
r.off.PreviousFilesBytes += r.off.Physical
363364
r.off.PhysicalFile = path
364365
r.off.Physical = 0
365366
var err error

wal/testdata/reader

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,13 @@ r.NextRecord() = (rr, (000001.log: 1035), <nil>)
5252
r.NextRecord() = (rr, (000001.log: 1076), <nil>)
5353
io.ReadAll(rr) = ("1500000000000000320000004f091e9a83fdeae0ec55eb233a9b5394cb3c7856... <512000-byte record>", <nil>)
5454
BatchHeader: [seqNum=21,count=50]
55-
r.NextRecord() = (rr, (000001-001.log: 0), <nil>)
55+
r.NextRecord() = (rr, (000001-001.log: 0), 513252 from previous files, <nil>)
5656
io.ReadAll(rr) = ("16000000000000000200000038d0ccacfb33b57fb3d386cbe2b67a2fbdc82214... <412-byte record>", <nil>)
5757
BatchHeader: [seqNum=22,count=2]
58-
r.NextRecord() = (rr, (000001-001.log: 498), <nil>)
58+
r.NextRecord() = (rr, (000001-001.log: 498), 513252 from previous files, <nil>)
5959
io.ReadAll(rr) = ("180000000000000001000000ede8f156c48faf84dd55235d19a2df01d13021fc... <100-byte record>", <nil>)
6060
BatchHeader: [seqNum=24,count=1]
61-
r.NextRecord() = (rr, (000001-001.log: 609), EOF)
61+
r.NextRecord() = (rr, (000001-001.log: 609), 513252 from previous files, EOF)
6262

6363
# Test a recycled log file. Recycle 000001.log as 000002.log. This time, do not
6464
# exit cleanly. This simulates a hard process exit (eg, during a fatal shutdown,
@@ -134,13 +134,13 @@ r.NextRecord() = (rr, (000003.log: 111), <nil>)
134134
r.NextRecord() = (rr, (000003.log: 272), <nil>)
135135
io.ReadAll(rr) = ("2a000000000000000100000019458dc5400169e5", <nil>)
136136
BatchHeader: [seqNum=42,count=1]
137-
r.NextRecord() = (rr, (000003-001.log: 192), <nil>)
137+
r.NextRecord() = (rr, (000003-001.log: 192), 303 from previous files, <nil>)
138138
io.ReadAll(rr) = ("2b00000000000000030000009cbf29476e797bac2db8bfea65bda29ea50ddbe4... <80-byte record>", <nil>)
139139
BatchHeader: [seqNum=43,count=3]
140-
r.NextRecord() = (rr, (000003-001.log: 283), <nil>)
140+
r.NextRecord() = (rr, (000003-001.log: 283), 303 from previous files, <nil>)
141141
io.ReadAll(rr) = ("2e000000000000000900000027337fa5bd626044dc5d9d08085bf4ce13bc8d00... <2055-byte record>", <nil>)
142142
BatchHeader: [seqNum=46,count=9]
143-
r.NextRecord() = (rr, (000003-001.log: 2349), EOF)
143+
r.NextRecord() = (rr, (000003-001.log: 2349), 303 from previous files, EOF)
144144

145145
# Extend logical log file 000003 with another log file, the result of failing
146146
# back to the original device. This time do an "unclean" close.
@@ -166,16 +166,16 @@ r.NextRecord() = (rr, (000003.log: 111), <nil>)
166166
r.NextRecord() = (rr, (000003.log: 272), <nil>)
167167
io.ReadAll(rr) = ("2a000000000000000100000019458dc5400169e5", <nil>)
168168
BatchHeader: [seqNum=42,count=1]
169-
r.NextRecord() = (rr, (000003-001.log: 192), <nil>)
169+
r.NextRecord() = (rr, (000003-001.log: 192), 303 from previous files, <nil>)
170170
io.ReadAll(rr) = ("2b00000000000000030000009cbf29476e797bac2db8bfea65bda29ea50ddbe4... <80-byte record>", <nil>)
171171
BatchHeader: [seqNum=43,count=3]
172-
r.NextRecord() = (rr, (000003-001.log: 283), <nil>)
172+
r.NextRecord() = (rr, (000003-001.log: 283), 303 from previous files, <nil>)
173173
io.ReadAll(rr) = ("2e000000000000000900000027337fa5bd626044dc5d9d08085bf4ce13bc8d00... <2055-byte record>", <nil>)
174174
BatchHeader: [seqNum=46,count=9]
175-
r.NextRecord() = (rr, (000003-002.log: 2157), <nil>)
175+
r.NextRecord() = (rr, (000003-002.log: 2157), 2652 from previous files, <nil>)
176176
io.ReadAll(rr) = ("370000000000000002000000ff0710201f4008e679428b4994708a1af8507303... <205-byte record>", <nil>)
177177
BatchHeader: [seqNum=55,count=2]
178-
r.NextRecord() = (rr, (000003-002.log: 2373), EOF)
178+
r.NextRecord() = (rr, (000003-002.log: 2373), 2652 from previous files, EOF)
179179

180180
# Test reading a log file that does not exist.
181181

@@ -243,16 +243,16 @@ r.NextRecord() = (rr, (000005.log: 909), <nil>)
243243
r.NextRecord() = (rr, (000005.log: 3445), <nil>)
244244
io.ReadAll(rr) = ("0b7401000000000000010000907cd29c9a6deaf239e76e3374f6e9eef047f57f... <2566-byte record>", <nil>)
245245
BatchHeader: [seqNum=95243,count=256]
246-
r.NextRecord() = (rr, (000005-001.log: 0), <nil>)
246+
r.NextRecord() = (rr, (000005-001.log: 0), 6022 from previous files, <nil>)
247247
io.ReadAll(rr) = ("0b75010000000000020000006cad8a0a1461d1ec53bb834b47c6853e040ae9ce... <44-byte record>", <nil>)
248248
BatchHeader: [seqNum=95499,count=2]
249-
r.NextRecord() = (rr, (000005-001.log: 55), <nil>)
249+
r.NextRecord() = (rr, (000005-001.log: 55), 6022 from previous files, <nil>)
250250
io.ReadAll(rr) = ("0d7501000000000005000000c78be2f74d28753a03854ed63e6fd0f17113688d... <416-byte record>", <nil>)
251251
BatchHeader: [seqNum=95501,count=5]
252-
r.NextRecord() = (rr, (000005-001.log: 482), <nil>)
252+
r.NextRecord() = (rr, (000005-001.log: 482), 6022 from previous files, <nil>)
253253
io.ReadAll(rr) = ("12750100000000001d00000096cedf6103af61c008d9f850e63a1dfc7518b9a7... <199-byte record>", <nil>)
254254
BatchHeader: [seqNum=95506,count=29]
255-
r.NextRecord() = (rr, (000005-001.log: 692), pebble/record: invalid chunk)
255+
r.NextRecord() = (rr, (000005-001.log: 692), 6022 from previous files, pebble/record: invalid chunk)
256256

257257
# Read again, this time pretending we found a third segment with the
258258
# logNameIndex=002. This helps exercise error conditions switching to a new
@@ -272,16 +272,16 @@ r.NextRecord() = (rr, (000005.log: 909), <nil>)
272272
r.NextRecord() = (rr, (000005.log: 3445), <nil>)
273273
io.ReadAll(rr) = ("0b7401000000000000010000907cd29c9a6deaf239e76e3374f6e9eef047f57f... <2566-byte record>", <nil>)
274274
BatchHeader: [seqNum=95243,count=256]
275-
r.NextRecord() = (rr, (000005-001.log: 0), <nil>)
275+
r.NextRecord() = (rr, (000005-001.log: 0), 6022 from previous files, <nil>)
276276
io.ReadAll(rr) = ("0b75010000000000020000006cad8a0a1461d1ec53bb834b47c6853e040ae9ce... <44-byte record>", <nil>)
277277
BatchHeader: [seqNum=95499,count=2]
278-
r.NextRecord() = (rr, (000005-001.log: 55), <nil>)
278+
r.NextRecord() = (rr, (000005-001.log: 55), 6022 from previous files, <nil>)
279279
io.ReadAll(rr) = ("0d7501000000000005000000c78be2f74d28753a03854ed63e6fd0f17113688d... <416-byte record>", <nil>)
280280
BatchHeader: [seqNum=95501,count=5]
281-
r.NextRecord() = (rr, (000005-001.log: 482), <nil>)
281+
r.NextRecord() = (rr, (000005-001.log: 482), 6022 from previous files, <nil>)
282282
io.ReadAll(rr) = ("12750100000000001d00000096cedf6103af61c008d9f850e63a1dfc7518b9a7... <199-byte record>", <nil>)
283283
BatchHeader: [seqNum=95506,count=29]
284-
r.NextRecord() = (rr, (000005-002.log: 0), opening WAL file segment "000005-002.log": open 000005-002.log: file does not exist)
284+
r.NextRecord() = (rr, (000005-002.log: 0), 6714 from previous files, opening WAL file segment "000005-002.log": open 000005-002.log: file does not exist)
285285

286286
# Test a scenario where 4 unique batches are split across three physical log
287287
# files. The first log contains (b0, b1, b2), the second log (b1) and the third
@@ -325,10 +325,10 @@ r.NextRecord() = (rr, (000006.log: 406), <nil>)
325325
r.NextRecord() = (rr, (000006.log: 94105), <nil>)
326326
io.ReadAll(rr) = ("1c0200000000000001000000404841433f5369713ee90d8f86c50c5903fa38e9... <180-byte record>", <nil>)
327327
BatchHeader: [seqNum=540,count=1]
328-
r.NextRecord() = (rr, (000006-001.log: 93890), <nil>)
328+
r.NextRecord() = (rr, (000006-001.log: 93890), 94296 from previous files, <nil>)
329329
io.ReadAll(rr) = ("1d0200000000000005000000b68c7a260135dce1ce5c5498550793d15edfae62... <2055-byte record>", <nil>)
330330
BatchHeader: [seqNum=541,count=5]
331-
r.NextRecord() = (rr, (000006-001.log: 95956), EOF)
331+
r.NextRecord() = (rr, (000006-001.log: 95956), 94296 from previous files, EOF)
332332

333333
# Test corrupting the tail of a batch that's large enough to be split into
334334
# multiple reads. Regression test for #3865.
@@ -365,10 +365,10 @@ r.NextRecord() = (rr, (000007.log: 55), <nil>)
365365
r.NextRecord() = (rr, (000007.log: 482), <nil>)
366366
io.ReadAll(rr) = ("12750100000000001d000000362bba27f0ed6f5433a12bc502873a27c67f256c... <199-byte record>", <nil>)
367367
BatchHeader: [seqNum=95506,count=29]
368-
r.NextRecord() = (rr, (000007-001.log: 0), <nil>)
368+
r.NextRecord() = (rr, (000007-001.log: 0), 692 from previous files, <nil>)
369369
io.ReadAll(rr) = ("2f75010000000000130000001ddd809cbb45782c44544a15a15dd52fb7b81a74... <45991-byte record>", <nil>)
370370
BatchHeader: [seqNum=95535,count=19]
371-
r.NextRecord() = (rr, (000007-001.log: 46013), <nil>)
371+
r.NextRecord() = (rr, (000007-001.log: 46013), 692 from previous files, <nil>)
372372
io.ReadAll(rr) = ("427501000000000013000000b30c11cf619ea65167511346cc55bb784a9af26f... <292-byte record>", <nil>)
373373
BatchHeader: [seqNum=95554,count=19]
374-
r.NextRecord() = (rr, (000007-001.log: 46316), EOF)
374+
r.NextRecord() = (rr, (000007-001.log: 46316), 692 from previous files, EOF)

wal/wal.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,10 +423,19 @@ type Offset struct {
423423
// Physical indicates the file offset at which a record begins within
424424
// the physical file named by PhysicalFile.
425425
Physical int64
426+
// PreviousFilesBytes is the bytes read from all the previous physical
427+
// segment files that have been read up to the current log segment. If WAL
428+
// failover is not in use, PreviousFilesBytes will always be zero. Otherwise,
429+
// it may be non-zero when replaying records from multiple segment files
430+
// that make up a single logical WAL.
431+
PreviousFilesBytes int64
426432
}
427433

428434
// String implements fmt.Stringer, returning a string representation of the
429435
// offset.
430436
func (o Offset) String() string {
437+
if o.PreviousFilesBytes > 0 {
438+
return fmt.Sprintf("(%s: %d), %d from previous files", o.PhysicalFile, o.Physical, o.PreviousFilesBytes)
439+
}
431440
return fmt.Sprintf("(%s: %d)", o.PhysicalFile, o.Physical)
432441
}

0 commit comments

Comments
 (0)