Skip to content

Commit de903d9

Browse files
committed
record: add logger to reader, update walSync test to use logger
add test logger to walSync test.
1 parent ec8d690 commit de903d9

File tree

3 files changed

+963
-27
lines changed

3 files changed

+963
-27
lines changed

record/record.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,13 @@ type Reader struct {
250250
// that had garbage values. It is used to clarify whether or not a garbage chunk
251251
// encountered during WAL replay was the logical EOF or confirmed corruption.
252252
invalidOffset uint64
253+
254+
// loggerForTesting is a logging helper used by the Reader to accumulate log messages.
255+
loggerForTesting loggerForTesting
256+
}
257+
258+
type loggerForTesting interface {
259+
logf(format string, args ...interface{})
253260
}
254261

255262
// NewReader returns a new reader. If the file contains records encoded using
@@ -422,11 +429,18 @@ func (r *Reader) Next() (io.Reader, error) {
422429
// if there is confirmation of a corruption, otherwise ErrUnexpectedEOF is
423430
// returned after reading all the blocks without corruption confirmation.
424431
func (r *Reader) readAheadForCorruption() error {
432+
if r.loggerForTesting != nil {
433+
r.loggerForTesting.logf("Starting read ahead for corruption. Block corrupted %d.\n", r.blockNum)
434+
}
435+
425436
for {
426437
// Load the next block into r.buf.
427438
n, err := io.ReadFull(r.r, r.buf[:])
428439
r.begin, r.end, r.n = 0, 0, n
429440
r.blockNum++
441+
if r.loggerForTesting != nil {
442+
r.loggerForTesting.logf("Read block %d with %d bytes\n", r.blockNum, n)
443+
}
430444

431445
if errors.Is(err, io.EOF) {
432446
// io.ErrUnexpectedEOF is returned instead of
@@ -439,6 +453,9 @@ func (r *Reader) readAheadForCorruption() error {
439453
// invalid chunk should have been valid, the chunk represents
440454
// an abrupt, unclean termination of the logical log. This
441455
// abrupt end of file represented by io.ErrUnexpectedEOF.
456+
if r.loggerForTesting != nil {
457+
r.loggerForTesting.logf("\tEncountered io.EOF; returning io.ErrUnexpectedEOF since no sync offset found.\n")
458+
}
442459
return io.ErrUnexpectedEOF
443460
}
444461
// The last block of a log can be less than 32KiB, which is
@@ -447,31 +464,54 @@ func (r *Reader) readAheadForCorruption() error {
447464
// However, if the error is not ErrUnexpectedEOF, then this
448465
// error should be surfaced.
449466
if err != nil && err != io.ErrUnexpectedEOF {
467+
if r.loggerForTesting != nil {
468+
r.loggerForTesting.logf("\tError reading block %d: %v", r.blockNum, err)
469+
}
450470
return err
451471
}
452472

453473
for r.end+legacyHeaderSize <= r.n {
454474
checksum := binary.LittleEndian.Uint32(r.buf[r.end+0 : r.end+4])
455475
length := binary.LittleEndian.Uint16(r.buf[r.end+4 : r.end+6])
456476
chunkEncoding := r.buf[r.end+6]
477+
478+
if r.loggerForTesting != nil {
479+
r.loggerForTesting.logf("\tBlock %d: Processing chunk at offset %d, checksum=%d, length=%d, encoding=%d\n", r.blockNum, r.end, checksum, length, chunkEncoding)
480+
}
481+
457482
if int(chunkEncoding) >= len(headerFormatMappings) {
483+
if r.loggerForTesting != nil {
484+
r.loggerForTesting.logf("\tInvalid chunk encoding encountered (value: %d); stopping chunk scan in block %d\n", chunkEncoding, r.blockNum)
485+
}
458486
break
459487
}
460488

461489
headerFormat := headerFormatMappings[chunkEncoding]
462490
chunkPosition, wireFormat, headerSize := headerFormat.chunkPosition, headerFormat.wireFormat, headerFormat.headerSize
463491
if checksum == 0 && length == 0 && chunkPosition == invalidChunkPosition {
492+
if r.loggerForTesting != nil {
493+
r.loggerForTesting.logf("\tFound invalid chunk marker at block %d offset %d; aborting this block scan\n", r.blockNum, r.end)
494+
}
464495
break
465496
}
466497
if wireFormat == invalidWireFormat {
498+
if r.loggerForTesting != nil {
499+
r.loggerForTesting.logf("\tInvalid wire format detected in block %d at offset %d\n", r.blockNum, r.end)
500+
}
467501
break
468502
}
469503
if wireFormat == recyclableWireFormat || wireFormat == walSyncWireFormat {
470504
if r.end+headerSize > r.n {
505+
if r.loggerForTesting != nil {
506+
r.loggerForTesting.logf("\tIncomplete header in block %d at offset %d; breaking out\n", r.blockNum, r.end)
507+
}
471508
break
472509
}
473510
logNum := binary.LittleEndian.Uint32(r.buf[r.end+7 : r.end+11])
474511
if logNum != r.logNum {
512+
if r.loggerForTesting != nil {
513+
r.loggerForTesting.logf("\tMismatch log number in block %d at offset %d (expected %d, got %d)\n", r.blockNum, r.end, r.logNum, logNum)
514+
}
475515
break
476516
}
477517
}
@@ -480,18 +520,30 @@ func (r *Reader) readAheadForCorruption() error {
480520
r.end = r.begin + int(length)
481521
if r.end > r.n {
482522
// The chunk straddles a 32KB boundary (or the end of file).
523+
if r.loggerForTesting != nil {
524+
r.loggerForTesting.logf("\tChunk in block %d spans beyond block boundaries (begin=%d, end=%d, n=%d)\n", r.blockNum, r.begin, r.end, r.n)
525+
}
483526
break
484527
}
485528
if checksum != crc.New(r.buf[r.begin-headerSize+6:r.end]).Value() {
529+
if r.loggerForTesting != nil {
530+
r.loggerForTesting.logf("\tChecksum mismatch in block %d at offset %d; potential corruption\n", r.blockNum, r.end)
531+
}
486532
break
487533
}
488534

489535
// Decode offset in header when chunk has the WAL Sync wire format.
490536
if wireFormat == walSyncWireFormat {
491537
syncedOffset := binary.LittleEndian.Uint64(r.buf[r.begin-headerSize+11 : r.begin-headerSize+19])
538+
if r.loggerForTesting != nil {
539+
r.loggerForTesting.logf("\tBlock %d: Found WAL sync chunk with syncedOffset=%d (invalidOffset=%d)\n", r.blockNum, syncedOffset, r.invalidOffset)
540+
}
492541
// If the encountered chunk offset promises durability beyond the invalid offset,
493542
// the invalid offset must have been corruption.
494543
if syncedOffset > r.invalidOffset {
544+
if r.loggerForTesting != nil {
545+
r.loggerForTesting.logf("\tCorruption confirmed: syncedOffset %d exceeds invalidOffset %d\n", syncedOffset, r.invalidOffset)
546+
}
495547
return r.err
496548
}
497549
}

record/record_test.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -884,6 +884,20 @@ func TestRecycleLogWithPartialRecord(t *testing.T) {
884884
require.Equal(t, err, io.ErrUnexpectedEOF)
885885
}
886886

887+
type readerLogger struct {
888+
builder strings.Builder
889+
}
890+
891+
var _ loggerForTesting = (*readerLogger)(nil)
892+
893+
func (l *readerLogger) getLog() string {
894+
return l.builder.String()
895+
}
896+
897+
func (l *readerLogger) logf(format string, args ...interface{}) {
898+
fmt.Fprintf(&l.builder, format, args...)
899+
}
900+
887901
func TestWALSync(t *testing.T) {
888902
var buffer bytes.Buffer
889903
result := make([]byte, 0)
@@ -894,6 +908,7 @@ func TestWALSync(t *testing.T) {
894908
case "init":
895909
buffer.Reset()
896910
result = result[:0]
911+
corruptChunkNumbers = corruptChunkNumbers[:0]
897912

898913
for chunkNumber, line := range strings.Split(d.Input, "\n") {
899914
switch {
@@ -956,16 +971,17 @@ func TestWALSync(t *testing.T) {
956971

957972
case "read":
958973
r := NewReader(bytes.NewBuffer(buffer.Bytes()), 1)
959-
974+
r.loggerForTesting = &readerLogger{}
960975
for {
961976
reader, err := r.Next()
962977
if err != nil {
963-
return fmt.Sprintf("error reading next: %v\nfinal blockNum: %d\nbytes read: %d", err, r.blockNum, len(result))
978+
r.loggerForTesting.logf("error reading next: %v\nfinal blockNum: %d\nbytes read: %d\n", err, r.blockNum, len(result))
979+
return r.loggerForTesting.(*readerLogger).getLog()
964980
}
965-
966981
data, err := io.ReadAll(reader)
967982
if err != nil {
968-
return fmt.Sprintf("error reading all: %v\nfinal blockNum: %d\nbytes read: %d", err, r.blockNum, len(result))
983+
r.loggerForTesting.logf("error reading all: %v\nfinal blockNum: %d\nbytes read: %d\n", err, r.blockNum, len(result))
984+
return r.loggerForTesting.(*readerLogger).getLog()
969985
}
970986
result = append(result, data...)
971987
}

0 commit comments

Comments
 (0)