Skip to content

Commit

Permalink
Log which input scanner has failed when merging their content
Browse files Browse the repository at this point in the history
  • Loading branch information
brawer committed May 16, 2024
1 parent 518c4b3 commit b13f38b
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 20 deletions.
6 changes: 4 additions & 2 deletions cmd/qrank-builder/itemsignals.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,9 @@ func buildItemSignals(ctx context.Context, pageviews []string, sites *map[string

writer := NewItemSignalsWriter(compressor)
scanners := make([]LineScanner, 0, len(pageviews)+1)
scannerNames := make([]string, 0, len(pageviews)+1)
scanners = append(scanners, NewPageSignalsScanner(sites, s3))
scannerNames = append(scannerNames, "page_signals")
for _, pv := range pageviews {
reader, err := NewS3Reader(ctx, "qrank", pv, s3)
if err != nil {
Expand All @@ -180,6 +182,7 @@ func buildItemSignals(ctx context.Context, pageviews []string, sites *map[string
return time.Time{}, err
}
scanners = append(scanners, bufio.NewScanner(decompressor))
scannerNames = append(scannerNames, pv)
}

// Produce a stream of ItemSignals, sorted by Wikidata item ID.
Expand All @@ -188,7 +191,7 @@ func buildItemSignals(ctx context.Context, pageviews []string, sites *map[string
config.ChunkSize = 8 * 1024 * 1024 / 64 // 8 MiB, 64 Bytes/line avg
config.NumWorkers = runtime.NumCPU()
sorter, outChan, errChan := extsort.New(sigChan, ItemSignalsFromBytes, ItemSignalsLess, config)
merger := NewLineMerger(scanners)
merger := NewLineMerger(scanners, scannerNames)
logger.Printf("BuildItemSignals(): merging signals from %d files; #0=PageSignalsScanner; rest=pageviews", len(scanners))
group, groupCtx := errgroup.WithContext(ctx)
group.Go(func() error {
Expand Down Expand Up @@ -266,7 +269,6 @@ type itemSignalsJoiner struct {
}

func (j *itemSignalsJoiner) Process(line string) error {
// fmt.Println("*** GIRAFFE", line)
cols := strings.Split(line, ",")
if len(cols) < 3 {
return fmt.Errorf(`bad line: "%s"`, line)
Expand Down
25 changes: 16 additions & 9 deletions cmd/qrank-builder/linemerger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main
import (
"bytes"
"container/heap"
"fmt"
)

// Merges the lines of a multiple io.Readers whose content is in sorted order.
Expand All @@ -23,18 +24,25 @@ type LineScanner interface {
Text() string
}

func NewLineMerger(r []LineScanner) *LineMerger {
// NewLineMerger creates an iterator that merges multiple sorted files,
// returning their lines in sort order. The passed names identify the
// scanners, and are part of the error message in case of failures.
// Being able to identify the failing input is useful for debugging.
// https://github.com/brawer/wikidata-qrank/issues/40
func NewLineMerger(r []LineScanner, names []string) *LineMerger {
if len(r) != len(names) {
panic(fmt.Sprintf("len(r) must be len(names), got %d vs %d", len(r), len(names)))
}

m := &LineMerger{}
m.heap = make(lineMergerHeap, 0, len(r))
for _, rr := range r {
item := &mergee{scanner: rr}
for i, rr := range r {
item := &mergee{scanner: rr, name: names[i]}
if item.scanner.Scan() {
m.heap = append(m.heap, item)
}
if err := item.scanner.Err(); err != nil {
if logger != nil {
logger.Printf("LineMerger: scanner #%d failed to scan first line, err=%v", item.index, err)
}
logger.Printf(`LineMerger: scanner "%s" failed to scan first line, err=%v`, item.name, err)
m.err = err
return m
}
Expand Down Expand Up @@ -62,9 +70,7 @@ func (m *LineMerger) Advance() bool {
}
if err := item.scanner.Err(); err != nil {
m.err = err
if logger != nil {
logger.Printf("LineMerger: scanner #%d failed, err=%v", item.index, err)
}
logger.Printf(`LineMerger: scanner "%s" failed, err=%v`, item.name, err)
return false
}
return len(m.heap) > 0
Expand All @@ -86,6 +92,7 @@ func (m *LineMerger) Line() string {
type mergee struct {
scanner LineScanner
index int
name string
}

type lineMergerHeap []*mergee
Expand Down
52 changes: 46 additions & 6 deletions cmd/qrank-builder/linemerger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,22 @@ package main

import (
"bufio"
"bytes"
"errors"
"log"
"strings"
"testing"
)

func TestLineMerger(t *testing.T) {
logger = log.New(&bytes.Buffer{}, "", log.Lshortfile)
m := NewLineMerger([]LineScanner{
bufio.NewScanner(strings.NewReader("C1\nD1")),
bufio.NewScanner(strings.NewReader("B2\nE2")),
bufio.NewScanner(strings.NewReader("A3\nB3")),
bufio.NewScanner(strings.NewReader("")),
bufio.NewScanner(strings.NewReader("B5")),
})
}, []string{"S1", "S2", "S3", "S4", "S5"})
result := make([]string, 0, 5)
for m.Advance() {
result = append(result, m.Line())
Expand All @@ -33,22 +36,59 @@ func TestLineMerger(t *testing.T) {
}
}

type errReader struct{}
type errReader struct {
numReads int
maxReads int
}

var testErr = errors.New("test error")

func (e *errReader) Read(p []byte) (n int, err error) {
return 0, testErr
if e.numReads >= e.maxReads {
return 0, testErr
}
e.numReads += 1
p[0] = '.'
p[1] = '\n'
return 2, nil
}

func TestLineMergerError(t *testing.T) {
reader := &errReader{}
m := NewLineMerger([]LineScanner{bufio.NewScanner(reader)})
func TestLineMerger_ErrorAtStart(t *testing.T) {
var logfile bytes.Buffer
logger = log.New(&logfile, "", log.Lshortfile)
reader := &errReader{numReads: 0, maxReads: 0}
m := NewLineMerger([]LineScanner{bufio.NewScanner(reader)}, []string{"🐞"})
if m.Advance() {
t.Error("expected m.Advance()=false, got true")
return
}
if err := m.Err(); err != testErr {
t.Errorf("expected test error, got %q", err)
}
gotLog := string(logfile.Bytes())
if !strings.Contains(gotLog, `scanner "🐞" failed to scan first line, err=test error`) {
t.Errorf("name of failing input scanner should be logged, got %s", gotLog)
}
}

func TestLineMerger_ErrorAtRead(t *testing.T) {
var logfile bytes.Buffer
logger = log.New(&logfile, "", log.Lshortfile)
reader := &errReader{numReads: 0, maxReads: 1}
m := NewLineMerger([]LineScanner{bufio.NewScanner(reader)}, []string{"🐞"})
if !m.Advance() {
t.Error("expected first m.Advance()=true, got false")
return
}
if m.Advance() {
t.Error("expected second m.Advance()=false, got true")
return
}
if err := m.Err(); err != testErr {
t.Errorf("expected test error, got %q", err)
}
gotLog := string(logfile.Bytes())
if !strings.Contains(gotLog, `scanner "🐞" failed, err=test error`) {
t.Errorf("name of failing input scanner should be logged, got %s", gotLog)
}
}
9 changes: 6 additions & 3 deletions cmd/qrank-builder/qviews.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,17 @@ func buildQViews(testRun bool, date time.Time, sitelinks string, pageviews []str
defer sitelinksFile.Close()

qfiles := make([]io.Reader, 1, len(pageviews)+1)
qfilenames := make([]string, 1, len(pageviews)+1)
qfiles[0] = brotli.NewReader(sitelinksFile)
qfilenames[0] = sitelinks
for _, pv := range pageviews {
pvFile, err := os.Open(pv)
if err != nil {
return "", err
}
defer pvFile.Close()
qfiles = append(qfiles, brotli.NewReader(pvFile))
qfilenames = append(qfilenames, pv)
}

ch := make(chan extsort.SortType, 10000)
Expand All @@ -94,7 +97,7 @@ func buildQViews(testRun bool, date time.Time, sitelinks string, pageviews []str
config.NumWorkers = runtime.NumCPU()
sorter, outChan, errChan := extsort.New(ch, QViewCountFromBytes, QViewCountLess, config)
g.Go(func() error {
return readQViewInputs(testRun, qfiles, ch, subCtx)
return readQViewInputs(testRun, qfiles, qfilenames, ch, subCtx)
})
g.Go(func() error {
sorter.Sort(ctx) // not subCtx, as per extsort docs
Expand Down Expand Up @@ -157,13 +160,13 @@ func writeQViewCount(w io.Writer, entity int64, count int64) error {
return err
}

func readQViewInputs(testRun bool, inputs []io.Reader, ch chan<- extsort.SortType, ctx context.Context) error {
func readQViewInputs(testRun bool, inputs []io.Reader, inputNames []string, ch chan<- extsort.SortType, ctx context.Context) error {
defer close(ch)
scanners := make([]LineScanner, 0, len(inputs))
for _, input := range inputs {
scanners = append(scanners, bufio.NewScanner(input))
}
merger := NewLineMerger(scanners)
merger := NewLineMerger(scanners, inputNames)
var lastKey string
var entity, numViews, numLinesRead int64
for merger.Advance() {
Expand Down

0 comments on commit b13f38b

Please sign in to comment.